ftp.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387
  1. import os
  2. import uuid
  3. from ftplib import FTP, FTP_TLS, Error, error_perm
  4. from typing import Any
  5. from ..spec import AbstractBufferedFile, AbstractFileSystem
  6. from ..utils import infer_storage_options, isfilelike
  7. class FTPFileSystem(AbstractFileSystem):
  8. """A filesystem over classic FTP"""
  9. root_marker = "/"
  10. cachable = False
  11. protocol = "ftp"
  12. def __init__(
  13. self,
  14. host,
  15. port=21,
  16. username=None,
  17. password=None,
  18. acct=None,
  19. block_size=None,
  20. tempdir=None,
  21. timeout=30,
  22. encoding="utf-8",
  23. tls=False,
  24. **kwargs,
  25. ):
  26. """
  27. You can use _get_kwargs_from_urls to get some kwargs from
  28. a reasonable FTP url.
  29. Authentication will be anonymous if username/password are not
  30. given.
  31. Parameters
  32. ----------
  33. host: str
  34. The remote server name/ip to connect to
  35. port: int
  36. Port to connect with
  37. username: str or None
  38. If authenticating, the user's identifier
  39. password: str of None
  40. User's password on the server, if using
  41. acct: str or None
  42. Some servers also need an "account" string for auth
  43. block_size: int or None
  44. If given, the read-ahead or write buffer size.
  45. tempdir: str
  46. Directory on remote to put temporary files when in a transaction
  47. timeout: int
  48. Timeout of the ftp connection in seconds
  49. encoding: str
  50. Encoding to use for directories and filenames in FTP connection
  51. tls: bool
  52. Use FTP-TLS, by default False
  53. """
  54. super().__init__(**kwargs)
  55. self.host = host
  56. self.port = port
  57. self.tempdir = tempdir or "/tmp"
  58. self.cred = username or "", password or "", acct or ""
  59. self.timeout = timeout
  60. self.encoding = encoding
  61. if block_size is not None:
  62. self.blocksize = block_size
  63. else:
  64. self.blocksize = 2**16
  65. self.tls = tls
  66. self._connect()
  67. if self.tls:
  68. self.ftp.prot_p()
  69. def _connect(self):
  70. if self.tls:
  71. ftp_cls = FTP_TLS
  72. else:
  73. ftp_cls = FTP
  74. self.ftp = ftp_cls(timeout=self.timeout, encoding=self.encoding)
  75. self.ftp.connect(self.host, self.port)
  76. self.ftp.login(*self.cred)
  77. @classmethod
  78. def _strip_protocol(cls, path):
  79. return "/" + infer_storage_options(path)["path"].lstrip("/").rstrip("/")
  80. @staticmethod
  81. def _get_kwargs_from_urls(urlpath):
  82. out = infer_storage_options(urlpath)
  83. out.pop("path", None)
  84. out.pop("protocol", None)
  85. return out
  86. def ls(self, path, detail=True, **kwargs):
  87. path = self._strip_protocol(path)
  88. out = []
  89. if path not in self.dircache:
  90. try:
  91. try:
  92. out = [
  93. (fn, details)
  94. for (fn, details) in self.ftp.mlsd(path)
  95. if fn not in [".", ".."]
  96. and details["type"] not in ["pdir", "cdir"]
  97. ]
  98. except error_perm:
  99. out = _mlsd2(self.ftp, path) # Not platform independent
  100. for fn, details in out:
  101. details["name"] = "/".join(
  102. ["" if path == "/" else path, fn.lstrip("/")]
  103. )
  104. if details["type"] == "file":
  105. details["size"] = int(details["size"])
  106. else:
  107. details["size"] = 0
  108. if details["type"] == "dir":
  109. details["type"] = "directory"
  110. self.dircache[path] = out
  111. except Error:
  112. try:
  113. info = self.info(path)
  114. if info["type"] == "file":
  115. out = [(path, info)]
  116. except (Error, IndexError) as exc:
  117. raise FileNotFoundError(path) from exc
  118. files = self.dircache.get(path, out)
  119. if not detail:
  120. return sorted([fn for fn, details in files])
  121. return [details for fn, details in files]
  122. def info(self, path, **kwargs):
  123. # implement with direct method
  124. path = self._strip_protocol(path)
  125. if path == "/":
  126. # special case, since this dir has no real entry
  127. return {"name": "/", "size": 0, "type": "directory"}
  128. files = self.ls(self._parent(path).lstrip("/"), True)
  129. try:
  130. out = next(f for f in files if f["name"] == path)
  131. except StopIteration as exc:
  132. raise FileNotFoundError(path) from exc
  133. return out
  134. def get_file(self, rpath, lpath, **kwargs):
  135. if self.isdir(rpath):
  136. if not os.path.exists(lpath):
  137. os.mkdir(lpath)
  138. return
  139. if isfilelike(lpath):
  140. outfile = lpath
  141. else:
  142. outfile = open(lpath, "wb")
  143. def cb(x):
  144. outfile.write(x)
  145. self.ftp.retrbinary(
  146. f"RETR {rpath}",
  147. blocksize=self.blocksize,
  148. callback=cb,
  149. )
  150. if not isfilelike(lpath):
  151. outfile.close()
  152. def cat_file(self, path, start=None, end=None, **kwargs):
  153. if end is not None:
  154. return super().cat_file(path, start, end, **kwargs)
  155. out = []
  156. def cb(x):
  157. out.append(x)
  158. try:
  159. self.ftp.retrbinary(
  160. f"RETR {path}",
  161. blocksize=self.blocksize,
  162. rest=start,
  163. callback=cb,
  164. )
  165. except (Error, error_perm) as orig_exc:
  166. raise FileNotFoundError(path) from orig_exc
  167. return b"".join(out)
  168. def _open(
  169. self,
  170. path,
  171. mode="rb",
  172. block_size=None,
  173. cache_options=None,
  174. autocommit=True,
  175. **kwargs,
  176. ):
  177. path = self._strip_protocol(path)
  178. block_size = block_size or self.blocksize
  179. return FTPFile(
  180. self,
  181. path,
  182. mode=mode,
  183. block_size=block_size,
  184. tempdir=self.tempdir,
  185. autocommit=autocommit,
  186. cache_options=cache_options,
  187. )
  188. def _rm(self, path):
  189. path = self._strip_protocol(path)
  190. self.ftp.delete(path)
  191. self.invalidate_cache(self._parent(path))
  192. def rm(self, path, recursive=False, maxdepth=None):
  193. paths = self.expand_path(path, recursive=recursive, maxdepth=maxdepth)
  194. for p in reversed(paths):
  195. if self.isfile(p):
  196. self.rm_file(p)
  197. else:
  198. self.rmdir(p)
  199. def mkdir(self, path: str, create_parents: bool = True, **kwargs: Any) -> None:
  200. path = self._strip_protocol(path)
  201. parent = self._parent(path)
  202. if parent != self.root_marker and not self.exists(parent) and create_parents:
  203. self.mkdir(parent, create_parents=create_parents)
  204. self.ftp.mkd(path)
  205. self.invalidate_cache(self._parent(path))
  206. def makedirs(self, path: str, exist_ok: bool = False) -> None:
  207. path = self._strip_protocol(path)
  208. if self.exists(path):
  209. # NB: "/" does not "exist" as it has no directory entry
  210. if not exist_ok:
  211. raise FileExistsError(f"{path} exists without `exist_ok`")
  212. # exists_ok=True -> no-op
  213. else:
  214. self.mkdir(path, create_parents=True)
  215. def rmdir(self, path):
  216. path = self._strip_protocol(path)
  217. self.ftp.rmd(path)
  218. self.invalidate_cache(self._parent(path))
  219. def mv(self, path1, path2, **kwargs):
  220. path1 = self._strip_protocol(path1)
  221. path2 = self._strip_protocol(path2)
  222. self.ftp.rename(path1, path2)
  223. self.invalidate_cache(self._parent(path1))
  224. self.invalidate_cache(self._parent(path2))
  225. def __del__(self):
  226. self.ftp.close()
  227. def invalidate_cache(self, path=None):
  228. if path is None:
  229. self.dircache.clear()
  230. else:
  231. self.dircache.pop(path, None)
  232. super().invalidate_cache(path)
  233. class TransferDone(Exception):
  234. """Internal exception to break out of transfer"""
  235. pass
  236. class FTPFile(AbstractBufferedFile):
  237. """Interact with a remote FTP file with read/write buffering"""
  238. def __init__(
  239. self,
  240. fs,
  241. path,
  242. mode="rb",
  243. block_size="default",
  244. autocommit=True,
  245. cache_type="readahead",
  246. cache_options=None,
  247. **kwargs,
  248. ):
  249. super().__init__(
  250. fs,
  251. path,
  252. mode=mode,
  253. block_size=block_size,
  254. autocommit=autocommit,
  255. cache_type=cache_type,
  256. cache_options=cache_options,
  257. **kwargs,
  258. )
  259. if not autocommit:
  260. self.target = self.path
  261. self.path = "/".join([kwargs["tempdir"], str(uuid.uuid4())])
  262. def commit(self):
  263. self.fs.mv(self.path, self.target)
  264. def discard(self):
  265. self.fs.rm(self.path)
  266. def _fetch_range(self, start, end):
  267. """Get bytes between given byte limits
  268. Implemented by raising an exception in the fetch callback when the
  269. number of bytes received reaches the requested amount.
  270. Will fail if the server does not respect the REST command on
  271. retrieve requests.
  272. """
  273. out = []
  274. total = [0]
  275. def callback(x):
  276. total[0] += len(x)
  277. if total[0] > end - start:
  278. out.append(x[: (end - start) - total[0]])
  279. if end < self.size:
  280. raise TransferDone
  281. else:
  282. out.append(x)
  283. if total[0] == end - start and end < self.size:
  284. raise TransferDone
  285. try:
  286. self.fs.ftp.retrbinary(
  287. f"RETR {self.path}",
  288. blocksize=self.blocksize,
  289. rest=start,
  290. callback=callback,
  291. )
  292. except TransferDone:
  293. try:
  294. # stop transfer, we got enough bytes for this block
  295. self.fs.ftp.abort()
  296. self.fs.ftp.getmultiline()
  297. except Error:
  298. self.fs._connect()
  299. return b"".join(out)
  300. def _upload_chunk(self, final=False):
  301. self.buffer.seek(0)
  302. self.fs.ftp.storbinary(
  303. f"STOR {self.path}", self.buffer, blocksize=self.blocksize, rest=self.offset
  304. )
  305. return True
  306. def _mlsd2(ftp, path="."):
  307. """
  308. Fall back to using `dir` instead of `mlsd` if not supported.
  309. This parses a Linux style `ls -l` response to `dir`, but the response may
  310. be platform dependent.
  311. Parameters
  312. ----------
  313. ftp: ftplib.FTP
  314. path: str
  315. Expects to be given path, but defaults to ".".
  316. """
  317. lines = []
  318. minfo = []
  319. ftp.dir(path, lines.append)
  320. for line in lines:
  321. split_line = line.split()
  322. if len(split_line) < 9:
  323. continue
  324. this = (
  325. split_line[-1],
  326. {
  327. "modify": " ".join(split_line[5:8]),
  328. "unix.owner": split_line[2],
  329. "unix.group": split_line[3],
  330. "unix.mode": split_line[0],
  331. "size": split_line[4],
  332. },
  333. )
  334. if this[1]["unix.mode"][0] == "d":
  335. this[1]["type"] = "dir"
  336. else:
  337. this[1]["type"] = "file"
  338. minfo.append(this)
  339. return minfo