cached.py 35 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003
  1. from __future__ import annotations
  2. import inspect
  3. import logging
  4. import os
  5. import tempfile
  6. import time
  7. import weakref
  8. from collections.abc import Callable
  9. from shutil import rmtree
  10. from typing import TYPE_CHECKING, Any, ClassVar
  11. from fsspec import filesystem
  12. from fsspec.callbacks import DEFAULT_CALLBACK
  13. from fsspec.compression import compr
  14. from fsspec.core import BaseCache, MMapCache
  15. from fsspec.exceptions import BlocksizeMismatchError
  16. from fsspec.implementations.cache_mapper import create_cache_mapper
  17. from fsspec.implementations.cache_metadata import CacheMetadata
  18. from fsspec.implementations.chained import ChainedFileSystem
  19. from fsspec.implementations.local import LocalFileSystem
  20. from fsspec.spec import AbstractBufferedFile
  21. from fsspec.transaction import Transaction
  22. from fsspec.utils import infer_compression
  23. if TYPE_CHECKING:
  24. from fsspec.implementations.cache_mapper import AbstractCacheMapper
  25. logger = logging.getLogger("fsspec.cached")
  26. class WriteCachedTransaction(Transaction):
  27. def complete(self, commit=True):
  28. rpaths = [f.path for f in self.files]
  29. lpaths = [f.fn for f in self.files]
  30. if commit:
  31. self.fs.put(lpaths, rpaths)
  32. self.files.clear()
  33. self.fs._intrans = False
  34. self.fs._transaction = None
  35. self.fs = None # break cycle
class CachingFileSystem(ChainedFileSystem):
    """Locally caching filesystem, layer over any other FS

    This class implements chunk-wise local storage of remote files, for quick
    access after the initial download. The files are stored in a given
    directory with hashes of URLs for the filenames. If no directory is given,
    a temporary one is used, which should be cleaned up by the OS after the
    process ends. The files themselves are sparse (as implemented in
    :class:`~fsspec.caching.MMapCache`), so only the data which is accessed
    takes up space.

    Restrictions:

    - the block-size must be the same for each access of a given file, unless
      all blocks of the file have already been read
    - caching can only be applied to file-systems which produce files
      derived from fsspec.spec.AbstractBufferedFile ; LocalFileSystem is also
      allowed, for testing
    """

    # protocol names associated with this class
    protocol: ClassVar[str | tuple[str, ...]] = ("blockcache", "cached")
    # NOTE(review): presumably init options ignored when tokenizing an
    # instance - confirm against the base class
    _strip_tokenize_options = ("fo",)
  54. def __init__(
  55. self,
  56. target_protocol=None,
  57. cache_storage="TMP",
  58. cache_check=10,
  59. check_files=False,
  60. expiry_time=604800,
  61. target_options=None,
  62. fs=None,
  63. same_names: bool | None = None,
  64. compression=None,
  65. cache_mapper: AbstractCacheMapper | None = None,
  66. **kwargs,
  67. ):
  68. """
  69. Parameters
  70. ----------
  71. target_protocol: str (optional)
  72. Target filesystem protocol. Provide either this or ``fs``.
  73. cache_storage: str or list(str)
  74. Location to store files. If "TMP", this is a temporary directory,
  75. and will be cleaned up by the OS when this process ends (or later).
  76. If a list, each location will be tried in the order given, but
  77. only the last will be considered writable.
  78. cache_check: int
  79. Number of seconds between reload of cache metadata
  80. check_files: bool
  81. Whether to explicitly see if the UID of the remote file matches
  82. the stored one before using. Warning: some file systems such as
  83. HTTP cannot reliably give a unique hash of the contents of some
  84. path, so be sure to set this option to False.
  85. expiry_time: int
  86. The time in seconds after which a local copy is considered useless.
  87. Set to falsy to prevent expiry. The default is equivalent to one
  88. week.
  89. target_options: dict or None
  90. Passed to the instantiation of the FS, if fs is None.
  91. fs: filesystem instance
  92. The target filesystem to run against. Provide this or ``protocol``.
  93. same_names: bool (optional)
  94. By default, target URLs are hashed using a ``HashCacheMapper`` so
  95. that files from different backends with the same basename do not
  96. conflict. If this argument is ``true``, a ``BasenameCacheMapper``
  97. is used instead. Other cache mapper options are available by using
  98. the ``cache_mapper`` keyword argument. Only one of this and
  99. ``cache_mapper`` should be specified.
  100. compression: str (optional)
  101. To decompress on download. Can be 'infer' (guess from the URL name),
  102. one of the entries in ``fsspec.compression.compr``, or None for no
  103. decompression.
  104. cache_mapper: AbstractCacheMapper (optional)
  105. The object use to map from original filenames to cached filenames.
  106. Only one of this and ``same_names`` should be specified.
  107. """
  108. super().__init__(**kwargs)
  109. if fs is None and target_protocol is None:
  110. raise ValueError(
  111. "Please provide filesystem instance(fs) or target_protocol"
  112. )
  113. if not (fs is None) ^ (target_protocol is None):
  114. raise ValueError(
  115. "Both filesystems (fs) and target_protocol may not be both given."
  116. )
  117. if cache_storage == "TMP":
  118. tempdir = tempfile.mkdtemp()
  119. storage = [tempdir]
  120. weakref.finalize(self, self._remove_tempdir, tempdir)
  121. else:
  122. if isinstance(cache_storage, str):
  123. storage = [cache_storage]
  124. else:
  125. storage = cache_storage
  126. os.makedirs(storage[-1], exist_ok=True)
  127. self.storage = storage
  128. self.kwargs = target_options or {}
  129. self.cache_check = cache_check
  130. self.check_files = check_files
  131. self.expiry = expiry_time
  132. self.compression = compression
  133. # Size of cache in bytes. If None then the size is unknown and will be
  134. # recalculated the next time cache_size() is called. On writes to the
  135. # cache this is reset to None.
  136. self._cache_size = None
  137. if same_names is not None and cache_mapper is not None:
  138. raise ValueError(
  139. "Cannot specify both same_names and cache_mapper in "
  140. "CachingFileSystem.__init__"
  141. )
  142. if cache_mapper is not None:
  143. self._mapper = cache_mapper
  144. else:
  145. self._mapper = create_cache_mapper(
  146. same_names if same_names is not None else False
  147. )
  148. self.target_protocol = (
  149. target_protocol
  150. if isinstance(target_protocol, str)
  151. else (fs.protocol if isinstance(fs.protocol, str) else fs.protocol[0])
  152. )
  153. self._metadata = CacheMetadata(self.storage)
  154. self.load_cache()
  155. self.fs = fs if fs is not None else filesystem(target_protocol, **self.kwargs)
  156. def _strip_protocol(path):
  157. # acts as a method, since each instance has a difference target
  158. return self.fs._strip_protocol(type(self)._strip_protocol(path))
  159. self._strip_protocol: Callable = _strip_protocol
  160. @staticmethod
  161. def _remove_tempdir(tempdir):
  162. try:
  163. rmtree(tempdir)
  164. except Exception:
  165. pass
  166. def _mkcache(self):
  167. os.makedirs(self.storage[-1], exist_ok=True)
  168. def cache_size(self):
  169. """Return size of cache in bytes.
  170. If more than one cache directory is in use, only the size of the last
  171. one (the writable cache directory) is returned.
  172. """
  173. if self._cache_size is None:
  174. cache_dir = self.storage[-1]
  175. self._cache_size = filesystem("file").du(cache_dir, withdirs=True)
  176. return self._cache_size
  177. def load_cache(self):
  178. """Read set of stored blocks from file"""
  179. self._metadata.load()
  180. self._mkcache()
  181. self.last_cache = time.time()
  182. def save_cache(self):
  183. """Save set of stored blocks from file"""
  184. self._mkcache()
  185. self._metadata.save()
  186. self.last_cache = time.time()
  187. self._cache_size = None
  188. def _check_cache(self):
  189. """Reload caches if time elapsed or any disappeared"""
  190. self._mkcache()
  191. if not self.cache_check:
  192. # explicitly told not to bother checking
  193. return
  194. timecond = time.time() - self.last_cache > self.cache_check
  195. existcond = all(os.path.exists(storage) for storage in self.storage)
  196. if timecond or not existcond:
  197. self.load_cache()
  198. def _check_file(self, path):
  199. """Is path in cache and still valid"""
  200. path = self._strip_protocol(path)
  201. self._check_cache()
  202. return self._metadata.check_file(path, self)
  203. def clear_cache(self):
  204. """Remove all files and metadata from the cache
  205. In the case of multiple cache locations, this clears only the last one,
  206. which is assumed to be the read/write one.
  207. """
  208. rmtree(self.storage[-1])
  209. self.load_cache()
  210. self._cache_size = None
  211. def clear_expired_cache(self, expiry_time=None):
  212. """Remove all expired files and metadata from the cache
  213. In the case of multiple cache locations, this clears only the last one,
  214. which is assumed to be the read/write one.
  215. Parameters
  216. ----------
  217. expiry_time: int
  218. The time in seconds after which a local copy is considered useless.
  219. If not defined the default is equivalent to the attribute from the
  220. file caching instantiation.
  221. """
  222. if not expiry_time:
  223. expiry_time = self.expiry
  224. self._check_cache()
  225. expired_files, writable_cache_empty = self._metadata.clear_expired(expiry_time)
  226. for fn in expired_files:
  227. if os.path.exists(fn):
  228. os.remove(fn)
  229. if writable_cache_empty:
  230. rmtree(self.storage[-1])
  231. self.load_cache()
  232. self._cache_size = None
  233. def pop_from_cache(self, path):
  234. """Remove cached version of given file
  235. Deletes local copy of the given (remote) path. If it is found in a cache
  236. location which is not the last, it is assumed to be read-only, and
  237. raises PermissionError
  238. """
  239. path = self._strip_protocol(path)
  240. fn = self._metadata.pop_file(path)
  241. if fn is not None:
  242. os.remove(fn)
  243. self._cache_size = None
  244. def _open(
  245. self,
  246. path,
  247. mode="rb",
  248. block_size=None,
  249. autocommit=True,
  250. cache_options=None,
  251. **kwargs,
  252. ):
  253. """Wrap the target _open
  254. If the whole file exists in the cache, just open it locally and
  255. return that.
  256. Otherwise, open the file on the target FS, and make it have a mmap
  257. cache pointing to the location which we determine, in our cache.
  258. The ``blocks`` instance is shared, so as the mmap cache instance
  259. updates, so does the entry in our ``cached_files`` attribute.
  260. We monkey-patch this file, so that when it closes, we call
  261. ``close_and_update`` to save the state of the blocks.
  262. """
  263. path = self._strip_protocol(path)
  264. path = self.fs._strip_protocol(path)
  265. if "r" not in mode:
  266. return self.fs._open(
  267. path,
  268. mode=mode,
  269. block_size=block_size,
  270. autocommit=autocommit,
  271. cache_options=cache_options,
  272. **kwargs,
  273. )
  274. detail = self._check_file(path)
  275. if detail:
  276. # file is in cache
  277. detail, fn = detail
  278. hash, blocks = detail["fn"], detail["blocks"]
  279. if blocks is True:
  280. # stored file is complete
  281. logger.debug("Opening local copy of %s", path)
  282. return open(fn, mode)
  283. # TODO: action where partial file exists in read-only cache
  284. logger.debug("Opening partially cached copy of %s", path)
  285. else:
  286. hash = self._mapper(path)
  287. fn = os.path.join(self.storage[-1], hash)
  288. blocks = set()
  289. detail = {
  290. "original": path,
  291. "fn": hash,
  292. "blocks": blocks,
  293. "time": time.time(),
  294. "uid": self.fs.ukey(path),
  295. }
  296. self._metadata.update_file(path, detail)
  297. logger.debug("Creating local sparse file for %s", path)
  298. # explicitly submitting the size to the open call will avoid extra
  299. # operations when opening. This is particularly relevant
  300. # for any file that is read over a network, e.g. S3.
  301. size = detail.get("size")
  302. # call target filesystems open
  303. self._mkcache()
  304. f = self.fs._open(
  305. path,
  306. mode=mode,
  307. block_size=block_size,
  308. autocommit=autocommit,
  309. cache_options=cache_options,
  310. cache_type="none",
  311. size=size,
  312. **kwargs,
  313. )
  314. # set size if not already set
  315. if size is None:
  316. detail["size"] = f.size
  317. self._metadata.update_file(path, detail)
  318. if self.compression:
  319. comp = (
  320. infer_compression(path)
  321. if self.compression == "infer"
  322. else self.compression
  323. )
  324. f = compr[comp](f, mode="rb")
  325. if "blocksize" in detail:
  326. if detail["blocksize"] != f.blocksize:
  327. raise BlocksizeMismatchError(
  328. f"Cached file must be reopened with same block"
  329. f" size as original (old: {detail['blocksize']},"
  330. f" new {f.blocksize})"
  331. )
  332. else:
  333. detail["blocksize"] = f.blocksize
  334. def _fetch_ranges(ranges):
  335. return self.fs.cat_ranges(
  336. [path] * len(ranges),
  337. [r[0] for r in ranges],
  338. [r[1] for r in ranges],
  339. **kwargs,
  340. )
  341. multi_fetcher = None if self.compression else _fetch_ranges
  342. f.cache = MMapCache(
  343. f.blocksize, f._fetch_range, f.size, fn, blocks, multi_fetcher=multi_fetcher
  344. )
  345. close = f.close
  346. f.close = lambda: self.close_and_update(f, close)
  347. self.save_cache()
  348. return f
  349. def _parent(self, path):
  350. return self.fs._parent(path)
  351. def hash_name(self, path: str, *args: Any) -> str:
  352. # Kept for backward compatibility with downstream libraries.
  353. # Ignores extra arguments, previously same_name boolean.
  354. return self._mapper(path)
  355. def close_and_update(self, f, close):
  356. """Called when a file is closing, so store the set of blocks"""
  357. if f.closed:
  358. return
  359. path = self._strip_protocol(f.path)
  360. self._metadata.on_close_cached_file(f, path)
  361. try:
  362. logger.debug("going to save")
  363. self.save_cache()
  364. logger.debug("saved")
  365. except OSError:
  366. logger.debug("Cache saving failed while closing file")
  367. except NameError:
  368. logger.debug("Cache save failed due to interpreter shutdown")
  369. close()
  370. f.closed = True
  371. def ls(self, path, detail=True):
  372. return self.fs.ls(path, detail)
    def __getattribute__(self, item):
        """Resolve attribute ``item``, delegating to the target filesystem.

        Lookup order, as implemented below:

        1. names in the explicit set are always taken from this class
           hierarchy and bound to ``self``;
        2. ``__reduce_ex__`` is reported as missing;
        3. ``transaction``, a few class attributes and ``__class__`` come
           from ``type(self)``;
        4. anything stored in this instance's ``__dict__``;
        5. once ``fs`` is set: the target instance's ``__dict__``, then the
           target's class (functions and data descriptors are bound to the
           target instance);
        6. before ``fs`` is set: the normal superclass lookup.
        """
        if item in {
            "load_cache",
            "_open",
            "save_cache",
            "close_and_update",
            "__init__",
            "__getattribute__",
            "__reduce__",
            "_make_local_details",
            "open",
            "cat",
            "cat_file",
            "_cat_file",
            "cat_ranges",
            "_cat_ranges",
            "get",
            "read_block",
            "tail",
            "head",
            "info",
            "ls",
            "exists",
            "isfile",
            "isdir",
            "_check_file",
            "_check_cache",
            "_mkcache",
            "clear_cache",
            "clear_expired_cache",
            "pop_from_cache",
            "local_file",
            "_paths_from_path",
            "get_mapper",
            "open_many",
            "commit_many",
            "hash_name",
            "__hash__",
            "__eq__",
            "to_json",
            "to_dict",
            "cache_size",
            "pipe_file",
            "pipe",
            "start_transaction",
            "end_transaction",
        }:
            # all the methods defined in this class. Note `open` here, since
            # it calls `_open`, but is actually in superclass
            # The attribute is looked up on the class and bound to ``self``
            # only when the returned lambda is called.
            return lambda *args, **kw: getattr(type(self), item).__get__(self)(
                *args, **kw
            )
        if item in ["__reduce_ex__"]:
            # always reported as missing, never forwarded to the target
            raise AttributeError
        if item in ["transaction"]:
            # property
            return type(self).transaction.__get__(self)
        if item in {"_cache", "transaction_type", "protocol"}:
            # class attributes
            return getattr(type(self), item)
        if item == "__class__":
            return type(self)
        # read the instance dict directly, bypassing this method
        d = object.__getattribute__(self, "__dict__")
        fs = d.get("fs", None)  # fs is not immediately defined
        if item in d:
            return d[item]
        elif fs is not None:
            if item in fs.__dict__:
                # attribute of instance
                return fs.__dict__[item]
            # attribute belonging to the target filesystem's class
            cls = type(fs)
            m = getattr(cls, item)
            if (inspect.isfunction(m) or inspect.isdatadescriptor(m)) and (
                not hasattr(m, "__self__") or m.__self__ is None
            ):
                # instance method
                return m.__get__(fs, cls)
            return m  # class method or attribute
        else:
            # attributes of the superclass, while target is being set up
            return super().__getattribute__(item)
  455. def __eq__(self, other):
  456. """Test for equality."""
  457. if self is other:
  458. return True
  459. if not isinstance(other, type(self)):
  460. return False
  461. return (
  462. self.storage == other.storage
  463. and self.kwargs == other.kwargs
  464. and self.cache_check == other.cache_check
  465. and self.check_files == other.check_files
  466. and self.expiry == other.expiry
  467. and self.compression == other.compression
  468. and self._mapper == other._mapper
  469. and self.target_protocol == other.target_protocol
  470. )
  471. def __hash__(self):
  472. """Calculate hash."""
  473. return (
  474. hash(tuple(self.storage))
  475. ^ hash(str(self.kwargs))
  476. ^ hash(self.cache_check)
  477. ^ hash(self.check_files)
  478. ^ hash(self.expiry)
  479. ^ hash(self.compression)
  480. ^ hash(self._mapper)
  481. ^ hash(self.target_protocol)
  482. )
class WholeFileCacheFileSystem(CachingFileSystem):
    """Caches whole remote files on first access

    This class is intended as a layer over any other file system, and
    will make a local copy of each file accessed, so that all subsequent
    reads are local. This is similar to ``CachingFileSystem``, but without
    the block-wise functionality and so can work even when sparse files
    are not allowed. See its docstring for definition of the init
    arguments.

    The class still needs access to the remote store for listing files,
    and may refresh cached files.
    """

    # protocol name associated with this class
    protocol = "filecache"
    # NOTE(review): presumably marks that opened files are real local files
    # - confirm how callers use this flag
    local_file = True
  496. def open_many(self, open_files, **kwargs):
  497. paths = [of.path for of in open_files]
  498. if "r" in open_files.mode:
  499. self._mkcache()
  500. else:
  501. return [
  502. LocalTempFile(
  503. self.fs,
  504. path,
  505. mode=open_files.mode,
  506. fn=os.path.join(self.storage[-1], self._mapper(path)),
  507. **kwargs,
  508. )
  509. for path in paths
  510. ]
  511. if self.compression:
  512. raise NotImplementedError
  513. details = [self._check_file(sp) for sp in paths]
  514. downpath = [p for p, d in zip(paths, details) if not d]
  515. downfn0 = [
  516. os.path.join(self.storage[-1], self._mapper(p))
  517. for p, d in zip(paths, details)
  518. ] # keep these path names for opening later
  519. downfn = [fn for fn, d in zip(downfn0, details) if not d]
  520. if downpath:
  521. # skip if all files are already cached and up to date
  522. self.fs.get(downpath, downfn)
  523. # update metadata - only happens when downloads are successful
  524. newdetail = [
  525. {
  526. "original": path,
  527. "fn": self._mapper(path),
  528. "blocks": True,
  529. "time": time.time(),
  530. "uid": self.fs.ukey(path),
  531. }
  532. for path in downpath
  533. ]
  534. for path, detail in zip(downpath, newdetail):
  535. self._metadata.update_file(path, detail)
  536. self.save_cache()
  537. def firstpart(fn):
  538. # helper to adapt both whole-file and simple-cache
  539. return fn[1] if isinstance(fn, tuple) else fn
  540. return [
  541. open(firstpart(fn0) if fn0 else fn1, mode=open_files.mode)
  542. for fn0, fn1 in zip(details, downfn0)
  543. ]
  544. def commit_many(self, open_files):
  545. self.fs.put([f.fn for f in open_files], [f.path for f in open_files])
  546. [f.close() for f in open_files]
  547. for f in open_files:
  548. # in case autocommit is off, and so close did not already delete
  549. try:
  550. os.remove(f.name)
  551. except FileNotFoundError:
  552. pass
  553. self._cache_size = None
  554. def _make_local_details(self, path):
  555. hash = self._mapper(path)
  556. fn = os.path.join(self.storage[-1], hash)
  557. detail = {
  558. "original": path,
  559. "fn": hash,
  560. "blocks": True,
  561. "time": time.time(),
  562. "uid": self.fs.ukey(path),
  563. }
  564. self._metadata.update_file(path, detail)
  565. logger.debug("Copying %s to local cache", path)
  566. return fn
  567. def cat(
  568. self,
  569. path,
  570. recursive=False,
  571. on_error="raise",
  572. callback=DEFAULT_CALLBACK,
  573. **kwargs,
  574. ):
  575. paths = self.expand_path(
  576. path, recursive=recursive, maxdepth=kwargs.get("maxdepth")
  577. )
  578. getpaths = []
  579. storepaths = []
  580. fns = []
  581. out = {}
  582. for p in paths.copy():
  583. try:
  584. detail = self._check_file(p)
  585. if not detail:
  586. fn = self._make_local_details(p)
  587. getpaths.append(p)
  588. storepaths.append(fn)
  589. else:
  590. detail, fn = detail if isinstance(detail, tuple) else (None, detail)
  591. fns.append(fn)
  592. except Exception as e:
  593. if on_error == "raise":
  594. raise
  595. if on_error == "return":
  596. out[p] = e
  597. paths.remove(p)
  598. if getpaths:
  599. self.fs.get(getpaths, storepaths)
  600. self.save_cache()
  601. callback.set_size(len(paths))
  602. for p, fn in zip(paths, fns):
  603. with open(fn, "rb") as f:
  604. out[p] = f.read()
  605. callback.relative_update(1)
  606. if isinstance(path, str) and len(paths) == 1 and recursive is False:
  607. out = out[paths[0]]
  608. return out
  609. def _open(self, path, mode="rb", **kwargs):
  610. path = self._strip_protocol(path)
  611. if "r" not in mode:
  612. hash = self._mapper(path)
  613. fn = os.path.join(self.storage[-1], hash)
  614. user_specified_kwargs = {
  615. k: v
  616. for k, v in kwargs.items()
  617. # those kwargs were added by open(), we don't want them
  618. if k not in ["autocommit", "block_size", "cache_options"]
  619. }
  620. return LocalTempFile(self, path, mode=mode, fn=fn, **user_specified_kwargs)
  621. detail = self._check_file(path)
  622. if detail:
  623. detail, fn = detail
  624. _, blocks = detail["fn"], detail["blocks"]
  625. if blocks is True:
  626. logger.debug("Opening local copy of %s", path)
  627. # In order to support downstream filesystems to be able to
  628. # infer the compression from the original filename, like
  629. # the `TarFileSystem`, let's extend the `io.BufferedReader`
  630. # fileobject protocol by adding a dedicated attribute
  631. # `original`.
  632. f = open(fn, mode)
  633. f.original = detail.get("original")
  634. return f
  635. else:
  636. raise ValueError(
  637. f"Attempt to open partially cached file {path}"
  638. f" as a wholly cached file"
  639. )
  640. else:
  641. fn = self._make_local_details(path)
  642. kwargs["mode"] = mode
  643. # call target filesystems open
  644. self._mkcache()
  645. if self.compression:
  646. with self.fs._open(path, **kwargs) as f, open(fn, "wb") as f2:
  647. if isinstance(f, AbstractBufferedFile):
  648. # want no type of caching if just downloading whole thing
  649. f.cache = BaseCache(0, f.cache.fetcher, f.size)
  650. comp = (
  651. infer_compression(path)
  652. if self.compression == "infer"
  653. else self.compression
  654. )
  655. f = compr[comp](f, mode="rb")
  656. data = True
  657. while data:
  658. block = getattr(f, "blocksize", 5 * 2**20)
  659. data = f.read(block)
  660. f2.write(data)
  661. else:
  662. self.fs.get_file(path, fn)
  663. self.save_cache()
  664. return self._open(path, mode)
class SimpleCacheFileSystem(WholeFileCacheFileSystem):
    """Caches whole remote files on first access

    This class is intended as a layer over any other file system, and
    will make a local copy of each file accessed, so that all subsequent
    reads are local. This implementation only copies whole files, and
    does not keep any metadata about the download time or file details.
    It is therefore safer to use in multi-threaded/concurrent situations.

    This is the only of the caching filesystems that supports write: you will
    be given a real local open file, and upon close and commit, it will be
    uploaded to the target filesystem; the writability or the target URL is
    not checked until that time.
    """

    # protocol name associated with this class
    protocol = "simplecache"
    # NOTE(review): presumably marks that opened files are real local files
    # - confirm how callers use this flag
    local_file = True
    # transactions of this filesystem upload their files in one batch
    transaction_type = WriteCachedTransaction
  680. def __init__(self, **kwargs):
  681. kw = kwargs.copy()
  682. for key in ["cache_check", "expiry_time", "check_files"]:
  683. kw[key] = False
  684. super().__init__(**kw)
  685. for storage in self.storage:
  686. if not os.path.exists(storage):
  687. os.makedirs(storage, exist_ok=True)
  688. def _check_file(self, path):
  689. self._check_cache()
  690. sha = self._mapper(path)
  691. for storage in self.storage:
  692. fn = os.path.join(storage, sha)
  693. if os.path.exists(fn):
  694. return fn
  695. def save_cache(self):
  696. pass
  697. def load_cache(self):
  698. pass
  699. def pipe_file(self, path, value=None, **kwargs):
  700. if self._intrans:
  701. with self.open(path, "wb") as f:
  702. f.write(value)
  703. else:
  704. super().pipe_file(path, value)
  705. def ls(self, path, detail=True, **kwargs):
  706. path = self._strip_protocol(path)
  707. details = []
  708. try:
  709. details = self.fs.ls(
  710. path, detail=True, **kwargs
  711. ).copy() # don't edit original!
  712. except FileNotFoundError as e:
  713. ex = e
  714. else:
  715. ex = None
  716. if self._intrans:
  717. path1 = path.rstrip("/") + "/"
  718. for f in self.transaction.files:
  719. if f.path == path:
  720. details.append(
  721. {"name": path, "size": f.size or f.tell(), "type": "file"}
  722. )
  723. elif f.path.startswith(path1):
  724. if f.path.count("/") == path1.count("/"):
  725. details.append(
  726. {"name": f.path, "size": f.size or f.tell(), "type": "file"}
  727. )
  728. else:
  729. dname = "/".join(f.path.split("/")[: path1.count("/") + 1])
  730. details.append({"name": dname, "size": 0, "type": "directory"})
  731. if ex is not None and not details:
  732. raise ex
  733. if detail:
  734. return details
  735. return sorted(_["name"] for _ in details)
  736. def info(self, path, **kwargs):
  737. path = self._strip_protocol(path)
  738. if self._intrans:
  739. f = [_ for _ in self.transaction.files if _.path == path]
  740. if f:
  741. size = os.path.getsize(f[0].fn) if f[0].closed else f[0].tell()
  742. return {"name": path, "size": size, "type": "file"}
  743. f = any(_.path.startswith(path + "/") for _ in self.transaction.files)
  744. if f:
  745. return {"name": path, "size": 0, "type": "directory"}
  746. return self.fs.info(path, **kwargs)
  747. def pipe(self, path, value=None, **kwargs):
  748. if isinstance(path, str):
  749. self.pipe_file(self._strip_protocol(path), value, **kwargs)
  750. elif isinstance(path, dict):
  751. for k, v in path.items():
  752. self.pipe_file(self._strip_protocol(k), v, **kwargs)
  753. else:
  754. raise ValueError("path must be str or dict")
  755. async def _cat_file(self, path, start=None, end=None, **kwargs):
  756. logger.debug("async cat_file %s", path)
  757. path = self._strip_protocol(path)
  758. sha = self._mapper(path)
  759. fn = self._check_file(path)
  760. if not fn:
  761. fn = os.path.join(self.storage[-1], sha)
  762. await self.fs._get_file(path, fn, **kwargs)
  763. with open(fn, "rb") as f: # noqa ASYNC230
  764. if start:
  765. f.seek(start)
  766. size = -1 if end is None else end - f.tell()
  767. return f.read(size)
  768. async def _cat_ranges(
  769. self, paths, starts, ends, max_gap=None, on_error="return", **kwargs
  770. ):
  771. logger.debug("async cat ranges %s", paths)
  772. lpaths = []
  773. rset = set()
  774. download = []
  775. rpaths = []
  776. for p in paths:
  777. fn = self._check_file(p)
  778. if fn is None and p not in rset:
  779. sha = self._mapper(p)
  780. fn = os.path.join(self.storage[-1], sha)
  781. download.append(fn)
  782. rset.add(p)
  783. rpaths.append(p)
  784. lpaths.append(fn)
  785. if download:
  786. await self.fs._get(rpaths, download, on_error=on_error)
  787. return LocalFileSystem().cat_ranges(
  788. lpaths, starts, ends, max_gap=max_gap, on_error=on_error, **kwargs
  789. )
  790. def cat_ranges(
  791. self, paths, starts, ends, max_gap=None, on_error="return", **kwargs
  792. ):
  793. logger.debug("cat ranges %s", paths)
  794. lpaths = [self._check_file(p) for p in paths]
  795. rpaths = [p for l, p in zip(lpaths, paths) if l is False]
  796. lpaths = [l for l, p in zip(lpaths, paths) if l is False]
  797. self.fs.get(rpaths, lpaths)
  798. paths = [self._check_file(p) for p in paths]
  799. return LocalFileSystem().cat_ranges(
  800. paths, starts, ends, max_gap=max_gap, on_error=on_error, **kwargs
  801. )
  802. def _open(self, path, mode="rb", **kwargs):
  803. path = self._strip_protocol(path)
  804. sha = self._mapper(path)
  805. if "r" not in mode:
  806. fn = os.path.join(self.storage[-1], sha)
  807. user_specified_kwargs = {
  808. k: v
  809. for k, v in kwargs.items()
  810. if k not in ["autocommit", "block_size", "cache_options"]
  811. } # those were added by open()
  812. return LocalTempFile(
  813. self,
  814. path,
  815. mode=mode,
  816. autocommit=not self._intrans,
  817. fn=fn,
  818. **user_specified_kwargs,
  819. )
  820. fn = self._check_file(path)
  821. if fn:
  822. return open(fn, mode)
  823. fn = os.path.join(self.storage[-1], sha)
  824. logger.debug("Copying %s to local cache", path)
  825. kwargs["mode"] = mode
  826. self._mkcache()
  827. self._cache_size = None
  828. if self.compression:
  829. with self.fs._open(path, **kwargs) as f, open(fn, "wb") as f2:
  830. if isinstance(f, AbstractBufferedFile):
  831. # want no type of caching if just downloading whole thing
  832. f.cache = BaseCache(0, f.cache.fetcher, f.size)
  833. comp = (
  834. infer_compression(path)
  835. if self.compression == "infer"
  836. else self.compression
  837. )
  838. f = compr[comp](f, mode="rb")
  839. data = True
  840. while data:
  841. block = getattr(f, "blocksize", 5 * 2**20)
  842. data = f.read(block)
  843. f2.write(data)
  844. else:
  845. self.fs.get_file(path, fn)
  846. return self._open(path, mode)
  847. class LocalTempFile:
  848. """A temporary local file, which will be uploaded on commit"""
  849. def __init__(self, fs, path, fn, mode="wb", autocommit=True, seek=0, **kwargs):
  850. self.fn = fn
  851. self.fh = open(fn, mode)
  852. self.mode = mode
  853. if seek:
  854. self.fh.seek(seek)
  855. self.path = path
  856. self.size = None
  857. self.fs = fs
  858. self.closed = False
  859. self.autocommit = autocommit
  860. self.kwargs = kwargs
  861. def __reduce__(self):
  862. # always open in r+b to allow continuing writing at a location
  863. return (
  864. LocalTempFile,
  865. (self.fs, self.path, self.fn, "r+b", self.autocommit, self.tell()),
  866. )
  867. def __enter__(self):
  868. return self.fh
  869. def __exit__(self, exc_type, exc_val, exc_tb):
  870. self.close()
  871. def close(self):
  872. # self.size = self.fh.tell()
  873. if self.closed:
  874. return
  875. self.fh.close()
  876. self.closed = True
  877. if self.autocommit:
  878. self.commit()
  879. def discard(self):
  880. self.fh.close()
  881. os.remove(self.fn)
  882. def commit(self):
  883. # calling put() with list arguments avoids path expansion and additional operations
  884. # like isdir()
  885. self.fs.put([self.fn], [self.path], **self.kwargs)
  886. # we do not delete the local copy, it's still in the cache.
  887. @property
  888. def name(self):
  889. return self.fn
  890. def __repr__(self) -> str:
  891. return f"LocalTempFile: {self.path}"
  892. def __getattr__(self, item):
  893. return getattr(self.fh, item)