import os from pathlib import Path from typing import Dict, List import uuid from osgeo import gdal, gdal_array from rscder.utils.setting import Settings from qgis.core import QgsRasterLayer, QgsLineSymbol, QgsSingleSymbolRenderer, QgsSimpleLineSymbolLayer, QgsVectorLayer, QgsCoordinateReferenceSystem, QgsFeature, QgsGeometry, QgsPointXY from PyQt5.QtCore import QObject, pyqtSignal from PyQt5.QtGui import QColor import yaml from .misc import singleton def relative_path(path: str, root:str) -> str: return os.path.relpath(path, root) @singleton class Project(QObject): project_init = pyqtSignal(bool) layer_load = pyqtSignal(str) ABSOLUTE_MODE = 'absolute' RELATIVE_MODE = 'relative' def __init__(self, parent=None): super().__init__(parent) self.is_init = False self.cell_size = Settings.Project().cell_size self.max_memory = Settings.Project().max_memory self.max_threads = Settings.Project().max_threads self.root = str(Path(Settings.General().root)/'default') self.file_mode = Project.ABSOLUTE_MODE self.layers:Dict[str, PairLayer] = dict() def connect(self, pair_canvas, layer_tree, message_box, result_table): self.pair_canvas = pair_canvas self.layer_tree = layer_tree self.message_box = message_box self.result_table = result_table self.layer_load.connect(layer_tree.add_layer) self.layer_load.connect(pair_canvas.add_layer) # self.layer_load.connect(message_box.add_layer) def setup(self, path = None, name = None): self.is_init = True if path is not None: self.root = path if name is None: self.file = str(Path(self.root)/'default.prj') else: self.file = str(Path(self.root)/name) if not os.path.exists(self.root): os.makedirs(self.root, exist_ok=True) if not os.path.exists(self.file): with open(self.file, 'w') as f: pass else: self.load() # self.project_created.emit() self.project_init.emit(True) def save(self): data_dict = { 'cell_size': self.cell_size, 'max_memory': self.max_memory, 'max_threads': self.max_threads, 'root': self.root, 'layers': [ layer.to_dict(None if self.file_mode == Project.ABSOLUTE_MODE else self.root) for layer in self.layers.values() ], 'results': [] } with open(self.file, 'w') as f: yaml.safe_dump(data_dict, f) # yaml.safe_dump(data_dict, open(self.file, 'w')) def clear(self): ''' clear all layers ''' self.layers = dict() self.layer_tree.clear() self.pair_canvas.clear() self.message_box.clear() self.result_table.clear() def load(self): with open(self.file, 'r') as f: data = yaml.safe_load(f) if data is None: return # data = yaml.safe_load(open(self.file, 'r')) self.cell_size = data['cell_size'] self.max_memory = data['max_memory'] self.max_threads = data['max_threads'] self.root = data['root'] self.layers = dict() for layer in data['layers']: player = PairLayer.from_dict(layer, None if self.file_mode == Project.ABSOLUTE_MODE else self.root) if player.check(): self.layers[player.id] = player self.layer_load.emit(player.id) def add_layer(self, pth1, pth2): # self.root = str(Path(pth1).parent) player = PairLayer(pth1, pth2, self.cell_size) if player.check(): # self.layers.append(player) self.layers[player.id] = player self.layer_load.emit(player.id) else: self.message_box.error(player.msg) class VectorLayer: pass class GridLayer: def set_render(self): symbol_layer = QgsSimpleLineSymbolLayer() symbol_layer.setWidth(1) symbol_layer.setColor(QColor.fromRgb(255,255,255, 100)) symbol = QgsLineSymbol() symbol.changeSymbolLayer(0, symbol_layer) render = QgsSingleSymbolRenderer(symbol) self.lines_layer.setRenderer(render) def __init__(self, cell_size, ds): self.cell_size = cell_size self.ds = ds proj = ds.GetProjection() geo = ds.GetGeoTransform() self.proj = proj self.geo = geo self.x_size = ds.RasterXSize self.y_size = ds.RasterYSize self.x_min = geo[0] self.y_min = geo[3] self.x_res = geo[1] self.y_res = geo[5] self.x_max = self.x_min + self.x_res * self.x_size self.y_max = self.y_min + self.y_res * self.y_size self.x_lines = [] for xi in range(self.x_size // self.cell_size[0]): self.x_lines.append(self.x_min + self.x_res * xi * self.cell_size[0]) if self.x_lines[-1] == self.x_max: self.x_lines.pop() self.x_lines.append(self.x_max) self.y_lines = [] for yi in range(self.y_size // self.cell_size[1]): self.y_lines.append(self.y_min + self.y_res * yi * self.cell_size[1]) if self.y_lines[-1] == self.y_max: self.y_lines.pop() self.y_lines.append(self.y_max) crs = QgsCoordinateReferenceSystem() crs.createFromString('WKT:{}'.format(ds.GetProjection())) # print(crs) lines_layer = QgsVectorLayer('LineString?crs={}'.format(crs.toProj()), 'temp-grid-outline', "memory") if not lines_layer.isValid(): Project().message_box.error('Failed to create grid outline layer') return lines_layer.setLabelsEnabled(False) lines_layer.startEditing() features = [] for x in self.x_lines: line = QgsFeature() line.setGeometry(QgsGeometry.fromPolylineXY([QgsPointXY(x, self.y_min), QgsPointXY(x, self.y_max)])) features.append(line) for y in self.y_lines: line = QgsFeature() line.setGeometry(QgsGeometry.fromPolylineXY([QgsPointXY(self.x_min, y), QgsPointXY(self.x_max, y)])) features.append(line) lines_layer.addFeatures(features) lines_layer.commitChanges() self.lines_layer = lines_layer self.set_render() # self.x_lines = [ self.x_min + i * self.x_res for i in range(self.x_size) ] @property def grid_layer(self): return self.lines_layer def to_dict(self): return { 'cell_size': self.cell_size } @staticmethod def from_dict(data): return GridLayer() class ResultLayer: POINT = 0 RASTER = 1 def __init__(self, layer_type): self.layer_type = layer_type self.data = [] class PairLayer: def to_dict(self, root = None): if root is None: return { 'pth1': self.pth1, 'pth2': self.pth2, 'l1_name': self.l1_name, 'l2_name': self.l2_name, 'cell_size': self.cell_size, } else: return { 'pth1': relative_path(self.pth1, root), 'pth2': relative_path(self.pth2, root), 'l1_name': self.l1_name, 'l2_name': self.l2_name, 'cell_size': self.cell_size, } @staticmethod def from_dict(data, root = None): if root is None: layer = PairLayer(data['pth1'], data['pth2'], data['cell_size']) else: layer = PairLayer(os.path.join(root, data['pth1']), os.path.join(root, data['pth2']), data['cell_size']) layer.l1_name = data['l1_name'] layer.l2_name = data['l2_name'] # layer.grid_layer = GridLayer.from_dict(data['grid_layer']) return layer def __init__(self, pth1, pth2, cell_size) -> None: self.pth1 = pth1 self.pth2 = pth2 self.id = str(uuid.uuid1()) self.l1_name = os.path.basename(pth1) self.l2_name = os.path.basename(pth2) self.cell_size = cell_size # self.grid_layer = GridLayer(cell_size) self.msg = '' self.checked = False def check(self): if self.checked: return self.checked if not os.path.exists(self.pth1): self.msg = '图层1不存在' return False if not os.path.exists(self.pth2): self.msg = '图层2不存在' return False ds1 = gdal.Open(self.pth1) ds2 = gdal.Open(self.pth2) if ds1 is None or ds2 is None: self.msg = '图层打开失败' return False if ds1.RasterXSize != ds2.RasterXSize or ds1.RasterYSize != ds2.RasterYSize: self.msg = '图层尺寸不一致' return False self.grid_layer = GridLayer(self.cell_size, ds1) del ds1 del ds2 self.l1 = QgsRasterLayer(self.pth1, self.l1_name) self.l2 = QgsRasterLayer(self.pth2, self.l2_name) self.checked = True return True