pyrevs.core¶
Submodules¶
Classes¶
Lightweight structured access to configuration. |
|
A merge policy for configuration dataclasses. |
|
A base class for the stochastic forward model. |
|
Runtime configuration. |
|
A dataclass defining a snapshot. |
|
A base class for the tables. |
|
A database holding pyREVS trajectories. |
Package Contents¶
- class Config(data: collections.abc.Mapping[str, Any])[source]¶
Lightweight structured access to configuration.
pyREVS input parameters are mostly handled through dataclasses to ensure default values are available and types are enforced: TOML file -> Config -> Typed dataclass.
The typed dataclass are the final object used to instanciate pyREVS objects (Sampler, Database, Trajectories, …)
The Config class provides a simple interface to access configuration parameters in a more structured way. To add a new section to the TOML input file, create a new dataclass with a __section__ class attribute set to the name of the section.
- class MergePolicy[source]¶
A merge policy for configuration dataclasses.
- IMMUTABLE = 'immutable'¶
- REPLACE = 'replace'¶
- class ForwardModelBaseClass(a_id: int, deterministic: bool, params: dict[str, Any] | None = None, workdir: pathlib.Path | None = None)[source]¶
Bases:
abc.ABC,Generic[T_Noise,T_State]A base class for the stochastic forward model.
pyREVS relies on a separation of the stochastic model, encapsulating the physics of interest, and the sampling algorithm itself. The ForwardModelBaseClass defines the API the sampling algorithm requires from the stochastic model.
Concrete model classes must implement all the abstract functions defined in this base class.
The base class handles some components needed by pyREVS, so that the user does not have to ensure compatibility with pyREVS requirements.
- Variables:
_noise – the noise to be used in the next model step
_step – the current stochastic step counter
_time – the current stochastic time
_workdir – the working directory
- advance(dt: float, need_end_state: bool) float[source]¶
Base class advance function of the model.
This is the advance function called by pyREVS internals. It handles updating the model time and step counter, as well as reusing or generating noise only when needed. It also handles exceptions.
- Parameters:
dt – the time step size over which to advance
need_end_state – whether the step end state is needed
- Returns:
Some model will not do exactly dt (e.g. sub-stepping) return the actual dt
- set_workdir(workdir: pathlib.Path) None[source]¶
Setter of the model working directory.
- Parameters:
workdir – the new working directory
- abstractmethod get_current_state() T_State[source]¶
Return the current state of the model.
Note that the return type is left to the concrete model definition.
- abstractmethod set_current_state(state: T_State) None[source]¶
Set the current state of the model.
- Parameters:
state – the externally provide state
- abstractmethod score() float[source]¶
Return the model’s current state score.
The score is a real.
- Returns:
the score associated with the current model state
- abstractmethod make_noise() T_Noise[source]¶
Return the model’s latest noise increment.
Note that the noise type is left to the concrete model definition.
- Returns:
The model next noise increment
- post_trajectory_branching_hook(step: int, time: float) None[source]¶
Model post trajectory branching hook.
- Parameters:
step – the current step counter
time – the time of the simulation
- post_trajectory_restore_hook(step: int, time: float) None[source]¶
Model post trajectory restore hook.
- Parameters:
step – the current step counter
time – the time of the simulation
- abstractmethod diagnostic_hook(dlabel: str, tid: int, score_level: float, old_snap: pyrevs.core.snapshot.Snapshot[T_Noise, T_State], new_snap: pyrevs.core.snapshot.Snapshot[T_Noise, T_State]) Any[source]¶
Diagnostic hook.
- Parameters:
dlabel – the label of the diagnostic calling the hook
tid – the ID of the trjaectory calling
score_level – the score level crossed and triggering the call
old_snap – the snapshot at the beginning of the step
new_snap – the snapshot at the end of the step
- check_convergence(step: int, time: float, current_score: float, target_score: float) bool[source]¶
Check if the model has converged.
This default implementation checks if the current score is greater than or equal to the target score. The user can override this method to implement a different convergence criterion.
- Parameters:
step – the current step counter
time – the time of the simulation
current_score – the current score
target_score – the target score
- check_termination(step: int, time: float) bool[source]¶
Check for trajectory termination.
This default always return False. The user can override this method to implement a different termination criterion. Note that simple termination criteria (e.g. end_time, score_min) are handled elsewhere. This function should be used for more complex termination criteria, e.g. entering a given region of the model phase space.
- Parameters:
step – the current step counter
time – the time of the simulation
- class Snapshot[source]¶
Bases:
Generic[T_Noise,T_State]A dataclass defining a snapshot.
Gathering what defines a snapshot into an object. The time and score are of float type, but the actual type of the noise and state are completely determined by the forward model. A snapshot is allowed to have a state or not to accommodate memory savings.
- Variables:
time – snapshot time
score – score function value
noise – noise used to reach this snapshot
state – model state
- class CoreDB(file_name: str, in_memory: bool = False, ro_mode: bool = False)[source]¶
Bases:
pyrevs.core.sqlmanager.BaseSQLManagerA database holding pyREVS trajectories.
Allows atomic access to an SQL database from all the workers.
Note: pyREVS works with Python indexing starting at 0, while SQL indexing starts at 1. Trajectory ID is updated accordingly when accessing/updating the DB.
- Variables:
_file_name – The file name
- add_trajectory(traj_file: str, metadata: dict) None[source]¶
Add a new trajectory to the DB.
- Parameters:
traj_file – The trajectory file of that trajectory
metadata – a dict with the metadata
- Raises:
SQLAlchemyError if the DB could not be accessed –
- update_trajectory(traj_id: int, traj_file: str, metadata: dict) None[source]¶
Update a given trajectory data in the DB.
- Parameters:
traj_id – The trajectory id
traj_file – The new trajectory file of that trajectory
metadata – a dict with the trajectory metadata
- Raises:
SQLAlchemyError if the DB could not be accessed –
- update_trajectory_weight(traj_id: int, weight: float) None[source]¶
Update a given trajectory weight in the DB.
- Parameters:
traj_id – The trajectory id
weight – the new trajectory weight
- Raises:
SQLAlchemyError if the DB could not be accessed –
- lock_trajectory(traj_id: int, allow_completed_lock: bool = False) bool[source]¶
Set the status of a trajectory to “locked” if possible.
- Parameters:
traj_id – The trajectory id
allow_completed_lock – Allow to lock a “completed” trajectory
- Returns:
True if the trajectory was successfully locked, False otherwise
- Raises:
ValueError if the trajectory with the given id does not exist –
SQLAlchemyError if the DB could not be accessed –
- mark_trajectory_as_completed(traj_id: int) None[source]¶
Set the status of a trajectory to “completed” if possible.
- Parameters:
traj_id – The trajectory id
- Raises:
ValueError if the trajectory with the given id does not exist –
SQLAlchemyError if the DB could not be accessed –
- release_trajectory(traj_id: int) None[source]¶
Set the status of a trajectory to “idle” if possible.
- Parameters:
traj_id – The trajectory id
- Raises:
ValueError if the trajectory with the given id does not exist –
- get_trajectory_count() int[source]¶
Get the number of trajectories in the DB.
- Returns:
The number of trajectories
- get_terminated_trajectory_count() int[source]¶
Return the number of trajectories that have ‘terminated’ in their metadata.
- get_converged_trajectory_count() int[source]¶
Return the number of trajectories that have ‘converged’ in their metadata.
- get_total_computed_steps() int[source]¶
Sum the ‘nstep_compute’ field across all active and archived trajectories.
- fetch_trajectory(traj_id: int) tuple[str, dict][source]¶
Get the trajectory file of a trajectory.
- Parameters:
traj_id – The trajectory id
- Returns:
A tuple with trajectory file as a str and the trajectory metadata as dict
- Raises:
ValueError if the trajectory with the given id does not exist –
- check_trajectory_exist(traj_id: int) bool[source]¶
Check if a trajectory exist for a given index.
- Parameters:
traj_id – The trajectory id
- Returns:
True if the trajectory exist, False otherwise
- archive_trajectory(traj_file: str, metadata: dict) None[source]¶
Add a new trajectory to the archive container.
- Parameters:
traj_file – The trajectory file of that trajectory
metadata – a dict with the traj metadata
- fetch_archived_trajectory(traj_id: int) tuple[str, dict][source]¶
Get the trajectory file of a trajectory in the archive.
- Parameters:
traj_id – The trajectory id
- Returns:
A tuple with trajectory file as a str and the trajectory metadata as dict
- Raises:
ValueError if the trajectory with the given id does not exist –
- get_archived_trajectory_count() int[source]¶
Get the number of trajectories in the archive.
- Returns:
The number of trajectories