xhong 5e96d0e026 refactor: 重构项目目录,添加全套遥感数据处理模块
- 重构项目目录结构,将零散代码整理到src目录下,创建各子包初始化文件
- 包括HLS、GPM、SMAP、MODIS等多源遥感数据的搜索、下载与预处理脚本
- 包括通用工具函数库,包含坐标参数定义、格式转换、影像处理等工具
- 同步更新README.md示例命令路径,修正脚本调用的目录错误
- 新增AGENTS.md文档说明开发环境配置
- 更新.gitignore规则,添加环境变量和构建缓存的忽略配置
2026-05-18 19:09:07 +08:00

352 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
"""
===============================================================================
This module contains functions related to preprocessing MODIS data.
For example, MCD43A3, MCD43A4, MOD11A1.
-------------------------------------------------------------------------------
Authors: Hong Xie
Last Updated: 2025-10-16
===============================================================================
"""
import os
import sys
import json
import time
import logging
from pathlib import Path
import earthaccess
import rioxarray as rxr
import dask.distributed
import geopandas as gpd
# 添加项目根目录到 sys.path确保作为独立脚本运行时也能正确导入
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
from src.utils.common_utils import (
clip_image,
reproject_image,
setup_dask_environment,
setup_logging,
)
from src.HLS_SuPER.HLS_Su import earthdata_search
def open_mcd43a4(file_path):
"""
Open MODIS MCD43A4 EOS-HDF4 file.
"""
# MCD43A4 影像中有 14 个波段, 仅需加载其中的 6 个核心波段
# 排除其他波段, 如第 5 个波段的水汽波段
bands = [
"Nadir_Reflectance_Band1",
"Nadir_Reflectance_Band2",
"Nadir_Reflectance_Band3",
"Nadir_Reflectance_Band4",
"Nadir_Reflectance_Band6",
"Nadir_Reflectance_Band7",
]
# 原始波段名称到波段名称的映射
bands_mapping = {
"Nadir_Reflectance_Band1": "BLUE",
"Nadir_Reflectance_Band2": "NIR",
"Nadir_Reflectance_Band3": "RED",
"Nadir_Reflectance_Band4": "GREEN",
"Nadir_Reflectance_Band6": "SWIR1",
"Nadir_Reflectance_Band7": "SWIR2",
}
# 波段顺序
sorted_bands = ["BLUE", "GREEN", "RED", "NIR", "SWIR1", "SWIR2"]
# 打开 MODIS MCD43A4 HDF4 文件
mcd43a4_bands = (
rxr.open_rasterio(file_path, variable=bands, masked=True)
.squeeze("band", drop=True)
.rename(bands_mapping)
)
# 对波段进行排序, 这是 xarray 的特殊写法, 可以直接对 Data variables 进行排序
mcd43a4_bands = mcd43a4_bands[sorted_bands]
return mcd43a4_bands
def open_mcd43a3(file_path):
"""
Open MODIS MCD43A3 EOS-HDF4 file.
"""
# MCD43A3 影像中有 14 个波段, 仅需加载其中的 6 个核心波段
bands = [
"MOD_Grid_BRDF:Albedo_BSA_Band1",
"MOD_Grid_BRDF:Albedo_BSA_Band2",
"MOD_Grid_BRDF:Albedo_BSA_Band3",
"MOD_Grid_BRDF:Albedo_BSA_Band4",
"MOD_Grid_BRDF:Albedo_BSA_Band6",
"MOD_Grid_BRDF:Albedo_BSA_Band7",
]
mcd43a3_bands = rxr.open_rasterio(file_path, variable=bands, masked=True).squeeze(
"band", drop=True
)
return mcd43a3_bands
def open_mod11a1(file_path):
"""
Open MODIS MOD11A1 EOS-HDF4 file.
"""
# MOD11A1 影像中有 12 个波段, 仅提取出日间温度波段
bands = ["LST_Day_1km"]
# 原始波段名称到波段名称的映射
bands_mapping = {"LST_Day_1km": "LST"}
# 打开 MODIS MOD11A1 HDF4 文件
mod11a1_bands = (
rxr.open_rasterio(file_path, variable=bands, masked=True)
.squeeze("band", drop=True)
.rename(bands_mapping)
)
return mod11a1_bands
def open_modis(file_path, prod_name):
"""
Open MODIS EOS-HDF4 file.
"""
if prod_name == "MOD11A1":
return open_mod11a1(file_path)
elif prod_name == "MCD43A3":
return open_mcd43a3(file_path)
elif prod_name == "MCD43A4":
return open_mcd43a4(file_path)
else:
raise ValueError(f"Unknown MODIS product: {prod_name}.")
def process_modis(
download_file,
prod_name,
roi,
clip=True,
scale=True,
target_crs=None,
output_file=None,
):
"""
对 MODIS 数据进行预处理, 包括裁剪, 重投影和缩放.
"""
modis = open_modis(download_file, prod_name)
if roi is not None and clip:
modis = clip_image(modis, roi)
if target_crs is not None and modis is not None:
modis = reproject_image(modis, target_crs)
# 重投影后再裁剪一次
if roi is not None and clip:
modis = clip_image(modis, roi)
if modis.isnull().all():
logging.error(f"Processing {download_file}. Roi area all pixels are nodata.")
if scale:
# 缩放计算后会丢源属性和坐标系, 需要先备份源数据属性信息
org_attrs = modis.attrs
if prod_name == "MOD11A1":
# MOD11A1 数据的温度波段单位为 K, 缩放因子为 0.02, 需要转换为摄氏度
modis = modis * 0.02 - 273.15
elif prod_name == "MCD43A3":
modis = modis * 0.001
elif prod_name == "MCD43A4":
modis = modis * 0.0001
# 恢复源数据属性信息
modis.attrs = org_attrs.copy()
modis.rio.write_crs(target_crs, inplace=True)
modis.attrs["scale_factor"] = 1
else:
if prod_name == "MOD11A1":
modis.attrs["scale_factor"] = 0.02
elif prod_name == "MCD43A3":
modis.attrs["scale_factor"] = 0.001
elif prod_name == "MCD43A4":
modis.attrs["scale_factor"] = 0.0001
modis.rio.to_raster(output_file, compress="DEFLATE")
return
def process_granule(
granule_urls,
roi,
clip,
scale,
output_dir,
tile_id=None,
target_crs="EPSG:4326",
):
download_hdf_name = os.path.basename(granule_urls[0])
# 获取名称与日期
file_name_part = download_hdf_name.split(".")
date = file_name_part[1][1:]
prod_name = file_name_part[0]
download_path = os.path.join(output_dir, "HDF")
os.makedirs(download_path, exist_ok=True)
download_file = os.path.join(download_path, download_hdf_name)
if prod_name == "MOD11A1":
out_tif_name = f"MODIS.{prod_name}.{tile_id}.{date}.LST.tif"
elif prod_name == "MCD43A3":
out_tif_name = f"MODIS.{prod_name}.{tile_id}.{date}.Albedo.tif"
elif prod_name == "MCD43A4":
out_tif_name = f"MODIS.{prod_name}.{tile_id}.{date}.NBRDF.tif"
else:
out_tif_name = download_hdf_name.replace(".hdf", ".tif")
# 除 MCD43A4 需用于光谱指数计算外, MOD11A1 日间温度与 MCD43A3 反照率无需再按日期归档
if prod_name == "MOD11A1" or prod_name == "MCD43A3":
output_path = os.path.join(output_dir, "TIF")
else:
output_path = os.path.join(output_dir, "TIF", date)
os.makedirs(output_path, exist_ok=True)
output_file = os.path.join(output_path, out_tif_name)
# Step1: 下载 HDF 文件
if not os.path.isfile(download_file):
try:
earthaccess.download(granule_urls, download_path)
except Exception as e:
logging.error(f"Error downloading {download_file}: {e}")
return
else:
logging.warning(f"{download_file} already exists. Skipping.")
# Step2: 处理 HDF 文件
if not os.path.isfile(output_file):
try:
process_modis(
download_file, prod_name, roi, clip, scale, target_crs, output_file
)
logging.info(f"Processed {output_file} successfully.")
except Exception as e:
os.remove(download_file)
logging.info(f"Removed corrupted file {download_file}. Retrying download.")
process_granule(
granule_urls, roi, clip, scale, output_dir, target_crs, tile_id
)
else:
logging.warning(f"{output_file} already exists. Skipping.")
def main(
region_file: Path,
asset_name: str,
modis_tile_id: str,
years: list,
dates: tuple[str, str],
tile_id: str,
target_crs: str,
output_root_dir: str,
):
region = gpd.read_file(region_file)
results_urls = []
output_dir = os.path.join(output_root_dir, asset_name)
os.makedirs(output_dir, exist_ok=True)
results_urls_file = os.path.join(
output_dir, f"{asset_name}_{modis_tile_id}_results_urls.json"
)
for year in years:
year_results_dir = os.path.join(output_dir, year)
os.makedirs(year_results_dir, exist_ok=True)
year_results_file = os.path.join(
year_results_dir, f"{asset_name}_{modis_tile_id}_{year}_results_urls.json"
)
year_temporal = (f"{year}-{dates[0]}T00:00:00", f"{year}-{dates[1]}T23:59:59")
year_results = earthdata_search(
[asset_name], year_temporal, region_file, modis_tile_id
)
results_urls.extend(year_results)
with open(year_results_file, "w") as f:
json.dump(year_results, f)
with open(results_urls_file, "w") as f:
json.dump(results_urls, f)
client = dask.distributed.Client(timeout=60, memory_limit="8GB")
client.run(setup_dask_environment)
client.run(setup_logging)
all_start_time = time.time()
for year in years:
year_results_dir = os.path.join(output_dir, year)
year_results_file = os.path.join(
year_results_dir, f"{asset_name}_{modis_tile_id}_{year}_results_urls.json"
)
year_results = json.load(open(year_results_file))
# 配置主进程日志
logs_file = os.path.join(
year_results_dir, f"{asset_name}_{tile_id}_{year}_SuPER.log"
)
setup_logging(logs_file)
client.scatter(year_results)
start_time = time.time()
logging.info(f"Start {year}...")
tasks = [
dask.delayed(process_granule)(
granule_url,
roi=region,
clip=True,
scale=True,
output_dir=year_results_dir,
tile_id=tile_id,
target_crs=target_crs,
)
for granule_url in year_results
]
dask.compute(*tasks)
total_time = time.time() - start_time
# Dask任务结束后读取dask_worker.txt日志文件内容, 并注入到logs_file中
with open(logs_file, "a") as f:
with open("dask_worker.log", "r") as f2:
f.write(f2.read())
# 随后清空dask_worker.txt文件
with open("dask_worker.log", "w") as f:
f.write("")
logging.info(
f"{year} MODIS {asset_name} Downloading complete and proccessed. Total time: {total_time} seconds"
)
client.close()
all_total_time = time.time() - all_start_time
logging.info(
f"All MODIS {asset_name} Downloading complete and proccessed. Total time: {all_total_time} seconds"
)
# 最后删除dask_worker.log文件
os.remove("dask_worker.log")
return
if __name__ == "__main__":
earthaccess.login(persist=True)
# region = gpd.read_file("./data/vectors/wuling_guanqu_polygon.geojson")
tile_id = "49REL"
region_file = f"./data/vectors/{tile_id}.geojson"
target_crs = "EPSG:32649"
asset_name = "MOD11A1"
# asset_name = "MCD43A3"
# asset_name = "MCD43A4"
modis_tile_id = "h27v06"
# 示例文件名称: MCD43A4.A2024001.h27v05.061.2024010140610.hdf
years = ["2024"]
dates = ("03-01", "10-31")
output_root_dir = ".\\data\\MODIS\\"
main(
region_file,
asset_name,
modis_tile_id,
years,
dates,
tile_id,
target_crs,
output_root_dir,
)