| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841 |
- # coding=utf-8
- # Copyright 2025-present, the HuggingFace Inc. team.
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- """Contains the 'hf cache' command group with cache management subcommands."""
- import csv
- import json
- import re
- import sys
- import time
- from collections import defaultdict
- from dataclasses import dataclass
- from enum import Enum
- from typing import Annotated, Any, Callable, Dict, List, Mapping, Optional, Tuple
- import typer
- from ..utils import (
- ANSI,
- CachedRepoInfo,
- CachedRevisionInfo,
- CacheNotFound,
- HFCacheInfo,
- _format_size,
- scan_cache_dir,
- tabulate,
- )
- from ..utils._parsing import parse_duration, parse_size
- from ._cli_utils import RepoIdArg, RepoTypeOpt, RevisionOpt, TokenOpt, get_hf_api, typer_factory
- cache_cli = typer_factory(help="Manage local cache directory.")
- #### Cache helper utilities
- class OutputFormat(str, Enum):
- table = "table"
- json = "json"
- csv = "csv"
- @dataclass(frozen=True)
- class _DeletionResolution:
- revisions: frozenset[str]
- selected: dict[CachedRepoInfo, frozenset[CachedRevisionInfo]]
- missing: tuple[str, ...]
- _FILTER_PATTERN = re.compile(r"^(?P<key>[a-zA-Z_]+)\s*(?P<op>==|!=|>=|<=|>|<|=)\s*(?P<value>.+)$")
- _ALLOWED_OPERATORS = {"=", "!=", ">", "<", ">=", "<="}
- _FILTER_KEYS = {"accessed", "modified", "refs", "size", "type"}
- _SORT_KEYS = {"accessed", "modified", "name", "size"}
- _SORT_PATTERN = re.compile(r"^(?P<key>[a-zA-Z_]+)(?::(?P<order>asc|desc))?$")
- _SORT_DEFAULT_ORDER = {
- # Default ordering: accessed/modified/size are descending (newest/biggest first), name is ascending
- "accessed": "desc",
- "modified": "desc",
- "size": "desc",
- "name": "asc",
- }
- # Dynamically generate SortOptions enum from _SORT_KEYS
- _sort_options_dict = {}
- for key in sorted(_SORT_KEYS):
- _sort_options_dict[key] = key
- _sort_options_dict[f"{key}_asc"] = f"{key}:asc"
- _sort_options_dict[f"{key}_desc"] = f"{key}:desc"
- SortOptions = Enum("SortOptions", _sort_options_dict, type=str, module=__name__) # type: ignore
- @dataclass(frozen=True)
- class CacheDeletionCounts:
- """Simple counters summarizing cache deletions for CLI messaging."""
- repo_count: int
- partial_revision_count: int
- total_revision_count: int
- CacheEntry = Tuple[CachedRepoInfo, Optional[CachedRevisionInfo]]
- RepoRefsMap = Dict[CachedRepoInfo, frozenset[str]]
- def summarize_deletions(
- selected_by_repo: Mapping[CachedRepoInfo, frozenset[CachedRevisionInfo]],
- ) -> CacheDeletionCounts:
- """Summarize deletions across repositories."""
- repo_count = 0
- total_revisions = 0
- revisions_in_full_repos = 0
- for repo, revisions in selected_by_repo.items():
- total_revisions += len(revisions)
- if len(revisions) == len(repo.revisions):
- repo_count += 1
- revisions_in_full_repos += len(revisions)
- partial_revision_count = total_revisions - revisions_in_full_repos
- return CacheDeletionCounts(repo_count, partial_revision_count, total_revisions)
- def print_cache_selected_revisions(selected_by_repo: Mapping[CachedRepoInfo, frozenset[CachedRevisionInfo]]) -> None:
- """Pretty-print selected cache revisions during confirmation prompts."""
- for repo in sorted(selected_by_repo.keys(), key=lambda repo: (repo.repo_type, repo.repo_id.lower())):
- repo_key = f"{repo.repo_type}/{repo.repo_id}"
- revisions = sorted(selected_by_repo[repo], key=lambda rev: rev.commit_hash)
- if len(revisions) == len(repo.revisions):
- print(f" - {repo_key} (entire repo)")
- continue
- print(f" - {repo_key}:")
- for revision in revisions:
- refs = " ".join(sorted(revision.refs)) or "(detached)"
- print(f" {revision.commit_hash} [{refs}] {revision.size_on_disk_str}")
- def build_cache_index(
- hf_cache_info: HFCacheInfo,
- ) -> Tuple[
- Dict[str, CachedRepoInfo],
- Dict[str, Tuple[CachedRepoInfo, CachedRevisionInfo]],
- ]:
- """Create lookup tables so CLI commands can resolve repo ids and revisions quickly."""
- repo_lookup: dict[str, CachedRepoInfo] = {}
- revision_lookup: dict[str, tuple[CachedRepoInfo, CachedRevisionInfo]] = {}
- for repo in hf_cache_info.repos:
- repo_key = repo.cache_id.lower()
- repo_lookup[repo_key] = repo
- for revision in repo.revisions:
- revision_lookup[revision.commit_hash.lower()] = (repo, revision)
- return repo_lookup, revision_lookup
- def collect_cache_entries(
- hf_cache_info: HFCacheInfo, *, include_revisions: bool
- ) -> Tuple[List[CacheEntry], RepoRefsMap]:
- """Flatten cache metadata into rows consumed by `hf cache ls`."""
- entries: List[CacheEntry] = []
- repo_refs_map: RepoRefsMap = {}
- sorted_repos = sorted(hf_cache_info.repos, key=lambda repo: (repo.repo_type, repo.repo_id.lower()))
- for repo in sorted_repos:
- repo_refs_map[repo] = frozenset({ref for revision in repo.revisions for ref in revision.refs})
- if include_revisions:
- for revision in sorted(repo.revisions, key=lambda rev: rev.commit_hash):
- entries.append((repo, revision))
- else:
- entries.append((repo, None))
- if include_revisions:
- entries.sort(
- key=lambda entry: (
- entry[0].cache_id,
- entry[1].commit_hash if entry[1] is not None else "",
- )
- )
- else:
- entries.sort(key=lambda entry: entry[0].cache_id)
- return entries, repo_refs_map
- def compile_cache_filter(
- expr: str, repo_refs_map: RepoRefsMap
- ) -> Callable[[CachedRepoInfo, Optional[CachedRevisionInfo], float], bool]:
- """Convert a `hf cache ls` filter expression into the yes/no test we apply to each cache entry before displaying it."""
- match = _FILTER_PATTERN.match(expr.strip())
- if not match:
- raise ValueError(f"Invalid filter expression: '{expr}'.")
- key = match.group("key").lower()
- op = match.group("op")
- value_raw = match.group("value").strip()
- if op not in _ALLOWED_OPERATORS:
- raise ValueError(f"Unsupported operator '{op}' in filter '{expr}'. Must be one of {list(_ALLOWED_OPERATORS)}.")
- if key not in _FILTER_KEYS:
- raise ValueError(f"Unsupported filter key '{key}' in '{expr}'. Must be one of {list(_FILTER_KEYS)}.")
- # at this point we know that key is in `_FILTER_KEYS`
- if key == "size":
- size_threshold = parse_size(value_raw)
- return lambda repo, revision, _: _compare_numeric(
- revision.size_on_disk if revision is not None else repo.size_on_disk,
- op,
- size_threshold,
- )
- if key in {"modified", "accessed"}:
- seconds = parse_duration(value_raw.strip())
- def _time_filter(repo: CachedRepoInfo, revision: Optional[CachedRevisionInfo], now: float) -> bool:
- timestamp = (
- repo.last_accessed
- if key == "accessed"
- else revision.last_modified
- if revision is not None
- else repo.last_modified
- )
- if timestamp is None:
- return False
- return _compare_numeric(now - timestamp, op, seconds)
- return _time_filter
- if key == "type":
- expected = value_raw.lower()
- if op != "=":
- raise ValueError(f"Only '=' is supported for 'type' filters. Got '{op}'.")
- def _type_filter(repo: CachedRepoInfo, revision: Optional[CachedRevisionInfo], _: float) -> bool:
- return repo.repo_type.lower() == expected
- return _type_filter
- else: # key == "refs"
- if op != "=":
- raise ValueError(f"Only '=' is supported for 'refs' filters. Got {op}.")
- def _refs_filter(repo: CachedRepoInfo, revision: Optional[CachedRevisionInfo], _: float) -> bool:
- refs = revision.refs if revision is not None else repo_refs_map.get(repo, frozenset())
- return value_raw.lower() in [ref.lower() for ref in refs]
- return _refs_filter
- def _build_cache_export_payload(
- entries: List[CacheEntry], *, include_revisions: bool, repo_refs_map: RepoRefsMap
- ) -> List[Dict[str, Any]]:
- """Normalize cache entries into serializable records for JSON/CSV exports."""
- payload: List[Dict[str, Any]] = []
- for repo, revision in entries:
- if include_revisions:
- if revision is None:
- continue
- record: Dict[str, Any] = {
- "repo_id": repo.repo_id,
- "repo_type": repo.repo_type,
- "revision": revision.commit_hash,
- "snapshot_path": str(revision.snapshot_path),
- "size_on_disk": revision.size_on_disk,
- "last_accessed": repo.last_accessed,
- "last_modified": revision.last_modified,
- "refs": sorted(revision.refs),
- }
- else:
- record = {
- "repo_id": repo.repo_id,
- "repo_type": repo.repo_type,
- "size_on_disk": repo.size_on_disk,
- "last_accessed": repo.last_accessed,
- "last_modified": repo.last_modified,
- "refs": sorted(repo_refs_map.get(repo, frozenset())),
- }
- payload.append(record)
- return payload
- def print_cache_entries_table(
- entries: List[CacheEntry], *, include_revisions: bool, repo_refs_map: RepoRefsMap
- ) -> None:
- """Render cache entries as a table and show a human-readable summary."""
- if not entries:
- message = "No cached revisions found." if include_revisions else "No cached repositories found."
- print(message)
- return
- table_rows: List[List[str]]
- if include_revisions:
- headers = ["ID", "REVISION", "SIZE", "LAST_MODIFIED", "REFS"]
- table_rows = [
- [
- repo.cache_id,
- revision.commit_hash,
- revision.size_on_disk_str.rjust(8),
- revision.last_modified_str,
- " ".join(sorted(revision.refs)),
- ]
- for repo, revision in entries
- if revision is not None
- ]
- else:
- headers = ["ID", "SIZE", "LAST_ACCESSED", "LAST_MODIFIED", "REFS"]
- table_rows = [
- [
- repo.cache_id,
- repo.size_on_disk_str.rjust(8),
- repo.last_accessed_str or "",
- repo.last_modified_str,
- " ".join(sorted(repo_refs_map.get(repo, frozenset()))),
- ]
- for repo, _ in entries
- ]
- print(tabulate(table_rows, headers=headers)) # type: ignore[arg-type]
- unique_repos = {repo for repo, _ in entries}
- repo_count = len(unique_repos)
- if include_revisions:
- revision_count = sum(1 for _, revision in entries if revision is not None)
- total_size = sum(revision.size_on_disk for _, revision in entries if revision is not None)
- else:
- revision_count = sum(len(repo.revisions) for repo in unique_repos)
- total_size = sum(repo.size_on_disk for repo in unique_repos)
- summary = f"\nFound {repo_count} repo(s) for a total of {revision_count} revision(s) and {_format_size(total_size)} on disk."
- print(ANSI.bold(summary))
- def print_cache_entries_json(
- entries: List[CacheEntry], *, include_revisions: bool, repo_refs_map: RepoRefsMap
- ) -> None:
- """Dump cache entries as JSON for scripting or automation."""
- payload = _build_cache_export_payload(entries, include_revisions=include_revisions, repo_refs_map=repo_refs_map)
- json.dump(payload, sys.stdout, indent=2)
- sys.stdout.write("\n")
- def print_cache_entries_csv(entries: List[CacheEntry], *, include_revisions: bool, repo_refs_map: RepoRefsMap) -> None:
- """Export cache entries as CSV rows with the shared payload format."""
- records = _build_cache_export_payload(entries, include_revisions=include_revisions, repo_refs_map=repo_refs_map)
- writer = csv.writer(sys.stdout)
- if include_revisions:
- headers = [
- "repo_id",
- "repo_type",
- "revision",
- "snapshot_path",
- "size_on_disk",
- "last_accessed",
- "last_modified",
- "refs",
- ]
- else:
- headers = ["repo_id", "repo_type", "size_on_disk", "last_accessed", "last_modified", "refs"]
- writer.writerow(headers)
- if not records:
- return
- for record in records:
- refs = record["refs"]
- if include_revisions:
- row = [
- record.get("repo_id", ""),
- record.get("repo_type", ""),
- record.get("revision", ""),
- record.get("snapshot_path", ""),
- record.get("size_on_disk"),
- record.get("last_accessed"),
- record.get("last_modified"),
- " ".join(refs) if refs else "",
- ]
- else:
- row = [
- record.get("repo_id", ""),
- record.get("repo_type", ""),
- record.get("size_on_disk"),
- record.get("last_accessed"),
- record.get("last_modified"),
- " ".join(refs) if refs else "",
- ]
- writer.writerow(row)
- def _compare_numeric(left: Optional[float], op: str, right: float) -> bool:
- """Evaluate numeric comparisons for filters."""
- if left is None:
- return False
- comparisons = {
- "=": left == right,
- "!=": left != right,
- ">": left > right,
- "<": left < right,
- ">=": left >= right,
- "<=": left <= right,
- }
- if op not in comparisons:
- raise ValueError(f"Unsupported numeric comparison operator: {op}")
- return comparisons[op]
- def compile_cache_sort(sort_expr: str) -> tuple[Callable[[CacheEntry], tuple[Any, ...]], bool]:
- """Convert a `hf cache ls` sort expression into a key function for sorting entries.
- Returns:
- A tuple of (key_function, reverse_flag) where reverse_flag indicates whether
- to sort in descending order (True) or ascending order (False).
- """
- match = _SORT_PATTERN.match(sort_expr.strip().lower())
- if not match:
- raise ValueError(f"Invalid sort expression: '{sort_expr}'. Expected format: 'key' or 'key:asc' or 'key:desc'.")
- key = match.group("key").lower()
- explicit_order = match.group("order")
- if key not in _SORT_KEYS:
- raise ValueError(f"Unsupported sort key '{key}' in '{sort_expr}'. Must be one of {list(_SORT_KEYS)}.")
- # Use explicit order if provided, otherwise use default for the key
- order = explicit_order if explicit_order else _SORT_DEFAULT_ORDER[key]
- reverse = order == "desc"
- def _sort_key(entry: CacheEntry) -> tuple[Any, ...]:
- repo, revision = entry
- if key == "name":
- # Sort by cache_id (repo type/id)
- value: Any = repo.cache_id.lower()
- return (value,)
- if key == "size":
- # Use revision size if available, otherwise repo size
- value = revision.size_on_disk if revision is not None else repo.size_on_disk
- return (value,)
- if key == "accessed":
- # For revisions, accessed is not available per-revision, use repo's last_accessed
- # For repos, use repo's last_accessed
- value = repo.last_accessed if repo.last_accessed is not None else 0.0
- return (value,)
- if key == "modified":
- # Use revision's last_modified if available, otherwise repo's last_modified
- if revision is not None:
- value = revision.last_modified if revision.last_modified is not None else 0.0
- else:
- value = repo.last_modified if repo.last_modified is not None else 0.0
- return (value,)
- # Should never reach here due to validation above
- raise ValueError(f"Unsupported sort key: {key}")
- return _sort_key, reverse
- def _resolve_deletion_targets(hf_cache_info: HFCacheInfo, targets: list[str]) -> _DeletionResolution:
- """Resolve the deletion targets into a deletion resolution."""
- repo_lookup, revision_lookup = build_cache_index(hf_cache_info)
- selected: dict[CachedRepoInfo, set[CachedRevisionInfo]] = defaultdict(set)
- revisions: set[str] = set()
- missing: list[str] = []
- for raw_target in targets:
- target = raw_target.strip()
- if not target:
- continue
- lowered = target.lower()
- if re.fullmatch(r"[0-9a-fA-F]{40}", lowered):
- match = revision_lookup.get(lowered)
- if match is None:
- missing.append(raw_target)
- continue
- repo, revision = match
- selected[repo].add(revision)
- revisions.add(revision.commit_hash)
- continue
- matched_repo = repo_lookup.get(lowered)
- if matched_repo is None:
- missing.append(raw_target)
- continue
- for revision in matched_repo.revisions:
- selected[matched_repo].add(revision)
- revisions.add(revision.commit_hash)
- frozen_selected = {repo: frozenset(revs) for repo, revs in selected.items()}
- return _DeletionResolution(
- revisions=frozenset(revisions),
- selected=frozen_selected,
- missing=tuple(missing),
- )
- #### Cache CLI commands
- @cache_cli.command()
- def ls(
- cache_dir: Annotated[
- Optional[str],
- typer.Option(
- help="Cache directory to scan (defaults to Hugging Face cache).",
- ),
- ] = None,
- revisions: Annotated[
- bool,
- typer.Option(
- help="Include revisions in the output instead of aggregated repositories.",
- ),
- ] = False,
- filter: Annotated[
- Optional[list[str]],
- typer.Option(
- "-f",
- "--filter",
- help="Filter entries (e.g. 'size>1GB', 'type=model', 'accessed>7d'). Can be used multiple times.",
- ),
- ] = None,
- format: Annotated[
- OutputFormat,
- typer.Option(
- help="Output format.",
- ),
- ] = OutputFormat.table,
- quiet: Annotated[
- bool,
- typer.Option(
- "-q",
- "--quiet",
- help="Print only IDs (repo IDs or revision hashes).",
- ),
- ] = False,
- sort: Annotated[
- Optional[SortOptions],
- typer.Option(
- help="Sort entries by key. Supported keys: 'accessed', 'modified', 'name', 'size'. "
- "Append ':asc' or ':desc' to explicitly set the order (e.g., 'modified:asc'). "
- "Defaults: 'accessed', 'modified', 'size' default to 'desc' (newest/biggest first); "
- "'name' defaults to 'asc' (alphabetical).",
- ),
- ] = None,
- limit: Annotated[
- Optional[int],
- typer.Option(
- help="Limit the number of results returned. Returns only the top N entries after sorting.",
- ),
- ] = None,
- ) -> None:
- """List cached repositories or revisions."""
- try:
- hf_cache_info = scan_cache_dir(cache_dir)
- except CacheNotFound as exc:
- print(f"Cache directory not found: {str(exc.cache_dir)}")
- raise typer.Exit(code=1) from exc
- filters = filter or []
- entries, repo_refs_map = collect_cache_entries(hf_cache_info, include_revisions=revisions)
- try:
- filter_fns = [compile_cache_filter(expr, repo_refs_map) for expr in filters]
- except ValueError as exc:
- raise typer.BadParameter(str(exc)) from exc
- now = time.time()
- for fn in filter_fns:
- entries = [entry for entry in entries if fn(entry[0], entry[1], now)]
- # Apply sorting if requested
- if sort:
- try:
- sort_key_fn, reverse = compile_cache_sort(sort.value)
- entries.sort(key=sort_key_fn, reverse=reverse)
- except ValueError as exc:
- raise typer.BadParameter(str(exc)) from exc
- # Apply limit if requested
- if limit is not None:
- if limit < 0:
- raise typer.BadParameter(f"Limit must be a positive integer, got {limit}.")
- entries = entries[:limit]
- if quiet:
- for repo, revision in entries:
- print(revision.commit_hash if revision is not None else repo.cache_id)
- return
- formatters = {
- OutputFormat.table: print_cache_entries_table,
- OutputFormat.json: print_cache_entries_json,
- OutputFormat.csv: print_cache_entries_csv,
- }
- return formatters[format](entries, include_revisions=revisions, repo_refs_map=repo_refs_map)
- @cache_cli.command()
- def rm(
- targets: Annotated[
- list[str],
- typer.Argument(
- help="One or more repo IDs (e.g. model/bert-base-uncased) or revision hashes to delete.",
- ),
- ],
- cache_dir: Annotated[
- Optional[str],
- typer.Option(
- help="Cache directory to scan (defaults to Hugging Face cache).",
- ),
- ] = None,
- yes: Annotated[
- bool,
- typer.Option(
- "-y",
- "--yes",
- help="Skip confirmation prompt.",
- ),
- ] = False,
- dry_run: Annotated[
- bool,
- typer.Option(
- help="Preview deletions without removing anything.",
- ),
- ] = False,
- ) -> None:
- """Remove cached repositories or revisions."""
- try:
- hf_cache_info = scan_cache_dir(cache_dir)
- except CacheNotFound as exc:
- print(f"Cache directory not found: {str(exc.cache_dir)}")
- raise typer.Exit(code=1)
- resolution = _resolve_deletion_targets(hf_cache_info, targets)
- if resolution.missing:
- print("Could not find the following targets in the cache:")
- for entry in resolution.missing:
- print(f" - {entry}")
- if len(resolution.revisions) == 0:
- print("Nothing to delete.")
- raise typer.Exit(code=0)
- strategy = hf_cache_info.delete_revisions(*sorted(resolution.revisions))
- counts = summarize_deletions(resolution.selected)
- summary_parts: list[str] = []
- if counts.repo_count:
- summary_parts.append(f"{counts.repo_count} repo(s)")
- if counts.partial_revision_count:
- summary_parts.append(f"{counts.partial_revision_count} revision(s)")
- if not summary_parts:
- summary_parts.append(f"{counts.total_revision_count} revision(s)")
- summary_text = " and ".join(summary_parts)
- print(f"About to delete {summary_text} totalling {strategy.expected_freed_size_str}.")
- print_cache_selected_revisions(resolution.selected)
- if dry_run:
- print("Dry run: no files were deleted.")
- return
- if not yes and not typer.confirm("Proceed with deletion?", default=False):
- print("Deletion cancelled.")
- return
- strategy.execute()
- counts = summarize_deletions(resolution.selected)
- print(
- f"Deleted {counts.repo_count} repo(s) and {counts.total_revision_count} revision(s); freed {strategy.expected_freed_size_str}."
- )
- @cache_cli.command()
- def prune(
- cache_dir: Annotated[
- Optional[str],
- typer.Option(
- help="Cache directory to scan (defaults to Hugging Face cache).",
- ),
- ] = None,
- yes: Annotated[
- bool,
- typer.Option(
- "-y",
- "--yes",
- help="Skip confirmation prompt.",
- ),
- ] = False,
- dry_run: Annotated[
- bool,
- typer.Option(
- help="Preview deletions without removing anything.",
- ),
- ] = False,
- ) -> None:
- """Remove detached revisions from the cache."""
- try:
- hf_cache_info = scan_cache_dir(cache_dir)
- except CacheNotFound as exc:
- print(f"Cache directory not found: {str(exc.cache_dir)}")
- raise typer.Exit(code=1)
- selected: dict[CachedRepoInfo, frozenset[CachedRevisionInfo]] = {}
- revisions: set[str] = set()
- for repo in hf_cache_info.repos:
- detached = frozenset(revision for revision in repo.revisions if len(revision.refs) == 0)
- if not detached:
- continue
- selected[repo] = detached
- revisions.update(revision.commit_hash for revision in detached)
- if len(revisions) == 0:
- print("No unreferenced revisions found. Nothing to prune.")
- return
- resolution = _DeletionResolution(
- revisions=frozenset(revisions),
- selected=selected,
- missing=(),
- )
- strategy = hf_cache_info.delete_revisions(*sorted(resolution.revisions))
- counts = summarize_deletions(selected)
- print(
- f"About to delete {counts.total_revision_count} unreferenced revision(s) ({strategy.expected_freed_size_str} total)."
- )
- print_cache_selected_revisions(selected)
- if dry_run:
- print("Dry run: no files were deleted.")
- return
- if not yes and not typer.confirm("Proceed?"):
- print("Pruning cancelled.")
- return
- strategy.execute()
- print(f"Deleted {counts.total_revision_count} unreferenced revision(s); freed {strategy.expected_freed_size_str}.")
- @cache_cli.command()
- def verify(
- repo_id: RepoIdArg,
- repo_type: RepoTypeOpt = RepoTypeOpt.model,
- revision: RevisionOpt = None,
- cache_dir: Annotated[
- Optional[str],
- typer.Option(
- help="Cache directory to use when verifying files from cache (defaults to Hugging Face cache).",
- ),
- ] = None,
- local_dir: Annotated[
- Optional[str],
- typer.Option(
- help="If set, verify files under this directory instead of the cache.",
- ),
- ] = None,
- fail_on_missing_files: Annotated[
- bool,
- typer.Option(
- "--fail-on-missing-files",
- help="Fail if some files exist on the remote but are missing locally.",
- ),
- ] = False,
- fail_on_extra_files: Annotated[
- bool,
- typer.Option(
- "--fail-on-extra-files",
- help="Fail if some files exist locally but are not present on the remote revision.",
- ),
- ] = False,
- token: TokenOpt = None,
- ) -> None:
- """Verify checksums for a single repo revision from cache or a local directory.
- Examples:
- - Verify main revision in cache: `hf cache verify gpt2`
- - Verify specific revision: `hf cache verify gpt2 --revision refs/pr/1`
- - Verify dataset: `hf cache verify karpathy/fineweb-edu-100b-shuffle --repo-type dataset`
- - Verify local dir: `hf cache verify deepseek-ai/DeepSeek-OCR --local-dir /path/to/repo`
- """
- if local_dir is not None and cache_dir is not None:
- print("Cannot pass both --local-dir and --cache-dir. Use one or the other.")
- raise typer.Exit(code=2)
- api = get_hf_api(token=token)
- result = api.verify_repo_checksums(
- repo_id=repo_id,
- repo_type=repo_type.value if hasattr(repo_type, "value") else str(repo_type),
- revision=revision,
- local_dir=local_dir,
- cache_dir=cache_dir,
- token=token,
- )
- exit_code = 0
- has_mismatches = bool(result.mismatches)
- if has_mismatches:
- print("❌ Checksum verification failed for the following file(s):")
- for m in result.mismatches:
- print(f" - {m['path']}: expected {m['expected']} ({m['algorithm']}), got {m['actual']}")
- exit_code = 1
- if result.missing_paths:
- if fail_on_missing_files:
- print("Missing files (present remotely, absent locally):")
- for p in result.missing_paths:
- print(f" - {p}")
- exit_code = 1
- else:
- warning = (
- f"{len(result.missing_paths)} remote file(s) are missing locally. "
- "Use --fail-on-missing-files for details."
- )
- print(f"⚠️ {warning}")
- if result.extra_paths:
- if fail_on_extra_files:
- print("Extra files (present locally, absent remotely):")
- for p in result.extra_paths:
- print(f" - {p}")
- exit_code = 1
- else:
- warning = (
- f"{len(result.extra_paths)} local file(s) do not exist on the remote repo. "
- "Use --fail-on-extra-files for details."
- )
- print(f"⚠️ {warning}")
- verified_location = result.verified_path
- if exit_code != 0:
- print(f"❌ Verification failed for '{repo_id}' ({repo_type.value}) in {verified_location}.")
- print(f" Revision: {result.revision}")
- raise typer.Exit(code=exit_code)
- print(f"✅ Verified {result.checked_count} file(s) for '{repo_id}' ({repo_type.value}) in {verified_location}")
- print(" All checksums match.")
|