Parquet Storage Schema #

Each Litmus run produces one Parquet file. Every row carries an explicit record_type discriminator with one of three values:

  • record_type = 'run' — exactly one row per file. Carries run-level identity, timing, outcome, plus DUT / station / project / git / environment context.
  • record_type = 'step' — one row per (step_path, vector_index) execution. Step identity, timing, outcome, dynamic in_* / out_* columns. Measurement columns are NULL.
  • record_type = 'measurement' — one row per recorded measurement. Carries the measurement payload plus the same denormalized step + run + DUT + station + fixture context as the corresponding step row.

Step and measurement rows share grain (run_id, step_path, vector_index); measurement rows are further keyed by measurement_name. A step that records N measurements emits 1 step row + N measurement rows.
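As a rough illustration of the grain described above, the sketch below models a few rows as plain dictionaries and counts rows per (run_id, step_path, vector_index). The row values are invented for the example; only the column names come from this page.

```python
from collections import Counter

# Hypothetical rows for a single run (values are made up for illustration).
rows = [
    {"record_type": "run", "run_id": "r1"},
    {"record_type": "step", "run_id": "r1",
     "step_path": "TestPower/test_efficiency", "vector_index": 0},
    {"record_type": "measurement", "run_id": "r1",
     "step_path": "TestPower/test_efficiency", "vector_index": 0,
     "measurement_name": "output_voltage"},
    {"record_type": "measurement", "run_id": "r1",
     "step_path": "TestPower/test_efficiency", "vector_index": 0,
     "measurement_name": "efficiency"},
]


def grain(row):
    """Shared key of step and measurement rows."""
    return (row["run_id"], row["step_path"], row["vector_index"])


def rows_per_grain(all_rows):
    """Count step and measurement rows for each (run, step, vector)."""
    counts = {}
    for row in all_rows:
        if row["record_type"] == "run":
            continue  # the run row has no step_path / vector_index grain
        counts.setdefault(grain(row), Counter())[row["record_type"]] += 1
    return counts


counts = rows_per_grain(rows)
```

Here the single vector of TestPower/test_efficiency records two measurements, so it contributes one step row and two measurement rows.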

The canonical schema lives at src/litmus/data/schemas.py (RUN_ROW_SCHEMA); this page is a human-readable mirror of it.

File layout #

<data_dir>/runs/{date}/
├── {timestamp}_{serial}.parquet           # Run row + all step + measurement rows for one run
├── {timestamp}.parquet                    # Same shape, no DUT serial (dev runs)
└── {timestamp}_{serial}_ref/              # Reference data (waveforms, images, files)
    ├── {vector_id}_scope_waveform.npz
    ├── {vector_id}_camera_image.png
    └── ...

Timestamps in file names are UTC, so files sort chronologically by name. DuckDB, Polars, and pandas read a file directly with read_parquet; Spark reads it with spark.read.parquet.
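A file name can be split back into its parts with a few lines of Python. This is a minimal sketch, not part of Litmus: the helper name parse_run_file_name is hypothetical, and the timestamp format is an assumption inferred from the example name 20260516T143025Z_SN001.parquet used later on this page.

```python
from datetime import datetime, timezone
from pathlib import Path
from typing import NamedTuple, Optional


class RunFileName(NamedTuple):
    started_at: datetime
    serial: Optional[str]  # None for dev runs without a DUT serial


# Assumed format, inferred from the example file name on this page.
TIMESTAMP_FORMAT = "%Y%m%dT%H%M%SZ"


def parse_run_file_name(path: str) -> RunFileName:
    """Split '{timestamp}_{serial}.parquet' or '{timestamp}.parquet'."""
    stem = Path(path).stem  # drop directories and the .parquet extension
    # Split on the first underscore only, so serials may contain underscores.
    timestamp_text, _, serial = stem.partition("_")
    started_at = datetime.strptime(timestamp_text, TIMESTAMP_FORMAT)
    return RunFileName(started_at.replace(tzinfo=timezone.utc), serial or None)


with_serial = parse_run_file_name(
    "data/runs/2026-05-16/20260516T143025Z_SN001.parquet"
)
dev_run = parse_run_file_name("data/runs/2026-05-16/20260516T143025Z.parquet")
```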

Discriminator #

Column | Type | Description
record_type | string | 'run', 'step', or 'measurement'

Every query starts here. Three values:

  • run — one row per run carrying run-level metadata (start/end timestamps, DUT serial, station, outcome).
  • step — one row per (step, vector) combination.
  • measurement — one row per measurement name within a (step, vector).

To list steps: WHERE record_type = 'step'. To list measurements: WHERE record_type = 'measurement'. All kinds: omit the filter.

Identity & timing #

Column | Type | Description
session_id | string | Session UUID; groups runs that ran together in one litmus serve / pytest invocation
run_id | string | Run UUID; primary key for the run
slot_id | string | Multi-DUT slot ID (NULL for single-DUT runs)
run_started_at | timestamp[us, UTC] | When the run started
run_ended_at | timestamp[us, UTC] | When the run ended
step_name | string | Test function or class name
step_index | int64 | 0-based step order within the run
step_path | string | Hierarchical path, e.g. TestPower/test_efficiency
parent_path | string | Container path; empty for root steps. Enables tree reconstruction without joins.
step_started_at | timestamp[us, UTC] | Step start (NULL for unrun planned steps)
step_ended_at | timestamp[us, UTC] | Step end
step_node_id | string | pytest node id (tests/test_power.py::TestPower::test_efficiency)
step_module | string | Module name
step_file | string | Source file path
step_class | string | Class name (NULL for module-level functions)
step_function | string | Function name
step_markers | string | Marker payload summary
step_vector_count | int32 | Total planned vectors for this step (1 for non-swept)
vector_index | int64 | 0-based index within the step's sweep matrix
vector_retry | int64 | 0-based retry counter (0 = first execution)
vector_started_at | timestamp[us, UTC] | Vector start
vector_ended_at | timestamp[us, UTC] | Vector end
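Because every step row carries both step_path and parent_path, the step tree can be rebuilt in a single pass. The sketch below shows one way to do it on plain dictionaries; the rows are invented, and it assumes container steps (such as a test class) have step rows of their own.

```python
from collections import defaultdict

# Hypothetical step rows; only step_path and parent_path matter here.
step_rows = [
    {"step_path": "TestPower", "parent_path": ""},
    {"step_path": "TestPower/test_efficiency", "parent_path": "TestPower"},
    {"step_path": "TestPower/test_ripple", "parent_path": "TestPower"},
    {"step_path": "test_smoke", "parent_path": ""},
]


def build_children(rows):
    """Map each parent_path to its child step_paths, in first-seen order."""
    children = defaultdict(list)
    for row in rows:
        siblings = children[row["parent_path"]]
        # Swept steps repeat once per vector_index, so de-duplicate.
        if row["step_path"] not in siblings:
            siblings.append(row["step_path"])
    return children


def render(children, parent="", depth=0):
    """Return the tree as indented lines, starting from the root steps."""
    lines = []
    for path in children.get(parent, []):
        lines.append("  " * depth + path.rsplit("/", 1)[-1])
        lines.extend(render(children, path, depth + 1))
    return lines


tree = render(build_children(step_rows))
```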

Who — operator #

Column | Type | Description
operator_id | string | From --operator or env var
operator_name | string | Human-readable name

What — DUT #

Column | Type | Description
dut_serial | string | From --dut-serial
dut_part_number | string | Operator-facing product identifier (NOT product_id)
dut_revision | string | Hardware revision
dut_lot_number | string | Manufacturing lot

What — product spec #

Column | Type | Description
product_id | string | Internal product identifier from the product YAML
product_name | string | Human-readable product name
product_revision | string | Spec revision

Where — station #

Column | Type | Description
station_id | string | Station config id
station_name | string | Human-readable station name
station_type | string | Station type (template)
station_location | string | Physical location
station_hostname | string | Operator-facing identifier for the physical bench

Where — fixture #

Column | Type | Description
fixture_id | string | Fixture YAML id

Where — instruments (dynamic step_instruments_*) #

Per-step instrument identity, captured from the pytest fixtures the test actually used. Each column is a list with one entry per instrument (list[string], except step_instruments_mocked, which is list[bool]), and the lists stay in parallel order.

Column | Type | Description
step_instruments_name | list[string] | Role names (e.g. ["dmm", "psu"])
step_instruments_id | list[string] | Instrument file IDs
step_instruments_driver | list[string] | Driver class paths
step_instruments_resource | list[string] | VISA addresses
step_instruments_protocol | list[string] | Protocols ("visa", "daqmx", …)
step_instruments_manufacturer | list[string] | From *IDN? or YAML config
step_instruments_model | list[string] | Model number
step_instruments_serial | list[string] | Serial number
step_instruments_firmware | list[string] | Firmware version
step_instruments_cal_due | list[string] | Calibration due date (ISO 8601)
step_instruments_cal_last | list[string] | Last cal date (ISO 8601)
step_instruments_cal_certificate | list[string] | Cal certificate number
step_instruments_cal_lab | list[string] | Cal lab name
step_instruments_mocked | list[bool] | True if the instrument ran in mock mode

For real hardware, identity comes from *IDN? at session start. For mock instruments, identity comes from the instrument YAML configs.

-- DuckDB: unnest parallel arrays for per-instrument queries
SELECT
    step_name,
    unnest(step_instruments_name) AS instrument,
    unnest(step_instruments_serial) AS serial,
    unnest(step_instruments_cal_due) AS cal_due
FROM read_parquet('data/runs/**/*.parquet')
WHERE record_type = 'step';
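The same per-instrument view can be produced in Python by zipping the parallel lists of a step row. The sketch below is illustrative only: the row values are invented, and it assumes cal_due entries are plain YYYY-MM-DD dates (ISO 8601 also allows other forms).

```python
from datetime import date

# Hypothetical step row; values are made up for illustration.
step_row = {
    "step_name": "test_efficiency",
    "step_instruments_name": ["dmm", "psu"],
    "step_instruments_serial": ["MY1234", "PS5678"],
    "step_instruments_cal_due": ["2026-01-31", "2027-06-30"],
}


def instruments(row):
    """Yield one dict per instrument by zipping the parallel list columns."""
    for name, serial, cal_due in zip(
        row["step_instruments_name"],
        row["step_instruments_serial"],
        row["step_instruments_cal_due"],
    ):
        yield {"instrument": name, "serial": serial, "cal_due": cal_due}


def overdue(row, on_date):
    """Names of instruments whose calibration was due before on_date."""
    return [
        inst["instrument"]
        for inst in instruments(row)
        if inst["cal_due"] is not None
        and date.fromisoformat(inst["cal_due"]) < on_date
    ]


overdue_names = overdue(step_row, date(2026, 5, 16))
```

Zipping relies on the lists staying in parallel order, which is the guarantee this section states.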

Test context #

Column | Type | Description
test_phase | string | production / characterization / development
project_name | string | Project name from litmus.yaml
git_commit | string | Code version at test time
git_branch | string | Branch at test time
git_remote | string | Remote URL at test time

Input conditions (dynamic in_*) #

For each parametrize axis or sidecar sweep parameter, the writer emits a column. Types are inferred from values.

Column pattern | Type | Description
in_{param} | float64 / int64 / string | Value commanded for that axis
in_{param}_instrument | string | Instrument name
in_{param}_resource | string | VISA address at test time
in_{param}_channel | string | Channel on instrument
in_{param}_dut_pin | string | DUT pin driven
in_{param}_fixture_connection | string | Fixture routing connection

Naming convention:

Type | Pattern | Examples
Spec conditions | bare name | in_temperature, in_load, in_vin
Implementation details | fixture-prefixed | in_psu.voltage, in_dmm.sample_count

Bare names are spec-relevant and used for condition matching; prefixed names are stimulus or instrument settings. The convention is established by documentation only; the writer does not enforce it.
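Since the writer does not enforce the convention, downstream tooling may want to classify in_* columns itself. The helper below is a hypothetical sketch based only on the patterns listed in this section; the suffix check is a heuristic and would misclassify a parameter whose own name happens to end in one of the suffixes.

```python
# Suffixes of the per-parameter signal-path columns listed in this section.
SIGNAL_PATH_SUFFIXES = (
    "_instrument",
    "_resource",
    "_channel",
    "_dut_pin",
    "_fixture_connection",
)


def classify_input_column(column: str) -> str:
    """Classify an in_* column by the documented naming convention."""
    if not column.startswith("in_"):
        raise ValueError(f"not an input column: {column!r}")
    name = column[len("in_"):]
    if name.endswith(SIGNAL_PATH_SUFFIXES):
        return "signal_path"  # in_{param}_instrument, in_{param}_channel, ...
    if "." in name:
        return "implementation_detail"  # fixture-prefixed, e.g. in_psu.voltage
    return "spec_condition"  # bare name, e.g. in_temperature


kinds = {
    column: classify_input_column(column)
    for column in ["in_temperature", "in_psu.voltage", "in_vin_instrument"]
}
```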

Observations (dynamic out_*) #

Observations are measured context — readings captured during the test, not commanded values.

Column pattern | Type | Description
out_{key} | varies | Observed value (scalar, array, or file reference)

Examples: out_temp_probe.temperature, out_temp_probe.humidity, out_scope.waveform.

For non-scalar payloads, the value is a file://_ref/... URI:

Data type | Storage format | Example column value
Scalar (float / int / str / bool) | inline | 3.31
Waveform | .npz with t0, dt, Y, attrs | file://_ref/{id}_scope_waveform.npz
numpy.ndarray | .npy compressed | file://_ref/{id}_raw_samples.npy
Path | copied, extension preserved | file://_ref/{id}_debug_log.txt
Pydantic model | .json | file://_ref/{id}_protocol_trace.json
bytes | .bin | file://_ref/{id}_raw_data.bin

from litmus.data.backends.parquet import load_file, is_file_reference

if is_file_reference(column_value):
    data = load_file(parquet_path, column_value)
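For tooling that cannot import Litmus, a reference can be mapped to a path on disk by hand. The sketch below is an assumption, not the behavior of load_file: it supposes that file://_ref/<name> points at <name> inside the sibling {timestamp}_{serial}_ref/ directory shown under File layout. The helper names are hypothetical.

```python
from pathlib import Path

REF_PREFIX = "file://_ref/"


def looks_like_file_reference(value) -> bool:
    """True for string values that use the file://_ref/ prefix."""
    return isinstance(value, str) and value.startswith(REF_PREFIX)


def resolve_reference(parquet_path: str, value: str) -> Path:
    """Map a file://_ref/ value to a path next to the Parquet file.

    Assumes the reference directory is named '<parquet stem>_ref'.
    """
    if not looks_like_file_reference(value):
        raise ValueError(f"not a file reference: {value!r}")
    parquet = Path(parquet_path)
    ref_dir = parquet.with_name(parquet.stem + "_ref")
    return ref_dir / value[len(REF_PREFIX):]


resolved = resolve_reference(
    "data/runs/2026-05-16/20260516T143025Z_SN001.parquet",
    "file://_ref/0_scope_waveform.npz",
)
```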

Measurement core (on record_type='measurement' rows) #

Column | Type | Description
measurement_name | string | "output_voltage", "efficiency", ...
measurement_timestamp | timestamp[us, UTC] | When the measurement was recorded
measurement_value | float64 | Measured value (scalar; non-scalar payloads go to _ref/ via out_*)
measurement_units | string | Units (V, A, %, ...)
measurement_outcome | string | passed / failed / skipped / errored / aborted / terminated / done

Limits (on record_type='measurement' rows) #

Column | Type | Description
limit_low | float64 | Lower bound (NULL if no lower limit)
limit_high | float64 | Upper bound (NULL if no upper limit)
limit_nominal | float64 | Expected / target value
limit_comparator | string | GELE, EQ, GE, LE, GELT, GTLE, GTLT, GT, LT, NE

Spec traceability #

Column | Type | Description
characteristic_id | string | Characteristic ID from the product YAML (e.g. "output_voltage")
spec_ref | string | Human-readable reference with conditions (e.g. "Table 4.2 @ temp=25")

-- Yield by characteristic across all products
SELECT characteristic_id, product_id,
       AVG(CASE WHEN measurement_outcome='passed' THEN 1.0 ELSE 0.0 END) AS yield
FROM read_parquet('data/runs/**/*.parquet')
WHERE record_type = 'measurement'
GROUP BY characteristic_id, product_id;

Measurement signal path #

Column | Type | Description
dut_pin | string | DUT pin that was measured
fixture_connection | string | Fixture routing connection name
instrument_name | string | Role name of the instrument that took the measurement
instrument_resource | string | VISA address
instrument_channel | string | Channel on the instrument

Rollup outcomes #

Column | Type | Description
step_outcome | string | Did this step pass overall
vector_outcome | string | Did this vector pass
run_outcome | string | Did the entire run pass

Environment traceability #

Column | Type | Description
python_version | string | e.g. "3.13.1"
litmus_version | string | Installed Litmus version
env_fingerprint | string | Hash of the lockfile + top-level deps

Custom metadata #

Test code can add arbitrary columns via run_context.set():

def test_example(run_context, psu, dmm, verify):
    run_context.set("operator_badge", "EMP-12345")
    run_context.set("fixture_serial", "FIX-001")
    run_context.set("ambient_temp", 23.5)
    ...

These values become Parquet columns with a custom_ prefix and inferred types.

Outcome values #

Value | Meaning
passed | All limits satisfied
failed | One or more limits exceeded
skipped | Test was skipped (pytest.skip, marker, or session-level skip)
errored | Test errored before pass/fail could be decided
terminated | Run was terminated (keyboard interrupt, signal)
aborted | Run was aborted by operator
done | Container outcome: work finished, no measurements

Source of truth: src/litmus/data/models.py (Outcome).

Comparator values #

Comparator | Pass condition
GELE | low <= value <= high (default)
GELT | low <= value < high
GTLE | low < value <= high
GTLT | low < value < high
EQ | value == nominal
NE | value != nominal
GE | value >= low
GT | value > low
LE | value <= high
LT | value < high
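The pass conditions in the table translate directly into code. The function below is a sketch of that table, not Litmus's own limit checker: the name check_limit is hypothetical, it assumes the bounds a comparator needs are present (not NULL), and EQ / NE use exact comparison with no tolerance.

```python
def check_limit(value, comparator="GELE", low=None, high=None, nominal=None):
    """Return True if value satisfies the comparator's pass condition."""
    # Lambdas defer evaluation, so only the selected comparator's
    # bounds are ever touched.
    checks = {
        "GELE": lambda: low <= value <= high,
        "GELT": lambda: low <= value < high,
        "GTLE": lambda: low < value <= high,
        "GTLT": lambda: low < value < high,
        "EQ": lambda: value == nominal,
        "NE": lambda: value != nominal,
        "GE": lambda: value >= low,
        "GT": lambda: value > low,
        "LE": lambda: value <= high,
        "LT": lambda: value < high,
    }
    if comparator not in checks:
        raise ValueError(f"unknown comparator: {comparator!r}")
    return checks[comparator]()


in_window = check_limit(3.30, "GELE", low=3.2, high=3.4)
at_upper_edge = check_limit(3.4, "GELT", low=3.2, high=3.4)
```

The two calls differ only at the boundary: GELE accepts a value equal to high, while GELT rejects it.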

Retries #

All retries are stored. Each retry produces measurement rows with the same vector_index and an incremented vector_retry:

vector_index | vector_retry | measurement_name | measurement_value | measurement_outcome
0            | 0            | output_voltage   | 3.50              | failed   ← first execution
0            | 1            | output_voltage   | 3.48              | failed   ← first retry
0            | 2            | output_voltage   | 3.30              | passed   ← second retry

Filter to the final execution with WHERE vector_retry = (SELECT MAX(vector_retry) ...) or use the daemon's runs view, which already rolls up retry_count per (run_id, step_path, vector_index).
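The same filter can be applied in Python after loading the rows. This sketch works on plain dictionaries that mirror the table above (the run_id and step_path values are invented) and keeps only rows from the highest vector_retry of each (run_id, step_path, vector_index).

```python
# Measurement rows mirroring the retry table above; run_id and
# step_path are hypothetical values added for the example.
measurement_rows = [
    {"run_id": "r1", "step_path": "test_output", "vector_index": 0,
     "vector_retry": 0, "measurement_name": "output_voltage",
     "measurement_value": 3.50, "measurement_outcome": "failed"},
    {"run_id": "r1", "step_path": "test_output", "vector_index": 0,
     "vector_retry": 1, "measurement_name": "output_voltage",
     "measurement_value": 3.48, "measurement_outcome": "failed"},
    {"run_id": "r1", "step_path": "test_output", "vector_index": 0,
     "vector_retry": 2, "measurement_name": "output_voltage",
     "measurement_value": 3.30, "measurement_outcome": "passed"},
]


def vector_key(row):
    return (row["run_id"], row["step_path"], row["vector_index"])


def final_executions(rows):
    """Keep only rows from the last retry of each vector."""
    last_retry = {}
    for row in rows:
        key = vector_key(row)
        last_retry[key] = max(last_retry.get(key, 0), row["vector_retry"])
    return [r for r in rows if r["vector_retry"] == last_retry[vector_key(r)]]


final = final_executions(measurement_rows)
```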

File-level metadata #

Beyond columns, each Parquet file carries metadata:

Key | Description
environment_json | Full environment snapshot (Python version, OS, Litmus version, top-level deps, lockfile hash)
litmus_version | Litmus version that produced this file
schema_version | Schema version ("1.0" at time of writing; see SCHEMA_VERSION in src/litmus/data/schemas.py)

import pyarrow.parquet as pq
from litmus.environment import EnvironmentSnapshot

pf = pq.ParquetFile("data/runs/2026-05-16/20260516T143025Z_SN001.parquet")
metadata = pf.schema_arrow.metadata
env = EnvironmentSnapshot.model_validate_json(metadata[b"environment_json"])
print(f"Python {env.python_version}, Litmus {env.litmus_version}")

Querying examples #

Load a run with pandas #

import pandas as pd
 
df = pd.read_parquet("data/runs/2026-05-16/20260516T143025Z_SN001.parquet")
 
# Step rows
steps = df[df["record_type"] == "step"]
# Measurement rows with full context
measurements = df[df["record_type"] == "measurement"]
 
# Failures with full context
failures = measurements[measurements["measurement_outcome"] == "failed"]
print(failures[["step_name", "measurement_name", "measurement_value",
                "limit_low", "limit_high", "dut_pin", "instrument_name"]])

Yield by station with DuckDB #

SELECT
    product_id,
    station_id,
    measurement_name,
    COUNT(*) AS total,
    SUM(CASE WHEN measurement_outcome = 'passed' THEN 1 ELSE 0 END) AS passed,
    ROUND(100.0 * SUM(CASE WHEN measurement_outcome = 'passed' THEN 1 ELSE 0 END) / COUNT(*), 2) AS yield_pct
FROM read_parquet('data/runs/**/*.parquet')
WHERE record_type = 'measurement'
GROUP BY 1, 2, 3
ORDER BY yield_pct ASC;

Cross-run instrument-failure correlation #

SELECT
    instrument_name,
    instrument_resource,
    COUNT(*) AS failures
FROM read_parquet('data/runs/**/*.parquet')
WHERE record_type = 'measurement'
  AND measurement_outcome = 'failed'
GROUP BY 1, 2
ORDER BY failures DESC;

Slowest steps across runs #

SELECT
    step_name,
    AVG(EPOCH(step_ended_at) - EPOCH(step_started_at)) AS avg_seconds,
    COUNT(*) AS step_rows  -- counts step rows (one per step_path and vector_index), not distinct runs
FROM read_parquet('data/runs/**/*.parquet')
WHERE record_type = 'step'
  AND step_started_at IS NOT NULL
GROUP BY step_name
ORDER BY avg_seconds DESC;

ATML / IEEE 1671 alignment #

Litmus column | ATML equivalent
TestRun (run_id) | TestResults
record_type='step' | TestGroup
vector_index | (Conditions)
record_type='measurement' | Data
DUT (dut_*) | UUT
measurement_outcome | OutcomeValue
limit_comparator | Comparator
dut_pin | uutPort
instrument_channel | instrumentPort

See also #