# -*- coding: utf-8 -*-
"""
===============================================================================
This module contains functions related to searching and preprocessing HLS data.
-------------------------------------------------------------------------------
Authors: Mahsa Jami, Cole Krehbiel, and Erik Bolch
Contact: lpdaac@usgs.gov
Editor: Hong Xie
Last Updated: 2025-10-16
===============================================================================
"""

# Import necessary packages
import os
import sys  # used by format_roi's error path (was missing -> NameError on bad ROI file)
import logging
from pathlib import Path

import numpy as np
import earthaccess
import geopandas as gpd
from shapely.geometry import box
from shapely.geometry.polygon import orient


def ensure_ccw(geom):
    """
    Ensure the exterior ring of a polygon is counterclockwise.

    CMR polygon searches require counterclockwise vertex order, so every
    geometry is normalized through this helper before its coordinates are
    extracted.
    """
    if geom.exterior.is_ccw:
        return geom  # Already counterclockwise
    return orient(geom, sign=1.0)  # Make it counterclockwise


def format_roi(roi: Path):
    """
    Determines if submitted ROI is a file or bbox coordinates.

    If a file, opens a GeoJSON or shapefile and creates a list of polygon
    vertices in the correct order. If the file has multiple polygons it will
    use a unary union convex hull of the external bounds. If bbox coordinates,
    creates a geodataframe with a single Polygon geometry.

    Returns a geopandas dataframe for clipping and a list of vertices for
    searching. Exits the process if the ROI file cannot be read.
    """
    if os.path.isfile(roi):
        try:
            # Open ROI if file
            roi = gpd.read_file(roi)
            # Check if ROI is in Geographic CRS, if not, convert to it
            if not roi.crs.is_geographic:
                roi = roi.to_crs("EPSG:4326")
                logging.info(
                    "Note: ROI submitted is being converted to Geographic CRS (EPSG:4326)"
                )
            # Support additional geometry types: collapse multiple rows or a
            # MultiPolygon into a single Polygon via union + convex hull
            if len(roi) > 1 or roi.geometry[0].geom_type == "MultiPolygon":
                # Merge all Polygon geometries and create external boundary
                logging.info(
                    "Multiple polygons detected. Creating single geometry of external coordinates."
                )
                single_geometry = roi.union_all().convex_hull
                roi = gpd.GeoDataFrame(geometry=[single_geometry], crs=roi.crs)
            roi["geometry"] = roi["geometry"].apply(ensure_ccw)
            # List vertices in correct (counterclockwise) order for search
            vertices_list = list(roi.geometry[0].exterior.coords)
        except (FileNotFoundError, ValueError):
            sys.exit(
                f"The GeoJSON/shapefile is either not valid or could not be found.\nPlease double check the name and provide the absolute path to the file or make sure that it is located in {os.getcwd()}"
            )
    else:
        # If bbox coordinates are submitted (comma-separated string)
        bbox = tuple(map(float, roi.strip("'\"").split(",")))
        logging.info("Using bounding box: %s", bbox)  # was a leftover debug print()
        # Convert bbox to a geodataframe for clipping
        roi = gpd.GeoDataFrame(geometry=[box(*bbox)], crs="EPSG:4326")
        roi["geometry"] = roi["geometry"].apply(ensure_ccw)
        # Fix: shapely Polygon has no usable .coords (raises NotImplementedError);
        # the vertex sequence lives on the exterior ring, as in the file branch.
        vertices_list = list(roi.geometry[0].exterior.coords)
    return (roi, vertices_list)


def earthdata_search(
    asset_name: list,
    dates: tuple = None,
    region_file: Path = None,
    tile_id: str = None,
    hours: tuple = None,
    log=False,
):
    """
    This function uses earthaccess to search for Open Source Earth Data using
    an roi and temporal parameter, filter by tile id and delivers a list of
    results urls.

    For example:
    - MODIS: MCD43A3, MCD43A4, MOD11A1, MOD11A2, ...
    - SMAP: SPL3SMP_E, SPL4SMGP, ...
    - GPM: GPM_3IMERGDL, ...
    - DEM: NASADEM_HGT, NASADEM_SC, ALOS_PSR_RTC_HIGH, ALOS_PSR_RTC_LOW, ...
    """
    # Search for data
    if not region_file:
        # Global-coverage datasets (e.g. SMAP, GPM) need no roi parameter
        results = earthaccess.search_data(
            short_name=list(asset_name),
            temporal=dates,
        )
    else:
        # Search using the bounding polygon derived from the roi file
        roi, vertices_list = format_roi(region_file)
        if not dates:
            # For non-time-series data such as DEM
            results = earthaccess.search_data(
                short_name=list(asset_name),
                polygon=vertices_list,
            )
        else:
            results = earthaccess.search_data(
                short_name=list(asset_name),
                polygon=vertices_list,
                temporal=dates,
            )
    # Filter granules by tile id
    if tile_id:
        results = tileid_filter(results, tile_id)
    # Filter granules by time of day, for high-temporal-resolution data
    # such as GPM and SMAP
    if hours:
        results = hours_filter(results, hours)
    # Get results urls
    results_urls = [granule.data_links() for granule in results]
    return results_urls


# Main function to search and filter HLS data
def hls_search(
    roi: list, band_dict: dict, dates=None, cloud_cover=None, tile_id=None, log=False
):
    """
    This function uses earthaccess to search for HLS data using an roi and
    temporal parameter, filter by cloud cover and delivers a list of results
    urls for the selected bands.
    """
    # Search for data
    results = earthaccess.search_data(
        short_name=list(band_dict.keys()),  # Band dict contains shortnames as keys
        polygon=roi,
        temporal=dates,
    )
    # Filter granules by tile id
    if tile_id:
        results = tileid_filter(results, tile_id)
    # Filter by cloud cover
    if cloud_cover:
        results = hls_cc_filter(results, cloud_cover)
    # Get results urls (one list of urls per granule)
    results_urls = [granule.data_links() for granule in results]
    # Filter url list based on selected bands
    selected_results_urls = [
        get_selected_bands_urls(granule_urls, band_dict)
        for granule_urls in results_urls
    ]
    return selected_results_urls


def tileid_filter(results, tile_id):
    """
    Filter earthaccess search results to granules matching the given tile id.

    Observed to work with: HLS.L30, HLS.S30, MCD43A3, MCD43A4, MOD11A1,
    NASADEM, OPERA_L2_RTC-S1_V1, OPERA_L2_RTC-S1-STATIC_V1, ...

    The tile id is parsed out of each granule's `meta.native-id`. Granules
    whose native-id cannot be parsed are skipped; if no tile id could be
    parsed from any granule, the results are returned unfiltered.

    Note: the previous implementation collected ids into a side array while
    skipping unparsable granules, then indexed back into `results` with
    positions from that (shorter) array — returning the wrong granules
    whenever any parse failed. Matching granules are now collected directly,
    keeping selection aligned with `results`.
    """
    filtered = []
    parsed_any = False
    for result in results:
        native_id = result["meta"]["native-id"]
        tmp_id = None
        try:
            if "OPERA_L2_RTC-S1" in native_id:
                # OPERA ids are underscore-delimited with the tile token in
                # field 3 (before the first '-')
                tmp_id = str(native_id.split("_")[3].split("-")[0])
            else:
                # e.g. HLS/MODIS style ids: tile id is the third dot field
                tmp_id = str(native_id.split(".")[2])
        except IndexError:
            # Fallback for underscore-delimited ids; this lookup can also
            # fail, so guard it instead of letting IndexError escape
            try:
                tmp_id = str(native_id.split("_")[2])
            except (IndexError, AttributeError):
                tmp_id = None
        except (AttributeError, TypeError):
            tmp_id = None
        if tmp_id:
            parsed_any = True
            if tmp_id == tile_id:
                filtered.append(result)
    # Preserve original behavior: with no parsable ids at all, do not filter
    return filtered if parsed_any else results


def hours_filter(results, hours):
    """
    Filter earthaccess search results by a time-of-day window.

    Observed to work with: SMAP SPL4SMGP.

    Parameters
    ----------
    hours : tuple of str
        (start, end) times, with or without colons, e.g.
        ('01:00:00', '14:30:00') or ('010000', '143000').

    Granules whose native-id lacks a parsable HHMMSS timestamp are skipped.
    (The previous implementation indexed `results` with positions from a
    side array that dropped unparsable entries, misaligning the selection.)
    """
    # Normalize bounds, e.g. ('01:00:00', '14:30:00') -> ('010000', '143000')
    bounds = tuple(h.replace(":", "") for h in hours)
    filtered = []
    for result in results:
        try:
            # e.g. ..._20150331T013000_... -> '013000'
            granule_hour = str(
                result["meta"]["native-id"].split("_")[4].split("T")[1]
            )
        except (IndexError, KeyError, AttributeError, TypeError):
            continue
        # Zero-padded HHMMSS strings compare correctly as text
        if bounds[0] <= granule_hour <= bounds[-1]:
            filtered.append(result)
    return filtered


# Filter earthaccess results based on cloud cover threshold
def hls_cc_filter(results, cc_threshold):
    """
    This function filters a list of earthaccess results based on a cloud
    cover threshold.

    Granules that carry no CLOUD_COVERAGE additional attribute are excluded
    (the previous implementation subscripted the `next(..., None)` result
    unconditionally and raised TypeError on such granules).
    """
    filtered = []
    for result in results:
        # Retrieve cloud cover from the granule's UMM additional attributes
        cc_attr = next(
            (
                aa
                for aa in result["umm"]["AdditionalAttributes"]
                if aa.get("Name") == "CLOUD_COVERAGE"
            ),
            None,
        )
        if cc_attr is None:
            # No cloud-cover metadata: cannot compare against the threshold
            continue
        if float(cc_attr["Values"][0]) <= cc_threshold:
            filtered.append(result)
    return filtered


# Filter results urls based on selected bands
def get_selected_bands_urls(url_list, band_dict):
    """
    This function filters a list of results urls based on HLS collection and
    selected bands.

    band_dict maps collection short names to {band_name: band_token} dicts;
    a url is kept when it contains both its collection name and one of that
    collection's band tokens.
    """
    selected_bands_urls = []
    # Loop through urls
    for url in url_list:
        # Filter bands based on band dictionary
        for collection, nested_dict in band_dict.items():
            if collection in url:
                for band in nested_dict.values():
                    if band in url:
                        selected_bands_urls.append(url)
    return selected_bands_urls