
TaskDriver MCP Server Specification

Overview

A Model Context Protocol (MCP) server that manages and orchestrates multiple LLM agents as task runners for executing large numbers of similar jobs. While designed with Claude Code in mind, the system is agent-agnostic and works with any LLM agent that can connect over MCP.

Technology Stack

Core Concepts

Projects

  • Creation: Projects can be created, listed, closed, and joined
  • Purpose: Each project contains a collection of tasks to be executed
  • Summary: Projects maintain metadata including purpose, task count, and completion status

Task Agents

  • Identification: Agents identify themselves by name when joining a project
  • Exclusivity: Only one agent per name per project can be connected at a time
  • Workflow: Agents request tasks, execute them, and report results

Task Management

  • Assignment: Tasks are assigned to agents on-demand (one task per agent at a time)
  • Status Tracking: Tasks can be completed, failed, or retried
  • History: Failed tasks maintain attempt history for debugging
  • Queue Processing: Tasks are processed in FIFO order

Task Types

  • Basic Tasks: Simple tasks with direct instructions
  • Template Tasks: Tasks with instruction templates and variable substitution
    • Templates define allowed variables using {{variable}} syntax
    • Tasks specify variable values during creation
    • Final instructions are generated by interpolating variables into templates
    • Only the final interpolated instructions are stored in the task (not the template)
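A minimal sketch of the template rules above, assuming placeholders of the form `{{name}}` and a declared variable list as in `TaskType.variables`. The function name `interpolateTemplate` is hypothetical. It rejects undeclared variables, fails on missing values, and returns only the final string that would be stored as `Task.instructions`:

```typescript
// Matches {{name}} placeholders, tolerating whitespace inside the braces.
const PLACEHOLDER = /\{\{\s*(\w+)\s*\}\}/g;

// Hypothetical helper: validate variables and interpolate them into a template.
function interpolateTemplate(
  template: string,
  allowed: string[],
  values: Record<string, string>,
): string {
  // Reject values for variables the task type does not declare.
  for (const key of Object.keys(values)) {
    if (!allowed.includes(key)) {
      throw new Error(`Unknown variable: ${key}`);
    }
  }
  // Substitute each placeholder; a placeholder without a value is an error.
  return template.replace(PLACEHOLDER, (_match: string, name: string) => {
    if (!Object.prototype.hasOwnProperty.call(values, name)) {
      throw new Error(`Missing value for variable: ${name}`);
    }
    return values[name];
  });
}

// Usage: only the returned string is persisted, never the template itself.
const instructions = interpolateTemplate(
  "Summarize email thread {{threadId}} and write it to {{outputPath}}",
  ["threadId", "outputPath"],
  { threadId: "abc123", outputPath: "/tmp/abc123.md" },
);
console.log(instructions);
```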

Agent Workflow

  1. Project Selection: "use project proj1"
    • Response: Project summary, purpose, task count
  2. Agent Registration: "I am agent <name>" or "give me a name"
    • System assigns/validates agent name
  3. Task Request: "give me a task"
    • If agent has current task: Return current task details
    • If no current task: Assign next available task
  4. Task Completion: Agent reports success/failure with explanation
    • Success: Task marked complete
    • Failure: Task marked failed, agent can specify retry eligibility
  5. Next Task: Agent requests another task (loop back to step 3)

Requirements Gathering

Questions and Answers

Q1: Task distribution workflow. Answered: Project-based task management with agent registration and FIFO task assignment

Q2: Expected scale. Answered: Individual scale, with a maximum of 5-10 agents per project at any given time

Q3: Types of tasks. Answered: General LLM agent tasks (e.g., "use this MCP to get email thread, process and summarize it, write summary to filesystem at ____")

  • Tasks use template variables (e.g., "thread id")
  • Tasks are instructions that any compatible LLM agent can execute

Q4: Task result handling. Answered: Store only success/failure status and explanations for audit purposes

  • No collection of output/artifacts
  • Results should support summary and detailed audit capabilities

Q5: LLM agent connection method. Answered: Support both stdio and HTTP approaches

  • stdio: Each LLM agent's MCP client launches the TaskDriver server as a subprocess and communicates with it over stdin/stdout
  • HTTP: LLM agent instances connect to MCP server as clients
  • Sessions: HTTP mode requires session tracking via shared provider (not in-memory)
  • Persistence: Session state must survive server restarts

Q6: Session/data persistence. Answered: Mode-dependent storage strategy

  • HTTP mode: Redis or MongoDB for shared session state
  • stdio mode: File-based storage (no external service dependency)
  • Rationale: Match storage complexity to deployment complexity
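The mode-dependent rule can be sketched as a small selection function. This is an illustration only: the environment variable `TASKDRIVER_STORAGE` and the `StorageKind` labels are hypothetical names, not part of the spec. stdio defaults to file storage, while HTTP mode insists on a shared backend:

```typescript
type TransportMode = "stdio" | "http";
type StorageKind = "file" | "mongodb" | "redis";

const KINDS: StorageKind[] = ["file", "mongodb", "redis"];

// Hypothetical factory helper: pick a storage backend for the transport mode.
function selectStorage(mode: TransportMode, env: Record<string, string | undefined>): StorageKind {
  const raw = env.TASKDRIVER_STORAGE; // assumed variable name
  if (raw !== undefined && !KINDS.includes(raw as StorageKind)) {
    throw new Error(`Unknown storage kind: ${raw}`);
  }
  const requested = raw as StorageKind | undefined;
  if (mode === "stdio") {
    // stdio: avoid external service dependencies, so default to JSON files.
    return requested ?? "file";
  }
  // HTTP: session state must be shared and survive restarts.
  if (requested === "mongodb" || requested === "redis") {
    return requested;
  }
  throw new Error("HTTP mode requires TASKDRIVER_STORAGE=mongodb or redis");
}
```

In a Node entry point this would be called as `selectStorage(mode, process.env)`; the returned kind would then choose which provider class to construct.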

Q9: File-based storage implementation. Answered: JSON-based file storage with proper concurrency handling

  • Format: JSON files for human readability and debugging
  • Structure: Separate files per project (data/projects/{projectId}.json)
  • Atomicity: Atomic writes using temp files + rename
  • Concurrency: Proper file locking using flock/fcntl system calls
  • Race condition solution: Use advisory locks, not custom stat checking
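The "temp file + rename" write can be sketched with Node's built-in `fs` module. Node has no built-in `flock()` binding, so this sketch assumes the caller already holds the project lock through some other mechanism (for example a native addon); the helper name `writeJsonAtomic` is hypothetical:

```typescript
import { closeSync, fsyncSync, mkdirSync, openSync, readFileSync, renameSync, writeSync } from "node:fs";
import { tmpdir } from "node:os";
import { basename, dirname, join } from "node:path";

// Write `data` as JSON so readers see either the old file or the new one,
// never a half-written file. Assumes the caller already holds the project lock.
function writeJsonAtomic(path: string, data: unknown): void {
  const dir = dirname(path);
  mkdirSync(dir, { recursive: true });
  // The temp file lives in the same directory: rename() is only atomic within one filesystem.
  const tmp = join(dir, `.${basename(path)}.${process.pid}.${Math.random().toString(36).slice(2)}.tmp`);
  const fd = openSync(tmp, "w");
  try {
    writeSync(fd, JSON.stringify(data, null, 2));
    fsyncSync(fd); // flush file contents to disk before the rename makes them visible
  } finally {
    closeSync(fd);
  }
  renameSync(tmp, path); // atomically replaces any existing file on POSIX systems
}

// Usage, following the data/projects/{projectId}.json layout (rooted in the OS temp dir here).
const file = join(tmpdir(), "taskdriver-demo", "data", "projects", "proj1.json");
writeJsonAtomic(file, { id: "proj1", tasks: [] });
console.log(JSON.parse(readFileSync(file, "utf8")).id);
```

A failed write leaves a stray `.tmp` file but never a truncated project file; cleanup of leftovers is omitted here.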

Q7: Advanced features. Answered: Keep it simple for now

  • FIFO processing: Basic first-in-first-out task queue
  • Basic retry: Agent-controlled retry decisions on task failure
  • No prioritization: All tasks treated equally
  • No load balancing: Tasks go to whichever agent requests next (pull-based); no capacity-aware distribution

Q8: Primary clients. Answered: Multi-client architecture

  • Primary: LLM agent instances (task agents)
  • Secondary: CLI tool for status checking and direct manipulation
  • Future: Web interface for HTTP mode (with user management)
  • Design: MCP tools/resources optimized for programmatic access

Architecture

System Components

┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│   LLM Agent     │    │   LLM Agent     │    │   LLM Agent     │
│   (Agent 1)     │    │   (Agent 2)     │    │   (Agent N)     │
└─────────┬───────┘    └─────────┬───────┘    └─────────┬───────┘
          │                      │                      │
          │              MCP Protocol                   │
          │                      │                      │
          └──────────────────────┼──────────────────────┘
                                 │
                  ┌─────────────────────────────────┐
                  │     TaskDriver System           │
                  │                                 │
                  │  ┌─────────────────────────────┐│
                  │  │     Core Service Layer      ││
                  │  │   - ProjectService          ││
                  │  │   - TaskService             ││
                  │  │   - AgentService            ││
                  │  │   - TaskTypeService         ││
                  │  └─────────────────────────────┘│
                  │                                 │
                  │  ┌─────────────────────────────┐│
                  │  │     Storage Interface       ││
                  │  │   - Pluggable Providers     ││
                  │  │   - Common Operations       ││
                  │  └─────────────────────────────┘│
                  └─────────────────────────────────┘
                           │                │
                  ┌─────────────┐  ┌─────────────┐
                  │ MCP Server  │  │ CLI Tool    │
                  │             │  │             │
                  │ - MCP Tools │  │ - Commands  │
                  │ - Transport │  │ - Parsing   │
                  │ - Protocol  │  │ - Output    │
                  └─────────────┘  └─────────────┘
                                 │
                    ┌─────────────────────────────┐
                    │     Storage Providers       │
                    │                             │
                    │  FileProvider (JSON)        │
                    │  MongoProvider              │
                    │  RedisProvider              │
                    └─────────────────────────────┘

Data Models

Project

interface Project {
  id: string;
  name: string;
  description: string;
  status: 'active' | 'closed';
  createdAt: Date;
  updatedAt: Date;
  taskTypes: TaskType[];
  config: {
    defaultMaxRetries: number;  // Default retry count for new task types
    defaultLeaseDurationMinutes: number;  // Default lease duration for new task types
    reaperIntervalMinutes: number;  // How often to check for expired leases
  };
  stats: {
    totalTasks: number;
    completedTasks: number;
    failedTasks: number;
    queuedTasks: number;
    runningTasks: number;
  };
}

TaskType

interface TaskType {
  id: string;
  name: string;
  template?: string;  // Template with variables like {{threadId}}
  variables?: string[];  // List of required variables
  duplicateHandling: 'ignore' | 'fail' | 'allow';  // How to handle duplicate variable combinations
  maxRetries: number;  // Maximum server-side retries for failed tasks
  leaseDurationMinutes: number;  // How long agents have to complete tasks
}
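The `duplicateHandling` field needs a precise notion of "the same variable combination". A minimal sketch, assuming duplicates are detected by comparing `typeId` plus variable values (ignoring key order) against every existing task in the project; `variableKey` and `checkDuplicate` are hypothetical helpers, and a real provider would likely use an index or unique key instead of a linear scan:

```typescript
type DuplicateHandling = "ignore" | "fail" | "allow";

interface TaskLike {
  id: string;
  typeId: string;
  variables?: Record<string, string>;
}

// Build an order-independent key for a type + variable combination.
function variableKey(typeId: string, variables: Record<string, string> = {}): string {
  const sorted = Object.keys(variables).sort().map((k) => [k, variables[k]]);
  return `${typeId}:${JSON.stringify(sorted)}`;
}

// Returns an existing task to reuse ('ignore'), throws ('fail'),
// or returns null when a new task should be created.
function checkDuplicate(
  existing: TaskLike[],
  typeId: string,
  variables: Record<string, string> | undefined,
  mode: DuplicateHandling,
): TaskLike | null {
  if (mode === "allow") return null;
  const key = variableKey(typeId, variables);
  const match = existing.find((t) => variableKey(t.typeId, t.variables) === key);
  if (!match) return null;
  if (mode === "fail") {
    throw new Error(`Duplicate task for ${key} (existing task ${match.id})`);
  }
  return match; // 'ignore': hand back the existing task instead of creating one
}
```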

Task

interface Task {
  id: string;
  projectId: string;
  typeId: string;
  instructions: string;  // Final instructions (template filled)
  variables?: Record<string, string>;  // Variable values
  status: 'queued' | 'running' | 'completed' | 'failed';
  assignedTo?: string;  // Agent name
  leaseExpiresAt?: Date;  // When current lease expires (for running tasks)
  retryCount: number;  // Number of times this task has been retried
  maxRetries: number;  // Maximum retries allowed (from task type or project default)
  createdAt: Date;
  assignedAt?: Date;
  completedAt?: Date;
  attempts: TaskAttempt[];
}
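The `retryCount` and `maxRetries` fields combine with the `canRetry` flag of `AgentService.failTask` to implement the hybrid retry model. A minimal sketch of that decision as a pure function, assuming requeued tasks have their assignment and lease cleared; the names `RetryState` and `applyFailure` are hypothetical:

```typescript
type TaskStatus = "queued" | "running" | "completed" | "failed";

interface RetryState {
  status: TaskStatus;
  retryCount: number;
  maxRetries: number;
  assignedTo?: string;
  leaseExpiresAt?: Date;
}

// Decide a task's next state after an agent reports failure.
function applyFailure(task: RetryState, canRetry: boolean): RetryState {
  if (canRetry && task.retryCount < task.maxRetries) {
    // Server-side retry: back to the queue with assignment and lease cleared.
    return {
      ...task,
      status: "queued",
      retryCount: task.retryCount + 1,
      assignedTo: undefined,
      leaseExpiresAt: undefined,
    };
  }
  // The agent said "do not retry", or retries are exhausted: permanent failure.
  return { ...task, status: "failed", leaseExpiresAt: undefined };
}
```

Keeping the rule pure makes it easy to share between `failTask` and the reaper, which applies the same `retryCount < maxRetries` test to expired leases.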

TaskAttempt

interface TaskAttempt {
  id: string;
  agentName: string;
  startedAt: Date;
  completedAt?: Date;
  status: 'running' | 'completed' | 'failed' | 'timeout';
  explanation?: string;
  failureReason?: 'agent_reported' | 'timeout' | 'server_error';
  leaseExpiresAt?: Date;  // When this attempt's lease expires
}

Agent

interface Agent {
  name: string;
  projectId: string;
  sessionId: string;
  status: 'idle' | 'working';
  currentTaskId?: string;
  apiKey: string;  // Authentication token for this agent
  lastSeen: Date;
  connectedAt: Date;
}

Core Service Layer

// Shared business logic used by both MCP and CLI
class ProjectService {
  constructor(private storage: StorageProvider) {}
  
  async createProject(name: string, description?: string): Promise<Project> { ... }
  async listProjects(includeClosed = false): Promise<Project[]> { ... }
  async getProject(projectId: string): Promise<Project | null> { ... }
  async closeProject(projectId: string): Promise<void> { ... }
  async getProjectStatus(projectId: string): Promise<ProjectStatus> { ... }
}

class TaskService {
  constructor(private storage: StorageProvider) {}
  
  async addTask(projectId: string, typeId: string, instructions: string, variables?: Record<string, string>): Promise<Task> { 
    // Template Processing:
    // 1. If task type has template: validate variables and interpolate template to create final instructions
    // 2. If task type has no template: use provided instructions directly
    // 3. Store only final interpolated instructions in the task (never the template)
    // Check for duplicate variable combinations based on TaskType.duplicateHandling
    // - 'ignore': Return existing task if found, don't create new one
    // - 'fail': Throw error if duplicate found
    // - 'allow': Always create new task regardless of duplicates
  }
  async createTasksBulk(projectId: string, tasks: TaskInput[]): Promise<{
    tasksCreated: number;
    errors: string[];
    createdTasks: Task[];
  }> {
    // Bulk creation without batch tracking
    // - Creates each task individually with proper template processing
    // - Returns summary of successful/failed task creations
    // - Maximum 1000 tasks per request
    // - Continues processing even if some tasks fail
  }
  async getTask(taskId: string): Promise<Task | null> { ... }
  async listTasks(projectId: string, filters?: TaskFilters): Promise<Task[]> { ... }
  async getTaskHistory(taskId: string): Promise<TaskAttempt[]> { ... }
}

class AgentService {
  constructor(private storage: StorageProvider) {}
  
  async registerAgent(projectId: string, agentName?: string): Promise<{ agent: Agent, apiKey: string }> { 
    // Generate unique API key for authentication
    // Associate agent with specific project(s)
  }
  async getCurrentTask(projectId: string, agentName: string): Promise<Task | null> { ... }
  async requestTask(projectId: string, agentName: string): Promise<Task | null> { 
    // ATOMIC: Find queued task, set status to 'running', set leaseExpiresAt
    // Use backend-native atomic operations (findOneAndUpdate, RPOPLPUSH)
  }
  async completeTask(taskId: string, agentName: string, explanation: string): Promise<void> { 
    // Mark task as completed, record final attempt
    // Agent-controlled retries should be exhausted before calling this
  }
  async failTask(taskId: string, agentName: string, explanation: string, canRetry: boolean = true): Promise<void> { 
    // Mark task as failed
    // If canRetry=false, mark as permanently failed regardless of retry count
    // If canRetry=true, let server-side retry logic handle requeuing based on maxRetries
  }
  async extendLease(taskId: string, agentName: string, additionalMinutes: number): Promise<void> { 
    // Extend lease for long-running tasks
  }
  async getAgentStatus(projectId: string, agentName: string): Promise<Agent | null> { ... }
}

class TaskTypeService {
  constructor(private storage: StorageProvider) {}
  
  async createTaskType(
    projectId: string, 
    name: string, 
    template?: string, 
    variables?: string[], 
    duplicateHandling: 'ignore' | 'fail' | 'allow' = 'allow',
    maxRetries?: number,  // Defaults to project setting
    leaseDurationMinutes?: number  // Defaults to project setting
  ): Promise<TaskType> { ... }
  async listTaskTypes(projectId: string): Promise<TaskType[]> { ... }
  async getTaskType(projectId: string, typeId: string): Promise<TaskType | null> { ... }
}

class ReaperService {
  constructor(private storage: StorageProvider) {}
  
  async processExpiredLeases(): Promise<{ requeuedTasks: number, failedTasks: number }> {
    // Find all tasks where status='running' AND leaseExpiresAt < now()
    // For each expired task:
    //   - If retryCount < maxRetries: increment retryCount, set status='queued', clear lease
    //   - If retryCount >= maxRetries: set status='failed', record timeout reason
    // Use proper coordination for multi-instance deployments
  }
  
  async startReaper(intervalMinutes: number = 1): Promise<void> {
    // Background process that runs processExpiredLeases() periodically
    // Handles graceful shutdown via SIGINT/SIGTERM
  }
}
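One pass of `ReaperService.processExpiredLeases()` can be sketched over an in-memory task list. This leaves out storage access and multi-instance coordination, which the spec assigns to the providers; the names `LeasedTask` and `reapExpired` are hypothetical:

```typescript
interface LeasedTask {
  id: string;
  status: "queued" | "running" | "completed" | "failed";
  retryCount: number;
  maxRetries: number;
  assignedTo?: string;
  leaseExpiresAt?: Date;
}

// One reaper pass; mutates tasks in place and returns the counts.
function reapExpired(tasks: LeasedTask[], now: Date): { requeuedTasks: number; failedTasks: number } {
  let requeuedTasks = 0;
  let failedTasks = 0;
  for (const task of tasks) {
    const expired =
      task.status === "running" &&
      task.leaseExpiresAt !== undefined &&
      task.leaseExpiresAt.getTime() < now.getTime();
    if (!expired) continue;
    if (task.retryCount < task.maxRetries) {
      task.retryCount += 1;
      task.status = "queued";
      requeuedTasks++;
    } else {
      // A full implementation would also close the running TaskAttempt
      // with status 'timeout' and failureReason 'timeout'.
      task.status = "failed";
      failedTasks++;
    }
    task.assignedTo = undefined;
    task.leaseExpiresAt = undefined;
  }
  return { requeuedTasks, failedTasks };
}
```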

Storage Interface

interface StorageProvider {
  // Project operations
  createProject(project: Project): Promise<void>;
  getProject(projectId: string): Promise<Project | null>;
  updateProject(project: Project): Promise<void>;
  listProjects(includeClosed?: boolean): Promise<Project[]>;
  deleteProject(projectId: string): Promise<void>;

  // Task operations
  addTask(task: Task): Promise<void>;
  getTask(taskId: string): Promise<Task | null>;
  updateTask(task: Task): Promise<void>;
  listTasks(projectId: string, filters?: TaskFilters): Promise<Task[]>;
  
  // Agent operations
  registerAgent(agent: Agent): Promise<void>;
  getAgent(projectId: string, agentName: string): Promise<Agent | null>;
  updateAgent(agent: Agent): Promise<void>;
  listAgents(projectId: string): Promise<Agent[]>;
  removeAgent(projectId: string, agentName: string): Promise<void>;

  // Atomic operations for task assignment - CRITICAL: Must be atomic across all providers
  assignTask(projectId: string, agentName: string): Promise<Task | null>;
  completeTask(taskId: string, result: TaskResult): Promise<void>;
  failTask(taskId: string, result: TaskResult): Promise<void>;
  
  // Lease management operations
  findExpiredLeases(): Promise<Task[]>;
  requeueTask(taskId: string): Promise<void>;
  extendLease(taskId: string, additionalMinutes: number): Promise<void>;
  
}

// Storage provider implementations with atomic task assignment
class FileStorageProvider implements StorageProvider { 
  // Uses proper file locking (flock/fcntl) for single-machine deployments
  async assignTask(projectId: string, agentName: string): Promise<Task | null> {
    // 1. Acquire exclusive lock on project file using flock()
    // 2. Find first queued task
    // 3. Atomically update task status to 'running', set assignedTo and leaseExpiresAt
    // 4. Write file and release lock
    // 5. Return task or null if none available
  }
  
  async findExpiredLeases(): Promise<Task[]> {
    // Scan all project files for tasks with status='running' and leaseExpiresAt < now()
    // Note: File provider has limited multi-instance coordination
  }
}

class MongoStorageProvider implements StorageProvider { 
  // Uses MongoDB's findOneAndUpdate with atomic operations
  async assignTask(projectId: string, agentName: string): Promise<Task | null> {
    // findOneAndUpdate with filter: {projectId, status: 'queued'}, sort: {createdAt: 1} (FIFO),
    // update: {$set: {status: 'running', assignedTo: agentName, leaseExpiresAt: now() + lease_duration}}
  }
  
  async findExpiredLeases(): Promise<Task[]> {
    // db.tasks.find({status: 'running', leaseExpiresAt: {$lt: new Date()}})
  }
}

class RedisStorageProvider implements StorageProvider { 
  // Uses Redis transactions (MULTI/EXEC) or Lua scripts for atomicity
  async assignTask(projectId: string, agentName: string): Promise<Task | null> {
    // Lua script to atomically:
    // 1. LMOVE ... RIGHT LEFT (RPOPLPUSH, deprecated since Redis 6.2) from 'project:queued' to 'project:running'
    // 2. Update task hash with assignedTo and leaseExpiresAt
    // 3. Return task data
  }
  
  async findExpiredLeases(): Promise<Task[]> {
    // Query the lease-expiry sorted set (score = leaseExpiresAt timestamp) with ZRANGEBYSCORE <key> -inf <now>
  }
}
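The locked section of `FileStorageProvider.assignTask` (steps 2 and 3) can be sketched as a pure function over the parsed project file. This assumes dates are stored as ISO-8601 strings in the JSON file and that locking, reading, and writing happen around the call; `ProjectFile` and `claimNextTask` are hypothetical names:

```typescript
interface QueuedTask {
  id: string;
  status: "queued" | "running" | "completed" | "failed";
  createdAt: string; // ISO timestamp as stored in JSON
  assignedTo?: string;
  assignedAt?: string;
  leaseExpiresAt?: string;
}

interface ProjectFile {
  tasks: QueuedTask[];
}

// Claim the oldest queued task while the caller holds the exclusive lock.
// Mutates `data`; the caller writes it back before releasing the lock.
function claimNextTask(data: ProjectFile, agentName: string, leaseMinutes: number, now: Date): QueuedTask | null {
  // FIFO: oldest queued task first.
  const next = data.tasks
    .filter((t) => t.status === "queued")
    .sort((a, b) => Date.parse(a.createdAt) - Date.parse(b.createdAt))[0];
  if (!next) return null;
  next.status = "running";
  next.assignedTo = agentName;
  next.assignedAt = now.toISOString();
  next.leaseExpiresAt = new Date(now.getTime() + leaseMinutes * 60000).toISOString();
  return next;
}
```

Because the whole read-modify-write happens under one exclusive lock, two agents can never claim the same task, which is the property the Mongo and Redis providers get from `findOneAndUpdate` and Lua scripts.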

MCP Tools and CLI Commands

Both MCP tools and CLI commands are thin wrappers around the shared Core Service Layer

Implementation Pattern

// MCP Tool Example
export const createProjectTool = {
  name: 'create_project',
  description: 'Create a new project',
  handler: async (args: CreateProjectArgs) => {
    return await projectService.createProject(args.name, args.description);
  }
};

// CLI Command Example  
export const createProjectCommand = {
  name: 'create-project',
  description: 'Create a new project',
  handler: async (args: CreateProjectArgs) => {
    const result = await projectService.createProject(args.name, args.description);
    console.log(`Project created: ${result.id}`);
  }
};

Available Operations

All operations available through both MCP and CLI interfaces

Project Management

  • create_project / create-project: Create a new project
  • list_projects / list-projects: List active projects (exclude closed by default, optional includeClosed parameter)
  • get_project / get-project: Get project details and stats
  • close_project / close-project: Mark project as closed
  • join_project / join-project: Join a project (sets context)

Task Type Management

  • create_task_type / create-task-type: Create a new task type with template and duplicate handling
  • list_task_types / list-task-types: List task types for current project
  • get_task_type / get-task-type: Get task type details

Task Management

  • add_task / add-task: Add a new task to the project queue
  • create_tasks_bulk / create-tasks-bulk: Create multiple tasks at once (no batch tracking)
  • get_task / get-task: Get task details and history
  • list_tasks / list-tasks: List tasks with filtering options

Agent Operations

  • register_agent / register-agent: Register with a name (or get auto-assigned) and receive API key
  • get_current_task / get-current-task: Get currently assigned task
  • request_task / request-task: Request a new task assignment (atomic lease acquisition)
  • complete_task / complete-task: Mark task as completed with explanation
  • fail_task / fail-task: Mark task as failed with explanation and retry flag
  • extend_lease / extend-lease: Extend lease for long-running tasks

Status and Monitoring

  • get_project_status / get-project-status: Get detailed project status
  • get_agent_status / get-agent-status: Get agent status and activity
  • get_task_history / get-task-history: Get task attempt history
  • get_audit_log / get-audit-log: Get audit trail for project

CLI Tool

Commands

CLI commands use the exact same service layer as MCP tools - zero code duplication

# Project management
taskdriver create-project <name> [description]
taskdriver list-projects [--include-closed]
taskdriver get-project <project-name>
taskdriver close-project <project-name>
taskdriver join-project <project-name>

# Task type management
taskdriver create-task-type <project> <name> [template] [--duplicates=ignore|fail|allow] [--max-retries=N] [--lease-duration=N]
taskdriver list-task-types <project>
taskdriver get-task-type <project> <type-name>

# Task management
taskdriver add-task <project> <type> <instructions> [variables...]
taskdriver create-tasks-bulk <project> <tasks-file>
taskdriver get-task <task-id>
taskdriver list-tasks <project> [--status=queued|running|completed|failed]

# Agent operations
taskdriver register-agent <project> [agent-name]
taskdriver get-current-task <project> <agent-name>
taskdriver request-task <project> <agent-name>
taskdriver complete-task <task-id> <explanation>
taskdriver fail-task <task-id> <explanation> [--no-retry]
taskdriver extend-lease <task-id> <additional-minutes>

# Status and monitoring
taskdriver get-project-status <project>
taskdriver get-agent-status <project> <agent-name>
taskdriver get-task-history <task-id>
taskdriver get-audit-log <project>
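The spec does not name a CLI argument parser. A hand-rolled sketch for the `create-task-type` synopsis, assuming flags are always written in the `--flag=value` form shown above; `parseCreateTaskType` and `CreateTaskTypeArgs` are hypothetical names. The parsed object maps directly onto the `TaskTypeService.createTaskType` parameters:

```typescript
type DuplicateHandling = "ignore" | "fail" | "allow";

interface CreateTaskTypeArgs {
  project: string;
  name: string;
  template?: string;
  duplicateHandling: DuplicateHandling;
  maxRetries?: number;
  leaseDurationMinutes?: number;
}

function isDuplicateHandling(v: string): v is DuplicateHandling {
  return v === "ignore" || v === "fail" || v === "allow";
}

// Parse argv for: create-task-type <project> <name> [template] [--flag=value ...]
function parseCreateTaskType(argv: string[]): CreateTaskTypeArgs {
  const positionals: string[] = [];
  const flags = new Map<string, string>();
  for (const arg of argv) {
    if (arg.startsWith("--")) {
      const eq = arg.indexOf("=");
      if (eq === -1) throw new Error(`Flag needs a value: ${arg}`);
      flags.set(arg.slice(2, eq), arg.slice(eq + 1));
    } else {
      positionals.push(arg);
    }
  }
  const [project, name, template] = positionals;
  if (!project || !name) throw new Error("Usage: create-task-type <project> <name> [template]");

  const dup = flags.get("duplicates") ?? "allow"; // same default as TaskTypeService
  if (!isDuplicateHandling(dup)) {
    throw new Error(`--duplicates must be ignore, fail, or allow (got ${dup})`);
  }
  // Optional non-negative integer flags; absent means "use the project default".
  const toInt = (key: string): number | undefined => {
    const raw = flags.get(key);
    if (raw === undefined) return undefined;
    const n = Number(raw);
    if (!Number.isInteger(n) || n < 0) throw new Error(`--${key} must be a non-negative integer`);
    return n;
  };
  return {
    project,
    name,
    template,
    duplicateHandling: dup,
    maxRetries: toInt("max-retries"),
    leaseDurationMinutes: toInt("lease-duration"),
  };
}
```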

Project Structure

src/
├── index.ts                    # Main entry point
├── mcp.ts                      # MCP server implementation
├── cli.ts                      # CLI command implementation
├── config/
│   ├── index.ts               # Configuration loading
│   └── types.ts               # Configuration types
├── services/
│   ├── index.ts               # Service exports
│   ├── ProjectService.ts      # Project management logic
│   ├── TaskService.ts         # Task management logic
│   ├── AgentService.ts        # Agent management logic
│   ├── TaskTypeService.ts     # Task type management logic
│   └── ReaperService.ts       # Lease expiry and requeue logic
├── storage/
│   ├── index.ts               # Storage provider factory
│   ├── StorageProvider.ts     # Storage interface
│   ├── FileStorageProvider.ts # JSON file storage
│   ├── MongoStorageProvider.ts # MongoDB storage
│   └── RedisStorageProvider.ts # Redis storage
├── types/
│   ├── index.ts               # Type exports
│   ├── Project.ts             # Project-related types
│   ├── Task.ts                # Task-related types
│   ├── Agent.ts               # Agent-related types
│   └── TaskType.ts            # TaskType-related types
├── utils/
│   ├── index.ts               # Utility exports
│   ├── fileUtils.ts           # File handling utilities
│   └── validation.ts          # Input validation
└── tools/
    ├── index.ts               # MCP tool definitions
    ├── projectTools.ts        # Project management tools
    ├── taskTools.ts           # Task management tools
    ├── agentTools.ts          # Agent operation tools
    └── taskTypeTools.ts       # Task type tools

dist/                           # Build output directory
└── (compiled JavaScript files)

Implementation Priority

Phase 1: Core Reliability (Must-Have for V1)

  1. Basic MCP server with TypeScript SDK
  2. Storage interface with pluggable providers
    • Common StorageProvider interface with atomic operations
    • FileStorageProvider with proper file locking (flock/fcntl)
    • Configuration via environment variables
  3. Minimum Viable Lease system
    • Task states: queued → running → completed/failed
    • Atomic task assignment with leaseExpiresAt
    • ReaperService for timeout handling
  4. Project-scoped agent authentication
    • API key generation and validation
    • Agent registration with project association
  5. Graceful shutdown handling (SIGINT/SIGTERM)
  6. Core service layer implementation

Phase 1.1: Production Readiness

  1. Server-side retry policy with configurable maxRetries
  2. Hybrid retry model (agent + server controlled)
  3. Structured logging with correlation IDs
  4. Basic metrics endpoint (/metrics) for queue depth and task counts
  5. MongoStorageProvider and RedisStorageProvider implementations
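For structured logging, one JSON object per line lets log shippers filter by ID. A minimal sketch, assuming the `task_id`, `project_id`, and `agent_id` field names listed under Reliability & Observability; the `ts`, `level`, `msg`, and `correlation_id` keys and the `formatLog` helper are hypothetical choices:

```typescript
interface LogContext {
  correlation_id: string;
  task_id?: string;
  project_id?: string;
  agent_id?: string;
}

// Serialize one log event as a single JSON line.
function formatLog(level: "info" | "warn" | "error", msg: string, ctx: LogContext, now = new Date()): string {
  return JSON.stringify({ ts: now.toISOString(), level, msg, ...ctx });
}

const line = formatLog(
  "info",
  "task assigned",
  { correlation_id: "req-42", task_id: "t1", project_id: "proj1", agent_id: "agent-1" },
  new Date("2024-01-01T00:00:00.000Z"),
);
console.log(line);
```

Passing the same `correlation_id` through every service call made for one MCP request ties together the assignment, completion, and reaper events for a task.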

Phase 2: Usability & Scalability

  1. HTTP transport layer with session management
  2. Lease extension for long-running tasks
  3. CLI tool with full feature parity
  4. Multi-instance coordination for reaper service

Phase 3: Advanced Features

  1. Web interface for HTTP mode
  2. User management and multi-tenancy
  3. Advanced monitoring and alerting
  4. Task prioritization and scheduling
  5. Result artifact handling (external storage URLs)

Critical Design Decisions

Task Lifecycle & Lease Management

  • Minimum Viable Lease: Simple timeout-based approach without heartbeating
  • Clock skew tolerance: Assumes clocks on all participating hosts agree to within 5 seconds
  • Hybrid retries: Agents handle transient errors, server handles infrastructure failures
  • Atomic operations: Use backend-native primitives (findOneAndUpdate, RPOPLPUSH, flock)
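The spec fixes the skew bound but not how to apply it. One option, sketched here as an assumption, is to treat a lease as expired only once it is overdue by more than the skew budget, so a reaper on a slightly fast clock does not requeue work that is still live. `isLeaseExpired` and `CLOCK_SKEW_MS` are hypothetical names:

```typescript
const CLOCK_SKEW_MS = 5000; // the 5-second bound assumed above

// A lease counts as expired only once it is past due by more than the skew budget.
function isLeaseExpired(leaseExpiresAt: Date, now: Date, skewMs: number = CLOCK_SKEW_MS): boolean {
  return now.getTime() > leaseExpiresAt.getTime() + skewMs;
}
```

The trade-off is that an abandoned task waits up to 5 extra seconds before requeueing, which is negligible next to lease durations measured in minutes.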

Security & Authentication

  • Project-scoped API keys: Each agent authenticated to specific project(s)
  • No task cross-contamination: Agents cannot access tasks from other projects
  • Configuration security: All secrets via environment variables, never in code
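Project-scoped API keys can be sketched with Node's `crypto` module. This sketch keeps only a SHA-256 hash of the key at rest and compares in constant time; whether the `Agent.apiKey` field holds the raw key or a hash is left open by the spec, so hashing here is an assumption. `AgentRecord`, `generateApiKey`, and `authorize` are hypothetical names:

```typescript
import { createHash, randomBytes, timingSafeEqual } from "node:crypto";

interface AgentRecord {
  name: string;
  projectId: string;
  apiKeyHash: string;
}

function hashKey(apiKey: string): string {
  return createHash("sha256").update(apiKey).digest("hex");
}

// Generate a random key; the plaintext is returned once, only the hash is stored.
function generateApiKey(): { apiKey: string; apiKeyHash: string } {
  const apiKey = randomBytes(32).toString("base64url");
  return { apiKey, apiKeyHash: hashKey(apiKey) };
}

// Constant-time comparison of the presented key's hash with the stored hash.
function verifyApiKey(presented: string, storedHash: string): boolean {
  const a = Buffer.from(hashKey(presented), "hex");
  const b = Buffer.from(storedHash, "hex");
  return a.length === b.length && timingSafeEqual(a, b);
}

// Project scoping: a valid key for an agent in another project is still rejected.
function authorize(agent: AgentRecord | null, projectId: string, presentedKey: string): boolean {
  return agent !== null && agent.projectId === projectId && verifyApiKey(presentedKey, agent.apiKeyHash);
}
```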

Reliability & Observability

  • Graceful shutdown: Preserve task state during server restarts
  • Structured logging: JSON format with task_id, project_id, agent_id correlation
  • Metrics exposure: Prometheus-compatible /metrics endpoint
  • Audit trail: Full task attempt history with failure reasons
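A Prometheus-compatible `/metrics` endpoint serves plain text in the exposition format (`# HELP`/`# TYPE` lines followed by samples), normally with content type `text/plain; version=0.0.4`. A minimal sketch that renders the `Project.stats` counts as one gauge labeled by project and status; the metric name `taskdriver_tasks` and the helpers are hypothetical:

```typescript
interface ProjectStats {
  totalTasks: number;
  completedTasks: number;
  failedTasks: number;
  queuedTasks: number;
  runningTasks: number;
}

// Escape label values as the text format requires (backslash, double quote, newline).
function escapeLabel(v: string): string {
  return v.replace(/\\/g, "\\\\").replace(/"/g, '\\"').replace(/\n/g, "\\n");
}

// Render per-project task counts; the queued series doubles as queue depth.
function renderMetrics(projects: { id: string; stats: ProjectStats }[]): string {
  const lines = [
    "# HELP taskdriver_tasks Number of tasks by project and status.",
    "# TYPE taskdriver_tasks gauge",
  ];
  for (const { id, stats } of projects) {
    const byStatus: [string, number][] = [
      ["queued", stats.queuedTasks],
      ["running", stats.runningTasks],
      ["completed", stats.completedTasks],
      ["failed", stats.failedTasks],
    ];
    for (const [status, count] of byStatus) {
      lines.push(`taskdriver_tasks{project="${escapeLabel(id)}",status="${status}"} ${count}`);
    }
  }
  return lines.join("\n") + "\n";
}
```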