tasks.py 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216
  1. # Copyright (C) 2023 The Qt Company Ltd.
  2. # SPDX-License-Identifier: LicenseRef-Qt-Commercial OR LGPL-3.0-only OR GPL-2.0-only OR GPL-3.0-only
  3. from __future__ import annotations
  4. from . import events
  5. from . import futures
  6. import traceback
  7. from typing import Any, Optional
  8. import asyncio
  9. import collections.abc
  10. import concurrent.futures
  11. import contextvars
  12. class QAsyncioTask(futures.QAsyncioFuture):
  13. """ https://docs.python.org/3/library/asyncio-task.html """
  14. def __init__(self, coro: collections.abc.Generator | collections.abc.Coroutine, *,
  15. loop: "events.QAsyncioEventLoop | None" = None, name: str | None = None,
  16. context: contextvars.Context | None = None) -> None:
  17. super().__init__(loop=loop, context=context)
  18. self._source_traceback = None # required for Python < 3.11
  19. self._state: futures.QAsyncioFuture.FutureState = futures.QAsyncioFuture.FutureState.PENDING
  20. self._exception: Optional[BaseException] = None
  21. self._coro = coro # The coroutine for which this task was created.
  22. self._name = name if name else "QtTask"
  23. # The task creates a handle for its coroutine. The handle enqueues the
  24. # task's step function as its callback in the event loop.
  25. self._loop.call_soon(self._step, context=self._context)
  26. # The task step function executes the coroutine until it finishes,
  27. # raises an exception or returns a future. If a future was returned,
  28. # the task will await its completion (or exception). If the task is
  29. # cancelled while it awaits a future, this future must also be
  30. # cancelled in order for the cancellation to be successful.
  31. self._future_to_await: asyncio.Future | None = None
  32. self._cancelled = False # PYSIDE-2644; see _step
  33. self._cancel_count = 0
  34. self._cancel_message: str | None = None
  35. # Store traceback in case of Exception. Useful when exception happens in coroutine
  36. self._tb: str | None = None
  37. # https://docs.python.org/3/library/asyncio-extending.html#task-lifetime-support
  38. asyncio._register_task(self) # type: ignore[arg-type]
  39. def __repr__(self) -> str:
  40. state: str = "Unknown"
  41. if self._state == futures.QAsyncioFuture.FutureState.PENDING:
  42. state = "Pending"
  43. elif self._state == futures.QAsyncioFuture.FutureState.DONE_WITH_RESULT:
  44. state = "Done"
  45. elif self._state == futures.QAsyncioFuture.FutureState.DONE_WITH_EXCEPTION:
  46. state = f"Done with exception ({repr(self._exception)})"
  47. elif self._state == futures.QAsyncioFuture.FutureState.CANCELLED:
  48. state = "Cancelled"
  49. return f"Task '{self.get_name()}' with state: {state}"
  50. class QtTaskApiMisuseError(Exception):
  51. pass
  52. def set_result(self, result: Any) -> None: # type: ignore[override]
  53. # This function is not inherited from the Future APIs.
  54. raise QAsyncioTask.QtTaskApiMisuseError("Tasks cannot set results")
  55. def set_exception(self, exception: Any) -> None: # type: ignore[override]
  56. # This function is not inherited from the Future APIs.
  57. raise QAsyncioTask.QtTaskApiMisuseError("Tasks cannot set exceptions")
  58. def _step(self,
  59. exception_or_future: BaseException | futures.QAsyncioFuture | None = None) -> None:
  60. """
  61. The step function is the heart of a task. It is scheduled in the event
  62. loop repeatedly, executing the coroutine "step" by "step" (i.e.,
  63. iterating through the asynchronous generator) until it finishes with an
  64. exception or successfully. Each step can optionally receive an
  65. exception or a future as a result from a previous step to handle.
  66. """
  67. if self.done():
  68. return
  69. result = None
  70. self._future_to_await = None
  71. if self._cancelled:
  72. exception_or_future = asyncio.CancelledError(self._cancel_message)
  73. self._cancelled = False
  74. if asyncio.futures.isfuture(exception_or_future):
  75. try:
  76. exception_or_future.result()
  77. except BaseException as e:
  78. exception_or_future = e
  79. try:
  80. asyncio._enter_task(self._loop, self) # type: ignore[arg-type]
  81. # It is at this point that the coroutine is resumed for the current
  82. # step (i.e. asynchronous generator iteration). It will now be
  83. # executed until it yields (and potentially returns a future),
  84. # raises an exception, is cancelled, or finishes successfully.
  85. if isinstance(exception_or_future, BaseException):
  86. # If the coroutine doesn't handle this exception, it propagates
  87. # to the caller.
  88. result = self._coro.throw(exception_or_future)
  89. else:
  90. result = self._coro.send(None)
  91. except StopIteration as e:
  92. self._state = futures.QAsyncioFuture.FutureState.DONE_WITH_RESULT
  93. self._result = e.value
  94. except (concurrent.futures.CancelledError, asyncio.exceptions.CancelledError) as e:
  95. self._state = futures.QAsyncioFuture.FutureState.CANCELLED
  96. self._exception = e
  97. except BaseException as e:
  98. self._state = futures.QAsyncioFuture.FutureState.DONE_WITH_EXCEPTION
  99. self._exception = e
  100. self._tb = traceback.format_exc()
  101. else:
  102. if asyncio.futures.isfuture(result):
  103. # If the coroutine yields a future, the task will await its
  104. # completion, and at that point the step function will be
  105. # called again.
  106. result.add_done_callback(
  107. self._step, context=self._context) # type: ignore[arg-type]
  108. # The task will await the completion (or exception) of this
  109. # future. If the task is cancelled while it awaits a future,
  110. # this future must also be cancelled.
  111. self._future_to_await = result
  112. if self._cancelled:
  113. # PYSIDE-2644: If the task was cancelled at this step and a
  114. # new future was created to be awaited, then it should be
  115. # cancelled as well. Otherwise, in some scenarios like a
  116. # loop inside the task and with bad timing, if the new
  117. # future is not cancelled, the task would continue running
  118. # in this loop despite having been cancelled. This bad
  119. # timing can occur especially if the first future finishes
  120. # very quickly.
  121. self._future_to_await.cancel(self._cancel_message)
  122. elif result is None:
  123. # If no future was yielded, we schedule the step function again
  124. # without any arguments.
  125. self._loop.call_soon(self._step, context=self._context)
  126. else:
  127. # This is not supposed to happen.
  128. exception = RuntimeError(f"Bad task result: {result}")
  129. self._loop.call_soon(self._step, exception, context=self._context)
  130. finally:
  131. asyncio._leave_task(self._loop, self) # type: ignore[arg-type]
  132. if self._exception:
  133. message = str(self._exception)
  134. if message == "None":
  135. message = ""
  136. else:
  137. message = "An exception occurred during task execution"
  138. self._loop.call_exception_handler({
  139. "message": message,
  140. "exception": self._exception,
  141. "task": self,
  142. "future": (exception_or_future
  143. if asyncio.futures.isfuture(exception_or_future)
  144. else None),
  145. "traceback": self._tb
  146. })
  147. if self.done():
  148. self._schedule_callbacks()
  149. # https://docs.python.org/3/library/asyncio-extending.html#task-lifetime-support
  150. asyncio._unregister_task(self) # type: ignore[arg-type]
  151. def get_stack(self, *, limit=None) -> list[Any]:
  152. # TODO
  153. raise NotImplementedError("QtTask.get_stack is not implemented")
  154. def print_stack(self, *, limit=None, file=None) -> None:
  155. # TODO
  156. raise NotImplementedError("QtTask.print_stack is not implemented")
  157. def get_coro(self) -> collections.abc.Generator | collections.abc.Coroutine:
  158. return self._coro
  159. def get_name(self) -> str:
  160. return self._name
  161. def set_name(self, value) -> None:
  162. self._name = str(value)
  163. def cancel(self, msg: str | None = None) -> bool:
  164. if self.done():
  165. return False
  166. self._cancel_count += 1
  167. self._cancel_message = msg
  168. if self._future_to_await is not None:
  169. # A task that is awaiting a future must also cancel this future in
  170. # order for the cancellation to be successful.
  171. self._future_to_await.cancel(msg)
  172. self._cancelled = True # PYSIDE-2644; see _step
  173. return True
  174. def uncancel(self) -> int:
  175. if self._cancel_count > 0:
  176. self._cancel_count -= 1
  177. return self._cancel_count
  178. def cancelling(self) -> int:
  179. return self._cancel_count