86 lines
2.3 KiB
Python
86 lines
2.3 KiB
Python
import boto3
|
||
import os
|
||
import logging
|
||
from botocore import UNSIGNED
|
||
from botocore.config import Config
|
||
|
||
BUCKET = "copernicus-dem-30m"
|
||
OUTPUT_DIR = "./glo30_europe"
|
||
LOG_FILE = "download_europe.log"
|
||
|
||
# Europa approx:
|
||
lat_range = range(34, 73) # N34–N72
|
||
lon_east = range(0, 46) # E000–E045
|
||
lon_west = range(1, 26) # W001–W025
|
||
|
||
logging.basicConfig(
|
||
filename=LOG_FILE,
|
||
level=logging.INFO,
|
||
format="%(asctime)s [%(levelname)s] %(message)s",
|
||
)
|
||
|
||
console = logging.getLogger("console")
|
||
console.setLevel(logging.INFO)
|
||
console.addHandler(logging.StreamHandler())
|
||
|
||
def log(msg):
|
||
logging.info(msg)
|
||
console.info(msg)
|
||
|
||
os.makedirs(OUTPUT_DIR, exist_ok=True)
|
||
|
||
s3 = boto3.client("s3", config=Config(signature_version=UNSIGNED))
|
||
|
||
def folder_name(lat, lon, east=True):
|
||
if east:
|
||
return f"Copernicus_DSM_COG_10_N{lat:02d}_00_E{lon:03d}_00_DEM/"
|
||
else:
|
||
return f"Copernicus_DSM_COG_10_N{lat:02d}_00_W{lon:03d}_00_DEM/"
|
||
|
||
log("Inizio scansione directory europee...")
|
||
|
||
found_folders = []
|
||
|
||
paginator = s3.get_paginator("list_objects_v2")
|
||
|
||
for page in paginator.paginate(Bucket=BUCKET, Delimiter="/"):
|
||
for prefix in page.get("CommonPrefixes", []):
|
||
folder = prefix["Prefix"]
|
||
|
||
# EASTERN EUROPE
|
||
for lat in lat_range:
|
||
for lon in lon_east:
|
||
if folder == folder_name(lat, lon, east=True):
|
||
found_folders.append(folder)
|
||
log(f"[FOUND] {folder}")
|
||
|
||
# WESTERN EUROPE (longitudes W)
|
||
for lat in lat_range:
|
||
for lon in lon_west:
|
||
if folder == folder_name(lat, lon, east=False):
|
||
found_folders.append(folder)
|
||
log(f"[FOUND] {folder}")
|
||
|
||
log(f"Trovate {len(found_folders)} cartelle europee")
|
||
|
||
# -----------------------------
|
||
# DOWNLOAD DEM PRINCIPALE
|
||
# -----------------------------
|
||
for folder in found_folders:
|
||
tif_name = folder[:-1] + ".tif"
|
||
key = folder + tif_name.split("/")[-1]
|
||
out_path = os.path.join(OUTPUT_DIR, tif_name.split("/")[-1])
|
||
|
||
if os.path.exists(out_path):
|
||
log(f"[SKIP] {out_path} già presente")
|
||
continue
|
||
|
||
log(f"[DOWNLOAD] {key}")
|
||
try:
|
||
s3.download_file(BUCKET, key, out_path)
|
||
log(f"[OK] {key} scaricato")
|
||
except Exception as e:
|
||
log(f"[ERROR] {key} → {e}")
|
||
|
||
log("Completato.")
|
||
|