diff --git a/DATA_SuPER/DEM_SuPER.py b/DATA_SuPER/DEM_SuPER.py index 94d2c91..bad9a6b 100644 --- a/DATA_SuPER/DEM_SuPER.py +++ b/DATA_SuPER/DEM_SuPER.py @@ -4,25 +4,35 @@ This module contains functions related to preprocessing DEM data. For example, elevation, slope, aspect -Step1: Use earthaccess search and download NASADEM Data +Step1: Use earthaccess search and download DEM Data - NASADEM_HGT - includes 30m DEM, based on SRTM data - https://lpdaac.usgs.gov/products/nasadem_hgtv001/ - NASADEM_SC - includes 30m slope, aspect, based on NASADEM_HGT - https://lpdaac.usgs.gov/products/nasadem_scv001/ +- ALOS PALSAR RTC Project + - includes 12.5, 30m DEM, based on ALOS PALSAR data + - https://www.earthdata.nasa.gov/data/projects/alos-palsar-rtc-project -Step2: Process DEM data +Step2a: Process NASADEM data - 下载的 NASADEM 均为 *.zip 文件, 需先进行解压 - NASADEM 文件名称结构为: NASADEM_类型_网格编号/网格编号.数据类型 - 高程示例: NASADEM_HGT_n30e113/n30e113.hgt - 坡度示例: NASADEM_SC_n30e113/n30e113.slope - 坡向示例: NASADEM_SC_n30e113/n30e113.aspect -- 读取文件按网格进行裁剪并镶嵌, 坡度和坡向数据需要进行缩放处理, 将网格范围的结果保存为 *.tif 文件 +- 读取文件按指定范围进行裁剪并镶嵌, 坡度和坡向数据需要进行缩放处理, 将网格范围的结果保存为 *.tif 文件 + +Step2b: Process ALOS PALSAR RTC Project data +- 下载的 ALOS PALSAR RTC Project 均为 *.zip 文件, 需先进行解压 +- ALOS PALSAR RTC Project 文件名称结构为: AP_轨道号_CCC_DDDD_RT1.数据类型.tif + - 高程示例: AP_16112_FBS_F0620_RT1.dem.tif +- 读取文件按指定范围进行裁剪并镶嵌, 坡度和坡向数据需要进行缩放处理, 将网格范围的结果保存为 *.tif 文件 + ------------------------------------------------------------------------------- Authors: Hong Xie -Last Updated: 2025-08-05 +Last Updated: 2025-10-10 =============================================================================== """ @@ -41,7 +51,12 @@ from rioxarray import open_rasterio sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) -from utils.common_utils import setup_dask_environment, clip_image, mosaic_images +from utils.common_utils import ( + setup_dask_environment, + setup_logging, + clip_image, + mosaic_images, +) from HLS_SuPER.HLS_Su import earthdata_search @@ -103,53 +118,88 @@ def download_granule(granule_urls: list[str], output_dir: str) -> bool: return True -def unzip_nasadem_files(zip_file_list: list[str], unzip_dir: str): +def unzip_dem_files( + zip_file_list: list[str], unzip_dir: str, mode_str: str = "NASADEM" +) -> None: """ - 解压下载的 NASADEM ZIP 文件, 并将解压后的文件统一为可读写的 .hgt 格式 + 解压下载的 DEM ZIP 文件, + + 若为 NASADEM, 则将其解压后的文件统一为可读写的 .hgt 格式; + 若为 ALOSDEM, 则将其解压后的文件统一为可读写的 .tif 格式. + + Parameters + ---------- + zip_file_list: list + 待解压的 ZIP 文件列表 + unzip_dir: str + 解压目录 + mode_str: str, optional + 解压模式, 默认为 "NASADEM", 可选 "NASADEM", "ALOSDEM" """ try: for zip_path in zip_file_list: if not zipfile.is_zipfile(zip_path): continue with zipfile.ZipFile(zip_path, "r") as zip_ref: - # 仅解压包含 .hgt, .slope, .aspect 的文件 - for hgt_file in [f for f in zip_ref.namelist() if f.endswith((".hgt", ".slope", ".aspect"))]: - # 解压时重命名文件 - new_name = ( - hgt_file.replace(".hgt", ".elevation.hgt") - if hgt_file.endswith(".hgt") - else f"{hgt_file}.hgt" - ) + if mode_str == "NASADEM": + # 示例文件名称: NASADEM_HGT_n30e113.zip + # 仅解压包含 .hgt, .slope, .aspect 的文件 + for hgt_file in [ + f + for f in zip_ref.namelist() + if f.endswith((".hgt", ".slope", ".aspect")) + ]: + # 解压时重命名文件 + new_name = ( + hgt_file.replace(".hgt", ".elevation.hgt") + if hgt_file.endswith(".hgt") + else f"{hgt_file}.hgt" + ) unzip_file_path = os.path.join(unzip_dir, new_name) if os.path.exists(unzip_file_path): continue with zip_ref.open(hgt_file) as source_file: - with open(unzip_file_path, 'wb') as unzip_file: + with open(unzip_file_path, "wb") as unzip_file: unzip_file.write(source_file.read()) + elif mode_str == "ALOSDEM": + # 仅解压包含 dem.tif 的文件 + for tif_file in [ + f for f in zip_ref.namelist() if f.endswith("dem.tif") + ]: + new_name = tif_file.replace("dem.tif", "elevation.tif") + unzip_file_path = os.path.join(unzip_dir, new_name) + if os.path.exists(unzip_file_path): + continue + with zip_ref.open(tif_file) as source_file: + with open(unzip_file_path, "wb") as unzip_file: + unzip_file.write(source_file.read()) except Exception as e: - logging.error(f"Error unzipping NASADEM to {unzip_dir}: {e}") + logging.error(f"Error unzipping {mode_str} to {unzip_dir}: {e}") return def process_granule( unzip_dir: str, output_dir: str, + mode_str: str, name: str, roi: list, clip=True, tile_id: str = None, ) -> bool: """ - 读取解压并重命名处理后的指定类型 NASADEM 数据并进行预处理, 包括读取, 裁剪, 镶嵌, 并对坡度坡向进行缩放 + 读取解压并重命名处理后的指定类型 DEM 数据并进行预处理, 包括读取, 裁剪, 镶嵌, 并对坡度坡向进行缩放 Parameters ---------- unzip_dir: str - 解压后的 NASADEM 文件根目录 + 解压后的 DEM 文件根目录 output_dir: str 输出根目录 + mode_str: str + 数据模式, 可选 NASADEM, ALOSDEM name: str - 数据类型, 包括 elevation, slope, aspect + 数据类型, 可选 elevation, slope, aspect roi: list 网格范围 clip: bool @@ -163,17 +213,19 @@ def process_granule( 处理状态 True or False """ - dem_file_list = glob.glob(os.path.join(unzip_dir, f"*{name}.hgt")) - out_tif_name = f"DEM.NASADEM.{tile_id}.2000.{name}.tif" + if mode_str == "NASADEM": + dem_file_list = glob.glob(os.path.join(unzip_dir, f"*{name}.hgt")) + out_tif_name = f"DEM.NASADEM.{tile_id}.2000.{name}.tif" + elif mode_str == "ALOSDEM": + dem_file_list = glob.glob(os.path.join(unzip_dir, f"*{name}.tif")) + out_tif_name = f"DEM.ALOSDEM.{tile_id}.2014.{name}.tif" output_file = os.path.join(output_dir, out_tif_name) if not os.path.isfile(output_file): try: dem_raster_list = [] for dem_path in dem_file_list: dem = ( - open_rasterio(dem_path) - .squeeze(dim="band", drop=True) - .rename(name) + open_rasterio(dem_path).squeeze(dim="band", drop=True).rename(name) ) if name == "slope" or name == "aspect": org_attrs = dem.attrs @@ -190,7 +242,9 @@ def process_granule( dem_mosaiced = mosaic_images(dem_raster_list, nodata=-32768) if roi is not None and clip: dem_mosaiced = clip_image(dem_mosaiced, roi, clip_by_box=True) - dem_mosaiced.rio.to_raster(output_file, driver="COG", compress="DEFLATE") + dem_mosaiced.rio.to_raster( + output_file, driver="COG", compress="DEFLATE" + ) except Exception as e: logging.error(f"Error processing files in {name}: {e}") return False @@ -200,54 +254,72 @@ def process_granule( return True -def main(region: list, asset_name: list, tile_id: str): +def main( + output_root_dir: str, + region: list, + asset_name: list, + tile_id: str, + mode_str: str = "NASADEM", +): bbox = tuple(list(region.total_bounds)) - # 示例文件名称: NASADEM_HGT_n30e113.zip results_urls = [] - output_root_dir = ".\\data\\DEM\\NASADEM" + mode_str = mode_str.upper() + output_root_dir = os.path.join(output_root_dir, mode_str) # 放置下载的 ZIP 文件 download_dir = os.path.join(output_root_dir, "ZIP") # 放置解压并预处理后的文件 unzip_dir = os.path.join(download_dir, "UNZIP") output_dir = os.path.join(output_root_dir, "TIF", tile_id) + os.makedirs(download_dir, exist_ok=True) os.makedirs(unzip_dir, exist_ok=True) os.makedirs(output_dir, exist_ok=True) - results_urls_file = f"{output_root_dir}\\NASADEM_{tile_id}_results_urls.json" + results_urls_file = os.path.join( + output_root_dir, f"{mode_str}_{tile_id}_results_urls.json" + ) + + # 配置日志 + logs_file = os.path.join(output_root_dir, f"{mode_str}_{tile_id}_SuPER.log") + setup_logging(logs_file) + # 默认覆盖上一次检索记录 results_urls = earthdata_search(asset_name, roi=bbox) with open(results_urls_file, "w") as f: json.dump(results_urls, f) - # 构造待解压的文件列表 - zip_file_list = [os.path.join(download_dir, os.path.basename(result[0])) for result in results_urls] + logging.info(f"Found {len(results_urls)} {mode_str} granules.") + if len(results_urls) == 0: + return - # 配置日志 - logging.basicConfig( - level=logging.INFO, - format="%(levelname)s:%(asctime)s ||| %(message)s", - handlers=[ - logging.StreamHandler(sys.stdout), - logging.FileHandler(f"{output_root_dir}\\NASADEM_SuPER.log"), - ], - ) - logging.info(f"Found {len(results_urls)} NASADEM granules.") + # 构造待解压的文件列表 + zip_file_list = [ + os.path.join(download_dir, os.path.basename(result[0])) + for result in results_urls + ] client = dask.distributed.Client(timeout=60, memory_limit="8GB") client.run(setup_dask_environment) all_start_time = time.time() client.scatter(results_urls) - logging.info(f"Start processing NASADEM ...") + logging.info(f"Start processing {mode_str} ...") download_tasks = [ dask.delayed(download_granule)(granule_url, download_dir) for granule_url in results_urls ] - unzip_tasks = dask.delayed(unzip_nasadem_files)(zip_file_list, unzip_dir) - process_tasks = [ - dask.delayed(process_granule)( - unzip_dir, output_dir, name, region, True, tile_id - ) - for name in ["elevation", "slope", "aspect"] - ] + # 根据模式传递正确的解压标识 + unzip_tasks = dask.delayed(unzip_dem_files)(zip_file_list, unzip_dir, mode_str) + if mode_str == "NASADEM": + process_tasks = [ + dask.delayed(process_granule)( + unzip_dir, output_dir, mode_str, name, region, True, tile_id + ) + for name in ["elevation", "slope", "aspect"] + ] + elif mode_str == "ALOSDEM": + process_tasks = [ + dask.delayed(process_granule)( + unzip_dir, output_dir, mode_str, "elevation", region, True, tile_id + ) + ] dask.compute(*download_tasks) dask.compute(unzip_tasks) @@ -256,14 +328,19 @@ def main(region: list, asset_name: list, tile_id: str): client.close() all_total_time = time.time() - all_start_time logging.info( - f"All NASADEM Downloading complete and proccessed. Total time: {all_total_time} seconds" + f"All {mode_str} Downloading complete and processed. Total time: {all_total_time} seconds" ) if __name__ == "__main__": earthaccess.login(persist=True) # region = gpd.read_file("./data/vectors/wuling_guanqu_polygon.geojson") - tile_id = "49REL" + # tile_id = "49REL" + tile_id = "Wuhan" region = gpd.read_file(f"./data/vectors/{tile_id}.geojson") - asset_name = ["NASADEM_HGT", "NASADEM_SC"] - main(region, asset_name, tile_id) + # asset_name = ["NASADEM_HGT", "NASADEM_SC"] + # mode_str = "NASADEM" + asset_name = ["ALOS_PSR_RTC_HIGH"] + mode_str = "ALOSDEM" + output_root_dir = ".\\data\\DEM" + main(output_root_dir, region, asset_name, tile_id, mode_str)