pyrevs.core

Submodules

Classes

Config

Lightweight structured access to configuration.

MergePolicy

A merge policy for configuration dataclasses.

ForwardModelBaseClass

A base class for the stochastic forward model.

RuntimeConfig

Runtime configuration.

Snapshot

A dataclass defining a snapshot.

CoreBase

A base class for the tables.

CoreDB

A database holding pyREVS trajectories.

Package Contents

class Config(data: collections.abc.Mapping[str, Any])[source]

Lightweight structured access to configuration.

pyREVS input parameters are mostly handled through dataclasses to ensure default values are available and types are enforced: TOML file -> Config -> Typed dataclass.

The typed dataclass are the final object used to instanciate pyREVS objects (Sampler, Database, Trajectories, …)

The Config class provides a simple interface to access configuration parameters in a more structured way. To add a new section to the TOML input file, create a new dataclass with a __section__ class attribute set to the name of the section.

section(name: str) Config[source]

Return a configuration section.

section_dict(name: str) dict[str, Any][source]

Return a section as dict.

require(section: str, key: str) Any[source]

Get a required parameter.

get(section: str, key: str, default: Any = None) Any[source]

Get an optional parameter.

as_dict() dict[str, Any][source]

Return the config Mapping as a dict.

load(cls: type[T]) T[source]

Load a dataclass of the provided type from this config section.

class MergePolicy[source]

A merge policy for configuration dataclasses.

IMMUTABLE = 'immutable'
REPLACE = 'replace'
class ForwardModelBaseClass(a_id: int, deterministic: bool, params: dict[str, Any] | None = None, workdir: pathlib.Path | None = None)[source]

Bases: abc.ABC, Generic[T_Noise, T_State]

A base class for the stochastic forward model.

pyREVS relies on a separation of the stochastic model, encapsulating the physics of interest, and the sampling algorithm itself. The ForwardModelBaseClass defines the API the sampling algorithm requires from the stochastic model.

Concrete model classes must implement all the abstract functions defined in this base class.

The base class handles some components needed by pyREVS, so that the user does not have to ensure compatibility with pyREVS requirements.

Variables:
  • _noise – the noise to be used in the next model step

  • _step – the current stochastic step counter

  • _time – the current stochastic time

  • _workdir – the working directory

advance(dt: float, need_end_state: bool) float[source]

Base class advance function of the model.

This is the advance function called by pyREVS internals. It handles updating the model time and step counter, as well as reusing or generating noise only when needed. It also handles exceptions.

Parameters:
  • dt – the time step size over which to advance

  • need_end_state – whether the step end state is needed

Returns:

Some model will not do exactly dt (e.g. sub-stepping) return the actual dt

property noise: T_Noise

Return the model’s latest noise increment.

clear() None[source]

Destroy internal data.

set_workdir(workdir: pathlib.Path) None[source]

Setter of the model working directory.

Parameters:

workdir – the new working directory

abstractmethod get_current_state() T_State[source]

Return the current state of the model.

Note that the return type is left to the concrete model definition.

abstractmethod set_current_state(state: T_State) None[source]

Set the current state of the model.

Parameters:

state – the externally provide state

abstractmethod score() float[source]

Return the model’s current state score.

The score is a real.

Returns:

the score associated with the current model state

abstractmethod make_noise() T_Noise[source]

Return the model’s latest noise increment.

Note that the noise type is left to the concrete model definition.

Returns:

The model next noise increment

post_trajectory_branching_hook(step: int, time: float) None[source]

Model post trajectory branching hook.

Parameters:
  • step – the current step counter

  • time – the time of the simulation

post_trajectory_restore_hook(step: int, time: float) None[source]

Model post trajectory restore hook.

Parameters:
  • step – the current step counter

  • time – the time of the simulation

abstractmethod diagnostic_hook(dlabel: str, tid: int, score_level: float, old_snap: pyrevs.core.snapshot.Snapshot[T_Noise, T_State], new_snap: pyrevs.core.snapshot.Snapshot[T_Noise, T_State]) Any[source]

Diagnostic hook.

Parameters:
  • dlabel – the label of the diagnostic calling the hook

  • tid – the ID of the trjaectory calling

  • score_level – the score level crossed and triggering the call

  • old_snap – the snapshot at the beginning of the step

  • new_snap – the snapshot at the end of the step

check_convergence(step: int, time: float, current_score: float, target_score: float) bool[source]

Check if the model has converged.

This default implementation checks if the current score is greater than or equal to the target score. The user can override this method to implement a different convergence criterion.

Parameters:
  • step – the current step counter

  • time – the time of the simulation

  • current_score – the current score

  • target_score – the target score

check_termination(step: int, time: float) bool[source]

Check for trajectory termination.

This default always return False. The user can override this method to implement a different termination criterion. Note that simple termination criteria (e.g. end_time, score_min) are handled elsewhere. This function should be used for more complex termination criteria, e.g. entering a given region of the model phase space.

Parameters:
  • step – the current step counter

  • time – the time of the simulation

classmethod name() str[source]

Return a the model name.

class RuntimeConfig[source]

Runtime configuration.

loglevel: str = 'INFO'
logfile: str | None = None
walltime: float = 86400
plot_diagnostics: bool = False
diagnostics: list[str] = []
class Snapshot[source]

Bases: Generic[T_Noise, T_State]

A dataclass defining a snapshot.

Gathering what defines a snapshot into an object. The time and score are of float type, but the actual type of the noise and state are completely determined by the forward model. A snapshot is allowed to have a state or not to accommodate memory savings.

Variables:
  • time – snapshot time

  • score – score function value

  • noise – noise used to reach this snapshot

  • state – model state

time: float
score: float
noise: T_Noise
state: T_State | None = None
property has_state: bool

Check if snapshot has state.

Returns:

True if state is not None

Return type:

bool

class CoreBase[source]

Bases: sqlalchemy.orm.DeclarativeBase

A base class for the tables.

class CoreDB(file_name: str, in_memory: bool = False, ro_mode: bool = False)[source]

Bases: pyrevs.core.sqlmanager.BaseSQLManager

A database holding pyREVS trajectories.

Allows atomic access to an SQL database from all the workers.

Note: pyREVS works with Python indexing starting at 0, while SQL indexing starts at 1. Trajectory ID is updated accordingly when accessing/updating the DB.

Variables:

_file_name – The file name

add_trajectory(traj_file: str, metadata: dict) None[source]

Add a new trajectory to the DB.

Parameters:
  • traj_file – The trajectory file of that trajectory

  • metadata – a dict with the metadata

Raises:

SQLAlchemyError if the DB could not be accessed

update_trajectory(traj_id: int, traj_file: str, metadata: dict) None[source]

Update a given trajectory data in the DB.

Parameters:
  • traj_id – The trajectory id

  • traj_file – The new trajectory file of that trajectory

  • metadata – a dict with the trajectory metadata

Raises:

SQLAlchemyError if the DB could not be accessed

update_trajectory_weight(traj_id: int, weight: float) None[source]

Update a given trajectory weight in the DB.

Parameters:
  • traj_id – The trajectory id

  • weight – the new trajectory weight

Raises:

SQLAlchemyError if the DB could not be accessed

lock_trajectory(traj_id: int, allow_completed_lock: bool = False) bool[source]

Set the status of a trajectory to “locked” if possible.

Parameters:
  • traj_id – The trajectory id

  • allow_completed_lock – Allow to lock a “completed” trajectory

Returns:

True if the trajectory was successfully locked, False otherwise

Raises:
  • ValueError if the trajectory with the given id does not exist

  • SQLAlchemyError if the DB could not be accessed

mark_trajectory_as_completed(traj_id: int) None[source]

Set the status of a trajectory to “completed” if possible.

Parameters:

traj_id – The trajectory id

Raises:
  • ValueError if the trajectory with the given id does not exist

  • SQLAlchemyError if the DB could not be accessed

release_trajectory(traj_id: int) None[source]

Set the status of a trajectory to “idle” if possible.

Parameters:

traj_id – The trajectory id

Raises:

ValueError if the trajectory with the given id does not exist

get_trajectory_count() int[source]

Get the number of trajectories in the DB.

Returns:

The number of trajectories

get_terminated_trajectory_count() int[source]

Return the number of trajectories that have ‘terminated’ in their metadata.

get_converged_trajectory_count() int[source]

Return the number of trajectories that have ‘converged’ in their metadata.

get_total_computed_steps() int[source]

Sum the ‘nstep_compute’ field across all active and archived trajectories.

fetch_trajectory(traj_id: int) tuple[str, dict][source]

Get the trajectory file of a trajectory.

Parameters:

traj_id – The trajectory id

Returns:

A tuple with trajectory file as a str and the trajectory metadata as dict

Raises:

ValueError if the trajectory with the given id does not exist

check_trajectory_exist(traj_id: int) bool[source]

Check if a trajectory exist for a given index.

Parameters:

traj_id – The trajectory id

Returns:

True if the trajectory exist, False otherwise

release_all_trajectories() None[source]

Release all trajectories in the DB.

archive_trajectory(traj_file: str, metadata: dict) None[source]

Add a new trajectory to the archive container.

Parameters:
  • traj_file – The trajectory file of that trajectory

  • metadata – a dict with the traj metadata

fetch_archived_trajectory(traj_id: int) tuple[str, dict][source]

Get the trajectory file of a trajectory in the archive.

Parameters:

traj_id – The trajectory id

Returns:

A tuple with trajectory file as a str and the trajectory metadata as dict

Raises:

ValueError if the trajectory with the given id does not exist

get_archived_trajectory_count() int[source]

Get the number of trajectories in the archive.

Returns:

The number of trajectories

clear_archived_trajectories() int[source]

Delete the content of the archived traj table.

Returns:

The number of entries deleted

dump_file_json(json_file: str | None = None) None[source]

Dump the content of the trajectory table to a json file.

Parameters:

json_file – an optional file name (or path) to dump the data to