from usr import modbus_rtu_serial from usr import st7789v from usr import fonts from usr.Sensor import Sensor from umqtt import MQTTClient from misc import Power from machine import Pin from machine import Timer from machine import KeyPad import uos import net import sim import ethernet import app_fota import request import modem import qrcode import checkNet import ujson import utime import _thread import ntptime PROJECT_NAME = "CarPortGateway" PROJECT_VERSION = "0.1" red = 0xF800 green = 0x07E0 blue = 0x001F white = 0xFFFF black = 0x0000 purple = 0xF81F with open('/usr/update_setting.json', mode='r') as f: local_update_setting = ujson.load(f) f.close() with open("/usr/setting.json", mode='r') as f: settings = ujson.load(f) f.close() network_config = settings["network_setting"] channel_gather = settings["channel_gather"] channel_setting = settings["channel_setting"] channel_express = settings["channel_express"] PROJECT_NAME = local_update_setting['app_name'] PROJECT_VERSION = local_update_setting['app_version'] checknet = checkNet.CheckNetwork(PROJECT_NAME, PROJECT_VERSION) usr_fota = app_fota.new() modbus_port = modbus_rtu_serial.Serial() STATUS_LED = Pin(Pin.GPIO2, Pin.OUT, Pin.PULL_DISABLE, 0) timer0 = Timer(Timer.Timer0) timer1 = Timer(Timer.Timer1) sensors = [] lock = _thread.allocate_lock() IMEI = modem.getDevImei() SIM_STATUS = sim.getStatus() CSQ = net.csqQueryPoll() MAC = [] w5500 = None lcd_st7789v = st7789v.ST7789V(240, 240) mqtt_client = None push_topic = "topic/zhongan/" + IMEI + "/post" push_reply_topic = "topic/zhongan/" + IMEI + "/post_reply" recv_topic = "topic/zhongan/" + IMEI + "/get" recv_reply_topic = "topic/zhongan/" + IMEI + "/get_reply" upload_flag = 1 msg_template = { "id": 0, "version": PROJECT_VERSION, "deviceType": settings["device_type"], "deviceMode": settings["device_model"], "address": 0, "time": 0, "messageType": "Heart", "alarmData": [0], "equValue": [0, 0, 0, 0, 0, 0], "deviceState": "Normal" } key_status = [] page = 0 last_page = 99 current = 0 # server = ['{:0>3d}'.format(int(i)) for i in settings["server"].split('.')] edit_setting = [int(i) for i in list('{:0>5d}'.format(int(settings["port"])))] # edit_setting = [] # for i in server: # for j in i: # edit_setting.append(int(j)) # edit_setting = edit_setting + port # edit_setting.append(settings["start_addr"]) # edit_setting.append(settings["end_addr"]) keypad = KeyPad() keypad.init() def status_disp(): while True: try: if SIM_STATUS != 0: signal = net.csqQueryPoll() if signal == -1 or signal == 99: lcd_st7789v.lcd.lcd_show_jpg("/usr/sn.jpg", 0, 0) elif signal > 25: lcd_st7789v.lcd.lcd_show_jpg("/usr/ss.jpg", 0, 0) elif signal > 20: lcd_st7789v.lcd.lcd_show_jpg("/usr/sg.jpg", 0, 0) elif signal > 15: lcd_st7789v.lcd.lcd_show_jpg("/usr/sm.jpg", 0, 0) else: lcd_st7789v.lcd.lcd_show_jpg("/usr/sw.jpg", 0, 0) else: if STATUS_LED.read() == 1: lcd_st7789v.lcd.lcd_show_jpg("/usr/c.jpg", 0, 0) else: lcd_st7789v.lcd.lcd_show_jpg("/usr/nc.jpg", 0, 0) lcd_st7789v.lcd.lcd_show_jpg("/usr/logo.jpg", 170, 0) utime.sleep_ms(200) except Exception as e: print("status_disp error:", e) utime.sleep_ms(200) def data_disp(): global last_page try: if page == 0: if last_page != 0: lcd_st7789v.lcd.lcd_show_jpg("/usr/scr.jpg", 0, 48) lcd_st7789v.lcd_show_chn_str(5, 51, 24, 24, "服 务 器", black, white) lcd_st7789v.lcd_show_chn_str(5, 78, 24, 24, "端 口", black, white) # lcd_st7789v.lcd_show_chn_str(5, 105, 24, 24, "探 测 器", black, white) # lcd_st7789v.lcd_show_chn_str(5, 132, 24, 24, "开 始 地 址", black, white) # lcd_st7789v.lcd_show_chn_str(5, 159, 24, 24, "结 束 地 址", black, white) lcd_st7789v.lcd_show_ascii_str(5, 186, 16, 24, "IMEI:", black, white) # lcd_st7789v.lcd_show_ascii_str(75, 55, 8, 16, ':' + settings["server"], black, white) lcd_st7789v.lcd_show_ascii_str(75, 55, 8, 16, ':iot.csin.cloud', black, white) lcd_st7789v.lcd_show_ascii_str(50, 82, 8, 16, ':' + str(settings["port"]), black, white) # lcd_st7789v.lcd_show_ascii_str(50, 82, 8, 16, ':1883', black, white) # lcd_st7789v.lcd_show_ascii(75, 105, 16, 24, ':', black, white) # lcd_st7789v.lcd_show_chn_str(94, 105, 24, 24, "液 位", black, white) # lcd_st7789v.lcd_show_ascii_str(100, 132, 16, 24, ':' + str(settings["start_addr"]), black, white) # lcd_st7789v.lcd_show_ascii_str(100, 159, 16, 24, ':' + str(settings["end_addr"]), black, white) lcd_st7789v.lcd_show_ascii_str(0, 213, 16, 24, IMEI, black, white) last_page = 0 elif page == 1: if last_page != 1: lcd_st7789v.lcd.lcd_show_jpg("/usr/scr.jpg", 0, 48) qrcode.show('88.911.5307351272316096512/GATE' + IMEI, 6, 45, 65) last_page = 1 elif page == 2: if last_page != 2: lcd_st7789v.lcd.lcd_show_jpg("/usr/scr.jpg", 0, 48) # lcd_st7789v.lcd_show_chn_str(5, 51, 24, 24, "服 务 器", black, white) # lcd_st7789v.lcd_show_ascii(77, 51, 16, 24, ":", black, white) # lcd_st7789v.lcd_show_chn_str(48, 78, 16, 24, ".", black, white) # lcd_st7789v.lcd_show_chn_str(112, 78, 16, 24, ".", black, white) # lcd_st7789v.lcd_show_chn_str(176, 78, 16, 24, ".", black, white) lcd_st7789v.lcd_show_chn_str(5, 105, 24, 24, "端 口", black, white) lcd_st7789v.lcd_show_ascii(53, 105, 16, 24, ":", black, white) # lcd_st7789v.lcd_show_chn_str(5, 132, 24, 24, "开 始 地 址", black, white) # lcd_st7789v.lcd_show_ascii(101, 132, 16, 24, ":", black, white) # lcd_st7789v.lcd_show_chn_str(5, 159, 24, 24, "结 束 地 址", black, white) # lcd_st7789v.lcd_show_ascii(101, 159, 16, 24, ":", black, white) last_page = 2 for idx, num in enumerate(edit_setting): if idx == current: lcd_st7789v.lcd_show_ascii(53 + idx * 16, 105, 16, 24, str(num), white, black) else: lcd_st7789v.lcd_show_ascii(53 + idx * 16, 105, 16, 24, str(num), black, white) # if idx < 3: # if idx == current: # lcd_st7789v.lcd_show_ascii(idx * 16, 78, 16, 24, str(num), white, black) # else: # lcd_st7789v.lcd_show_ascii(idx * 16, 78, 16, 24, str(num), black, white) # elif idx < 6: # if idx == current: # lcd_st7789v.lcd_show_ascii(64 + (idx - 3) * 16, 78, 16, 24, str(num), white, black) # else: # lcd_st7789v.lcd_show_ascii(64 + (idx - 3) * 16, 78, 16, 24, str(num), black, white) # elif idx < 9: # if idx == current: # lcd_st7789v.lcd_show_ascii(128 + (idx - 6) * 16, 78, 16, 24, str(num), white, black) # else: # lcd_st7789v.lcd_show_ascii(128 + (idx - 6) * 16, 78, 16, 24, str(num), black, white) # elif idx < 12: # if idx == current: # lcd_st7789v.lcd_show_ascii(192 + (idx - 9) * 16, 78, 16, 24, str(num), white, black) # else: # lcd_st7789v.lcd_show_ascii(192 + (idx - 9) * 16, 78, 16, 24, str(num), black, white) # elif 12 <= idx < 17: # if idx == current: # lcd_st7789v.lcd_show_ascii(53 + (idx - 11) * 16, 105, 16, 24, str(num), white, black) # else: # lcd_st7789v.lcd_show_ascii(53 + (idx - 11) * 16, 105, 16, 24, str(num), black, white) # elif idx == 17: # if idx == current: # lcd_st7789v.lcd_show_ascii(117, 132, 16, 24, str(num), white, black) # else: # lcd_st7789v.lcd_show_ascii(117, 132, 16, 24, str(num), black, white) # elif idx == 18: # if idx == current: # lcd_st7789v.lcd_show_ascii(117, 159, 16, 24, str(num), white, black) # else: # lcd_st7789v.lcd_show_ascii(117, 159, 16, 24, str(num), black, white) except Exception as e: print("data_disp error:", e) def long_press_check(t): global timer1 global page global key_status t.stop() if key_status == [1, 1, 2]: print("长按检测成功") page = 2 data_disp() def keyboard_cb(keys): global page global key_status global current key_status = keys print("按键状态:", key_status) if keys == [1, 1, 2]: timer0.start(period=3000, mode=timer0.ONE_SHOT, callback=long_press_check) if keys[0] == 0: timer0.stop() if keys[1] == 2 and keys[2] == 1: if page == 2: if current < len(edit_setting) - 1: current += 1 data_disp() # elif page != 1: # page = 1 # data_disp() elif keys[1] == 3 and keys[2] == 2: if page == 2: if current > 0: current -= 1 data_disp() elif page != 0: page = 0 data_disp() elif keys[1] == 1 and keys[2] == 1: if page == 2: if current == 17 or current == 18: if edit_setting[current] > 1: edit_setting[current] -= 1 data_disp() elif edit_setting[current] > 0: edit_setting[current] -= 1 data_disp() elif keys[1] == 3 and keys[2] == 1: if page == 2 and edit_setting[current] < 9: edit_setting[current] += 1 data_disp() elif keys[1] == 2 and keys[2] == 2: page = 0 data_disp() elif keys[1] == 1 and keys[2] == 2: if page == 2: # settings["server"] = str(int(str(edit_setting[0:3]).replace('[', '').replace(']', '').replace(',', '').replace(' ', ''))) + '.' \ # + str(int(str(edit_setting[3:6]).replace('[', '').replace(']', '').replace(',', '').replace(' ', ''))) + '.' \ # + str(int(str(edit_setting[6:9]).replace('[', '').replace(']', '').replace(',', '').replace(' ', ''))) + '.' \ # + str(int(str(edit_setting[9:12]).replace('[', '').replace(']', '').replace(',', '').replace(' ', ''))) settings["port"] = str(int(str(edit_setting).replace('[', '').replace(']', '').replace(',', '').replace(' ', ''))) # settings["start_addr"] = edit_setting[17] # settings["end_addr"] = edit_setting[18] with open("/usr/setting.json", mode="w") as fw: ujson.dump(settings, fw) fw.close() keypad.set_callback(keyboard_cb) def net_init(): global w5500 global network_config try: MAC = [IMEI[3:5], IMEI[5:7], IMEI[7:9], IMEI[9:11], IMEI[11:13], IMEI[13:15]] mac = [int(i) for i in MAC] if (mac[0] % 2) == 1: mac[0] = mac[0] + 1 w5500 = ethernet.W5500(bytes(mac), '192.168.1.100', '255.255.255.0', '192.168.1.0', 0, Pin.GPIO11, Pin.GPIO18, Pin.GPIO17, 0) # w5500 = ethernet.W5500(bytes(mac)) #无显示使用 if network_config['DHCP'] == 0: w5500.set_addr(network_config['IP'], network_config['MASK'], network_config['GATEWAY']) w5500.set_dns(network_config['DNS1'], network_config['DNS2']) w5500.set_up() print('net init ok!') else: w5500.dhcp() utime.sleep(5) print('dhcp: ', str(w5500.ipconfig())) except Exception as e: print('net init error: ', e) utime.sleep(3) Power.powerRestart() def th_update(): global local_update_setting while True: print("开始检查更新...") try: local_time = utime.localtime() if local_time[3] == 00 and local_time[4] == int(IMEI[-2:]) % 60: response = request.get(url=local_update_setting['update_url'] + 'update_setting.json') server_update_setting = response.json() if local_update_setting['app_name'] == server_update_setting['app_name'] and local_update_setting['app_version'] == \ server_update_setting['app_version']: print("已是最新版本,无需升级") elif local_update_setting['app_name'] == server_update_setting['app_name'] and local_update_setting['app_version'] < \ server_update_setting['app_version']: print("检测到新版本:\r app name:\t{},\r app version:\t{},\r file list:\t{}\r开始升级...".format( server_update_setting['app_name'], server_update_setting['app_version'], server_update_setting['file_list'])) file_list = server_update_setting['file_list'].split(',') download_list = [] for file in file_list: dict_file = {'url': (local_update_setting['update_url'] + file).replace(" ", ''), 'file_name': ('/usr/' + file).replace(" ", '')} download_list.append(dict_file) print("下载文件列表: ", download_list) result = usr_fota.bulk_download(download_list) print("更新文件下载失败列表: ", result) local_update_setting = server_update_setting uos.remove("/usr/update_setting.json") with open("/usr/update_setting.json", mode="w") as fw: ujson.dump(local_update_setting, fw) fw.close() usr_fota.set_update_flag() print("更新下载完成,将在3秒后重启完成更新。") utime.sleep(3) Power.powerRestart() except Exception as e: print("更新失败,error:", e) utime.sleep(30) def init(): global sensors global mqtt_client global recv_topic global modbus_port ntptime.settime(timezone=8) mqtt_client = MQTTClient("CarPortGateway/" + IMEI, settings["server"], settings["port"], user=settings["username"], password=settings["password"], keepalive=120) mqtt_client.set_callback(sub_cb) mqtt_client.connect(True) mqtt_client.subscribe(recv_topic) mqtt_client.subscribe(push_reply_topic) print("连接至:{}, 订阅主题:{} 、 {}".format(settings["server"], recv_topic, push_reply_topic)) print("Mqtt 客户端初始化成功!") STATUS_LED.write(1) modbus_port._uart.set_callback(th_polling) for i in range(settings['start_addr'], settings['end_addr'] + 1): try: sensor = Sensor(i) sensors.append(sensor) except Exception as e: print("传感器初始化失败: ", e) def th_polling(para): global channel_gather global push_topic global mqtt_client global msg_template global timer1 global upload_flag global modbus_port sensor = sensors[0] try: serial_data = modbus_port._uart.read(modbus_port._uart.any()) if len(serial_data) > 0: print("modbus接收到数据: ", serial_data) resp_crc = serial_data[-2:] expected_crc = modbus_port._calculate_crc16(serial_data[:-2]) if resp_crc == expected_crc: print("modbus数据校验成功") reg_value = [serial_data[7] << 8 | serial_data[8], serial_data[9] << 8 | serial_data[10]] status = reg_value[0] sensor.pump_status = status & 0b0000000000000001 sensor.alarm_button = (status & 0b0000000000000010) >> 1 sensor.camera_fire = (status & 0b0000000000000100) >> 2 sensor.camera_alarm = (status & 0b0000000000001000) >> 3 sensor.forced_cut = (status & 0b0000000000010000) >> 4 sensor.fault_report = (status & 0b0000000000100000) >> 5 sensor.manual_mode = (status & 0b0000000001000000) >> 6 sensor.press_level = reg_value[1] / 10 sensor.online_state = 0 sensor.channel_state[0] = sensor.pump_status if sensor.pump_status == 0 else 4 sensor.channel_state[1] = sensor.alarm_button sensor.channel_state[2] = sensor.camera_fire if sensor.camera_fire == 0 else 4 sensor.channel_state[3] = sensor.camera_alarm sensor.channel_state[4] = sensor.forced_cut if sensor.forced_cut == 0 else 4 sensor.channel_state[5] = sensor.fault_report if sensor.fault_report == 0 else 2 sensor.channel_state[6] = 0 sensor.channel_state[7] = 0 sensor.channel_state[8] = 0 sensor.channel_state[9] = 0 sensor.channel_state[10] = 0 if sensor.channel_state != sensor.channel_state_last: print("设备{}状态变化上报...".format(sensor.device_id)) if msg_template["id"] < 65535: msg_template["id"] += 1 else: msg_template["id"] = 0 msg_template["address"] = sensor.device_id msg_template["time"] = utime.mktime(utime.localtime()) msg_template["equValue"] = [ sensor.pump_status, sensor.alarm_button, sensor.camera_fire, sensor.camera_alarm, sensor.forced_cut, sensor.fault_report, sensor.manual_mode, sensor.press_level, settings["upload_interval"]/60, net.csqQueryPoll(), sensor.online_state ] if 4 in sensor.channel_state[0:5]: msg_template["alarmData"] = sensor.channel_state msg_template["messageType"] = "Alarm" msg_template["deviceState"] = "Fire" elif 1 in sensor.channel_state[0:5]: msg_template["alarmData"] = sensor.channel_state msg_template["messageType"] = "Alarm" msg_template["deviceState"] = "Alarm" elif sensor.channel_state[5] == 2: msg_template["alarmData"] = sensor.channel_state msg_template["messageType"] = "Failure" msg_template["deviceState"] = "Failure" else: msg_template["alarmData"] = sensor.channel_state msg_template["messageType"] = "Heart" msg_template["deviceState"] = "Normal" sensor.channel_state_last = sensor.channel_state.copy() mqtt_client.publish(push_topic, ujson.dumps(msg_template)) # upload_flag = 1 # timer1.start(period=10000, mode=timer1.ONE_SHOT, callback=push_cancel) # while upload_flag: # utime.sleep_ms(50) else: print("modbus数据校验失败") except Exception as e: print("解析探测器时出现异常: ", e, "地址: ", sensor.device_id) sensor.online_state = 1 sensors[0] = sensor def push_cancel(t): global upload_flag upload_flag = 0 t.stop() def sub_cb(topic, msg): global modbus_port global local_update_setting global settings global push_topic global push_reply_topic global recv_topic global recv_reply_topic global upload_flag global timer1 msg_reply = { "id": 0, "version": 0, "address": 0, "message": "success", "channelNum": [], "equValue": [] } try: print("received a msg: \r") if topic.decode() == recv_topic: try: recv_data = ujson.loads(msg.decode()) print(recv_data) for i, channel_num in enumerate(recv_data["channelNum"]): if channel_num == 10: settings["upload_interval"] = float(recv_data["equValue"][i]) * 60 print("参数下发:", settings) with open("/usr/setting.json", mode="w") as fw: ujson.dump(settings, fw) fw.close() msg_reply["id"] = recv_data["id"] msg_reply["version"] = recv_data["version"] msg_reply["address"] = recv_data["address"] msg_reply["message"] = "success" msg_reply["channelNum"] = recv_data["channelNum"] msg_reply["equValue"] = recv_data["equValue"] mqtt_client.publish(recv_reply_topic, ujson.dumps(msg_reply)) except Exception as e: print("参数下发处理异常: ", e) msg_reply["id"] = recv_data["id"] msg_reply["version"] = recv_data["version"] msg_reply["address"] = recv_data["address"] msg_reply["message"] = "failure" msg_reply["channelNum"] = recv_data["channelNum"] msg_reply["equValue"] = recv_data["equValue"] mqtt_client.publish(recv_reply_topic, ujson.dumps(msg_reply)) elif topic.decode() == push_reply_topic: reply_data = ujson.loads(msg.decode()) print(reply_data) timer1.stop() upload_flag = 0 except Exception as e: print("mqtt消息处理出现异常: ", e) def th_push(): #TODO: 去掉液位 global sensors global mqtt_client global push_topic global msg_template while True: for s in sensors: if msg_template["id"] < 65535: msg_template["id"] += 1 else: msg_template["id"] = 0 msg_template["address"] = s.device_id msg_template["time"] = utime.mktime(utime.localtime()) msg_template["equValue"] = [ s.pump_status, s.alarm_button, s.camera_fire, s.camera_alarm, s.forced_cut, s.fault_report, s.manual_mode, s.press_level, settings["upload_interval"]/60, net.csqQueryPoll(), s.online_state ] msg_template["alarmData"] = s.channel_state msg_template["messageType"] = "Heart" if 4 in s.channel_state[0:5]: msg_template["deviceState"] = "Fire" elif 1 in s.channel_state[0:5]: msg_template["deviceState"] = "Alarm" elif s.channel_state[5] == 2: msg_template["deviceState"] = "Failure" else: msg_template["deviceState"] = "Normal" mqtt_client.publish(push_topic, ujson.dumps(msg_template)) print("push thread is running.\r") utime.sleep(settings['upload_interval']) def th_recv(): global mqtt_client while True: try: mqtt_client.wait_msg() # 阻塞函数,监听消息 except Exception as e: print("mqtt消息接收线程出现异常:{}".format(e), type(e)) if e.args[0] == 0: pass else: print("mqtt消息接收线程出现异常:{}导致系统重启。".format(e)) Power.powerRestart() def update(): global local_update_setting try: response = request.get(url=local_update_setting['update_url'] + 'update_setting.json') server_update_setting = response.json() if local_update_setting['app_name'] == server_update_setting['app_name'] and local_update_setting['app_version'] == \ server_update_setting['app_version']: print("已是最新版本,无需升级") elif local_update_setting['app_name'] == server_update_setting['app_name'] and local_update_setting['app_version'] < \ server_update_setting['app_version']: print("检测到新版本:\r app name:\t{},\r app version:\t{},\r file list:\t{}\r开始升级...".format( server_update_setting['app_name'], server_update_setting['app_version'], server_update_setting['file_list'])) file_list = server_update_setting['file_list'].split(',') download_list = [] for file in file_list: dict_file = { 'url': (local_update_setting['update_url'] + file).replace(" ", ''), 'file_name': ('/usr/' + file).replace(" ", '')} download_list.append(dict_file) print("下载文件列表: ", download_list) result = usr_fota.bulk_download(download_list) print("更新文件下载失败列表: ", result) local_update_setting = server_update_setting uos.remove("/usr/update_setting.json") with open("/usr/update_setting.json", mode="w") as fw: ujson.dump(local_update_setting, fw) fw.close() usr_fota.set_update_flag() print("更新下载完成,将在3秒后重启完成更新。") utime.sleep(3) Power.powerRestart() except Exception as e: print("更新失败,error:", e) if __name__ == '__main__': try: utime.sleep(5) checknet.poweron_print_once() lcd_st7789v.lcd_show_clear(white) _thread.start_new_thread(status_disp, ()) data_disp() SIM_STATUS = sim.getStatus() if SIM_STATUS == 0: print("使用以太网") net_init() update() init() _thread.start_new_thread(th_recv, ()) _thread.start_new_thread(th_update, ()) while True: utime.sleep(2) th_push() else: stagecode, subcode = checknet.wait_network_connected(180) if stagecode == 3 and subcode == 1: print('网络连接成功!\r') update() print("正在初始化...") init() _thread.start_new_thread(th_recv, ()) _thread.start_new_thread(th_update, ()) while True: utime.sleep(2) th_push() else: print('网络连接失败! stagecode = {}, subcode = {}\r'.format(stagecode, subcode)) print("自动重启以尝试重新拨号...") utime.sleep_ms(50) Power.powerRestart() except Exception as e: print("主线程出现异常: {},导致模组重启 ".format(e)) Power.powerRestart()