# -*- coding: utf-8 -*-
"""
===============================================================================
This module contains functions related to searching and preprocessing HLS and
other NASA Earthdata products.

-------------------------------------------------------------------------------
Authors: Mahsa Jami, Cole Krehbiel, and Erik Bolch
Contact: lpdaac@usgs.gov
Editor: Hong Xie
Last Updated: 2025-02-20
===============================================================================
"""

# Import necessary packages
import numpy as np
import earthaccess


def earthdata_search(
    asset_name: list,
    dates: tuple = None,
    roi: list = None,
    tile_id: str = None,
    hours: tuple = None,
    log=False,
):
    """
    This function uses earthaccess to search for openly available Earthdata
    products using ROI and temporal parameters, optionally filters the results
    by tile ID and time of day, and delivers a list of result URLs.

    Examples of supported collections:
        - MODIS: MCD43A3, MCD43A4, MOD11A1, MOD11A2, ...
        - SMAP: SPL3SMP_E, SPL4SMGP, ...
        - DEM: NASADEM_HGT, NASADEM_SC, ...
    """
    # Search for data
    if dates and not roi:
        # Global datasets do not need an roi parameter
        results = earthaccess.search_data(
            short_name=list(asset_name),
            temporal=dates,
        )
    elif roi and not dates:
        # For non-time-series datasets such as DEMs
        results = earthaccess.search_data(
            short_name=list(asset_name),
            bounding_box=roi,
        )
    else:
        results = earthaccess.search_data(
            short_name=list(asset_name),
            bounding_box=roi,
            temporal=dates,
        )
    # Filter granules by tile ID
    if tile_id:
        results = tileid_filter(results, tile_id)
    # Filter granules by time of day, used for high-temporal-resolution
    # products such as GPM and SMAP
    if hours:
        results = hours_filter(results, hours)
    # Get results urls
    results_urls = [granule.data_links() for granule in results]
    return results_urls


# Main function to search and filter HLS data
def hls_search(
    roi: list, band_dict: dict, dates=None, cloud_cover=None, tile_id=None, log=False
):
    """
    This function uses earthaccess to search for HLS data using ROI and
    temporal parameters, filters the results by cloud cover, and delivers a
    list of result URLs for the selected bands.
    """
    # Search for data
    results = earthaccess.search_data(
        short_name=list(band_dict.keys()),  # Band dict contains shortnames as keys
        polygon=roi,
        temporal=dates,
    )
    # (Add) Filter granules by tile ID
    if tile_id:
        results = tileid_filter(results, tile_id)
    # Filter by cloud cover
    if cloud_cover:
        results = hls_cc_filter(results, cloud_cover)
    # Get results urls
    results_urls = [granule.data_links() for granule in results]
    # Flatten url list
    # results_urls = [item for sublist in results_urls for item in sublist]
    # Filter url list based on selected bands
    selected_results_urls = [
        get_selected_bands_urls(granule_urls, band_dict)
        for granule_urls in results_urls
    ]
    return selected_results_urls

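
# ---------------------------------------------------------------------------
# Illustrative usage sketch for hls_search, kept as a comment so nothing runs
# on import. The ROI ring, date range, cloud-cover threshold, tile ID, and
# band codes below are placeholder assumptions, not values required by this
# module. band_dict maps each collection short name to the band identifiers
# expected to appear in the returned URLs, which is the nested structure that
# get_selected_bands_urls looks for.
#
#   roi = [(-105.0, 39.0), (-104.0, 39.0), (-104.0, 40.0), (-105.0, 40.0), (-105.0, 39.0)]
#   band_dict = {
#       "HLSL30": {"red": "B04", "nir": "B05", "fmask": "Fmask"},
#       "HLSS30": {"red": "B04", "nir": "B8A", "fmask": "Fmask"},
#   }
#   urls = hls_search(
#       roi=roi,
#       band_dict=band_dict,
#       dates=("2024-06-01", "2024-06-30"),
#       cloud_cover=30,
#       tile_id="T13TDE",
#   )
# ---------------------------------------------------------------------------
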
""" tile_ids = [] for result in results: # 从json中检索瓦片ID, 转换为字符串并放入数组中 native_id = result["meta"]["native-id"] tmp_id = None try: if "OPERA_L2_RTC-S1" in native_id: tmp_id = str(native_id.split("_")[3].split("-")[0]) else: tmp_id = str(native_id.split(".")[2]) except IndexError: tmp_id = str(native_id.split("_")[2]) except: continue if tmp_id: tile_ids.append(tmp_id) if len(tile_ids) > 0: tile_ids = np.array(tile_ids) # 根据瓦片ID找到对应的索引 tile_id_indices = np.where(tile_ids == tile_id) # 根据索引过滤结果 return [results[i] for i in tile_id_indices[0]] else: return results def hours_filter(results, hours): """ (Add) 基于给定的影像当日时间过滤earthaccess检索的数据结果 实测可过滤数据集: SMAP SPL4SMGP """ tmp_hours = [] hours = tuple(map(lambda x: x.replace(":", ""), hours)) # 如: ('010000', '143000') for result in results: # 从json中检索影像当日时间, 转换为字符串并放入数组中 try: tmp_hour = str( result["meta"]["native-id"].split("_")[4].split("T")[1] ) # 如: 013000 tmp_hours.append(tmp_hour) except: pass tmp_hours = np.array(tmp_hours) # 影像当日时间若在时间范围内, 找到对应的索引 hour_indices = np.where((tmp_hours >= hours[0]) & (tmp_hours <= hours[-1])) # 根据索引过滤结果 return [results[i] for i in hour_indices[0]] # Filter earthaccess results based on cloud cover threshold def hls_cc_filter(results, cc_threshold): """ This function filters a list of earthaccess results based on a cloud cover threshold. """ cc = [] for result in results: # Retrieve Cloud Cover from json, convert to float and place in numpy array cc.append( float( next( ( aa for aa in result["umm"]["AdditionalAttributes"] if aa.get("Name") == "CLOUD_COVERAGE" ), None, )["Values"][0] ) ) cc = np.array(cc) # Find indices based on cloud cover threshold cc_indices = np.where(cc <= cc_threshold) # Filter results based on indices return [results[i] for i in cc_indices[0]] # Filter results urls based on selected bands def get_selected_bands_urls(url_list, band_dict): """ This function filters a list of results urls based on HLS collection and selected bands. """ selected_bands_urls = [] # Loop through urls for url in url_list: # Filter bands based on band dictionary for collection, nested_dict in band_dict.items(): if collection in url: for band in nested_dict.values(): if band in url: selected_bands_urls.append(url) return selected_bands_urls