"""Gateway script for the "IntelligentAirSwitch" project.

The script polls a range of Modbus RTU devices (``AirSwitch.Detector``),
publishes their holding registers over MQTT, executes commands received on
the subscribed topic, exposes a small text protocol on a UART for
diagnostics/configuration, and checks an HTTP location for application
updates.

Threads (all share ``lock`` around Modbus/MQTT/``push_msg`` access):
    * ``th_log``     - UART configuration/diagnostic port.
    * ``th_update``  - periodic update check.
    * ``th_polling`` - Modbus polling of every detector.
    * ``th_recv``    - blocking MQTT receive loop (dispatches to ``sub_cb``).
    * main thread    - ``th_push`` periodic upload.
"""
from usr import modbus_rtu_serial
from usr import AirSwitch
from umqtt import MQTTClient
from misc import Power
from machine import Pin
from machine import UART
import uos
import sim
import net
import app_fota
import request
import modem
import checkNet
import ujson
import utime
import _thread

PROJECT_NAME = "IntelligentAirSwitch"
PROJECT_VERSION = "0.0.1"

SETTINGS_PATH = "/usr/setting.json"
DEFAULT_SETTINGS_PATH = "/usr/setting_default.json"
UPDATE_SETTINGS_PATH = "/usr/update_setting.json"

# A detector is polled at most this many times per cycle before it is
# flagged as unreachable; failed attempts are separated by the delay below.
POLL_ATTEMPTS = 10
POLL_RETRY_DELAY_S = 3

# The update settings file is mandatory: it provides the application name,
# version and the base URL used by the update check.
with open(UPDATE_SETTINGS_PATH, mode='r') as f:
    local_update_setting = ujson.load(f)
PROJECT_NAME = local_update_setting['app_name']
PROJECT_VERSION = local_update_setting['app_version']

checknet = checkNet.CheckNetwork(PROJECT_NAME, PROJECT_VERSION)
usr_fota = app_fota.new()
modbus_port = modbus_rtu_serial.Serial()
config_port = UART(UART.UART0, 9600, 8, 0, 1, 0)
STATUS_LED = Pin(Pin.GPIO2, Pin.OUT, Pin.PULL_DISABLE, 0)

detectors = []
lock = _thread.allocate_lock()
settings = {}
server = 'mqtt.vitalong.cn'
port = 1883
IMEI = modem.getDevImei()
ICCID = sim.getIccid()
CSQ = net.csqQueryPoll()
mqtt_client = None
receive_topic = "topic/F9/2/" + IMEI
publish_topic = "topic/F9/1/" + IMEI

sys_log = {
    'stagecode': '',
    'net_error': '',
    'subcode': '',
    'ICCID': ICCID,
    'IMEI': IMEI,
    'CSQ': CSQ,
    'disconn_reason': 'no'
}

# Shared, reusable message template; only mutate it while holding ``lock``.
push_msg = {
    "DeviceId": "0",
    "Cmd": "0",
    "Data": ""
}


def _rewrite_json_file(path, payload):
    """Replace the JSON file at *path* with *payload*.

    The old file is removed first on a best-effort basis (a missing file is
    reported but is not an error), then the new content is written.
    """
    try:
        uos.remove(path)
    except Exception as e:
        print("删除文件{}出现异常: ".format(path.split('/')[-1]), e)
    with open(path, mode="w") as fw:
        ujson.dump(payload, fw)


def _version_tuple(version):
    """Convert a dotted version string such as '0.0.10' to a tuple of ints."""
    return tuple(int(part) for part in str(version).split('.'))


def _is_newer_version(candidate, current):
    """Return True when *candidate* is a later version than *current*.

    Versions are compared numerically component by component so that
    '0.0.10' is newer than '0.0.9'. If either value is not a dotted-integer
    string, the plain ``<`` comparison of the raw values is used instead.
    """
    try:
        return _version_tuple(candidate) > _version_tuple(current)
    except (ValueError, TypeError, AttributeError):
        return current < candidate


def _check_for_update():
    """Fetch the remote update settings and apply an update if one exists.

    When the remote file names the same application with a newer version,
    every listed file is downloaded, the local update settings are replaced
    by the remote ones, the update flag is set and the module is restarted.
    If any download is reported as failed the update is abandoned so that it
    is retried on the next check. Exceptions propagate to the caller.
    """
    global local_update_setting
    base_url = local_update_setting['update_url']
    response = request.get(url=base_url + 'update_setting.json')
    server_update_setting = response.json()

    if local_update_setting['app_name'] != server_update_setting['app_name']:
        return
    if local_update_setting['app_version'] == server_update_setting['app_version']:
        print("已是最新版本,无需升级")
        return
    if not _is_newer_version(server_update_setting['app_version'],
                             local_update_setting['app_version']):
        return

    print("检测到新版本:\r app name:\t{},\r app version:\t{},\r file list:\t{}\r开始升级...".format(
        server_update_setting['app_name'],
        server_update_setting['app_version'],
        server_update_setting['file_list']))
    download_list = [
        {
            'url': (base_url + name).replace(" ", ''),
            'file_name': ('/usr/' + name).replace(" ", ''),
        }
        for name in server_update_setting['file_list'].split(',')
    ]
    print("下载文件列表: ", download_list)
    result = usr_fota.bulk_download(download_list)
    print("更新文件下载失败列表: ", result)
    if result:
        # Do not mark a partially downloaded update as installed.
        print("部分文件下载失败,取消本次更新。")
        return

    local_update_setting = server_update_setting
    _rewrite_json_file(UPDATE_SETTINGS_PATH, local_update_setting)
    usr_fota.set_update_flag()
    print("更新下载完成,将在3秒后重启完成更新。")
    utime.sleep(3)
    Power.powerRestart()


def update():
    """Run a single update check; failures are logged, never raised."""
    print("开始检查更新...")
    try:
        _check_for_update()
    except Exception as e:
        print("更新失败,error:", e)


def th_update():
    """Thread body: wake every 20 s and check for updates once a day.

    The check only runs during hour 1 at the minute derived from the last
    two IMEI digits, which spreads devices across the hour.
    """
    while True:
        utime.sleep(20)
        print("开始检查更新...")
        try:
            local_time = utime.localtime()
            if local_time[3] == 1 and local_time[4] == int(IMEI[-2:]) % 60:
                _check_for_update()
        except Exception as e:
            print("更新失败,error:", e)


def _load_settings_and_client(path, client_id_prefix):
    """Load settings from *path* and build the MQTT client from them."""
    global settings
    global server
    global port
    global mqtt_client
    with open(path, mode='r') as fh:
        settings = ujson.load(fh)
    server = settings['server']
    # The UART 'server' command may have stored the port as text.
    port = int(settings['port'])
    # NOTE(review): credentials are hard-coded in source; consider moving
    # them to the settings file.
    mqtt_client = MQTTClient(client_id_prefix + IMEI, server, port,
                             user="vitalong", password="vitalong@2019",
                             keepalive=120)


def init():
    """Load settings, connect/subscribe the MQTT client, build detectors.

    User settings are tried first; on any failure the default settings file
    is used. An invalid address range (outside 1..240 or end < start) is
    replaced by the single address 1.
    """
    global detectors
    try:
        _load_settings_and_client(SETTINGS_PATH, "IntelligentAirSwitchGateway/")
    except Exception as e:
        print("加载用户设置失败: {},转为默认设置。".format(e))
        # NOTE(review): the fallback uses a different client-id prefix than
        # the normal path; kept as-is because the broker may depend on it.
        _load_settings_and_client(DEFAULT_SETTINGS_PATH, "FireDetectorGateway/")

    mqtt_client.set_callback(sub_cb)
    mqtt_client.connect(True)
    mqtt_client.subscribe(receive_topic)
    print("连接至:{}, 订阅主题: {}".format(server, receive_topic))
    print("Mqtt 客户端初始化成功!")
    STATUS_LED.write(1)

    start_addr = settings['start_addr']
    end_addr = settings['end_addr']
    if not (0 < start_addr <= 240 and 0 < end_addr <= 240 and start_addr <= end_addr):
        settings['start_addr'] = 1
        settings['end_addr'] = 1

    detectors = []
    for i in range(settings['start_addr'], settings['end_addr'] + 1):
        try:
            detectors.append(AirSwitch.Detector(device_id=i))
        except Exception as e:
            print(e, "current device address is: ", i)


def _publish_data(device_id, cmd, values):
    """Publish *values* as a comma-separated list in the shared push message.

    The wire format is produced from the dict/list ``str()`` form with
    spaces removed and single quotes turned into double quotes. Callers must
    hold ``lock`` because ``push_msg`` is shared.
    """
    push_msg['DeviceId'] = device_id
    push_msg['Cmd'] = cmd
    push_msg['Data'] = str(values).replace('[', '').replace(']', '').replace(' ', '')
    push_msg_str = str(push_msg).replace(' ', '').replace('\'', '"')
    mqtt_client.publish(publish_topic, push_msg_str)


def _publish_ack(device_id, cmd):
    """Publish the fixed success acknowledgement (Data "1") for *cmd*."""
    mqtt_client.publish(
        publish_topic,
        '{{"DeviceId":"{}","Cmd":"{}","Data":"1"}}'.format(device_id, cmd))


def poll(detector):
    """Read one detector over Modbus and refresh ``detector.holding_reg``.

    Five register windows are read (300 ms apart) and copied into the
    37-entry ``holding_reg`` image. If entries 10..11 differ from the last
    pushed state a Cmd "83" message is published. Returns the detector on
    success; if any read returns an unexpected length the module is
    restarted. Modbus/MQTT exceptions propagate to the caller.
    """
    # Discard any stale bytes left in the UART receive buffer.
    modbus_port._uart.read(modbus_port._uart.any())
    dev = detector.device_id
    data1 = modbus_port.read_holding_registers(dev, 0, 12, signed=False)
    utime.sleep_ms(300)
    data2 = modbus_port.read_holding_registers(dev, 0x07, 3, signed=True)
    utime.sleep_ms(300)
    data3 = modbus_port.read_holding_registers(dev, 0x38, 8, signed=True)
    utime.sleep_ms(300)
    data4 = modbus_port.read_holding_registers(dev, 0x40, 17, signed=False)
    utime.sleep_ms(300)
    data5 = modbus_port.read_holding_registers(dev, 0x91, 3, signed=False)

    complete = (len(data1) == 12 and len(data2) == 3 and len(data3) == 8
                and len(data4) == 17 and len(data5) == 3)
    if not complete:
        Power.powerRestart()
        return None

    reg = detector.holding_reg
    reg[0:12] = data1[0:12]
    reg[7:10] = data2[:]          # same registers re-read as signed values
    reg[10] = reg[10] & 1         # keep only bit 0 of entry 10
    reg[12:20] = data3[:]
    reg[20:29] = data4[0:9]
    # NOTE(review): pairs are combined with an 8-bit shift, not 16 -
    # confirm against the device register map.
    reg[29] = (data4[10] << 8) | data4[9]
    reg[30] = (data4[12] << 8) | data4[11]
    reg[31] = (data4[14] << 8) | data4[13]
    reg[32] = (data4[16] << 8) | data4[15]
    reg[33:36] = data5[:]
    reg[36] = 0                   # entry 36 is set to 1 by the poller on failure

    if (reg[10:12] != detector.state_temp) and (len(reg) == 37):
        _publish_data(str(detector.device_id), "83", reg)
        print("状态改变推送")
        detector.state_temp = reg[10:12]
    print(reg, '->', len(reg))
    return detector


def _poll_with_retries(detector):
    """Poll *detector*, retrying on exceptions up to ``POLL_ATTEMPTS`` times.

    After the final failed attempt entry 36 of ``holding_reg`` is set to 1.
    Always returns a detector object (never ``None``).
    """
    for attempt in range(1, POLL_ATTEMPTS + 1):
        try:
            polled = poll(detector)
            return polled if polled is not None else detector
        except Exception as e:
            if attempt < POLL_ATTEMPTS:
                print("轮询失败{}次: ".format(attempt), e)
                utime.sleep(POLL_RETRY_DELAY_S)
            else:
                print("轮询探测器十次出现异常: {}, 地址: {}".format(e, detector.device_id))
                detector.holding_reg[36] = 1
    return detector


def th_polling():
    """Thread body: poll every detector every cycle while holding ``lock``."""
    while True:
        utime.sleep(2)
        lock.acquire()
        try:
            for idx, detector in enumerate(detectors):
                detectors[idx] = _poll_with_retries(detector)
        finally:
            # Always release, otherwise the push/receive threads deadlock.
            lock.release()
        print("polling thread is running.\r")


def _handle_command(recv_data):
    """Execute one decoded command message (dict with Cmd/DeviceId/Data).

    Unknown commands are ignored. Must be called with ``lock`` held.
    """
    global local_update_setting
    cmd = recv_data['Cmd']

    if cmd == '2':
        print("设置探测器起止地址\r")
        data = recv_data['Data'].split(',')
        start_addr, end_addr = int(data[0]), int(data[1])
        settings['start_addr'] = start_addr
        settings['end_addr'] = end_addr
        print(settings)
        _rewrite_json_file(SETTINGS_PATH, settings)
        _publish_ack("0", "2")
        utime.sleep(2)
        Power.powerRestart()
    elif cmd == '8':
        print("网关上报时间间隔设置\r")
        # Data is given in minutes; the stored interval is in seconds.
        settings['upload_interval'] = int(recv_data['Data']) * 60
        print(settings)
        _rewrite_json_file(SETTINGS_PATH, settings)
        _publish_ack("0", "8")
        utime.sleep(2)
        Power.powerRestart()
    elif cmd == '9':
        print("设置探测器报警阀值\r")
        data = [int(i) for i in recv_data['Data'].split(',')]
        print("地址: ", recv_data['DeviceId'], "电流过载阈值: ", data[0], "漏电电流阈值: ", data[1],
              "电压过载阈值: ", data[2], "电压欠压阈值: ", data[3], "温度超限阈值: ", data[4])
        dev = int(recv_data['DeviceId'])
        modbus_port.write_multiple_registers(dev, 0x2A, data[0:4], signed=False)
        utime.sleep_ms(20)
        modbus_port.write_single_register(dev, 0x2F, data[4], signed=False)
        _publish_ack(recv_data['DeviceId'], "9")
    elif cmd == '10':
        dev = int(recv_data['DeviceId'])
        print("探测器阈值查询\r地址: ", dev)
        data = list(modbus_port.read_holding_registers(dev, 0x2A, 6, signed=False))
        # Drop the register at 0x2E by position; list.remove() would delete
        # the first element with an equal value instead.
        del data[-2]
        _publish_data(recv_data['DeviceId'], '81', data)
    elif cmd == '12':
        print('远程分闸\r')
        dev = int(recv_data['DeviceId'])
        modbus_port.write_single_register(dev, 0x27, 0x0299, signed=False)
        utime.sleep_ms(20)
        # Acknowledge only if bit 0 of register 0x0A reads back as 0.
        if (modbus_port.read_holding_registers(dev, 0x0A, 1, signed=False)[0] & 1) == 0:
            _publish_ack(recv_data['DeviceId'], "12")
    elif cmd == '13':
        print('远程合闸\r')
        dev = int(recv_data['DeviceId'])
        modbus_port.write_single_register(dev, 0x27, 0x0188, signed=False)
        utime.sleep_ms(20)
        # Acknowledge only if bit 0 of register 0x0A reads back as 1.
        if (modbus_port.read_holding_registers(dev, 0x0A, 1, signed=False)[0] & 1) == 1:
            _publish_ack(recv_data['DeviceId'], "13")
    elif cmd == '14':
        print('清除电能数据\r')
        dev = int(recv_data['DeviceId'])
        modbus_port.write_multiple_registers(dev, 0x49, [0, 0], signed=False)
        utime.sleep_ms(20)
        low = modbus_port.read_holding_registers(dev, 0x49, 1, signed=False)[0]
        high = modbus_port.read_holding_registers(dev, 0x4A, 1, signed=False)[0]
        if (low | high) == 0:
            _publish_ack(recv_data['DeviceId'], "14")
    elif cmd == '15':
        print("远程重启\r")
        _publish_ack("0", "15")
        utime.sleep(2)
        Power.powerRestart()
    elif cmd == '16':
        print("软件更新配置\r")
        local_update_setting = recv_data['Data']
        _rewrite_json_file(UPDATE_SETTINGS_PATH, local_update_setting)
        _publish_ack("0", "16")
    elif cmd == '17':
        print("软件版本查询\r")
        reply = '{"app_name":"' + local_update_setting['app_name'] + '", "version":"' + local_update_setting['app_version'] + '"}'
        mqtt_client.publish(publish_topic, reply)


def sub_cb(topic, msg):
    """MQTT callback: decode a message and dispatch it under ``lock``.

    Args:
        topic: topic as bytes.
        msg: payload as bytes; expected to be a JSON object.

    Any exception while handling the message is logged and swallowed so the
    receive loop keeps running; the lock is always released.
    """
    lock.acquire()
    try:
        payload = msg.decode()
        topic_name = topic.decode()
        print("receive thread receive a msg: {} from {}\r".format(payload, topic_name))
        if topic_name == receive_topic:
            recv_data = ujson.loads(payload)
            print(recv_data)
            _handle_command(recv_data)
    except Exception as e:
        print("mqtt消息处理出现异常: ", e)
    finally:
        lock.release()


def th_push():
    """Publish every detector's registers (Cmd "82") each upload interval.

    Never returns. Publish errors are logged per detector.
    """
    while True:
        for detector in detectors:
            lock.acquire()
            try:
                try:
                    _publish_data(str(detector.device_id), "82", detector.holding_reg)
                except Exception as e:
                    print("推送异常: ", e)
                print(detector.holding_reg)
            finally:
                lock.release()
        print("push thread is running.\r")
        utime.sleep(settings['upload_interval'])


def th_recv():
    """Thread body: block on incoming MQTT messages.

    An exception whose first argument is 0 is ignored; any other exception
    (including one without arguments) restarts the module.
    """
    while True:
        try:
            mqtt_client.wait_msg()
        except Exception as e:
            print("mqtt消息接收线程出现异常:{}".format(e), type(e))
            # e.args may be empty; indexing it blindly would kill this thread.
            code = e.args[0] if e.args else None
            if code != 0:
                print("mqtt消息接收线程出现异常:{}导致系统重启。".format(e))
                utime.sleep_ms(50)
                Power.powerRestart()


# Text reported by the 'log' command for each known network stage code.
_NET_ERROR_TEXT = {
    1: 'SIM Card Error',
    2: 'Data Call Time Out',
    3: 'No NET Error',
}


def th_log():
    """Thread body: serve the colon-separated UART configuration protocol.

    Commands:
        ``log``                - write the ``sys_log`` dict back to the UART.
        ``server:<host>:<port>`` - store a new MQTT server and restart.

    Any error is written back to the UART as text.
    """
    while True:
        try:
            pending = config_port.any()
            if pending != 0:
                raw = config_port.read(pending).decode('utf8')
                # Strip each field so a trailing CR/LF does not break matching.
                msg = [part.strip() for part in raw.split(':')]
                print('msg:', msg, 'command:', msg[0])
                command = msg[0]
                print(command)
                if command == 'log':
                    stage = sys_log['stagecode']
                    if stage in _NET_ERROR_TEXT:
                        sys_log['net_error'] = _NET_ERROR_TEXT[stage]
                    sys_log['ICCID'] = sim.getIccid()
                    sys_log['CSQ'] = net.csqQueryPoll()
                    config_port.write(str(sys_log))
                elif command == 'server':
                    new_server = msg[1]
                    new_port = int(msg[2])  # rejects a malformed port before saving
                    settings['server'] = new_server
                    settings['port'] = new_port
                    config_port.write('{}:{}'.format(new_server, new_port))
                    _rewrite_json_file(SETTINGS_PATH, settings)
                    utime.sleep(0.5)
                    Power.powerRestart()
        except Exception as e:
            config_port.write(str(e))
        utime.sleep(0.5)


if __name__ == '__main__':
    try:
        utime.sleep(5)
        checknet.poweron_print_once()
        stagecode, subcode = checknet.wait_network_connected()
        sys_log['stagecode'] = stagecode
        sys_log['subcode'] = subcode
        _thread.start_new_thread(th_log, ())
        if stagecode == 1 and subcode == 0:
            # No SIM: blink the status LED forever (UART port stays usable).
            while True:
                print("未插SIM卡,请检查。")
                STATUS_LED.write(1)
                utime.sleep(0.5)
                STATUS_LED.write(0)
                utime.sleep(1)
        elif stagecode == 3 and subcode == 1:
            print('网络连接成功!\r')
            update()
            utime.sleep(0.5)
            _thread.start_new_thread(th_update, ())
            utime.sleep(0.5)
            print("正在初始化...")
            init()
            _thread.start_new_thread(th_polling, ())
            utime.sleep(0.5)
            _thread.start_new_thread(th_recv, ())
            while True:
                utime.sleep(2)
                th_push()
        else:
            print('网络连接失败! stagecode = {}, subcode = {}\r'.format(stagecode, subcode))
            print("自动重启以尝试重新拨号...")
            utime.sleep_ms(50)
            Power.powerRestart()
    except Exception as e:
        print("主线程出现异常: {},导致模组重启 ".format(e))
        utime.sleep_ms(50)
        Power.powerRestart()