terrain2/dir_to_terrainrgb_mbtiles1.py

421 lines
No EOL
14 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Crea un MBTiles Terrain-RGB (PNG) mosaicato da tutti i TIF in una directory.
- Mosaico per-tile (FirstMethod) -> niente buchi ai bordi
- Resampling 'nearest' -> preserva i byte RGB (quote intatte)
- Buffer di tile (tile_buffer) per chiudere il "seam" agli zoom bassi
- mask=None NON considerata "vuota" (si salta solo mask presente e tutta 0)
- Scrittura MBTiles standard (tile_row in TMS)
Requisiti:
pip install rio-tiler mercantile numpy pillow rasterio
"""
import os
import math
import glob
import argparse
import sqlite3
from typing import List, Tuple, Optional, Any
import mercantile
import numpy as np
from rio_tiler.io import COGReader
from rio_tiler.mosaic import mosaic_reader
from rio_tiler.mosaic.methods import FirstMethod
from rio_tiler.utils import render
# Compatibilit con rio-tiler (ImageData)
try:
from rio_tiler.models import ImageData
except Exception:
ImageData = None # type: ignore
# ==============================
# Parametri di default
# ==============================
DEFAULT_MIN_ZOOM = 5
DEFAULT_MAX_ZOOM = 14
DEFAULT_TILESIZE = 256
DEFAULT_THREADS = 4
DEFAULT_RESAMPLING = "nearest"
DEFAULT_SKIP_EMPTY = False # di default NON saltiamo tile
DEFAULT_BUFFER = 0 # buffer in pixel; auto-buffer se 0
DEFAULT_BBOX_EPS = 0.001 # espansione bbox in gradi (100 m)
# BBOX Italia (WGS84, lon/lat) approx
DEFAULT_ITALY_BBOX = (6.6, 36.6, 18.8, 47.3) # (W, S, E, N)
# ==============================
# Utility
# ==============================
def parse_bbox(bbox_str: Optional[str]) -> Optional[Tuple[float, float, float, float]]:
if not bbox_str:
return None
parts = bbox_str.split(",")
if len(parts) != 4:
raise ValueError("La bbox deve essere nel formato: minx,miny,maxx,maxy")
return tuple(map(float, parts)) # type: ignore
def list_rasters_in_dir(input_dir: str) -> List[str]:
patterns = ["*.tif", "*.tiff", "*.TIF", "*.TIFF"]
files = []
for p in patterns:
files.extend(glob.glob(os.path.join(input_dir, p)))
return sorted(files)
def get_union_bounds(assets: List[str]) -> Tuple[float, float, float, float]:
"""Unisce i bounds (lon/lat WGS84) dei COG/GeoTIFF. Aggiunge un piccolo epsilon."""
minx = miny = math.inf
maxx = maxy = -math.inf
for a in assets:
with COGReader(a) as cog:
b = cog.bounds # (minx, miny, maxx, maxy) in EPSG:4326
minx = min(minx, b[0]); miny = min(miny, b[1])
maxx = max(maxx, b[2]); maxy = max(maxy, b[3])
eps = 1e-10
return (minx - eps, miny - eps, maxx + eps, maxy + eps)
def expand_bbox(b: Tuple[float,float,float,float], eps: float) -> Tuple[float,float,float,float]:
"""Espande la bbox di un epsilon in tutte le direzioni."""
return (b[0]-eps, b[1]-eps, b[2]+eps, b[3]+eps)
def intersect_bbox(a: Tuple[float, float, float, float],
b: Tuple[float, float, float, float]) -> Optional[Tuple[float, float, float, float]]:
"""Intersezione di due bbox. Ritorna None se vuota."""
minx = max(a[0], b[0]); miny = max(a[1], b[1])
maxx = min(a[2], b[2]); maxy = min(a[3], b[3])
if minx >= maxx or miny >= maxy:
return None
return (minx, miny, maxx, maxy)
def reader(asset: str, x: int, y: int, z: int, **kwargs):
"""Reader per mosaic_reader: restituisce un ImageData (o data/mask)."""
with COGReader(asset) as cog:
return cog.tile(x, y, z, **kwargs)
def xyz_to_tms_y(z: int, y: int) -> int:
"""Converte Y in TMS per MBTiles (capovolto)."""
return (2**z - 1) - y
def init_mbtiles(path: str, metadata: dict, fast: bool = True) -> sqlite3.Connection:
"""Crea un MBTiles vuoto con metadata. Se esiste, lo sovrascrive."""
if os.path.exists(path):
os.remove(path)
con = sqlite3.connect(path)
cur = con.cursor()
if fast:
cur.execute("PRAGMA synchronous=OFF;")
cur.execute("PRAGMA journal_mode=MEMORY;")
cur.execute("PRAGMA temp_store=MEMORY;")
cur.executescript("""
CREATE TABLE metadata (name TEXT, value TEXT);
CREATE TABLE tiles (
zoom_level INTEGER,
tile_column INTEGER,
tile_row INTEGER,
tile_data BLOB
);
CREATE UNIQUE INDEX tile_index ON tiles (zoom_level, tile_column, tile_row);
""")
cur.executemany(
"INSERT INTO metadata (name, value) VALUES (?, ?)",
[(k, str(v)) for k, v in metadata.items()]
)
con.commit()
return con
def normalize_mask(mask: Any) -> Optional[np.ndarray]:
"""
Rende la mask un array 2D uint8 (0/255).
- Accetta: None, bool/uint8 array, o list/tuple contenenti array numerici.
- Ignora elementi non-numerici (es. liste di stringhe 'assets_used').
"""
if mask is None:
return None
def to_uint8_2d(arr: np.ndarray) -> Optional[np.ndarray]:
# 3D con 1 banda -> squeeze
if arr.ndim == 3 and arr.shape[0] == 1:
arr = arr[0]
elif arr.ndim == 3:
arr = arr.squeeze()
if arr.ndim != 2:
return None
# Porta a uint8 0/255
if arr.dtype == np.uint8:
return arr
if arr.dtype == np.bool_:
return arr.astype(np.uint8) * 255
if np.issubdtype(arr.dtype, np.number):
return (arr > 0).astype(np.uint8) * 255
return None
# Lista/Tupla -> OR per-pixel degli elementi numerici
if isinstance(mask, (list, tuple)):
masks: List[np.ndarray] = []
for m in mask:
if m is None:
continue
arr = np.asarray(m)
if (not np.issubdtype(arr.dtype, np.number)) and (arr.dtype != np.bool_):
continue
conv = to_uint8_2d(arr)
if conv is not None:
masks.append(conv)
if not masks:
return None
# allinea shapes alla minima comune
h = min(m.shape[0] for m in masks)
w = min(m.shape[1] for m in masks)
if any((m.shape[0] != h or m.shape[1] != w) for m in masks):
masks = [m[:h, :w] for m in masks]
return np.maximum.reduce(masks)
# Caso singolo array
arr = np.asarray(mask)
if (not np.issubdtype(arr.dtype, np.number)) and (arr.dtype != np.bool_):
return None
return to_uint8_2d(arr)
def split_mosaic_output(out: Any) -> Tuple[np.ndarray, Optional[np.ndarray]]:
"""
Estrae (data, mask) dall'output di mosaic_reader a prova di versione:
- ImageData (rio_tiler >= 5/6): usa .data e .mask
- Tuple: (ImageData, assets_used) o (data, mask) o (data, assets_used, mask)
- Dict con 'data'/'mask'
"""
if ImageData is not None and isinstance(out, ImageData):
return out.data, out.mask
if isinstance(out, dict) and "data" in out:
return out["data"], out.get("mask")
if isinstance(out, (tuple, list)):
# (ImageData, assets_used)
if len(out) >= 1 and (ImageData is not None) and isinstance(out[0], ImageData):
img = out[0]
return img.data, img.mask
arrays = [o for o in out if isinstance(o, np.ndarray)]
data = None
mask = None
for a in arrays:
if a.ndim == 3:
data = a
break
for a in arrays:
if a.ndim == 2:
mask = a
break
if mask is None:
for a in arrays:
if a.ndim == 3 and a.shape[0] == 1:
mask = a[0]
break
if data is None and arrays:
data = arrays[0]
if data is None:
raise RuntimeError("Impossibile determinare 'data' dalla risposta di mosaic_reader.")
return data, mask
# fallback
try:
data, mask = out # type: ignore
return data, mask
except Exception:
raise RuntimeError("Impossibile determinare 'data'/'mask' dalla risposta di mosaic_reader (tipo sconosciuto).")
def safe_mosaic_reader(
assets: List[str],
x: int, y: int, z: int,
tilesize: int,
resampling: str,
threads: int,
tile_buffer: int
) -> Any:
"""
Chiama mosaic_reader provando firme diverse (compatibilit tra versioni):
1) mosaic_assets=..., tile_buffer=...
2) mosaic_assets=...
3) assets=..., tile_buffer=...
4) assets=...
"""
base = dict(
reader=reader,
x=x, y=y, z=z,
pixel_selection=FirstMethod(),
tilesize=tilesize,
resampling_method=resampling,
threads=threads,
)
# 1) mosaic_assets + tile_buffer
try:
return mosaic_reader(mosaic_assets=assets, **base, tile_buffer=tile_buffer)
except TypeError:
pass
# 2) mosaic_assets senza tile_buffer
try:
return mosaic_reader(mosaic_assets=assets, **base)
except TypeError:
pass
# 3) assets + tile_buffer
try:
return mosaic_reader(assets=assets, **base, tile_buffer=tile_buffer)
except TypeError:
pass
# 4) assets senza tile_buffer
return mosaic_reader(assets=assets, **base)
# ==============================
# Core
# ==============================
def generate_mbtiles_from_dir(
input_dir: str,
out_mbtiles: str,
minzoom: int = DEFAULT_MIN_ZOOM,
maxzoom: int = DEFAULT_MAX_ZOOM,
tilesize: int = DEFAULT_TILESIZE,
resampling: str = DEFAULT_RESAMPLING,
threads: int = DEFAULT_THREADS,
skip_empty: bool = DEFAULT_SKIP_EMPTY,
bbox: Optional[Tuple[float, float, float, float]] = None,
bbox_eps: float = DEFAULT_BBOX_EPS,
buffer: int = DEFAULT_BUFFER,
):
assets = list_rasters_in_dir(input_dir)
if not assets:
raise RuntimeError(f"Nessun file .tif trovato in: {input_dir}")
# AOI: usa bbox se fornita (espansa), altrimenti union_bounds (espansa)
if bbox is None:
aoi = expand_bbox(get_union_bounds(assets), bbox_eps)
else:
aoi = expand_bbox(bbox, bbox_eps)
# Metadati MBTiles
center_lon = (aoi[0] + aoi[2]) / 2.0
center_lat = (aoi[1] + aoi[3]) / 2.0
metadata = {
"name": f"Terrain-RGB z{minzoom}-{maxzoom}",
"type": "raster",
"format": "png",
"minzoom": minzoom,
"maxzoom": maxzoom,
"bounds": f"{aoi[0]},{aoi[1]},{aoi[2]},{aoi[3]}",
"center": f"{center_lon:.6f},{center_lat:.6f},{minzoom}",
}
print(f"-> Trovati {len(assets)} raster in: {input_dir}")
print(f"-> AOI usata (WGS84, espansa di {bbox_eps}): {aoi}")
print(f"-> Zoom: {minzoom}..{maxzoom}, tilesize={tilesize}, resampling={resampling}, threads={threads}, buffer={buffer} (auto<=11)")
print(f"-> Output: {out_mbtiles}")
con = init_mbtiles(out_mbtiles, metadata, fast=True)
cur = con.cursor()
try:
for z in range(minzoom, maxzoom + 1):
# Auto-buffer: se non specificato, usa 2 px per zoom <= 11
z_buffer = buffer
if z_buffer <= 0 and z <= 11:
z_buffer = 2
print(f"\n== Zoom {z} (tile_buffer={z_buffer}) ==")
tiles = list(mercantile.tiles(aoi[0], aoi[1], aoi[2], aoi[3], z))
total = len(tiles)
for idx, t in enumerate(tiles, 1):
out = safe_mosaic_reader(
assets=assets,
x=t.x, y=t.y, z=z,
tilesize=tilesize,
resampling=resampling,
threads=threads,
tile_buffer=z_buffer,
)
data, raw_mask = split_mosaic_output(out)
mask = normalize_mask(raw_mask)
# NON considerare mai mask=None come vuota; salta solo se mask presente e tutta 0
if skip_empty and (mask is not None) and (mask.max() == 0):
continue
png = render(data, mask=mask, img_format="PNG")
tms_y = xyz_to_tms_y(z, t.y)
cur.execute(
"INSERT OR REPLACE INTO tiles (zoom_level, tile_column, tile_row, tile_data) VALUES (?,?,?,?)",
(z, t.x, tms_y, sqlite3.Binary(png))
)
if idx % 500 == 0 or idx == total:
print(f" {idx}/{total} tile...")
con.commit() # commit per livello
except KeyboardInterrupt:
print("\nInterrotto dall'utente. Scrittura parziale salvata.")
finally:
try:
cur.execute("ANALYZE;")
cur.execute("VACUUM;")
con.commit()
except Exception:
pass
con.close()
print(f"\nCompletato: {out_mbtiles}")
# ==============================
# CLI
# ==============================
def main():
ap = argparse.ArgumentParser(
description="Crea un MBTiles Terrain-RGB (PNG) mosaicato da tutti i TIF in una directory (z=5..14 di default)."
)
ap.add_argument("input_dir", help="Directory con i TIF/TIFF RGB (COG o GeoTIFF).")
ap.add_argument("out_mbtiles", help="Percorso file .mbtiles da creare.")
ap.add_argument("--min-zoom", type=int, default=DEFAULT_MIN_ZOOM)
ap.add_argument("--max-zoom", type=int, default=DEFAULT_MAX_ZOOM)
ap.add_argument("--tilesize", type=int, default=DEFAULT_TILESIZE, choices=[256, 512])
ap.add_argument("--threads", type=int, default=DEFAULT_THREADS)
ap.add_argument("--resampling", default=DEFAULT_RESAMPLING, choices=["nearest"])
ap.add_argument("--skip-empty", action="store_true", default=DEFAULT_SKIP_EMPTY,
help="Se impostato, non scrive tile completamente vuote (mask=0).")
ap.add_argument("--bbox", type=str, default=None,
help="BBox WGS84 minx,miny,maxx,maxy (se non indicata, usa i bounds dei dati).")
ap.add_argument("--bbox-eps", type=float, default=DEFAULT_BBOX_EPS,
help="Espansione della bbox in gradi per includere tile al bordo (default 0.001).")
ap.add_argument("--buffer", type=int, default=DEFAULT_BUFFER,
help="Tile buffer in pixel (consigliato 1-2 per zoom bassi). Se 0, usa auto-buffer=2 per z<=11.")
args = ap.parse_args()
bbox = parse_bbox(args.bbox) if args.bbox else None
generate_mbtiles_from_dir(
input_dir=args.input_dir,
out_mbtiles=args.out_mbtiles,
minzoom=args.min_zoom,
maxzoom=args.max_zoom,
tilesize=args.tilesize,
resampling=args.resampling,
threads=args.threads,
skip_empty=args.skip_empty,
bbox=bbox,
bbox_eps=args.bbox_eps,
buffer=args.buffer,
)
if __name__ == "__main__":
main()