Skip to content

Conversation

@eric-tramel
Copy link
Contributor

@eric-tramel eric-tramel commented Feb 2, 2026

Summary

Add async engine support to Data Designer's model inference layer, enabling true async concurrency via asyncio as an alternative to the existing thread-based fan-out.

What changed

Async ModelFacade methods — Three new async methods on ModelFacade:

  • acompletion() → calls Router.acompletion() (LiteLLM native async)
  • agenerate() → async generation loop with correction/restart logic, MCP tool calling, and usage tracking
  • agenerate_text_embeddings() → async embedding generation

AsyncConcurrentExecutor — New executor that runs coroutines on an event loop instead of dispatching to a thread pool. Used when DATA_DESIGNER_ASYNC_ENGINE=1.

Environment variable gatingDATA_DESIGNER_ASYNC_ENGINE=1 controls whether column_wise_builder.py dispatches to AsyncConcurrentExecutor (async) or ConcurrentThreadExecutor (threads). The gate is at the executor selection level, not at the module import level.

Architecture

The async methods live directly on ModelFacade alongside the sync methods — no separate package, no import redirection. The only difference at the call site is self.model.generate(...) vs await self.model.agenerate(...).

Shared logic in llm_completion.py is factored into _prepare_generation_kwargs() and _process_generation_result(), so generate() and agenerate() are each ~3 lines.

Shared fan-out setup in column_wise_builder.py is factored into _setup_fan_out() and _finalize_fan_out(), so each fan-out method is ~7 lines.

Files changed

File Change
engine/models/errors.py Add acatch_llm_exceptions async decorator
engine/models/facade.py Add acompletion, agenerate, agenerate_text_embeddings
engine/models/__init__.py License-only (clean)
engine/.../llm_completion.py Extract shared generate/agenerate logic
engine/.../column_wise_builder.py Extract shared fan-out setup
engine/.../utils/async_concurrency.py New AsyncConcurrentExecutor
tests/.../test_async_engine_switch.py Tests for async methods + env var gating

Test plan

  • make test — all tests pass (487 across 3 packages)
  • make lint-fix — all checks passed
  • make format — no changes needed
  • make update-license-headers — all headers current
  • Zero references to models_v2 in codebase
  • ModelFacade has all 3 async methods confirmed via import check

🤖 Generated with Claude Code

@eric-tramel eric-tramel self-assigned this Feb 2, 2026
@eric-tramel eric-tramel added the enhancement New feature or request label Feb 2, 2026
eric-tramel and others added 5 commits February 2, 2026 14:49
Adds an opt-in async execution path (DATA_DESIGNER_ASYNC_ENGINE=1) for
the cell-by-cell generation pipeline. Replaces thread-pool concurrency
with native asyncio TaskGroup + Semaphore for bounded concurrent LLM
calls, while keeping the sync path as the default.

Key changes:
- ModelFacade: acompletion(), agenerate_text_embeddings(), agenerate()
- acatch_llm_exceptions decorator (async mirror of catch_llm_exceptions)
- AsyncConcurrentExecutor with persistent background event loop
- ColumnWiseBuilder branches on env var to fan out via async or threads
- Benchmark updated with async mock support

Co-Authored-By: Remi <noreply@anthropic.com>
Resolved conflicts:
- llm_completion.py: kept agenerate() async method + main's new
  _extract_reasoning_content(), TraceType handling, and
  extract_reasoning_content config. Updated agenerate() to match
  main's trace handling patterns.
- column_wise_builder.py: kept DATA_DESIGNER_ASYNC_ENGINE env var +
  adopted main's get_library_version() replacing importlib.metadata.

Co-Authored-By: Remi <noreply@anthropic.com>
…models/

Delete the models_v2/ package (~2,500 lines) that was a near-complete copy of
models/ with only ~250 lines of actual async additions. Instead:

- Add acatch_llm_exceptions to models/errors.py
- Add acompletion, agenerate, agenerate_text_embeddings to ModelFacade
- Fix agenerate() to include total_tool_calls tracking (missing in v2 fork)
- Fix agenerate() parser default to use _identity (missing in v2 fork)
- Remove __path__ swap machinery from models/__init__.py
- Env var DATA_DESIGNER_ASYNC_ENGINE now gates at the right level:
  column_wise_builder choosing between AsyncConcurrentExecutor and
  ConcurrentThreadExecutor, not swapping entire module trees

Also deduplicate:
- llm_completion.py: extract _prepare_generation_kwargs/_process_generation_result
- column_wise_builder.py: extract _setup_fan_out/_finalize_fan_out

All tests pass (make test, make lint, make format, make update-license-headers).

Co-Authored-By: Remi <noreply@anthropic.com>
Address Codex review findings:

- Add 11 async behavior tests for ModelFacade (acompletion, agenerate,
  agenerate_text_embeddings) mirroring existing sync test patterns
- Add default agenerate() to ColumnGenerator base class that delegates
  to sync generate() via asyncio.to_thread — fixes AttributeError for
  EmbeddingCellGenerator and CustomColumnGenerator under async engine
- Add coro.close() cleanup in AsyncConcurrentExecutor._run_task early
  returns to prevent "coroutine was never awaited" warnings
- Tighten types: list[ChatMessage] for traces, list[dict[str, Any]]
  for multi_modal_context, dict[str, Any] for executor kwargs

Co-Authored-By: Remi <noreply@anthropic.com>
@eric-tramel eric-tramel marked this pull request as ready for review February 10, 2026 23:11
@eric-tramel eric-tramel requested a review from a team as a code owner February 10, 2026 23:11
@greptile-apps
Copy link

greptile-apps bot commented Feb 10, 2026

Greptile Overview

Greptile Summary

This PR adds async engine support to Data Designer's model inference layer, enabling true async concurrency via asyncio as an opt-in alternative to the existing thread-based execution. The implementation is well-architected with minimal duplication — async methods (acompletion, agenerate, agenerate_text_embeddings) live directly on ModelFacade alongside sync methods, and shared logic is factored into helper functions.

Key changes:

  • Three new async methods on ModelFacade that mirror sync behavior with proper async/await semantics
  • AsyncConcurrentExecutor for running coroutines with bounded concurrency and error rate monitoring
  • Environment variable DATA_DESIGNER_ASYNC_ENGINE=1 controls executor selection in column_wise_builder.py
  • Async decorator @acatch_llm_exceptions reuses existing exception handling logic
  • MCP tool calling operations delegated to thread pool via asyncio.to_thread (safe given MCP facade is stateless per call)
  • Default agenerate fallback on base ColumnGenerator delegates to sync via asyncio.to_thread
  • Comprehensive test coverage including correction/retry logic, tool calling, and env var gating

Architecture highlights:

  • No separate async package — async and sync coexist in the same modules
  • Shared logic extracted into _prepare_generation_kwargs and _process_generation_result in llm_completion.py
  • Shared fan-out setup extracted into _setup_fan_out and _finalize_fan_out in column_wise_builder.py
  • Persistent background event loop shared across all AsyncConcurrentExecutor instances to avoid breaking LiteLLM's internal async state

Confidence Score: 5/5

  • This PR is safe to merge with minimal risk
  • Score reflects thorough implementation with excellent test coverage (comprehensive async test suite mirroring sync tests), clean architecture (minimal duplication via shared helpers), proper error handling (async decorator reuses proven exception logic), safe concurrency patterns (persistent event loop, proper semaphore usage), backward compatibility (env var gated, defaults to existing thread-based execution), and validation through extensive testing (487 tests pass)
  • No files require special attention

Important Files Changed

Filename Overview
packages/data-designer-engine/src/data_designer/engine/models/facade.py Added three async methods (acompletion, agenerate, agenerate_text_embeddings) mirroring sync methods, with proper async/await and error handling via @acatch_llm_exceptions decorator
packages/data-designer-engine/src/data_designer/engine/models/errors.py Added acatch_llm_exceptions async decorator that properly wraps async functions and reuses existing exception handling logic
packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/async_concurrency.py New AsyncConcurrentExecutor mirrors ConcurrentThreadExecutor API with async task execution, bounded concurrency via semaphore, and early shutdown on error rate threshold
packages/data-designer-engine/src/data_designer/engine/column_generators/generators/llm_completion.py Refactored to extract shared logic into _prepare_generation_kwargs and _process_generation_result, added async agenerate method that reuses these helpers
packages/data-designer-engine/src/data_designer/engine/dataset_builders/column_wise_builder.py Added env var gating (DATA_DESIGNER_ASYNC_ENGINE) to switch between thread and async executors, refactored fan-out setup/teardown into shared helpers

Sequence Diagram

sequenceDiagram
    participant User
    participant ColumnWiseBuilder
    participant AsyncExecutor
    participant Generator
    participant ModelFacade
    participant LiteLLM

    User->>ColumnWiseBuilder: build() with DATA_DESIGNER_ASYNC_ENGINE=1
    ColumnWiseBuilder->>ColumnWiseBuilder: _run_cell_by_cell_generator()
    ColumnWiseBuilder->>AsyncExecutor: AsyncConcurrentExecutor(max_workers=N)
    loop For each record
        ColumnWiseBuilder->>AsyncExecutor: add work_item(generator.agenerate(record))
    end
    ColumnWiseBuilder->>AsyncExecutor: run(work_items)
    AsyncExecutor->>AsyncExecutor: _ensure_async_engine_loop()
    AsyncExecutor->>AsyncExecutor: _run_all() on event loop
    par Concurrent async tasks (max N)
        AsyncExecutor->>Generator: agenerate(record_1)
        Generator->>ModelFacade: agenerate(prompt, parser, ...)
        ModelFacade->>ModelFacade: acompletion(messages)
        ModelFacade->>LiteLLM: router.acompletion()
        LiteLLM-->>ModelFacade: response
        ModelFacade->>ModelFacade: parser(response)
        ModelFacade-->>Generator: (output, trace)
        Generator-->>AsyncExecutor: result_dict
    and
        AsyncExecutor->>Generator: agenerate(record_N)
        Generator->>ModelFacade: agenerate(prompt, parser, ...)
        ModelFacade->>LiteLLM: router.acompletion()
        LiteLLM-->>ModelFacade: response
        ModelFacade-->>Generator: (output, trace)
        Generator-->>AsyncExecutor: result_dict
    end
    AsyncExecutor-->>ColumnWiseBuilder: execution complete
    ColumnWiseBuilder-->>User: dataset built
Loading

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

enhancement New feature or request

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant