progress_bars.py 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151
  1. from __future__ import annotations
  2. import functools
  3. import sys
  4. from collections.abc import Generator, Iterable, Iterator
  5. from typing import Callable, Literal, TypeVar
  6. from pip._vendor.rich.progress import (
  7. BarColumn,
  8. DownloadColumn,
  9. FileSizeColumn,
  10. MofNCompleteColumn,
  11. Progress,
  12. ProgressColumn,
  13. SpinnerColumn,
  14. TextColumn,
  15. TimeElapsedColumn,
  16. TimeRemainingColumn,
  17. TransferSpeedColumn,
  18. )
  19. from pip._internal.cli.spinners import RateLimiter
  20. from pip._internal.req.req_install import InstallRequirement
  21. from pip._internal.utils.logging import get_console, get_indentation
  22. T = TypeVar("T")
  23. ProgressRenderer = Callable[[Iterable[T]], Iterator[T]]
  24. BarType = Literal["on", "off", "raw"]
  25. def _rich_download_progress_bar(
  26. iterable: Iterable[bytes],
  27. *,
  28. bar_type: BarType,
  29. size: int | None,
  30. initial_progress: int | None = None,
  31. ) -> Generator[bytes, None, None]:
  32. assert bar_type == "on", "This should only be used in the default mode."
  33. if not size:
  34. total = float("inf")
  35. columns: tuple[ProgressColumn, ...] = (
  36. TextColumn("[progress.description]{task.description}"),
  37. SpinnerColumn("line", speed=1.5),
  38. FileSizeColumn(),
  39. TransferSpeedColumn(),
  40. TimeElapsedColumn(),
  41. )
  42. else:
  43. total = size
  44. columns = (
  45. TextColumn("[progress.description]{task.description}"),
  46. BarColumn(),
  47. DownloadColumn(),
  48. TransferSpeedColumn(),
  49. TextColumn("{task.fields[time_description]}"),
  50. TimeRemainingColumn(elapsed_when_finished=True),
  51. )
  52. progress = Progress(*columns, refresh_per_second=5)
  53. task_id = progress.add_task(
  54. " " * (get_indentation() + 2), total=total, time_description="eta"
  55. )
  56. if initial_progress is not None:
  57. progress.update(task_id, advance=initial_progress)
  58. with progress:
  59. for chunk in iterable:
  60. yield chunk
  61. progress.update(task_id, advance=len(chunk))
  62. progress.update(task_id, time_description="")
  63. def _rich_install_progress_bar(
  64. iterable: Iterable[InstallRequirement], *, total: int
  65. ) -> Iterator[InstallRequirement]:
  66. columns = (
  67. TextColumn("{task.fields[indent]}"),
  68. BarColumn(),
  69. MofNCompleteColumn(),
  70. TextColumn("{task.description}"),
  71. )
  72. console = get_console()
  73. bar = Progress(*columns, refresh_per_second=6, console=console, transient=True)
  74. # Hiding the progress bar at initialization forces a refresh cycle to occur
  75. # until the bar appears, avoiding very short flashes.
  76. task = bar.add_task("", total=total, indent=" " * get_indentation(), visible=False)
  77. with bar:
  78. for req in iterable:
  79. bar.update(task, description=rf"\[{req.name}]", visible=True)
  80. yield req
  81. bar.advance(task)
  82. def _raw_progress_bar(
  83. iterable: Iterable[bytes],
  84. *,
  85. size: int | None,
  86. initial_progress: int | None = None,
  87. ) -> Generator[bytes, None, None]:
  88. def write_progress(current: int, total: int) -> None:
  89. sys.stdout.write(f"Progress {current} of {total}\n")
  90. sys.stdout.flush()
  91. current = initial_progress or 0
  92. total = size or 0
  93. rate_limiter = RateLimiter(0.25)
  94. write_progress(current, total)
  95. for chunk in iterable:
  96. current += len(chunk)
  97. if rate_limiter.ready() or current == total:
  98. write_progress(current, total)
  99. rate_limiter.reset()
  100. yield chunk
  101. def get_download_progress_renderer(
  102. *, bar_type: BarType, size: int | None = None, initial_progress: int | None = None
  103. ) -> ProgressRenderer[bytes]:
  104. """Get an object that can be used to render the download progress.
  105. Returns a callable, that takes an iterable to "wrap".
  106. """
  107. if bar_type == "on":
  108. return functools.partial(
  109. _rich_download_progress_bar,
  110. bar_type=bar_type,
  111. size=size,
  112. initial_progress=initial_progress,
  113. )
  114. elif bar_type == "raw":
  115. return functools.partial(
  116. _raw_progress_bar,
  117. size=size,
  118. initial_progress=initial_progress,
  119. )
  120. else:
  121. return iter # no-op, when passed an iterator
  122. def get_install_progress_renderer(
  123. *, bar_type: BarType, total: int
  124. ) -> ProgressRenderer[InstallRequirement]:
  125. """Get an object that can be used to render the install progress.
  126. Returns a callable, that takes an iterable to "wrap".
  127. """
  128. if bar_type == "on":
  129. return functools.partial(_rich_install_progress_bar, total=total)
  130. else:
  131. return iter