123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116 |
- import asyncio
- import datetime
- from lora_recv_frame import LoraRecvFrame
- from frame.device_commu import recv_device_data
# Start the UDP server (a coroutine: it must be awaited or scheduled).
async def start_udp_server(host: str, port: int):
    """Bind a UDP endpoint on ``host:port`` and serve until cancelled.

    Datagrams are handled by ``UDPProtocol``. The coroutine never returns on
    its own: it waits on an event that is never set, so it only ends when the
    awaiting task is cancelled (or the loop shuts down), at which point the
    transport is closed.

    Args:
        host: Local address to bind to.
        port: Local UDP port to bind to.
    """
    print(f"UDP server listening on {host}:{port}")

    # The event loop that is currently running this coroutine.
    running_loop = asyncio.get_running_loop()
    bind_address = (host, port)
    udp_transport, _protocol = await running_loop.create_datagram_endpoint(
        UDPProtocol,  # protocol factory; the class is defined in this module
        local_addr=bind_address,
    )

    # Keep a reference on the function object so the transport is not
    # garbage-collected (and stays reachable from outside this coroutine).
    start_udp_server._transport = udp_transport

    never_set = asyncio.Event()
    try:
        # Block forever; only cancellation gets past this line.
        await never_set.wait()
    finally:
        udp_transport.close()
# Custom protocol handler derived from the standard asyncio.DatagramProtocol.
class UDPProtocol(asyncio.DatagramProtocol):
    """Log each received UDP datagram and hand it to the device layer.

    For every datagram the handler builds a hex dump, prints it, forwards the
    same text to the connected front-end clients via ``notify_clients`` and
    then calls ``recv_device_data`` with the raw bytes, the transport and the
    sender address.
    """

    def __init__(self):
        # Filled in by connection_made(); passed along so replies can be sent.
        self.transport = None

    def connection_made(self, transport):
        """Store the transport handed over by the event loop."""
        self.transport = transport
        print("UDP connection ready")

    def datagram_received(self, data, addr):
        """Log one datagram and pass it on for processing.

        Args:
            data: Raw datagram payload (bytes).
            addr: Sender address as supplied by the event loop.
        """
        byte_count = len(data)
        # Upper-case, space-separated hex dump of the payload.
        hex_dump = ' '.join(['%02X' % octet for octet in data])
        log_line = f"UDP接收 {addr}: {hex_dump},长度:{byte_count}"

        print(log_line)
        notify_clients(log_line)

        # Hand the received data to the device communication layer.
        recv_device_data(data, self.transport, addr)

        # NOTE: an earlier inline implementation (previously kept here as
        # commented-out code) validated the frame with LoraRecvFrame and
        # replied with: data[6:9] (or three zero bytes when the datagram was
        # shorter than 9 bytes) + a 6-byte timestamp (year - 2000, month,
        # day, hour, minute, second) + the original datagram, logging and
        # broadcasting the reply as "UDP应答 ...". Presumably that work now
        # lives in recv_device_data -- TODO confirm.
# Registry of connected front-end clients. Each entry must expose an
# awaitable ``send_text(str)`` method (presumably WebSocket connection
# objects -- confirm against the code that registers them).
web_clients = set()

# Strong references to in-flight send tasks. The event loop only keeps weak
# references to tasks, so an otherwise unreferenced task could be
# garbage-collected before it finishes.
_pending_sends = set()


async def _send_to_client(ws, msg: str) -> None:
    """Send *msg* to a single client, unregistering the client on failure."""
    try:
        await ws.send_text(msg)
    except Exception:
        # The send itself failed (e.g. the peer went away): drop the client
        # so it is not retried on every subsequent broadcast.
        web_clients.discard(ws)


def notify_clients(msg: str) -> None:
    """Broadcast *msg* to every registered front-end client.

    The sends are scheduled as tasks on the running event loop and are not
    awaited here, so this function can be called from synchronous callbacks
    such as ``DatagramProtocol.datagram_received``. A client whose send
    raises is removed from ``web_clients``.

    When no event loop is running nothing can be delivered; the call is then
    a no-op and the registry is left untouched.

    Args:
        msg: Text to send to each client.
    """
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        # Not inside a running loop: nothing to schedule on.
        return

    # Iterate over a snapshot: failed sends mutate web_clients later on.
    for ws in list(web_clients):
        task = loop.create_task(_send_to_client(ws, msg))
        _pending_sends.add(task)
        task.add_done_callback(_pending_sends.discard)
# Manual smoke test: run this module directly to check the server works.
async def main():
    """Run a stand-alone UDP server on 0.0.0.0:9001 until interrupted.

    Delegates to ``start_udp_server`` so the stand-alone entry point and the
    embedded server share one implementation (same start-up message, same
    protocol class, same transport clean-up on exit).
    """
    await start_udp_server("0.0.0.0", 9001)


if __name__ == "__main__":
    asyncio.run(main())
|