421 lines
No EOL
14 KiB
Python
421 lines
No EOL
14 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
|
|
"""
|
|
Crea un MBTiles Terrain-RGB (PNG) mosaicato da tutti i TIF in una directory.
|
|
|
|
- Mosaico per-tile (FirstMethod) -> niente buchi ai bordi
|
|
- Resampling 'nearest' -> preserva i byte RGB (quote intatte)
|
|
- Buffer di tile (tile_buffer) per chiudere il "seam" agli zoom bassi
|
|
- mask=None NON considerata "vuota" (si salta solo mask presente e tutta 0)
|
|
- Scrittura MBTiles standard (tile_row in TMS)
|
|
|
|
Requisiti:
|
|
pip install rio-tiler mercantile numpy pillow rasterio
|
|
"""
|
|
|
|
import os
|
|
import math
|
|
import glob
|
|
import argparse
|
|
import sqlite3
|
|
from typing import List, Tuple, Optional, Any
|
|
|
|
import mercantile
|
|
import numpy as np
|
|
from rio_tiler.io import COGReader
|
|
from rio_tiler.mosaic import mosaic_reader
|
|
from rio_tiler.mosaic.methods import FirstMethod
|
|
from rio_tiler.utils import render
|
|
|
|
# Compatibilit con rio-tiler (ImageData)
|
|
try:
|
|
from rio_tiler.models import ImageData
|
|
except Exception:
|
|
ImageData = None # type: ignore
|
|
|
|
# ==============================
|
|
# Parametri di default
|
|
# ==============================
|
|
|
|
DEFAULT_MIN_ZOOM = 5
|
|
DEFAULT_MAX_ZOOM = 14
|
|
DEFAULT_TILESIZE = 256
|
|
DEFAULT_THREADS = 4
|
|
DEFAULT_RESAMPLING = "nearest"
|
|
DEFAULT_SKIP_EMPTY = False # di default NON saltiamo tile
|
|
DEFAULT_BUFFER = 0 # buffer in pixel; auto-buffer se 0
|
|
DEFAULT_BBOX_EPS = 0.001 # espansione bbox in gradi (100 m)
|
|
|
|
# BBOX Italia (WGS84, lon/lat) approx
|
|
DEFAULT_ITALY_BBOX = (6.6, 36.6, 18.8, 47.3) # (W, S, E, N)
|
|
|
|
# ==============================
|
|
# Utility
|
|
# ==============================
|
|
|
|
def parse_bbox(bbox_str: Optional[str]) -> Optional[Tuple[float, float, float, float]]:
|
|
if not bbox_str:
|
|
return None
|
|
parts = bbox_str.split(",")
|
|
if len(parts) != 4:
|
|
raise ValueError("La bbox deve essere nel formato: minx,miny,maxx,maxy")
|
|
return tuple(map(float, parts)) # type: ignore
|
|
|
|
def list_rasters_in_dir(input_dir: str) -> List[str]:
|
|
patterns = ["*.tif", "*.tiff", "*.TIF", "*.TIFF"]
|
|
files = []
|
|
for p in patterns:
|
|
files.extend(glob.glob(os.path.join(input_dir, p)))
|
|
return sorted(files)
|
|
|
|
def get_union_bounds(assets: List[str]) -> Tuple[float, float, float, float]:
|
|
"""Unisce i bounds (lon/lat WGS84) dei COG/GeoTIFF. Aggiunge un piccolo epsilon."""
|
|
minx = miny = math.inf
|
|
maxx = maxy = -math.inf
|
|
for a in assets:
|
|
with COGReader(a) as cog:
|
|
b = cog.bounds # (minx, miny, maxx, maxy) in EPSG:4326
|
|
minx = min(minx, b[0]); miny = min(miny, b[1])
|
|
maxx = max(maxx, b[2]); maxy = max(maxy, b[3])
|
|
eps = 1e-10
|
|
return (minx - eps, miny - eps, maxx + eps, maxy + eps)
|
|
|
|
def expand_bbox(b: Tuple[float,float,float,float], eps: float) -> Tuple[float,float,float,float]:
|
|
"""Espande la bbox di un epsilon in tutte le direzioni."""
|
|
return (b[0]-eps, b[1]-eps, b[2]+eps, b[3]+eps)
|
|
|
|
def intersect_bbox(a: Tuple[float, float, float, float],
|
|
b: Tuple[float, float, float, float]) -> Optional[Tuple[float, float, float, float]]:
|
|
"""Intersezione di due bbox. Ritorna None se vuota."""
|
|
minx = max(a[0], b[0]); miny = max(a[1], b[1])
|
|
maxx = min(a[2], b[2]); maxy = min(a[3], b[3])
|
|
if minx >= maxx or miny >= maxy:
|
|
return None
|
|
return (minx, miny, maxx, maxy)
|
|
|
|
def reader(asset: str, x: int, y: int, z: int, **kwargs):
|
|
"""Reader per mosaic_reader: restituisce un ImageData (o data/mask)."""
|
|
with COGReader(asset) as cog:
|
|
return cog.tile(x, y, z, **kwargs)
|
|
|
|
def xyz_to_tms_y(z: int, y: int) -> int:
|
|
"""Converte Y in TMS per MBTiles (capovolto)."""
|
|
return (2**z - 1) - y
|
|
|
|
def init_mbtiles(path: str, metadata: dict, fast: bool = True) -> sqlite3.Connection:
|
|
"""Crea un MBTiles vuoto con metadata. Se esiste, lo sovrascrive."""
|
|
if os.path.exists(path):
|
|
os.remove(path)
|
|
con = sqlite3.connect(path)
|
|
cur = con.cursor()
|
|
|
|
if fast:
|
|
cur.execute("PRAGMA synchronous=OFF;")
|
|
cur.execute("PRAGMA journal_mode=MEMORY;")
|
|
cur.execute("PRAGMA temp_store=MEMORY;")
|
|
|
|
cur.executescript("""
|
|
CREATE TABLE metadata (name TEXT, value TEXT);
|
|
CREATE TABLE tiles (
|
|
zoom_level INTEGER,
|
|
tile_column INTEGER,
|
|
tile_row INTEGER,
|
|
tile_data BLOB
|
|
);
|
|
CREATE UNIQUE INDEX tile_index ON tiles (zoom_level, tile_column, tile_row);
|
|
""")
|
|
|
|
cur.executemany(
|
|
"INSERT INTO metadata (name, value) VALUES (?, ?)",
|
|
[(k, str(v)) for k, v in metadata.items()]
|
|
)
|
|
con.commit()
|
|
return con
|
|
|
|
def normalize_mask(mask: Any) -> Optional[np.ndarray]:
|
|
"""
|
|
Rende la mask un array 2D uint8 (0/255).
|
|
- Accetta: None, bool/uint8 array, o list/tuple contenenti array numerici.
|
|
- Ignora elementi non-numerici (es. liste di stringhe 'assets_used').
|
|
"""
|
|
if mask is None:
|
|
return None
|
|
|
|
def to_uint8_2d(arr: np.ndarray) -> Optional[np.ndarray]:
|
|
# 3D con 1 banda -> squeeze
|
|
if arr.ndim == 3 and arr.shape[0] == 1:
|
|
arr = arr[0]
|
|
elif arr.ndim == 3:
|
|
arr = arr.squeeze()
|
|
if arr.ndim != 2:
|
|
return None
|
|
# Porta a uint8 0/255
|
|
if arr.dtype == np.uint8:
|
|
return arr
|
|
if arr.dtype == np.bool_:
|
|
return arr.astype(np.uint8) * 255
|
|
if np.issubdtype(arr.dtype, np.number):
|
|
return (arr > 0).astype(np.uint8) * 255
|
|
return None
|
|
|
|
# Lista/Tupla -> OR per-pixel degli elementi numerici
|
|
if isinstance(mask, (list, tuple)):
|
|
masks: List[np.ndarray] = []
|
|
for m in mask:
|
|
if m is None:
|
|
continue
|
|
arr = np.asarray(m)
|
|
if (not np.issubdtype(arr.dtype, np.number)) and (arr.dtype != np.bool_):
|
|
continue
|
|
conv = to_uint8_2d(arr)
|
|
if conv is not None:
|
|
masks.append(conv)
|
|
if not masks:
|
|
return None
|
|
# allinea shapes alla minima comune
|
|
h = min(m.shape[0] for m in masks)
|
|
w = min(m.shape[1] for m in masks)
|
|
if any((m.shape[0] != h or m.shape[1] != w) for m in masks):
|
|
masks = [m[:h, :w] for m in masks]
|
|
return np.maximum.reduce(masks)
|
|
|
|
# Caso singolo array
|
|
arr = np.asarray(mask)
|
|
if (not np.issubdtype(arr.dtype, np.number)) and (arr.dtype != np.bool_):
|
|
return None
|
|
return to_uint8_2d(arr)
|
|
|
|
def split_mosaic_output(out: Any) -> Tuple[np.ndarray, Optional[np.ndarray]]:
|
|
"""
|
|
Estrae (data, mask) dall'output di mosaic_reader a prova di versione:
|
|
- ImageData (rio_tiler >= 5/6): usa .data e .mask
|
|
- Tuple: (ImageData, assets_used) o (data, mask) o (data, assets_used, mask)
|
|
- Dict con 'data'/'mask'
|
|
"""
|
|
if ImageData is not None and isinstance(out, ImageData):
|
|
return out.data, out.mask
|
|
|
|
if isinstance(out, dict) and "data" in out:
|
|
return out["data"], out.get("mask")
|
|
|
|
if isinstance(out, (tuple, list)):
|
|
# (ImageData, assets_used)
|
|
if len(out) >= 1 and (ImageData is not None) and isinstance(out[0], ImageData):
|
|
img = out[0]
|
|
return img.data, img.mask
|
|
|
|
arrays = [o for o in out if isinstance(o, np.ndarray)]
|
|
data = None
|
|
mask = None
|
|
for a in arrays:
|
|
if a.ndim == 3:
|
|
data = a
|
|
break
|
|
for a in arrays:
|
|
if a.ndim == 2:
|
|
mask = a
|
|
break
|
|
if mask is None:
|
|
for a in arrays:
|
|
if a.ndim == 3 and a.shape[0] == 1:
|
|
mask = a[0]
|
|
break
|
|
if data is None and arrays:
|
|
data = arrays[0]
|
|
if data is None:
|
|
raise RuntimeError("Impossibile determinare 'data' dalla risposta di mosaic_reader.")
|
|
return data, mask
|
|
|
|
# fallback
|
|
try:
|
|
data, mask = out # type: ignore
|
|
return data, mask
|
|
except Exception:
|
|
raise RuntimeError("Impossibile determinare 'data'/'mask' dalla risposta di mosaic_reader (tipo sconosciuto).")
|
|
|
|
def safe_mosaic_reader(
|
|
assets: List[str],
|
|
x: int, y: int, z: int,
|
|
tilesize: int,
|
|
resampling: str,
|
|
threads: int,
|
|
tile_buffer: int
|
|
) -> Any:
|
|
"""
|
|
Chiama mosaic_reader provando firme diverse (compatibilit tra versioni):
|
|
1) mosaic_assets=..., tile_buffer=...
|
|
2) mosaic_assets=...
|
|
3) assets=..., tile_buffer=...
|
|
4) assets=...
|
|
"""
|
|
base = dict(
|
|
reader=reader,
|
|
x=x, y=y, z=z,
|
|
pixel_selection=FirstMethod(),
|
|
tilesize=tilesize,
|
|
resampling_method=resampling,
|
|
threads=threads,
|
|
)
|
|
# 1) mosaic_assets + tile_buffer
|
|
try:
|
|
return mosaic_reader(mosaic_assets=assets, **base, tile_buffer=tile_buffer)
|
|
except TypeError:
|
|
pass
|
|
# 2) mosaic_assets senza tile_buffer
|
|
try:
|
|
return mosaic_reader(mosaic_assets=assets, **base)
|
|
except TypeError:
|
|
pass
|
|
# 3) assets + tile_buffer
|
|
try:
|
|
return mosaic_reader(assets=assets, **base, tile_buffer=tile_buffer)
|
|
except TypeError:
|
|
pass
|
|
# 4) assets senza tile_buffer
|
|
return mosaic_reader(assets=assets, **base)
|
|
|
|
# ==============================
|
|
# Core
|
|
# ==============================
|
|
|
|
def generate_mbtiles_from_dir(
|
|
input_dir: str,
|
|
out_mbtiles: str,
|
|
minzoom: int = DEFAULT_MIN_ZOOM,
|
|
maxzoom: int = DEFAULT_MAX_ZOOM,
|
|
tilesize: int = DEFAULT_TILESIZE,
|
|
resampling: str = DEFAULT_RESAMPLING,
|
|
threads: int = DEFAULT_THREADS,
|
|
skip_empty: bool = DEFAULT_SKIP_EMPTY,
|
|
bbox: Optional[Tuple[float, float, float, float]] = None,
|
|
bbox_eps: float = DEFAULT_BBOX_EPS,
|
|
buffer: int = DEFAULT_BUFFER,
|
|
):
|
|
assets = list_rasters_in_dir(input_dir)
|
|
if not assets:
|
|
raise RuntimeError(f"Nessun file .tif trovato in: {input_dir}")
|
|
|
|
# AOI: usa bbox se fornita (espansa), altrimenti union_bounds (espansa)
|
|
if bbox is None:
|
|
aoi = expand_bbox(get_union_bounds(assets), bbox_eps)
|
|
else:
|
|
aoi = expand_bbox(bbox, bbox_eps)
|
|
|
|
# Metadati MBTiles
|
|
center_lon = (aoi[0] + aoi[2]) / 2.0
|
|
center_lat = (aoi[1] + aoi[3]) / 2.0
|
|
metadata = {
|
|
"name": f"Terrain-RGB z{minzoom}-{maxzoom}",
|
|
"type": "raster",
|
|
"format": "png",
|
|
"minzoom": minzoom,
|
|
"maxzoom": maxzoom,
|
|
"bounds": f"{aoi[0]},{aoi[1]},{aoi[2]},{aoi[3]}",
|
|
"center": f"{center_lon:.6f},{center_lat:.6f},{minzoom}",
|
|
}
|
|
|
|
print(f"-> Trovati {len(assets)} raster in: {input_dir}")
|
|
print(f"-> AOI usata (WGS84, espansa di {bbox_eps}): {aoi}")
|
|
print(f"-> Zoom: {minzoom}..{maxzoom}, tilesize={tilesize}, resampling={resampling}, threads={threads}, buffer={buffer} (auto<=11)")
|
|
print(f"-> Output: {out_mbtiles}")
|
|
|
|
con = init_mbtiles(out_mbtiles, metadata, fast=True)
|
|
cur = con.cursor()
|
|
|
|
try:
|
|
for z in range(minzoom, maxzoom + 1):
|
|
# Auto-buffer: se non specificato, usa 2 px per zoom <= 11
|
|
z_buffer = buffer
|
|
if z_buffer <= 0 and z <= 11:
|
|
z_buffer = 2
|
|
|
|
print(f"\n== Zoom {z} (tile_buffer={z_buffer}) ==")
|
|
tiles = list(mercantile.tiles(aoi[0], aoi[1], aoi[2], aoi[3], z))
|
|
total = len(tiles)
|
|
for idx, t in enumerate(tiles, 1):
|
|
out = safe_mosaic_reader(
|
|
assets=assets,
|
|
x=t.x, y=t.y, z=z,
|
|
tilesize=tilesize,
|
|
resampling=resampling,
|
|
threads=threads,
|
|
tile_buffer=z_buffer,
|
|
)
|
|
|
|
data, raw_mask = split_mosaic_output(out)
|
|
mask = normalize_mask(raw_mask)
|
|
|
|
# NON considerare mai mask=None come vuota; salta solo se mask presente e tutta 0
|
|
if skip_empty and (mask is not None) and (mask.max() == 0):
|
|
continue
|
|
|
|
png = render(data, mask=mask, img_format="PNG")
|
|
tms_y = xyz_to_tms_y(z, t.y)
|
|
|
|
cur.execute(
|
|
"INSERT OR REPLACE INTO tiles (zoom_level, tile_column, tile_row, tile_data) VALUES (?,?,?,?)",
|
|
(z, t.x, tms_y, sqlite3.Binary(png))
|
|
)
|
|
|
|
if idx % 500 == 0 or idx == total:
|
|
print(f" {idx}/{total} tile...")
|
|
|
|
con.commit() # commit per livello
|
|
|
|
except KeyboardInterrupt:
|
|
print("\nInterrotto dall'utente. Scrittura parziale salvata.")
|
|
finally:
|
|
try:
|
|
cur.execute("ANALYZE;")
|
|
cur.execute("VACUUM;")
|
|
con.commit()
|
|
except Exception:
|
|
pass
|
|
con.close()
|
|
|
|
print(f"\nCompletato: {out_mbtiles}")
|
|
|
|
# ==============================
|
|
# CLI
|
|
# ==============================
|
|
|
|
def main():
|
|
ap = argparse.ArgumentParser(
|
|
description="Crea un MBTiles Terrain-RGB (PNG) mosaicato da tutti i TIF in una directory (z=5..14 di default)."
|
|
)
|
|
ap.add_argument("input_dir", help="Directory con i TIF/TIFF RGB (COG o GeoTIFF).")
|
|
ap.add_argument("out_mbtiles", help="Percorso file .mbtiles da creare.")
|
|
ap.add_argument("--min-zoom", type=int, default=DEFAULT_MIN_ZOOM)
|
|
ap.add_argument("--max-zoom", type=int, default=DEFAULT_MAX_ZOOM)
|
|
ap.add_argument("--tilesize", type=int, default=DEFAULT_TILESIZE, choices=[256, 512])
|
|
ap.add_argument("--threads", type=int, default=DEFAULT_THREADS)
|
|
ap.add_argument("--resampling", default=DEFAULT_RESAMPLING, choices=["nearest"])
|
|
ap.add_argument("--skip-empty", action="store_true", default=DEFAULT_SKIP_EMPTY,
|
|
help="Se impostato, non scrive tile completamente vuote (mask=0).")
|
|
ap.add_argument("--bbox", type=str, default=None,
|
|
help="BBox WGS84 minx,miny,maxx,maxy (se non indicata, usa i bounds dei dati).")
|
|
ap.add_argument("--bbox-eps", type=float, default=DEFAULT_BBOX_EPS,
|
|
help="Espansione della bbox in gradi per includere tile al bordo (default 0.001).")
|
|
ap.add_argument("--buffer", type=int, default=DEFAULT_BUFFER,
|
|
help="Tile buffer in pixel (consigliato 1-2 per zoom bassi). Se 0, usa auto-buffer=2 per z<=11.")
|
|
args = ap.parse_args()
|
|
|
|
bbox = parse_bbox(args.bbox) if args.bbox else None
|
|
|
|
generate_mbtiles_from_dir(
|
|
input_dir=args.input_dir,
|
|
out_mbtiles=args.out_mbtiles,
|
|
minzoom=args.min_zoom,
|
|
maxzoom=args.max_zoom,
|
|
tilesize=args.tilesize,
|
|
resampling=args.resampling,
|
|
threads=args.threads,
|
|
skip_empty=args.skip_empty,
|
|
bbox=bbox,
|
|
bbox_eps=args.bbox_eps,
|
|
buffer=args.buffer,
|
|
)
|
|
|
|
if __name__ == "__main__":
|
|
main() |