image_matching_pipeline.py 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173
  1. # Copyright (c) Alibaba, Inc. and its affiliates.
  2. from typing import Any, Dict, List, Union
  3. import cv2
  4. import numpy as np
  5. import PIL
  6. import torch
  7. from modelscope.metainfo import Pipelines
  8. from modelscope.outputs import OutputKeys
  9. from modelscope.pipelines.base import Input, Model, Pipeline
  10. from modelscope.pipelines.builder import PIPELINES
  11. from modelscope.preprocessors import LoadImage
  12. from modelscope.utils.constant import Tasks
  13. from modelscope.utils.logger import get_logger
  # Module-level logger from modelscope.utils.logger.get_logger; the pipeline
  # uses it to report initialization.
  14. logger = get_logger()
  15. @PIPELINES.register_module(
  16. Tasks.image_matching, module_name=Pipelines.image_matching)
  17. class ImageMatchingPipeline(Pipeline):
  18. """ Image Matching Pipeline.
  19. Examples:
  20. >>> from modelscope.outputs import OutputKeys
  21. >>> from modelscope.pipelines import pipeline
  22. >>> from modelscope.utils.constant import Tasks
  23. >>> task = 'image-matching'
  24. >>> model_id = 'damo/cv_quadtree_attention_image-matching_outdoor'
  25. >>> input_location = [
  26. >>> ['data/test/images/image_matching1.jpg',
  27. >>> 'data/test/images/image_matching2.jpg']
  28. >>> ]
  29. >>> estimator = pipeline(Tasks.image_matching, model=self.model_id)
  30. >>> result = estimator(input_location)
  31. >>> kpts0, kpts1, conf = result[0][OutputKeys.MATCHES]
  32. >>> print(f'Found {len(kpts0)} matches')
  33. """
  34. def __init__(self, model: str, **kwargs):
  35. """
  36. use `model` to create a image matching pipeline for prediction
  37. Args:
  38. model: model id on modelscope hub.
  39. """
  40. super().__init__(model=model, **kwargs)
  41. # check if cuda is available
  42. if not torch.cuda.is_available():
  43. raise RuntimeError(
  44. 'Cuda is not available. Image matching model only supports cuda.'
  45. )
  46. logger.info('image matching model, pipeline init')
  47. def resize_image(self, img, max_image_size):
  48. h, w = img.shape[:2]
  49. scale = 1
  50. if max(h, w) > max_image_size:
  51. scale = max_image_size / max(h, w)
  52. new_w, new_h = int(w * scale), int(h * scale)
  53. img = cv2.resize(img, (new_w, new_h), interpolation=cv2.INTER_AREA)
  54. return img, scale
  55. def compute_paded_size(self, size, div):
  56. return int(np.ceil(size / div) * div)
  57. def pad_image(self, img, h=None, w=None, div=32):
  58. cur_h, cur_w = img.shape[:2]
  59. if h is None and w is None:
  60. h, w = cur_h, cur_w
  61. h_pad, w_pad = self.compute_paded_size(h,
  62. div), self.compute_paded_size(
  63. w, div)
  64. img = cv2.copyMakeBorder(
  65. img,
  66. 0,
  67. h_pad - cur_h,
  68. 0,
  69. w_pad - cur_w,
  70. cv2.BORDER_CONSTANT,
  71. value=0)
  72. return img
  73. def load_image(self, img_name):
  74. img = LoadImage.convert_to_ndarray(img_name).astype(np.float32)
  75. img = img / 255.
  76. # convert rgb to gray
  77. if len(img.shape) == 3:
  78. img = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
  79. return img
  80. def preprocess(self, input: Input, max_image_size=1024):
  81. assert len(input) == 2, 'input should be a list of two images'
  82. img1 = self.load_image(input[0])
  83. img1, scale1 = self.resize_image(img1, max_image_size)
  84. scaled_h1, scaled_w1 = img1.shape[:2]
  85. img2 = self.load_image(input[1])
  86. img2, scale2 = self.resize_image(img2, max_image_size)
  87. scaled_h2, scaled_w2 = img2.shape[:2]
  88. h_max, w_max = max(scaled_h1, scaled_h2), max(scaled_w1, scaled_w2)
  89. img1 = self.pad_image(img1, h_max, w_max)
  90. img2 = self.pad_image(img2, h_max, w_max)
  91. img1 = torch.from_numpy(img1)[None][None].cuda().float()
  92. img2 = torch.from_numpy(img2)[None][None].cuda().float()
  93. return {
  94. 'image0':
  95. img1,
  96. 'image1':
  97. img2,
  98. 'preprocess_info':
  99. [scale1, scale2, scaled_h1, scaled_w1, scaled_h2, scaled_w2]
  100. }
  101. def postprocess_match(self, kpt1, kpt2, conf, scale1, scale2, scaled_h1,
  102. scaled_w1, scaled_h2, scaled_w2):
  103. # filter out points outside the image
  104. valid_match = (kpt1[:, 0] < scaled_w1) & (kpt1[:, 1] < scaled_h1) & (
  105. kpt2[:, 0] < scaled_w2) & (
  106. kpt2[:, 1] < scaled_h2)
  107. kpt1, kpt2 = kpt1[valid_match], kpt2[valid_match]
  108. kpt1 = kpt1 / scale1
  109. kpt2 = kpt2 / scale2
  110. conf = conf[valid_match]
  111. return kpt1, kpt2, conf
  112. def forward(self, input: Dict[str, Any]) -> Dict[str, Any]:
  113. results = self.model.inference(input)
  114. return results
  115. def postprocess(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
  116. results = self.model.postprocess(inputs)
  117. matches = results[OutputKeys.MATCHES]
  118. kpts0 = matches['kpts0'].cpu().numpy()
  119. kpts1 = matches['kpts1'].cpu().numpy()
  120. conf = matches['conf'].cpu().numpy()
  121. preprocess_info = [v.cpu().numpy() for v in inputs['preprocess_info']]
  122. kpts0, kpts1, conf = self.postprocess_match(kpts0, kpts1, conf,
  123. *preprocess_info)
  124. outputs = {
  125. OutputKeys.MATCHES: [kpts0, kpts1, conf],
  126. }
  127. return outputs
  128. def __call__(self, input, **kwargs):
  129. """
  130. Match two images and return the matched keypoints and confidence.
  131. Args:
  132. input (`List[List[str]]`): A list of two image paths.
  133. Return:
  134. A list of result.
  135. The list contain the following values:
  136. - kpts0 -- Matched keypoints in the first image
  137. - kpts1 -- Matched keypoints in the second image
  138. - conf -- Confidence of the match
  139. """
  140. return super().__call__(input, **kwargs)