| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739 |
- # Copyright (C) 2023 The Qt Company Ltd.
- # SPDX-License-Identifier: LicenseRef-Qt-Commercial OR LGPL-3.0-only OR GPL-2.0-only OR GPL-3.0-only
- from __future__ import annotations
- from PySide6.QtCore import (QCoreApplication, QDateTime, QDeadlineTimer,
- QEventLoop, QObject, QTimer, QThread, Slot)
- from . import futures
- from . import tasks
- from typing import Any, Callable, TypeVar
- import asyncio
- import collections.abc
- import concurrent.futures
- import contextvars
- import enum
- import os
- import signal
- import socket
- import subprocess
- import warnings
- __all__ = [
- "QAsyncioEventLoopPolicy", "QAsyncioEventLoop",
- "QAsyncioHandle", "QAsyncioTimerHandle",
- ]
- from typing import TYPE_CHECKING
- _T = TypeVar("_T")
- if TYPE_CHECKING:
- try:
- from typing import TypeVarTuple, Unpack
- except ImportError:
- from typing_extensions import TypeVarTuple, Unpack # type: ignore
- _Ts = TypeVarTuple("_Ts")
- Context = contextvars.Context # type: ignore
- else:
- _Ts = None # type: ignore
- Context = contextvars.Context
- class QAsyncioExecutorWrapper(QObject):
- """
- Executors in asyncio allow running synchronous code in a separate thread or
- process without blocking the event loop or interrupting the asynchronous
- program flow. Callables are scheduled for execution by calling submit() or
- map() on an executor object.
- Executors require a bit of extra work for QtAsyncio, as we can't use
- naked Python threads; instead, we must make sure that the thread created
- by executor.submit() has an event loop. This is achieved by not submitting
- the callable directly, but a small wrapper that attaches a QEventLoop to
- the executor thread, and then creates a zero-delay singleshot timer to push
- the actual callable for the executor into this new event loop.
- """
- def __init__(self, func: Callable[[Unpack[_Ts]], Any], *args: Unpack[_Ts]) -> None:
- super().__init__()
- self._loop: QEventLoop
- self._func = func
- self._args = args
- self._result: Any = None
- self._exception: BaseException | None = None
- def _cb(self):
- try:
- # Call the synchronous callable that we submitted with submit() or
- # map().
- self._result = self._func(*self._args)
- except BaseException as e:
- self._exception = e
- self._loop.exit()
- def do(self) -> Any:
- # This creates a new event loop and dispatcher for the thread, if not
- # already created.
- self._loop = QEventLoop()
- asyncio.events._set_running_loop(self._loop)
- # The do() function will always be executed from the new executor
- # thread and never from outside, so using the overload without the
- # context argument is sufficient.
- QTimer.singleShot(0, lambda: self._cb())
- self._loop.exec()
- if self._exception is not None:
- raise self._exception
- return self._result
- def exit(self):
- self._loop.exit()
- class QAsyncioEventLoopPolicy(asyncio.AbstractEventLoopPolicy):
- """
- Event loop policies are expected to be deprecated with Python 3.13, with
- subsequent removal in Python 3.15. At that point, part of the current
- logic of the QAsyncioEventLoopPolicy constructor will have to be moved
- to QtAsyncio.run() and/or to a loop factory class (to be provided as an
- argument to asyncio.run()). In particular, this concerns the logic of
- setting up the QCoreApplication and the SIGINT handler.
- More details:
- https://discuss.python.org/t/removing-the-asyncio-policy-system-asyncio-set-event-loop-policy-in-python-3-15/37553
- """
- def __init__(self,
- quit_qapp: bool = True,
- handle_sigint: bool = False) -> None:
- super().__init__()
- self._application = QCoreApplication.instance() or QCoreApplication()
- # Configure whether the QCoreApplication at the core of QtAsyncio
- # should be shut down when asyncio finishes. A special case where one
- # would want to disable this is test suites that want to reuse a single
- # QCoreApplication instance across all unit tests, which would fail if
- # this instance is shut down every time.
- self._quit_qapp = quit_qapp
- self._event_loop: asyncio.AbstractEventLoop | None = None
- if handle_sigint:
- signal.signal(signal.SIGINT, signal.SIG_DFL)
- def get_event_loop(self) -> asyncio.AbstractEventLoop:
- if self._event_loop is None:
- self._event_loop = QAsyncioEventLoop(self._application, quit_qapp=self._quit_qapp)
- return self._event_loop
- def set_event_loop(self, loop: asyncio.AbstractEventLoop | None) -> None:
- self._event_loop = loop
- def new_event_loop(self) -> asyncio.AbstractEventLoop:
- return QAsyncioEventLoop(self._application, quit_qapp=self._quit_qapp)
- def get_child_watcher(self) -> "asyncio.AbstractChildWatcher":
- raise DeprecationWarning("Child watchers are deprecated since Python 3.12")
- def set_child_watcher(self, watcher: "asyncio.AbstractChildWatcher") -> None:
- raise DeprecationWarning("Child watchers are deprecated since Python 3.12")
- class QAsyncioEventLoop(asyncio.BaseEventLoop, QObject):
- """
- Implements the asyncio API:
- https://docs.python.org/3/library/asyncio-eventloop.html
- """
- class ShutDownThread(QThread):
- """
- Used to shut down the default executor when calling
- shutdown_default_executor(). As the executor is a ThreadPoolExecutor,
- it must be shut down in a separate thread as all the threads from the
- thread pool must join, which we want to do without blocking the event
- loop.
- """
- def __init__(self, future: futures.QAsyncioFuture, loop: "QAsyncioEventLoop") -> None:
- super().__init__()
- self._future = future
- self._loop = loop
- self.started.connect(self.shutdown)
- def run(self) -> None:
- pass
- def shutdown(self) -> None:
- try:
- self._loop._default_executor.shutdown(wait=True)
- if not self._loop.is_closed():
- self._loop.call_soon_threadsafe(self._future.set_result, None)
- except Exception as e:
- if not self._loop.is_closed():
- self._loop.call_soon_threadsafe(self._future.set_exception, e)
- def __init__(self,
- application: QCoreApplication, quit_qapp: bool = True) -> None:
- asyncio.BaseEventLoop.__init__(self)
- QObject.__init__(self)
- self._application: QCoreApplication = application
- # Configure whether the QCoreApplication at the core of QtAsyncio
- # should be shut down when asyncio finishes. A special case where one
- # would want to disable this is test suites that want to reuse a single
- # QCoreApplication instance across all unit tests, which would fail if
- # this instance is shut down every time.
- self._quit_qapp = quit_qapp
- self._thread = QThread.currentThread()
- self._closed = False
- # These two flags are used to determine whether the loop was stopped
- # from inside the loop (i.e., coroutine or callback called stop()) or
- # from outside the loop (i.e., the QApplication is being shut down, for
- # example, by the user closing the window or by calling
- # QApplication.quit()). The different cases can trigger slightly
- # different behaviors (see the comments where the flags are used).
- # There are two variables for this as in a third case the loop is still
- # running and both flags are False.
- self._quit_from_inside = False
- self._quit_from_outside = False
- # A set of all asynchronous generators that are currently running.
- self._asyncgens: set[collections.abc.AsyncGenerator] = set()
- # Starting with Python 3.11, this must be an instance of
- # ThreadPoolExecutor.
- self._default_executor = concurrent.futures.ThreadPoolExecutor()
- # The exception handler, if set with set_exception_handler(). The
- # exception handler is currently called in two places: One, if an
- # asynchonrous generator raises an exception when closed, and two, if
- # an exception is raised during the execution of a task. Currently, the
- # default exception handler just prints the exception to the console.
- self._exception_handler: Callable | None = self.default_exception_handler
- # The task factory, if set with set_task_factory(). Otherwise, a new
- # task is created with the QAsyncioTask constructor.
- self._task_factory: Callable | None = None
- # The future that is currently being awaited with run_until_complete().
- self._future_to_complete: futures.QAsyncioFuture | None = None
- self._debug = bool(os.getenv("PYTHONASYNCIODEBUG", False))
- self._application.aboutToQuit.connect(self._about_to_quit_cb)
- # Running and stopping the loop
- def _run_until_complete_cb(self, future: futures.QAsyncioFuture) -> None:
- """
- A callback that stops the loop when the future is done, used when
- running the loop with run_until_complete().
- """
- if not future.cancelled():
- if isinstance(future.exception(), (SystemExit, KeyboardInterrupt)):
- return
- future.get_loop().stop()
- def run_until_complete(self,
- future: futures.QAsyncioFuture) -> Any: # type: ignore[override]
- if self.is_closed():
- raise RuntimeError("Event loop is closed")
- if self.is_running():
- raise RuntimeError("Event loop is already running")
- arg_was_coro = not asyncio.futures.isfuture(future)
- future = asyncio.tasks.ensure_future(future, loop=self) # type: ignore[assignment]
- future.add_done_callback(self._run_until_complete_cb)
- self._future_to_complete = future
- try:
- self.run_forever()
- except Exception as e:
- if arg_was_coro and future.done() and not future.cancelled():
- future.exception()
- raise e
- finally:
- future.remove_done_callback(self._run_until_complete_cb)
- if not future.done():
- raise RuntimeError("Event loop stopped before Future completed")
- return future.result()
- def run_forever(self) -> None:
- if self.is_closed():
- raise RuntimeError("Event loop is closed")
- if self.is_running():
- raise RuntimeError("Event loop is already running")
- asyncio.events._set_running_loop(self)
- self._application.exec()
- asyncio.events._set_running_loop(None)
- def _about_to_quit_cb(self):
- """ A callback for the aboutToQuit signal of the QCoreApplication. """
- if not self._quit_from_inside:
- # If the aboutToQuit signal is emitted, the user is closing the
- # application window or calling QApplication.quit(). In this case,
- # we want to close the event loop, and we consider this a quit from
- # outside the loop.
- self._quit_from_outside = True
- self.close()
- def stop(self) -> None:
- if self._future_to_complete is not None:
- if self._future_to_complete.done():
- self._future_to_complete = None
- else:
- # Do not stop the loop if there is a future still being awaited
- # with run_until_complete().
- return
- self._quit_from_inside = True
- # The user might want to keep the QApplication running after the event
- # event loop finishes, which they can control with the quit_qapp
- # argument.
- if self._quit_qapp:
- self._application.quit()
- def is_running(self) -> bool:
- return self._thread.loopLevel() > 0
- def is_closed(self) -> bool:
- return self._closed
- def close(self) -> None:
- if self.is_running() and not self._quit_from_outside:
- raise RuntimeError("Cannot close a running event loop")
- if self.is_closed():
- return
- if self._default_executor is not None:
- self._default_executor.shutdown(wait=False)
- self._closed = True
- async def shutdown_asyncgens(self) -> None:
- if not len(self._asyncgens):
- return
- results = await asyncio.tasks.gather(
- *[asyncgen.aclose() for asyncgen in self._asyncgens],
- return_exceptions=True)
- for result, asyncgen in zip(results, self._asyncgens):
- if isinstance(result, Exception):
- self.call_exception_handler({
- "message": f"Closing asynchronous generator {asyncgen}"
- f"raised an exception",
- "exception": result,
- "asyncgen": asyncgen})
- self._asyncgens.clear()
- async def shutdown_default_executor(self, # type: ignore[override]
- timeout: int | float | None = None) -> None:
- shutdown_successful = False
- if timeout is not None:
- deadline_timer = QDeadlineTimer(int(timeout * 1000))
- else:
- deadline_timer = QDeadlineTimer(QDeadlineTimer.ForeverConstant.Forever)
- if self._default_executor is None:
- return
- future = self.create_future()
- thread = QAsyncioEventLoop.ShutDownThread(future, self)
- thread.start()
- try:
- await future
- finally:
- shutdown_successful = thread.wait(deadline_timer)
- if timeout is not None and not shutdown_successful:
- warnings.warn(
- f"Could not shutdown the default executor within {timeout} seconds",
- RuntimeWarning, stacklevel=2)
- self._default_executor.shutdown(wait=False)
- # Scheduling callbacks
- def _call_soon_impl(self, callback: Callable[[Unpack[_Ts]], object], *args: Unpack[_Ts],
- context: Context | None = None,
- is_threadsafe: bool | None = False) -> asyncio.Handle:
- return self._call_later_impl(0, callback, *args, context=context,
- is_threadsafe=is_threadsafe)
- def call_soon(self, callback: Callable[[Unpack[_Ts]], object], *args: Unpack[_Ts],
- context: Context | None = None) -> asyncio.Handle:
- return self._call_soon_impl(callback, *args, context=context, is_threadsafe=False)
- def call_soon_threadsafe(self, callback: Callable[[Unpack[_Ts]], object], *args: Unpack[_Ts],
- context: Context | None = None) -> asyncio.Handle:
- if self.is_closed():
- raise RuntimeError("Event loop is closed")
- if context is None:
- context = contextvars.copy_context()
- return self._call_soon_impl(callback, *args, context=context, is_threadsafe=True)
- def _call_later_impl(self, delay: float, callback: Callable[[Unpack[_Ts]], object],
- *args: Unpack[_Ts], context: Context | None = None,
- is_threadsafe: bool | None = False) -> asyncio.TimerHandle:
- if not isinstance(delay, (int, float)):
- raise TypeError("delay must be an int or float")
- return self._call_at_impl(self.time() + delay, callback, *args,
- context=context, is_threadsafe=is_threadsafe)
- def call_later(self, delay: float, callback: Callable[[Unpack[_Ts]], object],
- *args: Unpack[_Ts], context: Context | None = None) -> asyncio.TimerHandle:
- return self._call_later_impl(delay, callback, *args, context=context, is_threadsafe=False)
- def _call_at_impl(self, when: float, callback: Callable[[Unpack[_Ts]], object],
- *args: Unpack[_Ts], context: Context | None = None,
- is_threadsafe: bool | None = False) -> asyncio.TimerHandle:
- """ All call_at() and call_later() methods map to this method. """
- if not isinstance(when, (int, float)):
- raise TypeError("when must be an int or float")
- return QAsyncioTimerHandle(when, callback, args, self, context, is_threadsafe=is_threadsafe)
- def call_at(self, when: float, callback: Callable[[Unpack[_Ts]], object],
- *args: Unpack[_Ts], context: Context | None = None) -> asyncio.TimerHandle:
- return self._call_at_impl(when, callback, *args, context=context, is_threadsafe=False)
- def time(self) -> float:
- return QDateTime.currentMSecsSinceEpoch() / 1000.0
- # Creating Futures and Tasks
- def create_future(self) -> futures.QAsyncioFuture: # type: ignore[override]
- return futures.QAsyncioFuture(loop=self)
- def create_task(self, # type: ignore[override]
- coro: collections.abc.Generator | collections.abc.Coroutine,
- *, name: str | None = None,
- context: contextvars.Context | None = None) -> tasks.QAsyncioTask:
- if self._task_factory is None:
- task = tasks.QAsyncioTask(coro, loop=self, name=name, context=context)
- else:
- task = self._task_factory(self, coro, context=context)
- task.set_name(name)
- return task
- def set_task_factory(self, factory: Callable | None) -> None:
- if factory is not None and not callable(factory):
- raise TypeError("The task factory must be a callable or None")
- self._task_factory = factory
- def get_task_factory(self) -> Callable | None:
- return self._task_factory
- # Opening network connections
- async def create_connection(
- self, protocol_factory, host=None, port=None,
- *, ssl=None, family=0, proto=0,
- flags=0, sock=None, local_addr=None,
- server_hostname=None,
- ssl_handshake_timeout=None,
- ssl_shutdown_timeout=None,
- happy_eyeballs_delay=None, interleave=None):
- raise NotImplementedError("QAsyncioEventLoop.create_connection() is not implemented yet")
- async def create_datagram_endpoint(self, protocol_factory,
- local_addr=None, remote_addr=None, *,
- family=0, proto=0, flags=0,
- reuse_address=None, reuse_port=None,
- allow_broadcast=None, sock=None):
- raise NotImplementedError(
- "QAsyncioEventLoop.create_datagram_endpoint() is not implemented yet")
- async def create_unix_connection(
- self, protocol_factory, path=None, *,
- ssl=None, sock=None,
- server_hostname=None,
- ssl_handshake_timeout=None,
- ssl_shutdown_timeout=None):
- raise NotImplementedError(
- "QAsyncioEventLoop.create_unix_connection() is not implemented yet")
- # Creating network servers
- async def create_server(
- self, protocol_factory, host=None, port=None,
- *, family=socket.AF_UNSPEC,
- flags=socket.AI_PASSIVE, sock=None, backlog=100,
- ssl=None, reuse_address=None, reuse_port=None,
- ssl_handshake_timeout=None,
- ssl_shutdown_timeout=None,
- start_serving=True):
- raise NotImplementedError("QAsyncioEventLoop.create_server() is not implemented yet")
- async def create_unix_server(
- self, protocol_factory, path=None, *,
- sock=None, backlog=100, ssl=None,
- ssl_handshake_timeout=None,
- ssl_shutdown_timeout=None,
- start_serving=True):
- raise NotImplementedError("QAsyncioEventLoop.create_unix_server() is not implemented yet")
- async def connect_accepted_socket(
- self, protocol_factory, sock,
- *, ssl=None,
- ssl_handshake_timeout=None,
- ssl_shutdown_timeout=None):
- raise NotImplementedError(
- "QAsyncioEventLoop.connect_accepted_socket() is not implemented yet")
- # Transferring files
- async def sendfile(self, transport, file, offset=0, count=None,
- *, fallback=True):
- raise NotImplementedError("QAsyncioEventLoop.sendfile() is not implemented yet")
- # TLS Upgrade
- async def start_tls(self, transport, protocol, sslcontext, *,
- server_side=False,
- server_hostname=None,
- ssl_handshake_timeout=None,
- ssl_shutdown_timeout=None):
- raise NotImplementedError("QAsyncioEventLoop.start_tls() is not implemented yet")
- # Watching file descriptors
- def add_reader(self, fd, callback, *args):
- raise NotImplementedError("QAsyncioEventLoop.add_reader() is not implemented yet")
- def remove_reader(self, fd):
- raise NotImplementedError("QAsyncioEventLoop.remove_reader() is not implemented yet")
- def add_writer(self, fd, callback, *args):
- raise NotImplementedError("QAsyncioEventLoop.add_writer() is not implemented yet")
- def remove_writer(self, fd):
- raise NotImplementedError("QAsyncioEventLoop.remove_writer() is not implemented yet")
- # Working with socket objects directly
- async def sock_recv(self, sock, nbytes):
- raise NotImplementedError("QAsyncioEventLoop.sock_recv() is not implemented yet")
- async def sock_recv_into(self, sock, buf):
- raise NotImplementedError("QAsyncioEventLoop.sock_recv_into() is not implemented yet")
- async def sock_recvfrom(self, sock, bufsize):
- raise NotImplementedError("QAsyncioEventLoop.sock_recvfrom() is not implemented yet")
- async def sock_recvfrom_into(self, sock, buf, nbytes=0):
- raise NotImplementedError("QAsyncioEventLoop.sock_recvfrom_into() is not implemented yet")
- async def sock_sendall(self, sock, data):
- raise NotImplementedError("QAsyncioEventLoop.sock_sendall() is not implemented yet")
- async def sock_sendto(self, sock, data, address):
- raise NotImplementedError("QAsyncioEventLoop.sock_sendto() is not implemented yet")
- async def sock_connect(self, sock, address):
- raise NotImplementedError("QAsyncioEventLoop.sock_connect() is not implemented yet")
- async def sock_accept(self, sock):
- raise NotImplementedError("QAsyncioEventLoop.sock_accept() is not implemented yet")
- async def sock_sendfile(self, sock, file, offset=0, count=None, *,
- fallback=None):
- raise NotImplementedError("QAsyncioEventLoop.sock_sendfile() is not implemented yet")
- # DNS
- async def getaddrinfo(self, host, port, *,
- family=0, type=0, proto=0, flags=0):
- raise NotImplementedError("QAsyncioEventLoop.getaddrinfo() is not implemented yet")
- async def getnameinfo(self, sockaddr, flags=0):
- raise NotImplementedError("QAsyncioEventLoop.getnameinfo() is not implemented yet")
- # Working with pipes
- async def connect_read_pipe(self, protocol_factory, pipe):
- raise NotImplementedError("QAsyncioEventLoop.connect_read_pipe() is not implemented yet")
- async def connect_write_pipe(self, protocol_factory, pipe):
- raise NotImplementedError("QAsyncioEventLoop.connect_write_pipe() is not implemented yet")
- # Unix signals
- def add_signal_handler(self, sig, callback, *args):
- raise NotImplementedError("QAsyncioEventLoop.add_signal_handler() is not implemented yet")
- def remove_signal_handler(self, sig):
- raise NotImplementedError(
- "QAsyncioEventLoop.remove_signal_handler() is not implemented yet")
- # Executing code in thread or process pools
- def run_in_executor(self, executor: concurrent.futures.ThreadPoolExecutor | None,
- func: Callable[[Unpack[_Ts]], _T],
- *args: Unpack[_Ts]) -> asyncio.Future[_T]:
- if self.is_closed():
- raise RuntimeError("Event loop is closed")
- if executor is None:
- executor = self._default_executor
- # Executors require a bit of extra work for QtAsyncio, as we can't use
- # naked Python threads; instead, we must make sure that the thread
- # created by executor.submit() has an event loop. This is achieved by
- # not submitting the callable directly, but a small wrapper that
- # attaches a QEventLoop to the executor thread, and then pushes the
- # actual callable for the executor into this new event loop.
- wrapper = QAsyncioExecutorWrapper(func, *args)
- return asyncio.futures.wrap_future(executor.submit(wrapper.do), loop=self)
- def set_default_executor(self,
- executor: concurrent.futures.ThreadPoolExecutor | None) -> None:
- if not isinstance(executor, concurrent.futures.ThreadPoolExecutor):
- raise TypeError("The executor must be a ThreadPoolExecutor")
- self._default_executor = executor
- # Error Handling API
- def set_exception_handler(self, handler: Callable | None) -> None:
- if handler is not None and not callable(handler):
- raise TypeError("The handler must be a callable or None")
- self._exception_handler = handler
- def get_exception_handler(self) -> Callable | None:
- return self._exception_handler
- def default_exception_handler(self, context: dict[str, Any]) -> None:
- # TODO
- if context["message"]:
- print(f"{context['message']} from task {context['task']._name},"
- "read the following traceback:")
- print(context["traceback"])
- def call_exception_handler(self, context: dict[str, Any]) -> None:
- if self._exception_handler is not None:
- self._exception_handler(context)
- # Enabling debug mode
- def get_debug(self) -> bool:
- # TODO: Part of the asyncio API but currently unused. More details:
- # https://docs.python.org/3/library/asyncio-dev.html#asyncio-debug-mode
- return self._debug
- def set_debug(self, enabled: bool) -> None:
- self._debug = enabled
- # Running subprocesses
- async def subprocess_exec(self, protocol_factory, *args,
- stdin=subprocess.PIPE,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE,
- **kwargs):
- raise NotImplementedError("QAsyncioEventLoop.subprocess_exec() is not implemented yet")
- async def subprocess_shell(self, protocol_factory, cmd, *,
- stdin=subprocess.PIPE,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE,
- **kwargs):
- raise NotImplementedError("QAsyncioEventLoop.subprocess_shell() is not implemented yet")
- class QAsyncioHandle():
- """
- The handle enqueues a callback to be executed by the event loop, and allows
- for this callback to be cancelled before it is executed. This callback will
- typically execute the step function for a task. This makes the handle one
- of the main components of asyncio.
- """
- class HandleState(enum.Enum):
- PENDING = enum.auto()
- CANCELLED = enum.auto()
- DONE = enum.auto()
- def __init__(self, callback: Callable, args: tuple,
- loop: QAsyncioEventLoop, context: contextvars.Context | None,
- is_threadsafe: bool | None = False) -> None:
- self._callback = callback
- self._cb_args = args # renamed from _args to avoid conflict with TimerHandle._args
- self._loop = loop
- self._context = context
- self._is_threadsafe = is_threadsafe
- self._timeout = 0
- self._state = QAsyncioHandle.HandleState.PENDING
- self._start()
- def _start(self) -> None:
- self._schedule_event(self._timeout, lambda: self._cb())
- def _schedule_event(self, timeout: int, func: Callable) -> None:
- # Do not schedule events from asyncio when the app is quit from outside
- # the event loop, as this would cause events to be enqueued after the
- # event loop was destroyed.
- if not self._loop.is_closed() and not self._loop._quit_from_outside:
- if self._is_threadsafe:
- # This singleShot overload will push func into self._loop
- # instead of the current thread's loop. This allows scheduling
- # a callback from a different thread, which is necessary for
- # thread-safety.
- # https://docs.python.org/3/library/asyncio-dev.html#asyncio-multithreading
- QTimer.singleShot(timeout, self._loop, func)
- else:
- QTimer.singleShot(timeout, func)
- @Slot()
- def _cb(self) -> None:
- """
- A slot, enqueued into the event loop, that wraps around the actual
- callback, typically the step function of a task.
- """
- if self._state == QAsyncioHandle.HandleState.PENDING:
- if self._context is not None:
- self._context.run(self._callback, *self._cb_args)
- else:
- self._callback(*self._cb_args)
- self._state = QAsyncioHandle.HandleState.DONE
- def cancel(self) -> None:
- if self._state == QAsyncioHandle.HandleState.PENDING:
- # The old timer that was created in _start will still trigger but
- # _cb won't do anything, therefore the callback is effectively
- # cancelled.
- self._state = QAsyncioHandle.HandleState.CANCELLED
- def cancelled(self) -> bool:
- return self._state == QAsyncioHandle.HandleState.CANCELLED
- class QAsyncioTimerHandle(QAsyncioHandle, asyncio.TimerHandle):
- def __init__(self, when: float, callback: Callable, args: tuple,
- loop: QAsyncioEventLoop, context: contextvars.Context | None,
- is_threadsafe: bool | None = False) -> None:
- QAsyncioHandle.__init__(self, callback, args, loop, context, is_threadsafe)
- self._when = when
- time = self._loop.time()
- # PYSIDE-2644: Timeouts should be rounded up or down instead of only up
- # as happens with int(). Otherwise, a timeout of e.g. 0.9 would be
- # handled as 0, where 1 would be more appropriate.
- self._timeout = round(max(self._when - time, 0) * 1000)
- QAsyncioHandle._start(self)
- def _start(self) -> None:
- """
- Overridden so that timer.start() is only called once at the end of the
- constructor for both QtHandle and QtTimerHandle.
- """
- pass
- def when(self) -> float:
- return self._when
|