# -*- coding: utf-8 -*- """ =============================================================================== This module contains functions related to searching and preprocessing HLS data. ------------------------------------------------------------------------------- Authors: Mahsa Jami, Cole Krehbiel, and Erik Bolch Contact: lpdaac@usgs.gov Editor: Hong Xie Last Updated: 2025-01-06 =============================================================================== """ # Import necessary packages import numpy as np import earthaccess # Main function to search and filter HLS data def hls_search( roi: list, band_dict: dict, dates=None, cloud_cover=None, tile_id=None, log=False ): """ This function uses earthaccess to search for HLS data using an roi and temporal parameter, filter by cloud cover and delivers a list of results urls for the selected bands. """ # Search for data results = earthaccess.search_data( short_name=list(band_dict.keys()), # Band dict contains shortnames as keys polygon=roi, temporal=dates, ) # (Add) 根据瓦片ID过滤影像 if tile_id: results = tileid_filter(results, tile_id) # Filter by cloud cover if cloud_cover: results = hls_cc_filter(results, cloud_cover) # Get results urls results_urls = [granule.data_links() for granule in results] # Flatten url list # results_urls = [item for sublist in results_urls for item in sublist] # Filter url list based on selected bands selected_results_urls = [ get_selected_bands_urls(granule_urls, band_dict) for granule_urls in results_urls ] return selected_results_urls def tileid_filter(results, tile_id): """ (Add) 基于给定的瓦片ID过滤earthaccess检索的数据结果 实测可过滤数据集: HLS.L30, HLS.S30, MCD43A4 """ tile_ids = [] for result in results: # 从json中检索瓦片ID,转换为字符串并放入数组中 tmp_id = str(result["meta"]["native-id"].split(".")[2]) tile_ids.append(tmp_id) tile_ids = np.array(tile_ids) # 根据瓦片ID找到对应的索引 tile_id_indices = np.where(tile_ids == tile_id) # 根据索引过滤结果 return [results[i] for i in tile_id_indices[0]] # Filter earthaccess results based on cloud cover threshold def hls_cc_filter(results, cc_threshold): """ This function filters a list of earthaccess results based on a cloud cover threshold. """ cc = [] for result in results: # Retrieve Cloud Cover from json, convert to float and place in numpy array cc.append( float( next( ( aa for aa in result["umm"]["AdditionalAttributes"] if aa.get("Name") == "CLOUD_COVERAGE" ), None, )["Values"][0] ) ) cc = np.array(cc) # Find indices based on cloud cover threshold cc_indices = np.where(cc <= cc_threshold) # Filter results based on indices return [results[i] for i in cc_indices[0]] # Filter results urls based on selected bands def get_selected_bands_urls(url_list, band_dict): """ This function filters a list of results urls based on HLS collection and selected bands. """ selected_bands_urls = [] # Loop through urls for url in url_list: # Filter bands based on band dictionary for collection, nested_dict in band_dict.items(): if collection in url: for band in nested_dict.values(): if band in url: selected_bands_urls.append(url) return selected_bands_urls