"""Asyncio UDP server that hands device datagrams to the frame handler.

Every received datagram is printed as a hex dump, pushed to the connected
web front-end clients, and passed to ``recv_device_data`` together with the
transport and the sender address.
"""
import asyncio
import datetime

from lora_recv_frame import LoraRecvFrame
from frame.device_commu import recv_device_data


# Front-end connections that receive a copy of every log message. Each
# member must expose an awaitable ``send_text(str)`` method.
web_clients = set()

# Strong references to in-flight send tasks. The event loop only keeps weak
# references to tasks, so a task nobody else references may be garbage
# collected before it finishes.
_pending_sends = set()


async def start_udp_server(host: str, port: int) -> None:
    """Bind a UDP endpoint on ``host:port`` and serve until cancelled.

    This is a coroutine: it must be awaited (or wrapped in a task) from a
    running event loop. It never returns normally; the transport is closed
    when the coroutine is cancelled or otherwise unwinds.

    Args:
        host: Local address to bind to.
        port: Local UDP port to bind to.
    """
    print(f"UDP server listening on {host}:{port}")
    loop = asyncio.get_running_loop()
    transport, _ = await loop.create_datagram_endpoint(
        UDPProtocol,
        local_addr=(host, port),
    )
    # Kept on the function object so the transport stays reachable from
    # outside this coroutine (original intent: guard against collection).
    start_udp_server._transport = transport
    try:
        # Block forever; only cancellation gets past this point.
        await asyncio.Event().wait()
    finally:
        transport.close()


class UDPProtocol(asyncio.DatagramProtocol):
    """Datagram protocol that logs each packet and forwards it for handling."""

    def __init__(self):
        # Set by connection_made(); handed to recv_device_data for each
        # datagram.
        self.transport = None

    def connection_made(self, transport):
        """Remember the transport once the endpoint is ready."""
        self.transport = transport
        print("UDP connection ready")

    def datagram_received(self, data, addr):
        """Log one datagram, broadcast the log line, and dispatch the data.

        Args:
            data: Raw datagram payload (bytes).
            addr: Sender address as provided by the transport.
        """
        hex_data = ' '.join(f'{b:02X}' for b in data)
        message = f"UDP接收 {addr}: {hex_data},长度:{len(data)}"
        print(message)
        notify_clients(message)

        recv_device_data(data, self.transport, addr)
        # NOTE: an earlier inline implementation used to live here as
        # commented-out code. It validated the frame with LoraRecvFrame and
        # replied with data[6:9] (zero-filled when the datagram was shorter
        # than 9 bytes) + a 6-byte timestamp (year-2000, month, day, hour,
        # minute, second) + the original datagram. Frame handling is now
        # done by the recv_device_data call above.


async def _send_or_discard(ws, msg: str) -> None:
    """Send ``msg`` to one client; drop the client if the send fails."""
    try:
        await ws.send_text(msg)
    except Exception:
        # Best effort: a client that cannot be written to is treated as
        # gone and removed so it is not retried on every message.
        web_clients.discard(ws)


def notify_clients(msg: str) -> None:
    """Broadcast ``msg`` to every client in ``web_clients`` without blocking.

    One task per client is scheduled on the running event loop. A client
    whose send raises is removed from ``web_clients``. When no event loop is
    running the call does nothing and the client set is left untouched.

    Args:
        msg: Text to deliver to each connected client.
    """
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        # Nothing can be scheduled without a loop; keep the clients so a
        # later call made from inside the loop can still reach them.
        return

    # Iterate over a snapshot: send tasks may mutate web_clients.
    for ws in list(web_clients):
        task = loop.create_task(_send_or_discard(ws, msg))
        _pending_sends.add(task)
        task.add_done_callback(_pending_sends.discard)


async def main():
    """Run the server stand-alone on 0.0.0.0:9001 (manual smoke test)."""
    await start_udp_server("0.0.0.0", 9001)


if __name__ == "__main__":
    asyncio.run(main())