pyrevs.core =========== .. py:module:: pyrevs.core Submodules ---------- .. toctree:: :maxdepth: 1 /autoapi/pyrevs/core/config/index /autoapi/pyrevs/core/fmodel/index /autoapi/pyrevs/core/runtime_cfg/index /autoapi/pyrevs/core/snapshot/index /autoapi/pyrevs/core/sqlcore/index /autoapi/pyrevs/core/sqlmanager/index Classes ------- .. autoapisummary:: pyrevs.core.Config pyrevs.core.MergePolicy pyrevs.core.ForwardModelBaseClass pyrevs.core.RuntimeConfig pyrevs.core.Snapshot pyrevs.core.CoreBase pyrevs.core.CoreDB Package Contents ---------------- .. py:class:: Config(data: collections.abc.Mapping[str, Any]) Lightweight structured access to configuration. pyREVS input parameters are mostly handled through dataclasses to ensure default values are available and types are enforced: TOML file -> Config -> Typed dataclass. The typed dataclass are the final object used to instanciate pyREVS objects (Sampler, Database, Trajectories, ...) The Config class provides a simple interface to access configuration parameters in a more structured way. To add a new section to the TOML input file, create a new dataclass with a `__section__` class attribute set to the name of the section. .. py:method:: section(name: str) -> Config Return a configuration section. .. py:method:: section_dict(name: str) -> dict[str, Any] Return a section as dict. .. py:method:: require(section: str, key: str) -> Any Get a required parameter. .. py:method:: get(section: str, key: str, default: Any = None) -> Any Get an optional parameter. .. py:method:: as_dict() -> dict[str, Any] Return the config Mapping as a dict. .. py:method:: load(cls: type[T]) -> T Load a dataclass of the provided type from this config section. .. py:class:: MergePolicy A merge policy for configuration dataclasses. .. py:attribute:: IMMUTABLE :value: 'immutable' .. py:attribute:: REPLACE :value: 'replace' .. py:class:: ForwardModelBaseClass(a_id: int, deterministic: bool, params: dict[str, Any] | None = None, workdir: pathlib.Path | None = None) Bases: :py:obj:`abc.ABC`, :py:obj:`Generic`\ [\ :py:obj:`T_Noise`\ , :py:obj:`T_State`\ ] A base class for the stochastic forward model. pyREVS relies on a separation of the stochastic model, encapsulating the physics of interest, and the sampling algorithm itself. The ForwardModelBaseClass defines the API the sampling algorithm requires from the stochastic model. Concrete model classes must implement all the abstract functions defined in this base class. The base class handles some components needed by pyREVS, so that the user does not have to ensure compatibility with pyREVS requirements. :ivar _noise: the noise to be used in the next model step :ivar _step: the current stochastic step counter :ivar _time: the current stochastic time :ivar _workdir: the working directory .. py:method:: advance(dt: float, need_end_state: bool) -> float Base class advance function of the model. This is the advance function called by pyREVS internals. It handles updating the model time and step counter, as well as reusing or generating noise only when needed. It also handles exceptions. :param dt: the time step size over which to advance :param need_end_state: whether the step end state is needed :returns: Some model will not do exactly dt (e.g. sub-stepping) return the actual dt .. py:property:: noise :type: T_Noise Return the model's latest noise increment. .. py:method:: clear() -> None Destroy internal data. .. py:method:: set_workdir(workdir: pathlib.Path) -> None Setter of the model working directory. :param workdir: the new working directory .. py:method:: get_current_state() -> T_State :abstractmethod: Return the current state of the model. Note that the return type is left to the concrete model definition. .. py:method:: set_current_state(state: T_State) -> None :abstractmethod: Set the current state of the model. :param state: the externally provide state .. py:method:: score() -> float :abstractmethod: Return the model's current state score. The score is a real. :returns: the score associated with the current model state .. py:method:: make_noise() -> T_Noise :abstractmethod: Return the model's latest noise increment. Note that the noise type is left to the concrete model definition. :returns: The model next noise increment .. py:method:: post_trajectory_branching_hook(step: int, time: float) -> None Model post trajectory branching hook. :param step: the current step counter :param time: the time of the simulation .. py:method:: post_trajectory_restore_hook(step: int, time: float) -> None Model post trajectory restore hook. :param step: the current step counter :param time: the time of the simulation .. py:method:: diagnostic_hook(dlabel: str, tid: int, score_level: float, old_snap: pyrevs.core.snapshot.Snapshot[T_Noise, T_State], new_snap: pyrevs.core.snapshot.Snapshot[T_Noise, T_State]) -> Any :abstractmethod: Diagnostic hook. :param dlabel: the label of the diagnostic calling the hook :param tid: the ID of the trjaectory calling :param score_level: the score level crossed and triggering the call :param old_snap: the snapshot at the beginning of the step :param new_snap: the snapshot at the end of the step .. py:method:: check_convergence(step: int, time: float, current_score: float, target_score: float) -> bool Check if the model has converged. This default implementation checks if the current score is greater than or equal to the target score. The user can override this method to implement a different convergence criterion. :param step: the current step counter :param time: the time of the simulation :param current_score: the current score :param target_score: the target score .. py:method:: check_termination(step: int, time: float) -> bool Check for trajectory termination. This default always return False. The user can override this method to implement a different termination criterion. Note that simple termination criteria (e.g. end_time, score_min) are handled elsewhere. This function should be used for more complex termination criteria, e.g. entering a given region of the model phase space. :param step: the current step counter :param time: the time of the simulation .. py:method:: name() -> str :classmethod: Return a the model name. .. py:class:: RuntimeConfig Runtime configuration. .. py:attribute:: loglevel :type: str :value: 'INFO' .. py:attribute:: logfile :type: str | None :value: None .. py:attribute:: walltime :type: float :value: 86400 .. py:attribute:: plot_diagnostics :type: bool :value: False .. py:attribute:: diagnostics :type: list[str] :value: [] .. py:class:: Snapshot Bases: :py:obj:`Generic`\ [\ :py:obj:`T_Noise`\ , :py:obj:`T_State`\ ] A dataclass defining a snapshot. Gathering what defines a snapshot into an object. The time and score are of float type, but the actual type of the noise and state are completely determined by the forward model. A snapshot is allowed to have a state or not to accommodate memory savings. :ivar time: snapshot time :ivar score: score function value :ivar noise: noise used to reach this snapshot :ivar state: model state .. py:attribute:: time :type: float .. py:attribute:: score :type: float .. py:attribute:: noise :type: T_Noise .. py:attribute:: state :type: T_State | None :value: None .. py:property:: has_state :type: bool Check if snapshot has state. :returns: True if state is not None :rtype: bool .. py:class:: CoreBase Bases: :py:obj:`sqlalchemy.orm.DeclarativeBase` A base class for the tables. .. py:class:: CoreDB(file_name: str, in_memory: bool = False, ro_mode: bool = False) Bases: :py:obj:`pyrevs.core.sqlmanager.BaseSQLManager` A database holding pyREVS trajectories. Allows atomic access to an SQL database from all the workers. Note: pyREVS works with Python indexing starting at 0, while SQL indexing starts at 1. Trajectory ID is updated accordingly when accessing/updating the DB. :ivar _file_name: The file name .. py:method:: add_trajectory(traj_file: str, metadata: dict) -> None Add a new trajectory to the DB. :param traj_file: The trajectory file of that trajectory :param metadata: a dict with the metadata :raises SQLAlchemyError if the DB could not be accessed: .. py:method:: update_trajectory(traj_id: int, traj_file: str, metadata: dict) -> None Update a given trajectory data in the DB. :param traj_id: The trajectory id :param traj_file: The new trajectory file of that trajectory :param metadata: a dict with the trajectory metadata :raises SQLAlchemyError if the DB could not be accessed: .. py:method:: update_trajectory_weight(traj_id: int, weight: float) -> None Update a given trajectory weight in the DB. :param traj_id: The trajectory id :param weight: the new trajectory weight :raises SQLAlchemyError if the DB could not be accessed: .. py:method:: lock_trajectory(traj_id: int, allow_completed_lock: bool = False) -> bool Set the status of a trajectory to "locked" if possible. :param traj_id: The trajectory id :param allow_completed_lock: Allow to lock a "completed" trajectory :returns: True if the trajectory was successfully locked, False otherwise :raises ValueError if the trajectory with the given id does not exist: :raises SQLAlchemyError if the DB could not be accessed: .. py:method:: mark_trajectory_as_completed(traj_id: int) -> None Set the status of a trajectory to "completed" if possible. :param traj_id: The trajectory id :raises ValueError if the trajectory with the given id does not exist: :raises SQLAlchemyError if the DB could not be accessed: .. py:method:: release_trajectory(traj_id: int) -> None Set the status of a trajectory to "idle" if possible. :param traj_id: The trajectory id :raises ValueError if the trajectory with the given id does not exist: .. py:method:: get_trajectory_count() -> int Get the number of trajectories in the DB. :returns: The number of trajectories .. py:method:: get_terminated_trajectory_count() -> int Return the number of trajectories that have 'terminated' in their metadata. .. py:method:: get_converged_trajectory_count() -> int Return the number of trajectories that have 'converged' in their metadata. .. py:method:: get_total_computed_steps() -> int Sum the 'nstep_compute' field across all active and archived trajectories. .. py:method:: fetch_trajectory(traj_id: int) -> tuple[str, dict] Get the trajectory file of a trajectory. :param traj_id: The trajectory id :returns: A tuple with trajectory file as a str and the trajectory metadata as dict :raises ValueError if the trajectory with the given id does not exist: .. py:method:: check_trajectory_exist(traj_id: int) -> bool Check if a trajectory exist for a given index. :param traj_id: The trajectory id :returns: True if the trajectory exist, False otherwise .. py:method:: release_all_trajectories() -> None Release all trajectories in the DB. .. py:method:: archive_trajectory(traj_file: str, metadata: dict) -> None Add a new trajectory to the archive container. :param traj_file: The trajectory file of that trajectory :param metadata: a dict with the traj metadata .. py:method:: fetch_archived_trajectory(traj_id: int) -> tuple[str, dict] Get the trajectory file of a trajectory in the archive. :param traj_id: The trajectory id :returns: A tuple with trajectory file as a str and the trajectory metadata as dict :raises ValueError if the trajectory with the given id does not exist: .. py:method:: get_archived_trajectory_count() -> int Get the number of trajectories in the archive. :returns: The number of trajectories .. py:method:: clear_archived_trajectories() -> int Delete the content of the archived traj table. :returns: The number of entries deleted .. py:method:: dump_file_json(json_file: str | None = None) -> None Dump the content of the trajectory table to a json file. :param json_file: an optional file name (or path) to dump the data to