diff --git a/DATA_SuPER/DEM_SuPER.py b/DATA_SuPER/DEM_SuPER.py new file mode 100644 index 0000000..737ba0a --- /dev/null +++ b/DATA_SuPER/DEM_SuPER.py @@ -0,0 +1,270 @@ +# -*- coding: utf-8 -*- +""" +=============================================================================== +This module contains functions related to preprocessing DEM data. +For example, elevation, slope, aspect + +Step1: Use earthaccess search and download NASADEM Data +- NASADEM_HGT + - includes 30m DEM, based on SRTM data + - https://lpdaac.usgs.gov/products/nasadem_hgtv001/ +- NASADEM_SC + - includes 30m slope, aspect, based on NASADEM_HGT + - https://lpdaac.usgs.gov/products/nasadem_scv001/ + +Step2: Process DEM data +- 下载的 NASADEM 均为 *.zip 文件, 需先进行解压 +- NASADEM 文件名称结构为: NASADEM_类型_网格编号/网格编号.数据类型 + - 高程示例: NASADEM_HGT_n30e113/n30e113.hgt + - 坡度示例: NASADEM_SC_n30e113/n30e113.slope + - 坡向示例: NASADEM_SC_n30e113/n30e113.aspect +- 读取文件按网格进行裁剪并镶嵌, 坡度和坡向数据需要进行缩放处理, 将网格范围的结果保存为 *.tif 文件 + +------------------------------------------------------------------------------- +Authors: Hong Xie +Last Updated: 2025-07-03 +=============================================================================== +""" + +import os +import sys +import glob +import json +import zipfile +import time +import dask.distributed +import logging +import earthaccess +import geopandas as gpd +import numpy as np +from rioxarray import open_rasterio + +sys.path.append("D:/NASA_EarthData_Script") + +from utils.common_utils import setup_dask_environment, clip_image, mosaic_images +from HLS_SuPER.HLS_Su import earthdata_search + + +def reorganize_nasadem_urls(dem_results_urls: list): + """ + 重组 NASADEM 下载链接 + + 将同一格网内的高程, 坡度坡向数据链接进行组合 + + Parameters + ---------- + dem_results_urls: list + 查询返回的 NASADEM 数据 URL 列表 + + Returns + ------- + grouped_results_urls: list + 重组后的 NASADEM 数据 URL 列表 + """ + tile_ids = [] + for granule in dem_results_urls: + tile_id = granule[0].split("/")[-2].split("_")[-1] + tile_ids.append(tile_id) + + tile_ids = np.array(tile_ids) + # 根据瓦片ID找到对应的索引 + tile_id_indices = np.where(tile_ids == tile_id) + # 根据索引过滤结果 + return [dem_results_urls[i] for i in tile_id_indices[0]] + + +def download_granule(granule_urls: list[str], output_dir: str) -> bool: + """ + 下载单批数据 + + Parameters + ---------- + granule_urls: list + 查询返回的规范化待下载数据 URL 列表 + output_dir: str + 下载目录 + + Returns + ------- + download_state: bool + 下载状态 True or False + """ + # 检查是否已下载 + if not all( + os.path.isfile(os.path.join(output_dir, os.path.basename(url))) + for url in granule_urls + ): + try: + earthaccess.download(granule_urls, output_dir) + except Exception as e: + logging.error(f"Error downloading data: {e}. 
Skipping.") + return False + logging.info("All Data already downloaded.") + return True + + +def unzip_nasadem_files(zip_file_list: list[str], unzip_dir: str): + """ + 解压下载的 NASADEM ZIP 文件, 并将解压后的文件统一为可读写的 .hgt 格式 + """ + try: + for zip_path in zip_file_list: + if not zipfile.is_zipfile(zip_path): + continue + with zipfile.ZipFile(zip_path, "r") as zip_ref: + # 仅解压包含 .hgt, .slope, .aspect 的文件 + for hgt_file in [f for f in zip_ref.namelist() if f.endswith((".hgt", ".slope", ".aspect"))]: + # 解压时重命名文件 + new_name = ( + hgt_file.replace(".hgt", ".elevation.hgt") + if hgt_file.endswith(".hgt") + else f"{hgt_file}.hgt" + ) + unzip_file_path = os.path.join(unzip_dir, new_name) + if os.path.exists(unzip_file_path): + continue + with zip_ref.open(hgt_file) as source_file: + with open(unzip_file_path, 'wb') as unzip_file: + unzip_file.write(source_file.read()) + except Exception as e: + logging.error(f"Error unzipping NASADEM to {unzip_dir}: {e}") + return + + +def process_granule( + unzip_dir: str, + output_dir: str, + name: str, + roi: list, + clip=True, + tile_id: str = None, +) -> bool: + """ + 读取解压并重命名处理后的指定类型 NASADEM 数据并进行预处理, 包括读取, 裁剪, 镶嵌, 并对坡度坡向进行缩放 + + Parameters + ---------- + unzip_dir: str + 解压后的 NASADEM 文件根目录 + output_dir: str + 输出根目录 + name: str + 数据类型, 包括 elevation, slope, aspect + roi: list + 网格范围 + clip: bool + 是否裁剪 + tile_id: str + 网格编号 + + Returns + ------- + process_state: bool + 处理状态 True or False + """ + + dem_file_list = glob.glob(os.path.join(unzip_dir, f"*{name}.hgt")) + out_tif_name = f"DEM.NASADEM.{tile_id}.2000.{name}.tif" + output_file = os.path.join(output_dir, out_tif_name) + if not os.path.isfile(output_file): + try: + dem_raster_list = [] + for dem_path in dem_file_list: + dem = ( + open_rasterio(dem_path) + .squeeze(dim="band", drop=True) + .rename(name) + ) + if name == "slope" or name == "aspect": + org_attrs = dem.attrs + dem = dem * 0.01 + # 恢复源数据属性信息 + dem.attrs = org_attrs.copy() + dem.rio.write_crs("EPSG:4326", inplace=True) + dem.attrs["scale_factor"] = 1 + dem_raster_list.append(dem) + if len(dem_raster_list) > 1: + if name == "slope" or name == "aspect": + dem_mosaiced = mosaic_images(dem_raster_list, nodata=-9999) + else: + dem_mosaiced = mosaic_images(dem_raster_list, nodata=-32768) + if roi is not None and clip: + dem_mosaiced = clip_image(dem_mosaiced, roi) + dem_mosaiced.rio.to_raster(output_file, driver="COG", compress="DEFLATE") + except Exception as e: + logging.error(f"Error processing files in {name}: {e}") + return False + logging.info(f"Processed {output_file} successfully.") + else: + logging.warning(f"{output_file} already exists. 
Skipping.") + return True + + +def main(region: list, asset_name: list, tile_id: str): + bbox = tuple(list(region.total_bounds)) + # 示例文件名称: NASADEM_HGT_n30e113.zip + results_urls = [] + output_root_dir = ".\\data\\DEM\\NASADEM" + # 放置下载的 ZIP 文件 + download_dir = os.path.join(output_root_dir, "ZIP") + # 放置解压并预处理后的文件 + unzip_dir = os.path.join(download_dir, "UNZIP") + output_dir = os.path.join(output_root_dir, "TIF", tile_id) + os.makedirs(unzip_dir, exist_ok=True) + results_urls_file = f"{output_root_dir}\\NASADEM_{tile_id}_results_urls.json" + if not os.path.isfile(results_urls_file): + results_urls = earthdata_search(asset_name, roi=bbox) + with open(results_urls_file, "w") as f: + json.dump(results_urls, f) + else: + results_urls = json.load(open(results_urls_file)) + # 构造待解压的文件列表 + zip_file_list = [os.path.join(download_dir, os.path.basename(result[0])) for result in results_urls] + + # 配置日志 + logging.basicConfig( + level=logging.INFO, + format="%(levelname)s:%(asctime)s ||| %(message)s", + handlers=[ + logging.StreamHandler(sys.stdout), + logging.FileHandler(f"{output_root_dir}\\NASADEM_SuPER.log"), + ], + ) + logging.info(f"Found {len(results_urls)} NASADEM granules.") + + client = dask.distributed.Client(timeout=60, memory_limit="8GB") + client.run(setup_dask_environment) + all_start_time = time.time() + client.scatter(results_urls) + + logging.info(f"Start processing NASADEM ...") + download_tasks = [ + dask.delayed(download_granule)(granule_url, download_dir) + for granule_url in results_urls + ] + unzip_tasks = dask.delayed(unzip_nasadem_files)(zip_file_list, unzip_dir) + process_tasks = [ + dask.delayed(process_granule)( + unzip_dir, output_dir, name, region, True, tile_id + ) + for name in ["elevation", "slope", "aspect"] + ] + + dask.compute(*download_tasks) + dask.compute(unzip_tasks) + dask.compute(*process_tasks) + + client.close() + all_total_time = time.time() - all_start_time + logging.info( + f"All NASADEM Downloading complete and proccessed. Total time: {all_total_time} seconds" + ) + + +if __name__ == "__main__": + earthaccess.login(persist=True) + # region = gpd.read_file("./data/vectors/wuling_guanqu_polygon.geojson") + tile_id = "49REL" + region = gpd.read_file(f"./data/vectors/{tile_id}.geojson") + asset_name = ["NASADEM_HGT", "NASADEM_SC"] + main(region, asset_name, tile_id) diff --git a/DATA_SuPER/GPM_SuPER.py b/DATA_SuPER/GPM_SuPER.py new file mode 100644 index 0000000..8b28b6e --- /dev/null +++ b/DATA_SuPER/GPM_SuPER.py @@ -0,0 +1,188 @@ +# -*- coding: utf-8 -*- +""" +=============================================================================== +This module contains functions related to preprocessing GPM data. 
+For GPM Daily + +Step1: Use earthaccess search and download GPM IMERG Data +- GPM_3IMERGDL + - Precipitation 1 day 0.1 degree x 0.1 degree + - https://disc.gsfc.nasa.gov/datasets/GPM_3IMERGDL_07/summary + +Step2: Process GPM data +- 下载的 GPM 均为 *.nc 文件, 需要转换为 *.tif 文件 + +------------------------------------------------------------------------------- +Authors: Hong Xie +Last Updated: 2025-07-07 +=============================================================================== +""" + +import os +import sys +import json +import time +from datetime import datetime +import dask.distributed +import logging +import earthaccess +from xarray import open_dataset + +sys.path.append("D:/NASA_EarthData_Script") + +from utils.common_utils import setup_dask_environment +from HLS_SuPER.HLS_Su import earthdata_search + + +def convert(source_nc_path: str, target_tif_path: str) -> None: + """ + Converts a GPM netCDF4 file to a geotiff file. + + Parameters + ---------- + source_nc_path: str + Path to the source netCDF4 file. + target_tif_path: str + Path to the target geotiff file. + """ + # 读取 netCDF4 数据 + gpm = open_dataset(source_nc_path) + gpm_preci = ( + gpm["precipitation"] + .squeeze("time", drop=True) + .transpose("lat", "lon") + .rename({"lat": "y", "lon": "x"}) + ) + gpm_preci.attrs = {**gpm["precipitation"].attrs.copy(), **gpm.attrs.copy()} + # 先指定地理坐标再导出数据 + gpm_preci.rio.write_crs("EPSG:4326", inplace=True) + gpm_preci.rio.to_raster(target_tif_path, driver="COG", compress="DEFLATE") + return + + +def process_granule( + granule_urls: list[str], + download_dir: str, + output_dir: str, + asset_name: str, +) -> bool: + """ + 下载 GPM netCDF4 数据并进行预处理, 读取其中的降水量数据并保存为 COG 格式 + + Parameters + ---------- + granule_urls: list[str] + 下载的 GPM netCDF4 数据的 URL 列表 + download_dir: str + 下载根目录 + output_dir: str + 输出根目录 + asset_name: str + 资产名称 + + Returns + ------- + process_state: bool + 处理状态 True or False + """ + + # 3B-DAY-L.MS.MRG.3IMERG.20240301-S000000-E235959.V07B.nc4 + download_nc_name = os.path.basename(granule_urls[0]) + # 读取日期并格式化为 YYYYDOY + date = download_nc_name.split(".")[4].split("-")[0] + date = datetime.strptime(date, "%Y%m%d").strftime("%Y%j") + + download_file = os.path.join(download_dir, download_nc_name) + + out_tif_name = f"GPM.{asset_name}.global.{date}.precip.tif" + output_file = os.path.join(output_dir, out_tif_name) + + if not os.path.isfile(output_file): + if not os.path.isfile(download_file): + # Step1: 下载 GPM netCDF4 文件 + try: + earthaccess.download(granule_urls, download_dir) + logging.info(f"Downloaded {download_nc_name} to {download_dir}.") + except Exception as e: + logging.error(f"Error downloading {download_nc_name}: {e}") + return False + else: + logging.info(f"{download_nc_name} already exists. Skipping download.") + + try: + # Step2: 转换为 COG 格式 + convert(download_file, output_file) + except Exception as e: + if "NetCDF: HDF error" in str(e) or "did not find a match" in str(e): + os.remove(download_file) + logging.info( + f"Removed corrupted file {download_file}. Retrying download." + ) + process_granule(granule_urls, download_dir, output_dir, asset_name) + else: + logging.error(f"Error processing files in {download_file}: {e}") + return False + logging.info(f"Processed {output_file} successfully.") + else: + logging.warning(f"{output_file} already exists. 
Skipping.") + return True + + +def main( + output_root_dir: str, asset_name: str, year: str | int, dates: tuple[str, str] +): + results_urls = [] + year_results_dir = os.path.join(output_root_dir, asset_name, str(year)) + # 放置下载的 netCDF4 文件 + download_dir = os.path.join(year_results_dir, "NC4") + # 放置预处理后的文件 + output_dir = os.path.join(year_results_dir, "TIF") + os.makedirs(download_dir, exist_ok=True) + os.makedirs(output_dir, exist_ok=True) + results_urls_file = f"{year_results_dir}\\{asset_name}_{year}_results_urls.json" + time_range = (f"{year}-{dates[0]}T00:00:00", f"{year}-{dates[1]}T23:59:59") + if not os.path.isfile(results_urls_file): + results_urls = earthdata_search([asset_name], time_range) + with open(results_urls_file, "w") as f: + json.dump(results_urls, f) + else: + results_urls = json.load(open(results_urls_file)) + + # 配置日志 + log_file = os.path.join(year_results_dir, f"{asset_name}_{year}_SuPER.log") + logging.basicConfig( + level=logging.INFO, + format="%(levelname)s:%(asctime)s ||| %(message)s", + handlers=[ + logging.StreamHandler(sys.stdout), + logging.FileHandler(log_file), + ], + ) + logging.info(f"Found {len(results_urls)} {asset_name} granules.") + + client = dask.distributed.Client(timeout=60, memory_limit="8GB") + client.run(setup_dask_environment) + all_start_time = time.time() + client.scatter(results_urls) + + logging.info(f"Start processing {asset_name} ...") + process_tasks = [ + dask.delayed(process_granule)(granule_url, download_dir, output_dir, asset_name) + for granule_url in results_urls + ] + dask.compute(*process_tasks) + + client.close() + all_total_time = time.time() - all_start_time + logging.info( + f"All {asset_name} Downloading complete and proccessed. Total time: {all_total_time:.2f} s." + ) + + +if __name__ == "__main__": + earthaccess.login(persist=True) + output_root_dir = ".\\data\\GPM" + asset_name = "GPM_3IMERGDL" + year = 2024 + dates = ("03-01", "10-31") + main(output_root_dir, asset_name, year, dates) diff --git a/DATA_SuPER/MODIS_SuPER.py b/DATA_SuPER/MODIS_SuPER.py new file mode 100644 index 0000000..30d5891 --- /dev/null +++ b/DATA_SuPER/MODIS_SuPER.py @@ -0,0 +1,321 @@ +# -*- coding: utf-8 -*- +""" +=============================================================================== +This module contains functions related to preprocessing MODIS data. +For example, MCD43A3, MCD43A4, MOD11A1. + +------------------------------------------------------------------------------- +Authors: Hong Xie +Last Updated: 2025-07-15 +=============================================================================== +""" + +import os +import sys +import json +import time +import logging +import earthaccess +import rioxarray as rxr +import dask.distributed +import geopandas as gpd + +sys.path.append("D:/NASA_EarthData_Script") + +from utils.common_utils import clip_image, reproject_image, setup_dask_environment +from HLS_SuPER.HLS_Su import earthdata_search + + +def open_mcd43a4(file_path): + """ + Open MODIS MCD43A4 EOS-HDF4 file. 
+ """ + # MCD43A4 影像中有 14 个波段, 仅需加载其中的 6 个核心波段 + # 排除其他波段, 如第 5 个波段的水汽波段 + bands = [ + "Nadir_Reflectance_Band1", + "Nadir_Reflectance_Band2", + "Nadir_Reflectance_Band3", + "Nadir_Reflectance_Band4", + "Nadir_Reflectance_Band6", + "Nadir_Reflectance_Band7", + ] + # 原始波段名称到波段名称的映射 + bands_mapping = { + "Nadir_Reflectance_Band1": "BLUE", + "Nadir_Reflectance_Band2": "NIR", + "Nadir_Reflectance_Band3": "RED", + "Nadir_Reflectance_Band4": "GREEN", + "Nadir_Reflectance_Band6": "SWIR1", + "Nadir_Reflectance_Band7": "SWIR2", + } + # 波段顺序 + sorted_bands = ["BLUE", "GREEN", "RED", "NIR", "SWIR1", "SWIR2"] + + # 打开 MODIS MCD43A4 HDF4 文件 + mcd43a4_bands = ( + rxr.open_rasterio(file_path, variable=bands, masked=True) + .squeeze("band", drop=True) + .rename(bands_mapping) + ) + + # 对波段进行排序, 这是 xarray 的特殊写法, 可以直接对 Data variables 进行排序 + mcd43a4_bands = mcd43a4_bands[sorted_bands] + return mcd43a4_bands + + +def open_mcd43a3(file_path): + """ + Open MODIS MCD43A3 EOS-HDF4 file. + """ + # MCD43A3 影像中有 14 个波段, 仅需加载其中的 6 个核心波段 + bands = [ + "MOD_Grid_BRDF:Albedo_BSA_Band1", + "MOD_Grid_BRDF:Albedo_BSA_Band2", + "MOD_Grid_BRDF:Albedo_BSA_Band3", + "MOD_Grid_BRDF:Albedo_BSA_Band4", + "MOD_Grid_BRDF:Albedo_BSA_Band6", + "MOD_Grid_BRDF:Albedo_BSA_Band7", + ] + + mcd43a3_bands = rxr.open_rasterio(file_path, variable=bands, masked=True).squeeze( + "band", drop=True + ) + return mcd43a3_bands + + +def open_mod11a1(file_path): + """ + Open MODIS MOD11A1 EOS-HDF4 file. + """ + # MOD11A1 影像中有 12 个波段, 仅提取出日间温度波段 + bands = ["LST_Day_1km"] + # 原始波段名称到波段名称的映射 + bands_mapping = {"LST_Day_1km": "LST"} + + # 打开 MODIS MOD11A1 HDF4 文件 + mod11a1_bands = ( + rxr.open_rasterio(file_path, variable=bands, masked=True) + .squeeze("band", drop=True) + .rename(bands_mapping) + ) + + return mod11a1_bands + + +def open_modis(file_path, prod_name): + """ + Open MODIS EOS-HDF4 file. + """ + if prod_name == "MOD11A1": + return open_mod11a1(file_path) + elif prod_name == "MCD43A3": + return open_mcd43a3(file_path) + elif prod_name == "MCD43A4": + return open_mcd43a4(file_path) + else: + raise ValueError(f"Unknown MODIS product: {prod_name}.") + + +def process_modis(download_file, prod_name, roi, clip=True, scale=True, target_crs=None, output_file=None): + """ + 对 MODIS 数据进行预处理, 包括裁剪, 重投影和缩放. 
+ """ + + modis = open_modis(download_file, prod_name) + + if roi is not None and clip: + modis = clip_image(modis, roi) + + if target_crs is not None: + modis = reproject_image(modis, target_crs) + + # 重投影后再裁剪一次 + if roi is not None and clip: + modis = clip_image(modis, roi) + + if scale: + # 缩放计算后会丢源属性和坐标系, 需要先备份源数据属性信息 + org_attrs = modis.attrs + if prod_name == "MOD11A1": + # MOD11A1 数据的温度波段单位为 K, 缩放因子为 0.02, 需要转换为摄氏度 + modis = modis * 0.02 - 273.15 + elif prod_name == "MCD43A3": + modis = modis * 0.001 + elif prod_name == "MCD43A4": + modis = modis * 0.0001 + # 恢复源数据属性信息 + modis.attrs = org_attrs.copy() + modis.rio.write_crs(target_crs, inplace=True) + modis.attrs["scale_factor"] = 1 + else: + if prod_name == "MOD11A1": + modis.attrs["scale_factor"] = 0.02 + elif prod_name == "MCD43A3": + modis.attrs["scale_factor"] = 0.001 + elif prod_name == "MCD43A4": + modis.attrs["scale_factor"] = 0.0001 + modis.rio.to_raster(output_file, compress="DEFLATE") + return + + +def process_granule( + granule_urls, + roi, + clip, + scale, + output_dir, + target_crs="EPSG:4326", + tile_id=None, +): + logging.basicConfig( + level=logging.INFO, + format="%(levelname)s:%(asctime)s ||| %(message)s", + handlers=[logging.StreamHandler(sys.stdout)], + ) + + download_hdf_name = os.path.basename(granule_urls[0]) + # 获取名称与日期 + file_name_part = download_hdf_name.split(".") + date = file_name_part[1][1:] + prod_name = file_name_part[0] + download_path = os.path.join(output_dir, "HDF") + os.makedirs(download_path, exist_ok=True) + download_file = os.path.join(download_path, download_hdf_name) + + if prod_name == "MOD11A1": + out_tif_name = f"MODIS.{prod_name}.{tile_id}.{date}.LST.tif" + elif prod_name == "MCD43A3": + out_tif_name = f"MODIS.{prod_name}.{tile_id}.{date}.Albedo.tif" + elif prod_name == "MCD43A4": + out_tif_name = f"MODIS.{prod_name}.{tile_id}.{date}.NBRDF.tif" + else: + out_tif_name = download_hdf_name.replace(".hdf", ".tif") + # 除 MCD43A4 需用于光谱指数计算外, MOD11A1 日间温度与 MCD43A4 反照率无需再按日期归档 + if prod_name == "MOD11A1" or prod_name == "MCD43A3": + output_path = os.path.join(output_dir, "TIF") + else: + output_path = os.path.join(output_dir, "TIF", date) + os.makedirs(output_path, exist_ok=True) + output_file = os.path.join(output_path, out_tif_name) + + if not os.path.isfile(output_file): + # Step1: 下载 HDF 文件 + if not os.path.isfile(download_file): + try: + earthaccess.download(granule_urls, download_path) + except Exception as e: + logging.error(f"Error downloading {download_file}: {e}") + return + else: + logging.warning(f"{download_file} already exists. Skipping.") + + # Step2: 处理 HDF 文件 + try: + process_modis(download_file, prod_name, roi, clip, scale, target_crs, output_file) + except Exception as e: + os.remove(download_file) + logging.info(f"Removed corrupted file {download_file}. Retrying download.") + process_granule(granule_urls, roi, clip, scale, output_dir, target_crs, tile_id) + logging.info(f"Processed {output_file} successfully.") + else: + logging.warning(f"{output_file} already exists. 
Skipping.") + + +def main( + region: list, + asset_name: str, + modis_tile_id: str, + years: list, + dates: tuple[str, str], + tile_id: str, + output_root_dir: str, +): + bbox = tuple(list(region.total_bounds)) + results_urls = [] + output_dir = os.path.join(output_root_dir, asset_name) + os.makedirs(output_dir, exist_ok=True) + results_urls_file = os.path.join( + output_dir, f"{asset_name}_{modis_tile_id}_results_urls.json" + ) + for year in years: + year_results_dir = os.path.join(output_dir, year) + os.makedirs(year_results_dir, exist_ok=True) + year_results_file = os.path.join( + year_results_dir, f"{asset_name}_{modis_tile_id}_{year}_results_urls.json" + ) + year_temporal = (f"{year}-{dates[0]}T00:00:00", f"{year}-{dates[1]}T23:59:59") + year_results = earthdata_search( + [asset_name], year_temporal, bbox, modis_tile_id + ) + results_urls.extend(year_results) + with open(year_results_file, "w") as f: + json.dump(year_results, f) + + with open(results_urls_file, "w") as f: + json.dump(results_urls, f) + + # 配置日志, 首次配置生效, 后续嵌套配置无效 + logging.basicConfig( + level=logging.INFO, # 级别为INFO及以上的日志会被记录 + format="%(levelname)s:%(asctime)s ||| %(message)s", + handlers=[ + logging.StreamHandler(sys.stdout), # 输出到控制台 + logging.FileHandler( + f"{output_dir}\\{asset_name}_{tile_id}_SuPER.log" + ), # 输出到日志文件 + ], + ) + + client = dask.distributed.Client(timeout=60, memory_limit="8GB") + client.run(setup_dask_environment) + all_start_time = time.time() + for year in years: + year_results_dir = os.path.join(output_dir, year) + year_results_file = os.path.join( + year_results_dir, f"{asset_name}_{modis_tile_id}_{year}_results_urls.json" + ) + year_results = json.load(open(year_results_file)) + client.scatter(year_results) + + start_time = time.time() + logging.info(f"Start {year}...") + tasks = [ + dask.delayed(process_granule)( + granule_url, + roi=region, + clip=True, + scale=True, + output_dir=year_results_dir, + target_crs="EPSG:32649", + tile_id=tile_id, + ) + for granule_url in year_results + ] + dask.compute(*tasks) + + total_time = time.time() - start_time + logging.info( + f"{year} MODIS {asset_name} Downloading complete and proccessed. Total time: {total_time} seconds" + ) + client.close() + all_total_time = time.time() - all_start_time + logging.info( + f"All MODIS {asset_name} Downloading complete and proccessed. Total time: {all_total_time} seconds" + ) + + +if __name__ == "__main__": + earthaccess.login(persist=True) + # region = gpd.read_file("./data/vectors/wuling_guanqu_polygon.geojson") + tile_id = "49REL" + region = gpd.read_file(f"./data/vectors/{tile_id}.geojson") + # asset_name = "MOD11A1" + # asset_name = "MCD43A3" + asset_name = "MCD43A4" + modis_tile_id = "h27v06" + # 示例文件名称: MCD43A4.A2024001.h27v05.061.2024010140610.hdf + years = ["2024", "2023", "2022"] + dates = ("03-01", "10-31") + output_root_dir = ".\\data\\MODIS\\" + main(region, asset_name, modis_tile_id, years, dates, tile_id, output_root_dir) diff --git a/DATA_SuPER/S1_SAR_SuPER.py b/DATA_SuPER/S1_SAR_SuPER.py new file mode 100644 index 0000000..7748f88 --- /dev/null +++ b/DATA_SuPER/S1_SAR_SuPER.py @@ -0,0 +1,463 @@ +# -*- coding: utf-8 -*- +""" +=============================================================================== +本模块采用辐射地形校正的 Sentinel-1 卫星数据 (Radiometric Terrain Corrected +Sentinel-1, RTC-S1) 作为合成孔径雷达 (SAR) 反向散射数据源, 替代 Sentinel-1 原 L1 级 +数据预处理流程. RTC-S1 为经专业预处理的 标准产品, 可显著简化数据准备工作流程, 仅需 +通过数据拼接和空间裁剪即可快速获取适用于土壤湿度反演的标准化输入数据. 
+ +产品说明: +RTC-S1 数据产品由美国宇航局喷气推进实验室 (JPL) OPERA 项目组研制, +基于 Sentinel-1 SLC L1 级原始数据, 通过标准化处理流程生成的雷达反向散射系数产品. +- 官方数据门户: https://www.jpl.nasa.gov/go/opera/products/rtc-product/ +- 官方使用示例: https://github.com/OPERA-Cal-Val/OPERA_Applications/blob/main/RTC/notebooks/RTC_notebook.ipynb +- 标准处理流程: + 1. 多视处理 (Multilooking) + 2. 精密轨道校正 + 3. 热噪声去除 (Thermal Denoising) + 4. 辐射定标 (Radiometric Calibration) + 5. 地形校正 (Terrain Flattening) + 6. UTM投影重采样 +- 产品特性: + - 时间覆盖: 2021-01-01起 (持续更新) + - 空间分辨率: 30米 + - 数据格式: GeoTIFF/HDF5 +- 进一步预处理: + - 将同一天内同一区域的多个burst拼接为单景影像 + - 将后向散射系数转换为分贝 (dB) 单位 + - 按照UTM军事格网 (UTM-MGRS) 格式裁剪为子区影像 + +------------------------------------------------------------------------------- +Authors: Hong Xie +Last Updated: 2025-03-10 +=============================================================================== +""" + +import os +import sys +import glob +import json +import time +from datetime import datetime +import warnings +import dask.distributed +import logging +import earthaccess +from geopandas import read_file +import numpy as np +import xarray as xr +from rioxarray import open_rasterio + +sys.path.append("D:/NASA_EarthData_Script") + +from utils.common_utils import setup_dask_environment, clip_image, mosaic_images +from HLS_SuPER.HLS_Su import earthdata_search + + +def download_granule( + asset_name: list[str], granule_urls: list[str], output_dir: str +) -> bool: + """ + 下载单批 OPERA RTC-S1 数据 + + Parameters + ---------- + asset_name: str + 数据集名称 + granule_urls: list + 查询返回的规范化 RTC-S1 数据 URL 列表 + output_dir: str + 输出目录 + + Returns + ------- + download_state: bool + 下载状态 True or False + """ + asset_name = asset_name[0] + if "STATIC" not in asset_name: + # SAR 极化文件: OPERA_L2_RTC-S1_T040-083952-IW1_20240605T102012Z_20240612T053743Z_S1A_30_v1.0_VH.tif + granule_urls = [url for url in granule_urls if ".h5" not in url] + # 拆解文件名 + date = granule_urls[0].split("_")[4].split("T")[0] + # 将日期格式化为 YYYYDOY + date = datetime.strptime(date, "%Y%m%d").strftime("%Y%j") + download_dir = os.path.join(output_dir, date, "burst") + else: + # 元数据文件: OPERA_L2_RTC-S1-STATIC_T113-240749-IW1_20140403_S1A_30_v1.0_local_incidence_angle.tif + granule_urls = [url for url in granule_urls if "mask" not in url] + download_dir = os.path.join(output_dir, "burst") + os.makedirs(download_dir, exist_ok=True) + # 检查是否已下载 + if not all( + os.path.isfile(os.path.join(download_dir, url.split("/")[-1])) + for url in granule_urls + ): + try: + earthaccess.download(granule_urls, download_dir) + except Exception as e: + logging.error(f"Error downloading RTC-S1 data: {e}. Skipping.") + return False + logging.info("RTC-S1 data already downloaded.") + return True + + +def convert_to_db(image: xr.DataArray) -> xr.DataArray: + """ + 将后向散射系数转换为分贝值(dB), 优化包含: + 1. 处理零值和负值异常 + 2. 显式处理无效值 + 3. 
保持数据坐标系属性 + + Parameters + ---------- + image: xr.DataArray + 原始后向散射系数数组 + + Returns + ------- + db: xr.DataArray + 转换后的分贝值数组, 无效值保留为-9999 + """ + # 创建有效值掩模, 排除0, 负值和空值 + valid_mask = (image > 0) & image.notnull() + + # 带保护的log10转换 + with warnings.catch_warnings(): + warnings.simplefilter("ignore", RuntimeWarning) + db_values = 10 * np.log10(image.where(valid_mask)) + + # 保持原始数据属性 + db = xr.DataArray( + db_values, coords=image.coords, dims=image.dims, attrs=image.attrs + ) + db.rio.write_nodata(-9999, inplace=True) + # 添加转换记录 + db.attrs["processing_history"] = ( + "converted to dB with zero/negative value filtering" + ) + return db + + +def process_sar_granule( + url_basenames: list[str], + output_dir: str, + date: str, + tile_id: str, + roi: list, + clip=True, +) -> bool: + """ + OPERA RTC-S1 预处理 + + 对同一天 OPERA RTC-S1 数据进行镶嵌与裁剪. + + Parameters + ---------- + url_basenames : list[str] + 用于文件匹配的 RTC-S1 数据下载链接的文件名列表 + output_dir : str + 输出根目录 + date : str + 日期, 格式为 YYYY%j + tile_id : str + 输出网格 tile id + roi : list + 感兴趣区域 (ROI) 的坐标 + clip : bool + 是否按照 ROI 裁剪, 默认为 True + + Returns + ------- + process_state: bool + 是否成功处理 + """ + if not os.path.isdir(os.path.join(output_dir, date)): + return False + final_output_dir = os.path.join(output_dir, date) + burst_dir = os.path.join(output_dir, date, "burst") + # 筛选与本次下载链接匹配的本地文件 + vv_tif_list = [ + f + for f in glob.glob(os.path.join(burst_dir, f"*_VV.tif")) + if os.path.basename(f) in url_basenames + ] + vh_tif_list = [ + f + for f in glob.glob(os.path.join(burst_dir, f"*_VH.tif")) + if os.path.basename(f) in url_basenames + ] + if not vv_tif_list or not vh_tif_list: + return False + vv_file_name = f"SAR.RTC-S1.{tile_id}.{date}.VV.tif" + vh_file_name = f"SAR.RTC-S1.{tile_id}.{date}.VH.tif" + vv_file_path = os.path.join(final_output_dir, vv_file_name) + vh_file_path = os.path.join(final_output_dir, vh_file_name) + if os.path.isfile(vv_file_path) and os.path.isfile(vh_file_path): + logging.info(f"{vv_file_name} and {vh_file_name} already processed.") + return False + + with open_rasterio(vv_tif_list[0]) as ds: + crs = ds.rio.crs + + # 将同一天数据集镶嵌, 生成形状为 (1, y, x) 的 numpy 数组, 以及 transform + vv_ds = mosaic_images(vv_tif_list, tif_crs=crs) + vv_ds = vv_ds.assign_attrs( + DOY=date, + file_name=vv_file_name, + ) + + vh_ds = mosaic_images(vh_tif_list, tif_crs=crs) + vh_ds = vh_ds.assign_attrs( + DOY=date, + file_name=vh_file_name, + ) + # 裁剪 + if clip: + vv_ds = clip_image(vv_ds, roi) + vh_ds = clip_image(vh_ds, roi) + # 转换为 dB + vv_ds = convert_to_db(vv_ds) + vh_ds = convert_to_db(vh_ds) + # 网格区域的最终影像大小为 (1, 3661, 3661) + # TODO: 但部分影像为 (1, 2089, 3661), 需要确保其最终形状为 (1, 3661, 3661), 缺失区域填充值为-9999 + if vv_ds.shape[1] != 3661 or vh_ds.shape[1] != 3661: + logging.warning( + f"{vv_file_name} and {vh_file_name} shape not perfect match the UTM-MRGS grid {tile_id}." + ) + # 保存 + vv_ds.rio.to_raster( + vv_file_path, + driver="COG", + compress="DEFLATE", + dtype="float32", + ) + vh_ds.rio.to_raster( + vh_file_path, + driver="COG", + compress="DEFLATE", + dtype="float32", + ) + logging.info(f"{date} RTC-S1 VV and VH data saved.") + return True + + +def process_sar_static_granule( + url_basenames: list[str], output_dir: str, tile_id: str, roi: list, clip=True +) -> bool: + """ + OPERA RTC-S1-STATIC 静态数据预处理 + + 对同一区域的 OPERA RTC-S1-STATIC 静态数据进行镶嵌与裁剪. 
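+
+    A usage sketch (the URL list, output directory and ROI GeoDataFrame are
+    illustrative and mirror the values assembled in main() below):
+
+        static_urls = [u for granule in static_granule_urls for u in granule]
+        process_sar_static_granule(
+            url_basenames=[os.path.basename(u) for u in static_urls],
+            output_dir="./data/SAR/RTC_S1_STATIC",
+            tile_id="49REL",
+            roi=region,
+            clip=True,
+        )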
+ + Parameters + ---------- + url_basenames : list[str] + 用于文件匹配的 RTC-S1-STATIC 数据下载链接的文件名列表 + output_dir: str + 输出根目录 + tile_id: str + tile id + roi: list + 感兴趣区域 (ROI) 的坐标 + clip: bool + 是否按照 ROI 裁剪, 默认为 True + + Returns + ------- + process_state: bool + 是否成功处理 + """ + burst_dir = os.path.join(output_dir, "burst") + # 筛选与本次下载链接匹配的本地文件 + lia_tif_list = [ + f + for f in glob.glob(os.path.join(burst_dir, "*local_incidence_angle.tif")) + if os.path.basename(f) in url_basenames + ] + ia_tif_list = [ + f + for f in glob.glob(os.path.join(burst_dir, "*incidence_angle.tif")) + if os.path.basename(f) in url_basenames + ] + # Local-incidence angle (LIA) data 局部入射角数据 + lia_file_name = f"SAR.RTC-S1.{tile_id}.2015.LIA.tif" + # Incidence angle (IA) data 入射角数据 + ia_file_name = f"SAR.RTC-S1.{tile_id}.2015.IA.tif" + lia_file_path = os.path.join(output_dir, lia_file_name) + ia_file_path = os.path.join(output_dir, ia_file_name) + if os.path.isfile(lia_file_path) and os.path.isfile(ia_file_path): + logging.info(f"{lia_file_name} and {ia_file_name} already processed.") + return True + + with open_rasterio(lia_tif_list[0]) as ds: + crs = ds.rio.crs + + lia_ds = mosaic_images(lia_tif_list) + lia_ds = lia_ds.assign_attrs( + file_name=lia_file_name, + ) + lia_ds.rio.write_crs(crs, inplace=True) + + ia_ds = mosaic_images(ia_tif_list) + ia_ds = ia_ds.assign_attrs( + file_name=ia_file_name, + ) + ia_ds.rio.write_crs(crs, inplace=True) + + if clip: + lia_ds = clip_image(lia_ds, roi) + ia_ds = clip_image(ia_ds, roi) + + lia_ds.rio.to_raster( + lia_file_path, + driver="COG", + compress="DEFLATE", + dtype="float32", + ) + ia_ds.rio.to_raster( + ia_file_path, + driver="COG", + compress="DEFLATE", + dtype="float32", + ) + logging.info(f"{tile_id} SAR.RTC-S1 static data saved.") + return True + + +def main( + asset_name: list[str], + region: list, + years: list[str | int], + output_dir: str, + tile_id: str, +): + bbox = tuple(list(region.total_bounds)) + # 示例文件名称: OPERA_L2_RTC-S1_T040-083952-IW1_20240605T102012Z_20240612T053743Z_S1A_30_v1.0_VH.tif + static_name = ["OPERA_L2_RTC-S1-STATIC_V1"] + sar_output_dir = os.path.join(output_dir, "RTC_S1") + static_output_dir = os.path.join(output_dir, "RTC_S1_STATIC") + os.makedirs(sar_output_dir, exist_ok=True) + os.makedirs(static_output_dir, exist_ok=True) + results_urls = [] + results_urls_file = os.path.join( + sar_output_dir, f"RTC_S1_{tile_id}_results_urls.json" + ) + static_urls_file = os.path.join( + static_output_dir, f"RTC_S1_STATIC_{tile_id}_results_urls.json" + ) + for year in years: + year_results_dir = os.path.join(sar_output_dir, year) + os.makedirs(year_results_dir, exist_ok=True) + year_results_file = os.path.join( + year_results_dir, f"RTC_S1_{tile_id}_{year}_results_urls.json" + ) + year_temporal = (f"{year}-06-01T00:00:00", f"{year}-08-31T23:59:59") + year_results = earthdata_search(asset_name, year_temporal, roi=bbox) + results_urls.extend(year_results) + with open(year_results_file, "w") as f: + json.dump(year_results, f) + + with open(results_urls_file, "w") as f: + json.dump(results_urls, f) + + static_granule_urls = earthdata_search(static_name, roi=bbox) + static_url_basenames = [ + os.path.basename(url) for sublist in static_granule_urls for url in sublist + ] + + with open(static_urls_file, "w") as f: + json.dump(static_granule_urls, f) + + # 配置日志 + logging.basicConfig( + level=logging.INFO, + format="%(levelname)s:%(asctime)s ||| %(message)s", + handlers=[ + logging.StreamHandler(sys.stdout), + logging.FileHandler( + os.path.join(output_dir, 
f"RTC_S1_SuPER_{tile_id}_{year}.log") + ), + ], + ) + logging.info(f"Found {len(results_urls)} RTC-S1 granules.") + logging.info(f"Found {len(static_granule_urls)} RTC-S1 static granules.") + + client = dask.distributed.Client(timeout=60, memory_limit="8GB") + client.run(setup_dask_environment) + all_start_time = time.time() + logging.info(f"Start downloading and processing RTC-S1 ...") + for year in years: + year_results_dir = os.path.join(sar_output_dir, year) + year_results_file = os.path.join( + year_results_dir, f"RTC_S1_{tile_id}_{year}_results_urls.json" + ) + year_results = json.load(open(year_results_file)) + url_basenames = [ + os.path.basename(url) for sublist in year_results for url in sublist + ] + client.scatter(year_results) + + start_time = time.time() + logging.info(f"Start {year}...") + + download_tasks = [ + dask.delayed(download_granule)(asset_name, granule_urls, year_results_dir) + for granule_urls in year_results + ] + + process_tasks = [ + dask.delayed(process_sar_granule)( + url_basenames, year_results_dir, date, tile_id, region + ) + for date in os.listdir(year_results_dir) + ] + + # Step1: 下载 RTC RTC-S1 数据 + dask.compute(*download_tasks) + # Step2: RTC-S1 数据处理 + dask.compute(*process_tasks) + + total_time = time.time() - start_time + logging.info( + f"{year} RTC-S1 downloading complete and proccessed. Total time: {total_time:.2f} seconds" + ) + + # Step3: 下载 RTC-S1-STATIC 数据 + start_time = time.time() + logging.info(f"Start downloading and processing RTC-S1-STATIC ...") + download_tasks = [ + dask.delayed(download_granule)(static_name, granule_urls, static_output_dir) + for granule_urls in static_granule_urls + ] + process_task = dask.delayed(process_sar_static_granule)( + static_url_basenames, static_output_dir, tile_id, region + ) + + dask.compute(*download_tasks) + dask.compute(process_task) + + total_time = time.time() - start_time + logging.info( + f"RTC-S1-STATIC downloading complete and proccessed. Total time: {total_time:.2f} seconds" + ) + + client.close(timeout=30) # 延长关闭超时时间 + + all_total_time = time.time() - all_start_time + logging.info( + f"All RTC-S1 Downloading complete and proccessed. Total time: {all_total_time:.2f} seconds" + ) + + +if __name__ == "__main__": + earthaccess.login(persist=True) + asset_name = ["OPERA_L2_RTC-S1_V1"] + tile_id = "49REL" + region = read_file(f".\\data\\vectors\\{tile_id}.geojson") + # region = read_file("./data/vectors/wuling_guanqu_polygon.geojson") + # years = ["2024", "2023", "2022"] + years = ["2024"] + output_dir = ".\\data\\SAR" + os.makedirs(output_dir, exist_ok=True) + main(asset_name, region, years, output_dir, tile_id) diff --git a/DATA_SuPER/SMAP_SuPER.py b/DATA_SuPER/SMAP_SuPER.py new file mode 100644 index 0000000..95bf2a1 --- /dev/null +++ b/DATA_SuPER/SMAP_SuPER.py @@ -0,0 +1,259 @@ +# -*- coding: utf-8 -*- +""" +=============================================================================== +This module contains functions related to preprocessing SMAP data. +For example, SMAP L3, SMAP L4. 
+Reference: +- https://github.com/nsidc/smap_python_notebooks/blob/main/notebooks/2.0%20Read%20and%20Plot%20SMAP%20data.ipynb +- https://github.com/google/earthengine-catalog/blob/main/pipelines/smap_convert_l4.py +- https://github.com/arthur-e/pyl4c +- https://hdfeos.org/zoo/NSIDC/SMAP_L4_SM_gph_20200915T193000_Vv7032_001.h5.py + +------------------------------------------------------------------------------- +Authors: Hong Xie +Last Updated: 2025-07-04 +=============================================================================== +""" + +import os +import sys +import json +import time +from datetime import datetime +from affine import Affine +import dask.distributed +import logging +import earthaccess +import h5py +from osgeo import gdal +import xarray as xr + +sys.path.append("D:/NASA_EarthData_Script") + +from utils.common_params import EASE2_GRID_PARAMS, EPSG +from utils.common_utils import ( + array_to_raster, + load_band_as_arr, + reproject_image, + setup_dask_environment, +) +from HLS_SuPER.HLS_Su import earthdata_search + + +def convert(source_h5_path: str, target_tif_path: str, date: str) -> None: + """ + Converts a SMAP L4 HDF file to a geotiff file. + + reference: https://github.com/google/earthengine-catalog/blob/main/pipelines/smap_convert_l4.py + + args: + source_h5_path (str): Path to the source HDF file. + target_tif_path (str): Path to the target GeoTiff file. + date (str): Date of the data in YYYYDOY format. + """ + + base_path = os.path.dirname(source_h5_path) + tmp_path = os.path.join(base_path, "tmp") + os.makedirs(tmp_path, exist_ok=True) + + # Options for gdal_translate + translate_options = gdal.TranslateOptions( + format="GTiff", + outputSRS="+proj=cea +lon_0=0 +lat_ts=30 +ellps=WGS84 +units=m", + outputBounds=[-17367530.45, 7314540.11, 17367530.45, -7314540.11], + ) + + # array_to_raster params + gt = EASE2_GRID_PARAMS["M09"]["geotransform"] + wkt = EPSG[6933] + + # initialize temp tiff list + tif_list = [] + + # convert individual variables to separate GeoTiff files + VAR_LIST = ["sm_surface"] + sizeoflist = len(VAR_LIST) + for iband in range(0, sizeoflist): + var = VAR_LIST[iband] + sds_array = load_band_as_arr( + "HDF5:" + source_h5_path + "://Geophysical_Data/" + var + ) + dst_tmp = os.path.join(tmp_path, f"{date}_{str(iband + 1)}.tif") + sds_gdal = array_to_raster(sds_array, gt, wkt) + ds = gdal.Translate(dst_tmp, sds_gdal, options=translate_options) + ds = None + tif_list.append(dst_tmp) + + # build a VRT(Virtual Dataset) that includes the list of input tif files + tmp_vrt = os.path.join(tmp_path, f"{date}_tmp.vrt") + gdal.BuildVRT(tmp_vrt, tif_list, options="-separate") + + warp_options = gdal.WarpOptions( + creationOptions=["COMPRESS=DEFLATE"], + srcSRS="EPSG:6933", + dstSRS="EPSG:4326", + dstNodata=-9999, + ) + + # run gdal_Warp to reproject the VRT data and save in the target tif file with + # compression + ds = gdal.Warp(target_tif_path, tmp_vrt, options=warp_options) + ds = None + + # remove temporary files + for f in [tmp_vrt] + tif_list: + if os.path.exists(f): + os.remove(f) + + return + + +def open_smap(file_path, asset_name): + """ + [Deprecated] + 打开 *.h5 格式的 SMAP HDF 数据, 并将其转换为 WGS84 坐标系下的 xarray.DataArray 格式. 
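+
+    A usage sketch (kept for reference only; convert() is the active conversion path):
+
+        smap_da = open_smap("SMAP_L4_SM_gph_20240601T013000_Vv7031_001.h5", "SPL4SMGP")
+        # -> xarray.DataArray named 'sm_surface', georeferenced on EPSG:6933 and
+        #    reprojected to EPSG:4326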
+ """ + if asset_name == "SPL4SMGP": + ds = h5py.File(file_path, "r") + sm_data = ds["Geophysical_Data"]["sm_surface"][:, :] + lat = ds["cell_lat"][:, 0] + lon = ds["cell_lon"][0, :] + smap_ratser = xr.DataArray( + data=sm_data, + coords={ + "y": lat, + "x": lon, + }, + dims=["y", "x"], + name="sm_surface", + attrs=ds.attrs, + ) + # 添加CRS定义, SMAP L4 数据使用EASE-Grid 2.0 9km Grid (EPSG:6933) + crs_str = "EPSG:6933" + # SMAP L4 使用的 EPSG:6933 的 Transform 实际为 (-17367530.45, 9000, 0, 7314540.83, 0, -9000) + transform = Affine.from_gdal(*(-17367530.45, 9000, 0, 7314540.83, 0, -9000)) + smap_ratser.rio.write_transform(transform, inplace=True) + smap_ratser.rio.write_crs(crs_str, inplace=True) + smap_ratser = reproject_image(smap_ratser, "EPSG:4326") + return smap_ratser + + +def process_granule( + granule_urls: list[str], + output_dir: str, + asset_name: str, +): + # SMAP_L4_SM_gph_20240601T013000_Vv7031_001.h5 + download_hdf_name = granule_urls[0].split("/")[-1] + # 获取名称与日期, 原 date 格式为 YYYYMMDD + file_name_part = download_hdf_name.split("_") + date = file_name_part[4].split("T")[0] + # 将日期格式化为 YYYYDOY + date = datetime.strptime(date, "%Y%m%d").strftime("%Y%j") + download_path = os.path.join(output_dir, "HDF") + os.makedirs(download_path, exist_ok=True) + download_file = os.path.join(download_path, download_hdf_name) + + out_tif_name = f"SMAP.{asset_name}.global.{date}.SM.tif" + output_path = os.path.join(output_dir, "TIF") + os.makedirs(output_path, exist_ok=True) + output_file = os.path.join(output_path, out_tif_name) + + if not os.path.isfile(output_file): + # Step1: 下载 HDF 文件 + if not os.path.isfile(download_file): + try: + earthaccess.download(granule_urls, download_path) + except Exception as e: + logging.error(f"Error downloading {download_file}: {e}") + return + else: + logging.warning(f"{download_file} already exists. Skipping.") + + # Step2: 处理 HDF 文件 + try: + # smap = open_smap(download_file, asset_name) + convert(download_file, output_file, date) + except Exception as e: + if 'does not exist in the file system, and is not recognized as a supported dataset name' in str(e): + os.remove(download_file) + logging.info(f"Removed corrupted file {download_file}. Retrying download.") + process_granule(granule_urls, output_dir, asset_name) + else: + logging.error(f"Error translating {download_file}: {e}") + return + logging.info(f"Processed {output_file} successfully.") + else: + logging.warning(f"{output_file} already exists. 
Skipping.") + + +def main(output_root_dir: str, asset_name: str, years: list[str | int], dates: tuple[str, str], hours: tuple[str, str]): + results_urls = [] + output_dir = os.path.join(output_root_dir, asset_name) + os.makedirs(output_dir, exist_ok=True) + results_urls_file = os.path.join(output_dir, f"{asset_name}_results_urls.json") + for year in years: + year_results_dir = os.path.join(output_dir, year) + os.makedirs(year_results_dir, exist_ok=True) + year_results_file = os.path.join(year_results_dir, f"{asset_name}_{year}_results_urls.json") + year_temporal = (f"{year}-{dates[0]}T00:00:00", f"{year}-{dates[1]}T23:59:59") + year_results = earthdata_search([asset_name], year_temporal, hours=hours) + results_urls.extend(year_results) + with open(year_results_file, "w") as f: + json.dump(year_results, f) + + with open(results_urls_file, "w") as f: + json.dump(results_urls, f) + + # 配置日志 + logging.basicConfig( + level=logging.INFO, + format="%(levelname)s:%(asctime)s ||| %(message)s", + handlers=[ + logging.StreamHandler(sys.stdout), + logging.FileHandler(os.path.join(output_dir, f"{asset_name}_SuPER.log")), + ], + ) + + client = dask.distributed.Client(timeout=60, memory_limit="8GB") + client.run(setup_dask_environment) + all_start_time = time.time() + for year in years: + year_results_dir = os.path.join(output_dir, year) + year_results_file = os.path.join(year_results_dir, f"{asset_name}_{year}_results_urls.json") + year_results = json.load(open(year_results_file)) + client.scatter(year_results) + + start_time = time.time() + logging.info(f"Start {year}...") + tasks = [ + dask.delayed(process_granule)( + granule_url, + output_dir=year_results_dir, + asset_name=asset_name, + ) + for granule_url in year_results + ] + dask.compute(*tasks) + + total_time = time.time() - start_time + logging.info( + f"{year} SMAP {asset_name} Downloading complete and proccessed. Total time: {total_time} seconds" + ) + client.close() + all_total_time = time.time() - all_start_time + logging.info( + f"All SMAP {asset_name} Downloading complete and proccessed. Total time: {all_total_time} seconds" + ) + + +if __name__ == "__main__": + earthaccess.login(persist=True) + output_root_dir = ".\\data\\SMAP" + asset_name = "SPL4SMGP" + # 示例文件名称: SMAP_L4_SM_gph_20240601T013000_Vv7031_001.h5 + years = ["2024", "2023", "2022"] + dates = ("03-01", "10-31") + # hours = ("03:00:00", "06:00:00") + hours = ("09:00:00", "12:00:00") + main(output_root_dir, asset_name, years, dates, hours) diff --git a/DATA_SuPER/__init__.py b/DATA_SuPER/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/DATA_SuPER/insitu_SuPER.py b/DATA_SuPER/insitu_SuPER.py new file mode 100644 index 0000000..d64b9c2 --- /dev/null +++ b/DATA_SuPER/insitu_SuPER.py @@ -0,0 +1,585 @@ +""" +湖南省水文站点数据爬取 + +1. 处理爬取到的站点分布数据 +2. 逐站点下载站点数据 +3. 数据清洗进行数据解密 +4. 
检查数据完整性 + +数据示例: +站点位置信息: +{ + "STCD": "61100800", + "STNM": "东安 ", + "ADDVCD": "431122000000000", + "DRNA": 689, + "FRGRD": "2", + "LGTD": 111.288333, + "LTTD": 26.402222, + "RVNM": "紫溪 ", + "STLC": "永州市东安县白牙市镇蒋家村 ", + "STTP": "ZQ", + "HNNM": "湘江 " +}, +站点观测数据: +{ + "_id": "eac8a911a751d75d6f67e4ba", + "CRPGRWPRD": null, + "CRPTY": null, + "EXKEY": "@", + "HITRSN": null, + "SLM10": 19.9, + "SLM100": null, + "SLM20": 22.1, + "SLM30": null, + "SLM40": 24.7, + "SLM60": null, + "SLM80": null, + "SLMMMT": null, + "SRLSLM": null, + "STCD": "61107700", + "TM": "2024-01-31T12:00:00.000Z", + "VTAVSLM": null, + "AVG": "22.2", + "XDSD": "91.17" +}, +""" + +import os +import sys +import csv +import glob +import pandas as pd +import requests +import logging +import time +from datetime import datetime, timedelta + +sys.path.append("D:\NASA_EarthData_Script") + + +class getInsituData: + """ + 获取湖南省水文站点数据 + """ + + def __init__(self, output_dir: str, start_date: str, end_date: str) -> None: + """ + 初始化 + + Parameters + ---------- + output_dir : str + 保存根路径 + start_date : str + 起始日期, 如 "2024-01-01" + end_date : str + 结束日期, 如 "2024-01-31" + """ + + self.output_dir = output_dir + self.start_date = start_date + self.end_date = end_date + + def get_data_from_url(self, target_url: str) -> list[dict]: + """ + 获取湖南省水文站点数据 + + Parameters + ---------- + target_url : str + 目标URL + + Returns + ------- + result_list : list[dict] + 站点数据列表 + """ + + # 模拟请求头 + headers = { + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36", + "Accept": "application/json, text/plain, */*", + "Accept-Encoding": "gzip, deflate", + "Accept-Language": "zh-CN,zh;q=0.9", + "Host": "58.20.42.94:9090", + "Origin": "http://yzt.hnswkcj.com:9090", + "Proxy-Connection": "keep-alive", + "Referer": "http://yzt.hnswkcj.com:9090/", + } + # 模拟请求 + result_list = [] + try: + with requests.get(target_url, headers=headers, timeout=10) as response: + result_list = response.json()["data"] + except Exception as e: + result_list = [] + return result_list + + # 使用POST请求获取数据 + def get_data_from_post(self) -> list[dict]: + """ + 获取湖南省水文站点数据 + + Parameters + ---------- + target_url : str + 目标URL + data : dict + POST请求参数 + + Returns + ------- + result_list : list[dict] + 站点数据列表 + """ + + # 模拟请求头 + headers = { + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36", + "Accept": "application/json, text/plain, */*", + "Accept-Encoding": "gzip, deflate", + "Accept-Language": "zh-CN,zh;q=0.9", + "Host": "58.20.42.94:9090", + "Origin": "http://yzt.hnswkcj.com:9090", + "Proxy-Connection": "keep-alive", + "Referer": "http://yzt.hnswkcj.com:9090/", + } + # POST 请求参数 (仅筛选墒情站点) + data = { + "sw": { + "isQuery": "true", + "czlb": "水文", + "xzbjz": [], + "ytfl": [], + "czdj": [], + "zwlb": [], + "swjs": [], + }, + "swei": { + "isQuery": "false", + "xzbjz": [], + "ytfl": [], + "czdj": [], + "zwlb": [], + "swjs": [], + }, + "js": { + "isQuery": "false", + "xzbjz": [], + "ytfl": [], + "czdj": [], + "zwlb": [], + "swjs": [], + }, + "zf": { + "isQuery": "false", + "xzbjz": [], + "ytfl": [], + "czdj": [], + "zwlb": [], + "swjs": [], + }, + "ns": { + "isQuery": "false", + "xzbjz": [], + "ytfl": [], + "czdj": [], + "zwlb": [], + "swjs": [], + }, + "swen": { + "isQuery": "false", + "xzbjz": [], + "ytfl": [], + "czdj": [], + "zwlb": [], + "swjs": [], + }, + "sq": {"isQuery": "true", "trlx": []}, + "sz": {"isQuery": "false"}, + "dxs": 
{"isQuery": "false"}, + "slz": {"isQuery": "false"}, + "stll": {"isQuery": "false"}, + "rhpwk": {"isQuery": "false"}, + "nhjc": {"isQuery": "false"}, + "adcd": "43", + } + # 模拟请求 + target_url = "http://58.20.42.94:9090/api/core/zwnew/zwxxylbMap" + result_list = [] + try: + with requests.post( + target_url, headers=headers, data=data, timeout=10 + ) as response: + result_list = response.json()["data"] + except Exception as e: + result_list = [] + return result_list + + def get_insitu_list(self) -> list[dict]: + """ + 获取并存储湖南省全省土壤墒情站点信息 + + Parameters + ---------- + None + + Returns + ------- + insitu_list : list[dict] + 数据清洗后的站点信息列表 + """ + + logging.info("正在获取湖南省全省土壤墒情站点信息...") + output_file_path = os.path.join(self.output_dir, "hunan_insitu_position.csv") + if os.path.exists(output_file_path): + logging.info("湖南省土壤墒情站点信息已存在, 正在读取...") + with open(output_file_path, "r", encoding="gbk") as csvfile: + clean_insitu_list = list(csv.DictReader(csvfile)) + else: + target_url = ( + "http://58.20.42.94:9090/api/core/db/mm_soil/list/0/10000000/qb" + ) + insitu_list = self.get_data_from_url(target_url)[0]["data"] + + clean_insitu_list = [] + for insitu in insitu_list: + insitu["STNM"] = insitu.pop("STNM").strip() + insitu["STCD"] = str(insitu.pop("STCD")) + insitu["ADDVCD"] = str(insitu.pop("ADDVCD")) + insitu["DRNA"] = insitu.pop("DRNA") + insitu["FRGRD"] = insitu.pop("FRGRD") + insitu["lng"] = insitu.pop("LGTD") + insitu["lat"] = insitu.pop("LTTD") + insitu["RVNM"] = insitu.pop("RVNM").strip() + insitu["STLC"] = insitu.pop("STLC").strip() + insitu["STTP"] = insitu.pop("STTP") + insitu["HNNM"] = insitu.pop("HNNM").strip() + clean_insitu_list.append(insitu) + + # 将清洗后的数据保存到csv文件 + with open(output_file_path, "w", newline="", encoding="gbk") as csvfile: + # 创建csv写入器对象 + csvwriter = csv.writer(csvfile) + field_names = clean_insitu_list[0].keys() + csvwriter = csv.DictWriter(csvfile, fieldnames=field_names) + # 写入标题行 + csvwriter.writeheader() + # 写入数据行 + for insitu in clean_insitu_list: + csvwriter.writerow(insitu) + logging.info( + f"{len(clean_insitu_list)}条站点数据已存入csv文件中: {output_file_path}." 
+ ) + return clean_insitu_list + + def clean_insitu_data( + self, insitu_name: str, insitu_data_list: list[dict], output_path: str + ) -> None: + """ + 清洗并存储指定站点的观测数据 + + Parameters + ---------- + insitu_name : str + 站点名称 + insitu_data_list : list[dict] + 站点数据列表 + output_path : str + 输出文件路径 + """ + + clean_insitu_data = [] + for data in insitu_data_list: + clean_data = {} + # 获取数据时间为UTC时间 + clean_data["TIME"] = ( + datetime.strptime(data.pop("TM"), "%Y-%m-%dT%H:%M:%S.%fZ") + ).strftime("%Y-%m-%dT%H:%M:%S") + clean_data["SLM10"] = data.pop("SLM10") + clean_data["SLM20"] = data.pop("SLM20") + clean_data["SLM40"] = data.pop("SLM40") + clean_data["AVG"] = data.pop("AVG") + clean_data["XDSD"] = data.pop("XDSD") + clean_insitu_data.append(clean_data) + + with open(output_path, "w", newline="", encoding="gbk") as csvfile: + csvwriter = csv.writer(csvfile) + field_names = clean_insitu_data[0].keys() + csvwriter = csv.DictWriter(csvfile, fieldnames=field_names) + csvwriter.writeheader() + # 按时间列TIME排序后再写入 + clean_insitu_data.sort(key=lambda x: x["TIME"]) + csvwriter.writerows(clean_insitu_data) + logging.info(f"{insitu_name} 站土壤墒情数据已保存至:{output_path}") + return + + def get_insitu_data( + self, + insitu_name: str, + insitu_id: str, + output_file_path: str, + ) -> None: + """ + 获取并存储指定站点指定时间范围内的观测数据 + + 网站页面中请求时间不能超过3个月, 否则会被前端拦截报错 + 但直接交互不会被拦截 + + Parameters + ---------- + insitu_name : str + 站点名称 + insitu_id : str + 站点编号 + output_file_path : str + 输出文件路径 + """ + + # 获取数据时间为UTC时间, 但请求时间为本地时间, 为确保数据完整性, 请求时间范围需要扩大1天 + real_start_date = ( + datetime.strptime(self.start_date, "%Y-%m-%d") - timedelta(days=1) + ).strftime("%Y-%m-%d") + real_end_date = ( + datetime.strptime(self.end_date, "%Y-%m-%d") + timedelta(days=1) + ).strftime("%Y-%m-%d") + target_url = f"http://58.20.42.94:9090/api/core/soil/soildrTM/{insitu_id}/{real_start_date}%2000:00/{real_end_date}%2023:59" + result_list = self.get_data_from_url(target_url) + if len(result_list) != 0: + self.clean_insitu_data(insitu_name, result_list, output_file_path) + else: + logging.warning(f"暂未查询到 {insitu_name} 站土壤墒情数据.") + return + + def save_all_insitu_data(self) -> None: + """ + 获取并存储所有站点指定时间范围内的观测数据 + """ + start_time = time.time() + insitu_list = self.get_insitu_list() + for insitu in insitu_list: + insitu_id = insitu["STCD"] + insitu_name = insitu["STNM"] + insitu_output_dir = os.path.join(self.output_dir, str(insitu_name)) + os.makedirs(insitu_output_dir, exist_ok=True) + output_file_path = os.path.join( + insitu_output_dir, + f"{insitu_name}_{self.start_date}_{self.end_date}.csv", + ) + if os.path.exists(output_file_path): + continue + self.get_insitu_data(insitu_name, insitu_id, output_file_path) + + total_time = time.time() - start_time + logging.info(f"数据获取完毕. Total time: {total_time} seconds") + + +class checkInsituData: + """ + 检查站点数据完整性 + """ + + def __init__(self, output_dir: str, year: int, start_date: str, end_date: str) -> None: + self.output_dir = output_dir + self.year = year + self.start_date = start_date + self.end_date = end_date + self.start_dt = datetime.strptime(self.start_date, "%Y-%m-%d") + self.end_dt = datetime.strptime(self.end_date, "%Y-%m-%d") + self.all_insitu_file = os.path.join(output_dir, "hunan_insitu_position.csv") + + def check_all_insitu_data(self) -> None: + """ + 检查所有站点数据完整性 + + 1. 检查站点是否有数据文件 + 2. 检查数据时间范围是否覆盖全部日期 + 3. 检查每日数据是否至少有3次观测 + 4. 
根据检查结果标记数据完整性等级 (LEVEL, 0=无数据, 1=部分完整, 2=完整) + """ + + rows = [] + with open(self.all_insitu_file, "r", encoding="gbk") as csvfile: + rows = list(csv.DictReader(csvfile)) + + # 计算日期范围和最小数据量要求 + day_num = (self.end_dt - self.start_dt).days + 1 + min_data_num = day_num * 3 + # 生成完整的日期范围 (用于检查缺失日期) + full_dates = pd.date_range(start=self.start_dt, end=self.end_dt, freq="D") + + for row in rows: + insitu_name = row["STNM"] + insitu_files = glob.glob( + os.path.join( + self.output_dir, + str(insitu_name), + f"*_{self.start_date}_{self.end_date}.csv", + ) + ) + # 添加新字段记录数据完整性 + level_field = f"LEVEL_{self.year}" + row[level_field] = 0 # 默认为无数据 + if len(insitu_files) == 0: + continue + insitu_df = pd.read_csv(insitu_files[0], parse_dates=["TIME"]) + cleaned_data = self.clean_data(insitu_df) + # 保存清理后的数据 (本地时间和UTC时间) + base_path = insitu_files[0].replace(".csv", "") + # UTC版本 + cleaned_data.to_csv(f"{base_path}_clean_UTC.csv", index=False) + + # 检查1: 总数据量是否满足最低要求 + if len(cleaned_data) == 0: + continue + row[level_field] = 1 # 标记为部分数据 + if len(cleaned_data) < min_data_num: + continue + # 检查2: 每日数据是否完整 (至少3次观测) + daily_counts = cleaned_data.set_index("TIME").resample("D").size() + + # 检查是否覆盖所有日期且每日至少有3次观测 + missing_dates = full_dates.difference(daily_counts.index) + insufficient_days = daily_counts[daily_counts < 3] + # if missing_dates.empty or insufficient_days.empty: + if missing_dates.empty: + row[level_field] = 2 # 标记为完整数据 + # 本地时间版本 + cleaned_data_UTC8 = cleaned_data.copy() + cleaned_data_UTC8["TIME"] = cleaned_data_UTC8["TIME"] + pd.Timedelta(hours=8) + cleaned_data_UTC8.to_csv(f"{base_path}_clean.csv", index=False) + + # 将修改后的数据写回文件 + output_file = self.all_insitu_file.replace(".csv", "_checked.csv") + with open(output_file, "w", encoding="gbk", newline="") as csvfile: + writer = csv.DictWriter(csvfile, fieldnames=rows[0].keys()) + writer.writeheader() + writer.writerows(rows) + + # 提取 LEVEL 列表 + levels = [row[level_field] for row in rows] + nodata_count = levels.count(0) + partial_count = levels.count(1) + complete_count = levels.count(2) + logging.info( + f"站点数据完整性检查完毕, {self.start_date} - {self.end_date} 完整数据站点数: {complete_count}, 不完整数据站点数: {partial_count}, 无数据站点数: {nodata_count}." 
+ ) + return + + def clean_data(self, data: pd.DataFrame): + """ + 通过请求直接获取的数据为加密数据, 需要进行数据清洗解密 + """ + # 经观察, UTC 时间 00:00, 06:00, 08:00, 12:00, 16:00, 18:00 为真实观测时间 + data["REAL_SLM10"] = data["SLM10"].where( + data["TIME"].dt.hour.isin([0, 6, 8, 12, 16, 18]) + ) + # 仅保留SLM10与REAL_SLM10相等的数据点, 并删除REAL_SLM10列 + data_cleaned = data[data["SLM10"] == data["REAL_SLM10"]].drop( + columns=["REAL_SLM10"] + ) + # 剔除超过时间范围的数据 + data_cleaned = data_cleaned[ + (data_cleaned["TIME"] >= self.start_dt) & (data_cleaned["TIME"] < self.end_dt + timedelta(days=1)) + ] + # 考虑时间重复的情况, 以及极端值噪声过滤 + drop_index = [] + for i in range(1, len(data_cleaned) - 1): + current_time = data_cleaned.iloc[i]["TIME"] + current_value = data_cleaned.iloc[i]["SLM10"] + if i == 0: + next_value = data_cleaned.iloc[i + 1]["SLM10"] + if abs(current_value - next_value) > current_value: + drop_index.append(i) + elif i == len(data_cleaned) - 1: + previous_value = data_cleaned.iloc[i - 1]["SLM10"] + if abs(current_value - previous_value) > current_value: + drop_index.append(i) + else: + previous_value = data_cleaned.iloc[i - 1]["SLM10"] + next_value = data_cleaned.iloc[i + 1]["SLM10"] + previous_time = data_cleaned.iloc[i - 1]["TIME"] + next_time = data_cleaned.iloc[i + 1]["TIME"] + if current_time == next_time: + # 仅在首次发现时间重复时, 判断异常噪声 + if current_value > next_value: + if (current_value - next_value) >= next_value * 0.6: + if (previous_value - next_value) >= next_value * 0.6: + # 下一条数据为极小值噪声 + drop_index.append(i + 1) + elif ( + current_value - previous_value + ) >= previous_value * 0.6: + # 当前数据为极大值噪声 + drop_index.append(i) + else: + # 当前数据为极大值噪声 + drop_index.append(i) + elif current_value < next_value: + if (next_value - current_value) >= current_value * 0.6: + if (previous_value - current_value) >= current_value * 0.6: + # 当前数据为极小值噪声 + drop_index.append(i) + elif (next_value - previous_value) >= current_value * 0.6: + # 下一条数据为极大值噪声 + drop_index.append(i + 1) + else: + # 下一条数据为极大值噪声 + drop_index.append(i + 1) + elif current_time == previous_time: + # 若与上一条数据时间相同, 已处理跳过 + continue + else: + # 处理在观测时间不重复时的异常噪声值 + if current_value < previous_value and current_value < next_value: + # 若为极小值 + threshold = current_value * 0.6 + if (previous_value - current_value) >= threshold or ( + next_value - current_value + ) >= threshold: + drop_index.append(i) + elif current_value > previous_value and current_value > next_value: + # 若为极大值 + if (current_value - previous_value) >= previous_value or ( + current_value - next_value + ) >= next_value: + drop_index.append(i) + data_cleaned = data_cleaned.drop(data_cleaned.index[drop_index]) + return data_cleaned + + +def main( + output_dir: str, + year: str | int = 2024, + start_date: str = "01-01", + end_date: str = None, +) -> None: + start_date = f"{year}-{start_date}" + end_date = f"{year}-{end_date}" if end_date is not None else start_date + logging.basicConfig( + level=logging.INFO, + format="%(levelname)s:%(asctime)s ||| %(message)s", + handlers=[ + logging.StreamHandler(sys.stdout), + logging.FileHandler( + f"{output_dir}\\insitu_SuPER_{start_date}_{end_date}.log" + ), + ], + ) + getInsituData(output_dir, start_date, end_date).save_all_insitu_data() + checkInsituData(output_dir, year, start_date, end_date).check_all_insitu_data() + + +if __name__ == "__main__": + output_dir = ".\\data\\vectors\\HUNAN_INSITUS" + os.makedirs(output_dir, exist_ok=True) + year = 2024 + start_date = "01-01" + end_date = "12-31" + main(output_dir, year, start_date, end_date) + # start_date = "2024-01-01" + # end_date = 
"2024-01-03" + # target_url = f"http://58.20.42.94:9090/api/core/soil/soildrTM/61400400/{start_date}%2000:00/{end_date}%2000:00" + # results = get_data_from_url(target_url) + # print(results) diff --git a/HLS_SuPER/HLS_PER.py b/HLS_SuPER/HLS_PER.py index 747fe9f..3639778 100644 --- a/HLS_SuPER/HLS_PER.py +++ b/HLS_SuPER/HLS_PER.py @@ -8,7 +8,7 @@ search results. ------------------------------------------------------------------------------- Authors: Cole Krehbiel, Mahsa Jami, and Erik Bolch Editor: Hong Xie -Last Updated: 2025-01-06 +Last Updated: 2025-03-30 =============================================================================== """ @@ -52,14 +52,12 @@ def open_hls( Some HLS Landsat scenes have the metadata in the wrong location. """ # Open using rioxarray - da_org = rxr.open_rasterio(url, chunks=chunk_size, mask_and_scale=False).squeeze( + da = rxr.open_rasterio(url, chunks=chunk_size, mask_and_scale=False).squeeze( "band", drop=True ) # (Add) 若未获取到数据, 则返回 None - if da_org is None: + if da is None: return None - # (Add) 复制源数据进行后续操作, 以便最后复制源数据属性信息 - da = da_org.copy() # (Add) 读取波段名称 split_asset = url.split("/")[-1].split(".") @@ -68,17 +66,24 @@ def open_hls( # 判断是否为 Fmask 波段 is_fmask = True if (band_name in ["Fmask", "FMASK"]) else False # 判断是否为 L30 独有的热红外波段 - is_tir = True if (band_name in ["B10", "B11", "TIR1", "TIR2"]) and (asset_name == "L30") else False + is_tir = ( + True + if (band_name in ["B10", "B11", "TIR1", "TIR2"]) and (asset_name == "L30") + else False + ) # Reproject ROI and Clip if ROI is provided and clip is True if roi is not None and clip: roi = roi.to_crs(da.spatial_ref.crs_wkt) - da = da.rio.clip(roi.geometry.values, roi.crs, all_touched=True) + da = da.rio.clip(roi.geometry.values, roi.crs, all_touched=True, from_disk=True) # (Add) 即使大部分影像已经被缩放且填补了缺失值, 但可能仍然有些影像需要进行手动在本地GIS软件中进行缩放和填补缺失值 # Apply Scale Factor if desired for non-quality layer if not is_fmask: if scale: + # (Add) 备份源数据的坐标与属性后再进行操作, 以便最后恢复源数据属性信息 + org_attrs = da.attrs + org_crs = da.rio.crs # Mask Fill Values da = xr.where(da == -9999, np.nan, da) # Scale Data @@ -94,9 +99,9 @@ def open_hls( if clip: da.rio.write_crs(roi.crs, inplace=True) else: - da.rio.write_crs(da_org.rio.crs, inplace=True) + da.rio.write_crs(org_crs, inplace=True) # 再复制源数据属性 - da.attrs = da_org.attrs.copy() + da.attrs = org_attrs.copy() da.attrs["scale_factor"] = 1.0 # Add Scale Factor to Attributes Manually - This will overwrite/add if the data is missing. @@ -106,8 +111,6 @@ def open_hls( da.attrs["scale_factor"] = 0.01 else: da.attrs["scale_factor"] = 0.0001 - # 清除源数据 - da_org = None return da @@ -200,7 +203,9 @@ def process_granule( # compress 参数是源自 rioxarray 继承的 rasterio 的选项, 可以参考 https://rasterio.readthedocs.io/en/latest/api/rasterio.enums.html#rasterio.enums.Compression # 若未指定则默认为 LZW, 生成速度较快, 但文件较大 # 设置为 DEFLATE 是为了与官网直接下载文件一致且输出文件更小, 但生成速度略慢, 单张约慢 15s. 
- qa_da.rio.to_raster(raster_path=quality_output_file, driver="COG", compress="DEFLATE") + qa_da.rio.to_raster( + raster_path=quality_output_file, driver="COG", compress="DEFLATE" + ) else: qa_da = open_hls(quality_output_file, roi, clip, scale, chunk_size) logging.info( @@ -216,7 +221,7 @@ def process_granule( for url in granule_urls: # Check if File exists in Output Directory output_name = create_output_name(url, band_dict) - output_file = f"{output_dir}/{output_name}" + output_file = os.path.join(output_dir, output_name) # Check if scene is already processed if not os.path.isfile(output_file): @@ -240,11 +245,16 @@ def process_granule( # Write Output if "FMASK" in output_name and not quality_filter: # (Add) 若 quality_filter=False, 则需要将质量层文件另外保存 - da.rio.to_raster(raster_path=output_file, driver="COG", compress="DEFLATE") + da.rio.to_raster( + raster_path=output_file, driver="COG", compress="DEFLATE" + ) else: # (Add) 固定输出为 float32 类型, 否则会默认 float64 类型 da.rio.to_raster( - raster_path=output_file, driver="COG", dtype="float32", compress="DEFLATE" + raster_path=output_file, + driver="COG", + dtype="float32", + compress="DEFLATE", ) else: logging.info( diff --git a/HLS_SuPER/HLS_Su.py b/HLS_SuPER/HLS_Su.py index 63c54dc..7b97d81 100644 --- a/HLS_SuPER/HLS_Su.py +++ b/HLS_SuPER/HLS_Su.py @@ -7,7 +7,7 @@ This module contains functions related to searching and preprocessing HLS data. Authors: Mahsa Jami, Cole Krehbiel, and Erik Bolch Contact: lpdaac@usgs.gov Editor: Hong Xie -Last Updated: 2025-01-06 +Last Updated: 2025-02-20 =============================================================================== """ @@ -16,6 +16,56 @@ import numpy as np import earthaccess +def earthdata_search( + asset_name: list, + dates: tuple = None, + roi: list = None, + tile_id: str = None, + hours: tuple = None, + log=False, +): + """ + This function uses earthaccess to search for Open Source Earth Data using an roi and temporal parameter, filter by tile id and delivers a list of results urls. + + For example: + - MODIS: MCD43A3, MCD43A4, MOD11A1, MOD11A2, ... + - SMAP: SPL3SMP_E, SPL4SMGP, ... + - DEM: NASADEM_HGT, NASADEM_SC, ... + """ + + # Search for data + if dates and not roi: + # 全球范围的数据集不需要roi参数 + results = earthaccess.search_data( + short_name=list(asset_name), + temporal=dates, + ) + elif roi and not dates: + # 适用非时间序列数据, 如 DEM + results = earthaccess.search_data( + short_name=list(asset_name), + bounding_box=roi, + ) + else: + results = earthaccess.search_data( + short_name=list(asset_name), + bounding_box=roi, + temporal=dates, + ) + + # 根据瓦片ID过滤影像 + if tile_id: + results = tileid_filter(results, tile_id) + + # 根据影像当日时间过滤影像, 用于GMP, SMAP等高时间分辨率数据 + if hours: + results = hours_filter(results, hours) + + # Get results urls + results_urls = [granule.data_links() for granule in results] + return results_urls + + # Main function to search and filter HLS data def hls_search( roi: list, band_dict: dict, dates=None, cloud_cover=None, tile_id=None, log=False @@ -54,21 +104,61 @@ def hls_search( def tileid_filter(results, tile_id): """ - (Add) 基于给定的瓦片ID过滤earthaccess检索的数据结果 + (Add) 基于给定的瓦片ID过滤 earthaccess 检索的数据结果 - 实测可过滤数据集: HLS.L30, HLS.S30, MCD43A4 + 实测可过滤数据集: + HLS.L30, HLS.S30, MCD43A3, MCD43A4, MOD11A1, NASADEM, OPERA_L2_RTC-S1_V1, OPERA_L2_RTC-S1-STATIC_V1 ... 
""" tile_ids = [] for result in results: - # 从json中检索瓦片ID,转换为字符串并放入数组中 - tmp_id = str(result["meta"]["native-id"].split(".")[2]) - tile_ids.append(tmp_id) - tile_ids = np.array(tile_ids) - # 根据瓦片ID找到对应的索引 - tile_id_indices = np.where(tile_ids == tile_id) + # 从json中检索瓦片ID, 转换为字符串并放入数组中 + native_id = result["meta"]["native-id"] + tmp_id = None + try: + if "OPERA_L2_RTC-S1" in native_id: + tmp_id = str(native_id.split("_")[3].split("-")[0]) + else: + tmp_id = str(native_id.split(".")[2]) + except IndexError: + tmp_id = str(native_id.split("_")[2]) + except: + continue + if tmp_id: + tile_ids.append(tmp_id) + if len(tile_ids) > 0: + tile_ids = np.array(tile_ids) + # 根据瓦片ID找到对应的索引 + tile_id_indices = np.where(tile_ids == tile_id) + # 根据索引过滤结果 + return [results[i] for i in tile_id_indices[0]] + else: + return results + + +def hours_filter(results, hours): + """ + (Add) 基于给定的影像当日时间过滤earthaccess检索的数据结果 + + 实测可过滤数据集: SMAP SPL4SMGP + """ + + tmp_hours = [] + hours = tuple(map(lambda x: x.replace(":", ""), hours)) # 如: ('010000', '143000') + for result in results: + # 从json中检索影像当日时间, 转换为字符串并放入数组中 + try: + tmp_hour = str( + result["meta"]["native-id"].split("_")[4].split("T")[1] + ) # 如: 013000 + tmp_hours.append(tmp_hour) + except: + pass + tmp_hours = np.array(tmp_hours) + # 影像当日时间若在时间范围内, 找到对应的索引 + hour_indices = np.where((tmp_hours >= hours[0]) & (tmp_hours <= hours[-1])) # 根据索引过滤结果 - return [results[i] for i in tile_id_indices[0]] + return [results[i] for i in hour_indices[0]] # Filter earthaccess results based on cloud cover threshold diff --git a/HLS_SuPER/HLS_SuPER.py b/HLS_SuPER/HLS_SuPER.py index d9b635a..43e46ca 100644 --- a/HLS_SuPER/HLS_SuPER.py +++ b/HLS_SuPER/HLS_SuPER.py @@ -5,7 +5,7 @@ HLS Subsetting, Processing, and Exporting Reformatted Data Prep Script Authors: Cole Krehbiel, Mahsa Jami, and Erik Bolch Contact: lpdaac@usgs.gov Editor: Hong Xie -Last Updated: 2025-01-06 +Last Updated: 2025-01-15 =============================================================================== """ @@ -23,12 +23,14 @@ import time import json import earthaccess -from shapely import Polygon -from shapely.geometry import polygon, box +from shapely.geometry import box import geopandas as gpd from datetime import datetime as dt import dask.distributed +sys.path.append("D:/NASA_EarthData_Script") + +from utils.common_utils import setup_dask_environment from HLS_Su import hls_search from HLS_PER import process_granule, create_timeseries_dataset @@ -412,31 +414,6 @@ def confirm_action(prompt): print("Invalid input. Please enter 'y' or 'n'.") -def setup_dask_environment(): - """ - Passes RIO environment variables to dask workers for authentication. 
- """ - import os - import rasterio - - cookie_file_path = os.path.expanduser("~/cookies.txt") - - global env - gdal_config = { - "GDAL_HTTP_UNSAFESSL": "YES", - "GDAL_HTTP_COOKIEFILE": cookie_file_path, - "GDAL_HTTP_COOKIEJAR": cookie_file_path, - "GDAL_DISABLE_READDIR_ON_OPEN": "YES", - "CPL_VSIL_CURL_ALLOWED_EXTENSIONS": "TIF", - "GDAL_HTTP_MAX_RETRY": "10", - "GDAL_HTTP_RETRY_DELAY": "0.5", - "GDAL_HTTP_TIMEOUT": "300", - } - - env = rasterio.Env(**gdal_config) - env.__enter__() - - def files_collection(output_dir): """ 将已下载的HLS影像数据按照文件名中的日期进行归档 diff --git a/README.md b/README.md index e8a8fcf..1d38e8a 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# NASA EARTHDATA 数据爬取与预处理 —— 以 HLS 数据集为例 +# NASA EARTHDATA 数据自动化爬取与预处理 ## 仓库说明 @@ -14,7 +14,7 @@ - 实测单格网全年影像爬取+预处理约1h - 单用户每秒限制100次请求 -## 1 安装 miniforge +## 1 安装基础环境 ### 1.1 miniforge @@ -103,16 +103,60 @@ mamba env create -f setup/lpdaac_windows.yml mamba activate lpdaac_windows ``` -## 3 HLS 数据爬取 +## 3 设计思路 -### 3.1 账户准备 +### 3.1 数据组织 + +``` +data/ + MODIS/ + MOD11A1/ 地表温度 LST + 2024/ 按年份归档 + HDF/ 原始下载数据 + MOD11A1.A2024001.MODIS网格ID.061.2024002094840.hdf + ... + TIF/ 格式转换后的数据 + MODIS.MOD11A1.网格ID.2024001.LST.tif + ... + MCD43A3/ 反照率 Albedo + 同上 + MCD43A4/ 双向天底反射率 NBRDF + 2024/ + HDF/ + MOD11A1.A2024001.MODIS网格ID.061.2024002094840.hdf + ... + TIF/ + 2024001/ 反射率数据按照DOY归档 + MODIS.MCD43A4.网格ID.2024001.NBRDF.tif + ... + HLS/ + 2024/ + 2024001/ + HLS.S30.T49REL.2024001T030631.v2.0.RED.subset.tif + HLS.S30.T49REL.2024001T030631.v2.0.GREEN.subset.tif + HLS.S30.T49REL.2024001T030631.v2.0.BLUE.subset.tif + ... + SMAP/ + DEM/ + SAR/ 雷达数据 + RTC_S1/ Sentinel-1 数据 + RTC_S1_STATIC/ Sentinel-1 入射角 + GPM/ 降雨数据 +``` + +### 3.2 NASA Earthdata 账户准备 - 参考自NASA官网示例Demo:https://github.com/nasa/LPDAAC-Data-Resources/blob/main/setup/setup_instructions_python.md - 首次运行爬取命令时,需要输入用户名和密码,用户名和密码可以在 [Earthdata](https://urs.earthdata.nasa.gov/) 注册获取。 - 需要注意的是,密码中最好不要出 `@/#/$/%` 等符号,爬取时可能会出错。 - 单个用户每秒限制最多100次请求,参考自:https://forum.earthdata.nasa.gov/viewtopic.php?t=3734 -### 3.2 脚本可用参数 + +## 4 使用示例 + +### 4.1 HLS 数据 + +#### 4.1.1 脚本可用参数 - `-roi`:感兴趣区,需要按照 **左下右上** 的逆时针顺序设置点坐标,同时还支持 `shp` 与 `geojson/json` 格式文件 - `-clip`:是否对影像进行裁剪,默认 `False` @@ -126,7 +170,7 @@ mamba activate lpdaac_windows - `-qf`:是否使用质量波段过滤云/云阴影像元,默认 `True` - `-scale`:是否对影像使用缩放因子,默认 `True` -### 3.3 爬取云端数据并在内存中进行预处理示例 +#### 4.1.2 爬取云端数据并在内存中进行预处理示例 - 爬取 L30 与 S30 的核心光谱波段:仅按照感兴趣区,瓦片ID,起止时间以及产品名称筛选影像,不进行云量筛选影像,对影像进行去云掩膜处理 @@ -143,11 +187,15 @@ python .\\HLS_SuPER\\HLS_SuPER.py -roi '112.9834,30.5286,114.32373,31.64448' -ti - 仅爬取 L30 的热红外波段:仅按照感兴趣区,瓦片ID,起止时间以及产品名称筛选影像,不进行云量筛选影像,对影像进行去云掩膜处理 ```sh -python .\\HLS_SuPER\\HLS_SuPER.py -roi '112.9834,30.5286,114.32373,31.64448' -tile T49RGQ -dir .\\data\\HLS\\L30\\TIR -start 2024-01-01 -end 2024-01-31 -prod HLSL30 -bands TIR1,TIR2,Fmask -scale True +python .\\HLS_SuPER\\HLS_SuPER.py -roi .\\data\\vectors\\49REL.geojson -tile T49REL -dir .\\data\\HLS\\2024 -start 2024-06-01 -end 2024-09-01 -prod HLSL30 -bands TIR1,TIR2 ``` -- 【测试用】根据给定的范围文件 `*.geojson`,不进行云量筛选,直接爬取 L30 与 S30 2024 年的核心光谱波段 +- 【测试用】根据给定的范围文件 `*.geojson`,不进行云量筛选,直接爬取数据 ```sh -python .\\HLS_SuPER\\HLS_SuPER.py -roi .\\data\\vectors\\wuling_guanqu_polygon.geojson -tile T49RGQ -dir .\\data\\HLS\\ALL\\2024 -start 2024-06-01 -end 2024-09-01 -prod both -bands BLUE,GREEN,RED,NIR1,SWIR1,SWIR2,Fmask -scale True +python .\\HLS_SuPER\\HLS_SuPER.py -roi .\\data\\vectors\\49REL.geojson -tile T49REL -dir .\\data\\HLS\\2024 -start 2024-03-01 -end 2024-11-01 -prod both -bands 
BLUE,GREEN,RED,NIR1,SWIR1,SWIR2,Fmask ``` + +### 4.2 其他数据 + +v1.0: 直接运行`DATA_SuPER/`目录下所需数据对应的*.py文件即可. \ No newline at end of file diff --git a/setup/lpdaac_windows.yml b/setup/lpdaac_windows.yml index 52332b5..11dbd1a 100644 --- a/setup/lpdaac_windows.yml +++ b/setup/lpdaac_windows.yml @@ -19,7 +19,6 @@ dependencies: - asciitree=0.3.3=py_2 - asttokens=2.4.1=pyhd8ed1ab_0 - async-timeout=4.0.3=pyhd8ed1ab_0 - - attrs=23.1.0=pyh71513ae_1 - aws-c-auth=0.7.3=h0127223_1 - aws-c-cal=0.6.1=hfb91821_1 - aws-c-common=0.9.0=hcfcfb64_0 @@ -50,13 +49,13 @@ dependencies: - brotli-python=1.0.9=py310h00ffb61_9 - bzip2=1.0.8=hcfcfb64_5 - c-ares=1.21.0=hcfcfb64_0 - - ca-certificates=2023.7.22=h56e8100_0 + - ca-certificates=2025.1.31=h56e8100_0 - cached-property=1.5.2=hd8ed1ab_1 - cached_property=1.5.2=pyha770c72_1 - cachetools=5.3.2=pyhd8ed1ab_0 - cairo=1.18.0=h1fef639_0 - cartopy=0.22.0=py310hecd3228_1 - - certifi=2023.7.22=pyhd8ed1ab_0 + - certifi=2025.1.31=pyhd8ed1ab_0 - cffi=1.16.0=py310h8d17308_0 - cfitsio=4.3.0=h9b0cee5_0 - cftime=1.6.3=py310h3e78b6c_0 @@ -82,7 +81,6 @@ dependencies: - defusedxml=0.7.1=pyhd8ed1ab_0 - distlib=0.3.7=pyhd8ed1ab_0 - distributed=2023.10.1=pyhd8ed1ab_0 - - earthaccess=0.7.1=pyhd8ed1ab_0 - entrypoints=0.4=pyhd8ed1ab_0 - exceptiongroup=1.1.3=pyhd8ed1ab_0 - executing=2.0.1=pyhd8ed1ab_0 @@ -131,7 +129,6 @@ dependencies: - imageio=2.31.5=pyh8c1a49c_0 - importlib-metadata=6.8.0=pyha770c72_0 - importlib_metadata=6.8.0=hd8ed1ab_0 - - importlib_resources=6.1.1=pyhd8ed1ab_0 - intel-openmp=2023.2.0=h57928b3_50497 - ipykernel=6.26.0=pyha63f2e9_0 - ipython=8.17.2=pyh5737063_0 @@ -200,7 +197,7 @@ dependencies: - liblapack=3.9.0=19_win64_mkl - libnetcdf=4.9.2=nompi_h8284064_112 - libpng=1.6.39=h19919ed_0 - - libpq=16.1=h43585b0_0 + - libpq=16.3=hab9416b_0 - libprotobuf=3.21.12=h12be248_2 - librttopo=1.1.0=h92c5fdb_14 - libsodium=1.0.18=h8d14728_1 @@ -213,7 +210,7 @@ dependencies: - libutf8proc=2.8.0=h82a8f57_0 - libwebp-base=1.3.2=hcfcfb64_0 - libxcb=1.15=hcd874cb_0 - - libxml2=2.11.5=hc3477c8_1 + - libxml2=2.12.7=h283a6d9_1 - libzip=1.10.1=h1d365fa_3 - libzlib=1.2.13=hcfcfb64_5 - linkify-it-py=2.0.0=pyhd8ed1ab_0 @@ -264,7 +261,7 @@ dependencies: - opencensus=0.11.3=pyhd8ed1ab_0 - opencensus-context=0.1.3=py310h5588dad_2 - openjpeg=2.5.0=h3d672ee_3 - - openssl=3.1.4=hcfcfb64_0 + - openssl=3.4.1=ha4e3fda_0 - orc=1.9.0=hada7b9e_1 - overrides=7.4.0=pyhd8ed1ab_0 - packaging=23.2=pyhd8ed1ab_0 @@ -285,7 +282,7 @@ dependencies: - platformdirs=3.11.0=pyhd8ed1ab_0 - poppler=23.10.0=hc2f3c52_0 - poppler-data=0.4.12=hd8ed1ab_0 - - postgresql=16.1=hc80876b_0 + - postgresql=16.3=h7f155c9_0 - pqdm=0.2.0=pyhd8ed1ab_0 - proj=9.3.0=he13c7e8_2 - prometheus_client=0.18.0=pyhd8ed1ab_0 @@ -316,7 +313,6 @@ dependencies: - pystac=1.9.0=pyhd8ed1ab_0 - pystac-client=0.7.5=pyhd8ed1ab_0 - python=3.10.13=h4de0772_0_cpython - - python-cmr=0.9.0=pyhd8ed1ab_0 - python-dateutil=2.8.2=pyhd8ed1ab_0 - python-fastjsonschema=2.18.1=pyhd8ed1ab_0 - python-json-logger=2.0.7=pyhd8ed1ab_0 @@ -379,8 +375,6 @@ dependencies: - tqdm=4.66.1=pyhd8ed1ab_0 - traitlets=5.13.0=pyhd8ed1ab_0 - types-python-dateutil=2.8.19.14=pyhd8ed1ab_0 - - typing-extensions=4.8.0=hd8ed1ab_0 - - typing_extensions=4.8.0=pyha770c72_0 - typing_utils=0.1.0=pyhd8ed1ab_0 - tzdata=2023c=h71feb2d_0 - uc-micro-py=1.0.1=pyhd8ed1ab_0 @@ -396,7 +390,6 @@ dependencies: - wcwidth=0.2.9=pyhd8ed1ab_0 - webcolors=1.13=pyhd8ed1ab_0 - webencodings=0.5.1=pyhd8ed1ab_2 - - websocket-client=1.6.4=pyhd8ed1ab_0 - wheel=0.41.3=pyhd8ed1ab_0 - widgetsnbextension=4.0.9=pyhd8ed1ab_0 - 
win_inet_pton=1.1.0=pyhd8ed1ab_6 @@ -418,3 +411,16 @@ dependencies: - zipp=3.17.0=pyhd8ed1ab_0 - zlib=1.2.13=hcfcfb64_5 - zstd=1.5.5=h12be248_0 + - pip: + - attrs==25.3.0 + - earthaccess==0.14.0 + - h11==0.14.0 + - importlib-resources==6.5.2 + - outcome==1.3.0.post0 + - python-cmr==0.13.0 + - selenium==4.30.0 + - trio==0.29.0 + - trio-websocket==0.12.2 + - typing-extensions==4.12.2 + - websocket-client==1.8.0 + - wsproto==1.2.0 diff --git a/utils/__init__.py b/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/utils/common_params.py b/utils/common_params.py new file mode 100644 index 0000000..8f21699 --- /dev/null +++ b/utils/common_params.py @@ -0,0 +1,44 @@ +""" +通用性参数 + +目前只包含 SMAP L4 数据的相关的坐标参数 + +Reference: +- https://github.com/arthur-e/pyl4c +""" + +EPSG = { + # EPSG:6933 圆柱等面积投影坐标系可用于全球范围数据, 如 SMAP L4 数据 + 6933: """PROJCS["WGS 84 / NSIDC EASE-Grid 2.0 Global", + GEOGCS["WGS 84", + DATUM["WGS_1984", + SPHEROID["WGS 84",6378137,298.257223563, + AUTHORITY["EPSG","7030"]], + AUTHORITY["EPSG","6326"]], + PRIMEM["Greenwich",0, + AUTHORITY["EPSG","8901"]], + UNIT["degree",0.0174532925199433, + AUTHORITY["EPSG","9122"]], + AUTHORITY["EPSG","4326"]], + PROJECTION["Cylindrical_Equal_Area"], + PARAMETER["standard_parallel_1",30], + PARAMETER["central_meridian",0], + PARAMETER["false_easting",0], + PARAMETER["false_northing",0], + UNIT["metre",1,AUTHORITY["EPSG","9001"]], + AXIS["Easting",EAST], + AXIS["Northing",NORTH], + AUTHORITY["EPSG","6933"]]""", +} + +EASE2_GRID_PARAMS = { + # A GeoTransform for a north-up raster is: + # (x_min, pixel_width, 0, y_max, 0, -pixel_height) + "M09": { + "epsg": 6933, + "geotransform": (-17367530.45, 9000, 0, 7314540.83, 0, -9000), + "resolution": 9008.055210146, # From Brodzik et al. + "shape": (1624, 3856), + "size": 1624 * 3856, + }, +} diff --git a/utils/common_utils.py b/utils/common_utils.py new file mode 100644 index 0000000..803dd18 --- /dev/null +++ b/utils/common_utils.py @@ -0,0 +1,787 @@ +# -*- coding: utf-8 -*- +""" +=============================================================================== +本模块为公共工具函数模块 + +------------------------------------------------------------------------------- +Authors: Hong Xie +Last Updated: 2025-07-07 +=============================================================================== +""" + +import os +import sys +import glob +import json +import logging +from datetime import datetime +import earthaccess +import numpy as np +import pandas as pd +from affine import Affine +from osgeo import gdal, gdal_array +from shapely import box +import xarray as xr +from rasterio.enums import Resampling +from rasterio.merge import merge +from rioxarray.merge import merge_arrays +from rioxarray import open_rasterio +import geopandas as gpd +import matplotlib.pyplot as plt + +gdal.UseExceptions() + + +def get_month_from_filenames(file_path): + """ + 从格式化后的文件名中提取月份 + + args: + file_path (str): 文件路径 + returns: + int: 月份 + """ + # 获取文件名中的年份与DOY + date = os.path.basename(file_path).split(".")[3] + # 结合年份和DOY, 判断当前文件的月份 + month = datetime.strptime(date, "%Y%j").month + return month + + +def group_by_month(file_list, out_path): + """ + 根据文件名中的日期, 将文件按月分组 + """ + grouped_files = {} + # 遍历文件列表, 将文件按月分组 + for file in file_list: + month = get_month_from_filenames(file) + # 将文件添加到对应月份的列表中 + if month not in grouped_files: + grouped_files[month] = [] + grouped_files[month].append(file) + # 将字典转换为按月份排序的列表 + grouped_files = [grouped_files[month] for month in sorted(grouped_files.keys())] + # 将结果存入json文件中 + with 
open(out_path, "w") as f: + json.dump(grouped_files, f) + return grouped_files + + +def time_index_from_filenames(file_list): + """ + 根据文件名创建时间索引, 用于时间序列分析 + """ + return [datetime.strptime(file.split(".")[-4], "%Y%jT%H%M%S") for file in file_list] + + +def square_kernel(radius): + """ + 方形核函数 + + 生成一个边长为 2 * radius + 1 且值全为 1 的方形核 + + args: + radius: 方形核半径, 使用半径可以确保核的大小为奇数 + """ + # 方形核的边长 = 2 * 核半径 + 1 + kernel_size = 2 * radius + 1 + kernel = np.ones((kernel_size, kernel_size), dtype=np.uint8) + return kernel + + +def array_calc_to_xr( + arr1: xr.DataArray | xr.Dataset, + arr2: xr.DataArray | xr.Dataset, + func: str, +) -> xr.DataArray: + """ + 数组计算 [返回带有属性的xarray.DataArray] + + 仅支持形状坐标一致的两数组, 先进行非空掩膜, 再进行计算 + + Parameters + ---------- + arr1 : xr.DataArray | xr.Dataset + 数组1 + arr2 : xr.DataArray | xr.Dataset + 数组2 + func : str + 计算函数, 支持 "subtract", "add", "multiply", "divide" + Returns + ------- + xr.DataArray + 计算结果 + """ + if isinstance(arr1, xr.Dataset): + arr1 = arr1.to_array() + if isinstance(arr2, xr.Dataset): + arr2 = arr2.to_array() + # 备份源数据坐标与属性 + org_attrs = arr1.attrs + org_dim = arr1.dims + org_coords = arr1.coords + org_crs = arr1.rio.crs if arr1.rio.crs else None + # 将输入数组转换为纯numpy数组, 计算时仅关注数组本身 + arr1 = arr1.values.astype(np.float32, copy=False) + arr2 = arr2.values.astype(np.float32, copy=False) + # 先过滤掉数组1中值为-9999的元素, 然后使用数组1对数组2进行非空掩膜 + arr1[arr1 == -9999] = np.nan + arr2[arr2 == -9999] = np.nan + # 再过滤数组1与数组2均为空的元素 + if arr1.shape == arr2.shape: + mask = np.isnan(arr1) & np.isnan(arr2) + arr1[mask] = np.nan + arr2[mask] = np.nan + else: + mask = None + # 根据计算函数进行计算 + func = func.lower() + if func == "subtract": + result = arr1 - arr2 + elif func == "add": + result = arr1 + arr2 + elif func == "multiply": + result = arr1 * arr2 + elif func == "divide": + # 若分母为0,则结果为nan + arr2[arr2 == 0] = np.nan + result = arr1 / arr2 + else: + raise ValueError("Unsupported operation") + # 释放不再使用的内存 + del arr1, arr2, mask + # 恢复因计算而丢失的属性与空间坐标系 + result = xr.DataArray( + data=result, + coords=org_coords, + dims=org_dim, + attrs=org_attrs, + ) + if org_crs: + result.rio.write_crs(org_crs, inplace=True) + return result + + +def array_calc( + arr1: xr.DataArray | xr.Dataset | np.ndarray, + arr2: xr.DataArray | xr.Dataset | np.ndarray, + func: str, +) -> np.ndarray: + """ + 数组计算 [仅返回数组本身即np.ndarray] + + 仅支持形状坐标一致的两数组, 先进行非空掩膜, 再进行计算 + + Parameters + ---------- + arr1 : xr.DataArray | xr.Dataset | np.ndarray + 数组1 [支持xarray.DataArray, xarray.Dataset, numpy.ndarray] + arr2 : xr.DataArray | xr.Dataset | np.ndarray + 数组2 [支持xarray.DataArray, xarray.Dataset, numpy.ndarray] + func : str + 计算函数, 支持 "subtract", "add", "multiply", "divide" + Returns + ------- + np.ndarray + 计算结果 + """ + # 将输入数组转换为纯numpy数组, 计算时仅关注数组本身 + if isinstance(arr1, xr.Dataset): + arr1 = arr1.to_array().values.astype(np.float32, copy=False) + if isinstance(arr2, xr.Dataset): + arr2 = arr2.to_array().values.astype(np.float32, copy=False) + if isinstance(arr1, xr.DataArray): + arr1 = arr1.values.astype(np.float32, copy=False) + if isinstance(arr2, xr.DataArray): + arr2 = arr2.values.astype(np.float32, copy=False) + # 先过滤掉数组1中值为-9999的元素, 然后使用数组1对数组2进行非空掩膜 + arr1[arr1 == -9999] = np.nan + arr2[arr2 == -9999] = np.nan + # 再过滤数组1与数组2均为空的元素 + if arr1.shape == arr2.shape: + if func == "divide": + # 若分母为0,则结果为nan + arr2[arr2 == 0] = np.nan + mask = np.isnan(arr1) & np.isnan(arr2) + arr1[mask] = np.nan + arr2[mask] = np.nan + else: + mask = None + # 根据计算函数进行计算 + func = func.lower() + if func == "subtract": + result = arr1 - arr2 + 
elif func == "add": + result = arr1 + arr2 + elif func == "multiply": + result = arr1 * arr2 + elif func == "divide": + result = arr1 / arr2 + else: + raise ValueError("Unsupported operation") + # 释放不再使用的内存 + del arr1, arr2, mask + return result + + +def setup_dask_environment(): + """ + Passes RIO environment variables to dask workers for authentication. + """ + import os + import rasterio + + cookie_file_path = os.path.expanduser("~/cookies.txt") + + global env + gdal_config = { + "GDAL_HTTP_UNSAFESSL": "YES", + "GDAL_HTTP_COOKIEFILE": cookie_file_path, + "GDAL_HTTP_COOKIEJAR": cookie_file_path, + "GDAL_DISABLE_READDIR_ON_OPEN": "YES", + "CPL_VSIL_CURL_ALLOWED_EXTENSIONS": "TIF", + "GDAL_HTTP_MAX_RETRY": "10", + "GDAL_HTTP_RETRY_DELAY": "0.5", + "GDAL_HTTP_TIMEOUT": "300", + } + + env = rasterio.Env(**gdal_config) + env.__enter__() + + +def setup_logging(log_file: str = "dask_worker.log"): + """ + 在Dask工作进程中设置logging + + Parameters + ---------- + log_file : str, optional + 日志文件路径, by default "dask_worker.log" + """ + + logging.basicConfig( + level=logging.INFO, + format="%(levelname)s:%(asctime)s ||| %(message)s", + handlers=[ + logging.StreamHandler(sys.stdout), + logging.FileHandler(log_file), + ], + ) + +def load_band_as_arr(org_tif_path, band_num=1): + """ + 读取波段数据 + + args: + org_tif_path (str): 原始tif文件路径 + band_num (int): 波段号, 默认为 1 + returns: + numpy.ndarray: 数组化波段数据 + """ + org_tif = gdal.Open(org_tif_path) + if not org_tif: + raise ValueError(f"GDAL could not open {org_tif_path}") + band = org_tif.GetRasterBand(band_num) + data = band.ReadAsArray() + # 获取 NoData 值 + nodata = band.GetNoDataValue() + if nodata is not None: + # 将 NoData 值替换为 NaN + data[data == nodata] = np.nan + return data + + +def get_proj_info(org_tif_path): + """ + 获取原始影像的投影和变换 + + args: + org_tif_path (str): 原始tif文件路径 + returns: + str: 投影与变换 + """ + org_tif = gdal.Open(org_tif_path) + projection = org_tif.GetProjection() + transform = org_tif.GetGeoTransform() + org_tif = None + return projection, transform + + +def calc_time_series(file_list, calc_method): + """ + 时间序列合成 + + args: + file_list (list): 文件列表 + calc_method (str): 计算方法, 包括 "mean", "median", "max", "min" + returns: + numpy.ndarray: 时间序列合成结果 + """ + if not file_list: + raise ValueError("file_list is empty.") + calc_method = calc_method.lower() + if calc_method == "mean": + data = np.nanmean([load_band_as_arr(file) for file in file_list], axis=0) + elif calc_method == "median": + data = np.nanmedian([load_band_as_arr(file) for file in file_list], axis=0) + elif calc_method == "max": + data = np.nanmax([load_band_as_arr(file) for file in file_list], axis=0) + elif calc_method == "min": + data = np.nanmin([load_band_as_arr(file) for file in file_list], axis=0) + else: + raise ValueError("Invalid calc_method.") + return data.astype(np.float32) + + +def save_as_tif(data, projection, transform, file_path): + """ + 保存为tif + + args: + data (numpy.ndarray): 要保存的数据 + projection (str): 投影 + transform (str): 变换 + file_path (str): 文件输出完整路径 + """ + if data is None: + return + y, x = data.shape + gtiff_driver = gdal.GetDriverByName("GTiff") + out_ds = gtiff_driver.Create( + file_path, x, y, 1, gdal.GDT_Float32, options=["COMPRESS=DEFLATE"] + ) + out_ds.SetGeoTransform(transform) + out_ds.SetProjection(projection) + out_band = out_ds.GetRasterBand(1) + out_band.WriteArray(data) + out_band.FlushCache() + out_ds = None # 确保文件正确关闭 + return + + +def array_to_raster( + data: np.ndarray, transform, wkt, dtype: str = None, nodata=-9999 +) -> gdal.Dataset: + """ + 将 numpy 
数组转换为 gdal.Dataset 对象 + + reference: https://github.com/arthur-e/pyl4c/blob/master/pyl4c/spatial.py + + args: + data (numpy.ndarray): 待转换的 numpy 数组 + transform: (仿射)投影变换矩阵 + wkt (str): 投影坐标系信息 + dtype (str): 数据类型, 默认为 None + nodata (float): NoData 值, 默认为 -9999 + returns: + gdal.Dataset: 转换后的 gdal.Dataset 对象 + """ + if dtype is not None: + data = data.astype(dtype) + try: + rast = gdal_array.OpenNumPyArray(data) + except AttributeError: + # For backwards compatibility with older version of GDAL + rast = gdal.Open(gdal_array.GetArrayFilename(data)) + except: + rast = gdal_array.OpenArray(data) + rast.SetGeoTransform(transform) + rast.SetProjection(wkt) + if nodata is not None: + for band in range(1, rast.RasterCount + 1): + rast.GetRasterBand(band).SetNoDataValue(nodata) + return rast + + +def create_quality_mask(quality_data, bit_nums: list = [0, 1, 2, 3, 4, 5]): + """ + Uses the Fmask layer and bit numbers to create a binary mask of good pixels. + By default, bits 0-5 are used. + """ + mask_array = np.zeros((quality_data.shape[0], quality_data.shape[1])) + # Remove/Mask Fill Values and Convert to Integer + quality_data = np.nan_to_num(quality_data, 0).astype(np.int8) + for bit in bit_nums: + # Create a Single Binary Mask Layer + mask_temp = np.array(quality_data) & 1 << bit > 0 + mask_array = np.logical_or(mask_array, mask_temp) + return mask_array + + +def clip_image( + image: xr.DataArray | xr.Dataset, roi: gpd.GeoDataFrame, clip_by_box=True +): + """ + Clip Image data to ROI. + + args: + image (xarray.DataArray | xarray.Dataset): 通过 rioxarray.open_rasterio 加载的图像数据. + roi (gpd.GeoDataFrame): 感兴趣区数据. + clip_by_box (bool): 是否使用 bbox 进行裁剪, 默认为 True. + """ + + if roi is None: + return image + org_crs = image.rio.crs + if not roi.crs == org_crs: + # 若 crs 不一致, 则重新投影感兴趣区数据 + roi_bound = roi.to_crs(org_crs) + else: + roi_bound = roi + if clip_by_box: + clip_area = [box(*roi_bound.total_bounds)] + else: + clip_area = roi_bound.geometry.values + # 设置nodata值防止裁剪时未相交部分出现nan值 [仅对DataArray有效] + if isinstance(image, xr.DataArray): + nodata_value = -9999 + image.rio.write_nodata(nodata_value, inplace=True) + image_cliped = image.rio.clip( + clip_area, roi_bound.crs, all_touched=True, from_disk=True + ) + return image_cliped + + +def clip_roi_image(file_path: str, grid: gpd.GeoDataFrame) -> xr.DataArray | None: + """ + 按研究区范围裁剪影像 + + args: + file_path (str): 待裁剪影像路径 + grid (gpd.GeoDataFrame): 格网范围 + return: + raster_cliped (xr.DataArray): 裁剪后的影像 + """ + raster = open_rasterio(file_path) + try: + doy = os.path.basename(file_path).split(".")[3] + except Exception as e: + doy = None + if doy: + raster.attrs["DOY"] = doy + # 先对数据进行降维, 若存在band波段, 则降维为二维数组; 若不存在band波段, 则继续计算 + if "band" in raster.dims and raster.sizes["band"] == 1: + raster = raster.squeeze("band") + # 由于当前实施均在同一格网下进行, 且MODIS数据原始坐标为正弦投影, 在原始影像爬取与预处理阶段使用格网裁剪时并未裁剪出目标效果 + # 所以需要先使用格网裁剪, 再后使用感兴趣区域裁剪 + # TODO: 跨格网实施算法时, 修改为使用感兴趣区域裁剪 + if grid is not None: + raster_cliped = clip_image(raster, grid) + else: + raster_cliped = raster + # TODO: 待完善, 若裁剪后的数据全为空或空值数量大于裁剪后数据总数量, 则跳过 + # raster_clip_grid = raster_clip_grid.where(raster_clip_grid != -9999) + # if ( + # raster_clip_grid.count().item() == 0 + # or raster_clip_grid.isnull().sum() > raster_clip_grid.count().item() + # ): + # return + raster_cliped.attrs = raster.attrs.copy() + raster_cliped.attrs["file_name"] = os.path.basename(file_path) + return raster_cliped + + +def reproject_image( + image: xr.DataArray, + target_crs: str = None, + target_shape: tuple = None, + target_image: xr.DataArray = 
None, +): + """ + Reproject Image data to target CRS or target data. + + args: + image (xarray.DataArray): 通过 rioxarray.open_rasterio 加载的图像数据. + target_crs (str): Target CRS, eg. EPSG:4326. + target_shape (tuple): Target shape, eg. (1000, 1000). + target_image (xarray.DataArray): Target image, eg. rioxarray.open_rasterio 加载的图像数据. + """ + if target_image is not None: + # 使用 target_image 进行重投影匹配 + if ( + target_image.shape[1] > image.shape[1] + or target_image.shape[2] > image.shape[2] + ) or ( + target_image.shape[1] == image.shape[1] + and target_image.shape[2] == image.shape[2] + ): + # 若判断为降尺度/等尺度, 则直接使用 cubic_spline 重采样投影到目标影像 + image_reprojed = image.rio.reproject_match( + target_image, resampling=Resampling.cubic_spline + ) + else: + # print("target_image shape is not match with image shape", image.shape, "to", target_image.shape) + # 若判断为升尺度, 为减少图像像元丢失, 需要使用聚合类方法, 考虑使用 average/mode/med 重采样投影到目标影像 + image_reprojed = image.rio.reproject_match( + target_image, resampling=Resampling.med + ) + elif target_crs is not None: + # 使用 target_crs 进行重投影 + reproject_kwargs = { + "dst_crs": target_crs, + "resampling": Resampling.cubic_spline, + } + if target_shape is not None: + reproject_kwargs["shape"] = target_shape + image_reprojed = image.rio.reproject(**reproject_kwargs) + else: + # 没有任何重投影参数, 返回原始图像 + image_reprojed = image + return image_reprojed + + +def mosaic_images( + tif_list: list[str | xr.DataArray], + nodata=np.nan, + tif_crs: any = None, + method: str = "first", +) -> xr.DataArray: + """ + 影像镶嵌合成 + + 将同一天/同一区域的数据集进行镶嵌, 生成形状为 (1, y, x) 的 numpy 数组, 以及 transform + + Parameters + ---------- + tif_list : list[str | xr.DataArray] + 待镶嵌的影像文件路径列表 或已加载的 xarray.DataArray 影像数据列表 + nodata : float | int + 空值填充值 + tif_crs : Any, optional + tif影像的crs + method : str, optional + 合成方法, 默认为 "first", 可选 "last", "min", "max" + + Returns + ------- + ds : xr.DataArray + 镶嵌后的影像数据 + """ + if isinstance(tif_list[0], str): + ds_np, transform = merge( + tif_list, + nodata=nodata, + method=method, + resampling=Resampling.cubic_spline, + ) + # 将结果重新构建为 xarray 数据集 + # 单张SAR影像直接读取 transform: 233400.0 30.0 0.0 3463020.0 0.0 -30.0 + # 而镶嵌后输出 transform 为: (30.0, 0.0, 221250.0, 0.0, -30.0, 3536970.0) + x_scale, _, x_origin, _, y_scale, y_origin = transform[:6] + affine_transform = Affine.from_gdal( + *(y_origin, x_scale, 0.0, x_origin, 0.0, y_scale) + ) + y = np.arange(ds_np.shape[1]) * y_scale + y_origin + x = np.arange(ds_np.shape[2]) * x_scale + x_origin + ds = xr.DataArray( + ds_np, + coords={ + "band": np.arange(ds_np.shape[0]), + "y": y, + "x": x, + }, + dims=["band", "y", "x"], + ) + ds.rio.write_transform(affine_transform, inplace=True) + else: + ds = merge_arrays(tif_list, nodata=nodata, method=method) + if tif_crs is not None: + ds.rio.write_crs(tif_crs, inplace=True) + return ds + + +def merge_time_series(raster_dir: str) -> xr.Dataset: + """ + 合成时间序列 + + 读取指定目录内的所有tif文件, 将它们按照时间方向合并为一个xarray.Dataset + + Parameters + ---------- + raster_dir : str + 包含tif文件的目录路径 + + Returns + ------- + raster_dataset : xr.Dataset + 包含时间维度的xarray.Dataset + """ + raster_list = [] + # 遍历每个文件, 读取时间属性, 并将其作为新波段合并到dataset中 + for file in glob.glob(os.path.join(raster_dir, "*.tif")): + date = os.path.basename(file).split(".")[3] + if len(str(date)) < 7: + continue + # 读取影像 + raster_data = open_rasterio(file, masked=True).squeeze(dim="band", drop=True) + # 获取时间属性 + time_attr = datetime.strptime(date, "%Y%j") + # 将时间属性作为新的维度 + raster_data = raster_data.assign_coords(time=time_attr) + # 将新的波段添加到dataset中 + 
raster_list.append(raster_data) + + raster_dataset = xr.concat(raster_list, dim="time").sortby("time") + return raster_dataset + + +def get_value_at_point( + dataset: xr.Dataset, value_name: str, lon: float, lat: float +) -> pd.DataFrame: + """ + 从 Dataset 中提取点位置的值, 提取矢量区域所在的值的时间序列 + + Parameters + ---------- + dataset : xr.Dataset + 包含 x, y, time 维度的数据集 + value_name : str + 数据集的变量名 + lon : float + 经度 + lat : float + 纬度 + + Returns + ------- + df : pd.DataFrame + 包含时间序列的 DataFrame + """ + point_values = dataset.sel(x=lon, y=lat, method="nearest").to_dataset(dim="time") + # 将结果转换为 DataFrame + df = ( + point_values.to_dataframe() + .reset_index() + .melt(id_vars=["y", "x"], var_name="TIME", value_name=value_name) + .drop(columns=["y", "x"]) + ) + # 剔除time列为spatial_ref和值为-9999无效值的行 + df = df[df["TIME"] != "spatial_ref"] + df = df[df[value_name] != -9999] + # 过滤掉Nan值 + df.dropna(subset=[value_name], inplace=True) + df["TIME"] = pd.to_datetime(df["TIME"]) + return df + + +def valid_raster(raster_path, threshold=0.4) -> tuple[str, float] | None: + """ + 判断栅格数据是否有效 + + 有效数据占比超过阈值则认为有效, 返回栅格数据路径, 否则返回 None + + Parameters + ---------- + raster_path : str + 栅格数据路径 + threshold : float + 有效数据占比阈值 + + Returns + ------- + raster_path : str + 栅格数据路径 + valid_data_percentage : float + 有效数据占比 + """ + with open_rasterio(raster_path, masked=True) as raster: + raster = raster.where(raster != -9999) + all_data_count = raster.size + valid_data_count = raster.count().item() + valid_data_percentage = valid_data_count / all_data_count + if valid_data_percentage >= threshold: + return raster_path, valid_data_percentage + else: + return None + + +def valid_raster_list( + raster_path_list, threshold=0.4, only_path=False +) -> list[tuple[str, float]]: + """ + 判断栅格数据列表是否有效 + + 有效数据占比超过阈值则认为有效, 返回栅格数据路径列表, 否则返回空列表 + + Parameters + ---------- + raster_path_list : list[str] + 栅格数据路径列表 + threshold : float + 有效数据占比阈值 + only_path : bool, optional + 仅保留有效数据路径列表, 默认为 False + + Returns + ------- + valid_raster_path_list : list[tuple[str, float]] | list[str] + 有效栅格数据路径列表, 每个元素为 (栅格数据路径, 有效数据占比) 或栅格数据路径 + [] + 无效栅格数据 + """ + raster_path_list = list( + map( + lambda x: valid_raster(x, threshold), + raster_path_list, + ) + ) + # 剔除列表中的 None 值 + raster_path_list = list(filter(None, raster_path_list)) + if only_path: + raster_path_list = list(map(lambda x: x[0], raster_path_list)) + return raster_path_list + + +def match_utm_grid(roi: gpd.GeoDataFrame, mgrs_kml_file: str) -> gpd.GeoDataFrame: + """ + 根据 ROI 匹配对应的 ESA Sentinel-2 UTM 格网信息. + + NOTE: 需要在 fiona > 1.9 环境下才能运行. 
+    """
+    import fiona
+
+    # Enable KML support, which is disabled by default
+    fiona.drvsupport.supported_drivers["LIBKML"] = "rw"
+    # bbox = tuple(list(roi.total_bounds))
+    if not os.path.isfile(mgrs_kml_file):
+        kml_url = "https://hls.gsfc.nasa.gov/wp-content/uploads/2016/03/S2A_OPER_GIP_TILPAR_MPC__20151209T095117_V20150622T000000_21000101T000000_B00.kml"
+        earthaccess.download([kml_url], mgrs_kml_file)
+    # Although geopandas.read_file() can read online resources directly, the grid KML file is large (about 106 MB)
+    # and loads very slowly, so it is downloaded locally first and then read.
+    mgrs_gdf = gpd.read_file(mgrs_kml_file, bbox=roi)
+    # Spatial join: keep the grid cells that intersect the ROI boundary
+    grid_in_roi = gpd.sjoin(mgrs_gdf, roi, predicate="intersects", how="left")
+    # Drop the redundant attributes produced by the join
+    grid_in_roi = grid_in_roi[mgrs_gdf.columns].drop(
+        columns=[
+            "description",
+            "timestamp",
+            "begin",
+            "end",
+            "altitudeMode",
+            "drawOrder",
+        ]
+    )
+    # Handle GeometryCollection geometries
+    for i in range(len(grid_in_roi)):
+        grid = grid_in_roi.iloc[i]
+        # Convert a GeometryCollection to a Polygon
+        if grid.geometry.geom_type == "GeometryCollection":
+            grid_in_roi.at[i, "geometry"] = grid.geometry.geoms[0]
+    return grid_in_roi
+
+
+def plot(data, title=None, cmap="gray"):
+    """
+    Plot an image array.
+
+    args:
+        data (numpy.ndarray): data to plot
+        title (str): plot title
+        cmap (str): colour map
+    """
+    plt.imshow(data, cmap=cmap)
+    plt.title(title)
+    plt.axis("off")  # hide the axes
+    plt.colorbar()
+    plt.show()
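
The helpers in `utils/common_utils.py` are designed to be composed. The snippet below is an illustrative sketch only (not part of the patch) of one way they might be chained to clip, mosaic, reproject and export a pair of tiles; the file paths, ROI file and target CRS are hypothetical placeholders, the repository root is assumed to be on `sys.path`, and the tiles are assumed to share a CRS and resolution so that the underlying `merge_arrays` call can mosaic them directly.

```python
# Illustrative sketch: paths, ROI file and target CRS below are hypothetical placeholders.
import geopandas as gpd
from rioxarray import open_rasterio

from utils.common_utils import clip_image, mosaic_images, reproject_image

roi = gpd.read_file("data/vectors/roi.geojson")        # hypothetical ROI polygon
tile_paths = ["tiles/tile_a.tif", "tiles/tile_b.tif"]  # hypothetical adjacent tiles

# Clip each tile to the ROI bounding box, then mosaic the clipped tiles.
tiles = [clip_image(open_rasterio(path), roi) for path in tile_paths]
mosaic = mosaic_images(tiles, nodata=-9999, tif_crs=tiles[0].rio.crs)

# Optionally reproject the mosaic before writing it out as a compressed COG.
mosaic_utm = reproject_image(mosaic, target_crs="EPSG:32649")
mosaic_utm.rio.to_raster("mosaic.tif", driver="COG", compress="DEFLATE")
```

This open → clip → mosaic → write pattern mirrors the per-tile preprocessing flow used elsewhere in the repository, e.g. for the NASADEM elevation, slope and aspect grids.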