From 5e68d92681031fa7ab2b3d57a45febd7d65c2656 Mon Sep 17 00:00:00 2001
From: Simon Strandgaard
Date: Mon, 9 Mar 2026 14:24:26 +0100
Subject: [PATCH] Add A/B experiment runner and JSONL results tracker

Minimal experiment infrastructure for prompt optimization (#94) and A/B
testing promotion (#59). Runs baseline vs candidate system prompts on a
task function, scores both outputs with the task output scorer, and logs
results to a JSONL tracker.

Includes experiment config, runner with task registry, results tracker,
and CLI entry point.
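
Example invocation (flags as defined in run_experiment.py; when
--generation-llm/--judge-llm are omitted the llm_factory default is used):

    PROMPT> python -m worker_plan_internal.scoring.run_experiment \
        --task swot_business \
        --candidate-prompt "You are an expert strategic consultant with 20 years of experience." \
        --candidate-description "Added experience qualifier" \
        --results-dir ./experiments

With the default threshold of 0.5, the candidate is marked "keep" when its
average composite score exceeds the baseline by at least 0.5, "discard" when
it trails by at least 0.5, and "inconclusive" otherwise. Each run appends one
summary line to <results-dir>/experiment_results.jsonl, roughly of the form
(scores illustrative):

    {"experiment_id": "...", "timestamp": "...", "task_name": "swot_business",
     "baseline_avg_score": 6.2, "candidate_avg_score": 7.1, "delta": 0.9,
     "status": "keep", "candidate_description": "Added experience qualifier",
     "duration_seconds": 120}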

Co-Authored-By: Claude Opus 4.6
---
 .../scoring/experiment_config.py |  55 +++++
 .../scoring/experiment_runner.py | 230 ++++++++++++++++++
 .../scoring/results_tracker.py   | 143 +++++++++++
 .../scoring/run_experiment.py    | 186 ++++++++++++++
 4 files changed, 614 insertions(+)
 create mode 100644 worker_plan/worker_plan_internal/scoring/experiment_config.py
 create mode 100644 worker_plan/worker_plan_internal/scoring/experiment_runner.py
 create mode 100644 worker_plan/worker_plan_internal/scoring/results_tracker.py
 create mode 100644 worker_plan/worker_plan_internal/scoring/run_experiment.py

diff --git a/worker_plan/worker_plan_internal/scoring/experiment_config.py b/worker_plan/worker_plan_internal/scoring/experiment_config.py
new file mode 100644
index 00000000..249688d4
--- /dev/null
+++ b/worker_plan/worker_plan_internal/scoring/experiment_config.py
@@ -0,0 +1,55 @@
+"""
+Configuration for a single A/B experiment comparing baseline vs candidate system prompts.
+
+PROMPT> python -m worker_plan_internal.scoring.experiment_config
+"""
+import uuid
+from dataclasses import dataclass, field
+from typing import Optional
+
+
+@dataclass
+class ExperimentConfig:
+    """Configuration for a single prompt optimization experiment."""
+    task_name: str
+    baseline_system_prompt: str
+    candidate_system_prompt: str
+    candidate_description: str
+    reference_plan_prompts: list[str]
+    experiment_id: str = field(default_factory=lambda: str(uuid.uuid4()))
+    judge_llm_name: Optional[str] = None
+    generation_llm_name: Optional[str] = None
+
+    def __post_init__(self):
+        if not self.task_name:
+            raise ValueError("task_name must be non-empty.")
+        if not self.baseline_system_prompt:
+            raise ValueError("baseline_system_prompt must be non-empty.")
+        if not self.candidate_system_prompt:
+            raise ValueError("candidate_system_prompt must be non-empty.")
+        if not self.reference_plan_prompts:
+            raise ValueError("reference_plan_prompts must have at least one entry.")
+
+    def to_dict(self) -> dict:
+        return {
+            "experiment_id": self.experiment_id,
+            "task_name": self.task_name,
+            "baseline_system_prompt": self.baseline_system_prompt,
+            "candidate_system_prompt": self.candidate_system_prompt,
+            "candidate_description": self.candidate_description,
+            "reference_plan_prompts": self.reference_plan_prompts,
+            "judge_llm_name": self.judge_llm_name,
+            "generation_llm_name": self.generation_llm_name,
+        }
+
+
+if __name__ == "__main__":
+    config = ExperimentConfig(
+        task_name="swot_business",
+        baseline_system_prompt="You are a strategic consultant.",
+        candidate_system_prompt="You are an expert strategic consultant with 20 years of experience.",
+        candidate_description="Added experience qualifier to system prompt",
+        reference_plan_prompts=["Build a SaaS platform for small teams."],
+    )
+    import json
+    print(json.dumps(config.to_dict(), indent=2))
diff --git a/worker_plan/worker_plan_internal/scoring/experiment_runner.py b/worker_plan/worker_plan_internal/scoring/experiment_runner.py
new file mode 100644
index 00000000..2a26f6b6
--- /dev/null
+++ b/worker_plan/worker_plan_internal/scoring/experiment_runner.py
@@ -0,0 +1,230 @@
+"""
+Minimal A/B experiment runner: runs baseline + candidate system prompts on a task,
+scores both outputs, and returns a comparison result.
+
+For Phase 0 (#94) / Phase A (#59): calls task functions directly (no Luigi pipeline),
+using the plan prompt as user_prompt.
+
+PROMPT> python -m worker_plan_internal.scoring.experiment_runner
+"""
+import json
+import logging
+import time
+from dataclasses import dataclass, field
+from datetime import datetime, timezone
+from math import ceil
+from typing import Optional
+
+from llama_index.core.llms.llm import LLM
+
+from worker_plan_internal.scoring.experiment_config import ExperimentConfig
+from worker_plan_internal.scoring.task_output_scorer import TaskOutputScore, TaskOutputScorer
+
+logger = logging.getLogger(__name__)
+
+
+# Registry of known task functions. Each entry maps a task_name to a
+# (module_path, function_name) pair; the resolved function has signature
+# (llm, user_prompt, system_prompt) -> dict. Modules are imported lazily.
+_TASK_REGISTRY: dict[str, tuple[str, str]] = {
+    "swot_business": (
+        "worker_plan_internal.swot.swot_phase2_conduct_analysis",
+        "swot_phase2_conduct_analysis",
+    ),
+}
+
+
+def _get_task_function(task_name: str):
+    """Dynamically import and return the task function."""
+    if task_name not in _TASK_REGISTRY:
+        raise ValueError(
+            f"Unknown task_name: {task_name!r}. "
+            f"Available: {list(_TASK_REGISTRY.keys())}"
+        )
+    module_path, func_name = _TASK_REGISTRY[task_name]
+    import importlib
+    module = importlib.import_module(module_path)
+    return getattr(module, func_name)
+
+
+@dataclass
+class ExperimentArm:
+    """Result from one arm (baseline or candidate) of an experiment."""
+    label: str  # "baseline" or "candidate"
+    system_prompt: str
+    task_output: dict
+    score: TaskOutputScore
+
+
+@dataclass
+class ExperimentSummary:
+    """Aggregate comparison between baseline and candidate."""
+    baseline_avg: float
+    candidate_avg: float
+    delta: float
+    status: str  # "keep", "discard", or "inconclusive"
+
+    def to_dict(self) -> dict:
+        return {
+            "baseline_avg": round(self.baseline_avg, 2),
+            "candidate_avg": round(self.candidate_avg, 2),
+            "delta": round(self.delta, 2),
+            "status": self.status,
+        }
+
+
+@dataclass
+class ExperimentResult:
+    """Full result from an A/B experiment."""
+    config: ExperimentConfig
+    arms: list[ExperimentArm]
+    timestamp: str
+    summary: ExperimentSummary
+    duration_seconds: int = 0
+
+    def to_dict(self) -> dict:
+        return {
+            "config": self.config.to_dict(),
+            "arms": [
+                {
+                    "label": arm.label,
+                    "score": arm.score.model_dump(),
+                }
+                for arm in self.arms
+            ],
+            "timestamp": self.timestamp,
+            "summary": self.summary.to_dict(),
+            "duration_seconds": self.duration_seconds,
+        }
+
+
+def _compute_summary(
+    baseline_scores: list[float],
+    candidate_scores: list[float],
+    threshold: float = 0.5,
+) -> ExperimentSummary:
+    """Compute summary from score lists. Threshold is minimum delta to 'keep'."""
+    baseline_avg = sum(baseline_scores) / len(baseline_scores) if baseline_scores else 0.0
+    candidate_avg = sum(candidate_scores) / len(candidate_scores) if candidate_scores else 0.0
+    delta = candidate_avg - baseline_avg
+
+    if delta >= threshold:
+        status = "keep"
+    elif delta <= -threshold:
+        status = "discard"
+    else:
+        status = "inconclusive"
+
+    return ExperimentSummary(
+        baseline_avg=baseline_avg,
+        candidate_avg=candidate_avg,
+        delta=delta,
+        status=status,
+    )
+
+
+class ExperimentRunner:
+    """Runs an A/B experiment comparing baseline vs candidate system prompts."""
+
+    @staticmethod
+    def run(
+        config: ExperimentConfig,
+        generation_llm: LLM,
+        judge_llm: LLM,
+        threshold: float = 0.5,
+    ) -> ExperimentResult:
+        """
+        Run the experiment: for each reference plan prompt, call the task function
+        with both baseline and candidate system prompts, score both, compare.
+
+        Args:
+            config: Experiment configuration.
+            generation_llm: LLM used to generate task outputs.
+            judge_llm: LLM used to score task outputs.
+            threshold: Minimum delta to declare "keep" or "discard".
+        """
+        task_fn = _get_task_function(config.task_name)
+
+        arms: list[ExperimentArm] = []
+        baseline_scores: list[float] = []
+        candidate_scores: list[float] = []
+
+        start_time = time.perf_counter()
+
+        for i, plan_prompt in enumerate(config.reference_plan_prompts):
+            logger.info(
+                f"Reference prompt {i+1}/{len(config.reference_plan_prompts)}"
+            )
+
+            # Run baseline
+            logger.info("Running baseline arm...")
+            baseline_output = task_fn(
+                generation_llm, plan_prompt, config.baseline_system_prompt
+            )
+            baseline_score_result = TaskOutputScorer.score(
+                llm=judge_llm,
+                task_output_json=baseline_output,
+                plan_prompt=plan_prompt,
+                task_name=config.task_name,
+            )
+            baseline_arm = ExperimentArm(
+                label="baseline",
+                system_prompt=config.baseline_system_prompt,
+                task_output=baseline_output,
+                score=baseline_score_result.response,
+            )
+            arms.append(baseline_arm)
+            baseline_scores.append(baseline_score_result.response.composite_score)
+
+            # Run candidate
+            logger.info("Running candidate arm...")
+            candidate_output = task_fn(
+                generation_llm, plan_prompt, config.candidate_system_prompt
+            )
+            candidate_score_result = TaskOutputScorer.score(
+                llm=judge_llm,
+                task_output_json=candidate_output,
+                plan_prompt=plan_prompt,
+                task_name=config.task_name,
+            )
+            candidate_arm = ExperimentArm(
+                label="candidate",
+                system_prompt=config.candidate_system_prompt,
+                task_output=candidate_output,
+                score=candidate_score_result.response,
+            )
+            arms.append(candidate_arm)
+            candidate_scores.append(candidate_score_result.response.composite_score)
+
+            logger.info(
+                f"Prompt {i+1}: baseline={baseline_score_result.response.composite_score:.1f}, "
+                f"candidate={candidate_score_result.response.composite_score:.1f}"
+            )
+
+        end_time = time.perf_counter()
+        duration = int(ceil(end_time - start_time))
+
+        summary = _compute_summary(baseline_scores, candidate_scores, threshold)
+
+        return ExperimentResult(
+            config=config,
+            arms=arms,
+            timestamp=datetime.now(timezone.utc).isoformat(),
+            summary=summary,
+            duration_seconds=duration,
+        )
+
+
+if __name__ == "__main__":
+    logging.basicConfig(level=logging.INFO)
+
+    # Demonstrate structure only (requires LLM to actually run)
+    config = ExperimentConfig(
+        task_name="swot_business",
+        baseline_system_prompt="You are a strategic consultant.",
+        candidate_system_prompt="You are an expert strategic consultant with 20 years of experience.",
candidate_description="Added experience qualifier", + reference_plan_prompts=["Build a SaaS platform for small teams."], + ) + print(json.dumps(config.to_dict(), indent=2)) + print("\nTo run a full experiment, use: python -m worker_plan_internal.scoring.run_experiment") diff --git a/worker_plan/worker_plan_internal/scoring/results_tracker.py b/worker_plan/worker_plan_internal/scoring/results_tracker.py new file mode 100644 index 00000000..b134dda5 --- /dev/null +++ b/worker_plan/worker_plan_internal/scoring/results_tracker.py @@ -0,0 +1,143 @@ +""" +JSONL-based results tracker for experiment outcomes. + +Follows the track_activity.jsonl convention: one JSON object per line, +append-only, human-readable. + +PROMPT> python -m worker_plan_internal.scoring.results_tracker +""" +import json +import logging +from dataclasses import dataclass +from pathlib import Path +from typing import Optional + +logger = logging.getLogger(__name__) + + +@dataclass +class ResultsEntry: + """A single experiment result entry for the JSONL log.""" + experiment_id: str + timestamp: str + task_name: str + baseline_avg_score: float + candidate_avg_score: float + delta: float + status: str # "keep", "discard", "inconclusive" + candidate_description: str + duration_seconds: int = 0 + + def to_dict(self) -> dict: + return { + "experiment_id": self.experiment_id, + "timestamp": self.timestamp, + "task_name": self.task_name, + "baseline_avg_score": round(self.baseline_avg_score, 2), + "candidate_avg_score": round(self.candidate_avg_score, 2), + "delta": round(self.delta, 2), + "status": self.status, + "candidate_description": self.candidate_description, + "duration_seconds": self.duration_seconds, + } + + +class ResultsTracker: + """Append-only JSONL tracker for experiment results.""" + + def __init__(self, results_file: Path): + self.results_file = results_file + + def append(self, entry: ResultsEntry) -> None: + """Append a single results entry to the JSONL file.""" + self.results_file.parent.mkdir(parents=True, exist_ok=True) + with open(self.results_file, "a", encoding="utf-8") as f: + f.write(json.dumps(entry.to_dict()) + "\n") + logger.info(f"Appended result for experiment {entry.experiment_id}") + + def load_all(self) -> list[ResultsEntry]: + """Load all results entries from the JSONL file.""" + if not self.results_file.exists(): + return [] + entries = [] + with open(self.results_file, "r", encoding="utf-8") as f: + for line_num, line in enumerate(f, 1): + line = line.strip() + if not line: + continue + try: + d = json.loads(line) + entries.append(ResultsEntry(**d)) + except (json.JSONDecodeError, TypeError) as e: + logger.warning(f"Skipping malformed line {line_num}: {e}") + return entries + + def recent(self, n: int = 10) -> list[ResultsEntry]: + """Return the N most recent entries.""" + all_entries = self.load_all() + return all_entries[-n:] + + def summary_for_task(self, task_name: str) -> dict: + """Summarize results for a specific task.""" + entries = [e for e in self.load_all() if e.task_name == task_name] + if not entries: + return {"task_name": task_name, "total_experiments": 0} + + kept = [e for e in entries if e.status == "keep"] + discarded = [e for e in entries if e.status == "discard"] + inconclusive = [e for e in entries if e.status == "inconclusive"] + avg_delta = sum(e.delta for e in entries) / len(entries) + + return { + "task_name": task_name, + "total_experiments": len(entries), + "kept": len(kept), + "discarded": len(discarded), + "inconclusive": len(inconclusive), + "avg_delta": 
+        }
+
+
+if __name__ == "__main__":
+    import tempfile
+
+    logging.basicConfig(level=logging.INFO)
+
+    with tempfile.TemporaryDirectory() as tmpdir:
+        tracker = ResultsTracker(Path(tmpdir) / "experiment_results.jsonl")
+
+        # Add sample entries
+        entry1 = ResultsEntry(
+            experiment_id="test-001",
+            timestamp="2026-03-09T10:00:00Z",
+            task_name="swot_business",
+            baseline_avg_score=6.2,
+            candidate_avg_score=7.1,
+            delta=0.9,
+            status="keep",
+            candidate_description="Added experience qualifier",
+            duration_seconds=120,
+        )
+        entry2 = ResultsEntry(
+            experiment_id="test-002",
+            timestamp="2026-03-09T11:00:00Z",
+            task_name="swot_business",
+            baseline_avg_score=6.5,
+            candidate_avg_score=6.3,
+            delta=-0.2,
+            status="inconclusive",
+            candidate_description="Removed bullet formatting",
+            duration_seconds=95,
+        )
+        tracker.append(entry1)
+        tracker.append(entry2)
+
+        print("All entries:")
+        for entry in tracker.load_all():
+            print(f"  {entry.experiment_id}: {entry.status} (delta={entry.delta:+.1f})")
+
+        print(f"\nRecent 1: {tracker.recent(1)[0].experiment_id}")
+
+        summary = tracker.summary_for_task("swot_business")
+        print(f"\nSummary: {json.dumps(summary, indent=2)}")
+        print("\nPASSED")
diff --git a/worker_plan/worker_plan_internal/scoring/run_experiment.py b/worker_plan/worker_plan_internal/scoring/run_experiment.py
new file mode 100644
index 00000000..b8aeec34
--- /dev/null
+++ b/worker_plan/worker_plan_internal/scoring/run_experiment.py
@@ -0,0 +1,186 @@
+"""
+CLI for running a single A/B experiment comparing baseline vs candidate system prompts.
+
+PROMPT> python -m worker_plan_internal.scoring.run_experiment \
+    --task swot_business \
+    --candidate-prompt "You are an expert strategic consultant with 20 years of experience." \
+    --candidate-description "Added experience qualifier" \
+    --results-dir ./experiments
+"""
+import argparse
+import json
+import logging
+import sys
+from pathlib import Path
+
+from worker_plan_internal.scoring.experiment_config import ExperimentConfig
+from worker_plan_internal.scoring.experiment_runner import ExperimentRunner, _TASK_REGISTRY
+from worker_plan_internal.scoring.results_tracker import ResultsEntry, ResultsTracker
+
+logger = logging.getLogger(__name__)
+
+
+# Default baseline system prompts per task
+_DEFAULT_BASELINES: dict[str, str] = {
+    "swot_business": (
+        "You are a universal strategic consultant with expertise in project management, "
+        "business analysis, and innovation across various industries."
+    ),
+}
+
+
+def _load_reference_prompts(prompts_file: str | None, inline_prompt: str | None) -> list[str]:
+    """Load reference plan prompts from file or inline."""
+    if prompts_file:
+        path = Path(prompts_file)
+        if not path.exists():
+            print(f"Error: prompts file not found: {path}", file=sys.stderr)
+            sys.exit(1)
+        # One prompt per line, skip empty lines
+        return [line.strip() for line in path.read_text().splitlines() if line.strip()]
+    elif inline_prompt:
+        return [inline_prompt]
+    else:
+        # Default reference prompt for testing
+        return ["Build a SaaS platform for project management targeting small teams with limited budgets."]
+
+
+def main() -> None:
+    parser = argparse.ArgumentParser(
+        description="Run an A/B experiment comparing baseline vs candidate system prompts."
+    )
+    parser.add_argument(
+        "--task",
+        type=str,
+        required=True,
+        choices=list(_TASK_REGISTRY.keys()),
+        help="Task to run the experiment on.",
+    )
+    parser.add_argument(
+        "--candidate-prompt",
+        type=str,
+        required=True,
+        help="Candidate system prompt to test.",
+    )
+    parser.add_argument(
+        "--candidate-description",
+        type=str,
+        default="",
+        help="Short description of what changed in the candidate prompt.",
+    )
+    parser.add_argument(
+        "--baseline-prompt",
+        type=str,
+        default=None,
+        help="Baseline system prompt. Uses default for the task if not specified.",
+    )
+    parser.add_argument(
+        "--reference-prompts-file",
+        type=str,
+        default=None,
+        help="File with reference plan prompts (one per line).",
+    )
+    parser.add_argument(
+        "--reference-prompt",
+        type=str,
+        default=None,
+        help="Single inline reference plan prompt.",
+    )
+    parser.add_argument(
+        "--generation-llm",
+        type=str,
+        default=None,
+        help="LLM model name for task generation.",
+    )
+    parser.add_argument(
+        "--judge-llm",
+        type=str,
+        default=None,
+        help="LLM model name for scoring/judging.",
+    )
+    parser.add_argument(
+        "--results-dir",
+        type=str,
+        default="./experiments",
+        help="Directory for results JSONL file.",
+    )
+    parser.add_argument(
+        "--threshold",
+        type=float,
+        default=0.5,
+        help="Minimum score delta to declare keep/discard (default: 0.5).",
+    )
+    args = parser.parse_args()
+
+    logging.basicConfig(level=logging.INFO)
+    logging.getLogger("httpx").setLevel(logging.WARNING)
+
+    from worker_plan_internal.llm_factory import get_llm
+
+    baseline_prompt = args.baseline_prompt or _DEFAULT_BASELINES.get(args.task)
+    if not baseline_prompt:
+        print(f"Error: no default baseline for task {args.task!r}, use --baseline-prompt", file=sys.stderr)
+        sys.exit(1)
+
+    reference_prompts = _load_reference_prompts(args.reference_prompts_file, args.reference_prompt)
+
+    config = ExperimentConfig(
+        task_name=args.task,
+        baseline_system_prompt=baseline_prompt,
+        candidate_system_prompt=args.candidate_prompt,
+        candidate_description=args.candidate_description or "No description",
+        reference_plan_prompts=reference_prompts,
+        judge_llm_name=args.judge_llm,
+        generation_llm_name=args.generation_llm,
+    )
+
+    generation_llm = get_llm(args.generation_llm) if args.generation_llm else get_llm()
+    judge_llm = get_llm(args.judge_llm) if args.judge_llm else get_llm()
+
+    print(f"Running experiment: {config.experiment_id}")
+    print(f"Task: {config.task_name}")
+    print(f"Reference prompts: {len(reference_prompts)}")
+    print(f"Candidate: {config.candidate_description}")
+    print()
+
+    result = ExperimentRunner.run(
+        config=config,
+        generation_llm=generation_llm,
+        judge_llm=judge_llm,
+        threshold=args.threshold,
+    )
+
+    # Save full result
+    results_dir = Path(args.results_dir)
+    results_dir.mkdir(parents=True, exist_ok=True)
+    full_result_path = results_dir / f"experiment_{config.experiment_id}.json"
+    full_result_path.write_text(json.dumps(result.to_dict(), indent=2))
+    print(f"\nFull result saved to: {full_result_path}")
+
+    # Append to JSONL tracker
+    tracker = ResultsTracker(results_dir / "experiment_results.jsonl")
+    entry = ResultsEntry(
+        experiment_id=config.experiment_id,
+        timestamp=result.timestamp,
+        task_name=config.task_name,
+        baseline_avg_score=result.summary.baseline_avg,
+        candidate_avg_score=result.summary.candidate_avg,
+        delta=result.summary.delta,
+        status=result.summary.status,
+        candidate_description=config.candidate_description,
+        duration_seconds=result.duration_seconds,
+    )
+    tracker.append(entry)
+
+    # Print summary
+    print(f"\n{'='*50}")
+    print(f"RESULT: {result.summary.status.upper()}")
+    print(f"Baseline avg: {result.summary.baseline_avg:.1f}")
+    print(f"Candidate avg: {result.summary.candidate_avg:.1f}")
+    print(f"Delta: {result.summary.delta:+.1f}")
+    print(f"Duration: {result.duration_seconds}s")
+    print(f"{'='*50}")
+
+
+if __name__ == "__main__":
+    main()