from usr import modbus_rtu_serial from usr import FireEquipmentPowerDetector from umqtt import MQTTClient from misc import Power from machine import Pin from machine import UART import uos import sim import net import app_fota import request import modem import checkNet import ujson import utime import _thread PROJECT_NAME = "FireEquipmentPowerSupplyGateway" PROJECT_VERSION = "0.0.2" with open('/usr/update_setting.json', mode='r') as f: local_update_setting = ujson.load(f) f.close() PROJECT_NAME = local_update_setting['app_name'] PROJECT_VERSION = local_update_setting['app_version'] checknet = checkNet.CheckNetwork(PROJECT_NAME, PROJECT_VERSION) usr_fota = app_fota.new() modbus_port = modbus_rtu_serial.Serial() config_port = UART(UART.UART0, 9600, 8, 0, 1, 0) STATUS_LED = Pin(Pin.GPIO2, Pin.OUT, Pin.PULL_DISABLE, 0) detectors = [] lock = _thread.allocate_lock() settings = {} server = 'mqtt.vitalong.cn' port = 1883 IMEI = modem.getDevImei() ICCID = sim.getIccid() CSQ = net.csqQueryPoll() mqtt_client = None receive_topic = "topic/F4/2/" + IMEI publish_topic = "topic/F4/1/" + IMEI test_topic = "topic/F4/original/" + IMEI sys_log = { 'stagecode': '', 'net_error': '', 'subcode': '', 'ICCID': ICCID, 'IMEI': IMEI, 'CSQ': CSQ, 'disconn_reason': 'no' } def update(): global local_update_setting print("开始检查更新...") try: response = request.get(url=local_update_setting['update_url'] + 'update_setting.json') server_update_setting = response.json() if local_update_setting['app_name'] == server_update_setting['app_name'] and local_update_setting['app_version'] == server_update_setting['app_version']: print("已是最新版本,无需升级") elif local_update_setting['app_name'] == server_update_setting['app_name'] and local_update_setting['app_version'] < server_update_setting['app_version']: print("检测到新版本:\r app name:\t{},\r app version:\t{},\r file list:\t{}\r开始升级...".format(server_update_setting['app_name'], server_update_setting['app_version'], server_update_setting['file_list'])) file_list = server_update_setting['file_list'].split(',') download_list = [] for file in file_list: dict_file = {'url': (local_update_setting['update_url'] + file).replace(" ", ''), 'file_name': ('/usr/' + file).replace(" ", '')} download_list.append(dict_file) print("下载文件列表: ", download_list) result = usr_fota.bulk_download(download_list) print("更新文件下载失败列表: ", result) local_update_setting = server_update_setting uos.remove("/usr/update_setting.json") with open("/usr/update_setting.json", mode="w") as fw: ujson.dump(local_update_setting, fw) fw.close() usr_fota.set_update_flag() print("更新下载完成,将在3秒后重启完成更新。") utime.sleep(3) Power.powerRestart() except Exception as e: print("更新失败,error:", e) def th_update(): global local_update_setting while True: utime.sleep(20) print("开始检查更新...") try: local_time = utime.localtime() if local_time[3] == 00 and local_time[4] == int(IMEI[-2:]) % 60: response = request.get(url=local_update_setting['update_url'] + 'update_setting.json') server_update_setting = response.json() if local_update_setting['app_name'] == server_update_setting['app_name'] and local_update_setting['app_version'] == server_update_setting['app_version']: print("已是最新版本,无需升级") elif local_update_setting['app_name'] == server_update_setting['app_name'] and local_update_setting['app_version'] < server_update_setting['app_version']: print("检测到新版本:\r app name:\t{},\r app version:\t{},\r file list:\t{}\r开始升级...".format(server_update_setting['app_name'], server_update_setting['app_version'], server_update_setting['file_list'])) file_list = server_update_setting['file_list'].split(',') download_list = [] for file in file_list: dict_file = {'url': (local_update_setting['update_url'] + file).replace(" ", ''), 'file_name': ('/usr/' + file).replace(" ", '')} download_list.append(dict_file) print("下载文件列表: ", download_list) result = usr_fota.bulk_download(download_list) print("更新文件下载失败列表: ", result) local_update_setting = server_update_setting uos.remove("/usr/update_setting.json") with open("/usr/update_setting.json", mode="w") as fw: ujson.dump(local_update_setting, fw) fw.close() usr_fota.set_update_flag() print("更新下载完成,将在3秒后重启完成更新。") utime.sleep(3) Power.powerRestart() except Exception as e: print("更新失败,error:", e) def init(): global settings global server global port global detectors global mqtt_client try: with open('/usr/setting.json', mode='r') as f: settings = ujson.load(f) server = settings['server'] port = settings['port'] mqtt_client = MQTTClient("FireDetectorGateway/" + IMEI, server, port, user="vitalong", password="vitalong@2019", keepalive=120) f.close() except Exception as e: print("加载用户设置失败: {},转为默认设置。".format(e)) with open('/usr/setting_default.json', mode='r') as f: settings = ujson.load(f) server = settings['server'] port = settings['port'] mqtt_client = MQTTClient("FireDetectorGateway/" + IMEI, server, port, user="vitalong", password="vitalong@2019", keepalive=120) f.close() mqtt_client.set_callback(sub_cb) mqtt_client.connect(True) mqtt_client.subscribe(receive_topic) print("连接至:{}, 订阅主题: {}".format(server, receive_topic)) print("Mqtt 客户端初始化成功!") STATUS_LED.write(1) if settings['start_addr'] <= 0 or settings['start_addr'] > 240 or settings['end_addr'] <= 0 or settings['end_addr'] > 240 or settings['end_addr'] < settings['start_addr']: settings['start_addr'] = 1 settings['end_addr'] = 1 detectors = [] for i in range(settings['start_addr'], settings['end_addr'] + 1): try: detector = FireEquipmentPowerDetector.Detector(device_id=i) detector.input_reg = ['{:0>4}'.format(j) for j in modbus_port.read_input_registers(detector.device_id, 0, 12, signed=False)] detector.input_reg.append('0000') detector.input_reg[1] = '{:0>4x}'.format(int(detector.input_reg[1])) detector.input_reg[2] = '{:0>4x}'.format(int(detector.input_reg[2])) detectors.append(detector) except Exception as e: print(e, "current device address is: ", i) detector = FireEquipmentPowerDetector.Detector(device_id=i) detector.input_reg[-1] = '0001' detectors.append(detector) def th_polling(): while True: utime.sleep(1) lock.acquire() for idx, detector in enumerate(detectors): try: detector.input_reg = ['{:0>4}'.format(j) for j in modbus_port.read_input_registers(detector.device_id, 0, 12, signed=False)] detector.input_reg.append('0000') detector.input_reg[1] = '{:0>4x}'.format(int(detector.input_reg[1])) detector.input_reg[2] = '{:0>4x}'.format(int(detector.input_reg[2])) if detector.input_reg[3] > '0500' or \ detector.input_reg[4] > '0500' or \ detector.input_reg[5] > '0500' or \ detector.input_reg[6] > '0500' or \ detector.input_reg[7] > '0500' or \ detector.input_reg[8] > '0500' or \ detector.input_reg[9] > '1000' or \ detector.input_reg[10] > '1000' or \ detector.input_reg[11] > '1000': detector.input_reg = detector.data_temp if detector.input_reg[3] < '0010' and \ detector.input_reg[4] < '0010' and \ detector.input_reg[5] < '0010': detector.input_reg[1] = '0015' detector.input_reg[2] = '0003' detector.data_temp = detector.input_reg if detector.input_reg[1:3] + detector.input_reg[12:] != detector.state_temp and \ detector.input_reg[3] < '0500' and \ detector.input_reg[4] < '0500' and \ detector.input_reg[5] < '0500' and \ detector.input_reg[6] < '0500' and \ detector.input_reg[7] < '0500' and \ detector.input_reg[8] < '0500' and \ detector.input_reg[9] < '1000' and \ detector.input_reg[10] < '1000' and \ detector.input_reg[11] < '1000': print("报警/通信状态改变推送") push_msg = '{"DeviceId":"' + '{:0>3d}'.format(detector.device_id) + '","Cmd":"83","Data":"' + str(detector.input_reg).replace('[', '').replace(']', '').replace('\'', '').replace(' ', '') + '"}' mqtt_client.publish(publish_topic, push_msg) detector.state_temp = detector.input_reg[1:3] + detector.input_reg[12:] detectors[idx] = detector except Exception as e: print("轮询探测器出现异常: {}, 地址: {}".format(e, detector.device_id)) detector.input_reg[-1] = '0001' detectors[idx] = detector lock.release() print("polling thread is running.\r") def sub_cb(topic, msg): global local_update_setting global detectors global test_topic global publish_topic try: lock.acquire() print("receive thread receive a msg: {} from {}\r".format(msg.decode(), topic.decode())) if topic.decode() == receive_topic: recv_data = ujson.loads(msg.decode()) print(recv_data) if recv_data['Cmd'] == '2': print("设置探测器起止地址\r") data = recv_data['Data'].split(',') settings['start_addr'] = int(data[0]) settings['end_addr'] = int(data[1]) settings['has_host'] = int(data[2]) print(settings) try: uos.remove("/usr/setting.json") except Exception as e: print("删除文件setting.json出现异常: ", e) with open("/usr/setting.json", mode="w") as fd: ujson.dump(settings, fd) fd.close() mqtt_client.publish(publish_topic, '{"DeviceId":"0","Cmd":"2","Data":"1"}') utime.sleep(0.5) Power.powerRestart() elif recv_data['Cmd'] == '4': print("探测器全部自检\r") try: modbus_port.write_single_coil(0, 1, 0xFF00) except Exception as e: print(e.args[0]) mqtt_client.publish(publish_topic, '{"DeviceId":"0","Cmd":"4","Data":"1"}') elif recv_data['Cmd'] == '6': print("探测器全部复位\r") try: modbus_port.write_single_coil(0, 0, 0xFF00) except Exception as e: print(e.args[0]) mqtt_client.publish(publish_topic, '{"DeviceId":"0","Cmd":"6","Data":"1"}') elif recv_data['Cmd'] == '8': print("网关上报时间间隔设置\r") settings['upload_interval'] = int(recv_data['Data']) * 60 print(settings) try: uos.remove("usr/setting.json") except Exception as e: print("删除文件setting.json出现异常: ", e) with open("usr/setting.json", mode="w") as fd: ujson.dump(settings, fd) fd.close() mqtt_client.publish(publish_topic, '{"DeviceId":"0","Cmd":"8","Data":"1"}') utime.sleep(0.5) Power.powerRestart() elif recv_data['Cmd'] == '9': print("设置探测器报警阀值\r") data = recv_data['Data'].split(',') print("地址: ", recv_data['DeviceId'], "过压阀值: ", data[0], "欠压阀值: ", data[1], "过流阀值: ", data[2]) modbus_port.write_single_register(int(recv_data['DeviceId']), 3, int(data[0]), signed=False) utime.sleep_ms(200) modbus_port.write_single_register(int(recv_data['DeviceId']), 4, int(data[1]), signed=False) utime.sleep_ms(200) modbus_port.write_single_register(int(recv_data['DeviceId']), 5, int(data[2]), signed=False) utime.sleep_ms(200) push_msg = '{"DeviceId":"' + '{:0>3d}'.format(int(recv_data['DeviceId'])) + '","Cmd":"9","Data":"1"}' mqtt_client.publish(publish_topic, push_msg) elif recv_data['Cmd'] == '10': print("探测器阈值查询\rAddress: ", int(recv_data['DeviceId'])) holding_reg = ['{:0>4}'.format(i) for i in modbus_port.read_holding_registers(int(recv_data['DeviceId']), 3, 10, signed=False)] holding_reg = holding_reg[0:3] + holding_reg[9:] push_msg = '{"DeviceId":"' + '{:0>3d}'.format(int(recv_data['DeviceId'])) + '","Cmd":"81","Data":"' + str(holding_reg).replace('[', '').replace(']', '').replace('\'', '').replace(' ', '') + '"}' mqtt_client.publish(publish_topic, push_msg) elif recv_data['Cmd'] == '12': print("远程重启\r") mqtt_client.publish(publish_topic, "received restart command, will restart after 1s...") utime.sleep(1) Power.powerRestart() elif recv_data['Cmd'] == '13': print("软件更新配置\r") local_update_setting = recv_data['Data'] try: uos.remove("/usr/update_setting.json") except Exception as e: print("删除文件update_setting.json出现异常: ", e) with open("/usr/update_setting.json", mode='w') as fw: ujson.dump(local_update_setting, fw) fw.close() mqtt_client.publish(publish_topic, '{"DeviceId":"0","Cmd":"13","Data":"1"}') elif recv_data['Cmd'] == '14': print("软件版本查询\r") push_msg = 'app_name: ' + local_update_setting['app_name'] + ', version: ' + local_update_setting['app_version'] mqtt_client.publish("topic/F4/1/" + IMEI, push_msg) elif recv_data['Cmd'] == '15': print("互感器型号查询\r") Transformer_type = modbus_port.read_holding_registers(int(recv_data['DeviceId']), 6, 1, signed=False) utime.sleep_ms(100) push_msg = '{"DeviceId":"' + '{:0>3d}'.format(int(recv_data['DeviceId'])) + '","Cmd":"15","Data":"' + str(Transformer_type).replace('(', '').replace(')', '').replace(',', '').replace(' ', '') + '"}' mqtt_client.publish(publish_topic, push_msg) elif recv_data['Cmd'] == '16': print("互感器型号配置\r") print("地址: ", recv_data['DeviceId'], "互感器类型代号: ", recv_data['Data']) modbus_port.write_single_register(int(recv_data['DeviceId']), 6, int(recv_data['Data']), signed=False) utime.sleep_ms(100) push_msg = '{"DeviceId":"' + '{:0>3d}'.format(int(recv_data['DeviceId'])) + '","Cmd":"16","Data":"1"}' mqtt_client.publish(publish_topic, push_msg) elif recv_data['Cmd'] == '17': print("查询探测器运行数据缓存\r") data_str = str(detectors[int(recv_data['DeviceId']) - 1].data_temp).replace('[', '').replace('\'', '').replace(' ', '').replace(']', '') push_msg = '{"DeviceId":"' + '{:0>3d}'.format(int(recv_data['DeviceId'])) + '","Cmd":"17","Data":"' + data_str + '"}' mqtt_client.publish(publish_topic, push_msg) elif recv_data['Cmd'] == '18': print("配置探测器运行数据缓存\r") print("data -- ", recv_data['Data']) data_str = recv_data['Data'].replace(' ', '').replace('"', '').replace('\'', '') detectors[int(recv_data['DeviceId']) - 1].data_temp = data_str.split(',') push_msg = '{"DeviceId":"' + '{:0>3d}'.format(int(recv_data['DeviceId'])) + '","Cmd":"18","Data":"1"}' mqtt_client.publish(publish_topic, push_msg) elif recv_data['Cmd'] == '19': print("查询原始值\r") data = ['{:0>4}'.format(i) for i in modbus_port.read_input_registers(int(recv_data['DeviceId']), 0, 12, signed=False)] mqtt_client.publish(test_topic, str(data)) lock.release() except Exception as e: print("mqtt消息处理出现异常: ", e) lock.release() def th_push(): global mqtt_client while True: utime.sleep(settings['upload_interval']) for detector in detectors: try: push_msg = '{"DeviceId":"' + '{:0>3d}'.format(detector.device_id) + '","Cmd":"82","Data":"' + str(detector.input_reg).replace('[', '').replace(']', '').replace('\'', '').replace(' ', '') + '"}' mqtt_client.publish(publish_topic, push_msg) except Exception as e: print("推送异常: ", e) print(detector.input_reg) print("push thread is running.\r") def th_recv(): global mqtt_client while True: try: mqtt_client.wait_msg() except Exception as e: print("mqtt消息接收线程出现异常:{}".format(e), type(e)) if e.args[0] == 0: pass else: print("mqtt消息接收线程出现异常:{}导致系统重启。".format(e)) utime.sleep_ms(50) Power.powerRestart() def th_log(): while True: try: if config_port.any() is not 0: msg = config_port.read(config_port.any()).decode('utf8').split(':') print('msg:', msg, 'command:', msg[0]) command = msg[0] print(command) if command is 'log': if sys_log['stagecode'] is 1: sys_log['net_error'] = 'SIM Card Error' elif sys_log['stagecode'] is 2: sys_log['net_error'] = 'Data Call Time Out' elif sys_log['stagecode'] is 3: sys_log['net_error'] = 'No NET Error' sys_log['ICCID'] = sim.getIccid() sys_log['CSQ'] = net.csqQueryPoll() config_port.write(str(sys_log)) if command is 'server': settings['server'] = msg[1] settings['port'] = msg[2] config_port.write(settings['server'] + ':' + settings['port']) uos.remove("/usr/setting.json") with open("/usr/setting.json", mode="w") as fw: ujson.dump(settings, fw) fw.close() utime.sleep(0.5) Power.powerRestart() except Exception as e: config_port.write(str(e)) utime.sleep(0.5) if __name__ == '__main__': try: utime.sleep(5) checknet.poweron_print_once() stagecode, subcode = checknet.wait_network_connected() sys_log['stagecode'] = stagecode sys_log['subcode'] = subcode _thread.start_new_thread(th_log, ()) if stagecode == 1 and subcode == 0: print("未插SIM卡,请检查。") while True: STATUS_LED.write(1) utime.sleep(0.5) STATUS_LED.write(0) utime.sleep(1) elif stagecode == 3 and subcode == 1: print('网络连接成功!\r') update() utime.sleep(0.5) _thread.start_new_thread(th_update, ()) utime.sleep(0.5) print("正在初始化...") init() _thread.start_new_thread(th_polling, ()) utime.sleep(0.5) _thread.start_new_thread(th_recv, ()) while True: th_push() else: print('网络连接失败! stagecode = {}, subcode = {}\r'.format(stagecode, subcode)) print("自动重启以尝试重新拨号...") utime.sleep_ms(50) Power.powerRestart() except Exception as e: print("主线程出现异常: {},导致模组重启 ".format(e)) utime.sleep_ms(50) Power.powerRestart()