| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236 |
- """
- UDP透传JPEG图片接收器
- - 透传模式:直接传输JPEG数据,无协议封装
- - 自动重组:检测FFD8开始,FFD9结束
- - 底层分包:UDP自动分包,直接拼接即可
- """
- import socket
- import threading
- import queue
- import cv2
- import numpy as np
- import time
- class UDPJPEGReceiver:
- """UDP JPEG图片接收器 - 透传模式"""
-
- # JPEG标记
- JPEG_START = b'\xFF\xD8' # JPEG文件头
- JPEG_END = b'\xFF\xD9' # JPEG文件尾
-
- def __init__(self, host='0.0.0.0', port=5000, timeout=2.0):
- """
- Args:
- host: 监听地址,'0.0.0.0'表示所有接口
- port: UDP端口号
- timeout: 图片接收超时时间(秒),超时后重置缓冲区
- """
- self.host = host
- self.port = port
- self.timeout = timeout
-
- # UDP socket
- self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
- try:
- self.socket.bind((host, port))
- except OSError as e:
- if e.winerror == 10049: # Windows错误:地址无效
- raise OSError(
- f"Cannot bind to {host}:{port}. "
- f"This address is not a local address. "
- f"Please use '0.0.0.0' to listen on all interfaces, "
- f"or use your local IP address (e.g., '192.168.x.x')."
- ) from e
- else:
- raise
- self.socket.settimeout(0.1) # 100ms超时,用于定期检查
-
- # 接收状态
- self.buffer = bytearray() # 当前图片数据缓冲区
- self.receiving = False # 是否正在接收图片
- self.start_time = 0 # 接收开始时间
-
- # 完整图片队列(只保留最新2张)
- self.image_queue = queue.Queue(maxsize=2)
-
- # 线程控制
- self.running = False
- self.receive_thread = None
-
- def _receive_loop(self):
- """后台接收循环"""
- while self.running:
- try:
- # 接收UDP包
- data, addr = self.socket.recvfrom(65507)
-
- # 处理数据
- self._process_data(data)
-
- # 检查超时
- if self.receiving and (time.time() - self.start_time) > self.timeout:
- print(f"[UDP] Timeout: Clearing incomplete image buffer ({len(self.buffer)} bytes)")
- self._reset()
-
- except socket.timeout:
- # 定期检查超时
- if self.receiving and (time.time() - self.start_time) > self.timeout:
- print(f"[UDP] Timeout: Clearing incomplete image buffer ({len(self.buffer)} bytes)")
- self._reset()
- continue
- except Exception as e:
- print(f"[UDP] Receive error: {e}")
- break
-
- def _process_data(self, data: bytes):
- """处理接收到的数据包"""
- # 检查是否包含JPEG开始标记(FFD8)
- if self.JPEG_START in data:
- start_idx = data.find(self.JPEG_START)
-
- if not self.receiving:
- # 开始接收新图片
- self.receiving = True
- self.start_time = time.time()
- self.buffer = bytearray()
- # 从FFD8开始的数据
- self.buffer.extend(data[start_idx:])
- # 静音起始日志
- # print(f"[UDP] JPEG start detected (FFD8 at offset {start_idx})")
- return
-
- # 如果正在接收,追加数据
- if self.receiving:
- self.buffer.extend(data)
-
- # 检查是否包含JPEG结束标记(FFD9)
- if self.JPEG_END in self.buffer:
- end_idx = self.buffer.find(self.JPEG_END)
-
- # 提取完整图片(包含FFD9)
- jpeg_data = bytes(self.buffer[:end_idx + 2])
-
- # 验证JPEG完整性
- if self._validate_jpeg(jpeg_data):
- # 尝试解码验证(更严格的验证)
- test_array = np.frombuffer(jpeg_data, dtype=np.uint8)
- test_frame = cv2.imdecode(test_array, cv2.IMREAD_COLOR)
-
- if test_frame is not None:
- # 解码成功,放入队列
- if self.image_queue.full():
- try:
- self.image_queue.get_nowait()
- except queue.Empty:
- pass
-
- self.image_queue.put(jpeg_data)
- # print(f"[UDP] JPEG complete: {len(jpeg_data)} bytes")
- else:
- # JPEG数据损坏,可能是UDP包丢失(静音日志)
- # print(f"[UDP] JPEG validation failed: cannot decode (size: {len(jpeg_data)} bytes)")
- # 不输出"Corrupt JPEG data"消息,因为cv2.imdecode已经处理了
- pass
- else:
- # 无效 JPEG(头尾标记异常,静音日志)
- # print(f"[UDP] Invalid JPEG: header/trailer check failed (size: {len(jpeg_data)} bytes)")
- pass
-
- # 重置缓冲区
- self._reset()
-
- def _validate_jpeg(self, data: bytes) -> bool:
- """验证JPEG数据完整性"""
- if len(data) < 4:
- return False
-
- # 检查开始标记
- if data[:2] != self.JPEG_START:
- return False
-
- # 检查结束标记
- if data[-2:] != self.JPEG_END:
- return False
-
- return True
-
- def _reset(self):
- """重置接收缓冲区"""
- self.buffer = bytearray()
- self.receiving = False
- self.start_time = 0
-
- def start(self):
- """启动接收线程"""
- self.running = True
- self.receive_thread = threading.Thread(
- target=self._receive_loop,
- daemon=True,
- name="UDPJPEGReceiver"
- )
- self.receive_thread.start()
- print(f"[UDP] Receiver started on {self.host}:{self.port}")
-
- def stop(self):
- """停止接收"""
- self.running = False
- if self.receive_thread:
- self.receive_thread.join(timeout=1.0)
- self.socket.close()
- print("[UDP] Receiver stopped")
-
- def get_image(self, timeout=0.1):
- """
- 获取一张完整图片
-
- Args:
- timeout: 等待超时时间(秒)
-
- Returns:
- numpy.ndarray: 解码后的图像(BGR格式),如果超时返回None
- """
- try:
- # 从队列获取完整JPEG数据
- jpeg_data = self.image_queue.get(timeout=timeout)
-
- # 解码JPEG
- image_array = np.frombuffer(jpeg_data, dtype=np.uint8)
- frame = cv2.imdecode(image_array, cv2.IMREAD_COLOR)
-
- if frame is None:
- print(f"[UDP] Failed to decode JPEG (size: {len(jpeg_data)} bytes)")
- # 尝试保存损坏的数据用于调试
- # with open('corrupt_jpeg.bin', 'wb') as f:
- # f.write(jpeg_data)
- return None
-
- return frame
-
- except queue.Empty:
- return None
- except Exception as e:
- print(f"[UDP] Error decoding JPEG: {e}")
- return None
- if __name__ == "__main__":
- # 测试代码
- receiver = UDPJPEGReceiver(host='0.0.0.0', port=5000)
- receiver.start()
-
- try:
- while True:
- frame = receiver.get_image(timeout=1.0)
- if frame is not None:
- print(f"Received image: {frame.shape}")
- cv2.imshow('UDP Image', frame)
- if cv2.waitKey(1) & 0xFF == ord('q'):
- break
- except KeyboardInterrupt:
- pass
- finally:
- receiver.stop()
- cv2.destroyAllWindows()
|