"""Gateway script: polls Modbus RTU detectors and relays readings over MQTT.

Threads started from the entry point:
  * th_log     - serves diagnostic / configuration commands on the config UART
  * th_update  - periodic OTA update check
  * th_polling - reads every detector and publishes fault / alarm transitions
  * th_recv    - blocks on incoming MQTT messages (dispatched to sub_cb)
The main thread runs th_push, the periodic full-status upload.
"""
from usr import modbus_rtu_serial
from usr import WaterPressDetector
from umqtt import MQTTClient
from misc import Power
from machine import Pin
from machine import UART
import uos
import net
import sim
import app_fota
import request
import modem
import checkNet
import ujson
import utime
import _thread

# Persistent configuration files. Always use absolute paths so that every
# reader and writer agrees on the same file regardless of the working dir.
SETTING_PATH = '/usr/setting.json'
DEFAULT_SETTING_PATH = '/usr/setting_default.json'
UPDATE_SETTING_PATH = '/usr/update_setting.json'

PROJECT_NAME = "WaterPressAndLevelGateway"
PROJECT_VERSION = "0.0.1"

# Start from the built-in defaults so the script can still boot when the
# update descriptor is missing or corrupt; the file overrides them if valid.
local_update_setting = {'app_name': PROJECT_NAME, 'app_version': PROJECT_VERSION}
try:
    with open(UPDATE_SETTING_PATH, mode='r') as f:
        _loaded_update_setting = ujson.load(f)
    PROJECT_NAME = _loaded_update_setting['app_name']
    PROJECT_VERSION = _loaded_update_setting['app_version']
    local_update_setting = _loaded_update_setting
except Exception as e:
    print("加载升级配置失败: {},使用内置默认值。".format(e))

checknet = checkNet.CheckNetwork(PROJECT_NAME, PROJECT_VERSION)
usr_fota = app_fota.new()
modbus_port = modbus_rtu_serial.Serial()
config_port = UART(UART.UART0, 9600, 8, 0, 1, 0)
STATUS_LED = Pin(Pin.GPIO2, Pin.OUT, Pin.PULL_DISABLE, 0)
detectors = []
lock = _thread.allocate_lock()  # serialises th_polling against sub_cb
settings = {}
IMEI = modem.getDevImei()
ICCID = sim.getIccid()
CSQ = net.csqQueryPoll()
server = 'mqtt.vitalong.cn'
port = 1883
mqtt_client = None
sys_log = {
    'stagecode': '',
    'net_error': '',
    'subcode': '',
    'ICCID': ICCID,
    'IMEI': IMEI,
    'CSQ': CSQ,
    'disconn_reason': 'no'
}


def _write_json(path, obj):
    """Replace the JSON file at *path* with *obj*.

    The old file is removed first (as the original code did); a failed
    removal, e.g. because the file does not exist yet, is reported but does
    not prevent the new file from being written.
    """
    try:
        uos.remove(path)
    except Exception as e:
        print("删除配置失败:", e)
    with open(path, mode="w") as fw:
        ujson.dump(obj, fw)


def _publish(payload):
    """Publish *payload* on this gateway's uplink topic."""
    mqtt_client.publish("topic/F5/1/" + IMEI, payload)


def _detector_msg(detector, cmd):
    """Build the uplink message for *detector* with command code *cmd*."""
    # The data list is flattened by stripping list punctuation from its repr.
    data_text = str(detector.data).replace('[', '').replace(']', '').replace('\'', '').replace(' ', '')
    return '{DeviceId:"' + '{:0>3d}'.format(detector.device_id) + '",Cmd:"' + cmd + '",Data:"' + data_text + '"}'


def _version_key(version):
    """Return *version* ("1.2.10") as a tuple of ints, or None if not numeric."""
    parts = []
    for piece in str(version).split('.'):
        try:
            parts.append(int(piece))
        except ValueError:
            return None
    return tuple(parts)


def _is_newer(local_version, server_version):
    """Return True when *server_version* is strictly newer than *local_version*.

    Dotted numeric versions are compared component-wise so that "0.0.10" is
    newer than "0.0.9"; anything else falls back to plain string ordering.
    """
    local_key = _version_key(local_version)
    server_key = _version_key(server_version)
    if local_key is None or server_key is None:
        return local_version < server_version
    return local_key < server_key


def _check_and_apply_update():
    """Fetch the server's update descriptor and upgrade if it is newer.

    On a successful download the new descriptor is persisted, the update
    flag is set and the module restarts. Exceptions propagate to the caller.
    """
    global local_update_setting
    response = request.get(url=local_update_setting['update_url'] + 'update_setting.json')
    server_update_setting = response.json()

    if local_update_setting['app_name'] != server_update_setting['app_name']:
        return
    if local_update_setting['app_version'] == server_update_setting['app_version']:
        print("已是最新版本,无需升级")
        return
    if not _is_newer(local_update_setting['app_version'], server_update_setting['app_version']):
        return

    print("检测到新版本:\r app name:\t{},\r app version:\t{},\r file list:\t{}\r开始升级...".format(
        server_update_setting['app_name'], server_update_setting['app_version'],
        server_update_setting['file_list']))
    download_list = []
    for file in server_update_setting['file_list'].split(','):
        download_list.append({
            'url': (local_update_setting['update_url'] + file).replace(" ", ''),
            'file_name': ('/usr/' + file).replace(" ", ''),
        })
    print("下载文件列表: ", download_list)
    result = usr_fota.bulk_download(download_list)
    print("更新文件下载失败列表: ", result)
    if result:
        # Do not bump the recorded version or apply a partial file set;
        # leaving the descriptor untouched makes the next check retry.
        print("部分文件下载失败,取消本次升级,等待下次重试。")
        return

    local_update_setting = server_update_setting
    _write_json(UPDATE_SETTING_PATH, local_update_setting)
    usr_fota.set_update_flag()
    print("更新下载完成,将在3秒后重启完成更新。")
    utime.sleep(3)
    Power.powerRestart()


def update():
    """Run one update check immediately; failures are logged, not raised."""
    print("开始检查更新...")
    try:
        _check_and_apply_update()
    except Exception as e:
        print("更新失败,error:", e)


def th_update():
    """Thread body: every 20 s, check whether it is this device's update slot.

    The slot is hour 0 at the minute given by the last two IMEI digits
    modulo 60, which spreads devices across the hour.
    """
    while True:
        utime.sleep(20)
        print("开始检查更新...")
        try:
            local_time = utime.localtime()
            if local_time[3] == 0 and local_time[4] == int(IMEI[-2:]) % 60:
                _check_and_apply_update()
        except Exception as e:
            print("更新失败,error:", e)


def _load_settings(path):
    """Load settings from *path* and build the MQTT client from them."""
    global settings
    global server
    global port
    global mqtt_client
    with open(path, mode='r') as f:
        settings = ujson.load(f)
    server = settings['server']
    port = settings['port']
    # NOTE(review): broker credentials are hard-coded here; consider moving
    # them into the settings file.
    mqtt_client = MQTTClient("WaterPressAndLevelGateway/" + IMEI, server, port,
                             user="vitalong", password="vitalong@2019", keepalive=120)


def _build_detector(address, slave_type, signal_strength, read_register):
    """Create a detector record for Modbus *address*.

    When *read_register* is true the first input register is read and stored;
    any exception from that read propagates to the caller.
    """
    detector = WaterPressDetector.Detector()
    detector.device_id = address
    if slave_type == '0089':
        detector.data[0] = slave_type
        if read_register:
            detector.data[1] = str(modbus_port.read_input_registers(address, 0, 1, signed=False)[0])
        detector.data[2] = signal_strength
        detector.data[4] = settings['high_limit']
        detector.data[5] = settings['low_limit']
        if read_register:
            detector.data[6] = '0000'
    return detector


def init():
    """Load settings, connect to the MQTT broker and enumerate detectors.

    User settings are tried first; on any failure the default settings file
    is used instead. An invalid address range is reset to the single
    address 1.
    """
    global detectors
    try:
        _load_settings(SETTING_PATH)
    except Exception as e:
        print("加载用户设置失败: {},转为默认设置。".format(e))
        _load_settings(DEFAULT_SETTING_PATH)

    signal_strength = str(net.csqQueryPoll())
    mqtt_client.set_callback(sub_cb)
    mqtt_client.connect(True)
    mqtt_client.subscribe("topic/F5/2/" + IMEI)
    print("连接至:{}, 订阅主题: topic/F5/2/{}".format(server, IMEI))
    print("Mqtt 客户端初始化成功!")
    STATUS_LED.write(1)

    start_addr = settings['start_addr']
    end_addr = settings['end_addr']
    if not (0 < start_addr <= 240 and 0 < end_addr <= 240 and start_addr <= end_addr):
        settings['start_addr'] = 1
        settings['end_addr'] = 1

    detectors = []
    slave_type = '{:0>4d}'.format(settings['slave_type'])
    for i in range(settings['start_addr'], settings['end_addr'] + 1):
        try:
            detector = _build_detector(i, slave_type, signal_strength, True)
        except Exception as e:
            # Device did not answer: keep a placeholder so it is still polled.
            print(e, "current device address is: ", i)
            detector = _build_detector(i, slave_type, signal_strength, False)
        detectors.append(detector)


def _poll_detector(detector, signal_strength):
    """Read one detector and publish fault-recovery / alarm transitions."""
    utime.sleep_ms(300)
    detector.data[1] = str(modbus_port.read_input_registers(detector.device_id, 0, 1, signed=False)[0])
    if int(detector.data[1]) > int('65000'):
        detector.data[1] = '0000'
    detector.data[2] = signal_strength

    # Last slot is the fault flag set by th_polling when a read fails.
    if detector.data[-1] == '0001':
        detector.data[-1] = '0000'
        _publish(_detector_msg(detector, "83"))

    if int(detector.data[4]) != 0:
        reading = int(detector.data[1])
        if reading >= int(detector.data[4]) or reading < int(detector.data[5]):
            if detector.data[3] == '0':
                detector.data[3] = '1'
                print("报警推送", detector.data)
                _publish(_detector_msg(detector, "83"))
        elif detector.data[3] == '1':
            detector.data[3] = '0'
            print("报警恢复推送", detector.data)
            _publish(_detector_msg(detector, "83"))
        else:
            # NOTE(review): the source's indentation was lost, so the exact
            # owner of the trailing reset is ambiguous; it is applied on both
            # candidate paths (a no-op when the flag is already '0').
            detector.data[3] = '0'
    else:
        detector.data[3] = '0'


def th_polling():
    """Thread body: poll every detector in turn while holding the lock."""
    while True:
        lock.acquire()
        try:
            signal_strength = str(net.csqQueryPoll())
            for detector in detectors:
                try:
                    _poll_detector(detector, signal_strength)
                except Exception as e:
                    print("轮询探测器时出现异常: ", e, "地址: ", detector.device_id)
                    detector.data[-1] = '0001'
        finally:
            # Always release, even if the signal query itself raises.
            lock.release()
        print("polling thread is running.\r")


def _save_settings_and_restart(cmd):
    """Persist settings, acknowledge command *cmd* and restart the module."""
    print(settings)
    _write_json(SETTING_PATH, settings)
    _publish('{"DeviceId":"0","Cmd":"' + cmd + '","Data":"1"}')
    utime.sleep(0.5)
    Power.powerRestart()


def sub_cb(topic, msg):
    """MQTT callback: decode a downlink command and act on it.

    Args:
        topic: topic bytes the message arrived on.
        msg: JSON payload bytes with 'Cmd' and, for most commands, 'Data'.
    """
    global local_update_setting
    lock.acquire()
    try:
        print("receive thread receive a msg:\r")
        if topic.decode() == "topic/F5/2/" + IMEI:
            recv_data = ujson.loads(msg.decode())
            cmd = recv_data['Cmd']
            if cmd == '1':
                print("设置目标服务器及端口(未实现)\r")
                print(recv_data['Data'])
            elif cmd == '2':
                print("设置探测器起止地址\r")
                data = recv_data['Data'].split(',')
                settings['start_addr'] = int(data[0])
                settings['end_addr'] = int(data[1])
                settings['slave_type'] = int(data[2]) + 86
                _save_settings_and_restart('2')
            elif cmd == '8':
                print("网关上报时间间隔设置\r")
                settings['upload_interval'] = int(recv_data['Data']) * 60
                _save_settings_and_restart('8')
            elif cmd == '9':
                print("设置探测器报警阀值\r")
                data = recv_data['Data'].split(',')
                settings['high_limit'] = data[0]
                settings['low_limit'] = data[1]
                _save_settings_and_restart('9')
            elif cmd == '11':
                print("时间校准\r")
            elif cmd == '12':
                print("远程重启\r")
                _publish("received restart command, will restart after 1s...")
                utime.sleep(1)
                Power.powerRestart()
            elif cmd == '13':
                print("软件更新配置\r")
                local_update_setting = recv_data['Data']
                _write_json(UPDATE_SETTING_PATH, local_update_setting)
                _publish('{"DeviceId":"","Cmd":"13","Data":"1"}')
            elif cmd == '14':
                print("软件版本查询\r")
                _publish('app_name:' + local_update_setting['app_name'] +
                         ', version:' + local_update_setting['app_version'])
    except Exception as e:
        print("mqtt消息处理出现异常: ", e)
    finally:
        lock.release()


def th_push():
    """Loop forever: upload every detector's data, then sleep one interval."""
    while True:
        for detector in detectors:
            try:
                _publish(_detector_msg(detector, "82"))
            except Exception as e:
                print("推送出现异常,设备ID{},异常:{}\r设备数据:{}".format(detector.device_id, e, detector.data))
        print("push thread is running.\r")
        utime.sleep(settings['upload_interval'])


def th_recv():
    """Thread body: wait for MQTT messages; reboot on a receive failure."""
    while True:
        try:
            mqtt_client.wait_msg()  # blocking call; listens for messages
        except Exception as e:
            print("mqtt消息接收线程出现异常:{}".format(e), type(e))
            # An exception whose first argument is 0 is ignored, as in the
            # original; anything else (including one with no arguments)
            # is logged to the config UART and triggers a restart.
            if e.args and e.args[0] == 0:
                continue
            print("mqtt消息接收线程出现异常:{}导致系统重启。".format(e))
            sys_log['disconn_reason'] = 'mqtt except: ' + str(e)
            config_port.write(str(sys_log))
            utime.sleep_ms(50)
            config_port.write("Reboot caused by mqtt receive exception")
            utime.sleep_ms(50)
            Power.powerRestart()


def th_log():
    """Thread body: serve colon-separated commands from the config UART.

    'log' writes the diagnostic record back; 'server:<host>:<port>' stores a
    new broker address and restarts the module.
    """
    net_errors = {1: 'SIM Card Error', 2: 'Data Call Time Out', 3: 'No NET Error'}
    while True:
        try:
            pending = config_port.any()
            if pending != 0:
                msg = config_port.read(pending).decode('utf8').split(':')
                print('msg:', msg, 'command:', msg[0])
                command = msg[0]
                print(command)
                # Compare by value: identity checks against literals are
                # not a reliable way to match received text.
                if command == 'log':
                    if sys_log['stagecode'] in net_errors:
                        sys_log['net_error'] = net_errors[sys_log['stagecode']]
                    sys_log['ICCID'] = sim.getIccid()
                    sys_log['CSQ'] = net.csqQueryPoll()
                    config_port.write(str(sys_log))
                if command == 'server':
                    settings['server'] = msg[1]
                    settings['port'] = msg[2]
                    config_port.write(settings['server'] + ':' + settings['port'])
                    _write_json(SETTING_PATH, settings)
                    utime.sleep(0.5)
                    Power.powerRestart()
        except Exception as e:
            config_port.write(str(e))
        utime.sleep(0.5)


def main():
    """Bring up the network, then start the worker threads and push loop."""
    try:
        utime.sleep(5)
        checknet.poweron_print_once()
        stagecode, subcode = checknet.wait_network_connected(30)
        sys_log['stagecode'] = stagecode
        sys_log['subcode'] = subcode
        _thread.start_new_thread(th_log, ())
        if stagecode == 1 and subcode == 0:
            print("未插SIM卡,请检查。")
            # Blink the status LED forever to signal the missing SIM.
            while True:
                STATUS_LED.write(1)
                utime.sleep(0.5)
                STATUS_LED.write(0)
                utime.sleep(1)
        elif stagecode == 3 and subcode == 1:
            print('网络连接成功!\r')
            update()
            utime.sleep(0.5)
            _thread.start_new_thread(th_update, ())
            utime.sleep(0.5)
            print("正在初始化...")
            init()
            _thread.start_new_thread(th_polling, ())
            utime.sleep(0.5)
            _thread.start_new_thread(th_recv, ())
            while True:
                th_push()
        else:
            print('网络连接失败! stagecode = {}, subcode = {}\r'.format(stagecode, subcode))
            print("自动重启以尝试重新拨号...")
            utime.sleep_ms(50)
            Power.powerRestart()
    except Exception as e:
        print("主线程出现异常: {},导致模组重启 ".format(e))
        sys_log['disconn_reason'] = 'main thread exception: ' + str(e)
        config_port.write(str(sys_log))
        utime.sleep_ms(50)
        config_port.write("reboot cased by main thread exception.")
        utime.sleep_ms(50)
        Power.powerRestart()


if __name__ == '__main__':
    main()