feat(DEM_SuPER): 添加对ALOS 12.5m DEM的本地解压支持.

This commit is contained in:
谢泓 2025-10-11 13:20:23 +08:00
parent b872e9e3f1
commit 418923f7df

View File

@ -32,7 +32,7 @@ Step2b: Process ALOS PALSAR RTC Project data
-------------------------------------------------------------------------------
Authors: Hong Xie
Last Updated: 2025-10-10
Last Updated: 2025-10-11
===============================================================================
"""
@ -40,6 +40,7 @@ import os
import sys
import glob
import json
import requests
import zipfile
import time
import dask.distributed
@ -112,6 +113,19 @@ def download_granule(granule_urls: list[str], output_dir: str) -> bool:
try:
earthaccess.download(granule_urls, output_dir)
except Exception as e:
# 下载失败时, 先尝试使用 requests 库下载
for url in granule_urls:
response = requests.get(url)
if response.status_code == 200:
with open(
os.path.join(output_dir, os.path.basename(url)), "wb"
) as f:
f.write(response.content)
else:
logging.error(
f"Error downloading data: {response.status_code}. Skipping."
)
return False
logging.error(f"Error downloading data: {e}. Skipping.")
return False
logging.info("All Data already downloaded.")
@ -138,41 +152,41 @@ def unzip_dem_files(
"""
try:
for zip_path in zip_file_list:
if not zipfile.is_zipfile(zip_path):
if not os.path.isfile(zip_path) or not zipfile.is_zipfile(zip_path):
continue
with zipfile.ZipFile(zip_path, "r") as zip_ref:
if mode_str == "NASADEM":
# 示例文件名称: NASADEM_HGT_n30e113.zip
# 示例ZIP文件名称: NASADEM_HGT_n30e113.zip
# 仅解压包含 .hgt, .slope, .aspect 的文件
for hgt_file in [
for tif_file in [
f
for f in zip_ref.namelist()
if f.endswith((".hgt", ".slope", ".aspect"))
]:
# 解压时重命名文件
new_name = (
hgt_file.replace(".hgt", ".elevation.hgt")
if hgt_file.endswith(".hgt")
else f"{hgt_file}.hgt"
tif_file.replace(".hgt", ".elevation.hgt")
if tif_file.endswith(".hgt")
else f"{tif_file}.hgt"
)
unzip_file_path = os.path.join(unzip_dir, new_name)
if os.path.exists(unzip_file_path):
continue
with zip_ref.open(hgt_file) as source_file:
with open(unzip_file_path, "wb") as unzip_file:
unzip_file.write(source_file.read())
elif mode_str == "ALOSDEM":
# 仅解压包含 dem.tif 的文件
for tif_file in [
f for f in zip_ref.namelist() if f.endswith("dem.tif")
]:
new_name = tif_file.replace("dem.tif", "elevation.tif")
# 解压文件到指定目录
unzip_file_path = os.path.join(unzip_dir, new_name)
if os.path.exists(unzip_file_path):
continue
with zip_ref.open(tif_file) as source_file:
with open(unzip_file_path, "wb") as unzip_file:
unzip_file.write(source_file.read())
elif mode_str == "ALOSDEM":
# 示例ZIP文件名称: AP_14354_FBS_F3000_RT1.zip
# 仅解压包含 dem.tif 的文件, 其内部路径为 AP_14354_FBS_F3000_RT1/AP_14354_FBS_F3000_RT1.dem.tif
tif_file = [f for f in zip_ref.namelist() if f.endswith("dem.tif")][0]
# 解压文件到指定目录
unzip_file_path = os.path.join(unzip_dir, os.path.basename(tif_file))
if os.path.exists(unzip_file_path):
continue
with zip_ref.open(tif_file) as source_file:
with open(unzip_file_path, "wb") as unzip_file:
unzip_file.write(source_file.read())
except Exception as e:
logging.error(f"Error unzipping {mode_str} to {unzip_dir}: {e}")
return
@ -215,26 +229,26 @@ def process_granule(
if mode_str == "NASADEM":
dem_file_list = glob.glob(os.path.join(unzip_dir, f"*{name}.hgt"))
out_tif_name = f"DEM.NASADEM.{tile_id}.2000.{name}.tif"
out_tif_name = f"DEM.{mode_str}.{tile_id}.2000.{name}.tif"
elif mode_str == "ALOSDEM":
dem_file_list = glob.glob(os.path.join(unzip_dir, f"*{name}.tif"))
out_tif_name = f"DEM.ALOSDEM.{tile_id}.2014.{name}.tif"
out_tif_name = f"DEM.{mode_str}.{tile_id}.2011.{name}.tif"
output_file = os.path.join(output_dir, out_tif_name)
if not os.path.isfile(output_file):
try:
dem_raster_list = []
for dem_path in dem_file_list:
dem = (
raster = (
open_rasterio(dem_path).squeeze(dim="band", drop=True).rename(name)
)
if name == "slope" or name == "aspect":
org_attrs = dem.attrs
dem = dem * 0.01
org_attrs = raster.attrs
raster = raster * 0.01
# 恢复源数据属性信息
dem.attrs = org_attrs.copy()
dem.rio.write_crs("EPSG:4326", inplace=True)
dem.attrs["scale_factor"] = 1
dem_raster_list.append(dem)
raster.attrs = org_attrs.copy()
raster.rio.write_crs("EPSG:4326", inplace=True)
raster.attrs["scale_factor"] = 1
dem_raster_list.append(raster)
if len(dem_raster_list) >= 1:
if name == "slope" or name == "aspect":
dem_mosaiced = mosaic_images(dem_raster_list, nodata=-9999)
@ -333,7 +347,7 @@ def main(
if __name__ == "__main__":
earthaccess.login(persist=True)
earthaccess.login(strategy="netrc", persist=True)
# region = gpd.read_file("./data/vectors/wuling_guanqu_polygon.geojson")
# tile_id = "49REL"
tile_id = "Wuhan"