wandb_controller.py 24 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719
  """Sweep controller.

  This module implements the sweep controller.

  On error an exception is raised:
      ControllerError

  Example:
      import wandb

      #
      # create a sweep controller
      #
      # There are three different ways sweeps can be created:
      # (1) create with sweep id from `wandb sweep` command
      sweep_id = 'xyzxyz2'
      tuner = wandb.controller(sweep_id)
      # (2) create with sweep config
      sweep_config = {}
      tuner = wandb.controller()
      tuner.configure(sweep_config)
      tuner.create()
      # (3) create by constructing programmatic sweep configuration
      tuner = wandb.controller()
      tuner.configure_search('random')
      tuner.configure_program('train-dummy.py')
      tuner.configure_parameter('param1', values=[1,2,3])
      tuner.configure_parameter('param2', values=[1,2,3])
      tuner.configure_controller(type="local")
      tuner.create()

      #
      # run the sweep controller
      #
      # There are three different ways sweeps can be executed:
      # (1) run to completion
      tuner.run()
      # (2) run in a simple loop
      while not tuner.done():
          tuner.step()
          tuner.print_status()
      # (3) run in a more complex loop
      while not tuner.done():
          params = tuner.search()
          tuner.schedule(params)
          runs = tuner.stopping()
          if runs:
              tuner.stop_runs(runs)
  """
  45. import json
  46. import os
  47. import random
  48. import string
  49. import time
  50. from typing import Callable, Dict, List, Optional, Tuple, Union
  51. import yaml
  52. from wandb import env
  53. from wandb.apis import InternalApi
  54. from wandb.sdk import wandb_sweep
  55. from wandb.sdk.launch.sweeps.utils import (
  56. handle_sweep_config_violations,
  57. sweep_config_err_text_from_jsonschema_violations,
  58. )
  59. from wandb.util import get_module
  60. # TODO(jhr): Add metric status
  61. # TODO(jhr): Add print_space
  62. # TODO(jhr): Add print_summary
  # The local controller needs the optional `sweeps` package; the `required`
  # text is handed to `get_module` alongside the module name.
  sweeps = get_module(
      "sweeps",
      required=(
          "wandb[sweeps] is required to use the local controller. "
          "Please run `pip install wandb[sweeps]`."
      ),
  )

  # This should be something like 'pending' (but we need to make sure everyone else is ok with that)
  # Runs still in this state with no summary metrics are treated as not yet
  # started by `_WandbController._parse_scheduled`.
  SWEEP_INITIAL_RUN_STATE = sweeps.RunState.pending
  def _id_generator(size=10, chars=string.ascii_lowercase + string.digits):
      """Return a random identifier of ``size`` characters drawn from ``chars``.

      Characters are picked one at a time with ``random.choice``; the default
      alphabet is lowercase ASCII letters plus digits.
      """
      picked = []
      for _ in range(size):
          picked.append(random.choice(chars))
      return "".join(picked)
  class ControllerError(Exception):
      """Error type raised by the sweep controller; base class for sweep errors."""
  class _WandbController:
      """Sweep controller class.

      Internal datastructures on the sweep object to coordinate local controller with
      cloud controller.

      Data structures:
          controller: {
              schedule: [
                  { id: SCHEDULE_ID
                    data: {param1: val1, param2: val2}},
              ]
              earlystop: [RUN_ID, ...]
          scheduler:
              scheduled: [
                  { id: SCHEDULE_ID
                    runid: RUN_ID},
              ]

      `controller` is only updated by the client
      `scheduler` is only updated by the cloud backend

      Protocols:
          Scheduling a run:
          - client controller adds a schedule entry on the controller.schedule list
          - cloud backend notices the new entry and creates a run with the parameters
          - cloud backend adds a scheduled entry on the scheduler.scheduled list
          - client controller notices that the run has been scheduled and removes it from
            controller.schedule list

      Current implementation details:
          - Runs are only scheduled if there are no other runs scheduled.
      """

      def __init__(self, sweep_id_or_config=None, entity=None, project=None):
          """Initialise controller state and, when possible, attach to a sweep.

          Args:
              sweep_id_or_config: A sweep id string (the sweep is read from the
                  backend), a dict or ``sweeps.SweepConfig`` (a sweep is created
                  from it immediately), or None (creation is deferred until
                  ``create()`` is called).
              entity: When truthy, passed to ``env.set_entity`` together with
                  ``os.environ``.
              project: When truthy, passed to ``env.set_project`` together with
                  ``os.environ``.

          Raises:
              ControllerError: If ``sweep_id_or_config`` has an unsupported type
                  or the sweep cannot be read from the backend.
          """
          # sweep id configured in constructor
          self._sweep_id: Optional[str] = None

          # configured parameters
          # Configuration to be created
          self._create: Dict = {}
          # Custom search
          self._custom_search: Optional[
              Callable[
                  [Union[dict, sweeps.SweepConfig], List[sweeps.SweepRun]],
                  Optional[sweeps.SweepRun],
              ]
          ] = None
          # Custom stopping
          self._custom_stopping: Optional[
              Callable[
                  [Union[dict, sweeps.SweepConfig], List[sweeps.SweepRun]],
                  List[sweeps.SweepRun],
              ]
          ] = None
          # Program function (used for future jupyter support)
          self._program_function = None

          # The following are updated every sweep step
          # raw sweep object (dict of strings)
          self._sweep_obj = None
          # parsed sweep config (dict)
          self._sweep_config: Optional[Union[dict, sweeps.SweepConfig]] = None
          # sweep metric used to optimize (str or None)
          self._sweep_metric: Optional[str] = None
          # list of _Run objects
          self._sweep_runs: Optional[List[sweeps.SweepRun]] = None
          # dictionary mapping name of run to run object
          self._sweep_runs_map: Optional[Dict[str, sweeps.SweepRun]] = None
          # scheduler dict (read only from controller) - used as feedback from the server
          self._scheduler: Optional[Dict] = None
          # controller dict (write only from controller) - used to send commands to server
          self._controller: Optional[Dict] = None
          # keep track of controller dict from previous step
          self._controller_prev_step: Optional[Dict] = None

          # Internal
          # Keep track of whether the sweep has been started
          self._started: bool = False
          # indicate whether there is more to schedule
          self._done_scheduling: bool = False
          # indicate whether the sweep needs to be created
          self._defer_sweep_creation: bool = False
          # count of logged lines since last status
          self._logged: int = 0
          # last status line printed
          self._laststatus: str = ""
          # keep track of logged actions for print_actions()
          self._log_actions: List[Tuple[str, str]] = []
          # keep track of logged debug for print_debug()
          self._log_debug: List[str] = []

          # all backend commands use internal api
          environ = os.environ
          if entity:
              env.set_entity(entity, env=environ)
          if project:
              env.set_project(project, env=environ)
          self._api = InternalApi(environ=environ)

          if isinstance(sweep_id_or_config, str):
              self._sweep_id = sweep_id_or_config
          elif isinstance(sweep_id_or_config, (dict, sweeps.SweepConfig)):
              self._create = sweeps.SweepConfig(sweep_id_or_config)

              # check for custom search and or stopping functions: a callable
              # stored under one of these keys is moved onto the controller and
              # replaced by the marker string "custom" in the config.
              # NOTE(review): `configure_stopping` writes {"type": "custom"} for
              # early_terminate while this path writes the bare string "custom";
              # confirm which shape the backend expects.
              for config_key, controller_attr in (
                  ("method", "_custom_search"),
                  ("early_terminate", "_custom_stopping"),
              ):
                  if callable(self._create.get(config_key)):
                      setattr(self, controller_attr, self._create[config_key])
                      self._create[config_key] = "custom"

              self._sweep_id = self.create(from_dict=True)
          elif sweep_id_or_config is None:
              self._defer_sweep_creation = True
              return
          else:
              raise ControllerError("Unhandled sweep controller type")

          sweep_obj = self._sweep_object_read_from_backend()
          if sweep_obj is None:
              raise ControllerError("Can not find sweep")
          self._sweep_obj = sweep_obj

      def configure_search(
          self,
          search: Union[
              str,
              Callable[
                  [Union[dict, sweeps.SweepConfig], List[sweeps.SweepRun]],
                  Optional[sweeps.SweepRun],
              ],
          ],
      ):
          """Set the search method of the sweep to be created.

          Args:
              search: Either a method name stored as ``method`` in the config, or
                  a callable taking ``(config, runs)``; a callable is kept on the
                  controller and the config method becomes ``"custom"``.

          Raises:
              ControllerError: If the sweep has already been started or
                  ``search`` is neither a string nor callable.
          """
          self._configure_check()
          if isinstance(search, str):
              self._create["method"] = search
          elif callable(search):
              self._create["method"] = "custom"
              self._custom_search = search
          else:
              raise ControllerError("Unhandled search type.")

      def configure_stopping(
          self,
          stopping: Union[
              str,
              Callable[
                  [Union[dict, sweeps.SweepConfig], List[sweeps.SweepRun]],
                  List[sweeps.SweepRun],
              ],
          ],
          **kwargs,
      ):
          """Set the early-termination policy of the sweep to be created.

          Args:
              stopping: Either a policy name stored as ``early_terminate.type``,
                  or a callable taking ``(config, runs)`` and returning the runs
                  to stop; a callable is kept on the controller and the type
                  becomes ``"custom"``.
              **kwargs: Extra settings copied into the ``early_terminate``
                  section of the config.

          Raises:
              ControllerError: If the sweep has already been started or
                  ``stopping`` is neither a string nor callable.
          """
          self._configure_check()
          if isinstance(stopping, str):
              stop_type = stopping
          elif callable(stopping):
              # Store the callable itself: `_stopping` later invokes it as
              # stopper(config, runs). Calling it here (as the previous code
              # did) stored its result instead of the function.
              self._custom_stopping = stopping
              stop_type = "custom"
          else:
              raise ControllerError("Unhandled stopping type.")
          early_terminate = self._create.setdefault("early_terminate", {})
          early_terminate["type"] = stop_type
          for k, v in kwargs.items():
              early_terminate[k] = v

      def configure_metric(self, metric, goal=None):
          """Set the metric name and, when ``goal`` is truthy, the goal."""
          self._configure_check()
          metric_cfg = self._create.setdefault("metric", {})
          metric_cfg["name"] = metric
          if goal:
              metric_cfg["goal"] = goal

      def configure_program(self, program):
          """Set the program for the sweep to be created.

          Args:
              program: A string stored as ``program`` in the config. Callables
                  are recorded but rejected.

          Raises:
              ControllerError: If the sweep has already been started, if
                  ``program`` is callable (not supported yet), or if it is
                  neither a string nor callable.
          """
          self._configure_check()
          if isinstance(program, str):
              self._create["program"] = program
          elif callable(program):
              self._create["program"] = "__callable__"
              self._program_function = program
              raise ControllerError("Program functions are not supported yet")
          else:
              raise ControllerError("Unhandled sweep program type")

      def configure_name(self, name):
          """Set the ``name`` entry of the sweep to be created."""
          self._configure_check()
          self._create["name"] = name

      def configure_description(self, description):
          """Set the ``description`` entry of the sweep to be created."""
          self._configure_check()
          self._create["description"] = description

      def configure_parameter(
          self,
          name,
          values=None,
          value=None,
          distribution=None,
          min=None,
          max=None,
          mu=None,
          sigma=None,
          q=None,
          a=None,
          b=None,
      ):
          """Add or update one entry under ``parameters`` in the config.

          Every argument that is not None is written under its own key. ``value``
          is also written (possibly as None) when none of ``values``, ``min``,
          ``max`` and ``distribution`` is given.

          Raises:
              ControllerError: If the sweep has already been started.
          """
          self._configure_check()
          param = self._create.setdefault("parameters", {}).setdefault(name, {})
          nothing_else_given = (
              values is None and min is None and max is None and distribution is None
          )
          if value is not None or nothing_else_given:
              param["value"] = value
          optional_fields = (
              ("values", values),
              ("distribution", distribution),
              ("min", min),
              ("max", max),
              ("mu", mu),
              ("sigma", sigma),
              ("q", q),
              ("a", a),
              ("b", b),
          )
          for key, setting in optional_fields:
              if setting is not None:
                  param[key] = setting

      def configure_controller(self, type):
          """Record the controller type (e.g. ``'local'``) if none is set yet.

          An already configured ``controller.type`` is left unchanged.
          """
          self._configure_check()
          self._create.setdefault("controller", {})
          self._create["controller"].setdefault("type", type)

      def configure(self, sweep_dict_or_config):
          """Install a complete sweep configuration.

          Args:
              sweep_dict_or_config: A dict used as the configuration, or a string
                  parsed with ``yaml.safe_load``.

          Raises:
              ControllerError: If the sweep has already been started, a
                  configuration already exists, or the argument type is
                  unsupported.
          """
          self._configure_check()
          if self._create:
              raise ControllerError("Already configured.")
          if isinstance(sweep_dict_or_config, dict):
              self._create = sweep_dict_or_config
          elif isinstance(sweep_dict_or_config, str):
              self._create = yaml.safe_load(sweep_dict_or_config)
          else:
              raise ControllerError("Unhandled sweep controller type")

      @property
      def sweep_config(self) -> Optional[Union[dict, sweeps.SweepConfig]]:
          """Sweep config parsed from the last backend read (None before that)."""
          return self._sweep_config

      @property
      def sweep_id(self) -> Optional[str]:
          """Id of the attached sweep, or None while creation is deferred."""
          return self._sweep_id

      def _log(self) -> None:
          """Count one logged line since the last printed status."""
          self._logged += 1

      def _error(self, s: str) -> None:
          """Print ``s`` with an ``ERROR:`` prefix and count the line."""
          print("ERROR:", s)  # noqa: T201
          self._log()

      def _warn(self, s: str) -> None:
          """Print ``s`` with a ``WARN:`` prefix and count the line."""
          print("WARN:", s)  # noqa: T201
          self._log()

      def _info(self, s: str) -> None:
          """Print ``s`` with an ``INFO:`` prefix and count the line."""
          print("INFO:", s)  # noqa: T201
          self._log()

      def _debug(self, s: str) -> None:
          """Print ``s`` with a ``DEBUG:`` prefix and count the line."""
          print("DEBUG:", s)  # noqa: T201
          self._log()

      def _configure_check(self) -> None:
          """Raise ControllerError if the sweep has already been started."""
          if self._started:
              raise ControllerError("Can not configure after sweep has been started.")

      def _validate(self, config: Dict) -> str:
          """Return the schema-violation text for ``config``, or "" if there is none."""
          violations = sweeps.schema_violations_from_proposed_config(config)
          if len(violations) > 0:
              return sweep_config_err_text_from_jsonschema_violations(violations)
          return ""

      def create(self, from_dict: bool = False) -> str:
          """Create the configured sweep on the backend and return its id.

          Args:
              from_dict: Set by the constructor when creating directly from a
                  config; bypasses the "already created" check.

          Raises:
              ControllerError: If the sweep has been started, has already been
                  created, or nothing has been configured.
          """
          if self._started:
              raise ControllerError("Can not create after sweep has been started.")
          if not self._defer_sweep_creation and not from_dict:
              raise ControllerError("Can not use create on already created sweep.")
          if not self._create:
              raise ControllerError("Must configure sweep before create.")

          # validate sweep config
          self._create = sweeps.SweepConfig(self._create)

          # Create sweep
          sweep_id, warnings = self._api.upsert_sweep(self._create)
          handle_sweep_config_violations(warnings)

          print("Create sweep with ID:", sweep_id)  # noqa: T201
          sweep_url = wandb_sweep._get_sweep_url(self._api, sweep_id)
          if sweep_url:
              print("Sweep URL:", sweep_url)  # noqa: T201
          self._sweep_id = sweep_id
          self._defer_sweep_creation = False
          return sweep_id

      def run(
          self,
          verbose: bool = False,
          print_status: bool = True,
          print_actions: bool = False,
          print_debug: bool = False,
      ) -> None:
          """Step the controller every five seconds until ``done()`` is true.

          Args:
              verbose: Turns on status, action and debug printing together.
              print_status: Print the status line before each step.
              print_actions: Print the actions logged by each step.
              print_debug: Print the debug lines logged by each step.
          """
          if verbose:
              print_status = True
              print_actions = True
              print_debug = True
          self._start_if_not_started()
          while not self.done():
              if print_status:
                  self.print_status()
              self.step()
              if print_actions:
                  self.print_actions()
              if print_debug:
                  self.print_debug()
              time.sleep(5)

      def _sweep_object_read_from_backend(self) -> Optional[dict]:
          """Fetch the sweep from the backend and refresh all cached state.

          Returns:
              The raw sweep object, or None when the backend returned nothing
              (in which case no cached state is modified).

          Raises:
              ValueError: If a run has no ``config`` or its ``history`` is not a
                  list.
          """
          specs_json = {}
          if self._sweep_metric:
              specs_json = {"keys": ["_step", self._sweep_metric], "samples": 100000}
          specs = json.dumps(specs_json)
          # TODO(jhr): catch exceptions?
          sweep_obj = self._api.sweep(self._sweep_id, specs)
          if not sweep_obj:
              return None
          self._sweep_obj = sweep_obj
          self._sweep_config = yaml.safe_load(sweep_obj["config"])
          self._sweep_metric = self._sweep_config.get("metric", {}).get("name")

          _sweep_runs: List[sweeps.SweepRun] = []
          for r in sweep_obj["runs"]:
              # work on a copy so the raw sweep object keeps its JSON strings
              rr = r.copy()
              if rr.get("summaryMetrics"):
                  rr["summaryMetrics"] = json.loads(rr["summaryMetrics"])
              if "config" not in rr:
                  raise ValueError("sweep object is missing config")
              rr["config"] = json.loads(rr["config"])
              if "history" in rr:
                  if not isinstance(rr["history"], list):
                      raise ValueError(
                          "Invalid history value: expected list of json strings: "
                          f"{rr['history']}"
                      )
                  rr["history"] = [json.loads(d) for d in rr["history"]]
              if "sampledHistory" in rr:
                  # flatten the list of lists into one list of history dicts
                  sampled_history = []
                  for history_dict_list in rr["sampledHistory"]:
                      sampled_history += history_dict_list
                  rr["sampledHistory"] = sampled_history
              _sweep_runs.append(sweeps.SweepRun(**rr))

          self._sweep_runs = _sweep_runs
          self._sweep_runs_map = {r.name: r for r in self._sweep_runs}

          self._controller = json.loads(sweep_obj.get("controller") or "{}")
          self._scheduler = json.loads(sweep_obj.get("scheduler") or "{}")
          # snapshot used by _sweep_object_sync_to_backend to detect changes
          self._controller_prev_step = self._controller.copy()
          return sweep_obj

      def _sweep_object_sync_to_backend(self) -> None:
          """Upsert the controller dict if it differs from the last snapshot."""
          if self._controller == self._controller_prev_step:
              return
          sweep_obj_id = self._sweep_obj["id"]
          controller = json.dumps(self._controller)
          _, warnings = self._api.upsert_sweep(
              self._sweep_config, controller=controller, obj_id=sweep_obj_id
          )
          handle_sweep_config_violations(warnings)
          self._controller_prev_step = self._controller.copy()

      def _start_if_not_started(self) -> None:
          """Start the controller on first use; do nothing once started.

          Returns without starting when the backend read yields no sweep object.

          Raises:
              ControllerError: If sweep creation is still deferred, or the sweep
                  config does not declare a ``local`` controller.
          """
          if self._started:
              return
          if self._defer_sweep_creation:
              raise ControllerError(
                  "Must specify or create a sweep before running controller."
              )
          obj = self._sweep_object_read_from_backend()
          if not obj:
              return
          is_local = self._sweep_config.get("controller", {}).get("type") == "local"
          if not is_local:
              raise ControllerError(
                  "Only sweeps with a local controller are currently supported."
              )
          self._started = True
          # reset controller state, we might want to parse this and decide
          # what we can continue and add a version key, but for now we can
          # be safe and just reset things on start
          self._controller = {}
          self._sweep_object_sync_to_backend()

      def _parse_scheduled(self):
          """Classify the entries of ``scheduler.scheduled``.

          Returns:
              A tuple ``(started_ids, stopped_runs, done_runs)``: schedule ids
              whose run has started, run ids flagged as stopped, and run ids
              that have started and whose state is not ``"running"``.
          """
          scheduled_list = self._scheduler.get("scheduled") or []
          started_ids = []
          stopped_runs = []
          done_runs = []
          for s in scheduled_list:
              runid = s.get("runid")
              objid = s.get("id")
              r = self._sweep_runs_map.get(runid)
              if not r:
                  continue
              if r.stopped:
                  stopped_runs.append(runid)
              summary = r.summary_metrics
              # still in the initial state with no summary: not started yet
              if r.state == SWEEP_INITIAL_RUN_STATE and not summary:
                  continue
              started_ids.append(objid)
              if r.state != "running":
                  done_runs.append(runid)
          return started_ids, stopped_runs, done_runs

      def _step(self) -> None:
          """Refresh state from the backend and prune handled controller entries."""
          self._start_if_not_started()
          self._sweep_object_read_from_backend()

          started_ids, stopped_runs, done_runs = self._parse_scheduled()

          # Remove schedule entry from controller dict if already scheduled
          schedule_list = self._controller.get("schedule", [])
          self._controller["schedule"] = [
              s for s in schedule_list if s.get("id") not in started_ids
          ]

          # Remove earlystop entry from controller if already stopped
          earlystop_list = self._controller.get("earlystop", [])
          self._controller["earlystop"] = [
              r for r in earlystop_list if r not in stopped_runs and r not in done_runs
          ]

          # Clear out step logs
          self._log_actions = []
          self._log_debug = []

      def step(self) -> None:
          """Run one controller iteration: refresh, search, schedule and stop."""
          self._step()
          suggestion = self.search()
          self.schedule(suggestion)
          to_stop = self.stopping()
          if len(to_stop) > 0:
              self.stop_runs(to_stop)

      def done(self) -> bool:
          """Return False while the sweep state is preempting, initial or running.

          The sweep state is compared against the upper-cased run state values.
          """
          self._start_if_not_started()
          state = self._sweep_obj.get("state")
          active_states = [
              s.upper()
              for s in (
                  sweeps.RunState.preempting.value,
                  SWEEP_INITIAL_RUN_STATE.value,
                  sweeps.RunState.running.value,
              )
          ]
          return state not in active_states

      def _search(self) -> Optional[sweeps.SweepRun]:
          """Ask the custom search (or ``sweeps.next_run``) for the next run."""
          search = self._custom_search or sweeps.next_run
          next_run = search(self._sweep_config, self._sweep_runs or [])
          if next_run is None:
              self._done_scheduling = True
          return next_run

      def search(self) -> Optional[sweeps.SweepRun]:
          """Start the controller if needed and return the next suggested run."""
          self._start_if_not_started()
          return self._search()

      def _stopping(self) -> List[sweeps.SweepRun]:
          """Return the runs the stopper wants stopped.

          Returns an empty list when the config has no ``early_terminate``
          section. Early-terminate info of the returned runs is queued as debug
          lines.
          """
          if "early_terminate" not in self.sweep_config:
              return []
          stopper = self._custom_stopping or sweeps.stop_runs
          stop_runs = stopper(self._sweep_config, self._sweep_runs or [])

          debug_lines = [
              " ".join([f"{k}={v}" for k, v in run.early_terminate_info.items()])
              for run in stop_runs
              if run.early_terminate_info is not None
          ]
          if debug_lines:
              self._log_debug += debug_lines

          return stop_runs

      def stopping(self) -> List[sweeps.SweepRun]:
          """Start the controller if needed and return the runs to stop."""
          self._start_if_not_started()
          return self._stopping()

      def schedule(self, run: Optional[sweeps.SweepRun]) -> None:
          """Queue ``run`` on ``controller.schedule`` and sync to the backend.

          Does nothing while a schedule entry is still pending. When ``run`` is
          None an entry with ``args`` set to None is queued instead.
          """
          self._start_if_not_started()

          # only schedule one run at a time (for now)
          if self._controller and self._controller.get("schedule"):
              return

          schedule_id = _id_generator()

          if run is None:
              schedule_list = [{"id": schedule_id, "data": {"args": None}}]
          else:
              param_list = [
                  f"{k}={v.get('value')}" for k, v in sorted(run.config.items())
              ]
              self._log_actions.append(("schedule", ",".join(param_list)))

              # schedule one run
              schedule_list = [{"id": schedule_id, "data": {"args": run.config}}]

          self._controller["schedule"] = schedule_list
          self._sweep_object_sync_to_backend()

      def stop_runs(self, runs: List[sweeps.SweepRun]) -> None:
          """Request early stopping of ``runs`` and sync to the backend.

          Run names are de-duplicated and sorted so the logged action and the
          ``earlystop`` list are deterministic.
          """
          # same guard as the other public actions, so the controller dict exists
          self._start_if_not_started()
          earlystop_list = sorted({run.name for run in runs})
          self._log_actions.append(("stop", ",".join(earlystop_list)))
          self._controller["earlystop"] = earlystop_list
          self._sweep_object_sync_to_backend()

      def print_status(self) -> None:
          """Print the status line if it changed or other lines were logged."""
          status = _sweep_status(self._sweep_obj, self._sweep_config, self._sweep_runs)
          if self._laststatus != status or self._logged:
              print(status)  # noqa: T201
          self._laststatus = status
          self._logged = 0

      def print_actions(self) -> None:
          """Print and clear the actions logged since the last call."""
          for action, line in self._log_actions:
              self._info(f"{action.capitalize()} ({line})")
          self._log_actions = []

      def print_debug(self) -> None:
          """Print and clear the debug lines logged since the last call."""
          for line in self._log_debug:
              self._debug(line)
          self._log_debug = []

      def print_space(self) -> None:
          """Placeholder: only warns that the method is not implemented."""
          self._warn("Method not implemented yet.")

      def print_summary(self) -> None:
          """Placeholder: only warns that the method is not implemented."""
          self._warn("Method not implemented yet.")
  def _get_run_counts(runs: List[sweeps.SweepRun]) -> Dict[str, int]:
      """Count runs per state name.

      A run whose state equals none of the ``sweeps.RunState`` member names is
      counted under ``"unknown"``. Keys appear in order of first occurrence.
      """
      known_states = list(sweeps.RunState.__members__) + ["unknown"]
      counts: Dict[str, int] = {}
      for run in runs:
          state = run.state
          label = next((name for name in known_states if state == name), "unknown")
          counts[label] = counts.get(label, 0) + 1
      return counts
  def _get_runs_status(metrics):
      """Format per-state counts as ``"State: n, State: n"``.

      States are listed in ``sweeps.RunState`` member order followed by
      ``"unknown"``; states whose count is missing or falsy are left out.
      """
      known_states = list(sweeps.RunState.__members__) + ["unknown"]
      return ", ".join(
          f"{name.capitalize()}: {metrics[name]}"
          for name in known_states
          if metrics.get(name)
      )
  def _sweep_status(
      sweep_obj: dict,
      sweep_conf: Union[dict, sweeps.SweepConfig],
      sweep_runs: List[sweeps.SweepRun],
  ) -> str:
      """Build the one-line status summary of a sweep.

      The line has up to three ``" | "``-separated sections: the sweep name with
      its method (and early-terminate type, if configured), the run count with
      per-state totals, and stop counts when any run is stopped or stopping.

      Args:
          sweep_obj: Raw sweep object; ``name`` and ``state`` must be present.
          sweep_conf: Parsed sweep config; ``method`` and ``early_terminate``
              are read from it.
          sweep_runs: Runs belonging to the sweep.

      Returns:
          The formatted status line.
      """
      sweep = sweep_obj["name"]
      _ = sweep_obj["state"]  # unused, but keeps the KeyError on a missing state
      run_count = len(sweep_runs)
      run_type_counts = _get_run_counts(sweep_runs)
      stopped = len([r for r in sweep_runs if r.stopped])
      stopping_count = len([r for r in sweep_runs if r.should_stop])
      stopstr = ""
      if stopped or stopping_count:
          stopstr = f"Stopped: {stopped}"
      if stopping_count:
          stopstr += f" (Stopping: {stopping_count})"
      runs_status = _get_runs_status(run_type_counts)

      method = sweep_conf.get("method", "unknown")
      early_terminate = sweep_conf.get("early_terminate", None)
      sweep_options = [method]
      if early_terminate:
          if isinstance(early_terminate, dict):
              sweep_options.append(early_terminate.get("type", "unknown"))
          else:
              # e.g. the bare "custom" marker written when a callable stopper
              # is passed in a config dict; a plain string has no .get()
              sweep_options.append(str(early_terminate))
      sweep_options = ",".join(sweep_options)

      sections = [f"Sweep: {sweep} ({sweep_options})"]
      if runs_status:
          sections.append(f"Runs: {run_count} ({runs_status})")
      else:
          sections.append(f"Runs: {run_count}")
      if stopstr:
          sections.append(stopstr)
      return " | ".join(sections)