Step 10: Live Monitoring #

Goal: Run tests while monitoring events and instrument data in real time.

Prerequisites #

Start a Station Session #

Open a Python script or Jupyter notebook:

from litmus.connect import connect
 
# Connect to your station (mock mode for this tutorial)
with connect("bench_1", mock=True) as station:
    dmm = station.instrument("dmm")
 
    # Every instrument interaction is logged as an event
    voltage = dmm.measure_voltage()
 
    print(f"Session ID: {station.session_id}")
    print(f"Voltage: {voltage}")

This creates a session, connects instruments, and logs all interactions to the event store (see also three-stores).

Monitor in the UI #

In another terminal:

litmus serve --reload

Open http://localhost:8000 — the operator UI shows live session activity, including:

  • Session metadata (station, DUT, operator)
  • Instrument connections
  • Measurements as they happen
  • Step progress during test runs

Run Tests While Monitoring #

# In another terminal, run tests
pytest tests/ -s

The UI updates in real time as tests execute. Events flow through the system (see concepts/event-log for EventLog / EventStore definitions):

pytest → EventLog.emit() → EventStore → UI subscription

View the Run in Results #

When pytest finishes, the run lands in the Results history. Open http://localhost:8000/results — each pytest invocation that produced one or more tests appears as a row. The list shows Outcome / Serial / Part Number / Hostname / Project / Phase / Started / Steps / Meas / Ended. There's no filter bar; columns are sortable and a stats strip above the table summarizes the visible runs.

Click any row to drill into the detail view at /results/<run_id>. The detail page is a sticky header card with a tab strip beneath: Overview (run-level summary), Steps (one row per (step_path, vector_index) execution with its outcome and measurement count), Measurements (every value logged with its limit and outcome), and DUT History (this DUT's prior runs).

For the full reference, see Operator UI → Results — list and Operator UI → Results — detail.

See How the Line Is Doing #

After a few runs accumulate, the /metrics page becomes the go-to "is the bench healthy" view. A filter bar (Phase / Product / Station / Lot / Since / Until) sits above a tab strip with six analytical lenses:

TabWhat it shows
YieldFirst-pass yield, final yield, run / failure counts, a yield trend chart, and time stats
ParetoFailure counts grouped by Product, Step, or Measurement (the group-by is a control on the tab)
CpkPer-measurement process capability, ranked worst-first
RetestTime-bucketed retest rate — how many serials needed more than one attempt that period
Time lossWall-clock time spent on failed / errored runs per period
AssetsPer-instrument time share — Role / Resource / Sessions / Connected (s) / Share

For the full reference, see Operator UI → Metrics. For the diagnostic recipe behind the Retest signal, see Find flaky tests.

Query Historical Data #

After tests complete, query the results:

# Via HTTP API
curl http://localhost:8000/api/sessions
curl "http://localhost:8000/api/events?session_id=YOUR_SESSION_ID"
curl http://localhost:8000/api/channels

Or with the MCP tools:

litmus_sessions()
litmus_events(session_id="...")
litmus_channels(channel_id="dmm.voltage")

Channel Data from Instrument Reads #

When instruments are read through the proxy, scalar values appear in events directly. Array data (waveforms) is stored in the ChannelStore (Litmus's time-series store for instrument arrays) with a channel:// claim-check URI in the event:

with connect("bench_1", mock=True) as station:
    scope = station.instrument("scope")
    waveform = scope.read_waveform()
    # Event contains: {"value": {"_ref": "channel://scope.ch1/...", "length": 1000}}
    # Actual waveform data is in the ChannelStore

Query channel data:

curl "http://localhost:8000/api/channels/scope.ch1?max_points=500"

What's Happening Under the Hood #

  1. connect() creates an EventStore and EventLog for the session
  2. The EventStore acquires a DuckDB Flight daemon for cross-process queries
  3. Each emit() writes to Arrow IPC files and pushes to DuckDB
  4. The UI subscribes via EventStore.on_event() and receives events in real time
  5. Channel data flows to ChannelStore with LTTB (Largest Triangle Three Buckets) decimation — a downsampling algorithm that preserves visual peaks — for display

Step 9: Production Ready | Tutorial index

Next Steps #

Tutorial · Step 11 of 11