# -*- coding: utf-8 -*-
"""
===============================================================================
This module contains functions for preprocessing DEM data, e.g. elevation,
slope, and aspect.

Step 1: Use earthaccess to search and download DEM data
    - NASADEM_HGT
        - Includes the 30 m DEM, based on SRTM data
        - https://lpdaac.usgs.gov/products/nasadem_hgtv001/
    - NASADEM_SC
        - Includes 30 m slope and aspect, derived from NASADEM_HGT
        - https://lpdaac.usgs.gov/products/nasadem_scv001/
    - ALOS PALSAR RTC Project
        - Includes 12.5 m and 30 m DEMs, based on ALOS PALSAR data
        - https://www.earthdata.nasa.gov/data/projects/alos-palsar-rtc-project

Step 2a: Process NASADEM data
    - Downloaded NASADEM granules are *.zip files and must be unzipped first.
    - NASADEM file naming: NASADEM_<type>_<tile ID>/<tile ID>.<data type>
        - Elevation example: NASADEM_HGT_n30e113/n30e113.hgt
        - Slope example:     NASADEM_SC_n30e113/n30e113.slope
        - Aspect example:    NASADEM_SC_n30e113/n30e113.aspect
    - Read the files, clip and mosaic them to the requested extent, rescale the
      slope and aspect data, and save the per-tile result as a *.tif file.

Step 2b: Process ALOS PALSAR RTC Project data
    - Downloaded ALOS PALSAR RTC granules are *.zip files and must be unzipped first.
    - ALOS PALSAR RTC file naming: AP_<orbit>_CCC_DDDD_RT1.<data type>.tif
        - Elevation example: AP_16112_FBS_F0620_RT1.dem.tif
    - Read the files, clip and mosaic them to the requested extent, rescale the
      slope and aspect data, and save the per-tile result as a *.tif file.

-------------------------------------------------------------------------------
Authors: Hong Xie
Last Updated: 2025-10-10
===============================================================================
"""

import os
import sys
import glob
import json
import time
import zipfile
import logging

import dask.distributed
import earthaccess
import geopandas as gpd
import numpy as np
from rioxarray import open_rasterio

sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from utils.common_utils import (
    setup_dask_environment,
    setup_logging,
    clip_image,
    mosaic_images,
)
from HLS_SuPER.HLS_Su import earthdata_search


def reorganize_nasadem_urls(dem_results_urls: list) -> list:
    """
    Reorganize the NASADEM download URLs by grouping the elevation and
    slope/aspect granule URLs that belong to the same tile.

    Parameters
    ----------
    dem_results_urls: list
        List of NASADEM granule URL lists returned by the search.

    Returns
    -------
    grouped_results_urls: list
        Reorganized list of NASADEM granule URL lists, one entry per tile.
    """
    # The tile ID is the last "_"-separated token of the granule directory,
    # e.g. "n30e113" in ".../NASADEM_HGT_n30e113/..."
    tile_ids = np.array(
        [granule[0].split("/")[-2].split("_")[-1] for granule in dem_results_urls]
    )
    grouped_results_urls = []
    for tile_id in np.unique(tile_ids):
        # Indices of all granules (HGT and SC) that share this tile ID
        tile_id_indices = np.where(tile_ids == tile_id)[0]
        # Merge the URL lists of those granules into a single entry
        grouped_results_urls.append(
            [url for i in tile_id_indices for url in dem_results_urls[i]]
        )
    return grouped_results_urls


def download_granule(granule_urls: list[str], output_dir: str) -> bool:
    """
    Download a single batch of granules.

    Parameters
    ----------
    granule_urls: list
        Normalized list of granule URLs returned by the search.
    output_dir: str
        Download directory.

    Returns
    -------
    download_state: bool
        Download state, True or False.
    """
    # Skip the download if every file of the granule already exists locally
    if all(
        os.path.isfile(os.path.join(output_dir, os.path.basename(url)))
        for url in granule_urls
    ):
        logging.info("All data already downloaded.")
        return True
    try:
        earthaccess.download(granule_urls, output_dir)
    except Exception as e:
        logging.error(f"Error downloading data: {e}. Skipping.")
        return False
    return True
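
# The renaming scheme applied by unzip_dem_files() below, illustrated with the
# example granule names from the module docstring (a sketch, not executed):
#
#   NASADEM mode:
#       n30e113.hgt     -> n30e113.elevation.hgt
#       n30e113.slope   -> n30e113.slope.hgt
#       n30e113.aspect  -> n30e113.aspect.hgt
#   ALOSDEM mode:
#       AP_16112_FBS_F0620_RT1.dem.tif -> AP_16112_FBS_F0620_RT1.elevation.tif
#
# This yields a uniform "*.{elevation,slope,aspect}.{hgt,tif}" pattern that
# process_granule() later globs on.
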
def unzip_dem_files(
    zip_file_list: list[str], unzip_dir: str, mode_str: str = "NASADEM"
) -> None:
    """
    Unzip the downloaded DEM ZIP files.

    For NASADEM, the extracted layers are normalized to readable/writable .hgt
    files; for ALOSDEM, they are normalized to readable/writable .tif files.

    Parameters
    ----------
    zip_file_list: list
        List of ZIP files to unzip.
    unzip_dir: str
        Unzip directory.
    mode_str: str, optional
        Unzip mode, defaults to "NASADEM"; one of "NASADEM", "ALOSDEM".
    """
    try:
        for zip_path in zip_file_list:
            if not zipfile.is_zipfile(zip_path):
                continue
            with zipfile.ZipFile(zip_path, "r") as zip_ref:
                if mode_str == "NASADEM":
                    # Example archive name: NASADEM_HGT_n30e113.zip
                    # Only extract members ending in .hgt, .slope, or .aspect
                    for hgt_file in [
                        f
                        for f in zip_ref.namelist()
                        if f.endswith((".hgt", ".slope", ".aspect"))
                    ]:
                        # Rename the member while extracting
                        new_name = (
                            hgt_file.replace(".hgt", ".elevation.hgt")
                            if hgt_file.endswith(".hgt")
                            else f"{hgt_file}.hgt"
                        )
                        unzip_file_path = os.path.join(unzip_dir, new_name)
                        if os.path.exists(unzip_file_path):
                            continue
                        with zip_ref.open(hgt_file) as source_file:
                            with open(unzip_file_path, "wb") as unzip_file:
                                unzip_file.write(source_file.read())
                elif mode_str == "ALOSDEM":
                    # Only extract members ending in dem.tif
                    for tif_file in [
                        f for f in zip_ref.namelist() if f.endswith("dem.tif")
                    ]:
                        new_name = tif_file.replace("dem.tif", "elevation.tif")
                        unzip_file_path = os.path.join(unzip_dir, new_name)
                        if os.path.exists(unzip_file_path):
                            continue
                        with zip_ref.open(tif_file) as source_file:
                            with open(unzip_file_path, "wb") as unzip_file:
                                unzip_file.write(source_file.read())
    except Exception as e:
        logging.error(f"Error unzipping {mode_str} to {unzip_dir}: {e}")
    return


def process_granule(
    unzip_dir: str,
    output_dir: str,
    mode_str: str,
    name: str,
    roi: list,
    clip=True,
    tile_id: str = None,
) -> bool:
    """
    Read the unzipped and renamed DEM layers of the given type and preprocess
    them: read, clip, mosaic, and rescale slope/aspect.

    Parameters
    ----------
    unzip_dir: str
        Root directory of the unzipped DEM files.
    output_dir: str
        Output root directory.
    mode_str: str
        Data mode, one of NASADEM, ALOSDEM.
    name: str
        Layer type, one of elevation, slope, aspect.
    roi: list
        Tile extent (main() passes the tile-region GeoDataFrame).
    clip: bool
        Whether to clip to the ROI.
    tile_id: str
        Tile ID.

    Returns
    -------
    process_state: bool
        Processing state, True or False.
    """
    if mode_str == "NASADEM":
        dem_file_list = glob.glob(os.path.join(unzip_dir, f"*{name}.hgt"))
        out_tif_name = f"DEM.NASADEM.{tile_id}.2000.{name}.tif"
    elif mode_str == "ALOSDEM":
        dem_file_list = glob.glob(os.path.join(unzip_dir, f"*{name}.tif"))
        out_tif_name = f"DEM.ALOSDEM.{tile_id}.2014.{name}.tif"
    output_file = os.path.join(output_dir, out_tif_name)
    if not os.path.isfile(output_file):
        try:
            dem_raster_list = []
            for dem_path in dem_file_list:
                dem = (
                    open_rasterio(dem_path).squeeze(dim="band", drop=True).rename(name)
                )
                if name == "slope" or name == "aspect":
                    org_attrs = dem.attrs
                    # NASADEM_SC stores slope/aspect as scaled integers
                    dem = dem * 0.01
                    # Restore the source attributes lost during the scaling
                    dem.attrs = org_attrs.copy()
                    dem.rio.write_crs("EPSG:4326", inplace=True)
                    dem.attrs["scale_factor"] = 1
                dem_raster_list.append(dem)
            if len(dem_raster_list) >= 1:
                if name == "slope" or name == "aspect":
                    dem_mosaiced = mosaic_images(dem_raster_list, nodata=-9999)
                else:
                    dem_mosaiced = mosaic_images(dem_raster_list, nodata=-32768)
                if roi is not None and clip:
                    dem_mosaiced = clip_image(dem_mosaiced, roi, clip_by_box=True)
                dem_mosaiced.rio.to_raster(
                    output_file, driver="COG", compress="DEFLATE"
                )
        except Exception as e:
            logging.error(f"Error processing files in {name}: {e}")
            return False
        logging.info(f"Processed {output_file} successfully.")
    else:
        logging.warning(f"{output_file} already exists. Skipping.")
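
# A minimal usage sketch for process_granule(), assuming the NASADEM ZIPs have
# already been downloaded and unzipped by the tasks that main() builds below.
# The tile ID and vector path are hypothetical stand-ins (the tile ID reuses
# the commented-out "49REL" option in the __main__ block); only the directory
# layout mirrors what main() creates:
#
#   region = gpd.read_file("./data/vectors/49REL.geojson")
#   process_granule(
#       unzip_dir="./data/DEM/NASADEM/ZIP/UNZIP",
#       output_dir="./data/DEM/NASADEM/TIF/49REL",
#       mode_str="NASADEM",
#       name="slope",
#       roi=region,
#       clip=True,
#       tile_id="49REL",
#   )
#   # -> ./data/DEM/NASADEM/TIF/49REL/DEM.NASADEM.49REL.2000.slope.tif
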
Skipping.") return True def main( output_root_dir: str, region: list, asset_name: list, tile_id: str, mode_str: str = "NASADEM", ): bbox = tuple(list(region.total_bounds)) results_urls = [] mode_str = mode_str.upper() output_root_dir = os.path.join(output_root_dir, mode_str) # 放置下载的 ZIP 文件 download_dir = os.path.join(output_root_dir, "ZIP") # 放置解压并预处理后的文件 unzip_dir = os.path.join(download_dir, "UNZIP") output_dir = os.path.join(output_root_dir, "TIF", tile_id) os.makedirs(download_dir, exist_ok=True) os.makedirs(unzip_dir, exist_ok=True) os.makedirs(output_dir, exist_ok=True) results_urls_file = os.path.join( output_root_dir, f"{mode_str}_{tile_id}_results_urls.json" ) # 配置日志 logs_file = os.path.join(output_root_dir, f"{mode_str}_{tile_id}_SuPER.log") setup_logging(logs_file) # 默认覆盖上一次检索记录 results_urls = earthdata_search(asset_name, roi=bbox) with open(results_urls_file, "w") as f: json.dump(results_urls, f) logging.info(f"Found {len(results_urls)} {mode_str} granules.") if len(results_urls) == 0: return # 构造待解压的文件列表 zip_file_list = [ os.path.join(download_dir, os.path.basename(result[0])) for result in results_urls ] client = dask.distributed.Client(timeout=60, memory_limit="8GB") client.run(setup_dask_environment) all_start_time = time.time() client.scatter(results_urls) logging.info(f"Start processing {mode_str} ...") download_tasks = [ dask.delayed(download_granule)(granule_url, download_dir) for granule_url in results_urls ] # 根据模式传递正确的解压标识 unzip_tasks = dask.delayed(unzip_dem_files)(zip_file_list, unzip_dir, mode_str) if mode_str == "NASADEM": process_tasks = [ dask.delayed(process_granule)( unzip_dir, output_dir, mode_str, name, region, True, tile_id ) for name in ["elevation", "slope", "aspect"] ] elif mode_str == "ALOSDEM": process_tasks = [ dask.delayed(process_granule)( unzip_dir, output_dir, mode_str, "elevation", region, True, tile_id ) ] dask.compute(*download_tasks) dask.compute(unzip_tasks) dask.compute(*process_tasks) client.close() all_total_time = time.time() - all_start_time logging.info( f"All {mode_str} Downloading complete and processed. Total time: {all_total_time} seconds" ) if __name__ == "__main__": earthaccess.login(persist=True) # region = gpd.read_file("./data/vectors/wuling_guanqu_polygon.geojson") # tile_id = "49REL" tile_id = "Wuhan" region = gpd.read_file(f"./data/vectors/{tile_id}.geojson") # asset_name = ["NASADEM_HGT", "NASADEM_SC"] # mode_str = "NASADEM" asset_name = ["ALOS_PSR_RTC_HIGH"] mode_str = "ALOSDEM" output_root_dir = ".\\data\\DEM" main(output_root_dir, region, asset_name, tile_id, mode_str)