using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.ComponentModel; using System.Data; using System.Drawing; using System.IO; using System.IO.Ports; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; using System.Windows.Forms; using uPLibrary.Networking.M2Mqtt; using uPLibrary.Networking.M2Mqtt.Messages; using Newtonsoft.Json; using aiot_paho_csharp; namespace sscom_sender { public class MqttConfig { public string? ProductKey { get; set; } public string? Topic { get; set; } public string? DeviceName { get; set; } public string? DeviceSecret { get; set; } public string? Broker { get; set; } public int Port { get; set; } public bool UseSsl { get; set; } } public partial class Form1 : Form { private SerialPort? gpsPort; private SerialPort? spectrometerPort; private MqttClient? mqttClient; private bool isRunning = false; private string? logFileName; // 数据队列 private ConcurrentQueue gpsDataQueue = new ConcurrentQueue(); private ConcurrentQueue spectrometerDataQueue = new ConcurrentQueue(); private ConcurrentQueue logQueue = new ConcurrentQueue(); // 线程控制 private CancellationTokenSource? cancellationTokenSource; private Task? gpsTask; private Task? spectrometerTask; private Task? mqttTask; private Task? logTask; // 数据缓冲区 private StringBuilder gpsBuffer = new StringBuilder(); private StringBuilder spectrometerBuffer = new StringBuilder(); public Form1() { InitializeComponent(); InitializeForm(); } private void InitializeForm() { // 初始化串口列表 RefreshSerialPorts(); // 加载MQTT配置 LoadMqttConfig(); // 用异步的方式InitializeMQTT(); // 初始化MQTT连接 _ = Task.Run(() => InitializeMQTT()); // 创建日志文件名 logFileName = Path.Combine(Application.StartupPath, "logs", $"log_{DateTime.Now:yyyyMMdd_HHmmss}.txt"); string? logDir = Path.GetDirectoryName(logFileName); if (!string.IsNullOrEmpty(logDir)) { _ = Directory.CreateDirectory(logDir); } } private void RefreshSerialPorts() { string[] ports = SerialPort.GetPortNames(); comboBoxGPSPort.Items.Clear(); comboBoxSpectrometerPort.Items.Clear(); foreach (string port in ports) { _ = comboBoxGPSPort.Items.Add(port); _ = comboBoxSpectrometerPort.Items.Add(port); } if (ports.Length > 0) { comboBoxGPSPort.SelectedIndex = 0; if (ports.Length > 1) comboBoxSpectrometerPort.SelectedIndex = 1; else comboBoxSpectrometerPort.SelectedIndex = 0; } } private void buttonStartStop_Click(object sender, EventArgs e) { if (!isRunning) { StartDataCollection(); } else { StopDataCollection(); } } private void StartDataCollection() { try { // 初始化串口 InitializeSerialPorts(); // 启动线程 cancellationTokenSource = new CancellationTokenSource(); var token = cancellationTokenSource.Token; gpsTask = Task.Run(() => GPSDataReader(token), token); spectrometerTask = Task.Run(() => SpectrometerDataReader(token), token); mqttTask = Task.Run(() => MQTTDataSender(token), token); logTask = Task.Run(() => LogWriter(token), token); isRunning = true; buttonStartStop.Text = "停止"; buttonStartStop.BackColor = Color.Red; LogMessage("数据采集已启动"); } catch (Exception ex) { _ = MessageBox.Show($"启动失败: {ex.Message}", "错误", MessageBoxButtons.OK, MessageBoxIcon.Error); } } private void StopDataCollection() { try { isRunning = false; cancellationTokenSource?.Cancel(); // 立即更新UI状态 buttonStartStop.Text = "开始"; buttonStartStop.BackColor = SystemColors.Control; LogMessage("正在停止数据采集..."); // 异步等待任务完成,避免阻塞UI线程 _ = Task.Run(() => { try { // 给任务一些时间优雅退出 var tasks = new[] { gpsTask, spectrometerTask, mqttTask, logTask } .Where(t => t != null) .Cast() .ToArray(); if (tasks.Length > 0) { _ = Task.WaitAll(tasks, 2000); // 减少等待时间到2秒 } // 关闭串口 try { gpsPort?.Close(); } catch { } try { spectrometerPort?.Close(); } catch { } // 断开MQTT try { if (mqttClient != null) { mqttClient.Disconnect(); mqttClient = null; this.Invoke(new Action(() => UpdateMqttStatus("未连接", false))); } } catch { } // 在UI线程上更新状态 this.Invoke(new Action(() => { LogMessage("数据采集已停止"); })); } catch (Exception ex) { this.Invoke(new Action(() => { LogMessage($"停止时发生错误: {ex.Message}"); })); } }); } catch (Exception ex) { LogMessage($"停止时发生错误: {ex.Message}"); } } private void InitializeSerialPorts() { // GPS串口 if (comboBoxGPSPort.SelectedItem != null) { gpsPort = new SerialPort(comboBoxGPSPort.SelectedItem.ToString(), 115200, Parity.None, 8, StopBits.One); gpsPort.DataReceived += GPSPort_DataReceived; gpsPort.Open(); // 发送GPS初始化命令 gpsPort.WriteLine("bestposa com1 1"); } // 谱仪串口 if (comboBoxSpectrometerPort.SelectedItem != null) { spectrometerPort = new SerialPort(comboBoxSpectrometerPort.SelectedItem.ToString(), 115200, Parity.None, 8, StopBits.One); spectrometerPort.DataReceived += SpectrometerPort_DataReceived; spectrometerPort.Open(); if (!spectrometerPort.IsOpen) { LogMessage("谱仪端口打开失败"); return; } LogMessage("谱仪端口已打开"); // 发送谱仪开始命令 } } private MqttConfig? mqttConfig; private void LoadMqttConfig() { try { string configPath = Path.Combine(Application.StartupPath, "mqtt_config.json"); LogMessage($"加载MQTT配置文件: {configPath}"); if (File.Exists(configPath)) { string json = File.ReadAllText(configPath); var config = JsonConvert.DeserializeObject(json); if (config != null) { mqttConfig = config; LogMessage("MQTT配置已加载"); } else { LogMessage("MQTT配置文件内容无效,使用默认配置"); mqttConfig = new MqttConfig { ProductKey = "gfcq950RDqt", Topic = "/gfcq950RDqt/PubDevice/user/update", DeviceName = "PubDevice", DeviceSecret = "1031a49a4f61c29a086f79b41ed971c7", Broker = "iot-06z00jfiubx584v.mqtt.iothub.aliyuncs.com", Port = 1883, UseSsl = true }; string defaultJson = JsonConvert.SerializeObject(mqttConfig, Formatting.Indented); File.WriteAllText(configPath, defaultJson); LogMessage("已创建默认MQTT配置文件"); } } else { LogMessage("MQTT配置文件不存在,使用默认配置"); mqttConfig = new MqttConfig { ProductKey = "gfcq950RDqt", Topic = "/gfcq950RDqt/PubDevice/user/update", DeviceName = "PubDevice", DeviceSecret = "1031a49a4f61c29a086f79b41ed971c7", Broker = "iot-06z00jfiubx584v.mqtt.iothub.aliyuncs.com", Port = 1883, UseSsl = true }; // 保存默认配置到文件 string json = JsonConvert.SerializeObject(mqttConfig, Formatting.Indented); File.WriteAllText(configPath, json); LogMessage("已创建默认MQTT配置文件"); } } catch (Exception ex) { LogMessage($"加载MQTT配置失败: {ex.Message}"); mqttConfig = null; } } private void InitializeMQTT() { try { if (mqttConfig == null) { LogMessage("MQTT配置未加载,跳过MQTT连接"); UpdateMqttStatus("配置未加载", false); return; } if (string.IsNullOrEmpty(mqttConfig.ProductKey) || string.IsNullOrEmpty(mqttConfig.DeviceName) || string.IsNullOrEmpty(mqttConfig.DeviceSecret)) { LogMessage("MQTT配置项 ProductKey、DeviceName 或 DeviceSecret 为空,无法连接MQTT"); UpdateMqttStatus("配置项缺失", false); return; } MqttSign sign = new MqttSign(); _ = sign.calculate(mqttConfig.ProductKey, mqttConfig.DeviceName, mqttConfig.DeviceSecret); string broker = mqttConfig.Broker!; LogMessage($"MQTT连接到: {broker}"); mqttClient = new MqttClient(broker, mqttConfig.Port, mqttConfig.UseSsl, null, null, MqttSslProtocols.TLSv1_2); _ = mqttClient.Connect(sign.getClientid(), sign.getUsername(), sign.getPassword()); LogMessage($"MQTT已连接到: {broker}"); UpdateMqttStatus("已连接", true); } catch (Exception ex) { LogMessage($"MQTT连接失败: {ex.Message},将继续进行数据采集和日志记录"); UpdateMqttStatus("连接失败", false); mqttClient = null; // 确保mqttClient为null,避免后续使用 } } private void UpdateMqttStatus(string status, bool isConnected) { if (this.InvokeRequired) { this.Invoke(new Action(() => UpdateMqttStatus(status, isConnected))); return; } labelMqttStatus.Text = status; labelMqttStatus.ForeColor = isConnected ? System.Drawing.Color.Green : System.Drawing.Color.Red; } private void GPSPort_DataReceived(object sender, SerialDataReceivedEventArgs e) { try { if (gpsPort != null) { string data = gpsPort.ReadExisting(); _ = gpsBuffer.Append(data); } // 处理完整的行 string bufferContent = gpsBuffer.ToString(); string[] lines = bufferContent.Split(new[] { '\r', '\n' }, StringSplitOptions.RemoveEmptyEntries); if (bufferContent.EndsWith("\r") || bufferContent.EndsWith("\n")) { _ = gpsBuffer.Clear(); foreach (string line in lines) { ProcessGPSData(line); } } else { _ = gpsBuffer.Clear(); for (int i = 0; i < lines.Length - 1; i++) { ProcessGPSData(lines[i]); } _ = gpsBuffer.Append(lines[lines.Length - 1]); } } catch (Exception ex) { LogMessage($"GPS数据接收错误: {ex.Message}"); } } private void SpectrometerPort_DataReceived(object sender, SerialDataReceivedEventArgs e) { try { string data = spectrometerPort != null ? spectrometerPort.ReadExisting() : string.Empty; _ = spectrometerBuffer.Append(data); // 处理缓冲区中的数据 string bufferContent = spectrometerBuffer.ToString(); // 清理无用的字符串 bufferContent = bufferContent.Replace("Please again", "").Replace("ok", ""); // 按空格分割所有数据 string[] allParts = bufferContent.Split(new char[] { ' ' }, StringSplitOptions.RemoveEmptyEntries); if (allParts.Length < 16) { return; } // 查找日期格式的数据位置 for (int i = 0; i < allParts.Length - 15; i++) // 确保后面还有15个字段 { // 检查是否为日期格式 xxxx.xx.xx if (System.Text.RegularExpressions.Regex.IsMatch(allParts[i], @"^\d{4}\.\d{2}\.\d{2}$")) { // 检查后面是否有时间格式 xx:xx:xx if (i + 1 < allParts.Length && System.Text.RegularExpressions.Regex.IsMatch(allParts[i + 1], @"^\d{2}:\d{2}:\d{2}$")) { // 检查是否有足够的16个字段(日期 + 时间 + 14个数值) if (i + 15 < allParts.Length) { // 验证后面14个字段都是数值格式 bool allValid = true; for (int j = i + 2; j <= i + 15; j++) { if (!System.Text.RegularExpressions.Regex.IsMatch(allParts[j], @"^\d+\.\d{2}$")) { allValid = false; break; } } if (allValid) { // 构建完整的数据行 string[] validData = new string[16]; for (int k = 0; k < 16; k++) { validData[k] = allParts[i + k]; } string completeDataLine = string.Join(" ", validData); // 处理这条完整的数据 ProcessSpectrometerData(completeDataLine); // 从缓冲区中移除已处理的数据 int endIndex = i + 16; if (endIndex < allParts.Length) { string[] remainingParts = new string[allParts.Length - endIndex]; Array.Copy(allParts, endIndex, remainingParts, 0, remainingParts.Length); _ = spectrometerBuffer.Clear(); _ = spectrometerBuffer.Append(string.Join(" ", remainingParts)); } else { _ = spectrometerBuffer.Clear(); } return; // 处理完一条数据后退出 } } } } } // 如果缓冲区太大,清理一部分旧数据 if (spectrometerBuffer.Length > 10000) { string content = spectrometerBuffer.ToString(); _ = spectrometerBuffer.Clear(); _ = spectrometerBuffer.Append(content.Substring(content.Length / 2)); } } catch (Exception ex) { LogMessage($"谱仪数据接收错误: {ex.Message}"); } } private void ProcessGPSData(string data) { LogMessage($"GPS原始数据: {data}", false); if (data.StartsWith("#BESTPOSA")) { try { string[] parts = data.Split(','); // 查找WGS84标识的位置 // LogMessage($"GPS数据部分数量: {parts.Length}"); int wgs84Index = -1; for (int i = 0; i < parts.Length; i++) { // LogMessage($"GPS数据部分: {parts[i]}"); if (parts[i].Contains("WGS84")) { wgs84Index = i; break; } } // LogMessage($"WGS84索引: {wgs84Index}"); // WGS84后面的两个字段是经纬度 if (wgs84Index >= 0 && parts.Length > wgs84Index + 2) { double lat = double.Parse(parts[wgs84Index + 1]); double lng = double.Parse(parts[wgs84Index + 2]); GPSData gpsData = new GPSData { Latitude = lat, Longitude = lng, Timestamp = DateTime.Now }; gpsDataQueue.Enqueue(gpsData); LogMessage($"GPS数据: 纬度={lat:F8}, 经度={lng:F8}", false); } } catch (Exception ex) { LogMessage($"GPS数据解析错误: {ex.Message}"); } } } private void ProcessSpectrometerData(string data) { LogMessage($"谱仪完整原始数据: {data}", false); if (data.Equals("ok")) { LogMessage("谱仪已准备好"); return; } try { string[] parts = data.Split(' '); // 检查是否为完整的反馈格式:16个字段(日期 时间 + 14个数值) if (parts.Length == 16) { // 验证日期格式 xxxx.xx.xx if (!System.Text.RegularExpressions.Regex.IsMatch(parts[0], @"^\d{4}\.\d{2}\.\d{2}$")) return; // 验证时间格式 xx:xx:xx if (!System.Text.RegularExpressions.Regex.IsMatch(parts[1], @"^\d{2}:\d{2}:\d{2}$")) return; // 解析时间戳 (xxxx.xx.xx xx:xx:xx) // string dateStr = parts[0]; // string timeStr = parts[1]; SpectrometerData spectData = new SpectrometerData { Timestamp = DateTime.Now, // 使用当前时间,也可以解析返回的时间 RealTime = double.Parse(parts[2]), // aaaaaaa.aa 实时间 DeadTime = double.Parse(parts[3]), // bbbbbbb.bb 死时间 DoseRate = double.Parse(parts[14]), // JJJJJJ.JJ 环境总辐射剂量率值(第15个字段) TotalCountRate = double.Parse(parts[15]) // cccccccccc.cc γ谱的全谱计数率(第16个字段) }; spectrometerDataQueue.Enqueue(spectData); LogMessage($"谱仪数据: 时间={spectData.Timestamp}, 剂量率={spectData.DoseRate:F2} nSv/h, 全谱计数率={spectData.TotalCountRate:F2} cps"); } } catch (Exception ex) { LogMessage($"谱仪数据解析错误: {ex.Message}, 数据: {data}"); } } private async void GPSDataReader(CancellationToken token) { while (!token.IsCancellationRequested && isRunning) { await Task.Delay(1000, token); } } private async void SpectrometerDataReader(CancellationToken token) { while (!token.IsCancellationRequested && isRunning) { try { LogMessage("尝试读取谱仪数据"); if (spectrometerPort != null && spectrometerPort.IsOpen) { spectrometerPort.DiscardInBuffer(); spectrometerPort.Write("$clear\r\n"); await Task.Delay(50, token); // 等待刷新完成 int interval = (int)(numericUpDownRefreshInterval.Value * 1000); spectrometerPort.Write("$start\r\n"); // spectrometerPort.ReadTimeout = 1500; await Task.Delay(interval, token); // 等待刷新完成 // // 先发送刷新指令 // spectrometerPort.Write("$refresh\r\n"); // await Task.Delay(500, token); // 等待刷新完成 spectrometerPort.Write("$getSperesult\r\n"); await Task.Delay(50, token); // 等待刷新完成 } await Task.Delay(50, token); } catch (Exception ex) { LogMessage($"谱仪数据读取错误: {ex.Message}"); await Task.Delay(5000, token); } } } private async void MQTTDataSender(CancellationToken token) { // 在数据发送线程中初始化MQTT连接 // InitializeMQTT(); while (!token.IsCancellationRequested && isRunning) { try { if (spectrometerDataQueue.TryDequeue(out SpectrometerData? spectData) && spectData != null) { // 查找最近的GPS数据 GPSData? nearestGPS = FindNearestGPSData(spectData.Timestamp); if (nearestGPS != null) { var mqttData = new { OptTime = spectData.Timestamp.ToString("yyyy-MM-dd HH:mm:ss.fff"), PointLat = nearestGPS.Latitude.ToString("F8"), PointLng = nearestGPS.Longitude.ToString("F8"), Value = spectData.DoseRate / 1000000000.0 // 转换为合适的单位 }; string json = JsonConvert.SerializeObject(mqttData); string? topic = mqttConfig?.Topic; if (mqttClient != null && mqttClient.IsConnected) { _ = mqttClient.Publish(topic, Encoding.UTF8.GetBytes(json)); LogMessage($"MQTT发送: {json}"); } else { LogMessage($"MQTT未连接,数据记录: {json}"); } } } await Task.Delay(100, token); } catch (Exception ex) { LogMessage($"MQTT发送错误: {ex.Message}"); await Task.Delay(5000, token); } } } private GPSData? FindNearestGPSData(DateTime targetTime) { GPSData? nearest = null; TimeSpan minDiff = TimeSpan.MaxValue; var gpsDataList = gpsDataQueue.ToArray(); foreach (var gpsData in gpsDataList) { TimeSpan diff = Math.Abs((gpsData.Timestamp - targetTime).Ticks) == (gpsData.Timestamp - targetTime).Ticks ? gpsData.Timestamp - targetTime : targetTime - gpsData.Timestamp; if (diff < minDiff) { minDiff = diff; nearest = gpsData; } } return nearest; } private async void LogWriter(CancellationToken token) { while (!token.IsCancellationRequested) { try { if (logQueue.TryDequeue(out string? logMessage) && logMessage != null && !string.IsNullOrEmpty(logFileName)) { await File.AppendAllTextAsync(logFileName, logMessage + Environment.NewLine, token); } await Task.Delay(100, token); } catch (Exception ex) { // 避免日志写入错误导致的无限循环 Console.WriteLine($"日志写入错误: {ex.Message}"); await Task.Delay(1000, token); } } } private void LogMessage(string message, bool screenOutput = true) { // 处理日志消息 message = message.Replace("\r", "RR"); message = message.Replace("\n", "NN"); string logEntry = $"[{DateTime.Now:yyyy-MM-dd HH:mm:ss.fff}] {message}"; // 添加到日志队列 logQueue.Enqueue(logEntry); if (screenOutput) { // 更新UI if (textBoxLog.InvokeRequired) { textBoxLog.Invoke(new Action(() => { textBoxLog.AppendText(logEntry + Environment.NewLine); textBoxLog.ScrollToCaret(); })); } else { textBoxLog.AppendText(logEntry + Environment.NewLine); textBoxLog.ScrollToCaret(); } } } protected override void OnFormClosing(FormClosingEventArgs e) { if (isRunning) { StopDataCollection(); } base.OnFormClosing(e); } } public class GPSData { public double Latitude { get; set; } public double Longitude { get; set; } public DateTime Timestamp { get; set; } } public class SpectrometerData { public DateTime Timestamp { get; set; } public double RealTime { get; set; } public double DeadTime { get; set; } public double DoseRate { get; set; } public double TotalCountRate { get; set; } } }