_eventloop.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414
  1. from __future__ import annotations
  2. import math
  3. import sys
  4. from abc import ABCMeta, abstractmethod
  5. from collections.abc import AsyncIterator, Awaitable, Callable, Sequence
  6. from contextlib import AbstractContextManager
  7. from os import PathLike
  8. from signal import Signals
  9. from socket import AddressFamily, SocketKind, socket
  10. from typing import (
  11. IO,
  12. TYPE_CHECKING,
  13. Any,
  14. TypeVar,
  15. Union,
  16. overload,
  17. )
  18. if sys.version_info >= (3, 11):
  19. from typing import TypeVarTuple, Unpack
  20. else:
  21. from typing_extensions import TypeVarTuple, Unpack
  22. if sys.version_info >= (3, 10):
  23. from typing import TypeAlias
  24. else:
  25. from typing_extensions import TypeAlias
  26. if TYPE_CHECKING:
  27. from _typeshed import FileDescriptorLike
  28. from .._core._synchronization import CapacityLimiter, Event, Lock, Semaphore
  29. from .._core._tasks import CancelScope
  30. from .._core._testing import TaskInfo
  31. from ._sockets import (
  32. ConnectedUDPSocket,
  33. ConnectedUNIXDatagramSocket,
  34. IPSockAddrType,
  35. SocketListener,
  36. SocketStream,
  37. UDPSocket,
  38. UNIXDatagramSocket,
  39. UNIXSocketStream,
  40. )
  41. from ._subprocesses import Process
  42. from ._tasks import TaskGroup
  43. from ._testing import TestRunner
  44. T_Retval = TypeVar("T_Retval")
  45. PosArgsT = TypeVarTuple("PosArgsT")
  46. StrOrBytesPath: TypeAlias = Union[str, bytes, "PathLike[str]", "PathLike[bytes]"]
  47. class AsyncBackend(metaclass=ABCMeta):
  48. @classmethod
  49. @abstractmethod
  50. def run(
  51. cls,
  52. func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]],
  53. args: tuple[Unpack[PosArgsT]],
  54. kwargs: dict[str, Any],
  55. options: dict[str, Any],
  56. ) -> T_Retval:
  57. """
  58. Run the given coroutine function in an asynchronous event loop.
  59. The current thread must not be already running an event loop.
  60. :param func: a coroutine function
  61. :param args: positional arguments to ``func``
  62. :param kwargs: positional arguments to ``func``
  63. :param options: keyword arguments to call the backend ``run()`` implementation
  64. with
  65. :return: the return value of the coroutine function
  66. """
  67. @classmethod
  68. @abstractmethod
  69. def current_token(cls) -> object:
  70. """
  71. Return an object that allows other threads to run code inside the event loop.
  72. :return: a token object, specific to the event loop running in the current
  73. thread
  74. """
  75. @classmethod
  76. @abstractmethod
  77. def current_time(cls) -> float:
  78. """
  79. Return the current value of the event loop's internal clock.
  80. :return: the clock value (seconds)
  81. """
  82. @classmethod
  83. @abstractmethod
  84. def cancelled_exception_class(cls) -> type[BaseException]:
  85. """Return the exception class that is raised in a task if it's cancelled."""
  86. @classmethod
  87. @abstractmethod
  88. async def checkpoint(cls) -> None:
  89. """
  90. Check if the task has been cancelled, and allow rescheduling of other tasks.
  91. This is effectively the same as running :meth:`checkpoint_if_cancelled` and then
  92. :meth:`cancel_shielded_checkpoint`.
  93. """
  94. @classmethod
  95. async def checkpoint_if_cancelled(cls) -> None:
  96. """
  97. Check if the current task group has been cancelled.
  98. This will check if the task has been cancelled, but will not allow other tasks
  99. to be scheduled if not.
  100. """
  101. if cls.current_effective_deadline() == -math.inf:
  102. await cls.checkpoint()
  103. @classmethod
  104. async def cancel_shielded_checkpoint(cls) -> None:
  105. """
  106. Allow the rescheduling of other tasks.
  107. This will give other tasks the opportunity to run, but without checking if the
  108. current task group has been cancelled, unlike with :meth:`checkpoint`.
  109. """
  110. with cls.create_cancel_scope(shield=True):
  111. await cls.sleep(0)
  112. @classmethod
  113. @abstractmethod
  114. async def sleep(cls, delay: float) -> None:
  115. """
  116. Pause the current task for the specified duration.
  117. :param delay: the duration, in seconds
  118. """
  119. @classmethod
  120. @abstractmethod
  121. def create_cancel_scope(
  122. cls, *, deadline: float = math.inf, shield: bool = False
  123. ) -> CancelScope:
  124. pass
  125. @classmethod
  126. @abstractmethod
  127. def current_effective_deadline(cls) -> float:
  128. """
  129. Return the nearest deadline among all the cancel scopes effective for the
  130. current task.
  131. :return:
  132. - a clock value from the event loop's internal clock
  133. - ``inf`` if there is no deadline in effect
  134. - ``-inf`` if the current scope has been cancelled
  135. :rtype: float
  136. """
  137. @classmethod
  138. @abstractmethod
  139. def create_task_group(cls) -> TaskGroup:
  140. pass
  141. @classmethod
  142. @abstractmethod
  143. def create_event(cls) -> Event:
  144. pass
  145. @classmethod
  146. @abstractmethod
  147. def create_lock(cls, *, fast_acquire: bool) -> Lock:
  148. pass
  149. @classmethod
  150. @abstractmethod
  151. def create_semaphore(
  152. cls,
  153. initial_value: int,
  154. *,
  155. max_value: int | None = None,
  156. fast_acquire: bool = False,
  157. ) -> Semaphore:
  158. pass
  159. @classmethod
  160. @abstractmethod
  161. def create_capacity_limiter(cls, total_tokens: float) -> CapacityLimiter:
  162. pass
  163. @classmethod
  164. @abstractmethod
  165. async def run_sync_in_worker_thread(
  166. cls,
  167. func: Callable[[Unpack[PosArgsT]], T_Retval],
  168. args: tuple[Unpack[PosArgsT]],
  169. abandon_on_cancel: bool = False,
  170. limiter: CapacityLimiter | None = None,
  171. ) -> T_Retval:
  172. pass
  173. @classmethod
  174. @abstractmethod
  175. def check_cancelled(cls) -> None:
  176. pass
  177. @classmethod
  178. @abstractmethod
  179. def run_async_from_thread(
  180. cls,
  181. func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]],
  182. args: tuple[Unpack[PosArgsT]],
  183. token: object,
  184. ) -> T_Retval:
  185. pass
  186. @classmethod
  187. @abstractmethod
  188. def run_sync_from_thread(
  189. cls,
  190. func: Callable[[Unpack[PosArgsT]], T_Retval],
  191. args: tuple[Unpack[PosArgsT]],
  192. token: object,
  193. ) -> T_Retval:
  194. pass
  195. @classmethod
  196. @abstractmethod
  197. async def open_process(
  198. cls,
  199. command: StrOrBytesPath | Sequence[StrOrBytesPath],
  200. *,
  201. stdin: int | IO[Any] | None,
  202. stdout: int | IO[Any] | None,
  203. stderr: int | IO[Any] | None,
  204. **kwargs: Any,
  205. ) -> Process:
  206. pass
  207. @classmethod
  208. @abstractmethod
  209. def setup_process_pool_exit_at_shutdown(cls, workers: set[Process]) -> None:
  210. pass
  211. @classmethod
  212. @abstractmethod
  213. async def connect_tcp(
  214. cls, host: str, port: int, local_address: IPSockAddrType | None = None
  215. ) -> SocketStream:
  216. pass
  217. @classmethod
  218. @abstractmethod
  219. async def connect_unix(cls, path: str | bytes) -> UNIXSocketStream:
  220. pass
  221. @classmethod
  222. @abstractmethod
  223. def create_tcp_listener(cls, sock: socket) -> SocketListener:
  224. pass
  225. @classmethod
  226. @abstractmethod
  227. def create_unix_listener(cls, sock: socket) -> SocketListener:
  228. pass
  229. @classmethod
  230. @abstractmethod
  231. async def create_udp_socket(
  232. cls,
  233. family: AddressFamily,
  234. local_address: IPSockAddrType | None,
  235. remote_address: IPSockAddrType | None,
  236. reuse_port: bool,
  237. ) -> UDPSocket | ConnectedUDPSocket:
  238. pass
  239. @classmethod
  240. @overload
  241. async def create_unix_datagram_socket(
  242. cls, raw_socket: socket, remote_path: None
  243. ) -> UNIXDatagramSocket: ...
  244. @classmethod
  245. @overload
  246. async def create_unix_datagram_socket(
  247. cls, raw_socket: socket, remote_path: str | bytes
  248. ) -> ConnectedUNIXDatagramSocket: ...
  249. @classmethod
  250. @abstractmethod
  251. async def create_unix_datagram_socket(
  252. cls, raw_socket: socket, remote_path: str | bytes | None
  253. ) -> UNIXDatagramSocket | ConnectedUNIXDatagramSocket:
  254. pass
  255. @classmethod
  256. @abstractmethod
  257. async def getaddrinfo(
  258. cls,
  259. host: bytes | str | None,
  260. port: str | int | None,
  261. *,
  262. family: int | AddressFamily = 0,
  263. type: int | SocketKind = 0,
  264. proto: int = 0,
  265. flags: int = 0,
  266. ) -> Sequence[
  267. tuple[
  268. AddressFamily,
  269. SocketKind,
  270. int,
  271. str,
  272. tuple[str, int] | tuple[str, int, int, int] | tuple[int, bytes],
  273. ]
  274. ]:
  275. pass
  276. @classmethod
  277. @abstractmethod
  278. async def getnameinfo(
  279. cls, sockaddr: IPSockAddrType, flags: int = 0
  280. ) -> tuple[str, str]:
  281. pass
  282. @classmethod
  283. @abstractmethod
  284. async def wait_readable(cls, obj: FileDescriptorLike) -> None:
  285. pass
  286. @classmethod
  287. @abstractmethod
  288. async def wait_writable(cls, obj: FileDescriptorLike) -> None:
  289. pass
  290. @classmethod
  291. @abstractmethod
  292. def notify_closing(cls, obj: FileDescriptorLike) -> None:
  293. pass
  294. @classmethod
  295. @abstractmethod
  296. async def wrap_listener_socket(cls, sock: socket) -> SocketListener:
  297. pass
  298. @classmethod
  299. @abstractmethod
  300. async def wrap_stream_socket(cls, sock: socket) -> SocketStream:
  301. pass
  302. @classmethod
  303. @abstractmethod
  304. async def wrap_unix_stream_socket(cls, sock: socket) -> UNIXSocketStream:
  305. pass
  306. @classmethod
  307. @abstractmethod
  308. async def wrap_udp_socket(cls, sock: socket) -> UDPSocket:
  309. pass
  310. @classmethod
  311. @abstractmethod
  312. async def wrap_connected_udp_socket(cls, sock: socket) -> ConnectedUDPSocket:
  313. pass
  314. @classmethod
  315. @abstractmethod
  316. async def wrap_unix_datagram_socket(cls, sock: socket) -> UNIXDatagramSocket:
  317. pass
  318. @classmethod
  319. @abstractmethod
  320. async def wrap_connected_unix_datagram_socket(
  321. cls, sock: socket
  322. ) -> ConnectedUNIXDatagramSocket:
  323. pass
  324. @classmethod
  325. @abstractmethod
  326. def current_default_thread_limiter(cls) -> CapacityLimiter:
  327. pass
  328. @classmethod
  329. @abstractmethod
  330. def open_signal_receiver(
  331. cls, *signals: Signals
  332. ) -> AbstractContextManager[AsyncIterator[Signals]]:
  333. pass
  334. @classmethod
  335. @abstractmethod
  336. def get_current_task(cls) -> TaskInfo:
  337. pass
  338. @classmethod
  339. @abstractmethod
  340. def get_running_tasks(cls) -> Sequence[TaskInfo]:
  341. pass
  342. @classmethod
  343. @abstractmethod
  344. async def wait_all_tasks_blocked(cls) -> None:
  345. pass
  346. @classmethod
  347. @abstractmethod
  348. def create_test_runner(cls, options: dict[str, Any]) -> TestRunner:
  349. pass