-
Notifications
You must be signed in to change notification settings - Fork 57
feat(engine): env-var switch for async-first models experiment #280
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
de634c0 to
1129ed6
Compare
Adds an opt-in async execution path (DATA_DESIGNER_ASYNC_ENGINE=1) for the cell-by-cell generation pipeline. Replaces thread-pool concurrency with native asyncio TaskGroup + Semaphore for bounded concurrent LLM calls, while keeping the sync path as the default. Key changes: - ModelFacade: acompletion(), agenerate_text_embeddings(), agenerate() - acatch_llm_exceptions decorator (async mirror of catch_llm_exceptions) - AsyncConcurrentExecutor with persistent background event loop - ColumnWiseBuilder branches on env var to fan out via async or threads - Benchmark updated with async mock support Co-Authored-By: Remi <noreply@anthropic.com>
Resolved conflicts: - llm_completion.py: kept agenerate() async method + main's new _extract_reasoning_content(), TraceType handling, and extract_reasoning_content config. Updated agenerate() to match main's trace handling patterns. - column_wise_builder.py: kept DATA_DESIGNER_ASYNC_ENGINE env var + adopted main's get_library_version() replacing importlib.metadata. Co-Authored-By: Remi <noreply@anthropic.com>
…models/ Delete the models_v2/ package (~2,500 lines) that was a near-complete copy of models/ with only ~250 lines of actual async additions. Instead: - Add acatch_llm_exceptions to models/errors.py - Add acompletion, agenerate, agenerate_text_embeddings to ModelFacade - Fix agenerate() to include total_tool_calls tracking (missing in v2 fork) - Fix agenerate() parser default to use _identity (missing in v2 fork) - Remove __path__ swap machinery from models/__init__.py - Env var DATA_DESIGNER_ASYNC_ENGINE now gates at the right level: column_wise_builder choosing between AsyncConcurrentExecutor and ConcurrentThreadExecutor, not swapping entire module trees Also deduplicate: - llm_completion.py: extract _prepare_generation_kwargs/_process_generation_result - column_wise_builder.py: extract _setup_fan_out/_finalize_fan_out All tests pass (make test, make lint, make format, make update-license-headers). Co-Authored-By: Remi <noreply@anthropic.com>
Address Codex review findings: - Add 11 async behavior tests for ModelFacade (acompletion, agenerate, agenerate_text_embeddings) mirroring existing sync test patterns - Add default agenerate() to ColumnGenerator base class that delegates to sync generate() via asyncio.to_thread — fixes AttributeError for EmbeddingCellGenerator and CustomColumnGenerator under async engine - Add coro.close() cleanup in AsyncConcurrentExecutor._run_task early returns to prevent "coroutine was never awaited" warnings - Tighten types: list[ChatMessage] for traces, list[dict[str, Any]] for multi_modal_context, dict[str, Any] for executor kwargs Co-Authored-By: Remi <noreply@anthropic.com>
Greptile OverviewGreptile SummaryThis PR adds async engine support to Data Designer's model inference layer, enabling true async concurrency via Key changes:
Architecture highlights:
|
| Filename | Overview |
|---|---|
| packages/data-designer-engine/src/data_designer/engine/models/facade.py | Added three async methods (acompletion, agenerate, agenerate_text_embeddings) mirroring sync methods, with proper async/await and error handling via @acatch_llm_exceptions decorator |
| packages/data-designer-engine/src/data_designer/engine/models/errors.py | Added acatch_llm_exceptions async decorator that properly wraps async functions and reuses existing exception handling logic |
| packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/async_concurrency.py | New AsyncConcurrentExecutor mirrors ConcurrentThreadExecutor API with async task execution, bounded concurrency via semaphore, and early shutdown on error rate threshold |
| packages/data-designer-engine/src/data_designer/engine/column_generators/generators/llm_completion.py | Refactored to extract shared logic into _prepare_generation_kwargs and _process_generation_result, added async agenerate method that reuses these helpers |
| packages/data-designer-engine/src/data_designer/engine/dataset_builders/column_wise_builder.py | Added env var gating (DATA_DESIGNER_ASYNC_ENGINE) to switch between thread and async executors, refactored fan-out setup/teardown into shared helpers |
Sequence Diagram
sequenceDiagram
participant User
participant ColumnWiseBuilder
participant AsyncExecutor
participant Generator
participant ModelFacade
participant LiteLLM
User->>ColumnWiseBuilder: build() with DATA_DESIGNER_ASYNC_ENGINE=1
ColumnWiseBuilder->>ColumnWiseBuilder: _run_cell_by_cell_generator()
ColumnWiseBuilder->>AsyncExecutor: AsyncConcurrentExecutor(max_workers=N)
loop For each record
ColumnWiseBuilder->>AsyncExecutor: add work_item(generator.agenerate(record))
end
ColumnWiseBuilder->>AsyncExecutor: run(work_items)
AsyncExecutor->>AsyncExecutor: _ensure_async_engine_loop()
AsyncExecutor->>AsyncExecutor: _run_all() on event loop
par Concurrent async tasks (max N)
AsyncExecutor->>Generator: agenerate(record_1)
Generator->>ModelFacade: agenerate(prompt, parser, ...)
ModelFacade->>ModelFacade: acompletion(messages)
ModelFacade->>LiteLLM: router.acompletion()
LiteLLM-->>ModelFacade: response
ModelFacade->>ModelFacade: parser(response)
ModelFacade-->>Generator: (output, trace)
Generator-->>AsyncExecutor: result_dict
and
AsyncExecutor->>Generator: agenerate(record_N)
Generator->>ModelFacade: agenerate(prompt, parser, ...)
ModelFacade->>LiteLLM: router.acompletion()
LiteLLM-->>ModelFacade: response
ModelFacade-->>Generator: (output, trace)
Generator-->>AsyncExecutor: result_dict
end
AsyncExecutor-->>ColumnWiseBuilder: execution complete
ColumnWiseBuilder-->>User: dataset built
Summary
Add async engine support to Data Designer's model inference layer, enabling true async concurrency via
asyncioas an alternative to the existing thread-based fan-out.What changed
Async ModelFacade methods — Three new async methods on
ModelFacade:acompletion()→ callsRouter.acompletion()(LiteLLM native async)agenerate()→ async generation loop with correction/restart logic, MCP tool calling, and usage trackingagenerate_text_embeddings()→ async embedding generationAsyncConcurrentExecutor — New executor that runs coroutines on an event loop instead of dispatching to a thread pool. Used when
DATA_DESIGNER_ASYNC_ENGINE=1.Environment variable gating —
DATA_DESIGNER_ASYNC_ENGINE=1controls whethercolumn_wise_builder.pydispatches toAsyncConcurrentExecutor(async) orConcurrentThreadExecutor(threads). The gate is at the executor selection level, not at the module import level.Architecture
The async methods live directly on
ModelFacadealongside the sync methods — no separate package, no import redirection. The only difference at the call site isself.model.generate(...)vsawait self.model.agenerate(...).Shared logic in
llm_completion.pyis factored into_prepare_generation_kwargs()and_process_generation_result(), sogenerate()andagenerate()are each ~3 lines.Shared fan-out setup in
column_wise_builder.pyis factored into_setup_fan_out()and_finalize_fan_out(), so each fan-out method is ~7 lines.Files changed
engine/models/errors.pyacatch_llm_exceptionsasync decoratorengine/models/facade.pyacompletion,agenerate,agenerate_text_embeddingsengine/models/__init__.pyengine/.../llm_completion.pyengine/.../column_wise_builder.pyengine/.../utils/async_concurrency.pyAsyncConcurrentExecutortests/.../test_async_engine_switch.pyTest plan
make test— all tests pass (487 across 3 packages)make lint-fix— all checks passedmake format— no changes neededmake update-license-headers— all headers currentmodels_v2in codebaseModelFacadehas all 3 async methods confirmed via import check🤖 Generated with Claude Code