pytams.trajdb

Classes for storing the TAMS data in an SQL database using SQLAlchemy.

Classes

TrajBase

A base class for the tables.

Trajectory

A table storing the active trajectories.

ArchivedTrajectory

A table storing the archived trajectories.

SplittingIterations

A table storing the splitting iterations.

TrajDB

A database holding the TAMS trajectory/iteration repertoire.

Module Contents

class TrajBase[source]

Bases: sqlalchemy.orm.DeclarativeBase

A base class for the tables.

class Trajectory[source]

Bases: TrajBase

A table storing the active trajectories.

id: sqlalchemy.orm.Mapped[int][source]
traj_file: sqlalchemy.orm.Mapped[str][source]
t_metadata: sqlalchemy.orm.Mapped[dict][source]
status: sqlalchemy.orm.Mapped[str][source]
class ArchivedTrajectory[source]

Bases: TrajBase

A table storing the archived trajectories.

id: sqlalchemy.orm.Mapped[int][source]
traj_file: sqlalchemy.orm.Mapped[str][source]
t_metadata: sqlalchemy.orm.Mapped[dict][source]
class SplittingIterations[source]

Bases: TrajBase

A table storing the splitting iterations.

id: sqlalchemy.orm.Mapped[int][source]
split_id: sqlalchemy.orm.Mapped[int][source]
bias: sqlalchemy.orm.Mapped[int][source]
weight: sqlalchemy.orm.Mapped[float][source]
discarded_traj_ids: sqlalchemy.orm.Mapped[list[int]][source]
ancestor_traj_ids: sqlalchemy.orm.Mapped[list[int]][source]
min_vals: sqlalchemy.orm.Mapped[list[float]][source]
min_max: sqlalchemy.orm.Mapped[list[float]][source]
status: sqlalchemy.orm.Mapped[str][source]
valid_statuses = ['locked', 'idle', 'completed'][source]
class TrajDB(file_name: str, in_memory: bool = False, ro_mode: bool = False)[source]

Bases: pytams.sqlmanager.BaseSQLManager

A database holding the TAMS trajectory/iteration repertoire.

Allows atomic access to an SQL database from all the workers.

Note: TAMS works with Python indexing starting at 0, while SQL indexing starts at 1. Trajectory ID is updated accordingly when accessing/updating the DB.
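The 0/1 index shift mentioned in the note amounts to a pair of conversions at the DB boundary, sketched here (the helper names are illustrative, not part of the pytams API):

```python
# Illustrative helpers for the 0-based (Python) vs 1-based (SQL) id shift;
# these names are hypothetical and not part of the pytams API.
def py_to_sql(traj_id: int) -> int:
    """Convert a 0-based TAMS trajectory id to a 1-based SQL row id."""
    return traj_id + 1

def sql_to_py(row_id: int) -> int:
    """Convert a 1-based SQL row id back to a 0-based TAMS trajectory id."""
    return row_id - 1

print(py_to_sql(0))  # the first trajectory maps to SQL row 1
print(sql_to_py(1))  # and back to 0
```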

Variables:

_file_name – The file name

add_trajectory(traj_file: str, metadata: dict) None[source]

Add a new trajectory to the DB.

Parameters:
  • traj_file – The trajectory file of that trajectory

  • metadata – a dict with the metadata

Raises:

SQLAlchemyError if the DB could not be accessed

update_trajectory(traj_id: int, traj_file: str, metadata: dict) None[source]

Update a given trajectory's data in the DB.

Parameters:
  • traj_id – The trajectory id

  • traj_file – The new trajectory file of that trajectory

  • metadata – a dict with the trajectory metadata

Raises:

SQLAlchemyError if the DB could not be accessed

update_trajectory_weight(traj_id: int, weight: float) None[source]

Update a given trajectory's weight in the DB.

Parameters:
  • traj_id – The trajectory id

  • weight – the new trajectory weight

Raises:

SQLAlchemyError if the DB could not be accessed

lock_trajectory(traj_id: int, allow_completed_lock: bool = False) bool[source]

Set the status of a trajectory to “locked” if possible.

Parameters:
  • traj_id – The trajectory id

  • allow_completed_lock – Allow to lock a “completed” trajectory

Returns:

True if the trajectory was successfully locked, False otherwise

Raises:
  • ValueError if the trajectory with the given id does not exist

  • SQLAlchemyError if the DB could not be accessed
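Atomic locking of this kind is typically achieved with a conditional UPDATE, so that two concurrent workers cannot both acquire the same trajectory. A self-contained sketch of that pattern (not the actual pytams implementation):

```python
# Sketch of an atomic "lock if idle" update; the schema and helper name are
# illustrative, not the pytams internals.
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE trajectories (id INTEGER PRIMARY KEY, status TEXT)")
conn.execute("INSERT INTO trajectories (status) VALUES ('idle')")

def try_lock(conn: sqlite3.Connection, traj_id: int) -> bool:
    """Atomically set status to 'locked' only if it is currently 'idle'."""
    cur = conn.execute(
        "UPDATE trajectories SET status = 'locked' WHERE id = ? AND status = 'idle'",
        (traj_id,),
    )
    conn.commit()
    return cur.rowcount == 1  # exactly one row updated -> lock acquired

print(try_lock(conn, 1))  # True: the trajectory was idle
print(try_lock(conn, 1))  # False: already locked
```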

mark_trajectory_as_completed(traj_id: int) None[source]

Set the status of a trajectory to “completed” if possible.

Parameters:

traj_id – The trajectory id

Raises:
  • ValueError if the trajectory with the given id does not exist

  • SQLAlchemyError if the DB could not be accessed

release_trajectory(traj_id: int) None[source]

Set the status of a trajectory to “idle” if possible.

Parameters:

traj_id – The trajectory id

Raises:

ValueError if the trajectory with the given id does not exist

get_trajectory_count() int[source]

Get the number of trajectories in the DB.

Returns:

The number of trajectories

get_ended_trajectory_count() int[source]

Return the number of trajectories flagged as ‘ended’ in their metadata.

get_converged_trajectory_count() int[source]

Return the number of trajectories flagged as ‘converged’ in their metadata.

get_total_computed_steps() int[source]

Sum the ‘nstep_compute’ field across all active and archived trajectories.

fetch_trajectory(traj_id: int) tuple[str, dict][source]

Get the trajectory file and metadata of a trajectory.

Parameters:

traj_id – The trajectory id

Returns:

A tuple with the trajectory file as a str and the trajectory metadata as a dict

Raises:

ValueError if the trajectory with the given id does not exist

release_all_trajectories() None[source]

Release all trajectories in the DB.

archive_trajectory(traj_file: str, metadata: dict) None[source]

Add a new trajectory to the archive container.

Parameters:
  • traj_file – The trajectory file of that trajectory

  • metadata – a dict with the traj metadata

fetch_archived_trajectory(traj_id: int) tuple[str, dict][source]

Get the trajectory file and metadata of an archived trajectory.

Parameters:

traj_id – The trajectory id

Returns:

A tuple with the trajectory file as a str and the trajectory metadata as a dict

Raises:

ValueError if the trajectory with the given id does not exist

get_archived_trajectory_count() int[source]

Get the number of trajectories in the archive.

Returns:

The number of trajectories

clear_archived_trajectories() int[source]

Delete the content of the archived trajectory table.

Returns:

The number of entries deleted

add_splitting_data(k: int, bias: int, weight: float, discarded_ids: list[int], ancestor_ids: list[int], min_vals: list[float], min_max: list[float]) None[source]

Add new splitting data to the DB.

Parameters:
  • k – The splitting iteration index

  • bias – The number of restarted trajectories

  • weight – Weight of the ensemble at the current iteration

  • discarded_ids – The list of discarded trajectory ids

  • ancestor_ids – The list of trajectories used to restart

  • min_vals – The list of minimum values

  • min_max – The score minimum and maximum values

update_splitting_data(k: int, bias: int, weight: float, discarded_ids: list[int], ancestor_ids: list[int], min_vals: list[float], min_max: list[float]) None[source]

Update the last splitting data row in the DB.

Parameters:
  • k – The splitting iteration index

  • bias – The number of restarted trajectories

  • weight – Weight of the ensemble at the current iteration

  • discarded_ids – The list of discarded trajectory ids

  • ancestor_ids – The list of trajectories used to restart

  • min_vals – The list of minimum values

  • min_max – The score minimum and maximum values

mark_last_iteration_as_completed() None[source]

Mark the last splitting iteration as complete.

By default, iteration data are appended to the SQL table with status “locked” to indicate an iteration being worked on. Upon completion, this method marks it “completed”; otherwise the iteration is considered incomplete, i.e. interrupted by an error or the wall-clock limit.
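This “locked”/“completed” convention makes interrupted runs detectable: on startup, a driver can inspect the last iteration row and decide whether to resume it. A minimal sketch of that check (hypothetical schema and names, not the pytams driver logic):

```python
# Illustrative restart check on a "locked"/"completed" status column;
# the schema and helper name are hypothetical, not pytams internals.
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE iterations (id INTEGER PRIMARY KEY, status TEXT)")
conn.executemany(
    "INSERT INTO iterations (status) VALUES (?)",
    [("completed",), ("completed",), ("locked",)],
)

def last_iteration_incomplete(conn: sqlite3.Connection) -> bool:
    """True if the most recent iteration was interrupted (still 'locked')."""
    row = conn.execute(
        "SELECT status FROM iterations ORDER BY id DESC LIMIT 1"
    ).fetchone()
    return row is not None and row[0] == "locked"

print(last_iteration_incomplete(conn))  # True: the last iteration never completed
```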

get_k_split() int[source]

Get the current splitting iteration counter.

Returns:

The ksplit from the last entry in the SplittingIterations table

check_new_min_of_maxes(newmin: float) None[source]

Compare the incoming min to the last entry.

When running TAMS, the ensemble minimum of the maxima at each new iteration should be strictly above that of the previous iteration.

Parameters:

newmin – the new minimum of the maxima
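The check reduces to a strict-monotonicity assertion on successive minima. One plausible sketch of the idea (illustrative, not the pytams code, including the choice of exception):

```python
# Illustrative strict-monotonicity check on the ensemble minimum of maxima;
# the function name and the ValueError choice are assumptions.
def check_strictly_increasing(prev_min: float, new_min: float) -> None:
    """Raise if the new ensemble minimum of maxima did not strictly increase."""
    if new_min <= prev_min:
        raise ValueError(
            f"Stalled splitting: new min of maxes {new_min} "
            f"is not above the previous one {prev_min}"
        )

check_strictly_increasing(0.4, 0.5)  # fine: the minimum increased
```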

get_iteration_count() int[source]

Get the number of splitting iterations stored.

Returns:

The length of the SplittingIterations table

fetch_splitting_data(k_id: int) tuple[int, int, float, list[int], list[int], list[float], list[float], str] | None[source]

Get the splitting iteration data for a given iteration.

Parameters:

k_id – The iteration id

Returns:

The splitting iteration data

Raises:

ValueError if the splitting iteration with the given id does not exist

get_ongoing() list[int] | None[source]

Get the list of ongoing trajectories if any.

Returns:

Either a list of trajectory ids or None if nothing is left to do

get_weights() numpy.typing.NDArray[numpy.number][source]

Read the weights from the database.

Returns:

The weight for each splitting iteration as a Numpy array

get_biases() numpy.typing.NDArray[numpy.number][source]

Read the biases from the database.

Returns:

The bias for each splitting iteration as a Numpy array

get_minmax() numpy.typing.NDArray[numpy.number][source]

Read the min/max from the database.

Returns:

The 2D Numpy array with columns k_index, min, max

clear_splitting_data() int[source]

Delete the content of the splitting data table.

Returns:

The number of entries deleted

dump_file_json(json_file: str | None = None) None[source]

Dump the content of the trajectory table to a JSON file.

Parameters:

json_file – an optional file name (or path) to dump the data to
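Dumping a table to JSON generally amounts to serializing each row into a dict. A self-contained sketch of the pattern (hypothetical schema and helper, not the pytams implementation):

```python
# Illustrative table-to-JSON dump; the schema and helper name are
# hypothetical, not the pytams internals.
import json
import os
import sqlite3
import tempfile

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE trajectories (id INTEGER PRIMARY KEY, traj_file TEXT, status TEXT)"
)
conn.execute(
    "INSERT INTO trajectories (traj_file, status) VALUES ('traj_0000.xml', 'idle')"
)

def dump_table_json(conn: sqlite3.Connection, json_file: str) -> None:
    """Serialize every row of the trajectory table to a JSON file."""
    rows = conn.execute("SELECT id, traj_file, status FROM trajectories").fetchall()
    data = [{"id": r[0], "traj_file": r[1], "status": r[2]} for r in rows]
    with open(json_file, "w") as f:
        json.dump(data, f, indent=2)

path = os.path.join(tempfile.mkdtemp(), "trajdb.json")
dump_table_json(conn, path)
print(json.load(open(path))[0]["traj_file"])  # traj_0000.xml
```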