_asyncio.py 97 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
727782779278027812782278327842785278627872788278927902791279227932794279527962797279827992800280128022803280428052806280728082809281028112812281328142815281628172818281928202821282228232824282528262827282828292830283128322833283428352836283728382839284028412842284328442845284628472848284928502851285228532854285528562857285828592860286128622863286428652866286728682869287028712872287328742875287628772878287928802881288228832884288528862887288828892890289128922893289428952896289728982899290029012902290329042905290629072908290929102911291229132914291529162917291829192920292129222923292429252926292729282929293029312932293329342935293629372938293929402941294229432944294529462947294829492950295129522953295429552956295729582959296029612962296329642965296629672968296929702971297229732974297529762977297829792980298129822983298429852986298729882989299029912992299329942995299629972998299930003001300230033004300530063007
  1. from __future__ import annotations
  2. import array
  3. import asyncio
  4. import concurrent.futures
  5. import contextvars
  6. import math
  7. import os
  8. import socket
  9. import sys
  10. import threading
  11. import weakref
  12. from asyncio import (
  13. AbstractEventLoop,
  14. CancelledError,
  15. all_tasks,
  16. create_task,
  17. current_task,
  18. get_running_loop,
  19. sleep,
  20. )
  21. from asyncio.base_events import _run_until_complete_cb # type: ignore[attr-defined]
  22. from collections import OrderedDict, deque
  23. from collections.abc import (
  24. AsyncGenerator,
  25. AsyncIterator,
  26. Awaitable,
  27. Callable,
  28. Collection,
  29. Coroutine,
  30. Iterable,
  31. Sequence,
  32. )
  33. from concurrent.futures import Future
  34. from contextlib import AbstractContextManager, suppress
  35. from contextvars import Context, copy_context
  36. from dataclasses import dataclass, field
  37. from functools import partial, wraps
  38. from inspect import (
  39. CORO_RUNNING,
  40. CORO_SUSPENDED,
  41. getcoroutinestate,
  42. iscoroutine,
  43. )
  44. from io import IOBase
  45. from os import PathLike
  46. from queue import Queue
  47. from signal import Signals
  48. from socket import AddressFamily, SocketKind
  49. from threading import Thread
  50. from types import CodeType, TracebackType
  51. from typing import (
  52. IO,
  53. TYPE_CHECKING,
  54. Any,
  55. Optional,
  56. TypeVar,
  57. cast,
  58. )
  59. from weakref import WeakKeyDictionary
  60. from .. import (
  61. CapacityLimiterStatistics,
  62. EventStatistics,
  63. LockStatistics,
  64. TaskInfo,
  65. abc,
  66. )
  67. from .._core._eventloop import (
  68. claim_worker_thread,
  69. set_current_async_library,
  70. threadlocals,
  71. )
  72. from .._core._exceptions import (
  73. BrokenResourceError,
  74. BusyResourceError,
  75. ClosedResourceError,
  76. EndOfStream,
  77. RunFinishedError,
  78. WouldBlock,
  79. iterate_exceptions,
  80. )
  81. from .._core._sockets import convert_ipv6_sockaddr
  82. from .._core._streams import create_memory_object_stream
  83. from .._core._synchronization import (
  84. CapacityLimiter as BaseCapacityLimiter,
  85. )
  86. from .._core._synchronization import Event as BaseEvent
  87. from .._core._synchronization import Lock as BaseLock
  88. from .._core._synchronization import (
  89. ResourceGuard,
  90. SemaphoreStatistics,
  91. )
  92. from .._core._synchronization import Semaphore as BaseSemaphore
  93. from .._core._tasks import CancelScope as BaseCancelScope
  94. from ..abc import (
  95. AsyncBackend,
  96. IPSockAddrType,
  97. SocketListener,
  98. UDPPacketType,
  99. UNIXDatagramPacketType,
  100. )
  101. from ..abc._eventloop import StrOrBytesPath
  102. from ..lowlevel import RunVar
  103. from ..streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream
  104. if TYPE_CHECKING:
  105. from _typeshed import FileDescriptorLike
  106. else:
  107. FileDescriptorLike = object
  108. if sys.version_info >= (3, 10):
  109. from typing import ParamSpec
  110. else:
  111. from typing_extensions import ParamSpec
  112. if sys.version_info >= (3, 11):
  113. from asyncio import Runner
  114. from typing import TypeVarTuple, Unpack
  115. else:
  116. import contextvars
  117. import enum
  118. import signal
  119. from asyncio import coroutines, events, exceptions, tasks
  120. from exceptiongroup import BaseExceptionGroup
  121. from typing_extensions import TypeVarTuple, Unpack
class _State(enum.Enum):
    # Lifecycle states of the backported Runner defined right after this enum.
    CREATED = "created"  # initial state assigned in Runner.__init__()
    INITIALIZED = "initialized"  # assigned at the end of Runner._lazy_init()
    CLOSED = "closed"  # assigned by Runner.close(); _lazy_init() raises afterwards
class Runner:
    # Copied from CPython 3.11
    """
    Fallback implementation of ``asyncio.Runner``.

    Defined on the branch of the ``sys.version_info >= (3, 11)`` check where
    ``asyncio.Runner`` is not imported. The runner owns a single event loop that is
    created lazily (see :meth:`_lazy_init`) and torn down by :meth:`close`. It can
    be used as a context manager; leaving the ``with`` block calls :meth:`close`.

    :param debug: if not ``None``, passed to ``loop.set_debug()`` when the loop is
        created
    :param loop_factory: callable used to create the event loop; when ``None``,
        ``events.new_event_loop()`` is used and the loop is also installed with
        ``events.set_event_loop()``
    """

    def __init__(
        self,
        *,
        debug: bool | None = None,
        loop_factory: Callable[[], AbstractEventLoop] | None = None,
    ):
        self._state = _State.CREATED
        self._debug = debug
        self._loop_factory = loop_factory
        # Created by _lazy_init(), reset to None by close()
        self._loop: AbstractEventLoop | None = None
        # contextvars context copied in _lazy_init(); default context for run()
        self._context = None
        # Number of SIGINTs seen by _on_sigint() during the current run() call
        self._interrupt_count = 0
        # True once _lazy_init() has installed the loop with set_event_loop()
        self._set_event_loop = False

    def __enter__(self) -> Runner:
        # Create the loop eagerly so that failures surface at the "with" statement
        self._lazy_init()
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        # The exception details are ignored; nothing is suppressed (returns None)
        self.close()

    def close(self) -> None:
        """Shutdown and close event loop."""
        loop = self._loop
        # Nothing to do if the loop was never created or was already closed
        if self._state is not _State.INITIALIZED or loop is None:
            return

        try:
            _cancel_all_tasks(loop)
            loop.run_until_complete(loop.shutdown_asyncgens())
            # Use the loop's own executor shutdown when it has one, otherwise the
            # module-level fallback coroutine
            if hasattr(loop, "shutdown_default_executor"):
                loop.run_until_complete(loop.shutdown_default_executor())
            else:
                loop.run_until_complete(_shutdown_default_executor(loop))
        finally:
            # Runs even if the shutdown steps above raised
            if self._set_event_loop:
                events.set_event_loop(None)

            loop.close()
            self._loop = None
            self._state = _State.CLOSED

    def get_loop(self) -> AbstractEventLoop:
        """Return embedded event loop."""
        # Creates the loop on first use; raises RuntimeError if the runner is closed
        self._lazy_init()
        return self._loop

    def run(self, coro: Coroutine[T_Retval], *, context=None) -> T_Retval:
        """
        Run a coroutine inside the embedded event loop.

        :param coro: the coroutine object to run to completion
        :param context: context to create the task in; defaults to the context
            copied when the loop was initialized
        :return: the value returned by ``run_until_complete()`` for the task
        :raises ValueError: if ``coro`` is not a coroutine object
        :raises RuntimeError: if an event loop is already running in this thread, or
            if the runner has been closed
        :raises KeyboardInterrupt: if the task was cancelled as a result of SIGINT
            and ``task.uncancel()`` reports no other pending cancellation requests
        """
        if not coroutines.iscoroutine(coro):
            raise ValueError(f"a coroutine was expected, got {coro!r}")

        if events._get_running_loop() is not None:
            # fail fast with short traceback
            raise RuntimeError(
                "Runner.run() cannot be called from a running event loop"
            )

        self._lazy_init()

        if context is None:
            context = self._context

        # Create the task inside the chosen context
        task = context.run(self._loop.create_task, coro)

        # Install our own SIGINT handler only in the main thread, and only when the
        # default Python handler is currently in place
        if (
            threading.current_thread() is threading.main_thread()
            and signal.getsignal(signal.SIGINT) is signal.default_int_handler
        ):
            sigint_handler = partial(self._on_sigint, main_task=task)
            try:
                signal.signal(signal.SIGINT, sigint_handler)
            except ValueError:
                # `signal.signal` may throw if `threading.main_thread` does
                # not support signals (e.g. embedded interpreter with signals
                # not registered - see gh-91880)
                sigint_handler = None
        else:
            sigint_handler = None

        self._interrupt_count = 0
        try:
            return self._loop.run_until_complete(task)
        except exceptions.CancelledError:
            # Convert a SIGINT-induced cancellation into KeyboardInterrupt, but only
            # if the task has an uncancel() method and it returns 0
            if self._interrupt_count > 0:
                uncancel = getattr(task, "uncancel", None)
                if uncancel is not None and uncancel() == 0:
                    raise KeyboardInterrupt  # noqa: B904
            raise  # CancelledError
        finally:
            # Restore the default handler, unless someone else replaced ours
            if (
                sigint_handler is not None
                and signal.getsignal(signal.SIGINT) is sigint_handler
            ):
                signal.signal(signal.SIGINT, signal.default_int_handler)

    def _lazy_init(self) -> None:
        # Create and configure the event loop on first use.
        # Raises RuntimeError if the runner has already been closed.
        if self._state is _State.CLOSED:
            raise RuntimeError("Runner is closed")

        if self._state is _State.INITIALIZED:
            return

        if self._loop_factory is None:
            self._loop = events.new_event_loop()
            if not self._set_event_loop:
                # Call set_event_loop only once to avoid calling
                # attach_loop multiple times on child watchers
                events.set_event_loop(self._loop)
                self._set_event_loop = True
        else:
            # A loop made by a custom factory is not installed via set_event_loop()
            self._loop = self._loop_factory()

        if self._debug is not None:
            self._loop.set_debug(self._debug)

        self._context = contextvars.copy_context()
        self._state = _State.INITIALIZED

    def _on_sigint(self, signum, frame, main_task: asyncio.Task) -> None:
        # SIGINT handler installed by run(). The first interrupt cancels the main
        # task (if it is still running); any other case raises KeyboardInterrupt
        # directly from the handler.
        self._interrupt_count += 1
        if self._interrupt_count == 1 and not main_task.done():
            main_task.cancel()
            # wakeup loop if it is blocked by select() with long timeout
            self._loop.call_soon_threadsafe(lambda: None)
            return

        raise KeyboardInterrupt()
  241. def _cancel_all_tasks(loop: AbstractEventLoop) -> None:
  242. to_cancel = tasks.all_tasks(loop)
  243. if not to_cancel:
  244. return
  245. for task in to_cancel:
  246. task.cancel()
  247. loop.run_until_complete(tasks.gather(*to_cancel, return_exceptions=True))
  248. for task in to_cancel:
  249. if task.cancelled():
  250. continue
  251. if task.exception() is not None:
  252. loop.call_exception_handler(
  253. {
  254. "message": "unhandled exception during asyncio.run() shutdown",
  255. "exception": task.exception(),
  256. "task": task,
  257. }
  258. )
  259. async def _shutdown_default_executor(loop: AbstractEventLoop) -> None:
  260. """Schedule the shutdown of the default executor."""
  261. def _do_shutdown(future: asyncio.futures.Future) -> None:
  262. try:
  263. loop._default_executor.shutdown(wait=True) # type: ignore[attr-defined]
  264. loop.call_soon_threadsafe(future.set_result, None)
  265. except Exception as ex:
  266. loop.call_soon_threadsafe(future.set_exception, ex)
  267. loop._executor_shutdown_called = True
  268. if loop._default_executor is None:
  269. return
  270. future = loop.create_future()
  271. thread = threading.Thread(target=_do_shutdown, args=(future,))
  272. thread.start()
  273. try:
  274. await future
  275. finally:
  276. thread.join()
# Generic return type (used, for example, as the result type of Runner.run())
T_Retval = TypeVar("T_Retval")
# Contravariant value type (used for the argument of _AsyncioTaskStatus.started())
T_contra = TypeVar("T_contra", contravariant=True)
# Variadic positional argument types; its use sites are outside this part of the file
PosArgsT = TypeVarTuple("PosArgsT")
# Callable parameter specification; its use sites are outside this part of the file
P = ParamSpec("P")

# Cache for the root task, read and written by find_root_task()
_root_task: RunVar[asyncio.Task | None] = RunVar("_root_task")
  282. def find_root_task() -> asyncio.Task:
  283. root_task = _root_task.get(None)
  284. if root_task is not None and not root_task.done():
  285. return root_task
  286. # Look for a task that has been started via run_until_complete()
  287. for task in all_tasks():
  288. if task._callbacks and not task.done():
  289. callbacks = [cb for cb, context in task._callbacks]
  290. for cb in callbacks:
  291. if (
  292. cb is _run_until_complete_cb
  293. or getattr(cb, "__module__", None) == "uvloop.loop"
  294. ):
  295. _root_task.set(task)
  296. return task
  297. # Look up the topmost task in the AnyIO task tree, if possible
  298. task = cast(asyncio.Task, current_task())
  299. state = _task_states.get(task)
  300. if state:
  301. cancel_scope = state.cancel_scope
  302. while cancel_scope and cancel_scope._parent_scope is not None:
  303. cancel_scope = cancel_scope._parent_scope
  304. if cancel_scope is not None:
  305. return cast(asyncio.Task, cancel_scope._host_task)
  306. return task
  307. def get_callable_name(func: Callable) -> str:
  308. module = getattr(func, "__module__", None)
  309. qualname = getattr(func, "__qualname__", None)
  310. return ".".join([x for x in (module, qualname) if x])
  311. #
  312. # Event loop
  313. #
# Per-event-loop storage, keyed weakly by the loop object so that an entry does not
# keep its loop alive. NOTE(review): the code that fills this mapping is not visible
# in this part of the file.
_run_vars: WeakKeyDictionary[asyncio.AbstractEventLoop, Any] = WeakKeyDictionary()
  315. def _task_started(task: asyncio.Task) -> bool:
  316. """Return ``True`` if the task has been started and has not finished."""
  317. # The task coro should never be None here, as we never add finished tasks to the
  318. # task list
  319. coro = task.get_coro()
  320. assert coro is not None
  321. try:
  322. return getcoroutinestate(coro) in (CORO_RUNNING, CORO_SUSPENDED)
  323. except AttributeError:
  324. # task coro is async_genenerator_asend https://bugs.python.org/issue37771
  325. raise Exception(f"Cannot determine if task {task} has started or not") from None
  326. #
  327. # Timeouts and cancellation
  328. #
  329. def is_anyio_cancellation(exc: CancelledError) -> bool:
  330. # Sometimes third party frameworks catch a CancelledError and raise a new one, so as
  331. # a workaround we have to look at the previous ones in __context__ too for a
  332. # matching cancel message
  333. while True:
  334. if (
  335. exc.args
  336. and isinstance(exc.args[0], str)
  337. and exc.args[0].startswith("Cancelled via cancel scope ")
  338. ):
  339. return True
  340. if isinstance(exc.__context__, CancelledError):
  341. exc = exc.__context__
  342. continue
  343. return False
class CancelScope(BaseCancelScope):
    """
    Cancel scope implementation for the asyncio backend.

    A scope keeps track of the tasks it directly contains (``_tasks``), its parent
    scope and its child scopes. Cancelling it calls ``Task.cancel()`` on the eligible
    tasks of the scope and of its unshielded, not separately cancelled child scopes,
    and keeps rescheduling the delivery with ``loop.call_soon()`` for as long as
    tasks remain in those scopes.

    :param deadline: event loop time at which the scope cancels itself
        (``math.inf`` means no deadline)
    :param shield: ``True`` to stop cancellation of parent scopes from being
        delivered into this scope
    """

    def __new__(
        cls, *, deadline: float = math.inf, shield: bool = False
    ) -> CancelScope:
        # Create the instance directly with object.__new__().
        # NOTE(review): presumably this bypasses a dispatching __new__ in
        # BaseCancelScope, which is not visible here - confirm.
        return object.__new__(cls)

    def __init__(self, deadline: float = math.inf, shield: bool = False):
        self._deadline = deadline
        self._shield = shield
        self._parent_scope: CancelScope | None = None
        self._child_scopes: set[CancelScope] = set()
        self._cancel_called = False
        # Message passed to Task.cancel(); built in cancel()
        self._cancel_reason: str | None = None
        self._cancelled_caught = False
        # True between __enter__() and __exit__()
        self._active = False
        # Handle of the call_at() callback scheduled for the deadline
        self._timeout_handle: asyncio.TimerHandle | None = None
        # Handle of the pending _deliver_cancellation() retry, if any
        self._cancel_handle: asyncio.Handle | None = None
        # Tasks directly contained in this scope (not those in child scopes)
        self._tasks: set[asyncio.Task] = set()
        # The task that entered this scope; None outside the "with" block
        self._host_task: asyncio.Task | None = None
        # Number of Task.cancel() calls made on the host task on behalf of this
        # scope that still need a matching Task.uncancel(); None on Python < 3.11
        if sys.version_info >= (3, 11):
            self._pending_uncancellations: int | None = 0
        else:
            self._pending_uncancellations = None

    def __enter__(self) -> CancelScope:
        """
        Make this scope the current cancel scope of the current task.

        :raises RuntimeError: if the scope is already active
        """
        if self._active:
            raise RuntimeError(
                "Each CancelScope may only be used for a single 'with' block"
            )

        self._host_task = host_task = cast(asyncio.Task, current_task())
        self._tasks.add(host_task)
        try:
            task_state = _task_states[host_task]
        except KeyError:
            # First cancel scope entered in this task: create its task state
            task_state = TaskState(None, self)
            _task_states[host_task] = task_state
        else:
            # Nest this scope under the task's current scope
            self._parent_scope = task_state.cancel_scope
            task_state.cancel_scope = self
            if self._parent_scope is not None:
                # If using an eager task factory, the parent scope may not even contain
                # the host task
                self._parent_scope._child_scopes.add(self)
                self._parent_scope._tasks.discard(host_task)

        # Schedule the deadline callback (or cancel right away if already expired)
        self._timeout()
        self._active = True

        # Start cancelling the host task if the scope was cancelled before entering
        if self._cancel_called:
            self._deliver_cancellation(self)

        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> bool:
        """
        Leave the scope and hand the host task back to the parent scope.

        :return: ``True`` (swallowing the exception) only if this scope was
            cancelled, no cancelled parent scope is visible, and ``exc_val`` consists
            solely of AnyIO cancellation errors
        :raises RuntimeError: if the scope is not active, is exited from a different
            task than it was entered in, or is not the task's current cancel scope
        """
        del exc_tb

        if not self._active:
            raise RuntimeError("This cancel scope is not active")
        if current_task() is not self._host_task:
            raise RuntimeError(
                "Attempted to exit cancel scope in a different task than it was "
                "entered in"
            )

        assert self._host_task is not None
        host_task_state = _task_states.get(self._host_task)
        if host_task_state is None or host_task_state.cancel_scope is not self:
            raise RuntimeError(
                "Attempted to exit a cancel scope that isn't the current tasks's "
                "current cancel scope"
            )

        try:
            self._active = False
            if self._timeout_handle:
                self._timeout_handle.cancel()
                self._timeout_handle = None

            # Move the host task from this scope back to the parent scope
            self._tasks.remove(self._host_task)
            if self._parent_scope is not None:
                self._parent_scope._child_scopes.remove(self)
                self._parent_scope._tasks.add(self._host_task)

            host_task_state.cancel_scope = self._parent_scope

            # Restart the cancellation effort in the closest visible, cancelled parent
            # scope if necessary
            self._restart_cancellation_in_parent()

            # We only swallow the exception iff it was an AnyIO CancelledError, either
            # directly as exc_val or inside an exception group and there are no cancelled
            # parent cancel scopes visible to us here
            if self._cancel_called and not self._parent_cancellation_is_visible_to_us:
                # For each level-cancel() call made on the host task, call uncancel()
                while self._pending_uncancellations:
                    self._host_task.uncancel()
                    self._pending_uncancellations -= 1

                # Update cancelled_caught and check for exceptions we must not swallow
                cannot_swallow_exc_val = False
                if exc_val is not None:
                    for exc in iterate_exceptions(exc_val):
                        if isinstance(exc, CancelledError) and is_anyio_cancellation(
                            exc
                        ):
                            self._cancelled_caught = True
                        else:
                            cannot_swallow_exc_val = True

                return self._cancelled_caught and not cannot_swallow_exc_val
            else:
                # The exception propagates; transfer our outstanding uncancel()
                # obligations to the parent scope
                if self._pending_uncancellations:
                    assert self._parent_scope is not None
                    assert self._parent_scope._pending_uncancellations is not None
                    self._parent_scope._pending_uncancellations += (
                        self._pending_uncancellations
                    )
                    self._pending_uncancellations = 0

                return False
        finally:
            self._host_task = None
            del exc_val

    @property
    def _effectively_cancelled(self) -> bool:
        # True if this scope, or an ancestor reachable without crossing a shielded
        # scope, has had cancel() called on it
        cancel_scope: CancelScope | None = self
        while cancel_scope is not None:
            if cancel_scope._cancel_called:
                return True

            if cancel_scope.shield:
                return False

            cancel_scope = cancel_scope._parent_scope

        return False

    @property
    def _parent_cancellation_is_visible_to_us(self) -> bool:
        # True if this scope is unshielded and its parent is effectively cancelled
        return (
            self._parent_scope is not None
            and not self.shield
            and self._parent_scope._effectively_cancelled
        )

    def _timeout(self) -> None:
        # Enforce the deadline: cancel now if it has passed, otherwise schedule this
        # method to run again at the deadline. Does nothing for an infinite deadline.
        if self._deadline != math.inf:
            loop = get_running_loop()
            if loop.time() >= self._deadline:
                self.cancel("deadline exceeded")
            else:
                self._timeout_handle = loop.call_at(self._deadline, self._timeout)

    def _deliver_cancellation(self, origin: CancelScope) -> bool:
        """
        Deliver cancellation to directly contained tasks and nested cancel scopes.

        Schedule another run at the end if we still have tasks eligible for
        cancellation.

        :param origin: the cancel scope that originated the cancellation
        :return: ``True`` if the delivery needs to be retried on the next cycle

        """
        should_retry = False
        current = current_task()
        for task in self._tasks:
            # Any task still contained in the scope warrants another delivery round
            should_retry = True
            # Skip tasks that already have a cancellation request pending
            if task._must_cancel:  # type: ignore[attr-defined]
                continue

            # The task is eligible for cancellation if it has started
            if task is not current and (task is self._host_task or _task_started(task)):
                waiter = task._fut_waiter  # type: ignore[attr-defined]
                # Don't cancel a task whose awaited future has already completed
                if not isinstance(waiter, asyncio.Future) or not waiter.done():
                    task.cancel(origin._cancel_reason)
                    # Count cancel() calls made on the origin's host task so that
                    # __exit__() can balance them with uncancel()
                    if (
                        task is origin._host_task
                        and origin._pending_uncancellations is not None
                    ):
                        origin._pending_uncancellations += 1

        # Deliver cancellation to child scopes that aren't shielded or running their own
        # cancellation callbacks
        for scope in self._child_scopes:
            if not scope._shield and not scope.cancel_called:
                should_retry = scope._deliver_cancellation(origin) or should_retry

        # Schedule another callback if there are still tasks left
        if origin is self:
            if should_retry:
                self._cancel_handle = get_running_loop().call_soon(
                    self._deliver_cancellation, origin
                )
            else:
                self._cancel_handle = None

        return should_retry

    def _restart_cancellation_in_parent(self) -> None:
        """
        Restart the cancellation effort in the closest directly cancelled parent scope.

        """
        scope = self._parent_scope
        while scope is not None:
            if scope._cancel_called:
                # Only restart the delivery if no retry is currently scheduled
                if scope._cancel_handle is None:
                    scope._deliver_cancellation(scope)

                break

            # No point in looking beyond any shielded scope
            if scope._shield:
                break

            scope = scope._parent_scope

    def cancel(self, reason: str | None = None) -> None:
        """
        Cancel this scope; calls after the first one have no effect.

        :param reason: optional text appended to the cancellation message
        """
        if not self._cancel_called:
            if self._timeout_handle:
                self._timeout_handle.cancel()
                self._timeout_handle = None

            self._cancel_called = True
            # is_anyio_cancellation() recognizes AnyIO cancellations by the fixed
            # prefix of this message
            self._cancel_reason = f"Cancelled via cancel scope {id(self):x}"
            if task := current_task():
                self._cancel_reason += f" by {task}"

            if reason:
                self._cancel_reason += f"; reason: {reason}"

            # If the scope has not been entered yet, __enter__() starts the delivery
            if self._host_task is not None:
                self._deliver_cancellation(self)

    @property
    def deadline(self) -> float:
        # Event loop time at which the scope is cancelled (math.inf if none)
        return self._deadline

    @deadline.setter
    def deadline(self, value: float) -> None:
        self._deadline = float(value)
        # Drop the callback scheduled for the previous deadline
        if self._timeout_handle is not None:
            self._timeout_handle.cancel()
            self._timeout_handle = None

        # Re-arm the timeout only while the scope is entered and not yet cancelled
        if self._active and not self._cancel_called:
            self._timeout()

    @property
    def cancel_called(self) -> bool:
        # True once cancel() has been called on this scope
        return self._cancel_called

    @property
    def cancelled_caught(self) -> bool:
        # True once __exit__() has seen an AnyIO cancellation error for this scope
        return self._cancelled_caught

    @property
    def shield(self) -> bool:
        # Whether this scope is shielded from cancellation of its parent scopes
        return self._shield

    @shield.setter
    def shield(self, value: bool) -> None:
        if self._shield != value:
            self._shield = value
            # Dropping the shield exposes this scope to a cancelled parent again
            if not value:
                self._restart_cancellation_in_parent()
  572. #
  573. # Task states
  574. #
  575. class TaskState:
  576. """
  577. Encapsulates auxiliary task information that cannot be added to the Task instance
  578. itself because there are no guarantees about its implementation.
  579. """
  580. __slots__ = "parent_id", "cancel_scope", "__weakref__"
  581. def __init__(self, parent_id: int | None, cancel_scope: CancelScope | None):
  582. self.parent_id = parent_id
  583. self.cancel_scope = cancel_scope
# Maps each task to its TaskState; entries are keyed weakly by the task object
_task_states: WeakKeyDictionary[asyncio.Task, TaskState] = WeakKeyDictionary()
  585. #
  586. # Task groups
  587. #
  588. class _AsyncioTaskStatus(abc.TaskStatus):
  589. def __init__(self, future: asyncio.Future, parent_id: int):
  590. self._future = future
  591. self._parent_id = parent_id
  592. def started(self, value: T_contra | None = None) -> None:
  593. try:
  594. self._future.set_result(value)
  595. except asyncio.InvalidStateError:
  596. if not self._future.cancelled():
  597. raise RuntimeError(
  598. "called 'started' twice on the same task status"
  599. ) from None
  600. task = cast(asyncio.Task, current_task())
  601. _task_states[task].parent_id = self._parent_id
# Code object of asyncio.eager_task_factory on Python 3.12+, None on older versions.
# TaskGroup._spawn() compares the running loop's task factory against it.
if sys.version_info >= (3, 12):
    _eager_task_factory_code: CodeType | None = asyncio.eager_task_factory.__code__
else:
    _eager_task_factory_code = None
class TaskGroup(abc.TaskGroup):
    """
    Task group implementation for the asyncio backend.

    Every spawned task is added both to this group's own ``_tasks`` set and to the
    ``_tasks`` set of :attr:`cancel_scope`. Exceptions (other than ``CancelledError``)
    coming from the host block or from child tasks are collected in ``_exceptions``
    and raised together as a ``BaseExceptionGroup`` when the group is exited.
    """

    def __init__(self) -> None:
        self.cancel_scope: CancelScope = CancelScope()
        # True between __aenter__() and the point in __aexit__() where all child
        # tasks have finished
        self._active = False
        self._exceptions: list[BaseException] = []
        self._tasks: set[asyncio.Task] = set()
        # Future that __aexit__() waits on; resolved by the done callback of the last
        # remaining child task
        self._on_completed_fut: asyncio.Future[None] | None = None

    async def __aenter__(self) -> TaskGroup:
        """Enter the group's cancel scope and mark the group as active."""
        self.cancel_scope.__enter__()
        self._active = True
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> bool:
        """
        Wait for all child tasks to finish, then exit the cancel scope.

        If the host block raised, the cancel scope is cancelled first. Collected
        exceptions are raised as a ``BaseExceptionGroup``; otherwise a pending
        ``exc_val`` (e.g. a cancellation) is re-raised. The final outcome is always
        passed through ``cancel_scope.__exit__()``.

        :return: the return value of ``cancel_scope.__exit__()`` (``True`` if the
            exception was swallowed by the cancel scope)

        """
        try:
            if exc_val is not None:
                self.cancel_scope.cancel()
                if not isinstance(exc_val, CancelledError):
                    self._exceptions.append(exc_val)

            loop = get_running_loop()
            try:
                if self._tasks:
                    with CancelScope() as wait_scope:
                        while self._tasks:
                            self._on_completed_fut = loop.create_future()

                            try:
                                await self._on_completed_fut
                            except CancelledError as exc:
                                # Shield the scope against further cancellation attempts,
                                # as they're not productive (#695)
                                wait_scope.shield = True
                                self.cancel_scope.cancel()

                                # Set exc_val from the cancellation exception if it was
                                # previously unset. However, we should not replace a native
                                # cancellation exception with one raised by a cancel scope.
                                if exc_val is None or (
                                    isinstance(exc_val, CancelledError)
                                    and not is_anyio_cancellation(exc)
                                ):
                                    exc_val = exc

                            self._on_completed_fut = None
                else:
                    # If there are no child tasks to wait on, run at least one checkpoint
                    # anyway
                    await AsyncIOBackend.cancel_shielded_checkpoint()

                self._active = False
                if self._exceptions:
                    # The exception that got us here should already have been
                    # added to self._exceptions so it's ok to break exception
                    # chaining and avoid adding a "During handling of above..."
                    # for each nesting level.
                    raise BaseExceptionGroup(
                        "unhandled errors in a TaskGroup", self._exceptions
                    ) from None
                elif exc_val:
                    raise exc_val
            except BaseException as exc:
                # Give the cancel scope the chance to swallow the exception
                if self.cancel_scope.__exit__(type(exc), exc, exc.__traceback__):
                    return True

                raise

            return self.cancel_scope.__exit__(exc_type, exc_val, exc_tb)
        finally:
            # Drop the references to the exceptions held by this frame and the group
            del exc_val, exc_tb, self._exceptions

    def _spawn(
        self,
        func: Callable[[Unpack[PosArgsT]], Awaitable[Any]],
        args: tuple[Unpack[PosArgsT]],
        name: object,
        task_status_future: asyncio.Future | None = None,
    ) -> asyncio.Task:
        """
        Create a child task running ``func(*args)`` and register it with this group.

        :param func: callable that must return a coroutine object
        :param args: positional arguments for ``func``
        :param name: task name; if ``None``, it is derived with ``get_callable_name()``
        :param task_status_future: when given, ``func`` also receives a ``task_status``
            keyword argument bound to this future
        :return: the newly created task
        :raises RuntimeError: if the task group is not active
        :raises TypeError: if ``func`` did not return a coroutine object

        """

        def task_done(_task: asyncio.Task) -> None:
            # Done callback: unregisters the task and routes its outcome either to
            # the task group or to the task status future
            if sys.version_info >= (3, 14) and self.cancel_scope._host_task is not None:
                asyncio.future_discard_from_awaited_by(
                    _task, self.cancel_scope._host_task
                )

            task_state = _task_states[_task]
            assert task_state.cancel_scope is not None
            assert _task in task_state.cancel_scope._tasks
            task_state.cancel_scope._tasks.remove(_task)
            # "task" is the closure variable assigned further below in _spawn()
            self._tasks.remove(task)
            del _task_states[_task]

            # Wake up __aexit__() once the last child task has finished
            if self._on_completed_fut is not None and not self._tasks:
                try:
                    self._on_completed_fut.set_result(None)
                except asyncio.InvalidStateError:
                    pass

            try:
                exc = _task.exception()
            except CancelledError as e:
                # Unwind to the innermost chained CancelledError
                while isinstance(e.__context__, CancelledError):
                    e = e.__context__

                exc = e

            if exc is not None:
                # The future can only be in the cancelled state if the host task was
                # cancelled, so return immediately instead of adding one more
                # CancelledError to the exceptions list
                if task_status_future is not None and task_status_future.cancelled():
                    return

                if task_status_future is None or task_status_future.done():
                    if not isinstance(exc, CancelledError):
                        self._exceptions.append(exc)

                    if not self.cancel_scope._effectively_cancelled:
                        self.cancel_scope.cancel()
                else:
                    # The child failed before reporting itself as started, so the
                    # exception goes to the future awaited in start()
                    task_status_future.set_exception(exc)
            elif task_status_future is not None and not task_status_future.done():
                task_status_future.set_exception(
                    RuntimeError("Child exited without calling task_status.started()")
                )

        if not self._active:
            raise RuntimeError(
                "This task group is not active; no new tasks can be started."
            )

        kwargs = {}
        if task_status_future:
            parent_id = id(current_task())
            kwargs["task_status"] = _AsyncioTaskStatus(
                task_status_future, id(self.cancel_scope._host_task)
            )
        else:
            parent_id = id(self.cancel_scope._host_task)

        coro = func(*args, **kwargs)
        if not iscoroutine(coro):
            prefix = f"{func.__module__}." if hasattr(func, "__module__") else ""
            raise TypeError(
                f"Expected {prefix}{func.__qualname__}() to return a coroutine, but "
                f"the return value ({coro!r}) is not a coroutine object"
            )

        name = get_callable_name(func) if name is None else str(name)
        loop = asyncio.get_running_loop()
        # If the loop's task factory shares its code object with
        # asyncio.eager_task_factory and has a closure, call the task constructor
        # stored in the first closure cell directly
        if (
            (factory := loop.get_task_factory())
            and getattr(factory, "__code__", None) is _eager_task_factory_code
            and (closure := getattr(factory, "__closure__", None))
        ):
            custom_task_constructor = closure[0].cell_contents
            task = custom_task_constructor(coro, loop=loop, name=name)
        else:
            task = create_task(coro, name=name)

        # Make the spawned task inherit the task group's cancel scope
        _task_states[task] = TaskState(
            parent_id=parent_id, cancel_scope=self.cancel_scope
        )
        self.cancel_scope._tasks.add(task)
        self._tasks.add(task)

        if sys.version_info >= (3, 14) and self.cancel_scope._host_task is not None:
            asyncio.future_add_to_awaited_by(task, self.cancel_scope._host_task)

        task.add_done_callback(task_done)
        return task

    def start_soon(
        self,
        func: Callable[[Unpack[PosArgsT]], Awaitable[Any]],
        *args: Unpack[PosArgsT],
        name: object = None,
    ) -> None:
        """Spawn ``func(*args)`` as a child task without waiting for it."""
        self._spawn(func, args, name)

    async def start(
        self, func: Callable[..., Awaitable[Any]], *args: object, name: object = None
    ) -> Any:
        """
        Spawn ``func(*args, task_status=...)`` and wait until the child calls
        ``task_status.started()``.

        :return: the value the child passed to ``task_status.started()``

        """
        future: asyncio.Future = asyncio.Future()
        task = self._spawn(func, args, name, future)

        # If the task raises an exception after sending a start value without a switch
        # point between, the task group is cancelled and this method never proceeds to
        # process the completed future. That's why we have to have a shielded cancel
        # scope here.
        try:
            return await future
        except CancelledError:
            # Cancel the task and wait for it to exit before returning
            task.cancel()
            with CancelScope(shield=True), suppress(CancelledError):
                await task

            raise
  782. #
  783. # Threads
  784. #
# Type alias for a (return value, exception) pair
_Retval_Queue_Type = tuple[Optional[T_Retval], Optional[BaseException]]
class WorkerThread(Thread):
    """
    Thread that executes callables taken from its own work queue and reports their
    outcome back to the event loop of ``root_task``.

    Work items are ``(context, func, args, future, cancel_scope)`` tuples; ``None``
    is the shutdown command.

    :param root_task: the task whose event loop receives the results
    :param workers: the collection this worker removes itself from in :meth:`stop`
    :param idle_workers: the deque this worker is appended to after each finished job

    """

    MAX_IDLE_TIME = 10  # seconds

    def __init__(
        self,
        root_task: asyncio.Task,
        workers: set[WorkerThread],
        idle_workers: deque[WorkerThread],
    ):
        super().__init__(name="AnyIO worker thread")
        self.root_task = root_task
        self.workers = workers
        self.idle_workers = idle_workers
        self.loop = root_task._loop
        # Bounded queue (2 slots) of work items
        self.queue: Queue[
            tuple[Context, Callable, tuple, asyncio.Future, CancelScope] | None
        ] = Queue(2)
        self.idle_since = AsyncIOBackend.current_time()
        self.stopping = False

    def _report_result(
        self, future: asyncio.Future, result: Any, exc: BaseException | None
    ) -> None:
        """
        Record the worker as idle and deliver the outcome of a job to ``future``.

        Scheduled by :meth:`run` via ``loop.call_soon_threadsafe()``.
        """
        self.idle_since = AsyncIOBackend.current_time()
        if not self.stopping:
            self.idle_workers.append(self)

        if not future.cancelled():
            if exc is not None:
                # A StopIteration is wrapped in a RuntimeError before being set on the
                # future
                if isinstance(exc, StopIteration):
                    new_exc = RuntimeError("coroutine raised StopIteration")
                    new_exc.__cause__ = exc
                    exc = new_exc

                future.set_exception(exc)
            else:
                future.set_result(result)

    def run(self) -> None:
        """Process work items from the queue until the shutdown command arrives."""
        with claim_worker_thread(AsyncIOBackend, self.loop):
            while True:
                item = self.queue.get()
                if item is None:
                    # Shutdown command received
                    return

                context, func, args, future, cancel_scope = item
                if not future.cancelled():
                    result = None
                    exception: BaseException | None = None
                    # Expose the cancel scope to the callable through thread locals
                    # for the duration of the call
                    threadlocals.current_cancel_scope = cancel_scope
                    try:
                        result = context.run(func, *args)
                    except BaseException as exc:
                        exception = exc
                    finally:
                        del threadlocals.current_cancel_scope

                    if not self.loop.is_closed():
                        self.loop.call_soon_threadsafe(
                            self._report_result, future, result, exception
                        )

                    # Don't keep the result or exception alive while waiting for the
                    # next work item
                    del result, exception

                self.queue.task_done()
                del item, context, func, args, future, cancel_scope

    def stop(self, f: asyncio.Task | None = None) -> None:
        """
        Ask the thread to exit and remove it from the worker collections.

        :param f: unused; NOTE(review): presumably present so that this method can be
            used as a task done callback -- confirm against the caller

        """
        self.stopping = True
        self.queue.put_nowait(None)
        self.workers.discard(self)
        try:
            self.idle_workers.remove(self)
        except ValueError:
            # The worker was not in the idle deque
            pass
# RunVars for the thread pool's bookkeeping: a deque of idle WorkerThreads and a set
# of WorkerThreads
_threadpool_idle_workers: RunVar[deque[WorkerThread]] = RunVar(
    "_threadpool_idle_workers"
)
_threadpool_workers: RunVar[set[WorkerThread]] = RunVar("_threadpool_workers")
  856. class BlockingPortal(abc.BlockingPortal):
  857. def __new__(cls) -> BlockingPortal:
  858. return object.__new__(cls)
  859. def __init__(self) -> None:
  860. super().__init__()
  861. self._loop = get_running_loop()
  862. def _spawn_task_from_thread(
  863. self,
  864. func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval] | T_Retval],
  865. args: tuple[Unpack[PosArgsT]],
  866. kwargs: dict[str, Any],
  867. name: object,
  868. future: Future[T_Retval],
  869. ) -> None:
  870. AsyncIOBackend.run_sync_from_thread(
  871. partial(self._task_group.start_soon, name=name),
  872. (self._call_func, func, args, kwargs, future),
  873. self._loop,
  874. )
  875. #
  876. # Subprocesses
  877. #
  878. @dataclass(eq=False)
  879. class StreamReaderWrapper(abc.ByteReceiveStream):
  880. _stream: asyncio.StreamReader
  881. async def receive(self, max_bytes: int = 65536) -> bytes:
  882. data = await self._stream.read(max_bytes)
  883. if data:
  884. return data
  885. else:
  886. raise EndOfStream
  887. async def aclose(self) -> None:
  888. self._stream.set_exception(ClosedResourceError())
  889. await AsyncIOBackend.checkpoint()
  890. @dataclass(eq=False)
  891. class StreamWriterWrapper(abc.ByteSendStream):
  892. _stream: asyncio.StreamWriter
  893. _closed: bool = field(init=False, default=False)
  894. async def send(self, item: bytes) -> None:
  895. await AsyncIOBackend.checkpoint_if_cancelled()
  896. stream_paused = self._stream._protocol._paused # type: ignore[attr-defined]
  897. try:
  898. self._stream.write(item)
  899. await self._stream.drain()
  900. except (ConnectionResetError, BrokenPipeError, RuntimeError) as exc:
  901. # If closed by us and/or the peer:
  902. # * on stdlib, drain() raises ConnectionResetError or BrokenPipeError
  903. # * on uvloop and Winloop, write() eventually starts raising RuntimeError
  904. if self._closed:
  905. raise ClosedResourceError from exc
  906. elif self._stream.is_closing():
  907. raise BrokenResourceError from exc
  908. raise
  909. if not stream_paused:
  910. await AsyncIOBackend.cancel_shielded_checkpoint()
  911. async def aclose(self) -> None:
  912. self._closed = True
  913. self._stream.close()
  914. await AsyncIOBackend.checkpoint()
  915. @dataclass(eq=False)
  916. class Process(abc.Process):
  917. _process: asyncio.subprocess.Process
  918. _stdin: StreamWriterWrapper | None
  919. _stdout: StreamReaderWrapper | None
  920. _stderr: StreamReaderWrapper | None
  921. async def aclose(self) -> None:
  922. with CancelScope(shield=True) as scope:
  923. if self._stdin:
  924. await self._stdin.aclose()
  925. if self._stdout:
  926. await self._stdout.aclose()
  927. if self._stderr:
  928. await self._stderr.aclose()
  929. scope.shield = False
  930. try:
  931. await self.wait()
  932. except BaseException:
  933. scope.shield = True
  934. self.kill()
  935. await self.wait()
  936. raise
  937. async def wait(self) -> int:
  938. return await self._process.wait()
  939. def terminate(self) -> None:
  940. self._process.terminate()
  941. def kill(self) -> None:
  942. self._process.kill()
  943. def send_signal(self, signal: int) -> None:
  944. self._process.send_signal(signal)
  945. @property
  946. def pid(self) -> int:
  947. return self._process.pid
  948. @property
  949. def returncode(self) -> int | None:
  950. return self._process.returncode
  951. @property
  952. def stdin(self) -> abc.ByteSendStream | None:
  953. return self._stdin
  954. @property
  955. def stdout(self) -> abc.ByteReceiveStream | None:
  956. return self._stdout
  957. @property
  958. def stderr(self) -> abc.ByteReceiveStream | None:
  959. return self._stderr
  960. def _forcibly_shutdown_process_pool_on_exit(
  961. workers: set[Process], _task: object
  962. ) -> None:
  963. """
  964. Forcibly shuts down worker processes belonging to this event loop."""
  965. child_watcher: asyncio.AbstractChildWatcher | None = None # type: ignore[name-defined]
  966. if sys.version_info < (3, 12):
  967. try:
  968. child_watcher = asyncio.get_event_loop_policy().get_child_watcher()
  969. except NotImplementedError:
  970. pass
  971. # Close as much as possible (w/o async/await) to avoid warnings
  972. for process in workers.copy():
  973. if process.returncode is None:
  974. continue
  975. process._stdin._stream._transport.close() # type: ignore[union-attr]
  976. process._stdout._stream._transport.close() # type: ignore[union-attr]
  977. process._stderr._stream._transport.close() # type: ignore[union-attr]
  978. process.kill()
  979. if child_watcher:
  980. child_watcher.remove_child_handler(process.pid)
  981. async def _shutdown_process_pool_on_exit(workers: set[abc.Process]) -> None:
  982. """
  983. Shuts down worker processes belonging to this event loop.
  984. NOTE: this only works when the event loop was started using asyncio.run() or
  985. anyio.run().
  986. """
  987. process: abc.Process
  988. try:
  989. await sleep(math.inf)
  990. except asyncio.CancelledError:
  991. workers = workers.copy()
  992. for process in workers:
  993. if process.returncode is None:
  994. process.kill()
  995. for process in workers:
  996. await process.aclose()
  997. #
  998. # Sockets and networking
  999. #
  1000. class StreamProtocol(asyncio.Protocol):
  1001. read_queue: deque[bytes]
  1002. read_event: asyncio.Event
  1003. write_event: asyncio.Event
  1004. exception: Exception | None = None
  1005. is_at_eof: bool = False
  1006. def connection_made(self, transport: asyncio.BaseTransport) -> None:
  1007. self.read_queue = deque()
  1008. self.read_event = asyncio.Event()
  1009. self.write_event = asyncio.Event()
  1010. self.write_event.set()
  1011. cast(asyncio.Transport, transport).set_write_buffer_limits(0)
  1012. def connection_lost(self, exc: Exception | None) -> None:
  1013. if exc:
  1014. self.exception = BrokenResourceError()
  1015. self.exception.__cause__ = exc
  1016. self.read_event.set()
  1017. self.write_event.set()
  1018. def data_received(self, data: bytes) -> None:
  1019. # ProactorEventloop sometimes sends bytearray instead of bytes
  1020. self.read_queue.append(bytes(data))
  1021. self.read_event.set()
  1022. def eof_received(self) -> bool | None:
  1023. self.is_at_eof = True
  1024. self.read_event.set()
  1025. return True
  1026. def pause_writing(self) -> None:
  1027. self.write_event = asyncio.Event()
  1028. def resume_writing(self) -> None:
  1029. self.write_event.set()
  1030. class DatagramProtocol(asyncio.DatagramProtocol):
  1031. read_queue: deque[tuple[bytes, IPSockAddrType]]
  1032. read_event: asyncio.Event
  1033. write_event: asyncio.Event
  1034. exception: Exception | None = None
  1035. def connection_made(self, transport: asyncio.BaseTransport) -> None:
  1036. self.read_queue = deque(maxlen=100) # arbitrary value
  1037. self.read_event = asyncio.Event()
  1038. self.write_event = asyncio.Event()
  1039. self.write_event.set()
  1040. def connection_lost(self, exc: Exception | None) -> None:
  1041. self.read_event.set()
  1042. self.write_event.set()
  1043. def datagram_received(self, data: bytes, addr: IPSockAddrType) -> None:
  1044. addr = convert_ipv6_sockaddr(addr)
  1045. self.read_queue.append((data, addr))
  1046. self.read_event.set()
  1047. def error_received(self, exc: Exception) -> None:
  1048. self.exception = exc
  1049. def pause_writing(self) -> None:
  1050. self.write_event.clear()
  1051. def resume_writing(self) -> None:
  1052. self.write_event.set()
class SocketStream(abc.SocketStream):
    """
    Socket stream built on an asyncio transport and a :class:`StreamProtocol`.

    :param transport: the transport used for writing and for pausing/resuming reads
    :param protocol: the protocol holding the read queue and the read/write events

    """

    def __init__(self, transport: asyncio.Transport, protocol: StreamProtocol):
        self._transport = transport
        self._protocol = protocol
        self._receive_guard = ResourceGuard("reading from")
        self._send_guard = ResourceGuard("writing to")
        self._closed = False

    @property
    def _raw_socket(self) -> socket.socket:
        return self._transport.get_extra_info("socket")

    async def receive(self, max_bytes: int = 65536) -> bytes:
        """
        Return the next chunk of received data, at most ``max_bytes`` long.

        :raises ClosedResourceError: if no data is buffered and the stream was closed
            locally
        :raises EndOfStream: if no data is buffered and no protocol exception was
            recorded
        """
        with self._receive_guard:
            # Only wait for new data if none has been signalled and more can still
            # arrive; reading is resumed just for the duration of the wait
            if (
                not self._protocol.read_event.is_set()
                and not self._transport.is_closing()
                and not self._protocol.is_at_eof
            ):
                self._transport.resume_reading()
                await self._protocol.read_event.wait()
                self._transport.pause_reading()
            else:
                await AsyncIOBackend.checkpoint()

            try:
                chunk = self._protocol.read_queue.popleft()
            except IndexError:
                # Nothing buffered: report why no more data is coming. If the protocol
                # recorded an exception, that exception is raised as is.
                if self._closed:
                    raise ClosedResourceError from None
                elif self._protocol.exception:
                    raise self._protocol.exception from None
                else:
                    raise EndOfStream from None

            if len(chunk) > max_bytes:
                # Split the oversized chunk
                chunk, leftover = chunk[:max_bytes], chunk[max_bytes:]
                self._protocol.read_queue.appendleft(leftover)

            # If the read queue is empty, clear the flag so that the next call will
            # block until data is available
            if not self._protocol.read_queue:
                self._protocol.read_event.clear()

        return chunk

    async def send(self, item: bytes) -> None:
        """
        Write ``item`` to the transport and wait until writing is not paused.

        :raises ClosedResourceError: if the stream was closed locally
        :raises BrokenResourceError: if the write fails with a ``RuntimeError`` while
            the transport is closing
        """
        with self._send_guard:
            await AsyncIOBackend.checkpoint()

            # An exception recorded by the protocol is raised as is
            if self._closed:
                raise ClosedResourceError
            elif self._protocol.exception is not None:
                raise self._protocol.exception

            try:
                self._transport.write(item)
            except RuntimeError as exc:
                if self._transport.is_closing():
                    raise BrokenResourceError from exc
                else:
                    raise

            await self._protocol.write_event.wait()

    async def send_eof(self) -> None:
        """Signal end-of-file on the write side; an ``OSError`` is ignored."""
        try:
            self._transport.write_eof()
        except OSError:
            pass

    async def aclose(self) -> None:
        """Mark the stream closed and shut the transport down."""
        self._closed = True
        if not self._transport.is_closing():
            try:
                self._transport.write_eof()
            except OSError:
                pass

            # Close the transport, yield to the event loop once, then abort it
            self._transport.close()
            await sleep(0)
            self._transport.abort()
  1123. class _RawSocketMixin:
  1124. _receive_future: asyncio.Future | None = None
  1125. _send_future: asyncio.Future | None = None
  1126. _closing = False
  1127. def __init__(self, raw_socket: socket.socket):
  1128. self.__raw_socket = raw_socket
  1129. self._receive_guard = ResourceGuard("reading from")
  1130. self._send_guard = ResourceGuard("writing to")
  1131. @property
  1132. def _raw_socket(self) -> socket.socket:
  1133. return self.__raw_socket
  1134. def _wait_until_readable(self, loop: asyncio.AbstractEventLoop) -> asyncio.Future:
  1135. def callback(f: object) -> None:
  1136. del self._receive_future
  1137. loop.remove_reader(self.__raw_socket)
  1138. f = self._receive_future = asyncio.Future()
  1139. loop.add_reader(self.__raw_socket, f.set_result, None)
  1140. f.add_done_callback(callback)
  1141. return f
  1142. def _wait_until_writable(self, loop: asyncio.AbstractEventLoop) -> asyncio.Future:
  1143. def callback(f: object) -> None:
  1144. del self._send_future
  1145. loop.remove_writer(self.__raw_socket)
  1146. f = self._send_future = asyncio.Future()
  1147. loop.add_writer(self.__raw_socket, f.set_result, None)
  1148. f.add_done_callback(callback)
  1149. return f
  1150. async def aclose(self) -> None:
  1151. if not self._closing:
  1152. self._closing = True
  1153. if self.__raw_socket.fileno() != -1:
  1154. self.__raw_socket.close()
  1155. if self._receive_future:
  1156. self._receive_future.set_result(None)
  1157. if self._send_future:
  1158. self._send_future.set_result(None)
class UNIXSocketStream(_RawSocketMixin, abc.UNIXSocketStream):
    """
    UNIX socket stream operating directly on a non-blocking raw socket.

    All I/O methods retry on ``BlockingIOError`` after waiting for the socket to
    become readable/writable. Any other ``OSError`` is converted to
    ``ClosedResourceError`` if the stream is being closed locally, and to
    ``BrokenResourceError`` otherwise.
    """

    async def send_eof(self) -> None:
        """Shut down the write side of the socket."""
        with self._send_guard:
            self._raw_socket.shutdown(socket.SHUT_WR)

    async def receive(self, max_bytes: int = 65536) -> bytes:
        """
        Receive up to ``max_bytes`` bytes.

        :raises EndOfStream: if the socket returned no data
        """
        loop = get_running_loop()
        await AsyncIOBackend.checkpoint()
        with self._receive_guard:
            while True:
                try:
                    data = self._raw_socket.recv(max_bytes)
                except BlockingIOError:
                    await self._wait_until_readable(loop)
                except OSError as exc:
                    if self._closing:
                        raise ClosedResourceError from None
                    else:
                        raise BrokenResourceError from exc
                else:
                    if not data:
                        raise EndOfStream

                    return data

    async def send(self, item: bytes) -> None:
        """Send all of ``item``, looping over partial writes."""
        loop = get_running_loop()
        await AsyncIOBackend.checkpoint()
        with self._send_guard:
            view = memoryview(item)
            while view:
                try:
                    bytes_sent = self._raw_socket.send(view)
                except BlockingIOError:
                    await self._wait_until_writable(loop)
                except OSError as exc:
                    if self._closing:
                        raise ClosedResourceError from None
                    else:
                        raise BrokenResourceError from exc
                else:
                    # Drop the part that was sent and continue with the remainder
                    view = view[bytes_sent:]

    async def receive_fds(self, msglen: int, maxfds: int) -> tuple[bytes, list[int]]:
        """
        Receive a message along with file descriptors sent as ancillary data.

        :param msglen: maximum length of the message to receive (non-negative)
        :param maxfds: maximum number of file descriptors to receive (positive)
        :return: a tuple of (message, list of file descriptors)
        :raises ValueError: if ``msglen`` or ``maxfds`` is not a valid integer
        :raises EndOfStream: if neither a message nor ancillary data was received
        :raises RuntimeError: if ancillary data other than ``SCM_RIGHTS`` was received
        """
        if not isinstance(msglen, int) or msglen < 0:
            raise ValueError("msglen must be a non-negative integer")
        if not isinstance(maxfds, int) or maxfds < 1:
            raise ValueError("maxfds must be a positive integer")

        loop = get_running_loop()
        fds = array.array("i")
        await AsyncIOBackend.checkpoint()
        with self._receive_guard:
            while True:
                try:
                    message, ancdata, flags, addr = self._raw_socket.recvmsg(
                        msglen, socket.CMSG_LEN(maxfds * fds.itemsize)
                    )
                except BlockingIOError:
                    await self._wait_until_readable(loop)
                except OSError as exc:
                    if self._closing:
                        raise ClosedResourceError from None
                    else:
                        raise BrokenResourceError from exc
                else:
                    if not message and not ancdata:
                        raise EndOfStream

                    break

        for cmsg_level, cmsg_type, cmsg_data in ancdata:
            if cmsg_level != socket.SOL_SOCKET or cmsg_type != socket.SCM_RIGHTS:
                raise RuntimeError(
                    f"Received unexpected ancillary data; message = {message!r}, "
                    f"cmsg_level = {cmsg_level}, cmsg_type = {cmsg_type}"
                )

            # Ignore any trailing bytes that don't make up a whole array item
            fds.frombytes(cmsg_data[: len(cmsg_data) - (len(cmsg_data) % fds.itemsize)])

        return message, list(fds)

    async def send_fds(self, message: bytes, fds: Collection[int | IOBase]) -> None:
        """
        Send a message along with file descriptors as ``SCM_RIGHTS`` ancillary data.

        :param message: a non-empty message
        :param fds: a non-empty collection of file descriptors, given either as
            integers or as ``IOBase`` objects
        :raises ValueError: if ``message`` or ``fds`` is empty
        """
        if not message:
            raise ValueError("message must not be empty")
        if not fds:
            raise ValueError("fds must not be empty")

        loop = get_running_loop()
        filenos: list[int] = []
        # NOTE(review): items that are neither int nor IOBase are silently skipped
        for fd in fds:
            if isinstance(fd, int):
                filenos.append(fd)
            elif isinstance(fd, IOBase):
                filenos.append(fd.fileno())

        fdarray = array.array("i", filenos)
        await AsyncIOBackend.checkpoint()
        with self._send_guard:
            while True:
                try:
                    # NOTE(review): this spot used to mention a type-ignore that
                    # could be removed once mypy picked up
                    # https://github.com/python/typeshed/pull/5545 -- no ignore is
                    # present on the call below anymore
                    self._raw_socket.sendmsg(
                        [message], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, fdarray)]
                    )
                    break
                except BlockingIOError:
                    await self._wait_until_writable(loop)
                except OSError as exc:
                    if self._closing:
                        raise ClosedResourceError from None
                    else:
                        raise BrokenResourceError from exc
  1253. break
  1254. except BlockingIOError:
  1255. await self._wait_until_writable(loop)
  1256. except OSError as exc:
  1257. if self._closing:
  1258. raise ClosedResourceError from None
  1259. else:
  1260. raise BrokenResourceError from exc
class TCPSocketListener(abc.SocketListener):
    """
    TCP listener that accepts connections through the event loop's
    ``sock_accept()``.

    :param raw_socket: the listening socket

    """

    # Cancel scope of the accept() call currently in progress, if any; aclose() uses
    # it to interrupt that call
    _accept_scope: CancelScope | None = None
    _closed = False

    def __init__(self, raw_socket: socket.socket):
        self.__raw_socket = raw_socket
        self._loop = cast(asyncio.BaseEventLoop, get_running_loop())
        self._accept_guard = ResourceGuard("accepting connections from")

    @property
    def _raw_socket(self) -> socket.socket:
        return self.__raw_socket

    async def accept(self) -> abc.SocketStream:
        """
        Accept a connection and wrap it in a :class:`SocketStream`.

        ``TCP_NODELAY`` is enabled on the accepted socket.

        :raises ClosedResourceError: if the listener is closed, or gets closed while
            waiting for a connection
        """
        if self._closed:
            raise ClosedResourceError

        with self._accept_guard:
            await AsyncIOBackend.checkpoint()
            with CancelScope() as self._accept_scope:
                try:
                    client_sock, _addr = await self._loop.sock_accept(self._raw_socket)
                except asyncio.CancelledError:
                    # Workaround for https://bugs.python.org/issue41317
                    try:
                        self._loop.remove_reader(self._raw_socket)
                    except (ValueError, NotImplementedError):
                        pass

                    # A cancellation caused by aclose() is reported as a closed
                    # listener instead
                    if self._closed:
                        raise ClosedResourceError from None

                    raise
                finally:
                    self._accept_scope = None

        client_sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        transport, protocol = await self._loop.connect_accepted_socket(
            StreamProtocol, client_sock
        )
        return SocketStream(transport, protocol)

    async def aclose(self) -> None:
        """Close the listener, interrupting an accept() call in progress."""
        if self._closed:
            return

        self._closed = True
        if self._accept_scope:
            # Workaround for https://bugs.python.org/issue41317
            try:
                self._loop.remove_reader(self._raw_socket)
            except (ValueError, NotImplementedError):
                pass

            # Cancel the pending accept() and yield once before closing the socket
            self._accept_scope.cancel()
            await sleep(0)

        self._raw_socket.close()
  1300. # Workaround for https://bugs.python.org/issue41317
  1301. try:
  1302. self._loop.remove_reader(self._raw_socket)
  1303. except (ValueError, NotImplementedError):
  1304. pass
  1305. self._accept_scope.cancel()
  1306. await sleep(0)
  1307. self._raw_socket.close()
  1308. class UNIXSocketListener(abc.SocketListener):
  1309. def __init__(self, raw_socket: socket.socket):
  1310. self.__raw_socket = raw_socket
  1311. self._loop = get_running_loop()
  1312. self._accept_guard = ResourceGuard("accepting connections from")
  1313. self._closed = False
  1314. async def accept(self) -> abc.SocketStream:
  1315. await AsyncIOBackend.checkpoint()
  1316. with self._accept_guard:
  1317. while True:
  1318. try:
  1319. client_sock, _ = self.__raw_socket.accept()
  1320. client_sock.setblocking(False)
  1321. return UNIXSocketStream(client_sock)
  1322. except BlockingIOError:
  1323. f: asyncio.Future = asyncio.Future()
  1324. self._loop.add_reader(self.__raw_socket, f.set_result, None)
  1325. f.add_done_callback(
  1326. lambda _: self._loop.remove_reader(self.__raw_socket)
  1327. )
  1328. await f
  1329. except OSError as exc:
  1330. if self._closed:
  1331. raise ClosedResourceError from None
  1332. else:
  1333. raise BrokenResourceError from exc
  1334. async def aclose(self) -> None:
  1335. self._closed = True
  1336. self.__raw_socket.close()
  1337. @property
  1338. def _raw_socket(self) -> socket.socket:
  1339. return self.__raw_socket
  1340. class UDPSocket(abc.UDPSocket):
  1341. def __init__(
  1342. self, transport: asyncio.DatagramTransport, protocol: DatagramProtocol
  1343. ):
  1344. self._transport = transport
  1345. self._protocol = protocol
  1346. self._receive_guard = ResourceGuard("reading from")
  1347. self._send_guard = ResourceGuard("writing to")
  1348. self._closed = False
  1349. @property
  1350. def _raw_socket(self) -> socket.socket:
  1351. return self._transport.get_extra_info("socket")
  1352. async def aclose(self) -> None:
  1353. self._closed = True
  1354. if not self._transport.is_closing():
  1355. self._transport.close()
  1356. async def receive(self) -> tuple[bytes, IPSockAddrType]:
  1357. with self._receive_guard:
  1358. await AsyncIOBackend.checkpoint()
  1359. # If the buffer is empty, ask for more data
  1360. if not self._protocol.read_queue and not self._transport.is_closing():
  1361. self._protocol.read_event.clear()
  1362. await self._protocol.read_event.wait()
  1363. try:
  1364. return self._protocol.read_queue.popleft()
  1365. except IndexError:
  1366. if self._closed:
  1367. raise ClosedResourceError from None
  1368. else:
  1369. raise BrokenResourceError from None
  1370. async def send(self, item: UDPPacketType) -> None:
  1371. with self._send_guard:
  1372. await AsyncIOBackend.checkpoint()
  1373. await self._protocol.write_event.wait()
  1374. if self._closed:
  1375. raise ClosedResourceError
  1376. elif self._transport.is_closing():
  1377. raise BrokenResourceError
  1378. else:
  1379. self._transport.sendto(*item)
  1380. class ConnectedUDPSocket(abc.ConnectedUDPSocket):
  1381. def __init__(
  1382. self, transport: asyncio.DatagramTransport, protocol: DatagramProtocol
  1383. ):
  1384. self._transport = transport
  1385. self._protocol = protocol
  1386. self._receive_guard = ResourceGuard("reading from")
  1387. self._send_guard = ResourceGuard("writing to")
  1388. self._closed = False
  1389. @property
  1390. def _raw_socket(self) -> socket.socket:
  1391. return self._transport.get_extra_info("socket")
  1392. async def aclose(self) -> None:
  1393. self._closed = True
  1394. if not self._transport.is_closing():
  1395. self._transport.close()
  1396. async def receive(self) -> bytes:
  1397. with self._receive_guard:
  1398. await AsyncIOBackend.checkpoint()
  1399. # If the buffer is empty, ask for more data
  1400. if not self._protocol.read_queue and not self._transport.is_closing():
  1401. self._protocol.read_event.clear()
  1402. await self._protocol.read_event.wait()
  1403. try:
  1404. packet = self._protocol.read_queue.popleft()
  1405. except IndexError:
  1406. if self._closed:
  1407. raise ClosedResourceError from None
  1408. else:
  1409. raise BrokenResourceError from None
  1410. return packet[0]
  1411. async def send(self, item: bytes) -> None:
  1412. with self._send_guard:
  1413. await AsyncIOBackend.checkpoint()
  1414. await self._protocol.write_event.wait()
  1415. if self._closed:
  1416. raise ClosedResourceError
  1417. elif self._transport.is_closing():
  1418. raise BrokenResourceError
  1419. else:
  1420. self._transport.sendto(item)
  1421. class UNIXDatagramSocket(_RawSocketMixin, abc.UNIXDatagramSocket):
  1422. async def receive(self) -> UNIXDatagramPacketType:
  1423. loop = get_running_loop()
  1424. await AsyncIOBackend.checkpoint()
  1425. with self._receive_guard:
  1426. while True:
  1427. try:
  1428. data = self._raw_socket.recvfrom(65536)
  1429. except BlockingIOError:
  1430. await self._wait_until_readable(loop)
  1431. except OSError as exc:
  1432. if self._closing:
  1433. raise ClosedResourceError from None
  1434. else:
  1435. raise BrokenResourceError from exc
  1436. else:
  1437. return data
  1438. async def send(self, item: UNIXDatagramPacketType) -> None:
  1439. loop = get_running_loop()
  1440. await AsyncIOBackend.checkpoint()
  1441. with self._send_guard:
  1442. while True:
  1443. try:
  1444. self._raw_socket.sendto(*item)
  1445. except BlockingIOError:
  1446. await self._wait_until_writable(loop)
  1447. except OSError as exc:
  1448. if self._closing:
  1449. raise ClosedResourceError from None
  1450. else:
  1451. raise BrokenResourceError from exc
  1452. else:
  1453. return
  1454. class ConnectedUNIXDatagramSocket(_RawSocketMixin, abc.ConnectedUNIXDatagramSocket):
  1455. async def receive(self) -> bytes:
  1456. loop = get_running_loop()
  1457. await AsyncIOBackend.checkpoint()
  1458. with self._receive_guard:
  1459. while True:
  1460. try:
  1461. data = self._raw_socket.recv(65536)
  1462. except BlockingIOError:
  1463. await self._wait_until_readable(loop)
  1464. except OSError as exc:
  1465. if self._closing:
  1466. raise ClosedResourceError from None
  1467. else:
  1468. raise BrokenResourceError from exc
  1469. else:
  1470. return data
  1471. async def send(self, item: bytes) -> None:
  1472. loop = get_running_loop()
  1473. await AsyncIOBackend.checkpoint()
  1474. with self._send_guard:
  1475. while True:
  1476. try:
  1477. self._raw_socket.send(item)
  1478. except BlockingIOError:
  1479. await self._wait_until_writable(loop)
  1480. except OSError as exc:
  1481. if self._closing:
  1482. raise ClosedResourceError from None
  1483. else:
  1484. raise BrokenResourceError from exc
  1485. else:
  1486. return
# RunVar-backed registries of futures for pending readiness waits, keyed by file
# descriptor. wait_readable() stores the future it awaits in _read_events;
# _write_events is looked up the same way by wait_writable().
_read_events: RunVar[dict[int, asyncio.Future[bool]]] = RunVar("read_events")
_write_events: RunVar[dict[int, asyncio.Future[bool]]] = RunVar("write_events")
  1489. #
  1490. # Synchronization
  1491. #
  1492. class Event(BaseEvent):
  1493. def __new__(cls) -> Event:
  1494. return object.__new__(cls)
  1495. def __init__(self) -> None:
  1496. self._event = asyncio.Event()
  1497. def set(self) -> None:
  1498. self._event.set()
  1499. def is_set(self) -> bool:
  1500. return self._event.is_set()
  1501. async def wait(self) -> None:
  1502. if self.is_set():
  1503. await AsyncIOBackend.checkpoint()
  1504. else:
  1505. await self._event.wait()
  1506. def statistics(self) -> EventStatistics:
  1507. return EventStatistics(len(self._event._waiters))
class Lock(BaseLock):
    """
    Lock that records the task owning it and hands itself over to queued waiters
    in the order they started waiting.
    """

    def __new__(cls, *, fast_acquire: bool = False) -> Lock:
        # Build the instance with object.__new__ instead of the base class's
        # __new__
        return object.__new__(cls)

    def __init__(self, *, fast_acquire: bool = False) -> None:
        # When true, an uncontended acquire() skips the shielded checkpoint
        self._fast_acquire = fast_acquire
        # The task currently holding the lock, or None when it is free
        self._owner_task: asyncio.Task | None = None
        # (task, future) pairs of the tasks blocked in acquire(), oldest first
        self._waiters: deque[tuple[asyncio.Task, asyncio.Future]] = deque()

    async def acquire(self) -> None:
        """
        Acquire the lock, queueing behind earlier waiters if it is not free.

        :raises RuntimeError: if the lock is already held by the current task

        """
        task = cast(asyncio.Task, current_task())
        # Uncontended case: the lock is free and nobody is queued ahead of us
        if self._owner_task is None and not self._waiters:
            await AsyncIOBackend.checkpoint_if_cancelled()
            self._owner_task = task

            # Unless on the "fast path", yield control of the event loop so that other
            # tasks can run too
            if not self._fast_acquire:
                try:
                    await AsyncIOBackend.cancel_shielded_checkpoint()
                except CancelledError:
                    # Ownership was already taken above, so give it back before
                    # propagating the cancellation
                    self.release()
                    raise

            return

        if self._owner_task == task:
            raise RuntimeError("Attempted to acquire an already held Lock")

        # Contended case: queue up and wait for release() to resolve our future
        fut: asyncio.Future[None] = asyncio.Future()
        item = task, fut
        self._waiters.append(item)
        try:
            await fut
        except CancelledError:
            self._waiters.remove(item)
            # release() may have made this task the owner before the cancellation
            # was delivered; in that case pass the lock on
            if self._owner_task is task:
                self.release()

            raise

        # release() leaves the entry in the queue, so drop it now that we own the
        # lock
        self._waiters.remove(item)

    def acquire_nowait(self) -> None:
        """
        Acquire the lock without waiting.

        :raises RuntimeError: if the lock is already held by the current task
        :raises WouldBlock: if the lock is held by another task or there are queued
            waiters

        """
        task = cast(asyncio.Task, current_task())
        if self._owner_task is None and not self._waiters:
            self._owner_task = task
            return

        if self._owner_task is task:
            raise RuntimeError("Attempted to acquire an already held Lock")

        raise WouldBlock

    def locked(self) -> bool:
        """Return ``True`` if some task currently owns the lock."""
        return self._owner_task is not None

    def release(self) -> None:
        """
        Release the lock, handing it to the first queued waiter whose future has
        not been cancelled.

        :raises RuntimeError: if the current task is not the owner

        """
        if self._owner_task != current_task():
            raise RuntimeError("The current task is not holding this lock")

        for task, fut in self._waiters:
            if not fut.cancelled():
                # Make the waiter the owner right away; the uncontended paths in
                # acquire()/acquire_nowait() require _owner_task to be None
                self._owner_task = task
                fut.set_result(None)
                return

        self._owner_task = None

    def statistics(self) -> LockStatistics:
        """Return the locked state, owner task info (if any) and waiter count."""
        task_info = AsyncIOTaskInfo(self._owner_task) if self._owner_task else None
        return LockStatistics(self.locked(), task_info, len(self._waiters))
class Semaphore(BaseSemaphore):
    """
    Counting semaphore that queues waiting tasks as futures and wakes them up in
    the order they started waiting.
    """

    def __new__(
        cls,
        initial_value: int,
        *,
        max_value: int | None = None,
        fast_acquire: bool = False,
    ) -> Semaphore:
        # Build the instance with object.__new__ instead of the base class's
        # __new__
        return object.__new__(cls)

    def __init__(
        self,
        initial_value: int,
        *,
        max_value: int | None = None,
        fast_acquire: bool = False,
    ):
        super().__init__(initial_value, max_value=max_value)
        # Number of tokens currently available
        self._value = initial_value
        # Upper bound enforced by release(); None means no bound
        self._max_value = max_value
        # When true, an uncontended acquire() skips the shielded checkpoint
        self._fast_acquire = fast_acquire
        # Futures of the tasks blocked in acquire(), oldest first
        self._waiters: deque[asyncio.Future[None]] = deque()

    async def acquire(self) -> None:
        """Take a token, waiting for a release() if none can be taken right away."""
        # Uncontended case: a token is available and nobody is queued ahead of us
        if self._value > 0 and not self._waiters:
            await AsyncIOBackend.checkpoint_if_cancelled()
            self._value -= 1

            # Unless on the "fast path", yield control of the event loop so that other
            # tasks can run too
            if not self._fast_acquire:
                try:
                    await AsyncIOBackend.cancel_shielded_checkpoint()
                except CancelledError:
                    # The token was already taken above, so give it back before
                    # propagating the cancellation
                    self.release()
                    raise

            return

        # Contended case: queue up and wait for release() to resolve our future
        fut: asyncio.Future[None] = asyncio.Future()
        self._waiters.append(fut)
        try:
            await fut
        except CancelledError:
            try:
                self._waiters.remove(fut)
            except ValueError:
                # release() had already dequeued this future, i.e. the token was
                # handed to us; pass it on
                self.release()

            raise

    def acquire_nowait(self) -> None:
        """
        Take a token without waiting.

        :raises WouldBlock: if the current value is 0

        """
        if self._value == 0:
            raise WouldBlock

        self._value -= 1

    def release(self) -> None:
        """
        Give a token back, handing it straight to the first waiter whose future
        has not been cancelled.

        :raises ValueError: if the value is already at ``max_value``

        """
        if self._max_value is not None and self._value == self._max_value:
            raise ValueError("semaphore released too many times")

        for fut in self._waiters:
            if not fut.cancelled():
                fut.set_result(None)
                # Mutating the deque here is fine because iteration stops
                # immediately afterwards
                self._waiters.remove(fut)
                return

        # Nobody to wake up: the token goes back to the pool
        self._value += 1

    @property
    def value(self) -> int:
        """The number of tokens currently available."""
        return self._value

    @property
    def max_value(self) -> int | None:
        """The configured upper bound for the value, or ``None``."""
        return self._max_value

    def statistics(self) -> SemaphoreStatistics:
        """Return statistics built from the number of queued waiters."""
        return SemaphoreStatistics(len(self._waiters))
class CapacityLimiter(BaseCapacityLimiter):
    """
    Limiter that lets at most ``total_tokens`` distinct borrowers hold a token at a
    time and queues the rest in arrival order.
    """

    # Class-level default: the total_tokens setter reads the previous value, also
    # when it is invoked for the first time from __init__()
    _total_tokens: float = 0

    def __new__(cls, total_tokens: float) -> CapacityLimiter:
        # Build the instance with object.__new__ instead of the base class's
        # __new__
        return object.__new__(cls)

    def __init__(self, total_tokens: float):
        # Objects currently holding a token
        self._borrowers: set[Any] = set()
        # Waiting borrowers mapped to the event that wakes them up, oldest first
        self._wait_queue: OrderedDict[Any, asyncio.Event] = OrderedDict()
        # Goes through the property setter for validation
        self.total_tokens = total_tokens

    async def __aenter__(self) -> None:
        await self.acquire()

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        self.release()

    @property
    def total_tokens(self) -> float:
        """The total number of tokens this limiter can lend out."""
        return self._total_tokens

    @total_tokens.setter
    def total_tokens(self, value: float) -> None:
        if not isinstance(value, int) and not math.isinf(value):
            raise TypeError("total_tokens must be an int or math.inf")

        if value < 0:
            raise ValueError("total_tokens must be >= 0")

        # Only an increase frees up capacity for waiters
        waiters_to_notify = max(value - self._total_tokens, 0)
        self._total_tokens = value

        # Notify waiting tasks that they have acquired the limiter
        while self._wait_queue and waiters_to_notify:
            event = self._wait_queue.popitem(last=False)[1]
            event.set()
            waiters_to_notify -= 1

    @property
    def borrowed_tokens(self) -> int:
        """The number of tokens currently lent out."""
        return len(self._borrowers)

    @property
    def available_tokens(self) -> float:
        """The total number of tokens minus the number lent out."""
        return self._total_tokens - len(self._borrowers)

    def _notify_next_waiter(self) -> None:
        """Notify the next task in line if this limiter has free capacity now."""
        if self._wait_queue and len(self._borrowers) < self._total_tokens:
            event = self._wait_queue.popitem(last=False)[1]
            event.set()

    def acquire_nowait(self) -> None:
        """Acquire a token for the current task without waiting."""
        self.acquire_on_behalf_of_nowait(current_task())

    def acquire_on_behalf_of_nowait(self, borrower: object) -> None:
        """
        Acquire a token for ``borrower`` without waiting.

        :param borrower: the object to register as the token holder
        :raises RuntimeError: if ``borrower`` already holds a token
        :raises WouldBlock: if there are queued waiters or no free tokens

        """
        if borrower in self._borrowers:
            raise RuntimeError(
                "this borrower is already holding one of this CapacityLimiter's tokens"
            )

        if self._wait_queue or len(self._borrowers) >= self._total_tokens:
            raise WouldBlock

        self._borrowers.add(borrower)

    async def acquire(self) -> None:
        """Acquire a token for the current task, waiting if necessary."""
        return await self.acquire_on_behalf_of(current_task())

    async def acquire_on_behalf_of(self, borrower: object) -> None:
        """
        Acquire a token for ``borrower``, waiting in the queue if necessary.

        :param borrower: the object to register as the token holder
        :raises RuntimeError: if ``borrower`` already holds a token

        """
        await AsyncIOBackend.checkpoint_if_cancelled()
        try:
            self.acquire_on_behalf_of_nowait(borrower)
        except WouldBlock:
            event = asyncio.Event()
            self._wait_queue[borrower] = event
            try:
                await event.wait()
            except BaseException:
                self._wait_queue.pop(borrower, None)
                # If the event was already set, this waiter had been dequeued in
                # favor of a token it will not take; offer it to the next one
                if event.is_set():
                    self._notify_next_waiter()

                raise

            self._borrowers.add(borrower)
        else:
            try:
                await AsyncIOBackend.cancel_shielded_checkpoint()
            except BaseException:
                # The token was already taken above, so give it back before
                # propagating the exception
                self.release()
                raise

    def release(self) -> None:
        """Release the token held by the current task."""
        self.release_on_behalf_of(current_task())

    def release_on_behalf_of(self, borrower: object) -> None:
        """
        Release the token held by ``borrower`` and wake up the next waiter if
        capacity allows.

        :param borrower: the object whose token is released
        :raises RuntimeError: if ``borrower`` does not hold a token

        """
        try:
            self._borrowers.remove(borrower)
        except KeyError:
            raise RuntimeError(
                "this borrower isn't holding any of this CapacityLimiter's tokens"
            ) from None

        self._notify_next_waiter()

    def statistics(self) -> CapacityLimiterStatistics:
        """Return borrowed/total token counts, the borrowers and the waiter count."""
        return CapacityLimiterStatistics(
            self.borrowed_tokens,
            self.total_tokens,
            tuple(self._borrowers),
            len(self._wait_queue),
        )
# RunVar slot holding a CapacityLimiter.
# NOTE(review): presumably the limiter returned by
# current_default_thread_limiter() -- confirm against its definition
_default_thread_limiter: RunVar[CapacityLimiter] = RunVar("_default_thread_limiter")
  1724. #
  1725. # Operating system signals
  1726. #
  1727. class _SignalReceiver:
  1728. def __init__(self, signals: tuple[Signals, ...]):
  1729. self._signals = signals
  1730. self._loop = get_running_loop()
  1731. self._signal_queue: deque[Signals] = deque()
  1732. self._future: asyncio.Future = asyncio.Future()
  1733. self._handled_signals: set[Signals] = set()
  1734. def _deliver(self, signum: Signals) -> None:
  1735. self._signal_queue.append(signum)
  1736. if not self._future.done():
  1737. self._future.set_result(None)
  1738. def __enter__(self) -> _SignalReceiver:
  1739. for sig in set(self._signals):
  1740. self._loop.add_signal_handler(sig, self._deliver, sig)
  1741. self._handled_signals.add(sig)
  1742. return self
  1743. def __exit__(
  1744. self,
  1745. exc_type: type[BaseException] | None,
  1746. exc_val: BaseException | None,
  1747. exc_tb: TracebackType | None,
  1748. ) -> None:
  1749. for sig in self._handled_signals:
  1750. self._loop.remove_signal_handler(sig)
  1751. def __aiter__(self) -> _SignalReceiver:
  1752. return self
  1753. async def __anext__(self) -> Signals:
  1754. await AsyncIOBackend.checkpoint()
  1755. if not self._signal_queue:
  1756. self._future = asyncio.Future()
  1757. await self._future
  1758. return self._signal_queue.popleft()
  1759. #
  1760. # Testing and debugging
  1761. #
  1762. class AsyncIOTaskInfo(TaskInfo):
  1763. def __init__(self, task: asyncio.Task):
  1764. task_state = _task_states.get(task)
  1765. if task_state is None:
  1766. parent_id = None
  1767. else:
  1768. parent_id = task_state.parent_id
  1769. coro = task.get_coro()
  1770. assert coro is not None, "created TaskInfo from a completed Task"
  1771. super().__init__(id(task), parent_id, task.get_name(), coro)
  1772. self._task = weakref.ref(task)
  1773. def has_pending_cancellation(self) -> bool:
  1774. if not (task := self._task()):
  1775. # If the task isn't around anymore, it won't have a pending cancellation
  1776. return False
  1777. if task._must_cancel: # type: ignore[attr-defined]
  1778. return True
  1779. elif (
  1780. isinstance(task._fut_waiter, asyncio.Future) # type: ignore[attr-defined]
  1781. and task._fut_waiter.cancelled() # type: ignore[attr-defined]
  1782. ):
  1783. return True
  1784. if task_state := _task_states.get(task):
  1785. if cancel_scope := task_state.cancel_scope:
  1786. return cancel_scope._effectively_cancelled
  1787. return False
  1788. class TestRunner(abc.TestRunner):
  1789. _send_stream: MemoryObjectSendStream[tuple[Awaitable[Any], asyncio.Future[Any]]]
  1790. def __init__(
  1791. self,
  1792. *,
  1793. debug: bool | None = None,
  1794. use_uvloop: bool = False,
  1795. loop_factory: Callable[[], AbstractEventLoop] | None = None,
  1796. ) -> None:
  1797. if use_uvloop and loop_factory is None:
  1798. if sys.platform != "win32":
  1799. import uvloop
  1800. loop_factory = uvloop.new_event_loop
  1801. else:
  1802. import winloop
  1803. loop_factory = winloop.new_event_loop
  1804. self._runner = Runner(debug=debug, loop_factory=loop_factory)
  1805. self._exceptions: list[BaseException] = []
  1806. self._runner_task: asyncio.Task | None = None
  1807. def __enter__(self) -> TestRunner:
  1808. self._runner.__enter__()
  1809. self.get_loop().set_exception_handler(self._exception_handler)
  1810. return self
  1811. def __exit__(
  1812. self,
  1813. exc_type: type[BaseException] | None,
  1814. exc_val: BaseException | None,
  1815. exc_tb: TracebackType | None,
  1816. ) -> None:
  1817. self._runner.__exit__(exc_type, exc_val, exc_tb)
  1818. def get_loop(self) -> AbstractEventLoop:
  1819. return self._runner.get_loop()
  1820. def _exception_handler(
  1821. self, loop: asyncio.AbstractEventLoop, context: dict[str, Any]
  1822. ) -> None:
  1823. if isinstance(context.get("exception"), Exception):
  1824. self._exceptions.append(context["exception"])
  1825. else:
  1826. loop.default_exception_handler(context)
  1827. def _raise_async_exceptions(self) -> None:
  1828. # Re-raise any exceptions raised in asynchronous callbacks
  1829. if self._exceptions:
  1830. exceptions, self._exceptions = self._exceptions, []
  1831. if len(exceptions) == 1:
  1832. raise exceptions[0]
  1833. elif exceptions:
  1834. raise BaseExceptionGroup(
  1835. "Multiple exceptions occurred in asynchronous callbacks", exceptions
  1836. )
  1837. async def _run_tests_and_fixtures(
  1838. self,
  1839. receive_stream: MemoryObjectReceiveStream[
  1840. tuple[Awaitable[T_Retval], asyncio.Future[T_Retval]]
  1841. ],
  1842. ) -> None:
  1843. from _pytest.outcomes import OutcomeException
  1844. with receive_stream, self._send_stream:
  1845. async for coro, future in receive_stream:
  1846. try:
  1847. retval = await coro
  1848. except CancelledError as exc:
  1849. if not future.cancelled():
  1850. future.cancel(*exc.args)
  1851. raise
  1852. except BaseException as exc:
  1853. if not future.cancelled():
  1854. future.set_exception(exc)
  1855. if not isinstance(exc, (Exception, OutcomeException)):
  1856. raise
  1857. else:
  1858. if not future.cancelled():
  1859. future.set_result(retval)
  1860. async def _call_in_runner_task(
  1861. self,
  1862. func: Callable[P, Awaitable[T_Retval]],
  1863. *args: P.args,
  1864. **kwargs: P.kwargs,
  1865. ) -> T_Retval:
  1866. if not self._runner_task:
  1867. self._send_stream, receive_stream = create_memory_object_stream[
  1868. tuple[Awaitable[Any], asyncio.Future]
  1869. ](1)
  1870. self._runner_task = self.get_loop().create_task(
  1871. self._run_tests_and_fixtures(receive_stream)
  1872. )
  1873. coro = func(*args, **kwargs)
  1874. future: asyncio.Future[T_Retval] = self.get_loop().create_future()
  1875. self._send_stream.send_nowait((coro, future))
  1876. return await future
  1877. def run_asyncgen_fixture(
  1878. self,
  1879. fixture_func: Callable[..., AsyncGenerator[T_Retval, Any]],
  1880. kwargs: dict[str, Any],
  1881. ) -> Iterable[T_Retval]:
  1882. asyncgen = fixture_func(**kwargs)
  1883. fixturevalue: T_Retval = self.get_loop().run_until_complete(
  1884. self._call_in_runner_task(asyncgen.asend, None)
  1885. )
  1886. self._raise_async_exceptions()
  1887. yield fixturevalue
  1888. try:
  1889. self.get_loop().run_until_complete(
  1890. self._call_in_runner_task(asyncgen.asend, None)
  1891. )
  1892. except StopAsyncIteration:
  1893. self._raise_async_exceptions()
  1894. else:
  1895. self.get_loop().run_until_complete(asyncgen.aclose())
  1896. raise RuntimeError("Async generator fixture did not stop")
  1897. def run_fixture(
  1898. self,
  1899. fixture_func: Callable[..., Coroutine[Any, Any, T_Retval]],
  1900. kwargs: dict[str, Any],
  1901. ) -> T_Retval:
  1902. retval = self.get_loop().run_until_complete(
  1903. self._call_in_runner_task(fixture_func, **kwargs)
  1904. )
  1905. self._raise_async_exceptions()
  1906. return retval
  1907. def run_test(
  1908. self, test_func: Callable[..., Coroutine[Any, Any, Any]], kwargs: dict[str, Any]
  1909. ) -> None:
  1910. try:
  1911. self.get_loop().run_until_complete(
  1912. self._call_in_runner_task(test_func, **kwargs)
  1913. )
  1914. except Exception as exc:
  1915. self._exceptions.append(exc)
  1916. self._raise_async_exceptions()
  1917. class AsyncIOBackend(AsyncBackend):
  1918. @classmethod
  1919. def run(
  1920. cls,
  1921. func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]],
  1922. args: tuple[Unpack[PosArgsT]],
  1923. kwargs: dict[str, Any],
  1924. options: dict[str, Any],
  1925. ) -> T_Retval:
  1926. @wraps(func)
  1927. async def wrapper() -> T_Retval:
  1928. task = cast(asyncio.Task, current_task())
  1929. task.set_name(get_callable_name(func))
  1930. _task_states[task] = TaskState(None, None)
  1931. try:
  1932. return await func(*args)
  1933. finally:
  1934. del _task_states[task]
  1935. debug = options.get("debug", None)
  1936. loop_factory = options.get("loop_factory", None)
  1937. if loop_factory is None and options.get("use_uvloop", False):
  1938. if sys.platform != "win32":
  1939. import uvloop
  1940. loop_factory = uvloop.new_event_loop
  1941. else:
  1942. import winloop
  1943. loop_factory = winloop.new_event_loop
  1944. with Runner(debug=debug, loop_factory=loop_factory) as runner:
  1945. return runner.run(wrapper())
  1946. @classmethod
  1947. def current_token(cls) -> object:
  1948. return get_running_loop()
  1949. @classmethod
  1950. def current_time(cls) -> float:
  1951. return get_running_loop().time()
  1952. @classmethod
  1953. def cancelled_exception_class(cls) -> type[BaseException]:
  1954. return CancelledError
  1955. @classmethod
  1956. async def checkpoint(cls) -> None:
  1957. await sleep(0)
    @classmethod
    async def checkpoint_if_cancelled(cls) -> None:
        """
        Yield to the event loop only if an enclosing cancel scope has had its
        cancellation requested.

        The current task's cancel scopes are inspected from the innermost one
        outwards, stopping at the first shielded scope. Returns without yielding
        if there is no current task or no task state is registered for it.

        """
        task = current_task()
        if task is None:
            return

        try:
            cancel_scope = _task_states[task].cancel_scope
        except KeyError:
            return

        while cancel_scope:
            if cancel_scope.cancel_called:
                # NOTE(review): the scope is not advanced in this branch, so the
                # loop is only left if this sleep raises (presumably
                # CancelledError) -- confirm
                await sleep(0)
            elif cancel_scope.shield:
                break
            else:
                cancel_scope = cancel_scope._parent_scope
  1974. @classmethod
  1975. async def cancel_shielded_checkpoint(cls) -> None:
  1976. with CancelScope(shield=True):
  1977. await sleep(0)
  1978. @classmethod
  1979. async def sleep(cls, delay: float) -> None:
  1980. await sleep(delay)
  1981. @classmethod
  1982. def create_cancel_scope(
  1983. cls, *, deadline: float = math.inf, shield: bool = False
  1984. ) -> CancelScope:
  1985. return CancelScope(deadline=deadline, shield=shield)
  1986. @classmethod
  1987. def current_effective_deadline(cls) -> float:
  1988. if (task := current_task()) is None:
  1989. return math.inf
  1990. try:
  1991. cancel_scope = _task_states[task].cancel_scope
  1992. except KeyError:
  1993. return math.inf
  1994. deadline = math.inf
  1995. while cancel_scope:
  1996. deadline = min(deadline, cancel_scope.deadline)
  1997. if cancel_scope._cancel_called:
  1998. deadline = -math.inf
  1999. break
  2000. elif cancel_scope.shield:
  2001. break
  2002. else:
  2003. cancel_scope = cancel_scope._parent_scope
  2004. return deadline
  2005. @classmethod
  2006. def create_task_group(cls) -> abc.TaskGroup:
  2007. return TaskGroup()
  2008. @classmethod
  2009. def create_event(cls) -> abc.Event:
  2010. return Event()
  2011. @classmethod
  2012. def create_lock(cls, *, fast_acquire: bool) -> abc.Lock:
  2013. return Lock(fast_acquire=fast_acquire)
  2014. @classmethod
  2015. def create_semaphore(
  2016. cls,
  2017. initial_value: int,
  2018. *,
  2019. max_value: int | None = None,
  2020. fast_acquire: bool = False,
  2021. ) -> abc.Semaphore:
  2022. return Semaphore(initial_value, max_value=max_value, fast_acquire=fast_acquire)
  2023. @classmethod
  2024. def create_capacity_limiter(cls, total_tokens: float) -> abc.CapacityLimiter:
  2025. return CapacityLimiter(total_tokens)
    @classmethod
    async def run_sync_in_worker_thread(  # type: ignore[return]
        cls,
        func: Callable[[Unpack[PosArgsT]], T_Retval],
        args: tuple[Unpack[PosArgsT]],
        abandon_on_cancel: bool = False,
        limiter: abc.CapacityLimiter | None = None,
    ) -> T_Retval:
        """
        Queue ``func(*args)`` on a worker thread and wait for its result.

        :param func: the callable to hand to the worker thread
        :param args: positional arguments queued along with ``func``
        :param abandon_on_cancel: if false, the wait for the result is shielded
            from cancellation
        :param limiter: capacity limiter to hold while waiting; the default
            thread limiter is used if omitted
        :return: the result of the future queued to the worker

        """
        await cls.checkpoint()

        # If this is the first run in this event loop thread, set up the necessary
        # variables
        try:
            idle_workers = _threadpool_idle_workers.get()
            workers = _threadpool_workers.get()
        except LookupError:
            idle_workers = deque()
            workers = set()
            _threadpool_idle_workers.set(idle_workers)
            _threadpool_workers.set(workers)

        async with limiter or cls.current_default_thread_limiter():
            with CancelScope(shield=not abandon_on_cancel) as scope:
                future = asyncio.Future[T_Retval]()
                root_task = find_root_task()
                if not idle_workers:
                    # No idle worker available: start a new one and arrange for
                    # it to be stopped when the root task finishes
                    worker = WorkerThread(root_task, workers, idle_workers)
                    worker.start()
                    workers.add(worker)
                    root_task.add_done_callback(
                        worker.stop, context=contextvars.Context()
                    )
                else:
                    # Reuse the worker at the right end of the idle queue
                    worker = idle_workers.pop()

                    # Prune any other workers that have been idle for MAX_IDLE_TIME
                    # seconds or longer
                    now = cls.current_time()
                    while idle_workers:
                        if (
                            now - idle_workers[0].idle_since
                            < WorkerThread.MAX_IDLE_TIME
                        ):
                            break

                        expired_worker = idle_workers.popleft()
                        expired_worker.root_task.remove_done_callback(
                            expired_worker.stop
                        )
                        expired_worker.stop()

                # Run the function in a copy of the current context in which no
                # async library is marked as current
                context = copy_context()
                context.run(set_current_async_library, None)
                # When the wait is shielded, hand the worker the enclosing scope
                # (if there is one) instead of the shielded scope created above
                if abandon_on_cancel or scope._parent_scope is None:
                    worker_scope = scope
                else:
                    worker_scope = scope._parent_scope

                worker.queue.put_nowait((context, func, args, future, worker_scope))
                return await future
  2080. @classmethod
  2081. def check_cancelled(cls) -> None:
  2082. scope: CancelScope | None = threadlocals.current_cancel_scope
  2083. while scope is not None:
  2084. if scope.cancel_called:
  2085. raise CancelledError(f"Cancelled by cancel scope {id(scope):x}")
  2086. if scope.shield:
  2087. return
  2088. scope = scope._parent_scope
    @classmethod
    def run_async_from_thread(
        cls,
        func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]],
        args: tuple[Unpack[PosArgsT]],
        token: object,
    ) -> T_Retval:
        """
        Run ``func(*args)`` on the event loop from another thread and block until
        it has finished.

        :param func: the coroutine function to call
        :param args: positional arguments passed to ``func``
        :param token: the event loop to use; if falsy, the native token stored in
            the thread-local ``current_token`` is used
        :return: the return value of ``func``
        :raises RunFinishedError: if the event loop is closed

        """

        async def task_wrapper() -> T_Retval:
            __tracebackhide__ = True
            # ``scope`` is a closure variable that gets bound further down, before
            # this coroutine function is called
            if scope is not None:
                # Register the new task with the calling thread's cancel scope
                task = cast(asyncio.Task, current_task())
                _task_states[task] = TaskState(None, scope)
                scope._tasks.add(task)
            try:
                return await func(*args)
            except CancelledError as exc:
                # Re-raise as the concurrent.futures flavor of CancelledError,
                # keeping the message
                raise concurrent.futures.CancelledError(str(exc)) from None
            finally:
                if scope is not None:
                    scope._tasks.discard(task)

        loop = cast(
            "AbstractEventLoop", token or threadlocals.current_token.native_token
        )
        if loop.is_closed():
            raise RunFinishedError

        # Schedule the coroutine from within a copy of this thread's context in
        # which "asyncio" is marked as the current async library
        context = copy_context()
        context.run(set_current_async_library, "asyncio")
        scope = getattr(threadlocals, "current_cancel_scope", None)
        f: concurrent.futures.Future[T_Retval] = context.run(
            asyncio.run_coroutine_threadsafe, task_wrapper(), loop=loop
        )
        # Blocks the calling thread until the coroutine has finished
        return f.result()
  2121. @classmethod
  2122. def run_sync_from_thread(
  2123. cls,
  2124. func: Callable[[Unpack[PosArgsT]], T_Retval],
  2125. args: tuple[Unpack[PosArgsT]],
  2126. token: object,
  2127. ) -> T_Retval:
  2128. @wraps(func)
  2129. def wrapper() -> None:
  2130. try:
  2131. set_current_async_library("asyncio")
  2132. f.set_result(func(*args))
  2133. except BaseException as exc:
  2134. f.set_exception(exc)
  2135. if not isinstance(exc, Exception):
  2136. raise
  2137. loop = cast(
  2138. "AbstractEventLoop", token or threadlocals.current_token.native_token
  2139. )
  2140. if loop.is_closed():
  2141. raise RunFinishedError
  2142. f: concurrent.futures.Future[T_Retval] = Future()
  2143. loop.call_soon_threadsafe(wrapper)
  2144. return f.result()
  2145. @classmethod
  2146. def create_blocking_portal(cls) -> abc.BlockingPortal:
  2147. return BlockingPortal()
  2148. @classmethod
  2149. async def open_process(
  2150. cls,
  2151. command: StrOrBytesPath | Sequence[StrOrBytesPath],
  2152. *,
  2153. stdin: int | IO[Any] | None,
  2154. stdout: int | IO[Any] | None,
  2155. stderr: int | IO[Any] | None,
  2156. **kwargs: Any,
  2157. ) -> Process:
  2158. await cls.checkpoint()
  2159. if isinstance(command, PathLike):
  2160. command = os.fspath(command)
  2161. if isinstance(command, (str, bytes)):
  2162. process = await asyncio.create_subprocess_shell(
  2163. command,
  2164. stdin=stdin,
  2165. stdout=stdout,
  2166. stderr=stderr,
  2167. **kwargs,
  2168. )
  2169. else:
  2170. process = await asyncio.create_subprocess_exec(
  2171. *command,
  2172. stdin=stdin,
  2173. stdout=stdout,
  2174. stderr=stderr,
  2175. **kwargs,
  2176. )
  2177. stdin_stream = StreamWriterWrapper(process.stdin) if process.stdin else None
  2178. stdout_stream = StreamReaderWrapper(process.stdout) if process.stdout else None
  2179. stderr_stream = StreamReaderWrapper(process.stderr) if process.stderr else None
  2180. return Process(process, stdin_stream, stdout_stream, stderr_stream)
  2181. @classmethod
  2182. def setup_process_pool_exit_at_shutdown(cls, workers: set[abc.Process]) -> None:
  2183. create_task(
  2184. _shutdown_process_pool_on_exit(workers),
  2185. name="AnyIO process pool shutdown task",
  2186. )
  2187. find_root_task().add_done_callback(
  2188. partial(_forcibly_shutdown_process_pool_on_exit, workers) # type:ignore[arg-type]
  2189. )
  2190. @classmethod
  2191. async def connect_tcp(
  2192. cls, host: str, port: int, local_address: IPSockAddrType | None = None
  2193. ) -> abc.SocketStream:
  2194. transport, protocol = cast(
  2195. tuple[asyncio.Transport, StreamProtocol],
  2196. await get_running_loop().create_connection(
  2197. StreamProtocol, host, port, local_addr=local_address
  2198. ),
  2199. )
  2200. transport.pause_reading()
  2201. return SocketStream(transport, protocol)
  2202. @classmethod
  2203. async def connect_unix(cls, path: str | bytes) -> abc.UNIXSocketStream:
  2204. await cls.checkpoint()
  2205. loop = get_running_loop()
  2206. raw_socket = socket.socket(socket.AF_UNIX)
  2207. raw_socket.setblocking(False)
  2208. while True:
  2209. try:
  2210. raw_socket.connect(path)
  2211. except BlockingIOError:
  2212. f: asyncio.Future = asyncio.Future()
  2213. loop.add_writer(raw_socket, f.set_result, None)
  2214. f.add_done_callback(lambda _: loop.remove_writer(raw_socket))
  2215. await f
  2216. except BaseException:
  2217. raw_socket.close()
  2218. raise
  2219. else:
  2220. return UNIXSocketStream(raw_socket)
  2221. @classmethod
  2222. def create_tcp_listener(cls, sock: socket.socket) -> SocketListener:
  2223. return TCPSocketListener(sock)
  2224. @classmethod
  2225. def create_unix_listener(cls, sock: socket.socket) -> SocketListener:
  2226. return UNIXSocketListener(sock)
  2227. @classmethod
  2228. async def create_udp_socket(
  2229. cls,
  2230. family: AddressFamily,
  2231. local_address: IPSockAddrType | None,
  2232. remote_address: IPSockAddrType | None,
  2233. reuse_port: bool,
  2234. ) -> UDPSocket | ConnectedUDPSocket:
  2235. transport, protocol = await get_running_loop().create_datagram_endpoint(
  2236. DatagramProtocol,
  2237. local_addr=local_address,
  2238. remote_addr=remote_address,
  2239. family=family,
  2240. reuse_port=reuse_port,
  2241. )
  2242. if protocol.exception:
  2243. transport.close()
  2244. raise protocol.exception
  2245. if not remote_address:
  2246. return UDPSocket(transport, protocol)
  2247. else:
  2248. return ConnectedUDPSocket(transport, protocol)
  2249. @classmethod
  2250. async def create_unix_datagram_socket( # type: ignore[override]
  2251. cls, raw_socket: socket.socket, remote_path: str | bytes | None
  2252. ) -> abc.UNIXDatagramSocket | abc.ConnectedUNIXDatagramSocket:
  2253. await cls.checkpoint()
  2254. loop = get_running_loop()
  2255. if remote_path:
  2256. while True:
  2257. try:
  2258. raw_socket.connect(remote_path)
  2259. except BlockingIOError:
  2260. f: asyncio.Future = asyncio.Future()
  2261. loop.add_writer(raw_socket, f.set_result, None)
  2262. f.add_done_callback(lambda _: loop.remove_writer(raw_socket))
  2263. await f
  2264. except BaseException:
  2265. raw_socket.close()
  2266. raise
  2267. else:
  2268. return ConnectedUNIXDatagramSocket(raw_socket)
  2269. else:
  2270. return UNIXDatagramSocket(raw_socket)
  @classmethod
  async def getaddrinfo(
      cls,
      host: bytes | str | None,
      port: str | int | None,
      *,
      family: int | AddressFamily = 0,
      type: int | SocketKind = 0,
      proto: int = 0,
      flags: int = 0,
  ) -> Sequence[
      tuple[
          AddressFamily,
          SocketKind,
          int,
          str,
          tuple[str, int] | tuple[str, int, int, int] | tuple[int, bytes],
      ]
  ]:
      """
      Resolve a host/port pair through the running loop's ``getaddrinfo()``.

      All arguments are forwarded unchanged; ``family``, ``type``, ``proto`` and
      ``flags`` are passed as keyword arguments.

      :return: whatever the loop's ``getaddrinfo()`` call returns

      """
      loop = get_running_loop()
      results = await loop.getaddrinfo(
          host,
          port,
          family=family,
          type=type,
          proto=proto,
          flags=flags,
      )
      return results
  @classmethod
  async def getnameinfo(
      cls, sockaddr: IPSockAddrType, flags: int = 0
  ) -> tuple[str, str]:
      """
      Look up a socket address through the running loop's ``getnameinfo()``.

      :param sockaddr: the socket address, forwarded unchanged
      :param flags: forwarded unchanged (defaults to ``0``)
      :return: whatever the loop's ``getnameinfo()`` call returns

      """
      loop = get_running_loop()
      name_info = await loop.getnameinfo(sockaddr, flags)
      return name_info
  @classmethod
  async def wait_readable(cls, obj: FileDescriptorLike) -> None:
      """
      Wait until the given file descriptor is reported as readable.

      :param obj: an integer file descriptor, or an object whose ``fileno()``
          provides one
      :raises BusyResourceError: if a waiter is already registered for this
          descriptor in ``_read_events``
      :raises ClosedResourceError: if the waiter future was resolved with a
          false value instead of ``True``

      """
      # Fetch the registry of pending read waiters, creating it on first use
      try:
          pending = _read_events.get()
      except LookupError:
          pending = {}
          _read_events.set(pending)

      fd = obj.fileno() if not isinstance(obj, int) else obj
      if pending.get(fd):
          raise BusyResourceError("reading from")

      loop = get_running_loop()
      waiter: asyncio.Future[bool] = loop.create_future()

      def on_readable() -> None:
          # Unregister only if the registry entry is still present, so the
          # reader is never removed twice for the same registration
          if fd in pending:
              del pending[fd]
              unregister(fd)

          try:
              waiter.set_result(True)
          except asyncio.InvalidStateError:
              pass

      try:
          loop.add_reader(fd, on_readable)
      except NotImplementedError:
          # The loop cannot watch file descriptors itself; use the selector
          # from anyio's selector thread module instead
          from anyio._core._asyncio_selector_thread import get_selector

          selector = get_selector()
          selector.add_reader(fd, on_readable)
          unregister = selector.remove_reader
      else:
          unregister = loop.remove_reader

      pending[fd] = waiter
      try:
          readable = await waiter
      finally:
          # Covers the paths where the callback never ran (e.g. cancellation)
          if fd in pending:
              del pending[fd]
              unregister(fd)

      if not readable:
          raise ClosedResourceError
  @classmethod
  async def wait_writable(cls, obj: FileDescriptorLike) -> None:
      """
      Wait until the given file descriptor is reported as writable.

      :param obj: an integer file descriptor, or an object whose ``fileno()``
          provides one
      :raises BusyResourceError: if a waiter is already registered for this
          descriptor in ``_write_events``
      :raises ClosedResourceError: if the waiter future was resolved with a
          false value instead of ``True``

      """
      # Fetch the registry of pending write waiters, creating it on first use
      try:
          pending = _write_events.get()
      except LookupError:
          pending = {}
          _write_events.set(pending)

      fd = obj.fileno() if not isinstance(obj, int) else obj
      if pending.get(fd):
          raise BusyResourceError("writing to")

      loop = get_running_loop()
      waiter: asyncio.Future[bool] = loop.create_future()

      def on_writable() -> None:
          # Unregister only if the registry entry is still present, so the
          # writer is never removed twice for the same registration
          if fd in pending:
              del pending[fd]
              unregister(fd)

          try:
              waiter.set_result(True)
          except asyncio.InvalidStateError:
              pass

      try:
          loop.add_writer(fd, on_writable)
      except NotImplementedError:
          # The loop cannot watch file descriptors itself; use the selector
          # from anyio's selector thread module instead
          from anyio._core._asyncio_selector_thread import get_selector

          selector = get_selector()
          selector.add_writer(fd, on_writable)
          unregister = selector.remove_writer
      else:
          unregister = loop.remove_writer

      pending[fd] = waiter
      try:
          writable = await waiter
      finally:
          # Covers the paths where the callback never ran (e.g. cancellation)
          if fd in pending:
              del pending[fd]
              unregister(fd)

      if not writable:
          raise ClosedResourceError
  @classmethod
  def notify_closing(cls, obj: FileDescriptorLike) -> None:
      """
      Release any tasks waiting on a file descriptor that is about to close.

      For the write registry and then the read registry, the waiter future
      registered for the descriptor (if any) is removed, resolved with
      ``False``, and the descriptor is unregistered from the loop. When the
      loop raises ``NotImplementedError``, the selector from anyio's selector
      thread module is used for unregistering instead.

      :param obj: an integer file descriptor, or an object whose ``fileno()``
          provides one

      """
      fd = obj.fileno() if not isinstance(obj, int) else obj
      loop = get_running_loop()

      # Writers are handled before readers, each with the same procedure
      for registry_var, unregister_name in (
          (_write_events, "remove_writer"),
          (_read_events, "remove_reader"),
      ):
          try:
              registry = registry_var.get()
          except LookupError:
              continue

          if fd not in registry:
              continue

          waiter = registry.pop(fd)
          try:
              waiter.set_result(False)
          except asyncio.InvalidStateError:
              pass

          try:
              getattr(loop, unregister_name)(fd)
          except NotImplementedError:
              from anyio._core._asyncio_selector_thread import get_selector

              getattr(get_selector(), unregister_name)(fd)
  @classmethod
  async def wrap_listener_socket(cls, sock: socket.socket) -> SocketListener:
      """
      Wrap an existing socket in a ``TCPSocketListener``.

      :param sock: the socket object handed to the listener
      :return: the newly created ``TCPSocketListener``

      """
      listener = TCPSocketListener(sock)
      return listener
  @classmethod
  async def wrap_stream_socket(cls, sock: socket.socket) -> SocketStream:
      """
      Attach an existing socket to the running loop and wrap it as a stream.

      The socket is passed to the loop's ``create_connection()`` with
      ``StreamProtocol`` as the protocol factory.

      :param sock: the socket to hand over to the loop
      :return: a ``SocketStream`` built from the resulting transport and protocol

      """
      loop = get_running_loop()
      transport, protocol = await loop.create_connection(StreamProtocol, sock=sock)
      return SocketStream(transport, protocol)
  @classmethod
  async def wrap_unix_stream_socket(cls, sock: socket.socket) -> UNIXSocketStream:
      """
      Wrap an existing socket in a ``UNIXSocketStream``.

      :param sock: the socket object handed to the stream
      :return: the newly created ``UNIXSocketStream``

      """
      stream = UNIXSocketStream(sock)
      return stream
  @classmethod
  async def wrap_udp_socket(cls, sock: socket.socket) -> UDPSocket:
      """
      Attach an existing socket to the running loop as a datagram endpoint.

      The socket is passed to the loop's ``create_datagram_endpoint()`` with
      ``DatagramProtocol`` as the protocol factory.

      :param sock: the socket to hand over to the loop
      :return: a ``UDPSocket`` built from the resulting transport and protocol

      """
      loop = get_running_loop()
      transport, protocol = await loop.create_datagram_endpoint(
          DatagramProtocol,
          sock=sock,
      )
      return UDPSocket(transport, protocol)
  @classmethod
  async def wrap_connected_udp_socket(cls, sock: socket.socket) -> ConnectedUDPSocket:
      """
      Attach an existing socket to the running loop as a datagram endpoint.

      The socket is passed to the loop's ``create_datagram_endpoint()`` with
      ``DatagramProtocol`` as the protocol factory.

      :param sock: the socket to hand over to the loop
      :return: a ``ConnectedUDPSocket`` built from the resulting transport and
          protocol

      """
      loop = get_running_loop()
      transport, protocol = await loop.create_datagram_endpoint(
          DatagramProtocol,
          sock=sock,
      )
      return ConnectedUDPSocket(transport, protocol)
  @classmethod
  async def wrap_unix_datagram_socket(cls, sock: socket.socket) -> UNIXDatagramSocket:
      """
      Wrap an existing socket in a ``UNIXDatagramSocket``.

      :param sock: the socket object handed to the wrapper
      :return: the newly created ``UNIXDatagramSocket``

      """
      wrapped = UNIXDatagramSocket(sock)
      return wrapped
  @classmethod
  async def wrap_connected_unix_datagram_socket(
      cls, sock: socket.socket
  ) -> ConnectedUNIXDatagramSocket:
      """
      Wrap an existing socket in a ``ConnectedUNIXDatagramSocket``.

      :param sock: the socket object handed to the wrapper
      :return: the newly created ``ConnectedUNIXDatagramSocket``

      """
      wrapped = ConnectedUNIXDatagramSocket(sock)
      return wrapped
  @classmethod
  def current_default_thread_limiter(cls) -> CapacityLimiter:
      """
      Return the limiter stored in ``_default_thread_limiter``.

      If no limiter has been stored yet, a ``CapacityLimiter(40)`` is created,
      stored and returned.

      :return: the stored (or newly created) capacity limiter

      """
      try:
          limiter = _default_thread_limiter.get()
      except LookupError:
          # First use: create the limiter and remember it for later calls
          limiter = CapacityLimiter(40)
          _default_thread_limiter.set(limiter)

      return limiter
  @classmethod
  def open_signal_receiver(
      cls, *signals: Signals
  ) -> AbstractContextManager[AsyncIterator[Signals]]:
      """
      Create a ``_SignalReceiver`` for the given signals.

      :param signals: the signals, passed to the receiver as one tuple
      :return: the newly created ``_SignalReceiver``

      """
      receiver = _SignalReceiver(signals)
      return receiver
  @classmethod
  def get_current_task(cls) -> TaskInfo:
      """
      Describe the task returned by ``current_task()``.

      :return: an ``AsyncIOTaskInfo`` wrapping that task

      """
      task = current_task()
      return AsyncIOTaskInfo(task)  # type: ignore[arg-type]
  @classmethod
  def get_running_tasks(cls) -> Sequence[TaskInfo]:
      """
      Describe every task from ``all_tasks()`` that is not done.

      :return: a list with one ``AsyncIOTaskInfo`` per unfinished task

      """
      unfinished = (task for task in all_tasks() if not task.done())
      return [AsyncIOTaskInfo(task) for task in unfinished]
  @classmethod
  async def wait_all_tasks_blocked(cls) -> None:
      """
      Wait until every task other than the caller is waiting on a pending future.

      After a checkpoint, the tasks from ``all_tasks()`` are scanned. As soon as
      one (other than the calling task) is found whose ``_fut_waiter`` is
      ``None`` or already done, the scan stops, this method sleeps for 0.1
      seconds and scans again. It returns once a full scan finds no such task.

      """
      await cls.checkpoint()
      caller = current_task()

      def may_still_run(task: asyncio.Task) -> bool:
          # The calling task itself is never counted
          if task is caller:
              return False

          waiter = task._fut_waiter  # type: ignore[attr-defined]
          return waiter is None or waiter.done()

      # any() stops at the first matching task, after which we back off and
      # rescan with a fresh task set
      while any(may_still_run(task) for task in all_tasks()):
          await sleep(0.1)
  @classmethod
  def create_test_runner(cls, options: dict[str, Any]) -> TestRunner:
      """
      Create a ``TestRunner`` from a dictionary of options.

      :param options: expanded into keyword arguments for ``TestRunner``
      :return: the newly created ``TestRunner``

      """
      runner = TestRunner(**options)
      return runner
  # Alias bound to the AsyncIOBackend class.
  # NOTE(review): presumably the name under which the backend is looked up
  # from outside this module -- confirm against the importing code.
  backend_class = AsyncIOBackend