diff --git a/DATA_SuPER/DEM_SuPER.py b/DATA_SuPER/DEM_SuPER.py index 53a643c..0018783 100644 --- a/DATA_SuPER/DEM_SuPER.py +++ b/DATA_SuPER/DEM_SuPER.py @@ -32,13 +32,14 @@ Step2b: Process ALOS PALSAR RTC Project data ------------------------------------------------------------------------------- Authors: Hong Xie -Last Updated: 2025-10-11 +Last Updated: 2025-10-20 =============================================================================== """ import os import sys import glob +from pathlib import Path import json import requests import zipfile @@ -46,11 +47,10 @@ import time import dask.distributed import logging import earthaccess -import geopandas as gpd import numpy as np from rioxarray import open_rasterio -sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +sys.path.append(str(Path(".").parent)) from utils.common_utils import ( setup_dask_environment, @@ -132,21 +132,21 @@ def download_granule(granule_urls: list[str], output_dir: str) -> bool: return True -def unzip_dem_files( - zip_file_list: list[str], unzip_dir: str, mode_str: str = "NASADEM" +def process_dem_zip( + zip_file_list: list[str], output_dir: Path, mode_str: str = "NASADEM" ) -> None: """ - 解压下载的 DEM ZIP 文件, + 处理下载的 DEM ZIP 文件, 若为 NASADEM, 则将其解压后的文件统一为可读写的 .hgt 格式; - 若为 ALOSDEM, 则将其解压后的文件统一为可读写的 .tif 格式. + 若为 ALOSDEM, 则直接读取 dem 并转储为 DEFLATE 压缩后的 tif 文件. Parameters ---------- zip_file_list: list 待解压的 ZIP 文件列表 - unzip_dir: str - 解压目录 + output_dir: Path + 输出目录 mode_str: str, optional 解压模式, 默认为 "NASADEM", 可选 "NASADEM", "ALOSDEM" """ @@ -170,7 +170,7 @@ def unzip_dem_files( else f"{tif_file}.hgt" ) # 解压文件到指定目录 - unzip_file_path = os.path.join(unzip_dir, new_name) + unzip_file_path = os.path.join(output_dir, new_name) if os.path.exists(unzip_file_path): continue with zip_ref.open(tif_file) as source_file: @@ -180,21 +180,21 @@ def unzip_dem_files( # 示例ZIP文件名称: AP_14354_FBS_F3000_RT1.zip # 仅解压包含 dem.tif 的文件, 其内部路径为 AP_14354_FBS_F3000_RT1/AP_14354_FBS_F3000_RT1.dem.tif tif_file = [f for f in zip_ref.namelist() if f.endswith("dem.tif")][0] - # 解压文件到指定目录 - unzip_file_path = os.path.join(unzip_dir, os.path.basename(tif_file)) - if os.path.exists(unzip_file_path): - continue - with zip_ref.open(tif_file) as source_file: - with open(unzip_file_path, "wb") as unzip_file: - unzip_file.write(source_file.read()) + # 读取压缩包内的文件并转储到指定目录 + dem = open_rasterio(zip_ref.open(tif_file)) + # 转储为 INT32 DEFLATE 压缩后的 .tif 文件 + dem.rio.to_raster( + output_dir / os.path.basename(tif_file), + compress="DEFLATE", + ) except Exception as e: - logging.error(f"Error unzipping {mode_str} to {unzip_dir}: {e}") + logging.error(f"Error unzipping {mode_str} to {output_dir}: {e}") return def process_granule( - unzip_dir: str, - output_dir: str, + unzip_dir: Path, + output_dir: Path, mode_str: str, name: str, roi: list, @@ -206,9 +206,9 @@ def process_granule( Parameters ---------- - unzip_dir: str + unzip_dir: Path 解压后的 DEM 文件根目录 - output_dir: str + output_dir: Path 输出根目录 mode_str: str 数据模式, 可选 NASADEM, ALOSDEM @@ -232,6 +232,9 @@ def process_granule( out_tif_name = f"DEM.{mode_str}.{tile_id}.2000.{name}.tif" elif mode_str == "ALOSDEM": dem_file_list = glob.glob(os.path.join(unzip_dir, "*.dem.tif")) + # 示例ZIP文件名称: AP_14354_FBS_F3000_RT1.zip + # 仅解压包含 dem.tif 的文件, 其内部路径为 AP_14354_FBS_F3000_RT1/AP_14354_FBS_F3000_RT1.dem.tif + tif_file = [f for f in zip_ref.namelist() if f.endswith("dem.tif")][0] out_tif_name = f"DEM.{mode_str}.{tile_id}.2011.{name}.tif" output_file = os.path.join(output_dir, out_tif_name) if not os.path.isfile(output_file): @@ -274,38 +277,39 @@ def process_granule( def main( - output_root_dir: str, - region_file: str, - asset_name: list, + output_root_dir: Path, + region_file: Path, tile_id: str, mode_str: str = "NASADEM", ): - region = gpd.read_file(region_file) - bbox = tuple(list(region.total_bounds)) # 最大外接矩形 results_urls = [] mode_str = mode_str.upper() - output_root_dir = os.path.join(output_root_dir, mode_str) + output_root_dir = output_root_dir / mode_str # 放置下载的 ZIP 文件 - download_dir = os.path.join(output_root_dir, "ZIP") + download_dir = output_root_dir / "ZIP" # 放置解压并预处理后的文件 - unzip_dir = os.path.join(download_dir, "UNZIP") - output_dir = os.path.join(output_root_dir, "TIF", tile_id) + unzip_dir = download_dir / "UNZIP" + output_dir = output_root_dir / "TIF" / tile_id os.makedirs(download_dir, exist_ok=True) os.makedirs(unzip_dir, exist_ok=True) os.makedirs(output_dir, exist_ok=True) - results_urls_file = os.path.join( - output_root_dir, f"{mode_str}_{tile_id}_results_urls.json" - ) + results_urls_file = output_root_dir / f"{mode_str}_{tile_id}_results_urls.json" # 配置日志 - logs_file = os.path.join(output_root_dir, f"{mode_str}_{tile_id}_SuPER.log") + logs_file = output_root_dir / f"{mode_str}_{tile_id}_SuPER.log" setup_logging(logs_file) - # 默认覆盖上一次检索记录 - results_urls = earthdata_search(asset_name, roi=bbox) - if mode_str == "ALOSDEM": + # 检索数据并存储, 默认覆盖上一次检索记录 + if mode_str == "NASADEM": + asset_name = ["NASADEM_HGT", "NASADEM_SC"] + results_urls = earthdata_search(asset_name, region_file=region_file) + elif mode_str == "ALOSDEM": + asset_name = ["ALOS_PSR_RTC_HIGH"] + results_urls = earthdata_search(asset_name, region_file=region_file) # ALOSDEM 数据仅保存含 "FBS" 字符串的 URL results_urls = [url for url in results_urls if "FBS" in url[0]] + else: + results_urls = [] with open(results_urls_file, "w") as f: json.dump(results_urls, f) logging.info(f"Found {len(results_urls)} {mode_str} granules after filtering.") @@ -314,7 +318,7 @@ def main( # 构造待解压的文件列表 zip_file_list = [ - os.path.join(download_dir, os.path.basename(result[0])) + download_dir / os.path.basename(result[0]) for result in results_urls ] @@ -329,7 +333,7 @@ def main( for granule_url in results_urls ] # 根据模式传递正确的解压标识 - unzip_tasks = dask.delayed(unzip_dem_files)(zip_file_list, unzip_dir, mode_str) + unzip_tasks = dask.delayed(process_dem_zip)(zip_file_list, unzip_dir, mode_str) if mode_str == "NASADEM": process_tasks = [ dask.delayed(process_granule)( @@ -337,16 +341,12 @@ def main( ) for name in ["elevation", "slope", "aspect"] ] - elif mode_str == "ALOSDEM": - process_tasks = [ - dask.delayed(process_granule)( - unzip_dir, output_dir, mode_str, "elevation", region, True, tile_id - ) - ] dask.compute(*download_tasks) dask.compute(unzip_tasks) - dask.compute(*process_tasks) + + if mode_str == "NASADEM": + dask.compute(*process_tasks) client.close() all_total_time = time.time() - all_start_time @@ -357,13 +357,11 @@ def main( if __name__ == "__main__": earthaccess.login(strategy="netrc", persist=True) - # region_file = "./data/vectors/wuling_guanqu_polygon.geojson" # tile_id = "49REL" - tile_id = "Wuhan" - region_file = f"./data/vectors/{tile_id}.geojson" - # asset_name = ["NASADEM_HGT", "NASADEM_SC"] + # tile_id = "Wuhan" + tile_id = "Hubei" + region_file = Path(f"./data/vectors/{tile_id}.geojson") # mode_str = "NASADEM" - asset_name = ["ALOS_PSR_RTC_HIGH"] mode_str = "ALOSDEM" - output_root_dir = ".\\data\\DEM" - main(output_root_dir, region_file, asset_name, tile_id, mode_str) + output_root_dir = Path(".\\data\\DEM") + main(output_root_dir, region_file, tile_id, mode_str)