# TanaT - Complete Documentation TanaT (Temporal ANalysis of Trajectories) is a Python library for temporal sequence analysis, focused on patient care pathways. It supports multi-sequence trajectories combining events, intervals, and states. ============================================================ ## Core Concepts Core Concepts This page introduces the fundamental concepts of *TanaT*'s data model. Understanding these concepts is essential for using the library effectively. *TanaT* organises temporal data in three nested levels: *entities*, *sequences*, and *trajectories*. For population-level analysis, TanaT groups sequences and trajectories into *pools*. Entities, Sequences, and Trajectories *TanaT* distinguishes three levels of temporal data structures: .. list-table:: :header-rows: 1 :widths: 20 40 40 * - Level - Description - Example * - **Entity** - A single observation with temporal extent - A medical visit, a hospitalization * - **Sequence** - Collection of entities for one individual - All visits of patient P001 * - **Trajectory** - Multiple sequences for one individual - Visits + hospitalizations + lab results for P001 Entity An :term:`entity` is the atomic unit of temporal data in a :term:`sequence ` object. It has: - **Features**: One or more descriptive attributes (e.g., visit type, diagnosis code) - **Temporal extent**: Either a single timestamp or a time interval The temporal extent nature and feature structure are formalized through :term:`metadata`. Sequence A :term:`sequence` is a temporal arrangement of entities described by the same :term:`metadata`. All entities in a sequence share the same type (events, intervals, or states) and the same feature structure. See the examples to build and explore each type. The diagram below shows a sequence with 4 event entities. Note that two events can share the same timestamp (Event `A` and Event `B` on *Nov 8*). 
Trajectory A :term:`trajectory` combines multiple sequences of different types for the same individual. For a complete walkthrough, see the example. The diagram below shows a trajectory with three sequence types: Sequence Types TanaT supports three types of temporal extent: .. list-table:: :header-rows: 1 :widths: 20 35 45 * - Type - Temporal Extent - Constraints * - **Event** - Single timestamp (punctual) - None * - **Interval** - Start and end dates - Can overlap, gaps allowed * - **State** - Start and end dates - Contiguous, no overlap, no gaps **When to use each type:** - **Event**: Point-in-time occurrences (visits, purchases, clicks). Use `tanat.sequence.shortcuts.build_events`. - **Interval**: Duration-based events that can overlap (treatments, projects). Use `tanat.sequence.shortcuts.build_intervals`. - **State**: Continuous states without gaps (disease stages, employment status). Use `tanat.sequence.shortcuts.build_states`. Pools A :term:`pool` is a collection of sequences or trajectories for multiple individuals. All individual sequences of a pool share the same structure (same features, same temporal type). Pools are the primary data structure for analysis operations like computing distance matrices or clustering. Pools can be created with shortcut functions (`tanat.sequence.shortcuts.build_events`, `tanat.sequence.shortcuts.build_intervals`, `tanat.sequence.shortcuts.build_states`) or via the lower-level builder pattern for multi-source ingestion. See or for the full builder reference. Static Data A sequence can be complemented by non-temporal features, so-called :term:`static features ` (attributes like birth date or gender). Similarly to temporal features, static features are also described through the :term:`metadata`. More specifically, each static feature has a type. Static features are the same for all sequences belonging to a pool. 
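The State constraints listed in the Sequence Types table above (contiguous, no overlap, no gaps) can be pictured with a small check in plain Python. This is only an illustrative sketch: the `is_contiguous` helper and the `(start, end, label)` tuple layout are assumptions made for this example and are not part of the *TanaT* API.

```python
from datetime import date


def is_contiguous(states):
    """Return True if each state starts exactly where the previous one ends.

    `states` is a list of (start, end, label) tuples (hypothetical layout).
    """
    ordered = sorted(states, key=lambda s: s[0])
    return all(prev[1] == nxt[0] for prev, nxt in zip(ordered, ordered[1:]))


# Disease stages that follow one another without gaps or overlaps.
stages = [
    (date(2023, 1, 1), date(2023, 3, 1), "stage_1"),
    (date(2023, 3, 1), date(2023, 6, 1), "stage_2"),
    (date(2023, 6, 1), date(2023, 9, 1), "stage_3"),
]

# Treatments may overlap or leave gaps, so they fit the Interval type instead.
treatments = [
    (date(2023, 1, 1), date(2023, 4, 1), "drug_a"),
    (date(2023, 3, 1), date(2023, 5, 1), "drug_b"),
]

print(is_contiguous(stages))      # True
print(is_contiguous(treatments))  # False
```

Data that fails such a check is usually better modelled as an interval sequence than as a state sequence.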
See Also * : Minimal working example to get started quickly * : All terms defined in one place * : Build pools from DataFrames, Parquet, CSV, or SQL * : Iterate, navigate, transform, and split pools * : Align sequences to a common reference date (T0) * : Inspect and cast feature types * : Complete API documentation ---------------------------------------- ## First Steps First Steps This guide walks you through the core TanaT workflow: loading data, choosing the right sequence type, and exploring your temporal data. .. note:: Make sure TanaT is installed: `pip install tanat` (see ). 1. Prepare Your Data A typical data structure that fits *TanaT*'s needs is a pandas DataFrame containing the events of a cohort of individuals. Such a table may be referred to as a *table of events*. Each row describes one event and is indexed by both an identifier of the individual and a temporal extent. The example below shows such a table, inspired by the MIMIC database: import pandas as pd # Sample data: patient visits data = pd.DataFrame({ 'patient_id': ['P001', 'P001', 'P001', 'P002', 'P002'], 'visit_date': pd.to_datetime([ '2023-01-15', '2023-02-20', '2023-03-10', '2023-01-20', '2023-03-15' ]), 'visit_type': ['GP', 'SPECIALIST', 'GP', 'GP', 'EMERGENCY'] }) In this table, which contains 5 events, `patient_id` is the identifier of the individuals (there are two individuals). Each event is also timestamped by a `visit_date`. The last column contains information about the event itself. In this case, it is a categorical attribute that gives the type of visit. Note that events can be described by more than one attribute (see for a detailed comparison). This table of events contains the information about the temporal sequences you would like to manipulate. Turning them into *TanaT* objects, and more specifically into a :term:`sequence` :term:`pool`, will ease your work. 2.
Choose the Right Sequence Type Before creating a pool, identify which sequence type matches your data (see for a detailed comparison): .. list-table:: :header-rows: 1 :widths: 20 40 40 * - Type - Your data has... - Example * - **EventSequence** - Single timestamps (punctual events) - Medical visits, purchases, clicks * - **IntervalSequence** - Start + end dates (can overlap) - Treatments, hospital stays, projects * - **StateSequence** - Contiguous states (no gaps, no overlap) - Disease stages, employment status For our example, visits are **punctual events** so we use `tanat.sequence.EventSequencePool`. 3. Create a Sequence Pool A :term:`pool` is a *TanaT* object that groups sequences from multiple individuals. Since we want sequences of punctual events, we use the `tanat.sequence.shortcuts.build_events` shortcut function to create the pool from the dataframe above (use `tanat.sequence.shortcuts.build_states` for state sequences, etc.). This function needs to be told which columns index individuals and time; it treats every other column as an :term:`entity feature`. from tanat import build_events pool = build_events( temporal_data=data, id_column="patient_id", time_column="visit_date", ) The `pool` is now a *TanaT* object! .. note:: The content of the dataframe has been copied into the pool, meaning that you can delete it to free memory. For more advanced data ingestion settings and formats (Parquet, CSV, SQL, multi-source chaining), see the reference. 4. Verify Inferred Metadata Displaying the pool shows a summary of its content, its structure, and the automatically inferred :term:`metadata`. print(pool) ┌──────────────────────────────────────────────┐ │ EventSequencePool Summary │ └──────────────────────────────────────────────┘ Overview ───────────────────────── Sequences 2 Store ~/.tanat/_quick_event_...
id_column patient_id Time Index ───────────────────────── Type Datetime [2023-01-15 → 2023-03-15] Columns ['visit_date'] t0 position=0, anchor=None Entity Features (1) ───────────────────────── • visit_type String [len 2 → 10] Before exploring your data further, use this summary to verify the type inference made by the building function. For instance, we see that `visit_type` has been inferred as a string feature, while it could be considered a categorical feature. In this case, we suggest simply casting it to suit your analysis needs (see for cast and override methods). 5. Access Individual Sequences As a pool, this data structure contains a collection of sequences that can be accessed by their identifier. The code below illustrates how to access one sequence and its internal data. # Get a specific patient's sequence patient = pool['P001'] print(f"Patient P001: {len(patient)} visits") # View the temporal data (id + time + entity features) print(patient.temporal_data().head()) # View the static data (id + static features or None if not provided) print(patient.static_data().head()) `patient.temporal_data()` provides a pandas dataframe similar to the table of events introduced earlier. `patient.static_data()` returns only the sequence identifier in this case, as there is no static (non-temporal) data associated with individuals (see for details). Instead of accessing sequences through an identifier, you can also use the iterators that *TanaT* provides to explore them: # Pool → Sequence : iterate over all sequences for seq in pool: print(seq.id_value, len(seq)) 6. Access Individual Entities Within a sequence, entities are accessed by index (entities are ordered along the time axis).
Positive and negative indices are both supported: # Get the first entity (visit) in the sequence first_visit = patient[0] # Access entity properties print(first_visit.temporal_extent) # 2023-01-15 00:00:00 print(first_visit.data()) # {'visit_type': 'GP'} The entities of a sequence can also be iterated over in the standard Python manner: # Sequence → Entity : iterate over all entities for entity in patient: print(entity.temporal_extent, entity.data()) Next Steps You now know how to build a pool, inspect metadata, and navigate sequences. You are on the right track to visualize, manipulate, and analyze your sequences. Here is the recommended reading order to deepen your understanding: 1. : Understand the data model: entities, sequences, trajectories, and pools. 2. : Self-contained examples for each container type, visualisation, and temporal alignment. 3. : Step-by-step tutorials (multi-source ingestion, real-world applications, ...). 4. : Full technical reference (builder, manipulation, zeroing, metadata, API).
---------------------------------------- ## Installation Installation Using PyPI Install the latest stable release from PyPI: python -m pip install tanat Using the latest GitHub-hosted version To get TanaT's latest development version directly from GitHub: python -m pip install git+https://github.com/TanaT-Lab/TanaT.git Dependencies *TanaT* relies on several foundational libraries from the data science Python ecosystem, including: - `pandas` for convenient tabular data handling - `polars` and `pyarrow` for high-performance columnar data processing - `numpy` and `scipy` for numerical and scientific computing (transitive dependencies) - `matplotlib` for basic visualization - `scikit-learn` for machine learning utilities - `numba` for performance optimization through JIT compilation In addition, *TanaT* makes use of: - `tanat_utils` for shared internal utilities - `tqdm` for progress tracking in processing pipelines (transitive dependency) Optional dependencies SQL support (requires `connectorx`): python -m pip install tanat[sql] ---------------------------------------- ## Builder Builder & Storage Reference for building sequence and trajectory pools from various data sources. The builder pattern lets you chain multiple sources of the same schema before materialising a single store on disk. Builder Lifecycle SequencePool.builder() .add_*() .add_*() .build(name) The result of `.build()` is a path to the store. Wrap it in the corresponding pool class to start working with it: from tanat.sequence import IntervalSequencePool store_path = ( IntervalSequencePool.builder() .add_parquet( "data.parquet", id_column="id", start_column="start", end_column="end", ) .build("my_pool") ) pool = IntervalSequencePool(store=store_path) Source Methods All source methods are available on every `tanat.sequence.base.builder.SequenceStoreBuilder` regardless of pool type. They share the same column-mapping parameters and can be chained freely. .. 
list-table:: :header-rows: 1 :widths: 27 35 38 * - Method - Input - Notes * - `add_dataframe(df)` - `pandas` or `polars` DataFrame - In-memory; no file path required * - `add_parquet(path)` - `.parquet` file or glob - Glob patterns (`"data/*.parquet"`) are supported * - `add_csv(path)` - `.csv` file - Set `try_parse_dates=True` to auto-parse temporal columns * - `add_sql(con, query)` - SQL query + connection string - Requires `connectorx`; `con` is a DB URI such as `"sqlite:///path.db"` Temporal column names differ by pool type: .. list-table:: :header-rows: 1 :widths: 30 70 * - Pool type - Required temporal columns * - `EventSequencePool` - `time_column` * - `IntervalSequencePool` - `start_column`, `end_column` * - `StateSequencePool` - `start_column` (`end_column` is optional; see `Builder Options`_) Static Features Static features are time-invariant attributes of an individual (age, gender, cohort…). They can be attached at build time or added to an existing pool. **At build time** - pass `is_static=True` to any `add_*()` call: store_path = ( IntervalSequencePool.builder() .add_parquet( "sequences.parquet", id_column="id", start_column="start", end_column="end", features=["value", "label"], ) .add_csv( "demographics.csv", id_column="id", is_static=True, features=["age", "gender"], try_parse_dates=True, ) .build("my_pool") ) **Shortcut functions** (`build_events`, `build_intervals`, `build_states`) - pass the static DataFrame directly via the `static_data` parameter: from tanat import build_intervals pool = build_intervals( temporal_data=df, id_column="id", start_column="start", end_column="end", static_data=static_df, ) **Post-build** - attach static features to an already-built pool: pool.add_static_features(df) # id column auto-detected pool.add_static_features(df, id_column="pid") # explicit join key Multi-Source Chaining Multiple `.add_*()` calls on the same builder merge all rows into one pool. 
All sources must share the same schema (same `id_column` name, same temporal column names, same feature names). store_path = ( IntervalSequencePool.builder() .add_sql( DB, admissions_query, id_column="subject_id", start_column="admittime", end_column="dischtime", features=["admission_type"], ) .add_parquet( "extra_patients.parquet", id_column="subject_id", start_column="admittime", end_column="dischtime", features=["admission_type"], ) .add_csv( "simulated.csv", id_column="subject_id", start_column="admittime", end_column="dischtime", features=["admission_type"], ) .build("all_admissions") ) .. note:: A temporal dtype mismatch between sources (e.g. one `Datetime[us]`, another `Datetime[ms]`) triggers a warning at registration time and causes an error at `.build()`. Cast the source data to a consistent dtype before calling `add_*`. Builder Options .. list-table:: :header-rows: 1 :widths: 25 22 45 * - Pool type - Option - Purpose * - `IntervalSequencePool` - `sort_anchor` - Controls row ordering within each sequence: `"start"`, `"end"`, or `"middle"` (midpoint of the interval) * - `StateSequencePool` - `end_column` - When omitted, `T_END` is computed as `next(T_START)` within each sequence * - `StateSequencePool` - `end_value` - Sentinel value appended as the last state's `T_END`; defaults to `None` (leaves it null) * - `StateSequencePool` - `validate_continuity` - When `end_column` is provided, raises `ValueError` if gaps exist between consecutive states from tanat.sequence import IntervalSequencePool from tanat.sequence.type.state.pool import StateSequencePool from datetime import datetime # IntervalSequencePool: intervals sorted by their midpoint store_path = ( IntervalSequencePool.builder(sort_anchor="middle") .add_dataframe( df, id_column="id", start_column="start", end_column="end", features=["score"], ) .build("intervals_mid") ) pool = IntervalSequencePool(store=store_path) # StateSequencePool: end derived from next start, sentinel closes the last state 
store_path = ( StateSequencePool.builder(end_value=datetime(2025, 12, 31)) .add_dataframe( df, id_column="id", start_column="start", features=["phase"], ) .build("states_closed") ) pool = StateSequencePool(store=store_path) Trajectory Composition A `tanat.trajectory.pool.TrajectoryPool` wraps multiple sequence pools under a shared ID space. Each pool is registered under an **alias** that acts as the retrieval key. TrajectoryPool.builder() .add(alias, pool) .add(alias, pool) .build(name) from tanat.trajectory.pool import TrajectoryPool store_path = ( TrajectoryPool.builder() .add("admissions", admissions_pool) .add("pharmacy", pharmacy_pool) .add("procedures", procedures_pool) .build("patient_trajectories") ) tpool = TrajectoryPool(store=store_path) Static features can also be added at trajectory build time via the same `add_static_*` family of methods: .. list-table:: :header-rows: 1 :widths: 40 60 * - Method - Description * - `add_static_dataframe(df)` - In-memory static features for the trajectory * - `add_static_csv(path)` - Static features from a CSV file * - `add_static_parquet(path)` - Static features from a Parquet file * - `add_static_sql(con, query)` - Static features from a SQL query Workspace A **workspace** is a named registry that maps store names to their paths on disk. Once a store is built under a workspace, you can reload it by name without tracking the file path. from tanat import set_workspace, get_workspace set_workspace("~/.tanat_workspace/my_project") ws = get_workspace() # Build and register pool = IntervalSequencePool(store=builder.build("my_pool")) # Reload from workspace (no path needed) pool = ws["my_pool"] # Save a modified pool back under a new name pool.cast_features({"status": pl.Categorical}) pool.save("my_pool_v2") .. 
list-table:: :header-rows: 1 :widths: 40 60 * - Operation - Code * - Set the active workspace - `set_workspace(path)` * - Get the active workspace object - `get_workspace()` * - Reload a store by name - `ws["name"]` or `IntervalSequencePool(store="name")` * - List all registered stores - `ws` (repr) or `ws.list()` * - Save pool with pending changes - `pool.save("new_name")` See Also * - All pool operations available after building. * - Setting a reference date (T0) after building. * - Building from multiple sources. * - Build and explore container types. ---------------------------------------- ## Clustering Clustering Clustering partitions a pool of sequences or trajectories into groups based on pairwise distances. Available Algorithms .. list-table:: :header-rows: 1 :widths: 20 50 * - Algorithm - Characteristics * - `tanat.clustering.HierarchicalClusterer` - Produces nested partitions at all distance thresholds. * - `tanat.clustering.PAMClusterer` - Medoid-based; robust to outliers; slower on large datasets * - `tanat.clustering.CLARAClusterer` - Scalable variant of PAM; samples subsets repeatedly for speed See Also * - Worked examples for each algorithm * - Sequence and trajectory metrics used to compute distance matrices for clustering ---------------------------------------- ## Criterion Criteria **Criteria** are composable filtering objects that evaluate temporal or static properties of sequences and entities. They expose a uniform three-operation API: .. list-table:: :header-rows: 1 :widths: 25 75 * - Operation - Description * - `pool.which(criterion)` - Returns a `set` of IDs whose sequences satisfy the criterion at **sequence level**. * - `pool.filter_entities(criterion)` - Returns a new pool view where only the entity rows satisfying the criterion are kept (**entity level**). The original pool is unchanged. * - `seq.match(criterion)` - Returns `True` if the single sequence satisfies the criterion. Each criterion declares which **levels** it supports. 
Applying an unsupported operation raises `tanat.criterion.CriterionLevelError`. EntityCriterion Filter entities or select sequences using any **Polars expression** evaluated against the temporal data. from tanat.criterion import EntityCriterion import polars as pl # Select sequences with at least one "error" row. ids = pool.which(EntityCriterion(query=pl.col("status") == "error")) # Keep only the "error" rows across all sequences. pool2 = pool.filter_entities(EntityCriterion(query=pl.col("status") == "error")) # Combine conditions with any Polars expression. pool3 = pool.filter_entities( EntityCriterion(query=(pl.col("status") == "error") & (pl.col("value") > 0.5)) ) # Single-sequence match. ok = seq.match(EntityCriterion(query=pl.col("status") == "error")) The expression must return a Boolean column. Rows where it evaluates to `True` are kept (`filter_entities`) or counted towards sequence selection (`which`). .. list-table:: :header-rows: 1 :widths: 25 15 60 * - Parameter - Type - Description * - `query` - `pl.Expr` - A Polars expression evaluated per entity row against the temporal data. StaticCriterion Select sequences or trajectories using a **Polars expression evaluated against the static (per-ID) data**. The pool must have static features attached. from tanat.criterion import StaticCriterion import polars as pl # Select IDs whose age exceeds 50. ids = pool.which(StaticCriterion(query=pl.col("age") > 50)) pool2 = pool.subset(ids) # Works identically on a TrajectoryPool. traj_ids = tpool.which(StaticCriterion(query=pl.col("group") == "A")) # Single match. ok = seq.match(StaticCriterion(query=pl.col("age") > 50)) `filter_entities()` is **not supported**: static data has no entity rows. .. list-table:: :header-rows: 1 :widths: 25 15 60 * - Parameter - Type - Description * - `query` - `pl.Expr` - A Polars expression evaluated per ID against the static data frame. 
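The EntityCriterion section above notes that matching rows are either kept (`filter_entities`) or counted towards sequence selection (`which`). That difference between entity-level pruning and sequence-level selection can be pictured in plain Python. The sketch below is not how *TanaT* implements criteria: the `rows` dictionary and the `which` / `filter_entities` helper functions are hypothetical stand-ins used only to illustrate the semantics.

```python
# Hypothetical toy data: one list of entity rows per sequence ID.
rows = {
    "P001": [{"status": "ok"}, {"status": "error"}, {"status": "ok"}],
    "P002": [{"status": "ok"}, {"status": "ok"}],
}


def is_error(row):
    """Row-level predicate, playing the role of the Boolean expression."""
    return row["status"] == "error"


def which(data, predicate):
    """Sequence level: IDs with at least one row satisfying the predicate."""
    return {
        sid for sid, entities in data.items()
        if any(predicate(r) for r in entities)
    }


def filter_entities(data, predicate):
    """Entity level: keep only matching rows; the input is left unchanged.

    Keeping IDs whose row list becomes empty is an assumption of this sketch.
    """
    return {
        sid: [r for r in entities if predicate(r)]
        for sid, entities in data.items()
    }


selected_ids = which(rows, is_error)
pruned = filter_entities(rows, is_error)

print(selected_ids)       # {'P001'}
print(pruned["P001"])     # [{'status': 'error'}]
print(len(rows["P001"]))  # 3
```

The last line shows that the original data still has its three rows, mirroring the statement that the original pool is unchanged.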
TimeCriterion Filter entities or select sequences based on **temporal bounds** on the start and/or end time columns. All bounds are inclusive. import datetime as dt from tanat.criterion import TimeCriterion t0 = dt.datetime(2020, 1, 1) t1 = dt.datetime(2021, 1, 1) # Sequences with at least one entity starting on or after t0. ids = pool.which(TimeCriterion(start_ge=t0)) # Sequences where ALL entities start on or after t0. ids = pool.which(TimeCriterion(start_ge=t0, all_entities=True)) # Entity pruning: keep rows inside [t0, t1] (overlap mode, default). pool2 = pool.filter_entities(TimeCriterion(start_ge=t0, end_le=t1)) # Containment mode: entity interval must be fully inside [t0, t1]. pool3 = pool.filter_entities( TimeCriterion(start_ge=t0, end_le=t1, duration_within=True) ) # Numeric bounds for timestep pools. ids = state_pool.which(TimeCriterion(start_ge=200.0, start_le=400.0)) .. list-table:: :header-rows: 1 :widths: 25 15 60 * - Parameter - Type - Description * - `start_ge` - TimeBound | `None` - Minimum value for the start column (inclusive). * - `start_le` - TimeBound | `None` - Maximum value for the start column (inclusive). * - `end_ge` - TimeBound | `None` - Minimum value for the end column: interval/state pools only. * - `end_le` - TimeBound | `None` - Maximum value for the end column: interval/state pools only. * - `duration_within` - `bool` - `False` (default): overlap is sufficient. `True`: entity interval must be fully contained in the window. * - `all_entities` - `bool` - `False` (default): at least one row must match. `True`: every row must match. .. rubric:: TimeBound `TimeBound = datetime.datetime | datetime.date | int | float` All bounds within a single criterion call must share the same Python type. `datetime` and `date` may be mixed (`datetime` takes precedence). Use `int` or `float` for numeric timestep sequences. 
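The two matching modes selected by `duration_within` in the parameter table above can be sketched in plain Python for a window `[lo, hi]` with inclusive bounds. The helper names `overlaps` and `contained` are assumptions made for illustration and are not part of *TanaT*; the inequalities are the usual interval-overlap and interval-containment tests, not a copy of the library's code.

```python
import datetime as dt


def overlaps(start, end, lo, hi):
    """Overlap mode: the entity [start, end] intersects the window [lo, hi]."""
    return start <= hi and end >= lo


def contained(start, end, lo, hi):
    """Containment mode: the entity [start, end] lies fully inside [lo, hi]."""
    return start >= lo and end <= hi


lo, hi = dt.datetime(2020, 1, 1), dt.datetime(2021, 1, 1)

# An interval that begins before the window and ends inside it.
s, e = dt.datetime(2019, 11, 1), dt.datetime(2020, 2, 1)
print(overlaps(s, e, lo, hi))   # True
print(contained(s, e, lo, hi))  # False

# An interval entirely inside the window satisfies both modes.
s2, e2 = dt.datetime(2020, 3, 1), dt.datetime(2020, 4, 1)
print(overlaps(s2, e2, lo, hi))   # True
print(contained(s2, e2, lo, hi))  # True
```

Containment implies overlap, so switching `duration_within` to `True` can only shrink the set of matching entities.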
Overlap vs containment (two-column pools) For interval and state sequences (duration-based sequences): * **Overlap** (`duration_within=False`, default): entity `[s, e]` overlaps window `[lo, hi]` when `s ≤ hi AND e ≥ lo`. Provide `start_ge=lo, end_le=hi`. * **Containment** (`duration_within=True`): entity is fully inside when `s ≥ lo AND e ≤ hi`. Provide `start_ge=lo, end_le=hi`. Open-ended states (`end = null`) are treated as still-ongoing: their end is considered `+∞` in overlap mode (they satisfy any `end ≥ lo` condition). PatternCriterion Select sequences or extract witness rows based on an **ordered pattern** of string values in a feature column. Elements are matched in temporal order. from tanat.criterion import PatternCriterion, ANY, WILDCARD # A directly followed by B (adjacent). ids = pool.which(PatternCriterion(feature="status", pattern=["A", "B"])) # A before B with any number of rows in between. ids = pool.which(PatternCriterion(feature="status", pattern=["A", ANY, "B"])) # A, then exactly one element, then B. ids = pool.which(PatternCriterion(feature="status", pattern=["A", WILDCARD, "B"])) # Sequences that never contain A→B. ids = pool.which( PatternCriterion(feature="status", pattern=["A", "B"], present=False) ) # Keep only the witness rows (greedy first match). pool2 = pool.filter_entities( PatternCriterion(feature="status", pattern=["A", "B"]) ) Sentinels .. list-table:: :header-rows: 1 :widths: 20 20 60 * - Constant - Value - Description * - `ANY` - `"..."` - Matches **zero or more** elements: free gap between adjacent sub-patterns. * - `WILDCARD` - `"*"` - Matches **exactly one** element of any value. Parameters .. list-table:: :header-rows: 1 :widths: 25 15 60 * - Parameter - Type - Description * - `feature` - `str` - Name of the string feature column to match against. * - `pattern` - `str` | `list[str]` - Ordered pattern. A bare string is a single-element pattern. * - `present` - `bool` - `True` (default): pattern must be present. 
`False`: pattern must be absent. * - `regex` - `bool` - `True` (default): elements are regular expressions. `False`: literal substring matching. * - `case_sensitive` - `bool` - `True` (default): case-sensitive. `False`: case-insensitive. Entity-level behaviour * `present=True`: keeps the **greedy first-match witness rows** only. Each ID contributes at most `len(pattern)` rows; IDs with no match contribute 0 rows. * `present=False`: keeps all rows that are **not** witnesses. IDs with no match keep all their rows. LengthCriterion Select sequences by their **number of entity rows** (sequence length). from tanat.criterion import LengthCriterion # More than 6 entities. ids = pool.which(LengthCriterion(gt=6)) # Between 3 and 10 entities (inclusive on both ends). ids = pool.which(LengthCriterion(ge=3, le=10)) # Single match. ok = seq.match(LengthCriterion(ge=3, lt=20)) `filter_entities()` is **not supported**. .. list-table:: :header-rows: 1 :widths: 15 10 75 * - Parameter - Type - Description * - `gt` - `int` - Strictly greater than. * - `ge` - `int` - Greater than or equal to. * - `lt` - `int` - Strictly less than. * - `le` - `int` - Less than or equal to. At least one bound must be provided. Contradictory bounds (e.g. `gt=5, lt=3`) raise `ValueError` at construction time. RankCriterion Prune entity rows by their **0-based positional rank** within each sequence. from tanat.criterion import RankCriterion # Keep the first 3 entities. pool2 = pool.filter_entities(RankCriterion(first=3)) # Keep all except the last 2 entities. pool2 = pool.filter_entities(RankCriterion(first=-2)) # Keep the last 2 entities. pool2 = pool.filter_entities(RankCriterion(last=2)) # Python-slice: ranks 1, 2, 3 (0-based). pool2 = pool.filter_entities(RankCriterion(start=1, end=4)) # Every other entity. pool2 = pool.filter_entities(RankCriterion(step=2)) # First and last entity. pool2 = pool.filter_entities(RankCriterion(ranks=[0, -1])) # Relative to T0: entity at T0 and the one after it.
pool.set_t0(position=0, anchor="start") pool2 = pool.filter_entities(RankCriterion(start=0, end=2, relative=True)) `which()` and `match()` are **not supported**. .. list-table:: :header-rows: 1 :widths: 20 15 65 * - Parameter - Type - Description * - `first` - `int` - Keep first N rows (`< 0` → all except last `|N|`). Cannot be 0. * - `last` - `int` - Keep last N rows (`< 0` → all except first `|N|`). Cannot be 0. * - `start` - `int` - Start rank inclusive (Python-style negative supported). * - `end` - `int` - End rank exclusive (Python-style negative supported). * - `step` - `int` - Sub-sample every N-th entity (≥ 1). Compatible with `start`/`end` or standalone. * - `ranks` - `list[int]` - Explicit 0-based positions (negative = from end). A single `int` is accepted. * - `relative` - `bool` - `False` (default): absolute ranks from start of sequence. `True`: ranks relative to T0 (requires `pool.set_t0()` first). Not compatible with `first`/`last`. Exactly one parameter group must be active at a time. Chaining criteria Criteria can be chained by passing the result of one operation as the target of the next. Each call returns a new pool view; the original is never modified. # 1. Select IDs matching a static condition. ids = pool.which(StaticCriterion(query=pl.col("age") > 50)) # 2. Restrict the pool to those IDs. pool2 = pool.subset(ids) # 3. Prune entity rows by time window. pool3 = pool2.filter_entities( TimeCriterion(start_ge=dt.datetime(2020, 1, 1), end_le=dt.datetime(2021, 1, 1)) ) # 4. Keep only the first 2 entities per sequence. pool4 = pool3.filter_entities(RankCriterion(first=2)) Alternatively, use `which()` results to drive multi-step pipelines: ids_long = pool.which(LengthCriterion(gt=5)) ids_error = pool.which(PatternCriterion(feature="status", pattern="error")) ids_target = ids_long & ids_error # set intersection pool_target = pool.subset(ids_target) See Also * : EntityCriterion examples. * : StaticCriterion examples. * : TimeCriterion examples. 
* : PatternCriterion examples. * : LengthCriterion examples. * : RankCriterion examples. * : Full operation reference (`which`, `filter_entities`, `subset`). ---------------------------------------- ## Manipulation .. role:: green .. role:: red Data Manipulation Reference for main operations available on sequence pools, trajectory pools, individual sequences, trajectories, and entities. Navigation Look up a single item by ID or row index; the return type depends on the container. # SequencePool: lookup a sequence by ID seq = pool["patient_001"] # TrajectoryPool: lookup a trajectory by ID traj = tpool["patient_001"] # Sequence: lookup an entity by position entity = seq[0] entity = seq[-1] # last entity Iteration All containers implement the standard Python iteration protocol. .. list-table:: :header-rows: 1 :widths: 36 16 16 16 16 * - Syntax → yields - SP - S - TP - T * - `for x in obj` → item - Sequence - Entity - Trajectory - alias (str) * - `obj.items()` → (key, item) pairs - :red:`✗` - :red:`✗` - (id, Trajectory) - (alias, Sequence) *SP: SequencePool · TP: TrajectoryPool · S: Sequence · T: Trajectory* for traj in tpool: # TP → Trajectory print(traj.id_value) for alias, seq in traj.items(): # T → (alias, Sequence) print(alias, len(seq)) for seq in pool: # SP → Sequence print(seq.id_value, len(seq)) for entity in seq: # S → Entity print(entity.temporal_extent, entity.data()) Subset Restrict a pool to a subset of IDs **without copying data**. view = pool.subset(ids=["id_001", "id_042", "id_099"]) print(len(view)) # 3 The returned object is a **view**: it shares the same underlying store. Changes to the view (casts, feature drops…) are visible through the view only. Feature Engineering All methods below operate **lazily**: transformations are applied on the fly at materialisation time and do not rewrite the store. Call `pool.save` to persist them. Add and remove columns Attach new columns to the view or hide existing ones; the underlying store is never rewritten. .. 
list-table:: :header-rows: 1 :widths: 40 10 50 * - Method - Scope - Description * - `pool.add_entity_features(df)` - SP - Append new entity-level columns. `df` must be positionally aligned with the **full** entity row set of the store. Blocked on filtered views; call `pool.save()` first. * - `pool.add_static_features(df)` - SP, TP - Append new static columns joined by ID. Works on filtered views. Pass `id_column` if the join key column has a non-standard name. * - `pool.drop_features(names, is_static)` - SP - Hide entity (default) or static features from the view. Pass `permanently=True` to also delete from disk. * - `tpool.drop_static_features(names)` - TP - Hide static features from a TrajectoryPool view. Pass `permanently=True` to also delete from disk. *SP: SequencePool · TP: TrajectoryPool* Type casting All casts are lazy and scoped to the current view. Call `pool.save` to persist. .. list-table:: :header-rows: 1 :widths: 49 9 44 * - Method - Scope - Description * - `pool.cast_features(schema, is_static)` - SP - Re-type entity (default) or static features. `schema` is a `dict[str, pl.DataType]`. * - `tpool.cast_static_features(schema)` - TP - Re-type static features. Entity features must be cast on each linked sequence pool directly. * - `pool.cast_to_datetime(unit, time_zone)` - SP, TP - Convert the time index to `pl.Datetime`. *unit*: `"s"` / `"ms"` / `"us"` (default) / `"ns"`. On TP the cast propagates to all linked sequence pools. * - `pool.cast_to_timestep(dtype)` - SP, TP - Convert the time index to an integer type (e.g. `pl.Int64`). Cannot be applied if the time index is already in Datetime format. *SP: SequencePool · TP: TrajectoryPool* import polars as pl # SequencePool: cast entity feature pool.cast_features({"status": pl.Categorical}) # SequencePool: cast static feature pool.cast_features({"age": pl.UInt8}, is_static=True) # TrajectoryPool: cast static feature (different method name!) 
tpool.cast_static_features({"group": pl.Categorical}) # Both: convert time index pool.cast_to_datetime(unit="us", time_zone="UTC") pool.cast_to_timestep(pl.Int32) # Drop (SP only with is_static; TP: drop_static_features) pool.drop_features(["flag_valid"], is_static=False) tpool.drop_static_features(["debug_col"]) Transformation All methods in this section return a **new DataFrame** and do not modify the pool. `apply`: evaluate an expression Evaluate a Polars expression against the pool's temporal **or static** data. Available on SP and TP. Pass `is_static=True` to target static features. At pool level, `by_id=True` groups the evaluation per ID, making it ideal for deriving per-sequence aggregates. A natural follow-up is to pipe the result directly into `add_static_features` or `add_entity_features`: # Per-sequence mean → attach as a static feature means = pool.apply(pl.col("value").mean().alias("value_mean"), by_id=True) pool.add_static_features(means) # Without by_id: expression runs over the full temporal data flags = pool.apply(pl.col("value") > 0) # On static data result = pool.apply(pl.col("age") > 65, is_static=True) # Works on TrajectoryPool too stats = tpool.apply(pl.col("score").max().alias("score_max"), by_id=True) *SP: SequencePool · TP: TrajectoryPool* `to_dummies`: one-hot encode One-hot encode one or more `Categorical` features into binary indicator columns. Pass `is_static=True` to target static features instead of entity features. # Entity features (default) dummies = pool.to_dummies(["status", "category"]) # Static features dummies = pool.to_dummies(["site"], is_static=True) `binned_data` / `to_tensor`: regular time bins Project temporal features onto a regular time grid. * `tanat.sequence.base.pool.SequencePool.binned_data` returns a long-format DataFrame (pandas or polars). Useful for exploration, joins, and plotting. 
* `tanat.sequence.base.pool.SequencePool.to_tensor` returns a dense `(N, M, K)` ndarray together with IDs and K-axis feature labels. Useful for ML pipelines. # Long-format dataframe df = pool.binned_data(features=["value", "score"], bin_size="1d") # ML-ready tensor with IDs and feature names arr, ids, feature_names = pool.to_tensor(features=["value", "score"], bin_size="1d") Descriptive Statistics # One row per sequence (length, temporal span, …) pool.describe() # Cross-ID aggregated stats (equivalent to pandas .describe()) pool.describe(by_id=False) # Attach stats as static features (side-effect) pool.describe(add_to_static=True) # Single sequence seq = pool[pool.unique_ids[0]] seq.describe() # TrajectoryPool: one row per trajectory, columns prefixed by alias tpool.describe() Persistence Transformations are **lazy** by default. Save a snapshot to make them permanent or to share a modified pool. # Save under a new name (returns the new store path) saved_path = pool.save("my_pool_optimised", overwrite=True) # Copy the pool in-memory (deep copy of settings, same store) clone = pool.copy() Composition `extend` Merge another pool into the current one. Two execution paths: .. list-table:: :header-rows: 1 :widths: 40 60 * - Situation - Behaviour * - Both pools share the **same store** - Fast path: union of ID masks, no I/O * - Different stores - Cross-store: rebuilds a new store on disk; `destination` is required # Same-store fast path (e.g. after train_test_split) train, test = pool.train_test_split(test_size=0.3) merged = train.extend(test) # Cross-store merge pool_a.extend(pool_b, destination="merged_pool", on_duplicate="skip", overwrite=True) `train_test_split` Split a pool by **unique IDs**. The interface mirrors `sklearn.model_selection.train_test_split`. 
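To make "split by unique IDs" concrete, here is a minimal standalone sketch. It assumes nothing about TanaT's internals: `split_ids` is a hypothetical helper, and the rounding rule for the test size is an assumption. The point is that the split is drawn over IDs rather than rows, so every entity of a given individual lands on exactly one side.

```python
import random


def split_ids(unique_ids, test_size=0.2, random_state=None):
    """Partition IDs into two disjoint lists (train, test).

    Hypothetical helper for illustration only; not part of TanaT.
    """
    ids = list(unique_ids)
    rng = random.Random(random_state)
    rng.shuffle(ids)
    # Assumption: the test share is rounded, with at least one test ID.
    n_test = max(1, round(len(ids) * test_size))
    return ids[n_test:], ids[:n_test]


all_ids = [f"id_{i:03d}" for i in range(10)]
train_ids, test_ids = split_ids(all_ids, test_size=0.2, random_state=42)

# The two sides never share an ID, and together they cover the pool.
assert not set(train_ids) & set(test_ids)
assert sorted(train_ids + test_ids) == all_ids
```

Splitting rows instead of IDs would leak parts of the same sequence into both sets, which is what the ID-level guarantee is meant to rule out.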
train, test = pool.train_test_split(test_size=0.2, random_state=42) # Guarantee: zero ID overlap assert not set(train.unique_ids) & set(test.unique_ids) Type Conversion Convert a pool between the three sequence types. The conversion is always **view-level**: the original store is not modified. .. list-table:: :header-rows: 1 :widths: 30 40 * - Method - Converts to * - `pool.as_event()` - `tanat.sequence.type.event.pool.EventSequencePool` * - `pool.as_interval()` - `tanat.sequence.type.interval.pool.IntervalSequencePool` * - `pool.as_state()` - `tanat.sequence.type.state.pool.StateSequencePool` event_view = interval_pool.as_event() # treat interval start as event time Temporal Alignment See for the full T0 reference. # Set a reference date using the position strategy pool.set_t0(position=0, anchor="start") # Retrieve T0 values as a DataFrame pool.t0_data() # Sequence-level properties (available after set_t0) seq = pool[pool.unique_ids[0]] seq.t0 # T0 value for this sequence seq.t0_nearest_rank # 0-based index of the entity at or just before T0 See Also * - How to build and load pools. * - T0 strategies and temporal alignment. * - Inspect dtypes, feature info, and cast methods. ---------------------------------------- ## Metadata .. role:: green .. role:: red Metadata TanaT automatically infers rich metadata from your data at build time. Metadata is attached to every pool via `pool.metadata` and describes the time index, entity features, and static features. Metadata Objects `pool.metadata` on a `tanat.sequence.base.pool.SequencePool` returns a `tanat.metadata.sequence.SequenceMetadata` instance; on a `tanat.trajectory.pool.TrajectoryPool` it returns a `tanat.metadata.trajectory.TrajectoryMetadata`. 
print(pool.metadata) # human-readable summary pool.metadata.time_index # TimeIndexInfo (dtype, range, tz…) pool.metadata.entity_features # list[FeatureInfo], alphabetical pool.metadata.static_features # list[FeatureInfo] | None Both objects expose `is_categorical_feature(name)`; `tanat.metadata.sequence.SequenceMetadata` also has `is_numeric_feature`, `is_datetime_feature`, and `is_duration_feature`. All raise `KeyError` for unknown feature names. Feature Types TanaT maps each Polars dtype to a `tanat.metadata.feature.FeatureInfo` subclass with type-specific extra attributes: .. list-table:: :header-rows: 1 :widths: 30 30 40 * - Class - Polars dtypes - Extra attributes * - `tanat.metadata.feature.NumericalInfo` - integers, floats - `min`, `max` * - `tanat.metadata.feature.CategoricalInfo` - `Categorical`, `Enum` - `n_unique`, `ordered` * - `tanat.metadata.feature.BooleanInfo` - `Boolean` - `true_count`, `false_count` * - `tanat.metadata.feature.StringInfo` - `String` - `min_length`, `max_length` * - `tanat.metadata.feature.TemporalInfo` - `Date`, `Datetime`, `Duration` - `min`, `max`, `is_duration` * - `tanat.metadata.feature.ArrayInfo` - `Array`, `List` - `dimension` info = pool.metadata.feature_info("status") print(info.summary) # e.g. "Categorical (5 categories)" Cast Methods Casts are **lazy and view-local**: they are applied on the fly when data is materialised, and do not touch the store files. Call `pool.save()` to persist them. .. note:: Cast methods are only available at **Pool level** (`SequencePool`, `TrajectoryPool`). Casting directly on a `Sequence`, `Trajectory`, or `Entity` is intentionally not supported: those objects are views derived from a pool, and mutating their types independently would desynchronise them from their siblings in the pool. SequencePool .. rubric:: cast_features() Cast one or more entity or static feature columns. 
# Entity features (default) pool.cast_features({"status": pl.Categorical}) pool.cast_features({"response_time": pl.Duration("ms")}) pool.cast_features({"severity": pl.Enum(["low", "medium", "high"])}) # Static features pool.cast_features({"age": pl.UInt8, "group": pl.Categorical}, is_static=True) .. rubric:: cast_id() Cast the sequence ID column. pool.cast_id(pl.Categorical) .. rubric:: cast_to_datetime() / cast_to_timestep() Change the type of the time index. pool.cast_to_datetime() # us, no timezone pool.cast_to_datetime(unit="ms", time_zone="UTC") pool.cast_to_timestep(pl.UInt32) .. note:: `cast_to_timestep()` raises `TypeError` if the time index is already a `Datetime` type. TrajectoryPool Entity features live inside each linked sequence store; cast them directly on `tpool.sequence_pools["<alias>"]`. .. rubric:: cast_static_features() Cast trajectory-level static features. tpool.cast_static_features({"group": pl.Categorical}) # For entity features, go through the sequence pool: tpool.sequence_pools["pharmacy"].cast_features({"medication": pl.Categorical}) .. rubric:: cast_id() / cast_to_datetime() / cast_to_timestep() Same signatures as on `tanat.sequence.base.pool.SequencePool`, but the cast is **automatically propagated to all linked sequence pools**. tpool.cast_id(pl.Categorical) # propagates to all sub-pools tpool.cast_to_datetime(unit="ms", time_zone="UTC") # likewise tpool.cast_to_timestep(pl.UInt32) # likewise Compatibility Matrix .. 
list-table:: :header-rows: 1 :widths: 42 14 14 * - Method - `SequencePool` - `TrajectoryPool` * - `cast_features(schema, is_static=False)` - :green:`✓` - :red:`✗` * - `cast_static_features(schema)` - :red:`✗` - :green:`✓` * - `cast_id(dtype)` - :green:`✓` - :green:`✓` * - `cast_to_datetime(unit, time_zone)` - :green:`✓` - :green:`✓` * - `cast_to_timestep(dtype)` - :green:`✓` - :green:`✓` See Also * `tanat.metadata`: full API for all metadata classes * `tanat.sequence.base.pool.SequencePool.cast_features` * `tanat.trajectory.pool.TrajectoryPool.cast_static_features` ---------------------------------------- ## Metrics Metrics **Philosophy: Hierarchical Composition** The metrics module implements a three-tier hierarchy of distance functions: 1. **EntityMetric**: Single point-in-time comparisons (categorical feature equality) 2. **SequenceMetric**: Built on EntityMetric, aligned/aggregated across timesteps 3. **TrajectoryMetric**: Aggregates SequenceMetrics across multiple sequence types This composition allows flexible metric selection at each level while maintaining consistent interfaces. Entity Metrics Entity metrics compare individual states or events on categorical features. .. list-table:: :header-rows: 1 :widths: 30 70 * - Metric - Purpose * - `tanat.metric.entity.HammingEntityMetric` - Categorical mismatch counter; building block for all sequence metrics Sequence Metrics Sequence metrics operate on entire sequences, leveraging an EntityMetric as a foundation. .. 
list-table:: :header-rows: 1 :widths: 30 70 * - Metric - Purpose * - `tanat.metric.sequence.LinearPairwiseSequenceMetric` - Position-wise alignment with configurable aggregation (mean, max, sum) * - `tanat.metric.sequence.EditSequenceMetric` - Needleman-Wunsch edit distance; insertions, deletions, substitutions * - `tanat.metric.sequence.LCPSequenceMetric` - Longest Common Prefix distance between sequences * - `tanat.metric.sequence.LCSSequenceMetric` - Longest Common Subsequence distance * - `tanat.metric.sequence.DTWSequenceMetric` - Dynamic Time Warping; flexible time alignment * - `tanat.metric.sequence.SoftDTWSequenceMetric` - Differentiable DTW variant for optimization * - `tanat.metric.sequence.Chi2SequenceMetric` - Chi-squared distance between state-time distributions Trajectory Metrics Trajectory metrics compare trajectories (multi-sequence entities) by aggregating sequence-level distances. .. list-table:: :header-rows: 1 :widths: 40 60 * - Metric - Purpose * - `tanat.metric.AggregationTrajectoryMetric` - Compute per-alias SequenceMetric, then aggregate via mean/min/max/sum **Key feature:** Use different SequenceMetrics for different sequence types (aliases). For example, use EditSequenceMetric for states and LCSSequenceMetric for events, then aggregate the results. Distance Matrix `tanat.metric.DistanceMatrix` is a matrix wrapper that stores pairwise distances and pool IDs, enabling efficient cluster operations. SequenceMetric classes provide a `tanat.metric.SequenceMetric.compute_matrix` method to compute the full pairwise distance matrix: metric = EditSequenceMetric(entity_metric=hamming, normalize=True) distance_matrix = metric.compute_matrix(pool) Similarly, TrajectoryMetric classes have a `tanat.metric.TrajectoryMetric.compute_matrix` method that computes the full distance matrix across multiple trajectories. 
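The three-tier composition described in this section can be sketched in plain Python. This is an illustration of the idea only, not TanaT's implementation: the function names are hypothetical, the insertion/deletion cost of 1 is an assumption, and normalising by the longer sequence length is one common convention that may differ from what `normalize=True` does in the library.

```python
def hamming(a, b):
    """Entity level: 0 if two categorical values match, 1 otherwise."""
    return 0.0 if a == b else 1.0


def edit_distance(seq_a, seq_b, entity_cost=hamming, indel=1.0, normalize=True):
    """Sequence level: Needleman-Wunsch style edit distance.

    Assumption: normalisation divides by the longer sequence length.
    """
    n, m = len(seq_a), len(seq_b)
    prev = [j * indel for j in range(m + 1)]
    for i in range(1, n + 1):
        curr = [i * indel] + [0.0] * m
        for j in range(1, m + 1):
            curr[j] = min(
                prev[j] + indel,  # deletion
                curr[j - 1] + indel,  # insertion
                prev[j - 1] + entity_cost(seq_a[i - 1], seq_b[j - 1]),  # substitution
            )
        prev = curr
    dist = prev[m]
    if normalize and max(n, m) > 0:
        dist /= max(n, m)
    return dist


def trajectory_distance(traj_a, traj_b, metrics):
    """Trajectory level: mean of the per-alias sequence distances."""
    dists = [metric(traj_a[alias], traj_b[alias]) for alias, metric in metrics.items()]
    return sum(dists) / len(dists)


def pairwise_matrix(sequences, metric):
    """Pool level: IDs plus a symmetric matrix of pairwise distances."""
    ids = list(sequences)
    matrix = [[0.0] * len(ids) for _ in ids]
    for i, id_a in enumerate(ids):
        for j in range(i + 1, len(ids)):
            matrix[i][j] = matrix[j][i] = metric(sequences[id_a], sequences[ids[j]])
    return ids, matrix


toy_pool = {"p1": ["A", "B", "C"], "p2": ["A", "C"], "p3": ["B", "B", "C"]}
ids, matrix = pairwise_matrix(toy_pool, edit_distance)

traj_a = {"states": ["A", "B", "C"], "events": ["x", "y"]}
traj_b = {"states": ["A", "C"], "events": ["x", "y"]}
traj_dist = trajectory_distance(
    traj_a, traj_b, {"states": edit_distance, "events": edit_distance}
)
```

Because each tier only depends on the callable passed in from the tier below, swapping the entity cost or using a different sequence metric per alias requires no change to the outer functions.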
See Also * - Entity metric examples * - Sequence metric examples * - Trajectory metric examples * - Clustering algorithms that operate on sequences or trajectories ---------------------------------------- ## Zeroing Zeroing & Alignment **Zeroing** aligns sequences to a common reference date (T0 / index date), transforming absolute timestamps into relative ones. This is essential when comparing sequences across individuals who were observed at different calendar times, for example aligning patients to their first hospitalisation or users to their registration date. Once `set_t0` is called on a pool, every `tanat.sequence.base.sequence.Sequence` object automatically exposes `seq.t0` and `seq.t0_nearest_rank`. Strategies Four strategies are available. Pass exactly **one** keyword to `set_t0`. .. list-table:: :header-rows: 1 :widths: 15 25 60 * - Strategy - Keyword - Description * - `position` - `set_t0(position=N)` - T0 = temporal value at row index `N` (0-based; negative indices are supported: `-1` is the last row). For interval and state pools, the `anchor` parameter selects the reference point within the period (`"start"`, `"end"`, or `"middle"`). * - `direct` - `set_t0(direct=value)` - T0 = the same scalar timestamp for every sequence. Alternatively, pass a `dict[id, timestamp]` to assign a different T0 per individual; IDs absent from the dict receive `_T0_ = null`. * - `feature` - `set_t0(feature="col")` - T0 = the value of a **static feature column**. The feature dtype must exactly match the pool's temporal dtype; use `cast_features` to align if needed. IDs with a `null` static value receive `_T0_ = null`. * - `query` - `set_t0(query=expr)` - T0 = temporal value of the **first** (or last, with `use_first=False`) entity row where the Polars expression evaluates to `True`. The `anchor` parameter determines the reference point within the matched period. Sequences with no matching row receive `_T0_ = null`. 
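The `position` and `query` strategies from the table above can be sketched on a plain list of rows. This is a standalone illustration under stated assumptions, not TanaT's code: the helper names are hypothetical, rows are assumed to be sorted by time, and a Python predicate stands in for the Polars expression.

```python
from datetime import datetime


def t0_by_position(rows, position):
    """T0 = timestamp of the row at `position`; None if out of range."""
    try:
        return rows[position]["time"]
    except IndexError:
        return None


def t0_by_query(rows, predicate, use_first=True):
    """T0 = timestamp of the first (or last) row matching `predicate`."""
    matches = [row["time"] for row in rows if predicate(row)]
    if not matches:
        return None
    return matches[0] if use_first else matches[-1]


def zero(rows, t0):
    """Express each timestamp relative to T0 (None when T0 is undefined)."""
    if t0 is None:
        return [None] * len(rows)
    return [row["time"] - t0 for row in rows]


rows = [
    {"time": datetime(2020, 3, 1), "status": "ok"},
    {"time": datetime(2020, 3, 15), "status": "error"},
    {"time": datetime(2020, 4, 2), "status": "error"},
]

t0 = t0_by_query(rows, lambda row: row["status"] == "error")
relative = zero(rows, t0)  # timedeltas of -14, 0 and 18 days
```

Returning `None` rather than raising mirrors the documented behaviour that sequences without a usable T0 stay in the pool with a null value.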
The `anchor` parameter (`"start"` | `"end"` | `"middle"`) applies only to the `position` and `query` strategies, and only for `tanat.sequence.type.interval.pool.IntervalSequencePool` and `tanat.sequence.type.state.pool.StateSequencePool`; it is ignored otherwise. Usage import polars as pl from datetime import datetime # position: first row, start of interval pool.set_t0(position=0, anchor="start") # position: last row, end of interval pool.set_t0(position=-1, anchor="end") # direct: same T0 for all sequences pool.set_t0(direct=datetime(2000, 1, 1)) # direct: per-id mapping pool.set_t0(direct={ "pat_01": datetime(2020, 3, 15), "pat_02": datetime(2021, 6, 1), }) # feature: read T0 from a static column pool.cast_features({"registration_date": pl.Datetime("us")}, is_static=True) pool.set_t0(feature="registration_date") # query: first row where status == "error" pool.set_t0(query=pl.col("status") == "error", anchor="start", use_first=True) # query: last row where value > 0.9 pool.set_t0(query=pl.col("value") > 0.9, anchor="end", use_first=False) Pool-Level Inspection `pool.t0_data()` returns the full T0 table as a DataFrame with columns `[id, _T0_, _T0_NEAREST_RANK_]`. The `_T0_NEAREST_RANK_` column holds the 0-based index of the entity whose temporal start is the **floor** value at or just before T0. It is computed from the `start` column regardless of the `anchor` used in `set_t0`. pool.set_t0(position=0, anchor="start") # pandas (default) pool.t0_data().head() Sequence-Level Properties Once `set_t0` has been called on the pool, every sequence object exposes two read-only properties: .. list-table:: :header-rows: 1 :widths: 30 20 50 * - Property - Type - Description * - `seq.t0` - scalar or `None` - T0 value for this sequence. `None` when T0 could not be determined (sequence too short, no query match, `null` static feature value…). * - `seq.t0_nearest_rank` - `int` or `None` - 0-based index of the entity at or just before T0. `None` when `seq.t0` is `None`. 
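The "entity at or just before T0" rule is a floor lookup on the sorted start times. A minimal sketch with the standard library follows; `nearest_rank` is a hypothetical helper, and returning `None` when T0 precedes every entity is an assumption, since that case is not described here.

```python
from bisect import bisect_right


def nearest_rank(starts, t0):
    """0-based index of the last entity whose start is <= t0.

    `starts` must be sorted in ascending order.
    """
    if t0 is None:
        return None
    rank = bisect_right(starts, t0) - 1
    # Assumption: no entity at or before T0 yields None.
    return rank if rank >= 0 else None


starts = [0, 5, 9, 14]  # sorted entity start times, as integer timesteps

exact = nearest_rank(starts, 9)  # T0 falls exactly on an entity start
between = nearest_rank(starts, 10)  # T0 falls between two starts
```

Both calls return `2`: the lookup only uses the `start` values, which matches the statement that the rank is computed from the `start` column regardless of the `anchor`.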
seq = pool[pool.unique_ids[0]] print(seq.t0) # e.g. datetime(2020, 3, 15, ...) print(seq.t0_nearest_rank) # e.g. 2 Trajectory-Level Zeroing `TrajectoryPool.set_t0` accepts the same four strategy keywords as the sequence-level `set_t0`, plus an additional `on=` parameter that selects the **reference sub-pool** from which T0 is computed. The `on=` parameter Strategies that inspect temporal rows (`position`, `query`) **require** `on=` because the row index or filter expression is evaluated against a specific sub-pool. Strategies that do not read rows (`direct`, `feature`) do **not** need `on=`; if provided, it is ignored with a warning. # position: first admission, start of interval tpool.set_t0(position=0, anchor="start", on="admissions") # direct: no on= needed tpool.set_t0(direct=datetime(2010, 6, 1)) # feature: trajectory-level static column tpool.set_t0(feature="admission_date") # query: first lab matching a condition tpool.set_t0(query=pl.col("status") == "error", on="labs") Trajectory-Level Inspection `tpool.t0_data()` returns one row per trajectory with columns `[id, _T0_, <alias_1>_T0_NEAREST_RANK_, <alias_2>_T0_NEAREST_RANK_, ...]`. Each sub-pool gets its **own** nearest-rank column because the floor index depends on each pool's temporal grid. The column is named `<alias>_T0_NEAREST_RANK_` (alias prefix, then the constant suffix). tpool.set_t0(position=0, anchor="start", on="admissions") tpool.t0_data().head() # columns: id, _T0_, admissions_T0_NEAREST_RANK_, labs_T0_NEAREST_RANK_, ... Trajectory Properties Once `set_t0` has been called on the trajectory pool, every `tanat.trajectory.trajectory.Trajectory` exposes: .. list-table:: :header-rows: 1 :widths: 30 20 50 * - Property - Type - Description * - `traj.t0` - scalar or `None` - T0 for this trajectory. `None` when no T0 could be determined. * - `traj.t0_nearest_rank` - `dict[str, int | None]` - Per-alias floor index: `{"admissions": 0, "labs": 2, ...}`. `None` per alias when `traj.t0` is `None`. 
traj = tpool[tpool.unique_ids[0]] print(traj.t0) # e.g. datetime(2020, 3, 15, ...) print(traj.t0_nearest_rank) # e.g. {'admissions': 0, 'labs': 2, 'phases': 1} T0 is shared across all children A single `tpool.set_t0(...)` call is enough. Every object you retrieve from the pool (a sub-pool, a trajectory, or an individual sequence) automatically returns the same `t0` value. `t0_nearest_rank` still varies: each pool computes its floor index on its own temporal grid. tpool.set_t0(position=0, anchor="start", on="admissions") traj = tpool[tpool.unique_ids[0]] print(traj.t0) # e.g. datetime(2020, 3, 15, ...) seq = traj["labs"] print(seq.t0) # == traj.t0 print(seq.t0_nearest_rank) # floor index on the labs temporal grid tpool.sequence_pools["labs"].t0_data().head() # _T0_ column == traj.t0 Null Handling A sequence receives `_T0_ = null` in any of these situations: * **position** - the index is out of range for that sequence. * **direct (dict)** - the sequence ID is not a key in the dict. * **feature** - the static feature value is `null` for that ID. * **query** - no entity row matches the expression (or the sequence is empty). Sequences with `null` T0 are **not dropped** from the pool. `seq.t0` returns `None` and `seq.t0_nearest_rank` returns `None` for those individuals. To inspect how many sequences are affected: null_count = pool.t0_data()["_T0_"].isnull().sum() print(f"{null_count}/{len(pool)} sequences with T0 = null") See Also * - Full operation reference including `set_t0` and `t0_data`. * - Set T0 on sequence level. * - Set T0 on trajectory level. ---------------------------------------- ## Clustering: CLARA (Clustering LARge Applications) """ Clustering: CLARA (Clustering LARge Applications) ================================================== This example demonstrates CLARA clustering, a scalable variant of PAM that works on large datasets by sampling subsets of the data for medoid selection. 
""" # Setup # ----- import polars as pl from tanat import build_states from tanat.clustering import CLARAClusterer from tanat.dataset import simulate_states from tanat.metric.entity import HammingEntityMetric from tanat.metric.sequence import EditSequenceMetric # Generate synthetic data # ----------------------- N_IDS = 100 SEED = 42 raw_df = simulate_states( n_ids=N_IDS, seq_length_range=(3, 8), features=["score", "status"], seed=SEED, ) pool = build_states( temporal_data=raw_df, id_column="id", start_column="start", end_column="end", ) # Cast features to categorical pool.cast_features({"status": pl.Categorical}) print(pool) # Define the metric used by the clusterer # --------------------------------------- hamming = HammingEntityMetric(entity_feature="status") metric = EditSequenceMetric(entity_metric=hamming, normalize=True) # Perform CLARA clustering # ------------------------- n_clusters = 5 n_samples = 40 # subset size per PAM instance n_iterations = 3 # number of PAM instances clusterer = CLARAClusterer( metric=metric, n_clusters=n_clusters, sampling_ratio=n_samples / N_IDS, nb_pam_instances=n_iterations, random_state=SEED, ) clusterer.fit(pool) # Clustering results print(clusterer) # Inspect cluster assignments and medoids # ---------------------------------------- print("\nMedoids (representative sequences):") for i, medoid_id in enumerate(clusterer.medoids): print(f" Cluster {i}: {medoid_id}") print("\nCluster assignments injected as static features:") print(pool.static_data().head()) ---------------------------------------- ## Clustering: Hierarchical Clustering """ Clustering: Hierarchical Clustering ==================================== This example demonstrates hierarchical clustering on a pool of sequences. 
""" import polars as pl from tanat import build_states from tanat.clustering import HierarchicalClusterer from tanat.dataset import simulate_states from tanat.metric.entity import HammingEntityMetric from tanat.metric.sequence import EditSequenceMetric # Generate synthetic data # ----------------------- N_IDS = 50 SEED = 42 raw_df = simulate_states( n_ids=N_IDS, seq_length_range=(3, 8), features=["score", "status"], seed=SEED, ) pool = build_states( temporal_data=raw_df, id_column="id", start_column="start", end_column="end", ) # Cast features to categorical pool.cast_features({"status": pl.Categorical}) print(pool) # Define the metric used by the clusterer # --------------------------------------- hamming = HammingEntityMetric(entity_feature="status") metric = EditSequenceMetric(entity_metric=hamming, normalize=True) # Perform hierarchical clustering # -------------------------------- clusterer = HierarchicalClusterer( metric=metric, n_clusters=4, ) clusterer.fit(pool) # Clustering results print(clusterer) # Inspect cluster assignments # ---------------------------- print("\nCluster assignments injected as static features:") print(pool.static_data().head()) ---------------------------------------- ## Clustering: Partitioning Around Medoids (PAM) """ Clustering: Partitioning Around Medoids (PAM) ============================================== This example demonstrates Partitioning Around Medoids (PAM) clustering, which finds k representative objects (medoids) that minimize total distance to all assigned objects. 
""" # Setup # ----- import polars as pl from tanat import build_states from tanat.clustering import PAMClusterer from tanat.dataset import simulate_states from tanat.metric.entity import HammingEntityMetric from tanat.metric.sequence import EditSequenceMetric # Generate synthetic data # ----------------------- N_IDS = 50 SEED = 42 raw_df = simulate_states( n_ids=N_IDS, seq_length_range=(3, 8), features=["score", "status"], seed=SEED, ) pool = build_states( temporal_data=raw_df, id_column="id", start_column="start", end_column="end", ) # Cast features to categorical pool.cast_features({"status": pl.Categorical}) print(pool) # Define the metric used by the clusterer # --------------------------------------- hamming = HammingEntityMetric(entity_feature="status") metric = EditSequenceMetric(entity_metric=hamming, normalize=True) # Perform PAM clustering # ---------------------- n_clusters = 5 clusterer = PAMClusterer(metric=metric, n_clusters=n_clusters) clusterer.fit(pool) # Clustering results print(clusterer) # Inspect cluster assignments and medoids # ---------------------------------------- print("\nMedoids (representative sequences):") for i, medoid_id in enumerate(clusterer.medoids): print(f" Cluster {i}: {medoid_id}") print("\nCluster assignments injected as static features:") print(pool.static_data().head()) ---------------------------------------- ## The Three Types of Sequences """ The Three Types of Sequences ============================= *TanaT* supports three types of temporal sequences depending on how each entity's temporal extent is defined: .. 
list-table:: :header-rows: 1 :widths: 15 30 35 20 * - Type - Temporal extent - Key constraint - Builder * - **Event** - Single timestamp - None - :func:`~tanat.sequence.shortcuts.build_events` * - **Interval** - ``[start, end]`` - Overlaps and gaps are **allowed** - :func:`~tanat.sequence.shortcuts.build_intervals` * - **State** - ``[start, end]`` - **Contiguous**, no overlap, no gap - :func:`~tanat.sequence.shortcuts.build_states` This example walks through each type step by step: data simulation, pool construction, pool-level exploration, navigation down to individual sequences and entities, and a comparison of the ``temporal_extent`` at the entity level. For a broader conceptual introduction see :doc:`/getting-started/concepts`. """ Imports ~~~~~~~ Each type has its own shortcut builder. All three live in the same :mod:`tanat` namespace. from tanat import build_events, build_intervals, build_states from tanat.dataset import ( simulate_events, simulate_intervals, simulate_states, simulate_static, ) ------------------------- 1. Event Sequences ------------------------- An **event** is a point-in-time observation: it has *one* timestamp and no duration. Think of medical visits, user clicks, or purchase records. Simulate data ^^^^^^^^^^^^^ :func:`~tanat.dataset.simulation.events.simulate_events` returns a ``DataFrame`` with columns ``id``, ``time``, and one column per feature. events_data = simulate_events( n_ids=10, features=["value", "category"], seed=42, ) events_data.head() Build the pool ^^^^^^^^^^^^^^ :func:`~tanat.sequence.shortcuts.build_events` needs at minimum: - ``id_column``: the column that identifies each sequence - ``time_column``: the column containing the event timestamp All remaining columns are automatically inferred as entity features. 
events_pool = build_events( temporal_data=events_data, id_column="id", time_column="time", ) print(events_pool) Explore the pool ^^^^^^^^^^^^^^^^ print(f"Number of sequences : {len(events_pool)}") print(f"First IDs : {events_pool.unique_ids[:5]}") # Temporal data of the pool in tabular form (one row = one entity) events_pool.temporal_data().head() Navigate to a sequence then to an entity ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ # Index the pool by ID to get an EventSequence event_seq = events_pool[events_pool.unique_ids[0]] print(event_seq) print(f"→ {len(event_seq)} events for ID {event_seq.id_value!r}") # Index the sequence by integer to get an EventEntity event_entity = event_seq[0] # first event print(event_entity) # At the entity level the temporal extent is a **single timestamp** print("features :", event_entity.data()) print("temporal span :", event_entity.temporal_extent) # single date/time value print("feature value :", event_entity["value"]) Iterate ^^^^^^^ # Pool → one sequence per ID for seq in events_pool.subset(events_pool.unique_ids[:3]): print(f" ID {seq.id_value!r}: {len(seq)} events") # Sequence → one entity per row for entity in event_seq: print(f" t={entity.temporal_extent} value={entity['value']}") Static features ^^^^^^^^^^^^^^^ Per-sequence static data (age, group, …) can be attached at build time via ``static_data``, or added later with :func:`~tanat.sequence.base.pool.SequencePool.add_static_features`. Static features are shared by all entities of a given sequence. 
# Generate one row of static attributes per sequence ID static_df = simulate_static(n_ids=10, features=["age", "group"], seed=0) static_df.head() # Option 1: attach at build time events_pool_with_static = build_events( temporal_data=events_data, id_column="id", time_column="time", static_data=static_df, ) events_pool_with_static.static_data().head() # Option 2: add to an existing pool in place events_pool.add_static_features(static_df) events_pool.static_data().head() # Static data is also accessible per-sequence (single row) events_pool[events_pool.unique_ids[0]].static_data() ------------------------- 2. Interval Sequences ------------------------- An **interval** spans a period of time with a ``start`` and an ``end``. Unlike states, intervals are **not** required to be contiguous: two intervals can **overlap** and **gaps** between them are allowed. Think of overlapping treatments, project assignments, or sensor readings. Simulate data ^^^^^^^^^^^^^ :func:`~tanat.dataset.simulation.intervals.simulate_intervals` produces a ``DataFrame`` with ``id``, ``start``, ``end``, and feature columns. 
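To see what "overlaps and gaps are allowed" means in practice, the following standalone sketch inspects consecutive ``(start, end)`` pairs. It is an illustration only: `describe_intervals` is a hypothetical helper, not a TanaT function, and it only compares each interval with its immediate successor after sorting by start.

```python
def describe_intervals(intervals):
    """Return (overlaps, gaps) found between consecutive intervals.

    `intervals` is an iterable of (start, end) tuples; they are sorted
    by start before each one is compared with its successor.
    """
    ordered = sorted(intervals)
    overlaps, gaps = [], []
    for (s1, e1), (s2, e2) in zip(ordered, ordered[1:]):
        if s2 < e1:
            overlaps.append(((s1, e1), (s2, e2)))  # next starts before this ends
        elif s2 > e1:
            gaps.append((e1, s2))  # uncovered stretch between the two
    return overlaps, gaps


overlaps, gaps = describe_intervals([(0, 4), (3, 6), (8, 10)])
# overlaps -> [((0, 4), (3, 6))]
# gaps     -> [(6, 8)]
```

A sequence for which both lists come back empty is contiguous in the sense required of state sequences; an interval sequence is free to produce either.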
intervals_data = simulate_intervals( n_ids=10, features=["value", "category"], seed=42, ) intervals_data.head() Build the pool ^^^^^^^^^^^^^^ :func:`~tanat.sequence.shortcuts.build_intervals` needs: - ``id_column``: sequence identifier - ``start_column``: interval start - ``end_column``: interval end intervals_pool = build_intervals( temporal_data=intervals_data, id_column="id", start_column="start", end_column="end", ) print(intervals_pool) Navigate to a sequence then to an entity ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ interval_seq = intervals_pool[intervals_pool.unique_ids[0]] print(f"→ {len(interval_seq)} intervals for ID {interval_seq.id_value!r}") interval_entity = interval_seq[0] print(interval_entity) # The temporal extent is now a **(start, end) pair** print("features :", interval_entity.data()) print("temporal span :", interval_entity.temporal_extent) # (start, end) print("feature value :", interval_entity["value"]) ------------------------- 3. State Sequences ------------------------- A **state sequence** partitions the timeline into **contiguous, non-overlapping** periods: ``end[i] == start[i+1]`` within every sequence. The individual is in *exactly one state* at any point in time. Think of disease stages, employment status, or device modes. Simulate data ^^^^^^^^^^^^^ :func:`~tanat.dataset.simulation.states.simulate_states` guarantees strict continuity: ``end[i] == start[i+1]`` by construction. states_data = simulate_states( n_ids=10, features=["value", "category"], seed=42, ) states_data.head() Build the pool ^^^^^^^^^^^^^^ :func:`~tanat.sequence.shortcuts.build_states` accepts the same ``start_column`` / ``end_column`` pair as :func:`~tanat.sequence.shortcuts.build_intervals`. .. note:: ``end_column`` is **optional** for state sequences. When omitted, the end of state *i* is automatically derived from the start of state *i+1*. The last state per sequence will have ``end = null`` unless you supply an explicit sentinel value to the builder. 
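The rule in the note above (the end of state *i* is the start of state *i+1*) can be sketched without any library. This is a hedged illustration, not the builder's implementation: `infer_ends` and its `sentinel` argument are hypothetical names chosen for this example, and rows are plain dictionaries rather than a DataFrame.

```python
from itertools import groupby
from operator import itemgetter


def infer_ends(rows, sentinel=None):
    """Add an 'end' key equal to the next 'start' within the same id.

    The last state of each id receives `sentinel` (None by default).
    """
    ordered = sorted(rows, key=itemgetter("id", "start"))
    out = []
    for _, group in groupby(ordered, key=itemgetter("id")):
        group = list(group)
        # Shift starts by one position; pad the final state with the sentinel.
        next_starts = [row["start"] for row in group[1:]] + [sentinel]
        for row, end in zip(group, next_starts):
            out.append({**row, "end": end})
    return out


rows = [
    {"id": "a", "start": 0, "status": "healthy"},
    {"id": "a", "start": 3, "status": "sick"},
    {"id": "a", "start": 7, "status": "recovered"},
    {"id": "b", "start": 1, "status": "healthy"},
]
states = infer_ends(rows)
ends = [row["end"] for row in states]  # [3, 7, None, None]
```

Deriving ends this way makes ``end[i] == start[i+1]`` hold by construction, which is why only the final state of each sequence needs special handling.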
states_pool = build_states( temporal_data=states_data, id_column="id", start_column="start", end_column="end", # optional: omit to let TanaT infer it ) print(states_pool) Navigate to a sequence then to an entity ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ state_seq = states_pool[states_pool.unique_ids[0]] print(f"→ {len(state_seq)} states for ID {state_seq.id_value!r}") state_entity = state_seq[0] print(state_entity) # Like intervals, the temporal extent is a **(start, end) pair** print("features :", state_entity.data()) print("temporal span :", state_entity.temporal_extent) # (start, end) print("feature value :", state_entity["value"]) ------------------------- 4. Side-by-side comparison ------------------------- To summarise the differences, we build all three pools from the *same* underlying dataset (states data, which contains both ``start`` and ``end`` columns) and compare the ``temporal_extent`` of the first entity of the first sequence. # Re-use states_data for all three types so the raw data is identical common_events = build_events( temporal_data=states_data, id_column="id", time_column="start", # use start as the single event timestamp ) common_intervals = build_intervals( temporal_data=states_data, id_column="id", start_column="start", end_column="end", ) common_states = build_states( temporal_data=states_data, id_column="id", start_column="start", end_column="end", ) first_id = common_events.unique_ids[0] for label, pool in [ ("Event ", common_events), ("Interval", common_intervals), ("State ", common_states), ]: entity = pool[first_id][0] print(f"{label} → temporal_extent: {entity.temporal_extent}") # .. 
note:: # - ``Event`` : a single timestamp (no duration) # - ``Interval`` : a ``(start, end)`` pair; gaps and overlaps are allowed # - ``State`` : a ``(start, end)`` pair; ``end[i] == start[i+1]`` # is guaranteed by construction ---------------------------------------- ## Trajectories """ Trajectories ============= We illustrate here how to build a :class:`~tanat.trajectory.TrajectoryPool` by composing several sequence pools, then navigate from the pool down to an individual trajectory, its sub-sequences, and their entities. A **trajectory** groups all sequences belonging to the same individual across multiple temporal dimensions (e.g. visits, treatments, lab results). A **trajectory pool** aggregates trajectories across an entire cohort. Each sequence pool is registered under an **alias** that acts as the key for retrieval:: tpool.sequence_pools["events"] → EventSequencePool (full pool) tpool[id] → Trajectory (one individual) tpool[id]["events"] → EventSequence (one sequence) tpool[id]["events"][0] → EventEntity (one entity) """ Imports ~~~~~~~ from tanat import build_events, build_intervals, build_states, build_trajectories Simulate data ~~~~~~~~~~~~~ :func:`~tanat.dataset.simulation.trajectories.simulate_trajectories` is a convenience wrapper that calls each ``simulate_*`` function in one shot and guarantees a **shared ID space** across all sequence types. from tanat.dataset import simulate_trajectories, simulate_static data = simulate_trajectories( sequences={ "events": {"type": "event", "n_ids": 50, "features": ["value", "category"]}, "intervals": { "type": "interval", "n_ids": 50, "features": ["duration_days", "label"], }, "states": {"type": "state", "n_ids": 50, "features": ["score", "status"]}, }, shared_ids=True, seed=42, ) # Each value is a plain DataFrame.
print("events :", data["events"].shape) print("intervals:", data["intervals"].shape) print("states :", data["states"].shape) Build the sequence pools ~~~~~~~~~~~~~~~~~~~~~~~~ Each pool is built independently with its own ``build_*`` shortcut (:func:`~tanat.sequence.shortcuts.build_events`, :func:`~tanat.sequence.shortcuts.build_intervals`, :func:`~tanat.sequence.shortcuts.build_states`). event_pool = build_events( temporal_data=data["events"], id_column="id", time_column="time", ) interval_pool = build_intervals( temporal_data=data["intervals"], id_column="id", start_column="start", end_column="end", ) state_pool = build_states( temporal_data=data["states"], id_column="id", start_column="start", end_column="end", ) Build the trajectory pool ~~~~~~~~~~~~~~~~~~~~~~~~~ :func:`~tanat.trajectory.shortcuts.build_trajectories` composes the pools under their aliases. The alias becomes the key used to retrieve a sub-sequence from a trajectory (``traj["events"]``). tpool = build_trajectories( pools={ "events": event_pool, "intervals": interval_pool, "states": state_pool, }, ) print(tpool) Explore the trajectory pool ~~~~~~~~~~~~~~~~~~~~~~~~~~~ print(f"Trajectories : {len(tpool)}") print(f"First IDs : {tpool.unique_ids[:5]}") Access one of the sequence pools ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ The underlying sequence pools are accessible as a read-only mapping, ``tpool.sequence_pools``. # To access the pool with the alias `states`: print(tpool.sequence_pools["states"]) Access a trajectory of the trajectory pool ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ``tpool[id]`` returns a :class:`~tanat.trajectory.Trajectory`, a lightweight view over all sub-sequences for that individual. traj = tpool[tpool.unique_ids[0]] print(traj) Sequences of a trajectory ~~~~~~~~~~~~~~~~~~~~~~~~~ Use the alias as the key to retrieve the corresponding sequence from an individual trajectory.
event_seq = traj["events"] interval_seq = traj["intervals"] state_seq = traj["states"] print(f"events : {len(event_seq)} events") print(f"intervals : {len(interval_seq)} intervals") print(f"states : {len(state_seq)} states") print(event_seq) print(interval_seq) print(state_seq) Static features ~~~~~~~~~~~~~~~ Per-trajectory static data (age, group, ...) is passed at build time via :func:`~tanat.trajectory.shortcuts.build_trajectories`. It is then accessible on the pool and on individual trajectories. # Generate a static DataFrame matching the shared ID space static_df = simulate_static(n_ids=50, features=["age", "group"], seed=0) static_df.head() tpool_with_static = build_trajectories( pools={ "events": event_pool, "intervals": interval_pool, "states": state_pool, }, static_data=static_df, id_column="id", ) # Static data is accessed the same way on trajectory pools as on sequence pools. tpool_with_static.static_data().head() # Static data is also accessible per-trajectory (single row) tpool_with_static[tpool_with_static.unique_ids[0]].static_data() # .. note:: # If a sequence pool used to build a trajectory pool contains static features, # they are kept in the sequence pool but are not visible at the trajectory level. Iteration ~~~~~~~~~ All pool and trajectory objects are iterable. - :func:`~tanat.trajectory.pool.TrajectoryPool.sequence_pools` yields :class:`~tanat.sequence.pool.SequencePool` objects. - :class:`~tanat.trajectory.pool.TrajectoryPool` yields :class:`~tanat.trajectory.trajectory.Trajectory` objects; ``.items()`` gives ``(id, trajectory)`` pairs. - :class:`~tanat.trajectory.trajectory.Trajectory` yields its aliases (string keys); ``.items()`` gives ``(alias, sequence)`` pairs.
# TrajectoryPool → SequencePool for seq_pool in tpool.sequence_pools: print(f" {len(seq_pool)}") # TrajectoryPool → Trajectory for t in tpool: print(f" {t.id_value}: sequences={list(t)}") # TrajectoryPool.items() → (id, Trajectory) pairs for tid, t in tpool.items(): print(f" {tid}: {type(t).__name__}") # Trajectory.items() → (alias, Sequence) pairs traj = tpool[tpool.unique_ids[0]] for alias, seq in traj.items(): print(f" {alias}: {len(seq)} entities") ---------------------------------------- ## EntityCriterion """ EntityCriterion =============== Select sequences or prune entity rows using any **Polars expression** evaluated against the temporal data. .. list-table:: :header-rows: 1 :widths: 25 75 * - Level - Behaviour * - ``which()`` - Returns IDs that have **at least one** row satisfying the expression. * - ``filter_entities()`` - Keeps only the rows where the expression is ``True``; sequences with zero matching rows disappear from the filtered view. * - ``match()`` - Returns ``True`` iff the sequence has at least one matching row. See :doc:`../../../reference/criterion` for the full reference. """ Imports ~~~~~~~ import polars as pl from tanat import build_intervals from tanat.criterion import EntityCriterion from tanat.dataset import simulate_intervals, simulate_static Simulate data ~~~~~~~~~~~~~ temporal = simulate_intervals( n_ids=50, features=["value", "status"], seed=42, ) static = simulate_static(n_ids=50, features=["age", "group"], seed=0) pool = build_intervals( temporal_data=temporal, id_column="id", start_column="start", end_column="end", static_data=static, ) print(pool) # Inspect the unique status values present in the data. pool.temporal_data()["status"].unique() ``which()`` : sequence-level selection ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Return the IDs of all sequences that have **at least one** entity row satisfying the expression. The original pool is left unchanged. 
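The difference between the sequence-level and entity-level behaviours listed in the table above can be illustrated without the library. The sketch below uses plain dictionaries and hypothetical helper functions named after the methods; it only mirrors the semantics described in the text (at least one matching row vs. keeping matching rows) and says nothing about how TanaT evaluates Polars expressions internally.

```python
# Toy temporal data: one dict per entity row (illustrative values only).
rows = [
    {"id": 1, "status": "A", "value": 90},
    {"id": 1, "status": "B", "value": 10},
    {"id": 2, "status": "B", "value": 85},
    {"id": 3, "status": "C", "value": 20},
]


def is_high_value(row):
    """Row-level condition, standing in for ``pl.col("value") > 80``."""
    return row["value"] > 80


def which(rows, predicate):
    """Sequence level: IDs with at least one row satisfying the predicate."""
    return sorted({row["id"] for row in rows if predicate(row)})


def filter_entities(rows, predicate):
    """Entity level: keep only the rows satisfying the predicate."""
    return [row for row in rows if predicate(row)]


ids_high = which(rows, is_high_value)
kept_rows = filter_entities(rows, is_high_value)
remaining_ids = sorted({row["id"] for row in kept_rows})

print("which           :", ids_high)
print("filter_entities :", kept_rows)
print("ids after filter:", remaining_ids)
```

ID 3 has no matching row, so it is absent from both results, while ID 1 keeps only its matching row after entity-level filtering.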
# Pick a status value that exists in the data target_status = "A" # Select sequences that have at least one entity with that status. ids_with_status = pool.which(EntityCriterion(query=pl.col("status") == target_status)) # Numeric threshold: sequences with at least one high-value entity. ids_high_value = pool.which(EntityCriterion(query=pl.col("value") > 80)) # Combine conditions with a Polars expression. ids_combined = pool.which( EntityCriterion(query=(pl.col("status") == target_status) & (pl.col("value") > 80)) ) ``filter_entities()``: entity-level pruning ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Return a **new pool view** that contains only the rows satisfying the expression. The original pool is unchanged. Sequences with zero surviving rows no longer appear in the filtered pool. filtered = pool.filter_entities( EntityCriterion(query=pl.col("status") == target_status) ) # Combine two conditions in a single criterion to narrow further. filtered2 = pool.filter_entities( EntityCriterion(query=(pl.col("status") == target_status) & (pl.col("value") > 80)) ) ``match()``: single-sequence evaluation ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ criterion = EntityCriterion(query=pl.col("status") == target_status) # Iterate to find the first sequence that matches. first_match = next((s for s in pool if s.match(criterion)), None) if first_match: print(f"First matching sequence: id={first_match.id_value}") ---------------------------------------- ## LengthCriterion """ LengthCriterion =============== Select sequences by their **number of entity rows** (sequence length). .. list-table:: :header-rows: 1 :widths: 20 80 * - Parameter - Description * - ``gt`` / ``ge`` - Strictly greater than / greater than or equal to. * - ``lt`` / ``le`` - Strictly less than / less than or equal to. At least one bound must be supplied. Contradictory bounds (e.g. ``gt=5, lt=3``) are rejected at construction time. 
:class:`~tanat.criterion.LengthCriterion` supports **SEQUENCE** level only (``which()``, ``match()``); ``filter_entities()`` is not available. See :doc:`../../../reference/criterion` for the full reference. """ Imports ~~~~~~~ from tanat import build_intervals from tanat.criterion import LengthCriterion from tanat.dataset import simulate_intervals Simulate data ~~~~~~~~~~~~~ temporal = simulate_intervals(n_ids=50, features=["value", "status"], seed=42) pool = build_intervals( temporal_data=temporal, id_column="id", start_column="start", end_column="end", ) print(pool) # Inspect length distribution or other summary statistics. pool.describe(by_id=False) ``which()``: single-bound selection ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # Long sequences: more than 6 entities. ids_long = pool.which(LengthCriterion(gt=6)) # Short sequences: at most 3 entities. ids_short = pool.which(LengthCriterion(le=3)) print(f"Length ≤ 3 : {len(ids_short)} / {len(pool)} IDs") Range selection ~~~~~~~~~~~~~~~ Combine bounds to select sequences whose length falls in a range. # Length = ]3, 6] ids_medium = pool.which(LengthCriterion(gt=3, le=6)) Subset the pool ~~~~~~~~~~~~~~~ Use :py:meth:`~tanat.sequence.base.pool.SequencePool.subset` to obtain a restricted pool from the selected IDs. pool_long = pool.subset(ids_long) print(pool_long) # Inspect the length distribution in the subset. pool_long.describe(by_id=False) ``match()``: single-sequence evaluation ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ seq = pool[pool.unique_ids[0]] seq_len = len(seq) print( f"Sequence {seq.id_value}: length={seq_len} " f"gt=6? {seq.match(LengthCriterion(gt=6))} " f"le=3? {seq.match(LengthCriterion(le=3))}" ) ---------------------------------------- ## PatternCriterion """ PatternCriterion ================ Select sequences or extract witness rows based on an **ordered pattern** of string values in a feature column. .. 
list-table:: :header-rows: 1 :widths: 20 80 * - Sentinel - Meaning * - ``ANY`` (``"..."``) - Zero or more elements: free gap between adjacent sub-patterns. * - ``WILDCARD`` (``"*"``) - Exactly **one** element of any value at that position. .. list-table:: :header-rows: 1 :widths: 25 75 * - Level - Behaviour * - ``which()`` - IDs whose temporal sequence contains (``present=True``) or does not contain (``present=False``) the ordered pattern. * - ``filter_entities()`` - Keeps the "witness" rows of the greedy first match (``present=True``), or all non-witness rows (``present=False``). * - ``match()`` - Returns ``True`` iff the pattern is found (resp. absent). See :doc:`../../../reference/criterion` for the full reference. """ Imports ~~~~~~~ from tanat import build_intervals from tanat.criterion import ANY, WILDCARD, PatternCriterion from tanat.dataset import simulate_intervals Simulate data ~~~~~~~~~~~~~ temporal = simulate_intervals(n_ids=50, features=["score", "status"], seed=42) pool = build_intervals( temporal_data=temporal, id_column="id", start_column="start", end_column="end", ) print(pool) # Pick 2 status values existing in the temporal data. A = "A" B = "B" Single-element pattern ~~~~~~~~~~~~~~~~~~~~~~ A plain string (or single-element list) selects sequences that contain **at least one** entity with that value. ids_has_A = pool.which(PatternCriterion(feature="status", pattern=A)) # Exclusion: sequences that never show status A. ids_no_A = pool.which(PatternCriterion(feature="status", pattern=A, present=False)) Adjacent pattern: A directly followed by B ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ``[A, B]`` matches only if B appears **immediately after** A in the ordered sequence of entities. ids_adj = pool.which(PatternCriterion(feature="status", pattern=[A, B])) Free gap: A anywhere before B ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Insert :data:`~tanat.criterion.ANY` between elements to allow an arbitrary number of rows in between. 
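To make the two sentinels concrete, here is a small stand-alone matcher written under simplifying assumptions: elements are compared by exact equality (the library defaults to regular expressions), and ``contains_pattern`` is a hypothetical helper, not part of TanaT. The sentinel values ``"..."`` and ``"*"`` are taken from the table above.

```python
ANY = "..."     # zero or more elements of any value
WILDCARD = "*"  # exactly one element of any value


def _match_from(values, pattern):
    """True if ``pattern`` matches a prefix of ``values``."""
    if not pattern:
        return True
    head, rest = pattern[0], pattern[1:]
    if head == ANY:
        # Try every possible gap length, including an empty gap.
        return any(
            _match_from(values[skip:], rest) for skip in range(len(values) + 1)
        )
    if not values:
        return False
    if head == WILDCARD or head == values[0]:
        return _match_from(values[1:], rest)
    return False


def contains_pattern(values, pattern):
    """True if the ordered pattern occurs somewhere in ``values``."""
    return any(
        _match_from(values[start:], pattern) for start in range(len(values) + 1)
    )


statuses = ["A", "C", "B"]
adjacent = contains_pattern(statuses, ["A", "B"])
free_gap = contains_pattern(statuses, ["A", ANY, B := "B"])
one_between = contains_pattern(statuses, ["A", WILDCARD, "B"])
zero_gap = contains_pattern(["A", "B"], ["A", ANY, "B"])
wildcard_needs_one = contains_pattern(["A", "B"], ["A", WILDCARD, "B"])
print(adjacent, free_gap, one_between, zero_gap, wildcard_needs_one)
```

The last two checks show the difference between the sentinels: ``ANY`` accepts an empty gap, whereas ``WILDCARD`` requires exactly one element.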
ids_gap = pool.which(PatternCriterion(feature="status", pattern=[A, ANY, B])) Wildcard: exactly one element between A and B ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ :data:`~tanat.criterion.WILDCARD` matches **exactly one** entity of any value. ids_wildcard = pool.which(PatternCriterion(feature="status", pattern=[A, WILDCARD, B])) Combining sentinels ~~~~~~~~~~~~~~~~~~~~ You can mix :data:`ANY` and :data:`WILDCARD` freely. Here: A, then any gap, then exactly two consecutive B's. ids_double_B = pool.which(PatternCriterion(feature="status", pattern=[A, ANY, B, B])) Regex and case options ~~~~~~~~~~~~~~~~~~~~~~~ By default elements are treated as **regular expressions** (``regex=True``). Use ``regex=False`` for literal substring matching. Add ``case_sensitive=False`` for case-insensitive matching. # Literal, case-insensitive: same result as the exact match above. a_lower = A.lower() ids_ci = pool.which( PatternCriterion( feature="status", pattern=a_lower, regex=False, case_sensitive=False ) ) ``filter_entities()``: witness rows ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ With ``present=True`` (default), only the **greedy first-match witness rows** are kept. Each ID contributes at most ``len(pattern)`` rows. pattern = [A, B] filtered = pool.filter_entities(PatternCriterion(feature="status", pattern=pattern)) # inspect length of filtered sequences filtered.describe(by_id=False) ``match()``: single-sequence evaluation ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ criterion = PatternCriterion(feature="status", pattern=[A, B]) # Find all matching sequences by iterating. matching_seqs = [s for s in pool if s.match(criterion)] print(f"{len(matching_seqs)} sequence(s) contain A→B") ---------------------------------------- ## RankCriterion """ RankCriterion ============= Prune entity rows by their **0-based positional rank** within each sequence. Ranks can be absolute (from the first entity) or relative to T0 (the nearest entity to the reference date set via ``pool.set_t0()``). 
Exactly **one** parameter group must be specified: .. list-table:: :header-rows: 1 :widths: 30 70 * - Group - Description * - ``first=N`` - Keep first N rows (``N < 0`` → all except last ``|N|``). * - ``last=N`` - Keep last N rows (``N < 0`` → all except first ``|N|``). * - ``start`` / ``end`` / ``step`` - Python-slice semantics (negative indices supported). * - ``ranks=[…]`` - Explicit list of 0-based positions (negative = from end). Pass ``relative=True`` to interpret ranks relative to T0 rather than the start of the sequence. :class:`~tanat.criterion.RankCriterion` supports **ENTITY** level only (``filter_entities()``); ``which()`` and ``match()`` are not available. See :doc:`../../../reference/criterion` for the full reference. """ Imports ~~~~~~~ from tanat import build_intervals from tanat.criterion import RankCriterion from tanat.dataset import simulate_intervals, simulate_static Simulate data ~~~~~~~~~~~~~ temporal = simulate_intervals(n_ids=50, features=["value", "status"], seed=42) static = simulate_static(n_ids=50, features=["age"], seed=0) pool = build_intervals( temporal_data=temporal, id_column="id", start_column="start", end_column="end", static_data=static, ) print(pool) # Inspect length distribution or other summary statistics. pool.describe(by_id=False) ``first`` and ``last`` ~~~~~~~~~~~~~~~~~~~~~~ Positive ``N``: keep the first (or last) N entities per sequence. Negative ``N``: drop the last (or first) ``|N|`` entities per sequence. # Keep the first 2 entities. pool_first2 = pool.filter_entities(RankCriterion(first=2)) # Inspect length of filtered sequences. pool_first2.describe(by_id=False) # Keep the last 3 entities. pool_last3 = pool.filter_entities(RankCriterion(last=3)) # Inspect length of filtered sequences. pool_last3.describe(by_id=False) # Drop the last entity: first=-1 keeps all except the final row. pool_drop_last = pool.filter_entities(RankCriterion(first=-1)) # Inspect length of filtered sequences. 
pool_drop_last.describe(by_id=False) Slice: ``start`` / ``end`` / ``step`` ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Python-slice semantics. Negative indices count from the end of each sequence. # Entities at absolute ranks 1, 2, 3 (0-based → second to fourth row). pool_slice = pool.filter_entities(RankCriterion(start=1, end=4)) # Inspect length of filtered sequences. pool_slice.describe(by_id=False) # Every other entity (even-ranked rows). pool_step = pool.filter_entities(RankCriterion(step=2)) # Inspect length of filtered sequences. pool_step.describe(by_id=False) Explicit ``ranks`` ~~~~~~~~~~~~~~~~~~ Pass a list of 0-based positions. Negative values index from the end. # First and last entity of each sequence. pool_ends = pool.filter_entities(RankCriterion(ranks=[0, -1])) # Inspect length of filtered sequences. pool_ends.describe(by_id=False) Relative mode: ranks relative to T0 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Set a reference date with ``pool.set_t0()`` first. Then ``relative=True`` interprets ranks relative to the nearest entity to T0: rank 0 = that entity, rank -1 = one entity before, rank +1 = one after. pool.set_t0(position=-1, anchor="start") # T0 = start of last entity # Keep the entity at T0 and the 2 entities before it: [T-2, T-1, T0]. # NOTE: relative=True, end is exclusive. pool_t0 = pool.filter_entities(RankCriterion(start=-2, end=1, relative=True)) # Inspect length of filtered sequences. pool_t0.describe(by_id=False) # Rank 0 alone: a single "anchor" entity per sequence. pool_anchor = pool.filter_entities(RankCriterion(ranks=0, relative=True)) # Inspect T0 anchor entities. pool_anchor.temporal_data().head() ---------------------------------------- ## StaticCriterion """ StaticCriterion =============== Select sequences or trajectories using a **Polars expression evaluated against the static (per-ID) data**. Static features do not vary over time; typical examples are age, group membership, or a baseline score. .. 
list-table:: :header-rows: 1 :widths: 25 75 * - Level - Behaviour * - ``which()`` on a SequencePool - Returns IDs whose static row satisfies the expression. * - ``which()`` on a TrajectoryPool - Same, at trajectory level. * - ``match()`` - Returns ``True`` iff this sequence / trajectory's static row matches. * - ``filter_entities()`` - **Not supported** — static data has no entity rows to prune. See :doc:`../../../reference/criterion` for the full reference. """ Imports ~~~~~~~ import polars as pl from tanat import build_events, build_intervals, build_trajectories from tanat.criterion import StaticCriterion from tanat.dataset import simulate_events, simulate_intervals, simulate_static Simulate data ~~~~~~~~~~~~~ :class:`~tanat.criterion.StaticCriterion` requires the pool to have static features attached. Pass ``static_data`` to the builder (or call ``pool.add_static_features()`` later). temporal = simulate_intervals( n_ids=50, features=["value", "status"], seed=42, ) static = simulate_static(n_ids=50, features=["age", "group"], seed=0) static.head() pool = build_intervals( temporal_data=temporal, id_column="id", start_column="start", end_column="end", static_data=static, ) print(pool) ``which()``: sequence-level selection ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ The expression is evaluated once per ID against the static table. IDs that lack a static row (e.g. IDs not present in ``static_data``) do not appear in the result. # Numeric threshold. ids_old = pool.which(StaticCriterion(query=pl.col("age") > 50)) # Categorical filter. target_group = "A" ids_group = pool.which(StaticCriterion(query=pl.col("group") == target_group)) # Combine conditions. ids_combined = pool.which( StaticCriterion(query=(pl.col("age") > 50) & (pl.col("group") == target_group)) ) # Use the result to subset the pool. 
pool_old = pool.subset(ids_old) print(pool_old) Complement and partitioning ~~~~~~~~~~~~~~~~~~~~~~~~~~~ The two complementary age filters partition the IDs that have a non-null age. ids_young = pool.which(StaticCriterion(query=pl.col("age") <= 50)) ids_null_age = pool.which(StaticCriterion(query=pl.col("age").is_null())) Trajectory pool ~~~~~~~~~~~~~~~ :class:`~tanat.criterion.StaticCriterion` works identically on a :class:`~tanat.trajectory.pool.TrajectoryPool` because trajectories share the same static-data concept. temporal_events = simulate_events(n_ids=50, features=["value", "status"], seed=1) event_pool = build_events( temporal_data=temporal_events, id_column="id", time_column="time", ) tpool = build_trajectories( pools={"admissions": pool, "labs": event_pool}, static_data=static, id_column="id", ) print(tpool) # Query on static features to get trajectory IDs. traj_ids = tpool.which(StaticCriterion(query=pl.col("age") > 50)) ``match()``: single-trajectory evaluation ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # Iterate to find the first trajectory that matches. criterion = StaticCriterion(query=pl.col("age") > 50) first_match = next((t for t in tpool if t.match(criterion)), None) if first_match: print(f"First matching trajectory: id={first_match.id_value}") ---------------------------------------- ## TimeCriterion """ TimeCriterion ============= Filter entities or select sequences based on **temporal bounds** applied to the start and/or end time columns. .. list-table:: :header-rows: 1 :widths: 25 75 * - Parameter - Description * - ``start_ge`` / ``start_le`` - Inclusive bounds on the **start** column. * - ``end_ge`` / ``end_le`` - Inclusive bounds on the **end** column (interval/state pools only). * - ``duration_within`` - ``False`` (default): any overlap with the window suffices. ``True``: the entity interval must be **fully contained** in the window. * - ``all_entities`` - ``False`` (default): at least one row must satisfy the bounds. 
``True``: **every** row must satisfy the bounds. All bounds are inclusive. At least one bound must be supplied. See :doc:`../../../reference/criterion` for the full reference. """ Imports ~~~~~~~ import datetime as dt from tanat import build_intervals, build_events from tanat.criterion import TimeCriterion from tanat.dataset import simulate_intervals, simulate_events Simulate data ~~~~~~~~~~~~~ temporal = simulate_intervals(n_ids=50, features=["value", "status"], seed=42) pool = build_intervals( temporal_data=temporal, id_column="id", start_column="start", end_column="end", ) print(pool) # Inspect the time range covered by the data. df = pool.temporal_data() print( f"start range: {df['start'].min()} → {df['start'].max()}\n" f"end range : {df['end'].min()} → {df['end'].max()}" ) ``which()``: sequence-level selection ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # Any entity starts on or after a given date (default: all_entities=False). cutoff = dt.datetime(2000, 7, 1) ids_after = pool.which(TimeCriterion(start_ge=cutoff)) # All entities must start on or after the cutoff (stricter: all_entities=True). ids_all_after = pool.which(TimeCriterion(start_ge=cutoff, all_entities=True)) # Two-sided window: sequences with at least one entity that starts in [t0, t1]. t0 = dt.datetime(2000, 3, 1) t1 = dt.datetime(2000, 9, 1) ids_window = pool.which(TimeCriterion(start_ge=t0, start_le=t1)) Overlap vs containment ~~~~~~~~~~~~~~~~~~~~~~ For duration-based sequences (Interval/State), two modes control how entities relate to the query window: * **Overlap** (``duration_within=False``, default): entity touches the window → start ≤ window_end **and** end ≥ window_start. * **Containment** (``duration_within=True``): entity lies fully inside → start ≥ window_start **and** end ≤ window_end.
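The two conditions above can be written directly as predicates. The following sketch uses only the standard library; ``overlaps`` and ``is_within`` are hypothetical helper names that restate the inequalities from the list, with inclusive bounds as stated in the parameter table.

```python
from datetime import datetime


def overlaps(start, end, window_start, window_end):
    """Overlap mode: the interval touches the window (inclusive bounds)."""
    return start <= window_end and end >= window_start


def is_within(start, end, window_start, window_end):
    """Containment mode: the interval lies fully inside the window."""
    return start >= window_start and end <= window_end


window_start = datetime(2007, 1, 1)
window_end = datetime(2008, 1, 1)

# Illustrative intervals, not taken from the simulated data.
intervals = {
    "inside": (datetime(2007, 3, 1), datetime(2007, 6, 1)),
    "straddles_start": (datetime(2006, 11, 1), datetime(2007, 2, 1)),
    "outside": (datetime(2008, 3, 1), datetime(2008, 4, 1)),
}

results = {
    name: (
        overlaps(start, end, window_start, window_end),
        is_within(start, end, window_start, window_end),
    )
    for name, (start, end) in intervals.items()
}
for name, (overlap, within) in results.items():
    print(f"{name:16} overlap={overlap} within={within}")
```

Every interval that is contained in the window also overlaps it, so containment always selects a subset of what overlap selects.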
window_start = dt.datetime(2007, 1, 1) window_end = dt.datetime(2008, 1, 1) filtered_overlap = pool.filter_entities( TimeCriterion(start_ge=window_start, end_le=window_end, duration_within=False) ) filtered_within = pool.filter_entities( TimeCriterion(start_ge=window_start, end_le=window_end, duration_within=True) ) ids_overlap = pool.which(TimeCriterion(start_ge=window_start, end_le=window_end)) ids_within = pool.which( TimeCriterion(start_ge=window_start, end_le=window_end, duration_within=True) ) Event pools (single time column) ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ For event sequences only ``start_ge`` / ``start_le`` apply; ``end_ge`` / ``end_le`` and ``duration_within`` are unavailable. raw_events = simulate_events(n_ids=50, features=["value", "status"], seed=1) event_pool = build_events( temporal_data=raw_events, id_column="id", time_column="time", ) # Inspect time range. ev_df = event_pool.temporal_data() print(f"event time range: {ev_df['time'].min()} → {ev_df['time'].max()}") ev_cutoff = dt.datetime(2000, 6, 1) ids_ev = event_pool.which(TimeCriterion(start_ge=ev_cutoff)) ``match()``: single-sequence evaluation ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ criterion = TimeCriterion(start_ge=cutoff) # Iterate to find the first sequence that matches. first_match = next((s for s in pool if s.match(criterion)), None) if first_match: print(f"First matching sequence: id={first_match.id_value}") ---------------------------------------- ## Custom Entity Metric """ Custom Entity Metric ==================== Learn how to implement a custom entity-level distance metric by subclassing :class:`~tanat.metric.entity.base.EntityMetric`. An entity metric computes a scalar distance between two individual entities (atomic observations in a sequence). It is the basic building block used by most sequence-level metrics. **Minimal contract** 1. Declare a ``SETTINGS_CLASS`` dataclass (use ``tanat_utils.settings_dataclass``). 2. 
Implement ``validate_entity(ent_a, ent_b)``: raise ``TypeError`` / ``KeyError`` when the entities are incompatible with the metric. 3. Implement ``_compute(ent_a, ent_b)``: return a non-negative ``float``. The public ``__call__`` in the base class invokes ``validate_entity`` then ``_compute`` automatically. """ Setup ----- import polars as pl from tanat_utils import settings_dataclass as dataclass from tanat import build_events from tanat.dataset import simulate_events from tanat.metric.entity.base import EntityMetric Data ---- A small event pool with a ``score`` numeric feature and a ``status`` categorical feature. raw = simulate_events(n_ids=20, features=["score", "status"], seed=0) pool = build_events(temporal_data=raw, id_column="id", time_column="time") pool.cast_features({"score": pl.Float32}) print(pool) Define a custom entity metric ----------------------------- We implement a **normalised absolute difference** on a numeric feature. ``dist(a, b) = |a.score - b.score| / scale`` @dataclass class ScoreDiffSettings: """Settings for :class:`ScoreDiffEntityMetric`. Args: entity_feature: Numeric feature to compare. scale: Normalisation constant (``1.0`` means raw absolute diff). 
""" entity_feature: str = "score" scale: float = 1.0 class ScoreDiffEntityMetric(EntityMetric, register_name="score_diff"): """Normalised absolute difference on a numeric entity feature.""" SETTINGS_CLASS = ScoreDiffSettings def __init__(self, entity_feature: str = "score", scale: float = 1.0) -> None: super().__init__( settings=ScoreDiffSettings(entity_feature=entity_feature, scale=scale) ) # ------------------------------------------------------------------ # Required implementations # ------------------------------------------------------------------ def validate_entity(self, ent_a, ent_b=None) -> None: """Check that both entities expose the expected numeric feature.""" self._validate_entity_instance(ent_a, ent_b) feat = self.settings.entity_feature for ent in (e for e in (ent_a, ent_b) if e is not None): if feat not in ent.data(): raise KeyError( f"Feature {feat!r} not found in entity. " f"Available: {list(ent.data().keys())}" ) def _compute(self, ent_a, ent_b) -> float: """Return normalised absolute difference of the configured feature.""" feat = self.settings.entity_feature val_a = float(ent_a[feat]) val_b = float(ent_b[feat]) return abs(val_a - val_b) / self.settings.scale Instantiate and inspect ----------------------- metric = ScoreDiffEntityMetric(entity_feature="score", scale=100.0) print(metric) Compute a distance between two entities ---------------------------------------- ids = pool.unique_ids ent_a = pool[ids[0]][0] ent_b = pool[ids[1]][0] print(f"score A : {ent_a['score']}") print(f"score B : {ent_b['score']}") print(f"distance: {metric(ent_a, ent_b):.4f}") Compute distances over several pairs ------------------------------------- print("Sample pairwise distances") print("-" * 40) for i in range(5): ea = pool[ids[i]][0] eb = pool[ids[i + 1]][0] d = metric(ea, eb) print(f" {ea['score']:6.1f} vs {eb['score']:6.1f} -> {d:.4f}") Use the custom metric inside a sequence metric ----------------------------------------------- Any 
:class:`~tanat.metric.entity.base.EntityMetric` can be passed as the ``entity_metric`` argument to sequence-level metrics such as :class:`~tanat.metric.sequence.LinearPairwiseSequenceMetric`. from tanat.metric.sequence import LinearPairwiseSequenceMetric lp = LinearPairwiseSequenceMetric(entity_metric=metric) print(lp) seq_a = pool[ids[0]] seq_b = pool[ids[1]] dist = lp(seq_a, seq_b) print(f"LinearPairwise distance: {dist:.4f}") dm = lp.compute_matrix(pool) dm.to_frame().head() ---------------------------------------- ## Entity Metric: Hamming Distance """ Entity Metric: Hamming Distance ================================ This example demonstrates the Hamming entity metric, which measures the distance between two individual entities (single point-in-time observations) based on categorical feature equality. .. note:: Most sequence-level metrics require an entity metric as a building block. Hamming is the most common choice for categorical features. """ # Setup # ----- import polars as pl from tanat import build_states from tanat.dataset import simulate_states from tanat.metric.entity import HammingEntityMetric # Generate synthetic data # ----------------------- N_IDS = 50 SEED = 42 raw_df = simulate_states( n_ids=N_IDS, seq_length_range=(3, 8), features=["score", "status"], seed=SEED, ) pool = build_states( temporal_data=raw_df, id_column="id", start_column="start", end_column="end", ) # HammingEntityMetric requires Categorical features pool.cast_features({"status": pl.Categorical}) print(pool) # Create Hamming entity metric # ---------------------------- hamming = HammingEntityMetric(entity_feature="status") print(hamming) # Compute distance between individual entities # --------------------------------------------- ids = pool.unique_ids seq_a = pool[ids[0]] seq_b = pool[ids[1]] # Extract first entity from each sequence ent_a, ent_b = seq_a[0], seq_b[0] # Entity A print(ent_a) # Entity B print(ent_b) # Compute Hamming distance dist = hamming(ent_a, ent_b)
print(f"\nHamming distance: {dist}")
print(" Same categories → 0.0")
print(" Different categories → 1.0 (default mismatch_cost)")

# Try multiple pairs
# ------------------

print("\nDistances between first entities of consecutive sequences:")
print("-" * 50)

for i in range(5):
    seq_1 = pool[ids[i]]
    seq_2 = pool[ids[i + 1]]

    # Compare first entities from each sequence
    e1, e2 = seq_1[0], seq_2[0]
    d = hamming(e1, e2)
    print(f"Pair {i+1}: {e1['status']!r:10} vs {e2['status']!r:10} → {d:.1f}")

----------------------------------------

## Sequence Metrics: Chi²

"""
Sequence Metrics: Chi²
=======================

This example demonstrates :class:`~tanat.metric.sequence.Chi2SequenceMetric`,
which computes the Chi-squared distance between state-time distributions.
"""

# Setup
# -----

import matplotlib.pyplot as plt
import polars as pl

from tanat import build_states
from tanat.dataset import simulate_states
from tanat.metric.sequence import Chi2SequenceMetric

# Generate synthetic data
# -----------------------

SEED = 42
N_IDS = 80

raw_df = simulate_states(
    n_ids=N_IDS,
    seq_length_range=(3, 8),
    features=["score", "status"],
    seed=SEED,
)
pool = build_states(raw_df, id_column="id", start_column="start", end_column="end")

# Cast features to categorical
pool.cast_features({"status": pl.Categorical})
print(pool)

# Define metric
# -------------

metric = Chi2SequenceMetric(entity_feature="status")
print(metric)

# Compute distance between a single pair
# ---------------------------------------

ids = pool.unique_ids
dist = metric(pool[ids[0]], pool[ids[1]])
print(f"Distance between {ids[0]} and {ids[1]}: {dist:.4f}")

# Compute full pairwise distance matrix
# --------------------------------------

dm = metric.compute_matrix(pool)
print(f"Distance matrix shape: {dm.shape}")

# Visualize distances
# -------------------

arr = dm.to_numpy()

fig, ax = plt.subplots(figsize=(6.5, 5.5))
im = ax.imshow(arr, cmap="viridis_r")
ax.set_title("Chi² distance matrix", fontsize=12, fontweight="bold")
ax.set_xlabel("Sequence index")
ax.set_ylabel("Sequence index")
cbar = plt.colorbar(im, ax=ax)
cbar.set_label("Distance")
plt.tight_layout()
plt.show()

----------------------------------------

## Custom Sequence Metric

"""
Custom Sequence Metric
======================

Learn how to implement a custom sequence-level distance metric by subclassing
:class:`~tanat.metric.sequence.base.SequenceMetric`.

A sequence metric computes a scalar distance between two sequences and can
produce a full pairwise :class:`~tanat.metric.DistanceMatrix` over a pool.

**Minimal contract**

1. Declare a ``SETTINGS_CLASS`` dataclass (use ``tanat_utils.settings_dataclass``).
2. Implement ``_compute(seq_a, seq_b)``: return a non-negative ``float``.
3. Implement ``validate_composition(seq_a, seq_b)``: raise early if the metric is
   incompatible with the given sequences (feature mismatch, wrong type, …).

The base class handles ``__call__``, ``compute_matrix``, and
``compute_cross_matrix`` automatically.

.. note::
   For performance-critical cases you can additionally override
   ``_compute_matrix_impl`` with a vectorised or Numba-backed kernel.
   This is not required for a minimal working implementation.
"""

Setup
-----

from tanat_utils import settings_dataclass as dataclass

from tanat import build_states
from tanat.dataset import simulate_states
from tanat.metric.sequence.base import SequenceMetric

Data
----

raw = simulate_states(n_ids=30, features=["status"], seed=42)
pool = build_states(
    temporal_data=raw, id_column="id", start_column="start", end_column="end"
)
print(pool)

Define a custom sequence metric
--------------------------------

We implement a **length-difference** metric: the distance between two
sequences is the absolute difference of their lengths (number of entities).
This intentionally simple metric requires no entity-level building block and
illustrates the minimal implementation pattern.

@dataclass
class LengthDiffSettings:
    """Settings for :class:`LengthDiffSequenceMetric`.

    Args:
        normalize: If ``True``, divide by the length of the longer sequence
            so that the result is in ``[0, 1]``.
    """

    normalize: bool = False


class LengthDiffSequenceMetric(SequenceMetric, register_name="length_diff"):
    """Distance metric based on the difference in sequence lengths.

    ``dist(seq_a, seq_b) = |len(seq_a) - len(seq_b)|``

    With ``normalize=True``:
    ``dist = |len(seq_a) - len(seq_b)| / max(len(seq_a), len(seq_b))``
    """

    SETTINGS_CLASS = LengthDiffSettings

    def __init__(self, normalize: bool = False) -> None:
        super().__init__(settings=LengthDiffSettings(normalize=normalize))

    # ------------------------------------------------------------------
    # Required implementations
    # ------------------------------------------------------------------

    def validate_composition(self, seq_a, seq_b=None) -> None:
        """No entity-feature constraint: any sequence type is accepted."""
        # Length is always available, nothing to validate here.

    def _compute(self, seq_a, seq_b) -> float:
        """Absolute (or normalised) difference of sequence lengths."""
        la, lb = len(seq_a), len(seq_b)
        diff = abs(la - lb)
        if self.settings.normalize:
            denom = max(la, lb)
            return float(diff / denom) if denom > 0 else 0.0
        return float(diff)

Instantiate and inspect
-----------------------

metric = LengthDiffSequenceMetric(normalize=True)
print(metric)

Compute a distance between two sequences
-----------------------------------------

ids = pool.unique_ids
seq_a = pool[ids[0]]
seq_b = pool[ids[1]]

print(f"len(seq_a) = {len(seq_a)}, len(seq_b) = {len(seq_b)}")
print(f"distance = {metric(seq_a, seq_b):.4f}")

Compute a full pairwise distance matrix
----------------------------------------

``compute_matrix`` is inherited from the base class and works out of the box
once ``_compute`` is implemented.
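To see why implementing the pairwise function alone is enough, the sketch below shows one way a base class could derive a full matrix from ``_compute``. It is a plain-Python illustration of the pattern, not TanaT's actual ``SequenceMetric``: the names ``PairwiseMetric`` and ``LengthDiff`` are hypothetical, sequences are plain lists, and the metric is assumed to be symmetric with a zero diagonal.

```python
class PairwiseMetric:
    # Hypothetical stand-in for a metric base class (not TanaT's SequenceMetric).

    def _compute(self, a, b):
        raise NotImplementedError

    def __call__(self, a, b):
        return self._compute(a, b)

    def compute_matrix(self, items):
        # Assumes a symmetric metric with d(x, x) == 0, so only the
        # upper triangle is computed and then mirrored.
        n = len(items)
        matrix = [[0.0] * n for _ in range(n)]
        for i in range(n):
            for j in range(i + 1, n):
                matrix[i][j] = matrix[j][i] = self(items[i], items[j])
        return matrix


class LengthDiff(PairwiseMetric):
    # Same formula as the length-difference metric above, on plain lists.

    def __init__(self, normalize=False):
        self.normalize = normalize

    def _compute(self, a, b):
        la, lb = len(a), len(b)
        diff = abs(la - lb)
        if self.normalize:
            longest = max(la, lb)
            return diff / longest if longest > 0 else 0.0
        return float(diff)


sequences = [["A", "B", "C"], ["A"], ["A", "B", "C", "D", "E", "F"]]
for row in LengthDiff(normalize=True).compute_matrix(sequences):
    print([round(d, 3) for d in row])
```

With TanaT itself, the inherited method is called directly on the pool: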
dm = metric.compute_matrix(pool) dm.to_frame().head() ---------------------------------------- ## Sequence Metrics: DTW """ Sequence Metrics: DTW ====================== This example demonstrates :class:`~tanat.metric.sequence.DTWSequenceMetric`, which computes Dynamic Time Warping distance allowing flexible temporal alignment between sequences of different lengths. """ # Setup # ----- import matplotlib.pyplot as plt import polars as pl from tanat import build_states from tanat.dataset import simulate_states from tanat.metric.entity import HammingEntityMetric from tanat.metric.sequence import DTWSequenceMetric # Generate synthetic data # ----------------------- SEED = 42 N_IDS = 80 raw_df = simulate_states( n_ids=N_IDS, seq_length_range=(3, 8), features=["score", "status"], seed=SEED, ) pool = build_states(raw_df, id_column="id", start_column="start", end_column="end") # Cast features to categorical pool.cast_features({"status": pl.Categorical}) print(pool) # Define metric # ------------- hamming = HammingEntityMetric(entity_feature="status") metric = DTWSequenceMetric(entity_metric=hamming, normalize=True) print(metric) # Compute distance between a single pair # --------------------------------------- ids = pool.unique_ids dist = metric(pool[ids[0]], pool[ids[1]]) print(f"Distance between {ids[0]} and {ids[1]}: {dist:.4f}") # Compute full pairwise distance matrix # -------------------------------------- dm = metric.compute_matrix(pool) print(f"Distance matrix shape: {dm.shape}") # Visualize distances # ------------------- arr = dm.to_numpy() fig, ax = plt.subplots(figsize=(6.5, 5.5)) im = ax.imshow(arr, cmap="viridis_r", vmin=0, vmax=1) ax.set_title("DTW distance matrix", fontsize=12, fontweight="bold") ax.set_xlabel("Sequence index") ax.set_ylabel("Sequence index") cbar = plt.colorbar(im, ax=ax) cbar.set_label("Distance") plt.tight_layout() plt.show() ---------------------------------------- ## Sequence Metrics: Edit Distance """ Sequence Metrics: Edit Distance 
================================ This example demonstrates :class:`~tanat.metric.sequence.EditSequenceMetric`, which computes Needleman-Wunsch edit distance between sequences with configurable insertion/deletion penalties. """ # Setup # ----- import matplotlib.pyplot as plt import polars as pl from tanat import build_states from tanat.dataset import simulate_states from tanat.metric.entity import HammingEntityMetric from tanat.metric.sequence import EditSequenceMetric # Generate synthetic data # ----------------------- SEED = 42 N_IDS = 80 raw_df = simulate_states( n_ids=N_IDS, seq_length_range=(3, 8), features=["score", "status"], seed=SEED, ) pool = build_states(raw_df, id_column="id", start_column="start", end_column="end") # Cast features to categorical pool.cast_features({"status": pl.Categorical}) print(pool) # Define metric # ------------- hamming = HammingEntityMetric(entity_feature="status") metric = EditSequenceMetric(entity_metric=hamming, indel_cost=1.0, normalize=True) print(metric) # Compute distance between a single pair # --------------------------------------- ids = pool.unique_ids dist = metric(pool[ids[0]], pool[ids[1]]) print(f"Distance between {ids[0]} and {ids[1]}: {dist:.4f}") # Compute full pairwise distance matrix # -------------------------------------- dm = metric.compute_matrix(pool) print(f"Distance matrix shape: {dm.shape}") # Visualize distances # ------------------- arr = dm.to_numpy() fig, ax = plt.subplots(figsize=(6.5, 5.5)) im = ax.imshow(arr, cmap="viridis_r", vmin=0, vmax=1) ax.set_title("Edit distance matrix", fontsize=12, fontweight="bold") ax.set_xlabel("Sequence index") ax.set_ylabel("Sequence index") cbar = plt.colorbar(im, ax=ax) cbar.set_label("Distance") plt.tight_layout() plt.show() ---------------------------------------- ## Sequence Metrics: LCP """ Sequence Metrics: LCP ===================== This example demonstrates :class:`~tanat.metric.sequence.LCPSequenceMetric`, which computes the Longest Common Prefix distance 
between two temporal sequences. """ # Setup # ----- import matplotlib.pyplot as plt import polars as pl from tanat import build_states from tanat.dataset import simulate_states from tanat.metric.entity import HammingEntityMetric from tanat.metric.sequence import LCPSequenceMetric # Generate synthetic data # ----------------------- SEED = 42 N_IDS = 80 raw_df = simulate_states( n_ids=N_IDS, seq_length_range=(3, 8), features=["score", "status"], seed=SEED, ) pool = build_states(raw_df, id_column="id", start_column="start", end_column="end") # Cast features to categorical pool.cast_features({"status": pl.Categorical}) print(pool) # Define metric # ------------- hamming = HammingEntityMetric(entity_feature="status") metric = LCPSequenceMetric(entity_metric=hamming, mode="normalized") print(metric) # Compute distance between a single pair # --------------------------------------- ids = pool.unique_ids dist = metric(pool[ids[0]], pool[ids[1]]) print(f"Distance between {ids[0]} and {ids[1]}: {dist:.4f}") # Compute full pairwise distance matrix # -------------------------------------- dm = metric.compute_matrix(pool) print(f"Distance matrix shape: {dm.shape}") # Visualize distances # ------------------- arr = dm.to_numpy() fig, ax = plt.subplots(figsize=(6.5, 5.5)) im = ax.imshow(arr, cmap="viridis_r", vmin=0, vmax=1) ax.set_title("LCP distance matrix", fontsize=12, fontweight="bold") ax.set_xlabel("Sequence index") ax.set_ylabel("Sequence index") cbar = plt.colorbar(im, ax=ax) cbar.set_label("Distance") plt.tight_layout() plt.show() ---------------------------------------- ## Sequence Metrics: LCS """ Sequence Metrics: LCS ====================== This example demonstrates :class:`~tanat.metric.sequence.LCSSequenceMetric`, which computes a distance derived from the Longest Common Subsequence between two temporal sequences. 
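As a reminder of what the underlying quantity is, the sketch below computes the length of the longest common subsequence with the standard dynamic-programming recurrence and turns it into a distance. It works on plain label lists and is independent of TanaT; the function names are illustrative, and the normalisation ``1 - LCS / max(len_a, len_b)`` is an assumption made for this sketch, not necessarily what ``mode="normalized"`` does in the library.

```python
def lcs_length(a, b):
    # Row-by-row dynamic programme: prev[j] holds the LCS length of the
    # already-processed prefix of `a` and the first j items of `b`.
    prev = [0] * (len(b) + 1)
    for x in a:
        curr = [0]
        for j, y in enumerate(b, start=1):
            if x == y:
                curr.append(prev[j - 1] + 1)
            else:
                curr.append(max(prev[j], curr[j - 1]))
        prev = curr
    return prev[-1]


def lcs_distance(a, b):
    # Assumed normalisation (hypothetical): 1 - LCS / length of the longer input.
    longest = max(len(a), len(b))
    if longest == 0:
        return 0.0
    return 1.0 - lcs_length(a, b) / longest


seq_a = ["A", "B", "C", "B", "D"]
seq_b = ["B", "C", "D"]
print(lcs_length(seq_a, seq_b), lcs_distance(seq_a, seq_b))
```

Unlike a prefix-based comparison, the common subsequence does not have to be contiguous, so two sequences that share the same states in the same order score as similar even when other states are interleaved.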
""" # Setup # ----- import matplotlib.pyplot as plt import polars as pl from tanat import build_states from tanat.dataset import simulate_states from tanat.metric.entity import HammingEntityMetric from tanat.metric.sequence import LCSSequenceMetric # Generate synthetic data # ----------------------- SEED = 42 N_IDS = 80 raw_df = simulate_states( n_ids=N_IDS, seq_length_range=(3, 8), features=["score", "status"], seed=SEED, ) pool = build_states(raw_df, id_column="id", start_column="start", end_column="end") # Cast features to categorical pool.cast_features({"status": pl.Categorical}) print(pool) # Define metric # ------------- hamming = HammingEntityMetric(entity_feature="status") metric = LCSSequenceMetric(entity_metric=hamming, mode="normalized") print(metric) # Compute distance between a single pair # --------------------------------------- ids = pool.unique_ids dist = metric(pool[ids[0]], pool[ids[1]]) print(f"Distance between {ids[0]} and {ids[1]}: {dist:.4f}") # Compute full pairwise distance matrix # -------------------------------------- dm = metric.compute_matrix(pool) print(f"Distance matrix shape: {dm.shape}") # Visualize distances # ------------------- arr = dm.to_numpy() fig, ax = plt.subplots(figsize=(6.5, 5.5)) im = ax.imshow(arr, cmap="viridis_r", vmin=0, vmax=1) ax.set_title("LCS distance matrix", fontsize=12, fontweight="bold") ax.set_xlabel("Sequence index") ax.set_ylabel("Sequence index") cbar = plt.colorbar(im, ax=ax) cbar.set_label("Distance") plt.tight_layout() plt.show() ---------------------------------------- ## Sequence Metrics: LinearPairwise """ Sequence Metrics: LinearPairwise ================================ This example demonstrates :class:`~tanat.metric.sequence.LinearPairwiseSequenceMetric`, which computes position-wise distances between sequences using an entity metric, then aggregates them (mean by default). 
""" # Setup # ----- import matplotlib.pyplot as plt import polars as pl from tanat import build_states from tanat.dataset import simulate_states from tanat.metric.entity import HammingEntityMetric from tanat.metric.sequence import LinearPairwiseSequenceMetric # Generate synthetic data # ----------------------- SEED = 42 N_IDS = 80 raw_df = simulate_states( n_ids=N_IDS, seq_length_range=(3, 8), features=["score", "status"], seed=SEED, ) pool = build_states(raw_df, id_column="id", start_column="start", end_column="end") # Cast features to categorical pool.cast_features({"status": pl.Categorical}) print(pool) # Define metric # ------------- hamming = HammingEntityMetric(entity_feature="status") metric = LinearPairwiseSequenceMetric( entity_metric=hamming, agg_fun="mean", padding_penalty=1.0 ) print(metric) # Compute distance between a single pair # --------------------------------------- ids = pool.unique_ids dist = metric(pool[ids[0]], pool[ids[1]]) print(f"Distance between {ids[0]} and {ids[1]}: {dist:.4f}") # Compute full pairwise distance matrix # -------------------------------------- dm = metric.compute_matrix(pool) print(f"Distance matrix shape: {dm.shape}") # Visualize distances # ------------------- arr = dm.to_numpy() fig, ax = plt.subplots(figsize=(6.5, 5.5)) im = ax.imshow(arr, cmap="viridis_r", vmin=0, vmax=1) ax.set_title("LinearPairwise distance matrix", fontsize=12, fontweight="bold") ax.set_xlabel("Sequence index") ax.set_ylabel("Sequence index") cbar = plt.colorbar(im, ax=ax) cbar.set_label("Distance") plt.tight_layout() plt.show() ---------------------------------------- ## Sequence Metrics: SoftDTW """ Sequence Metrics: SoftDTW ========================== This example demonstrates :class:`~tanat.metric.sequence.SoftDTWSequenceMetric`, a differentiable variant of DTW controlled by a smoothing parameter ``gamma``. 
""" # Setup # ----- import matplotlib.pyplot as plt import polars as pl from tanat import build_states from tanat.dataset import simulate_states from tanat.metric.entity import HammingEntityMetric from tanat.metric.sequence import SoftDTWSequenceMetric # Generate synthetic data # ----------------------- SEED = 42 N_IDS = 80 raw_df = simulate_states( n_ids=N_IDS, seq_length_range=(3, 8), features=["score", "status"], seed=SEED, ) pool = build_states(raw_df, id_column="id", start_column="start", end_column="end") # Cast features to categorical pool.cast_features({"status": pl.Categorical}) print(pool) # Define metric # ------------- hamming = HammingEntityMetric(entity_feature="status") metric = SoftDTWSequenceMetric(entity_metric=hamming, gamma=0.5) print(metric) # Compute distance between a single pair # --------------------------------------- ids = pool.unique_ids dist = metric(pool[ids[0]], pool[ids[1]]) print(f"Distance between {ids[0]} and {ids[1]}: {dist:.4f}") # Compute full pairwise distance matrix # -------------------------------------- dm = metric.compute_matrix(pool) print(f"Distance matrix shape: {dm.shape}") # Visualize distances # ------------------- arr = dm.to_numpy() fig, ax = plt.subplots(figsize=(6.5, 5.5)) im = ax.imshow(arr, cmap="viridis_r") ax.set_title("SoftDTW distance matrix", fontsize=12, fontweight="bold") ax.set_xlabel("Sequence index") ax.set_ylabel("Sequence index") cbar = plt.colorbar(im, ax=ax) cbar.set_label("Distance") plt.tight_layout() plt.show() ---------------------------------------- ## Trajectory Metrics: Aggregation Trajectory Metric """ Trajectory Metrics: Aggregation Trajectory Metric ================================================= This example demonstrates trajectory-level metrics using :class:`~tanat.metric.AggregationTrajectoryMetric`, which computes distances between trajectories that contain multiple sequence types (e.g., states, events, or intervals). 
""" # Setup # ----- import polars as pl import matplotlib.pyplot as plt from tanat import build_states, build_events, build_trajectories from tanat.dataset import simulate_trajectories from tanat.metric.entity import HammingEntityMetric from tanat.metric.sequence import ( EditSequenceMetric, LCSSequenceMetric, ) from tanat.metric import AggregationTrajectoryMetric # Generate synthetic trajectory data # ----------------------------------- N_TRAJ = 100 SEED = 42 raw = simulate_trajectories( sequences={ "states": { "type": "state", "n_ids": N_TRAJ, "seq_length_range": (3, 8), "features": ["score", "status"], }, "events": { "type": "event", "n_ids": N_TRAJ, "seq_length_range": (2, 6), "features": ["score", "status"], }, }, shared_ids=True, seed=SEED, ) # Build pools for each sequence type states_pool = build_states( temporal_data=raw["states"], id_column="id", start_column="start", end_column="end", ) events_pool = build_events( temporal_data=raw["events"], id_column="id", time_column="time", ) # Build trajectory pool traj_pool = build_trajectories(pools={"states": states_pool, "events": events_pool}) # Cast features to categorical for sp in traj_pool.sequence_pools.values(): sp.cast_features({"status": pl.Categorical}) print(traj_pool) # Define trajectory metric # ------------------------- hamming = HammingEntityMetric(entity_feature="status") # Use different metrics per alias (sequence type) agg = AggregationTrajectoryMetric( default_metric=EditSequenceMetric(entity_metric=hamming, normalize=True), sequence_metrics={ "events": LCSSequenceMetric(entity_metric=hamming, mode="normalized"), }, agg_fun="mean", ) print(agg) # Compute distance between a single pair # ---------------------------------------- traj_ids = traj_pool.unique_ids traj_a = traj_pool[traj_ids[0]] traj_b = traj_pool[traj_ids[1]] dist = agg(traj_a, traj_b) print(f"\nDistance between {traj_ids[0]} and {traj_ids[1]}: {dist:.4f}") # Compute full pairwise distance matrix # 
---------------------------------------- matrix = agg.compute_matrix(traj_pool) print(f"\nDistance matrix shape: {matrix.shape}") print(f"Mean distance: {matrix.to_numpy()[matrix.to_numpy() > 0].mean():.4f}") # Visualize trajectory distances # -------------------------------- fig, ax = plt.subplots(figsize=(8, 6)) arr = matrix.to_numpy() im = ax.imshow(arr, cmap="viridis", aspect="auto") ax.set_title( "Trajectory distances\n(Edit distance for states, LCS for events)", fontsize=12, fontweight="bold", ) ax.set_xlabel("Trajectory index") ax.set_ylabel("Trajectory index") cbar = plt.colorbar(im, ax=ax) cbar.set_label("Distance") plt.tight_layout() plt.show() ---------------------------------------- ## Custom Trajectory Metric """ Custom Trajectory Metric ======================== Learn how to implement a custom trajectory-level distance metric by subclassing :class:`~tanat.metric.trajectory.base.TrajectoryMetric`. A trajectory metric computes a scalar distance between two :class:`~tanat.trajectory.trajectory.Trajectory` objects and can produce a full pairwise :class:`~tanat.metric.DistanceMatrix` over a :class:`~tanat.trajectory.pool.TrajectoryPool`. **Minimal contract** 1. Declare a ``SETTINGS_CLASS`` dataclass (use ``tanat_utils.settings_dataclass``). 2. Implement ``_compute(traj_a, traj_b)``: return a non-negative ``float``. The base class handles ``__call__``, ``compute_matrix``, and ``compute_cross_matrix`` automatically. """ Setup ----- from tanat_utils import settings_dataclass as dataclass from tanat import build_events, build_states from tanat.dataset import simulate_events, simulate_states from tanat.trajectory.shortcuts import build_trajectories from tanat.metric.trajectory.base import TrajectoryMetric Data ---- A trajectory pool combining an event sequence (``"visits"``) and a state sequence (``"status"``) for the same set of individuals. 
N_IDS = 20
SEED = 42

events_df = simulate_events(n_ids=N_IDS, features=["score"], seed=SEED)
states_df = simulate_states(n_ids=N_IDS, features=["phase"], seed=SEED)

event_pool = build_events(temporal_data=events_df, id_column="id", time_column="time")
state_pool = build_states(
    temporal_data=states_df, id_column="id", start_column="start", end_column="end"
)

tpool = build_trajectories({"visits": event_pool, "status": state_pool})
print(tpool)

Define a custom trajectory metric
-----------------------------------

We implement a **total-length-diff** metric: the distance between two
trajectories is the sum of absolute length differences across all shared
sequence aliases. Simple by design, its purpose is to show the minimal pattern.

@dataclass
class TotalLengthDiffSettings:
    """Settings for :class:`TotalLengthDiffTrajectoryMetric`.

    Args:
        normalize: If ``True``, divide each per-alias difference by the length
            of the longer sub-sequence before summing.
    """

    normalize: bool = False


class TotalLengthDiffTrajectoryMetric(
    TrajectoryMetric, register_name="total_length_diff"
):
    """Trajectory distance as the sum of per-alias sequence-length differences.

    For each alias present in both trajectories:
    ``diff_alias = |len(alias_a) - len(alias_b)|``

    The final distance is the sum (or normalised sum) of those values.
    """

    SETTINGS_CLASS = TotalLengthDiffSettings

    def __init__(self, normalize: bool = False) -> None:
        super().__init__(settings=TotalLengthDiffSettings(normalize=normalize))

    # ------------------------------------------------------------------
    # Required implementation
    # ------------------------------------------------------------------

    def _compute(self, traj_a, traj_b) -> float:
        """Sum of per-alias absolute length differences."""
        total = 0.0
        for alias in traj_a:
            if alias not in traj_b:
                continue
            la = len(traj_a[alias])
            lb = len(traj_b[alias])
            diff = abs(la - lb)
            if self.settings.normalize:
                denom = max(la, lb)
                diff = (diff / denom) if denom > 0 else 0.0
            total += diff
        return float(total)

Instantiate and inspect
-----------------------

metric = TotalLengthDiffTrajectoryMetric(normalize=True)
print(metric)

Compute a distance between two trajectories
--------------------------------------------

ids = tpool.unique_ids
traj_a = tpool[ids[0]]
traj_b = tpool[ids[1]]

print(
    f"Trajectory A | visits: {len(traj_a['visits'])}, status: {len(traj_a['status'])}"
)
print(
    f"Trajectory B | visits: {len(traj_b['visits'])}, status: {len(traj_b['status'])}"
)
print(f"Distance: {metric(traj_a, traj_b):.4f}")

Compute a full pairwise distance matrix
----------------------------------------

``compute_matrix`` is inherited from the base class and works out of the box
once ``_compute`` is implemented.

dm = metric.compute_matrix(tpool)
dm.to_frame().head()

----------------------------------------

## Barplot

"""
Barplot
=======

Aggregate entity features across a :class:`~tanat.sequence.SequencePool` with
:class:`~tanat.visualization.SequenceVisualizer`.
Three aggregation modes are available:

- ``show_as="count"``: raw occurrences per label (all pool types)
- ``show_as="rate"``: relative frequency, bars sum to 1 (all pool types)
- ``show_as="duration"``: total cumulative duration per label (interval / state pools only)
"""

Imports
~~~~~~~

import polars as pl

from tanat import build_intervals
from tanat.dataset import simulate_intervals, simulate_static
from tanat.visualization import SequenceVisualizer

Simulate data
~~~~~~~~~~~~~

:func:`~tanat.dataset.simulation.intervals.simulate_intervals` produces one row
per interval. The second feature (``status``) is categorical; it groups the bars.

temporal = simulate_intervals(
    n_ids=80,
    seq_length_range=(3, 12),
    features=["value", "status"],
    seed=42,
)
print(temporal.shape, temporal.columns.tolist())
temporal.head()

Build the pool
~~~~~~~~~~~~~~

pool = build_intervals(
    temporal_data=temporal,
    id_column="id",
    start_column="start",
    end_column="end",
)
pool.cast_features({"status": pl.Categorical}, is_static=False)
print(pool)

Count: occurrences per label
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

``show_as="count"`` (default) counts how many intervals carry each label.

# fmt: off
SequenceVisualizer.barplot(show_as="count") \
    .title("Interval count by status") \
    .draw(pool, entity_feature="status") \
    .show()
# fmt: on

Rate: relative frequency
~~~~~~~~~~~~~~~~~~~~~~~~~

``show_as="rate"`` normalises counts so bars sum to 1.
Combine with ``sort="descending"`` to put the most frequent label first.

# fmt: off
SequenceVisualizer.barplot(show_as="rate", sort="descending") \
    .title("Relative frequency by status (descending)") \
    .y_axis(label="Rate") \
    .draw(pool, entity_feature="status") \
    .show()
# fmt: on

Duration: total time per label
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

``show_as="duration"`` sums ``end − start`` per label.
``display_unit`` converts the result to a human-readable time unit.

.. note::
   Duration mode requires an interval or state pool.
   Event pools (point observations) have no duration.

# fmt: off
SequenceVisualizer.barplot(show_as="duration", display_unit="hours") \
    .title("Total duration per status (hours)") \
    .y_axis(label="Hours") \
    .draw(pool, entity_feature="status") \
    .show()
# fmt: on

Horizontal orientation
~~~~~~~~~~~~~~~~~~~~~~

``orientation="horizontal"`` flips the axes, handy when label names are long.

# fmt: off
SequenceVisualizer.barplot(
        show_as="count",
        orientation="horizontal",
        sort="descending",
    ) \
    .title("Interval count by status (horizontal)") \
    .draw(pool, entity_feature="status") \
    .show()
# fmt: on

Color customization
~~~~~~~~~~~~~~~~~~~

The ``.colors()`` method accepts three formats:

- **Named colormap** string: ``"Set2"``, ``"tab10"``, ``"Pastel1"``, …
- **Dict** mapping label → hex color
- **No argument** (default): matplotlib default color cycle

# Named colormap
# fmt: off
SequenceVisualizer.barplot(show_as="count") \
    .colors("Set2") \
    .title("Count (Set2 palette)") \
    .draw(pool, entity_feature="status") \
    .show()
# fmt: on

# Explicit dict: one color per label
palette = {
    "A": "#2ecc71",
    "B": "#e74c3c",
    "C": "#3498db",
    "D": "#f39c12",
    "E": "#9b59b6",
}
# fmt: off
SequenceVisualizer.barplot(show_as="count") \
    .colors(palette) \
    .title("Count (custom dict palette)") \
    .draw(pool, entity_feature="status") \
    .show()
# fmt: on

Single sequence
~~~~~~~~~~~~~~~

Pass a :class:`~tanat.sequence.Sequence` directly for a per-individual view.
seq = pool[pool.unique_ids[0]] print(f"ID {seq.id_value}: {len(seq)} intervals") # fmt: off SequenceVisualizer.barplot(show_as="count") \ .title(f"Status counts, sequence {seq.id_value}") \ .colors("Set2") \ .draw(seq, entity_feature="status") \ .show() # fmt: on Layout and style ~~~~~~~~~~~~~~~~ # Grid + capped y-axis # fmt: off SequenceVisualizer.barplot(show_as="rate", sort="descending") \ .figsize(8, 4) \ .grid() \ .x_axis(rotation=30) \ .y_axis(limit_max=1, label="Rate") \ .colors("Set2") \ .title("Rate (grid, capped y-axis)") \ .draw(pool, entity_feature="status") \ .show() # fmt: on # Slim bars with a visible edge # fmt: off SequenceVisualizer.barplot(show_as="count") \ .colors("Set2") \ .marker(bar_width=0.5, alpha=0.85, edge_color="#333333") \ .title("Count (slim bars with edge)") \ .draw(pool, entity_feature="status") \ .show() # fmt: on Faceting ~~~~~~~~ ``.facet()`` splits the chart into a grid of panels, one per unique value of a chosen feature. Here we attach per-sequence static data and facet on ``group``. static_df = simulate_static(n_ids=80, features=["age", "group"], seed=0) pool.add_static_features(static_df) pool.cast_features({"group": pl.Categorical}, is_static=True) # fmt: off SequenceVisualizer.barplot(show_as="count") \ .facet(by="group", is_static=True, cols=3) \ .colors("Set2") \ .draw(pool, entity_feature="status") \ .show() # fmt: on Inspect ``prepare_data()`` ~~~~~~~~~~~~~~~~~~~~~~~~~~ ``prepare_data()`` returns the aggregated Polars DataFrame before rendering. The result is cached: calling ``.draw()`` on the same builder reuses it. builder = SequenceVisualizer.barplot(show_as="rate", sort="descending") df = builder.prepare_data(pool, entity_feature="status") df ---------------------------------------- ## Distribution """ Distribution ============ Visualize state-occupancy distributions over time with :class:`~tanat.visualization.SequenceVisualizer`. 
Each time bin shows how many (or what fraction of) sequences occupy each state
at that moment, using **occupancy-based binning**: a state segment contributes
to every bin it overlaps.

.. note::
   Compatible with **state** pools only.
   Other pool types raise ``UnsupportedSequenceTypeError``.
"""

Imports
~~~~~~~

import polars as pl

from tanat import build_states
from tanat.dataset import simulate_states, simulate_static
from tanat.visualization import SequenceVisualizer

Simulate data
~~~~~~~~~~~~~

:func:`~tanat.dataset.simulation.states.simulate_states` produces strictly
contiguous states (``end[i] == start[i+1]``). The second feature (``status``)
is categorical; it labels the occupancy areas.

temporal = simulate_states(
    n_ids=80,
    seq_length_range=(4, 12),
    features=["value", "status"],
    seed=42,
)
print(temporal.shape, temporal.columns.tolist())
temporal.head()

Build the pool
~~~~~~~~~~~~~~

:func:`~tanat.sequence.shortcuts.build_states` accepts an explicit
``end_column`` when the data is already contiguous, which
:func:`~tanat.dataset.simulate_states` always guarantees.

pool = build_states(
    temporal_data=temporal,
    id_column="id",
    start_column="start",
    end_column="end",
)
pool.cast_features({"status": pl.Categorical}, is_static=False)
print(pool)

Default: percentage stacked area
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

``mode="percentage"`` (default) renders each state's share summing to 100%
per bin, the classic state-sequence distribution chart.

# fmt: off
SequenceVisualizer.distribution(mode="percentage", bin_size="1mo") \
    .title("State distribution over time (%, monthly bins)") \
    .x_axis(label="Date", rotation=30, autofmt_xdate=True) \
    .y_axis(label="% of sequences") \
    .colors("Set2") \
    .draw(pool, entity_feature="status") \
    .show()
# fmt: on

Modes: count, proportion, percentage
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

.. list-table::
   :header-rows: 1

   * - ``mode=``
     - y-axis
     - Sum per bin
   * - ``"count"``
     - raw number of sequences
     - varies
   * - ``"proportion"``
     - fraction (0–1)
     - 1.0
   * - ``"percentage"``
     - percent (0–100)
     - 100

# Raw counts
# fmt: off
SequenceVisualizer.distribution(mode="count", bin_size="3mo") \
    .title("State distribution: count (3-month bins)") \
    .x_axis(label="Date", rotation=30, autofmt_xdate=True) \
    .y_axis(label="Number of sequences") \
    .colors("tab10") \
    .draw(pool, entity_feature="status") \
    .show()
# fmt: on

# Proportion (0-1 scale)
# fmt: off
SequenceVisualizer.distribution(mode="proportion", bin_size="3mo") \
    .title("State distribution: proportion (3-month bins)") \
    .x_axis(label="Date", rotation=30, autofmt_xdate=True) \
    .y_axis(label="Proportion") \
    .colors("Pastel1") \
    .draw(pool, entity_feature="status") \
    .show()
# fmt: on

Bin size
~~~~~~~~

``bin_size`` accepts any Polars duration string: ``"1d"``, ``"1w"``,
``"1mo"``, ``"1y"``. Use coarser bins for long time horizons, finer bins
for detailed short-term patterns.

# Yearly bins, smoothest view for long horizons
# fmt: off
SequenceVisualizer.distribution(mode="percentage", bin_size="1y") \
    .title("State distribution (1-year bins)") \
    .x_axis(label="Year", rotation=30) \
    .y_axis(label="% of sequences") \
    .colors("Set2") \
    .draw(pool, entity_feature="status") \
    .show()
# fmt: on

Flat (unstacked) fills
~~~~~~~~~~~~~~~~~~~~~~

``stacked=False`` renders overlapping transparent fills, easier to compare
individual state curves without stacking artefacts.

# fmt: off
SequenceVisualizer.distribution(mode="proportion", bin_size="1mo", stacked=False) \
    .title("State distribution: flat fills (monthly bins)") \
    .x_axis(label="Date", rotation=30, autofmt_xdate=True) \
    .y_axis(label="Proportion") \
    .marker(alpha=0.4) \
    .colors("tab10") \
    .draw(pool, entity_feature="status") \
    .show()
# fmt: on

Single sequence
~~~~~~~~~~~~~~~

Pass a :class:`~tanat.sequence.Sequence` directly to inspect one individual's
state occupancy.
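The occupancy-based binning used throughout this example (a state segment contributes to every bin it overlaps) can be sketched without TanaT. This is an illustration under stated assumptions, not the library's implementation: times are plain numbers, segments and bins are half-open intervals ``[start, end)``, and ``occupancy_counts`` is a hypothetical helper name.

```python
from collections import Counter


def occupancy_counts(segments, bin_edges):
    # segments: iterable of (seq_id, start, end, label)
    # bin_edges: sorted edges; bin k covers [bin_edges[k], bin_edges[k + 1])
    n_bins = len(bin_edges) - 1
    occupied = [set() for _ in range(n_bins)]
    for seq_id, start, end, label in segments:
        for k in range(n_bins):
            lo, hi = bin_edges[k], bin_edges[k + 1]
            if start < hi and end > lo:  # the segment overlaps this bin
                # A set ensures a sequence is counted once per label and bin.
                occupied[k].add((seq_id, label))
    return [Counter(label for _, label in pairs) for pairs in occupied]


segments = [
    ("p1", 0, 3, "A"),
    ("p1", 3, 10, "B"),
    ("p2", 0, 6, "A"),
    ("p2", 6, 10, "C"),
]
for k, counts in enumerate(occupancy_counts(segments, [0, 5, 10])):
    total = sum(counts.values())
    print(k, {label: count / total for label, count in sorted(counts.items())})
```

A sequence that changes state inside a bin is counted under both states for that bin, which is why the proportion and percentage modes normalise by the bin total. With TanaT, the single-sequence view is drawn as follows: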
seq = pool[pool.unique_ids[0]] print(f"ID {seq.id_value}: {len(seq)} states") # fmt: off SequenceVisualizer.distribution(mode="count", bin_size="1mo") \ .title(f"State occupancy, sequence {seq.id_value}") \ .x_axis(label="Date", rotation=30, autofmt_xdate=True) \ .y_axis(label="Occupied") \ .colors("Set2") \ .draw(seq, entity_feature="status") \ .show() # fmt: on Relative time mode ~~~~~~~~~~~~~~~~~~ ``time_mode="relative"`` shifts every sequence so that T0 becomes the origin of the x-axis (0 days). The axis then shows a numeric offset in days, and ``bin_size`` retains its usual Polars duration meaning (e.g. ``"1d"`` → one-day bins). Call :meth:`~tanat.sequence.SequencePool.set_t0` to set the reference point; without it the lazy default (``position=0``) is used. # Anchor T0 to the first state of every sequence. pool.set_t0(position=0, anchor="start") # Pool: state distribution relative to each sequence's T0 # fmt: off SequenceVisualizer.distribution(time_mode="relative", bin_size="1d") \ .title("State distribution from T0 (pool)") \ .x_axis(label="Days from T0") \ .y_axis(label="% of sequences") \ .colors("Set2") \ .draw(pool, entity_feature="status") \ .show() # fmt: on # Single sequence: occupancy relative to that individual's T0 # fmt: off SequenceVisualizer.distribution(time_mode="relative", mode="count", bin_size="1d") \ .title(f"State occupancy from T0 (ID {seq.id_value})") \ .x_axis(label="Days from T0") \ .y_axis(label="Occupied") \ .colors("Set2") \ .draw(seq, entity_feature="status") \ .show() # fmt: on Faceting ~~~~~~~~ ``.facet()`` splits the chart into a grid of panels, one per unique value of a chosen feature. Here we attach per-sequence static data and facet on ``group``. 
static_df = simulate_static(n_ids=80, features=["age", "group"], seed=0) pool.add_static_features(static_df) pool.cast_features({"group": pl.Categorical}, is_static=True) # fmt: off SequenceVisualizer.distribution(mode="percentage", bin_size="1mo") \ .facet(by="group", is_static=True, cols=3) \ .title("State distribution faceted by group") \ .x_axis(label="Date", rotation=30, autofmt_xdate=True) \ .y_axis(label="% of sequences") \ .colors("Set2") \ .draw(pool, entity_feature="status") \ .show() # fmt: on Inspect ``prepare_data()`` ~~~~~~~~~~~~~~~~~~~~~~~~~~ ``prepare_data()`` returns the binned Polars DataFrame before rendering. Columns: ``__BIN__``, ``__LABEL__``, ``__VALUE__``, and optionally ``__COLOR__``. builder = SequenceVisualizer.distribution(mode="percentage", bin_size="1mo") df = builder.prepare_data(pool, entity_feature="status") print(f"Shape: {df.shape}, schema: {dict(df.schema)}") df.head(10) ---------------------------------------- ## Spanplot """ Spanplot ======== Visualize segment-duration distributions with :class:`~tanat.visualization.SequenceVisualizer`. Three chart styles and two grouping dimensions are available: - ``kind``: ``"box"`` (default), ``"violin"``, or ``"strip"`` - ``group_by``: ``"category"`` (one column per label) or ``"id"`` (one column per sequence) .. note:: Compatible with **interval** and **state** pools only. Event pools have no duration; passing one raises ``UnsupportedSequenceTypeError``. """ Imports ~~~~~~~ import polars as pl from tanat import build_intervals from tanat.dataset import simulate_intervals, simulate_static from tanat.visualization import SequenceVisualizer Simulate data ~~~~~~~~~~~~~ :func:`~tanat.dataset.simulation.intervals.simulate_intervals` produces one row per interval. The second feature (``status``) is categorical; it groups the duration boxes. 
temporal = simulate_intervals( n_ids=80, seq_length_range=(4, 15), features=["value", "status"], seed=42, ) print(temporal.shape, temporal.columns.tolist()) temporal.head() Build the pool ~~~~~~~~~~~~~~ pool = build_intervals( temporal_data=temporal, id_column="id", start_column="start", end_column="end", ) pool.cast_features({"status": pl.Categorical}, is_static=False) print(pool) Box plot (default) ~~~~~~~~~~~~~~~~~~ ``kind="box"`` (default) renders a standard box-and-whisker plot. Groups are sorted by ascending median duration. # fmt: off SequenceVisualizer.spanplot(kind="box", display_unit="hours") \ .title("Duration distribution by status (box)") \ .y_axis(label="Duration (h)") \ .colors("Set2") \ .draw(pool, entity_feature="status") \ .show() # fmt: on Violin plot ~~~~~~~~~~~ ``kind="violin"`` shows the full kernel-density estimate, more informative when the distribution is multimodal or skewed. # fmt: off SequenceVisualizer.spanplot(kind="violin", display_unit="hours") \ .title("Duration distribution by status (violin)") \ .y_axis(label="Duration (h)") \ .colors("Set2") \ .draw(pool, entity_feature="status") \ .show() # fmt: on Strip plot ~~~~~~~~~~ ``kind="strip"`` renders individual points with horizontal jitter, ideal for spotting outliers and showing raw data density. # fmt: off SequenceVisualizer.spanplot(kind="strip", display_unit="hours") \ .title("Duration distribution by status (strip)") \ .y_axis(label="Duration (h)") \ .marker(alpha=0.4, point_size=3.5) \ .colors("Set2") \ .draw(pool, entity_feature="status") \ .show() # fmt: on Group by sequence ID ~~~~~~~~~~~~~~~~~~~~ ``group_by="id"`` shows one distribution per sequence ID. We work on a small subset to keep the chart readable. 
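Conceptually, ``group_by`` only changes the key under which segment durations are collected. The sketch below reproduces that idea in plain Python, assuming groups are ranked by median duration as stated for the box plot; ``duration_groups`` and the toy rows are hypothetical, not TanaT internals.

```python
from datetime import datetime
from statistics import median


def duration_groups(rows, group_by="category", sort="ascending"):
    """Collect interval durations (in hours) per group and order the groups.

    ``rows`` is a list of (id, start, end, label) tuples.
    """
    key_index = {"id": 0, "category": 3}[group_by]
    groups = {}
    for row in rows:
        hours = (row[2] - row[1]).total_seconds() / 3600
        groups.setdefault(row[key_index], []).append(hours)
    if sort == "alphabetic":
        ordered = sorted(groups, key=str)
    else:
        ordered = sorted(
            groups,
            key=lambda k: median(groups[k]),
            reverse=(sort == "descending"),
        )
    return [(key, groups[key]) for key in ordered]


rows = [
    (1, datetime(2020, 1, 1, 0), datetime(2020, 1, 1, 6), "A"),   # 6 h
    (1, datetime(2020, 1, 2, 0), datetime(2020, 1, 3, 0), "B"),   # 24 h
    (2, datetime(2020, 1, 1, 0), datetime(2020, 1, 1, 2), "A"),   # 2 h
    (2, datetime(2020, 1, 5, 0), datetime(2020, 1, 5, 12), "B"),  # 12 h
    (2, datetime(2020, 1, 6, 0), datetime(2020, 1, 6, 1), "A"),   # 1 h
]
by_category = duration_groups(rows, group_by="category")
by_id = duration_groups(rows, group_by="id")
print([key for key, _ in by_category])
print([key for key, _ in by_id])
```

The same durations feed both views; grouping by ID simply yields one distribution per individual instead of one per label.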
small_pool = pool.subset(ids=pool.unique_ids[:15]) # Box: one distribution per ID # fmt: off SequenceVisualizer.spanplot(group_by="id", kind="box", display_unit="hours") \ .title("Duration per sequence ID (box)") \ .y_axis(label="Duration (h)") \ .x_axis(rotation=45) \ .colors("tab20") \ .draw(small_pool, entity_feature="status") \ .show() # fmt: on Sort order ~~~~~~~~~~ ``sort`` controls the group ordering on the x-axis: - ``"ascending"``: ascending median duration (default) - ``"descending"``: descending median duration - ``"alphabetic"``: alphabetical label order # Descending: largest median first # fmt: off SequenceVisualizer.spanplot(kind="box", display_unit="hours", sort="descending") \ .title("sort='descending': largest median first") \ .y_axis(label="Duration (h)") \ .colors("Set2") \ .draw(pool, entity_feature="status") \ .show() # fmt: on Horizontal orientation ~~~~~~~~~~~~~~~~~~~~~~ ``orientation="horizontal"`` moves group labels to the y-axis, especially useful when label names are long. # fmt: off SequenceVisualizer.spanplot( kind="box", display_unit="hours", orientation="horizontal", ) \ .title("Duration distribution (horizontal box)") \ .x_axis(label="Duration (h)") \ .colors("Pastel1") \ .draw(pool, entity_feature="status") \ .show() # fmt: on Single sequence ~~~~~~~~~~~~~~~ Pass a :class:`~tanat.sequence.Sequence` directly for a per-individual view. seq = pool[pool.unique_ids[0]] print(f"ID {seq.id_value}: {len(seq)} intervals") # fmt: off SequenceVisualizer.spanplot(kind="strip", display_unit="hours") \ .title(f"Duration distribution, sequence {seq.id_value}") \ .y_axis(label="Duration (h)") \ .colors("tab10") \ .draw(seq, entity_feature="status") \ .show() # fmt: on Faceting ~~~~~~~~ ``.facet()`` splits the chart into a grid of panels, one per unique value of a chosen feature. Here we attach per-sequence static data and facet on ``group``. 
static_df = simulate_static(n_ids=80, features=["age", "group"], seed=0) pool.add_static_features(static_df) pool.cast_features({"group": pl.Categorical}, is_static=True) # fmt: off SequenceVisualizer.spanplot(kind="box", display_unit="hours") \ .facet(by="group", is_static=True, cols=3) \ .title("Duration distribution faceted by group") \ .y_axis(label="Duration (h)") \ .colors("Set2") \ .draw(pool, entity_feature="status") \ .show() # fmt: on Inspect ``prepare_data()`` ~~~~~~~~~~~~~~~~~~~~~~~~~~ ``prepare_data()`` returns the flat Polars DataFrame before rendering. Each row is one segment; ``__DURATION__`` holds the computed duration. builder = SequenceVisualizer.spanplot(display_unit="hours") df = builder.prepare_data(pool, entity_feature="status") df.head() ---------------------------------------- ## Timeline """ Timeline ======== Visualize a :class:`~tanat.sequence.Sequence` or :class:`~tanat.sequence.SequencePool` as a timeline using :class:`~tanat.visualization.SequenceVisualizer`. Each row spans ``[start, end]`` for interval and state pools (horizontal bars); event pools render as scatter points. Two row-organisation modes are available: - ``group_by="id"``: one row per sequence (default, up to 30 sequences) - ``group_by="category"``: one row per unique label value """ Imports ~~~~~~~ import polars as pl from tanat import build_events, build_intervals from tanat.dataset import simulate_events, simulate_intervals, simulate_static from tanat.visualization import SequenceVisualizer Simulate data ~~~~~~~~~~~~~ :func:`~tanat.dataset.simulation.intervals.simulate_intervals` produces one row per interval. The second feature (``status``) is categorical; it becomes the label rendered on the timeline. 
temporal = simulate_intervals( n_ids=20, seq_length_range=(4, 10), features=["value", "status"], seed=42, ) print(temporal.shape, temporal.columns.tolist()) temporal.head() Build the pool ~~~~~~~~~~~~~~ pool = build_intervals( temporal_data=temporal, id_column="id", start_column="start", end_column="end", ) pool.cast_features({"status": pl.Categorical}, is_static=False) print(pool) Flat timeline: one row per sequence ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ``group_by="id"`` (default) assigns one horizontal band per sequence. Each bar spans the ``[start, end]`` of that interval. # fmt: off SequenceVisualizer.timeline() \ .title("Interval timeline (flat stacking)") \ .colors("Set2") \ .draw(pool, entity_feature="status") \ .show() # fmt: on Category stacking: one row per label ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ``group_by="category"`` collapses all sequences onto one row per unique label value. Useful to compare when each label is active across the time axis. # fmt: off SequenceVisualizer.timeline(group_by="category") \ .title("Interval timeline (category stacking)") \ .colors("Set2") \ .draw(pool, entity_feature="status") \ .show() # fmt: on Single sequence ~~~~~~~~~~~~~~~ Pass a :class:`~tanat.sequence.Sequence` directly for a per-individual view. seq = pool[pool.unique_ids[0]] print(f"ID {seq.id_value}: {len(seq)} intervals") # fmt: off SequenceVisualizer.timeline() \ .title(f"Single sequence (ID {seq.id_value})") \ .colors("tab10") \ .draw(seq, entity_feature="status") \ .show() # fmt: on Event pool: scatter points ~~~~~~~~~~~~~~~~~~~~~~~~~~~ For event pools (point observations) the timeline renders scatter marks instead of horizontal bars. 
event_temporal = simulate_events( n_ids=15, seq_length_range=(5, 15), features=["score", "action"], seed=0, ) event_pool = build_events( temporal_data=event_temporal, id_column="id", time_column="time", ) event_pool.cast_features({"action": pl.Categorical}, is_static=False) print(event_pool) # fmt: off SequenceVisualizer.timeline() \ .title("Event timeline (scatter points)") \ .colors("tab10") \ .draw(event_pool, entity_feature="action") \ .show() # fmt: on Layout and style ~~~~~~~~~~~~~~~~ # Wide figure with a grid, timestamps easier to read # fmt: off SequenceVisualizer.timeline() \ .figsize(12, 5) \ .grid() \ .colors("Set2") \ .x_axis(label="Time", autofmt_xdate=True) \ .y_axis(show=False) \ .title("Wide timeline (grid, hidden y-axis)") \ .draw(pool, entity_feature="status") \ .show() # fmt: on # Thin semi-transparent bars with a black edge # fmt: off SequenceVisualizer.timeline() \ .colors("Set2") \ .marker(bar_height=0.3, alpha=0.5, edge_color="black") \ .title("Thin transparent bars") \ .draw(pool, entity_feature="status") \ .show() # fmt: on Relative time (aligned to T0) ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ``time_mode="relative"`` aligns every sequence to its own T0 reference point so that the x-axis shows an offset in days from that anchor instead of absolute timestamps. Call :meth:`~tanat.sequence.SequencePool.set_t0` first to define the reference date; without it the lazy default (``position=0``, i.e. the first row) is used. # Anchor T0 to the first interval of every sequence. 
pool.set_t0(position=0, anchor="start") # Pool: all sequences aligned to their own t0 # fmt: off SequenceVisualizer.timeline(time_mode="relative") \ .title("Timeline aligned to T0 (pool)") \ .x_axis(label="Days from T0") \ .colors("Set2") \ .draw(pool, entity_feature="status") \ .show() # fmt: on # Single sequence: the same shift applied to one individual # fmt: off SequenceVisualizer.timeline(time_mode="relative") \ .title(f"Timeline aligned to T0 (ID {seq.id_value})") \ .x_axis(label="Days from T0") \ .colors("Set2") \ .draw(seq, entity_feature="status") \ .show() # fmt: on Faceting ~~~~~~~~ ``.facet()`` splits the chart into a grid of panels, one per unique value of a chosen feature. Here we attach per-sequence static data and facet on ``group``. static_df = simulate_static(n_ids=20, features=["age", "group"], seed=0) pool.add_static_features(static_df) pool.cast_features({"group": pl.Categorical}, is_static=True) # fmt: off SequenceVisualizer.timeline() \ .facet(by="group", is_static=True, cols=3) \ .title("Interval timeline faceted by group") \ .colors("Set2") \ .draw(pool, entity_feature="status") \ .show() # fmt: on Inspect ``prepare_data()`` ~~~~~~~~~~~~~~~~~~~~~~~~~~ ``prepare_data()`` exposes the intermediate Polars DataFrame before rendering. Columns: ``__ID__``, ``__Y_POSITION__``, ``__TIME__``, ``__END__``, ``__LABEL__``, and optionally ``__COLOR__``. builder = SequenceVisualizer.timeline(group_by="category").colors("Set2") df = builder.prepare_data(pool, entity_feature="status") df.head(10) ---------------------------------------- ## Sequence Level Zeroing """ Sequence Level Zeroing ====================== Align an :class:`~tanat.sequence.IntervalSequencePool` to a reference date (T0) using each of the four built-in strategies. .. 
list-table:: :header-rows: 1 :widths: 20 80 * - Strategy - Description * - ``position`` - T0 = temporal value at a given row index (``0`` = first, ``-1`` = last) * - ``direct`` - T0 = fixed scalar or per-id ``dict`` * - ``feature`` - T0 = value of a static feature column * - ``query`` - T0 = first/last row matching a Polars expression After calling ``set_t0``, inspect the results with ``pool.t0_data()``, ``seq.t0``, and ``seq.t0_nearest_rank``. See :doc:`../../../reference/zeroing` for the complete reference. """ Imports ~~~~~~~ import polars as pl import pandas as pd from datetime import datetime, timedelta from tanat import build_intervals from tanat.dataset import simulate_intervals, simulate_static Simulate data ~~~~~~~~~~~~~ Generate a small :class:`~tanat.sequence.IntervalSequencePool` with both temporal and static features so we can exercise all four strategies. temporal = simulate_intervals( n_ids=40, features=["value", "status"], seed=42, ) temporal.head() static = simulate_static(n_ids=40, features=["age"], seed=0) static.head() Build the pool ~~~~~~~~~~~~~~ pool = build_intervals( temporal_data=temporal, id_column="id", start_column="start", end_column="end", static_data=static, ) print(pool) Strategy 1 - position ~~~~~~~~~~~~~~~~~~~~~~ ``set_t0(position=N)`` selects the temporal value at row index ``N`` (0-based; negative indices count from the end). For interval and state pools the ``anchor`` parameter controls which end of the interval is used: ``"start"`` (default) or ``"end"``. # First row, start of interval pool.set_t0(position=0, anchor="start") print("position=0, anchor='start'") pool.t0_data().head() # Last row, end of interval pool.set_t0(position=-1, anchor="end") print("position=-1, anchor='end'") pool.t0_data().head() Strategy 2 - direct ~~~~~~~~~~~~~~~~~~~~ ``set_t0(direct=value)`` assigns the **same** timestamp to every sequence. 
``set_t0(direct={id: value, ...})`` assigns a **per-id** timestamp; IDs absent from the dict receive ``_T0_ = null``. # Scalar: same T0 for all sequences pool.set_t0(direct=datetime(2020, 1, 1)) print("direct scalar") pool.t0_data().head() # Dict: per-id mapping first_ids = pool.unique_ids[:3] per_id_map = { first_ids[0]: datetime(2020, 1, 10), first_ids[1]: datetime(2020, 2, 20), first_ids[2]: datetime(2020, 3, 15), } pool.set_t0(direct=per_id_map) print("direct per-id (only 3 IDs in dict → remaining get null)") pool.t0_data().head(6) Strategy 3 - feature ~~~~~~~~~~~~~~~~~~~~~ ``set_t0(feature="col")`` reads T0 from a **static feature column**. The feature dtype must match the pool's temporal dtype; cast it with :py:meth:`~tanat.sequence.base.pool.SequencePool.cast_features` if needed. We first attach a custom static column ``index_date`` whose dtype already matches the pool's temporal dtype (``Datetime[us]``). # Build a per-id index_date column (Datetime[us] to match the pool's time index) n = len(pool) index_dates = pd.DataFrame( { "id": pool.unique_ids, "index_date": pd.array( [datetime(2020, 1, 1) + timedelta(days=int(i * 7)) for i in range(n)], dtype="datetime64[us]", ), } ) pool.add_static_features(index_dates) pool.set_t0(feature="index_date") print("feature='index_date'") pool.t0_data().head() # All IDs have an index_date so no nulls for this strategy null_count = pool.t0_data()["_T0_"].isnull().sum() print(f"Sequences with _T0_ = null: {null_count}/{len(pool)}") Strategy 4 - query ~~~~~~~~~~~~~~~~~~~ ``set_t0(query=expr)`` scans entity rows and picks the **first** (or last with ``use_first=False``) row where the Polars expression is ``True``. The ``anchor`` parameter controls which end of the interval becomes T0. Sequences with no matching row receive ``_T0_ = null``. 
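The selection rule described above can be sketched in plain Python. This illustrates the logic only and is not TanaT's implementation: ``query_t0`` is a hypothetical helper, the predicate is an ordinary Python callable rather than a Polars expression, and rows are assumed to be sorted in time within each ID.

```python
from datetime import datetime


def query_t0(rows, predicate, anchor="start", use_first=True):
    """Return {id: T0 or None} from the first/last row matching ``predicate``.

    ``rows`` is a list of dicts with keys ``id``, ``start``, ``end`` plus
    feature columns, already sorted in time within each id (assumption).
    """
    t0 = {}
    for row in rows:
        t0.setdefault(row["id"], None)  # ids without any match keep None
        if not predicate(row):
            continue
        if use_first and t0[row["id"]] is not None:
            continue  # the first match is already recorded
        t0[row["id"]] = row[anchor]  # later matches overwrite when use_first=False
    return t0


rows = [
    {"id": 1, "start": datetime(2020, 1, 1), "end": datetime(2020, 1, 3), "status": "A"},
    {"id": 1, "start": datetime(2020, 1, 3), "end": datetime(2020, 1, 6), "status": "D"},
    {"id": 1, "start": datetime(2020, 1, 6), "end": datetime(2020, 1, 9), "status": "D"},
    {"id": 2, "start": datetime(2020, 1, 2), "end": datetime(2020, 1, 4), "status": "B"},
]


def is_d(row):
    return row["status"] == "D"


first_d = query_t0(rows, is_d, anchor="start", use_first=True)
last_d = query_t0(rows, is_d, anchor="end", use_first=False)
print(first_d)
print(last_d)
```

ID 2 never reaches status ``"D"``, so it maps to ``None`` in both results, mirroring the ``_T0_ = null`` behaviour described above.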
# T0 = start of the first row where status == "D" pool.set_t0(query=pl.col("status") == "D", anchor="start", use_first=True) print("First 'D' row (start)") pool.t0_data().head() # T0 = end of the last row where value > 0.8 pool.set_t0(query=pl.col("value") > 0.8, anchor="end", use_first=False) print("Last row with value > 0.8 (end)") pool.t0_data().head() Sequence-level properties ~~~~~~~~~~~~~~~~~~~~~~~~~~ After any ``set_t0`` call, every :class:`~tanat.sequence.base.sequence.Sequence` exposes ``seq.t0`` and ``seq.t0_nearest_rank``. .. list-table:: :header-rows: 1 :widths: 30 20 50 * - Property - Type - Description * - ``seq.t0`` - scalar | ``None`` - T0 for this sequence; ``None`` when no T0 could be computed * - ``seq.t0_nearest_rank`` - ``int`` | ``None`` - 0-based index of the entity at or just before T0 T0 is always set at the **pool level** via ``pool.set_t0(...)`` and propagated to every sequence automatically. There is no ``seq.set_t0()``: the pool is the single source of truth, which prevents desynchronisation between sequences after filtering or iterating. pool.set_t0(position=0, anchor="start") seq = pool[pool.unique_ids[0]] print(f"id : {seq.id_value}") print(f"t0 : {seq.t0}") print(f"t0_nearest_rank : {seq.t0_nearest_rank}") # Null case: highly selective query → some sequences have no matching row pool.set_t0(query=pl.col("value") > 0.999, anchor="start") null_seqs = [seq.id_value for seq in pool if seq.t0 is None] print( f"{len(null_seqs)}/{len(pool)} sequence(s) with t0 = None " f"(no row matched value > 0.999)" ) ---------------------------------------- ## Trajectory-Level Zeroing """ Trajectory-Level Zeroing ======================== Align a :class:`~tanat.trajectory.pool.TrajectoryPool` to a common reference date (T0) using any of the four built-in strategies. 
At the trajectory level, T0 is computed **once** (from a reference sub-pool when ``on=`` is provided) then shared across every child object: sub-pools, trajectories, and individual sequences all return the same ``t0`` value. Each object still computes its own ``t0_nearest_rank`` from its own temporal grid. .. list-table:: :header-rows: 1 :widths: 15 20 65 * - Strategy - ``on=`` required? - Description * - ``position`` - **yes** - T0 = temporal value at row index ``N`` in the reference sub-pool * - ``direct`` - no - T0 = scalar or per-id ``dict`` (no sub-pool lookup needed) * - ``feature`` - no - T0 = trajectory-level static feature column * - ``query`` - **yes** - T0 = first/last row matching a Polars expression in the reference sub-pool After ``set_t0``, inspect the results with ``tpool.t0_data()``, ``traj.t0``, and ``traj.t0_nearest_rank``. See :doc:`../../../reference/zeroing` for the complete reference and :doc:`sequence_t0` for the sequence-level equivalent. """ Imports ~~~~~~~ import polars as pl import warnings from datetime import datetime, timedelta from tanat import build_events, build_intervals, build_states, build_trajectories from tanat.dataset import simulate_trajectories, simulate_static Simulate data ~~~~~~~~~~~~~ Generate three temporal pools (interval, event, state) sharing the same IDs, plus a trajectory-level static DataFrame. 
N_IDS = 40 SEED = 42 data = simulate_trajectories( sequences={ "admissions": { "type": "interval", "n_ids": N_IDS, "features": ["value", "status"], }, "labs": {"type": "event", "n_ids": N_IDS, "features": ["value", "status"]}, "phases": { "type": "state", "n_ids": N_IDS, "features": ["phase_name", "score"], }, }, seed=SEED, ) static = simulate_static(n_ids=N_IDS, features=["age", "group"], seed=SEED) for alias, df in data.items(): print(f"{alias:12s}: {df.shape[0]:4d} rows, columns={df.columns}") print(f"{'static':12s}: {static.shape[0]:4d} rows, columns={static.columns}") Build the TrajectoryPool ~~~~~~~~~~~~~~~~~~~~~~~~ admissions_pool = build_intervals( temporal_data=data["admissions"], id_column="id", start_column="start", end_column="end", ) labs_pool = build_events( temporal_data=data["labs"], id_column="id", time_column="time", ) phases_pool = build_states( temporal_data=data["phases"], id_column="id", start_column="start", end_column="end", ) tpool = build_trajectories( pools={ "admissions": admissions_pool, "labs": labs_pool, "phases": phases_pool, }, static_data=static, id_column="id", ) print(tpool) Strategy 1 - position ~~~~~~~~~~~~~~~~~~~~~~ ``set_t0(position=N, on="alias")`` selects row ``N`` in the reference sub-pool. - ``on=`` is **required**: the row index depends on the target sub-pool. - ``anchor=`` controls which end of the interval is used (interval/state pools only). # First row of admissions, start of interval tpool.set_t0(position=0, anchor="start", on="admissions") print("position=0, anchor='start', on='admissions'") tpool.t0_data().head() # Last lab event (event pool: anchor is ignored) tpool.set_t0(position=-1, on="labs") print("position=-1, on='labs'") tpool.t0_data().head() Strategy 2 - direct ~~~~~~~~~~~~~~~~~~~~ ``set_t0(direct=value)`` assigns the **same** T0 to every trajectory. ``set_t0(direct={id: value, ...})`` assigns a per-ID T0; absent IDs receive ``_T0_ = null``. ``on=`` is **not** required: the value is provided directly. 
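The two forms of ``direct`` amount to a broadcast and a lookup. A small sketch under that reading, using a hypothetical ``resolve_direct_t0`` helper that is not part of TanaT:

```python
from datetime import datetime


def resolve_direct_t0(ids, direct):
    """Map every id to a T0.

    A scalar is shared by all ids; a dict is looked up per id, and ids
    missing from the dict resolve to None.
    """
    if isinstance(direct, dict):
        return {i: direct.get(i) for i in ids}
    return {i: direct for i in ids}


ids = ["p1", "p2", "p3"]
shared = resolve_direct_t0(ids, datetime(2010, 6, 1))
per_id = resolve_direct_t0(ids, {"p1": datetime(2010, 1, 15)})
print(shared)
print(per_id)
```

No sub-pool is consulted in either case, which is why ``on=`` plays no role for this strategy.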
# Scalar: same T0 for everyone tpool.set_t0(direct=datetime(2010, 6, 1)) print("direct scalar") tpool.t0_data().head() # Per-ID dict (only 3 IDs mapped, rest get null) first_ids = tpool.unique_ids[:3] per_id_map = { first_ids[0]: datetime(2010, 1, 15), first_ids[1]: datetime(2011, 3, 20), first_ids[2]: datetime(2012, 7, 10), } tpool.set_t0(direct=per_id_map) print(f"direct per-id (3 IDs mapped, {len(tpool) - 3} get null)") tpool.t0_data().head(6) Strategy 3 - feature ~~~~~~~~~~~~~~~~~~~~~ ``set_t0(feature="col")`` reads T0 from a **trajectory-level static feature**. The dtype must match the pool's temporal dtype. ``on=`` is **not** required. # Add a custom datetime static feature ids = tpool.unique_ids admission_dates = [ datetime(2008, 1, 1) + timedelta(days=i * 45) for i in range(len(ids)) ] date_df = pl.DataFrame( { "id": ids, "admission_date": pl.Series(admission_dates).cast(pl.Datetime("us")), } ) tpool.add_static_features(date_df) tpool.set_t0(feature="admission_date") print("feature='admission_date'") tpool.t0_data().head() Strategy 4 - query ~~~~~~~~~~~~~~~~~~~ ``set_t0(query=expr, on="alias")`` picks the first (or last) row matching the expression in the reference sub-pool. ``on=`` is **required**: the expression refers to columns in the target sub-pool. 
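To see why ``on=`` matters here, and how a single T0 still yields a different nearest rank per sub-pool, consider the plain-Python sketch below for one trajectory. ``trajectory_t0`` is hypothetical, the predicate is a Python callable rather than a Polars expression, and returning ``None`` when T0 precedes every row of a sub-pool is an assumption made for this illustration.

```python
from bisect import bisect_right
from datetime import datetime


def trajectory_t0(subpools, on, predicate, use_first=True):
    """Compute T0 from the reference sub-pool ``on`` and a floor index per alias.

    ``subpools`` maps an alias to a time-sorted list of (time, status) rows.
    """
    matches = [t for t, status in subpools[on] if predicate(status)]
    if not matches:
        return None, {alias: None for alias in subpools}
    t0 = matches[0] if use_first else matches[-1]
    ranks = {}
    for alias, rows in subpools.items():
        times = [t for t, _ in rows]
        idx = bisect_right(times, t0) - 1  # last row at or before T0
        ranks[alias] = idx if idx >= 0 else None  # assumption: None if T0 precedes all rows
    return t0, ranks


subpools = {
    "labs": [
        (datetime(2020, 1, 2), "low"),
        (datetime(2020, 1, 5), "high"),
        (datetime(2020, 1, 9), "high"),
    ],
    "admissions": [
        (datetime(2020, 1, 1), "A"),
        (datetime(2020, 1, 3), "B"),
        (datetime(2020, 1, 4), "A"),
        (datetime(2020, 1, 8), "B"),
    ],
}
t0, ranks = trajectory_t0(subpools, on="labs", predicate=lambda s: s == "high")
print(t0, ranks)
```

The expression is evaluated against the ``labs`` rows only, yet the resulting T0 is then located on every sub-pool's own time grid.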
# First lab with status matching a known category status_values = labs_pool.temporal_data(fmt="polars")["status"].unique().to_list() ref_status = status_values[0] tpool.set_t0( query=pl.col("status") == ref_status, use_first=True, on="labs", ) print(f"query: first lab with status=={ref_status!r}, on='labs'") tpool.t0_data().head() # Last admission matching a status (interval pool, anchor='end') adm_status_values = ( admissions_pool.temporal_data(fmt="polars")["status"].unique().to_list() ) ref_adm_status = adm_status_values[0] tpool.set_t0( query=pl.col("status") == ref_adm_status, anchor="end", use_first=False, on="admissions", ) print(f"query: last admission with status=={ref_adm_status!r}, anchor='end'") tpool.t0_data().head() Trajectory-level properties ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ After ``set_t0``, every :class:`~tanat.trajectory.trajectory.Trajectory` exposes two read-only properties: .. list-table:: :header-rows: 1 :widths: 30 20 50 * - Property - Type - Description * - ``traj.t0`` - scalar | ``None`` - T0 for this trajectory * - ``traj.t0_nearest_rank`` - ``dict[str, int | None]`` - Per-alias floor index: ``{"admissions": 0, "labs": 2, ...}`` tpool.set_t0(position=0, anchor="start", on="admissions") traj = tpool[tpool.unique_ids[0]] print(f"id : {traj.id_value}") print(f"t0 : {traj.t0}") print(f"t0_nearest_rank : {traj.t0_nearest_rank}") # Iterate over the first few trajectories for traj in list(tpool)[:6]: ranks = ", ".join(f"{alias}={rank}" for alias, rank in traj.t0_nearest_rank.items()) print(f" id={traj.id_value!r:<6} t0={str(traj.t0):<30} ranks=[{ranks}]") ``t0_data()`` column structure ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ``tpool.t0_data()`` returns one row per trajectory with: - ``_T0_``: the T0 value (identical across all sub-pools) - ``_T0_NEAREST_RANK_``: per-alias floor index (each sub-pool gets its own column because the temporal grids differ) df = tpool.t0_data(fmt="polars") print("Columns:", df.columns) print(f"Rows : {len(df)} (one per 
trajectory)")
df.head()

Null handling
~~~~~~~~~~~~~

Trajectories with ``_T0_ = null`` arise in the same situations as for sequence pools (out-of-range position, missing dict key, null feature, no query match). The trajectory is **not** dropped; ``traj.t0`` returns ``None`` and all nearest-rank values are ``None``.

with warnings.catch_warnings():
    warnings.simplefilter("ignore")
    tpool.set_t0(query=pl.col("value") > 1e9, on="labs")  # no match

null_trajs = [t.id_value for t in tpool if t.t0 is None]
print(f"{len(null_trajs)}/{len(tpool)} trajectory(ies) with t0 = None")

T0 is shared across all children
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

A single ``tpool.set_t0(...)`` call is enough. Every object you retrieve from the pool - a sub-pool, a trajectory, or an individual sequence - automatically returns the same ``t0`` value. ``t0_nearest_rank`` still varies: each pool computes its floor index on its own temporal grid.

tpool.set_t0(position=0, anchor="start", on="admissions")
uid = tpool.unique_ids[0]
traj = tpool[uid]
seq = traj["labs"]
print(f"traj.t0 : {traj.t0}")
print(f"traj['labs'].t0 : {seq.t0}")
print(f"same value : {seq.t0 == traj.t0}")
print(f"seq.t0_nearest_rank (labs grid): {seq.t0_nearest_rank}")

# Sub-pools expose the same T0 via t0_data()
labs_t0 = tpool.sequence_pools["labs"].t0_data(fmt="polars")
print("Columns:", labs_t0.columns)
labs_t0.head()

----------------------------------------

## Discretizing time series into sequences

"""
Discretizing time series into sequences
=======================================

Time series are another frequently encountered type of temporal data, and many libraries are already dedicated to their analysis, for instance the `aeon toolkit `_ or `sktime `_. In this tutorial, we illustrate how a library dedicated to time series complements *TanaT*, which is dedicated to the analysis of temporal sequences.
More specifically, time series segmentation is a machine learning task that bridges the two worlds. It identifies contiguous regions of a time series that behave consistently. The segmentation output is a kind of state sequence that can be analyzed further (for example, by clustering).

.. note::

   `aeon` is **not** bundled with TanaT. Install it separately::

       pip install aeon

**Concepts covered:**

- Creating a pool of state sequences
- Visualizing sequences with *TanaT* and combining the result with additional plots
"""

# Generic imports
import pandas as pd
import polars as pl
import numpy as np

from tanat import build_events
from tanat.visualization import SequenceVisualizer

Prepare some simulated time series data
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Create a dataset of time series with aeon. `load_unit_test` returns time series whose values lie between 0 and 1000. Rescaling them to the range 0 to 1 better suits the processing that follows.

from aeon.datasets import load_unit_test

X_raw, _ = load_unit_test()
X_raw = X_raw.squeeze() / 1000

Create a simple symbolic representation
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

# A first naive approach for transforming time series into sequences
# is the discretization technique named SAX, which discretizes
# time and assigns a symbol to each segment based on its mean value.
#
# The resulting object is a sequence of states, suitable for *TanaT*.
from aeon.transformations.collection.dictionary_based import SAX

n_segments = 10
voc_size = 15
sax = SAX(n_segments=n_segments, alphabet_size=voc_size)
X_sax = sax.fit_transform(X_raw).squeeze()

# We obtain sequences of symbols (each symbol is coded by an integer)
print(X_sax)

# Transform the result into a pandas DataFrame ready for *TanaT*
values = [
    (seq, t, X_sax[seq, t]) for seq in range(X_raw.shape[0]) for t in range(n_segments)
]
df = pd.DataFrame(values, columns=["id", "t", "value"])

# It can now be ingested by *TanaT* as a pool of event sequences.
# Here, we choose to create a pool of events and then convert it
# into a pool of state sequences.
pool = build_events(df, id_column="id", time_column="t")
state_pool = pool.as_state(end_value=11)
state_pool.cast_features({"value": pl.String})
state_pool.cast_features({"value": pl.Categorical})

# .. note::
#    We convert the `value` feature to Categorical for the
#    visualisation of sequences. Direct conversion from integer to
#    Categorical throws an error, which is why we first convert
#    the values to strings.

print(state_pool)

# Illustrate one of the sequences
ts_id = 0  # identifier of the time series / sequence to show
seq = state_pool[ts_id]
SequenceVisualizer.timeline().draw(seq, entity_feature="value").show()

# We can also overlay the original time series on the *TanaT* visualization.
# The x-axis is rescaled to align with the SAX timestamps, and the values are
# shifted by $-0.5$ to center the curve within the symbolic view.
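For readers unfamiliar with SAX, the discretization used earlier in this tutorial can be approximated in a few lines of standard-library Python: z-normalise the series, average it per segment, then bin each mean using equiprobable breakpoints of the standard normal distribution. This is a simplified sketch, not aeon's implementation; ``sax_sketch`` is a hypothetical name and aeon's ``SAX`` may differ in normalisation and segment handling.

```python
from bisect import bisect_right
from statistics import NormalDist, fmean, pstdev


def sax_sketch(series, n_segments, alphabet_size):
    """Simplified SAX for one series; assumes len(series) >= n_segments."""
    mu, sigma = fmean(series), pstdev(series)
    z = [(x - mu) / sigma if sigma > 0 else 0.0 for x in series]
    # Piecewise aggregate approximation: mean of each near-equal-length segment.
    n = len(z)
    bounds = [k * n // n_segments for k in range(n_segments + 1)]
    paa = [fmean(z[bounds[k]:bounds[k + 1]]) for k in range(n_segments)]
    # Breakpoints that split the standard normal into equiprobable regions.
    breakpoints = [
        NormalDist().inv_cdf(k / alphabet_size) for k in range(1, alphabet_size)
    ]
    # The symbol is the index of the region each segment mean falls into.
    return [bisect_right(breakpoints, value) for value in paa]


symbols = sax_sketch([0, 0, 0, 0, 10, 10, 10, 10], n_segments=2, alphabet_size=4)
print(symbols)
```

Each integer symbol covers one segment of the time axis, which is why the output maps naturally onto a state sequence.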
figres = SequenceVisualizer.timeline().draw(seq, entity_feature="value") figres.figure.axes[0].plot( np.arange(X_sax.shape[1], step=X_sax.shape[1] / X_raw.shape[1]), X_raw[ts_id] - 0.5 ) figres.show() ---------------------------------------- ## Building pools from multiple sources """ Building pools from multiple sources ====================================== Combine **three file-based data sources** (two Parquet files and one CSV) that share the same schema into a single :class:`~tanat.sequence.IntervalSequencePool`, then compose a :class:`~tanat.trajectory.pool.TrajectoryPool` on top. **What you will learn:** - ``add_parquet`` / ``add_csv``: ingest data from files - Multi-source chaining: three ``.add_*()`` calls → one store - ``is_static=True`` for per-individual static features (``bmi``, ``site``) - Builder option: ``sort_anchor`` - Trajectory composition with ``TrajectoryPool.builder()`` - Workspace registration and ``pool.save()`` .. note:: SQL sources (``add_sql``) follow the exact same pattern and accept the same column-mapping parameters. They require ``connectorx``; install it with ``pip install 'tanat[sql]'``. See :doc:`../../reference/builder` for details. **Scenario:** Admission records arrive from three upstream systems (a hospital export, a supplementary cohort, and a legacy CSV extract). All three share the schema ``severity_score`` (float) / ``ward`` (categorical) and are merged into a single pool, then linked to a procedures pool through a ``TrajectoryPool``. 
""" Imports ~~~~~~~ import tempfile from pathlib import Path import datetime import pandas as pd import polars as pl from tanat import get_workspace, set_workspace from tanat.dataset.simulation import ( simulate_events, simulate_intervals, simulate_static, ) from tanat.sequence.type.event.pool import EventSequencePool from tanat.sequence.type.interval.pool import IntervalSequencePool from tanat.trajectory.pool import TrajectoryPool Workspace setup ~~~~~~~~~~~~~~~ A **workspace** registers every store built below under a short name so any script can reload them later without tracking file paths. set_workspace("~/.tanat_workspace/building_pools_tutorial") ws = get_workspace() ws.clear() print(ws) Generate source files ~~~~~~~~~~~~~~~~~~~~~ ``simulate_intervals`` generates one row per admission. Feature types follow the **numeric → categorical → boolean** cycle, so: - ``severity_score`` → float (clinical severity score at admission) - ``ward`` → categorical (care unit, values ``{A, B, C, D, E}``) Static features follow the same cycle: - ``bmi`` → float - ``site`` → categorical (care site, values ``{A, B, C, D, E}``) Patient IDs are prefixed with the source letter (``"a"``, ``"b"``, ``"c"``) to avoid collisions when the three files are merged. The simulation outputs columns ``id / start / end`` by default; these match the builder's defaults so **no column mapping is needed**. 
def _prefix_ids(df: pd.DataFrame, prefix: str) -> pd.DataFrame: """Prefix the ``id`` column with a source identifier.""" return df.assign(id=prefix + df["id"].astype(str)) # Feature schemas: defined once, reused across simulate_* and builder calls ADMISSION_FEATURES = ["severity_score", "ward"] # float, categorical STATIC_FEATURES = ["bmi", "site"] # float, categorical PROCEDURE_FEATURES = ["priority", "procedure"] # float, categorical TIME_RANGE = (datetime.datetime(2000, 1, 1), datetime.datetime(2000, 12, 31)) SEED = 42 tmpdir = Path(tempfile.mkdtemp()) # Simulate admissions in different sources (parquets, CSV, ...) # Source A src_a = simulate_intervals( n_ids=100, features=ADMISSION_FEATURES, time_range=TIME_RANGE, seed=SEED, ) src_a = _prefix_ids(src_a, "a") # Source B src_b = simulate_intervals( n_ids=50, features=ADMISSION_FEATURES, time_range=TIME_RANGE, seed=SEED + 1, ) src_b = _prefix_ids(src_b, "b") ## Source C src_c = simulate_intervals( n_ids=30, features=ADMISSION_FEATURES, time_range=TIME_RANGE, seed=SEED + 2, ) src_c = _prefix_ids(src_c, "c") # Simulate static demographics (one row per patient, all sources) df_static = simulate_static(n_ids=100, features=STATIC_FEATURES, seed=SEED + 10) tmp_static_A = _prefix_ids(df_static, "a") df_static_B = simulate_static(n_ids=50, features=STATIC_FEATURES, seed=SEED + 11) tmp_static_B = _prefix_ids(df_static_B, "b") df_static_C = simulate_static(n_ids=30, features=STATIC_FEATURES, seed=SEED + 12) tmp_static_C = _prefix_ids(df_static_C, "c") # Static single DataFrame static_df = pd.concat([tmp_static_A, tmp_static_B, tmp_static_C]) # Write to disk: two Parquet files + one CSV (mimicking three upstream systems) parquet_a = tmpdir / "hospital_export.parquet" parquet_b = tmpdir / "supplementary_cohort.parquet" csv_c = tmpdir / "legacy_extract.csv" parquet_static = tmpdir / "demographics.parquet" src_a.to_parquet(parquet_a, index=False) src_b.to_parquet(parquet_b, index=False) src_c.to_csv(csv_c, index=False) 
static_df.to_parquet(parquet_static, index=False)

for label, df in [
    ("A (Parquet)", src_a),
    ("B (Parquet)", src_b),
    ("C (CSV)    ", src_c),
]:
    print(f"Source {label}: {df['id'].nunique()} patients")

Build the admissions pool (multi-source)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Three ``.add_*()`` calls on the same builder merge all rows into one store.
A fourth call with ``is_static=True`` attaches per-patient demographics.
The column mapping is spelled out below for readability, although it could be
omitted: the default ``id``/``start``/``end`` names already match the
simulation output.

admissions_path = (
    IntervalSequencePool.builder(sort_anchor="start")
    # Source A: hospital export
    .add_parquet(
        str(parquet_a),
        id_column="id",
        start_column="start",
        end_column="end",
        features=["severity_score", "ward"],
    )
    # Source B: supplementary cohort
    .add_parquet(
        str(parquet_b),
        id_column="id",
        start_column="start",
        end_column="end",
        features=["severity_score", "ward"],
    )
    # Source C: legacy CSV extract
    .add_csv(
        str(csv_c),
        id_column="id",
        start_column="start",
        end_column="end",
        features=["severity_score", "ward"],
        try_parse_dates=True,
    )
    # Static demographics
    .add_parquet(
        str(parquet_static),
        is_static=True,
        id_column="id",
        features=["bmi", "site"],
    )
    .build("admissions")
)

Inspect the admissions pool
~~~~~~~~~~~~~~~~~~~~~~~~~~~

admissions = IntervalSequencePool(store=admissions_path)
print(admissions)
print(f"Total patients : {len(admissions)}")

admissions.temporal_data().head(5)

admissions.static_data().head(5)

Builder option: ``sort_anchor``
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

``sort_anchor`` controls how intervals are ordered within each sequence:
``"start"`` (default), ``"end"``, or ``"middle"`` (midpoint).
We build all three variants into a dict, then display the same patient's
sequence under each ordering.
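To make the three orderings concrete, here is a small library-independent sketch. It does not call TanaT; the toy intervals, `anchor_time`, and `order_by_anchor` are hypothetical names introduced only for illustration, and the midpoint rule is an assumption about what ``"middle"`` means.

```python
from datetime import datetime

# Three toy admissions for one hypothetical patient: (label, start, end).
intervals = [
    ("A", datetime(2000, 1, 1), datetime(2000, 1, 20)),
    ("B", datetime(2000, 1, 5), datetime(2000, 1, 8)),
    ("C", datetime(2000, 1, 9), datetime(2000, 1, 18)),
]


def anchor_time(start, end, anchor):
    """Return the timestamp used as sort key for the given anchor."""
    if anchor == "start":
        return start
    if anchor == "end":
        return end
    if anchor == "middle":
        return start + (end - start) / 2  # assumed: midpoint of the interval
    raise ValueError(f"unknown anchor: {anchor!r}")


def order_by_anchor(rows, anchor):
    """Labels of the intervals, sorted by the chosen anchor."""
    ordered = sorted(rows, key=lambda r: anchor_time(r[1], r[2], anchor))
    return [label for label, _, _ in ordered]


for anchor in ("start", "end", "middle"):
    print(anchor, order_by_anchor(intervals, anchor))
```

With overlapping intervals such as these, each anchor yields a different entity order, which is why the same patient can look different under each setting.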
anchor_pools = { anchor: IntervalSequencePool( store=IntervalSequencePool.builder(sort_anchor=anchor) .add_parquet( str(parquet_a), id_column="id", start_column="start", end_column="end", features=["severity_score", "ward"], ) .build(f"admissions_{anchor}", exist_ok=True) ) for anchor in ("start", "end", "middle") } .. note:: ``anchor`` changes the order of entity rows within the same sequence. The same patient can therefore be represented differently depending on the chosen anchor. # sort_anchor = "start" pid = "a12" anchor_pools["start"][pid].temporal_data() # sort_anchor = "middle" anchor_pools["middle"][pid].temporal_data() # sort_anchor = "end" anchor_pools["end"][pid].temporal_data() Build a procedures pool ~~~~~~~~~~~~~~~~~~~~~~~ An :class:`~tanat.sequence.type.event.pool.EventSequencePool` stores single-timestamp events. Two Parquet files are merged into one pool. Feature schema: ``priority`` (float), ``procedure`` (categorical, values A–E) proc_a = simulate_events(n_ids=100, features=PROCEDURE_FEATURES, seed=SEED + 20) proc_a = _prefix_ids(proc_a, "a") proc_b = simulate_events(n_ids=80, features=PROCEDURE_FEATURES, seed=SEED + 21) proc_b = _prefix_ids(proc_b, "b") parquet_proc_a = tmpdir / "procedures_a.parquet" parquet_proc_b = tmpdir / "procedures_b.parquet" proc_a.to_parquet(parquet_proc_a, index=False) proc_b.to_parquet(parquet_proc_b, index=False) procedures_path = ( EventSequencePool.builder() .add_parquet( str(parquet_proc_a), id_column="id", time_column="time", features=["priority", "procedure"], ) .add_parquet( str(parquet_proc_b), id_column="id", time_column="time", features=["priority", "procedure"], ) .build("procedures") ) procedures = EventSequencePool(store=procedures_path) print(procedures) Compose a TrajectoryPool ~~~~~~~~~~~~~~~~~~~~~~~~ A :class:`~tanat.trajectory.pool.TrajectoryPool` groups multiple sequence pools under a shared ID space. 
Each pool is registered under an **alias**:: tpool["admissions"] → IntervalSequencePool (full pool) tpool[id] → Trajectory (one patient) tpool[id]["admissions"] → IntervalSequence (one sequence) tpool[id]["admissions"][0] → IntervalEntity (one entity) traj_path = ( TrajectoryPool.builder() .add("admissions", admissions) .add("procedures", procedures) .build("patient_trajectories", exist_ok=True) ) tpool = TrajectoryPool(store=traj_path) print(tpool) print(f"{len(tpool)} patients with at least one sequence") Navigate a single trajectory ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ first_id = tpool.unique_ids[0] traj = tpool[first_id] print(f"Patient {first_id!r}") for alias in ["admissions", "procedures"]: seq = traj[alias] print(f" {alias:<12}: {len(seq)} rows") traj["admissions"].temporal_data().head(3) Workspace: reload without tracking paths ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ All stores are registered in the workspace by name. Reload them in any script without knowing the file path. print(ws) admissions_reloaded = ws["admissions"] print(f"Reloaded: {len(admissions_reloaded)} patients") Save a modified pool ~~~~~~~~~~~~~~~~~~~~ ``pool.save()`` materialises any pending lazy transformations into a new store registered under the given name. admissions.cast_features({"ward": pl.Categorical}) saved_path = admissions.save("admissions_optimised", overwrite=True) print("Saved to", saved_path) ---------------------------------------- ## Analysing and clustering cohort sequences """ Analysing and clustering cohort sequences =========================================== **Scenario:** Starting from the cohort prepared in :doc:`filter_and_prepare`, you want to quantify how similar the admission sequences are across patients, group them into clusters with shared patterns, and visualise the results. 
**Concepts covered:** - Compute a pairwise sequence distance matrix with :class:`~tanat.metric.HammingEntityMetric` + :class:`~tanat.metric.LinearPairwiseSequenceMetric` - Cluster with :class:`~tanat.clustering.HierarchicalClusterer` - Inspect cluster membership - Produce a **faceted timeline** coloured by cluster """ Imports ~~~~~~~ import polars as pl from tanat.clustering import HierarchicalClusterer from tanat.criterion import ( EntityCriterion, LengthCriterion, ) from tanat.dataset import access from tanat.metric import HammingEntityMetric, LinearPairwiseSequenceMetric from tanat.sequence.type.interval.pool import IntervalSequencePool from tanat.visualization import SequenceVisualizer Rebuild the prepared cohort ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Self-contained rebuild using the builder API (see :doc:`explore_a_cohort` and :doc:`filter_and_prepare` for details). DB = f"sqlite:///{access('mimic4')}" pool = IntervalSequencePool( store=( IntervalSequencePool.builder() .add_sql( DB, "SELECT subject_id, admittime, dischtime," " admission_type, admission_location" ' FROM "hosp/admissions"', id_column="subject_id", start_column="admittime", end_column="dischtime", features=["admission_type", "admission_location"], ) .add_sql( DB, 'SELECT subject_id, gender, anchor_age AS age FROM "hosp/patients"', id_column="subject_id", is_static=True, features=["gender", "age"], ) .build("admissions_store", exist_ok=True) ) ) # ``pl.Categorical`` is required by the metric and clustering modules, and # enables consistent colour-coding across all visualisations. pool.cast_features({"admission_type": pl.Categorical}, is_static=False) print(pool) Cohort selection ~~~~~~~~~~~~~~~~ Same selection as :doc:`filter_and_prepare`: patients with at least 2 admissions who experienced at least one emergency, aligned to their first emergency (T0). 
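As a rough, library-independent illustration of this selection, the sketch below treats each criterion as a plain Python set of IDs and intersects them. The toy `admissions` dict and the patient IDs are invented for illustration; it only mirrors the idea behind ``pool.which(...) & pool.which(...)`` and makes no claim about how TanaT evaluates criteria internally.

```python
# Toy admission types per hypothetical patient ID.
admissions = {
    "p1": ["EW EMER.", "ELECTIVE"],
    "p2": ["ELECTIVE"],
    "p3": ["ELECTIVE", "URGENT"],
    "p4": ["EW EMER."],
    "p5": ["URGENT", "EW EMER.", "EW EMER."],
}

# Criterion 1: at least 2 admissions.
ids_multi = {pid for pid, types in admissions.items() if len(types) >= 2}
# Criterion 2: at least one emergency admission.
ids_emergency = {pid for pid, types in admissions.items() if "EW EMER." in types}

# Set intersection keeps only patients satisfying both criteria.
ids_cohort = ids_multi & ids_emergency
print(sorted(ids_cohort))
```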
ids_cohort = pool.which(LengthCriterion(ge=2)) & pool.which( EntityCriterion(query=pl.col("admission_type") == "EW EMER.") ) print(f"[Intersection] → {len(ids_cohort)} IDs") cohort = pool.subset(ids_cohort) # Align sequences to first emergency admission (T0). cohort.set_t0(query=pl.col("admission_type") == "EW EMER.", anchor="start") print(cohort) Step 1: Define the sequence metric ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ :class:`~tanat.metric.HammingEntityMetric` compares two admissions at the same position: distance 0 if they share the same type, 1 otherwise. :class:`~tanat.metric.LinearPairwiseSequenceMetric` aggregates these entity-level distances along the full sequence. The ``padding_penalty`` penalises sequences of different lengths. entity_metric = HammingEntityMetric(entity_feature="admission_type") sequence_metric = LinearPairwiseSequenceMetric( entity_metric=entity_metric, padding_penalty=1.0, ) Step 2: Compute the distance matrix ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ dist_matrix = sequence_metric.compute_matrix(cohort) dm = dist_matrix.data print(f"Distance matrix shape : {dist_matrix.shape}") Step 3: Hierarchical clustering ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ We group patients into 3 clusters using complete-linkage hierarchical clustering. After :meth:`~tanat.clustering.HierarchicalClusterer.fit`, the cluster label is automatically added as a static feature on the pool under the name given by ``cluster_column``. clusterer = HierarchicalClusterer( metric=sequence_metric, n_clusters=3, linkage="complete", cluster_column="adm_cluster", ) clusterer.fit(cohort) Step 4: Inspect cluster membership ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ :attr:`~tanat.clustering.Clusterer.clusters` exposes the fitted :class:`~tanat.clustering.Cluster` objects directly. Each provides a ``size`` and the list of patient ``items``. The cluster label is also available as a static feature (``adm_cluster``) for downstream filtering or visualisation. 
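For intuition, Steps 1 to 4 can be approximated outside TanaT with a few lines of plain Python. This is a sketch under stated assumptions, not the library's implementation: it assumes the linear metric *sums* position-wise Hamming distances and charges ``padding_penalty`` once per unmatched position, and it uses a naive complete-linkage loop. All names (`sequences`, `linear_distance`, `complete_linkage`) are hypothetical.

```python
from itertools import combinations

# Toy admission-type sequences for four hypothetical patients.
sequences = {
    "p1": ["EMER", "ELECTIVE", "EMER"],
    "p2": ["EMER", "ELECTIVE", "EMER"],
    "p3": ["EMER", "EMER"],
    "p4": ["ELECTIVE", "URGENT", "URGENT", "URGENT"],
}
PADDING_PENALTY = 1.0


def hamming(a, b):
    """Entity-level distance: 0 if same type, 1 otherwise."""
    return 0.0 if a == b else 1.0


def linear_distance(s, t, padding_penalty=PADDING_PENALTY):
    """Sum of position-wise distances plus a penalty per unmatched position.

    The sum aggregation is an assumption made for this sketch.
    """
    shared = sum(hamming(a, b) for a, b in zip(s, t))
    return shared + padding_penalty * abs(len(s) - len(t))


ids = sorted(sequences)
dist = {
    frozenset((i, j)): linear_distance(sequences[i], sequences[j])
    for i, j in combinations(ids, 2)
}


def complete_linkage(ids, dist, n_clusters):
    """Naive agglomerative clustering with complete linkage."""
    clusters = [{i} for i in ids]
    while len(clusters) > n_clusters:
        # Cluster-to-cluster distance = largest pairwise member distance;
        # merge the pair of clusters for which that distance is smallest.
        a, b = min(
            combinations(range(len(clusters)), 2),
            key=lambda ab: max(
                dist[frozenset((i, j))]
                for i in clusters[ab[0]]
                for j in clusters[ab[1]]
            ),
        )
        clusters[a] |= clusters[b]
        del clusters[b]
    return clusters


for label, members in enumerate(complete_linkage(ids, dist, n_clusters=2)):
    print(f"cluster {label}: size={len(members)} items={sorted(members)}")
```

The final loop plays the role of the membership inspection: each cluster reports its size and its patient IDs.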
for cluster in clusterer.clusters: print(cluster) # Cluster labels are also stored as a static feature for downstream use. cohort.static_data().head() Step 5: Faceted timeline coloured by cluster ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Each panel shows the admission sequences of one cluster, aligned to T0. This makes it easy to spot structural differences between groups. # fmt: off SequenceVisualizer.timeline(time_mode="relative", allow_large=True) \ .title("Admission sequences by cluster") \ .x_axis(label="Admissions from first emergency (T0)") \ .colors("tab10") \ .facet(by="adm_cluster", is_static=True, cols=2, share_y=False) \ .draw(cohort, entity_feature="admission_type") \ .show() # fmt: on ---------------------------------------- ## Exploring a patient cohort """ Exploring a patient cohort ============================ **Scenario:** You have access to the MIMIC-IV demo dataset, a subset of de-identified electronic health records from the Beth Israel Deaconess Medical Center. Each patient has a sequence of hospital admissions characterised by their type (emergency, elective, …) and admission location. The goal of this tutorial is to load the data, build a TanaT pool, and perform an initial exploratory analysis. **Concepts covered:** - Access the MIMIC-IV demo with :func:`~tanat.dataset.access` - Ingest two SQL tables with the builder API into an :class:`~tanat.sequence.IntervalSequencePool` - Summarise the pool with :meth:`~tanat.sequence.base.pool.SequencePool.describe` - Navigate sequences and individual admissions - Visualise the admission-type distribution and individual timelines - Split into train / test with :meth:`~tanat.sequence.base.pool.SequencePool.train_test_split` .. note:: MIMIC-IV data is downloaded automatically on first use and cached locally. The demo subset covers ~100 patients and is freely available via Zenodo. 
""" Imports ~~~~~~~ import polars as pl from tanat.dataset import access from tanat.sequence.type.interval.pool import IntervalSequencePool from tanat.visualization import SequenceVisualizer Load the MIMIC-IV demo ~~~~~~~~~~~~~~~~~~~~~~ :func:`~tanat.dataset.access` downloads the SQLite database on the first call and returns the local path. The **builder API** accepts SQL queries directly, with no intermediate DataFrames. Two sources are chained: - ``hosp/admissions``: one row per hospital stay (temporal, interval). - ``hosp/patients``: one row per patient (static features, ``is_static=True``). DB = f"sqlite:///{access('mimic4')}" pool = IntervalSequencePool( store=( IntervalSequencePool.builder() .add_sql( DB, "SELECT subject_id, admittime, dischtime," " admission_type, admission_location" ' FROM "hosp/admissions"', id_column="subject_id", start_column="admittime", end_column="dischtime", features=["admission_type", "admission_location"], ) .add_sql( DB, 'SELECT subject_id, gender, anchor_age AS age FROM "hosp/patients"', id_column="subject_id", is_static=True, features=["gender", "age"], ) .build("admissions_store", exist_ok=True) ) ) # ``pl.Categorical`` is required by the metric and clustering modules, and # enables consistent colour-coding across all visualisations. pool.cast_features({"admission_type": pl.Categorical}, is_static=False) print(pool) Describe the cohort ~~~~~~~~~~~~~~~~~~~ :meth:`~tanat.sequence.base.pool.SequencePool.describe` summarises the pool. ``by_id=False`` returns aggregate statistics across all patients; ``by_id=True`` returns one row per patient. pool.describe(by_id=False) pool.describe(by_id=True).head() Distribution of admission types ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ A barplot gives a quick overview of how often each admission type appears across the full cohort. 
# fmt: off SequenceVisualizer.barplot() \ .title("Admission-type distribution (all patients)") \ .colors("tab10") \ .draw(pool, entity_feature="admission_type") \ .show() # fmt: on Individual patient timeline ~~~~~~~~~~~~~~~~~~~~~~~~~~~ Indexing the pool by a patient ID returns a :class:`~tanat.sequence.base.sequence.Sequence` whose admissions can be rendered as a horizontal timeline. pid = pool.unique_ids[0] seq = pool[pid] print(f"Patient {pid}: {len(seq)} admissions") print(seq.temporal_data()) # fmt: off SequenceVisualizer.timeline() \ .title(f"Admission timeline - patient {pid}") \ .colors("tab10") \ .draw(seq, entity_feature="admission_type") \ .show() # fmt: on Explore duration of admissions ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ A span plot shows the duration of each admission type as a box plot. This reveals which admission types tend to be short (e.g. observation) vs. long (e.g. elective surgery). # fmt: off SequenceVisualizer.spanplot(display_unit="hours") \ .title("Admission durations") \ .colors("tab10") \ .x_axis(rotation=80) \ .y_axis(label="Duration (hours)") \ .draw(pool, entity_feature="admission_type") \ .show() # fmt: on Train / test split ~~~~~~~~~~~~~~~~~~ :meth:`~tanat.sequence.base.pool.SequencePool.train_test_split` splits at the **patient level** for downstream predictive modelling. train, test = pool.train_test_split(test_size=0.2, random_state=42) print(f"Train : {len(train)} patients") print(f"Test : {len(test)} patients") Merging splits ~~~~~~~~~~~~~~ :meth:`~tanat.sequence.base.pool.SequencePool.extend` merges two pools back into one. Here we verify that the combined pool recovers all original patients. 
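The idea of a patient-level split, and of checking that the two halves recombine into the full cohort, can be sketched without TanaT. The helper `split_ids` and the toy `rows` are hypothetical; the actual shuffling strategy used by ``train_test_split`` is not documented here and may differ.

```python
import random

# Toy temporal rows: (patient_id, admission_type); two rows per patient.
rows = [(f"p{i}", kind) for i in range(10) for kind in ("EMER", "ELECTIVE")]


def split_ids(ids, test_size, seed):
    """Shuffle unique IDs and cut them into (train_ids, test_ids)."""
    ids = sorted(set(ids))
    random.Random(seed).shuffle(ids)
    n_test = round(len(ids) * test_size)
    return set(ids[n_test:]), set(ids[:n_test])


train_ids, test_ids = split_ids((pid for pid, _ in rows), test_size=0.2, seed=42)

# Rows follow their patient, so no patient is spread across both sides.
train_rows = [r for r in rows if r[0] in train_ids]
test_rows = [r for r in rows if r[0] in test_ids]

# Merging the two sides recovers every original patient and row.
all_ids = {pid for pid, _ in rows}
print(train_ids.isdisjoint(test_ids), train_ids | test_ids == all_ids)
print(len(train_ids), len(test_ids), len(train_rows) + len(test_rows))
```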
extended = train.extend(test) print(len(extended)) ---------------------------------------- ## Filtering and preparing a cohort """ Filtering and preparing a cohort ================================== **Scenario:** Starting from the admission pool built in :doc:`explore_a_cohort`, you want to isolate a clinically relevant sub-cohort, align all sequences to a shared reference point (the first emergency admission, called T0), and trim to a fixed observation window around that anchor. **Concepts covered:** - Combine :class:`~tanat.criterion.LengthCriterion` and :class:`~tanat.criterion.EntityCriterion` to select a sub-cohort - Detect admission-type progressions with :class:`~tanat.criterion.PatternCriterion` - Set a T0 reference with :meth:`~tanat.sequence.base.pool.SequencePool.set_t0` - Convert to an event pool with :meth:`~tanat.sequence.IntervalSequencePool.as_event` """ Imports ~~~~~~~ import polars as pl from tanat.criterion import ( ANY, EntityCriterion, LengthCriterion, PatternCriterion, ) from tanat.dataset import access from tanat.sequence.type.interval.pool import IntervalSequencePool from tanat.visualization import SequenceVisualizer Rebuild the admission pool ~~~~~~~~~~~~~~~~~~~~~~~~~~ Self-contained rebuild using the builder API (see :doc:`explore_a_cohort` for details). DB = f"sqlite:///{access('mimic4')}" pool = IntervalSequencePool( store=( IntervalSequencePool.builder() .add_sql( DB, "SELECT subject_id, admittime, dischtime," " admission_type, admission_location" ' FROM "hosp/admissions"', id_column="subject_id", start_column="admittime", end_column="dischtime", features=["admission_type", "admission_location"], ) .add_sql( DB, 'SELECT subject_id, gender, anchor_age AS age FROM "hosp/patients"', id_column="subject_id", is_static=True, features=["gender", "age"], ) .build("admissions_store", exist_ok=True) ) ) # ``pl.Categorical`` is required by the metric and clustering modules, and # enables consistent colour-coding across all visualisations. 
pool.cast_features({"admission_type": pl.Categorical}, is_static=False) print(pool) Step 1: Select the study cohort ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ We focus on patients who: 1. Have **at least 2 admissions**. 2. Experienced **at least one emergency admission**, which marks the highest-acuity patients. :meth:`~tanat.sequence.base.pool.SequencePool.which` returns a set of patient IDs. The ``&`` operator computes the intersection of both criteria. ids_multi = pool.which(LengthCriterion(ge=2)) ids_emergency = pool.which( EntityCriterion(query=pl.col("admission_type") == "EW EMER.") ) ids_cohort = ids_multi & ids_emergency print(f"[Intersection] → {len(ids_cohort)} IDs") cohort = pool.subset(ids_cohort) print(cohort) Step 2: Detect emergency-to-elective progressions ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ :class:`~tanat.criterion.PatternCriterion` matches patients whose admission sequence contains a given ordered pattern. :data:`~tanat.criterion.ANY` acts as a wildcard that matches any single admission. Here we use it to identify patients who transitioned from an emergency admission to an elective one at some point, suggesting clinical stabilisation. This is an **exploratory query**, the result is not used to filter the cohort further, but illustrates how pattern-based selection works. ids_stabilised = cohort.which( PatternCriterion( feature="admission_type", pattern=["EW EMER.", ANY, "ELECTIVE"], ) ) print(ids_stabilised) Step 3: Anchor sequences to T0 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ We set T0 at the **start of the first emergency admission** of each patient. This alignment ensures that position 0 always corresponds to the index emergency event, making cross-patient comparisons meaningful. 
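What this alignment does to the timestamps can be sketched in plain Python. The toy `admissions` list is invented, and the sketch only mimics the idea of anchoring on the start of the first matching admission; it is not how ``set_t0`` is implemented.

```python
from datetime import datetime

# Toy admissions: (patient_id, admission_type, start).
admissions = [
    ("p1", "ELECTIVE", datetime(2020, 1, 1)),
    ("p1", "EW EMER.", datetime(2020, 1, 11)),
    ("p1", "EW EMER.", datetime(2020, 2, 10)),
    ("p2", "EW EMER.", datetime(2021, 6, 1)),
    ("p2", "ELECTIVE", datetime(2021, 6, 4)),
]

# T0 = start of the first emergency admission of each patient.
t0 = {}
for pid, kind, start in sorted(admissions, key=lambda r: r[2]):
    if kind == "EW EMER." and pid not in t0:
        t0[pid] = start

# Relative time in days; negative values precede T0.
relative_days = [
    (pid, kind, (start - t0[pid]).days) for pid, kind, start in admissions
]
for row in relative_days:
    print(row)
```

After alignment, both patients have an emergency admission at day 0, even though their calendar dates are more than a year apart.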
cohort.set_t0(query=pl.col("admission_type") == "EW EMER.", anchor="start") print(cohort.t0_data().head(5)) Step 4: Visualise the cohort aligned to T0 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ With T0 set, all sequences are now relative to the first emergency admission. The timeline makes it easy to compare individual admission patterns across patients. # fmt: off SequenceVisualizer.timeline(time_mode="relative", display_unit="days", allow_large=True) \ .title("Admission sequences aligned to first emergency (T0)") \ .x_axis(label="Days from first emergency admission") \ .colors("tab10") \ .draw(cohort, entity_feature="admission_type") \ .show() # fmt: on Step 5: Convert to an event pool ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ For some analyses you only need the admission *timestamp*, not its duration. :meth:`~tanat.sequence.IntervalSequencePool.as_event` converts the interval pool to an :class:`~tanat.sequence.EventSequencePool`, replacing the ``(start, end)`` pair with a single ``time`` column. event_pool = cohort.as_event(anchor="start") print(event_pool) The temporal data now contains a single ``time`` column instead of ``start`` and ``end``. print(event_pool.temporal_data().head(5)) ---------------------------------------- ## Learning clinical temporal patterns with SWoTTeD """ Learning clinical temporal patterns with SWoTTeD ========================================== **Scenario:** You want to discover latent *clinical temporal patterns*, i.e. recurrent temporal patterns of medical procedures, directly from raw MIMIC-IV data, without any supervision. This tutorial shows a complete end-to-end pipeline: 1. Ingest procedure events from MIMIC-IV into a TanaT :class:`~tanat.sequence.EventSequencePool`. 2. Restrict to the most frequent procedure codes to keep the tensor tractable. 3. Call :meth:`~tanat.sequence.base.pool.SequencePool.to_tensor` with ``ohe=True`` to obtain a dense ``(N, M, K)`` array alongside patient IDs and feature names, all in a single call. 4. 
Feed the tensor to `SWoTTeD `_, a dictionary-learning model that decomposes the population into *R* temporal patterns, each with a characteristic temporal signature.
5. Interpret the result using the ``feature_names`` returned by
   :meth:`~tanat.sequence.base.pool.SequencePool.to_tensor`.

.. note::

   SWoTTeD is **not** bundled with TanaT. Install it separately::

       pip install swotted

   SWoTTeD can be viewed as a deep learning model built on a PyTorch module.
   It therefore illustrates how *TanaT* helps you apply machine learning
   models to temporal sequence datasets.

.. attention::

   SQL ingestion also requires ``connectorx``::

       pip install 'tanat[sql]'

**TanaT concepts covered:**

- :class:`~tanat.sequence.EventSequencePool` from a SQL source
- Feature frequency filtering with :meth:`~tanat.sequence.base.pool.SequencePool.temporal_data`
- :meth:`~tanat.sequence.base.pool.SequencePool.to_tensor` with OHE
- Temporal pattern interpretation using ``ids`` and ``feature_names``
"""

Imports
~~~~~~~

import numpy as np
import polars as pl
import torch
from omegaconf import OmegaConf
from torch.utils.data import DataLoader

from swotted import fastSWoTTeDDataset, fastSWoTTeDModule, fastSWoTTeDTrainer

from tanat.criterion import EntityCriterion
from tanat.dataset import access
from tanat.sequence.type.event.pool import EventSequencePool

Step 1: Discover the top procedure codes
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

We build a single pool over all procedure codes, count code frequencies, then
restrict to the **top 30 most frequent codes**. Keeping only frequent codes
avoids an extremely sparse OHE tensor (352 codes x 92 patients would be ~99%
zeros).
DB = f"sqlite:///{access('mimic4')}" pool = EventSequencePool( store=( EventSequencePool.builder() .add_sql( DB, 'SELECT subject_id, chartdate, icd_code FROM "hosp/procedures_icd"', id_column="subject_id", time_column="chartdate", features=["icd_code"], ) .build("procedures_store", exist_ok=True) ) ) TOP_K = 30 top_codes = ( pool.temporal_data() .groupby("icd_code") .size() .sort_values(ascending=False) .head(TOP_K) .index.tolist() ) print(f"Retaining {TOP_K} codes out of {pool.temporal_data()['icd_code'].nunique()}") print("Top codes:", top_codes[:10], "...") Step 2: Restrict the pool to the top codes and fix the vocabulary ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ :meth:`~tanat.sequence.base.pool.SequencePool.filter_entities` with an :class:`~tanat.criterion.EntityCriterion` prunes every event row whose ``icd_code`` is not in ``top_codes``. pool.filter_entities( EntityCriterion(query=pl.col("icd_code").is_in(top_codes)), inplace=True, ) # pl.Enum fixes the vocabulary to exactly TOP_K codes: OHE will produce one # column per code, named after the code itself (e.g. "icd_code_02HV33Z"). pool.cast_features({"icd_code": pl.Enum(top_codes)}) print(pool) Step 3: Encode as a dense 3-D tensor ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ :meth:`~tanat.sequence.base.pool.SequencePool.to_tensor` projects every patient's procedure history onto a shared daily time axis and returns a **3-tuple** ``(arr, ids, feature_names)``: - ``arr``: shape ``(N, M, K)``, i.e. N patients x M daily bins x K OHE codes. - ``ids``: the N patient identifiers aligned with axis 0. - ``feature_names``: the K column labels aligned with axis 2. ``ohe=True`` one-hot encodes ``icd_code`` in-place; ``fill_value=0`` replaces empty bins with zeros (no procedure recorded that day). .. hint:: :meth:`~tanat.sequence.base.pool.SequencePool.to_tensor` is a function that bridges the world of *TanaT*'s sequences with the worlds of deep machine learning. 
   Tensors are the basic data structure for Keras, PyTorch or JAX machine learning engines.

BIN_SIZE = "1D"  # one bin per calendar day
MAX_BINS = 90  # cap at 90 days (covers > 95 % of stays)

arr, ids, feature_names = pool.to_tensor(
    features="icd_code",
    bin_size=BIN_SIZE,
    max_bins=MAX_BINS,
    fill_value=0,
    ohe=True,
)

print(f"Tensor shape : {arr.shape}")  # (N, MAX_BINS, K)
print(f"Patients : {len(ids)}")
print(f"Features : {len(feature_names)}")
print(f"Sparsity : {(arr == 0).mean():.4%} empty bins")

Step 4: Prepare the tensor for SWoTTeD
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

SWoTTeD's :class:`~swotted.fastSWoTTeDModule` expects a tensor of shape
``(N, K, T)``: patients × features × time.
:meth:`~tanat.sequence.base.pool.SequencePool.to_tensor` returns ``(N, T, K)``,
so a single ``transpose`` aligns the axes.

# (N, T, K) → (N, K, T)
X = torch.from_numpy(arr.transpose(0, 2, 1).astype(np.float32))
print(f"SWoTTeD input shape : {X.shape}")  # (N, K, MAX_BINS)

Step 5: Train SWoTTeD
~~~~~~~~~~~~~~~~~~~~~~

We search for ``R = 5`` temporal patterns, each described by a temporal window
of ``Tw = 7`` days. Training runs for 50 epochs on CPU, which is fast enough on
this small cohort.
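Before configuring the model, it can help to picture the tensor layout produced in Steps 3 and 4. The pure-Python sketch below builds a tiny one-hot ``(N, T, K)`` array from toy events and then swaps the last two axes. It is only an illustration: the toy `events`, the choice to count daily bins from each patient's first event, and every variable name are assumptions, not a description of how ``to_tensor`` bins time.

```python
from datetime import date

# Toy events: (patient_id, day, code).
events = [
    ("p1", date(2020, 1, 1), "X"),
    ("p1", date(2020, 1, 3), "Y"),
    ("p2", date(2021, 5, 10), "Y"),
    ("p2", date(2021, 5, 10), "X"),
]
codes = ["X", "Y"]  # fixed vocabulary -> K one-hot columns
MAX_BINS = 4  # T daily bins per patient

ids = sorted({pid for pid, _, _ in events})
# Assumption: bins are counted from each patient's first event.
first_day = {pid: min(d for p, d, _ in events if p == pid) for pid in ids}

# arr[n][t][k] = 1 if patient n has code k in daily bin t, else 0 (fill value).
arr = [[[0] * len(codes) for _ in range(MAX_BINS)] for _ in ids]
for pid, day, code in events:
    t = (day - first_day[pid]).days
    if t < MAX_BINS:
        arr[ids.index(pid)][t][codes.index(code)] = 1

# (N, T, K) -> (N, K, T): transpose the last two axes of each patient slice.
arr_nkt = [[list(col) for col in zip(*patient)] for patient in arr]

print(len(arr), len(arr[0]), len(arr[0][0]))  # N, T, K
print(len(arr_nkt), len(arr_nkt[0]), len(arr_nkt[0][0]))  # N, K, T
```

In the tutorial itself the same axis swap is a single ``arr.transpose(0, 2, 1)`` on the NumPy array.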
R = 5  # number of temporal patterns to discover
Tw = 7  # temporal window width (days)

N_patients, K_codes, T_days = X.shape

swotted_cfg = OmegaConf.create(
    {
        "model": {
            "non_succession": True,
            "sparsity": 0.1,
            "rank": R,
            "twl": Tw,
            "N": K_codes,
            "metric": "Bernoulli",  # binary OHE data → Bernoulli loss
        },
        "training": {
            "batch_size": N_patients,
            "nepochs": 50,
            "lr": 1e-2,
        },
        "predict": {
            "nepochs": 20,
            "lr": 1e-2,
        },
    }
)

device = torch.device("cpu")
model = fastSWoTTeDModule(swotted_cfg).to(device)

loader = DataLoader(
    fastSWoTTeDDataset(X.to(device)),
    batch_size=N_patients,
    shuffle=False,
    collate_fn=lambda x: x,
)

trainer = fastSWoTTeDTrainer(
    fast_dev_run=False,
    max_epochs=swotted_cfg.training.nepochs,
    accelerator="cpu",
)
trainer.fit(model=model, train_dataloaders=loader)

Step 6: Extract and interpret the learned temporal patterns
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

:meth:`~swotted.fastSWoTTeDModule.reorderPhenotypes` reorders the *R*
temporal patterns by their activation strength.
Each temporal pattern is a ``(K, Tw)`` matrix, i.e. a temporal signature over
the ``K`` procedure codes.

``feature_names`` returned by
:meth:`~tanat.sequence.base.pool.SequencePool.to_tensor` gives us the code
label for each row, so we can read off *which procedures* drive each temporal
pattern and *when* during the window they tend to occur.

patterns, pathways = model.reorderPhenotypes(model.Ph.detach().cpu(), tw=Tw)
patterns = (
    patterns.detach().numpy()
)  # (R, K, Tw): temporal signature per temporal pattern
pathways = (
    pathways.detach().numpy()
)  # (N, R, T'): activation of each temporal pattern per patient

print(f"Patterns shape : {patterns.shape}")
print(f"Pathways shape : {pathways.shape}")

# For each temporal pattern, print the top-3 most active procedure codes.
# ``patterns`` has shape (R, K, Tw): sum over the time axis
# to get the overall "weight" of each code in each temporal pattern.
#
# .. note::
#     This analysis of the patterns is deliberately simplified. Readers are
#     invited to explore the SWoTTeD model further for a deeper analysis of
#     the extracted temporal patterns.
#
#     More specifically, SWoTTeD discovers *temporal* patterns that describe
#     typical behaviours as short sequences of events.
#
code_weights = patterns.sum(axis=-1)  # (R, K)

print("\nTemporal pattern overview:")
for r in range(R):
    top_idx = np.argsort(code_weights[r])[::-1][:3]
    # Strip the "icd_code_" prefix added by OHE for readability.
    top_codes_r = [feature_names[i].removeprefix("icd_code_") for i in top_idx]
    print(f"  Top procedures for pattern {r + 1}: {top_codes_r}")

Step 7: Assign temporal patterns back to patient IDs
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

The ``pathways`` tensor describes how similar each patient is to each pattern
at a given time. Assigning each patient to the most similar pattern clusters
the cohort into groups of patients that share a temporal pattern.

In practice, ``pathways`` has shape ``(N, R, T')``: take the argmax over R
(``axis=1``) to get the dominant temporal pattern index for each
patient × time bin, then keep the most frequent dominant temporal pattern
across time bins (majority vote).

from scipy.stats import mode

dominant_per_patient = mode(pathways.argmax(axis=1), axis=1).mode  # shape: (N,)

patient_patterns = dict(zip(ids, dominant_per_patient.tolist()))

print("\nPatient -> dominant temporal pattern (first 10):")
for pid, ph in list(patient_patterns.items())[:10]:
    print(f"  {pid} -> temporal pattern {ph + 1}")

----------------------------------------

## Survival analysis from cohort clusters

"""
Survival analysis from cohort clusters
========================================

**Scenario:** Building on the clusters identified in :doc:`analyse_and_cluster`,
you want to ask whether patients in different admission clusters have different
survival profiles.
This tutorial shows how to build a time-to-event target with :meth:`~tanat.sequence.base.SequencePool.survival_target` and plot Kaplan-Meier curves for each cluster. **Concepts covered:** - Enrich an existing pool with a new static column via :meth:`~tanat.sequence.base.SequencePool.add_static_features` - Inspect the survival target in pandas format before modelling with :meth:`~tanat.sequence.base.SequencePool.survival_target` - Build a structured ``sksurv``-compatible target - Plot per-cluster Kaplan-Meier curves with ``sksurv`` and ``matplotlib`` """ Imports ~~~~~~~ import pandas as pd import matplotlib.pyplot as plt import polars as pl from sksurv.nonparametric import kaplan_meier_estimator from tanat.clustering import HierarchicalClusterer from tanat.criterion import EntityCriterion, LengthCriterion from tanat.dataset import access from tanat.metric import HammingEntityMetric, LinearPairwiseSequenceMetric from tanat.sequence.type.interval.pool import IntervalSequencePool Rebuild the cohort ~~~~~~~~~~~~~~~~~~~ We reuse the same ``admissions_store`` built in :doc:`explore_a_cohort` and :doc:`analyse_and_cluster`. DB_PATH = access("mimic4") DB = f"sqlite:///{DB_PATH}" pool = IntervalSequencePool( store=( IntervalSequencePool.builder() .add_sql( DB, "SELECT subject_id, admittime, dischtime," " admission_type, admission_location" ' FROM "hosp/admissions"', id_column="subject_id", start_column="admittime", end_column="dischtime", features=["admission_type", "admission_location"], ) .add_sql( DB, 'SELECT subject_id, gender, anchor_age AS age FROM "hosp/patients"', id_column="subject_id", is_static=True, features=["gender", "age"], ) .build("admissions_store", exist_ok=True) ) ) # ``pl.Categorical`` is required by the metric and clustering modules, and # enables consistent colour-coding across all visualisations. 
pool.cast_features({"admission_type": pl.Categorical}, is_static=False) print(pool) Cohort selection, T0 alignment, and clustering ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Same pipeline as :doc:`filter_and_prepare` and :doc:`analyse_and_cluster`: patients with at least 2 admissions who had at least one emergency, aligned to their first emergency, then grouped into 3 clusters. # Define the study cohort and align to T0. ids_cohort = pool.which(LengthCriterion(ge=2)) & pool.which( EntityCriterion(query=pl.col("admission_type") == "EW EMER.") ) print(f"Selected {len(ids_cohort)} patients for the cohort.") cohort = pool.subset(ids_cohort) cohort.set_t0(query=pl.col("admission_type") == "EW EMER.", anchor="start") print(cohort) # Cluster the cohort based on admission sequences. entity_metric = HammingEntityMetric(entity_feature="admission_type") sequence_metric = LinearPairwiseSequenceMetric( entity_metric=entity_metric, padding_penalty=1.0 ) clusterer = HierarchicalClusterer( metric=sequence_metric, n_clusters=3, linkage="complete", cluster_column="adm_cluster", ) clusterer.fit(cohort) Enrich the cohort with mortality data ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ``dod`` (date of death) is not in the original store. We load it directly from SQLite using the standard library to handle SQLite's empty-string representation of missing values, then attach it as a new static feature with :meth:`~tanat.sequence.base.SequencePool.add_static_features`. :meth:`~tanat.sequence.base.SequencePool.survival_target` will infer ``occurred`` automatically from ``dod.is_not_null()``. import sqlite3 con = sqlite3.connect(DB_PATH) dod_df = pd.read_sql('SELECT subject_id, dod FROM "hosp/patients"', con) con.close() # SQLite stores missing dates as empty strings, convert to NaT, then to datetime. dod_df["dod"] = pd.to_datetime(dod_df["dod"].replace("", pd.NaT), errors="coerce") cohort.add_static_features(dod_df, id_column="subject_id") # Inspect pool with the new static feature. 
print(cohort) Step 1: Inspect the survival target ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ :meth:`~tanat.sequence.base.SequencePool.survival_target` assembles a ``(occurred, time)`` pair for each patient: - ``occurred``: ``True`` if ``dod`` is not null (death observed within follow-up), ``False`` otherwise (censored). - ``time``: duration from T0 to death, or to the last recorded admission end for censored patients. Using ``fmt="pandas"`` first lets you inspect the result before fitting any model. survival_df, valid_ids = cohort.survival_target( endpoint_time="dod", fmt="pandas", ) print(f"Patients with valid survival data: {len(valid_ids)}") print(f"Observed deaths : {survival_df['occurred'].sum()}") survival_df.head() Step 2: Per-cluster survival summary ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ A quick summary table combining cluster size, observed death count, and the proportion of censored patients. rows = [] for cluster in clusterer.clusters: subset = cohort.subset(cluster.items) y, _ = subset.survival_target(endpoint_time="dod", fmt="sksurv") rows.append( { "cluster": cluster.id, "n_patients": cluster.size, "n_deaths": int(y["occurred"].sum()), "n_censored": int((~y["occurred"]).sum()), } ) pd.DataFrame(rows).sort_values("cluster").reset_index(drop=True) Step 3: Kaplan-Meier curves per cluster ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ We iterate over the 3 clusters, build a ``sksurv``-compatible structured array with ``fmt="sksurv"`` for each subset, then plot the Kaplan-Meier estimator. Patients excluded by :meth:`survival_target` (non-positive or unresolvable durations) are reported automatically via a warning. The shaded band is the 95 % log-log confidence interval; ``+`` tick marks indicate censored observations. 
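For intuition about what a Kaplan-Meier curve represents, the product-limit formula can be sketched in a few lines of NumPy. This is an illustrative toy implementation on made-up data, not the code used by ``sksurv`` or *TanaT*; the function name ``kaplan_meier_sketch`` and the sample values are hypothetical.

```python
import numpy as np


def kaplan_meier_sketch(occurred, time):
    """Product-limit estimate S(t), evaluated at each distinct event time."""
    occurred = np.asarray(occurred, dtype=bool)
    time = np.asarray(time, dtype=float)
    event_times = np.unique(time[occurred])  # sorted distinct death times
    surv = []
    s = 1.0
    for t in event_times:
        at_risk = np.sum(time >= t)  # patients still followed just before t
        deaths = np.sum((time == t) & occurred)  # deaths observed exactly at t
        s *= 1.0 - deaths / at_risk
        surv.append(s)
    return event_times, np.array(surv)


# Toy data (hypothetical): 5 patients, durations in days; False = censored.
occurred = [True, False, True, True, False]
time = [2.0, 3.0, 3.0, 5.0, 6.0]
times, surv = kaplan_meier_sketch(occurred, time)
print(times, surv)
```

Censored patients never trigger a drop in the curve, but they do count in the at-risk denominator until their censoring time. The ``sksurv`` estimator additionally returns confidence bounds and may report a different set of time points, so treat the sketch only as a reading aid for the curves plotted next.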
fig, ax = plt.subplots(figsize=(9, 5)) ax.set_prop_cycle(color=plt.cm.tab10.colors) for cluster in clusterer.clusters: subset = cohort.subset(cluster.items) y, subset_ids = subset.survival_target(endpoint_time="dod", fmt="sksurv") time_points, survival_prob, conf_int = kaplan_meier_estimator( y["occurred"], y["time"].astype(float), conf_type="log-log", ) conf_lower, conf_upper = conf_int (line,) = ax.step( time_points, survival_prob, where="post", linewidth=2, label=f"Cluster {cluster.id} (n={cluster.size}, deaths={y['occurred'].sum()})", ) # 95 % confidence band. ax.fill_between( time_points, conf_lower, conf_upper, step="post", alpha=0.15, color=line.get_color(), ) # Mark censored observations with vertical ticks on the curve. censored_mask = ~y["occurred"] censored_times = y["time"][censored_mask].astype(float) # Interpolate survival probability at each censored time point. censored_probs = [ float(survival_prob[time_points <= t][-1]) if (time_points <= t).any() else 1.0 for t in censored_times ] ax.plot( censored_times, censored_probs, "+", color=line.get_color(), markersize=6, ) ax.set_title("Kaplan-Meier survival curves by admission cluster") ax.set_xlabel("Time from first emergency admission (T0) in days") ax.set_ylabel("Survival probability") ax.set_ylim(0, 1.05) ax.legend() ax.grid(alpha=0.3) plt.tight_layout() plt.show() ---------------------------------------- ## Clustering learning sessions by action patterns """ Clustering learning sessions by action patterns ================================================= **Scenario:** Building on :doc:`explore_sessions`, you want to group sessions with similar action sequences together to identify recurring learning strategies. This is the second tutorial in the :ref:`MOOC series `. 
**Concepts covered:** - Rebuild the session pool with :func:`~tanat.build_states` (self-contained) - Compute edit distances with :class:`~tanat.metric.HammingEntityMetric` + :class:`~tanat.metric.EditSequenceMetric` (Optimal Matching) - Cluster with :class:`~tanat.clustering.HierarchicalClusterer` - Inspect cluster membership via :attr:`~tanat.clustering.Clusterer.clusters` - Visualise per-cluster state distributions with a faceted plot """ Imports ~~~~~~~ import pandas as pd import polars as pl from tanat import build_states from tanat.clustering import HierarchicalClusterer from tanat.criterion import LengthCriterion from tanat.dataset import access from tanat.metric import EditSequenceMetric, HammingEntityMetric from tanat.visualization import SequenceVisualizer Rebuild the filtered session pool ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Self-contained rebuild (see :doc:`explore_sessions` for details). INACTIVITY = pd.Timedelta("2h") df = access("mooc_events") df["timecreated"] = pd.to_datetime(df["timecreated"]) df = df.sort_values(["user", "timecreated"]) df["session"] = ( (df["user"] != df["user"].shift()) | (df["timecreated"].diff() > INACTIVITY) ).cumsum() sessions = df[["user", "session"]].drop_duplicates() df["position"] = df.groupby("session").cumcount() pool = build_states( df[["session", "position", "Action"]], id_column="session", start_column="position", static_data=sessions, store_name="mooc_sessions_store", ) # ``pl.Categorical`` enables consistent colour-coding across visualisations # and is required by the metric module. pool.cast_features({"Action": pl.Categorical}, is_static=False) ids_keep = pool.which(LengthCriterion(ge=2, le=40)) pool_filtered = pool.subset(ids_keep) print(pool_filtered) Step 1: Define the sequence metric ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ We use **Optimal Matching** (edit distance), the standard metric for sequence analysis in the social sciences. 
:class:`~tanat.metric.HammingEntityMetric` compares two actions at the same position: distance 0 if they share the same type, 1 otherwise. :class:`~tanat.metric.EditSequenceMetric` extends this to full sequences by counting insertions, deletions, and substitutions. .. tip:: You can provide a custom substitution cost matrix to :class:`~tanat.metric.HammingEntityMetric` to reflect domain knowledge about action similarity (e.g. "Course_view" is closer to "Group_work" than to "Feedback"). See the API reference for details. entity_metric = HammingEntityMetric(entity_feature="Action") sequence_metric = EditSequenceMetric( entity_metric=entity_metric, indel_cost=1.0, ) Step 2: Cluster sessions ~~~~~~~~~~~~~~~~~~~~~~~~~ We group sessions into 5 clusters using complete-linkage hierarchical clustering. After :meth:`~tanat.clustering.HierarchicalClusterer.fit`, the cluster label is automatically added as a static feature under ``session_cluster``. clusterer = HierarchicalClusterer( metric=sequence_metric, n_clusters=5, linkage="complete", cluster_column="session_cluster", ) clusterer.fit(pool_filtered) Step 3: Inspect cluster membership ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ :attr:`~tanat.clustering.Clusterer.clusters` exposes the fitted :class:`~tanat.clustering.Cluster` objects directly. for cluster in clusterer.clusters: print(cluster) # Cluster labels are also stored as a static feature for downstream use. pool_filtered.static_data().head() Step 4: Faceted distribution per cluster ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Each panel shows how action proportions evolve across positions for one cluster. Structural differences between clusters reveal distinct learning strategies. 
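To make the content of each panel concrete, the per-position action proportions can be approximated by hand with pandas. This is a sketch on hypothetical toy data, assuming each panel shows, for one cluster, the share of each action at each position; it reflects our reading of the plot, not *TanaT*'s internal implementation.

```python
import pandas as pd

# Hypothetical long-format data: one row per (session, position) observation.
toy = pd.DataFrame(
    {
        "session_cluster": [0, 0, 0, 0, 1, 1, 1, 1],
        "position": [0, 0, 1, 1, 0, 0, 1, 1],
        "Action": ["View", "View", "Quiz", "View", "Quiz", "View", "Quiz", "Quiz"],
    }
)

# Rows: (cluster, position); columns: actions; each row sums to 1.
proportions = pd.crosstab(
    [toy["session_cluster"], toy["position"]],
    toy["Action"],
    normalize="index",
)
print(proportions)
```

Each row of ``proportions`` corresponds to one vertical slice of one panel: the action shares at a given position within a given cluster.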
# fmt: off SequenceVisualizer.distribution(bin_size=1) \ .title("Action distribution by learning cluster") \ .x_axis(label="Position in session") \ .facet(by="session_cluster", is_static=True, cols=3, share_y=True) \ .draw(pool_filtered, entity_feature="Action") \ .show() # fmt: on ---------------------------------------- ## Exploring learner activity sequences """ Exploring learner activity sequences ====================================== **Scenario:** You have interaction logs from a Moodle LMS and want to understand how learners engage with course material. **Concepts covered:** - Load an event log with :func:`~tanat.dataset.access` - Detect learning sessions from inactivity gaps - Build a :class:`~tanat.sequence.StateSequencePool` with :func:`~tanat.build_states` - Filter sequences by length with :class:`~tanat.criterion.LengthCriterion` - Visualise action distributions, timelines, and state distributions """ Imports ~~~~~~~ import random import pandas as pd import polars as pl from tanat import build_states from tanat.criterion import LengthCriterion from tanat.dataset import access from tanat.visualization import SequenceVisualizer Load and prepare the event log ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ :func:`~tanat.dataset.access` returns the MOOC dataset as a pandas DataFrame. Each row is a single learner interaction recorded by a Moodle LMS (~100 k events, ~118 learners). df = access("mooc_events") print(f"{len(df)} events · {df['user'].nunique()} learners") df.head() Step 1: Session detection ~~~~~~~~~~~~~~~~~~~~~~~~~~ Learning sessions are not labelled in the log. We define a session as a continuous period of activity: a **new session** begins when the same learner is idle for more than 2 hours, or when a different user appears. Each session receives a unique integer id that will serve as the sequence identifier in TanaT. 
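To see the rule in isolation, here is a minimal sketch on a toy log. The data below are made up for illustration; only the column names ``user`` and ``timecreated`` and the 2-hour threshold come from the tutorial.

```python
import pandas as pd

# Toy log (hypothetical data): two learners, five interactions.
log = pd.DataFrame(
    {
        "user": ["u1", "u1", "u1", "u2", "u2"],
        "timecreated": pd.to_datetime(
            [
                "2024-01-01 09:00",
                "2024-01-01 09:30",  # 30 min after previous row: same session
                "2024-01-01 13:00",  # 3.5 h after previous row: new session
                "2024-01-01 13:10",  # different user: new session
                "2024-01-01 13:20",  # 10 min after previous row: same session
            ]
        ),
    }
).sort_values(["user", "timecreated"])

# A row opens a new session if the user changes or the gap exceeds 2 hours.
user_changed = log["user"] != log["user"].shift()
gap_too_long = log["timecreated"].diff() > pd.Timedelta("2h")

# Cumulative sum of the "new session" flags gives a running session id.
log["session"] = (user_changed | gap_too_long).cumsum()
print(log)
```

The cumulative sum turns the boolean "session starts here" flags into consecutive integer ids, so the three sessions above receive ids 1, 2 and 3. Sorting by user first is what makes the row-to-row comparison meaningful.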
INACTIVITY = pd.Timedelta("2h") df["timecreated"] = pd.to_datetime(df["timecreated"]) df = df.sort_values(["user", "timecreated"]) df["session"] = ( (df["user"] != df["user"].shift()) | (df["timecreated"].diff() > INACTIVITY) ).cumsum() print(f"Detected {df['session'].nunique()} sessions") # Static table: one row per session with the learner identifier. sessions = df[["user", "session"]].drop_duplicates() Step 2: Build the sequence pool ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Each session becomes one sequence. We use :func:`~tanat.build_states` with a **within-session position index** as the time axis (0 = first event, 1 = second, …). This abstracts away calendar time and focuses on the order of actions. The ``sessions`` table (one row per session) is passed as ``static_data`` so the learner identifier is attached to each sequence. # Add a within-session position index. df["position"] = df.groupby("session").cumcount() pool = build_states( df[["session", "position", "Action"]], id_column="session", start_column="position", static_data=sessions, store_name="mooc_sessions_store", ) # ``pl.Categorical`` enables consistent colour-coding across visualisations # and is required by the metric module. pool.cast_features({"Action": pl.Categorical}, is_static=False) print(pool) Step 3: Filter by length ~~~~~~~~~~~~~~~~~~~~~~~~~ The session length distribution is skewed: some outlier sessions contain hundreds of events. We keep sessions with **2 to 40 actions**, which covers the majority of learners while removing single-click noise and unrealistically long sessions. ids_keep = pool.which(LengthCriterion(ge=2, le=40)) pool_filtered = pool.subset(ids_keep) print(pool_filtered) Step 4: Action distribution ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ A bar plot shows the frequency of each action type across all sessions, giving a first overview of what learners do most. 
# fmt: off SequenceVisualizer.barplot(sort="descending") \ .title("Action type distribution") \ .draw(pool_filtered, entity_feature="Action") \ .show() # fmt: on Step 5: Sample timeline ~~~~~~~~~~~~~~~~~~~~~~~~ We draw 30 random sessions side by side. Each row is one session; each coloured block is one action at a given position. random.seed(42) sample_ids = random.sample(sorted(pool_filtered.unique_ids), 30) sample = pool_filtered.subset(sample_ids) # fmt: off SequenceVisualizer.timeline() \ .title("30 random learning sessions") \ .x_axis(label="Position in session") \ .draw(sample, entity_feature="Action") \ .show() # fmt: on Step 6: State distribution over position ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ The distribution plot shows how action proportions shift across positions, revealing how learners typically start and end their sessions. # fmt: off SequenceVisualizer.distribution(bin_size=1) \ .title("Action distribution over session progress") \ .x_axis(label="Position in session") \ .draw(pool_filtered, entity_feature="Action") \ .show() # fmt: on ---------------------------------------- ## Contributing Contributing First of all, thank you for considering contributing to *TanaT*. Although it is still an experimental toolkit, it has received a warm welcome from various communities interested in its functionalities. We are sure that there are plenty of missing features that you would like to have in *TanaT*. So why not implement them and share them with the community? We would be eager to integrate new, efficient, and useful features for analysing temporal sequences. We first implemented the features most used in our own application cases, and since we expect the library to be used in many contexts, there are certainly other must-have methods in your community that could be integrated. Contributions are managed through GitHub Issues and Pull Requests.
We welcome contributions in the following forms: - **Bug reports**: when filing an issue to report a bug, please use the search tool to ensure the bug hasn't been reported yet. - **New feature suggestions**: if you think *TanaT* should include a new algorithm, please open an issue to ask for it (always check that the feature has not been asked for yet). Think about linking to a PDF version of the paper that first proposed the method when suggesting a new algorithm. - **Bug fixes and new feature implementations**: if you feel you can fix a reported bug or implement a suggested feature yourself, do not hesitate to: 1. fork the project; 2. implement your bug fix or feature; 3. submit a pull request referencing the ID of the issue in which the bug was reported / the feature was suggested. If you would like to contribute by implementing a new feature reported in the Issues, starting with Issues labelled "good first issue" is a good idea. When submitting code, please think about code quality: add proper docstrings and tests that give high code coverage. More details on Pull requests The preferred workflow for contributing to *TanaT* is to fork the main repository on GitHub, clone, and develop on a branch. Steps: 1. Fork the project repository by clicking on the **Fork** button near the top right of the page. This creates a copy of the code under your GitHub user account. For more details on how to fork a repository see this guide. 2. Clone your fork of the *TanaT* repo to your local disk: git clone git@github.com:YourLogin/TanaT.git cd TanaT 3. Create a `my-feature` branch to hold your development changes. Always use a feature branch; never work directly on `main`: git checkout -b my-feature 4. Develop the feature on your branch. Record your changes using `git add` and `git commit`: git add modified_files git commit 5. Push the changes: git push -u origin my-feature 6. Follow these instructions to create a pull request from your fork. This will notify the maintainers.
(If any of the above seems unfamiliar, please look up the Git documentation on the web, or ask another contributor for help.) Contributing to the Research Project *TanaT* is also a research project that investigates new methods and principles for exploring temporal sequences. We expect it can be an asset for developing new ideas around temporal data analysis. If you have a research project related to *TanaT*, feel free to contact us by email. ---------------------------------------- ## Citing *TanaT* Citing *TanaT* If you use *TanaT* in a scientific publication, please cite: @inproceedings{tanat2025, title={Towards a Library for the Analysis of Temporal Sequences}, author={Thomas Guyet and Arnaud Duvermy}, booktitle={Proceedings of the Workshop on Advanced Analytics and Learning on Temporal Data (AALTD@ECML)}, year={2025}, pages={165--180}, doi={10.1007/978-3-032-15535-1_11} } ----------------------------------------