udp_jpeg_receiver.py 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236
  1. """
  2. UDP透传JPEG图片接收器
  3. - 透传模式:直接传输JPEG数据,无协议封装
  4. - 自动重组:检测FFD8开始,FFD9结束
  5. - 底层分包:UDP自动分包,直接拼接即可
  6. """
  7. import socket
  8. import threading
  9. import queue
  10. import cv2
  11. import numpy as np
  12. import time
  13. class UDPJPEGReceiver:
  14. """UDP JPEG图片接收器 - 透传模式"""
  15. # JPEG标记
  16. JPEG_START = b'\xFF\xD8' # JPEG文件头
  17. JPEG_END = b'\xFF\xD9' # JPEG文件尾
  18. def __init__(self, host='0.0.0.0', port=5000, timeout=2.0):
  19. """
  20. Args:
  21. host: 监听地址,'0.0.0.0'表示所有接口
  22. port: UDP端口号
  23. timeout: 图片接收超时时间(秒),超时后重置缓冲区
  24. """
  25. self.host = host
  26. self.port = port
  27. self.timeout = timeout
  28. # UDP socket
  29. self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
  30. try:
  31. self.socket.bind((host, port))
  32. except OSError as e:
  33. if e.winerror == 10049: # Windows错误:地址无效
  34. raise OSError(
  35. f"Cannot bind to {host}:{port}. "
  36. f"This address is not a local address. "
  37. f"Please use '0.0.0.0' to listen on all interfaces, "
  38. f"or use your local IP address (e.g., '192.168.x.x')."
  39. ) from e
  40. else:
  41. raise
  42. self.socket.settimeout(0.1) # 100ms超时,用于定期检查
  43. # 接收状态
  44. self.buffer = bytearray() # 当前图片数据缓冲区
  45. self.receiving = False # 是否正在接收图片
  46. self.start_time = 0 # 接收开始时间
  47. # 完整图片队列(只保留最新2张)
  48. self.image_queue = queue.Queue(maxsize=2)
  49. # 线程控制
  50. self.running = False
  51. self.receive_thread = None
  52. def _receive_loop(self):
  53. """后台接收循环"""
  54. while self.running:
  55. try:
  56. # 接收UDP包
  57. data, addr = self.socket.recvfrom(65507)
  58. # 处理数据
  59. self._process_data(data)
  60. # 检查超时
  61. if self.receiving and (time.time() - self.start_time) > self.timeout:
  62. print(f"[UDP] Timeout: Clearing incomplete image buffer ({len(self.buffer)} bytes)")
  63. self._reset()
  64. except socket.timeout:
  65. # 定期检查超时
  66. if self.receiving and (time.time() - self.start_time) > self.timeout:
  67. print(f"[UDP] Timeout: Clearing incomplete image buffer ({len(self.buffer)} bytes)")
  68. self._reset()
  69. continue
  70. except Exception as e:
  71. print(f"[UDP] Receive error: {e}")
  72. break
  73. def _process_data(self, data: bytes):
  74. """处理接收到的数据包"""
  75. # 检查是否包含JPEG开始标记(FFD8)
  76. if self.JPEG_START in data:
  77. start_idx = data.find(self.JPEG_START)
  78. if not self.receiving:
  79. # 开始接收新图片
  80. self.receiving = True
  81. self.start_time = time.time()
  82. self.buffer = bytearray()
  83. # 从FFD8开始的数据
  84. self.buffer.extend(data[start_idx:])
  85. # 静音起始日志
  86. # print(f"[UDP] JPEG start detected (FFD8 at offset {start_idx})")
  87. return
  88. # 如果正在接收,追加数据
  89. if self.receiving:
  90. self.buffer.extend(data)
  91. # 检查是否包含JPEG结束标记(FFD9)
  92. if self.JPEG_END in self.buffer:
  93. end_idx = self.buffer.find(self.JPEG_END)
  94. # 提取完整图片(包含FFD9)
  95. jpeg_data = bytes(self.buffer[:end_idx + 2])
  96. # 验证JPEG完整性
  97. if self._validate_jpeg(jpeg_data):
  98. # 尝试解码验证(更严格的验证)
  99. test_array = np.frombuffer(jpeg_data, dtype=np.uint8)
  100. test_frame = cv2.imdecode(test_array, cv2.IMREAD_COLOR)
  101. if test_frame is not None:
  102. # 解码成功,放入队列
  103. if self.image_queue.full():
  104. try:
  105. self.image_queue.get_nowait()
  106. except queue.Empty:
  107. pass
  108. self.image_queue.put(jpeg_data)
  109. # print(f"[UDP] JPEG complete: {len(jpeg_data)} bytes")
  110. else:
  111. # JPEG数据损坏,可能是UDP包丢失(静音日志)
  112. # print(f"[UDP] JPEG validation failed: cannot decode (size: {len(jpeg_data)} bytes)")
  113. # 不输出"Corrupt JPEG data"消息,因为cv2.imdecode已经处理了
  114. pass
  115. else:
  116. # 无效 JPEG(头尾标记异常,静音日志)
  117. # print(f"[UDP] Invalid JPEG: header/trailer check failed (size: {len(jpeg_data)} bytes)")
  118. pass
  119. # 重置缓冲区
  120. self._reset()
  121. def _validate_jpeg(self, data: bytes) -> bool:
  122. """验证JPEG数据完整性"""
  123. if len(data) < 4:
  124. return False
  125. # 检查开始标记
  126. if data[:2] != self.JPEG_START:
  127. return False
  128. # 检查结束标记
  129. if data[-2:] != self.JPEG_END:
  130. return False
  131. return True
  132. def _reset(self):
  133. """重置接收缓冲区"""
  134. self.buffer = bytearray()
  135. self.receiving = False
  136. self.start_time = 0
  137. def start(self):
  138. """启动接收线程"""
  139. self.running = True
  140. self.receive_thread = threading.Thread(
  141. target=self._receive_loop,
  142. daemon=True,
  143. name="UDPJPEGReceiver"
  144. )
  145. self.receive_thread.start()
  146. print(f"[UDP] Receiver started on {self.host}:{self.port}")
  147. def stop(self):
  148. """停止接收"""
  149. self.running = False
  150. if self.receive_thread:
  151. self.receive_thread.join(timeout=1.0)
  152. self.socket.close()
  153. print("[UDP] Receiver stopped")
  154. def get_image(self, timeout=0.1):
  155. """
  156. 获取一张完整图片
  157. Args:
  158. timeout: 等待超时时间(秒)
  159. Returns:
  160. numpy.ndarray: 解码后的图像(BGR格式),如果超时返回None
  161. """
  162. try:
  163. # 从队列获取完整JPEG数据
  164. jpeg_data = self.image_queue.get(timeout=timeout)
  165. # 解码JPEG
  166. image_array = np.frombuffer(jpeg_data, dtype=np.uint8)
  167. frame = cv2.imdecode(image_array, cv2.IMREAD_COLOR)
  168. if frame is None:
  169. print(f"[UDP] Failed to decode JPEG (size: {len(jpeg_data)} bytes)")
  170. # 尝试保存损坏的数据用于调试
  171. # with open('corrupt_jpeg.bin', 'wb') as f:
  172. # f.write(jpeg_data)
  173. return None
  174. return frame
  175. except queue.Empty:
  176. return None
  177. except Exception as e:
  178. print(f"[UDP] Error decoding JPEG: {e}")
  179. return None
  180. if __name__ == "__main__":
  181. # 测试代码
  182. receiver = UDPJPEGReceiver(host='0.0.0.0', port=5000)
  183. receiver.start()
  184. try:
  185. while True:
  186. frame = receiver.get_image(timeout=1.0)
  187. if frame is not None:
  188. print(f"Received image: {frame.shape}")
  189. cv2.imshow('UDP Image', frame)
  190. if cv2.waitKey(1) & 0xFF == ord('q'):
  191. break
  192. except KeyboardInterrupt:
  193. pass
  194. finally:
  195. receiver.stop()
  196. cv2.destroyAllWindows()