open-earthdata-cli/src/utils/raw_to_cog.py
xhong 5e96d0e026 refactor: 重构项目目录,添加全套遥感数据处理模块
- 重构项目目录结构,将零散代码整理到src目录下,创建各子包初始化文件
- 包括HLS、GPM、SMAP、MODIS等多源遥感数据的搜索、下载与预处理脚本
- 包括通用工具函数库,包含坐标参数定义、格式转换、影像处理等工具
- 同步更新README.md示例命令路径,修正脚本调用的目录错误
- 新增AGENTS.md文档说明开发环境配置
- 更新.gitignore规则,添加环境变量和构建缓存的忽略配置
2026-05-18 19:09:07 +08:00

215 lines
7.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
"""
===============================================================================
传统GeoTIFF转COG
COG (Cloud Optimized GeoTIFF) 是一种优化设计的 GeoTIFF 文件格式, 特别适用于云环境下
的存储和访问. 自带分块存储功能与影像金字塔, 能够在请求时按需加载, 实现了高效的压缩和传输.
-------------------------------------------------------------------------------
Authors: CVEO Team
Last Updated: 2026-04-22
===============================================================================
"""
import logging
import os
import sys
from pathlib import Path
import numpy as np
from osgeo import gdal
# 添加项目根目录到 sys.path确保作为独立脚本运行时也能正确导入
BASE_DIR = Path(__file__).parent.parent.parent
sys.path.append(str(BASE_DIR))
from src.utils.common_utils import setup_logging
gdal.UseExceptions()
# 设置 GDAL 选项以优化性能
gdal.SetConfigOption("GDAL_NUM_THREADS", "ALL_CPUS")
gdal.SetConfigOption("GDAL_CACHEMAX", "1024")
def check_and_get_image_info(ds):
"""
检查影像基本信息以及是否存在 NaN 或 NoData 值
Returns
-------
tuple: (has_invalid, original_no_data)
has_invalid: bool
是否存在 NaN 或 NoData 值
original_no_data: float or None
原影像定义的 NoData 值
"""
width = ds.RasterXSize
height = ds.RasterYSize
band_count = ds.RasterCount
logging.info(f"影像尺寸: {width}x{height}, 波段数: {band_count}")
has_invalid = False
original_no_data = None
# 读取第一个波段的 NoData 值作为参考
if band_count > 0:
original_no_data = ds.GetRasterBand(1).GetNoDataValue()
if original_no_data is not None:
logging.info(f"波段 1: 原影像定义的 NoData 值为 {original_no_data}")
has_invalid = True
else:
logging.info("波段 1: 原影像未定义 NoData 值")
# 仅检查是否存在 NaN 值
return has_invalid, original_no_data
def tif_to_cog(
input_path: str,
output_path: str,
output_type: int = gdal.GDT_Float32,
no_data: float = np.nan,
compress: str = "DEFLATE",
):
"""
将传统 GeoTIFF 转换为 COG 格式
Parameters
----------
input_path : str
输入 GeoTIFF 文件路径
output_path : str
输出 COG 文件路径
output_type : int, optional
输出数据类型, 本质是整数索引, 默认 float32, by default gdal.GDT_Float32
no_data : float, optional
无效值, 默认 np.nan, by default np.nan
compress : str, optional
压缩算法, 可选 "DEFLATE""LZW", by default "DEFLATE"
"""
logging.info(f"输入文件: {input_path}")
if not os.path.exists(str(input_path)):
input_path = str(input_path).replace(".tif", "(2).tif")
if not os.path.exists(input_path):
logging.error(f"无法找到输入文件: {input_path}")
return
ds = gdal.Open(input_path)
if ds is None:
logging.error(f"无法打开输入文件: {input_path}")
return
# 1. 检查影像信息和无效值
has_invalid, original_no_data = check_and_get_image_info(ds)
# 2. 确定最终使用的 NoData 值
target_no_data = no_data
# 如果用户没有指定特定的 no_data (即传入的是 nan), 则尝试沿用原数据的 nodata
# 如果原数据也没有 nodata, 则根据 output_type 设置默认值
if np.isnan(target_no_data):
if original_no_data is None:
if output_type == gdal.GDT_Int16:
target_no_data = -32768
elif output_type == gdal.GDT_Float32:
target_no_data = -9999
elif output_type == gdal.GDT_Byte:
target_no_data = None
else:
target_no_data = np.nan
logging.warning(
f"原数据未设无效值且未指定新无效值, 现根据输出类型设为: {target_no_data}"
)
else:
target_no_data = original_no_data
logging.info(f"沿用原数据无效值: {target_no_data}")
else:
logging.info(f"使用指定的新无效值: {target_no_data}")
# 3. 转换逻辑
# 若原始影像存在无效值(NaN 或 NoData) 且 指定了新的 NoData 值, 且 新旧 NoData 值不一致 / 存在 NaN 值时使用重映射 (gdal.Warp)
use_warp = False
if target_no_data is not None and not np.isnan(target_no_data):
if has_invalid:
if original_no_data != target_no_data:
use_warp = True
logging.info(
f"原始影像存在无效值且已指定新无效值 ({original_no_data} -> {target_no_data}), 将使用 Warp 进行处理"
)
else:
pass
# 开始转换
logging.info("开始转换为 COG 格式...")
creation_options = [
f"COMPRESS={compress}",
"PREDICTOR=2", # 差值预测, 利于影像压缩
"NUM_THREADS=ALL_CPUS",
"BIGTIFF=IF_SAFER",
# "ZLEVEL=4", # 压缩级别, 支持 1-9, 默认为 6, COG 不支持设置
# "TILED=YES", # 分块参数, COG 格式自带分块, 不支持手动设置
# "PHOTOMETRIC=RGB", # 可视化参数, COG 格式不支持设置
]
if use_warp:
# 使用 Warp
warp_options = gdal.WarpOptions(
format="COG", # 输出为 COG 格式, 自动构建金字塔
outputType=output_type,
dstNodata=target_no_data,
# srcNodata 未指定时, 将自动读取源数据的 nodata
# 若源数据存在 NaN 却未定义 nodata, 将自动处理 float 的 NaN
# srcNodata=original_no_data,
creationOptions=creation_options,
callback=gdal.TermProgress_nocb, # 进度回调, 不显示进度条
multithread=True,
)
gdal.Warp(str(output_path), ds, options=warp_options)
else:
# 使用 Translate (无需修改原数据时使用, 更高效)
# 但 gdal.Translate 无法将原始 NoData 直接转为新的 nodata
translate_options = gdal.TranslateOptions(
format="COG",
outputType=output_type,
noData=target_no_data,
creationOptions=creation_options,
callback=gdal.TermProgress_nocb,
)
gdal.Translate(str(output_path), ds, options=translate_options)
logging.info(f"结果已保存到: {output_path}")
ds = None
return
def main(input_path, output_path, output_type=gdal.GDT_Float32):
input_path = Path(input_path)
output_dir = Path(output_path).parent
os.makedirs(output_dir, exist_ok=True)
# 配置日志记录
log_file = output_dir / "toCOG.log"
setup_logging(str(log_file))
# 关于无效值
# Float32 类型可以设为 -9999
if output_type == gdal.GDT_Float32:
no_data = -9999
else:
no_data = np.nan
tif_to_cog(str(input_path), str(output_path), output_type, no_data=no_data)
if __name__ == "__main__":
# 输入目录: 包含分块tif影像的根目录
input_root = Path(r"D:\CVEOdata\RS_Data\Terrain\test")
# 输出目录: 存放最终COG结果的目录
output_root = Path(r"D:\CVEOdata\RS_Data\Terrain")
input_path = input_root / "GLO30.DSM.2014.Hubei.30m.tif"
output_path = output_root / "GLO30.DSM.2014.Hubei.30m.tif"
output_type = gdal.GDT_Float32
main(input_path, output_path, output_type)