"""Project model: raster layer pairs, their grid overlays and result layers.

The module holds the singleton ``Project`` (settings, layer registry,
YAML persistence and periodic auto-save) together with the layer wrappers
it manages: ``PairLayer``, ``GridLayer`` and ``ResultLayer``.
"""
from collections import OrderedDict
import os
from pathlib import Path
from threading import Thread
from time import sleep, time
from typing import Dict, List
import uuid
import numpy as np
from osgeo import gdal, gdal_array
from rscder.utils.setting import Settings
from qgis.core import (
    QgsRasterLayer,
    QgsMarkerSymbol,
    QgsUnitTypes,
    QgsCategorizedSymbolRenderer,
    QgsRendererCategory,
    QgsPalLayerSettings,
    QgsRuleBasedLabeling,
    QgsTextFormat,
    QgsLineSymbol,
    QgsSingleSymbolRenderer,
    QgsSimpleLineSymbolLayer,
    QgsVectorLayer,
    QgsCoordinateReferenceSystem,
    QgsFeature,
    QgsGeometry,
    QgsPointXY,
)
from PyQt5.QtCore import QObject, pyqtSignal
from PyQt5.QtGui import QColor
import yaml
from .misc import singleton


def relative_path(path: str, root: str) -> str:
    """Return ``path`` expressed relative to ``root``."""
    return os.path.relpath(path, root)


@singleton
class Project(QObject):
    """Singleton holding project settings and the registry of layer pairs.

    Signals:
        project_init(bool): emitted with ``True`` at the end of ``setup``.
        layer_load(str): emitted with the id of every pair layer that was
            added or loaded successfully.
    """

    project_init = pyqtSignal(bool)
    layer_load = pyqtSignal(str)

    ABSOLUTE_MODE = 'absolute'
    RELATIVE_MODE = 'relative'

    def __init__(self, parent=None):
        super().__init__(parent)
        self.is_init = False
        self.cell_size = Settings.Project().cell_size
        self.max_memory = Settings.Project().max_memory
        self.max_threads = Settings.Project().max_threads
        self.root = str(Path(Settings.General().root) / 'default')
        self.file_mode = Project.ABSOLUTE_MODE
        self.layers: Dict[str, PairLayer] = OrderedDict()
        self.current_layer = None
        self.is_closed = False
        self.run_auto_save()

    def run_auto_save(self):
        """Start the auto-save loop on a background thread.

        ``Thread.start`` is required here: ``Thread.run`` would execute the
        endless loop in the calling thread and ``__init__`` would never
        return. The thread is a daemon so a pending ``sleep`` cannot keep
        the interpreter alive at exit.
        """
        worker = Thread(target=self.auto_save, daemon=True)
        worker.start()

    def auto_save(self):
        """Save the project periodically until ``is_closed`` becomes true.

        A save only happens when auto-save is enabled in the general
        settings and the project has been set up.
        """
        while True:
            if Settings.General().auto_save and self.is_init:
                self.save()
            if self.is_closed:
                break
            sleep(Settings.General().auto_save_intervel)

    def connect(self, pair_canvas, layer_tree, message_box, result_table):
        """Store the UI collaborators and route ``layer_load`` to them."""
        self.pair_canvas = pair_canvas
        self.layer_tree = layer_tree
        self.message_box = message_box
        self.result_table = result_table
        self.layer_load.connect(layer_tree.add_layer)
        self.layer_load.connect(pair_canvas.add_layer)

    def change_result(self, layer_id, result_id, data):
        """Apply ``data`` to one result of a registered pair layer.

        Unknown ``layer_id`` values are ignored. Only point results are
        updated; raster results are left untouched.
        """
        if layer_id not in self.layers:
            return
        result = self.layers[layer_id].results[result_id]
        if result.layer_type == ResultLayer.POINT:
            result.update(data)

    def setup(self, path=None, name=None):
        """Bind the project to a directory and file, creating or loading it.

        Args:
            path: project root directory; the current root is kept when None.
            name: project file name inside the root; ``default.prj`` when None.
        """
        if path is not None:
            self.root = path
        file_name = 'default.prj' if name is None else name
        # Resolve the file before flagging the project as initialised so the
        # auto-save thread never calls save() without a target file.
        self.file = str(Path(self.root) / file_name)
        self.is_init = True

        os.makedirs(self.root, exist_ok=True)
        if os.path.exists(self.file):
            self.load()
        else:
            # Create an empty project file.
            with open(self.file, 'w'):
                pass
        self.project_init.emit(True)

    def save(self):
        """Write settings and layers to the project file as YAML.

        Every layer is also asked to save its own results.
        """
        root = None if self.file_mode == Project.ABSOLUTE_MODE else self.root
        # Snapshot: this may run on the auto-save thread while layers are
        # being added elsewhere.
        layers = list(self.layers.values())
        data_dict = {
            'cell_size': self.cell_size,
            'max_memory': self.max_memory,
            'max_threads': self.max_threads,
            'root': self.root,
            'layers': [layer.to_dict(root) for layer in layers],
        }
        for layer in layers:
            layer.save()
        with open(self.file, 'w') as f:
            yaml.safe_dump(data_dict, f)

    def clear(self):
        """Drop all layers and clear the connected UI components."""
        self.layers = OrderedDict()
        self.layer_tree.clear()
        self.pair_canvas.clear()
        self.message_box.clear()
        self.result_table.clear()

    def load(self):
        """Read the project file and rebuild the layers it describes.

        An empty file is ignored. Any failure is reported through the
        message box and the project is cleared.
        """
        try:
            with open(self.file, 'r') as f:
                data = yaml.safe_load(f)
            if data is None:
                return
            self.cell_size = data['cell_size']
            self.max_memory = data['max_memory']
            self.max_threads = data['max_threads']
            self.root = data['root']
            self.layers = OrderedDict()
            root = None if self.file_mode == Project.ABSOLUTE_MODE else self.root
            for layer in data['layers']:
                player = PairLayer.from_dict(layer, root)
                if player.check():
                    self.layers[player.id] = player
                    self.layer_load.emit(player.id)
        except Exception as e:
            self.message_box.error(str(e))
            self.clear()

    def add_layer(self, pth1, pth2):
        """Register a new pair layer, or report why it failed validation."""
        player = PairLayer(pth1, pth2, self.cell_size)
        if player.check():
            self.layers[player.id] = player
            self.layer_load.emit(player.id)
        else:
            self.message_box.error(player.msg)


class GridLayer:
    """In-memory line layer drawing the cell grid over a raster dataset."""

    def __init__(self, cell_size, ds):
        """Build the grid lines for ``ds``.

        Args:
            cell_size: indexable pair, cell width and height in pixels.
            ds: open GDAL dataset providing size, geotransform and projection.
        """
        self.cell_size = cell_size
        self.ds = ds
        self.proj = ds.GetProjection()
        self.geo = ds.GetGeoTransform()
        self.x_size = ds.RasterXSize
        self.y_size = ds.RasterYSize
        self.x_min = self.geo[0]
        self.y_min = self.geo[3]
        self.x_res = self.geo[1]
        self.y_res = self.geo[5]
        self.x_max = self.x_min + self.x_res * self.x_size
        self.y_max = self.y_min + self.y_res * self.y_size
        # Stays None when the memory layer cannot be created, so the
        # grid_layer property never raises AttributeError.
        self.lines_layer = None

        self.x_lines = self._grid_lines(
            self.x_min, self.x_res, self.x_size, self.cell_size[0])
        self.y_lines = self._grid_lines(
            self.y_min, self.y_res, self.y_size, self.cell_size[1])

        crs = QgsCoordinateReferenceSystem()
        crs.createFromString('WKT:{}'.format(ds.GetProjection()))
        lines_layer = QgsVectorLayer(
            'LineString?crs={}'.format(crs.toProj()), 'temp-grid-outline', "memory")
        if not lines_layer.isValid():
            Project().message_box.error('Failed to create grid outline layer')
            return

        lines_layer.setLabelsEnabled(False)
        lines_layer.startEditing()
        features = []
        for x in self.x_lines:
            line = QgsFeature()
            line.setGeometry(QgsGeometry.fromPolylineXY(
                [QgsPointXY(x, self.y_min), QgsPointXY(x, self.y_max)]))
            features.append(line)
        for y in self.y_lines:
            line = QgsFeature()
            line.setGeometry(QgsGeometry.fromPolylineXY(
                [QgsPointXY(self.x_min, y), QgsPointXY(self.x_max, y)]))
            features.append(line)
        lines_layer.addFeatures(features)
        lines_layer.commitChanges()

        self.lines_layer = lines_layer
        self.set_render()

    @staticmethod
    def _grid_lines(origin, res, size, cell):
        """Return grid line coordinates along one axis.

        Lines sit at every multiple of ``cell`` pixels below ``size`` plus
        the far edge. Working in pixel offsets keeps the last interior line
        when ``size`` is not a multiple of ``cell`` and avoids comparing
        floats to detect the edge.
        """
        offsets = list(range(0, size, cell))
        offsets.append(size)
        return [origin + res * offset for offset in offsets]

    def set_render(self):
        """Render the grid as translucent white lines sized in map units."""
        symbol_layer = QgsSimpleLineSymbolLayer()
        symbol_layer.setWidth(1 * self.x_res)
        symbol_layer.setColor(QColor.fromRgb(255, 255, 255, 200))
        symbol = QgsLineSymbol()
        symbol.changeSymbolLayer(0, symbol_layer)
        symbol.setWidthUnit(QgsUnitTypes.RenderMapUnits)
        render = QgsSingleSymbolRenderer(symbol)
        self.lines_layer.setRenderer(render)

    @property
    def grid_layer(self):
        """The line layer, or None if it could not be created."""
        return self.lines_layer

    def to_dict(self):
        """Return the serialisable state (the cell size only)."""
        return {
            'cell_size': self.cell_size
        }

    @staticmethod
    def from_dict(data, ds=None):
        """Rebuild a grid from ``to_dict`` output and an open dataset.

        Raises:
            TypeError: if ``ds`` is not given; a grid cannot be built
                without a dataset.
        """
        if ds is None:
            raise TypeError('GridLayer.from_dict() requires an open dataset (ds)')
        return GridLayer(data['cell_size'], ds)


class ResultLayer:
    """A result attached to a pair layer: a point table or a raster file."""

    POINT = 0
    RASTER = 1

    def __init__(self, name, parent, layer_type=POINT):
        self.layer_type = layer_type
        self.data = None
        self.layer = None
        self.name = name
        self.path = None
        self.wkt = None
        self.enable = False
        self.parent = parent

    def save(self):
        """Write point results back to ``self.path`` as CSV.

        Raster results, and results with no path or data yet, are skipped.
        """
        if self.layer_type != ResultLayer.POINT:
            return
        if self.path is None or self.data is None:
            return
        with open(self.path, 'w') as f:
            f.write('x,y,diff,status\n')
            for row in self.data:
                f.write('{},{},{},{}\n'.format(row[0], row[1], row[2], int(row[3])))

    def update(self, data):
        """Set the last column of one data row and refresh its feature.

        Args:
            data: mapping with ``'row'`` (row index) and ``'value'``.
                Only point results are updated.
        """
        if self.layer_type == ResultLayer.POINT:
            row = data['row']
            self.data[row][-1] = data['value']
            self.update_point_layer(row)

    def load_file(self, path):
        """Remember ``path`` and load it according to the layer type.

        Raises:
            Exception: if the layer type is neither POINT nor RASTER.
        """
        self.path = path
        if self.layer_type == ResultLayer.POINT:
            self.load_point_file()
        elif self.layer_type == ResultLayer.RASTER:
            self.load_raster_file()
        else:
            raise Exception('Unknown layer type')

    def format_point_layer(self, layer):
        """Enable rule-based labelling on ``layer``."""
        layer.setLabelsEnabled(True)
        lyr = QgsPalLayerSettings()
        lyr.enabled = True
        lyr.fieldName = 'fid'
        lyr.placement = QgsPalLayerSettings.OverPoint
        lyr.textNamedStyle = 'Medium'
        text_format = QgsTextFormat()
        # NOTE(review): the next two lines assign attributes rather than call
        # setters; confirm whether setColor()/setFillColor() was intended.
        text_format.color = QColor('#ffffff')
        text_format.background().color = QColor('#000000')
        text_format.buffer().setEnabled(True)
        text_format.buffer().setSize(1)
        text_format.buffer().setOpacity(0.5)
        lyr.setFormat(text_format)
        root = QgsRuleBasedLabeling.Rule(QgsPalLayerSettings())
        rule = QgsRuleBasedLabeling.Rule(lyr)
        rule.setDescription('label')
        root.appendChild(rule)
        # Apply the label configuration.
        rules = QgsRuleBasedLabeling(root)
        layer.setLabeling(rules)

    def set_render(self, layer):
        """Categorise points by ``status``: 1 = change, 0 = unchange."""
        symbol_change = QgsMarkerSymbol.createSimple(
            {'color': '#ffff00', 'size': 5 * self.parent.xres})
        symbol_change.setSizeUnit(QgsUnitTypes.RenderUnit.RenderMetersInMapUnits)
        category_change = QgsRendererCategory(1, symbol_change, 'change')

        symbol_unchange = QgsMarkerSymbol.createSimple({'color': '#00000000', 'size': '0'})
        category_unchange = QgsRendererCategory(0, symbol_unchange, 'unchange')
        render = QgsCategorizedSymbolRenderer('status', [category_change, category_unchange])
        layer.setRenderer(render)

    def load_point_file(self):
        """Load the CSV at ``self.path`` (header skipped) and build the layer."""
        # ndmin=2 keeps a file with a single data row two-dimensional, so the
        # data[i][j] indexing used throughout this class keeps working.
        self.data = np.loadtxt(self.path, delimiter=',', skiprows=1, ndmin=2)
        self.make_point_layer()

    def make_point_layer(self):
        """Create the in-memory point layer from ``self.data``.

        Reports through the project message box and leaves ``self.layer``
        unchanged when the layer cannot be created.
        """
        crs = QgsCoordinateReferenceSystem()
        if self.wkt is not None:
            crs.createFromString('WKT:{}'.format(self.wkt))

        uri = 'Point?crs={}&field=status:integer'.format(crs.toProj())
        layer = QgsVectorLayer(uri, self.name, "memory")
        if not layer.isValid():
            Project().message_box.error('Failed to create layer')
            return
        self.format_point_layer(layer)
        layer.startEditing()
        features = []
        for i, d in enumerate(self.data):
            point = QgsFeature(layer.fields())
            point.setId(i)
            point.setGeometry(QgsGeometry.fromPointXY(QgsPointXY(d[0], d[1])))
            point.setAttribute('status', int(d[-1]))
            features.append(point)
        layer.addFeatures(features)
        self.set_render(layer)
        layer.commitChanges()
        self.layer = layer

    def update_point_layer(self, row=0):
        """Push ``status`` values from ``self.data`` into the point layer.

        Args:
            row: index of the row to refresh; a negative value refreshes
                every row.
        """
        if self.layer is None:
            return
        rows = range(len(self.data)) if row < 0 else [row]
        self.layer.startEditing()
        for i in rows:
            # Features are looked up by id = row index + 1.
            feature = self.layer.getFeature(i + 1)
            if feature is None or not feature.isValid():
                continue
            feature.setAttribute('status', int(self.data[i][-1]))
            self.layer.updateFeature(feature)
        # Always commit so the layer is never left in edit mode.
        self.layer.commitChanges()

    def load_raster_file(self):
        """Wrap ``self.path`` in a raster layer if GDAL can open it."""
        ds = gdal.Open(self.path)
        if ds is None:
            return
        self.layer = QgsRasterLayer(self.path, self.name)

    @staticmethod
    def from_dict(data, parent, root=None):
        """Rebuild a result from ``to_dict`` output and load its file.

        Args:
            data: mapping with ``name``, ``layer_type``, ``wkt`` and ``path``.
            parent: the owning pair layer.
            root: when given, ``data['path']`` is resolved against it.
        """
        result = ResultLayer(data['name'], parent, data['layer_type'])
        result.wkt = data['wkt']
        if root is not None:
            result.load_file(str(Path(root) / data['path']))
        else:
            result.load_file(data['path'])
        return result

    def to_dict(self, root=None):
        """Return the serialisable state.

        The path is stored relative to ``root`` when one is given, using
        the same helper as ``PairLayer.to_dict``.
        """
        if root is None or self.path is None:
            path = self.path
        else:
            path = relative_path(self.path, root)
        return {
            'name': self.name,
            'layer_type': self.layer_type,
            'wkt': self.wkt,
            'path': path
        }


class PairLayer:
    """Two raster files of equal size, their grid and their results."""

    def __init__(self, pth1, pth2, cell_size) -> None:
        self.pth1 = pth1
        self.pth2 = pth2
        self.enable = True
        self.l1_enable = True
        self.l2_enable = True
        self.grid_enable = True
        self.id = str(uuid.uuid1())
        self.name = '{}-{}'.format(os.path.basename(pth1), os.path.basename(pth2))
        self.l1_name = os.path.basename(pth1)
        self.l2_name = os.path.basename(pth2)
        self.l1_pres = []  # preprocessed data
        self.l2_pres = []
        self.cell_size = cell_size
        self.msg = ''
        self.checked = False
        self.xsize = 0
        self.ysize = 0
        self.xres = 0
        self.yres = 0
        self.wkt = None
        self.results: List[ResultLayer] = []

    def to_dict(self, root=None):
        """Return the serialisable state.

        Both raster paths are stored relative to ``root`` when one is given.
        """
        if root is None:
            pth1, pth2 = self.pth1, self.pth2
        else:
            pth1 = relative_path(self.pth1, root)
            pth2 = relative_path(self.pth2, root)
        return {
            'pth1': pth1,
            'pth2': pth2,
            'name': self.name,
            'l1_name': self.l1_name,
            'l2_name': self.l2_name,
            'cell_size': self.cell_size,
            'results': [r.to_dict(root) for r in self.results]
        }

    @staticmethod
    def from_dict(data, root=None):
        """Rebuild a pair layer and its results from ``to_dict`` output."""
        if root is None:
            layer = PairLayer(data['pth1'], data['pth2'], data['cell_size'])
        else:
            layer = PairLayer(os.path.join(root, data['pth1']),
                              os.path.join(root, data['pth2']),
                              data['cell_size'])
        layer.l1_name = data['l1_name']
        layer.l2_name = data['l2_name']
        layer.name = data['name']
        # Validate first so xres is populated: ResultLayer.set_render sizes
        # its markers from parent.xres, which is 0 until check() succeeds.
        layer.check()
        for r in data['results']:
            layer.results.append(ResultLayer.from_dict(r, layer, root))
        return layer

    def save(self):
        """Save every result of this pair."""
        for r in self.results:
            r.save()

    def check(self):
        """Validate both rasters and create the grid and raster layers.

        Returns:
            True when both files exist, open, and have equal size. On
            failure returns False and stores the reason in ``self.msg``.
            A successful check is cached in ``self.checked``.
        """
        if self.checked:
            return self.checked
        if not os.path.exists(self.pth1):
            self.msg = '图层1不存在'
            return False
        if not os.path.exists(self.pth2):
            self.msg = '图层2不存在'
            return False

        ds1 = gdal.Open(self.pth1)
        ds2 = gdal.Open(self.pth2)
        if ds1 is None or ds2 is None:
            self.msg = '图层打开失败'
            return False

        if ds1.RasterXSize != ds2.RasterXSize or ds1.RasterYSize != ds2.RasterYSize:
            self.msg = '图层尺寸不一致'
            return False

        self.xsize = ds1.RasterXSize
        self.ysize = ds1.RasterYSize
        geo = ds1.GetGeoTransform()
        self.xres = geo[1]
        self.yres = geo[5]
        self.wkt = ds1.GetProjection()
        self.grid_layer = GridLayer(self.cell_size, ds1)

        del ds1
        del ds2

        self.l1 = QgsRasterLayer(self.pth1, self.l1_name)
        self.l2 = QgsRasterLayer(self.pth2, self.l2_name)
        self.checked = True
        return True