| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269 |
- # Part of the implementation is borrowed and modified from DIFRINT,
- # publicly available at https://github.com/jinsc37/DIFRINT/blob/master/metrics.py
- import os
- import sys
- import tempfile
- from typing import Dict
- import cv2
- import numpy as np
- from tqdm import tqdm
- from modelscope.metainfo import Metrics
- from modelscope.models.cv.video_stabilization.utils.WarpUtils import \
- warpListImage
- from modelscope.utils.registry import default_group
- from .base import Metric
- from .builder import METRICS, MetricKeys
- @METRICS.register_module(
- group_key=default_group, module_name=Metrics.video_stabilization_metric)
- class VideoStabilizationMetric(Metric):
- """The metric for video summarization task.
- """
- def __init__(self):
- self.inputs = []
- self.outputs = []
- def add(self, outputs: Dict, inputs: Dict):
- out = video_merger(warpprocess(outputs))
- self.outputs.append(out['video'])
- self.inputs.append(inputs['input'][0])
- def evaluate(self):
- CR = []
- DV = []
- SS = []
- for output, input in zip(self.outputs, self.inputs):
- cropping_ratio, distortion_value, stability_score = \
- metrics(input, output)
- if cropping_ratio <= 1 and distortion_value <= 1 and stability_score <= 1:
- CR.append(cropping_ratio)
- DV.append(distortion_value)
- SS.append(stability_score)
- else:
- print('Removed one error item when computing metrics.')
- return {
- MetricKeys.CROPPING_RATIO: sum(CR) / len(CR),
- MetricKeys.DISTORTION_VALUE: sum(DV) / len(DV),
- MetricKeys.STABILITY_SCORE: sum(SS) / len(SS),
- }
- def merge(self, other: 'VideoStabilizationMetric'):
- self.inputs.extend(other.inputs)
- self.outputs.extend(other.outputs)
- def __getstate__(self):
- return self.inputs, self.outputs
- def __setstate__(self, state):
- self.inputs, self.outputs = state
- def warpprocess(inputs):
- """ video stabilization postprocess
- Args:
- inputs: input data
- Return:
- dict of results: a dict containing outputs of model.
- """
- x_paths = inputs['origin_motion'][:, :, :, 0]
- y_paths = inputs['origin_motion'][:, :, :, 1]
- sx_paths = inputs['smooth_path'][:, :, :, 0]
- sy_paths = inputs['smooth_path'][:, :, :, 1]
- new_x_motion_meshes = sx_paths - x_paths
- new_y_motion_meshes = sy_paths - y_paths
- out_images = warpListImage(inputs['ori_images'], new_x_motion_meshes,
- new_y_motion_meshes, inputs['width'],
- inputs['height'])
- return {
- 'output': out_images,
- 'fps': inputs['fps'],
- 'width': inputs['width'],
- 'height': inputs['height'],
- 'base_crop_width': inputs['base_crop_width']
- }
- def video_merger(inputs):
- out_images = inputs['output'].numpy().astype(np.uint8)
- out_images = [
- np.transpose(out_images[idx], (1, 2, 0))
- for idx in range(out_images.shape[0])
- ]
- output_video_path = tempfile.NamedTemporaryFile(suffix='.mp4').name
- fourcc = cv2.VideoWriter_fourcc(*'mp4v')
- w = inputs['width']
- h = inputs['height']
- base_crop_width = inputs['base_crop_width']
- video_writer = cv2.VideoWriter(output_video_path, fourcc, inputs['fps'],
- (w, h))
- for idx, frame in enumerate(out_images):
- horizontal_border = int(base_crop_width * w / 1280)
- vertical_border = int(horizontal_border * h / w)
- new_frame = frame[vertical_border:-vertical_border,
- horizontal_border:-horizontal_border]
- new_frame = cv2.resize(new_frame, (w, h))
- video_writer.write(new_frame)
- video_writer.release()
- return {'video': output_video_path}
- def metrics(original_v, pred_v):
- # Create brute-force matcher object
- bf = cv2.BFMatcher()
- sift = cv2.SIFT_create()
- # Apply the homography transformation if we have enough good matches
- MIN_MATCH_COUNT = 10
- ratio = 0.7
- thresh = 5.0
- CR_seq = []
- DV_seq = []
- Pt = np.eye(3)
- P_seq = []
- vc_o = cv2.VideoCapture(original_v)
- vc_p = cv2.VideoCapture(pred_v)
- rval_o = vc_o.isOpened()
- rval_p = vc_p.isOpened()
- imgs1 = []
- imgs1o = []
- while (rval_o and rval_p):
- rval_o, img1 = vc_o.read()
- rval_p, img1o = vc_p.read()
- if rval_o and rval_p:
- imgs1.append(img1)
- imgs1o.append(img1o)
- is_got_bad_item = False
- print('processing ' + original_v.split('/')[-1] + ':')
- for i in tqdm(range(len(imgs1))):
- # Load the images in gray scale
- img1 = imgs1[i]
- img1o = imgs1o[i]
- # Detect the SIFT key points and compute the descriptors for the two images
- keyPoints1, descriptors1 = sift.detectAndCompute(img1, None)
- keyPoints1o, descriptors1o = sift.detectAndCompute(img1o, None)
- # Match the descriptors
- matches = bf.knnMatch(descriptors1, descriptors1o, k=2)
- # Select the good matches using the ratio test
- goodMatches = []
- for m, n in matches:
- if m.distance < ratio * n.distance:
- goodMatches.append(m)
- if len(goodMatches) > MIN_MATCH_COUNT:
- # Get the good key points positions
- sourcePoints = np.float32([
- keyPoints1[m.queryIdx].pt for m in goodMatches
- ]).reshape(-1, 1, 2)
- destinationPoints = np.float32([
- keyPoints1o[m.trainIdx].pt for m in goodMatches
- ]).reshape(-1, 1, 2)
- # Obtain the homography matrix
- M, _ = cv2.findHomography(
- sourcePoints,
- destinationPoints,
- method=cv2.RANSAC,
- ransacReprojThreshold=thresh)
- else:
- is_got_bad_item = True
- # end
- if not is_got_bad_item:
- # Obtain Scale, Translation, Rotation, Distortion value
- # Based on https://math.stackexchange.com/questions/78137/decomposition-of-a-nonsquare-affine-matrix
- scaleRecovered = np.sqrt(M[0, 1]**2 + M[0, 0]**2)
- w, _ = np.linalg.eig(M[0:2, 0:2])
- w = np.sort(w)[::-1]
- DV = w[1] / w[0]
- CR_seq.append(1 / scaleRecovered)
- DV_seq.append(DV)
- # For Stability score calculation
- if i + 1 < len(imgs1):
- img2o = imgs1o[i + 1]
- keyPoints2o, descriptors2o = sift.detectAndCompute(img2o, None)
- matches = bf.knnMatch(descriptors1o, descriptors2o, k=2)
- goodMatches = []
- for m, n in matches:
- if m.distance < ratio * n.distance:
- goodMatches.append(m)
- if len(goodMatches) > MIN_MATCH_COUNT:
- # Get the good key points positions
- sourcePoints = np.float32([
- keyPoints1o[m.queryIdx].pt for m in goodMatches
- ]).reshape(-1, 1, 2)
- destinationPoints = np.float32([
- keyPoints2o[m.trainIdx].pt for m in goodMatches
- ]).reshape(-1, 1, 2)
- # Obtain the homography matrix
- M, _ = cv2.findHomography(
- sourcePoints,
- destinationPoints,
- method=cv2.RANSAC,
- ransacReprojThreshold=thresh)
- # end
- P_seq.append(np.matmul(Pt, M))
- Pt = np.matmul(Pt, M)
- # end
- # end
- if is_got_bad_item:
- return -1, -1, -1
- # Make 1D temporal signals
- P_seq_t = []
- P_seq_r = []
- for Mp in P_seq:
- transRecovered = np.sqrt(Mp[0, 2]**2 + Mp[1, 2]**2)
- # Based on https://math.stackexchange.com/questions/78137/decomposition-of-a-nonsquare-affine-matrix
- thetaRecovered = np.arctan2(Mp[1, 0], Mp[0, 0]) * 180 / np.pi
- P_seq_t.append(transRecovered)
- P_seq_r.append(thetaRecovered)
- # FFT
- fft_t = np.fft.fft(P_seq_t)
- fft_r = np.fft.fft(P_seq_r)
- fft_t = np.abs(fft_t)**2
- fft_r = np.abs(fft_r)**2
- fft_t = np.delete(fft_t, 0)
- fft_r = np.delete(fft_r, 0)
- fft_t = fft_t[:len(fft_t) // 2]
- fft_r = fft_r[:len(fft_r) // 2]
- SS_t = np.sum(fft_t[:5]) / np.sum(fft_t)
- SS_r = np.sum(fft_r[:5]) / np.sum(fft_r)
- return np.min([np.mean(CR_seq),
- 1]), np.absolute(np.min(DV_seq)), (SS_t + SS_r) / 2
|