# -*- coding: utf-8 -*- """ =============================================================================== This module contains functions related to preprocessing SMAP data. For example, SMAP L3, SMAP L4. Reference: - https://github.com/nsidc/smap_python_notebooks/blob/main/notebooks/2.0%20Read%20and%20Plot%20SMAP%20data.ipynb - https://github.com/google/earthengine-catalog/blob/main/pipelines/smap_convert_l4.py - https://github.com/arthur-e/pyl4c - https://hdfeos.org/zoo/NSIDC/SMAP_L4_SM_gph_20200915T193000_Vv7032_001.h5.py ------------------------------------------------------------------------------- Authors: Hong Xie Last Updated: 2025-07-04 =============================================================================== """ import os import sys import json import time from datetime import datetime from affine import Affine import dask.distributed import logging import earthaccess import h5py from osgeo import gdal import xarray as xr # 动态获取项目根目录路径 project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) sys.path.append(project_root) from utils.common_params import EASE2_GRID_PARAMS, EPSG from utils.common_utils import ( array_to_raster, load_band_as_arr, reproject_image, setup_dask_environment, ) from HLS_SuPER.HLS_Su import earthdata_search def convert(source_h5_path: str, target_tif_path: str, date: str) -> None: """ Converts a SMAP L4 HDF file to a geotiff file. reference: https://github.com/google/earthengine-catalog/blob/main/pipelines/smap_convert_l4.py args: source_h5_path (str): Path to the source HDF file. target_tif_path (str): Path to the target GeoTiff file. date (str): Date of the data in YYYYDOY format. """ base_path = os.path.dirname(source_h5_path) tmp_path = os.path.join(base_path, "tmp") os.makedirs(tmp_path, exist_ok=True) # Options for gdal_translate translate_options = gdal.TranslateOptions( format="GTiff", outputSRS="+proj=cea +lon_0=0 +lat_ts=30 +ellps=WGS84 +units=m", outputBounds=[-17367530.45, 7314540.11, 17367530.45, -7314540.11], ) # array_to_raster params gt = EASE2_GRID_PARAMS["M09"]["geotransform"] wkt = EPSG[6933] # initialize temp tiff list tif_list = [] # convert individual variables to separate GeoTiff files VAR_LIST = ["sm_surface"] sizeoflist = len(VAR_LIST) for iband in range(0, sizeoflist): var = VAR_LIST[iband] sds_array = load_band_as_arr( "HDF5:" + source_h5_path + "://Geophysical_Data/" + var ) dst_tmp = os.path.join(tmp_path, f"{date}_{str(iband + 1)}.tif") sds_gdal = array_to_raster(sds_array, gt, wkt) ds = gdal.Translate(dst_tmp, sds_gdal, options=translate_options) ds = None tif_list.append(dst_tmp) # build a VRT(Virtual Dataset) that includes the list of input tif files tmp_vrt = os.path.join(tmp_path, f"{date}_tmp.vrt") gdal.BuildVRT(tmp_vrt, tif_list, options="-separate") warp_options = gdal.WarpOptions( creationOptions=["COMPRESS=DEFLATE"], srcSRS="EPSG:6933", dstSRS="EPSG:4326", dstNodata=-9999, ) # run gdal_Warp to reproject the VRT data and save in the target tif file with # compression ds = gdal.Warp(target_tif_path, tmp_vrt, options=warp_options) ds = None # remove temporary files for f in [tmp_vrt] + tif_list: if os.path.exists(f): os.remove(f) return def open_smap(file_path, asset_name): """ [Deprecated] 打开 *.h5 格式的 SMAP HDF 数据, 并将其转换为 WGS84 坐标系下的 xarray.DataArray 格式. """ if asset_name == "SPL4SMGP": ds = h5py.File(file_path, "r") sm_data = ds["Geophysical_Data"]["sm_surface"][:, :] lat = ds["cell_lat"][:, 0] lon = ds["cell_lon"][0, :] smap_ratser = xr.DataArray( data=sm_data, coords={ "y": lat, "x": lon, }, dims=["y", "x"], name="sm_surface", attrs=ds.attrs, ) # 添加CRS定义, SMAP L4 数据使用EASE-Grid 2.0 9km Grid (EPSG:6933) crs_str = "EPSG:6933" # SMAP L4 使用的 EPSG:6933 的 Transform 实际为 (-17367530.45, 9000, 0, 7314540.83, 0, -9000) transform = Affine.from_gdal(*(-17367530.45, 9000, 0, 7314540.83, 0, -9000)) smap_ratser.rio.write_transform(transform, inplace=True) smap_ratser.rio.write_crs(crs_str, inplace=True) smap_ratser = reproject_image(smap_ratser, "EPSG:4326") return smap_ratser def process_granule( granule_urls: list[str], output_dir: str, asset_name: str, ): # SMAP_L4_SM_gph_20240601T013000_Vv7031_001.h5 download_hdf_name = granule_urls[0].split("/")[-1] # 获取名称与日期, 原 date 格式为 YYYYMMDD file_name_part = download_hdf_name.split("_") date = file_name_part[4].split("T")[0] # 将日期格式化为 YYYYDOY date = datetime.strptime(date, "%Y%m%d").strftime("%Y%j") download_path = os.path.join(output_dir, "HDF") os.makedirs(download_path, exist_ok=True) download_file = os.path.join(download_path, download_hdf_name) out_tif_name = f"SMAP.{asset_name}.global.{date}.SM.tif" output_path = os.path.join(output_dir, "TIF") os.makedirs(output_path, exist_ok=True) output_file = os.path.join(output_path, out_tif_name) if not os.path.isfile(output_file): # Step1: 下载 HDF 文件 if not os.path.isfile(download_file): try: earthaccess.download(granule_urls, download_path) except Exception as e: logging.error(f"Error downloading {download_file}: {e}") return else: logging.warning(f"{download_file} already exists. Skipping.") # Step2: 处理 HDF 文件 try: # smap = open_smap(download_file, asset_name) convert(download_file, output_file, date) except Exception as e: if 'does not exist in the file system, and is not recognized as a supported dataset name' in str(e): os.remove(download_file) logging.info(f"Removed corrupted file {download_file}. Retrying download.") process_granule(granule_urls, output_dir, asset_name) else: logging.error(f"Error translating {download_file}: {e}") return logging.info(f"Processed {output_file} successfully.") else: logging.warning(f"{output_file} already exists. Skipping.") def main(output_root_dir: str, asset_name: str, years: list[str | int], dates: tuple[str, str], hours: tuple[str, str]): results_urls = [] output_dir = os.path.join(output_root_dir, asset_name) os.makedirs(output_dir, exist_ok=True) results_urls_file = os.path.join(output_dir, f"{asset_name}_results_urls.json") for year in years: year_results_dir = os.path.join(output_dir, year) os.makedirs(year_results_dir, exist_ok=True) year_results_file = os.path.join(year_results_dir, f"{asset_name}_{year}_results_urls.json") year_temporal = (f"{year}-{dates[0]}T00:00:00", f"{year}-{dates[1]}T23:59:59") year_results = earthdata_search([asset_name], year_temporal, hours=hours) results_urls.extend(year_results) with open(year_results_file, "w") as f: json.dump(year_results, f) with open(results_urls_file, "w") as f: json.dump(results_urls, f) # 配置日志 logging.basicConfig( level=logging.INFO, format="%(levelname)s:%(asctime)s ||| %(message)s", handlers=[ logging.StreamHandler(sys.stdout), logging.FileHandler(os.path.join(output_dir, f"{asset_name}_SuPER.log")), ], ) client = dask.distributed.Client(timeout=60, memory_limit="8GB") client.run(setup_dask_environment) all_start_time = time.time() for year in years: year_results_dir = os.path.join(output_dir, year) year_results_file = os.path.join(year_results_dir, f"{asset_name}_{year}_results_urls.json") year_results = json.load(open(year_results_file)) client.scatter(year_results) start_time = time.time() logging.info(f"Start {year}...") tasks = [ dask.delayed(process_granule)( granule_url, output_dir=year_results_dir, asset_name=asset_name, ) for granule_url in year_results ] dask.compute(*tasks) total_time = time.time() - start_time logging.info( f"{year} SMAP {asset_name} Downloading complete and proccessed. Total time: {total_time} seconds" ) client.close() all_total_time = time.time() - all_start_time logging.info( f"All SMAP {asset_name} Downloading complete and proccessed. Total time: {all_total_time} seconds" ) if __name__ == "__main__": earthaccess.login(persist=True) output_root_dir = ".\\data\\SMAP" asset_name = "SPL4SMGP" # 示例文件名称: SMAP_L4_SM_gph_20240601T013000_Vv7031_001.h5 years = ["2024", "2023", "2022"] dates = ("03-01", "10-31") # hours = ("03:00:00", "06:00:00") hours = ("09:00:00", "12:00:00") main(output_root_dir, asset_name, years, dates, hours)