pyrevs.diagnostics

Submodules

Classes

DiagDB

A database to keep track of the diagnostics data.

DiagnosticAnalyst

A class to handle analysing diagnostic statistics.

DiagnosticPlugin

A base class for diagnostic plugins.

Functions

diagnosticfactory(→ list[DiagnosticPlugin])

Parse input parameters to generate a list of DiagnosticPlugin.

Package Contents

class DiagDB(file_name: str | None = None, in_memory: bool = False, ro_mode: bool = False)[source]

Bases: pyrevs.core.sqlmanager.BaseSQLManager

A database to keep track of the diagnostics data.

Diagnostic entries are agregated in single table. Each entry is associated to a trajectory and its weight in the ensemble.

classmethod default_name() str[source]

Default name for the database file.

add_diagnostic_entry(diaglabel: str, traj_id: int, level: float, time: float, weight: float, ldata: bytes) None[source]

Atomic insert of a diagnostic snapshot.

The data schema assumes that any new addition to the database is made on an active trajectory.

Parameters:
  • diaglabel – the label of the diagnostic inserting the entry

  • traj_id – the ID of the traj adding the entry

  • level – the score level of the entry

  • time – the trajectory time at which the diagnostic was triggered

  • weight – the weight of the trajectory

  • ldata – the actual model data stored in the database

get_highest_recorded_level(traj_id: int, label: str) float[source]

Return the maximum level already recorded for this traj/label.

Parameters:
  • traj_id – the ID of a trajectory

  • label – the label of the diagnostic targeter

Returns:

the highest value of level_crossed

duplicate_diagnostic_history(ancestor_id: int, discarded_id: int, new_id: int, new_weight: float, threshold: float) int[source]

Copy diagnostic entries from an ancestor to a descendant.

Copies all entries where level_crossed <= threshold. Returns the number of entries duplicated.

The entries belonging to the discarded trajectory are set to inactive.

Parameters:
  • ancestor_id – the ID of the ancestor to copy data from

  • discarded_id – the ID of the discarded trajectory (during sampling iterations)

  • new_id – the ID of the new child trajectory

  • new_weight – the weight of the new child trajectory

  • threshold – the score threshold up to which copy must be performed

update_all_active_weights(new_weight: float) int[source]

Update all the active trajectories weight.

Parameters:

new_weight – the updated weight

Returns:

the number of trajectory updated

get_diagnostic_data(label: str) dict[float, list[tuple[Any, float, float, int]]][source]

Retrieve all diagnostic snapshots for a specific label.

Parameters:

label – the label of the diagnostic of interest

Returns:

A dictionary mapping each iso-level (float) to a list of tuples. Each tuple contains (unpickled_data, trajectory_weight, time, tid).

dump_to_json(json_path: str) None[source]

Export the entire diagnostic database to a JSON file.

Note that the content of the data stored in the database is omitted. Only the metadata of each stored data is dumpe for debuggin purposes.

close() None[source]

Dispose of the engine and clear connections.

class DiagnosticAnalyst(db_path: str)[source]

A class to handle analysing diagnostic statistics.

Let’s keep the analysis logic separated from the gathering logic. This class retrieves data from the diagnostic database and perform some computation (mostly conditional statistics on score iso-levels).

db
get_diagnostic_data(label: str) dict[float, list[tuple[Any, float, float, int]]][source]

A user-facing access to the diag DB.

An alias to the DB access for the analyst.

Returns:

A dictionary mapping each score iso-level (float) to a list of tuples. Each tuple contains (unpickled_data, trajectory_weight, time, tid).

compute_weighted_stats(label: str) dict[float, dict[str, Any]][source]

Aggregate data and compute mean/variance per level.

get_conditional_means(label: str) dict[float, dict[str, Any]][source]

Compute means at level L_i conditioned on reaching or not reaching L_{i+1}.

Returns:

{“reached_next”: mean_val, “failed_next”: mean_val} }

Return type:

{ level

class DiagnosticPlugin(dlabel: str, params: dict[Any, Any], tid: int, weight: float, workdir: pathlib.Path, fprocess: collections.abc.Callable[Ellipsis, Any], ddb: pyrevs.diagnostics.diagdb.DiagDB)[source]

A base class for diagnostic plugins.

Plugins are attached to the trajectory objects.

Variables:
  • _label – the diagnostic label

  • _tid – the ID of the trajectory the plugin is attached to

  • _weight – the weight of the trajectory

abstractmethod get_crossed_levels(new_snapshot: pyrevs.core.Snapshot) list[float][source]

Test to know if diagnostic is needed.

update(old_snapshot: pyrevs.core.Snapshot, new_snapshot: pyrevs.core.Snapshot) None[source]

Standard entry point called after every MCMC step.

diagnosticfactory(configs: dict[str, pyrevs.core.Config], tid: int, tweight: float, workdir: pathlib.Path, fprocess: collections.abc.Callable[Ellipsis, Any], ddb: pyrevs.diagnostics.diagdb.DiagDB) list[DiagnosticPlugin][source]

Parse input parameters to generate a list of DiagnosticPlugin.

Parameters:
  • configs – a dict with a Config object for each diagnostic

  • tid – the ID of the traj the diagnostic is attached to

  • tweight – the weight of the traj

  • workdir – the workdir associated with a trajectory

  • fprocess – the forward model diagnostic function

  • ddb – the diagnostic database to add the data to