| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246 |
- # Copyright (c) Alibaba, Inc. and its affiliates.
- import concurrent.futures
- import os
- import shutil
- import tempfile
- from multiprocessing import Manager, Process, Value
- from pathlib import Path
- from typing import List, Optional, Union
- import json
- from modelscope.hub.api import HubApi
- from modelscope.hub.constants import ModelVisibility
- from modelscope.utils.constant import DEFAULT_REPOSITORY_REVISION
- from modelscope.utils.logger import get_logger
- logger = get_logger()
- _executor = concurrent.futures.ProcessPoolExecutor(max_workers=8)
- _queues = dict()
- _flags = dict()
- _tasks = dict()
- _manager = None
- def _push_files_to_hub(
- path_or_fileobj: Union[str, Path],
- path_in_repo: str,
- repo_id: str,
- token: Union[str, bool, None] = None,
- revision: Optional[str] = DEFAULT_REPOSITORY_REVISION,
- commit_message: Optional[str] = None,
- commit_description: Optional[str] = None,
- ):
- """Push files to model hub incrementally
- This function if used for patch_hub, user is not recommended to call this.
- This function will be merged to push_to_hub in later sprints.
- """
- if not os.path.exists(path_or_fileobj):
- return
- from modelscope import HubApi
- api = HubApi()
- api.login(token)
- if not commit_message:
- commit_message = 'Updating files'
- if commit_description:
- commit_message = commit_message + '\n' + commit_description
- with tempfile.TemporaryDirectory() as temp_cache_dir:
- from modelscope.hub.repository import Repository
- repo = Repository(temp_cache_dir, repo_id, revision=revision)
- if path_in_repo:
- sub_folder = os.path.join(temp_cache_dir, path_in_repo)
- else:
- sub_folder = temp_cache_dir
- os.makedirs(sub_folder, exist_ok=True)
- if os.path.isfile(path_or_fileobj):
- dest_file = os.path.join(sub_folder,
- os.path.basename(path_or_fileobj))
- shutil.copyfile(path_or_fileobj, dest_file)
- else:
- shutil.copytree(path_or_fileobj, sub_folder, dirs_exist_ok=True)
- repo.push(commit_message)
- def _api_push_to_hub(repo_name,
- output_dir,
- token,
- private=True,
- commit_message='',
- tag=None,
- source_repo='',
- ignore_file_pattern=None,
- revision=DEFAULT_REPOSITORY_REVISION):
- try:
- api = HubApi()
- api.login(token)
- api.push_model(
- repo_name,
- output_dir,
- visibility=ModelVisibility.PUBLIC
- if not private else ModelVisibility.PRIVATE,
- chinese_name=repo_name,
- commit_message=commit_message,
- tag=tag,
- original_model_id=source_repo,
- ignore_file_pattern=ignore_file_pattern,
- revision=revision)
- commit_message = commit_message or 'No commit message'
- logger.info(
- f'Successfully upload the model to {repo_name} with message: {commit_message}'
- )
- return True
- except Exception as e:
- logger.error(
- f'Error happens when uploading model {repo_name} with message: {commit_message}: {e}'
- )
- return False
- def push_to_hub(repo_name,
- output_dir,
- token=None,
- private=True,
- retry=3,
- commit_message='',
- tag=None,
- source_repo='',
- ignore_file_pattern=None,
- revision=DEFAULT_REPOSITORY_REVISION):
- """
- Args:
- repo_name: The repo name for the modelhub repo
- output_dir: The local output_dir for the checkpoint
- token: The user api token, function will check the `MODELSCOPE_API_TOKEN` variable if this argument is None
- private: If is a private repo, default True
- retry: Retry times if something error in uploading, default 3
- commit_message: The commit message
- tag: The tag of this commit
- source_repo: The source repo (model id) which this model comes from
- ignore_file_pattern: The file pattern to be ignored in uploading.
- revision: The branch to commit to
- Returns:
- The boolean value to represent whether the model is uploaded.
- """
- if token is None:
- token = os.environ.get('MODELSCOPE_API_TOKEN')
- if ignore_file_pattern is None:
- ignore_file_pattern = os.environ.get('UPLOAD_IGNORE_FILE_PATTERN')
- assert repo_name is not None
- assert token is not None, 'Either pass in a token or to set `MODELSCOPE_API_TOKEN` in the environment variables.'
- assert os.path.isdir(output_dir)
- assert 'configuration.json' in os.listdir(output_dir) or 'configuration.yaml' in os.listdir(output_dir) \
- or 'configuration.yml' in os.listdir(output_dir)
- logger.info(
- f'Uploading {output_dir} to {repo_name} with message {commit_message}')
- for i in range(retry):
- if _api_push_to_hub(repo_name, output_dir, token, private,
- commit_message, tag, source_repo,
- ignore_file_pattern, revision):
- return True
- return False
- def push_to_hub_async(repo_name,
- output_dir,
- token=None,
- private=True,
- commit_message='',
- tag=None,
- source_repo='',
- ignore_file_pattern=None,
- revision=DEFAULT_REPOSITORY_REVISION):
- """
- Args:
- repo_name: The repo name for the modelhub repo
- output_dir: The local output_dir for the checkpoint
- token: The user api token, function will check the `MODELSCOPE_API_TOKEN` variable if this argument is None
- private: If is a private repo, default True
- commit_message: The commit message
- tag: The tag of this commit
- source_repo: The source repo (model id) which this model comes from
- ignore_file_pattern: The file pattern to be ignored in uploading
- revision: The branch to commit to
- Returns:
- A handler to check the result and the status
- """
- if token is None:
- token = os.environ.get('MODELSCOPE_API_TOKEN')
- if ignore_file_pattern is None:
- ignore_file_pattern = os.environ.get('UPLOAD_IGNORE_FILE_PATTERN')
- assert repo_name is not None
- assert token is not None, 'Either pass in a token or to set `MODELSCOPE_API_TOKEN` in the environment variables.'
- assert os.path.isdir(output_dir)
- assert 'configuration.json' in os.listdir(output_dir) or 'configuration.yaml' in os.listdir(output_dir) \
- or 'configuration.yml' in os.listdir(output_dir)
- logger.info(
- f'Uploading {output_dir} to {repo_name} with message {commit_message}')
- return _executor.submit(_api_push_to_hub, repo_name, output_dir, token,
- private, commit_message, tag, source_repo,
- ignore_file_pattern, revision)
- def submit_task(q, b):
- while True:
- b.value = False
- item = q.get()
- logger.info(item)
- b.value = True
- if not item.pop('done', False):
- delete_dir = item.pop('delete_dir', False)
- output_dir = item.get('output_dir')
- try:
- push_to_hub(**item)
- if delete_dir and os.path.exists(output_dir):
- shutil.rmtree(output_dir)
- except Exception as e:
- logger.error(e)
- else:
- break
- class UploadStrategy:
- cancel = 'cancel'
- wait = 'wait'
- def push_to_hub_in_queue(queue_name, strategy=UploadStrategy.cancel, **kwargs):
- assert queue_name is not None and len(
- queue_name) > 0, 'Please specify a valid queue name!'
- global _manager
- if _manager is None:
- _manager = Manager()
- if queue_name not in _queues:
- _queues[queue_name] = _manager.Queue()
- _flags[queue_name] = Value('b', False)
- process = Process(
- target=submit_task, args=(_queues[queue_name], _flags[queue_name]))
- process.start()
- _tasks[queue_name] = process
- queue = _queues[queue_name]
- flag: Value = _flags[queue_name]
- if kwargs.get('done', False):
- queue.put(kwargs)
- elif flag.value and strategy == UploadStrategy.cancel:
- logger.error(
- f'Another uploading is running, '
- f'this uploading with message {kwargs.get("commit_message")} will be canceled.'
- )
- else:
- queue.put(kwargs)
- def wait_for_done(queue_name):
- process: Process = _tasks.pop(queue_name, None)
- if process is None:
- return
- process.join()
- _queues.pop(queue_name)
- _flags.pop(queue_name)
|