# Copyright (c) Alibaba, Inc. and its affiliates.
# This file is adapted from the AllenNLP library at https://github.com/allenai/allennlp
# Part of the implementation is borrowed from wimglenn/johnnydep

import copy
import filecmp
import importlib
import importlib.util
import os
import pkgutil
import shutil
import sys
from contextlib import contextmanager
from fnmatch import fnmatch
from pathlib import Path
from typing import Any, Iterable, List, Optional, Set, Union

import json
import pkg_resources

from modelscope import snapshot_download
from modelscope.fileio.file import LocalStorage
from modelscope.utils.ast_utils import FilesAstScanning
from modelscope.utils.constant import DEFAULT_MODEL_REVISION
from modelscope.utils.file_utils import get_modelscope_cache_dir
from modelscope.utils.logger import get_logger

logger = get_logger()
storage = LocalStorage()

MODELSCOPE_FILE_DIR = get_modelscope_cache_dir()
MODELSCOPE_DYNAMIC_MODULE = 'modelscope_modules'
# Directory that receives the python files copied out of model directories.
BASE_MODULE_DIR = os.path.join(MODELSCOPE_FILE_DIR, MODELSCOPE_DYNAMIC_MODULE)

PLUGINS_FILENAME = '.modelscope_plugins'
OFFICIAL_PLUGINS = [
    {
        'name': 'adaseq',
        'desc':
        'Provide hundreds of additions NERs algorithms, check: https://github.com/modelscope/AdaSeq',
        'version': '',
        'url': ''
    },
]

LOCAL_PLUGINS_FILENAME = '.modelscope_plugins'
GLOBAL_PLUGINS_FILENAME = os.path.join(Path.home(), '.modelscope', 'plugins')
DEFAULT_PLUGINS = []


@contextmanager
def pushd(new_dir: str, verbose: bool = False):
    """
    Changes the current working directory to the given path.

    This method is intended to use with `with`, so after its usage, the
    current directory will be set back to the previous value (also when the
    body raises).

    Args:
        new_dir: The directory to change into.
        verbose: Log the directory changes when True.
    """
    previous_dir = os.getcwd()
    if verbose:
        logger.info(f'Changing directory to {new_dir}')  # type: ignore
    os.chdir(new_dir)
    try:
        yield
    finally:
        if verbose:
            logger.info(f'Changing directory back to {previous_dir}')
        os.chdir(previous_dir)


@contextmanager
def push_python_path(path: str):
    """
    Prepends the given path to `sys.path`.

    This method is intended to use with `with`, so after its usage, its value
    will be removed from `sys.path`.

    Args:
        path: The path to prepend; it is resolved to an absolute path first.
    """
    path = Path(path).resolve()
    path = str(path)
    sys.path.insert(0, path)
    try:
        yield
    finally:
        sys.path.remove(path)


def discover_file_plugins(
        filename: str = LOCAL_PLUGINS_FILENAME) -> Iterable[str]:
    """
    Discover plugins from file

    Args:
        filename: Path of a text file holding one module name per line.

    Yields:
        Every non-blank line of the file, stripped of surrounding whitespace.
    """
    with open(filename) as f:
        for module_name in f:
            module_name = module_name.strip()
            if module_name:
                yield module_name


def discover_plugins(requirement_path=None) -> Iterable[str]:
    """
    Discover plugins

    When `requirement_path` is None, the local plugins file (relative to the
    current directory) and then the global plugins file are read; otherwise
    only `requirement_path` is read. Files that do not exist are skipped and
    every plugin name is yielded at most once.

    Args:
        requirement_path: The file path of requirement
    """
    plugins: Set[str] = set()
    if requirement_path is None:
        if os.path.isfile(LOCAL_PLUGINS_FILENAME):
            with push_python_path('.'):
                for plugin in discover_file_plugins(LOCAL_PLUGINS_FILENAME):
                    if plugin in plugins:
                        continue
                    yield plugin
                    plugins.add(plugin)
        if os.path.isfile(GLOBAL_PLUGINS_FILENAME):
            for plugin in discover_file_plugins(GLOBAL_PLUGINS_FILENAME):
                if plugin in plugins:
                    continue
                yield plugin
                plugins.add(plugin)
    else:
        if os.path.isfile(requirement_path):
            for plugin in discover_file_plugins(requirement_path):
                if plugin in plugins:
                    continue
                yield plugin
                plugins.add(plugin)


def import_all_plugins(plugins: List[str] = None) -> List[str]:
    """
    Imports default plugins, input plugins and file discovered plugins.

    Args:
        plugins: Extra plugin module names to import, may be None.

    Returns:
        The names of the plugins that were imported successfully.
    """
    import_module_and_submodules(
        'modelscope',
        include={
            'modelscope.metrics.builder',
            'modelscope.models.builder',
            'modelscope.pipelines.builder',
            'modelscope.preprocessors.builder',
            'modelscope.trainers.builder',
        },
        exclude={
            'modelscope.metrics.*',
            'modelscope.models.*',
            'modelscope.pipelines.*',
            'modelscope.preprocessors.*',
            'modelscope.trainers.*',
            'modelscope.msdatasets',
            'modelscope.utils',
            'modelscope.exporters',
        })

    imported_plugins: List[str] = []

    imported_plugins.extend(import_plugins(DEFAULT_PLUGINS))
    imported_plugins.extend(import_plugins(plugins))
    imported_plugins.extend(import_file_plugins())

    return imported_plugins


def import_plugins(plugins: List[str] = None) -> List[str]:
    """
    Imports the plugins listed in the arguments.

    Args:
        plugins: Module names to import together with their submodules.

    Returns:
        The names of the plugins that were imported successfully; plugins
        raising `ModuleNotFoundError` are logged and left out.
    """
    imported_plugins: List[str] = []
    if not plugins:
        return imported_plugins

    # Workaround for a presumed Python issue where spawned processes can't find modules in the current directory.
    cwd = os.getcwd()
    if cwd not in sys.path:
        sys.path.append(cwd)

    for module_name in plugins:
        try:
            # TODO: include and exclude should be configurable, hard code now
            import_module_and_submodules(
                module_name,
                include={
                    'easycv.toolkit.modelscope',
                    'easycv.hooks',
                    'easycv.models',
                    'easycv.core',
                    'easycv.toolkit',
                    'easycv.predictors',
                },
                exclude={
                    'easycv.toolkit.*',
                    'easycv.*',
                })
            logger.info('Plugin %s available', module_name)
            imported_plugins.append(module_name)
        except ModuleNotFoundError as e:
            logger.error(f'Plugin {module_name} could not be loaded: {e}')

    return imported_plugins


def import_file_plugins(requirement_path=None) -> List[str]:
    """
    Imports the plugins found with `discover_plugins()`.

    Args:
        requirement_path: The file path of requirement

    Returns:
        The names of the plugins that were imported successfully.
    """
    imported_plugins: List[str] = []

    # Workaround for a presumed Python issue where spawned processes can't find modules in the current directory.
    cwd = os.getcwd()
    if cwd not in sys.path:
        sys.path.append(cwd)

    for module_name in discover_plugins(requirement_path):
        try:
            importlib.import_module(module_name)
            logger.info('Plugin %s available', module_name)
            imported_plugins.append(module_name)
        except ModuleNotFoundError as e:
            logger.error(f'Plugin {module_name} could not be loaded: {e}')

    return imported_plugins


def import_module_and_submodules(package_name: str,
                                 include: Optional[Set[str]] = None,
                                 exclude: Optional[Set[str]] = None) -> None:
    """
    Import all public submodules under the given package.

    A module is skipped when it matches an `exclude` pattern and no `include`
    pattern. Submodules whose name starts with `_` or `test` are skipped too.

    Args:
        package_name: Dotted name of the package or module to import.
        include: `fnmatch` patterns of module names that are always imported.
        exclude: `fnmatch` patterns of module names that are not imported
            unless they also match `include`.

    Raises:
        ModuleNotFoundError: If importing a top-level package (a name without
            dots) fails for any reason.
    """
    # take care of None
    include = include if include else set()
    exclude = exclude if exclude else set()

    def fn_in(name: str, pattern_set: Set[str]) -> bool:
        return any(fnmatch(name, pattern) for pattern in pattern_set)

    if not fn_in(package_name, include) and fn_in(package_name, exclude):
        return

    importlib.invalidate_caches()

    # For some reason, python doesn't always add this by default to your path, but you pretty much
    # always want it when using `--include-package`.  And if it's already there, adding it again at
    # the end won't hurt anything.
    with push_python_path('.'):
        # Import at top level
        try:
            module = importlib.import_module(package_name)
            path = getattr(module, '__path__', [])
            path_string = '' if not path else path[0]

            # iter_modules only finds immediate children, so need to recurse.
            for module_finder, name, _ in pkgutil.iter_modules(path):
                # Sometimes when you import third-party libraries that are on your path,
                # `pkgutil.walk_packages` returns those too, so we need to skip them.
                # `pkgutil.iter_modules` avoid import those package
                if path_string and module_finder.path != path_string:  # type: ignore[union-attr]
                    continue
                if name.startswith('_'):
                    # skip directly importing private subpackages
                    continue
                if name.startswith('test'):
                    # skip tests
                    continue
                subpackage = f'{package_name}.{name}'
                import_module_and_submodules(
                    subpackage, include=include, exclude=exclude)
        except SystemExit as e:
            # this case is specific for easy_cv's tools/predict.py exit
            logger.warning(
                f'{package_name} not imported: {str(e)}, but should continue')
        except Exception as e:
            logger.warning(f'{package_name} not imported: {str(e)}')
            if len(package_name.split('.')) == 1:
                raise ModuleNotFoundError('Package not installed') from e


def install_module_from_requirements(requirement_path, ):
    """ install module from requirements

    Every requirement that is not installed yet is collected and installed
    with one `pip install` call. Blank lines and `#` comment lines are
    ignored.

    Args:
        requirement_path: The path of requirement file

    No returns, raise error if failed

    Raises:
        ImportError: If the pip command returns a non-zero status code.
    """
    install_list = []
    with open(requirement_path, 'r', encoding='utf-8') as f:
        requirements = f.read().splitlines()
    for req in requirements:
        req = req.strip()
        # blank lines and comments are not requirements
        if not req or req.startswith('#'):
            continue
        installed, _ = PluginsManager.check_plugin_installed(req)
        if not installed:
            install_list.append(req)

    if install_list:
        status_code, _, _ = PluginsManager.pip_command(
            'install',
            install_list,
        )
        if status_code != 0:
            raise ImportError(
                f'Failed to install requirements from {requirement_path}')


def import_module_from_file(module_name, file_path):
    """ install module by name with file path

    Args:
        module_name: the module name need to be import
        file_path: the related file path that matched with the module name

    Returns: return the module class

    """
    # `importlib.util` is imported explicitly at the top of the file; a bare
    # `import importlib` does not guarantee the submodule is loaded.
    spec = importlib.util.spec_from_file_location(module_name, file_path)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return module


def create_module_from_files(file_list, file_prefix, module_name):
    """
    Create a python module from a list of files by copying them to the destination directory.

    The files are copied below `BASE_MODULE_DIR/<module_name>`, keeping their
    relative directories. Every target directory gets an empty `__init__.py`
    when it has none, and a file is only copied when the target is missing or
    differs from the source.

    Args:
        file_list (List[str]): List of relative file paths to be copied.
        file_prefix (str): Path prefix for each file in file_list.
        module_name (str): Name of the module.

    Returns:
        None
    """

    def create_empty_file(file_path):
        with open(file_path, 'w') as _:
            pass

    dest_dir = os.path.join(BASE_MODULE_DIR, module_name)
    for file_path in file_list:
        file_dir = os.path.dirname(file_path)
        target_dir = os.path.join(dest_dir, file_dir)
        os.makedirs(target_dir, exist_ok=True)
        init_file = os.path.join(target_dir, '__init__.py')
        if not os.path.exists(init_file):
            create_empty_file(init_file)

        target_file = os.path.join(target_dir, os.path.basename(file_path))
        src_file = os.path.join(file_prefix, file_path)
        if not os.path.exists(target_file) or not filecmp.cmp(
                src_file, target_file):
            shutil.copyfile(src_file, target_file)

    # make the freshly written files visible to the import system
    importlib.invalidate_caches()


def import_module_from_model_dir(model_dir):
    """ import all the necessary module from a model dir

    The requirement files found in the model dir are installed first, then
    the scanned files are copied into a module named after the model dir and
    each of them is imported.

    Args:
        model_dir: model file location

    No returns, raise error if failed

    """
    file_scanner = FilesAstScanning()
    file_scanner.traversal_files(model_dir, include_init=True)
    file_dirs = file_scanner.file_dirs
    requirements = file_scanner.requirement_dirs

    # install the requirements firstly
    install_requirements_by_files(requirements)

    if BASE_MODULE_DIR not in sys.path:
        sys.path.append(BASE_MODULE_DIR)

    module_name = Path(model_dir).stem

    # in order to keep forward compatibility, we add module path to
    # sys.path so that submodule can be imported directly as before
    MODULE_PATH = os.path.join(BASE_MODULE_DIR, module_name)
    if MODULE_PATH not in sys.path:
        sys.path.append(MODULE_PATH)

    relative_file_dirs = [
        file.replace(model_dir.rstrip(os.sep) + os.sep, '')
        for file in file_dirs
    ]
    create_module_from_files(relative_file_dirs, model_dir, module_name)
    for file in relative_file_dirs:
        submodule = module_name + '.' + file.replace('.py', '').replace(
            os.sep, '.')
        importlib.import_module(submodule)


def install_requirements_by_names(plugins: List[str]):
    """ install the requirements by names

    Args:
        plugins: name of plugins (pai-easyscv, transformers)

    No returns, raise error if failed

    Raises:
        EnvironmentError: If installing the missing plugins fails.
    """
    plugins_manager = PluginsManager()
    uninstalled_plugins = []
    for plugin in plugins:
        plugin_installed, version = plugins_manager.check_plugin_installed(
            plugin)
        if not plugin_installed:
            uninstalled_plugins.append(plugin)
    # A copy is handed over so that pip options appended further down the
    # call chain never show up in the error message below.
    status, _ = plugins_manager.install_plugins(list(uninstalled_plugins))
    if status != 0:
        # One single message: passing two positional arguments would make
        # the exception treat them as (errno, strerror).
        raise EnvironmentError(
            f'The required packages {",".join(uninstalled_plugins)} are not installed. '
            f'Please run the command `modelscope plugin install {" ".join(uninstalled_plugins)}` to install them.'
        )


def install_requirements_by_files(requirements: List[str]):
    """ install the requirements by files

    Args:
        requirements: a list of files including requirements info (requirements.txt)

    No returns, raise error if failed

    """
    for requirement in requirements:
        install_module_from_requirements(requirement)


def register_plugins_repo(plugins: List[str]) -> None:
    """ Try to install and import plugins from repo

    Args:
        plugins: Package names of the plugins; nothing is done when None.
    """
    if plugins is None:
        return
    install_requirements_by_names(plugins)
    modules = []
    for plugin in plugins:
        module_name, module_version, _ = get_modules_from_package(plugin)
        modules.extend(module_name)
    import_plugins(modules)


def register_modelhub_repo(model_dir, allow_remote=False) -> None:
    """ Try to install and import remote model from modelhub

    Args:
        model_dir: Local directory of the model files.
        allow_remote: The code in `model_dir` is only imported when True.
    """
    if not allow_remote:
        return
    logger.warning(
        f'Use allow_remote=True. Will invoke codes from {model_dir}. Please make sure '
        'that you can trust the external codes.')
    try:
        import_module_from_model_dir(model_dir)
    except KeyError:
        logger.warning(
            'Multi component keys in the hub are registered in same file')


DEFAULT_INDEX = 'https://pypi.org/simple/'


def get_modules_from_package(package):
    """ to get the modules from an installed package

    A wheel is obtained for the package (given directly, downloaded from a
    URL, or built/downloaded through `pip wheel`) and its metadata is read.

    Args:
        package: The distribution name or package name

    Returns:
        import_names: The modules that in the package distribution
        import_version: The version of those modules, should be same and identical
        package_name: The package name, if installed by whl file, the package is unknown, should be passed

    """
    from zipfile import ZipFile
    from tempfile import mkdtemp
    from subprocess import check_output, STDOUT, CalledProcessError
    from glob import glob
    import hashlib
    from urllib.parse import urlparse
    from urllib import request as urllib2
    from pip._internal.utils.packaging import get_requirement

    def urlretrieve(url, filename, data=None, auth=None):
        """Download `url` into `filename`, optionally with basic auth."""
        if auth is not None:
            # https://docs.python.org/2.7/howto/urllib2.html#id6
            password_mgr = urllib2.HTTPPasswordMgrWithDefaultRealm()

            # Add the username and password.
            # If we knew the realm, we could use it instead of None.
            username, password = auth
            top_level_url = urlparse(url).netloc
            password_mgr.add_password(None, top_level_url, username, password)

            handler = urllib2.HTTPBasicAuthHandler(password_mgr)

            # create "opener" (OpenerDirector instance)
            opener = urllib2.build_opener(handler)
        else:
            opener = urllib2.build_opener()

        # the response is closed as soon as the payload has been written
        with opener.open(url, data=data) as res:
            headers = res.info()
            with open(filename, 'wb') as fp:
                fp.write(res.read())
        return filename, headers

    def compute_checksum(target, algorithm='sha256', blocksize=2**13):
        """Return the hex digest of the file `target`, read in blocks."""
        hashtype = getattr(hashlib, algorithm)
        hash_ = hashtype()
        logger.debug('computing checksum: target=%s, algorithm=%s', target,
                     algorithm)
        with open(target, 'rb') as f:
            for chunk in iter(lambda: f.read(blocksize), b''):
                hash_.update(chunk)
        result = hash_.hexdigest()
        logger.debug('computed checksum: result=%s', result)
        return result

    def _get_pip_version():
        # try to get pip version without actually importing pip
        # setuptools gets upset if you import pip before importing setuptools..
        try:
            import importlib.metadata  # Python 3.8+
            return importlib.metadata.version('pip')
        except Exception:
            pass
        import pip
        return pip.__version__

    def _download_dist(url, scratch_file, index_url, extra_index_url):
        """Download `url` to `scratch_file`, reusing index credentials."""
        auth = None
        if index_url:
            parsed = urlparse(index_url)
            if parsed.username and parsed.password and parsed.hostname == urlparse(
                    url).hostname:
                # handling private PyPI credentials in index_url
                auth = (parsed.username, parsed.password)
        if extra_index_url:
            parsed = urlparse(extra_index_url)
            if parsed.username and parsed.password and parsed.hostname == urlparse(
                    url).hostname:
                # handling private PyPI credentials in extra_index_url
                auth = (parsed.username, parsed.password)
        target, _headers = urlretrieve(url, scratch_file, auth=auth)
        return target, _headers

    def _get_wheel_args(index_url, env, extra_index_url):
        """Build the `pip wheel` command line without the dist name."""
        args = [
            sys.executable,
            '-m',
            'pip',
            'wheel',
            '-vvv',  # --verbose x3
            '--no-deps',
            '--no-cache-dir',
            '--disable-pip-version-check',
        ]
        if index_url is not None:
            args += ['--index-url', index_url]
            if index_url != DEFAULT_INDEX:
                hostname = urlparse(index_url).hostname
                if hostname:
                    args += ['--trusted-host', hostname]
        if extra_index_url is not None:
            args += [
                '--extra-index-url', extra_index_url, '--trusted-host',
                urlparse(extra_index_url).hostname
            ]
        if env is None:
            pip_version = _get_pip_version()
        else:
            pip_version = dict(env)['pip_version']
            args[0] = dict(env)['python_executable']
        pip_major, pip_minor = pip_version.split('.')[0:2]
        pip_major = int(pip_major)
        pip_minor = int(pip_minor)
        if pip_major >= 10:
            args.append('--progress-bar=off')
        if (20, 3) <= (pip_major, pip_minor) < (21, 1):
            # See https://github.com/pypa/pip/issues/9139#issuecomment-735443177
            args.append('--use-deprecated=legacy-resolver')
        return args

    def get(dist_name,
            index_url=None,
            env=None,
            extra_index_url=None,
            tmpdir=None,
            ignore_errors=False):
        """Run `pip wheel` for `dist_name` and describe the resulting wheel.

        Returns a dict with the keys `path` (local wheel file), `url`
        (download link parsed from the pip log) and `checksum`.
        """
        args = _get_wheel_args(index_url, env, extra_index_url) + [dist_name]
        scratch_dir = mkdtemp(dir=tmpdir)
        logger.debug('wheeling and dealing: scratch_dir=%s, args=%s',
                     os.path.abspath(scratch_dir), ' '.join(args))
        try:
            out = check_output(
                args, stderr=STDOUT, cwd=scratch_dir).decode('utf-8')
        except CalledProcessError as err:
            # check_output signals a non-zero exit status with
            # CalledProcessError, which carries the captured output
            out = (getattr(err, 'output', b'') or b'').decode('utf-8')
            logger.warning(out)
            if not ignore_errors:
                raise
        logger.debug('wheel command completed ok: dist_name=%s', dist_name)
        links = []
        local_links = []
        lines = out.splitlines()
        for i, line in enumerate(lines):
            line = line.strip()
            if line.startswith('Downloading from URL '):
                parts = line.split()
                link = parts[3]
                links.append(link)
            elif line.startswith('Downloading '):
                parts = line.split()
                last = parts[-1]
                if len(parts) == 3 and last.startswith('(') and last.endswith(
                        ')'):
                    link = parts[-2]
                elif len(parts) == 4 and parts[-2].startswith(
                        '(') and last.endswith(')'):
                    link = parts[-3]
                    if not urlparse(link).scheme:
                        # newest pip versions have changed to not log the full url
                        # in the download event. it is becoming more and more annoying
                        # to preserve compatibility across a wide range of pip versions
                        next_line = lines[i + 1].strip() if i + 1 < len(
                            lines) else ''
                        if next_line.startswith(
                                'Added ') and ' to build tracker' in next_line:
                            link = next_line.split(
                                ' to build tracker')[0].split()[-1]
                else:
                    # unknown line shape: fall back to the last token so
                    # that `link` is never unbound or stale
                    link = last
                links.append(link)
            elif line.startswith(
                    'Source in ') and 'which satisfies requirement' in line:
                link = line.split()[-1]
                links.append(link)
            elif line.startswith('Added ') and ' from file://' in line:
                [link] = [x for x in line.split() if x.startswith('file://')]
                local_links.append(link)
        if not links:
            # prefer http scheme over file
            links += local_links
        links = list(dict.fromkeys(links))  # order-preserving dedupe
        if not links:
            logger.warning('could not find download link: out=%s', out)
            raise Exception('failed to collect dist')
        if len(links) == 2:
            # sometimes we collect the same link, once with a url fragment/checksum and once without
            first, second = links
            if first.startswith(second):
                del links[1]
            elif second.startswith(first):
                del links[0]
        if len(links) > 1:
            logger.debug('more than 1 link collected: out=%s, links=%s', out,
                         links)
            # Since PEP 517, maybe an sdist will also need to collect other distributions
            # for the build system, even with --no-deps specified. pendulum==1.4.4 is one
            # example, which uses poetry and doesn't publish any python37 wheel to PyPI.
            # However, the dist itself should still be the first one downloaded.
        link = links[0]
        whls = glob(os.path.join(os.path.abspath(scratch_dir), '*.whl'))
        try:
            [whl] = whls
        except ValueError:
            if ignore_errors:
                whl = ''
            else:
                raise
        url, _sep, checksum = link.partition('#')
        url = url.replace(
            '/%2Bf/', '/+f/'
        )  # some versions of pip did not unquote this fragment in the log
        if not checksum.startswith('md5=') and not checksum.startswith(
                'sha256='):
            # PyPI gives you the checksum in url fragment, as a convenience. But not all indices are so kind.
            algorithm = 'md5'
            if os.path.basename(whl).lower() == url.rsplit('/',
                                                           1)[-1].lower():
                target = whl
            else:
                scratch_file = os.path.join(scratch_dir,
                                            os.path.basename(url))
                target, _headers = _download_dist(url, scratch_file,
                                                  index_url, extra_index_url)
            checksum = compute_checksum(target=target, algorithm=algorithm)
            checksum = '='.join([algorithm, checksum])
        result = {'path': whl, 'url': url, 'checksum': checksum}
        return result

    def discover_import_names(whl_file):
        """Read import names, version and dist name out of a wheel file."""
        import re
        logger.debug('finding import names')
        with ZipFile(file=whl_file) as zipfile:
            namelist = zipfile.namelist()
            [top_level_fname
             ] = [x for x in namelist if x.endswith('top_level.txt')]
            [metadata_fname
             ] = [x for x in namelist if x.endswith('.dist-info/METADATA')]
            all_names = zipfile.read(top_level_fname).decode(
                'utf-8').strip().splitlines()
            metadata = zipfile.read(metadata_fname).decode('utf-8')
        public_names = [n for n in all_names if not n.startswith('_')]

        # named groups are required by the `.group(...)` lookups below
        version_pattern = re.compile(r'^Version: (?P<version>.+)$',
                                     re.MULTILINE)
        name_pattern = re.compile(r'^Name: (?P<name>.+)$', re.MULTILINE)

        version_match = version_pattern.search(metadata)
        name_match = name_pattern.search(metadata)

        module_version = version_match.group('version')
        module_name = name_match.group('name')

        return public_names, module_version, module_name

    tmpdir = mkdtemp()
    try:
        if package.endswith('.whl'):
            # if user using .whl file then parse the whl to get the module name
            if not os.path.isfile(package):
                # not a local file, treat it as a URL and download it
                file_name = os.path.basename(package)
                file_path = os.path.join(tmpdir, file_name)
                whl_file, _ = _download_dist(package, file_path, None, None)
            else:
                whl_file = package
        else:
            # if user using package name then generate whl file and parse the
            # file to get the module name by the discover_import_names method
            req = get_requirement(package)
            package = req.name
            data = get(package, tmpdir=tmpdir)
            whl_file = data['path']
        import_names, import_version, package_name = discover_import_names(
            whl_file)
    finally:
        # the scratch directory is removed on failure as well
        shutil.rmtree(tmpdir, ignore_errors=True)
    return import_names, import_version, package_name
class PluginsManager(object):
    """ plugins manager class

    Installs and uninstalls plugins through pip and keeps a JSON record of
    them in a plugins file located in the modelscope cache directory.
    """

    def __init__(self,
                 cache_dir=MODELSCOPE_FILE_DIR,
                 plugins_file=PLUGINS_FILENAME):
        """
        Args:
            cache_dir: Directory of the plugins file, overridden by the
                environment variable `MODELSCOPE_CACHE` when it is set.
            plugins_file: Name of the plugins file, overridden by the
                environment variable `MODELSCOPE_PLUGINS_FILE` when it is set.
        """
        cache_dir = os.getenv('MODELSCOPE_CACHE', cache_dir)
        plugins_file = os.getenv('MODELSCOPE_PLUGINS_FILE', plugins_file)
        self._file_path = os.path.join(cache_dir, plugins_file)

    @property
    def file_path(self):
        """Path of the JSON file recording the plugins."""
        return self._file_path

    @file_path.setter
    def file_path(self, value):
        self._file_path = value

    @staticmethod
    def check_plugin_installed(package):
        """ Check if the plugin is installed, and if the version is valid

        Args:
            package: the package name need to be installed

        Returns:
            if_installed: True if installed
            version: the version of installed or None if not installed
        """

        if package.split('.')[-1] == 'whl':
            # install from whl should test package name instead of module name
            _, module_version, package_name = get_modules_from_package(package)
            local_installed, version = PluginsManager._check_plugin_installed(
                package_name)
            # both "not installed" and "installed with another version"
            # count as not installed
            if not local_installed or module_version != version:
                return False, version
            return True, module_version
        return PluginsManager._check_plugin_installed(package)

    @staticmethod
    def _check_plugin_installed(package, verified_version=None):
        """Check a requirement string against the installed distributions.

        Args:
            package: A requirement string, version specifiers are honoured.
            verified_version: When given, the installed version has to be
                equal to it.

        Returns:
            A tuple (installed, version); version is '' when the package is
            not found.
        """
        from pip._internal.utils.packaging import get_requirement, specifiers
        req = get_requirement(package)

        try:
            # reload to see packages installed since the last import
            importlib.reload(pkg_resources)
            package_meta_info = pkg_resources.working_set.by_key[req.name]
            version = package_meta_info.version

            # To test if the package is installed
            installed = True

            # If installed, test if the version is correct
            for spec in req.specifier:
                installed_valid_version = spec.contains(version)
                if not installed_valid_version:
                    installed = False
                    break
        except KeyError:
            version = ''
            installed = False

        if installed and verified_version is not None and verified_version != version:
            return False, verified_version
        return installed, version

    @staticmethod
    def pip_command(
        command,
        command_args: List[str],
    ):
        """

        Args:
            command: install, uninstall command
            command_args: the args to be used with command, should be in list
                such as ['-r', 'requirements']; the list is not modified

        Returns:
            status_code: The pip command status code, 0 if success, else is failed
            options: parsed option from system args by pip command
            args: the unknown args that could be parsed by pip command

        """
        from pip._internal.commands import create_command
        importlib.reload(pkg_resources)
        # work on a copy, the caller's list must stay untouched
        command_args = list(command_args)
        if command == 'install':
            command_args.append('-f')
            command_args.append(
                'https://modelscope.oss-cn-beijing.aliyuncs.com/releases/repo.html'
            )
        command = create_command(command)
        options, args = command.parse_args(command_args)

        status_code = command.main(command_args)

        # reload the pkg_resources in order to get the latest pkgs information
        importlib.reload(pkg_resources)

        return status_code, options, args

    def install_plugins(self,
                        install_args: List[str],
                        index_url: Optional[str] = None,
                        force_update=False) -> Any:
        """Install packages via pip
        Args:
            install_args (list): List of arguments passed to `pip install`;
                the list is not modified.
            index_url (str, optional): The pypi index url.
            force_update: If force update on or off

        Returns:
            status_code: The pip status code, 0 if success.
            install_args: The arguments handed to the pip command, an empty
                list when nothing had to be installed.
        """

        if len(install_args) == 0:
            return 0, []

        install_args = list(install_args)
        if index_url is not None:
            install_args += ['-i', index_url]

        if force_update:
            # `-f` is pip's `--find-links` and needs a value, as a bare
            # flag it swallowed the next argument; upgrading is `--upgrade`
            install_args += ['--upgrade']

        status_code, options, args = PluginsManager.pip_command(
            'install',
            install_args,
        )

        if status_code == 0:
            logger.info(f'The plugins {",".join(args)} is installed')

            # TODO Add Ast index for ast update record

            # Add the plugins info to the local record
            installed_package = self.parse_args_info(args, options)
            self.update_plugins_file(installed_package)

        return status_code, install_args

    def parse_args_info(self, args: List[str], options):
        """
        parse arguments input info
        Args:
            args: the list of args from pip command output
            options: the options that parsed from system args by pip command method

        Returns:
            installed_package: generate installed package info in order to store in the file
            the info includes: name, url and desc of the package
        """
        installed_package = []

        # the case of install with requirements
        if len(args) == 0:
            src_dir = options.src_dir
            requirements = options.requirements
            for requirement in requirements:
                package_info = {
                    'name': requirement,
                    'url': os.path.join(src_dir, requirement),
                    'desc': '',
                    'version': ''
                }

                installed_package.append(package_info)

        def get_package_info(package_name):
            package_info = {
                'name': package_name,
                'url': options.index_url,
                'desc': ''
            }

            # the case with git + http
            if package_name.split('.')[-1] == 'git':
                package_name = Path(package_name).stem

            plugin_installed, version = self.check_plugin_installed(
                package_name)
            if plugin_installed:
                package_info['version'] = version
                package_info['name'] = package_name
            else:
                logger.warning(
                    f'The package {package_name} is not in the lib, this might be happened'
                    f' when installing the package with git+https method, should be ignored'
                )
                package_info['version'] = ''

            return package_info

        for package in args:
            package_info = get_package_info(package)
            installed_package.append(package_info)

        return installed_package

    def uninstall_plugins(self,
                          uninstall_args: Union[str, List],
                          is_yes=False):
        """
        uninstall plugins
        Args:
            uninstall_args: args used to uninstall by pip command, a single
                package name or a list; a list is not modified
            is_yes: force yes without verified

        Returns: status code, and uninstall args

        """
        if isinstance(uninstall_args, str):
            uninstall_args = [uninstall_args]
        else:
            uninstall_args = list(uninstall_args)

        # only skip the confirmation when it was asked for
        if is_yes:
            uninstall_args += ['-y']

        status_code, options, args = PluginsManager.pip_command(
            'uninstall',
            uninstall_args,
        )

        if status_code == 0:
            logger.info(f'The plugins {",".join(args)} is uninstalled')

            # TODO Add Ast index for ast update record

            # Add to the local record
            self.remove_plugins_from_file(args)

        return status_code, uninstall_args

    def _get_plugins_from_file(self):
        """ get plugins from file

        Returns:
            The parsed JSON content of the plugins file, or an empty dict
            when the file does not exist.
        """
        logger.info(f'Loading plugins information from {self.file_path}')
        if os.path.exists(self.file_path):
            local_plugins_info_bytes = storage.read(self.file_path)
            local_plugins_info = json.loads(local_plugins_info_bytes)
        else:
            local_plugins_info = {}
        return local_plugins_info

    def _update_plugins(
        self,
        new_plugins_list,
        local_plugins_info,
        override=False,
    ):
        """Merge a list of plugin info dicts into the local record.

        Args:
            new_plugins_list: Plugin info dicts with the keys name, version
                and desc; the `name` key is popped from each of them.
            local_plugins_info: Mapping of package name to plugin info, it
                is updated in place.
            override: Replace existing entries when True; otherwise the
                entry with the higher version is kept.

        Returns:
            The updated `local_plugins_info`.
        """
        for item in new_plugins_list:
            package_name = item.pop('name')

            # update package information if existed
            if package_name in local_plugins_info and not override:
                original_item = local_plugins_info[package_name]
                from pkg_resources import parse_version
                item_version = parse_version(
                    item['version'] if item['version'] != '' else '0.0.0')
                origin_version = parse_version(
                    original_item['version']
                    if original_item['version'] != '' else '0.0.0')
                desc = item['desc']
                # keep the recorded description when the new one is empty
                if original_item['desc'] != '' and desc == '':
                    desc = original_item['desc']
                item = item if item_version > origin_version else original_item
                item['desc'] = desc

            # Double-check if the item is installed with the version number
            if item['version'] == '':
                plugin_installed, version = self.check_plugin_installed(
                    package_name)
                item['version'] = version

            local_plugins_info[package_name] = item

        return local_plugins_info

    def _print_plugins_info(self, local_plugins_info):
        """Print a table with name, version and description per plugin."""
        print('{:<15} |{:<10} |{:<100}'.format('NAME', 'VERSION',
                                               'DESCRIPTION'))
        print('')
        for k, v in local_plugins_info.items():
            print('{:<15} |{:<10} |{:<100}'.format(k, v['version'],
                                                   v['desc']))

    def list_plugins(
        self,
        show_all=False,
    ):
        """

        Args:
            show_all: show installed and official supported if True, else only those installed

        Returns:
            local_plugins_info: show the list of plugins info

        """
        local_plugins_info = self._get_plugins_from_file()

        # update plugins with default
        local_official_plugins = copy.deepcopy(OFFICIAL_PLUGINS)
        local_plugins_info = self._update_plugins(local_official_plugins,
                                                  local_plugins_info)

        if show_all is True:
            self._print_plugins_info(local_plugins_info)
            return local_plugins_info

        # Consider those package with version is installed
        not_installed_list = [
            name for name, info in local_plugins_info.items()
            if info['version'] == ''
        ]

        for item in not_installed_list:
            local_plugins_info.pop(item)

        self._print_plugins_info(local_plugins_info)
        return local_plugins_info

    def update_plugins_file(
        self,
        plugins_list,
        override=False,
    ):
        """update the plugins file in order to maintain the latest plugins information

        Args:
            plugins_list: The plugins list contain the information of plugins
                name, version, introduction, install url and the status of delete or update
            override: Override the file by the list if True, else only update.

        Returns:
            local_plugins_info_json: the json version of updated plugins info

        """
        local_plugins_info = self._get_plugins_from_file()

        # local_plugins_info is empty if first time loading, should add OFFICIAL_PLUGINS information
        if local_plugins_info == {}:
            plugins_list.extend(copy.deepcopy(OFFICIAL_PLUGINS))

        local_plugins_info = self._update_plugins(plugins_list,
                                                  local_plugins_info, override)

        local_plugins_info_json = json.dumps(local_plugins_info)
        storage.write(local_plugins_info_json.encode(), self.file_path)

        return local_plugins_info_json

    def remove_plugins_from_file(
        self,
        package_names: Union[str, list],
    ):
        """remove the plugins from file
        Args:
            package_names: package name, or a list of package names

        Returns:
            local_plugins_info_json: the json version of updated plugins info

        """
        local_plugins_info = self._get_plugins_from_file()

        if isinstance(package_names, str):
            # wrap the name; list(str) would split it into characters
            package_names = [package_names]

        for item in package_names:
            if item in local_plugins_info:
                local_plugins_info.pop(item)

        local_plugins_info_json = json.dumps(local_plugins_info)
        storage.write(local_plugins_info_json.encode(), self.file_path)

        return local_plugins_info_json


class EnvsManager(object):
    """Manage a virtual environment dedicated to one model."""
    name = 'envs'

    def __init__(self,
                 model_id,
                 model_revision=DEFAULT_MODEL_REVISION,
                 cache_dir=MODELSCOPE_FILE_DIR):
        """

        Args:
            model_id:  id of the model, not dir
            model_revision: revision of the model, default as master
            cache_dir: the system modelscope cache dir
        """
        cache_dir = os.getenv('MODELSCOPE_CACHE', cache_dir)
        self.env_dir = os.path.join(cache_dir, EnvsManager.name, model_id)
        model_dir = snapshot_download(model_id, revision=model_revision)
        from modelscope.utils.hub import read_config
        cfg = read_config(model_dir)
        self.plugins = cfg.get('plugins', [])
        self.allow_remote = cfg.get('allow_remote', False)
        import venv
        self.env_builder = venv.EnvBuilder(
            system_site_packages=True,
            clear=False,
            symlinks=True,
            with_pip=False)

    def get_env_dir(self):
        """Return the directory of the virtual environment."""
        return self.env_dir

    def get_activate_dir(self):
        """Return the path of the `bin/activate` script of the env."""
        return os.path.join(self.env_dir, 'bin', 'activate')

    def check_if_need_env(self):
        """Return True if the model config lists plugins or allows remote code."""
        return bool(len(self.plugins) or self.allow_remote)

    def create_env(self):
        """Create the virtual environment.

        Raises:
            EnvironmentError: If the creation fails; the environment
                directory is cleared before raising.
        """
        if not os.path.exists(self.env_dir):
            os.makedirs(self.env_dir)
        try:
            self.env_builder.create(self.env_dir)
        except Exception as e:
            self.clean_env()
            raise EnvironmentError(
                f'Failed to create virtual env at {self.env_dir} with error: {e}'
            )

    def clean_env(self):
        """Clear the environment directory if it exists."""
        if os.path.exists(self.env_dir):
            self.env_builder.clear_directory(self.env_dir)

    @staticmethod
    def run_process(cmd):
        """Run `cmd` in a shell and return its output.

        Raises:
            Exception: If the command exits with a non-zero status.
        """
        import subprocess
        status, result = subprocess.getstatusoutput(cmd)
        logger.debug('The status and the results are: {}, {}'.format(
            status, result))
        if status != 0:
            raise Exception(
                'running the cmd: {} failed, with message: {}'.format(
                    cmd, result))
        return result


if __name__ == '__main__':
    install_requirements_by_files(['adaseq'])
    import_name, import_version, package_name = get_modules_from_package(
        'pai-easycv')