# -*- coding: utf-8 -*- """ =============================================================================== 传统GeoTIFF转COG COG (Cloud Optimized GeoTIFF) 是一种优化设计的 GeoTIFF 文件格式, 特别适用于云环境下 的存储和访问. 自带分块存储功能与影像金字塔, 能够在请求时按需加载, 实现了高效的压缩和传输. ------------------------------------------------------------------------------- Authors: CVEO Team Last Updated: 2026-02-03 =============================================================================== """ import os import sys import logging from pathlib import Path from osgeo import gdal import numpy as np # 添加父目录到 sys.path 以导入 utils BASE_DIR = Path(__file__).parent.parent sys.path.append(str(BASE_DIR)) from utils.common_utils import setup_logging gdal.UseExceptions() # 设置 GDAL 选项以优化性能 gdal.SetConfigOption("GDAL_NUM_THREADS", "ALL_CPUS") gdal.SetConfigOption("GDAL_CACHEMAX", "1024") def check_and_get_image_info(ds): """ 检查影像基本信息以及是否存在 NaN 或 NoData 值 Returns ------- tuple: (has_invalid, original_no_data) has_invalid: bool 是否存在 NaN 或 NoData 值 original_no_data: float or None 原影像定义的 NoData 值 """ width = ds.RasterXSize height = ds.RasterYSize band_count = ds.RasterCount logging.info(f"影像尺寸: {width}x{height}, 波段数: {band_count}") has_invalid = False original_no_data = None # 读取第一个波段的 NoData 值作为参考 if band_count > 0: original_no_data = ds.GetRasterBand(1).GetNoDataValue() if original_no_data is not None: logging.info(f"波段 1: 原影像定义的 NoData 值为 {original_no_data}") has_invalid = True else: logging.info("波段 1: 原影像未定义 NoData 值") # 仅检查是否存在 NaN 值 return has_invalid, original_no_data def tif_to_cog( input_path: str, output_path: str, output_type: int = gdal.GDT_Float32, no_data: float = np.nan, compress: str = "DEFLATE", scaleParams: list = None, ): """ 将传统 GeoTIFF 转换为 COG 格式 Parameters ---------- input_path : str 输入 GeoTIFF 文件路径 output_path : str 输出 COG 文件路径 output_type : int, optional 输出数据类型, 本质是整数索引, 默认 float32, by default gdal.GDT_Float32 no_data : float, optional 无效值, 默认 np.nan, by default np.nan compress : str, optional 压缩算法, 可选 "DEFLATE" 或 "LZW", by default "DEFLATE" """ logging.info(f"输入文件: {input_path}") if not os.path.exists(str(input_path)): input_path = str(input_path).replace(".tif", "(2).tif") if not os.path.exists(input_path): logging.error(f"无法找到输入文件: {input_path}") return ds = gdal.Open(input_path) if ds is None: logging.error(f"无法打开输入文件: {input_path}") return # 1. 检查影像信息和无效值 has_invalid, original_no_data = check_and_get_image_info(ds) # 2. 确定最终使用的 NoData 值 target_no_data = no_data # 如果用户没有指定特定的 no_data (即传入的是 nan), 则尝试沿用原数据的 nodata # 如果原数据也没有 nodata, 则根据 output_type 设置默认值 if np.isnan(target_no_data): if original_no_data is None: if output_type == gdal.GDT_Int16: target_no_data = -32768 elif output_type == gdal.GDT_Float32: target_no_data = -9999 elif output_type == gdal.GDT_Byte: target_no_data = None else: target_no_data = np.nan logging.warning( f"原数据未设无效值且未指定新无效值, 现根据输出类型设为: {target_no_data}" ) else: target_no_data = original_no_data logging.info(f"沿用原数据无效值: {target_no_data}") else: logging.info(f"使用指定的新无效值: {target_no_data}") # 3. 转换逻辑 # 若原始影像存在无效值(NaN 或 NoData) 且 指定了新的 NoData 值, 且 新旧 NoData 值不一致 / 存在 NaN 值时使用重映射 (gdal.Warp) use_warp = False if target_no_data is not None and not np.isnan(target_no_data): if has_invalid: if original_no_data != target_no_data: use_warp = True logging.info( f"原始影像存在无效值且已指定新无效值 ({original_no_data} -> {target_no_data}), 将使用 Warp 进行处理" ) else: pass # 开始转换 logging.info(f"开始转换为 COG 格式...") creation_options = [ f"COMPRESS={compress}", "PREDICTOR=2", # 差值预测, 利于影像压缩 "NUM_THREADS=ALL_CPUS", "BIGTIFF=IF_SAFER", # "ZLEVEL=4", # 压缩级别, 支持 1-9, 默认为 6, COG 不支持设置 # "TILED=YES", # 分块参数, COG 格式自带分块, 不支持手动设置 # "PHOTOMETRIC=RGB", # 可视化参数, COG 格式不支持设置 ] if use_warp: # 使用 Warp warp_options = gdal.WarpOptions( format="COG", # 输出为 COG 格式, 自动构建金字塔 outputType=output_type, dstNodata=target_no_data, # srcNodata 未指定时, 将自动读取源数据的 nodata # 若源数据存在 NaN 却未定义 nodata, 将自动处理 float 的 NaN # srcNodata=original_no_data, creationOptions=creation_options, callback=gdal.TermProgress_nocb, # 进度回调, 不显示进度条 multithread=True, ) gdal.Warp(str(output_path), ds, options=warp_options) else: # 使用 Translate (无需修改原数据时使用, 更高效) # 但 gdal.Translate 无法将原始 NoData 直接转为新的 nodata translate_options = gdal.TranslateOptions( format="COG", outputType=output_type, noData=target_no_data, creationOptions=creation_options, callback=gdal.TermProgress_nocb, ) gdal.Translate(str(output_path), ds, options=translate_options) logging.info(f"结果已保存到: {output_path}") ds = None return def main(input_path, output_path, output_type=gdal.GDT_Float32): input_path = Path(input_path) output_dir = Path(output_path).parent os.makedirs(output_dir, exist_ok=True) # 配置日志记录 log_file = output_dir / "toCOG.log" setup_logging(str(log_file)) # 关于无效值 # Float32 类型可以设为 -9999 if output_type == gdal.GDT_Float32: no_data = -9999 else: no_data = np.nan tif_to_cog(str(input_path), str(output_path), output_type, no_data=no_data) if __name__ == "__main__": # 输入目录: 包含分块tif影像的根目录 input_root = Path(r"D:\CVEOdata\RS_Data\Terrain") # 输出目录: 存放最终COG结果的目录 output_root = Path(r"D:\CVEOdata\RS_Data\Terrain") input_path = input_root / "GLO30.DSM.2014.Hubei.30m.tif" output_path = output_root / "GLO30.DSM.2014.Hubei.30m.tif" output_type = gdal.GDT_Byte