276 lines
9.6 KiB
Python

# -*- coding: utf-8 -*-
"""
===============================================================================
This module contains functions related to searching and preprocessing HLS data.
-------------------------------------------------------------------------------
Authors: Mahsa Jami, Cole Krehbiel, and Erik Bolch
Contact: lpdaac@usgs.gov
Editor: Hong Xie
Last Updated: 2025-10-16
===============================================================================
"""
# Import necessary packages
import os
import logging
from pathlib import Path
import numpy as np
import earthaccess
import geopandas as gpd
from shapely.geometry import box
from shapely.geometry.polygon import orient
def ensure_ccw(geom):
"""
Ensure the exterior ring of the polygon is counterclockwise.
"""
if geom.exterior.is_ccw:
return geom # Already counterclockwise
else:
return orient(geom, sign=1.0) # Make it counterclockwise
def format_roi(roi: Path):
"""
Determines if submitted ROI is a file or bbox coordinates.
If a file, opens a GeoJSON or shapefile and creates a list of polygon vertices in the correct order. If the file has multiple polygons it will use a unary union convex hull of the external bounds.
If bbox coordinates, creates a geodataframe with a single Polygon geometry.
Returns a geopandas dataframe for clipping and a list of vertices for searching.
"""
if os.path.isfile(roi): # and roi.endswith(("geojson", "shp")):
try:
# Open ROI if file
roi = gpd.read_file(roi)
# Check if ROI is in Geographic CRS, if not, convert to it
if not roi.crs.is_geographic:
roi = roi.to_crs("EPSG:4326")
logging.info(
"Note: ROI submitted is being converted to Geographic CRS (EPSG:4326)"
)
# (Add) 添加对多种几何图形类型的支持, 将MultiPolygon合并为Polygon
if len(roi) > 1 or roi.geometry[0].geom_type == "MultiPolygon":
# Merge all Polygon geometries and create external boundary
logging.info(
"Multiple polygons detected. Creating single geometry of external coordinates."
)
single_geometry = roi.union_all().convex_hull
roi = gpd.GeoDataFrame(geometry=[single_geometry], crs=roi.crs)
roi["geometry"] = roi["geometry"].apply(ensure_ccw)
# List Vertices in correct order for search
# (Add) 使用外包矩形坐标简化提交检索时使用的坐标, 仅支持逆时针
vertices_list = list(roi.geometry[0].exterior.coords)
except (FileNotFoundError, ValueError):
sys.exit(
f"The GeoJSON/shapefile is either not valid or could not be found.\nPlease double check the name and provide the absolute path to the file or make sure that it is located in {os.getcwd()}"
)
else:
# If bbox coordinates are submitted
bbox = tuple(map(float, roi.strip("'\"").split(",")))
print(bbox)
# Convert bbox to a geodataframe for clipping
roi = gpd.GeoDataFrame(geometry=[box(*bbox)], crs="EPSG:4326")
roi["geometry"] = roi["geometry"].apply(ensure_ccw)
vertices_list = list(roi.geometry[0].convex_hull.coords)
return (roi, vertices_list)
def earthdata_search(
asset_name: list,
dates: tuple = None,
region_file: Path = None,
tile_id: str = None,
hours: tuple = None,
log=False,
):
"""
This function uses earthaccess to search for Open Source Earth Data using an roi and temporal parameter, filter by tile id and delivers a list of results urls.
For example:
- MODIS: MCD43A3, MCD43A4, MOD11A1, MOD11A2, ...
- SMAP: SPL3SMP_E, SPL4SMGP, ...
- GPM: GPM_3IMERGDL, ...
- DEM: NASADEM_HGT, NASADEM_SC, ALOS_PSR_RTC_HIGH, ALOS_PSR_RTC_LOW, ...
"""
# Search for data
if not region_file:
# 全球范围的数据集不需要roi参数, 如 SMAP, GPM
results = earthaccess.search_data(
short_name=list(asset_name),
temporal=dates,
)
else:
# 基于 roi 的外包多边形进行数据检索
roi, vertices_list = format_roi(region_file)
if not dates:
# 适用非时间序列数据, 如 DEM
results = earthaccess.search_data(
short_name=list(asset_name),
polygon=vertices_list,
)
else:
results = earthaccess.search_data(
short_name=list(asset_name),
polygon=vertices_list,
temporal=dates,
)
# 根据瓦片ID过滤影像
if tile_id:
results = tileid_filter(results, tile_id)
# 根据影像当日时间过滤影像, 用于GMP, SMAP等高时间分辨率数据
if hours:
results = hours_filter(results, hours)
# Get results urls
results_urls = [granule.data_links() for granule in results]
return results_urls
# Main function to search and filter HLS data
def hls_search(
roi: list, band_dict: dict, dates=None, cloud_cover=None, tile_id=None, log=False
):
"""
This function uses earthaccess to search for HLS data using an roi and temporal parameter, filter by cloud cover and delivers a list of results urls for the selected bands.
"""
# Search for data
results = earthaccess.search_data(
short_name=list(band_dict.keys()), # Band dict contains shortnames as keys
polygon=roi,
temporal=dates,
)
# (Add) 根据瓦片ID过滤影像
if tile_id:
results = tileid_filter(results, tile_id)
# Filter by cloud cover
if cloud_cover:
results = hls_cc_filter(results, cloud_cover)
# Get results urls
results_urls = [granule.data_links() for granule in results]
# Flatten url list
# results_urls = [item for sublist in results_urls for item in sublist]
# Filter url list based on selected bands
selected_results_urls = [
get_selected_bands_urls(granule_urls, band_dict)
for granule_urls in results_urls
]
return selected_results_urls
def tileid_filter(results, tile_id):
"""
(Add) 基于给定的瓦片ID过滤 earthaccess 检索的数据结果
实测可过滤数据集:
HLS.L30, HLS.S30, MCD43A3, MCD43A4, MOD11A1, NASADEM, OPERA_L2_RTC-S1_V1, OPERA_L2_RTC-S1-STATIC_V1 ...
"""
tile_ids = []
for result in results:
# 从json中检索瓦片ID, 转换为字符串并放入数组中
native_id = result["meta"]["native-id"]
tmp_id = None
try:
if "OPERA_L2_RTC-S1" in native_id:
tmp_id = str(native_id.split("_")[3].split("-")[0])
else:
tmp_id = str(native_id.split(".")[2])
except IndexError:
tmp_id = str(native_id.split("_")[2])
except:
continue
if tmp_id:
tile_ids.append(tmp_id)
if len(tile_ids) > 0:
tile_ids = np.array(tile_ids)
# 根据瓦片ID找到对应的索引
tile_id_indices = np.where(tile_ids == tile_id)
# 根据索引过滤结果
return [results[i] for i in tile_id_indices[0]]
else:
return results
def hours_filter(results, hours):
"""
(Add) 基于给定的影像当日时间过滤earthaccess检索的数据结果
实测可过滤数据集: SMAP SPL4SMGP
"""
tmp_hours = []
hours = tuple(map(lambda x: x.replace(":", ""), hours)) # 如: ('010000', '143000')
for result in results:
# 从json中检索影像当日时间, 转换为字符串并放入数组中
try:
tmp_hour = str(
result["meta"]["native-id"].split("_")[4].split("T")[1]
) # 如: 013000
tmp_hours.append(tmp_hour)
except:
pass
tmp_hours = np.array(tmp_hours)
# 影像当日时间若在时间范围内, 找到对应的索引
hour_indices = np.where((tmp_hours >= hours[0]) & (tmp_hours <= hours[-1]))
# 根据索引过滤结果
return [results[i] for i in hour_indices[0]]
# Filter earthaccess results based on cloud cover threshold
def hls_cc_filter(results, cc_threshold):
"""
This function filters a list of earthaccess results based on a cloud cover threshold.
"""
cc = []
for result in results:
# Retrieve Cloud Cover from json, convert to float and place in numpy array
cc.append(
float(
next(
(
aa
for aa in result["umm"]["AdditionalAttributes"]
if aa.get("Name") == "CLOUD_COVERAGE"
),
None,
)["Values"][0]
)
)
cc = np.array(cc)
# Find indices based on cloud cover threshold
cc_indices = np.where(cc <= cc_threshold)
# Filter results based on indices
return [results[i] for i in cc_indices[0]]
# Filter results urls based on selected bands
def get_selected_bands_urls(url_list, band_dict):
"""
This function filters a list of results urls based on HLS collection and selected bands.
"""
selected_bands_urls = []
# Loop through urls
for url in url_list:
# Filter bands based on band dictionary
for collection, nested_dict in band_dict.items():
if collection in url:
for band in nested_dict.values():
if band in url:
selected_bands_urls.append(url)
return selected_bands_urls