udp_server.py 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116
  1. import asyncio
  2. import datetime
  3. from lora_recv_frame import LoraRecvFrame
  4. from frame.device_commu import recv_device_data
  5. #启动UDP的服务器(异步函数,只能用await调用它)
  6. async def start_udp_server(host:str,port: int):
  7. print(f"UDP server listening on {host}:{port}")
  8. #获取当前正在运行的异步事件循环(EventLoop)对象
  9. loop = asyncio.get_running_loop()
  10. transport, protocol = await loop.create_datagram_endpoint(
  11. lambda : UDPProtocol(), # 下面定义的类
  12. local_addr=(host,port)
  13. )
  14. # 防止transport被GC回收
  15. start_udp_server._transport = transport
  16. try:
  17. await asyncio.Event().wait()
  18. finally:
  19. transport.close()
  20. #继承python标准的DatagramProtocol类的自定义协议处理器
  21. class UDPProtocol(asyncio.DatagramProtocol):
  22. def __init__(self):
  23. self.transport = None #保存transport
  24. def connection_made(self, transport):
  25. self.transport = transport
  26. print("UDP connection ready")
  27. def datagram_received(self, data, addr):
  28. #print(f"收到原始数据: {data},长度: {len(data)}")
  29. #print(f"Received UDP from {addr}: {data.hex()}")
  30. hex_data = ' '.join(f'{b:02X}' for b in data)
  31. message = f"UDP接收 {addr}: {hex_data},长度:{len(data)}"
  32. #print(f"UDP接收 {addr}: {' '.join(f'{b:02X}' for b in data)}")
  33. print(message)
  34. notify_clients(message)
  35. #接收到数据
  36. recv_device_data(data,self.transport,addr)
  37. # recvFrame = LoraRecvFrame(data)
  38. # if not recvFrame.is_valid():
  39. # message = f"无效帧: {recvFrame.get_reason()}"
  40. # print(message)
  41. # notify_clients(message)
  42. # return
  43. #
  44. # info = recvFrame.to_dict()
  45. # message = f"有效帧,序号: {recvFrame.serial_number}"
  46. # print(message)
  47. # notify_clients(message)
  48. #
  49. #
  50. # # 检查长度以避免越界
  51. # if len(data) >= 9:
  52. # # 提取第7、8、9字节(注意索引从0开始)
  53. # prefix = data[6:9] # 取 data[6], data[7], data[8]
  54. # else:
  55. # prefix = b'\x00\x00\x00' # 不足9字节就用0填充
  56. #
  57. # # 构造 response:前三字节是 prefix,后面是原始 data
  58. # #response = prefix + data
  59. # # 获取当前时间(年-2000, 月, 日, 时, 分, 秒)
  60. # now = datetime.datetime.now()
  61. # year = now.year - 2000
  62. # month = now.month
  63. # day = now.day
  64. # hour = now.hour
  65. # minute = now.minute
  66. # second = now.second
  67. #
  68. # # 打包成6字节时间
  69. # time_bytes = bytes([year, month, day, hour, minute, second])
  70. # # 构造最终 response
  71. # response = prefix + time_bytes + data
  72. #
  73. # self.transport.sendto(response,addr)
  74. # #print(f"Send response to {addr}: {response.hex()}")
  75. # message = f"UDP应答 {addr}: {' '.join(f'{b:02X}'for b in response)}"
  76. # print(message)
  77. # notify_clients(message)
  78. #引入这个列表用户存储前端连接
  79. web_clients = set()
  80. def notify_clients(msg:str):
  81. for ws in list(web_clients):
  82. try:
  83. asyncio.create_task(ws.send_text(msg))
  84. except Exception:
  85. web_clients.discard(ws)
  86. #测试程序是否可用
  87. async def main():
  88. print("UDP server listening on 0.0.0.0:9001")
  89. loop = asyncio.get_running_loop()
  90. transport, protocol = await loop.create_datagram_endpoint(
  91. lambda: UDPProtocol(),
  92. local_addr=("0.0.0.0", 9001)
  93. )
  94. try:
  95. await asyncio.Event().wait() # 一直阻塞
  96. finally:
  97. transport.close()
  98. if __name__ == "__main__":
  99. asyncio.run(main())