tanat.zeroing.type package

Submodules

tanat.zeroing.type.direct module

Direct T0 strategy: T0 = a user-provided scalar or per-sequence dict.

class tanat.zeroing.type.direct.DirectT0Setter(*, direct: datetime | date | int | float | None | dict[Any, datetime | date | int | float | None])

Bases: T0Setter

T0 = a user-supplied scalar or per-sequence mapping.

  • Scalar: the same value is assigned to every sequence.

  • Dict ({seq_id: value}): per-sequence assignment. Keys not present in the pool emit a UserWarning and are ignored. Sequences with no matching key receive _t0 = null.

_t0_nearest_rank is resolved as the last row where t_col ≤ T0 (floor semantics). It is null when _t0 = null or when no row satisfies the constraint.
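The assignment and floor rules above can be illustrated with a minimal plain-Python sketch. This is not the library's implementation: `assign_direct_t0` and `nearest_rank_floor` are hypothetical helper names, `None` stands in for null, the rank is assumed to be a 0-based row index, and the time values are assumed to be sorted in ascending order.

```python
import warnings
from bisect import bisect_right


def assign_direct_t0(pool_ids, direct):
    """Return {seq_id: t0}; None stands in for null (hypothetical helper)."""
    if not isinstance(direct, dict):
        # Scalar: the same value is assigned to every sequence.
        return {seq_id: direct for seq_id in pool_ids}
    known = set(pool_ids)
    unknown = [key for key in direct if key not in known]
    if unknown:
        # Keys absent from the pool are reported, then ignored.
        warnings.warn(f"ignoring unknown sequence ids: {unknown}", UserWarning)
    # Sequences without a matching key fall back to None.
    return {seq_id: direct.get(seq_id) for seq_id in pool_ids}


def nearest_rank_floor(times, t0):
    """Index of the last row with time <= t0, or None (assumes sorted times)."""
    if t0 is None:
        return None
    idx = bisect_right(times, t0) - 1
    return idx if idx >= 0 else None
```

With `pool_ids = ["a", "b"]` and `direct = {"a": 5}`, sequence `"b"` receives `None`; with row times `[1, 4, 9]` and T0 = 5, the floor rule selects the row at time 4.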

__init__(*, direct: datetime | date | int | float | None | dict[Any, datetime | date | int | float | None])

class tanat.zeroing.type.direct.DirectT0Settings(*, direct: Any)

Bases: object

Settings for the direct T0 strategy.

__init__(*args: Any, **kwargs: Any) → None
direct: Any
model_dump(*, mode='python', **dump_kwargs)

Dump settings to a dict via Pydantic serialization.

tanat.zeroing.type.feature module

Feature-based T0 strategy: T0 = value from a static feature column.

class tanat.zeroing.type.feature.FeatureT0Setter(*, feature: str)

Bases: T0Setter

T0 = value of a static feature column, one value per sequence.

The named feature must exist in the pool’s static features and its dtype must exactly match the pool’s time index dtype.

__init__(*, feature: str)
compute_from_trajectory(target: TrajectoryPool, on: str | None = None) → pl.DataFrame

Read T0 from a trajectory-level static feature column.

Uses target.static_data (public API) so no coupling to trajectory store internals is needed.

Parameters:
  • target – The trajectory pool.

  • on – Ignored for the feature strategy (T0 comes from the trajectory-level static feature, not a sub-pool).

Returns:

Complete [id_col, _T0_] DataFrame stored in self._df.

Raises:
  • KeyError – If the feature does not exist in trajectory static features.

  • TypeError – If the feature dtype does not match the trajectory’s time index dtype.
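The two failure modes listed above can be sketched in plain Python. This is only an illustration under stated assumptions: `read_feature_t0` is a hypothetical helper, and static features are modelled as a dict of `(dtype, {seq_id: value})` pairs rather than the real `target.static_data` object.

```python
from datetime import date, datetime


def read_feature_t0(static_columns, feature, time_dtype):
    """Return {seq_id: t0} read from one static column (hypothetical layout)."""
    if feature not in static_columns:
        raise KeyError(f"static feature {feature!r} not found")
    dtype, values = static_columns[feature]
    # Exact match is required: a datetime column is rejected when the
    # time index is date-typed, even though datetime subclasses date.
    if dtype is not time_dtype:
        raise TypeError(f"{feature!r} has dtype {dtype}, expected {time_dtype}")
    return dict(values)


# Illustrative data: one date-typed and one datetime-typed static column.
static_columns = {
    "index_date": (date, {"a": date(2020, 1, 1), "b": date(2020, 3, 5)}),
    "created_at": (datetime, {"a": datetime(2020, 1, 1, 8, 0)}),
}
t0_by_sequence = read_feature_t0(static_columns, "index_date", date)
```

Here `"index_date"` is accepted for a date-typed time index, whereas `"created_at"` would raise TypeError and an unknown name would raise KeyError.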

class tanat.zeroing.type.feature.FeatureT0Settings(*, feature: str)

Bases: object

Settings for the feature-based T0 strategy.

__init__(*args: Any, **kwargs: Any) → None
feature: str
model_dump(*, mode='python', **dump_kwargs)

Dump settings to a dict via Pydantic serialization.

tanat.zeroing.type.position module

Position-based T0 strategy: T0 = temporal value of the row at a given position (0-based; negative indices allowed).

class tanat.zeroing.type.position.PositionT0Setter(*, position: int = 0, anchor: Literal['start', 'end', 'middle'] | None = None)

Bases: T0Setter

T0 = temporal value of the row at position for each sequence.

Supports 0-based positive indexing (position=0 → first row) and negative indexing (position=-1 → last row). Sequences shorter than abs(position) + 1 rows receive _t0 = null.
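A minimal plain-Python sketch of this indexing rule, for one sequence at a time. `t0_at_position` is a hypothetical helper, `None` stands in for null, and the length threshold is applied literally as stated in the text above; the actual implementation may differ.

```python
def t0_at_position(times, position=0):
    """Return the time value at `position`, or None for short sequences."""
    # Documented rule: fewer than abs(position) + 1 rows gives null.
    if len(times) < abs(position) + 1:
        return None
    # Python list indexing already supports negative positions.
    return times[position]


first = t0_at_position([10, 20, 30], position=0)     # first row
last = t0_at_position([10, 20, 30], position=-1)     # last row
too_short = t0_at_position([10, 20], position=2)     # only two rows
```

In this sketch `first` is 10, `last` is 30, and `too_short` is `None` because the sequence has no third row.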

__init__(*, position: int = 0, anchor: Literal['start', 'end', 'middle'] | None = None)

class tanat.zeroing.type.position.PositionT0Settings(*, position: int = 0, anchor: Literal['start', 'end', 'middle'] | None = None)

Bases: object

Settings for the position-based T0 strategy.

__init__(*args: Any, **kwargs: Any) → None
anchor: Literal['start', 'end', 'middle'] | None = None
model_dump(*, mode='python', **dump_kwargs)

Dump settings to a dict via Pydantic serialization.

position: int = 0

tanat.zeroing.type.query module

Query-based T0 strategy: T0 = timestamp of first/last row matching a Polars expression.

class tanat.zeroing.type.query.QueryT0Setter(*, query: Expr, anchor: Literal['start', 'end', 'middle'] | None = None, use_first: bool = True)

Bases: T0Setter

T0 = timestamp of the first (or last) entity row matching query.

  • query: any pl.Expr that evaluates to a boolean Series on any column of the sequence data (time columns or entity features), e.g. pl.col("event") == "admission" or pl.col("t_start") > threshold.

  • use_first=True (default): earliest matching row per sequence.

  • use_first=False: latest matching row.

  • anchor: which time column to read ("start" / "end"). For event sequences the anchor is ignored.

Sequences with no matching row receive _t0 = null.
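The selection logic described above can be sketched in plain Python for a single sequence. This is an illustration only: `query_t0` is a hypothetical helper, a Python callable stands in for the boolean `pl.Expr`, the `time_col` argument plays the role of the anchor, and the column names `t_start`, `t_end`, and `event` are made up for the example.

```python
def query_t0(rows, predicate, time_col="t_start", use_first=True):
    """Return the time of the earliest or latest matching row, else None."""
    matches = [row[time_col] for row in rows if predicate(row)]
    if not matches:
        # No row satisfies the query: T0 is null for this sequence.
        return None
    return min(matches) if use_first else max(matches)


# Illustrative interval rows for one sequence.
rows = [
    {"t_start": 1, "t_end": 2, "event": "triage"},
    {"t_start": 3, "t_end": 6, "event": "admission"},
    {"t_start": 8, "t_end": 9, "event": "admission"},
]


def is_admission(row):
    return row["event"] == "admission"


earliest = query_t0(rows, is_admission)                                  # 3
latest_end = query_t0(rows, is_admission, "t_end", use_first=False)      # 9
no_match = query_t0(rows, lambda row: row["event"] == "discharge")       # None
```

The default mirrors `use_first=True`; switching to `use_first=False` and the end column picks the end time of the latest matching row.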

__init__(*, query: Expr, anchor: Literal['start', 'end', 'middle'] | None = None, use_first: bool = True)

class tanat.zeroing.type.query.QueryT0Settings(*, query: pl.Expr, anchor: Literal['start', 'end', 'middle'] | None = None, use_first: bool = True)

Bases: object

Settings for the query-based T0 strategy.

The query expression is evaluated against the full sequence row set (time columns + entity features), so it can reference any of those columns.

__init__(*args: Any, **kwargs: Any) → None
anchor: Literal['start', 'end', 'middle'] | None = None
model_dump(*, mode='python', **dump_kwargs)

Dump settings to a dict via Pydantic serialization.

query: Expr
use_first: bool = True

Module contents

T0Setter subtypes.

class tanat.zeroing.type.DirectT0Setter(*, direct: datetime | date | int | float | None | dict[Any, datetime | date | int | float | None])

Bases: T0Setter

T0 = a user-supplied scalar or per-sequence mapping.

  • Scalar: the same value is assigned to every sequence.

  • Dict ({seq_id: value}): per-sequence assignment. Keys not present in the pool emit a UserWarning and are ignored. Sequences with no matching key receive _t0 = null.

_t0_nearest_rank is resolved as the last row where t_col ≤ T0 (floor semantics). It is null when _t0 = null or when no row satisfies the constraint.

__init__(*, direct: datetime | date | int | float | None | dict[Any, datetime | date | int | float | None])

class tanat.zeroing.type.DirectT0Settings(*, direct: Any)

Bases: object

Settings for the direct T0 strategy.

__init__(*args: Any, **kwargs: Any) → None
direct: Any
model_dump(*, mode='python', **dump_kwargs)

Dump settings to a dict via Pydantic serialization.

class tanat.zeroing.type.FeatureT0Setter(*, feature: str)

Bases: T0Setter

T0 = value of a static feature column, one value per sequence.

The named feature must exist in the pool’s static features and its dtype must exactly match the pool’s time index dtype.

__init__(*, feature: str)
compute_from_trajectory(target: TrajectoryPool, on: str | None = None) → pl.DataFrame

Read T0 from a trajectory-level static feature column.

Uses target.static_data (public API) so no coupling to trajectory store internals is needed.

Parameters:
  • target – The trajectory pool.

  • on – Ignored for the feature strategy (T0 comes from the trajectory-level static feature, not a sub-pool).

Returns:

Complete [id_col, _T0_] DataFrame stored in self._df.

Raises:
  • KeyError – If the feature does not exist in trajectory static features.

  • TypeError – If the feature dtype does not match the trajectory’s time index dtype.

class tanat.zeroing.type.FeatureT0Settings(*, feature: str)

Bases: object

Settings for the feature-based T0 strategy.

__init__(*args: Any, **kwargs: Any) → None
feature: str
model_dump(*, mode='python', **dump_kwargs)

Dump settings to a dict via Pydantic serialization.

class tanat.zeroing.type.PositionT0Setter(*, position: int = 0, anchor: Literal['start', 'end', 'middle'] | None = None)

Bases: T0Setter

T0 = temporal value of the row at position for each sequence.

Supports 0-based positive indexing (position=0 → first row) and negative indexing (position=-1 → last row). Sequences shorter than abs(position) + 1 rows receive _t0 = null.

__init__(*, position: int = 0, anchor: Literal['start', 'end', 'middle'] | None = None)

class tanat.zeroing.type.PositionT0Settings(*, position: int = 0, anchor: Literal['start', 'end', 'middle'] | None = None)

Bases: object

Settings for the position-based T0 strategy.

__init__(*args: Any, **kwargs: Any) → None
anchor: Literal['start', 'end', 'middle'] | None = None
model_dump(*, mode='python', **dump_kwargs)

Dump settings to a dict via Pydantic serialization.

position: int = 0

class tanat.zeroing.type.QueryT0Setter(*, query: Expr, anchor: Literal['start', 'end', 'middle'] | None = None, use_first: bool = True)

Bases: T0Setter

T0 = timestamp of the first (or last) entity row matching query.

  • query: any pl.Expr that evaluates to a boolean Series on any column of the sequence data (time columns or entity features), e.g. pl.col("event") == "admission" or pl.col("t_start") > threshold.

  • use_first=True (default): earliest matching row per sequence.

  • use_first=False: latest matching row.

  • anchor: which time column to read ("start" / "end"). For event sequences the anchor is ignored.

Sequences with no matching row receive _t0 = null.

__init__(*, query: Expr, anchor: Literal['start', 'end', 'middle'] | None = None, use_first: bool = True)

class tanat.zeroing.type.QueryT0Settings(*, query: pl.Expr, anchor: Literal['start', 'end', 'middle'] | None = None, use_first: bool = True)

Bases: object

Settings for the query-based T0 strategy.

The query expression is evaluated against the full sequence row set (time columns + entity features), so it can reference any of those columns.

__init__(*args: Any, **kwargs: Any) → None
anchor: Literal['start', 'end', 'middle'] | None = None
model_dump(*, mode='python', **dump_kwargs)

Dump settings to a dict via Pydantic serialization.

query: Expr
use_first: bool = True