
# -*- coding: utf-8 -*-
"""
===============================================================================
This module contains functions for preprocessing GPM data.
For the GPM daily product:
Step 1: Search for and download GPM IMERG data with earthaccess
- GPM_3IMERGDL
- Daily precipitation, 0.1 degree x 0.1 degree
- https://disc.gsfc.nasa.gov/datasets/GPM_3IMERGDL_07/summary
Step 2: Process the GPM data
- The downloaded GPM granules are *.nc files and need to be converted to *.tif
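
Example (illustrative invocation; mirrors the defaults in the __main__ block):
    earthaccess.login(persist=True)
    main(".\\data\\GPM", "GPM_3IMERGDL", 2024, ("03-01", "10-31"))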
-------------------------------------------------------------------------------
Authors: Hong Xie
Last Updated: 2025-07-07
===============================================================================
"""
import os
import sys
import json
import time
from datetime import datetime
import dask.distributed
import logging
import earthaccess
from xarray import open_dataset
import rioxarray  # registers the ``.rio`` accessor used in convert()

# Dynamically resolve the project root directory
project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(project_root)
from utils.common_utils import setup_dask_environment
from HLS_SuPER.HLS_Su import earthdata_search
def convert(source_nc_path: str, target_tif_path: str) -> None:
"""
    Converts a GPM netCDF4 file to a Cloud Optimized GeoTIFF (COG) file.
Parameters
----------
source_nc_path: str
Path to the source netCDF4 file.
target_tif_path: str
Path to the target geotiff file.
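
    Examples
    --------
    A minimal sketch; the input and output paths are hypothetical::

        convert(
            "NC4/3B-DAY-L.MS.MRG.3IMERG.20240301-S000000-E235959.V07B.nc4",
            "TIF/GPM.GPM_3IMERGDL.global.2024061.precip.tif",
        )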
"""
    # Read the netCDF4 dataset
gpm = open_dataset(source_nc_path)
gpm_preci = (
gpm["precipitation"]
.squeeze("time", drop=True)
.transpose("lat", "lon")
.rename({"lat": "y", "lon": "x"})
)
gpm_preci.attrs = {**gpm["precipitation"].attrs.copy(), **gpm.attrs.copy()}
    # Write the geographic CRS before exporting the raster
gpm_preci.rio.write_crs("EPSG:4326", inplace=True)
gpm_preci.rio.to_raster(target_tif_path, driver="COG", compress="DEFLATE")
return
def process_granule(
granule_urls: list[str],
download_dir: str,
output_dir: str,
asset_name: str,
) -> bool:
"""
    Downloads and preprocesses GPM netCDF4 data: reads the precipitation variable and saves it in COG format.

    Parameters
    ----------
    granule_urls: list[str]
        URLs of the GPM netCDF4 granule to download.
    download_dir: str
        Root directory for downloads.
    output_dir: str
        Root directory for outputs.
    asset_name: str
        Asset name, e.g. "GPM_3IMERGDL".

    Returns
    -------
    process_state: bool
        Processing state, True or False.
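
    Examples
    --------
    A minimal sketch; the URL is truncated and the directories are illustrative::

        process_granule(
            ["https://.../3B-DAY-L.MS.MRG.3IMERG.20240301-S000000-E235959.V07B.nc4"],
            "data/GPM/GPM_3IMERGDL/2024/NC4",
            "data/GPM/GPM_3IMERGDL/2024/TIF",
            "GPM_3IMERGDL",
        )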
"""
    # e.g. 3B-DAY-L.MS.MRG.3IMERG.20240301-S000000-E235959.V07B.nc4
download_nc_name = os.path.basename(granule_urls[0])
    # Parse the date and reformat it as YYYYDOY
date = download_nc_name.split(".")[4].split("-")[0]
date = datetime.strptime(date, "%Y%m%d").strftime("%Y%j")
download_file = os.path.join(download_dir, download_nc_name)
out_tif_name = f"GPM.{asset_name}.global.{date}.precip.tif"
output_file = os.path.join(output_dir, out_tif_name)
if not os.path.isfile(output_file):
if not os.path.isfile(download_file):
            # Step 1: Download the GPM netCDF4 file
try:
earthaccess.download(granule_urls, download_dir)
logging.info(f"Downloaded {download_nc_name} to {download_dir}.")
except Exception as e:
logging.error(f"Error downloading {download_nc_name}: {e}")
return False
else:
logging.info(f"{download_nc_name} already exists. Skipping download.")
try:
            # Step 2: Convert to COG format
convert(download_file, output_file)
except Exception as e:
if "NetCDF: HDF error" in str(e) or "did not find a match" in str(e):
os.remove(download_file)
logging.info(
f"Removed corrupted file {download_file}. Retrying download."
)
                return process_granule(granule_urls, download_dir, output_dir, asset_name)
else:
logging.error(f"Error processing files in {download_file}: {e}")
return False
logging.info(f"Processed {output_file} successfully.")
else:
logging.warning(f"{output_file} already exists. Skipping.")
return True
def main(
output_root_dir: str, asset_name: str, year: str | int, dates: tuple[str, str]
):
    """Searches for, downloads, and preprocesses all GPM granules in the given date range of one year."""
    results_urls = []
year_results_dir = os.path.join(output_root_dir, asset_name, str(year))
    # Directory for the downloaded netCDF4 files
download_dir = os.path.join(year_results_dir, "NC4")
    # Directory for the preprocessed files
output_dir = os.path.join(year_results_dir, "TIF")
os.makedirs(download_dir, exist_ok=True)
os.makedirs(output_dir, exist_ok=True)
    results_urls_file = os.path.join(
        year_results_dir, f"{asset_name}_{year}_results_urls.json"
    )
time_range = (f"{year}-{dates[0]}T00:00:00", f"{year}-{dates[1]}T23:59:59")
if not os.path.isfile(results_urls_file):
results_urls = earthdata_search([asset_name], time_range)
with open(results_urls_file, "w") as f:
json.dump(results_urls, f)
    else:
        with open(results_urls_file) as f:
            results_urls = json.load(f)
    # Configure logging
log_file = os.path.join(year_results_dir, f"{asset_name}_{year}_SuPER.log")
logging.basicConfig(
level=logging.INFO,
format="%(levelname)s:%(asctime)s ||| %(message)s",
handlers=[
logging.StreamHandler(sys.stdout),
logging.FileHandler(log_file),
],
)
logging.info(f"Found {len(results_urls)} {asset_name} granules.")
client = dask.distributed.Client(timeout=60, memory_limit="8GB")
client.run(setup_dask_environment)
all_start_time = time.time()
client.scatter(results_urls)
logging.info(f"Start processing {asset_name} ...")
process_tasks = [
        dask.delayed(process_granule)(granule_urls, download_dir, output_dir, asset_name)
        for granule_urls in results_urls
]
dask.compute(*process_tasks)
client.close()
all_total_time = time.time() - all_start_time
    logging.info(
        f"All {asset_name} granules downloaded and processed. Total time: {all_total_time:.2f} s."
    )
if __name__ == "__main__":
earthaccess.login(persist=True)
output_root_dir = ".\\data\\GPM"
asset_name = "GPM_3IMERGDL"
year = 2024
dates = ("03-01", "10-31")
main(output_root_dir, asset_name, year, dates)