pyrevs.runner.taskrunner

Exceptions

RunnerError

Exception class for the runner.

Classes

WorkerLoggerPlugin

A plugin to configure logging on each worker.

BaseRunner

An ABC for the task runners.

AsIORunner

A task runner class based on asyncIO.

DaskRunner

A task runner class based on Dask.

Functions

make_runner(→ BaseRunner)

Factory that instantiates a configured runner.

Module Contents

class WorkerLoggerPlugin(loglevel: str, logfile: str | None)[source]

Bases: dask.distributed.WorkerPlugin

A plugin to configure logging on each worker.

setup(worker: dask.distributed.Worker) None[source]

Configure logging on the worker.

Parameters:

worker – the dask worker

exception RunnerError[source]

Bases: Exception

Exception class for the runner.

class BaseRunner(runner_cfg: pyrevs.runner.config.RunnerConfig, worker_fn: collections.abc.Callable, n_workers: int = 1, loglevel: str = 'INFO', logfile: str | None = None)[source]

An ABC for the task runners.

abstractmethod make_promise(task: list[Any]) None[source]

Log a new task to the list of task to tackle.

abstractmethod execute_promises() Any[source]

Execute the list of promises.

abstractmethod n_workers() int[source]

Return the number of workers in the runner.

class AsIORunner(runner_cfg: pyrevs.runner.config.RunnerConfig, worker_fn: collections.abc.Callable, n_workers: int = 1, loglevel: str = 'INFO', logfile: str | None = None)[source]

Bases: BaseRunner

A task runner class based on asyncIO.

An runner that relies on asyncio to schedule tasks concurrently in worker processes. Tasks are added to an internal queue from which worker can take them and put the results back into result queue.

Variables:
  • _queue – an asyncio.Queue() to place the tasks in

  • _rqueue – an asyncio.Queue() where the results are returned

  • _n_workers – the number of workers in the runner

  • _sync_worker – the synchrone worker function

  • _async_worker – the asynchrone worker function

  • _loop – the event loop associated with the workers

  • _executor – an executor for the worker to work in

  • _workers – a list of worker tasks

async add_task(task: list[Any]) None[source]

Append a task to the queue.

make_promise(task: list[Any]) None[source]

A synchronous wrapper to add_task.

async run_tasks() list[Any][source]

Create worker tasks and run.

Initialize the executor and setup the workers (tasks) if not already done. The join() task is created seperately and awaited with the others in order to catch any exception coming from the workers as they are generated and stop everything as soon as one task fails.

execute_promises() Any[source]

A synchronous wrapper to run_tasks.

n_workers() int[source]

Return the number of workers in the runner.

class DaskRunner(runner_cfg: pyrevs.runner.config.RunnerConfig, worker_fn: collections.abc.Callable, n_workers: int = 1, loglevel: str = 'INFO', logfile: str | None = None)[source]

Bases: BaseRunner

A task runner class based on Dask.

An runner that relies on dask to schedule a tasks concurrently in workers.

make_promise(task: list[Any]) None[source]

Append a task to the internal task list.

just_delay(obj: Any) Any[source]

Delay an object.

execute_promises() Any[source]

Execute a list of promises.

Parameters:

list_of_p – a list of dask promises

Returns:

A list with the return argument of each promised task.

Raises:

Exception if compute fails (raise internal error)

n_workers() int[source]

Return the number of workers in the runner.

make_runner(runner_cfg: pyrevs.runner.config.RunnerConfig, worker_fn: collections.abc.Callable, loglevel: str = 'INFO', logfile: str | None = None, max_workers: int = -1) BaseRunner[source]

Factory that instantiates a configured runner.

Parameters:
  • runner_cfg – a config mapping for the runner

  • worker_fn – a worker function

  • loglevel – logging level

  • logfile – logging file

  • max_workers – maximum number of workers