from cgitb import enable from collections import OrderedDict from email.message import Message import imp import inspect import os from pathlib import Path from pydoc import render_doc import shutil from statistics import stdev from tabnanny import check from threading import Thread from time import sleep, time from typing import Dict, List import uuid import numpy as np from osgeo import gdal, gdal_array,osr from rscder.utils.icons import IconInstance from rscder.gui.info import InfoBox from rscder.utils.setting import Settings from qgis.core import (\ QgsRasterLayer, QgsMarkerSymbol, QgsUnitTypes, QgsCategorizedSymbolRenderer, QgsRendererCategory, QgsPalLayerSettings, QgsRuleBasedLabeling, QgsTextFormat, QgsLineSymbol, QgsSingleSymbolRenderer, QgsSimpleLineSymbolLayer, QgsVectorLayer, QgsCoordinateReferenceSystem, QgsFeature, QgsGeometry, QgsPointXY, QgsMultiBandColorRenderer) from PyQt5.QtCore import QObject, pyqtSignal, Qt, QThread,QSize from PyQt5.QtWidgets import QTreeWidgetItem, QAction,QMessageBox from PyQt5.QtGui import QColor, QIcon, QFont,QPixmap import yaml from rscder.utils.geomath import geo2imageRC,imageRC2geo from .misc import singleton def relative_path(path: str, root:str) -> str: return os.path.relpath(path, root) @singleton class Project(QObject): instance:'Project' project_init = pyqtSignal(bool) project_setup = pyqtSignal(str) # layer_load = pyqtSignal() layer_tree_update = pyqtSignal() layer_show_update = pyqtSignal() ABSOLUTE_MODE = 'absolute' RELATIVE_MODE = 'relative' def run_auto_save(self): # t = QThread(self) if self.in_save: return if not self.is_init: return if not Settings.General().auto_save: return t = Thread(target=self.auto_save) t.start() def auto_save(self): # pre = time() self.in_save = True self.save() self.in_save = False def __init__(self, parent=None): super().__init__(parent) self.is_init = False self.cell_size = Settings.Project().cell_size self.max_memory = Settings.Project().max_memory self.max_threads = Settings.Project().max_threads self.root = str(Path(Settings.General().root)/'default') self.file_mode = Project.ABSOLUTE_MODE self.layers:Dict[str, PairLayer] = OrderedDict() self.current_layer = None self.is_closed = False self.in_save = False def set_close(): self.is_closed = True parent.closed.connect(set_close) self.run_auto_save() def connect(self, pair_canvas, layer_tree, message_box, result_table, eye): self.pair_canvas = pair_canvas self.layer_tree = layer_tree self.message_box = message_box self.result_table = result_table self.eye=eye IconInstance(self) self.layer_tree_update.connect(layer_tree.update_layer) self.layer_show_update.connect(pair_canvas.update_layer) self.layer_show_update.connect(eye.update_layer) eye.extent.connect(pair_canvas.zoom_to_extent) self.layer_tree_update.connect(self.run_auto_save) self.layer_show_update.connect(self.run_auto_save) def setup(self, path = None, name = None): ''' create: path is not None and name is not None open: path is file and name is None ''' if path is not None and name is not None: self.root = str(Path(path)/name) self.file = str(Path(self.root)/(name + '.prj')) elif name is None: self.file = path self.root = os.path.split(path)[0] else: self.message_box.error('打开或创建工程失败') try: if not os.path.exists(self.root): os.makedirs(self.root, exist_ok=True) if not os.path.exists(self.file): with open(self.file, 'w') as f: pass else: self.load() self.project_setup.emit(self.file) self.is_init = True self.project_init.emit(True) except: self.message_box.error('打开或创建工程失败') def save(self): data_dict = { 'cell_size': self.cell_size, 'max_memory': self.max_memory, 'max_threads': self.max_threads, 'root': self.root, 'layers': [ layer.to_dict() for layer in self.layers.values() ], } # for layer in self.layers.values(): # layer.save() with open(self.file, 'w') as f: yaml.safe_dump(data_dict, f) def clear(self): ''' clear all layers ''' self.layers:Dict[str, PairLayer] = dict() self.layer_tree.clear() self.pair_canvas.clear() self.message_box.clear() self.result_table.clear() def load(self): try: with open(self.file, 'r') as f: data = yaml.safe_load(f) if data is None: return # data = yaml.safe_load(open(self.file, 'r')) self.cell_size = data['cell_size'] self.max_memory = data['max_memory'] self.max_threads = data['max_threads'] self.root = data['root'] self.layers = dict() for layer in data['layers']: player = PairLayer.from_dict(layer) if player.check(): self.layers[player.id] = player self.layer_show_update.emit() self.layer_tree_update.emit() if len(list(self.layers.values())) > 0: self.current_layer = list(self.layers.values())[0] self.pair_canvas.zoom_to_layer(list(self.layers.values())[0].main_l1.layer) except Exception as e: self.message_box.error(str(e)) self.clear() def zoom_to_layer(self, data): self.pair_canvas.zoom_to_layer(data['layer']) self.eye.zoom(data['layer']) @property def cmi_path(self): pth = os.path.join(self.root, 'cmi') if not os.path.exists(pth): os.makedirs(pth) return pth @property def bcdm_path(self): pth = os.path.join(self.root, 'bcdm') if not os.path.exists(pth): os.makedirs(pth) return pth @property def evalution_path(self): pth = os.path.join(self.root, 'evalution') if not os.path.exists(pth): os.makedirs(pth) return pth @property def other_path(self): pth = os.path.join(self.root, 'other') if not os.path.exists(pth): os.makedirs(pth) return pth def add_layer(self, pth1, pth2,style_info1,style_info2): if not self.is_init: self.setup(Settings.General().root, 'default') player = PairLayer(pth1, pth2,style_info1,style_info2) if player.check(): self.layers[player.id] = player self.layer_show_update.emit() self.layer_tree_update.emit() self.pair_canvas.zoom_to_layer(player.main_l1.layer) else: self.message_box.error(f'{player.name} and {player.name} are not same size') def to_dict(obj:'BasicLayer'): init_args = inspect.getfullargspec(obj.__class__.__init__)[0][1:] data = {} for args in init_args: if hasattr(obj, args): data[args] = getattr(obj, args) data['type']=obj.__class__.__name__ return data def from_dict(data:dict): cls_type = data.pop('type') if cls_type is not None and cls_type in globals(): return globals()[cls_type](**data) class BasicLayer(QObject): LEFT_VIEW=1 RIGHT_VIEW=2 BOATH_VIEW=3 IN_MEMORY=1 IN_FILE=2 layer_tree_update = pyqtSignal() layer_show_update = pyqtSignal() zoom_to_layer = pyqtSignal(dict) def __init__(self, name='未命名', enable = False, icon = None, path = None, path_mode = IN_MEMORY, view_mode = BOATH_VIEW,): super().__init__(Project()) self.enable = enable self.name = name self.icon = icon self._path = path self._expand = True self.path_mode = path_mode self.view_mode = view_mode self.layer = None self.layer_parent = None self.layer_tree_update.connect(Project().layer_tree_update) self.layer_show_update.connect(Project().layer_show_update) self.zoom_to_layer.connect(Project().zoom_to_layer) @property def path(self): if self.path_mode == BasicLayer.IN_FILE and Project().file_mode == Project.RELATIVE_MODE: return os.path.relpath(self._path, Project().root) return self._path def get_item(self, root): item = QTreeWidgetItem(root) if self.icon is not None: item.setIcon(0, self.icon) item.setText(0, self.name) item.setCheckState(0, Qt.Checked if self.enable else Qt.Unchecked) item.item_update = self.item_update item.get_actions = self.get_actions item.setExpanded(self._expand) return item def item_update(self, item:QTreeWidgetItem): # item = self._item # print('start update') self.name = item.text(0) self._expand = item.isExpanded() pre = self.enable cur = item.checkState(0) == Qt.Checked if pre != cur: self.enable = cur self.layer_show_update.emit() # print('end update') def set_layer_parent(self, layer): self.layer_parent = layer def get_actions(self): actions = [] zoom_to_action = QAction(IconInstance().GRID_ON, '缩放至所在图层', self) actions.append(zoom_to_action) def zoom_to(): self.zoom_to_layer.emit(dict(layer=self.layer)) zoom_to_action.triggered.connect(zoom_to) del_action = QAction(IconInstance().DELETE, '删除图层', self) def del_layer(): if self.layer_parent is None: Project().layers.pop(self.id) else: self.layer_parent.remove_layer(self) self.layer_tree_update.emit() self.layer_show_update.emit() del_action.triggered.connect(del_layer) actions.append(del_action) if self.info: show_info=QAction(IconInstance().HELP,'属性信息') actions.append(show_info) def showIofn(info): #print(info) a=InfoBox(info) a.exec_() show_info.triggered.connect(lambda :showIofn(self.info)) return actions # return actions @property def info(self): return None class GridLayer(BasicLayer): def set_render(self): symbol_layer = QgsSimpleLineSymbolLayer() width = 1 if self.geo is not None: width = self.geo[1] symbol_layer.setWidth(width) symbol_layer.setColor(QColor.fromRgb(255,255,255, 255)) symbol = QgsLineSymbol() symbol.changeSymbolLayer(0, symbol_layer) symbol.setWidthUnit(QgsUnitTypes.RenderMapUnits) render = QgsSingleSymbolRenderer(symbol) self.layer.setRenderer(render) def __init__(self, proj, geo, x_size, y_size, enable=True, name='格网', cell_size=(100,100), style_opts={}): super().__init__(name, enable, icon=IconInstance().GRID_ON) self.cell_size = cell_size self.proj = proj if proj == '': geo = [0, 1, 0, 0, 0, -1] self.geo = geo self.x_size = np.abs(x_size) self.y_size = np.abs(y_size) self.x_min = geo[0] self.y_min = geo[3] self.x_res = geo[1] self.y_res = -np.abs(geo[5]) self.x_max = self.x_min + self.x_res * self.x_size self.y_max = self.y_min + self.y_res * self.y_size self.x_lines = [] for xi in range(self.x_size // self.cell_size[0] +1): self.x_lines.append(self.x_min + self.x_res * xi * self.cell_size[0]) if self.x_lines[-1] > self.x_max: self.x_lines.pop() self.x_lines.append(self.x_max) self.y_lines = [] for yi in range(np.abs(self.y_size) // self.cell_size[1]+1): self.y_lines.append(self.y_min + self.y_res * yi * self.cell_size[1]) if self.y_lines[-1] < self.y_max: self.y_lines.pop() self.y_lines.append(self.y_max) crs = QgsCoordinateReferenceSystem() proj_str = '' if proj != '': crs.createFromString('WKT:{}'.format(proj)) proj_str = crs.toProj() # print(crs) lines_layer = QgsVectorLayer('LineString?crs={}'.format(proj_str), 'temp-grid-outline', "memory") if not lines_layer.isValid(): Project().message_box.error('Failed to create grid outline layer') return lines_layer.setLabelsEnabled(False) lines_layer.startEditing() features = [] for x in self.x_lines: line = QgsFeature() line.setGeometry(QgsGeometry.fromPolylineXY([QgsPointXY(x, self.y_min), QgsPointXY(x, self.y_max)])) features.append(line) for y in self.y_lines: line = QgsFeature() line.setGeometry(QgsGeometry.fromPolylineXY([QgsPointXY(self.x_min, y), QgsPointXY(self.x_max, y)])) features.append(line) lines_layer.addFeatures(features) lines_layer.commitChanges() self.layer = lines_layer self.set_render() class Mask(object): def __init__(self,geoxy) -> None: super().__init__() self.xy=geoxy class RasterLayer(BasicLayer): def __init__(self, name=None, enable=False, path=None, view_mode=BasicLayer.BOATH_VIEW,style_info={'r':1,'g':2,'b':3,'NIR':0}): if name is None: name = os.path.splitext(os.path.basename(path))[0] super().__init__(name, enable, IconInstance().RASTER, path, BasicLayer.IN_FILE, view_mode) self.layer = QgsRasterLayer(self.path, self.name) self.style_info=style_info self.apply_style() def compare(self, other:'RasterLayer'): ''' 与其他图像比较,看是否处于同一投影,有无重叠区域,能否进行变化检测 other:其它同类图层 return:-1,无法进行比较;0,有重叠区域;1,完全重叠。 ''' ds1:gdal.Dataset = gdal.Open(self.path) ds2 :gdal.Dataset = gdal.Open(other.path) # if ds1.RasterXSize == ds2.RasterXSize and ds1.RasterYSize == ds2.RasterYSize: # return 0 geo1=ds1.GetGeoTransform() geo2=ds2.GetGeoTransform() map1xy=[imageRC2geo(geo1,0,0),imageRC2geo(geo1,ds1.RasterXSize,ds1.RasterYSize)] map2xy=[imageRC2geo(geo2,0,0),imageRC2geo(geo2,ds2.RasterXSize,ds2.RasterYSize)] map1xx=[map1xy[0][0],map1xy[1][0]] map1yy=[map1xy[0][1],map1xy[1][1]] map2xx=[map2xy[0][0],map2xy[1][0]] map2yy=[map2xy[0][1],map2xy[1][1]] map1xx.sort() map1yy.sort() map2yy.sort() map2xx.sort() if ds1 is None or ds2 is None: Project().message_box.error('图层打开失败') return -1 # if not ds1.GetProjection()==ds2.GetProjection(): # Project().message_box.error('投影不一致') # return -1 elif (map1xx[0]>map2xx[1] or map1xx[1]map2yy[1] or map1yy[1] None: self.layers:List[BasicLayer] = [] self.id = str(uuid.uuid1()) self.checked = False self.main_l1 = MultiBandRasterLayer(path = pth1, enable=True, view_mode=BasicLayer.LEFT_VIEW,style_info=style_info1) self.main_l2 = MultiBandRasterLayer(path = pth2, enable=True, view_mode=BasicLayer.RIGHT_VIEW,style_info=style_info2) self.main_l1.set_layer_parent(self) self.main_l2.set_layer_parent(self) self.grid = None self.cell_size = Project().cell_size name = os.path.basename(pth1)[:4] + '-' + os.path.basename(pth2)[:4] self.mask=mask # self.layer_update.connect(Project().layer_updated) super().__init__(name, True, IconInstance().DOCUMENT) self.layer = self.main_l1.layer if self.check(): self.layers.append(self.main_l1) self.layers.append(self.main_l2) # self.layers.append(self.grid) @property def l1_geo(self): return self.main_l1.geo @property def l2_geo(self): return self.main_l2.geo def check(self): if self.checked: return self.checked self.checked = self.main_l1.compare(self.main_l2) if not self.checked==-1:#重叠 self.mask=Mask(self.main_l1.overlap(self.main_l2)) x1,y1=geo2imageRC(self.l1_geo,*self.mask.xy[0:2]) x2,y2=geo2imageRC(self.l1_geo,*self.mask.xy[2:]) self.size=[x2-x1,y2-y1] self.geo = self.main_l1.geo self.proj = self.main_l1.proj gridgeo=[self.mask.xy[0],self.l1_geo[1],self.l1_geo[2],self.mask.xy[1],self.l1_geo[4],self.l1_geo[5]] self.grid = GridLayer(self.proj, gridgeo , self.size[0], self.size[1], cell_size=Project().cell_size) self.grid.set_layer_parent(self) self.checked = True else: self.checked = False return self.checked def add_result_layer(self, result): result.set_layer_parent(self) self.layers.insert(0, result) self.layer_show_update.emit() self.layer_tree_update.emit() def has_layer(self, layer): for ilayer in self.layers: if ilayer is layer: return True return False def remove_layer(self, layer): idx = -1 for ilayer in self.layers: idx += 1 if ilayer is layer: break if idx >= len(self.layers): return if layer is self.grid or layer is self.main_l1 or layer is self.main_l2: return self.layers.pop(idx) del layer def to_dict(self): data=dict( name=self.name, enable=self.enable, pth1=self.main_l1.path, pth2=self.main_l2.path, style_info1=self.main_l1.style_info, style_info2=self.main_l2.style_info, layers=[to_dict(l) for l in self.layers if not (l is self.grid or l is self.main_l1 or l is self.main_l2) ], ) return data @property def info(self): ds= gdal.Open(self.main_l1.path) srs= osr.SpatialReference(ds.GetProjectionRef()) metadata={} # 空间参考系统 metadata['proj4'] = srs.ExportToProj4() metadata['wkt'] = srs.ExportToWkt() # 地理坐标系 metadata['geocs'] = srs.GetAttrValue('GEOGCS') metadata['uom'] = srs.GetAttrValue('UNIT') # 投影坐标系 metadata['projcs'] = srs.GetAttrValue('PROJCS') # if projected metadata['epsg'] = srs.GetAuthorityCode(None) ds=None #格网信息 gridData={} gridData['格网大小']='{}像素'.format(self.grid.cell_size) gridData['格网线']=['宽度:1像素', '颜色:白色'] #公共掩模 maskData={} if self.mask: maskData['左上角坐标']=self.mask.xy[0:2] maskData['右下角坐标']=self.mask.xy[2:] mapinfo={ '坐标系':metadata, '格网':gridData, '公共掩膜':maskData } return mapinfo @staticmethod def from_dict(data): player = PairLayer(data['pth1'], data['pth2'], data['style_info1'], data['style_info2']) player.name = data['name'] for layer in data['layers']: l = from_dict(layer) l.set_layer_parent(player) player.layers.insert(0,l) return player