-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathengine.py
More file actions
148 lines (123 loc) · 5.09 KB
/
engine.py
File metadata and controls
148 lines (123 loc) · 5.09 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
"""
CodeFlow Engine - Core Engine Implementation
Main engine class that orchestrates CodeFlow operations.
"""
import logging
from typing import Any
from codeflow_engine.actions.registry import ActionRegistry
from codeflow_engine.ai.core.providers.manager import LLMProviderManager
from codeflow_engine.config import CodeFlowConfig
from codeflow_engine.exceptions import CodeFlowException, ConfigurationError
from codeflow_engine.health import HealthChecker
from codeflow_engine.integrations.registry import IntegrationRegistry
from codeflow_engine.utils.error_handlers import handle_operation_error
from codeflow_engine.workflows.engine import WorkflowEngine
logger = logging.getLogger(__name__)
class CodeFlowEngine:
"""
Main CodeFlow Engine class that coordinates all automation activities.
This class serves as the central orchestrator for:
- Workflow execution
- Action processing
- Integration management
- AI/LLM provider coordination
"""
def __init__(
self,
config: CodeFlowConfig | None = None,
log_handler: logging.Handler | None = None,
):
"""
Initialize the CodeFlow Engine.
Args:
config: Configuration object. If None, loads default config.
log_handler: Optional logging handler to add to the root logger.
"""
self.config = config or CodeFlowConfig()
self.workflow_engine = WorkflowEngine(self.config)
self.action_registry: ActionRegistry = ActionRegistry()
self.integration_registry = IntegrationRegistry()
self.llm_manager = LLMProviderManager(self.config)
self.health_checker = HealthChecker(self)
if log_handler:
logging.getLogger().addHandler(log_handler)
logger.info("CodeFlow Engine initialized successfully")
async def __aenter__(self) -> "CodeFlowEngine":
"""Async context manager entry."""
await self.start()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
"""Async context manager exit."""
await self.stop()
async def start(self) -> None:
"""Start the CodeFlow Engine and initialize all components."""
try:
# Validate configuration before starting
if not self.config.validate():
msg = "Invalid configuration: Missing required authentication or LLM provider keys"
logger.error(msg)
raise ConfigurationError(msg)
await self.workflow_engine.start()
await self.integration_registry.initialize()
await self.llm_manager.initialize()
logger.info("CodeFlow Engine started successfully")
except ConfigurationError:
raise
except Exception as e:
handle_operation_error("Engine startup", e, CodeFlowException)
async def stop(self) -> None:
"""Stop the CodeFlow Engine and cleanup resources."""
try:
await self.workflow_engine.stop()
await self.integration_registry.cleanup()
await self.llm_manager.cleanup()
logger.info("CodeFlow Engine stopped successfully")
except Exception as e:
handle_operation_error("Engine shutdown", e, CodeFlowException)
async def process_event(
self, event_type: str, event_data: dict[str, Any]
) -> dict[str, Any]:
"""
Process an incoming event through the workflow engine.
Args:
event_type: Type of event (e.g., 'pull_request', 'issue', 'push')
event_data: Event payload data
Returns:
Processing result dictionary
"""
try:
result = await self.workflow_engine.process_event(event_type, event_data)
logger.info(f"Successfully processed {event_type} event")
return result
except Exception as e:
handle_operation_error("Event processing", e, CodeFlowException)
raise
def get_status(self) -> dict[str, Any]:
"""
Get the current status of the CodeFlow Engine.
Returns:
Status dictionary with component information
"""
return {
"engine": "running",
"workflow_engine": self.workflow_engine.get_status(),
"actions": len(self.action_registry.get_all_actions()),
"integrations": len(self.integration_registry.get_all_integrations()),
"llm_providers": len(self.llm_manager.get_available_providers()),
"config": self.config.to_dict(),
}
def get_version(self) -> str:
"""Get the CodeFlow Engine version."""
from codeflow_engine import __version__
return __version__
async def health_check(self) -> dict[str, Any]:
"""
Perform comprehensive health check on all components.
Returns:
Health check results including overall status and component details
"""
try:
return await self.health_checker.check_all()
except Exception as e:
handle_operation_error("Health check", e, CodeFlowException)
raise