commit_scheduler.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381
  1. # Copyright (c) Alibaba, Inc. and its affiliates.
  2. # Copyright 2022-present, the HuggingFace Inc. team.
  3. import atexit
  4. import contextlib
  5. import os
  6. import time
  7. import types
  8. from concurrent.futures import Future, ThreadPoolExecutor
  9. from io import SEEK_END, SEEK_SET, BytesIO
  10. from pathlib import Path
  11. from threading import Lock, Thread
  12. from typing import Dict, List, Optional, Union
  13. from modelscope.hub.api import HubApi
  14. from modelscope.hub.constants import Visibility
  15. from modelscope.utils.constant import DEFAULT_REPOSITORY_REVISION
  16. from modelscope.utils.logger import get_logger
  17. from modelscope.utils.repo_utils import CommitInfo, RepoUtils
# Module-level logger used by the scheduler, the upload patch and PartialFileIO.
logger = get_logger()

# Glob patterns that match a `.git` directory and its contents at any depth.
# CommitScheduler always appends these to the caller-supplied ignore patterns.
IGNORE_GIT_FOLDER_PATTERNS = ['.git', '.git/*', '*/.git', '**/.git/**']
  20. @contextlib.contextmanager
  21. def patch_upload_folder_for_scheduler(scheduler_instance):
  22. """Patch upload_folder for CommitScheduler"""
  23. api = scheduler_instance.api
  24. original_prepare = api._prepare_upload_folder
  25. def patched_prepare_upload_folder(
  26. api_self,
  27. folder_path_or_files: Union[str, Path, List[str], List[Path]],
  28. path_in_repo: str,
  29. allow_patterns: Optional[Union[List[str], str]] = None,
  30. ignore_patterns: Optional[Union[List[str], str]] = None,
  31. ) -> List[Union[tuple, list]]:
  32. """
  33. Patched version that supports incremental updates for CommitScheduler.
  34. """
  35. with scheduler_instance.lock:
  36. if isinstance(folder_path_or_files, list):
  37. raise ValueError(
  38. 'Uploading multiple files or folders is not supported for scheduled commit.'
  39. )
  40. elif os.path.isfile(folder_path_or_files):
  41. raise ValueError(
  42. 'Uploading file is not supported for scheduled commit.')
  43. else:
  44. folder_path = Path(folder_path_or_files).expanduser().resolve()
  45. logger.debug('Listing files to upload for scheduled commit.')
  46. relpath_to_abspath = {
  47. path.relative_to(folder_path).as_posix(): path
  48. for path in sorted(folder_path.glob('**/*')) if path.is_file()
  49. }
  50. prefix = f"{path_in_repo.strip('/')}/" if path_in_repo else ''
  51. prepared_repo_objects = []
  52. files_to_track = {}
  53. for relpath in RepoUtils.filter_repo_objects(
  54. relpath_to_abspath.keys(),
  55. allow_patterns=allow_patterns,
  56. ignore_patterns=ignore_patterns):
  57. local_path = relpath_to_abspath[relpath]
  58. stat = local_path.stat()
  59. if scheduler_instance.last_uploaded.get(
  60. local_path
  61. ) is None or scheduler_instance.last_uploaded[
  62. local_path] != stat.st_mtime:
  63. partial_file = PartialFileIO(local_path, stat.st_size)
  64. prepared_repo_objects.append(
  65. (prefix + relpath, partial_file))
  66. files_to_track[local_path] = stat.st_mtime
  67. scheduler_instance._pending_tracker_updates = files_to_track
  68. if not prepared_repo_objects:
  69. logger.debug(
  70. 'No changed files to upload for scheduled commit.')
  71. return prepared_repo_objects
  72. try:
  73. api._prepare_upload_folder = types.MethodType(
  74. patched_prepare_upload_folder, api)
  75. yield
  76. finally:
  77. api._prepare_upload_folder = original_prepare
  78. class CommitScheduler:
  79. """
  80. A scheduler that automatically uploads a local folder to ModelScope Hub at
  81. specified intervals (e.g., every 5 minutes).
  82. It's recommended to use the scheduler as a context manager to ensure proper
  83. cleanup and final commit execution when your script completes. Alternatively,
  84. you can manually stop the scheduler using the `stop` method.
  85. Args:
  86. repo_id (`str`):
  87. The id of the repo to commit to.
  88. folder_path (`str` or `Path`):
  89. Local folder path that will be monitored and uploaded periodically.
  90. interval (`int` or `float`, *optional*):
  91. Time interval in minutes between each upload operation. Defaults to 5 minutes.
  92. path_in_repo (`str`, *optional*):
  93. Target directory path within the repository, such as `"models/"`.
  94. If not specified, files are uploaded to the repository root.
  95. repo_type (`str`, *optional*):
  96. Repository type for the target repo. Defaults to `model`.
  97. revision (`str`, *optional*):
  98. Target branch or revision for commits. Defaults to `master`.
  99. visibility (`str`, *optional*):
  100. The visibility of the repo,
  101. could be `public`, `private`, `internal`, default to `public`.
  102. token (`str`, *optional*):
  103. The token to use to commit to the repo. Defaults to the token saved on the machine.
  104. allow_patterns (`List[str]` or `str`, *optional*):
  105. File patterns to include in uploads. Only files matching these patterns will be uploaded.
  106. ignore_patterns (`List[str]` or `str`, *optional*):
  107. File patterns to exclude from uploads. Files matching these patterns will be skipped.
  108. hub_api (`HubApi`, *optional*):
  109. Custom [`HubApi`] instance for Hub operations. Allows for customized
  110. configurations like user agent or token settings.
  111. Example:
  112. ```py
  113. >>> from pathlib import Path
  114. >>> from modelscope.hub import CommitScheduler
  115. # Create scheduler with 10-minute intervals
  116. >>> data_file = Path("workspace/experiment.log")
  117. >>> scheduler = CommitScheduler(
  118. ... repo_id="my_experiments",
  119. ... repo_type="dataset",
  120. ... folder_path=data_file.parent,
  121. ... interval=10
  122. ... )
  123. >>> with data_file.open("a") as f:
  124. ... f.write("experiment started")
  125. # Later in the workflow...
  126. >>> with data_file.open("a") as f:
  127. ... f.write("experiment completed")
  128. ```
  129. Context manager usage:
  130. ```py
  131. >>> from pathlib import Path
  132. >>> from modelscope.hub import CommitScheduler
  133. >>> with CommitScheduler(
  134. ... repo_id="my_experiments",
  135. ... repo_type="dataset",
  136. ... folder_path="workspace",
  137. ... interval=10
  138. ... ) as scheduler:
  139. ... log_file = Path("workspace/progress.log")
  140. ... with log_file.open("a") as f:
  141. ... f.write("starting process")
  142. ... # ... perform work ...
  143. ... with log_file.open("a") as f:
  144. ... f.write("process finished")
  145. # Scheduler automatically stops and performs final upload
  146. ```
  147. """
  148. def __init__(
  149. self,
  150. *,
  151. repo_id: str,
  152. folder_path: Union[str, Path],
  153. interval: Union[int, float] = 5,
  154. path_in_repo: Optional[str] = None,
  155. repo_type: Optional[str] = None,
  156. revision: Optional[str] = DEFAULT_REPOSITORY_REVISION,
  157. visibility: Optional[str] = Visibility.PUBLIC,
  158. token: Optional[str] = None,
  159. allow_patterns: Optional[Union[List[str], str]] = None,
  160. ignore_patterns: Optional[Union[List[str], str]] = None,
  161. hub_api: Optional[HubApi] = None,
  162. ) -> None:
  163. self.api = hub_api or HubApi()
  164. self.folder_path = Path(folder_path).expanduser().resolve()
  165. if not self.folder_path.exists():
  166. raise ValueError(f'Folder path does not exist: {folder_path}')
  167. self.path_in_repo = path_in_repo or ''
  168. self.allow_patterns = allow_patterns
  169. if ignore_patterns is None:
  170. ignore_patterns = []
  171. elif isinstance(ignore_patterns, str):
  172. ignore_patterns = [ignore_patterns]
  173. self.ignore_patterns = ignore_patterns + IGNORE_GIT_FOLDER_PATTERNS
  174. self.repo_url = self.api.create_repo(
  175. repo_id=repo_id,
  176. token=token,
  177. repo_type=repo_type,
  178. visibility=visibility,
  179. exist_ok=True,
  180. create_default_config=False,
  181. )
  182. self.repo_id = repo_id
  183. self.repo_type = repo_type
  184. self.revision = revision
  185. self.token = token
  186. # Keep track of already uploaded files
  187. self.last_uploaded: Dict[Path, float] = {}
  188. if interval <= 0:
  189. raise ValueError(
  190. f'"interval" must be a positive integer, not "{interval}".')
  191. self.lock = Lock()
  192. self.interval = interval
  193. self.__stopped = False
  194. logger.info(
  195. f'Scheduled job to push {self.folder_path} to {self.repo_id} at an interval of {self.interval} minutes.'
  196. )
  197. self.executor = ThreadPoolExecutor(max_workers=1)
  198. self._scheduler_thread = Thread(
  199. target=self._run_scheduler, daemon=True)
  200. self._scheduler_thread.start()
  201. atexit.register(self.commit_scheduled_changes)
  202. def stop(self) -> None:
  203. """Stop the scheduler."""
  204. self.__stopped = True
  205. def __enter__(self) -> 'CommitScheduler':
  206. return self
  207. def __exit__(self, exc_type, exc_value, traceback) -> None:
  208. self.trigger().result()
  209. self.stop()
  210. return
  211. def _run_scheduler(self) -> None:
  212. while not self.__stopped:
  213. self.last_future = self.trigger()
  214. time.sleep(self.interval * 60)
  215. def trigger(self) -> Future:
  216. """Trigger a background commit and return a future."""
  217. return self.executor.submit(self._commit_scheduled_changes)
  218. def _commit_scheduled_changes(self) -> Optional[CommitInfo]:
  219. if self.__stopped:
  220. return None
  221. logger.info('(Background) scheduled commit triggered.')
  222. try:
  223. value = self.commit_scheduled_changes()
  224. return value
  225. except Exception as e:
  226. logger.error(f'Error while pushing to Hub: {e}')
  227. raise
  228. def commit_scheduled_changes(self) -> Optional[CommitInfo]:
  229. """Push folder to the Hub and return commit info if changes are found."""
  230. try:
  231. self._pending_tracker_updates = {}
  232. with patch_upload_folder_for_scheduler(self):
  233. commit_info = self.api.upload_folder(
  234. repo_id=self.repo_id,
  235. folder_path=self.folder_path,
  236. path_in_repo=self.path_in_repo,
  237. commit_message='Scheduled Commit',
  238. token=self.token,
  239. repo_type=self.repo_type,
  240. allow_patterns=self.allow_patterns,
  241. ignore_patterns=self.ignore_patterns,
  242. revision=self.revision,
  243. )
  244. if commit_info is None:
  245. logger.debug(
  246. 'No changed files to upload for scheduled commit.')
  247. return None
  248. with self.lock:
  249. if hasattr(self, '_pending_tracker_updates'):
  250. self.last_uploaded.update(self._pending_tracker_updates)
  251. logger.debug(
  252. f'Updated modification tracker for {len(self._pending_tracker_updates)} files.'
  253. )
  254. del self._pending_tracker_updates
  255. return commit_info
  256. except Exception as e:
  257. # Treat "No files to upload" as a normal ― no-change ― situation instead of an error.
  258. if 'No files to upload' in str(e):
  259. logger.debug(
  260. 'No changed files to upload for scheduled commit.')
  261. return None
  262. if hasattr(self, '_pending_tracker_updates'):
  263. del self._pending_tracker_updates
  264. logger.error(f'Error during scheduled commit: {e}')
  265. raise
  266. class PartialFileIO(BytesIO):
  267. """A file-like object that reads only the first part of a file."""
  268. def __init__(self, file_path: Union[str, Path], size_limit: int) -> None:
  269. self._file_path = Path(file_path)
  270. self._file = None
  271. self._size_limit = size_limit
  272. self.open()
  273. def open(self) -> None:
  274. """Open the file and initialize size limit."""
  275. if self._file is not None:
  276. return
  277. try:
  278. self._file = self._file_path.open('rb')
  279. self._size_limit = min(
  280. self._size_limit or float('inf'),
  281. os.fstat(self._file.fileno()).st_size)
  282. except OSError as e:
  283. logger.error(f'Failed to open file {self._file_path}: {e}')
  284. raise
  285. def close(self) -> None:
  286. """Close the file if it's open."""
  287. if self._file is not None:
  288. self._file.close()
  289. self._file = None
  290. def __del__(self) -> None:
  291. self.close()
  292. return super().__del__()
  293. def __repr__(self) -> str:
  294. return f'<PartialFileIO file_path={self._file_path} size_limit={self._size_limit}>'
  295. def __len__(self) -> int:
  296. return self._size_limit
  297. def __getattribute__(self, name: str):
  298. if name.startswith('_') or name in {
  299. 'read', 'tell', 'seek', 'close', 'open'
  300. }: # only 5 public methods supported
  301. return super().__getattribute__(name)
  302. raise NotImplementedError(f"PartialFileIO does not support '{name}'.")
  303. def tell(self) -> int:
  304. return self._file.tell()
  305. def seek(self, __offset: int, __whence: int = SEEK_SET) -> int:
  306. """Seek to a position in the file, but never beyond size_limit."""
  307. if __whence == SEEK_END:
  308. __offset = len(self) + __offset
  309. __whence = SEEK_SET
  310. pos = self._file.seek(__offset, __whence)
  311. if pos > self._size_limit:
  312. return self._file.seek(self._size_limit)
  313. return pos
  314. def read(self, __size: Optional[int] = -1) -> bytes:
  315. """Read at most _size bytes from the current position."""
  316. current = self.tell()
  317. if __size is None or __size < 0:
  318. # Read until file limit
  319. truncated_size = self._size_limit - current
  320. else:
  321. # Read until file limit or __size
  322. truncated_size = min(__size, self._size_limit - current)
  323. return self._file.read(truncated_size)