| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381 |
- # Copyright (c) Alibaba, Inc. and its affiliates.
- # Copyright 2022-present, the HuggingFace Inc. team.
- import atexit
- import contextlib
- import os
- import time
- import types
- from concurrent.futures import Future, ThreadPoolExecutor
- from io import SEEK_END, SEEK_SET, BytesIO
- from pathlib import Path
- from threading import Lock, Thread
- from typing import Dict, List, Optional, Union
- from modelscope.hub.api import HubApi
- from modelscope.hub.constants import Visibility
- from modelscope.utils.constant import DEFAULT_REPOSITORY_REVISION
- from modelscope.utils.logger import get_logger
- from modelscope.utils.repo_utils import CommitInfo, RepoUtils
# Module-wide logger used by the scheduler and its helpers.
logger = get_logger()

# Glob patterns that keep git metadata out of scheduled uploads. Kept as a
# list because it is concatenated with the caller's ignore patterns.
IGNORE_GIT_FOLDER_PATTERNS = [
    '.git',
    '.git/*',
    '*/.git',
    '**/.git/**',
]
@contextlib.contextmanager
def patch_upload_folder_for_scheduler(scheduler_instance):
    """Patch upload_folder for CommitScheduler.

    While the context is active, ``scheduler_instance.api._prepare_upload_folder``
    is replaced by an incremental version. That version only prepares files
    under the watched folder whose modification time differs from the one
    recorded in ``scheduler_instance.last_uploaded``. Each such file is wrapped
    in a ``PartialFileIO`` capped at the size observed when it was stat'ed.
    The mtimes of the prepared files are stored in
    ``scheduler_instance._pending_tracker_updates`` so the caller can record
    them once the commit succeeds.

    On exit (normal or exceptional) the API object is restored to exactly its
    previous state. If the method was only defined on the class, the temporary
    instance attribute is removed rather than replaced by a stale bound method.
    A stale bound method would shadow the class method forever and keep a
    reference cycle alive.

    Args:
        scheduler_instance: object exposing ``api``, ``lock`` (held while
            listing files) and ``last_uploaded`` (``Path`` -> mtime mapping).
    """
    api = scheduler_instance.api
    # Remember whether the method was already overridden on the instance so
    # that exiting restores the exact original attribute lookup.
    had_instance_override = '_prepare_upload_folder' in vars(api)
    original_prepare = api._prepare_upload_folder

    def patched_prepare_upload_folder(
        api_self,
        folder_path_or_files: Union[str, Path, List[str], List[Path]],
        path_in_repo: str,
        allow_patterns: Optional[Union[List[str], str]] = None,
        ignore_patterns: Optional[Union[List[str], str]] = None,
    ) -> List[Union[tuple, list]]:
        """
        Patched version that supports incremental updates for CommitScheduler.

        Returns:
            A list of ``(path_in_repo, PartialFileIO)`` tuples for new or
            modified files only. The list is empty if nothing changed.

        Raises:
            ValueError: if given a list of paths or a single file; scheduled
                commits only support a single folder.
        """
        with scheduler_instance.lock:
            if isinstance(folder_path_or_files, list):
                raise ValueError(
                    'Uploading multiple files or folders is not supported for scheduled commit.'
                )
            if os.path.isfile(folder_path_or_files):
                raise ValueError(
                    'Uploading file is not supported for scheduled commit.')

            folder_path = Path(folder_path_or_files).expanduser().resolve()
            logger.debug('Listing files to upload for scheduled commit.')
            # Sorted so files are prepared in a deterministic order.
            relpath_to_abspath = {
                path.relative_to(folder_path).as_posix(): path
                for path in sorted(folder_path.glob('**/*')) if path.is_file()
            }
            prefix = f"{path_in_repo.strip('/')}/" if path_in_repo else ''

            prepared_repo_objects = []
            files_to_track = {}
            for relpath in RepoUtils.filter_repo_objects(
                    relpath_to_abspath.keys(),
                    allow_patterns=allow_patterns,
                    ignore_patterns=ignore_patterns):
                local_path = relpath_to_abspath[relpath]
                try:
                    stat = local_path.stat()
                except FileNotFoundError:
                    # Deleted after the directory listing: skip it instead
                    # of failing the whole commit.
                    logger.debug(f'Skipping vanished file: {local_path}')
                    continue
                # Unchanged since its last successful upload.
                if scheduler_instance.last_uploaded.get(
                        local_path) == stat.st_mtime:
                    continue
                try:
                    # Cap at the stat'ed size so the uploaded content matches
                    # the mtime recorded below.
                    partial_file = PartialFileIO(local_path, stat.st_size)
                except FileNotFoundError:
                    logger.debug(f'Skipping vanished file: {local_path}')
                    continue
                prepared_repo_objects.append((prefix + relpath, partial_file))
                files_to_track[local_path] = stat.st_mtime

            scheduler_instance._pending_tracker_updates = files_to_track
            if not prepared_repo_objects:
                logger.debug(
                    'No changed files to upload for scheduled commit.')
            return prepared_repo_objects

    try:
        api._prepare_upload_folder = types.MethodType(
            patched_prepare_upload_folder, api)
        yield
    finally:
        if had_instance_override:
            api._prepare_upload_folder = original_prepare
        else:
            # Drop the instance attribute so lookups fall back to the class.
            vars(api).pop('_prepare_upload_folder', None)
class CommitScheduler:
    """
    A scheduler that automatically uploads a local folder to ModelScope Hub at
    specified intervals (e.g., every 5 minutes).

    It's recommended to use the scheduler as a context manager to ensure proper
    cleanup and final commit execution when your script completes. Alternatively,
    you can manually stop the scheduler using the `stop` method.

    Only files whose modification time changed since their last successful
    upload are committed. Git metadata is always excluded from uploads.

    Args:
        repo_id (`str`):
            The id of the repo to commit to.
        folder_path (`str` or `Path`):
            Local folder path that will be monitored and uploaded periodically.
            Must be an existing directory.
        interval (`int` or `float`, *optional*):
            Time interval in minutes between each upload operation. Defaults to 5 minutes.
        path_in_repo (`str`, *optional*):
            Target directory path within the repository, such as `"models/"`.
            If not specified, files are uploaded to the repository root.
        repo_type (`str`, *optional*):
            Repository type for the target repo. Defaults to `model`.
        revision (`str`, *optional*):
            Target branch or revision for commits. Defaults to `master`.
        visibility (`str`, *optional*):
            The visibility of the repo,
            could be `public`, `private`, `internal`, default to `public`.
        token (`str`, *optional*):
            The token to use to commit to the repo. Defaults to the token saved on the machine.
        allow_patterns (`List[str]` or `str`, *optional*):
            File patterns to include in uploads. Only files matching these patterns will be uploaded.
        ignore_patterns (`List[str]` or `str`, *optional*):
            File patterns to exclude from uploads. Files matching these patterns will be skipped.
        hub_api (`HubApi`, *optional*):
            Custom [`HubApi`] instance for Hub operations. Allows for customized
            configurations like user agent or token settings.

    Raises:
        ValueError: if `interval` is not positive, or if `folder_path` does
            not exist or is not a directory.

    Example:
    ```py
    >>> from pathlib import Path
    >>> from modelscope.hub import CommitScheduler
    # Create scheduler with 10-minute intervals
    >>> data_file = Path("workspace/experiment.log")
    >>> scheduler = CommitScheduler(
    ...     repo_id="my_experiments",
    ...     repo_type="dataset",
    ...     folder_path=data_file.parent,
    ...     interval=10
    ... )
    >>> with data_file.open("a") as f:
    ...     f.write("experiment started")
    # Later in the workflow...
    >>> with data_file.open("a") as f:
    ...     f.write("experiment completed")
    ```

    Context manager usage:
    ```py
    >>> from pathlib import Path
    >>> from modelscope.hub import CommitScheduler
    >>> with CommitScheduler(
    ...     repo_id="my_experiments",
    ...     repo_type="dataset",
    ...     folder_path="workspace",
    ...     interval=10
    ... ) as scheduler:
    ...     log_file = Path("workspace/progress.log")
    ...     with log_file.open("a") as f:
    ...         f.write("starting process")
    ...     # ... perform work ...
    ...     with log_file.open("a") as f:
    ...         f.write("process finished")
    # Scheduler automatically stops and performs final upload
    ```
    """

    def __init__(
        self,
        *,
        repo_id: str,
        folder_path: Union[str, Path],
        interval: Union[int, float] = 5,
        path_in_repo: Optional[str] = None,
        repo_type: Optional[str] = None,
        revision: Optional[str] = DEFAULT_REPOSITORY_REVISION,
        visibility: Optional[str] = Visibility.PUBLIC,
        token: Optional[str] = None,
        allow_patterns: Optional[Union[List[str], str]] = None,
        ignore_patterns: Optional[Union[List[str], str]] = None,
        hub_api: Optional[HubApi] = None,
    ) -> None:
        # Validate purely local arguments first so that invalid input never
        # triggers the remote `create_repo` call below.
        if interval <= 0:
            raise ValueError(
                f'"interval" must be a positive number, not "{interval}".')

        self.api = hub_api if hub_api is not None else HubApi()
        self.folder_path = Path(folder_path).expanduser().resolve()
        if not self.folder_path.exists():
            raise ValueError(f'Folder path does not exist: {folder_path}')
        if not self.folder_path.is_dir():
            # Scheduled commits only support folders; fail now rather than on
            # every background run.
            raise ValueError(
                f'Folder path is not a directory: {folder_path}')
        self.path_in_repo = path_in_repo or ''

        self.allow_patterns = allow_patterns
        if ignore_patterns is None:
            ignore_patterns = []
        elif isinstance(ignore_patterns, str):
            ignore_patterns = [ignore_patterns]
        else:
            # Accept any iterable (e.g. a tuple) and never alias the caller's list.
            ignore_patterns = list(ignore_patterns)
        self.ignore_patterns = ignore_patterns + IGNORE_GIT_FOLDER_PATTERNS

        self.repo_url = self.api.create_repo(
            repo_id=repo_id,
            token=token,
            repo_type=repo_type,
            visibility=visibility,
            exist_ok=True,
            create_default_config=False,
        )
        self.repo_id = repo_id
        self.repo_type = repo_type
        self.revision = revision
        self.token = token

        # Absolute local path -> mtime recorded at its last successful upload.
        self.last_uploaded: Dict[Path, float] = {}

        # Guards `last_uploaded`; also acquired by the upload-folder patch
        # while it lists and stats files.
        self.lock = Lock()
        # Serializes whole commits (background, manual and atexit callers) so
        # they cannot race on `_pending_tracker_updates` or the API patch.
        self._commit_lock = Lock()
        self.interval = interval
        self.__stopped = False
        # Future of the most recent background commit; set by the scheduler thread.
        self.last_future: Optional[Future] = None

        logger.info(
            f'Scheduled job to push {self.folder_path} to {self.repo_id} at an interval of {self.interval} minutes.'
        )
        self.executor = ThreadPoolExecutor(max_workers=1)
        self._scheduler_thread = Thread(
            target=self._run_scheduler, daemon=True)
        self._scheduler_thread.start()
        atexit.register(self.commit_scheduled_changes)

    def stop(self) -> None:
        """Stop the scheduler.

        The background loop exits after its current sleep, and background or
        `trigger()`-submitted commits become no-ops. The atexit hook registered
        in `__init__` is not removed, so `commit_scheduled_changes` still runs
        once at interpreter exit.
        """
        self.__stopped = True

    def __enter__(self) -> 'CommitScheduler':
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        # Push remaining changes. Stop the scheduler even if that final push
        # fails; the failure still propagates to the caller.
        try:
            self.trigger().result()
        finally:
            self.stop()

    def _run_scheduler(self) -> None:
        """Trigger a commit, then sleep `interval` minutes, until stopped."""
        while not self.__stopped:
            self.last_future = self.trigger()
            time.sleep(self.interval * 60)

    def trigger(self) -> Future:
        """Trigger a background commit and return a future."""
        return self.executor.submit(self._commit_scheduled_changes)

    def _commit_scheduled_changes(self) -> Optional[CommitInfo]:
        """Executor target: commit unless stopped.

        Errors are logged and re-raised so they surface through the Future.
        """
        if self.__stopped:
            return None
        logger.info('(Background) scheduled commit triggered.')
        try:
            value = self.commit_scheduled_changes()
            return value
        except Exception as e:
            logger.error(f'Error while pushing to Hub: {e}')
            raise

    def commit_scheduled_changes(self) -> Optional[CommitInfo]:
        """Push folder to the Hub and return commit info if changes are found.

        Concurrent callers wait for each other, so only one commit is in
        flight at a time.

        Returns:
            The `CommitInfo` of the new commit, or None when no file changed.

        Raises:
            Exception: any upload error other than "No files to upload" is
                logged and re-raised.
        """
        with self._commit_lock:
            # Filled in by the patched `_prepare_upload_folder` with the mtimes
            # of the files included in this commit.
            self._pending_tracker_updates = {}
            try:
                with patch_upload_folder_for_scheduler(self):
                    commit_info = self.api.upload_folder(
                        repo_id=self.repo_id,
                        folder_path=self.folder_path,
                        path_in_repo=self.path_in_repo,
                        commit_message='Scheduled Commit',
                        token=self.token,
                        repo_type=self.repo_type,
                        allow_patterns=self.allow_patterns,
                        ignore_patterns=self.ignore_patterns,
                        revision=self.revision,
                    )
                if commit_info is None:
                    logger.debug(
                        'No changed files to upload for scheduled commit.')
                    return None
                # Record mtimes only after a successful commit, so files from a
                # failed upload are retried on the next run.
                with self.lock:
                    pending = self._pending_tracker_updates
                    self.last_uploaded.update(pending)
                    logger.debug(
                        f'Updated modification tracker for {len(pending)} files.'
                    )
                return commit_info
            except Exception as e:
                # Treat "No files to upload" as a normal no-change situation
                # instead of an error.
                if 'No files to upload' in str(e):
                    logger.debug(
                        'No changed files to upload for scheduled commit.')
                    return None
                logger.error(f'Error during scheduled commit: {e}')
                raise
            finally:
                # Per-commit scratch state: never leak it into the next call.
                self.__dict__.pop('_pending_tracker_updates', None)
class PartialFileIO(BytesIO):
    """A file-like object that reads only the first part of a file.

    The file is opened on construction. Reads and seeks are clamped so the
    object never yields bytes beyond ``size_limit``. The limit is also capped
    at the file size observed when the file is opened. This lets a file that
    keeps growing be read as a fixed-size prefix.

    Only ``read``, ``tell``, ``seek``, ``open`` and ``close`` are exposed.
    Any other public attribute access raises ``NotImplementedError``.
    ``len()`` returns the effective size limit.

    Args:
        file_path: Path of the file to read.
        size_limit: Maximum number of bytes to expose. ``0`` exposes nothing.
            ``None`` means "the whole file as of open time". Negative values
            are treated as ``0``.
    """

    def __init__(self, file_path: Union[str, Path],
                 size_limit: Optional[int]) -> None:
        self._file_path = Path(file_path)
        self._file = None
        self._size_limit = size_limit
        self.open()

    def open(self) -> None:
        """Open the file (no-op if already open) and clamp the size limit.

        Raises:
            OSError: if the file cannot be opened or stat'ed (logged first).
        """
        if self._file is not None:
            return
        try:
            handle = self._file_path.open('rb')
        except OSError as e:
            logger.error(f'Failed to open file {self._file_path}: {e}')
            raise
        try:
            file_size = os.fstat(handle.fileno()).st_size
        except OSError as e:
            # Don't leave a half-initialised object holding an open handle.
            handle.close()
            logger.error(f'Failed to open file {self._file_path}: {e}')
            raise
        self._file = handle
        # `None` means "no explicit limit". 0 is a legitimate limit (empty
        # prefix) and must not be mistaken for "unlimited".
        if self._size_limit is None:
            self._size_limit = file_size
        else:
            self._size_limit = max(0, min(self._size_limit, file_size))

    def close(self) -> None:
        """Close the file if it's open; it can be reopened with `open()`."""
        if self._file is not None:
            self._file.close()
            self._file = None

    def __del__(self) -> None:
        # `__init__` may have failed before `_file` was assigned.
        if getattr(self, '_file', None) is not None:
            self.close()
        return super().__del__()

    def __repr__(self) -> str:
        return f'<PartialFileIO file_path={self._file_path} size_limit={self._size_limit}>'

    def __len__(self) -> int:
        return self._size_limit

    def __getattribute__(self, name: str):
        if name.startswith('_') or name in {
                'read', 'tell', 'seek', 'close', 'open'
        }:  # only 5 public methods supported
            return super().__getattribute__(name)
        raise NotImplementedError(f"PartialFileIO does not support '{name}'.")

    def tell(self) -> int:
        return self._file.tell()

    def seek(self, __offset: int, __whence: int = SEEK_SET) -> int:
        """Seek to a position in the file, but never beyond size_limit."""
        if __whence == SEEK_END:
            # SEEK_END is relative to the truncated end, not the real one.
            __offset = len(self) + __offset
            __whence = SEEK_SET
        pos = self._file.seek(__offset, __whence)
        if pos > self._size_limit:
            return self._file.seek(self._size_limit)
        return pos

    def read(self, __size: Optional[int] = -1) -> bytes:
        """Read at most _size bytes from the current position."""
        # Clamp at 0: a negative count would make the underlying file read to
        # EOF, past the limit.
        remaining = max(0, self._size_limit - self.tell())
        if __size is None or __size < 0:
            # Read until file limit
            truncated_size = remaining
        else:
            # Read until file limit or __size
            truncated_size = min(__size, remaining)
        return self._file.read(truncated_size)
|