270 lines
9.1 KiB
Python

# -*- coding: utf-8 -*-
"""
===============================================================================
This module contains functions related to preprocessing DEM data.
For example, elevation, slope, aspect
Step1: Use earthaccess to search for and download NASADEM data
- NASADEM_HGT
- includes 30m DEM, based on SRTM data
- https://lpdaac.usgs.gov/products/nasadem_hgtv001/
- NASADEM_SC
- includes 30m slope, aspect, based on NASADEM_HGT
- https://lpdaac.usgs.gov/products/nasadem_scv001/
Step2: Process DEM data
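- The downloaded NASADEM granules are all *.zip files and must be unzipped first
- NASADEM file naming structure: NASADEM_<type>_<tile id>/<tile id>.<data type>
- Elevation example: NASADEM_HGT_n30e113/n30e113.hgt
- Slope example: NASADEM_SC_n30e113/n30e113.slope
- Aspect example: NASADEM_SC_n30e113/n30e113.aspect
- Read the files, then clip and mosaic them per grid tile; slope and aspect data need to be rescaled; the result for the tile extent is saved as a *.tif file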
-------------------------------------------------------------------------------
Authors: Hong Xie
Last Updated: 2025-08-05
===============================================================================
"""
import os
import sys
import glob
import json
import zipfile
import time
import dask.distributed
import logging
import earthaccess
import geopandas as gpd
import numpy as np
from rioxarray import open_rasterio
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from utils.common_utils import setup_dask_environment, clip_image, mosaic_images
from HLS_SuPER.HLS_Su import earthdata_search
def reorganize_nasadem_urls(dem_results_urls: list):
    """
    Regroup NASADEM download links by grid tile.

    Each search result is a list of URLs for one granule. Granules that
    belong to the same tile (elevation and slope/aspect archives) are merged
    so that every tile ends up with one combined URL list.

    The tile ID is the last ``_``-separated token of the parent folder of a
    granule's first URL, e.g. a URL ending in
    ``NASADEM_HGT_n30e113/NASADEM_HGT_n30e113.zip`` yields ``n30e113``.

    Parameters
    ----------
    dem_results_urls: list
        NASADEM URL lists returned by the search, one inner list per granule.

    Returns
    -------
    grouped_results_urls: list
        One combined URL list per tile, ordered by the first appearance of
        each tile. Empty when ``dem_results_urls`` is empty.
    """
    # dict preserves insertion order, so tiles come back in first-seen order.
    urls_by_tile = {}
    for granule_urls in dem_results_urls:
        if not granule_urls:
            # A granule without any link cannot be assigned to a tile.
            continue
        tile_key = granule_urls[0].split("/")[-2].split("_")[-1]
        urls_by_tile.setdefault(tile_key, []).extend(granule_urls)
    return list(urls_by_tile.values())
def download_granule(granule_urls: list[str], output_dir: str) -> bool:
    """
    Download one batch of files unless all of them are already present.

    A URL counts as downloaded when a file named like the URL's basename
    exists in ``output_dir``.

    Parameters
    ----------
    granule_urls: list
        Normalized list of URLs to download, as returned by the search.
    output_dir: str
        Download directory.

    Returns
    -------
    download_state: bool
        True when the files were already present or the download call
        completed without raising; False when the download raised.
    """
    already_present = all(
        os.path.isfile(os.path.join(output_dir, os.path.basename(url)))
        for url in granule_urls
    )
    if already_present:
        logging.info("All Data already downloaded.")
        return True

    try:
        earthaccess.download(granule_urls, output_dir)
    except Exception as e:
        # Best effort: a failed batch is reported and skipped, not fatal.
        logging.error(f"Error downloading data: {e}. Skipping.")
        return False
    logging.info(f"Downloaded {len(granule_urls)} file(s) to {output_dir}.")
    return True
def unzip_nasadem_files(zip_file_list: list[str], unzip_dir: str):
    """
    Extract the downloaded NASADEM ZIP archives into ``unzip_dir``.

    Only members ending in ``.hgt``, ``.slope`` or ``.aspect`` are extracted,
    and every extracted file gets a ``.hgt`` extension:

    - ``<tile>.hgt``    -> ``<tile>.elevation.hgt``
    - ``<tile>.slope``  -> ``<tile>.slope.hgt``
    - ``<tile>.aspect`` -> ``<tile>.aspect.hgt``

    Paths that are not valid ZIP files (including missing files) are skipped,
    as are targets that already exist. A failure in one archive is logged and
    does not stop the remaining archives from being processed.

    Parameters
    ----------
    zip_file_list: list
        Paths of the ZIP archives to extract.
    unzip_dir: str
        Directory that receives the extracted files (created if missing).

    Returns
    -------
    None
    """
    # Local import: shutil is not imported at file level in this module.
    import shutil

    wanted_suffixes = (".hgt", ".slope", ".aspect")
    os.makedirs(unzip_dir, exist_ok=True)

    for zip_path in zip_file_list:
        if not zipfile.is_zipfile(zip_path):
            continue
        try:
            with zipfile.ZipFile(zip_path, "r") as zip_ref:
                for member in zip_ref.namelist():
                    if not member.endswith(wanted_suffixes):
                        continue
                    # Drop any directory part so that nested or "../" member
                    # names cannot land outside unzip_dir.
                    base_name = os.path.basename(member)
                    if base_name.endswith(".hgt"):
                        new_name = f"{base_name[:-len('.hgt')]}.elevation.hgt"
                    else:
                        new_name = f"{base_name}.hgt"
                    target_path = os.path.join(unzip_dir, new_name)
                    if os.path.exists(target_path):
                        continue
                    # Stream into a temporary file and rename on success, so
                    # an interrupted write never leaves a truncated target
                    # that later runs would skip as "already extracted".
                    partial_path = f"{target_path}.part"
                    try:
                        with zip_ref.open(member) as source_file, open(
                            partial_path, "wb"
                        ) as unzip_file:
                            shutil.copyfileobj(source_file, unzip_file)
                        os.replace(partial_path, target_path)
                    finally:
                        if os.path.exists(partial_path):
                            os.remove(partial_path)
        except Exception as e:
            logging.error(f"Error unzipping NASADEM {zip_path} to {unzip_dir}: {e}")
    return
def process_granule(
    unzip_dir: str,
    output_dir: str,
    name: str,
    roi: list,
    clip=True,
    tile_id: str = None,
) -> bool:
    """
    Build one GeoTIFF for a NASADEM data type from the extracted files.

    All files in ``unzip_dir`` matching ``*{name}.hgt`` are read, mosaicked,
    optionally clipped to ``roi`` and written to
    ``{output_dir}/DEM.NASADEM.{tile_id}.2000.{name}.tif`` as a
    DEFLATE-compressed COG. Slope and aspect values are multiplied by 0.01
    before mosaicking.

    Parameters
    ----------
    unzip_dir: str
        Root directory of the extracted (and renamed) NASADEM files.
    output_dir: str
        Output root directory.
    name: str
        Data type: ``elevation``, ``slope`` or ``aspect``.
    roi: list
        Grid extent, forwarded unchanged to ``clip_image``; no clipping is
        done when it is None.
    clip: bool
        Whether to clip the mosaic to ``roi``.
    tile_id: str
        Grid tile ID used in the output file name.

    Returns
    -------
    process_state: bool
        True when the output already exists or was written successfully;
        False when no input file was found or processing raised.
    """
    out_tif_name = f"DEM.NASADEM.{tile_id}.2000.{name}.tif"
    output_file = os.path.join(output_dir, out_tif_name)
    if os.path.isfile(output_file):
        logging.warning(f"{output_file} already exists. Skipping.")
        return True

    dem_file_list = glob.glob(os.path.join(unzip_dir, f"*{name}.hgt"))
    if not dem_file_list:
        # Nothing to mosaic: report it instead of claiming success for an
        # output file that was never written.
        logging.warning(f"No {name} files found in {unzip_dir}. Skipping.")
        return False

    needs_scaling = name in ("slope", "aspect")
    try:
        dem_raster_list = []
        for dem_path in dem_file_list:
            dem = (
                open_rasterio(dem_path)
                .squeeze(dim="band", drop=True)
                .rename(name)
            )
            if needs_scaling:
                org_attrs = dem.attrs
                dem = dem * 0.01
                # Restore the source attributes after the arithmetic, re-apply
                # the CRS and mark the values as already scaled.
                dem.attrs = org_attrs.copy()
                dem.rio.write_crs("EPSG:4326", inplace=True)
                dem.attrs["scale_factor"] = 1
            dem_raster_list.append(dem)

        # Slope/aspect and elevation use different nodata values.
        nodata = -9999 if needs_scaling else -32768
        dem_mosaiced = mosaic_images(dem_raster_list, nodata=nodata)
        if roi is not None and clip:
            dem_mosaiced = clip_image(dem_mosaiced, roi, clip_by_box=True)
        dem_mosaiced.rio.to_raster(output_file, driver="COG", compress="DEFLATE")
    except Exception as e:
        logging.error(f"Error processing files in {name}: {e}")
        return False

    logging.info(f"Processed {output_file} successfully.")
    return True
def main(region: "gpd.GeoDataFrame", asset_name: list, tile_id: str):
    """
    Search, download, extract and preprocess NASADEM data for one tile.

    Steps:

    1. Search granules for the bounding box of ``region`` and save the
       resulting URL lists as JSON (the previous record is overwritten).
    2. Download every granule, extract the archives, then build the
       elevation, slope and aspect GeoTIFFs, using Dask delayed tasks.

    Parameters
    ----------
    region: geopandas.GeoDataFrame
        Region of interest; its ``total_bounds`` are used for the search and
        the object itself is passed on to ``process_granule`` for clipping.
    asset_name: list
        Product short names to search for.
    tile_id: str
        Grid tile ID used in output folder and file names.
    """
    bbox = tuple(region.total_bounds)

    # Built with os.path.join so the separators fit the running platform
    # (on Windows this yields the same ".\\data\\DEM\\NASADEM" as before).
    output_root_dir = os.path.join(".", "data", "DEM", "NASADEM")
    # Downloaded ZIP files.
    download_dir = os.path.join(output_root_dir, "ZIP")
    # Extracted and renamed files.
    unzip_dir = os.path.join(download_dir, "UNZIP")
    output_dir = os.path.join(output_root_dir, "TIF", tile_id)
    os.makedirs(unzip_dir, exist_ok=True)
    os.makedirs(output_dir, exist_ok=True)

    # The previous search record is overwritten on every run.
    results_urls = earthdata_search(asset_name, roi=bbox)
    results_urls_file = os.path.join(
        output_root_dir, f"NASADEM_{tile_id}_results_urls.json"
    )
    with open(results_urls_file, "w") as f:
        json.dump(results_urls, f)

    # Archives to extract, named after the first URL of each granule,
    # e.g. NASADEM_HGT_n30e113.zip.
    zip_file_list = [
        os.path.join(download_dir, os.path.basename(result[0]))
        for result in results_urls
    ]

    logging.basicConfig(
        level=logging.INFO,
        format="%(levelname)s:%(asctime)s ||| %(message)s",
        handlers=[
            logging.StreamHandler(sys.stdout),
            logging.FileHandler(os.path.join(output_root_dir, "NASADEM_SuPER.log")),
        ],
    )
    logging.info(f"Found {len(results_urls)} NASADEM granules.")

    client = dask.distributed.Client(timeout=60, memory_limit="8GB")
    try:
        client.run(setup_dask_environment)
        all_start_time = time.time()
        # NOTE(review): the futures returned by scatter are not kept.
        client.scatter(results_urls)
        logging.info("Start processing NASADEM ...")

        download_tasks = [
            dask.delayed(download_granule)(granule_url, download_dir)
            for granule_url in results_urls
        ]
        unzip_tasks = dask.delayed(unzip_nasadem_files)(zip_file_list, unzip_dir)
        process_tasks = [
            dask.delayed(process_granule)(
                unzip_dir, output_dir, name, region, True, tile_id
            )
            for name in ["elevation", "slope", "aspect"]
        ]

        # The stages run one after another: each reads the files written by
        # the stage before it.
        dask.compute(*download_tasks)
        dask.compute(unzip_tasks)
        dask.compute(*process_tasks)
    finally:
        # Always shut the client down, even when a stage raises.
        client.close()

    all_total_time = time.time() - all_start_time
    logging.info(
        f"All NASADEM Downloading complete and proccessed. Total time: {all_total_time} seconds"
    )
if __name__ == "__main__":
    # Log in to Earthdata first; searching and downloading happen in main().
    earthaccess.login(persist=True)

    # Tile to process; its GeoJSON outline is used as the region of interest.
    # (Another ROI used before: ./data/vectors/wuling_guanqu_polygon.geojson)
    target_tile = "49REL"
    roi_frame = gpd.read_file(f"./data/vectors/{target_tile}.geojson")

    # Elevation product plus the slope/aspect product.
    products = ["NASADEM_HGT", "NASADEM_SC"]

    main(roi_frame, products, target_tile)