
# -*- coding: utf-8 -*-
"""
===============================================================================
This module contains functions for preprocessing SMAP data,
e.g., SMAP L3 and SMAP L4 products.
Reference:
- https://github.com/nsidc/smap_python_notebooks/blob/main/notebooks/2.0%20Read%20and%20Plot%20SMAP%20data.ipynb
- https://github.com/google/earthengine-catalog/blob/main/pipelines/smap_convert_l4.py
- https://github.com/arthur-e/pyl4c
- https://hdfeos.org/zoo/NSIDC/SMAP_L4_SM_gph_20200915T193000_Vv7032_001.h5.py
-------------------------------------------------------------------------------
Authors: Hong Xie
Last Updated: 2025-07-04
===============================================================================
"""
import os
import sys
import json
import time
from datetime import datetime
from affine import Affine
import dask.distributed
import logging
import earthaccess
import h5py
from osgeo import gdal
import rioxarray  # noqa: F401 -- assumed needed here to register the `.rio` accessor used in open_smap
import xarray as xr
# Make the repository root importable so the sibling packages (utils, HLS_SuPER) resolve
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from utils.common_params import EASE2_GRID_PARAMS, EPSG
from utils.common_utils import (
    array_to_raster,
    load_band_as_arr,
    reproject_image,
    setup_dask_environment,
)
from HLS_SuPER.HLS_Su import earthdata_search


def convert(source_h5_path: str, target_tif_path: str, date: str) -> None:
    """
    Converts a SMAP L4 HDF5 file to a GeoTIFF file.

    Reference: https://github.com/google/earthengine-catalog/blob/main/pipelines/smap_convert_l4.py

    Args:
        source_h5_path (str): Path to the source HDF5 file.
        target_tif_path (str): Path to the target GeoTIFF file.
        date (str): Date of the data in YYYYDOY format.
    """
    base_path = os.path.dirname(source_h5_path)
    tmp_path = os.path.join(base_path, "tmp")
    os.makedirs(tmp_path, exist_ok=True)
    # Options for gdal_translate
    translate_options = gdal.TranslateOptions(
        format="GTiff",
        outputSRS="+proj=cea +lon_0=0 +lat_ts=30 +ellps=WGS84 +units=m",
        outputBounds=[-17367530.45, 7314540.11, 17367530.45, -7314540.11],
    )
    # array_to_raster params
    gt = EASE2_GRID_PARAMS["M09"]["geotransform"]
    wkt = EPSG[6933]
    # Convert each variable of interest to its own temporary GeoTIFF
    tif_list = []
    VAR_LIST = ["sm_surface"]
    for iband, var in enumerate(VAR_LIST, start=1):
        sds_array = load_band_as_arr(
            "HDF5:" + source_h5_path + "://Geophysical_Data/" + var
        )
        dst_tmp = os.path.join(tmp_path, f"{date}_{iband}.tif")
        sds_gdal = array_to_raster(sds_array, gt, wkt)
        ds = gdal.Translate(dst_tmp, sds_gdal, options=translate_options)
        ds = None
        tif_list.append(dst_tmp)
    # Build a VRT (Virtual Dataset) that stacks the temporary GeoTIFFs as bands
    tmp_vrt = os.path.join(tmp_path, f"{date}_tmp.vrt")
    gdal.BuildVRT(tmp_vrt, tif_list, options="-separate")
    warp_options = gdal.WarpOptions(
        creationOptions=["COMPRESS=DEFLATE"],
        srcSRS="EPSG:6933",
        dstSRS="EPSG:4326",
        dstNodata=-9999,
    )
    # Run gdal.Warp to reproject the VRT and write the compressed target GeoTIFF
    ds = gdal.Warp(target_tif_path, tmp_vrt, options=warp_options)
    ds = None
    # Remove temporary files
    for f in [tmp_vrt] + tif_list:
        if os.path.exists(f):
            os.remove(f)
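
# A minimal usage sketch for `convert` (the paths below are illustrative, not
# part of the pipeline): the `date` argument must be the granule's acquisition
# date re-encoded as YYYYDOY, matching how `process_granule` builds it.
#
#   src_h5 = "./data/SMAP/SPL4SMGP/2024/HDF/SMAP_L4_SM_gph_20240601T013000_Vv7031_001.h5"
#   doy = datetime.strptime("20240601", "%Y%m%d").strftime("%Y%j")  # -> "2024153"
#   convert(src_h5, f"SMAP.SPL4SMGP.global.{doy}.SM.tif", doy)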


def open_smap(file_path, asset_name):
    """
    [Deprecated]
    Opens SMAP HDF data in *.h5 format and converts it to an
    xarray.DataArray in the WGS84 coordinate system.
    """
    if asset_name == "SPL4SMGP":
        ds = h5py.File(file_path, "r")
        sm_data = ds["Geophysical_Data"]["sm_surface"][:, :]
        lat = ds["cell_lat"][:, 0]
        lon = ds["cell_lon"][0, :]
        smap_raster = xr.DataArray(
            data=sm_data,
            coords={
                "y": lat,
                "x": lon,
            },
            dims=["y", "x"],
            name="sm_surface",
            attrs=ds.attrs,
        )
        # Attach the CRS: SMAP L4 data use the EASE-Grid 2.0 9 km grid (EPSG:6933)
        crs_str = "EPSG:6933"
        # The actual EPSG:6933 transform used by SMAP L4 is
        # (-17367530.45, 9000, 0, 7314540.83, 0, -9000)
        transform = Affine.from_gdal(*(-17367530.45, 9000, 0, 7314540.83, 0, -9000))
        smap_raster.rio.write_transform(transform, inplace=True)
        smap_raster.rio.write_crs(crs_str, inplace=True)
        smap_raster = reproject_image(smap_raster, "EPSG:4326")
        return smap_raster


def process_granule(
    granule_urls: list[str],
    output_dir: str,
    asset_name: str,
):
    """
    Downloads a single SMAP granule and converts it to a GeoTIFF.

    Args:
        granule_urls (list[str]): URLs of the granule to download.
        output_dir (str): Directory that will hold the HDF and TIF subdirectories.
        asset_name (str): Short name of the SMAP collection, e.g. "SPL4SMGP".
    """
    # Example granule name: SMAP_L4_SM_gph_20240601T013000_Vv7031_001.h5
    download_hdf_name = granule_urls[0].split("/")[-1]
    # Extract the date from the file name; the original format is YYYYMMDD
    file_name_part = download_hdf_name.split("_")
    date = file_name_part[4].split("T")[0]
    # Re-encode the date as YYYYDOY
    date = datetime.strptime(date, "%Y%m%d").strftime("%Y%j")
    download_path = os.path.join(output_dir, "HDF")
    os.makedirs(download_path, exist_ok=True)
    download_file = os.path.join(download_path, download_hdf_name)
    out_tif_name = f"SMAP.{asset_name}.global.{date}.SM.tif"
    output_path = os.path.join(output_dir, "TIF")
    os.makedirs(output_path, exist_ok=True)
    output_file = os.path.join(output_path, out_tif_name)
    if not os.path.isfile(output_file):
        # Step 1: Download the HDF file
        if not os.path.isfile(download_file):
            try:
                earthaccess.download(granule_urls, download_path)
            except Exception as e:
                logging.error(f"Error downloading {download_file}: {e}")
                return
        else:
            logging.warning(f"{download_file} already exists. Skipping.")
        # Step 2: Convert the HDF file to GeoTIFF
        try:
            # smap = open_smap(download_file, asset_name)
            convert(download_file, output_file, date)
        except Exception as e:
            err_msg = "does not exist in the file system, and is not recognized as a supported dataset name"
            if err_msg in str(e):
                # GDAL raises this when the downloaded file is truncated or corrupted
                os.remove(download_file)
                logging.info(f"Removed corrupted file {download_file}. Retrying download.")
                process_granule(granule_urls, output_dir, asset_name)
            else:
                logging.error(f"Error translating {download_file}: {e}")
            return
        logging.info(f"Processed {output_file} successfully.")
    else:
        logging.warning(f"{output_file} already exists. Skipping.")


def main(
    output_root_dir: str,
    asset_name: str,
    years: list[str | int],
    dates: tuple[str, str],
    hours: tuple[str, str],
):
    """
    Searches Earthdata for SMAP granules year by year, then downloads and
    converts them in parallel with Dask.
    """
    results_urls = []
    output_dir = os.path.join(output_root_dir, asset_name)
    os.makedirs(output_dir, exist_ok=True)
    results_urls_file = os.path.join(output_dir, f"{asset_name}_results_urls.json")
    for year in years:
        year = str(year)
        year_results_dir = os.path.join(output_dir, year)
        os.makedirs(year_results_dir, exist_ok=True)
        year_results_file = os.path.join(
            year_results_dir, f"{asset_name}_{year}_results_urls.json"
        )
        year_temporal = (f"{year}-{dates[0]}T00:00:00", f"{year}-{dates[1]}T23:59:59")
        year_results = earthdata_search([asset_name], year_temporal, hours=hours)
        results_urls.extend(year_results)
        with open(year_results_file, "w") as f:
            json.dump(year_results, f)
    with open(results_urls_file, "w") as f:
        json.dump(results_urls, f)

    # Configure logging
    logging.basicConfig(
        level=logging.INFO,
        format="%(levelname)s:%(asctime)s ||| %(message)s",
        handlers=[
            logging.StreamHandler(sys.stdout),
            logging.FileHandler(os.path.join(output_dir, f"{asset_name}_SuPER.log")),
        ],
    )
    client = dask.distributed.Client(timeout=60, memory_limit="8GB")
    client.run(setup_dask_environment)
    all_start_time = time.time()
    for year in years:
        year = str(year)
        year_results_dir = os.path.join(output_dir, year)
        year_results_file = os.path.join(
            year_results_dir, f"{asset_name}_{year}_results_urls.json"
        )
        with open(year_results_file) as f:
            year_results = json.load(f)
        # Distribute the granule list to the workers up front
        client.scatter(year_results)
        start_time = time.time()
        logging.info(f"Start {year}...")
        tasks = [
            dask.delayed(process_granule)(
                granule_url,
                output_dir=year_results_dir,
                asset_name=asset_name,
            )
            for granule_url in year_results
        ]
        dask.compute(*tasks)
        total_time = time.time() - start_time
        logging.info(
            f"{year} SMAP {asset_name} downloaded and processed. Total time: {total_time} seconds"
        )
    client.close()
    all_total_time = time.time() - all_start_time
    logging.info(
        f"All SMAP {asset_name} downloaded and processed. Total time: {all_total_time} seconds"
    )
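
# Expected layout under `output_root_dir` after a run (illustrative, derived
# from the path logic in `main` and `process_granule`):
#
#   data/SMAP/SPL4SMGP/
#   |-- SPL4SMGP_results_urls.json
#   |-- SPL4SMGP_SuPER.log
#   `-- 2024/
#       |-- SPL4SMGP_2024_results_urls.json
#       |-- HDF/SMAP_L4_SM_gph_20240601T013000_Vv7031_001.h5
#       `-- TIF/SMAP.SPL4SMGP.global.2024153.SM.tif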
if __name__ == "__main__":
earthaccess.login(persist=True)
output_root_dir = ".\\data\\SMAP"
asset_name = "SPL4SMGP"
# 示例文件名称: SMAP_L4_SM_gph_20240601T013000_Vv7031_001.h5
years = ["2024", "2023", "2022"]
dates = ("03-01", "10-31")
# hours = ("03:00:00", "06:00:00")
hours = ("09:00:00", "12:00:00")
main(output_root_dir, asset_name, years, dates, hours)