| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827 |
- #!/usr/bin/env python3
- # LightGlue demo with camera position tracking in reference image
- from pathlib import Path
- import argparse
- import cv2
- import matplotlib.cm as cm
- import torch
- import numpy as np
- import time
- from lightglue import LightGlue, SuperPoint
- from lightglue.utils import numpy_image_to_torch
# Import the UDP JPEG receiver (optional; only needed for udp:// inputs)
- try:
- from udp_jpeg_receiver import UDPJPEGReceiver
- except ImportError:
- UDPJPEGReceiver = None
- torch.set_grad_enabled(False)
class AverageTimer:
    """Track exponentially smoothed wall-clock timings of named code stages.

    Call :meth:`update` after each stage of a loop iteration to record how
    long that stage took, then :meth:`print` once per iteration to emit a
    one-line summary and start the next measurement round.

    Args:
        smoothing: Weight given to the newest measurement when blending it
            with the previous smoothed value (``new = smoothing * dt +
            (1 - smoothing) * old``).
        newline: If true, end each summary with a newline; otherwise end it
            with a carriage return so the next summary overwrites it.
    """

    def __init__(self, smoothing=0.3, newline=False):
        self.smoothing = smoothing
        self.newline = newline
        self.times = {}        # stage name -> smoothed duration in seconds
        self.will_print = {}   # stage name -> updated since the last reset?
        self.reset()

    def reset(self):
        """Restart the clock and mark every stage as not yet updated."""
        now = time.time()
        self.start = now
        self.last_time = now
        for name in self.will_print:
            self.will_print[name] = False

    def update(self, name='default'):
        """Record the time elapsed since the previous update/reset as `name`."""
        now = time.time()
        dt = now - self.last_time
        if name in self.times:
            dt = self.smoothing * dt + (1 - self.smoothing) * self.times[name]
        self.times[name] = dt
        self.will_print[name] = True
        self.last_time = now

    def print(self, text='Timer'):
        """Print the stages updated since the last reset, then reset.

        The FPS figure is reported as 0.0 when the accumulated time is zero
        (no stage updated yet, or a clock too coarse to measure the stages)
        instead of dividing by zero.
        """
        total = 0.
        print('[{}]'.format(text), end=' ')
        for key, val in self.times.items():
            if self.will_print[key]:
                print('%s=%.3f' % (key, val), end=' ')
                total += val
        fps = 1. / total if total > 0 else 0.
        print('total=%.3f sec {%.1f FPS}' % (total, fps), end=' ')
        if self.newline:
            print(flush=True)
        else:
            print(end='\r', flush=True)
        self.reset()
class VideoStreamer:
    """Read grayscale frames from a camera, network stream, video or folder.

    The source kind is chosen from `source` in this order:

    1. an ``int`` -> local camera index;
    2. a string starting with ``udp://`` -> UDP JPEG receiver
       (``udp://host:port``, ``udp://:port`` or ``udp://port``);
    3. an existing directory -> image sequence matched by `image_glob`;
    4. an existing file -> video file;
    5. a string of digits -> local camera index;
    6. anything else -> IP camera URL opened through the FFMPEG backend.

    Args:
        source: Camera index, URL, or path as described above.
        resize: ``[w, h]`` for an exact size, ``[max_dim]`` to scale the
            longest side, or ``[-1]`` to keep the native size.
        skip: Number of frames read and discarded after each returned frame
            (capture sources only).
        image_glob: Glob patterns used when `source` is a directory.
        max_length: Maximum number of images taken from a directory.

    Attributes:
        i: Number of frames returned so far by :meth:`next_frame`.

    Raises:
        ImportError: A ``udp://`` source was given but the receiver module
            could not be imported.
        IOError: `source` is a directory containing no matching image.
    """

    def __init__(self, source, resize, skip, image_glob, max_length=1000000):
        self.source = source
        self.skip = skip
        self.max_length = max_length
        self.resize = resize
        self.i = 0
        self.cap = None
        self.is_ip_camera = False
        self.is_udp_jpeg = False
        self.udp_receiver = None
        self.listing = []
        self._is_digit_source = isinstance(source, int) or (
            isinstance(source, str) and source.isdigit())

        if isinstance(source, int):
            # An integer is a device index; Path(int) would raise TypeError.
            self.cap = cv2.VideoCapture(source)
        elif isinstance(source, str) and source.startswith('udp://'):
            self._open_udp(source)
        elif Path(source).is_dir():
            for ext in image_glob:
                self.listing.extend(Path(source).glob(ext))
            # glob() order is filesystem dependent; sort so that the frame
            # sequence is deterministic.
            self.listing.sort()
            self.listing = self.listing[:self.max_length]
            self.max_length = len(self.listing)
            if self.max_length == 0:
                raise IOError('No images found in directory: {}'.format(source))
            print(f'Found {self.max_length} images in {source}')
        elif Path(source).exists():
            self.cap = cv2.VideoCapture(source)
        elif self._is_digit_source:
            self.cap = cv2.VideoCapture(int(source))
        else:
            # Neither a path nor a number: treat it as an IP camera URL and
            # ask the backend for low-latency settings.
            self.is_ip_camera = True
            self.cap = cv2.VideoCapture(source, cv2.CAP_FFMPEG)
            self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)  # smallest buffer
            self.cap.set(cv2.CAP_PROP_FPS, 30)        # request 30 FPS
            self.cap.set(cv2.CAP_PROP_FOURCC,
                         cv2.VideoWriter_fourcc(*'MJPG'))  # request MJPEG

    def _open_udp(self, source):
        """Parse a ``udp://`` URL and start the UDP JPEG receiver."""
        if UDPJPEGReceiver is None:
            raise ImportError("UDPJPEGReceiver not available. Make sure udp_jpeg_receiver.py exists.")

        address = source[len('udp://'):]
        host, _, port_text = address.rpartition(':')
        host = host or '0.0.0.0'
        port = int(port_text)

        # Check that `host` can be bound on this machine; fall back to all
        # interfaces otherwise. The context manager closes the probe socket
        # even when bind() fails.
        import socket as sock
        try:
            with sock.socket(sock.AF_INET, sock.SOCK_DGRAM) as test_socket:
                test_socket.bind((host, 0))  # port 0: any free port
        except OSError:
            print(f"Warning: {host} is not a local address, using 0.0.0.0 instead")
            host = '0.0.0.0'

        self.is_udp_jpeg = True
        self.udp_receiver = UDPJPEGReceiver(host=host, port=port)
        self.udp_receiver.start()
        print(f'UDP JPEG receiver initialized: {host}:{port}')

    @staticmethod
    def _to_gray(frame):
        """Convert a 3-dimensional (BGR) frame to grayscale; pass 2-D through."""
        if len(frame.shape) == 3:
            return cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        return frame

    def _resize_frame(self, frame):
        """Apply the configured resize, skipping the call when it is a no-op."""
        h, w = frame.shape[:2]
        if len(self.resize) == 2:
            target = tuple(self.resize)
        elif len(self.resize) == 1 and self.resize[0] > 0:
            scale = self.resize[0] / max(h, w)
            target = (int(w * scale), int(h * scale))
        else:
            return frame
        if target == (w, h):
            return frame
        return cv2.resize(frame, target)

    def next_frame(self):
        """Return the next frame.

        Returns:
            ``(frame, True)`` with a grayscale, resized frame on success, or
            ``(None, False)`` when no frame is available (receiver returned
            nothing within its timeout, capture read failed, image sequence
            exhausted, or an image failed to load).
        """
        if self.is_udp_jpeg:
            frame = self.udp_receiver.get_image(timeout=0.1)
            if frame is None:
                return None, False
            frame = self._to_gray(frame)
        elif self.cap is not None:
            if self.is_ip_camera:
                # Drop up to 3 buffered frames so the decoded one is recent.
                for _ in range(3):
                    if not self.cap.grab():
                        break
            ret, frame = self.cap.read()
            if not ret:
                return None, False
            frame = self._to_gray(frame)
        else:
            if self.i >= self.max_length:
                return None, False
            image_file = self.listing[self.i]
            frame = cv2.imread(str(image_file), cv2.IMREAD_GRAYSCALE)
            if frame is None:
                print(f'Failed to load image: {image_file}')
                return None, False

        frame = self._resize_frame(frame)
        # Count every returned frame so callers can derive unique frame ids
        # for all source kinds, not just image directories.
        self.i += 1

        # Discard `skip` frames from capture sources; running out of frames
        # here still returns the frame already decoded.
        if self.cap is not None:
            for _ in range(self.skip):
                ret, _ = self.cap.read()
                if not ret:
                    break

        return frame, True

    def cleanup(self):
        """Stop the UDP receiver and release the capture device, if any."""
        if self.is_udp_jpeg and self.udp_receiver is not None:
            self.udp_receiver.stop()
        if self.cap is not None:
            self.cap.release()
def frame2tensor(frame, device):
    """Turn an image array into a float tensor scaled to [0, 1] on `device`.

    A 2-D frame gains leading batch and channel axes, a 3-D frame gains a
    leading batch axis, and any other rank is passed through unchanged.
    Pixel values are divided by 255.
    """
    rank = frame.ndim
    if rank == 2:
        batched = frame[None, None]   # add batch and channel axes
    elif rank == 3:
        batched = frame[None]         # add batch axis only
    else:
        batched = frame
    scaled = batched / 255.
    tensor = torch.tensor(scaled, dtype=torch.float)
    return tensor.to(device)
def make_matching_plot_fast(image0, image1, kpts0, kpts1, mkpts0, mkpts1,
                            color, text, path=None, show_keypoints=False,
                            small_text=None, margin=10):
    """Render two images side by side with one line per matched keypoint.

    The images are pasted onto a white canvas, separated by `margin` white
    columns; 2-D (grayscale) inputs are converted to BGR first. Match `i`
    is drawn from ``mkpts0[i]`` to ``mkpts1[i]`` using ``color[i]`` (RGB in
    [0, 1]). `kpts0`, `kpts1`, `text`, `show_keypoints` and `small_text`
    are accepted but not drawn. If `path` is given the canvas is also
    written there. Returns the canvas.
    """
    def as_bgr(img):
        if len(img.shape) == 2:
            return cv2.cvtColor(img, cv2.COLOR_GRAY2BGR)
        return img

    rows0, cols0 = image0.shape[:2]
    rows1, cols1 = image1.shape[:2]
    right_x = cols0 + margin  # x offset of the second image on the canvas

    canvas = np.full((max(rows0, rows1), right_x + cols1, 3), 255, np.uint8)
    canvas[:rows0, :cols0] = as_bgr(image0)
    canvas[:rows1, right_x:] = as_bgr(image1)

    # Match lines only; keypoints themselves are not drawn.
    if len(mkpts0) > 0:
        pts_left = mkpts0.astype(int)
        pts_right = mkpts1.astype(int)
        for idx, ((xa, ya), (xb, yb)) in enumerate(zip(pts_left, pts_right)):
            rgb = color[idx]
            bgr = (int(rgb[2] * 255), int(rgb[1] * 255), int(rgb[0] * 255))
            cv2.line(canvas, (xa, ya), (xb + right_x, yb), bgr, 1,
                     lineType=cv2.LINE_AA)

    if path is not None:
        cv2.imwrite(str(path), canvas)

    return canvas
def _draw_marker(image, center, radius, arm, color):
    """Draw a circle of `radius` with a cross of half-length `arm` (in place)."""
    cx, cy = center
    cv2.circle(image, center, radius, color, 2)
    cv2.line(image, (cx - arm, cy), (cx + arm, cy), color, 3)
    cv2.line(image, (cx, cy - arm), (cx, cy + arm), color, 3)


def draw_camera_position_on_reference(reference_frame, camera_center_current, H, num_matches=0, min_matches=10, inliers_ratio=0.0):
    """Draw the current camera centre projected onto the reference image.

    Args:
        reference_frame: Reference image (grayscale, or already 3-channel).
        camera_center_current: ``(x, y)`` of the camera centre in the
            current frame.
        H: 3x3 homography mapping reference-image points to current-frame
            points, or None when no homography is available.
        num_matches: Number of feature matches for the current frame.
        min_matches: Minimum number of matches required to trust `H`.
        inliers_ratio: Fraction of matches that are homography inliers
            (display only).

    Returns:
        A BGR copy of the reference image. It always shows the reference
        centre (green marker). When `H` is usable it also shows the
        projected camera centre (red marker, clipped to the image), a
        connecting line and the match statistics; otherwise it shows a
        status message.
    """
    h_ref, w_ref = reference_frame.shape[:2]
    if len(reference_frame.shape) == 2:
        ref_colored = cv2.cvtColor(reference_frame.copy(), cv2.COLOR_GRAY2BGR)
    else:
        ref_colored = reference_frame.copy()
    center_ref_int = (int(w_ref // 2), int(h_ref // 2))
    font = cv2.FONT_HERSHEY_SIMPLEX
    green, red, white = (0, 255, 0), (0, 0, 255), (255, 255, 255)

    # Reference image centre (green marker).
    _draw_marker(ref_colored, center_ref_int, 15, 20, green)

    if H is None or num_matches < min_matches:
        if num_matches < min_matches:
            status_text = f"Insufficient matches: {num_matches}/{min_matches}"
            cv2.putText(ref_colored, status_text, (10, 30), font, 0.7, red, 2)
            cv2.putText(ref_colored, "Camera position not available",
                        (10, 60), font, 0.7, red, 2)
        else:
            cv2.putText(ref_colored, "Reference Center",
                        (center_ref_int[0] + 25, center_ref_int[1] - 10),
                        font, 0.6, green, 2)
        return ref_colored

    # Map the camera centre back into reference coordinates through H^-1.
    camera_center_ref = None
    try:
        H_inv = np.linalg.inv(H)
    except np.linalg.LinAlgError:
        H_inv = None
    if H_inv is not None:
        point = np.array([[camera_center_current]], dtype=np.float32).reshape(-1, 1, 2)
        projected = cv2.perspectiveTransform(point, H_inv)[0, 0]
        # A degenerate homography can yield NaN/inf, which int() cannot
        # convert; treat that like a non-invertible matrix.
        if np.all(np.isfinite(projected)):
            camera_center_ref = projected

    if camera_center_ref is None:
        cv2.putText(ref_colored, "Reference Center (Matrix Error)",
                    (10, 30), font, 0.7, green, 2)
        cv2.putText(ref_colored, "Camera position not available",
                    (10, 60), font, 0.7, red, 2)
        return ref_colored

    # Keep the marker inside the image.
    camera_center_ref = np.clip(camera_center_ref, [0, 0], [w_ref - 1, h_ref - 1])
    camera_pos_int = (int(camera_center_ref[0]), int(camera_center_ref[1]))

    # Camera position (red marker) and line to the reference centre.
    _draw_marker(ref_colored, camera_pos_int, 12, 15, red)
    cv2.line(ref_colored, center_ref_int, camera_pos_int, (255, 0, 255), 2)

    cv2.putText(ref_colored, "Reference Center",
                (center_ref_int[0] + 25, center_ref_int[1] - 10),
                font, 0.6, green, 2)
    cv2.putText(ref_colored, "Camera Position",
                (camera_pos_int[0] + 25, camera_pos_int[1] - 10),
                font, 0.6, red, 2)

    cv2.putText(ref_colored, f"Matches: {num_matches}",
                (10, 30), font, 0.7, white, 2)
    cv2.putText(ref_colored, f"Inliers: {inliers_ratio:.1%}",
                (10, 60), font, 0.7, white, 2)

    return ref_colored
def _build_arg_parser():
    """Build the command-line parser for the camera position demo."""
    parser = argparse.ArgumentParser(
        description='LightGlue demo with camera position tracking',
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument(
        '--input', type=str, default='0',
        help='ID of a USB webcam, URL of an IP camera, '
             'UDP stream (udp://host:port), '
             'or path to an image directory or movie file')
    parser.add_argument(
        '--reference_image', type=str, default=None,
        help='Path to reference image to match against (if None, use first frame)')
    parser.add_argument(
        '--output_dir', type=str, default=None,
        help='Directory where to write output frames (If None, no output)')

    parser.add_argument(
        '--image_glob', type=str, nargs='+', default=['*.png', '*.jpg', '*.jpeg'],
        help='Glob if a directory of images is specified')
    parser.add_argument(
        '--skip', type=int, default=1,
        help='Images to skip if input is a movie or directory')
    parser.add_argument(
        '--max_length', type=int, default=1000000,
        help='Maximum length if input is a movie or directory')
    parser.add_argument(
        '--resize', type=int, nargs='+', default=[640, 480],
        help='Resize the input image before running inference. If two numbers, '
             'resize to the exact dimensions, if one number, resize the max '
             'dimension, if -1, do not resize')

    parser.add_argument(
        '--max_keypoints', type=int, default=2048,
        help='Maximum number of keypoints detected by SuperPoint')
    parser.add_argument(
        '--keypoint_threshold', type=float, default=0.005,
        help='SuperPoint keypoint detector confidence threshold')
    parser.add_argument(
        '--nms_radius', type=int, default=4,
        help='SuperPoint Non Maximum Suppression (NMS) radius')
    parser.add_argument(
        '--match_threshold', type=float, default=0.2,
        help='LightGlue match threshold')

    parser.add_argument(
        '--show_keypoints', action='store_true',
        help='Show the detected keypoints')
    parser.add_argument(
        '--no_display', action='store_true',
        help='Do not display images to screen. Useful if running remotely')
    parser.add_argument(
        '--force_cpu', action='store_true',
        help='Force pytorch to run in CPU mode.')
    parser.add_argument(
        '--min_matches', type=int, default=10,
        help='Minimum number of matches to compute homography')
    parser.add_argument(
        '--flip_horizontal', action='store_true',
        help='Flip camera feed horizontally (mirror)')
    parser.add_argument(
        '--flip_vertical', action='store_true',
        help='Flip camera feed vertically')
    parser.add_argument(
        '--rotate', type=int, default=0, choices=[0, 90, 180, 270],
        help='Rotate camera feed (0, 90, 180, 270 degrees clockwise)')

    # LightGlue specific parameters
    parser.add_argument(
        '--depth_confidence', type=float, default=0.95,
        help='LightGlue depth confidence for early stopping (-1 to disable)')
    parser.add_argument(
        '--width_confidence', type=float, default=0.99,
        help='LightGlue width confidence for point pruning (-1 to disable)')
    parser.add_argument(
        '--no_ui', action='store_true',
        help='Disable UI interface and run demo directly')
    return parser


def _normalize_resize(resize):
    """Validate the --resize values, announce the policy, return the list."""
    if len(resize) == 2 and resize[1] == -1:
        resize = resize[0:1]
    if len(resize) == 2:
        print('Will resize to {}x{} (WxH)'.format(resize[0], resize[1]))
    elif len(resize) == 1 and resize[0] > 0:
        print('Will resize max dimension to {}'.format(resize[0]))
    elif len(resize) == 1:
        print('Will not resize images')
    else:
        raise ValueError('Cannot specify more than two integers for --resize')
    return resize


def _resize_reference(image, resize):
    """Resize the reference image with the same policy as the input frames."""
    if len(resize) == 2:
        return cv2.resize(image, tuple(resize))
    if len(resize) == 1 and resize[0] > 0:
        h, w = image.shape[:2]
        scale = resize[0] / max(h, w)
        return cv2.resize(image, (int(w * scale), int(h * scale)))
    return image


def _apply_orientation(frame, opt):
    """Rotate, then flip `frame` according to --rotate / --flip_* options."""
    rotations = {
        90: cv2.ROTATE_90_CLOCKWISE,
        180: cv2.ROTATE_180,
        270: cv2.ROTATE_90_COUNTERCLOCKWISE,
    }
    if opt.rotate in rotations:
        frame = cv2.rotate(frame, rotations[opt.rotate])
    if opt.flip_horizontal:
        frame = cv2.flip(frame, 1)  # 1: mirror around the vertical axis
    if opt.flip_vertical:
        frame = cv2.flip(frame, 0)  # 0: mirror around the horizontal axis
    return frame


def main():
    """Run the demo: match each frame to the reference and show the camera position."""
    opt = _build_arg_parser().parse_args()

    # Hide console output when launched from UI
    if opt.no_ui:
        import os
        import sys
        # Redirected for the lifetime of the process on purpose.
        sys.stdout = open(os.devnull, 'w')
        sys.stderr = open(os.devnull, 'w')

    opt.resize = _normalize_resize(opt.resize)

    device = 'cuda' if torch.cuda.is_available() and not opt.force_cpu else 'cpu'
    print('Running inference on device \"{}\"'.format(device))

    # Initialize LightGlue and SuperPoint
    extractor = SuperPoint(
        max_num_keypoints=opt.max_keypoints,
        detection_threshold=opt.keypoint_threshold,
        nms_radius=opt.nms_radius
    ).eval().to(device)

    matcher = LightGlue(
        features='superpoint',
        depth_confidence=opt.depth_confidence,
        width_confidence=opt.width_confidence,
        filter_threshold=opt.match_threshold
    ).eval().to(device)

    print('Loaded SuperPoint and LightGlue models')

    if opt.reference_image is not None:
        # Anchor on an image loaded from disk.
        print(f'==> Loading reference image: {opt.reference_image}')
        ref_image = cv2.imread(opt.reference_image, cv2.IMREAD_GRAYSCALE)
        if ref_image is None:
            raise IOError(f'Cannot load reference image: {opt.reference_image}')
        last_frame = _resize_reference(ref_image, opt.resize)
        last_data = extractor({'image': frame2tensor(last_frame, device)})
        print(f'==> Reference image loaded: {last_frame.shape}')
        vs = VideoStreamer(opt.input, opt.resize, opt.skip,
                           opt.image_glob, opt.max_length)
    else:
        # Anchor on the first frame of the input stream.
        vs = VideoStreamer(opt.input, opt.resize, opt.skip,
                           opt.image_glob, opt.max_length)
        frame, ret = vs.next_frame()
        if not ret:
            # Explicit error instead of `assert`, which -O strips.
            vs.cleanup()
            raise IOError('Error when reading the first frame (try different --input?)')
        last_frame = _apply_orientation(frame, opt)
        last_data = extractor({'image': frame2tensor(last_frame, device)})
    last_image_id = 0

    # Report what the backend actually accepted for IP cameras only.
    if vs.is_ip_camera and vs.cap is not None:
        actual_fps = vs.cap.get(cv2.CAP_PROP_FPS)
        actual_buf = vs.cap.get(cv2.CAP_PROP_BUFFERSIZE)
        print(f'IP Camera configured - FPS: {actual_fps:.1f}, Buffer: {actual_buf}')

    if opt.output_dir is not None:
        print('==> Will write outputs to {}'.format(opt.output_dir))
        Path(opt.output_dir).mkdir(parents=True, exist_ok=True)

    # Only the 'Camera Position in Reference' window is shown.
    if opt.no_display:
        print('Skipping visualization, will not show a GUI.')
    else:
        cv2.namedWindow('Camera Position in Reference', cv2.WINDOW_NORMAL)
        cv2.resizeWindow('Camera Position in Reference', 640, 480)

    # Print the keyboard help menu ('e' lowers the threshold, 'r' raises it).
    print('==> Keyboard control:\n'
          '\tn: select the current frame as the anchor\n'
          '\te/r: decrease/increase the keypoint confidence threshold\n'
          '\td: decrease the match filtering threshold\n'
          '\tf: toggle FPS display\n'
          '\tk: toggle the visualization of keypoints\n'
          '\tq: quit')

    timer = AverageTimer()
    fps_display = 0.0   # smoothed frames per second
    last_time = time.time()
    show_fps = False    # toggled with 'f'

    # Tracking state carried across frames.
    last_good_H = None      # last homography that passed the quality check
    last_camera_pos = None  # last smoothed camera position (reference coords)
    smooth_alpha = 0.7      # weight of the previous position; higher = smoother

    try:
        while True:
            frame, ret = vs.next_frame()
            if not ret:
                print('Finished demo_lightglue_camera_position.py')
                break

            frame = _apply_orientation(frame, opt)

            timer.update('data')
            stem0, stem1 = last_image_id, vs.i - 1

            # Extract features from the current frame and match to the anchor.
            curr_data = extractor({'image': frame2tensor(frame, device)})
            matches01 = matcher({'image0': last_data, 'image1': curr_data})

            kpts0 = last_data['keypoints'][0].cpu().numpy()
            kpts1 = curr_data['keypoints'][0].cpu().numpy()
            matches = matches01['matches0'][0].cpu().numpy()
            confidence = matches01['matching_scores0'][0].cpu().numpy()
            timer.update('forward')

            # Smoothed FPS
            current_time = time.time()
            time_diff = current_time - last_time
            if time_diff > 0:
                fps_display = 0.9 * fps_display + 0.1 * (1.0 / time_diff)
            last_time = current_time

            # Keep only matched keypoints (-1 marks "no match").
            valid = matches > -1
            mkpts0 = kpts0[valid]
            mkpts1 = kpts1[matches[valid]]
            mconf = confidence[valid]
            color = cm.jet(mconf)
            num_matches = len(mkpts0)

            # Per-frame results; reset so nothing leaks from the last frame.
            current_H = None
            inliers_ratio = 0.0
            frame_with_box = cv2.cvtColor(frame.copy(), cv2.COLOR_GRAY2BGR)

            # findHomography needs at least 4 correspondences.
            if num_matches >= max(opt.min_matches, 4):
                H, mask = cv2.findHomography(mkpts0, mkpts1, cv2.RANSAC, 5.0)

                if H is not None:
                    inliers_count = int(np.sum(mask))
                    inliers_ratio = inliers_count / num_matches

                    # Quality check for stability
                    quality_good = inliers_ratio >= 0.25 and num_matches >= 10

                    # ASCII-only indicator for Windows consoles
                    quality_indicator = "OK" if quality_good else "WARN"
                    print(f"[Homography] {quality_indicator} Total matches: {num_matches}, Inliers: {inliers_count}, Inliers ratio: {inliers_ratio:.2%}")

                    if quality_good:
                        last_good_H = H.copy()
                        current_H = H
                        box_color = (0, 255, 0)  # green
                    elif last_good_H is not None:
                        # Poor quality: reuse the last trusted homography.
                        current_H = last_good_H
                        box_color = (0, 165, 255)  # orange (BGR): fallback
                        print(f"[Homography] Using fallback matrix (quality too low)")
                    else:
                        current_H = H
                        box_color = (0, 255, 255)  # yellow: nothing to fall back on

                    # Project the reference image outline into the frame.
                    h0, w0 = last_frame.shape[:2]
                    corners_ref = np.float32(
                        [[0, 0], [w0, 0], [w0, h0], [0, h0]]).reshape(-1, 1, 2)
                    corners_curr = cv2.perspectiveTransform(corners_ref, current_H)
                    frame_with_box = cv2.polylines(
                        frame_with_box, [np.int32(corners_curr)],
                        True, box_color, 3, cv2.LINE_AA)

                    quality_text = "Good" if quality_good else "Fallback"
                    cv2.putText(frame_with_box, f'{quality_text}! Matches: {num_matches} | Inliers: {inliers_ratio:.1%}',
                                (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.6, box_color, 2)
                else:
                    # Homography computation failed
                    print(f"[Homography] Failed - Total matches: {num_matches}")
                    cv2.putText(frame_with_box, f'Tracking... Matches: {num_matches}',
                                (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 0), 2)
            else:
                cv2.putText(frame_with_box, f'Searching... Matches: {num_matches}',
                            (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)

            # FPS in the bottom right corner of the detection image
            h_box, w_box = frame_with_box.shape[:2]
            cv2.putText(frame_with_box, f'FPS: {fps_display:.1f}',
                        (w_box - 120, h_box - 15), cv2.FONT_HERSHEY_SIMPLEX,
                        0.6, (255, 255, 255), 2)

            # Red crosshair at the centre of the detection image
            center_x, center_y = w_box // 2, h_box // 2
            crosshair_size = 20
            cv2.line(frame_with_box,
                     (center_x - crosshair_size, center_y),
                     (center_x + crosshair_size, center_y),
                     (0, 0, 255), 4, cv2.LINE_AA)
            cv2.line(frame_with_box,
                     (center_x, center_y - crosshair_size),
                     (center_x, center_y + crosshair_size),
                     (0, 0, 255), 4, cv2.LINE_AA)

            # Camera centre in the current frame
            h_curr, w_curr = frame.shape[:2]
            camera_center_current = (w_curr // 2, h_curr // 2)

            if current_H is not None:
                try:
                    H_inv = np.linalg.inv(current_H)
                except np.linalg.LinAlgError:
                    H_inv = None  # not invertible: keep the raw centre
                if H_inv is not None:
                    center_pt = np.array(
                        [[camera_center_current]], dtype=np.float32).reshape(-1, 1, 2)
                    camera_center_ref = cv2.perspectiveTransform(center_pt, H_inv)[0, 0]
                    # Skip non-finite results so they cannot poison the
                    # smoothing state for all later frames.
                    if np.all(np.isfinite(camera_center_ref)):
                        if last_camera_pos is not None:
                            # Exponential smoothing in reference coordinates
                            camera_center_ref = (smooth_alpha * last_camera_pos
                                                 + (1 - smooth_alpha) * camera_center_ref)
                        last_camera_pos = camera_center_ref.copy()

                        # Map the smoothed point back into the current frame
                        # so the drawing routine, which applies H^-1 itself,
                        # ends up at the smoothed reference position.
                        smoothed_pt = np.array(
                            [[camera_center_ref]], dtype=np.float32).reshape(-1, 1, 2)
                        smoothed_curr = cv2.perspectiveTransform(smoothed_pt, current_H)[0, 0]
                        if np.all(np.isfinite(smoothed_curr)):
                            camera_center_current = (float(smoothed_curr[0]),
                                                     float(smoothed_curr[1]))

            reference_with_camera_pos = draw_camera_position_on_reference(
                last_frame, camera_center_current, current_H,
                num_matches, opt.min_matches, inliers_ratio
            )

            # FPS overlay on the camera position window, if enabled
            if show_fps:
                cv2.putText(reference_with_camera_pos, f'FPS: {fps_display:.1f}',
                            (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2)

            # Show only the camera position window
            if not opt.no_display:
                cv2.imshow('Camera Position in Reference', reference_with_camera_pos)
                key = chr(cv2.waitKey(1) & 0xFF)
            else:
                key = ''

            if key == 'q':
                # Cleanup happens once, in the finally clause below.
                print('Exiting (via q) demo_lightglue_camera_position.py')
                break
            elif key == 'n':  # set the current frame as anchor
                last_data = curr_data
                last_frame = frame
                last_image_id = stem1
                # State tied to the previous anchor is no longer valid.
                last_good_H = None
                last_camera_pos = None
            elif key in ['e', 'r']:
                # Decrease ('e') / increase ('r') the threshold by 10%.
                d = 0.1 * (-1 if key == 'e' else 1)
                opt.keypoint_threshold = min(max(
                    0.0001, opt.keypoint_threshold * (1 + d)), 1)
                extractor.conf.detection_threshold = opt.keypoint_threshold
                print('\nChanged the keypoint threshold to {:.4f}'.format(
                    opt.keypoint_threshold))
            elif key == 'd':
                # Decrease match threshold by 0.05
                opt.match_threshold = min(max(
                    0.05, opt.match_threshold - 0.05), .95)
                matcher.conf.filter_threshold = opt.match_threshold
                print('\nChanged the match threshold to {:.2f}'.format(
                    opt.match_threshold))
            elif key == 'f':
                show_fps = not show_fps
            elif key == 'k':
                opt.show_keypoints = not opt.show_keypoints

            timer.update('viz')
            timer.print('LightGlue')

            if opt.output_dir is not None:
                # The match plot is only ever written to disk, so build it
                # only when an output directory was requested.
                text = [
                    'LightGlue with Camera Position Tracking',
                    'Keypoints: {}:{}'.format(len(kpts0), len(kpts1)),
                    'Matches: {}'.format(len(mkpts0))
                ]
                small_text = [
                    'Keypoint Threshold: {:.4f}'.format(opt.keypoint_threshold),
                    'Match Threshold: {:.2f}'.format(opt.match_threshold),
                    'Image Pair: {:06}:{:06}'.format(stem0, stem1),
                    'Stopped at layer: {}/{}'.format(matches01['stop'], 9)
                ]
                out = make_matching_plot_fast(
                    last_frame, frame, kpts0, kpts1, mkpts0, mkpts1, color, text,
                    path=None, show_keypoints=opt.show_keypoints,
                    small_text=small_text)

                stem = 'matches_{:06}_{:06}'.format(stem0, stem1)
                out_file = str(Path(opt.output_dir, stem + '.png'))
                print('\nWriting image to {}'.format(out_file))
                cv2.imwrite(out_file, out)

                # Also save detection result
                det_file = str(Path(opt.output_dir, 'detection_' + stem + '.png'))
                cv2.imwrite(det_file, frame_with_box)

                # Save camera position result
                cam_file = str(Path(opt.output_dir, 'camera_pos_' + stem + '.png'))
                cv2.imwrite(cam_file, reference_with_camera_pos)
    finally:
        # Release windows and the input source even if the loop raised.
        cv2.destroyAllWindows()
        vs.cleanup()


if __name__ == '__main__':
    main()
|