| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100 |
- # Copyright 2025 The HuggingFace Team. All rights reserved.
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- """Contains commands to interact with jobs on the Hugging Face Hub.
- Usage:
- # run a job
- hf jobs run <image> <command>
- # List running or completed jobs
- hf jobs ps [-a] [-f key=value] [--format TEMPLATE]
- # Stream logs from a job
- hf jobs logs <job-id>
- # Inspect detailed information about a job
- hf jobs inspect <job-id>
- # Cancel a running job
- hf jobs cancel <job-id>
- """
- import json
- import os
- import re
- from argparse import Namespace, _SubParsersAction
- from dataclasses import asdict
- from pathlib import Path
- from typing import Dict, List, Optional, Union
- import requests
- from huggingface_hub import HfApi, SpaceHardware, get_token
- from huggingface_hub.utils import logging
- from huggingface_hub.utils._dotenv import load_dotenv
- from . import BaseHuggingfaceCLICommand
- logger = logging.get_logger(__name__)
- SUGGESTED_FLAVORS = [item.value for item in SpaceHardware if item.value != "zero-a10g"]
- class JobsCommands(BaseHuggingfaceCLICommand):
- @staticmethod
- def register_subcommand(parser: _SubParsersAction):
- jobs_parser = parser.add_parser("jobs", help="Run and manage Jobs on the Hub.")
- jobs_subparsers = jobs_parser.add_subparsers(help="huggingface.co jobs related commands")
- # Show help if no subcommand is provided
- jobs_parser.set_defaults(func=lambda args: jobs_parser.print_help())
- # Register commands
- InspectCommand.register_subcommand(jobs_subparsers)
- LogsCommand.register_subcommand(jobs_subparsers)
- PsCommand.register_subcommand(jobs_subparsers)
- RunCommand.register_subcommand(jobs_subparsers)
- CancelCommand.register_subcommand(jobs_subparsers)
- UvCommand.register_subcommand(jobs_subparsers)
- ScheduledJobsCommands.register_subcommand(jobs_subparsers)
- class RunCommand(BaseHuggingfaceCLICommand):
- @staticmethod
- def register_subcommand(parser: _SubParsersAction) -> None:
- run_parser = parser.add_parser("run", help="Run a Job")
- run_parser.add_argument("image", type=str, help="The Docker image to use.")
- run_parser.add_argument("-e", "--env", action="append", help="Set environment variables. E.g. --env ENV=value")
- run_parser.add_argument(
- "-s",
- "--secrets",
- action="append",
- help=(
- "Set secret environment variables. E.g. --secrets SECRET=value "
- "or `--secrets HF_TOKEN` to pass your Hugging Face token."
- ),
- )
- run_parser.add_argument("--env-file", type=str, help="Read in a file of environment variables.")
- run_parser.add_argument("--secrets-file", type=str, help="Read in a file of secret environment variables.")
- run_parser.add_argument(
- "--flavor",
- type=str,
- help=f"Flavor for the hardware, as in HF Spaces. Defaults to `cpu-basic`. Possible values: {', '.join(SUGGESTED_FLAVORS)}.",
- )
- run_parser.add_argument(
- "--timeout",
- type=str,
- help="Max duration: int/float with s (seconds, default), m (minutes), h (hours) or d (days).",
- )
- run_parser.add_argument(
- "-d",
- "--detach",
- action="store_true",
- help="Run the Job in the background and print the Job ID.",
- )
- run_parser.add_argument(
- "--namespace",
- type=str,
- help="The namespace where the Job will be created. Defaults to the current user's namespace.",
- )
- run_parser.add_argument(
- "--token",
- type=str,
- help="A User Access Token generated from https://huggingface.co/settings/tokens",
- )
- run_parser.add_argument("command", nargs="...", help="The command to run.")
- run_parser.set_defaults(func=RunCommand)
- def __init__(self, args: Namespace) -> None:
- self.image: str = args.image
- self.command: List[str] = args.command
- self.env: dict[str, Optional[str]] = {}
- if args.env_file:
- self.env.update(load_dotenv(Path(args.env_file).read_text(), environ=os.environ.copy()))
- for env_value in args.env or []:
- self.env.update(load_dotenv(env_value, environ=os.environ.copy()))
- self.secrets: dict[str, Optional[str]] = {}
- extended_environ = _get_extended_environ()
- if args.secrets_file:
- self.secrets.update(load_dotenv(Path(args.secrets_file).read_text(), environ=extended_environ))
- for secret in args.secrets or []:
- self.secrets.update(load_dotenv(secret, environ=extended_environ))
- self.flavor: Optional[SpaceHardware] = args.flavor
- self.timeout: Optional[str] = args.timeout
- self.detach: bool = args.detach
- self.namespace: Optional[str] = args.namespace
- self.token: Optional[str] = args.token
- def run(self) -> None:
- api = HfApi(token=self.token)
- job = api.run_job(
- image=self.image,
- command=self.command,
- env=self.env,
- secrets=self.secrets,
- flavor=self.flavor,
- timeout=self.timeout,
- namespace=self.namespace,
- )
- # Always print the job ID to the user
- print(f"Job started with ID: {job.id}")
- print(f"View at: {job.url}")
- if self.detach:
- return
- # Now let's stream the logs
- for log in api.fetch_job_logs(job_id=job.id):
- print(log)
- class LogsCommand(BaseHuggingfaceCLICommand):
- @staticmethod
- def register_subcommand(parser: _SubParsersAction) -> None:
- run_parser = parser.add_parser("logs", help="Fetch the logs of a Job")
- run_parser.add_argument("job_id", type=str, help="Job ID")
- run_parser.add_argument(
- "--namespace",
- type=str,
- help="The namespace where the job is running. Defaults to the current user's namespace.",
- )
- run_parser.add_argument(
- "--token", type=str, help="A User Access Token generated from https://huggingface.co/settings/tokens"
- )
- run_parser.set_defaults(func=LogsCommand)
- def __init__(self, args: Namespace) -> None:
- self.job_id: str = args.job_id
- self.namespace: Optional[str] = args.namespace
- self.token: Optional[str] = args.token
- def run(self) -> None:
- api = HfApi(token=self.token)
- for log in api.fetch_job_logs(job_id=self.job_id, namespace=self.namespace):
- print(log)
- def _tabulate(rows: List[List[Union[str, int]]], headers: List[str]) -> str:
- """
- Inspired by:
- - stackoverflow.com/a/8356620/593036
- - stackoverflow.com/questions/9535954/printing-lists-as-tabular-data
- """
- col_widths = [max(len(str(x)) for x in col) for col in zip(*rows, headers)]
- terminal_width = max(os.get_terminal_size().columns, len(headers) * 12)
- while len(headers) + sum(col_widths) > terminal_width:
- col_to_minimize = col_widths.index(max(col_widths))
- col_widths[col_to_minimize] //= 2
- if len(headers) + sum(col_widths) <= terminal_width:
- col_widths[col_to_minimize] = terminal_width - sum(col_widths) - len(headers) + col_widths[col_to_minimize]
- row_format = ("{{:{}}} " * len(headers)).format(*col_widths)
- lines = []
- lines.append(row_format.format(*headers))
- lines.append(row_format.format(*["-" * w for w in col_widths]))
- for row in rows:
- row_format_args = [
- str(x)[: col_width - 3] + "..." if len(str(x)) > col_width else str(x)
- for x, col_width in zip(row, col_widths)
- ]
- lines.append(row_format.format(*row_format_args))
- return "\n".join(lines)
- class PsCommand(BaseHuggingfaceCLICommand):
- @staticmethod
- def register_subcommand(parser: _SubParsersAction) -> None:
- run_parser = parser.add_parser("ps", help="List Jobs")
- run_parser.add_argument(
- "-a",
- "--all",
- action="store_true",
- help="Show all Jobs (default shows just running)",
- )
- run_parser.add_argument(
- "--namespace",
- type=str,
- help="The namespace from where it lists the jobs. Defaults to the current user's namespace.",
- )
- run_parser.add_argument(
- "--token",
- type=str,
- help="A User Access Token generated from https://huggingface.co/settings/tokens",
- )
- # Add Docker-style filtering argument
- run_parser.add_argument(
- "-f",
- "--filter",
- action="append",
- default=[],
- help="Filter output based on conditions provided (format: key=value)",
- )
- # Add option to format output
- run_parser.add_argument(
- "--format",
- type=str,
- help="Format output using a custom template",
- )
- run_parser.set_defaults(func=PsCommand)
- def __init__(self, args: Namespace) -> None:
- self.all: bool = args.all
- self.namespace: Optional[str] = args.namespace
- self.token: Optional[str] = args.token
- self.format: Optional[str] = args.format
- self.filters: Dict[str, str] = {}
- # Parse filter arguments (key=value pairs)
- for f in args.filter:
- if "=" in f:
- key, value = f.split("=", 1)
- self.filters[key.lower()] = value
- else:
- print(f"Warning: Ignoring invalid filter format '{f}'. Use key=value format.")
- def run(self) -> None:
- """
- Fetch and display job information for the current user.
- Uses Docker-style filtering with -f/--filter flag and key=value pairs.
- """
- try:
- api = HfApi(token=self.token)
- # Fetch jobs data
- jobs = api.list_jobs(namespace=self.namespace)
- # Define table headers
- table_headers = ["JOB ID", "IMAGE/SPACE", "COMMAND", "CREATED", "STATUS"]
- # Process jobs data
- rows = []
- for job in jobs:
- # Extract job data for filtering
- status = job.status.stage if job.status else "UNKNOWN"
- # Skip job if not all jobs should be shown and status doesn't match criteria
- if not self.all and status not in ("RUNNING", "UPDATING"):
- continue
- # Extract job ID
- job_id = job.id
- # Extract image or space information
- image_or_space = job.docker_image or "N/A"
- # Extract and format command
- command = job.command or []
- command_str = " ".join(command) if command else "N/A"
- # Extract creation time
- created_at = job.created_at.strftime("%Y-%m-%d %H:%M:%S") if job.created_at else "N/A"
- # Create a dict with all job properties for filtering
- job_properties = {
- "id": job_id,
- "image": image_or_space,
- "status": status.lower(),
- "command": command_str,
- }
- # Check if job matches all filters
- if not self._matches_filters(job_properties):
- continue
- # Create row
- rows.append([job_id, image_or_space, command_str, created_at, status])
- # Handle empty results
- if not rows:
- filters_msg = ""
- if self.filters:
- filters_msg = f" matching filters: {', '.join([f'{k}={v}' for k, v in self.filters.items()])}"
- print(f"No jobs found{filters_msg}")
- return
- # Apply custom format if provided or use default tabular format
- self._print_output(rows, table_headers)
- except requests.RequestException as e:
- print(f"Error fetching jobs data: {e}")
- except (KeyError, ValueError, TypeError) as e:
- print(f"Error processing jobs data: {e}")
- except Exception as e:
- print(f"Unexpected error - {type(e).__name__}: {e}")
- def _matches_filters(self, job_properties: Dict[str, str]) -> bool:
- """Check if job matches all specified filters."""
- for key, pattern in self.filters.items():
- # Check if property exists
- if key not in job_properties:
- return False
- # Support pattern matching with wildcards
- if "*" in pattern or "?" in pattern:
- # Convert glob pattern to regex
- regex_pattern = pattern.replace("*", ".*").replace("?", ".")
- if not re.search(f"^{regex_pattern}$", job_properties[key], re.IGNORECASE):
- return False
- # Simple substring matching
- elif pattern.lower() not in job_properties[key].lower():
- return False
- return True
- def _print_output(self, rows, headers):
- """Print output according to the chosen format."""
- if self.format:
- # Custom template formatting (simplified)
- template = self.format
- for row in rows:
- line = template
- for i, field in enumerate(["id", "image", "command", "created", "status"]):
- placeholder = f"{{{{.{field}}}}}"
- if placeholder in line:
- line = line.replace(placeholder, str(row[i]))
- print(line)
- else:
- # Default tabular format
- print(
- _tabulate(
- rows,
- headers=headers,
- )
- )
- class InspectCommand(BaseHuggingfaceCLICommand):
- @staticmethod
- def register_subcommand(parser: _SubParsersAction) -> None:
- run_parser = parser.add_parser("inspect", help="Display detailed information on one or more Jobs")
- run_parser.add_argument(
- "--namespace",
- type=str,
- help="The namespace where the job is running. Defaults to the current user's namespace.",
- )
- run_parser.add_argument(
- "--token", type=str, help="A User Access Token generated from https://huggingface.co/settings/tokens"
- )
- run_parser.add_argument("job_ids", nargs="...", help="The jobs to inspect")
- run_parser.set_defaults(func=InspectCommand)
- def __init__(self, args: Namespace) -> None:
- self.namespace: Optional[str] = args.namespace
- self.token: Optional[str] = args.token
- self.job_ids: List[str] = args.job_ids
- def run(self) -> None:
- api = HfApi(token=self.token)
- jobs = [api.inspect_job(job_id=job_id, namespace=self.namespace) for job_id in self.job_ids]
- print(json.dumps([asdict(job) for job in jobs], indent=4, default=str))
- class CancelCommand(BaseHuggingfaceCLICommand):
- @staticmethod
- def register_subcommand(parser: _SubParsersAction) -> None:
- run_parser = parser.add_parser("cancel", help="Cancel a Job")
- run_parser.add_argument("job_id", type=str, help="Job ID")
- run_parser.add_argument(
- "--namespace",
- type=str,
- help="The namespace where the job is running. Defaults to the current user's namespace.",
- )
- run_parser.add_argument(
- "--token", type=str, help="A User Access Token generated from https://huggingface.co/settings/tokens"
- )
- run_parser.set_defaults(func=CancelCommand)
- def __init__(self, args: Namespace) -> None:
- self.job_id: str = args.job_id
- self.namespace = args.namespace
- self.token: Optional[str] = args.token
- def run(self) -> None:
- api = HfApi(token=self.token)
- api.cancel_job(job_id=self.job_id, namespace=self.namespace)
- class UvCommand(BaseHuggingfaceCLICommand):
- """Run UV scripts on Hugging Face infrastructure."""
- @staticmethod
- def register_subcommand(parser):
- """Register UV run subcommand."""
- uv_parser = parser.add_parser(
- "uv",
- help="Run UV scripts (Python with inline dependencies) on HF infrastructure",
- )
- subparsers = uv_parser.add_subparsers(dest="uv_command", help="UV commands", required=True)
- # Run command only
- run_parser = subparsers.add_parser(
- "run",
- help="Run a UV script (local file or URL) on HF infrastructure",
- )
- run_parser.add_argument("script", help="UV script to run (local file or URL)")
- run_parser.add_argument("script_args", nargs="...", help="Arguments for the script", default=[])
- run_parser.add_argument("--image", type=str, help="Use a custom Docker image with `uv` installed.")
- run_parser.add_argument(
- "--repo",
- help="Repository name for the script (creates ephemeral if not specified)",
- )
- run_parser.add_argument(
- "--flavor",
- type=str,
- help=f"Flavor for the hardware, as in HF Spaces. Defaults to `cpu-basic`. Possible values: {', '.join(SUGGESTED_FLAVORS)}.",
- )
- run_parser.add_argument("-e", "--env", action="append", help="Environment variables")
- run_parser.add_argument(
- "-s",
- "--secrets",
- action="append",
- help=(
- "Set secret environment variables. E.g. --secrets SECRET=value "
- "or `--secrets HF_TOKEN` to pass your Hugging Face token."
- ),
- )
- run_parser.add_argument("--env-file", type=str, help="Read in a file of environment variables.")
- run_parser.add_argument(
- "--secrets-file",
- type=str,
- help="Read in a file of secret environment variables.",
- )
- run_parser.add_argument("--timeout", type=str, help="Max duration (e.g., 30s, 5m, 1h)")
- run_parser.add_argument("-d", "--detach", action="store_true", help="Run in background")
- run_parser.add_argument(
- "--namespace",
- type=str,
- help="The namespace where the Job will be created. Defaults to the current user's namespace.",
- )
- run_parser.add_argument("--token", type=str, help="HF token")
- # UV options
- run_parser.add_argument("--with", action="append", help="Run with the given packages installed", dest="with_")
- run_parser.add_argument(
- "-p", "--python", type=str, help="The Python interpreter to use for the run environment"
- )
- run_parser.set_defaults(func=UvCommand)
- def __init__(self, args: Namespace) -> None:
- """Initialize the command with parsed arguments."""
- self.script = args.script
- self.script_args = args.script_args
- self.dependencies = args.with_
- self.python = args.python
- self.image = args.image
- self.env: dict[str, Optional[str]] = {}
- if args.env_file:
- self.env.update(load_dotenv(Path(args.env_file).read_text(), environ=os.environ.copy()))
- for env_value in args.env or []:
- self.env.update(load_dotenv(env_value, environ=os.environ.copy()))
- self.secrets: dict[str, Optional[str]] = {}
- extended_environ = _get_extended_environ()
- if args.secrets_file:
- self.secrets.update(load_dotenv(Path(args.secrets_file).read_text(), environ=extended_environ))
- for secret in args.secrets or []:
- self.secrets.update(load_dotenv(secret, environ=extended_environ))
- self.flavor: Optional[SpaceHardware] = args.flavor
- self.timeout: Optional[str] = args.timeout
- self.detach: bool = args.detach
- self.namespace: Optional[str] = args.namespace
- self.token: Optional[str] = args.token
- self._repo = args.repo
- def run(self) -> None:
- """Execute UV command."""
- logging.set_verbosity(logging.INFO)
- api = HfApi(token=self.token)
- job = api.run_uv_job(
- script=self.script,
- script_args=self.script_args,
- dependencies=self.dependencies,
- python=self.python,
- image=self.image,
- env=self.env,
- secrets=self.secrets,
- flavor=self.flavor,
- timeout=self.timeout,
- namespace=self.namespace,
- _repo=self._repo,
- )
- # Always print the job ID to the user
- print(f"Job started with ID: {job.id}")
- print(f"View at: {job.url}")
- if self.detach:
- return
- # Now let's stream the logs
- for log in api.fetch_job_logs(job_id=job.id):
- print(log)
- def _get_extended_environ() -> Dict[str, str]:
- extended_environ = os.environ.copy()
- if (token := get_token()) is not None:
- extended_environ["HF_TOKEN"] = token
- return extended_environ
- class ScheduledJobsCommands(BaseHuggingfaceCLICommand):
- @staticmethod
- def register_subcommand(parser: _SubParsersAction):
- scheduled_jobs_parser = parser.add_parser("scheduled", help="Create and manage scheduled Jobs on the Hub.")
- scheduled_jobs_subparsers = scheduled_jobs_parser.add_subparsers(
- help="huggingface.co scheduled jobs related commands"
- )
- # Show help if no subcommand is provided
- scheduled_jobs_parser.set_defaults(func=lambda args: scheduled_jobs_subparsers.print_help())
- # Register commands
- ScheduledRunCommand.register_subcommand(scheduled_jobs_subparsers)
- ScheduledPsCommand.register_subcommand(scheduled_jobs_subparsers)
- ScheduledInspectCommand.register_subcommand(scheduled_jobs_subparsers)
- ScheduledDeleteCommand.register_subcommand(scheduled_jobs_subparsers)
- ScheduledSuspendCommand.register_subcommand(scheduled_jobs_subparsers)
- ScheduledResumeCommand.register_subcommand(scheduled_jobs_subparsers)
- ScheduledUvCommand.register_subcommand(scheduled_jobs_subparsers)
- class ScheduledRunCommand(BaseHuggingfaceCLICommand):
- @staticmethod
- def register_subcommand(parser: _SubParsersAction) -> None:
- run_parser = parser.add_parser("run", help="Schedule a Job")
- run_parser.add_argument(
- "schedule",
- type=str,
- help="One of annually, yearly, monthly, weekly, daily, hourly, or a CRON schedule expression.",
- )
- run_parser.add_argument("image", type=str, help="The Docker image to use.")
- run_parser.add_argument(
- "--suspend",
- action="store_true",
- help="Suspend (pause) the scheduled Job",
- default=None,
- )
- run_parser.add_argument(
- "--concurrency",
- action="store_true",
- help="Allow multiple instances of this Job to run concurrently",
- default=None,
- )
- run_parser.add_argument("-e", "--env", action="append", help="Set environment variables. E.g. --env ENV=value")
- run_parser.add_argument(
- "-s",
- "--secrets",
- action="append",
- help=(
- "Set secret environment variables. E.g. --secrets SECRET=value "
- "or `--secrets HF_TOKEN` to pass your Hugging Face token."
- ),
- )
- run_parser.add_argument("--env-file", type=str, help="Read in a file of environment variables.")
- run_parser.add_argument("--secrets-file", type=str, help="Read in a file of secret environment variables.")
- run_parser.add_argument(
- "--flavor",
- type=str,
- help=f"Flavor for the hardware, as in HF Spaces. Defaults to `cpu-basic`. Possible values: {', '.join(SUGGESTED_FLAVORS)}.",
- )
- run_parser.add_argument(
- "--timeout",
- type=str,
- help="Max duration: int/float with s (seconds, default), m (minutes), h (hours) or d (days).",
- )
- run_parser.add_argument(
- "--namespace",
- type=str,
- help="The namespace where the scheduled Job will be created. Defaults to the current user's namespace.",
- )
- run_parser.add_argument(
- "--token",
- type=str,
- help="A User Access Token generated from https://huggingface.co/settings/tokens",
- )
- run_parser.add_argument("command", nargs="...", help="The command to run.")
- run_parser.set_defaults(func=ScheduledRunCommand)
- def __init__(self, args: Namespace) -> None:
- self.schedule: str = args.schedule
- self.image: str = args.image
- self.command: List[str] = args.command
- self.suspend: Optional[bool] = args.suspend
- self.concurrency: Optional[bool] = args.concurrency
- self.env: dict[str, Optional[str]] = {}
- if args.env_file:
- self.env.update(load_dotenv(Path(args.env_file).read_text(), environ=os.environ.copy()))
- for env_value in args.env or []:
- self.env.update(load_dotenv(env_value, environ=os.environ.copy()))
- self.secrets: dict[str, Optional[str]] = {}
- extended_environ = _get_extended_environ()
- if args.secrets_file:
- self.secrets.update(load_dotenv(Path(args.secrets_file).read_text(), environ=extended_environ))
- for secret in args.secrets or []:
- self.secrets.update(load_dotenv(secret, environ=extended_environ))
- self.flavor: Optional[SpaceHardware] = args.flavor
- self.timeout: Optional[str] = args.timeout
- self.namespace: Optional[str] = args.namespace
- self.token: Optional[str] = args.token
- def run(self) -> None:
- api = HfApi(token=self.token)
- scheduled_job = api.create_scheduled_job(
- image=self.image,
- command=self.command,
- schedule=self.schedule,
- suspend=self.suspend,
- concurrency=self.concurrency,
- env=self.env,
- secrets=self.secrets,
- flavor=self.flavor,
- timeout=self.timeout,
- namespace=self.namespace,
- )
- # Always print the scheduled job ID to the user
- print(f"Scheduled Job created with ID: {scheduled_job.id}")
- class ScheduledPsCommand(BaseHuggingfaceCLICommand):
- @staticmethod
- def register_subcommand(parser: _SubParsersAction) -> None:
- run_parser = parser.add_parser("ps", help="List scheduled Jobs")
- run_parser.add_argument(
- "-a",
- "--all",
- action="store_true",
- help="Show all scheduled Jobs (default hides suspended)",
- )
- run_parser.add_argument(
- "--namespace",
- type=str,
- help="The namespace from where it lists the jobs. Defaults to the current user's namespace.",
- )
- run_parser.add_argument(
- "--token",
- type=str,
- help="A User Access Token generated from https://huggingface.co/settings/tokens",
- )
- # Add Docker-style filtering argument
- run_parser.add_argument(
- "-f",
- "--filter",
- action="append",
- default=[],
- help="Filter output based on conditions provided (format: key=value)",
- )
- # Add option to format output
- run_parser.add_argument(
- "--format",
- type=str,
- help="Format output using a custom template",
- )
- run_parser.set_defaults(func=ScheduledPsCommand)
- def __init__(self, args: Namespace) -> None:
- self.all: bool = args.all
- self.namespace: Optional[str] = args.namespace
- self.token: Optional[str] = args.token
- self.format: Optional[str] = args.format
- self.filters: Dict[str, str] = {}
- # Parse filter arguments (key=value pairs)
- for f in args.filter:
- if "=" in f:
- key, value = f.split("=", 1)
- self.filters[key.lower()] = value
- else:
- print(f"Warning: Ignoring invalid filter format '{f}'. Use key=value format.")
- def run(self) -> None:
- """
- Fetch and display scheduked job information for the current user.
- Uses Docker-style filtering with -f/--filter flag and key=value pairs.
- """
- try:
- api = HfApi(token=self.token)
- # Fetch jobs data
- scheduled_jobs = api.list_scheduled_jobs(namespace=self.namespace)
- # Define table headers
- table_headers = [
- "ID",
- "SCHEDULE",
- "IMAGE/SPACE",
- "COMMAND",
- "LAST RUN",
- "NEXT RUN",
- "SUSPEND",
- ]
- # Process jobs data
- rows = []
- for scheduled_job in scheduled_jobs:
- # Extract job data for filtering
- suspend = scheduled_job.suspend
- # Skip job if not all jobs should be shown and status doesn't match criteria
- if not self.all and suspend:
- continue
- # Extract job ID
- scheduled_job_id = scheduled_job.id
- # Extract schedule
- schedule = scheduled_job.schedule
- # Extract image or space information
- image_or_space = scheduled_job.job_spec.docker_image or "N/A"
- # Extract and format command
- command = scheduled_job.job_spec.command or []
- command_str = " ".join(command) if command else "N/A"
- # Extract status
- last_job_at = (
- scheduled_job.status.last_job.at.strftime("%Y-%m-%d %H:%M:%S")
- if scheduled_job.status.last_job
- else "N/A"
- )
- next_job_run_at = (
- scheduled_job.status.next_job_run_at.strftime("%Y-%m-%d %H:%M:%S")
- if scheduled_job.status.next_job_run_at
- else "N/A"
- )
- # Create a dict with all job properties for filtering
- job_properties = {
- "id": scheduled_job_id,
- "image": image_or_space,
- "suspend": str(suspend),
- "command": command_str,
- }
- # Check if job matches all filters
- if not self._matches_filters(job_properties):
- continue
- # Create row
- rows.append(
- [
- scheduled_job_id,
- schedule,
- image_or_space,
- command_str,
- last_job_at,
- next_job_run_at,
- suspend,
- ]
- )
- # Handle empty results
- if not rows:
- filters_msg = ""
- if self.filters:
- filters_msg = f" matching filters: {', '.join([f'{k}={v}' for k, v in self.filters.items()])}"
- print(f"No scheduled jobs found{filters_msg}")
- return
- # Apply custom format if provided or use default tabular format
- self._print_output(rows, table_headers)
- except requests.RequestException as e:
- print(f"Error fetching scheduled jobs data: {e}")
- except (KeyError, ValueError, TypeError) as e:
- print(f"Error processing scheduled jobs data: {e}")
- except Exception as e:
- print(f"Unexpected error - {type(e).__name__}: {e}")
- def _matches_filters(self, job_properties: Dict[str, str]) -> bool:
- """Check if scheduled job matches all specified filters."""
- for key, pattern in self.filters.items():
- # Check if property exists
- if key not in job_properties:
- return False
- # Support pattern matching with wildcards
- if "*" in pattern or "?" in pattern:
- # Convert glob pattern to regex
- regex_pattern = pattern.replace("*", ".*").replace("?", ".")
- if not re.search(f"^{regex_pattern}$", job_properties[key], re.IGNORECASE):
- return False
- # Simple substring matching
- elif pattern.lower() not in job_properties[key].lower():
- return False
- return True
- def _print_output(self, rows, headers):
- """Print output according to the chosen format."""
- if self.format:
- # Custom template formatting (simplified)
- template = self.format
- for row in rows:
- line = template
- for i, field in enumerate(
- ["id", "schedule", "image", "command", "last_job_at", "next_job_run_at", "suspend"]
- ):
- placeholder = f"{{{{.{field}}}}}"
- if placeholder in line:
- line = line.replace(placeholder, str(row[i]))
- print(line)
- else:
- # Default tabular format
- print(
- _tabulate(
- rows,
- headers=headers,
- )
- )
- class ScheduledInspectCommand(BaseHuggingfaceCLICommand):
- @staticmethod
- def register_subcommand(parser: _SubParsersAction) -> None:
- run_parser = parser.add_parser("inspect", help="Display detailed information on one or more scheduled Jobs")
- run_parser.add_argument(
- "--namespace",
- type=str,
- help="The namespace where the scheduled job is. Defaults to the current user's namespace.",
- )
- run_parser.add_argument(
- "--token", type=str, help="A User Access Token generated from https://huggingface.co/settings/tokens"
- )
- run_parser.add_argument("scheduled_job_ids", nargs="...", help="The scheduled jobs to inspect")
- run_parser.set_defaults(func=ScheduledInspectCommand)
- def __init__(self, args: Namespace) -> None:
- self.namespace: Optional[str] = args.namespace
- self.token: Optional[str] = args.token
- self.scheduled_job_ids: List[str] = args.scheduled_job_ids
- def run(self) -> None:
- api = HfApi(token=self.token)
- scheduled_jobs = [
- api.inspect_scheduled_job(scheduled_job_id=scheduled_job_id, namespace=self.namespace)
- for scheduled_job_id in self.scheduled_job_ids
- ]
- print(json.dumps([asdict(scheduled_job) for scheduled_job in scheduled_jobs], indent=4, default=str))
- class ScheduledDeleteCommand(BaseHuggingfaceCLICommand):
- @staticmethod
- def register_subcommand(parser: _SubParsersAction) -> None:
- run_parser = parser.add_parser("delete", help="Delete a scheduled Job")
- run_parser.add_argument("scheduled_job_id", type=str, help="Scheduled Job ID")
- run_parser.add_argument(
- "--namespace",
- type=str,
- help="The namespace where the scheduled job is. Defaults to the current user's namespace.",
- )
- run_parser.add_argument(
- "--token", type=str, help="A User Access Token generated from https://huggingface.co/settings/tokens"
- )
- run_parser.set_defaults(func=ScheduledDeleteCommand)
- def __init__(self, args: Namespace) -> None:
- self.scheduled_job_id: str = args.scheduled_job_id
- self.namespace = args.namespace
- self.token: Optional[str] = args.token
- def run(self) -> None:
- api = HfApi(token=self.token)
- api.delete_scheduled_job(scheduled_job_id=self.scheduled_job_id, namespace=self.namespace)
- class ScheduledSuspendCommand(BaseHuggingfaceCLICommand):
- @staticmethod
- def register_subcommand(parser: _SubParsersAction) -> None:
- run_parser = parser.add_parser("suspend", help="Suspend (pause) a scheduled Job")
- run_parser.add_argument("scheduled_job_id", type=str, help="Scheduled Job ID")
- run_parser.add_argument(
- "--namespace",
- type=str,
- help="The namespace where the scheduled job is. Defaults to the current user's namespace.",
- )
- run_parser.add_argument(
- "--token", type=str, help="A User Access Token generated from https://huggingface.co/settings/tokens"
- )
- run_parser.set_defaults(func=ScheduledSuspendCommand)
- def __init__(self, args: Namespace) -> None:
- self.scheduled_job_id: str = args.scheduled_job_id
- self.namespace = args.namespace
- self.token: Optional[str] = args.token
- def run(self) -> None:
- api = HfApi(token=self.token)
- api.suspend_scheduled_job(scheduled_job_id=self.scheduled_job_id, namespace=self.namespace)
- class ScheduledResumeCommand(BaseHuggingfaceCLICommand):
- @staticmethod
- def register_subcommand(parser: _SubParsersAction) -> None:
- run_parser = parser.add_parser("resume", help="Resume (unpause) a scheduled Job")
- run_parser.add_argument("scheduled_job_id", type=str, help="Scheduled Job ID")
- run_parser.add_argument(
- "--namespace",
- type=str,
- help="The namespace where the scheduled job is. Defaults to the current user's namespace.",
- )
- run_parser.add_argument(
- "--token", type=str, help="A User Access Token generated from https://huggingface.co/settings/tokens"
- )
- run_parser.set_defaults(func=ScheduledResumeCommand)
- def __init__(self, args: Namespace) -> None:
- self.scheduled_job_id: str = args.scheduled_job_id
- self.namespace = args.namespace
- self.token: Optional[str] = args.token
- def run(self) -> None:
- api = HfApi(token=self.token)
- api.resume_scheduled_job(scheduled_job_id=self.scheduled_job_id, namespace=self.namespace)
- class ScheduledUvCommand(BaseHuggingfaceCLICommand):
- """Schedule UV scripts on Hugging Face infrastructure."""
- @staticmethod
- def register_subcommand(parser):
- """Register UV run subcommand."""
- uv_parser = parser.add_parser(
- "uv",
- help="Schedule UV scripts (Python with inline dependencies) on HF infrastructure",
- )
- subparsers = uv_parser.add_subparsers(dest="uv_command", help="UV commands", required=True)
- # Run command only
- run_parser = subparsers.add_parser(
- "run",
- help="Run a UV script (local file or URL) on HF infrastructure",
- )
- run_parser.add_argument(
- "schedule",
- type=str,
- help="One of annually, yearly, monthly, weekly, daily, hourly, or a CRON schedule expression.",
- )
- run_parser.add_argument("script", help="UV script to run (local file or URL)")
- run_parser.add_argument("script_args", nargs="...", help="Arguments for the script", default=[])
- run_parser.add_argument(
- "--suspend",
- action="store_true",
- help="Suspend (pause) the scheduled Job",
- default=None,
- )
- run_parser.add_argument(
- "--concurrency",
- action="store_true",
- help="Allow multiple instances of this Job to run concurrently",
- default=None,
- )
- run_parser.add_argument("--image", type=str, help="Use a custom Docker image with `uv` installed.")
- run_parser.add_argument(
- "--repo",
- help="Repository name for the script (creates ephemeral if not specified)",
- )
- run_parser.add_argument(
- "--flavor",
- type=str,
- help=f"Flavor for the hardware, as in HF Spaces. Defaults to `cpu-basic`. Possible values: {', '.join(SUGGESTED_FLAVORS)}.",
- )
- run_parser.add_argument("-e", "--env", action="append", help="Environment variables")
- run_parser.add_argument(
- "-s",
- "--secrets",
- action="append",
- help=(
- "Set secret environment variables. E.g. --secrets SECRET=value "
- "or `--secrets HF_TOKEN` to pass your Hugging Face token."
- ),
- )
- run_parser.add_argument("--env-file", type=str, help="Read in a file of environment variables.")
- run_parser.add_argument(
- "--secrets-file",
- type=str,
- help="Read in a file of secret environment variables.",
- )
- run_parser.add_argument("--timeout", type=str, help="Max duration (e.g., 30s, 5m, 1h)")
- run_parser.add_argument("-d", "--detach", action="store_true", help="Run in background")
- run_parser.add_argument(
- "--namespace",
- type=str,
- help="The namespace where the Job will be created. Defaults to the current user's namespace.",
- )
- run_parser.add_argument("--token", type=str, help="HF token")
- # UV options
- run_parser.add_argument("--with", action="append", help="Run with the given packages installed", dest="with_")
- run_parser.add_argument(
- "-p", "--python", type=str, help="The Python interpreter to use for the run environment"
- )
- run_parser.set_defaults(func=ScheduledUvCommand)
- def __init__(self, args: Namespace) -> None:
- """Initialize the command with parsed arguments."""
- self.schedule: str = args.schedule
- self.script = args.script
- self.script_args = args.script_args
- self.suspend: Optional[bool] = args.suspend
- self.concurrency: Optional[bool] = args.concurrency
- self.dependencies = args.with_
- self.python = args.python
- self.image = args.image
- self.env: dict[str, Optional[str]] = {}
- if args.env_file:
- self.env.update(load_dotenv(Path(args.env_file).read_text(), environ=os.environ.copy()))
- for env_value in args.env or []:
- self.env.update(load_dotenv(env_value, environ=os.environ.copy()))
- self.secrets: dict[str, Optional[str]] = {}
- extended_environ = _get_extended_environ()
- if args.secrets_file:
- self.secrets.update(load_dotenv(Path(args.secrets_file).read_text(), environ=extended_environ))
- for secret in args.secrets or []:
- self.secrets.update(load_dotenv(secret, environ=extended_environ))
- self.flavor: Optional[SpaceHardware] = args.flavor
- self.timeout: Optional[str] = args.timeout
- self.detach: bool = args.detach
- self.namespace: Optional[str] = args.namespace
- self.token: Optional[str] = args.token
- self._repo = args.repo
- def run(self) -> None:
- """Schedule UV command."""
- logging.set_verbosity(logging.INFO)
- api = HfApi(token=self.token)
- job = api.create_scheduled_uv_job(
- script=self.script,
- script_args=self.script_args,
- schedule=self.schedule,
- suspend=self.suspend,
- concurrency=self.concurrency,
- dependencies=self.dependencies,
- python=self.python,
- image=self.image,
- env=self.env,
- secrets=self.secrets,
- flavor=self.flavor,
- timeout=self.timeout,
- namespace=self.namespace,
- _repo=self._repo,
- )
- # Always print the job ID to the user
- print(f"Scheduled Job created with ID: {job.id}")
|