---
layout: default
title: "Chapter 6: Evaluation & Optimization"
parent: Haystack Tutorial
nav_order: 6
---
Welcome to Chapter 6: Evaluation & Optimization. In this part of Haystack: Deep Dive Tutorial, you will build an intuitive mental model first, then move into concrete implementation details and practical production tradeoffs.
Measure search quality and optimize Haystack pipelines for maximum performance.
This chapter covers evaluation methodologies for assessing search system quality and optimization techniques to improve performance. You'll learn to measure retrieval accuracy, generation quality, and end-to-end system performance.
import time

import numpy as np
from scipy import stats
from sklearn.metrics import precision_score, recall_score, f1_score, ndcg_score

from haystack.evaluation import EvaluationRunResult
class RetrievalEvaluator:
    """Compute standard retrieval metrics (precision/recall/F1, NDCG, MRR) at several cutoffs."""

    def __init__(self):
        # Reserved for cached per-run metrics; not read by the current methods.
        self.metrics = {}

    def evaluate_retrieval(self, queries, retrieved_docs, relevant_docs, k_values=(1, 3, 5, 10)):
        """Evaluate retrieval quality averaged over all queries.

        Args:
            queries: query strings (not used in the math; kept for symmetry/logging).
            retrieved_docs: per-query ranked lists of retrieved documents (objects with ``.id``).
            relevant_docs: per-query lists of ground-truth relevant document ids.
            k_values: cutoffs at which metrics are computed. Default is now a tuple —
                the original used a mutable list default, a classic Python pitfall.

        Returns:
            dict mapping ``"k=<n>"`` to mean precision/recall/f1/ndcg/mrr over queries.
        """
        results = {}
        for k in k_values:
            precision_scores = []
            recall_scores = []
            f1_scores = []
            ndcg_scores = []
            mrr_scores = []
            for query_docs, query_relevant in zip(retrieved_docs, relevant_docs):
                # Only the top-k ranks count toward the @k metrics.
                top_k_docs = query_docs[:k]
                precision = self._calculate_precision_at_k(top_k_docs, query_relevant, k)
                recall = self._calculate_recall_at_k(top_k_docs, query_relevant, k)
                # Harmonic mean, guarded so an all-zero query contributes 0 instead of dividing by 0.
                f1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0
                ndcg = self._calculate_ndcg_at_k(top_k_docs, query_relevant, k)
                mrr = self._calculate_mrr(top_k_docs, query_relevant)
                precision_scores.append(precision)
                recall_scores.append(recall)
                f1_scores.append(f1)
                ndcg_scores.append(ndcg)
                mrr_scores.append(mrr)
            results[f"k={k}"] = {
                "precision": np.mean(precision_scores),
                "recall": np.mean(recall_scores),
                "f1": np.mean(f1_scores),
                "ndcg": np.mean(ndcg_scores),
                "mrr": np.mean(mrr_scores)
            }
        return results

    def _calculate_precision_at_k(self, retrieved, relevant, k):
        """Precision@K = |retrieved ∩ relevant| / |retrieved|.

        Note: duplicate ids in *retrieved* are collapsed in the numerator but not
        the denominator, so duplicated hits lower precision.
        """
        retrieved_set = set(doc.id for doc in retrieved)
        relevant_set = set(relevant)
        if not retrieved:
            return 0.0
        return len(retrieved_set & relevant_set) / len(retrieved)

    def _calculate_recall_at_k(self, retrieved, relevant, k):
        """Recall@K = |retrieved ∩ relevant| / |relevant|.

        When nothing is relevant, returns 1.0 only if nothing was retrieved.
        """
        retrieved_set = set(doc.id for doc in retrieved)
        relevant_set = set(relevant)
        if not relevant_set:
            return 1.0 if not retrieved else 0.0
        return len(retrieved_set & relevant_set) / len(relevant_set)

    def _calculate_ndcg_at_k(self, retrieved, relevant, k):
        """Binary-relevance NDCG@K."""
        relevant_set = set(relevant)
        dcg = 0.0
        for i, doc in enumerate(retrieved[:k]):
            if doc.id in relevant_set:
                dcg += 1.0 / np.log2(i + 2)  # rank 1 discounts by log2(2)
        # Ideal DCG: every relevant doc placed at the top ranks.
        idcg = sum(1.0 / np.log2(i + 2) for i in range(min(len(relevant_set), k)))
        return dcg / idcg if idcg > 0 else 0.0

    def _calculate_mrr(self, retrieved, relevant):
        """Reciprocal rank of the first relevant document (0.0 if none found)."""
        relevant_set = set(relevant)
        for rank, doc in enumerate(retrieved, 1):
            if doc.id in relevant_set:
                return 1.0 / rank
        return 0.0
# Usage
evaluator = RetrievalEvaluator()

# Sample evaluation data
queries = ["What is machine learning?", "How do neural networks work?"]
# NOTE(review): ``Document`` is Haystack's Document class and is not imported in
# this snippet — add ``from haystack import Document`` before running.
retrieved_docs = [
    # Retrieved docs for first query (simplified)
    [Document(id="doc1", content="ML definition"), Document(id="doc2", content="AI overview")],
    # Retrieved docs for second query
    [Document(id="doc3", content="Neural networks"), Document(id="doc4", content="Deep learning")]
]
relevant_docs = [
    ["doc1", "doc5"],  # Ground truth relevant docs
    ["doc3", "doc6"]
]

results = evaluator.evaluate_retrieval(queries, retrieved_docs, relevant_docs)
print("Retrieval Evaluation Results:")
# The original printed bare ".3f" string literals; format each metric explicitly.
for k, metrics in results.items():
    print(f"{k}:")
    print(f"  precision: {metrics['precision']:.3f}")
    print(f"  recall: {metrics['recall']:.3f}")
    print(f"  f1: {metrics['f1']:.3f}")
    print(f"  ndcg: {metrics['ndcg']:.3f}")
    print(f"  mrr: {metrics['mrr']:.3f}")

from haystack.evaluation import Evaluator
from rouge_score import rouge_scorer
from nltk.translate.bleu_score import sentence_bleu
import evaluate
class GenerationEvaluator:
    """Score generated text against references with ROUGE, BLEU, BERTScore,
    lexical diversity, and a lightweight word-overlap consistency proxy."""

    def __init__(self):
        # Third-party scorers: requires ``rouge_score`` and HF ``evaluate``.
        self.rouge_scorer = rouge_scorer.RougeScorer(['rouge1', 'rouge2', 'rougeL'], use_stemmer=True)
        self.bertscore = evaluate.load("bertscore")

    def evaluate_generation(self, predictions, references):
        """Evaluate generation quality against references.

        Args:
            predictions: generated strings, parallel to *references*.
            references: gold-standard strings.

        Returns:
            dict with rouge / bleu / bertscore / diversity / factual_consistency.
        """
        results = {
            "rouge": self._calculate_rouge(predictions, references),
            "bleu": self._calculate_bleu(predictions, references),
            "bertscore": self._calculate_bertscore(predictions, references),
            "diversity": self._calculate_diversity(predictions),
            "factual_consistency": self._evaluate_factual_consistency(predictions, references)
        }
        return results

    def _calculate_rouge(self, predictions, references):
        """Mean ROUGE-1/2/L F-measures over all (prediction, reference) pairs."""
        rouge1_scores = []
        rouge2_scores = []
        rougeL_scores = []
        for pred, ref in zip(predictions, references):
            scores = self.rouge_scorer.score(ref, pred)
            rouge1_scores.append(scores["rouge1"].fmeasure)
            rouge2_scores.append(scores["rouge2"].fmeasure)
            rougeL_scores.append(scores["rougeL"].fmeasure)
        return {
            "rouge1": np.mean(rouge1_scores),
            "rouge2": np.mean(rouge2_scores),
            "rougeL": np.mean(rougeL_scores)
        }

    def _calculate_bleu(self, predictions, references):
        """Mean sentence-level BLEU; a failed computation counts as 0."""
        bleu_scores = []
        for pred, ref in zip(predictions, references):
            pred_tokens = pred.split()
            ref_tokens = [ref.split()]  # sentence_bleu expects a list of references
            try:
                bleu = sentence_bleu(ref_tokens, pred_tokens)
                bleu_scores.append(bleu)
            except Exception:
                # Was a bare ``except:`` — narrowed so KeyboardInterrupt/SystemExit
                # are no longer swallowed into a silent 0 score.
                bleu_scores.append(0.0)
        return np.mean(bleu_scores)

    def _calculate_bertscore(self, predictions, references):
        """Mean BERTScore precision/recall/F1 (English model)."""
        results = self.bertscore.compute(
            predictions=predictions,
            references=references,
            lang="en"
        )
        return {
            "precision": np.mean(results["precision"]),
            "recall": np.mean(results["recall"]),
            "f1": np.mean(results["f1"])
        }

    def _calculate_diversity(self, predictions):
        """Mean type/token ratio; empty or single-token outputs score 0."""
        diversities = []
        for pred in predictions:
            tokens = pred.lower().split()
            if len(tokens) > 1:
                diversities.append(len(set(tokens)) / len(tokens))
            else:
                diversities.append(0.0)
        return np.mean(diversities)

    def _evaluate_factual_consistency(self, predictions, references):
        """Crude consistency proxy: Jaccard word overlap between pred and ref.

        NOTE(review): the original also counted a "not"-presence mismatch as a
        contradiction but never used that count in the score; the dead code is
        removed here. A real implementation would use an NLI model.
        """
        consistency_scores = []
        for pred, ref in zip(predictions, references):
            pred_words = set(pred.lower().split())
            ref_words = set(ref.lower().split())
            overlap = len(pred_words & ref_words)
            total = len(pred_words | ref_words)
            consistency_scores.append((overlap / total) if total > 0 else 0)
        return np.mean(consistency_scores)
# Usage
gen_evaluator = GenerationEvaluator()
predictions = [
"Machine learning is a subset of artificial intelligence that enables computers to learn from data.",
"Neural networks are computing systems inspired by biological neural networks."
]
references = [
"Machine learning is a method of data analysis that automates analytical model building.",
"Neural networks are computing systems inspired by the biological neural networks in animal brains."
]
results = gen_evaluator.evaluate_generation(predictions, references)
print("Generation Evaluation Results:")
for metric, score in results.items():
if isinstance(score, dict):
print(f"{metric}:")
for sub_metric, sub_score in score.items():
print(f" {sub_metric}: {sub_score:.3f}")
else:
print(f"{metric}: {score:.3f}")class IndexOptimizer:
def __init__(self, document_store):
    # Document store whose chunking/embedding/index/caching settings this optimizer tunes.
    self.document_store = document_store
def optimize_index(self, optimization_type="comprehensive"):
    """Run the selected optimization passes and collect their reports.

    ``optimization_type`` may be "comprehensive" (run everything) or the name
    of a single pass: "chunking", "embedding", "index_structure", "caching".
    """
    passes = {
        "chunking": self._optimize_chunking,
        "embedding": self._optimize_embedding,
        "index_structure": self._optimize_index_structure,
        "caching": self._optimize_caching,
    }
    report = {}
    for pass_name, run_pass in passes.items():
        if optimization_type in ("comprehensive", pass_name):
            report[pass_name] = run_pass()
    return report
def _optimize_chunking(self):
    """Optimize document chunking strategy"""
    # Analyze current chunking performance
    current_stats = self._analyze_chunking_stats()
    # Determine optimal chunk size
    optimal_size = self._calculate_optimal_chunk_size(current_stats)
    # Apply new chunking strategy; overlap is fixed at 10% of the chunk size.
    # NOTE(review): ``update_chunking_strategy`` is assumed to exist on the
    # document store — confirm against the actual store API before use.
    self.document_store.update_chunking_strategy(
        chunk_size=optimal_size,
        overlap=optimal_size // 10
    )
    return {
        "old_chunk_size": current_stats["avg_chunk_size"],
        "new_chunk_size": optimal_size,
        # NOTE(review): ``_calculate_chunking_improvement`` is not defined anywhere
        # in this class — this line raises AttributeError as written.
        "improvement": self._calculate_chunking_improvement(current_stats, optimal_size)
    }
def _optimize_embedding(self):
"""Optimize embedding configuration"""
# Test different embedding models
models_to_test = [
"sentence-transformers/all-MiniLM-L6-v2",
"sentence-transformers/all-MiniLM-L12-v2",
"sentence-transformers/paraphrase-MiniLM-L6-v2"
]
best_model = None
best_score = 0
for model_name in models_to_test:
score = self._evaluate_embedding_model(model_name)
if score > best_score:
best_score = score
best_model = model_name
# Apply best model
self.document_store.update_embedding_model(best_model)
return {
"selected_model": best_model,
"performance_score": best_score,
"tested_models": len(models_to_test)
}
def _optimize_index_structure(self):
    """Optimize index structure for better performance"""
    # Analyze current index
    # NOTE(review): ``_analyze_index_stats`` is not defined in this class —
    # this call raises AttributeError as written; implement it before use.
    index_stats = self._analyze_index_stats()
    # Determine optimal index type: approximate search for large corpora,
    # graph-based for high dimensionality, exact search otherwise.
    if index_stats["dataset_size"] > 100000:
        index_type = "IVF_PQ"  # Approximate nearest neighbors
    elif index_stats["dimensionality"] > 768:
        index_type = "HNSW"  # Hierarchical Navigable Small World
    else:
        index_type = "FLAT"  # Exact search
    # Apply optimization
    # NOTE(review): assumes the document store exposes ``rebuild_index`` — confirm.
    self.document_store.rebuild_index(index_type=index_type)
    return {
        "old_index_type": index_stats["current_index_type"],
        "new_index_type": index_type,
        # NOTE(review): ``_estimate_index_improvement`` is also undefined in this class.
        "expected_improvement": self._estimate_index_improvement(index_type, index_stats)
    }
def _optimize_caching(self):
    """Optimize caching strategy"""
    # Analyze query patterns
    # NOTE(review): ``_analyze_query_patterns`` is not defined in this class —
    # this call raises AttributeError as written; implement it before use.
    query_patterns = self._analyze_query_patterns()
    # Implement intelligent caching
    cache_config = {
        "frequent_queries_ttl": 3600,  # 1 hour
        "semantic_cache_enabled": True,
        "result_deduplication": True,
        "prefetching_enabled": query_patterns["has_patterns"]
    }
    # NOTE(review): assumes the document store exposes ``configure_caching`` — confirm.
    self.document_store.configure_caching(cache_config)
    return cache_config
def _analyze_chunking_stats(self):
"""Analyze current chunking performance"""
# This would analyze actual document chunks
return {
"avg_chunk_size": 512,
"chunk_count": 1000,
"overlap_ratio": 0.1
}
def _calculate_optimal_chunk_size(self, stats):
"""Calculate optimal chunk size based on analysis"""
# Simple heuristic - could be more sophisticated
base_size = 512
if stats["avg_chunk_size"] > 600:
return base_size * 0.8
elif stats["avg_chunk_size"] < 400:
return base_size * 1.2
else:
return base_size
def _evaluate_embedding_model(self, model_name):
"""Evaluate embedding model performance"""
# Simplified evaluation - would use actual retrieval metrics
model_scores = {
"sentence-transformers/all-MiniLM-L6-v2": 0.85,
"sentence-transformers/all-MiniLM-L12-v2": 0.87,
"sentence-transformers/paraphrase-MiniLM-L6-v2": 0.82
}
return model_scores.get(model_name, 0.8)
# Usage
optimizer = IndexOptimizer(document_store)
optimizations = optimizer.optimize_index()
print("Index Optimization Results:")
for opt_type, results in optimizations.items():
print(f"{opt_type}:")
for key, value in results.items():
print(f" {key}: {value}")class QueryOptimizer:
def __init__(self):
    # Reserved for learned query patterns; not yet populated by any method.
    self.query_patterns = {}
    # query-hash -> list of {"response_time", "quality_score", "timestamp"} records.
    self.performance_stats = {}
def optimize_query(self, query, context=None):
    """Run the expansion -> rewrite -> boosting pipeline over *query*."""
    stages = (
        lambda q: self._expand_query(q, context),
        lambda q: self._rewrite_query(q),
        lambda q: self._add_boosting_terms(q, context),
    )
    optimized = query
    for stage in stages:
        optimized = stage(optimized)
    return optimized
def _expand_query(self, query, context):
"""Expand query with synonyms and related terms"""
# Simple synonym expansion
synonyms = {
"machine learning": ["ML", "artificial intelligence", "data science"],
"neural network": ["neural net", "deep learning", "artificial neural network"],
"database": ["data store", "data warehouse", "repository"]
}
expanded_terms = []
query_lower = query.lower()
for term, syns in synonyms.items():
if term in query_lower:
expanded_terms.extend(syns[:2]) # Limit to 2 synonyms
if expanded_terms:
return f"{query} {' '.join(expanded_terms)}"
return query
def _rewrite_query(self, query):
"""Rewrite query for better retrieval"""
# Simple query rewriting rules
rewrites = {
"how do i": "how to",
"what's": "what is",
"can't": "cannot",
"don't": "do not"
}
rewritten = query
for old, new in rewrites.items():
rewritten = rewritten.replace(old, new)
return rewritten
def _add_boosting_terms(self, query, context):
"""Add context-based boosting terms"""
if not context or not context.get("user_profile"):
return query
user_profile = context["user_profile"]
boost_terms = []
# Add expertise-based terms
if user_profile.get("expertise") == "beginner":
boost_terms.extend(["introduction", "basics", "fundamentals"])
elif user_profile.get("expertise") == "expert":
boost_terms.extend(["advanced", "implementation", "optimization"])
# Add domain-specific terms
if user_profile.get("domain"):
domain_terms = {
"software": ["programming", "development", "code"],
"data_science": ["statistics", "modeling", "analysis"],
"business": ["strategy", "management", "operations"]
}
boost_terms.extend(domain_terms.get(user_profile["domain"], []))
if boost_terms:
return f"{query} {' '.join(boost_terms[:3])}" # Limit boost terms
return query
def track_performance(self, query, response_time, result_quality):
    """Record latency/quality for *query*, keeping at most the 100 newest samples.

    Queries are bucketed by ``hash(query) % 10000`` — collisions between
    distinct queries are accepted as a deliberate coarse approximation.
    """
    bucket = hash(query) % 10000
    history = self.performance_stats.setdefault(bucket, [])
    history.append({
        "response_time": response_time,
        "quality_score": result_quality,
        "timestamp": time.time(),
    })
    # Bound memory: drop the oldest samples once past 100.
    if len(history) > 100:
        self.performance_stats[bucket] = history[-100:]
def analyze_performance(self):
    """Flag query buckets that are slow (>2s average) or low quality (<0.7 average)."""
    report = {
        "slow_queries": [],
        "low_quality_queries": [],
        "optimization_opportunities": [],
    }
    for bucket, samples in self.performance_stats.items():
        mean_time = np.mean([sample["response_time"] for sample in samples])
        mean_quality = np.mean([sample["quality_score"] for sample in samples])
        if mean_time > 2.0:
            report["slow_queries"].append({
                "query_hash": bucket,
                "avg_time": mean_time,
                "count": len(samples),
            })
        if mean_quality < 0.7:
            report["low_quality_queries"].append({
                "query_hash": bucket,
                "avg_quality": mean_quality,
                "count": len(samples),
            })
    return report
# Usage
query_optimizer = QueryOptimizer()

# Optimize a query
original_query = "how do i learn machine learning"
optimized_query = query_optimizer.optimize_query(
    original_query,
    context={"user_profile": {"expertise": "beginner", "domain": "software"}}
)
print(f"Original: {original_query}")
print(f"Optimized: {optimized_query}")

# Track performance (requires ``import time`` at the top of the module).
query_optimizer.track_performance(optimized_query, 1.2, 0.85)

import random
from collections import defaultdict
import json
class ABTestingFramework:
    """Minimal A/B testing harness: variant assignment, metric tracking, analysis.

    Requires ``import time``, ``numpy as np``, ``from collections import
    defaultdict`` and ``from scipy import stats`` at module level.
    """

    def __init__(self):
        # name -> {"variants", "traffic_split", "start_time", "metrics"}
        self.experiments = {}
        # name -> variant -> metric -> list of observed values
        self.results = defaultdict(dict)

    def create_experiment(self, experiment_name, variants, traffic_split=None):
        """Register an experiment; *traffic_split* defaults to an equal split."""
        if traffic_split is None:
            traffic_split = [1.0 / len(variants)] * len(variants)
        self.experiments[experiment_name] = {
            "variants": variants,
            "traffic_split": traffic_split,
            "start_time": time.time(),
            "metrics": {}
        }

    def assign_variant(self, experiment_name, user_id):
        """Deterministically map *user_id* to a variant (None if experiment unknown).

        NOTE: ``hash(str)`` is salted per interpreter run (PYTHONHASHSEED), so
        assignments are only consistent within a single process.
        """
        if experiment_name not in self.experiments:
            return None
        experiment = self.experiments[experiment_name]
        variants = experiment["variants"]
        traffic_split = experiment["traffic_split"]
        user_hash = hash(user_id) % 100
        cumulative_split = 0
        for i, split in enumerate(traffic_split):
            cumulative_split += split
            if user_hash < cumulative_split * 100:
                return variants[i]
        return variants[0]  # guard against float rounding leaving a gap at the top

    def track_metric(self, experiment_name, variant, metric_name, value):
        """Append a metric observation for *variant*; unknown experiments are ignored."""
        if experiment_name not in self.experiments:
            return
        if variant not in self.results[experiment_name]:
            self.results[experiment_name][variant] = defaultdict(list)
        self.results[experiment_name][variant][metric_name].append(value)

    def get_experiment_results(self, experiment_name, min_samples=100):
        """Summarize per-variant metrics (mean/std/CI) once *min_samples* is reached.

        The original also fetched ``self.experiments[experiment_name]`` into an
        unused local, which could raise KeyError when only results existed.
        """
        if experiment_name not in self.results:
            return None
        results = self.results[experiment_name]
        analysis = {}
        for variant, metrics in results.items():
            variant_analysis = {}
            for metric_name, values in metrics.items():
                if len(values) >= min_samples:
                    variant_analysis[metric_name] = {
                        "mean": np.mean(values),
                        "std": np.std(values),
                        "count": len(values),
                        "confidence_interval": self._calculate_confidence_interval(values)
                    }
            analysis[variant] = variant_analysis
        # Statistical significance testing across the first two variants.
        analysis["statistical_tests"] = self._perform_statistical_tests(results)
        return analysis

    def _calculate_confidence_interval(self, values, confidence=0.95):
        """Normal-approximation confidence interval around the sample mean.

        The original hard-coded z = 1.96 and ignored *confidence*; the z-score
        is now derived from the requested confidence level.
        """
        mean = np.mean(values)
        std = np.std(values)
        n = len(values)
        z = stats.norm.ppf(0.5 + confidence / 2.0)  # ~1.96 for confidence=0.95
        margin = z * (std / np.sqrt(n))
        return {
            "lower": mean - margin,
            "upper": mean + margin,
            "margin": margin
        }

    def _perform_statistical_tests(self, results):
        """Two-sample t-tests between the first two variants for shared metrics."""
        tests = {}
        if len(results) < 2:
            return tests
        variants = list(results.keys())
        variant1, variant2 = variants[0], variants[1]
        for metric_name in results[variant1].keys():
            if metric_name in results[variant2]:
                values1 = results[variant1][metric_name]
                values2 = results[variant2][metric_name]
                # Require a minimal sample size before testing.
                if len(values1) > 10 and len(values2) > 10:
                    t_stat, p_value = stats.ttest_ind(values1, values2)
                    tests[f"{metric_name}_{variant1}_vs_{variant2}"] = {
                        "t_statistic": t_stat,
                        "p_value": p_value,
                        "significant": p_value < 0.05
                    }
        return tests
# Usage
ab_tester = ABTestingFramework()

# Create experiment comparing different retrieval strategies
ab_tester.create_experiment(
    "retrieval_strategy_test",
    variants=["bm25", "semantic", "hybrid"],
    traffic_split=[0.3, 0.3, 0.4]  # 30%, 30%, 40% traffic split
)

# Simulate user interactions
users = [f"user_{i}" for i in range(1000)]
for user in users:
    variant = ab_tester.assign_variant("retrieval_strategy_test", user)
    # Simulate performance metrics based on variant
    if variant == "bm25":
        latency = random.normalvariate(1.2, 0.2)
        accuracy = random.normalvariate(0.75, 0.05)
    elif variant == "semantic":
        latency = random.normalvariate(1.8, 0.3)
        accuracy = random.normalvariate(0.82, 0.04)
    else:  # hybrid
        latency = random.normalvariate(1.5, 0.25)
        accuracy = random.normalvariate(0.85, 0.03)
    ab_tester.track_metric("retrieval_strategy_test", variant, "latency", latency)
    ab_tester.track_metric("retrieval_strategy_test", variant, "accuracy", accuracy)

# Get results
results = ab_tester.get_experiment_results("retrieval_strategy_test")
print("A/B Test Results:")
print(json.dumps(results, indent=2, default=str))

from sklearn.model_selection import ParameterSampler
import optuna
class HyperparameterOptimizer:
    """Tune pipeline hyperparameters with Optuna against a validation set."""

    def __init__(self, pipeline_factory):
        # Callable taking a params dict and returning a runnable pipeline.
        self.pipeline_factory = pipeline_factory

    def optimize_pipeline(self, train_data, val_data, param_space, n_trials=50):
        """Search *param_space* for the configuration maximizing validation score.

        Args:
            train_data: unused here; kept for interface symmetry.
            val_data: list of (query, expected_doc_ids) pairs.
            param_space: name -> {"type": "int"|"float"|"categorical", ...} spec.
            n_trials: number of Optuna trials.

        Returns:
            dict with ``best_params``, ``best_score`` and the Optuna ``study``.
        """
        def objective(trial):
            # Translate the declarative param space into Optuna suggestions.
            params = {}
            for param_name, param_config in param_space.items():
                if param_config["type"] == "int":
                    params[param_name] = trial.suggest_int(
                        param_name,
                        param_config["low"],
                        param_config["high"]
                    )
                elif param_config["type"] == "float":
                    params[param_name] = trial.suggest_float(
                        param_name,
                        param_config["low"],
                        param_config["high"]
                    )
                elif param_config["type"] == "categorical":
                    params[param_name] = trial.suggest_categorical(
                        param_name,
                        param_config["choices"]
                    )
            pipeline = self.pipeline_factory(params)
            return self._evaluate_pipeline(pipeline, val_data)

        study = optuna.create_study(direction="maximize")
        study.optimize(objective, n_trials=n_trials)
        return {
            "best_params": study.best_params,
            "best_score": study.best_value,
            "study": study
        }

    def _evaluate_pipeline(self, pipeline, eval_data):
        """Mean per-query F1 of *pipeline* over *eval_data*; failed queries score 0.

        Guards against empty *eval_data*, which previously raised ZeroDivisionError.
        """
        if not eval_data:
            return 0.0
        total_score = 0
        for query, expected_results in eval_data:
            try:
                results = pipeline.run({"query": query})
                total_score += self._calculate_query_score(results, expected_results)
            except Exception as e:
                # Failed queries contribute 0 rather than aborting the trial.
                print(f"Error evaluating query '{query}': {e}")
        return total_score / len(eval_data)

    def _calculate_query_score(self, results, expected):
        """F1 between retrieved document ids and the expected id set."""
        retrieved_ids = {doc.id for doc in results["documents"]}
        expected_ids = set(expected)
        if not expected_ids:
            # Nothing expected: perfect only if nothing was retrieved.
            return 1.0 if not retrieved_ids else 0.0
        precision = len(retrieved_ids & expected_ids) / len(retrieved_ids) if retrieved_ids else 0
        recall = len(retrieved_ids & expected_ids) / len(expected_ids)
        return 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0
# Usage
def create_optimized_pipeline(params):
    """Factory function for creating pipelines with different parameters"""
    # NOTE(review): ``Pipeline``, ``EmbeddingRetriever``, ``OpenAIGenerator`` and
    # ``document_store`` are not imported/defined in this chapter's snippets —
    # the appropriate Haystack imports must be added before this factory runs.
    pipeline = Pipeline()
    # Retriever with tunable parameters (expects params["top_k"])
    retriever = EmbeddingRetriever(
        document_store=document_store,
        top_k=params["top_k"]
    )
    pipeline.add_component("retriever", retriever)
    # Generator with tunable parameters (expects params["model"/"temperature"/"max_tokens"])
    generator = OpenAIGenerator(
        model=params["model"],
        generation_kwargs={
            "temperature": params["temperature"],
            "max_tokens": params["max_tokens"]
        }
    )
    pipeline.add_component("generator", generator)
    # Connect components
    pipeline.connect("retriever", "generator")
    return pipeline
# Define parameter space
param_space = {
"top_k": {"type": "int", "low": 3, "high": 10},
"model": {"type": "categorical", "choices": ["gpt-3.5-turbo", "gpt-4", "gpt-4o"]},
"temperature": {"type": "float", "low": 0.1, "high": 1.0},
"max_tokens": {"type": "int", "low": 100, "high": 500}
}
# Sample evaluation data
eval_data = [
("What is machine learning?", ["doc1", "doc2"]),
("How do neural networks work?", ["doc3", "doc4"])
]
optimizer = HyperparameterOptimizer(create_optimized_pipeline)
results = optimizer.optimize_pipeline(None, eval_data, param_space, n_trials=20)
print("Hyperparameter Optimization Results:")
print(f"Best Parameters: {results['best_params']}")
print(f"Best Score: {results['best_score']:.3f}")- Use Multiple Metrics: Combine precision, recall, F1, NDCG, and user satisfaction
- Cross-Validation: Evaluate on multiple datasets and query types
- Statistical Significance: Use proper statistical tests for comparing systems
- User-Centric Metrics: Include user satisfaction and task completion rates
- Continuous Evaluation: Monitor performance over time and after updates
- Start Simple: Begin with basic optimizations before advanced techniques
- Measure Impact: A/B test optimizations to ensure they improve user experience
- Iterative Approach: Make small changes and evaluate each incrementally
- Resource Awareness: Balance performance improvements with resource costs
- Monitoring: Track optimization impact on all system metrics
- Automated Testing: Implement continuous evaluation pipelines
- Model Versioning: Track which optimizations work for which model versions
- Rollback Capability: Ability to revert optimizations that degrade performance
- Experiment Tracking: Log all optimization experiments and results
- Production Monitoring: Monitor optimized systems for performance regression
With evaluation and optimization mastered, you're ready to:
- Chapter 7: Custom Components - Extend Haystack with custom functionality
- Chapter 8: Production Deployment - Deploy optimized systems at scale
Ready to build custom Haystack components? Continue to Chapter 7: Custom Components! 🚀
Most teams struggle here because the hard part is not writing more code, but drawing clear boundaries between evaluation, optimization, and experiment tracking so behavior stays predictable as complexity grows.
In practical terms, this chapter helps you avoid three common failures:
- coupling core logic too tightly to one implementation path
- missing the handoff boundaries between setup, execution, and validation
- shipping changes without clear rollback or observability strategy
After working through this chapter, you should be able to reason about Chapter 6: Evaluation & Optimization as an operating subsystem inside Haystack: Deep Dive Tutorial, with explicit contracts for inputs, state transitions, and outputs.
Use the implementation notes around the evaluators, the index and query optimizers, and the A/B testing framework as your checklist when adapting these patterns to your own repository.
Under the hood, Chapter 6: Evaluation & Optimization usually follows a repeatable control path:
- Context bootstrap: initialize runtime config and prerequisites for the evaluators and optimizers.
- Input normalization: shape incoming data so each component receives stable contracts.
- Core execution: run the main logic branch and propagate intermediate state through the pipeline.
- Policy and safety checks: enforce limits, auth scopes, and failure boundaries.
- Output composition: return canonical result payloads for downstream consumers.
- Operational telemetry: emit logs/metrics needed for debugging and performance tuning.
When debugging, walk this sequence in order and confirm each stage has explicit success/failure conditions.
Use the following upstream sources to verify implementation details while reading this chapter:
- Haystack — authoritative reference on the framework (github.com/deepset-ai/haystack).

Suggested trace strategy:

- search upstream code for the evaluator and optimizer APIs used here to map concrete implementation paths
- compare documentation claims against actual runtime/config code before reusing patterns in production