diff --git a/.github/argo-launchers/run-backtest.yaml b/.github/argo-launchers/run-backtest.yaml index ac695dd..8d9980b 100644 --- a/.github/argo-launchers/run-backtest.yaml +++ b/.github/argo-launchers/run-backtest.yaml @@ -12,6 +12,6 @@ spec: - name: image_tag value: "${IMAGE_TAG}" - name: experiment_config - value: "/usr/local/lib/python3.11/site-packages/core_runtime/argo/argo.json" + value: "/usr/local/lib/python3.11/site-packages/core_runtime/argo/bt_config_argo.json" - name: scratch_root value: "/mnt/scratch" diff --git a/README.md b/README.md index fd066af..e193d64 100644 --- a/README.md +++ b/README.md @@ -48,7 +48,7 @@ Order, State, or Risk Engine. Current local smoke is usable from the `core-runtime` repository root: ```bash -python -m core_runtime.local.backtest --config core_runtime/local/local.json +python -m core_runtime.local.backtest --config core_runtime/local/bt_config_local.json ``` Default output location: @@ -67,14 +67,7 @@ From the `core-runtime` repository root: ```bash python -m pip install -e . -python -m core_runtime.local.backtest --config core_runtime/local/local.json -``` - -If `tradingchassis_core` is not already resolvable in your environment, install `core` as a -sibling editable package in a monorepo workspace: - -```bash -python -m pip install -e ../core +python -m core_runtime.local.backtest --config core_runtime/local/bt_config_local.json ``` --- @@ -83,8 +76,8 @@ python -m pip install -e ../core | Mode | Entrypoint | Command shape | Notes | | --- | --- | --- | --- | -| Local backtest | `core_runtime/local/backtest.py` | `python -m core_runtime.local.backtest --config core_runtime/local/local.json` | Main local runner. | -| Argo plan/run orchestration | `core_runtime/backtest/runtime/entrypoint.py` | `python -m core_runtime.backtest.runtime.entrypoint --config core_runtime/argo/argo.json --plan` | Planner and sweep-context emitter for Argo flow. | +| Local backtest | `core_runtime/local/backtest.py` | `python -m core_runtime.local.backtest --config core_runtime/local/bt_config_local.json` | Main local runner. | +| Argo plan/run orchestration | `core_runtime/backtest/runtime/entrypoint.py` | `python -m core_runtime.backtest.runtime.entrypoint --config core_runtime/argo/bt_config_argo.json --plan` | Planner and sweep-context emitter for Argo flow. | | Sweep worker | `core_runtime/backtest/runtime/run_sweep.py` | `python -m core_runtime.backtest.runtime.run_sweep --context ` | Executes one sweep context. | --- @@ -94,7 +87,7 @@ python -m pip install -e ../core | Capability area | Status | Notes | | --- | --- | --- | | Canonical runtime paths | Active | `MarketEvent`, `OrderSubmittedEvent`, `ControlTimeEvent` | -| Compatibility paths | Active | Post-submission order/fill progression via snapshots, `OrderStateEvent`, and `DerivedFillEvent` | +| Runtime-local compatibility handling | Active | Raw venue order snapshots stay in runtime bookkeeping; Core receives canonical `OrderExecutionFeedbackEvent` (account-level only). | | Deferred capabilities | Deferred | Runtime `FillEvent` ingress, `ExecutionFeedbackRecordSource`, replay/storage/Event Stream persistence, `ProcessingContext` | --- @@ -115,11 +108,11 @@ python -m pip install -e ../core --- -## Compatibility paths +## Runtime-local compatibility handling -- snapshot-based post-submission progression -- `OrderStateEvent` -- `DerivedFillEvent` +- snapshot-based post-submission bookkeeping remains runtime-local +- Core ingestion uses account-level `OrderExecutionFeedbackEvent` +- no snapshot row payload is pushed into Core --- @@ -160,7 +153,7 @@ tests/ Runtime tests and deterministic fixtures Primary local config: -- `core_runtime/local/local.json` +- `core_runtime/local/bt_config_local.json` - OCI config template (for local object storage auth setups): `core_runtime/local/oci.config.example` Note: local JSON configs use cwd-relative paths for `tests/data/...` inputs and `.runtime/...` diff --git a/argo/templates/workflowtemplate-backtest-fanout.yaml b/argo/templates/workflowtemplate-backtest-fanout.yaml index a6b16d7..3d20af7 100644 --- a/argo/templates/workflowtemplate-backtest-fanout.yaml +++ b/argo/templates/workflowtemplate-backtest-fanout.yaml @@ -24,7 +24,7 @@ spec: - name: experiment_config description: "Path to experiment JSON inside the container" - value: /usr/local/lib/python3.11/site-packages/core_runtime/argo/argo.json + value: /usr/local/lib/python3.11/site-packages/core_runtime/argo/bt_config_argo.json - name: scratch_root description: "Scratch root inside the container" diff --git a/core_runtime/argo/argo.json b/core_runtime/argo/bt_config_argo.json similarity index 100% rename from core_runtime/argo/argo.json rename to core_runtime/argo/bt_config_argo.json diff --git a/core_runtime/backtest/adapters/protocols.py b/core_runtime/backtest/adapters/protocols.py index 9a8dee3..202e3cc 100644 --- a/core_runtime/backtest/adapters/protocols.py +++ b/core_runtime/backtest/adapters/protocols.py @@ -74,6 +74,23 @@ def read_orders_snapshot(self) -> tuple[Any, Any]: """Return (state_values, orders) from current snapshot boundary.""" +class VenueAdapter( + VenueEventWaiter, + VenueClock, + MarketInputSource, + OrderSnapshotSource, + Protocol, +): + """Composite runtime venue adapter contract for strategy runner.""" + + def record(self, recorder: Any) -> bool: + """Persist current simulation state. + + Returns: + True when recorder capacity is exhausted and no record was written. + """ + + class OrderSubmissionGateway(Protocol): """Outbound order command submission capability. diff --git a/core_runtime/backtest/adapters/venue.py b/core_runtime/backtest/adapters/venue.py index 33e1997..8dd20a8 100644 --- a/core_runtime/backtest/adapters/venue.py +++ b/core_runtime/backtest/adapters/venue.py @@ -8,7 +8,7 @@ if TYPE_CHECKING: from hftbacktest import ROIVectorMarketDepthBacktest -from tradingchassis_core.core.ports.venue_adapter import VenueAdapter +from core_runtime.backtest.adapters.protocols import VenueAdapter @dataclass(frozen=True) @@ -43,7 +43,14 @@ def read_orders_snapshot(self) -> tuple[Any, Any]: self.hbt.orders(self.asset_no), ) - def record(self, recorder: Any) -> None: + def record(self, recorder: Any) -> bool: """Record the current backtest state using the given recorder.""" # hftbacktest recorder is a thin wrapper exposing .recorder.record(hbt). - recorder.recorder.record(self.hbt) + try: + recorder.recorder.record(self.hbt) + except IndexError: + # hftbacktest Recorder has a fixed capacity and raises IndexError + # when the record buffer is exhausted. Runtime treats this as + # recording exhaustion and keeps the backtest loop alive. + return True + return False diff --git a/core_runtime/backtest/engine/event_stream_cursor.py b/core_runtime/backtest/engine/event_stream_cursor.py index a7f3b8e..3550cab 100644 --- a/core_runtime/backtest/engine/event_stream_cursor.py +++ b/core_runtime/backtest/engine/event_stream_cursor.py @@ -20,6 +20,14 @@ def next_index(self) -> int: def attempt_position(self) -> ProcessingPosition: return ProcessingPosition(index=self._next_index) + def attempt_positions(self, count: int) -> tuple[ProcessingPosition, ...]: + if count < 0: + raise ValueError("count must be >= 0") + return tuple( + ProcessingPosition(index=self._next_index + offset) + for offset in range(count) + ) + def commit_success(self, position: ProcessingPosition) -> None: if position.index != self._next_index: raise ValueError( diff --git a/core_runtime/backtest/engine/hft_engine.py b/core_runtime/backtest/engine/hft_engine.py index 62cfd67..d15a996 100644 --- a/core_runtime/backtest/engine/hft_engine.py +++ b/core_runtime/backtest/engine/hft_engine.py @@ -4,21 +4,12 @@ import importlib from dataclasses import dataclass -from typing import TYPE_CHECKING - -from hftbacktest import ( - BacktestAsset, - Recorder, - ROIVectorMarketDepthBacktest, -) +from typing import TYPE_CHECKING, Any if TYPE_CHECKING: from tradingchassis_core.core.domain.configuration import CoreConfiguration from tradingchassis_core.core.risk.risk_config import RiskConfig -from tradingchassis_core.strategies.base import Strategy -from tradingchassis_core.strategies.strategy_config import StrategyConfig - from core_runtime.backtest.adapters.execution import HftBacktestExecutionAdapter from core_runtime.backtest.adapters.venue import HftBacktestVenueAdapter from core_runtime.backtest.engine.engine_base import ( @@ -27,6 +18,7 @@ BacktestResult, ) from core_runtime.backtest.engine.strategy_runner import HftStrategyRunner +from core_runtime.backtest.strategy_api import Strategy, StrategyConfig # pylint: disable=too-many-instance-attributes @@ -81,8 +73,10 @@ class HftBacktestConfig(BacktestConfig): core_cfg: CoreConfiguration -def _build_backtester(engine_cfg: HftEngineConfig) -> ROIVectorMarketDepthBacktest: +def _build_backtester(engine_cfg: HftEngineConfig) -> Any: """Create an ROIVectorMarketDepthBacktest from the engine configuration.""" + from hftbacktest import BacktestAsset, ROIVectorMarketDepthBacktest + asset = BacktestAsset() # For now we assume file paths. Later this can be replaced with an S3 resolver. @@ -126,10 +120,13 @@ def _load_strategy_class(self, class_path: str) -> type[Strategy]: module_path, class_name = class_path.split(":") module = importlib.import_module(module_path) cls = getattr(module, class_name) - - if not issubclass(cls, Strategy): + if not callable(getattr(cls, "on_feed", None)): + raise TypeError( + f"Loaded class {class_name} does not implement on_feed." + ) + if not callable(getattr(cls, "on_order_update", None)): raise TypeError( - f"Loaded class {class_name} is not a subclass of Strategy." + f"Loaded class {class_name} does not implement on_order_update." ) return cls @@ -150,6 +147,8 @@ def run(self) -> BacktestResult: hbt = _build_backtester(engine_cfg) # 2) Prepare recorder (single asset, record every step) + from hftbacktest import Recorder + recorder = Recorder(1, engine_cfg.max_steps) # 3) Build strategy and runner @@ -182,6 +181,6 @@ def run(self) -> BacktestResult: "strategy_name": strategy_cfg.class_path, "strategy_params": strategy_cfg.params, "risk_scope": risk_cfg.scope, - "risk_params": risk_cfg.params, + "risk_params": risk_cfg.model_dump(), }, ) diff --git a/core_runtime/backtest/engine/strategy_runner.py b/core_runtime/backtest/engine/strategy_runner.py index cd02717..716e843 100644 --- a/core_runtime/backtest/engine/strategy_runner.py +++ b/core_runtime/backtest/engine/strategy_runner.py @@ -3,22 +3,30 @@ from __future__ import annotations import logging -from collections import deque +import os from pathlib import Path from typing import TYPE_CHECKING, Any +from tradingchassis_core import ( + CoreExecutionControlApplyContext, + CorePolicyAdmissionContext, + run_core_step, + run_core_wakeup_step, +) from tradingchassis_core.core.domain.configuration import CoreConfiguration from tradingchassis_core.core.domain.processing import process_event_entry from tradingchassis_core.core.domain.processing_order import ( EventStreamEntry, ) from tradingchassis_core.core.domain.state import StrategyState +from tradingchassis_core.core.domain.step_result import CoreStepResult from tradingchassis_core.core.domain.types import ( BookLevel, BookPayload, ControlTimeEvent, MarketEvent, NewOrderIntent, + OrderExecutionFeedbackEvent, OrderIntent, OrderSubmittedEvent, Price, @@ -26,23 +34,101 @@ ) from tradingchassis_core.core.events.event_bus import EventBus from tradingchassis_core.core.events.sinks.sink_logging import LoggingEventSink -from tradingchassis_core.core.ports.venue_adapter import VenueAdapter +from tradingchassis_core.core.execution_control.execution_control import ExecutionControl +from tradingchassis_core.core.execution_control.types import ( + ControlSchedulingObligation, +) from tradingchassis_core.core.risk.risk_config import RiskConfig -from tradingchassis_core.core.risk.risk_engine import RejectedIntent, RiskEngine +from tradingchassis_core.core.risk.risk_engine import RiskEngine -from core_runtime.backtest.adapters.protocols import OrderSubmissionGateway +from core_runtime.backtest.adapters.protocols import OrderSubmissionGateway, VenueAdapter from core_runtime.backtest.engine.event_stream_cursor import EventStreamCursor -from core_runtime.core.events.sinks.file_recorder import FileRecorderSink +from core_runtime.backtest.events.sinks.file_recorder import FileRecorderSink +from core_runtime.backtest.strategy_api import Strategy if TYPE_CHECKING: - from tradingchassis_core.strategies.base import Strategy - from core_runtime.backtest.engine.hft_engine import HftEngineConfig MAX_TIMEOUT_NS = 1 << 62 # Effectively "wait forever" without a heartbeat +class _LegacyOnFeedStrategyEvaluator: + """Runtime-local adapter from legacy Strategy.on_feed to CoreStep evaluator.""" + + def __init__( + self, + *, + strategy: Strategy, + engine_cfg: HftEngineConfig, + constraints: object, + ) -> None: + self._strategy = strategy + self._engine_cfg = engine_cfg + self._constraints = constraints + + def evaluate(self, context: object) -> tuple[OrderIntent, ...]: + return tuple( + self._strategy.on_feed( + context.state, + context.event, + self._engine_cfg, + self._constraints, + ) + ) + + +class _LegacyOnOrderUpdateStrategyEvaluator: + """Runtime-local adapter from legacy Strategy.on_order_update to CoreStep.""" + + def __init__( + self, + *, + strategy: Strategy, + engine_cfg: HftEngineConfig, + constraints: object, + ) -> None: + self._strategy = strategy + self._engine_cfg = engine_cfg + self._constraints = constraints + + def evaluate(self, context: object) -> tuple[OrderIntent, ...]: + return tuple( + self._strategy.on_order_update( + context.state, + self._engine_cfg, + self._constraints, + ) + ) + + +class _LegacyWakeupStrategyEvaluator: + """Runtime-local adapter for one strategy evaluation per wakeup reduction.""" + + def __init__( + self, + *, + strategy: Strategy, + engine_cfg: HftEngineConfig, + constraints: object, + market_event: MarketEvent, + ) -> None: + self._strategy = strategy + self._engine_cfg = engine_cfg + self._constraints = constraints + self._market_event = market_event + + def evaluate(self, context: object) -> tuple[OrderIntent, ...]: + return tuple( + self._strategy.on_feed( + context.state, + self._market_event, + self._engine_cfg, + self._constraints, + ) + ) + + class HftStrategyRunner: """Strategy runner for HFT backtests. @@ -59,10 +145,45 @@ def __init__( strategy: Strategy, risk_cfg: RiskConfig, core_cfg: CoreConfiguration, + enable_core_step_market_dispatch: bool = True, + enable_core_step_control_time_dispatch: bool = True, + enable_core_step_wakeup_collapse: bool = False, + enable_core_step_order_feedback_dispatch: bool = True, ) -> None: self.engine_cfg = engine_cfg self.strategy = strategy self._core_cfg = core_cfg + self._enable_core_step_market_dispatch = enable_core_step_market_dispatch + self._enable_core_step_control_time_dispatch = ( + enable_core_step_control_time_dispatch + ) + self._enable_core_step_wakeup_collapse = enable_core_step_wakeup_collapse + self._enable_core_step_order_feedback_dispatch = ( + enable_core_step_order_feedback_dispatch + ) + if not self._enable_core_step_market_dispatch: + raise ValueError("clean-core runtime requires enable_core_step_market_dispatch=True") + if not self._enable_core_step_control_time_dispatch: + raise ValueError( + "clean-core runtime requires enable_core_step_control_time_dispatch=True" + ) + if not self._enable_core_step_order_feedback_dispatch: + raise ValueError( + "clean-core runtime requires enable_core_step_order_feedback_dispatch=True" + ) + if self._enable_core_step_wakeup_collapse and not self._enable_core_step_market_dispatch: + raise ValueError( + "enable_core_step_wakeup_collapse=True requires " + "enable_core_step_market_dispatch=True" + ) + if ( + self._enable_core_step_wakeup_collapse + and not self._enable_core_step_control_time_dispatch + ): + raise ValueError( + "enable_core_step_wakeup_collapse=True requires " + "enable_core_step_control_time_dispatch=True" + ) event_bus = self._build_event_bus( path=Path(engine_cfg.event_bus_path), @@ -74,12 +195,14 @@ def __init__( self.risk = RiskEngine( risk_cfg=risk_cfg, - event_bus=event_bus, ) + self.execution_control = ExecutionControl() self._next_send_ts_ns_local: int | None = None self._event_stream_cursor = EventStreamCursor() self._last_injected_control_deadline_ns: int | None = None + self._pending_control_scheduling_obligation: ControlSchedulingObligation | None = None + self._last_core_step_execution_errors: list[tuple[OrderIntent, str]] = [] def _process_canonical_event(self, event: object) -> None: position = self._event_stream_cursor.attempt_position() @@ -110,7 +233,6 @@ def _build_event_bus( def _close_event_bus(self) -> None: self.strategy_state._event_bus.close() - self.risk._event_bus.close() def _compute_timeout_ns(self, now_local_ns: int) -> int: """Compute wait timeout in nanoseconds.""" @@ -133,8 +255,63 @@ def intent_priority(intent: OrderIntent) -> int: return sorted(intents, key=lambda it: (intent_priority(it), it.ts_ns_local)) - def _process_canonical_market_event(self, market_event: MarketEvent) -> None: - self._process_canonical_event(market_event) + def _build_policy_and_apply_context( + self, + *, + now_ts_ns_local: int, + ) -> tuple[CorePolicyAdmissionContext, CoreExecutionControlApplyContext]: + rate_cfg = self.risk.risk_cfg.order_rate_limits + max_orders_per_sec = ( + None if rate_cfg is None else rate_cfg.max_orders_per_second + ) + max_cancels_per_sec = ( + None if rate_cfg is None else rate_cfg.max_cancels_per_second + ) + return ( + CorePolicyAdmissionContext( + policy_evaluator=self.risk, + now_ts_ns_local=now_ts_ns_local, + ), + CoreExecutionControlApplyContext( + execution_control=self.execution_control, + now_ts_ns_local=now_ts_ns_local, + max_orders_per_sec=max_orders_per_sec, + max_cancels_per_sec=max_cancels_per_sec, + activate_dispatchable_outputs=True, + ), + ) + + def _process_canonical_market_event( + self, + market_event: MarketEvent, + *, + constraints: object, + ) -> CoreStepResult: + position = self._event_stream_cursor.attempt_position() + entry = EventStreamEntry( + position=position, + event=market_event, + ) + ( + policy_admission_context, + execution_control_apply_context, + ) = self._build_policy_and_apply_context( + now_ts_ns_local=market_event.ts_ns_local, + ) + result = run_core_step( + self.strategy_state, + entry, + configuration=self._core_cfg, + strategy_evaluator=_LegacyOnFeedStrategyEvaluator( + strategy=self.strategy, + engine_cfg=self.engine_cfg, + constraints=constraints, + ), + policy_admission_context=policy_admission_context, + execution_control_apply_context=execution_control_apply_context, + ) + self._event_stream_cursor.commit_success(position) + return result def _process_canonical_order_submitted_event( self, @@ -157,22 +334,190 @@ def _process_canonical_order_submitted_event( ) self._process_canonical_event(order_submitted_event) + def _build_order_execution_feedback_event( + self, + *, + instrument: str, + sim_now_ns: int, + state_values: object, + ) -> OrderExecutionFeedbackEvent: + return OrderExecutionFeedbackEvent( + ts_ns_local_feedback=sim_now_ns, + instrument=instrument, + position=float(state_values.position), + balance=float(state_values.balance), + fee=float(state_values.fee), + trading_volume=float(state_values.trading_volume), + trading_value=float(state_values.trading_value), + num_trades=int(state_values.num_trades), + runtime_correlation=None, + ) + + def _process_canonical_order_feedback_event( + self, + order_feedback_event: OrderExecutionFeedbackEvent, + *, + constraints: object, + ) -> CoreStepResult: + position = self._event_stream_cursor.attempt_position() + entry = EventStreamEntry( + position=position, + event=order_feedback_event, + ) + ( + policy_admission_context, + execution_control_apply_context, + ) = self._build_policy_and_apply_context( + now_ts_ns_local=order_feedback_event.ts_ns_local_feedback, + ) + result = run_core_step( + self.strategy_state, + entry, + configuration=self._core_cfg, + strategy_evaluator=_LegacyOnOrderUpdateStrategyEvaluator( + strategy=self.strategy, + engine_cfg=self.engine_cfg, + constraints=constraints, + ), + policy_admission_context=policy_admission_context, + execution_control_apply_context=execution_control_apply_context, + ) + self._event_stream_cursor.commit_success(position) + return result + def _process_canonical_control_time_event( self, *, + now_ts_ns_local: int, sim_now_ns: int, scheduled_deadline_ns: int, - ) -> None: - control_time_event = ControlTimeEvent( + scheduling_obligation: ControlSchedulingObligation | None = None, + ) -> CoreStepResult: + control_time_event = self._build_control_time_event( + sim_now_ns=sim_now_ns, + scheduled_deadline_ns=scheduled_deadline_ns, + scheduling_obligation=scheduling_obligation, + ) + position = self._event_stream_cursor.attempt_position() + entry = EventStreamEntry( + position=position, + event=control_time_event, + ) + ( + policy_admission_context, + execution_control_apply_context, + ) = self._build_policy_and_apply_context( + now_ts_ns_local=now_ts_ns_local, + ) + result = run_core_step( + self.strategy_state, + entry, + configuration=self._core_cfg, + policy_admission_context=policy_admission_context, + execution_control_apply_context=execution_control_apply_context, + ) + self._event_stream_cursor.commit_success(position) + return result + + def _build_control_time_event( + self, + *, + sim_now_ns: int, + scheduled_deadline_ns: int, + scheduling_obligation: ControlSchedulingObligation | None = None, + ) -> ControlTimeEvent: + obligation_reason = "rate_limit" + obligation_due_ts_ns_local = scheduled_deadline_ns + if scheduling_obligation is not None: + obligation_reason = scheduling_obligation.reason + obligation_due_ts_ns_local = scheduling_obligation.due_ts_ns_local + return ControlTimeEvent( ts_ns_local_control=sim_now_ns, reason="scheduled_control_recheck", due_ts_ns_local=scheduled_deadline_ns, realized_ts_ns_local=sim_now_ns, - obligation_reason="rate_limit", - obligation_due_ts_ns_local=scheduled_deadline_ns, + obligation_reason=obligation_reason, + obligation_due_ts_ns_local=obligation_due_ts_ns_local, runtime_correlation=None, ) - self._process_canonical_event(control_time_event) + + def _allocate_wakeup_entries( + self, + events: list[object], + ) -> tuple[EventStreamEntry, ...]: + positions = self._event_stream_cursor.attempt_positions(len(events)) + return tuple( + EventStreamEntry( + position=position, + event=event, + ) + for position, event in zip(positions, events, strict=True) + ) + + def _commit_wakeup_entries(self, entries: tuple[EventStreamEntry, ...]) -> None: + for entry in entries: + self._event_stream_cursor.commit_success(entry.position) + + def _clear_pending_control_scheduling_obligation(self) -> None: + self._pending_control_scheduling_obligation = None + self._next_send_ts_ns_local = None + + def _consume_pending_control_scheduling_obligation( + self, + ) -> ControlSchedulingObligation | None: + pending = self._pending_control_scheduling_obligation + self._clear_pending_control_scheduling_obligation() + return pending + + def _apply_control_scheduling_obligation( + self, + obligation: ControlSchedulingObligation | None, + ) -> None: + if obligation is None: + return + if self._pending_control_scheduling_obligation == obligation: + self._next_send_ts_ns_local = obligation.due_ts_ns_local + return + self._pending_control_scheduling_obligation = obligation + self._next_send_ts_ns_local = obligation.due_ts_ns_local + + def _dispatch_accepted_intents( + self, + accepted_now: list[OrderIntent], + execution: OrderSubmissionGateway, + *, + sim_now_ns: int, + ) -> list[tuple[OrderIntent, str]]: + if not accepted_now: + return [] + + execution_errors = execution.apply_intents(accepted_now) + failed_keys = { + (it.instrument, it.client_order_id) + for it, _ in execution_errors + } + + for it in accepted_now: + if (it.instrument, it.client_order_id) in failed_keys: + continue + if it.intent_type == "new": + self._process_canonical_order_submitted_event( + it, + ts_ns_local_dispatch=sim_now_ns, + ) + # Clean-Core runtime treats OrderSubmittedEvent as the successful + # NEW dispatch acknowledgment boundary. Calling mark_intent_sent + # here would recreate NEW inflight state immediately after the + # submitted reducer clears it and can cause non-terminating + # inflight-only loops near end-of-data. + continue + self.strategy_state.mark_intent_sent( + it.instrument, + it.client_order_id, + it.intent_type, + ) + + return execution_errors def run( self, @@ -182,6 +527,37 @@ def run( ) -> None: """Run the backtest loop.""" # pylint: disable=too-many-locals,too-many-branches,too-many-statements + debug_loop = os.getenv("TRADINGCHASSIS_DEBUG_LOOP", "").strip() in { + "1", + "true", + "TRUE", + "yes", + "YES", + } + debug_max_iterations_raw = os.getenv("TRADINGCHASSIS_DEBUG_MAX_ITERATIONS", "").strip() + debug_max_iterations: int | None = None + if debug_max_iterations_raw: + parsed = int(debug_max_iterations_raw) + if parsed <= 0: + raise ValueError("TRADINGCHASSIS_DEBUG_MAX_ITERATIONS must be > 0") + debug_max_iterations = parsed + debug_every_raw = os.getenv("TRADINGCHASSIS_DEBUG_LOOP_EVERY", "").strip() + debug_every = 100 + if debug_every_raw: + parsed_every = int(debug_every_raw) + if parsed_every <= 0: + raise ValueError("TRADINGCHASSIS_DEBUG_LOOP_EVERY must be > 0") + debug_every = parsed_every + debug_logger = logging.getLogger("core_runtime.backtest.loop") + loop_iteration = 0 + last_rc: int | None = None + last_timeout_ns: int | None = None + last_sim_ts_before_wait: int | None = None + last_sim_ts_after_wait: int | None = None + no_progress_iterations = 0 + recorder_exhausted_count = 0 + end_signal_count = 0 + last_debug_signature: tuple[object, ...] | None = None instrument = self.engine_cfg.instrument # Initialize hftbacktest engine @@ -192,18 +568,68 @@ def run( sim_now_ns = self.strategy_state.sim_ts_ns_local while True: - timeout_ns = self._compute_timeout_ns(self.strategy_state.sim_ts_ns_local) + loop_iteration += 1 + if ( + debug_max_iterations is not None + and loop_iteration > debug_max_iterations + ): + pending_due = ( + None + if self._pending_control_scheduling_obligation is None + else self._pending_control_scheduling_obligation.due_ts_ns_local + ) + pending_reason = ( + None + if self._pending_control_scheduling_obligation is None + else self._pending_control_scheduling_obligation.reason + ) + queued_count = sum( + len(queue) + for queue in self.strategy_state.queued_intents.values() + ) + inflight_count = sum( + len(bucket) + for bucket in self.strategy_state.inflight.values() + ) + raise RuntimeError( + "TRADINGCHASSIS_DEBUG_MAX_ITERATIONS exceeded in HftStrategyRunner.run: " + f"iteration={loop_iteration}, sim_ts_ns_local={self.strategy_state.sim_ts_ns_local}, " + f"pending_due={pending_due}, pending_reason={pending_reason}, " + f"last_injected_control_deadline_ns={self._last_injected_control_deadline_ns}, " + f"queued_count={queued_count}, inflight_count={inflight_count}, " + f"last_rc={last_rc}, last_timeout_ns={last_timeout_ns}, " + f"sim_ts_before_wait={last_sim_ts_before_wait}, sim_ts_after_wait={last_sim_ts_after_wait}, " + f"no_progress_iterations={no_progress_iterations}, " + f"recorder_exhausted_count={recorder_exhausted_count}, end_signal_count={end_signal_count}" + ) + + sim_ts_before_wait = self.strategy_state.sim_ts_ns_local + timeout_ns = self._compute_timeout_ns(sim_ts_before_wait) rc = venue.wait_next(timeout_ns=timeout_ns, include_order_resp=True) + last_rc = rc + last_timeout_ns = timeout_ns + last_sim_ts_before_wait = sim_ts_before_wait if rc == 1: + end_signal_count += 1 self._close_event_bus() break observed_local_ns = venue.current_timestamp_ns() self.strategy_state.update_timestamp(observed_local_ns) sim_now_ns = self.strategy_state.sim_ts_ns_local + last_sim_ts_after_wait = sim_now_ns + venue_progressed = sim_now_ns > sim_ts_before_wait - raw_intents: list[OrderIntent] = [] + market_step_result: CoreStepResult | None = None + control_step_result: CoreStepResult | None = None + order_feedback_step_result: CoreStepResult | None = None + market_event: MarketEvent | None = None + market_processed = False + order_feedback_processed = False + control_time_injected = False + dispatch_attempted_count = 0 + stale_obligation_cleared = False # ----------------------------------------------------------------- # Market update @@ -280,124 +706,342 @@ def run( ), ) - self._process_canonical_market_event(market_event) - constraints = self.risk.build_constraints(sim_now_ns) - raw_intents.extend( - self.strategy.on_feed( - self.strategy_state, + if getattr(self, "_enable_core_step_wakeup_collapse", False): + pass + else: + market_step_result = self._process_canonical_market_event( market_event, - self.engine_cfg, - constraints, + constraints=constraints, ) - ) + market_processed = True # ----------------------------------------------------------------- # Order / account update # ----------------------------------------------------------------- if rc == 3: state_values, orders = venue.read_orders_snapshot() - - self.strategy_state.update_account( + _ = orders # runtime keeps raw snapshot ownership; core receives canonical feedback + constraints = self.risk.build_constraints(sim_now_ns) + order_feedback_event = self._build_order_execution_feedback_event( instrument=instrument, - position=state_values.position, - balance=state_values.balance, - fee=state_values.fee, - trading_volume=state_values.trading_volume, - trading_value=state_values.trading_value, - num_trades=state_values.num_trades, - ) - self.strategy_state.ingest_order_snapshots( - instrument, - orders.values(), + sim_now_ns=sim_now_ns, + state_values=state_values, ) - - constraints = self.risk.build_constraints(sim_now_ns) - raw_intents.extend( - self.strategy.on_order_update( - self.strategy_state, - self.engine_cfg, - constraints, - ) + order_feedback_step_result = self._process_canonical_order_feedback_event( + order_feedback_event, + constraints=constraints, ) + order_feedback_processed = True # ----------------------------------------------------------------- # Queue flush # ----------------------------------------------------------------- scheduled_deadline_ns: int | None = None + scheduling_obligation: ControlSchedulingObligation | None = None + if self._pending_control_scheduling_obligation is not None: + scheduling_obligation = self._pending_control_scheduling_obligation + scheduled_deadline_ns = scheduling_obligation.due_ts_ns_local + elif self._next_send_ts_ns_local is not None: + # Transitional compatibility for scalar-only decisions. + scheduled_deadline_ns = self._next_send_ts_ns_local if ( - self._next_send_ts_ns_local is not None - and sim_now_ns >= self._next_send_ts_ns_local + scheduling_obligation is not None + and scheduled_deadline_ns is not None + and sim_now_ns >= scheduled_deadline_ns + and scheduled_deadline_ns == self._last_injected_control_deadline_ns + ): + # The same control deadline has already been realized once. + # Keeping it pending would force timeout_ns=0 and can spin forever + # when venue time no longer advances near end-of-data. + self._consume_pending_control_scheduling_obligation() + scheduling_obligation = None + scheduled_deadline_ns = None + stale_obligation_cleared = True + if ( + scheduled_deadline_ns is not None + and sim_now_ns >= scheduled_deadline_ns ): - scheduled_deadline_ns = self._next_send_ts_ns_local - raw_intents.extend( - self.strategy_state.pop_queued_intents(instrument) - ) if ( scheduled_deadline_ns != self._last_injected_control_deadline_ns ): - self._process_canonical_control_time_event( - sim_now_ns=sim_now_ns, - scheduled_deadline_ns=scheduled_deadline_ns, - ) - self._last_injected_control_deadline_ns = scheduled_deadline_ns + if getattr(self, "_enable_core_step_wakeup_collapse", False): + pass + else: + control_step_result = self._process_canonical_control_time_event( + now_ts_ns_local=sim_now_ns, + sim_now_ns=sim_now_ns, + scheduled_deadline_ns=scheduled_deadline_ns, + scheduling_obligation=scheduling_obligation, + ) + control_time_injected = True + self._last_injected_control_deadline_ns = scheduled_deadline_ns + if scheduling_obligation is not None: + self._consume_pending_control_scheduling_obligation() + else: + self._next_send_ts_ns_local = None - # ----------------------------------------------------------------- - # Gate + execution - # ----------------------------------------------------------------- - if raw_intents: - combined = self._sort_intents_for_gate(raw_intents) + if getattr(self, "_enable_core_step_wakeup_collapse", False): + collapse_events: list[object] = [] + included_control_time = False + if market_event is not None: + collapse_events.append(market_event) + if ( + scheduled_deadline_ns is not None + and sim_now_ns >= scheduled_deadline_ns + and scheduled_deadline_ns != self._last_injected_control_deadline_ns + ): + collapse_events.append( + self._build_control_time_event( + sim_now_ns=sim_now_ns, + scheduled_deadline_ns=scheduled_deadline_ns, + scheduling_obligation=scheduling_obligation, + ) + ) + included_control_time = True - decision = self.risk.decide_intents( - raw_intents=combined, - state=self.strategy_state, - now_ts_ns_local=sim_now_ns, + if collapse_events: + wakeup_entries = self._allocate_wakeup_entries(collapse_events) + collapse_constraints = self.risk.build_constraints(sim_now_ns) + strategy_evaluator = None + if market_event is not None: + strategy_evaluator = _LegacyWakeupStrategyEvaluator( + strategy=self.strategy, + engine_cfg=self.engine_cfg, + constraints=collapse_constraints, + market_event=market_event, + ) + ( + policy_admission_context, + execution_control_apply_context, + ) = self._build_policy_and_apply_context( + now_ts_ns_local=sim_now_ns, + ) + wakeup_result = run_core_wakeup_step( + self.strategy_state, + wakeup_entries, + configuration=self._core_cfg, + wakeup_strategy_evaluator=strategy_evaluator, + queued_instrument=instrument, + policy_admission_context=policy_admission_context, + execution_control_apply_context=execution_control_apply_context, + ) + self._commit_wakeup_entries(wakeup_entries) + if included_control_time and scheduled_deadline_ns is not None: + self._last_injected_control_deadline_ns = scheduled_deadline_ns + if scheduling_obligation is not None: + self._consume_pending_control_scheduling_obligation() + else: + self._next_send_ts_ns_local = None + self._last_core_step_execution_errors = self._dispatch_accepted_intents( + list(wakeup_result.dispatchable_intents), + execution, + sim_now_ns=sim_now_ns, + ) + dispatch_attempted_count += len(wakeup_result.dispatchable_intents) + if wakeup_result.control_scheduling_obligation is None: + self._clear_pending_control_scheduling_obligation() + else: + self._apply_control_scheduling_obligation( + wakeup_result.control_scheduling_obligation + ) + elif control_step_result is not None: + self._last_core_step_execution_errors = ( + self._dispatch_accepted_intents( + list(control_step_result.dispatchable_intents), + execution, + sim_now_ns=sim_now_ns, + ) ) + dispatch_attempted_count += len(control_step_result.dispatchable_intents) + if control_step_result.control_scheduling_obligation is None: + self._clear_pending_control_scheduling_obligation() + else: + self._apply_control_scheduling_obligation( + control_step_result.control_scheduling_obligation + ) - execution_errors: list[tuple[OrderIntent, str]] = [] - if decision.accepted_now: - execution_errors = execution.apply_intents( - decision.accepted_now + if ( + not getattr(self, "_enable_core_step_wakeup_collapse", False) + and market_step_result is not None + ): + self._last_core_step_execution_errors = self._dispatch_accepted_intents( + list(market_step_result.dispatchable_intents), + execution, + sim_now_ns=sim_now_ns, + ) + dispatch_attempted_count += len(market_step_result.dispatchable_intents) + if market_step_result.control_scheduling_obligation is None: + self._clear_pending_control_scheduling_obligation() + else: + self._apply_control_scheduling_obligation( + market_step_result.control_scheduling_obligation ) - failed_keys = { - (it.instrument, it.client_order_id) - for it, _ in execution_errors - } - - for it in decision.accepted_now: - if (it.instrument, it.client_order_id) in failed_keys: - continue - if it.intent_type == "new": - self._process_canonical_order_submitted_event( - it, - ts_ns_local_dispatch=sim_now_ns, - ) - self.strategy_state.mark_intent_sent( - it.instrument, - it.client_order_id, - it.intent_type, - ) + if order_feedback_step_result is not None: + self._last_core_step_execution_errors = self._dispatch_accepted_intents( + list(order_feedback_step_result.dispatchable_intents), + execution, + sim_now_ns=sim_now_ns, + ) + dispatch_attempted_count += len(order_feedback_step_result.dispatchable_intents) + if order_feedback_step_result.control_scheduling_obligation is None: + self._clear_pending_control_scheduling_obligation() + else: + self._apply_control_scheduling_obligation( + order_feedback_step_result.control_scheduling_obligation + ) - if execution_errors: - for it, reason in execution_errors: - decision.execution_rejected.append( - RejectedIntent(it, reason) - ) + pending_due = ( + None + if self._pending_control_scheduling_obligation is None + else self._pending_control_scheduling_obligation.due_ts_ns_local + ) + pending_reason = ( + None + if self._pending_control_scheduling_obligation is None + else self._pending_control_scheduling_obligation.reason + ) + dispatchable_market = ( + 0 + if market_step_result is None + else len(market_step_result.dispatchable_intents) + ) + dispatchable_control = ( + 0 + if control_step_result is None + else len(control_step_result.dispatchable_intents) + ) + dispatchable_feedback = ( + 0 + if order_feedback_step_result is None + else len(order_feedback_step_result.dispatchable_intents) + ) + queued_count = sum( + len(queue) + for queue in self.strategy_state.queued_intents.values() + ) + inflight_count = sum( + len(bucket) + for bucket in self.strategy_state.inflight.values() + ) + queued_keys = tuple( + f"{instrument_key}:{queued.logical_key}" + for instrument_key in sorted(self.strategy_state.queued_intents) + for queued in list(self.strategy_state.queued_intents[instrument_key])[:3] + )[:6] + inflight_keys = tuple( + f"{instrument_key}:{client_order_id}" + for instrument_key in sorted(self.strategy_state.inflight) + for client_order_id in sorted(self.strategy_state.inflight[instrument_key])[:3] + )[:6] + event_processed = market_processed or order_feedback_processed or control_time_injected + has_pending_core_work = ( + queued_count > 0 + or inflight_count > 0 + or self._pending_control_scheduling_obligation is not None + or self._next_send_ts_ns_local is not None + ) + no_event_rc = rc not in {1, 2, 3} + no_progress_iteration = ( + not venue_progressed + and not event_processed + and dispatch_attempted_count == 0 + and no_event_rc + ) + if no_progress_iteration: + no_progress_iterations += 1 + else: + no_progress_iterations = 0 + termination_no_work_no_progress = ( + not has_pending_core_work + and no_progress_iteration + ) - self.strategy.on_risk_decision(decision) - self._next_send_ts_ns_local = decision.next_send_ts_ns_local + if debug_loop: + debug_signature = ( + rc, + timeout_ns, + sim_ts_before_wait, + sim_now_ns, + has_pending_core_work, + queued_count, + inflight_count, + pending_due, + pending_reason, + market_processed, + order_feedback_processed, + control_time_injected, + dispatch_attempted_count, + termination_no_work_no_progress, + ) + should_emit_debug = ( + loop_iteration == 1 + or (loop_iteration % debug_every) == 0 + or debug_signature != last_debug_signature + or termination_no_work_no_progress + ) + if should_emit_debug: + debug_logger.info( + "loop=%s rc=%s sim_ts_ns_local=%s timeout_ns=%s market_processed=%s " + "order_feedback_processed=%s control_time_injected=%s " + "pending_due=%s pending_reason=%s stale_obligation_cleared=%s " + "dispatchable_market=%s dispatchable_control=%s dispatchable_feedback=%s " + "dispatch_attempted=%s queued_count=%s inflight_count=%s " + "queued_keys=%s inflight_keys=%s sim_ts_before_wait=%s " + "sim_ts_after_wait=%s no_progress_iterations=%s has_pending_core_work=%s " + "termination_no_work_no_progress=%s no_event_rc=%s", + loop_iteration, + rc, + sim_now_ns, + timeout_ns, + market_processed, + order_feedback_processed, + control_time_injected, + pending_due, + pending_reason, + stale_obligation_cleared, + dispatchable_market, + dispatchable_control, + dispatchable_feedback, + dispatch_attempted_count, + queued_count, + inflight_count, + queued_keys, + inflight_keys, + sim_ts_before_wait, + sim_now_ns, + no_progress_iterations, + has_pending_core_work, + termination_no_work_no_progress, + no_event_rc, + ) + print( + "[TRADINGCHASSIS_DEBUG_LOOP] " + f"loop={loop_iteration} rc={rc} timeout_ns={timeout_ns} " + f"sim_ts_before_wait={sim_ts_before_wait} sim_ts_after_wait={sim_now_ns} " + f"market_processed={market_processed} order_feedback_processed={order_feedback_processed} " + f"control_time_injected={control_time_injected} dispatch_attempted={dispatch_attempted_count} " + f"pending_due={pending_due} pending_reason={pending_reason} " + f"queued_count={queued_count} inflight_count={inflight_count} " + f"queued_keys={queued_keys} inflight_keys={inflight_keys} " + f"no_progress_iterations={no_progress_iterations} " + f"termination_no_work_no_progress={termination_no_work_no_progress} " + f"no_event_rc={no_event_rc}", + flush=True, + ) + last_debug_signature = debug_signature - # If there are queued intents but the gate did not provide a next_send_ts_ns_local, - # wake up at the next second boundary to ensure progress. - if self._next_send_ts_ns_local is None: - queue = self.strategy_state.queued_intents.setdefault( - instrument, - deque(), + if termination_no_work_no_progress: + if debug_loop: + print( + "[TRADINGCHASSIS_DEBUG_LOOP] breaking no-work/no-progress loop", + flush=True, ) - if queue: - sec = sim_now_ns // 1_000_000_000 - self._next_send_ts_ns_local = (sec + 1) * 1_000_000_000 + self._close_event_bus() + break - venue.record(recorder) + recorder_exhausted = bool(venue.record(recorder)) + if recorder_exhausted: + recorder_exhausted_count += 1 diff --git a/core_runtime/core/events/sinks/file_recorder.py b/core_runtime/backtest/events/sinks/file_recorder.py similarity index 99% rename from core_runtime/core/events/sinks/file_recorder.py rename to core_runtime/backtest/events/sinks/file_recorder.py index bfe46bf..d48ca85 100644 --- a/core_runtime/core/events/sinks/file_recorder.py +++ b/core_runtime/backtest/events/sinks/file_recorder.py @@ -28,3 +28,4 @@ def close(self) -> None: self._fh.flush() self._fh.close() self._closed = True + diff --git a/core_runtime/backtest/runtime/run_sweep.py b/core_runtime/backtest/runtime/run_sweep.py index 30865d0..dc421c4 100644 --- a/core_runtime/backtest/runtime/run_sweep.py +++ b/core_runtime/backtest/runtime/run_sweep.py @@ -14,7 +14,6 @@ from typing import Any from tradingchassis_core.core.risk.risk_config import RiskConfig -from tradingchassis_core.strategies.strategy_config import StrategyConfig from core_runtime.backtest.engine.hft_engine import ( HftBacktestConfig, @@ -26,6 +25,7 @@ from core_runtime.backtest.runtime.core_configuration_mapper import ( build_core_configuration_from_run_config, ) +from core_runtime.backtest.strategy_api import StrategyConfig class SweepMaterializer: @@ -462,7 +462,7 @@ def main() -> None: materializer.materialize(ctx) engine_cfg = HftEngineConfig(**ctx.parameters["engine"]) - strategy_cfg = StrategyConfig(**ctx.parameters["strategy"]) + strategy_cfg = StrategyConfig.from_mapping(ctx.parameters["strategy"]) risk_cfg = RiskConfig(**ctx.parameters["risk"]) runner = SweepEngineRunner( diff --git a/core_runtime/backtest/strategy_api.py b/core_runtime/backtest/strategy_api.py new file mode 100644 index 0000000..ba5190e --- /dev/null +++ b/core_runtime/backtest/strategy_api.py @@ -0,0 +1,57 @@ +"""Runtime-local Strategy protocol and config model. + +Core no longer exports strategy construction interfaces; runtime owns these. +""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Any, Mapping, Protocol, runtime_checkable + +from tradingchassis_core.core.domain.state import StrategyState +from tradingchassis_core.core.domain.types import MarketEvent, OrderIntent, RiskConstraints + + +@runtime_checkable +class Strategy(Protocol): + """Runtime strategy callback contract.""" + + def on_feed( + self, + state: StrategyState, + event: MarketEvent, + engine_cfg: object, + constraints: RiskConstraints, + ) -> list[OrderIntent]: + """Return intents generated for one market event.""" + + def on_order_update( + self, + state: StrategyState, + engine_cfg: object, + constraints: RiskConstraints, + ) -> list[OrderIntent]: + """Return intents generated for one execution-feedback update.""" + + +@dataclass(frozen=True, slots=True) +class StrategyConfig: + """Runtime-local strategy constructor config.""" + + class_path: str + params: dict[str, Any] = field(default_factory=dict) + + @classmethod + def from_mapping(cls, raw: Mapping[str, Any]) -> StrategyConfig: + if "class_path" not in raw: + raise ValueError("strategy.class_path is required") + class_path = str(raw["class_path"]) + params = { + key: value + for key, value in raw.items() + if key != "class_path" + } + return cls(class_path=class_path, params=params) + + def to_engine_params(self) -> dict[str, Any]: + return dict(self.params) diff --git a/core_runtime/core/__init__.py b/core_runtime/core/__init__.py deleted file mode 100644 index 27d3575..0000000 --- a/core_runtime/core/__init__.py +++ /dev/null @@ -1,2 +0,0 @@ -"""Runtime-owned modules that are not part of the semantic core.""" - diff --git a/core_runtime/core/events/__init__.py b/core_runtime/core/events/__init__.py deleted file mode 100644 index f8d3dcb..0000000 --- a/core_runtime/core/events/__init__.py +++ /dev/null @@ -1,2 +0,0 @@ -"""Runtime event plumbing (sinks, emitters, wiring).""" - diff --git a/core_runtime/core/events/sinks/__init__.py b/core_runtime/core/events/sinks/__init__.py deleted file mode 100644 index 8749baf..0000000 --- a/core_runtime/core/events/sinks/__init__.py +++ /dev/null @@ -1,2 +0,0 @@ -"""Concrete runtime event sinks (I/O).""" - diff --git a/core_runtime/local/backtest.py b/core_runtime/local/backtest.py index 62a58fb..8b66190 100644 --- a/core_runtime/local/backtest.py +++ b/core_runtime/local/backtest.py @@ -11,7 +11,6 @@ from core_runtime.backtest.engine.engine_base import BacktestResult from tradingchassis_core.core.risk.risk_config import RiskConfig -from tradingchassis_core.strategies.strategy_config import StrategyConfig from core_runtime.backtest.engine.hft_engine import ( HftBacktestConfig, @@ -21,6 +20,7 @@ from core_runtime.backtest.runtime.core_configuration_mapper import ( build_core_configuration_from_run_config, ) +from core_runtime.backtest.strategy_api import StrategyConfig def load_config(path: str) -> HftBacktestConfig: @@ -38,7 +38,7 @@ def load_config(path: str) -> HftBacktestConfig: ) from exc engine_cfg = HftEngineConfig(**engine_raw) - strategy_cfg = StrategyConfig(**strategy_raw) + strategy_cfg = StrategyConfig.from_mapping(strategy_raw) risk_cfg = RiskConfig(**risk_raw) core_cfg = build_core_configuration_from_run_config(raw_json) diff --git a/core_runtime/local/local.json b/core_runtime/local/bt_config_local.json similarity index 97% rename from core_runtime/local/local.json rename to core_runtime/local/bt_config_local.json index 83ef50b..3af644e 100644 --- a/core_runtime/local/local.json +++ b/core_runtime/local/bt_config_local.json @@ -33,7 +33,7 @@ "roi_ub": 80000, "stats_npz_path": ".runtime/local/results/stats.npz", - "event_bus_path": ".runtime/local/results/events.json" + "event_bus_path": ".runtime/local/results/events.jsonl" }, "risk": { diff --git a/core_runtime/strategies/debug_strategy.py b/core_runtime/strategies/debug_strategy.py index 42486d1..226b453 100644 --- a/core_runtime/strategies/debug_strategy.py +++ b/core_runtime/strategies/debug_strategy.py @@ -3,13 +3,7 @@ from typing import TYPE_CHECKING if TYPE_CHECKING: - from tradingchassis_core import ( - EngineContext, - GateDecision, - MarketEvent, - RiskConstraints, - StrategyState, - ) + from tradingchassis_core import MarketEvent, RiskConstraints, StrategyState from tradingchassis_core import ( NewOrderIntent, @@ -18,10 +12,11 @@ Quantity, ReplaceOrderIntent, SlotKey, - Strategy, stable_slot_order_id, ) +from core_runtime.backtest.strategy_api import Strategy + _SLOT_NAMESPACE = "debug_strategy_v1" @@ -52,7 +47,7 @@ def on_feed( self, state: StrategyState, event: MarketEvent, - engine_cfg: EngineContext, + engine_cfg: object, constraints: RiskConstraints, ) -> list[OrderIntent]: """Feed-triggered logic (rc=2). Inputs are read-only for Strategy, otherwise considered a bug.""" @@ -84,7 +79,11 @@ def on_feed( instrument = str(event.instrument) def is_slot_busy(client_order_id: str) -> bool: - return state.is_order_id_busy(instrument, client_order_id) + return ( + state.has_working_order(instrument, client_order_id) + or state.has_inflight(instrument, client_order_id) + or state.has_queued_intent(instrument, client_order_id) + ) def bid_price_for_level(level_index: int) -> float: if level_index < len(event.book.bids): @@ -174,11 +173,12 @@ def ask_price_for_level(level_index: int) -> float: def on_order_update( self, state: StrategyState, - engine_cfg: EngineContext, + engine_cfg: object, constraints: RiskConstraints, ) -> list[OrderIntent]: """Order-update-triggered logic (rc=3). Inputs are read-only for Strategy, otherwise considered a bug.""" return [] - def on_risk_decision(self, decision: GateDecision) -> None: - self.intents_after_risk = decision.accepted_now + def on_risk_decision(self, decision: object) -> None: + accepted_now = getattr(decision, "accepted_now", []) + self.intents_after_risk = list(accepted_now) diff --git a/docs/venue-adapter-abstraction-design-v1.md b/docs/venue-adapter-abstraction-design-v1.md index e43d623..e8a2b49 100644 --- a/docs/venue-adapter-abstraction-design-v1.md +++ b/docs/venue-adapter-abstraction-design-v1.md @@ -13,8 +13,7 @@ This is a docs-only slice: - it does not modify production code or tests; - it does not change runtime behavior; - it does not implement canonical `FillEvent` ingress; -- it does not canonicalize `OrderStateEvent`; -- it does not change `DerivedFillEvent` behavior; +- it does not expand canonical account feedback beyond current `OrderExecutionFeedbackEvent`; - it does not change snapshot ingestion behavior; - it does not change reducers or event taxonomy; - it does not implement `ProcessingContext`; @@ -80,8 +79,8 @@ they do not define production protocol signatures yet. | `VenueClock` (runtime clock boundary view) | provide adopted venue-local timestamp axis used by runtime timestamp update | runtime/internal only | mapped by `current_timestamp_ns()` wrapper | may expose richer venue receipt/event-time metadata while runtime keeps canonical ordering by `ProcessingPosition` | clock/timestamp must not be treated as `ProcessingOrder` authority | | `MarketInputSource` | provide market snapshots/deltas for canonical market mapping | canonical event capable | `read_market_snapshot()` mapped to canonical `MarketEvent` in runner | live adapters may map native book/trade feeds into canonical market events under runtime mapping | no hidden mutable snapshot promotion to canonical semantics outside boundary mapping | | `OrderSubmissionGateway` | submit/modify/cancel outbound intents and expose dispatch result boundary | canonical event capable (submission boundary), plus runtime/internal transport | `apply_intents(...)`; successful `new` dispatch leads to canonical `OrderSubmittedEvent` | live adapters may provide richer dispatch metadata while preserving current canonical submission boundary semantics | no post-submission execution authority from synchronous return codes | -| `OrderSnapshotSource` | provide order snapshots for compatibility lifecycle materialization | compatibility projection only | `read_orders_snapshot()` -> `ingest_order_snapshots()` -> `OrderStateEvent` path | may remain compatibility sidecar where canonical execution feedback is unavailable | no `OrderStateEvent` canonicalization; no snapshot-to-canonical promotion | -| `AccountSnapshotSource` | provide account snapshots for runtime/account views and compatibility projections | compatibility projection only / runtime/internal only | `state_values` adoption into `update_account(...)` | live adapters may offer richer account views without canonical authority by default | no implicit canonical account event expansion in this slice | +| `OrderSnapshotSource` | provide raw order snapshots for runtime-local bookkeeping only | runtime/internal only | `read_orders_snapshot()` -> runtime-side bookkeeping (no Core snapshot reducer input) | may remain runtime sidecar where canonical execution feedback is unavailable | no snapshot row payload promotion into Core | +| `AccountSnapshotSource` | provide account snapshots for canonical execution feedback mapping | canonical feedback input + runtime/internal support | `state_values` -> canonical `OrderExecutionFeedbackEvent` | live adapters may offer richer account views without changing Core boundaries | no implicit expansion beyond current canonical feedback event schema | | `ExecutionFeedbackRecordSource` | provide authoritative execution-feedback records for future canonical `FillEvent` mapping | optional future capability (canonical only after REFC/RAEFSC gates) | unsupported/ineligible today for hftbacktest integration | live adapters may satisfy this with native execution reports and deterministic source sequencing | no `FillEvent` ingress implementation here; no synthetic required-field authority | --- @@ -93,14 +92,13 @@ they do not define production protocol signatures yet. - `MarketInputSource`: supported; canonical `MarketEvent` mapping path exists. - `OrderSubmissionGateway`: supported for successful `new` dispatch boundary via canonical `OrderSubmittedEvent` path. -- `OrderSnapshotSource`: supported; remains compatibility-only. -- `AccountSnapshotSource`: supported for compatibility/runtime-internal account - snapshot adoption. +- `OrderSnapshotSource`: supported; remains runtime-internal bookkeeping only. +- `AccountSnapshotSource`: supported for canonical account-level feedback mapping. - `VenueEventWaiter` + `VenueClock`: supported through existing wrappers. - `ExecutionFeedbackRecordSource`: unsupported/ineligible today. -`VADN-07` - Compatibility authority remains frozen for post-submission lifecycle -progression (`OrderStateEvent` / `DerivedFillEvent` path unchanged). +`VADN-07` - Runtime keeps post-submission snapshot handling local; Core receives +only canonical account-level execution feedback. --- @@ -162,9 +160,9 @@ decided at runtime boundary mapping under existing contracts. `VADN-21` - No canonical `FillEvent` ingress implementation. -`VADN-22` - No `OrderStateEvent` canonicalization. +`VADN-22` - No snapshot row canonicalization into Core event inputs. -`VADN-23` - No `DerivedFillEvent` behavior change. +`VADN-23` - No canonical feedback schema expansion in this slice. `VADN-24` - No snapshot lifecycle rewrite. diff --git a/requirements-dev.txt b/requirements-dev.txt index 0690c4f..5523d8a 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -331,7 +331,7 @@ tornado==6.5.4 # via bokeh tqdm==4.67.3 # via panel -tradingchassis-core @ git+https://github.com/TradingChassis/core.git@2c317546473a2f295d6b747cd42b897aadf2c1c7 +tradingchassis-core @ git+https://github.com/TradingChassis/core.git@b50834e41facc19bfc638cd5b053dddf90640c0c # via -r _git_deps.in typing-extensions==4.15.0 # via diff --git a/requirements.txt b/requirements.txt index 603a65a..ca07101 100644 --- a/requirements.txt +++ b/requirements.txt @@ -307,7 +307,7 @@ tornado==6.5.4 # via bokeh tqdm==4.67.3 # via panel -tradingchassis-core @ git+https://github.com/TradingChassis/core.git@2c317546473a2f295d6b747cd42b897aadf2c1c7 +tradingchassis-core @ git+https://github.com/TradingChassis/core.git@b50834e41facc19bfc638cd5b053dddf90640c0c # via -r _git_deps.in typing-extensions==4.15.0 # via diff --git a/tests/runtime/test_debug_strategy_state_api_compat.py b/tests/runtime/test_debug_strategy_state_api_compat.py new file mode 100644 index 0000000..c057322 --- /dev/null +++ b/tests/runtime/test_debug_strategy_state_api_compat.py @@ -0,0 +1,83 @@ +from __future__ import annotations + +from dataclasses import dataclass + +from tradingchassis_core.core.domain.types import ( + BookLevel, + BookPayload, + MarketEvent, + Price, + Quantity, + RiskConstraints, +) + +from core_runtime.strategies.debug_strategy import DebugStrategyV1 + + +@dataclass +class _StateStub: + busy: bool = False + + def has_working_order(self, instrument: str, client_order_id: str) -> bool: + _ = (instrument, client_order_id) + return self.busy + + def has_inflight(self, instrument: str, client_order_id: str) -> bool: + _ = (instrument, client_order_id) + return self.busy + + def has_queued_intent(self, instrument: str, client_order_id: str) -> bool: + _ = (instrument, client_order_id) + return self.busy + + +@dataclass(frozen=True) +class _EngineCfgStub: + tick_size: float = 0.1 + + +def test_debug_strategy_uses_clean_state_busy_checks_without_legacy_method() -> None: + strategy = DebugStrategyV1( + spread=5.0, + order_qty=0.1, + use_price_tick_levels=1, + post_only=True, + ) + state = _StateStub(busy=False) + event = MarketEvent( + ts_ns_exch=1, + ts_ns_local=1, + instrument="BTC_USDC-PERPETUAL", + event_type="book", + book=BookPayload( + book_type="snapshot", + bids=( + BookLevel( + price=Price(currency="UNKNOWN", value=100.0), + quantity=Quantity(value=1.0, unit="contracts"), + ), + ), + asks=( + BookLevel( + price=Price(currency="UNKNOWN", value=101.0), + quantity=Quantity(value=1.0, unit="contracts"), + ), + ), + depth=1, + ), + ) + constraints = RiskConstraints( + ts_ns_local=1, + scope="test", + trading_enabled=True, + ) + + intents = strategy.on_feed( + state=state, # type: ignore[arg-type] + event=event, + engine_cfg=_EngineCfgStub(), + constraints=constraints, + ) + + assert len(intents) == 2 + assert all(intent.intent_type == "new" for intent in intents) diff --git a/tests/runtime/test_event_stream_cursor.py b/tests/runtime/test_event_stream_cursor.py index b94b9b7..9a9a4aa 100644 --- a/tests/runtime/test_event_stream_cursor.py +++ b/tests/runtime/test_event_stream_cursor.py @@ -18,6 +18,30 @@ def test_attempt_position_does_not_advance_cursor() -> None: assert cursor.next_index == 0 +def test_attempt_positions_does_not_advance_cursor_and_returns_batch() -> None: + cursor = EventStreamCursor(start_index=5) + attempted = cursor.attempt_positions(3) + assert tuple(position.index for position in attempted) == (5, 6, 7) + assert cursor.next_index == 5 + + +def test_attempt_positions_rejects_negative_count() -> None: + cursor = EventStreamCursor() + with pytest.raises(ValueError, match="count must be >= 0"): + cursor.attempt_positions(-1) + assert cursor.next_index == 0 + + +def test_attempt_positions_commit_success_advances_in_batch_order() -> None: + cursor = EventStreamCursor() + attempted = cursor.attempt_positions(2) + + cursor.commit_success(attempted[0]) + cursor.commit_success(attempted[1]) + + assert cursor.next_index == 2 + + def test_commit_success_advances_by_one() -> None: cursor = EventStreamCursor() attempted = cursor.attempt_position() diff --git a/tests/runtime/test_hftbacktest_venue_adapter_recording.py b/tests/runtime/test_hftbacktest_venue_adapter_recording.py new file mode 100644 index 0000000..84deaa3 --- /dev/null +++ b/tests/runtime/test_hftbacktest_venue_adapter_recording.py @@ -0,0 +1,34 @@ +from __future__ import annotations + +from types import SimpleNamespace + +import pytest + +from core_runtime.backtest.adapters.venue import HftBacktestVenueAdapter + + +def test_record_swallows_expected_recorder_index_error() -> None: + calls: list[object] = [] + + def _record(hbt: object) -> None: + calls.append(hbt) + raise IndexError + + hbt = object() + recorder = SimpleNamespace(recorder=SimpleNamespace(record=_record)) + venue = HftBacktestVenueAdapter(hbt=hbt, asset_no=0) # type: ignore[arg-type] + + venue.record(recorder) + + assert calls == [hbt] + + +def test_record_propagates_unexpected_recorder_exceptions() -> None: + def _record(_hbt: object) -> None: + raise ValueError("unexpected recorder failure") + + recorder = SimpleNamespace(recorder=SimpleNamespace(record=_record)) + venue = HftBacktestVenueAdapter(hbt=object(), asset_no=0) # type: ignore[arg-type] + + with pytest.raises(ValueError, match="unexpected recorder failure"): + venue.record(recorder) diff --git a/tests/runtime/test_runtime_core_configuration_integration.py b/tests/runtime/test_runtime_core_configuration_integration.py index f9a4ee2..37cf2e7 100644 --- a/tests/runtime/test_runtime_core_configuration_integration.py +++ b/tests/runtime/test_runtime_core_configuration_integration.py @@ -54,7 +54,7 @@ def _from_file(*, file_location: str, profile_name: str) -> dict[str, object]: def test_local_loader_fails_early_when_core_missing(tmp_path: Path) -> None: - sample_path = _repo_root() / "core_runtime/local/local.json" + sample_path = _repo_root() / "core_runtime/local/bt_config_local.json" config = _load_sample_config(sample_path) config.pop("core", None) @@ -66,7 +66,7 @@ def test_local_loader_fails_early_when_core_missing(tmp_path: Path) -> None: def test_local_loader_succeeds_with_valid_core() -> None: - sample_path = _repo_root() / "core_runtime/local/local.json" + sample_path = _repo_root() / "core_runtime/local/bt_config_local.json" cfg = load_config(str(sample_path)) assert isinstance(cfg.core_cfg, CoreConfiguration) @@ -81,7 +81,7 @@ def test_argo_entrypoint_rejects_invalid_run_config_before_planning( from core_runtime.backtest.runtime.entrypoint import main as argo_entrypoint_main - sample_path = _repo_root() / "core_runtime/argo/argo.json" + sample_path = _repo_root() / "core_runtime/argo/bt_config_argo.json" config = _load_sample_config(sample_path) config.pop("core", None) diff --git a/tests/runtime/test_strategy_runner_canonical_market_adoption.py b/tests/runtime/test_strategy_runner_canonical_market_adoption.py index d3b315d..3a90785 100644 --- a/tests/runtime/test_strategy_runner_canonical_market_adoption.py +++ b/tests/runtime/test_strategy_runner_canonical_market_adoption.py @@ -1,99 +1,23 @@ from __future__ import annotations -from collections import deque +import inspect from types import SimpleNamespace from typing import Any import pytest from tradingchassis_core.core.domain.configuration import CoreConfiguration -from tradingchassis_core.core.domain.state import StrategyState +from tradingchassis_core.core.domain.step_result import CoreStepResult from tradingchassis_core.core.domain.types import ( - BookLevel, - BookPayload, - CancelOrderIntent, ControlTimeEvent, - FillEvent, - MarketEvent, NewOrderIntent, - OrderSubmittedEvent, Price, Quantity, - ReplaceOrderIntent, ) -from tradingchassis_core.core.events.event_bus import EventBus +from tradingchassis_core.core.execution_control.types import ControlSchedulingObligation from tradingchassis_core.core.risk.risk_config import RiskConfig -from tradingchassis_core.core.risk.risk_engine import GateDecision -from tradingchassis_core.strategies.base import Strategy import core_runtime.backtest.engine.strategy_runner as strategy_runner_module -from core_runtime.backtest.engine.event_stream_cursor import EventStreamCursor -from core_runtime.backtest.engine.hft_engine import HftEngineConfig -from core_runtime.backtest.engine.strategy_runner import ( - MAX_TIMEOUT_NS, - HftStrategyRunner, -) - - -class _NoopStrategy(Strategy): - def on_feed(self, state: Any, event: Any, engine_cfg: Any, constraints: Any) -> list[Any]: - _ = (state, event, engine_cfg, constraints) - return [] - - def on_order_update(self, state: Any, engine_cfg: Any, constraints: Any) -> list[Any]: - _ = (state, engine_cfg, constraints) - return [] - - def on_risk_decision(self, decision: Any) -> None: - _ = decision - - -class _NoopExecution: - def apply_intents(self, intents: list[Any]) -> list[tuple[Any, str]]: - _ = intents - return [] - - -class _RecorderWrapper: - recorder: Any - - def __init__(self) -> None: - self.recorder = SimpleNamespace(record=lambda _hbt: None) - - -class _StubVenue: - def __init__( - self, - *, - rc_sequence: list[int], - ts_sequence: list[int], - depth: object | None = None, - state_values: object | None = None, - orders: object | None = None, - ) -> None: - self._rc = list(rc_sequence) - self._ts = list(ts_sequence) - self._depth = depth - self._state_values = state_values - self._orders = orders - self._current_ts = 0 - self.wait_calls: list[tuple[int, bool]] = [] - - def wait_next(self, *, timeout_ns: int, include_order_resp: bool) -> int: - self.wait_calls.append((timeout_ns, include_order_resp)) - self._current_ts = self._ts.pop(0) - return self._rc.pop(0) - - def current_timestamp_ns(self) -> int: - return self._current_ts - - def read_market_snapshot(self) -> object: - return self._depth - - def read_orders_snapshot(self) -> tuple[object, object]: - return self._state_values, self._orders - - def record(self, recorder: Any) -> None: - recorder.recorder.record(self) +from core_runtime.backtest.engine.strategy_runner import HftStrategyRunner def _core_cfg() -> CoreConfiguration: @@ -113,977 +37,504 @@ def _core_cfg() -> CoreConfiguration: ) -def _engine_cfg() -> HftEngineConfig: - return HftEngineConfig( - initial_snapshot=None, - data_files=[], - instrument="BTC_USDC-PERPETUAL", - tick_size=0.1, - lot_size=0.01, - contract_size=1.0, - maker_fee_rate=0.0, - taker_fee_rate=0.0, - entry_latency_ns=0, - response_latency_ns=0, - use_risk_adverse_queue_model=False, - partial_fill_venue=False, - max_steps=1, - last_trades_capacity=1, - max_price_tick_levels=1, - roi_lb=0, - roi_ub=1, - stats_npz_path="/tmp/stats.npz", - event_bus_path="/tmp/events.jsonl", - ) - - def _risk_cfg() -> RiskConfig: return RiskConfig( - scope="test", - notional_limits={"currency": "USDC", "max_gross_notional": 1.0}, - ) - - -def _market_event(ts_ns: int) -> MarketEvent: - return MarketEvent( - ts_ns_exch=ts_ns, - ts_ns_local=ts_ns, - instrument="BTC_USDC-PERPETUAL", - event_type="book", - book=BookPayload( - book_type="snapshot", - bids=[ - BookLevel( - price=Price(currency="UNKNOWN", value=100.0), - quantity=Quantity(value=1.0, unit="contracts"), - ) - ], - asks=[ - BookLevel( - price=Price(currency="UNKNOWN", value=101.0), - quantity=Quantity(value=1.0, unit="contracts"), - ) - ], - depth=1, - ), + scope="runtime-test", + notional_limits={ + "currency": "USDC", + "max_gross_notional": 10_000.0, + "max_single_order_notional": 1_000.0, + }, + order_rate_limits={ + "max_orders_per_second": 10.0, + "max_cancels_per_second": 10.0, + }, ) -def _depth_snapshot() -> object: +def _engine_cfg() -> Any: return SimpleNamespace( - roi_lb_tick=100, + instrument="BTC_USDC-PERPETUAL", + max_price_tick_levels=2, + event_bus_path="/tmp/runtime-events.jsonl", tick_size=0.1, - best_ask_tick=101, - best_bid_tick=100, - ask_depth=[1.0, 0.0], - bid_depth=[1.0, 0.0], - best_bid=100.0, - best_ask=101.0, - best_bid_qty=1.0, - best_ask_qty=1.0, ) -def _new_intent(ts_ns_local: int = 2) -> NewOrderIntent: +def _new_intent(*, ts_ns_local: int, client_order_id: str) -> NewOrderIntent: return NewOrderIntent( ts_ns_local=ts_ns_local, instrument="BTC_USDC-PERPETUAL", - client_order_id="cid-new-1", - intents_correlation_id="corr-new-1", - side="buy", + client_order_id=client_order_id, + intent_type="new", order_type="limit", - intended_qty=Quantity(value=1.0, unit="contracts"), - intended_price=Price(currency="USDC", value=100.0), - time_in_force="GTC", - ) - - -def _replace_intent(ts_ns_local: int = 2) -> ReplaceOrderIntent: - return ReplaceOrderIntent( - ts_ns_local=ts_ns_local, - instrument="BTC_USDC-PERPETUAL", - client_order_id="cid-existing-1", - intents_correlation_id="corr-replace-1", side="buy", - order_type="limit", - intended_qty=Quantity(value=2.0, unit="contracts"), - intended_price=Price(currency="USDC", value=101.0), - ) - - -def _cancel_intent(ts_ns_local: int = 2) -> CancelOrderIntent: - return CancelOrderIntent( - ts_ns_local=ts_ns_local, - instrument="BTC_USDC-PERPETUAL", - client_order_id="cid-existing-1", - intents_correlation_id="corr-cancel-1", + intended_price=Price(currency="UNKNOWN", value=100.0), + intended_qty=Quantity(value=0.1, unit="contracts"), + time_in_force="GTC", ) -class _EmitIntentsStrategy(Strategy): - def __init__(self, intents: list[object]) -> None: - self._intents = intents +class _Strategy: + def __init__(self) -> None: + self.on_feed_calls = 0 + self.on_order_update_calls = 0 def on_feed(self, state: Any, event: Any, engine_cfg: Any, constraints: Any) -> list[Any]: _ = (state, event, engine_cfg, constraints) - return list(self._intents) + self.on_feed_calls += 1 + return [] def on_order_update(self, state: Any, engine_cfg: Any, constraints: Any) -> list[Any]: _ = (state, engine_cfg, constraints) + self.on_order_update_calls += 1 return [] - def on_risk_decision(self, decision: Any) -> None: - _ = decision - - -def _decision_for(accepted_now: list[Any]) -> GateDecision: - return GateDecision( - ts_ns_local=2, - accepted_now=accepted_now, - queued=[], - rejected=[], - replaced_in_queue=[], - dropped_in_queue=[], - handled_in_queue=[], - execution_rejected=[], - next_send_ts_ns_local=None, - ) - - -def test_process_market_event_routes_through_event_entry_with_core_configuration( - monkeypatch: pytest.MonkeyPatch, -) -> None: - runner = object.__new__(HftStrategyRunner) - runner.strategy_state = object() - runner._core_cfg = _core_cfg() - runner._event_stream_cursor = EventStreamCursor() - - captured: list[tuple[int, object]] = [] - - def _spy_process_event_entry(state: object, entry: object, *, configuration: object) -> None: - _ = state - captured.append((entry.position.index, configuration)) - - monkeypatch.setattr( - strategy_runner_module, - "process_event_entry", - _spy_process_event_entry, - ) - - runner._process_canonical_market_event(_market_event(1)) - runner._process_canonical_market_event(_market_event(2)) - - assert [idx for idx, _ in captured] == [0, 1] - assert captured[0][1] is runner._core_cfg - assert captured[1][1] is runner._core_cfg - assert runner._event_stream_cursor.next_index == 2 +class _Execution: + def __init__(self) -> None: + self.applied: list[list[Any]] = [] -def test_first_canonical_event_uses_processing_position_zero( - monkeypatch: pytest.MonkeyPatch, -) -> None: - runner = object.__new__(HftStrategyRunner) - runner.strategy_state = object() - runner._core_cfg = _core_cfg() - runner._event_stream_cursor = EventStreamCursor() - - captured: list[int] = [] - - def _spy_process_event_entry(state: object, entry: object, *, configuration: object) -> None: - _ = state - assert configuration is runner._core_cfg - captured.append(entry.position.index) - - monkeypatch.setattr( - strategy_runner_module, - "process_event_entry", - _spy_process_event_entry, - ) - - runner._process_canonical_market_event(_market_event(1)) - - assert captured == [0] - assert runner._event_stream_cursor.next_index == 1 - - -def test_market_branch_calls_canonical_boundary_not_update_market( - monkeypatch: pytest.MonkeyPatch, -) -> None: - runner = HftStrategyRunner( - engine_cfg=_engine_cfg(), - strategy=_NoopStrategy(), - risk_cfg=_risk_cfg(), - core_cfg=_core_cfg(), - ) - monkeypatch.setattr( - runner.strategy_state, - "update_market", - lambda *args, **kwargs: (_ for _ in ()).throw(AssertionError("update_market must not be called")), - ) - monkeypatch.setattr( - runner.strategy_state, - "apply_fill_event", - lambda *args, **kwargs: (_ for _ in ()).throw(AssertionError("apply_fill_event must not be called")), - ) - - captured: list[tuple[int, object]] = [] - - def _spy_process_event_entry(state: object, entry: object, *, configuration: object) -> None: - _ = state - captured.append((entry.position.index, configuration)) + def apply_intents(self, intents: list[Any]) -> list[tuple[Any, str]]: + self.applied.append(list(intents)) + return [] - monkeypatch.setattr( - strategy_runner_module, - "process_event_entry", - _spy_process_event_entry, - ) - venue = _StubVenue( - rc_sequence=[0, 2, 1], - ts_sequence=[1, 2, 3], - depth=_depth_snapshot(), - ) - runner.run(venue=venue, execution=_NoopExecution(), recorder=_RecorderWrapper()) +class _Recorder: + def __init__(self) -> None: + self.recorder = SimpleNamespace(record=lambda _hbt: None) - assert captured == [(0, runner._core_cfg)] +class _Venue: + def __init__(self, *, rc_sequence: list[int], ts_sequence: list[int]) -> None: + self._rc = list(rc_sequence) + self._ts = list(ts_sequence) + self._now = 0 -def test_wait_next_bootstrap_uses_include_order_resp_false_then_true_in_loop() -> None: - runner = HftStrategyRunner( - engine_cfg=_engine_cfg(), - strategy=_NoopStrategy(), - risk_cfg=_risk_cfg(), - core_cfg=_core_cfg(), - ) + def wait_next(self, *, timeout_ns: int, include_order_resp: bool) -> int: + _ = (timeout_ns, include_order_resp) + self._now = self._ts.pop(0) + return self._rc.pop(0) - venue = _StubVenue( - rc_sequence=[0, 2, 1], - ts_sequence=[1, 2, 3], - depth=_depth_snapshot(), - ) - runner.run(venue=venue, execution=_NoopExecution(), recorder=_RecorderWrapper()) + def current_timestamp_ns(self) -> int: + return self._now + + def read_market_snapshot(self) -> Any: + return SimpleNamespace( + roi_lb_tick=1000, + tick_size=0.1, + best_bid_tick=1005, + best_ask_tick=1006, + bid_depth=[1.0, 0.9, 0.8], + ask_depth=[1.1, 1.2, 1.3], + ) + + def read_orders_snapshot(self) -> tuple[Any, Any]: + return ( + SimpleNamespace( + position=1.0, + balance=1000.0, + fee=1.5, + trading_volume=100.0, + trading_value=5000.0, + num_trades=3, + ), + SimpleNamespace(values=lambda: []), + ) - assert len(venue.wait_calls) >= 2 - first_timeout_ns, first_include_order_resp = venue.wait_calls[0] - assert first_timeout_ns == MAX_TIMEOUT_NS - assert first_include_order_resp is False - assert all(include_order_resp is True for _, include_order_resp in venue.wait_calls[1:]) + def record(self, recorder: Any) -> None: + recorder.recorder.record(self) -def test_market_mapping_from_depth_snapshot_is_deterministic_golden( - monkeypatch: pytest.MonkeyPatch, -) -> None: - runner = HftStrategyRunner( +def _runner(**kwargs: Any) -> HftStrategyRunner: + return HftStrategyRunner( engine_cfg=_engine_cfg(), - strategy=_NoopStrategy(), + strategy=_Strategy(), risk_cfg=_risk_cfg(), core_cfg=_core_cfg(), + **kwargs, ) - captured_market_events: list[MarketEvent] = [] - - def _spy_process_event_entry(state: object, entry: object, *, configuration: object) -> None: - _ = (state, configuration) - if isinstance(entry.event, MarketEvent): - captured_market_events.append(entry.event) - - monkeypatch.setattr( - strategy_runner_module, - "process_event_entry", - _spy_process_event_entry, - ) - - venue = _StubVenue( - rc_sequence=[0, 2, 1], - ts_sequence=[1, 2_000_000_000, 2_000_000_001], - depth=_depth_snapshot(), - ) - runner.run(venue=venue, execution=_NoopExecution(), recorder=_RecorderWrapper()) - - assert len(captured_market_events) == 1 - market_event = captured_market_events[0] - assert market_event.instrument == "BTC_USDC-PERPETUAL" - assert market_event.ts_ns_local == 2_000_000_000 - assert market_event.ts_ns_exch == 2_000_000_000 - assert market_event.book is not None - assert market_event.book.bids[0].price.value == 10.0 - assert market_event.book.asks[0].price.value == 10.100000000000001 - assert market_event.book.bids[0].quantity.value == 1.0 - assert market_event.book.asks[0].quantity.value == 0.0 +def test_market_path_dispatches_only_from_core_step_result(monkeypatch: Any) -> None: + runner = _runner() + execution = _Execution() + venue = _Venue(rc_sequence=[0, 2, 1], ts_sequence=[1, 2, 3]) + recorder = _Recorder() -def test_missing_core_cfg_fails_before_market_mutation() -> None: - runner = object.__new__(HftStrategyRunner) - runner.strategy_state = StrategyState(event_bus=EventBus(sinks=[])) - runner._core_cfg = None - runner._event_stream_cursor = EventStreamCursor() + captured_kwargs: list[dict[str, Any]] = [] - with pytest.raises(ValueError, match="CoreConfiguration is required"): - runner._process_canonical_market_event(_market_event(42)) + def _stub_run_core_step(state: Any, entry: Any, **kwargs: Any) -> CoreStepResult: + _ = (state, entry) + captured_kwargs.append(kwargs) + return CoreStepResult( + generated_intents=(_new_intent(ts_ns_local=2, client_order_id="generated"),), + dispatchable_intents=(_new_intent(ts_ns_local=2, client_order_id="dispatch"),), + ) - assert runner.strategy_state.market == {} - assert runner.strategy_state._last_processing_position_index is None - assert runner._event_stream_cursor.next_index == 0 + monkeypatch.setattr(strategy_runner_module, "run_core_step", _stub_run_core_step) + def _fail_decide_intents(**_: Any) -> Any: + raise AssertionError("runtime must not call risk.decide_intents") -def test_invalid_core_cfg_type_fails_before_market_mutation() -> None: - runner = object.__new__(HftStrategyRunner) - runner.strategy_state = StrategyState(event_bus=EventBus(sinks=[])) - runner._core_cfg = object() - runner._event_stream_cursor = EventStreamCursor() + monkeypatch.setattr(runner.risk, "decide_intents", _fail_decide_intents, raising=False) - with pytest.raises(TypeError, match="configuration must be CoreConfiguration or None"): - runner._process_canonical_market_event(_market_event(42)) + runner.run(venue, execution, recorder) - assert runner.strategy_state.market == {} - assert runner.strategy_state._last_processing_position_index is None - assert runner._event_stream_cursor.next_index == 0 + assert len(execution.applied) == 1 + assert [it.client_order_id for it in execution.applied[0]] == ["dispatch"] + assert "policy_admission_context" in captured_kwargs[0] + assert "execution_control_apply_context" in captured_kwargs[0] -def test_order_snapshot_branch_keeps_compatibility_path( - monkeypatch: pytest.MonkeyPatch, -) -> None: - runner = HftStrategyRunner( - engine_cfg=_engine_cfg(), - strategy=_NoopStrategy(), - risk_cfg=_risk_cfg(), - core_cfg=_core_cfg(), +def test_control_time_path_uses_core_step_dispatchable_intents(monkeypatch: Any) -> None: + runner = _runner() + execution = _Execution() + venue = _Venue(rc_sequence=[0, 0, 1], ts_sequence=[1, 2, 3]) + recorder = _Recorder() + runner._pending_control_scheduling_obligation = ControlSchedulingObligation( + due_ts_ns_local=2, + reason="rate_limit", + scope_key="instrument:BTC_USDC-PERPETUAL", + source="test", ) - monkeypatch.setattr( - runner.strategy_state, - "apply_fill_event", - lambda *args, **kwargs: (_ for _ in ()).throw(AssertionError("apply_fill_event must not be called")), - ) - - calls = {"update_account": 0, "ingest_order_snapshots": 0} + runner._next_send_ts_ns_local = 2 - def _spy_update_account(*args: object, **kwargs: object) -> None: - _ = (args, kwargs) - calls["update_account"] += 1 + captured: list[tuple[Any, dict[str, Any]]] = [] - def _spy_ingest_order_snapshots(*args: object, **kwargs: object) -> None: - _ = (args, kwargs) - calls["ingest_order_snapshots"] += 1 - - monkeypatch.setattr(runner.strategy_state, "update_account", _spy_update_account) - monkeypatch.setattr( - runner.strategy_state, - "ingest_order_snapshots", - _spy_ingest_order_snapshots, - ) - - venue = _StubVenue( - rc_sequence=[0, 3, 1], - ts_sequence=[1, 2, 3], - state_values=SimpleNamespace( - position=0.0, - balance=1000.0, - fee=0.0, - trading_volume=0.0, - trading_value=0.0, - num_trades=0, - ), - orders={}, - ) - - runner.run(venue=venue, execution=_NoopExecution(), recorder=_RecorderWrapper()) - - assert calls["update_account"] == 1 - assert calls["ingest_order_snapshots"] == 1 - assert runner._event_stream_cursor.next_index == 0 - - -def test_snapshot_only_rc3_does_not_consume_canonical_cursor_position( - monkeypatch: pytest.MonkeyPatch, -) -> None: - runner = HftStrategyRunner( - engine_cfg=_engine_cfg(), - strategy=_NoopStrategy(), - risk_cfg=_risk_cfg(), - core_cfg=_core_cfg(), - ) - - calls = {"update_account": 0, "ingest_order_snapshots": 0, "canonical": 0} - - def _spy_update_account(*args: object, **kwargs: object) -> None: - _ = (args, kwargs) - calls["update_account"] += 1 - - def _spy_ingest_order_snapshots(*args: object, **kwargs: object) -> None: - _ = (args, kwargs) - calls["ingest_order_snapshots"] += 1 - - def _spy_process_event_entry(*args: object, **kwargs: object) -> None: - _ = (args, kwargs) - calls["canonical"] += 1 - - monkeypatch.setattr(runner.strategy_state, "update_account", _spy_update_account) - monkeypatch.setattr( - runner.strategy_state, - "ingest_order_snapshots", - _spy_ingest_order_snapshots, - ) - monkeypatch.setattr( - strategy_runner_module, - "process_event_entry", - _spy_process_event_entry, - ) - - venue = _StubVenue( - rc_sequence=[0, 3, 1], - ts_sequence=[1, 2, 3], - state_values=SimpleNamespace( - position=0.0, - balance=1000.0, - fee=0.0, - trading_volume=0.0, - trading_value=0.0, - num_trades=0, - ), - orders={}, - ) + def _stub_run_core_step(state: Any, entry: Any, **kwargs: Any) -> CoreStepResult: + _ = state + captured.append((entry.event, kwargs)) + if isinstance(entry.event, ControlTimeEvent): + return CoreStepResult( + dispatchable_intents=(_new_intent(ts_ns_local=2, client_order_id="ctl"),) + ) + return CoreStepResult() - runner.run(venue=venue, execution=_NoopExecution(), recorder=_RecorderWrapper()) + monkeypatch.setattr(strategy_runner_module, "run_core_step", _stub_run_core_step) + runner.run(venue, execution, recorder) - assert calls == { - "update_account": 1, - "ingest_order_snapshots": 1, - "canonical": 0, - } - assert runner._event_stream_cursor.next_index == 0 + assert [it.client_order_id for it in execution.applied[0]] == ["ctl"] + assert runner._pending_control_scheduling_obligation is None + control_events = [event for event, _ in captured if isinstance(event, ControlTimeEvent)] + assert len(control_events) == 1 + control_kwargs = [kwargs for event, kwargs in captured if isinstance(event, ControlTimeEvent)][0] + assert "control_time_queue_context" not in control_kwargs -def test_rc2_rc3_paths_never_emit_fill_event_through_process_event_entry( - monkeypatch: pytest.MonkeyPatch, -) -> None: - runner = HftStrategyRunner( - engine_cfg=_engine_cfg(), - strategy=_NoopStrategy(), - risk_cfg=_risk_cfg(), - core_cfg=_core_cfg(), +def test_mixed_wakeup_path_uses_new_wakeup_evaluator_api(monkeypatch: Any) -> None: + runner = _runner(enable_core_step_wakeup_collapse=True) + execution = _Execution() + venue = _Venue(rc_sequence=[0, 2, 1], ts_sequence=[1, 2, 3]) + recorder = _Recorder() + runner._pending_control_scheduling_obligation = ControlSchedulingObligation( + due_ts_ns_local=2, + reason="rate_limit", + scope_key="instrument:BTC_USDC-PERPETUAL", + source="test", ) + runner._next_send_ts_ns_local = 2 - calls = {"update_account": 0, "ingest_order_snapshots": 0} - emitted_fill_events = 0 - - def _spy_update_account(*args: object, **kwargs: object) -> None: - _ = (args, kwargs) - calls["update_account"] += 1 - - def _spy_ingest_order_snapshots(*args: object, **kwargs: object) -> None: - _ = (args, kwargs) - calls["ingest_order_snapshots"] += 1 - - def _spy_process_event_entry(state: object, entry: object, *, configuration: object) -> None: - nonlocal emitted_fill_events - _ = (state, configuration) - if isinstance(entry.event, FillEvent): - emitted_fill_events += 1 + observed: dict[str, Any] = {} - monkeypatch.setattr(runner.strategy_state, "update_account", _spy_update_account) - monkeypatch.setattr( - runner.strategy_state, - "ingest_order_snapshots", - _spy_ingest_order_snapshots, - ) - monkeypatch.setattr( - strategy_runner_module, - "process_event_entry", - _spy_process_event_entry, - ) + def _stub_run_core_wakeup_step(state: Any, entries: Any, **kwargs: Any) -> CoreStepResult: + observed["entry_count"] = len(entries) + observed["kwargs"] = kwargs + evaluator = kwargs["wakeup_strategy_evaluator"] + if evaluator is not None: + evaluator.evaluate(SimpleNamespace(state=state, entries=entries)) + return CoreStepResult() - venue = _StubVenue( - rc_sequence=[0, 2, 3, 1], - ts_sequence=[1, 2, 3, 4], - depth=_depth_snapshot(), - state_values=SimpleNamespace( - position=0.0, - balance=1000.0, - fee=0.0, - trading_volume=0.0, - trading_value=0.0, - num_trades=0, - ), - orders={}, - ) - runner.run(venue=venue, execution=_NoopExecution(), recorder=_RecorderWrapper()) + monkeypatch.setattr(strategy_runner_module, "run_core_wakeup_step", _stub_run_core_wakeup_step) + runner.run(venue, execution, recorder) - assert emitted_fill_events == 0 - assert calls["update_account"] == 1 - assert calls["ingest_order_snapshots"] == 1 + assert observed["entry_count"] == 2 # market + injected control-time + assert "wakeup_strategy_evaluator" in observed["kwargs"] + assert "strategy_event_filter" not in observed["kwargs"] + assert observed["kwargs"]["queued_instrument"] == "BTC_USDC-PERPETUAL" + assert runner.strategy.on_feed_calls == 1 -def test_successful_new_dispatch_processes_order_submitted_before_mark_sent( - monkeypatch: pytest.MonkeyPatch, -) -> None: - new_intent = _new_intent() - runner = HftStrategyRunner( - engine_cfg=_engine_cfg(), - strategy=_EmitIntentsStrategy([new_intent]), - risk_cfg=_risk_cfg(), - core_cfg=_core_cfg(), - ) - monkeypatch.setattr(runner.strategy_state, "apply_fill_event", lambda *args, **kwargs: None) - monkeypatch.setattr( - runner.risk, - "decide_intents", - lambda **_: _decision_for([new_intent]), - ) +def test_rc3_feedback_event_uses_account_level_shape(monkeypatch: Any) -> None: + runner = _runner() + execution = _Execution() + venue = _Venue(rc_sequence=[0, 3, 1], ts_sequence=[1, 2, 3]) + recorder = _Recorder() - ordering: list[str] = [] - submitted_events: list[OrderSubmittedEvent] = [] - marks: list[tuple[str, str, str]] = [] + seen_event_payloads: list[dict[str, Any]] = [] - def _spy_process_event_entry(state: object, entry: object, *, configuration: object) -> None: - _ = (state, configuration) - if isinstance(entry.event, OrderSubmittedEvent): - ordering.append("submitted") - submitted_events.append(entry.event) + def _stub_run_core_step(state: Any, entry: Any, **kwargs: Any) -> CoreStepResult: + _ = (state, kwargs) + if hasattr(entry.event, "ts_ns_local_feedback"): + seen_event_payloads.append(entry.event.model_dump()) + return CoreStepResult() - def _spy_mark_intent_sent(instrument: str, client_order_id: str, intent_type: str) -> None: - ordering.append("mark") - marks.append((instrument, client_order_id, intent_type)) + monkeypatch.setattr(strategy_runner_module, "run_core_step", _stub_run_core_step) + runner.run(venue, execution, recorder) - monkeypatch.setattr(strategy_runner_module, "process_event_entry", _spy_process_event_entry) - monkeypatch.setattr(runner.strategy_state, "mark_intent_sent", _spy_mark_intent_sent) + assert len(seen_event_payloads) == 1 + assert "order_snapshots" not in seen_event_payloads[0] - venue = _StubVenue( - rc_sequence=[0, 2, 1], - ts_sequence=[1_111, 5_000_000_000, 5_000_000_001], - depth=_depth_snapshot(), - ) - runner.run(venue=venue, execution=_NoopExecution(), recorder=_RecorderWrapper()) - - assert len(submitted_events) == 1 - event = submitted_events[0] - assert event.instrument == new_intent.instrument - assert event.client_order_id == new_intent.client_order_id - assert event.side == new_intent.side - assert event.order_type == new_intent.order_type - assert event.intended_price == new_intent.intended_price - assert event.intended_qty == new_intent.intended_qty - assert event.time_in_force == new_intent.time_in_force - assert event.intent_correlation_id == new_intent.intents_correlation_id - assert event.dispatch_attempt_id is None - assert event.runtime_correlation is None - assert event.ts_ns_local_dispatch == 5_000_000_000 - assert ordering == ["submitted", "mark"] - assert marks == [(new_intent.instrument, new_intent.client_order_id, "new")] - - -def test_failed_new_dispatch_processes_no_order_submitted_event( - monkeypatch: pytest.MonkeyPatch, -) -> None: - new_intent = _new_intent() - runner = HftStrategyRunner( - engine_cfg=_engine_cfg(), - strategy=_EmitIntentsStrategy([new_intent]), - risk_cfg=_risk_cfg(), - core_cfg=_core_cfg(), - ) - monkeypatch.setattr( - runner.risk, - "decide_intents", - lambda **_: _decision_for([new_intent]), - ) - submitted_event_count = 0 - marked_count = 0 - captured_decisions: list[GateDecision] = [] +def test_order_submitted_event_emitted_before_mark_intent_sent(monkeypatch: Any) -> None: + runner = _runner() + execution = _Execution() + call_order: list[str] = [] - def _spy_process_event_entry(state: object, entry: object, *, configuration: object) -> None: - nonlocal submitted_event_count - _ = (state, configuration) - if isinstance(entry.event, OrderSubmittedEvent): - submitted_event_count += 1 + def _spy_process_submitted(intent: Any, *, ts_ns_local_dispatch: int) -> None: + _ = (intent, ts_ns_local_dispatch) + call_order.append("submitted") - def _spy_mark_intent_sent(instrument: str, client_order_id: str, intent_type: str) -> None: - nonlocal marked_count + def _spy_mark_sent(instrument: str, client_order_id: str, intent_type: str) -> None: _ = (instrument, client_order_id, intent_type) - marked_count += 1 - - monkeypatch.setattr(strategy_runner_module, "process_event_entry", _spy_process_event_entry) - monkeypatch.setattr(runner.strategy_state, "mark_intent_sent", _spy_mark_intent_sent) - monkeypatch.setattr( - runner.strategy, - "on_risk_decision", - lambda decision: captured_decisions.append(decision), - ) - - class _ExecutionFailNew: - def apply_intents(self, intents: list[Any]) -> list[tuple[Any, str]]: - _ = intents - return [(new_intent, "EXCHANGE_REJECT")] - - venue = _StubVenue( - rc_sequence=[0, 2, 1], - ts_sequence=[10, 20, 30], - depth=_depth_snapshot(), - ) - runner.run( - venue=venue, - execution=_ExecutionFailNew(), - recorder=_RecorderWrapper(), - ) - - assert submitted_event_count == 0 - assert marked_count == 0 - assert len(captured_decisions) == 1 - assert len(captured_decisions[0].execution_rejected) == 1 - assert captured_decisions[0].execution_rejected[0].intent.client_order_id == new_intent.client_order_id - + call_order.append("mark_sent") -def test_successful_replace_cancel_dispatch_processes_no_order_submitted_event( - monkeypatch: pytest.MonkeyPatch, -) -> None: - replace_intent = _replace_intent() - cancel_intent = _cancel_intent() - accepted_now = [replace_intent, cancel_intent] + monkeypatch.setattr(runner, "_process_canonical_order_submitted_event", _spy_process_submitted) + monkeypatch.setattr(runner.strategy_state, "mark_intent_sent", _spy_mark_sent) - runner = HftStrategyRunner( - engine_cfg=_engine_cfg(), - strategy=_EmitIntentsStrategy(accepted_now), - risk_cfg=_risk_cfg(), - core_cfg=_core_cfg(), - ) - monkeypatch.setattr( - runner.risk, - "decide_intents", - lambda **_: _decision_for(accepted_now), + runner._dispatch_accepted_intents( + [_new_intent(ts_ns_local=2, client_order_id="new-1")], + execution, + sim_now_ns=2, ) - submitted_event_count = 0 - marks: list[tuple[str, str, str]] = [] + assert call_order == ["submitted"] - def _spy_process_event_entry(state: object, entry: object, *, configuration: object) -> None: - nonlocal submitted_event_count - _ = (state, configuration) - if isinstance(entry.event, OrderSubmittedEvent): - submitted_event_count += 1 - def _spy_mark_intent_sent(instrument: str, client_order_id: str, intent_type: str) -> None: - marks.append((instrument, client_order_id, intent_type)) +def test_successful_new_dispatch_does_not_leave_stale_inflight() -> None: + runner = _runner() + execution = _Execution() - monkeypatch.setattr(strategy_runner_module, "process_event_entry", _spy_process_event_entry) - monkeypatch.setattr(runner.strategy_state, "mark_intent_sent", _spy_mark_intent_sent) - - venue = _StubVenue( - rc_sequence=[0, 2, 1], - ts_sequence=[100, 200, 300], - depth=_depth_snapshot(), - ) - runner.run(venue=venue, execution=_NoopExecution(), recorder=_RecorderWrapper()) - - assert submitted_event_count == 0 - assert marks == [ - ( - replace_intent.instrument, - replace_intent.client_order_id, - "replace", - ), - ( - cancel_intent.instrument, - cancel_intent.client_order_id, - "cancel", - ), - ] - - -def test_global_canonical_counter_shared_between_market_and_order_submitted( - monkeypatch: pytest.MonkeyPatch, -) -> None: - new_intent = _new_intent() - runner = HftStrategyRunner( - engine_cfg=_engine_cfg(), - strategy=_EmitIntentsStrategy([new_intent]), - risk_cfg=_risk_cfg(), - core_cfg=_core_cfg(), - ) - monkeypatch.setattr( - runner.risk, - "decide_intents", - lambda **_: _decision_for([new_intent]), + runner._dispatch_accepted_intents( + [_new_intent(ts_ns_local=2, client_order_id="new-1")], + execution, + sim_now_ns=2, ) - positions: list[tuple[int, str]] = [] + assert runner.strategy_state.has_inflight("BTC_USDC-PERPETUAL", "new-1") is False - def _spy_process_event_entry(state: object, entry: object, *, configuration: object) -> None: - _ = (state, configuration) - event_name = type(entry.event).__name__ - positions.append((entry.position.index, event_name)) - monkeypatch.setattr(strategy_runner_module, "process_event_entry", _spy_process_event_entry) - - venue = _StubVenue( - rc_sequence=[0, 2, 1], - ts_sequence=[7, 9_999_999_999, 10_000_000_000], - depth=_depth_snapshot(), - ) - runner.run(venue=venue, execution=_NoopExecution(), recorder=_RecorderWrapper()) +def test_runner_source_has_no_removed_compat_api_usage() -> None: + source = inspect.getsource(strategy_runner_module) + assert "decide_intents" not in source + assert "compat_gate_decision" not in source + assert "ControlTimeQueueReevaluationContext" not in source + assert "strategy_event_filter" not in source - assert positions == [ - (0, "MarketEvent"), - (1, "OrderSubmittedEvent"), - ] - assert runner._event_stream_cursor.next_index == 2 +def test_stale_control_obligation_is_cleared_to_avoid_zero_timeout_spin() -> None: + runner = _runner() + execution = _Execution() + recorder = _Recorder() -def test_canonical_counter_increments_only_after_successful_canonical_processing( - monkeypatch: pytest.MonkeyPatch, -) -> None: - runner = object.__new__(HftStrategyRunner) - runner.strategy_state = object() - runner._core_cfg = _core_cfg() - runner._event_stream_cursor = EventStreamCursor() + class _TimeoutSensitiveVenue: + def __init__(self) -> None: + self._now = 0 + self._call_count = 0 - def _fail(*args: object, **kwargs: object) -> None: - _ = (args, kwargs) - raise RuntimeError("boom") + def wait_next(self, *, timeout_ns: int, include_order_resp: bool) -> int: + _ = include_order_resp + self._call_count += 1 + if self._call_count == 1: + self._now = 1 + return 0 + if timeout_ns == 0: + if self._call_count > 20: + raise AssertionError("runner is spinning on zero-timeout wait_next") + return 0 + self._now = 2 + return 1 - monkeypatch.setattr(strategy_runner_module, "process_event_entry", _fail) - with pytest.raises(RuntimeError, match="boom"): - runner._process_canonical_market_event(_market_event(1)) - assert runner._event_stream_cursor.next_index == 0 + def current_timestamp_ns(self) -> int: + return self._now - called = {"count": 0} + def read_market_snapshot(self) -> Any: + raise AssertionError("market snapshot should not be needed") - def _ok(*args: object, **kwargs: object) -> None: - _ = (args, kwargs) - called["count"] += 1 + def read_orders_snapshot(self) -> tuple[Any, Any]: + raise AssertionError("orders snapshot should not be needed") - monkeypatch.setattr(strategy_runner_module, "process_event_entry", _ok) - runner._process_canonical_market_event(_market_event(2)) - assert called["count"] == 1 - assert runner._event_stream_cursor.next_index == 1 + def record(self, recorder: Any) -> None: + recorder.recorder.record(self) - -def test_control_time_event_injected_when_scheduled_deadline_is_realized( - monkeypatch: pytest.MonkeyPatch, -) -> None: - runner = HftStrategyRunner( - engine_cfg=_engine_cfg(), - strategy=_NoopStrategy(), - risk_cfg=_risk_cfg(), - core_cfg=_core_cfg(), + venue = _TimeoutSensitiveVenue() + runner._pending_control_scheduling_obligation = ControlSchedulingObligation( + due_ts_ns_local=1, + reason="rate_limit", + scope_key="instrument:BTC_USDC-PERPETUAL", + source="test", ) - runner._next_send_ts_ns_local = 5 + runner._next_send_ts_ns_local = 1 + runner._last_injected_control_deadline_ns = 1 - control_events: list[ControlTimeEvent] = [] + runner.run(venue, execution, recorder) - def _spy_process_event_entry(state: object, entry: object, *, configuration: object) -> None: - _ = (state, configuration) - if isinstance(entry.event, ControlTimeEvent): - control_events.append(entry.event) + assert runner._pending_control_scheduling_obligation is None + assert runner._next_send_ts_ns_local is None - monkeypatch.setattr(strategy_runner_module, "process_event_entry", _spy_process_event_entry) - venue = _StubVenue( - rc_sequence=[0, 0, 1], - ts_sequence=[1, 10, 11], - ) - runner.run(venue=venue, execution=_NoopExecution(), recorder=_RecorderWrapper()) - assert len(control_events) == 1 - event = control_events[0] - assert event.ts_ns_local_control == 10 - assert event.reason == "scheduled_control_recheck" - assert event.due_ts_ns_local == 5 - assert event.realized_ts_ns_local == 10 - assert event.obligation_reason == "rate_limit" - assert event.obligation_due_ts_ns_local == 5 - assert event.runtime_correlation is None - - -def test_no_control_time_event_when_no_deadline_scheduled( - monkeypatch: pytest.MonkeyPatch, -) -> None: - runner = HftStrategyRunner( - engine_cfg=_engine_cfg(), - strategy=_NoopStrategy(), - risk_cfg=_risk_cfg(), - core_cfg=_core_cfg(), +def test_debug_max_iterations_guard_raises_with_loop_state(monkeypatch: Any) -> None: + runner = _runner() + execution = _Execution() + recorder = _Recorder() + runner._pending_control_scheduling_obligation = ControlSchedulingObligation( + due_ts_ns_local=1_000_000_000_000, + reason="rate_limit", + scope_key="instrument:BTC_USDC-PERPETUAL", + source="test", ) - control_count = 0 + runner._next_send_ts_ns_local = 1_000_000_000_000 - def _spy_process_event_entry(state: object, entry: object, *, configuration: object) -> None: - nonlocal control_count - _ = (state, configuration) - if isinstance(entry.event, ControlTimeEvent): - control_count += 1 + class _NeverEndingVenue: + def __init__(self) -> None: + self._now = 1 - monkeypatch.setattr(strategy_runner_module, "process_event_entry", _spy_process_event_entry) - venue = _StubVenue( - rc_sequence=[0, 2, 1], - ts_sequence=[1, 2, 3], - depth=_depth_snapshot(), - ) - runner.run(venue=venue, execution=_NoopExecution(), recorder=_RecorderWrapper()) + def wait_next(self, *, timeout_ns: int, include_order_resp: bool) -> int: + _ = (timeout_ns, include_order_resp) + return 0 - assert control_count == 0 + def current_timestamp_ns(self) -> int: + return self._now + def read_market_snapshot(self) -> Any: + raise AssertionError("market snapshot should not be needed") -def test_no_control_time_event_when_deadline_not_yet_realized( - monkeypatch: pytest.MonkeyPatch, -) -> None: - runner = HftStrategyRunner( - engine_cfg=_engine_cfg(), - strategy=_NoopStrategy(), - risk_cfg=_risk_cfg(), - core_cfg=_core_cfg(), - ) - runner._next_send_ts_ns_local = 50 - control_count = 0 + def read_orders_snapshot(self) -> tuple[Any, Any]: + raise AssertionError("orders snapshot should not be needed") - def _spy_process_event_entry(state: object, entry: object, *, configuration: object) -> None: - nonlocal control_count - _ = (state, configuration) - if isinstance(entry.event, ControlTimeEvent): - control_count += 1 + def record(self, recorder: Any) -> None: + recorder.recorder.record(self) - monkeypatch.setattr(strategy_runner_module, "process_event_entry", _spy_process_event_entry) - venue = _StubVenue( - rc_sequence=[0, 0, 1], - ts_sequence=[1, 10, 20], - ) - runner.run(venue=venue, execution=_NoopExecution(), recorder=_RecorderWrapper()) - - assert control_count == 0 - - -def test_control_time_deadline_injection_is_not_periodic_for_same_deadline( - monkeypatch: pytest.MonkeyPatch, -) -> None: - runner = HftStrategyRunner( - engine_cfg=_engine_cfg(), - strategy=_NoopStrategy(), - risk_cfg=_risk_cfg(), - core_cfg=_core_cfg(), - ) - runner._next_send_ts_ns_local = 5 - control_count = 0 + monkeypatch.setenv("TRADINGCHASSIS_DEBUG_MAX_ITERATIONS", "5") - def _spy_process_event_entry(state: object, entry: object, *, configuration: object) -> None: - nonlocal control_count - _ = (state, configuration) - if isinstance(entry.event, ControlTimeEvent): - control_count += 1 + with pytest.raises(RuntimeError, match="TRADINGCHASSIS_DEBUG_MAX_ITERATIONS exceeded"): + runner.run(_NeverEndingVenue(), execution, recorder) - monkeypatch.setattr(strategy_runner_module, "process_event_entry", _spy_process_event_entry) - venue = _StubVenue( - rc_sequence=[0, 0, 0, 1], - ts_sequence=[1, 10, 10, 11], - ) - runner.run(venue=venue, execution=_NoopExecution(), recorder=_RecorderWrapper()) - assert control_count == 1 +def test_runner_terminates_on_no_work_no_progress_loop() -> None: + runner = _runner() + execution = _Execution() + recorder = _Recorder() + + class _NoProgressVenue: + def __init__(self) -> None: + self._now = 100 + self.wait_call_count = 0 + self.record_call_count = 0 + + def wait_next(self, *, timeout_ns: int, include_order_resp: bool) -> int: + _ = (timeout_ns, include_order_resp) + self.wait_call_count += 1 + if self.wait_call_count == 1: + self._now = 100 + return 0 + if self.wait_call_count > 3: + raise AssertionError("runner should have terminated no-work/no-progress loop") + return 0 + + def current_timestamp_ns(self) -> int: + return self._now + + def read_market_snapshot(self) -> Any: + raise AssertionError("market snapshot should not be needed") + def read_orders_snapshot(self) -> tuple[Any, Any]: + raise AssertionError("orders snapshot should not be needed") -def test_control_time_event_processed_after_pop_and_before_gate( - monkeypatch: pytest.MonkeyPatch, -) -> None: - queued_intent = _new_intent() - runner = HftStrategyRunner( - engine_cfg=_engine_cfg(), - strategy=_NoopStrategy(), - risk_cfg=_risk_cfg(), - core_cfg=_core_cfg(), - ) - runner._next_send_ts_ns_local = 5 + def record(self, _recorder: Any) -> bool: + self.record_call_count += 1 + return False - ordering: list[str] = [] - captured_raw_inputs: list[list[Any]] = [] + venue = _NoProgressVenue() + runner.run(venue, execution, recorder) - def _spy_pop_queued_intents(instrument: str) -> list[Any]: - _ = instrument - ordering.append("pop") - return [queued_intent] + assert venue.wait_call_count == 2 # init wait + one loop iteration + assert venue.record_call_count == 0 - def _spy_process_event_entry(state: object, entry: object, *, configuration: object) -> None: - _ = (state, configuration) - if isinstance(entry.event, ControlTimeEvent): - ordering.append("control") - def _spy_decide_intents(**kwargs: Any) -> GateDecision: - ordering.append("gate") - captured_raw_inputs.append(list(kwargs["raw_intents"])) - return _decision_for([]) +def test_runner_terminates_on_no_work_with_non_event_rc() -> None: + runner = _runner() + execution = _Execution() + recorder = _Recorder() - monkeypatch.setattr(runner.strategy_state, "pop_queued_intents", _spy_pop_queued_intents) - monkeypatch.setattr(strategy_runner_module, "process_event_entry", _spy_process_event_entry) - monkeypatch.setattr(runner.risk, "decide_intents", _spy_decide_intents) + class _Rc14Venue: + def __init__(self) -> None: + self._now = 100 + self.wait_call_count = 0 + self.record_call_count = 0 - venue = _StubVenue( - rc_sequence=[0, 0, 1], - ts_sequence=[1, 10, 11], - ) - runner.run(venue=venue, execution=_NoopExecution(), recorder=_RecorderWrapper()) + def wait_next(self, *, timeout_ns: int, include_order_resp: bool) -> int: + _ = (timeout_ns, include_order_resp) + self.wait_call_count += 1 + if self.wait_call_count == 1: + return 0 + if self.wait_call_count > 3: + raise AssertionError("runner should terminate on non-event rc with no work") + return 14 - assert ordering == ["pop", "control", "gate"] - assert len(captured_raw_inputs) == 1 - assert [it.client_order_id for it in captured_raw_inputs[0]] == [queued_intent.client_order_id] + def current_timestamp_ns(self) -> int: + return self._now + + def read_market_snapshot(self) -> Any: + raise AssertionError("market snapshot should not be needed") + def read_orders_snapshot(self) -> tuple[Any, Any]: + raise AssertionError("orders snapshot should not be needed") -def test_global_canonical_counter_shared_with_control_time_market_and_submitted( - monkeypatch: pytest.MonkeyPatch, -) -> None: - new_intent = _new_intent() - runner = HftStrategyRunner( - engine_cfg=_engine_cfg(), - strategy=_EmitIntentsStrategy([new_intent]), - risk_cfg=_risk_cfg(), - core_cfg=_core_cfg(), - ) - runner._next_send_ts_ns_local = 5 + def record(self, _recorder: Any) -> bool: + self.record_call_count += 1 + return False - monkeypatch.setattr( - runner.risk, - "decide_intents", - lambda **_: _decision_for([new_intent]), - ) + venue = _Rc14Venue() + runner.run(venue, execution, recorder) - positions: list[tuple[int, str]] = [] + assert venue.wait_call_count == 2 + assert venue.record_call_count == 0 - def _spy_process_event_entry(state: object, entry: object, *, configuration: object) -> None: - _ = (state, configuration) - positions.append((entry.position.index, type(entry.event).__name__)) - monkeypatch.setattr(strategy_runner_module, "process_event_entry", _spy_process_event_entry) - venue = _StubVenue( - rc_sequence=[0, 2, 1], - ts_sequence=[1, 10, 11], - depth=_depth_snapshot(), +def test_runner_does_not_terminate_early_when_pending_work_exists() -> None: + runner = _runner() + execution = _Execution() + recorder = _Recorder() + runner._pending_control_scheduling_obligation = ControlSchedulingObligation( + due_ts_ns_local=200, + reason="rate_limit", + scope_key="instrument:BTC_USDC-PERPETUAL", + source="test", ) - runner.run(venue=venue, execution=_NoopExecution(), recorder=_RecorderWrapper()) + runner._next_send_ts_ns_local = 200 - assert positions == [ - (0, "MarketEvent"), - (1, "ControlTimeEvent"), - (2, "OrderSubmittedEvent"), - ] - assert runner._event_stream_cursor.next_index == 3 + class _PendingAwareVenue: + def __init__(self) -> None: + self._now = 100 + self.wait_call_count = 0 + + def wait_next(self, *, timeout_ns: int, include_order_resp: bool) -> int: + _ = include_order_resp + self.wait_call_count += 1 + if self.wait_call_count == 1: + self._now = 100 + return 0 + if self.wait_call_count == 2: + assert timeout_ns > 0 + self._now = 100 + return 0 + self._now = 101 + return 1 + + def current_timestamp_ns(self) -> int: + return self._now + + def read_market_snapshot(self) -> Any: + raise AssertionError("market snapshot should not be needed") + def read_orders_snapshot(self) -> tuple[Any, Any]: + raise AssertionError("orders snapshot should not be needed") -def test_fallback_second_boundary_wakeup_behavior_unchanged( - monkeypatch: pytest.MonkeyPatch, -) -> None: - intent = _new_intent() - runner = HftStrategyRunner( - engine_cfg=_engine_cfg(), - strategy=_EmitIntentsStrategy([intent]), - risk_cfg=_risk_cfg(), - core_cfg=_core_cfg(), - ) - runner.strategy_state.queued_intents.setdefault(runner.engine_cfg.instrument, deque()) - runner.strategy_state.queued_intents[runner.engine_cfg.instrument].append( - SimpleNamespace(intent=intent) - ) - - monkeypatch.setattr(runner.risk, "decide_intents", lambda **_: _decision_for([])) + def record(self, _recorder: Any) -> bool: + return False - venue = _StubVenue( - rc_sequence=[0, 2, 1], - ts_sequence=[1, 2_000_000_000, 2_000_000_001], - depth=_depth_snapshot(), - ) - runner.run(venue=venue, execution=_NoopExecution(), recorder=_RecorderWrapper()) + venue = _PendingAwareVenue() + runner.run(venue, execution, recorder) - assert runner._next_send_ts_ns_local == 3_000_000_000 + assert venue.wait_call_count == 3