A Model Context Protocol (MCP) server that manages and orchestrates multiple LLM agents as task runners for executing large numbers of similar jobs. While designed with Claude Code in mind, the system is agent-agnostic and can work with any LLM agent capable of connecting via MCP protocol.
- Runtime: Bun
- Language: TypeScript
- Protocol: MCP (supporting both stdio and HTTP)
- MCP SDK: https://github.com/modelcontextprotocol/typescript-sdk (REQUIRED)
- Creation: Projects can be created, listed, closed, and joined
- Purpose: Each project contains a collection of tasks to be executed
- Summary: Projects maintain metadata including purpose, task count, and completion status
- Identification: Agents identify themselves by name when joining a project
- Exclusivity: Only one agent per name per project can be connected at a time
- Workflow: Agents request tasks, execute them, and report results
- Assignment: Tasks are assigned to agents on-demand (one task per agent at a time)
- Status Tracking: Tasks can be completed, failed, or retried
- History: Failed tasks maintain attempt history for debugging
- Queue Processing: Tasks are processed in FIFO order
- Basic Tasks: Simple tasks with direct instructions
- Template Tasks: Tasks with instruction templates and variable substitution
- Templates define allowed variables using
{{variable}}syntax - Tasks specify variable values during creation
- Final instructions are generated by interpolating variables into templates
- Only the final interpolated instructions are stored in the task (not the template)
- Templates define allowed variables using
- Project Selection:
use project proj1- Response: Project summary, purpose, task count
- Agent Registration:
I am agent <name>orgive me a name- System assigns/validates agent name
- Task Request:
give me a task- If agent has current task: Return current task details
- If no current task: Assign next available task
- Task Completion: Agent reports success/failure with explanation
- Success: Task marked complete
- Failure: Task marked failed, agent can specify retry eligibility
- Next Task: Agent requests another task (loop back to step 3)
Q1: Task distribution workflow ✓ Answered: Project-based task management with agent registration and FIFO task assignment
Q2: Expected scale ✓ Answered: Individual scale - maximum 5-10 agents per project at any given time
Q3: Types of tasks ✓ Answered: General LLM agent tasks (e.g., "use this MCP to get email thread, process and summarize it, write summary to filesystem at ____")
- Tasks use template variables (e.g., "thread id")
- Tasks are instructions that any compatible LLM agent can execute
Q4: Task result handling ✓ Answered: Store only success/failure status and explanations for audit purposes
- No collection of output/artifacts
- Results should support summary and detailed audit capabilities
Q5: LLM agent connection method ✓ Answered: Support both stdio and HTTP approaches
- stdio: MCP server spawns LLM agent processes directly
- HTTP: LLM agent instances connect to MCP server as clients
- Sessions: HTTP mode requires session tracking via shared provider (not in-memory)
- Persistence: Session state must survive server restarts
Q6: Session/data persistence ✓ Answered: Mode-dependent storage strategy
- HTTP mode: Redis or MongoDB for shared session state
- stdio mode: File-based storage (no external service dependency)
- Rationale: Match storage complexity to deployment complexity
Q9: File-based storage implementation ✓ Answered: JSON-based file storage with proper concurrency handling
- Format: JSON files for human readability and debugging
- Structure: Separate files per project (data/projects/{projectId}.json)
- Atomicity: Atomic writes using temp files + rename
- Concurrency: Proper file locking using flock/fcntl system calls
- Race condition solution: Use advisory locks, not custom stat checking
Q7: Advanced features ✓ Answered: Keep it simple for now
- FIFO processing: Basic first-in-first-out task queue
- Basic retry: Agent-controlled retry decisions on task failure
- No prioritization: All tasks treated equally
- No load balancing: Simple round-robin task assignment
Q8: Primary clients ✓ Answered: Multi-client architecture
- Primary: LLM agent instances (task agents)
- Secondary: CLI tool for status checking and direct manipulation
- Future: Web interface for HTTP mode (with user management)
- Design: MCP tools/resources optimized for programmatic access
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ LLM Agent │ │ LLM Agent │ │ LLM Agent │
│ (Agent 1) │ │ (Agent 2) │ │ (Agent N) │
└─────────┬───────┘ └─────────┬───────┘ └─────────┬───────┘
│ │ │
│ MCP Protocol │
│ │ │
└──────────────────────┼──────────────────────┘
│
┌─────────────────────────────────┐
│ TaskDriver System │
│ │
│ ┌─────────────────────────────┐│
│ │ Core Service Layer ││
│ │ - ProjectService ││
│ │ - TaskService ││
│ │ - AgentService ││
│ │ - TaskTypeService ││
│ └─────────────────────────────┘│
│ │
│ ┌─────────────────────────────┐│
│ │ Storage Interface ││
│ │ - Pluggable Providers ││
│ │ - Common Operations ││
│ └─────────────────────────────┘│
└─────────────────────────────────┘
│ │
┌─────────────┐ ┌─────────────┐
│ MCP Server │ │ CLI Tool │
│ │ │ │
│ - MCP Tools │ │ - Commands │
│ - Transport │ │ - Parsing │
│ - Protocol │ │ - Output │
└─────────────┘ └─────────────┘
│
┌─────────────────────────────┐
│ Storage Providers │
│ │
│ FileProvider (JSON) │
│ MongoProvider │
│ RedisProvider │
└─────────────────────────────┘
interface Project {
id: string;
name: string;
description: string;
status: 'active' | 'closed';
createdAt: Date;
updatedAt: Date;
taskTypes: TaskType[];
config: {
defaultMaxRetries: number; // Default retry count for new task types
defaultLeaseDurationMinutes: number; // Default lease duration for new task types
reaperIntervalMinutes: number; // How often to check for expired leases
};
stats: {
totalTasks: number;
completedTasks: number;
failedTasks: number;
queuedTasks: number;
runningTasks: number;
};
}interface TaskType {
id: string;
name: string;
template?: string; // Template with variables like {{threadId}}
variables?: string[]; // List of required variables
duplicateHandling: 'ignore' | 'fail' | 'allow'; // How to handle duplicate variable combinations
maxRetries: number; // Maximum server-side retries for failed tasks
leaseDurationMinutes: number; // How long agents have to complete tasks
}interface Task {
id: string;
projectId: string;
typeId: string;
instructions: string; // Final instructions (template filled)
variables?: Record<string, string>; // Variable values
status: 'queued' | 'running' | 'completed' | 'failed';
assignedTo?: string; // Agent name
leaseExpiresAt?: Date; // When current lease expires (for running tasks)
retryCount: number; // Number of times this task has been retried
maxRetries: number; // Maximum retries allowed (from task type or project default)
createdAt: Date;
assignedAt?: Date;
completedAt?: Date;
attempts: TaskAttempt[];
}interface TaskAttempt {
id: string;
agentName: string;
startedAt: Date;
completedAt?: Date;
status: 'running' | 'completed' | 'failed' | 'timeout';
explanation?: string;
failureReason?: 'agent_reported' | 'timeout' | 'server_error';
leaseExpiresAt?: Date; // When this attempt's lease expires
}interface Agent {
name: string;
projectId: string;
sessionId: string;
status: 'idle' | 'working';
currentTaskId?: string;
apiKey: string; // Authentication token for this agent
lastSeen: Date;
connectedAt: Date;
}// Shared business logic used by both MCP and CLI
class ProjectService {
constructor(private storage: StorageProvider) {}
async createProject(name: string, description?: string): Promise<Project> { ... }
async listProjects(includeClosed = false): Promise<Project[]> { ... }
async getProject(projectId: string): Promise<Project | null> { ... }
async closeProject(projectId: string): Promise<void> { ... }
async getProjectStatus(projectId: string): Promise<ProjectStatus> { ... }
}
class TaskService {
constructor(private storage: StorageProvider) {}
async addTask(projectId: string, typeId: string, instructions: string, variables?: Record<string, string>): Promise<Task> {
// Template Processing:
// 1. If task type has template: validate variables and interpolate template to create final instructions
// 2. If task type has no template: use provided instructions directly
// 3. Store only final interpolated instructions in the task (never the template)
// Check for duplicate variable combinations based on TaskType.duplicateHandling
// - 'ignore': Return existing task if found, don't create new one
// - 'fail': Throw error if duplicate found
// - 'allow': Always create new task regardless of duplicates
}
async createTasksBulk(projectId: string, tasks: TaskInput[]): Promise<{
tasksCreated: number;
errors: string[];
createdTasks: Task[];
}> {
// Bulk creation without batch tracking
// - Creates each task individually with proper template processing
// - Returns summary of successful/failed task creations
// - Maximum 1000 tasks per request
// - Continues processing even if some tasks fail
}
async getTask(taskId: string): Promise<Task | null> { ... }
async listTasks(projectId: string, filters?: TaskFilters): Promise<Task[]> { ... }
async getTaskHistory(taskId: string): Promise<TaskAttempt[]> { ... }
}
class AgentService {
constructor(private storage: StorageProvider) {}
async registerAgent(projectId: string, agentName?: string): Promise<{ agent: Agent, apiKey: string }> {
// Generate unique API key for authentication
// Associate agent with specific project(s)
}
async getCurrentTask(projectId: string, agentName: string): Promise<Task | null> { ... }
async requestTask(projectId: string, agentName: string): Promise<Task | null> {
// ATOMIC: Find queued task, set status to 'running', set leaseExpiresAt
// Use backend-native atomic operations (findOneAndUpdate, RPOPLPUSH)
}
async completeTask(taskId: string, agentName: string, explanation: string): Promise<void> {
// Mark task as completed, record final attempt
// Agent-controlled retries should be exhausted before calling this
}
async failTask(taskId: string, agentName: string, explanation: string, canRetry: boolean = true): Promise<void> {
// Mark task as failed
// If canRetry=false, mark as permanently failed regardless of retry count
// If canRetry=true, let server-side retry logic handle requeuing based on maxRetries
}
async extendLease(taskId: string, agentName: string, additionalMinutes: number): Promise<void> {
// Extend lease for long-running tasks
}
async getAgentStatus(projectId: string, agentName: string): Promise<Agent | null> { ... }
}
class TaskTypeService {
constructor(private storage: StorageProvider) {}
async createTaskType(
projectId: string,
name: string,
template?: string,
variables?: string[],
duplicateHandling: 'ignore' | 'fail' | 'allow' = 'allow',
maxRetries?: number, // Defaults to project setting
leaseDurationMinutes?: number // Defaults to project setting
): Promise<TaskType> { ... }
async listTaskTypes(projectId: string): Promise<TaskType[]> { ... }
async getTaskType(projectId: string, typeId: string): Promise<TaskType | null> { ... }
}
class ReaperService {
constructor(private storage: StorageProvider) {}
async processExpiredLeases(): Promise<{ requeuedTasks: number, failedTasks: number }> {
// Find all tasks where status='running' AND leaseExpiresAt < now()
// For each expired task:
// - If retryCount < maxRetries: increment retryCount, set status='queued', clear lease
// - If retryCount >= maxRetries: set status='failed', record timeout reason
// Use proper coordination for multi-instance deployments
}
async startReaper(intervalMinutes: number = 1): Promise<void> {
// Background process that runs processExpiredLeases() periodically
// Handles graceful shutdown via SIGINT/SIGTERM
}
}interface StorageProvider {
// Project operations
createProject(project: Project): Promise<void>;
getProject(projectId: string): Promise<Project | null>;
updateProject(project: Project): Promise<void>;
listProjects(includeClosed?: boolean): Promise<Project[]>;
deleteProject(projectId: string): Promise<void>;
// Task operations
addTask(task: Task): Promise<void>;
getTask(taskId: string): Promise<Task | null>;
updateTask(task: Task): Promise<void>;
listTasks(projectId: string, filters?: TaskFilters): Promise<Task[]>;
// Agent operations
registerAgent(agent: Agent): Promise<void>;
getAgent(projectId: string, agentName: string): Promise<Agent | null>;
updateAgent(agent: Agent): Promise<void>;
listAgents(projectId: string): Promise<Agent[]>;
removeAgent(projectId: string, agentName: string): Promise<void>;
// Atomic operations for task assignment - CRITICAL: Must be atomic across all providers
assignTask(projectId: string, agentName: string): Promise<Task | null>;
completeTask(taskId: string, result: TaskResult): Promise<void>;
failTask(taskId: string, result: TaskResult): Promise<void>;
// Lease management operations
findExpiredLeases(): Promise<Task[]>;
requeueTask(taskId: string): Promise<void>;
extendLease(taskId: string, additionalMinutes: number): Promise<void>;
}
// Storage provider implementations with atomic task assignment
class FileStorageProvider implements StorageProvider {
// Uses proper file locking (flock/fcntl) for single-machine deployments
async assignTask(projectId: string, agentName: string): Promise<Task | null> {
// 1. Acquire exclusive lock on project file using flock()
// 2. Find first queued task
// 3. Atomically update task status to 'running', set assignedTo and leaseExpiresAt
// 4. Write file and release lock
// 5. Return task or null if none available
}
async findExpiredLeases(): Promise<Task[]> {
// Scan all project files for tasks with status='running' and leaseExpiresAt < now()
// Note: File provider has limited multi-instance coordination
}
}
class MongoStorageProvider implements StorageProvider {
// Uses MongoDB's findOneAndUpdate with atomic operations
async assignTask(projectId: string, agentName: string): Promise<Task | null> {
// findOneAndUpdate with filter: {status: 'queued'},
// update: {status: 'running', assignedTo: agentName, leaseExpiresAt: now() + lease_duration}
}
async findExpiredLeases(): Promise<Task[]> {
// db.tasks.find({status: 'running', leaseExpiresAt: {$lt: new Date()}})
}
}
class RedisStorageProvider implements StorageProvider {
// Uses Redis transactions (MULTI/EXEC) or Lua scripts for atomicity
async assignTask(projectId: string, agentName: string): Promise<Task | null> {
// Lua script to atomically:
// 1. RPOPLPUSH from 'project:queued' to 'project:running'
// 2. Update task hash with assignedTo and leaseExpiresAt
// 3. Return task data
}
async findExpiredLeases(): Promise<Task[]> {
// Scan running tasks using ZSCAN on lease expiry sorted set
}
}Both MCP tools and CLI commands are thin wrappers around the shared Core Service Layer
// MCP Tool Example
export const createProjectTool = {
name: 'create_project',
description: 'Create a new project',
handler: async (args: CreateProjectArgs) => {
return await projectService.createProject(args.name, args.description);
}
};
// CLI Command Example
export const createProjectCommand = {
name: 'create-project',
description: 'Create a new project',
handler: async (args: CreateProjectArgs) => {
const result = await projectService.createProject(args.name, args.description);
console.log(`Project created: ${result.id}`);
}
};All operations available through both MCP and CLI interfaces
- create_project / create-project: Create a new project
- list_projects / list-projects: List active projects (exclude closed by default, optional includeClosed parameter)
- get_project / get-project: Get project details and stats
- close_project / close-project: Mark project as closed
- join_project / join-project: Join a project (sets context)
- create_task_type / create-task-type: Create a new task type with template and duplicate handling
- list_task_types / list-task-types: List task types for current project
- get_task_type / get-task-type: Get task type details
- add_task / add-task: Add a new task to the project queue
- create_tasks_bulk / create-tasks-bulk: Create multiple tasks at once (no batch tracking)
- get_task / get-task: Get task details and history
- list_tasks / list-tasks: List tasks with filtering options
- register_agent / register-agent: Register with a name (or get auto-assigned) and receive API key
- get_current_task / get-current-task: Get currently assigned task
- request_task / request-task: Request a new task assignment (atomic lease acquisition)
- complete_task / complete-task: Mark task as completed with explanation
- fail_task / fail-task: Mark task as failed with explanation and retry flag
- extend_lease / extend-lease: Extend lease for long-running tasks
- get_project_status / get-project-status: Get detailed project status
- get_agent_status / get-agent-status: Get agent status and activity
- get_task_history / get-task-history: Get task attempt history
- get_audit_log / get-audit-log: Get audit trail for project
CLI commands use the exact same service layer as MCP tools - zero code duplication
# Project management
taskdriver create-project <name> [description]
taskdriver list-projects [--include-closed]
taskdriver get-project <project-name>
taskdriver close-project <project-name>
taskdriver join-project <project-name>
# Task type management
taskdriver create-task-type <project> <name> [template] [--duplicates=ignore|fail|allow] [--max-retries=N] [--lease-duration=N]
taskdriver list-task-types <project>
taskdriver get-task-type <project> <type-name>
# Task management
taskdriver add-task <project> <type> <instructions> [variables...]
taskdriver create-tasks-bulk <project> <tasks-file>
taskdriver get-task <task-id>
taskdriver list-tasks <project> [--status=queued|running|completed|failed]
# Agent operations
taskdriver register-agent <project> [agent-name]
taskdriver get-current-task <project> <agent-name>
taskdriver request-task <project> <agent-name>
taskdriver complete-task <task-id> <explanation>
taskdriver fail-task <task-id> <explanation> [--no-retry]
taskdriver extend-lease <task-id> <additional-minutes>
# Status and monitoring
taskdriver get-project-status <project>
taskdriver get-agent-status <project> <agent-name>
taskdriver get-task-history <task-id>
taskdriver get-audit-log <project>src/
├── index.ts # Main entry point
├── mcp.ts # MCP server implementation
├── cli.ts # CLI command implementation
├── config/
│ ├── index.ts # Configuration loading
│ └── types.ts # Configuration types
├── services/
│ ├── index.ts # Service exports
│ ├── ProjectService.ts # Project management logic
│ ├── TaskService.ts # Task management logic
│ ├── AgentService.ts # Agent management logic
│ └── TaskTypeService.ts # Task type management logic
├── storage/
│ ├── index.ts # Storage provider factory
│ ├── StorageProvider.ts # Storage interface
│ ├── FileStorageProvider.ts # JSON file storage
│ ├── MongoStorageProvider.ts # MongoDB storage
│ └── RedisStorageProvider.ts # Redis storage
├── types/
│ ├── index.ts # Type exports
│ ├── Project.ts # Project-related types
│ ├── Task.ts # Task-related types
│ ├── Agent.ts # Agent-related types
│ └── TaskType.ts # TaskType-related types
├── utils/
│ ├── index.ts # Utility exports
│ ├── fileUtils.ts # File handling utilities
│ └── validation.ts # Input validation
└── tools/
├── index.ts # MCP tool definitions
├── projectTools.ts # Project management tools
├── taskTools.ts # Task management tools
├── agentTools.ts # Agent operation tools
└── taskTypeTools.ts # Task type tools
dist/ # Build output directory
└── (compiled JavaScript files)
- Basic MCP server with TypeScript SDK
- Storage interface with pluggable providers
- Common StorageProvider interface with atomic operations
- FileStorageProvider with proper file locking (flock/fcntl)
- Configuration via environment variables
- Minimum Viable Lease system
- Task states:
queued→running→completed/failed - Atomic task assignment with
leaseExpiresAt - ReaperService for timeout handling
- Task states:
- Project-scoped agent authentication
- API key generation and validation
- Agent registration with project association
- Graceful shutdown handling (SIGINT/SIGTERM)
- Core service layer implementation
- Server-side retry policy with configurable
maxRetries - Hybrid retry model (agent + server controlled)
- Structured logging with correlation IDs
- Basic metrics endpoint (
/metrics) for queue depth and task counts - MongoStorageProvider and RedisStorageProvider implementations
- HTTP transport layer with session management
- Lease extension for long-running tasks
- CLI tool with full feature parity
- Multi-instance coordination for reaper service
- Web interface for HTTP mode
- User management and multi-tenancy
- Advanced monitoring and alerting
- Task prioritization and scheduling
- Result artifact handling (external storage URLs)
- Minimum Viable Lease: Simple timeout-based approach without heartbeating
- Clock skew tolerance: Assumes clocks accurate within 5 seconds max
- Hybrid retries: Agents handle transient errors, server handles infrastructure failures
- Atomic operations: Use backend-native primitives (findOneAndUpdate, RPOPLPUSH, flock)
- Project-scoped API keys: Each agent authenticated to specific project(s)
- No task cross-contamination: Agents cannot access tasks from other projects
- Configuration security: All secrets via environment variables, never in code
- Graceful shutdown: Preserve task state during server restarts
- Structured logging: JSON format with
task_id,project_id,agent_idcorrelation - Metrics exposure: Prometheus-compatible
/metricsendpoint - Audit trail: Full task attempt history with failure reasons