Live Langfuse test

A complete, copy-pasteable script that reads your real Langfuse traces, runs them through whatifd.pipeline.run_pipeline, writes a real ReportV01 JSON artifact, and prints the verdict. No agent runner, no Inspect AI, no API keys beyond Langfuse: the goal is to prove that trace ingestion and the verdict pipeline work end-to-end against your data.

Once that’s green, the Going further section shows how to swap in your real agent runner and a real Inspect AI scorer.

Prerequisites

uv pip install whatifd whatifd-langfuse

export LANGFUSE_HOST="https://cloud.langfuse.com"   # or self-hosted URL
export LANGFUSE_PUBLIC_KEY="pk-lf-..."
export LANGFUSE_SECRET_KEY="sk-lf-..."

You also need at least ~10 traces in Langfuse that you can split into a failure cohort and a baseline cohort. The script below splits on Langfuse tags (the most reliable signal): traces tagged failed count as failures and everything else counts as baseline. If you tag failures with a different name, adjust the classifier; score- and metadata-based alternatives are included, commented out, in the script.

The script

Save as live_langfuse_test.py:

"""Live Langfuse → run_pipeline smoke test.

Reads up to 40 traces from your Langfuse, splits them into `failure` /
`baseline` cohorts by tag, runs them through `whatifd.pipeline.run_pipeline`
with a deterministic `delta_fn` (no agent runner, no Inspect AI), and
writes the resulting ReportV01 to ./reports/.

Verdict will probably be Inconclusive (deterministic delta_fn means no
real signal) — that's fine. The point is to see the Langfuse adapter
ingest your real traces, the pipeline construct a real ReportV01, and
the cardinal-#5 graph walk + cardinal-#10 methodology disclosure run
against your data end-to-end.
"""

from __future__ import annotations

import json
import os
from datetime import datetime, timezone
from pathlib import Path
from types import MappingProxyType

from langfuse.api import LangfuseAPI
from whatifd_langfuse import LangfuseTraceSource

from whatifd.adapters.protocols import RawTrace
from whatifd.cache.summary import CachePolicySnapshot, CacheSummary
from whatifd.pipeline import run_pipeline
from whatifd.serialization import (
    assert_no_unredacted_sensitive,
    encode_report_v01,
)
from whatifd.types.manifest import EnvironmentFingerprint, RunManifest
from whatifd.types.policy import DecisionPolicy, TrustFloor
from whatifd.types.statistical import (
    BootstrapMethodDisclosure,
    EffectSizeDisclosure,
    JudgeMethodDisclosure,
    MethodologyDisclosure,
    MultiplicityDisclosure,
)

# ---------------------------------------------------------------------------
# 1. Cohort classifier: how to split your traces into failure vs baseline.
# ---------------------------------------------------------------------------
#
# Pick ONE of the strategies below — the one that matches how you tag /
# label traces in Langfuse today.

def cohort_by_tag(trace) -> str:
    """Tag-based: a trace tagged `failed` is a failure; everything else baseline."""
    tags = trace.tags or []
    return "failure" if "failed" in tags else "baseline"

# Alternative classifiers — uncomment ONE if tags don't fit your data:

# def cohort_by_score(trace) -> str:
#     """Score-based: scores under 0.6 are failures."""
#     scores = getattr(trace, "scores", None) or []
#     for s in scores:
#         if getattr(s, "value", 1.0) < 0.6:
#             return "failure"
#     return "baseline"

# def cohort_by_metadata(trace) -> str:
#     """Metadata-based: e.g., metadata.outcome == 'error'."""
#     md = trace.metadata or {}
#     return "failure" if md.get("outcome") == "error" else "baseline"


# ---------------------------------------------------------------------------
# 2. Deterministic delta_fn — no Inspect AI, no agent runner needed.
# ---------------------------------------------------------------------------
#
# In a real run this would invoke your runner + scorer. Here it's a
# constant-per-cohort delta so the pipeline runs end-to-end against your
# real Langfuse data without any other dependencies. The verdict will
# reflect this (likely Inconclusive); see Going further for the real
# runner + Inspect AI wiring.

def delta_fn(rt: RawTrace) -> float:
    return 0.4 if rt.cohort == "failure" else 0.05


# ---------------------------------------------------------------------------
# 3. Construct LangfuseTraceSource against your real Langfuse.
# ---------------------------------------------------------------------------

api = LangfuseAPI(
    base_url=os.environ["LANGFUSE_HOST"],
    username=os.environ["LANGFUSE_PUBLIC_KEY"],
    password=os.environ["LANGFUSE_SECRET_KEY"],
)

source = LangfuseTraceSource(
    api=api,
    cohort_classifier=cohort_by_tag,    # swap if you uncommented an alternative above
    page_limit=50,
    max_traces=40,                      # safety cap so we don't drain a prod project
    sdk_version="live-langfuse-smoke",
)


# ---------------------------------------------------------------------------
# 4. Manifest + methodology + cache summary boilerplate.
# ---------------------------------------------------------------------------

now = datetime.now(timezone.utc).isoformat(timespec="seconds")

manifest = RunManifest(
    experiment_id="live-langfuse-smoke",
    started_at=now,
    finished_at=now,
    duration_ms=0,
    whatif_version="0.1.0",
    config_hash="0" * 64,           # not load-bearing for a smoke test
    selection_seed=42,
    source="langfuse",
    target="deterministic-delta-fn",
    trust_floor=TrustFloor(),
    decision_policy=DecisionPolicy(),
    environment=EnvironmentFingerprint(
        python="3.12",
        platform="linux",
        whatif_version="0.2.0",
    ),
)

methodology = MethodologyDisclosure(
    unit_of_analysis="paired_trace_delta",
    primary_metric="faithfulness",
    primary_endpoints=("failure.faithfulness", "baseline.faithfulness"),
    cohorts=("failure", "baseline"),
    bootstrap=BootstrapMethodDisclosure(
        # v0.2 ships the doctrinally-correct paired-percentile bootstrap.
        # For a smoke test with a deterministic delta_fn there's no
        # random sampling worth re-running, so keeping `unavailable`
        # is honest — but for real Langfuse runs you'd declare
        # method="paired_percentile_bootstrap" with resamples=2000
        # and a fixed seed.
        method="unavailable",
        resamples=None,
        seed=None,
        sample_unit="paired_trace_delta",
        ci_level="0.950",
        cluster_key=None,
        assumptions=(),
        unavailable_reason="smoke test — deterministic delta_fn, no random sampling",
    ),
    multiplicity=MultiplicityDisclosure(
        primary_endpoint_count=2,
        correction="none",
        reason="single primary metric per cohort; no correction applied",
    ),
    judge=JudgeMethodDisclosure(
        scorer="deterministic",
        scorer_version="0.1.0",
        judge_provider="none",
        judge_model="none",
        judge_model_version=None,
        rendered_prompt_hash="0" * 16,
        rubric_hash="0" * 16,
        scorer_cache_enabled=False,
        scorer_cache_mode="off",
        scorer_cache_hits=0,
        scorer_cache_misses=0,
        reproducibility_addressed=False,
        reliability_measured=False,
        validity_measured=False,
        calibration_measured=False,
        bias_audit_measured=False,
    ),
    effect_size=EffectSizeDisclosure(
        practical_delta="0.050",
        practical_delta_source="policy",
        judge_noise_floor=None,
    ),
    per_trace_inference="descriptive_only",
    causal_claim_scope="associated_under_cached_tool_replay",
)

cache_summary = CacheSummary(
    schema_version="v1",
    key_version="v1",
    mode="off",
    storage_profile="normalized_result_only",
    storage_path=".whatifd/cache",
    hits=0,
    misses=0,
    writes=0,
    stale_hits=0,
    corrupted_entries=0,
    policy=CachePolicySnapshot(
        mode="off",
        warn_after_days=30,
        block_after_days=90,
        storage_profile="normalized_result_only",
    ),
    policy_violations=(),
    oldest_hit_age_days=None,
    models_distribution=MappingProxyType({}),
)


# ---------------------------------------------------------------------------
# 5. Run the pipeline.
# ---------------------------------------------------------------------------

report = run_pipeline(
    source,
    delta_fn=delta_fn,
    floor=TrustFloor(),
    policy=DecisionPolicy(),
    runtime=manifest,
    methodology=methodology,
    cache_summary=cache_summary,
)

# Cardinal-#5 graph walk: refuse to write any unwrapped Sensitive[T].
assert_no_unredacted_sensitive(report)


# ---------------------------------------------------------------------------
# 6. Write artifacts.
# ---------------------------------------------------------------------------

reports_dir = Path("reports")
reports_dir.mkdir(exist_ok=True)
stamp = datetime.now(timezone.utc).strftime("%Y-%m-%d-%H%M%S")

json_path = reports_dir / f"live-langfuse-{stamp}.json"
json_path.write_text(json.dumps(json.loads(encode_report_v01(report)), indent=2))

print(f"Verdict: {report.verdict_state}")
print(f"JSON:    {json_path}")
print(f"Cohorts seen: {[c.name for c in report.cohort_results]}")
print(f"Traces ingested per cohort: " + ", ".join(
    f"{c.name}={c.selected}" for c in report.cohort_results
))

Run it

python live_langfuse_test.py

Expected output (verdict + numbers depend on your Langfuse data):

Verdict: inconclusive
JSON:    reports/live-langfuse-2026-05-09-180322.json
Cohorts seen: ['baseline', 'failure']
Traces ingested per cohort: baseline=20, failure=15

The verdict will be one of ship / dont_ship / inconclusive depending on what cohort sizes the classifier produces and whether the deterministic delta_fn happens to satisfy the policy thresholds. The verdict isn’t the proof point — getting any verdict back, at all, with cohorts populated from your real Langfuse data is the proof point.

If you see Cohorts seen: [] or selected=0 for both, the cohort classifier isn’t matching; adjust cohort_by_tag to use whatever signal your traces actually carry (tag name, score field, metadata key). The two commented-out alternatives in the script are starting points, and a minimal tag-name variant is sketched below.
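For example, if your failure traces carry a different tag, a one-line change to the classifier is enough. The error tag name below is hypothetical; substitute whatever your traces actually use:

def cohort_by_tag(trace) -> str:
    """Hypothetical variant: treat traces tagged `error` as failures."""
    tags = trace.tags or []
    return "failure" if "error" in tags else "baseline"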

Marking traces in Langfuse

If the output shows Cohorts seen: ['baseline'] (no failure cohort), your traces don’t carry the signal cohort_by_tag looks for. The fastest fix is to tag a handful of traces in the Langfuse UI: no code, no SDK calls.

Tag from code (for ongoing use)

Once you want failure marking to happen automatically as your agent runs, there are three patterns; pick the one that matches what your code already knows:

# (a) Tag — what cohort_by_tag reads (recommended)
langfuse.update_current_trace(tags=["failed"])

# (b) Metadata — switch the script to cohort_by_metadata
langfuse.update_current_trace(metadata={"outcome": "error"})

# (c) Score — switch the script to cohort_by_score (uses < 0.6 threshold)
langfuse.score(trace_id=..., name="quality", value=0.2)

Tags are usually the right call: they’re cheap to add, easy to filter on in the Langfuse UI, and they’re the only signal the script’s default classifier reads. See Langfuse → Tracing features → Tags for the SDK-side details.

What this proves

  • ✅ The whatifd-langfuse adapter ingested your real traces (real HTTP, real Langfuse API).

  • ✅ Sensitive[str] wrapping at the boundary held: the cardinal-#5 graph walk (assert_no_unredacted_sensitive) passed.

  • ✅ Cohort classification ran against your data.

  • ✅ The pipeline produced a real ReportV01 with cardinal-#10 methodology disclosure baked in.

  • ❌ This does not prove your scoring model works; delta_fn is deterministic. That’s the next step.

Going further

To replace delta_fn with your real agent runner + a real Inspect AI scorer:

  1. Implement the runner contract — a Python function that takes (TraceInput, ReplayConfig, ToolCache) and returns a ReplayOutput by re-executing your agent against a proposed change.

  2. Construct an InspectAIScorer instance (your Inspect score_fn + judge config).

  3. Build a delta_fn(rt) that runs the runner via whatifd.replay.replay_one_trace, projects the result into a ScoreCase, calls scorer.score(case), and returns result.score.

The reference is tests/integration/test_real_adapters.py in the library repo — the _delta_fn_from(scorer) helper there is the exact bridge you need. Once that’s wired you’ll get a real Ship / Don’t Ship / Inconclusive verdict on production traces.
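For orientation, here is a minimal sketch of that bridge. The argument names, ScoreCase fields, InspectAIScorer constructor, and import paths below are assumptions for illustration (as are the my_agent_runner and my_inspect_score_fn placeholders); treat tests/integration/test_real_adapters.py as the authoritative wiring.

# Sketch only: names marked "assumed" are illustrative guesses, not the
# library's documented API.

from whatifd.adapters.protocols import RawTrace
from whatifd.replay import replay_one_trace               # entry point named in step 3
from whatifd.scoring import InspectAIScorer, ScoreCase    # import path assumed

def my_inspect_score_fn(sample):
    """Placeholder for your Inspect AI score function."""
    ...

def my_agent_runner(trace_input, replay_config, tool_cache):
    """Step 1: re-execute your agent against the proposed change; return a ReplayOutput."""
    ...

scorer = InspectAIScorer(score_fn=my_inspect_score_fn)     # step 2; constructor args assumed

def delta_fn(rt: RawTrace) -> float:
    """Step 3: replay, project into a ScoreCase, score, return the score."""
    replay_output = replay_one_trace(rt, runner=my_agent_runner)   # kwargs assumed
    case = ScoreCase(trace=rt, output=replay_output)               # fields assumed
    result = scorer.score(case)
    return result.score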

CLI-only path (v0.2+)

As of v0.2, the entire script above can be replaced with a whatifd.config.yaml + whatifd fork --config whatifd.config.yaml invocation — scorer.score_fn resolves your Inspect score function from a python:<module>:<attr> reference, and the CLI constructs InspectAIScorer for you. See Inspect AI integration for the YAML shape. The programmatic script in this page remains useful for ad-hoc smoke tests where you want to bypass config plumbing.
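A minimal config sketch under those constraints (only the scorer.score_fn key named on this page is assumed; the module path is hypothetical, and the full YAML shape is documented on the Inspect AI integration page):

# whatifd.config.yaml (sketch; see Inspect AI integration for the full shape)
scorer:
  score_fn: "python:my_project.scoring:my_inspect_score_fn"   # hypothetical module path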