pyrevs.diagnostics
==================

.. py:module:: pyrevs.diagnostics


Submodules
----------

.. toctree::
   :maxdepth: 1

   /autoapi/pyrevs/diagnostics/diagdb/index
   /autoapi/pyrevs/diagnostics/diagnostic/index


Classes
-------

.. autoapisummary::

   pyrevs.diagnostics.DiagDB
   pyrevs.diagnostics.DiagnosticAnalyst
   pyrevs.diagnostics.DiagnosticPlugin


Functions
---------

.. autoapisummary::

   pyrevs.diagnostics.diagnosticfactory


Package Contents
----------------

.. py:class:: DiagDB(file_name: str | None = None, in_memory: bool = False, ro_mode: bool = False)

   Bases: :py:obj:`pyrevs.core.sqlmanager.BaseSQLManager`

   A database to keep track of the diagnostics data.

   Diagnostic entries are aggregated in a single table. Each entry is
   associated with a trajectory and its weight in the ensemble.


   .. py:method:: default_name() -> str
      :classmethod:

      Default name for the database file.


   .. py:method:: add_diagnostic_entry(diaglabel: str, traj_id: int, level: float, time: float, weight: float, ldata: bytes) -> None

      Atomic insert of a diagnostic snapshot.

      The data schema assumes that any new addition to the database
      is made on an active trajectory.

      :param diaglabel: the label of the diagnostic inserting the entry
      :param traj_id: the ID of the trajectory adding the entry
      :param level: the score level of the entry
      :param time: the trajectory time at which the diagnostic was triggered
      :param weight: the weight of the trajectory
      :param ldata: the actual model data stored in the database


   .. py:method:: get_highest_recorded_level(traj_id: int, label: str) -> float

      Return the maximum level already recorded for this trajectory/label pair.

      :param traj_id: the ID of a trajectory
      :param label: the label of the diagnostic targeter

      :returns: the highest value of ``level_crossed``


   .. py:method:: duplicate_diagnostic_history(ancestor_id: int, discarded_id: int, new_id: int, new_weight: float, threshold: float) -> int

      Copy diagnostic entries from an ancestor to a descendant.

      Copies all entries where ``level_crossed <= threshold``.
      Returns the number of entries duplicated. The entries belonging
      to the discarded trajectory are set to inactive.

      :param ancestor_id: the ID of the ancestor to copy data from
      :param discarded_id: the ID of the trajectory discarded during sampling iterations
      :param new_id: the ID of the new child trajectory
      :param new_weight: the weight of the new child trajectory
      :param threshold: the score threshold up to which entries are copied


   .. py:method:: update_all_active_weights(new_weight: float) -> int

      Update the weight of all active trajectories.

      :param new_weight: the updated weight

      :returns: the number of trajectories updated


   .. py:method:: get_diagnostic_data(label: str) -> dict[float, list[tuple[Any, float, float, int]]]

      Retrieve all diagnostic snapshots for a specific label.

      :param label: the label of the diagnostic of interest

      :returns: A dictionary mapping each iso-level (float) to a list of tuples.
                Each tuple contains (unpickled_data, trajectory_weight, time, tid).


   .. py:method:: dump_to_json(json_path: str) -> None

      Export the entire diagnostic database to a JSON file.

      Note that the content of the data stored in the database
      is omitted. Only the metadata of each stored entry is dumped,
      for debugging purposes.


   .. py:method:: close() -> None

      Dispose of the engine and clear connections.


.. py:class:: DiagnosticAnalyst(db_path: str)

   A class to handle analysing diagnostic statistics.

   This class keeps the analysis logic separate from the gathering logic.
   It retrieves data from the diagnostic database and performs
   computations on it (mostly conditional statistics on score iso-levels).


   .. py:attribute:: db


   .. py:method:: get_diagnostic_data(label: str) -> dict[float, list[tuple[Any, float, float, int]]]

      A user-facing access to the diagnostic database.

      An alias to the database access for the analyst.

      :returns: A dictionary mapping each score iso-level (float) to a list of tuples.
                Each tuple contains (unpickled_data, trajectory_weight, time, tid).


   .. py:method:: compute_weighted_stats(label: str) -> dict[float, dict[str, Any]]

      Aggregate data and compute mean/variance per level.


   .. py:method:: get_conditional_means(label: str) -> dict[float, dict[str, Any]]

      Compute means at level L_i conditioned on reaching or not reaching L_{i+1}.

      :returns: A dictionary of the form
                ``{level: {"reached_next": mean_val, "failed_next": mean_val}}``.


.. py:class:: DiagnosticPlugin(dlabel: str, params: dict[Any, Any], tid: int, weight: float, workdir: pathlib.Path, fprocess: collections.abc.Callable[Ellipsis, Any], ddb: pyrevs.diagnostics.diagdb.DiagDB)

   A base class for diagnostic plugins.

   Plugins are attached to the trajectory objects.

   :ivar _label: the diagnostic label
   :ivar _tid: the ID of the trajectory the plugin is attached to
   :ivar _weight: the weight of the trajectory


   .. py:method:: get_crossed_levels(new_snapshot: pyrevs.core.Snapshot) -> list[float]
      :abstractmethod:

      Determine whether a diagnostic is needed.


   .. py:method:: update(old_snapshot: pyrevs.core.Snapshot, new_snapshot: pyrevs.core.Snapshot) -> None

      Standard entry point called after every MCMC step.


.. py:function:: diagnosticfactory(configs: dict[str, pyrevs.core.Config], tid: int, tweight: float, workdir: pathlib.Path, fprocess: collections.abc.Callable[Ellipsis, Any], ddb: pyrevs.diagnostics.diagdb.DiagDB) -> list[DiagnosticPlugin]

   Parse input parameters to generate a list of DiagnosticPlugin.

   :param configs: a dict with a Config object for each diagnostic
   :param tid: the ID of the trajectory the diagnostic is attached to
   :param tweight: the weight of the trajectory
   :param workdir: the workdir associated with a trajectory
   :param fprocess: the forward model diagnostic function
   :param ddb: the diagnostic database to add the data to
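
The per-level statistics described for ``DiagnosticAnalyst`` can be illustrated
without the library itself. The sketch below is not the ``pyrevs``
implementation: it only assumes the mapping shape documented for
``get_diagnostic_data`` (``{level: [(data, weight, time, tid), ...]}``), and
further assumes that the stored data are plain scalars and that a trajectory
"reached" the next level when its ``tid`` also appears there. The helper names
(``weighted_stats``, ``conditional_means``, ``_wmean``), the output keys
``"mean"`` and ``"variance"``, and the sample values are all hypothetical.

```python
from typing import Any

# Shape documented for get_diagnostic_data():
# {level: [(data, weight, time, tid), ...]}
LevelData = dict[float, list[tuple[Any, float, float, int]]]


def _wmean(pairs: list[tuple[float, float]]) -> float:
    """Weighted mean of (value, weight) pairs."""
    total_w = sum(w for _, w in pairs)
    return sum(w * x for x, w in pairs) / total_w


def weighted_stats(data: LevelData) -> dict[float, dict[str, float]]:
    """Weighted mean and variance of scalar data at each level."""
    stats: dict[float, dict[str, float]] = {}
    for level, entries in data.items():
        pairs = [(x, w) for x, w, _, _ in entries]
        total_w = sum(w for _, w in pairs)
        if total_w <= 0.0:
            continue  # skip levels with no usable weight
        mean = _wmean(pairs)
        var = sum(w * (x - mean) ** 2 for x, w in pairs) / total_w
        stats[level] = {"mean": mean, "variance": var}
    return stats


def conditional_means(data: LevelData) -> dict[float, dict[str, Any]]:
    """Mean at L_i, split by whether the trajectory also appears at L_{i+1}."""
    levels = sorted(data)
    out: dict[float, dict[str, Any]] = {}
    for lvl, nxt in zip(levels, levels[1:]):
        # Assumption: presence of a tid at the next level means "reached".
        reached_ids = {tid for _, _, _, tid in data[nxt]}
        groups: dict[str, list[tuple[float, float]]] = {
            "reached_next": [],
            "failed_next": [],
        }
        for x, w, _, tid in data[lvl]:
            key = "reached_next" if tid in reached_ids else "failed_next"
            groups[key].append((x, w))
        # Empty groups yield None rather than a division by zero.
        out[lvl] = {k: (_wmean(v) if v else None) for k, v in groups.items()}
    return out


# Made-up sample: two trajectories cross level 0.1, only tid 0 reaches 0.2.
sample: LevelData = {
    0.1: [(1.0, 0.5, 0.2, 0), (3.0, 0.5, 0.3, 1)],
    0.2: [(2.0, 0.5, 0.6, 0)],
}

print(weighted_stats(sample))
print(conditional_means(sample))
```

The highest level has no successor, so this sketch leaves it out of the
conditional means. How the library treats that level is not stated above.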