| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122 |
- # Modified from https://github.com/Annbless/DUTCode
- # Copyright (c) Alibaba, Inc. and its affiliates.
- import glob
- import math
- import os
- import subprocess
- import tempfile
- from typing import Any, Dict, Optional, Union
- import cv2
- import numpy as np
- import torch
- from modelscope.metainfo import Pipelines
- from modelscope.metrics.video_stabilization_metric import warpprocess
- from modelscope.models.cv.video_stabilization.DUTRAFTStabilizer import \
- DUTRAFTStabilizer
- from modelscope.outputs import OutputKeys
- from modelscope.pipelines.base import Input, Pipeline
- from modelscope.pipelines.builder import PIPELINES
- from modelscope.preprocessors import LoadImage
- from modelscope.preprocessors.cv import VideoReader
- from modelscope.utils.constant import ModelFile, Tasks
- from modelscope.utils.logger import get_logger
- logger = get_logger()
def check_file_exist(filename, msg_tmpl='file "{}" does not exist'):
    """Raise ``FileNotFoundError`` unless ``filename`` is an existing file.

    Args:
        filename: Path to check. Directories and missing paths both fail
            the check, because ``os.path.isfile`` is used.
        msg_tmpl: ``str.format`` template for the error message; it receives
            ``filename`` as its single positional argument.

    Raises:
        FileNotFoundError: If ``filename`` is not a regular file.
    """
    # The module imports ``os`` only (there is no ``osp`` alias), so go
    # through ``os.path`` explicitly instead of an undefined ``osp`` name.
    if not os.path.isfile(filename):
        raise FileNotFoundError(msg_tmpl.format(filename))
- __all__ = ['VideoStabilizationPipeline']
@PIPELINES.register_module(
    Tasks.video_stabilization, module_name=Pipelines.video_stabilization)
class VideoStabilizationPipeline(Pipeline):
    """ Video Stabilization Pipeline.

    Examples:

    >>> import cv2
    >>> from modelscope.outputs import OutputKeys
    >>> from modelscope.pipelines import pipeline
    >>> from modelscope.utils.constant import Tasks

    >>> test_video = 'https://modelscope.oss-cn-beijing.aliyuncs.com/test/videos/video_stabilization_test_video.avi'
    >>> video_stabilization = pipeline(Tasks.video_stabilization, model='damo/cv_dut-raft_video-stabilization_base')
    >>> out_video_path = video_stabilization(test_video)[OutputKeys.OUTPUT_VIDEO]
    >>> print('Pipeline: the output video path is {}'.format(out_video_path))
    """

    def __init__(self,
                 model: Union[DUTRAFTStabilizer, str],
                 preprocessor=None,
                 **kwargs):
        """Create the pipeline by delegating to ``Pipeline.__init__``.

        Args:
            model: A ``DUTRAFTStabilizer`` instance or a string that is
                forwarded unchanged to the base ``Pipeline`` (presumably a
                model id or local path -- resolved by the base class).
            preprocessor: Optional preprocessor forwarded to the base class.
            **kwargs: Extra keyword arguments forwarded to the base class.
        """
        super().__init__(model=model, preprocessor=preprocessor, **kwargs)
        logger.info('load video stabilization model done')

    def preprocess(self, input: Input) -> Dict[str, Any]:
        """Read basic metadata of the input video.

        Args:
            input: The video source handed to ``VideoReader``.

        Returns:
            A dict with the untouched ``input`` under ``'vid_path'`` and the
            ``'fps'``, ``'width'`` and ``'height'`` reported by the reader.
        """
        video_reader = VideoReader(input)
        return {
            'vid_path': input,
            'fps': video_reader.fps,
            'width': video_reader.width,
            'height': video_reader.height
        }

    def forward(self, input: Dict[str, Any]) -> Dict[str, Any]:
        """Run the stabilizer and convert its output to per-frame arrays.

        Args:
            input: The dict produced by :meth:`preprocess`; ``'vid_path'``
                and ``'fps'`` are read here.

        Returns:
            A dict with ``'output'`` (list of ``uint8`` numpy frames),
            ``'fps'`` (copied from ``input``) and ``'base_crop_width'``
            (taken from the ``warpprocess`` result).
        """
        results = self.model._inference_forward(input['vid_path'])
        results = warpprocess(results)
        frames = results['output'].numpy().astype(np.uint8)
        # Move axis 0 of every frame to the end; presumably this turns
        # channel-first (C, H, W) frames into (H, W, C) for OpenCV -- the
        # layout itself is decided by ``warpprocess``.
        out_images = [np.transpose(frame, (1, 2, 0)) for frame in frames]
        return {
            'output': out_images,
            'fps': input['fps'],
            'base_crop_width': results['base_crop_width']
        }

    def postprocess(self, inputs: Dict[str, Any], **kwargs) -> Dict[str, Any]:
        """Crop the frame borders, write the video and optionally re-encode.

        Args:
            inputs: The dict produced by :meth:`forward`.
            **kwargs: Recognised keys are ``output_video`` (destination
                path; a temporary ``.mp4`` file is created when it is missing
                or ``None``) and ``is_cvt_h264`` (when truthy, the written
                video is additionally converted to H.264 with ffmpeg).

        Returns:
            ``{OutputKeys.OUTPUT_VIDEO: path}`` where ``path`` is the written
            video, or the ``*_web.mp4`` H.264 copy when ``is_cvt_h264`` is
            set.

        Raises:
            AssertionError: If ``is_cvt_h264`` is set and ``ffmpeg -version``
                cannot be executed successfully.
        """
        output_video_path = kwargs.get('output_video', None)
        is_cvt_h264 = kwargs.get('is_cvt_h264', False)

        if output_video_path is None:
            # delete=False reserves the name for the writer below; taking
            # ``.name`` from an unreferenced NamedTemporaryFile would delete
            # the file right away and leave the name open to reuse.
            with tempfile.NamedTemporaryFile(
                    suffix='.mp4', delete=False) as tmp_file:
                output_video_path = tmp_file.name

        frames = inputs['output']
        h, w = frames[0].shape[-3:-1]
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        video_writer = cv2.VideoWriter(output_video_path, fourcc,
                                       inputs['fps'], (w, h))

        # The borders are identical for every frame, so compute them once.
        # base_crop_width is rescaled by w / 1280 (presumably it is defined
        # for a 1280-pixel-wide reference frame -- confirm with the model).
        horizontal_border = int(inputs['base_crop_width'] * w / 1280)
        vertical_border = int(horizontal_border * h / w)
        try:
            for frame in frames:
                # Explicit end indices: ``frame[0:-0]`` would be an empty
                # slice when a border evaluates to 0.
                new_frame = frame[vertical_border:h - vertical_border,
                                  horizontal_border:w - horizontal_border]
                new_frame = cv2.resize(new_frame, (w, h))
                video_writer.write(new_frame)
        finally:
            video_writer.release()

        if not is_cvt_h264:
            return {OutputKeys.OUTPUT_VIDEO: output_video_path}

        try:
            ffmpeg_ok = subprocess.call(
                ['ffmpeg', '-version'],
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL) == 0
        except OSError:
            # The executable could not be started at all.
            ffmpeg_ok = False
        if not ffmpeg_ok:
            # Raised explicitly so the check also runs under ``python -O``;
            # the exception type is unchanged from the former ``assert``.
            raise AssertionError(
                'ffmpeg is not installed correctly, please refer to '
                'https://trac.ffmpeg.org/wiki/CompilationGuide.')

        # splitext handles extensions of any length (or none at all).
        output_video_path_for_web = (
            os.path.splitext(output_video_path)[0] + '_web.mp4')
        # Argument list without a shell: paths containing spaces or shell
        # metacharacters are passed to ffmpeg verbatim.
        convert_cmd = [
            'ffmpeg', '-i', output_video_path, '-vcodec', 'h264', '-crf',
            '5', output_video_path_for_web
        ]
        return_code = subprocess.call(convert_cmd)
        if return_code != 0:
            logger.warning(
                'ffmpeg exited with code %s while converting %s to h264',
                return_code, output_video_path)
        return {OutputKeys.OUTPUT_VIDEO: output_video_path_for_web}
|