feat(DEM_SuPER): 优化ALOSDEM文件处理逻辑, 直接读取并转储压缩包内部的DEM文件.

This commit is contained in:
谢泓 2025-10-20 10:37:22 +08:00
parent 62b2370fd7
commit 63c85931e3

View File

@ -32,13 +32,14 @@ Step2b: Process ALOS PALSAR RTC Project data
------------------------------------------------------------------------------- -------------------------------------------------------------------------------
Authors: Hong Xie Authors: Hong Xie
Last Updated: 2025-10-11 Last Updated: 2025-10-20
=============================================================================== ===============================================================================
""" """
import os import os
import sys import sys
import glob import glob
from pathlib import Path
import json import json
import requests import requests
import zipfile import zipfile
@ -46,11 +47,10 @@ import time
import dask.distributed import dask.distributed
import logging import logging
import earthaccess import earthaccess
import geopandas as gpd
import numpy as np import numpy as np
from rioxarray import open_rasterio from rioxarray import open_rasterio
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) sys.path.append(str(Path(".").parent))
from utils.common_utils import ( from utils.common_utils import (
setup_dask_environment, setup_dask_environment,
@ -132,21 +132,21 @@ def download_granule(granule_urls: list[str], output_dir: str) -> bool:
return True return True
def unzip_dem_files( def process_dem_zip(
zip_file_list: list[str], unzip_dir: str, mode_str: str = "NASADEM" zip_file_list: list[str], output_dir: Path, mode_str: str = "NASADEM"
) -> None: ) -> None:
""" """
解压下载的 DEM ZIP 文件, 处理下载的 DEM ZIP 文件,
若为 NASADEM, 则将其解压后的文件统一为可读写的 .hgt 格式; 若为 NASADEM, 则将其解压后的文件统一为可读写的 .hgt 格式;
若为 ALOSDEM, 将其解压后的文件统一为可读写的 .tif 格式. 若为 ALOSDEM, 直接读取 dem 并转储为 DEFLATE 压缩后的 tif 文件.
Parameters Parameters
---------- ----------
zip_file_list: list zip_file_list: list
待解压的 ZIP 文件列表 待解压的 ZIP 文件列表
unzip_dir: str output_dir: Path
解压目录 输出目录
mode_str: str, optional mode_str: str, optional
解压模式, 默认为 "NASADEM", 可选 "NASADEM", "ALOSDEM" 解压模式, 默认为 "NASADEM", 可选 "NASADEM", "ALOSDEM"
""" """
@ -170,7 +170,7 @@ def unzip_dem_files(
else f"{tif_file}.hgt" else f"{tif_file}.hgt"
) )
# 解压文件到指定目录 # 解压文件到指定目录
unzip_file_path = os.path.join(unzip_dir, new_name) unzip_file_path = os.path.join(output_dir, new_name)
if os.path.exists(unzip_file_path): if os.path.exists(unzip_file_path):
continue continue
with zip_ref.open(tif_file) as source_file: with zip_ref.open(tif_file) as source_file:
@ -180,21 +180,21 @@ def unzip_dem_files(
# 示例ZIP文件名称: AP_14354_FBS_F3000_RT1.zip # 示例ZIP文件名称: AP_14354_FBS_F3000_RT1.zip
# 仅解压包含 dem.tif 的文件, 其内部路径为 AP_14354_FBS_F3000_RT1/AP_14354_FBS_F3000_RT1.dem.tif # 仅解压包含 dem.tif 的文件, 其内部路径为 AP_14354_FBS_F3000_RT1/AP_14354_FBS_F3000_RT1.dem.tif
tif_file = [f for f in zip_ref.namelist() if f.endswith("dem.tif")][0] tif_file = [f for f in zip_ref.namelist() if f.endswith("dem.tif")][0]
# 解压文件到指定目录 # 读取压缩包内的文件并转储到指定目录
unzip_file_path = os.path.join(unzip_dir, os.path.basename(tif_file)) dem = open_rasterio(zip_ref.open(tif_file))
if os.path.exists(unzip_file_path): # 转储为 INT32 DEFLATE 压缩后的 .tif 文件
continue dem.rio.to_raster(
with zip_ref.open(tif_file) as source_file: output_dir / os.path.basename(tif_file),
with open(unzip_file_path, "wb") as unzip_file: compress="DEFLATE",
unzip_file.write(source_file.read()) )
except Exception as e: except Exception as e:
logging.error(f"Error unzipping {mode_str} to {unzip_dir}: {e}") logging.error(f"Error unzipping {mode_str} to {output_dir}: {e}")
return return
def process_granule( def process_granule(
unzip_dir: str, unzip_dir: Path,
output_dir: str, output_dir: Path,
mode_str: str, mode_str: str,
name: str, name: str,
roi: list, roi: list,
@ -206,9 +206,9 @@ def process_granule(
Parameters Parameters
---------- ----------
unzip_dir: str unzip_dir: Path
解压后的 DEM 文件根目录 解压后的 DEM 文件根目录
output_dir: str output_dir: Path
输出根目录 输出根目录
mode_str: str mode_str: str
数据模式, 可选 NASADEM, ALOSDEM 数据模式, 可选 NASADEM, ALOSDEM
@ -232,6 +232,9 @@ def process_granule(
out_tif_name = f"DEM.{mode_str}.{tile_id}.2000.{name}.tif" out_tif_name = f"DEM.{mode_str}.{tile_id}.2000.{name}.tif"
elif mode_str == "ALOSDEM": elif mode_str == "ALOSDEM":
dem_file_list = glob.glob(os.path.join(unzip_dir, "*.dem.tif")) dem_file_list = glob.glob(os.path.join(unzip_dir, "*.dem.tif"))
# 示例ZIP文件名称: AP_14354_FBS_F3000_RT1.zip
# 仅解压包含 dem.tif 的文件, 其内部路径为 AP_14354_FBS_F3000_RT1/AP_14354_FBS_F3000_RT1.dem.tif
tif_file = [f for f in zip_ref.namelist() if f.endswith("dem.tif")][0]
out_tif_name = f"DEM.{mode_str}.{tile_id}.2011.{name}.tif" out_tif_name = f"DEM.{mode_str}.{tile_id}.2011.{name}.tif"
output_file = os.path.join(output_dir, out_tif_name) output_file = os.path.join(output_dir, out_tif_name)
if not os.path.isfile(output_file): if not os.path.isfile(output_file):
@ -274,38 +277,39 @@ def process_granule(
def main( def main(
output_root_dir: str, output_root_dir: Path,
region_file: str, region_file: Path,
asset_name: list,
tile_id: str, tile_id: str,
mode_str: str = "NASADEM", mode_str: str = "NASADEM",
): ):
region = gpd.read_file(region_file)
bbox = tuple(list(region.total_bounds)) # 最大外接矩形
results_urls = [] results_urls = []
mode_str = mode_str.upper() mode_str = mode_str.upper()
output_root_dir = os.path.join(output_root_dir, mode_str) output_root_dir = output_root_dir / mode_str
# 放置下载的 ZIP 文件 # 放置下载的 ZIP 文件
download_dir = os.path.join(output_root_dir, "ZIP") download_dir = output_root_dir / "ZIP"
# 放置解压并预处理后的文件 # 放置解压并预处理后的文件
unzip_dir = os.path.join(download_dir, "UNZIP") unzip_dir = download_dir / "UNZIP"
output_dir = os.path.join(output_root_dir, "TIF", tile_id) output_dir = output_root_dir / "TIF" / tile_id
os.makedirs(download_dir, exist_ok=True) os.makedirs(download_dir, exist_ok=True)
os.makedirs(unzip_dir, exist_ok=True) os.makedirs(unzip_dir, exist_ok=True)
os.makedirs(output_dir, exist_ok=True) os.makedirs(output_dir, exist_ok=True)
results_urls_file = os.path.join( results_urls_file = output_root_dir / f"{mode_str}_{tile_id}_results_urls.json"
output_root_dir, f"{mode_str}_{tile_id}_results_urls.json"
)
# 配置日志 # 配置日志
logs_file = os.path.join(output_root_dir, f"{mode_str}_{tile_id}_SuPER.log") logs_file = output_root_dir / f"{mode_str}_{tile_id}_SuPER.log"
setup_logging(logs_file) setup_logging(logs_file)
# 默认覆盖上一次检索记录 # 检索数据并存储, 默认覆盖上一次检索记录
results_urls = earthdata_search(asset_name, roi=bbox) if mode_str == "NASADEM":
if mode_str == "ALOSDEM": asset_name = ["NASADEM_HGT", "NASADEM_SC"]
results_urls = earthdata_search(asset_name, region_file=region_file)
elif mode_str == "ALOSDEM":
asset_name = ["ALOS_PSR_RTC_HIGH"]
results_urls = earthdata_search(asset_name, region_file=region_file)
# ALOSDEM 数据仅保存含 "FBS" 字符串的 URL # ALOSDEM 数据仅保存含 "FBS" 字符串的 URL
results_urls = [url for url in results_urls if "FBS" in url[0]] results_urls = [url for url in results_urls if "FBS" in url[0]]
else:
results_urls = []
with open(results_urls_file, "w") as f: with open(results_urls_file, "w") as f:
json.dump(results_urls, f) json.dump(results_urls, f)
logging.info(f"Found {len(results_urls)} {mode_str} granules after filtering.") logging.info(f"Found {len(results_urls)} {mode_str} granules after filtering.")
@ -314,7 +318,7 @@ def main(
# 构造待解压的文件列表 # 构造待解压的文件列表
zip_file_list = [ zip_file_list = [
os.path.join(download_dir, os.path.basename(result[0])) download_dir / os.path.basename(result[0])
for result in results_urls for result in results_urls
] ]
@ -329,7 +333,7 @@ def main(
for granule_url in results_urls for granule_url in results_urls
] ]
# 根据模式传递正确的解压标识 # 根据模式传递正确的解压标识
unzip_tasks = dask.delayed(unzip_dem_files)(zip_file_list, unzip_dir, mode_str) unzip_tasks = dask.delayed(process_dem_zip)(zip_file_list, unzip_dir, mode_str)
if mode_str == "NASADEM": if mode_str == "NASADEM":
process_tasks = [ process_tasks = [
dask.delayed(process_granule)( dask.delayed(process_granule)(
@ -337,16 +341,12 @@ def main(
) )
for name in ["elevation", "slope", "aspect"] for name in ["elevation", "slope", "aspect"]
] ]
elif mode_str == "ALOSDEM":
process_tasks = [
dask.delayed(process_granule)(
unzip_dir, output_dir, mode_str, "elevation", region, True, tile_id
)
]
dask.compute(*download_tasks) dask.compute(*download_tasks)
dask.compute(unzip_tasks) dask.compute(unzip_tasks)
dask.compute(*process_tasks)
if mode_str == "NASADEM":
dask.compute(*process_tasks)
client.close() client.close()
all_total_time = time.time() - all_start_time all_total_time = time.time() - all_start_time
@ -357,13 +357,11 @@ def main(
if __name__ == "__main__": if __name__ == "__main__":
earthaccess.login(strategy="netrc", persist=True) earthaccess.login(strategy="netrc", persist=True)
# region_file = "./data/vectors/wuling_guanqu_polygon.geojson"
# tile_id = "49REL" # tile_id = "49REL"
tile_id = "Wuhan" # tile_id = "Wuhan"
region_file = f"./data/vectors/{tile_id}.geojson" tile_id = "Hubei"
# asset_name = ["NASADEM_HGT", "NASADEM_SC"] region_file = Path(f"./data/vectors/{tile_id}.geojson")
# mode_str = "NASADEM" # mode_str = "NASADEM"
asset_name = ["ALOS_PSR_RTC_HIGH"]
mode_str = "ALOSDEM" mode_str = "ALOSDEM"
output_root_dir = ".\\data\\DEM" output_root_dir = Path(".\\data\\DEM")
main(output_root_dir, region_file, asset_name, tile_id, mode_str) main(output_root_dir, region_file, tile_id, mode_str)