pyrevs.runner.taskrunner ======================== .. py:module:: pyrevs.runner.taskrunner Exceptions ---------- .. autoapisummary:: pyrevs.runner.taskrunner.RunnerError Classes ------- .. autoapisummary:: pyrevs.runner.taskrunner.WorkerLoggerPlugin pyrevs.runner.taskrunner.BaseRunner pyrevs.runner.taskrunner.AsIORunner pyrevs.runner.taskrunner.DaskRunner Functions --------- .. autoapisummary:: pyrevs.runner.taskrunner.make_runner Module Contents --------------- .. py:class:: WorkerLoggerPlugin(loglevel: str, logfile: str | None) Bases: :py:obj:`dask.distributed.WorkerPlugin` A plugin to configure logging on each worker. .. py:method:: setup(worker: dask.distributed.Worker) -> None Configure logging on the worker. :param worker: the dask worker .. py:exception:: RunnerError Bases: :py:obj:`Exception` Exception class for the runner. .. py:class:: BaseRunner(runner_cfg: pyrevs.runner.config.RunnerConfig, worker_fn: collections.abc.Callable, n_workers: int = 1, loglevel: str = 'INFO', logfile: str | None = None) An ABC for the task runners. .. py:method:: make_promise(task: list[Any]) -> None :abstractmethod: Log a new task to the list of task to tackle. .. py:method:: execute_promises() -> Any :abstractmethod: Execute the list of promises. .. py:method:: n_workers() -> int :abstractmethod: Return the number of workers in the runner. .. py:class:: AsIORunner(runner_cfg: pyrevs.runner.config.RunnerConfig, worker_fn: collections.abc.Callable, n_workers: int = 1, loglevel: str = 'INFO', logfile: str | None = None) Bases: :py:obj:`BaseRunner` A task runner class based on asyncIO. An runner that relies on asyncio to schedule tasks concurrently in worker processes. Tasks are added to an internal queue from which worker can take them and put the results back into result queue. :ivar _queue: an asyncio.Queue() to place the tasks in :ivar _rqueue: an asyncio.Queue() where the results are returned :ivar _n_workers: the number of workers in the runner :ivar _sync_worker: the synchrone worker function :ivar _async_worker: the asynchrone worker function :ivar _loop: the event loop associated with the workers :ivar _executor: an executor for the worker to work in :ivar _workers: a list of worker tasks .. py:method:: add_task(task: list[Any]) -> None :async: Append a task to the queue. .. py:method:: make_promise(task: list[Any]) -> None A synchronous wrapper to add_task. .. py:method:: run_tasks() -> list[Any] :async: Create worker tasks and run. Initialize the executor and setup the workers (tasks) if not already done. The join() task is created seperately and awaited with the others in order to catch any exception coming from the workers as they are generated and stop everything as soon as one task fails. .. py:method:: execute_promises() -> Any A synchronous wrapper to run_tasks. .. py:method:: n_workers() -> int Return the number of workers in the runner. .. py:class:: DaskRunner(runner_cfg: pyrevs.runner.config.RunnerConfig, worker_fn: collections.abc.Callable, n_workers: int = 1, loglevel: str = 'INFO', logfile: str | None = None) Bases: :py:obj:`BaseRunner` A task runner class based on Dask. An runner that relies on dask to schedule a tasks concurrently in workers. .. py:method:: make_promise(task: list[Any]) -> None Append a task to the internal task list. .. py:method:: just_delay(obj: Any) -> Any Delay an object. .. py:method:: execute_promises() -> Any Execute a list of promises. :param list_of_p: a list of dask promises :returns: A list with the return argument of each promised task. :raises Exception if compute fails (raise internal error): .. py:method:: n_workers() -> int Return the number of workers in the runner. .. py:function:: make_runner(runner_cfg: pyrevs.runner.config.RunnerConfig, worker_fn: collections.abc.Callable, loglevel: str = 'INFO', logfile: str | None = None, max_workers: int = -1) -> BaseRunner Factory that instantiates a configured runner. :param runner_cfg: a config mapping for the runner :param worker_fn: a worker function :param loglevel: logging level :param logfile: logging file :param max_workers: maximum number of workers