Building pools from multiple sources#

Combine three file-based data sources (two Parquet files and one CSV) that share the same schema into a single IntervalSequencePool, then compose a TrajectoryPool on top.

What you will learn:

  • add_parquet / add_csv: ingest data from files

  • Multi-source chaining: three .add_*() calls → one store

  • is_static=True for per-individual static features (bmi, site)

  • Builder option: sort_anchor

  • Trajectory composition with TrajectoryPool.builder()

  • Workspace registration and pool.save()

Note

SQL sources (add_sql) follow the exact same pattern and accept the same column-mapping parameters. They require connectorx; install it with pip install 'tanat[sql]'. See Builder & Storage for details.

Scenario: Admission records arrive from three upstream systems (a hospital export, a supplementary cohort, and a legacy CSV extract). All three share the schema severity_score (float) / ward (categorical) and are merged into a single pool, then linked to a procedures pool through a TrajectoryPool.

Imports#

import tempfile
from pathlib import Path
import datetime

import pandas as pd
import polars as pl

from tanat import get_workspace, set_workspace
from tanat.dataset.simulation import (
    simulate_events,
    simulate_intervals,
    simulate_static,
)
from tanat.sequence.type.event.pool import EventSequencePool
from tanat.sequence.type.interval.pool import IntervalSequencePool
from tanat.trajectory.pool import TrajectoryPool

Workspace setup#

A workspace registers every store built below under a short name so any script can reload them later without tracking file paths.

set_workspace("~/.tanat_workspace/building_pools_tutorial")
ws = get_workspace()
ws.clear()
print(ws)
TanaT Workspace
∟ Root: /home/runner/.tanat_workspace/building_pools_tutorial
∟ Content: 0 stores detected

Generate source files#

simulate_intervals generates one row per admission. Feature types follow the numeric → categorical → boolean cycle, so:

  • severity_score → float (clinical severity score at admission)

  • ward → categorical (care unit, values {A, B, C, D, E})

Static features follow the same cycle:

  • bmi → float

  • site → categorical (care site, values {A, B, C, D, E})

Patient IDs are prefixed with the source letter ("a", "b", "c") to avoid collisions when the three files are merged. The simulation outputs columns id / start / end by default; these match the builder’s defaults so no column mapping is needed.

def _prefix_ids(df: pd.DataFrame, prefix: str) -> pd.DataFrame:
    """Prefix the ``id`` column with a source identifier."""
    return df.assign(id=prefix + df["id"].astype(str))


# Feature schemas: defined once and reused across the simulate_* calls
ADMISSION_FEATURES = ["severity_score", "ward"]  # float, categorical
STATIC_FEATURES = ["bmi", "site"]  # float, categorical
PROCEDURE_FEATURES = ["priority", "procedure"]  # float, categorical
TIME_RANGE = (datetime.datetime(2000, 1, 1), datetime.datetime(2000, 12, 31))
SEED = 42

tmpdir = Path(tempfile.mkdtemp())

Simulate the admissions for each of the three sources (they are written to Parquet and CSV files further down).

# Source A
src_a = simulate_intervals(
    n_ids=100,
    features=ADMISSION_FEATURES,
    time_range=TIME_RANGE,
    seed=SEED,
)
src_a = _prefix_ids(src_a, "a")

# Source B
src_b = simulate_intervals(
    n_ids=50,
    features=ADMISSION_FEATURES,
    time_range=TIME_RANGE,
    seed=SEED + 1,
)
src_b = _prefix_ids(src_b, "b")

# Source C
src_c = simulate_intervals(
    n_ids=30,
    features=ADMISSION_FEATURES,
    time_range=TIME_RANGE,
    seed=SEED + 2,
)
src_c = _prefix_ids(src_c, "c")

# Simulate static demographics (one row per patient) for each source
static_a = _prefix_ids(
    simulate_static(n_ids=100, features=STATIC_FEATURES, seed=SEED + 10), "a"
)
static_b = _prefix_ids(
    simulate_static(n_ids=50, features=STATIC_FEATURES, seed=SEED + 11), "b"
)
static_c = _prefix_ids(
    simulate_static(n_ids=30, features=STATIC_FEATURES, seed=SEED + 12), "c"
)

# Concatenate the three sources into a single static DataFrame
static_df = pd.concat([static_a, static_b, static_c])
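
Static features are per-individual, so the concatenated table is expected to hold exactly one row per ID. The sketch below uses plain Python lists rather than DataFrames, with hypothetical helpers (prefix_ids, duplicated_ids) and the assumption that each source numbers its patients from 1. It illustrates why _prefix_ids matters: without the prefix, the sources reuse the same IDs.

```python
from collections import Counter


def prefix_ids(ids, prefix):
    """Return the IDs with a source prefix (mirrors _prefix_ids on plain lists)."""
    return [f"{prefix}{i}" for i in ids]


def duplicated_ids(ids):
    """Return the sorted list of IDs that occur more than once."""
    counts = Counter(ids)
    return sorted(i for i, n in counts.items() if n > 1)


# Assumption: each source numbers its patients 1..n independently.
raw_a, raw_b, raw_c = range(1, 101), range(1, 51), range(1, 31)

# Without prefixes, every ID shared by at least two sources is a clash.
unprefixed = [*raw_a, *raw_b, *raw_c]
n_clashes = len(duplicated_ids(unprefixed))

# With prefixes, every ID is unique across the merged table.
prefixed = prefix_ids(raw_a, "a") + prefix_ids(raw_b, "b") + prefix_ids(raw_c, "c")
n_clashes_prefixed = len(duplicated_ids(prefixed))

print(n_clashes, n_clashes_prefixed, len(set(prefixed)))
```

The same idea applies to the real DataFrames: checking that the id column of static_df has no duplicates is a cheap guard before the file is written.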

Write to disk: two Parquet files + one CSV (mimicking three upstream systems)

parquet_a = tmpdir / "hospital_export.parquet"
parquet_b = tmpdir / "supplementary_cohort.parquet"
csv_c = tmpdir / "legacy_extract.csv"
parquet_static = tmpdir / "demographics.parquet"

src_a.to_parquet(parquet_a, index=False)
src_b.to_parquet(parquet_b, index=False)
src_c.to_csv(csv_c, index=False)
static_df.to_parquet(parquet_static, index=False)

for label, df in [
    ("A (Parquet)", src_a),
    ("B (Parquet)", src_b),
    ("C (CSV)   ", src_c),
]:
    print(f"Source {label}: {df['id'].nunique()} patients")
Source A (Parquet): 100 patients
Source B (Parquet): 50 patients
Source C (CSV)   : 30 patients
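
Merging only makes sense when the sources really share a schema. As a minimal sketch (the helper schema_mismatches is hypothetical and not part of TanaT, and how the builder itself reacts to a mismatch is not covered here), column names can be compared before any file is handed to the builder:

```python
def schema_mismatches(columns_by_source):
    """Compare each source's column list against the first source.

    Returns a dict mapping source name to (missing, unexpected) column sets;
    an empty dict means all sources agree.
    """
    names = list(columns_by_source)
    reference = set(columns_by_source[names[0]])
    problems = {}
    for name in names[1:]:
        cols = set(columns_by_source[name])
        missing, unexpected = reference - cols, cols - reference
        if missing or unexpected:
            problems[name] = (missing, unexpected)
    return problems


expected = ["id", "start", "end", "severity_score", "ward"]

# In the tutorial these lists would come from src_a.columns, src_b.columns, ...
ok = schema_mismatches({"A": expected, "B": expected, "C": expected})

# Hypothetical broken source: "ward" was renamed to "unit" upstream.
bad = schema_mismatches(
    {"A": expected, "C": ["id", "start", "end", "severity_score", "unit"]}
)
print(ok)
print(bad)
```

This only compares names. Checking dtypes as well would catch a source where severity_score arrives as text, for example.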

Build the admissions pool (multi-source)#

Three .add_*() calls on the same builder merge all rows into one store. A fourth call with is_static=True attaches per-patient demographics. The column mapping is spelled out explicitly for readability; it could be omitted, since the default id/start/end names already match the simulation output.

admissions_path = (
    IntervalSequencePool.builder(sort_anchor="start")
    # Source A: hospital export
    .add_parquet(
        str(parquet_a),
        id_column="id",
        start_column="start",
        end_column="end",
        features=["severity_score", "ward"],
    )
    # Source B: supplementary cohort
    .add_parquet(
        str(parquet_b),
        id_column="id",
        start_column="start",
        end_column="end",
        features=["severity_score", "ward"],
    )
    # Source C: legacy CSV extract
    .add_csv(
        str(csv_c),
        id_column="id",
        start_column="start",
        end_column="end",
        features=["severity_score", "ward"],
        try_parse_dates=True,
    )
    # Static demographics
    .add_parquet(
        str(parquet_static), is_static=True, id_column="id", features=["bmi", "site"]
    ).build("admissions")
)
┌─ Interval SequenceStore
│
│ Step 1/4: Sorting & preparing data
│
│ Step 2/4: Building sequence index
│
│ Step 3/4: Writing entity, time index & static features
│
│ Step 4/4: Computing & writing metadata
│
└─ Done (180 sequences · 1,222 entities · 0.02s)
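
Only the CSV source passes try_parse_dates=True. The tutorial does not say why, but a likely reason is that CSV, unlike Parquet, stores no column types, so timestamps come back as plain text unless they are parsed. The stdlib sketch below illustrates that round trip with made-up values; it does not use TanaT, and whether the builder forwards try_parse_dates to Polars' CSV reader (which has an option of that name) is an assumption.

```python
import csv
import io
from datetime import datetime

# Hypothetical admission row, for illustration only.
row = {
    "id": "c1",
    "start": datetime(2000, 3, 1, 8, 30),
    "end": datetime(2000, 3, 4, 12, 0),
}

# Write one row to an in-memory CSV, then read it back.
buffer = io.StringIO()
writer = csv.DictWriter(buffer, fieldnames=list(row))
writer.writeheader()
writer.writerow(row)
buffer.seek(0)
loaded = next(csv.DictReader(buffer))

# The timestamp comes back as text, not as a datetime.
print(type(loaded["start"]).__name__, loaded["start"])

# Parsing restores a datetime that compares equal to the original.
parsed_start = datetime.fromisoformat(loaded["start"])
print(parsed_start == row["start"])
```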

Inspect the admissions pool#

admissions = IntervalSequencePool(store=admissions_path)
print(admissions)
┌────────────────────────────────────────────────┐
│          IntervalSequencePool Summary          │
└────────────────────────────────────────────────┘

Overview
─────────────────────────
  Sequences          180
  Store              /home/runner/.tanat_workspace/building_pools_tutorial/admissions
  id_column          id

Time Index
─────────────────────────
  Type               Datetime(time_unit='us', time_zone=None) [2000-01-01 04:32:39.116009 → 2001-01-23 17:29:10.979611]
  Columns            ['start', 'end']
  t0                 position=0, anchor=start

Entity Features (2)
─────────────────────────
  • severity_score      Numerical [1 → 100]
  • ward                String [len 1 → 1]

Static Features (2)
─────────────────────────
  • bmi                 Numerical [1 → 100]
  • site                String [len 1 → 1]
print(f"Total patients : {len(admissions)}")
admissions.temporal_data().head(5)
Total patients : 180
   id   start                       end                         severity_score  ward
0  a1   2000-01-14 01:58:57.008723  2000-01-16 06:34:24.784907  84              A
1  a1   2000-02-07 14:18:12.646728  2000-02-15 17:33:57.148515  20              D
2  a1   2000-12-16 07:37:29.336174  2000-12-19 05:14:02.391399  81              E
3  a10  2000-07-25 22:22:30.313192  2000-08-24 10:50:00.138800  13              E
4  a10  2000-09-21 21:44:31.200385  2000-10-05 01:33:20.095509  56              A


admissions.static_data().head(5)
   id    bmi  site
0  a1    97   D
1  a10   56   D
2  a100  36   A
3  a11   25   D
4  a12   97   C


Builder option: sort_anchor#

sort_anchor controls how intervals are ordered within each sequence: "start" (default), "end", or "middle" (midpoint).

We build all three variants into a dict, then display the same patient’s sequence under each ordering.

anchor_pools = {
    anchor: IntervalSequencePool(
        store=IntervalSequencePool.builder(sort_anchor=anchor)
        .add_parquet(
            str(parquet_a),
            id_column="id",
            start_column="start",
            end_column="end",
            features=["severity_score", "ward"],
        )
        .build(f"admissions_{anchor}", exist_ok=True)
    )
    for anchor in ("start", "end", "middle")
}
┌─ Interval SequenceStore
│
│ Step 1/4: Sorting & preparing data
│
│ Step 2/4: Building sequence index
│
│ Step 3/4: Writing entity & time index features
│
│ Step 4/4: Computing & writing metadata
│
└─ Done (100 sequences · 672 entities · 0.00s)
┌─ Interval SequenceStore
│
│ Step 1/4: Sorting & preparing data
│
│ Step 2/4: Building sequence index
│
│ Step 3/4: Writing entity & time index features
│
│ Step 4/4: Computing & writing metadata
│
└─ Done (100 sequences · 672 entities · 0.00s)
┌─ Interval SequenceStore
│
│ Step 1/4: Sorting & preparing data
│
│ Step 2/4: Building sequence index
│
│ Step 3/4: Writing entity & time index features
│
│ Step 4/4: Computing & writing metadata
│
└─ Done (100 sequences · 672 entities · 0.00s)

Note

sort_anchor changes only the order of entity rows within a sequence; the rows themselves are identical. The same patient's sequence can therefore be ordered differently depending on the chosen anchor, which shows up wherever admissions overlap.

# sort_anchor = "start"
pid = "a12"
anchor_pools["start"][pid].temporal_data()
   id   start                       end                         severity_score  ward
0  a12  2000-03-18 10:24:40.643654  2000-03-22 11:13:24.230208  33              A
1  a12  2000-05-18 08:30:05.790696  2000-06-14 13:40:01.268320  17              E
2  a12  2000-06-20 11:27:20.809481  2000-07-16 13:53:39.364004  34              B
3  a12  2000-07-07 17:05:26.537283  2000-07-31 20:33:59.472470  3               D
4  a12  2000-07-27 00:37:12.904679  2000-07-30 22:07:28.327453  11              E
5  a12  2000-09-23 20:45:04.159722  2000-09-30 19:00:42.202681  10              A
6  a12  2000-09-24 17:24:55.310293  2000-10-17 06:57:40.385107  78              A
7  a12  2000-09-27 17:26:08.722150  2000-09-29 09:32:28.543134  73              A
8  a12  2000-10-12 18:43:50.660502  2000-11-11 04:16:15.902719  70              A
9  a12  2000-11-16 14:29:01.781457  2000-11-28 12:58:35.492556  47              A


# sort_anchor = "middle"
anchor_pools["middle"][pid].temporal_data()
   id   start                       end                         severity_score  ward
0  a12  2000-03-18 10:24:40.643654  2000-03-22 11:13:24.230208  33              A
1  a12  2000-05-18 08:30:05.790696  2000-06-14 13:40:01.268320  17              E
2  a12  2000-06-20 11:27:20.809481  2000-07-16 13:53:39.364004  34              B
3  a12  2000-07-07 17:05:26.537283  2000-07-31 20:33:59.472470  3               D
4  a12  2000-07-27 00:37:12.904679  2000-07-30 22:07:28.327453  11              E
5  a12  2000-09-23 20:45:04.159722  2000-09-30 19:00:42.202681  10              A
6  a12  2000-09-27 17:26:08.722150  2000-09-29 09:32:28.543134  73              A
7  a12  2000-09-24 17:24:55.310293  2000-10-17 06:57:40.385107  78              A
8  a12  2000-10-12 18:43:50.660502  2000-11-11 04:16:15.902719  70              A
9  a12  2000-11-16 14:29:01.781457  2000-11-28 12:58:35.492556  47              A


# sort_anchor = "end"
anchor_pools["end"][pid].temporal_data()
   id   start                       end                         severity_score  ward
0  a12  2000-03-18 10:24:40.643654  2000-03-22 11:13:24.230208  33              A
1  a12  2000-05-18 08:30:05.790696  2000-06-14 13:40:01.268320  17              E
2  a12  2000-06-20 11:27:20.809481  2000-07-16 13:53:39.364004  34              B
3  a12  2000-07-27 00:37:12.904679  2000-07-30 22:07:28.327453  11              E
4  a12  2000-07-07 17:05:26.537283  2000-07-31 20:33:59.472470  3               D
5  a12  2000-09-27 17:26:08.722150  2000-09-29 09:32:28.543134  73              A
6  a12  2000-09-23 20:45:04.159722  2000-09-30 19:00:42.202681  10              A
7  a12  2000-09-24 17:24:55.310293  2000-10-17 06:57:40.385107  78              A
8  a12  2000-10-12 18:43:50.660502  2000-11-11 04:16:15.902719  70              A
9  a12  2000-11-16 14:29:01.781457  2000-11-28 12:58:35.492556  47              A
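
The three orderings differ only where admissions overlap. The plain-Python sketch below (an illustration, not the library's implementation) takes the three overlapping September admissions of patient a12 from the tables above, with microseconds dropped, and sorts them by start, by end, and by midpoint. ANCHOR_KEYS and order_by_anchor are hypothetical names introduced for this example.

```python
from datetime import datetime

# (start, end, severity_score) for three overlapping admissions of patient a12,
# copied from the tables above with microseconds dropped.
intervals = [
    (datetime(2000, 9, 23, 20, 45, 4), datetime(2000, 9, 30, 19, 0, 42), 10),
    (datetime(2000, 9, 24, 17, 24, 55), datetime(2000, 10, 17, 6, 57, 40), 78),
    (datetime(2000, 9, 27, 17, 26, 8), datetime(2000, 9, 29, 9, 32, 28), 73),
]

# One sort key per anchor; "middle" is the midpoint of the interval.
ANCHOR_KEYS = {
    "start": lambda iv: iv[0],
    "end": lambda iv: iv[1],
    "middle": lambda iv: iv[0] + (iv[1] - iv[0]) / 2,
}


def order_by_anchor(rows, anchor):
    """Return the severity scores in the order implied by the given anchor."""
    return [score for _, _, score in sorted(rows, key=ANCHOR_KEYS[anchor])]


for anchor in ("start", "middle", "end"):
    print(anchor, order_by_anchor(intervals, anchor))
```

The severity scores come out as 10, 78, 73 for "start", 10, 73, 78 for "middle", and 73, 10, 78 for "end", matching the corresponding rows of the three tables.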


Build a procedures pool#

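An EventSequencePool stores single-timestamp events. Two Parquet files are merged into one pool. Note that simulate_events is called without time_range here, so the procedure timestamps are not confined to the year 2000 (see the Time Index range in the pool summary).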

Feature schema: priority (float), procedure (categorical, values A–E)

proc_a = simulate_events(n_ids=100, features=PROCEDURE_FEATURES, seed=SEED + 20)
proc_a = _prefix_ids(proc_a, "a")

proc_b = simulate_events(n_ids=80, features=PROCEDURE_FEATURES, seed=SEED + 21)
proc_b = _prefix_ids(proc_b, "b")
parquet_proc_a = tmpdir / "procedures_a.parquet"
parquet_proc_b = tmpdir / "procedures_b.parquet"
proc_a.to_parquet(parquet_proc_a, index=False)
proc_b.to_parquet(parquet_proc_b, index=False)
procedures_path = (
    EventSequencePool.builder()
    .add_parquet(
        str(parquet_proc_a),
        id_column="id",
        time_column="time",
        features=["priority", "procedure"],
    )
    .add_parquet(
        str(parquet_proc_b),
        id_column="id",
        time_column="time",
        features=["priority", "procedure"],
    )
    .build("procedures")
)
┌─ Event SequenceStore
│
│ Step 1/4: Sorting & preparing data
│
│ Step 2/4: Building sequence index
│
│ Step 3/4: Writing entity & time index features
│
│ Step 4/4: Computing & writing metadata
│
└─ Done (180 sequences · 1,199 entities · 0.01s)
procedures = EventSequencePool(store=procedures_path)
print(procedures)
┌────────────────────────────────────────────────┐
│           EventSequencePool Summary            │
└────────────────────────────────────────────────┘

Overview
─────────────────────────
  Sequences          180
  Store              /home/runner/.tanat_workspace/building_pools_tutorial/procedures
  id_column          id

Time Index
─────────────────────────
  Type               Datetime(time_unit='us', time_zone=None) [2000-01-03 07:48:51.011785 → 2024-12-26 18:19:00.284083]
  Columns            ['time']
  t0                 position=0, anchor=None

Entity Features (2)
─────────────────────────
  • priority            Numerical [1 → 100]
  • procedure           String [len 1 → 1]

Compose a TrajectoryPool#

A TrajectoryPool groups multiple sequence pools under a shared ID space. Each pool is registered under an alias:

tpool["admissions"]          → IntervalSequencePool (full pool)
tpool[id]                    → Trajectory (one patient)
tpool[id]["admissions"]      → IntervalSequence (one sequence)
tpool[id]["admissions"][0]   → IntervalEntity (one entity)
traj_path = (
    TrajectoryPool.builder()
    .add("admissions", admissions)
    .add("procedures", procedures)
    .build("patient_trajectories", exist_ok=True)
)
┌─ TrajectoryStore
│
│ Step 1/2: Linking pools: admissions, procedures
│
│ Step 2/2: Building trajectory index & metadata
│
└─ Done (210 trajectories · 2 pool(s) · 0.00s)
tpool = TrajectoryPool(store=traj_path)
print(tpool)
print(f"{len(tpool)} patients with at least one sequence")
┌────────────────────────────────────────────────┐
│             TrajectoryPool Summary             │
└────────────────────────────────────────────────┘

Overview
─────────────────────────
  Trajectories       210
  Store              /home/runner/.tanat_workspace/building_pools_tutorial/patient_trajectories
  id_column          id

Time Index
─────────────────────────
  Type               Datetime(time_unit='us', time_zone=None) [2000-01-01 04:32:39.116009 → 2024-12-26 18:19:00.284083]
  t0                 position=0, anchor=start

Sequences (2)
─────────────────────────
  • admissions          IntervalSequencePool(n=180, entity_features=2, static_features=2, store='/home/runner/.tanat_workspace/building_pools_tutorial/admissions')
  • procedures          EventSequencePool(n=180, entity_features=2, static_features=0, store='/home/runner/.tanat_workspace/building_pools_tutorial/procedures')
210 patients with at least one sequence
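
The count of 210 is larger than either pool (180 sequences each), which is consistent with the trajectory index being the union of the IDs found in the linked pools. The set arithmetic below reproduces that number. It assumes the simulate_* functions number IDs from 1 to n_ids, as the a1, a10, a100 values displayed earlier suggest; make_ids is a hypothetical helper.

```python
def make_ids(prefix, n):
    """Build IDs such as 'a1' .. 'a100' (assumes IDs are numbered from 1)."""
    return {f"{prefix}{i}" for i in range(1, n + 1)}


# IDs present in each pool, following the n_ids values used above.
admission_ids = make_ids("a", 100) | make_ids("b", 50) | make_ids("c", 30)
procedure_ids = make_ids("a", 100) | make_ids("b", 80)

all_ids = admission_ids | procedure_ids          # at least one sequence
both = admission_ids & procedure_ids             # a sequence in both pools
only_admissions = admission_ids - procedure_ids  # source C patients
only_procedures = procedure_ids - admission_ids  # b51 .. b80

print(len(all_ids), len(both), len(only_admissions), len(only_procedures))
```

Under these assumptions, 150 patients have both admissions and procedures, 30 have admissions only, and 30 have procedures only, so downstream code should not expect every trajectory to contain both sequence types.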

Workspace: reload without tracking paths#

All stores are registered in the workspace by name. Reload them in any script without knowing the file path.

print(ws)
TanaT Workspace
∟ Root: /home/runner/.tanat_workspace/building_pools_tutorial
∟ Content: 6 stores detected
admissions_reloaded = ws["admissions"]
print(f"Reloaded: {len(admissions_reloaded)} patients")
Reloaded: 180 patients

Save a modified pool#

pool.save() materialises any pending lazy transformations into a new store registered under the given name.

admissions.cast_features({"ward": pl.Categorical})
saved_path = admissions.save("admissions_optimised", overwrite=True)
print("Saved to", saved_path)
┌─ Interval SequenceStore
│
│ Step 1/4: Sorting & preparing data
│
│ Step 2/4: Building sequence index
│
│ Step 3/4: Writing entity, time index & static features
│
│ Step 4/4: Computing & writing metadata
│
└─ Done (180 sequences · 1,222 entities · 0.01s)
Saved to /home/runner/.tanat_workspace/building_pools_tutorial/admissions_optimised

