lfs.py 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200
  1. """
  2. Implementation of a custom transfer agent for the transfer type "multipart" for
  3. git-lfs.
  4. Inspired by:
  5. github.com/cbartz/git-lfs-swift-transfer-agent/blob/master/git_lfs_swift_transfer.py
  6. Spec is: github.com/git-lfs/git-lfs/blob/master/docs/custom-transfers.md
  7. To launch debugger while developing:
```
[lfs "customtransfer.multipart"]
path = /path/to/huggingface_hub/.env/bin/python
args = -m debugpy --listen 5678 --wait-for-client /path/to/huggingface_hub/src/huggingface_hub/commands/huggingface_cli.py lfs-multipart-upload
```
"""
  13. import json
  14. import os
  15. import subprocess
  16. import sys
  17. from argparse import _SubParsersAction
  18. from typing import Dict, List, Optional
  19. from huggingface_hub.commands import BaseHuggingfaceCLICommand
  20. from huggingface_hub.lfs import LFS_MULTIPART_UPLOAD_COMMAND
  21. from ..utils import get_session, hf_raise_for_status, logging
  22. from ..utils._lfs import SliceFileObj
  23. logger = logging.get_logger(__name__)
  24. class LfsCommands(BaseHuggingfaceCLICommand):
  25. """
  26. Implementation of a custom transfer agent for the transfer type "multipart"
  27. for git-lfs. This lets users upload large files >5GB 🔥. Spec for LFS custom
  28. transfer agent is:
  29. https://github.com/git-lfs/git-lfs/blob/master/docs/custom-transfers.md
  30. This introduces two commands to the CLI:
  31. 1. $ huggingface-cli lfs-enable-largefiles
  32. This should be executed once for each model repo that contains a model file
  33. >5GB. It's documented in the error message you get if you just try to git
  34. push a 5GB file without having enabled it before.
  35. 2. $ huggingface-cli lfs-multipart-upload
  36. This command is called by lfs directly and is not meant to be called by the
  37. user.
  38. """
  39. @staticmethod
  40. def register_subcommand(parser: _SubParsersAction):
  41. enable_parser = parser.add_parser(
  42. "lfs-enable-largefiles", help="Configure your repository to enable upload of files > 5GB."
  43. )
  44. enable_parser.add_argument("path", type=str, help="Local path to repository you want to configure.")
  45. enable_parser.set_defaults(func=lambda args: LfsEnableCommand(args))
  46. # Command will get called by git-lfs, do not call it directly.
  47. upload_parser = parser.add_parser(LFS_MULTIPART_UPLOAD_COMMAND, add_help=False)
  48. upload_parser.set_defaults(func=lambda args: LfsUploadCommand(args))
  49. class LfsEnableCommand:
  50. def __init__(self, args):
  51. self.args = args
  52. def run(self):
  53. local_path = os.path.abspath(self.args.path)
  54. if not os.path.isdir(local_path):
  55. print("This does not look like a valid git repo.")
  56. exit(1)
  57. subprocess.run(
  58. "git config lfs.customtransfer.multipart.path huggingface-cli".split(),
  59. check=True,
  60. cwd=local_path,
  61. )
  62. subprocess.run(
  63. f"git config lfs.customtransfer.multipart.args {LFS_MULTIPART_UPLOAD_COMMAND}".split(),
  64. check=True,
  65. cwd=local_path,
  66. )
  67. print("Local repo set up for largefiles")
  68. def write_msg(msg: Dict):
  69. """Write out the message in Line delimited JSON."""
  70. msg_str = json.dumps(msg) + "\n"
  71. sys.stdout.write(msg_str)
  72. sys.stdout.flush()
  73. def read_msg() -> Optional[Dict]:
  74. """Read Line delimited JSON from stdin."""
  75. msg = json.loads(sys.stdin.readline().strip())
  76. if "terminate" in (msg.get("type"), msg.get("event")):
  77. # terminate message received
  78. return None
  79. if msg.get("event") not in ("download", "upload"):
  80. logger.critical("Received unexpected message")
  81. sys.exit(1)
  82. return msg
  83. class LfsUploadCommand:
  84. def __init__(self, args) -> None:
  85. self.args = args
  86. def run(self) -> None:
  87. # Immediately after invoking a custom transfer process, git-lfs
  88. # sends initiation data to the process over stdin.
  89. # This tells the process useful information about the configuration.
  90. init_msg = json.loads(sys.stdin.readline().strip())
  91. if not (init_msg.get("event") == "init" and init_msg.get("operation") == "upload"):
  92. write_msg({"error": {"code": 32, "message": "Wrong lfs init operation"}})
  93. sys.exit(1)
  94. # The transfer process should use the information it needs from the
  95. # initiation structure, and also perform any one-off setup tasks it
  96. # needs to do. It should then respond on stdout with a simple empty
  97. # confirmation structure, as follows:
  98. write_msg({})
  99. # After the initiation exchange, git-lfs will send any number of
  100. # transfer requests to the stdin of the transfer process, in a serial sequence.
  101. while True:
  102. msg = read_msg()
  103. if msg is None:
  104. # When all transfers have been processed, git-lfs will send
  105. # a terminate event to the stdin of the transfer process.
  106. # On receiving this message the transfer process should
  107. # clean up and terminate. No response is expected.
  108. sys.exit(0)
  109. oid = msg["oid"]
  110. filepath = msg["path"]
  111. completion_url = msg["action"]["href"]
  112. header = msg["action"]["header"]
  113. chunk_size = int(header.pop("chunk_size"))
  114. presigned_urls: List[str] = list(header.values())
  115. # Send a "started" progress event to allow other workers to start.
  116. # Otherwise they're delayed until first "progress" event is reported,
  117. # i.e. after the first 5GB by default (!)
  118. write_msg(
  119. {
  120. "event": "progress",
  121. "oid": oid,
  122. "bytesSoFar": 1,
  123. "bytesSinceLast": 0,
  124. }
  125. )
  126. parts = []
  127. with open(filepath, "rb") as file:
  128. for i, presigned_url in enumerate(presigned_urls):
  129. with SliceFileObj(
  130. file,
  131. seek_from=i * chunk_size,
  132. read_limit=chunk_size,
  133. ) as data:
  134. r = get_session().put(presigned_url, data=data)
  135. hf_raise_for_status(r)
  136. parts.append(
  137. {
  138. "etag": r.headers.get("etag"),
  139. "partNumber": i + 1,
  140. }
  141. )
  142. # In order to support progress reporting while data is uploading / downloading,
  143. # the transfer process should post messages to stdout
  144. write_msg(
  145. {
  146. "event": "progress",
  147. "oid": oid,
  148. "bytesSoFar": (i + 1) * chunk_size,
  149. "bytesSinceLast": chunk_size,
  150. }
  151. )
  152. # Not precise but that's ok.
  153. r = get_session().post(
  154. completion_url,
  155. json={
  156. "oid": oid,
  157. "parts": parts,
  158. },
  159. )
  160. hf_raise_for_status(r)
  161. write_msg({"event": "complete", "oid": oid})