Live Langfuse test¶
A complete, copy-pasteable script that reads your real Langfuse traces, runs them through whatifd.pipeline.run_pipeline, and writes a real ReportV01 verdict to JSON. No agent runner, no Inspect AI, no API keys beyond Langfuse — the goal is to prove trace ingestion + the verdict pipeline work end-to-end against your data.
Once that’s green, the Going further section shows how to swap in your real agent runner and a real Inspect AI scorer.
Prerequisites¶
uv pip install whatifd whatifd-langfuse
export LANGFUSE_HOST="https://cloud.langfuse.com" # or self-hosted URL
export LANGFUSE_PUBLIC_KEY="pk-lf-..."
export LANGFUSE_SECRET_KEY="sk-lf-..."
You also need at least ~10 traces in Langfuse you can split into a failure cohort and a baseline cohort. The script below does this via Langfuse tags (most reliable) — if you tag failures with failed (or anything you choose) and the rest count as baseline, you’re set. Alternative classifiers (score- and metadata-based) are included in the script, commented out.
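Before running anything, it is worth a ten-second check that all three environment variables are actually visible to Python. A trivial sanity snippet (nothing whatifd-specific here):

import os

for var in ("LANGFUSE_HOST", "LANGFUSE_PUBLIC_KEY", "LANGFUSE_SECRET_KEY"):
    print(f"{var}: {'set' if os.environ.get(var) else 'MISSING'}")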
The script¶
Save as live_langfuse_test.py:
"""Live Langfuse → run_pipeline smoke test.
Reads up to 40 traces from your Langfuse, splits them into `failure` /
`baseline` cohorts by tag, runs them through `whatifd.pipeline.run_pipeline`
with a deterministic `delta_fn` (no agent runner, no Inspect AI), and
writes the resulting ReportV01 to ./reports/.
Verdict will probably be Inconclusive (deterministic delta_fn means no
real signal) — that's fine. The point is to see the Langfuse adapter
ingest your real traces, the pipeline construct a real ReportV01, and
the cardinal-#5 graph walk + cardinal-#10 methodology disclosure run
against your data end-to-end.
"""
from __future__ import annotations

import json
import os
from datetime import datetime, timezone
from pathlib import Path
from types import MappingProxyType

from langfuse.api import LangfuseAPI
from whatifd_langfuse import LangfuseTraceSource

from whatifd.adapters.protocols import RawTrace
from whatifd.cache.summary import CachePolicySnapshot, CacheSummary
from whatifd.pipeline import run_pipeline
from whatifd.serialization import (
    assert_no_unredacted_sensitive,
    encode_report_v01,
)
from whatifd.types.manifest import EnvironmentFingerprint, RunManifest
from whatifd.types.policy import DecisionPolicy, TrustFloor
from whatifd.types.statistical import (
    BootstrapMethodDisclosure,
    EffectSizeDisclosure,
    JudgeMethodDisclosure,
    MethodologyDisclosure,
    MultiplicityDisclosure,
)
# ---------------------------------------------------------------------------
# 1. Cohort classifier: how to split your traces into failure vs baseline.
# ---------------------------------------------------------------------------
#
# Pick ONE of the strategies below — the one that matches how you tag /
# label traces in Langfuse today.
def cohort_by_tag(trace) -> str:
    """Tag-based: a trace tagged `failed` is a failure; everything else baseline."""
    tags = trace.tags or []
    return "failure" if "failed" in tags else "baseline"


# Alternative classifiers — uncomment ONE if tags don't fit your data:

# def cohort_by_score(trace) -> str:
#     """Score-based: scores under 0.6 are failures."""
#     scores = getattr(trace, "scores", None) or []
#     for s in scores:
#         if getattr(s, "value", 1.0) < 0.6:
#             return "failure"
#     return "baseline"

# def cohort_by_metadata(trace) -> str:
#     """Metadata-based: e.g., metadata.outcome == 'error'."""
#     md = trace.metadata or {}
#     return "failure" if md.get("outcome") == "error" else "baseline"

# ---------------------------------------------------------------------------
# 2. Deterministic delta_fn — no Inspect AI, no agent runner needed.
# ---------------------------------------------------------------------------
#
# In a real run this would invoke your runner + scorer. Here it's a
# constant-per-cohort delta so the pipeline runs end-to-end against your
# real Langfuse data without any other dependencies. The verdict will
# reflect this (likely Inconclusive); see Going further for the real
# runner + Inspect AI wiring.
def delta_fn(rt: RawTrace) -> float:
    return 0.4 if rt.cohort == "failure" else 0.05

# ---------------------------------------------------------------------------
# 3. Construct LangfuseTraceSource against your real Langfuse.
# ---------------------------------------------------------------------------
api = LangfuseAPI(
    base_url=os.environ["LANGFUSE_HOST"],
    username=os.environ["LANGFUSE_PUBLIC_KEY"],
    password=os.environ["LANGFUSE_SECRET_KEY"],
)

source = LangfuseTraceSource(
    api=api,
    cohort_classifier=cohort_by_tag,  # swap if you uncommented an alternative above
    page_limit=50,
    max_traces=40,  # safety cap so we don't drain a prod project
    sdk_version="live-langfuse-smoke",
)
# ---------------------------------------------------------------------------
# 4. Manifest + methodology + cache summary boilerplate.
# ---------------------------------------------------------------------------
now = datetime.now(timezone.utc).isoformat(timespec="seconds")
manifest = RunManifest(
    experiment_id="live-langfuse-smoke",
    started_at=now,
    finished_at=now,
    duration_ms=0,
    whatif_version="0.2.0",
    config_hash="0" * 64,  # not load-bearing for a smoke test
    selection_seed=42,
    source="langfuse",
    target="deterministic-delta-fn",
    trust_floor=TrustFloor(),
    decision_policy=DecisionPolicy(),
    environment=EnvironmentFingerprint(
        python="3.12",
        platform="linux",
        whatif_version="0.2.0",
    ),
)
methodology = MethodologyDisclosure(
    unit_of_analysis="paired_trace_delta",
    primary_metric="faithfulness",
    primary_endpoints=("failure.faithfulness", "baseline.faithfulness"),
    cohorts=("failure", "baseline"),
    bootstrap=BootstrapMethodDisclosure(
        # v0.2 ships the doctrinally-correct paired-percentile bootstrap.
        # For a smoke test with a deterministic delta_fn there's no
        # random sampling worth re-running, so keeping `unavailable`
        # is honest — but for real Langfuse runs you'd declare
        # method="paired_percentile_bootstrap" with resamples=2000
        # and a fixed seed.
        method="unavailable",
        resamples=None,
        seed=None,
        sample_unit="paired_trace_delta",
        ci_level="0.950",
        cluster_key=None,
        assumptions=(),
        unavailable_reason="smoke test — deterministic delta_fn, no random sampling",
    ),
    multiplicity=MultiplicityDisclosure(
        primary_endpoint_count=2,
        correction="none",
        reason="single primary metric per cohort; no correction applied",
    ),
    judge=JudgeMethodDisclosure(
        scorer="deterministic",
        scorer_version="0.1.0",
        judge_provider="none",
        judge_model="none",
        judge_model_version=None,
        rendered_prompt_hash="0" * 16,
        rubric_hash="0" * 16,
        scorer_cache_enabled=False,
        scorer_cache_mode="off",
        scorer_cache_hits=0,
        scorer_cache_misses=0,
        reproducibility_addressed=False,
        reliability_measured=False,
        validity_measured=False,
        calibration_measured=False,
        bias_audit_measured=False,
    ),
    effect_size=EffectSizeDisclosure(
        practical_delta="0.050",
        practical_delta_source="policy",
        judge_noise_floor=None,
    ),
    per_trace_inference="descriptive_only",
    causal_claim_scope="associated_under_cached_tool_replay",
)
cache_summary = CacheSummary(
    schema_version="v1",
    key_version="v1",
    mode="off",
    storage_profile="normalized_result_only",
    storage_path=".whatifd/cache",
    hits=0,
    misses=0,
    writes=0,
    stale_hits=0,
    corrupted_entries=0,
    policy=CachePolicySnapshot(
        mode="off",
        warn_after_days=30,
        block_after_days=90,
        storage_profile="normalized_result_only",
    ),
    policy_violations=(),
    oldest_hit_age_days=None,
    models_distribution=MappingProxyType({}),
)
# ---------------------------------------------------------------------------
# 5. Run the pipeline.
# ---------------------------------------------------------------------------
report = run_pipeline(
    source,
    delta_fn=delta_fn,
    floor=TrustFloor(),
    policy=DecisionPolicy(),
    runtime=manifest,
    methodology=methodology,
    cache_summary=cache_summary,
)
# Cardinal-#5 graph walk: refuse to write any unwrapped Sensitive[T].
assert_no_unredacted_sensitive(report)
# ---------------------------------------------------------------------------
# 6. Write artifacts.
# ---------------------------------------------------------------------------
reports_dir = Path("reports")
reports_dir.mkdir(exist_ok=True)
stamp = datetime.now(timezone.utc).strftime("%Y-%m-%d-%H%M%S")
json_path = reports_dir / f"live-langfuse-{stamp}.json"
json_path.write_text(json.dumps(json.loads(encode_report_v01(report)), indent=2))
print(f"Verdict: {report.verdict_state}")
print(f"JSON: {json_path}")
print(f"Cohorts seen: {[c.name for c in report.cohort_results]}")
print(f"Traces ingested per cohort: " + ", ".join(
f"{c.name}={c.selected}" for c in report.cohort_results
))
Run it¶
python live_langfuse_test.py
Expected output (verdict + numbers depend on your Langfuse data):
Verdict: ship
JSON: reports/live-langfuse-2026-05-09-180322.json
Cohorts seen: ['baseline', 'failure']
Traces ingested per cohort: baseline=20, failure=15
The verdict will be one of ship / dont_ship / inconclusive, depending on the cohort sizes the classifier produces and whether the deterministic delta_fn happens to satisfy the policy thresholds. The verdict isn’t the proof point — getting any verdict back at all, with cohorts populated from your real Langfuse data, is the proof point.
If you see Cohorts seen: [] or selected=0 for both, the cohort classifier isn’t matching — adjust cohort_by_tag to use whatever signal your traces actually carry (tag name, score field, metadata key). The two commented-out alternatives in the script are starting points.
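If no single field cleanly marks failures, the classifier can also check several signals at once. A sketch that combines the three strategies from the script — it only touches the trace attributes (tags, metadata, scores) the script already uses; adjust the tag name, metadata key, and threshold to whatever your traces actually record:

def cohort_by_any_signal(trace) -> str:
    """Failure if any of tag / metadata / score flags the trace; baseline otherwise."""
    if "failed" in (trace.tags or []):
        return "failure"
    if (trace.metadata or {}).get("outcome") == "error":
        return "failure"
    for s in getattr(trace, "scores", None) or []:
        if getattr(s, "value", 1.0) < 0.6:
            return "failure"
    return "baseline"

Pass it as cohort_classifier=cohort_by_any_signal when constructing LangfuseTraceSource.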
Marking traces in Langfuse¶
If Cohorts seen: ['baseline'] (no failure cohort), your traces don’t carry the signal cohort_by_tag looks for. The fastest fix is to tag a handful of traces in the Langfuse UI — no code, no SDK calls.
Tag in the UI (recommended for first run)¶
1. Open your Langfuse project in the browser.
2. Go to Tracing → Traces.
3. Click into any trace you’d consider a “bad output” / failure case.
4. Find the Tags field on the trace detail view → type failed → press Enter.
5. Repeat for 3–5 traces.
6. Re-run python live_langfuse_test.py. The failure cohort populates and you get a real verdict.
If you don’t have any obviously-bad traces yet, tag any 3–5 arbitrarily — the deterministic delta_fn in this script doesn’t compare real quality, so the verdict is a pipeline-shape proof, not a real signal. Meaningful verdicts come once the real runner + Inspect AI scorer are wired (see Going further).
Tag from code (for ongoing use)¶
When you want failure marking to happen automatically as your agent runs, there are three patterns — pick the one that matches what your code already knows:
# (a) Tag — what cohort_by_tag reads (recommended)
langfuse.update_current_trace(tags=["failed"])
# (b) Metadata — switch the script to cohort_by_metadata
langfuse.update_current_trace(metadata={"outcome": "error"})
# (c) Score — switch the script to cohort_by_score (uses < 0.6 threshold)
langfuse.score(trace_id=..., name="quality", value=0.2)
Tags are usually the right call: cheap to add, easy to filter on in the Langfuse UI, and the default classifier doesn’t care about anything else. See Langfuse → Tracing features → Tags for the SDK-side details.
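In practice the tag usually gets applied right where your code detects a bad outcome. A minimal sketch, assuming the v3-style get_client()/observe imports from the Langfuse SDK and your own my_agent / is_acceptable functions (both hypothetical); only the update_current_trace(tags=[...]) call is the pattern from above:

from langfuse import get_client, observe  # assumption: Langfuse Python SDK v3-style imports

langfuse = get_client()


@observe()  # creates/continues a trace so update_current_trace has something to tag
def run_and_mark(task: str):
    """Run one agent task and tag the surrounding Langfuse trace on failure."""
    try:
        result = my_agent(task)        # hypothetical: your instrumented agent call
        if not is_acceptable(result):  # hypothetical: your own quality check
            langfuse.update_current_trace(tags=["failed"])
        return result
    except Exception:
        langfuse.update_current_trace(tags=["failed"])
        raise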
What this proves¶
✅ The whatifd-langfuse adapter ingested your real traces (real HTTP, real Langfuse API).
✅ Sensitive[str] wrapping at the boundary held — the graph walk passed (assert_no_unredacted_sensitive).
✅ Cohort classification ran against your data.
✅ The pipeline produced a real ReportV01 with cardinal-#10 methodology disclosure baked in.
❌ This does not prove your scoring model works — delta_fn is deterministic. That’s the next step.
Going further¶
To replace delta_fn with your real agent runner + a real Inspect AI scorer:
1. Implement the runner contract — a Python function that takes (TraceInput, ReplayConfig, ToolCache) and returns a ReplayOutput by re-executing your agent against a proposed change.
2. Construct an InspectAIScorer instance (your Inspect score_fn + judge config).
3. Build a delta_fn(rt) that runs the runner via whatifd.replay.replay_one_trace, projects the result into a ScoreCase, calls scorer.score(case), and returns result.score.
The reference is tests/integration/test_real_adapters.py in the library repo — the _delta_fn_from(scorer) helper there is the exact bridge you need. Once that’s wired you’ll get a real Ship / Don’t Ship / Inconclusive verdict on production traces.
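For orientation, the bridge looks roughly like this. This is a hedged sketch, not the tested helper from the repo: the exact argument shapes of replay_one_trace and the ScoreCase projection live in the library, so check them against tests/integration/test_real_adapters.py before relying on the names below.

from whatifd.adapters.protocols import RawTrace
from whatifd.replay import replay_one_trace


def make_delta_fn(runner, scorer):
    """Close over your runner + InspectAIScorer and return a delta_fn for run_pipeline."""

    def delta_fn(rt: RawTrace) -> float:
        replay_output = replay_one_trace(rt, runner)  # assumed call shape; see the reference test
        case = build_score_case(rt, replay_output)    # hypothetical: project trace + replay into a ScoreCase
        result = scorer.score(case)
        return result.score

    return delta_fn

Pass make_delta_fn(my_runner, my_scorer) as delta_fn= to run_pipeline in place of the deterministic stub.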
CLI-only path (v0.2+)¶
As of v0.2, the entire script above can be replaced with a whatifd.config.yaml + whatifd fork --config whatifd.config.yaml invocation — scorer.score_fn resolves your Inspect score function from a python:<module>:<attr> reference, and the CLI constructs InspectAIScorer for you. See Inspect AI integration for the YAML shape. The programmatic script in this page remains useful for ad-hoc smoke tests where you want to bypass config plumbing.