dir_watcher.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404
  1. import abc
  2. import fnmatch
  3. import glob
  4. import logging
  5. import os
  6. import queue
  7. import time
  8. from typing import TYPE_CHECKING, Any, Mapping, MutableMapping, MutableSet, Optional
  9. from wandb import util
  10. from wandb.sdk.lib.filesystem import GlobStr
  11. from wandb.sdk.lib.paths import LogicalPath
  12. if TYPE_CHECKING:
  13. import wandb.vendor.watchdog_0_9_0.observers.api as wd_api
  14. import wandb.vendor.watchdog_0_9_0.observers.polling as wd_polling
  15. import wandb.vendor.watchdog_0_9_0.watchdog.events as wd_events
  16. from wandb.sdk.internal.file_pusher import FilePusher
  17. from wandb.sdk.internal.settings_static import SettingsStatic
  18. from wandb.sdk.lib.filesystem import PolicyName
  19. else:
  20. wd_polling = util.vendor_import("wandb_watchdog.observers.polling")
  21. wd_events = util.vendor_import("wandb_watchdog.events")
  22. PathStr = str # TODO(spencerpearson): would be nice to use Path here
  23. logger = logging.getLogger(__name__)
  class FileEventHandler(abc.ABC):
      """Abstract base class for per-file upload policies.

      A handler is bound to one file on disk (``file_path``) and to the name
      the file is saved under (``save_name``). Subclasses decide when the
      file is handed to the file pusher by implementing ``on_modified`` and
      ``finish``, and identify themselves through ``policy``.
      """

      def __init__(
          self,
          file_path: PathStr,
          save_name: LogicalPath,
          file_pusher: "FilePusher",
          *args: Any,
          **kwargs: Any,
      ) -> None:
          """Bind the handler to a file.

          Args:
              file_path: The file's actual path on disk.
              save_name: The name the file is saved under; it is normalised
                  to a ``LogicalPath`` so Windows separators become unix ones.
              file_pusher: The pusher that is notified when the file changes.
              *args: Accepted for subclass compatibility; unused here.
              **kwargs: Accepted for subclass compatibility; unused here.
          """
          # No sync has happened yet; subclasses store the file's mtime here.
          self._last_sync: Optional[float] = None
          self._file_pusher = file_pusher
          self.file_path = file_path
          # Convert windows paths to unix paths
          self.save_name = LogicalPath(save_name)

      @property
      @abc.abstractmethod
      def policy(self) -> "PolicyName":
          """Return the name of the policy this handler implements."""
          raise NotImplementedError

      @abc.abstractmethod
      def on_modified(self, force: bool = False) -> None:
          """React to the file having been created or modified.

          Args:
              force: Request an upload regardless of the policy's own checks.
          """
          raise NotImplementedError

      @abc.abstractmethod
      def finish(self) -> None:
          """Perform whatever the policy requires at shutdown."""
          raise NotImplementedError

      def on_renamed(self, new_path: PathStr, new_name: LogicalPath) -> None:
          """Point the handler at the file's new location and re-evaluate it.

          Args:
              new_path: The file's new path on disk.
              new_name: The file's new save name.
          """
          self.file_path, self.save_name = new_path, new_name
          self.on_modified()
  class PolicyNow(FileEventHandler):
      """Policy that uploads a file once, as soon as it is first seen.

      Later modifications are ignored unless an upload is explicitly forced.
      """

      @property
      def policy(self) -> "PolicyName":
          """Return the policy name, ``"now"``."""
          return "now"

      def on_modified(self, force: bool = False) -> None:
          """Upload the file if it was never uploaded, or if ``force`` is set.

          Args:
              force: Upload even though the file was already synced
                  (used when ``.save`` is called).
          """
          already_synced = self._last_sync is not None
          if already_synced and not force:
              return
          self._file_pusher.file_changed(self.save_name, self.file_path)
          # Remember the mtime of the version that was just handed over.
          self._last_sync = os.path.getmtime(self.file_path)

      def finish(self) -> None:
          """Do nothing at shutdown."""
          return None
  class PolicyEnd(FileEventHandler):
      """Policy that uploads a file only once, when the run finishes."""

      @property
      def policy(self) -> "PolicyName":
          """Return the policy name, ``"end"``."""
          return "end"

      def on_modified(self, force: bool = False) -> None:
          """Ignore modifications; this policy only acts in ``finish``."""
          return None

      # TODO: make sure we call this
      def finish(self) -> None:
          """Record the file's mtime and hand the file to the pusher."""
          modified_at = os.path.getmtime(self.file_path)
          self._last_sync = modified_at
          # copy=False avoids a possibly expensive copy; user files should no
          # longer be changing at the end of the run.
          self._file_pusher.file_changed(
              self.save_name,
              self.file_path,
              copy=False,
          )
  class PolicyLive(FileEventHandler):
      """Policy that re-uploads a file while it changes, with throttling.

      An upload is skipped when less than RATE_LIMIT_SECONDS have passed since
      the previous one, when the file has not grown by the factor
      RATE_LIMIT_SIZE_INCREASE, or when the minimum wait time (configured, or
      derived from the file size) has not yet elapsed.
      """

      RATE_LIMIT_SECONDS = 15
      unit_dict = dict(util.POW_10_BYTES)
      # Wait to upload until size has increased 20% from last upload
      RATE_LIMIT_SIZE_INCREASE = 1.2

      def __init__(
          self,
          file_path: PathStr,
          save_name: LogicalPath,
          file_pusher: "FilePusher",
          settings: Optional["SettingsStatic"] = None,
          *args: Any,
          **kwargs: Any,
      ) -> None:
          """Bind the handler to a file and read throttling overrides.

          Args:
              file_path: The file's actual path on disk.
              save_name: The name the file is saved under.
              file_pusher: The pusher that is notified when the file changes.
              settings: Optional settings; ``x_live_policy_rate_limit``
                  overrides RATE_LIMIT_SECONDS for this instance when it is
                  not None, and ``x_live_policy_wait_time`` becomes the
                  minimum wait time.
              *args: Forwarded to the base class.
              **kwargs: Forwarded to the base class.
          """
          super().__init__(file_path, save_name, file_pusher, *args, **kwargs)
          self._last_uploaded_time: Optional[float] = None
          self._last_uploaded_size: int = 0
          if settings is None:
              self._min_wait_time: Optional[float] = None
          else:
              if settings.x_live_policy_rate_limit is not None:
                  # Shadow the class-level default on this instance only.
                  self.RATE_LIMIT_SECONDS = settings.x_live_policy_rate_limit
              self._min_wait_time = settings.x_live_policy_wait_time

      @property
      def policy(self) -> "PolicyName":
          """Return the policy name, ``"live"``."""
          return "live"

      @property
      def current_size(self) -> int:
          """Return the file's current size in bytes, read from disk."""
          return os.path.getsize(self.file_path)

      @classmethod
      def min_wait_for_size(cls, size: int) -> float:
          """Return the minimum number of seconds between uploads for a size.

          Args:
              size: File size in bytes.

          Returns:
              60 below 10 MB, 300 below 100 MB, 600 below 1 GB, else 1200.
          """
          megabyte = cls.unit_dict["MB"]
          if size < 10 * megabyte:
              return 60
          if size < 100 * megabyte:
              return 5 * 60
          if size < cls.unit_dict["GB"]:
              return 10 * 60
          return 20 * 60

      def should_update(self) -> bool:
          """Decide whether the throttling rules allow an upload right now."""
          previous_upload = self._last_uploaded_time
          if previous_upload is None:
              # if the file has never been uploaded, we'll upload it
              return True

          # Check rate limit by time elapsed
          elapsed = time.time() - previous_upload
          if elapsed < self.RATE_LIMIT_SECONDS:
              return False

          # Check rate limit by size increase
          previous_size = float(self._last_uploaded_size)
          if previous_size > 0:
              growth = self.current_size / previous_size
              if growth < self.RATE_LIMIT_SIZE_INCREASE:
                  return False

          # A falsy configured wait time falls back to the size-based default.
          required_wait = self._min_wait_time or self.min_wait_for_size(
              self.current_size
          )
          return elapsed > required_wait

      def on_modified(self, force: bool = False) -> None:
          """Upload the file if it is non-empty, changed, and allowed.

          Args:
              force: Skip the throttling checks (empty or unchanged files are
                  still skipped).
          """
          # Empty files and files whose mtime matches the last sync are skipped.
          if self.current_size == 0 or self._last_sync == os.path.getmtime(
              self.file_path
          ):
              return
          if not force and not self.should_update():
              return
          self.save_file()

      def save_file(self) -> None:
          """Record mtime, upload time and size, then notify the pusher."""
          self._last_sync = os.path.getmtime(self.file_path)
          self._last_uploaded_time = time.time()
          self._last_uploaded_size = self.current_size
          self._file_pusher.file_changed(self.save_name, self.file_path)

      def finish(self) -> None:
          """Force a final upload attempt at shutdown."""
          self.on_modified(force=True)
  class DirWatcher:
      """Watch a directory and dispatch file events to per-file policies.

      A polling observer is scheduled recursively on the watched directory.
      Each created, modified or moved file is mapped to a ``FileEventHandler``
      (``PolicyNow``, ``PolicyEnd`` or ``PolicyLive``) that decides when the
      file is handed to the file pusher.
      """

      def __init__(
          self,
          settings: "SettingsStatic",
          file_pusher: "FilePusher",
          file_dir: Optional[PathStr] = None,
      ) -> None:
          """Start watching a directory.

          Args:
              settings: Run settings; supplies ``files_dir`` (the default
                  watch directory) and ``ignore_globs``.
              file_pusher: The pusher that handlers notify about changes.
              file_dir: Directory to watch instead of ``settings.files_dir``.
          """
          self._file_count = 0
          self._dir = file_dir or settings.files_dir
          self._settings = settings
          # Policies registered for exact save names (no glob characters).
          self._savename_file_policies: MutableMapping[LogicalPath, PolicyName] = {}
          # Policies registered for glob patterns, grouped by policy name.
          self._user_file_policies: Mapping[PolicyName, MutableSet[GlobStr]] = {
              "end": set(),
              "live": set(),
              "now": set(),
          }
          self._file_pusher = file_pusher
          self._file_event_handlers: MutableMapping[LogicalPath, FileEventHandler] = {}
          self._file_observer = wd_polling.PollingObserver()
          self._file_observer.schedule(
              self._per_file_event_handler(), self._dir, recursive=True
          )
          self._file_observer.start()
          # Log the directory actually being watched, which differs from
          # settings.files_dir when file_dir was passed explicitly.
          logger.info("watching files in: %s", self._dir)

      @property
      def emitter(self) -> Optional["wd_api.EventEmitter"]:
          """Return the observer's first emitter, or None if it has none."""
          return next(iter(self._file_observer.emitters), None)

      def _save_name(self, path: PathStr) -> LogicalPath:
          """Return ``path`` as a logical path relative to the watch directory."""
          return LogicalPath(os.path.relpath(path, self._dir))

      def update_policy(self, path: GlobStr, policy: "PolicyName") -> None:
          """Register ``policy`` for ``path`` and apply it to matching files.

          Args:
              path: A file name or glob, relative to the watch directory.
              policy: One of ``"now"``, ``"end"`` or ``"live"``.
          """
          # When we're dealing with one of our own media files, there's no need
          # to store the policy in memory. _get_file_event_handler will always
          # return PolicyNow. Using the path makes syncing historic runs much
          # faster if the name happens to include glob escapable characters. In
          # the future we may add a flag to "files" records that indicates its
          # policy is not dynamic and doesn't need to be stored / checked.
          save_name = self._save_name(os.path.join(self._dir, path))
          if not save_name.startswith("media/"):
              if path == glob.escape(path):
                  # No glob characters: remember the policy by exact name.
                  self._savename_file_policies[save_name] = policy
              else:
                  self._user_file_policies[policy].add(path)

          for src_path in glob.glob(os.path.join(self._dir, path)):
              save_name = self._save_name(src_path)
              feh = self._get_file_event_handler(src_path, save_name)
              # handle the case where the policy changed
              if feh.policy != policy:
                  # TODO: probably should do locking, but this handles moved
                  # files for now (the entry may already be gone).
                  self._file_event_handlers.pop(save_name, None)
                  feh = self._get_file_event_handler(src_path, save_name)
              feh.on_modified(force=True)

      def _per_file_event_handler(self) -> "wd_events.FileSystemEventHandler":
          """Create a Watchdog file event handler that does different things for every file."""
          file_event_handler = wd_events.PatternMatchingEventHandler()
          file_event_handler.on_created = self._on_file_created
          file_event_handler.on_modified = self._on_file_modified
          file_event_handler.on_moved = self._on_file_moved
          file_event_handler._patterns = [os.path.join(self._dir, os.path.normpath("*"))]
          # Ignore hidden files/folders
          # TODO: what other files should we skip?
          file_event_handler._ignore_patterns = [
              "*.tmp",
              "*.wandb",
              "wandb-summary.json",
              os.path.join(self._dir, ".*"),
              os.path.join(self._dir, "*/.*"),
          ]
          file_event_handler._ignore_patterns.extend(
              os.path.join(self._dir, glb) for glb in self._settings.ignore_globs
          )
          return file_event_handler

      def _on_file_created(self, event: "wd_events.FileCreatedEvent") -> None:
          """Handle a creation event; directories are ignored."""
          logger.info("file/dir created: %s", event.src_path)
          if os.path.isdir(event.src_path):
              return None
          self._file_count += 1
          # We do the directory scan less often as it grows
          if self._file_count % 100 == 0:
              emitter = self.emitter
              if emitter:
                  emitter._timeout = self._file_count // 100 + 1
          save_name = self._save_name(event.src_path)
          self._get_file_event_handler(event.src_path, save_name).on_modified()

      def _on_file_modified(self, event: "wd_events.FileModifiedEvent") -> None:
          """Handle a modification event; directories are ignored."""
          logger.info("file/dir modified: %s", event.src_path)
          if os.path.isdir(event.src_path):
              return None
          save_name = self._save_name(event.src_path)
          self._get_file_event_handler(event.src_path, save_name).on_modified()

      def _on_file_moved(self, event: "wd_events.FileMovedEvent") -> None:
          """Handle a move event by re-keying the file's handler."""
          # TODO: test me...
          logger.info("file/dir moved: %s -> %s", event.src_path, event.dest_path)
          if os.path.isdir(event.dest_path):
              return None
          old_save_name = self._save_name(event.src_path)
          new_save_name = self._save_name(event.dest_path)
          # We have to move the existing file handler to the new name
          handler = self._get_file_event_handler(event.src_path, old_save_name)
          self._file_event_handlers[new_save_name] = handler
          # The old name may never have been registered (handlers for media
          # files are not stored), so removing it must not raise KeyError.
          self._file_event_handlers.pop(old_save_name, None)
          handler.on_renamed(event.dest_path, new_save_name)

      def _handler_class_from_globs(
          self, save_name: LogicalPath
      ) -> "type[FileEventHandler]":
          """Pick the handler class for ``save_name`` from user glob policies.

          Returns ``PolicyEnd`` unless a ``"live"`` or ``"now"`` glob expands
          to exactly this file; when several policies match, the one visited
          last wins.
          """
          make_handler = PolicyEnd
          for policy, globs in self._user_file_policies.items():
              if policy == "end":
                  continue
              # Convert set to list to avoid RuntimeError's
              # TODO: we may need to add locks
              for g in list(globs):
                  paths = glob.glob(os.path.join(self._dir, g))
                  # Compare exact relative paths: a substring test would let
                  # "a.txt" match ".../data.txt" and would never match nested
                  # files when the OS separator differs from "/".
                  if any(self._save_name(p) == save_name for p in paths):
                      if policy == "live":
                          make_handler = PolicyLive
                      elif policy == "now":
                          make_handler = PolicyNow
                      # Further globs of this policy cannot change the result.
                      break
          return make_handler

      def _get_file_event_handler(
          self, file_path: PathStr, save_name: LogicalPath
      ) -> FileEventHandler:
          """Get or create an event handler for a particular file.

          file_path: the file's actual path
          save_name: its path relative to the run directory (aka the watch directory)
          """
          # Always return PolicyNow for any of our media files.
          if save_name.startswith("media/"):
              return PolicyNow(file_path, save_name, self._file_pusher, self._settings)

          if save_name not in self._file_event_handlers:
              # TODO: we can use PolicyIgnore if there are files we never want to sync
              if "tfevents" in save_name or "graph.pbtxt" in save_name:
                  make_handler = PolicyLive
              elif save_name in self._savename_file_policies:
                  policy_name = self._savename_file_policies[save_name]
                  if policy_name == "live":
                      make_handler = PolicyLive
                  elif policy_name == "now":
                      make_handler = PolicyNow
                  else:
                      make_handler = PolicyEnd
              else:
                  make_handler = self._handler_class_from_globs(save_name)
              self._file_event_handlers[save_name] = make_handler(
                  file_path, save_name, self._file_pusher, self._settings
              )
          return self._file_event_handlers[save_name]

      def finish(self) -> None:
          """Stop the observer, flush pending events, and finish every file."""
          logger.info("shutting down directory watcher")
          try:
              # avoid hanging if we crashed before the observer was started
              if self._file_observer.is_alive():
                  # rather unfortunately we need to manually do a final scan of the dir
                  # with `queue_events`, then iterate through all events before stopping
                  # the observer to catch all files written. First we need to prevent the
                  # existing thread from consuming our final events, then we process them
                  self._file_observer._timeout = 0
                  self._file_observer._stopped_event.set()
                  self._file_observer.join()
                  emitter = self.emitter
                  # The observer may have no emitter; then there is nothing to queue.
                  if emitter is not None:
                      emitter.queue_events(0)
                  while True:
                      try:
                          self._file_observer.dispatch_events(
                              self._file_observer.event_queue, 0
                          )
                      except queue.Empty:
                          break
                  # Calling stop unschedules any inflight events so we handled them above
                  self._file_observer.stop()
          # TODO: py2 TypeError: PyCObject_AsVoidPtr called with null pointer
          # TODO: py3 SystemError: <built-in function stop> returned an error
          except (TypeError, SystemError):
              # Best effort: shutdown continues with the manual scan below.
              logger.debug("ignoring error while stopping file observer", exc_info=True)

          # Ensure we've at least noticed every file in the run directory. Sometimes
          # we miss things because asynchronously watching filesystems isn't reliable.
          logger.info("scan: %s", self._dir)
          ignore_globs = self._settings.ignore_globs
          for dirpath, _, filenames in os.walk(self._dir):
              for fname in filenames:
                  file_path = os.path.join(dirpath, fname)
                  save_name = self._save_name(file_path)
                  matched_glob = next(
                      (glb for glb in ignore_globs if fnmatch.fnmatch(save_name, glb)),
                      None,
                  )
                  if matched_glob is not None:
                      logger.info(
                          "ignored: %s matching glob %s", save_name, matched_glob
                      )
                      continue
                  logger.info("scan save: %s %s", file_path, save_name)
                  self._get_file_event_handler(file_path, save_name).finish()
  349. continue
  350. logger.info("scan save: %s %s", file_path, save_name)
  351. self._get_file_event_handler(file_path, save_name).finish()