# -*- coding: utf-8 -*-
"""
===============================================================================
This module contains functions related to preprocessing GPM data.

For GPM Daily
Step1: Use earthaccess to search and download GPM IMERG Data
    - GPM_3IMERGDL
        - Precipitation 1 day 0.1 degree x 0.1 degree
        - https://disc.gsfc.nasa.gov/datasets/GPM_3IMERGDL_07/summary
Step2: Process GPM data
    - The downloaded GPM granules are *.nc files and must be converted to *.tif files

-------------------------------------------------------------------------------
Authors: Hong Xie
Last Updated: 2025-07-07
===============================================================================
"""

import os
import sys
import json
import time
import logging
from datetime import datetime

import dask.distributed
import earthaccess
import rioxarray  # noqa: F401 - registers the .rio accessor used in convert()
from xarray import open_dataset

sys.path.append("D:/NASA_EarthData_Script")
from utils.common_utils import setup_dask_environment
from HLS_SuPER.HLS_Su import earthdata_search


def convert(source_nc_path: str, target_tif_path: str) -> None:
    """
    Converts a GPM netCDF4 file to a Cloud Optimized GeoTIFF (COG).

    Parameters
    ----------
    source_nc_path: str
        Path to the source netCDF4 file.
    target_tif_path: str
        Path to the target geotiff file.
    """
    # Read the netCDF4 dataset and extract the precipitation variable
    gpm = open_dataset(source_nc_path)
    gpm_preci = (
        gpm["precipitation"]
        .squeeze("time", drop=True)
        .transpose("lat", "lon")
        .rename({"lat": "y", "lon": "x"})
    )
    gpm_preci.attrs = {**gpm["precipitation"].attrs.copy(), **gpm.attrs.copy()}
    # Assign the geographic CRS before exporting the raster
    gpm_preci.rio.write_crs("EPSG:4326", inplace=True)
    gpm_preci.rio.to_raster(target_tif_path, driver="COG", compress="DEFLATE")
    return


def process_granule(
    granule_urls: list[str],
    download_dir: str,
    output_dir: str,
    asset_name: str,
) -> bool:
    """
    Download a GPM netCDF4 granule, extract its precipitation data, and save it as a COG.

    Parameters
    ----------
    granule_urls: list[str]
        Download URLs of the GPM netCDF4 granule.
    download_dir: str
        Root directory for downloads.
    output_dir: str
        Root directory for outputs.
    asset_name: str
        Asset (collection) name.

    Returns
    -------
    process_state: bool
        Processing state, True or False.
    """
    # Example filename: 3B-DAY-L.MS.MRG.3IMERG.20240301-S000000-E235959.V07B.nc4
    download_nc_name = os.path.basename(granule_urls[0])
    # Parse the date and reformat it as YYYYDOY
    date = download_nc_name.split(".")[4].split("-")[0]
    date = datetime.strptime(date, "%Y%m%d").strftime("%Y%j")
    download_file = os.path.join(download_dir, download_nc_name)
    out_tif_name = f"GPM.{asset_name}.global.{date}.precip.tif"
    output_file = os.path.join(output_dir, out_tif_name)
    if not os.path.isfile(output_file):
        if not os.path.isfile(download_file):
            # Step1: Download the GPM netCDF4 file
            try:
                earthaccess.download(granule_urls, download_dir)
                logging.info(f"Downloaded {download_nc_name} to {download_dir}.")
            except Exception as e:
                logging.error(f"Error downloading {download_nc_name}: {e}")
                return False
        else:
            logging.info(f"{download_nc_name} already exists. Skipping download.")
        try:
            # Step2: Convert to COG format
            convert(download_file, output_file)
        except Exception as e:
            if "NetCDF: HDF error" in str(e) or "did not find a match" in str(e):
                # The downloaded file is corrupted: remove it and retry
                os.remove(download_file)
                logging.info(
                    f"Removed corrupted file {download_file}. Retrying download."
                )
                return process_granule(
                    granule_urls, download_dir, output_dir, asset_name
                )
            else:
                logging.error(f"Error processing files in {download_file}: {e}")
                return False
        logging.info(f"Processed {output_file} successfully.")
    else:
        logging.warning(f"{output_file} already exists. Skipping.")
    return True


def main(
    output_root_dir: str, asset_name: str, year: str | int, dates: tuple[str, str]
):
    results_urls = []
    year_results_dir = os.path.join(output_root_dir, asset_name, str(year))
    # Directory for the downloaded netCDF4 files
    download_dir = os.path.join(year_results_dir, "NC4")
    # Directory for the preprocessed files
    output_dir = os.path.join(year_results_dir, "TIF")
    os.makedirs(download_dir, exist_ok=True)
    os.makedirs(output_dir, exist_ok=True)
    results_urls_file = os.path.join(
        year_results_dir, f"{asset_name}_{year}_results_urls.json"
    )
    time_range = (f"{year}-{dates[0]}T00:00:00", f"{year}-{dates[1]}T23:59:59")
    if not os.path.isfile(results_urls_file):
        results_urls = earthdata_search([asset_name], time_range)
        with open(results_urls_file, "w") as f:
            json.dump(results_urls, f)
    else:
        with open(results_urls_file) as f:
            results_urls = json.load(f)

    # Configure logging
    log_file = os.path.join(year_results_dir, f"{asset_name}_{year}_SuPER.log")
    logging.basicConfig(
        level=logging.INFO,
        format="%(levelname)s:%(asctime)s ||| %(message)s",
        handlers=[
            logging.StreamHandler(sys.stdout),
            logging.FileHandler(log_file),
        ],
    )

    logging.info(f"Found {len(results_urls)} {asset_name} granules.")

    client = dask.distributed.Client(timeout=60, memory_limit="8GB")
    client.run(setup_dask_environment)
    all_start_time = time.time()
    client.scatter(results_urls)
    logging.info(f"Start processing {asset_name} ...")
    process_tasks = [
        dask.delayed(process_granule)(granule_url, download_dir, output_dir, asset_name)
        for granule_url in results_urls
    ]
    dask.compute(*process_tasks)
    client.close()
    all_total_time = time.time() - all_start_time
    logging.info(
        f"All {asset_name} granules downloaded and processed. Total time: {all_total_time:.2f} s."
    )


if __name__ == "__main__":
    earthaccess.login(persist=True)
    output_root_dir = ".\\data\\GPM"
    asset_name = "GPM_3IMERGDL"
    year = 2024
    dates = ("03-01", "10-31")
    main(output_root_dir, asset_name, year, dates)