
UN-3211 [FEAT] HTTP session lifecycle management for workers API clients#1782

Open
muhammad-ali-e wants to merge 10 commits into main from
feat/UN-3211-FEAT_http_session_lifecycle_management

Conversation

@muhammad-ali-e
Contributor

What

  • Add session lifecycle management (close/cleanup) to all worker API clients
  • Wire API_CLIENT_POOL_SIZE into HTTPAdapter connection pools
  • Add _owns_session flag to prevent singleton shared sessions from being closed by individual clients
  • Add 25 unit tests for session lifecycle behavior

Why

  • Worker API clients created per-task were not always explicitly closed, risking socket/FD accumulation
  • API_CLIENT_POOL_SIZE config existed but was never wired into HTTPAdapter (dead config)
  • on_task_postrun signal handler ran uselessly on every task when singleton mode was disabled
  • No test coverage existed for session lifecycle behavior

How

  • base_client.py: Added _owns_session flag, idempotent close(), __del__ destructor, wired pool size into HTTPAdapter
  • internal_client.py: Set _owns_session=False on all clients sharing singleton session
  • api-deployment/tasks.py: Added try/finally with api_client.close() to cover previously missing cleanup
  • callback/tasks.py: try/finally cleanup in callback task functions
  • worker.py: Early-return guard in on_task_postrun when singleton disabled; on_worker_process_shutdown hook
  • worker_config.py: Default pool size 10, singleton reset threshold config
  • 25 unit tests across 8 test classes
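
The ownership and cleanup mechanics above can be sketched roughly as follows — a simplified stand-in for the real BaseAPIClient, not the PR code verbatim:

```python
import logging

import requests

logger = logging.getLogger(__name__)


class BaseAPIClient:
    """Sketch: session ownership flag plus idempotent close."""

    def __init__(self, session=None):
        if session is not None:
            # Shared (singleton) session: this client must not close it.
            self.session = session
            self._owns_session = False
        else:
            self.session = requests.Session()
            self._owns_session = True
        self._closed = False

    def close(self):
        # Idempotent, and a no-op for sessions this client does not own.
        if self._closed or not self._owns_session:
            self._closed = True
            return
        try:
            self.session.close()
        finally:
            self._closed = True

    def __del__(self):
        # Safety net for clients that were never explicitly closed.
        try:
            self.close()
        except Exception:
            pass
```

The key point is that close() can be called from a finally block, from __del__, or twice in a row without error, and that a client holding a borrowed singleton session never tears it down.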

Can this PR break any existing features?

  • No. All changes are additive safety nets (destructors, finally blocks, flags). Default behavior unchanged — ENABLE_API_CLIENT_SINGLETON=false remains the default. Pool size default stays at 10.

Database Migrations

  • None

Env Config

  • API_CLIENT_POOL_SIZE — now actually wired in (default: 10, unchanged)
  • WORKER_SINGLETON_RESET_THRESHOLD — already documented in sample.env (default: 1000)
  • ENABLE_API_CLIENT_SINGLETON — existing, unchanged (default: false)
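
Wiring the pool size into requests looks roughly like this; reading the env var directly here is a simplification — the PR routes it through WorkerConfig:

```python
import os

import requests
from requests.adapters import HTTPAdapter

# API_CLIENT_POOL_SIZE is the env var named above (default 10).
pool_size = int(os.environ.get("API_CLIENT_POOL_SIZE", "10"))

session = requests.Session()
adapter = HTTPAdapter(
    pool_connections=pool_size,  # number of distinct host pools to cache
    pool_maxsize=pool_size,      # max connections kept alive per host pool
)
session.mount("http://", adapter)
session.mount("https://", adapter)
```

Without the mount, requests falls back to its built-in defaults and the config value is dead — which is exactly the bug this PR fixes.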

Relevant Docs

  • UNS-205 spec in mfbt-unstract

Related Issues or PRs

  • UN-3211

Dependencies Versions

  • None

Notes on Testing

  • Run: cd workers && PYTHONPATH=.:../unstract .venv/bin/python -m pytest shared/tests/ -v
  • 25 tests covering: __del__ destructor, close() idempotency, pool size config, singleton reset, task counter, postrun guard, singleton-aware close, managed execution context cleanup

🤖 Generated with Claude Code


coderabbitai bot commented Feb 6, 2026

Summary by CodeRabbit

  • New Features

    • Shared API client singleton lifecycle with Celery hooks to reset and track per-process usage.
  • Bug Fixes

    • Centralized cleanup ensuring API client sessions are closed and execution state/notifications are reliably finalized.
  • Configuration

    • Default API client pool size increased to 10.
    • New worker singleton reset threshold (default 1000).
  • Tests

    • Added comprehensive tests for session lifecycle, pooling, counters, and cleanup.

Walkthrough

Centralizes API client and StateStore cleanup in finally blocks; adds singleton/shared HTTP session support with task-count reset and observability; introduces Celery signal handlers for lifecycle events; increases API client pool defaults; adds test fixtures and extensive session lifecycle tests.

Changes

  • API execution & callbacks (workers/api-deployment/tasks.py, workers/callback/tasks.py) — Initialize api_client explicitly, centralize client closure and StateStore cleanup in finally blocks, unify status/pipeline update flows, assemble and return a consolidated callback_result, and preserve existing error/log handling while ensuring deterministic cleanup.
  • Session lifecycle & client base (workers/shared/clients/base_client.py, workers/shared/api/internal_client.py) — Add session ownership flags (_owns_session, _closed), idempotent close() and __del__, config-driven HTTPAdapter pool sizing, shared-session (singleton) support with a session-sharing helper, per-process task counters and timestamp, and singleton control APIs (reset_singleton, increment_task_counter, get_task_counter_info).
  • Worker signals & lifecycle hooks (workers/worker.py) — Add Celery signal handlers: on_worker_process_shutdown to reset singleton and client factory state; on_task_postrun to increment the task counter and trigger a singleton reset when the threshold is reached (guarded by config). Both handlers use defensive error handling.
  • Configuration & environment (workers/sample.env, workers/shared/infrastructure/config/worker_config.py) — Increase default API_CLIENT_POOL_SIZE from 3 to 10; add the WORKER_SINGLETON_RESET_THRESHOLD env var and a singleton_reset_task_threshold WorkerConfig field (default 1000) to control singleton reset behavior.
  • Tests & test setup (workers/conftest.py, workers/shared/tests/test_session_lifecycle.py) — Add a root conftest.py to set test environment defaults and an extensive test_session_lifecycle.py covering destructor safety, idempotent close, singleton lifecycle and task-counter reset, pool-size wiring, managed execution context cleanup, and related fixtures.

Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Task as Celery Task
    participant API as InternalAPIClient
    participant State as StateStore
    participant DB as Pipeline/DB

    Task->>API: setup_execution_context() => api_client
    Task->>API: perform API calls
    API-->>Task: responses
    Task->>State: read/update context
    Task->>DB: update execution/pipeline status
    alt completion or error
        Task->>API: api_client.close() (finally)
        Task->>State: StateStore.clear() (finally)
    end
    Note right of Task: after task_postrun signal
    Task->>API: increment_task_counter()
    API-->>Task: reset_singleton() if threshold reached
```

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

🚥 Pre-merge checks | ✅ 3
✅ Passed checks (3 passed)
| Check name | Status | Explanation |
| --- | --- | --- |
| Title check | ✅ Passed | The title clearly and specifically describes the main feature added: HTTP session lifecycle management for workers API clients, matching the primary changes in the PR. |
| Description check | ✅ Passed | The description comprehensively covers all required sections with detailed explanations of what changed, why, how it was implemented, compatibility notes, and testing instructions. |
| Docstring Coverage | ✅ Passed | Docstring coverage is 97.96%, which is sufficient; the required threshold is 80.00%. |


- Add _owns_session flag to prevent singleton shared session from being
  closed by individual clients
- Wire API_CLIENT_POOL_SIZE into HTTPAdapter connection pools
- Add idempotent close() and __del__ destructor to BaseAPIClient
- Add try/finally cleanup in api-deployment and callback tasks
- Add on_worker_process_shutdown hook and early-return guard in postrun
- Add 25 unit tests for session lifecycle behavior

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@muhammad-ali-e muhammad-ali-e force-pushed the feat/UN-3211-FEAT_http_session_lifecycle_management branch from 7f18370 to 0752a37 Compare February 6, 2026 09:13

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

🤖 Fix all issues with AI agents
In `@workers/shared/api/internal_client.py`:
- Around line 256-268: The reset_singleton method currently swallows exceptions
when closing cls._shared_session; change this to log the exception details
instead of silently passing so FD leaks/errors are visible—catch the Exception
around cls._shared_session.close() and call logger.exception or logger.error
with the exception/context (mentioning InternalAPIClient.reset_singleton and
cls._shared_session) before continuing to set cls._shared_session,
cls._shared_base_client, cls._initialization_count, and cls._task_counter to
None/0 and logging the reset completion.
- Around line 271-294: The non-atomic update in increment_task_counter can lose
counts under threaded/eventlet/gevent worker pools; make the method thread-safe
by adding a class-level lock (e.g., _task_counter_lock = threading.Lock()) and
wrapping the read/increment/check/reset sequence in a with _task_counter_lock:
block (import threading where needed) so the operations on _task_counter and
_last_reset_time and the call to reset_singleton() are atomic; alternatively, if
you require prefork-only deployments, add a precondition/assertion at the start
of increment_task_counter that the worker is running in prefork mode and skip
changes.
🧹 Nitpick comments (2)
workers/api-deployment/tasks.py (1)

263-269: Don’t swallow api_client.close() errors silently.
A debug/warn log helps diagnose lingering sessions if close fails.

🔧 Suggested tweak
-            try:
-                api_client.close()
-            except Exception:
-                pass
+            try:
+                api_client.close()
+            except Exception as e:
+                logger.debug("api_client.close() failed during cleanup: %s", e)
workers/callback/tasks.py (1)

1508-1512: Avoid silent failures when closing API clients.
A debug/warn log helps diagnose leaks if close fails in either finally block.

🔧 Suggested tweak
-            try:
-                context.api_client.close()
-            except Exception:
-                pass
+            try:
+                context.api_client.close()
+            except Exception as e:
+                logger.debug("api_client.close() failed during callback cleanup: %s", e)
@@
-        try:
-            api_client.close()
-        except Exception:
-            pass
+        try:
+            api_client.close()
+        except Exception as e:
+            logger.debug("api_client.close() failed during API callback cleanup: %s", e)

Also applies to: 1781-1784

…afe counter

- Log warning instead of silently swallowing exceptions in reset_singleton()
- Add threading.Lock around task counter increment for thread safety with
  threads/gevent/eventlet pools

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🤖 Fix all issues with AI agents
In `@workers/shared/api/internal_client.py`:
- Around line 258-296: reset_singleton() can close cls._shared_session while
other threads are mid-request; change to a "swap-then-close" approach: inside
reset_singleton() grab a session lock (use existing _task_counter_lock or add a
dedicated _session_lock), atomically replace cls._shared_session with a new
requests.Session() (or None if you want lazy re-init) and store the old session
in a local variable, then spawn a short-lived background worker (thread or
timer) that waits a configurable grace period and then closes the old session
(catching exceptions); update increment_task_counter to use the same session
lock when reading/swapping to avoid races and ensure any code that reads
_shared_session uses that lock or reads a local reference so in-flight requests
continue using the old session until it is closed after the grace period.
🧹 Nitpick comments (1)
workers/shared/api/internal_client.py (1)

280-296: WorkerConfig() is instantiated inside the lock on every task completion.

Line 285 creates a new WorkerConfig (parsing env vars) while holding _task_counter_lock. This extends the critical section unnecessarily and allocates an object per task. Consider reading the threshold once (e.g., as a class-level cached value or outside the lock).

Also, cls._task_counter = 0 on line 295 is redundant since reset_singleton() (line 269) already resets it.

♻️ Proposed refactor: read config outside the lock, remove redundant reset
     `@classmethod`
     def increment_task_counter(cls) -> None:
-        with cls._task_counter_lock:
-            cls._task_counter += 1
-
-            from shared.infrastructure.config.worker_config import WorkerConfig
+        from shared.infrastructure.config.worker_config import WorkerConfig
 
-            threshold = WorkerConfig().singleton_reset_task_threshold
-            if threshold > 0 and cls._task_counter >= threshold:
-                import time
+        threshold = WorkerConfig().singleton_reset_task_threshold
+        with cls._task_counter_lock:
+            cls._task_counter += 1
+            if threshold > 0 and cls._task_counter >= threshold:
+                import time
 
-                logger.info(
-                    "Task counter reached threshold (%d/%d), resetting singleton session",
-                    cls._task_counter,
-                    threshold,
-                )
-                cls.reset_singleton()
-                cls._task_counter = 0
-                cls._last_reset_time = time.time()
+                logger.info(
+                    "Task counter reached threshold (%d/%d), resetting singleton session",
+                    cls._task_counter,
+                    threshold,
+                )
+                cls.reset_singleton()
+                cls._last_reset_time = time.time()

… document thread-safety

- Move WorkerConfig() instantiation outside lock in increment_task_counter()
- Remove redundant _task_counter=0 (already done inside reset_singleton)
- Document thread-safety caveat in reset_singleton() docstring
- Log close failures in task cleanup instead of silently swallowing

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
workers/callback/tasks.py (1)

1380-1389: ⚠️ Potential issue | 🟡 Minor

api_client created inside _extract_callback_parameters is not covered by the try/finally if extraction fails mid-way.

If _extract_callback_parameters raises after create_api_client() (line 702 or 772 in the extraction function) but before assigning to context.api_client (line 846), the newly created client becomes an orphan local variable. The try/finally at lines 1389/1507 won't run because the exception occurs before entering that block.

The __del__ destructor (FR-1) serves as the safety net here, which is the design intent. Just flagging for awareness — if deterministic cleanup is desired, the try/finally could be widened to wrap _extract_callback_parameters as well.

🤖 Fix all issues with AI agents
In `@workers/shared/tests/test_session_lifecycle.py`:
- Around line 371-396: Replace the inline simulated guard in the tests with
calls to the real signal handler on_task_postrun (imported from workers.worker)
and mock InternalAPIClient.increment_task_counter so the handler's guard,
try/except, and logging paths are exercised; for the singleton-disabled test
call on_task_postrun(sender=None, task_id=None, **{}) and assert
increment_task_counter was not called, and for the singleton-enabled test patch
the same method and call on_task_postrun then assert increment_task_counter was
called once, ensuring the patch target matches the import path used inside
on_task_postrun.
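
A minimal shape for such tests, with local stand-ins for workers.worker.on_task_postrun and InternalAPIClient (in the real test the patch target must match the import path used inside the handler):

```python
import os
from unittest.mock import patch


# Stand-in for InternalAPIClient; the real test imports the actual class.
class InternalAPIClient:
    @classmethod
    def increment_task_counter(cls):
        raise AssertionError("should be mocked in tests")


# Stand-in for workers.worker.on_task_postrun with the same guard shape.
def on_task_postrun(sender=None, task_id=None, **kwargs):
    if os.environ.get("ENABLE_API_CLIENT_SINGLETON", "false").lower() != "true":
        return  # early-return guard under test
    InternalAPIClient.increment_task_counter()


def test_postrun_guard_when_singleton_disabled():
    with patch.dict(os.environ, {"ENABLE_API_CLIENT_SINGLETON": "false"}):
        with patch.object(InternalAPIClient, "increment_task_counter") as inc:
            on_task_postrun(sender=None, task_id=None)
            inc.assert_not_called()


def test_postrun_counts_when_singleton_enabled():
    with patch.dict(os.environ, {"ENABLE_API_CLIENT_SINGLETON": "true"}):
        with patch.object(InternalAPIClient, "increment_task_counter") as inc:
            on_task_postrun(sender=None, task_id=None)
            inc.assert_called_once()
```

Calling the real handler (rather than re-implementing its guard inline) means the test fails if the guard, try/except, or import path inside on_task_postrun ever diverges.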
🧹 Nitpick comments (5)
workers/shared/api/internal_client.py (2)

125-179: Singleton initialization creates and immediately discards 7 sessions.

Each specialized client's __init__ (via BaseAPIClient.__init__) creates a fresh requests.Session with mounted HTTPAdapter, which _share_session immediately closes and replaces. For 7 specialized clients, that's 7 throwaway sessions per InternalAPIClient instantiation.

This isn't a bug — the sessions are properly closed — but it's wasteful, especially if InternalAPIClient is instantiated frequently (e.g., per-task in non-singleton mode). Consider passing an existing session into the specialized client constructors to avoid the create-then-close pattern.


286-300: WorkerConfig() instantiated on every task completion.

increment_task_counter is called via task_postrun signal after every task. Each call constructs a new WorkerConfig(), which reads environment variables. While this keeps the threshold dynamically reconfigurable, it adds overhead on every task completion.

If env-var reading becomes a concern at scale, consider caching the threshold at the class level and only refreshing it on reset.

workers/shared/tests/test_session_lifecycle.py (1)

322-348: mock_config_singleton fixtures are required for env setup — Ruff ARG002 is a false positive.

The mock_config_singleton parameter in test_increment_counter and test_threshold_triggers_reset isn't directly referenced in the test body, but it's needed because the fixture patches os.environ with WORKER_SINGLETON_RESET_THRESHOLD=3 and ENABLE_API_CLIENT_SINGLETON=true. Without it, WorkerConfig() inside increment_task_counter would read unpatched env vars.

To silence the Ruff warning while keeping the intent clear, you could prefix with underscore:

Suggested fix
-    def test_increment_counter(self, mock_config_singleton):
+    def test_increment_counter(self, mock_config_singleton):  # noqa: ARG002

Or rename the parameter:

-    def test_increment_counter(self, mock_config_singleton):
+    def test_increment_counter(self, _mock_config_singleton):
workers/callback/tasks.py (2)

1484-1503: Use logger.exception for better tracebacks in error handlers.

At lines 1485-1487 and 1503, logger.error(...) is used to log exceptions, but logger.exception(...) would automatically include the traceback, which is more useful for debugging callback failures.

Suggested fix
             except Exception as e:
-                logger.error(
-                    f"Unified batch callback processing failed for execution {context.execution_id}: {e}"
+                logger.exception(
+                    f"Unified batch callback processing failed for execution {context.execution_id}: {e}"
                 )
                 except Exception as cleanup_error:
-                    logger.error(f"Failed to mark execution as failed: {cleanup_error}")
+                    logger.exception(f"Failed to mark execution as failed: {cleanup_error}")

1757-1777: Same logger.error → logger.exception opportunity in the API callback error path.

Lines 1758-1760 and 1777 use logger.error where logger.exception would capture the full traceback for easier debugging.

Suggested fix
             except Exception as e:
-                logger.error(
+                logger.exception(
                     f"API callback processing failed for execution {execution_id}: {e}"
                 )
                 except Exception as update_error:
-                    logger.error(f"Failed to update execution status: {update_error}")
+                    logger.exception(f"Failed to update execution status: {update_error}")

…strun handler

Tests now call the real worker.on_task_postrun() signal handler instead of
simulating the guard logic inline, catching divergence if the handler's
guard, try/except, or import path changes.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🤖 Fix all issues with AI agents
In `@workers/shared/tests/test_session_lifecycle.py`:
- Around line 243-263: test_default_pool_size uses patch.dict(..., clear=False)
so an externally-set API_CLIENT_POOL_SIZE can leak into the test; ensure the
test removes any inherited value before instantiating WorkerConfig by
deleting/unsetting API_CLIENT_POOL_SIZE in the patched context (so
WorkerConfig().api_client_pool_size is forced to use the hardcoded default).
Locate test_default_pool_size and adjust the patched block to explicitly remove
API_CLIENT_POOL_SIZE from os.environ (e.g., pop if present) before creating
WorkerConfig and asserting api_client_pool_size == 10.
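
One way to implement the suggested fix, sketched against a hypothetical DummyConfig stand-in for WorkerConfig — patch.dict snapshots os.environ on entry and restores it on exit, so popping the key inside the block is safe:

```python
import os
from unittest.mock import patch


# Stand-in for WorkerConfig; only the env-reading shape matters here.
class DummyConfig:
    def __init__(self):
        self.api_client_pool_size = int(os.environ.get("API_CLIENT_POOL_SIZE", "10"))


def test_default_pool_size():
    with patch.dict(os.environ, {}, clear=False):
        # Remove any value inherited from the outer environment so the
        # hardcoded default is actually exercised.
        os.environ.pop("API_CLIENT_POOL_SIZE", None)
        assert DummyConfig().api_client_pool_size == 10
```

Because patch.dict restores the original mapping, an externally set API_CLIENT_POOL_SIZE reappears after the test, keeping the test hermetic without side effects.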
🧹 Nitpick comments (3)
workers/shared/tests/test_session_lifecycle.py (3)

322-348: Prefix unused fixture parameters with _ to suppress Ruff ARG002.

mock_config_singleton is correctly used for its env-patching side effect, but Ruff flags it as unused. Prefixing with _ is the idiomatic pytest convention for fixtures consumed only for side effects.

-    def test_increment_counter(self, mock_config_singleton):
+    def test_increment_counter(self, _mock_config_singleton):
-    def test_threshold_triggers_reset(self, mock_config_singleton):
+    def test_threshold_triggers_reset(self, _mock_config_singleton):

Alternatively, apply @pytest.mark.usefixtures("mock_config_singleton") at the class or method level to avoid the parameter entirely.


415-424: Extract the repeated sub-client attribute list into a constant.

The same 8-element list appears three times in this class. If a sub-client is added or renamed in InternalAPIClient, only some lists may get updated, causing silent test gaps.

Suggested refactor

Define once at module or class level:

_SUB_CLIENT_ATTRS = [
    "base_client",
    "execution_client",
    "file_client",
    "webhook_client",
    "organization_client",
    "tool_client",
    "workflow_client",
    "usage_client",
]

Then reference _SUB_CLIENT_ATTRS in all three test methods.

Also applies to: 466-475, 488-497


523-526: Prefix unused unpacked variables with _ to suppress Ruff RUF059.

The unpacked values aren't needed in these cleanup-focused tests.

-            ) as (cfg, client):
+            ) as (_cfg, _client):

Also applies to: 541-545

@chandrasekharan-zipstack
Contributor

@muhammad-ali-e I also suggest wiring up the tests to run with tox on every PR to catch regressions

muhammad-ali-e and others added 4 commits March 10, 2026 15:29
…ove close() logging

Cache the singleton_reset_task_threshold to avoid re-importing WorkerConfig on
every task increment. Promote api_client.close() failure logs from debug to
warning for better production visibility. Update tests to reset cached threshold.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…r imports

Refactor api-deployment tasks to handle setup failures early with proper
cleanup, move shared imports to module level in worker.py, and fix type
annotations in client_factory.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

@github-actions
Contributor

Test Results

Summary
  • Runner Tests: 11 passed, 0 failed (11 total)
  • SDK1 Tests: 63 passed, 0 failed (63 total)

Runner Tests - Full Report
| filepath | function | passed | SUBTOTAL |
| --- | --- | --- | --- |
| runner/src/unstract/runner/clients/test_docker.py | test_logs | 1 | 1 |
| runner/src/unstract/runner/clients/test_docker.py | test_cleanup | 1 | 1 |
| runner/src/unstract/runner/clients/test_docker.py | test_cleanup_skip | 1 | 1 |
| runner/src/unstract/runner/clients/test_docker.py | test_client_init | 1 | 1 |
| runner/src/unstract/runner/clients/test_docker.py | test_get_image_exists | 1 | 1 |
| runner/src/unstract/runner/clients/test_docker.py | test_get_image | 1 | 1 |
| runner/src/unstract/runner/clients/test_docker.py | test_get_container_run_config | 1 | 1 |
| runner/src/unstract/runner/clients/test_docker.py | test_get_container_run_config_without_mount | 1 | 1 |
| runner/src/unstract/runner/clients/test_docker.py | test_run_container | 1 | 1 |
| runner/src/unstract/runner/clients/test_docker.py | test_get_image_for_sidecar | 1 | 1 |
| runner/src/unstract/runner/clients/test_docker.py | test_sidecar_container | 1 | 1 |
| TOTAL | | 11 | 11 |
SDK1 Tests - Full Report
| filepath | function | passed | SUBTOTAL |
| --- | --- | --- | --- |
| tests/test_platform.py | TestPlatformHelperRetry.test_success_on_first_attempt | 2 | 2 |
| tests/test_platform.py | TestPlatformHelperRetry.test_retry_on_connection_error | 2 | 2 |
| tests/test_platform.py | TestPlatformHelperRetry.test_non_retryable_http_error | 1 | 1 |
| tests/test_platform.py | TestPlatformHelperRetry.test_retryable_http_errors | 3 | 3 |
| tests/test_platform.py | TestPlatformHelperRetry.test_post_method_retry | 1 | 1 |
| tests/test_platform.py | TestPlatformHelperRetry.test_retry_logging | 1 | 1 |
| tests/test_prompt.py | TestPromptToolRetry.test_success_on_first_attempt | 1 | 1 |
| tests/test_prompt.py | TestPromptToolRetry.test_retry_on_errors | 2 | 2 |
| tests/test_prompt.py | TestPromptToolRetry.test_wrapper_methods_retry | 4 | 4 |
| tests/utils/test_retry_utils.py | TestIsRetryableError.test_connection_error_is_retryable | 1 | 1 |
| tests/utils/test_retry_utils.py | TestIsRetryableError.test_timeout_is_retryable | 1 | 1 |
| tests/utils/test_retry_utils.py | TestIsRetryableError.test_http_error_retryable_status_codes | 3 | 3 |
| tests/utils/test_retry_utils.py | TestIsRetryableError.test_http_error_non_retryable_status_codes | 5 | 5 |
| tests/utils/test_retry_utils.py | TestIsRetryableError.test_http_error_without_response | 1 | 1 |
| tests/utils/test_retry_utils.py | TestIsRetryableError.test_os_error_retryable_errno | 5 | 5 |
| tests/utils/test_retry_utils.py | TestIsRetryableError.test_os_error_non_retryable_errno | 1 | 1 |
| tests/utils/test_retry_utils.py | TestIsRetryableError.test_other_exception_not_retryable | 1 | 1 |
| tests/utils/test_retry_utils.py | TestCalculateDelay.test_exponential_backoff_without_jitter | 1 | 1 |
| tests/utils/test_retry_utils.py | TestCalculateDelay.test_exponential_backoff_with_jitter | 1 | 1 |
| tests/utils/test_retry_utils.py | TestCalculateDelay.test_max_delay_cap | 1 | 1 |
| tests/utils/test_retry_utils.py | TestCalculateDelay.test_max_delay_cap_with_jitter | 1 | 1 |
| tests/utils/test_retry_utils.py | TestRetryWithExponentialBackoff.test_successful_call_first_attempt | 1 | 1 |
| tests/utils/test_retry_utils.py | TestRetryWithExponentialBackoff.test_retry_after_transient_failure | 1 | 1 |
| tests/utils/test_retry_utils.py | TestRetryWithExponentialBackoff.test_max_retries_exceeded | 1 | 1 |
| tests/utils/test_retry_utils.py | TestRetryWithExponentialBackoff.test_retry_with_custom_predicate | 1 | 1 |
| tests/utils/test_retry_utils.py | TestRetryWithExponentialBackoff.test_no_retry_with_predicate_false | 1 | 1 |
| tests/utils/test_retry_utils.py | TestRetryWithExponentialBackoff.test_exception_not_in_tuple_not_retried | 1 | 1 |
| tests/utils/test_retry_utils.py | TestCreateRetryDecorator.test_default_configuration | 1 | 1 |
| tests/utils/test_retry_utils.py | TestCreateRetryDecorator.test_environment_variable_configuration | 1 | 1 |
| tests/utils/test_retry_utils.py | TestCreateRetryDecorator.test_invalid_max_retries | 1 | 1 |
| tests/utils/test_retry_utils.py | TestCreateRetryDecorator.test_invalid_base_delay | 1 | 1 |
| tests/utils/test_retry_utils.py | TestCreateRetryDecorator.test_invalid_multiplier | 1 | 1 |
| tests/utils/test_retry_utils.py | TestCreateRetryDecorator.test_jitter_values | 2 | 2 |
| tests/utils/test_retry_utils.py | TestCreateRetryDecorator.test_custom_exceptions_only | 1 | 1 |
| tests/utils/test_retry_utils.py | TestCreateRetryDecorator.test_custom_predicate_only | 1 | 1 |
| tests/utils/test_retry_utils.py | TestCreateRetryDecorator.test_both_exceptions_and_predicate | 1 | 1 |
| tests/utils/test_retry_utils.py | TestCreateRetryDecorator.test_exceptions_match_but_predicate_false | 1 | 1 |
| tests/utils/test_retry_utils.py | TestPreconfiguredDecorators.test_retry_platform_service_call_exists | 1 | 1 |
$$\textcolor{#23d18b}{\tt{tests/utils/test\_retry\_utils.py}}$$ $$\textcolor{#23d18b}{\tt{TestPreconfiguredDecorators.test\_retry\_platform\_service\_call\_exists}}$$ $$\textcolor{#23d18b}{\tt{1}}$$ $$\textcolor{#23d18b}{\tt{1}}$$
$$\textcolor{#23d18b}{\tt{tests/utils/test\_retry\_utils.py}}$$ $$\textcolor{#23d18b}{\tt{TestPreconfiguredDecorators.test\_retry\_prompt\_service\_call\_exists}}$$ $$\textcolor{#23d18b}{\tt{1}}$$ $$\textcolor{#23d18b}{\tt{1}}$$
$$\textcolor{#23d18b}{\tt{tests/utils/test\_retry\_utils.py}}$$ $$\textcolor{#23d18b}{\tt{TestPreconfiguredDecorators.test\_platform\_service\_decorator\_retries\_on\_connection\_error}}$$ $$\textcolor{#23d18b}{\tt{1}}$$ $$\textcolor{#23d18b}{\tt{1}}$$
$$\textcolor{#23d18b}{\tt{tests/utils/test\_retry\_utils.py}}$$ $$\textcolor{#23d18b}{\tt{TestPreconfiguredDecorators.test\_prompt\_service\_decorator\_retries\_on\_timeout}}$$ $$\textcolor{#23d18b}{\tt{1}}$$ $$\textcolor{#23d18b}{\tt{1}}$$
$$\textcolor{#23d18b}{\tt{tests/utils/test\_retry\_utils.py}}$$ $$\textcolor{#23d18b}{\tt{TestRetryLogging.test\_warning\_logged\_on\_retry}}$$ $$\textcolor{#23d18b}{\tt{1}}$$ $$\textcolor{#23d18b}{\tt{1}}$$
$$\textcolor{#23d18b}{\tt{tests/utils/test\_retry\_utils.py}}$$ $$\textcolor{#23d18b}{\tt{TestRetryLogging.test\_info\_logged\_on\_success\_after\_retry}}$$ $$\textcolor{#23d18b}{\tt{1}}$$ $$\textcolor{#23d18b}{\tt{1}}$$
$$\textcolor{#23d18b}{\tt{tests/utils/test\_retry\_utils.py}}$$ $$\textcolor{#23d18b}{\tt{TestRetryLogging.test\_exception\_logged\_on\_giving\_up}}$$ $$\textcolor{#23d18b}{\tt{1}}$$ $$\textcolor{#23d18b}{\tt{1}}$$
$$\textcolor{#23d18b}{\tt{TOTAL}}$$ $$\textcolor{#23d18b}{\tt{63}}$$ $$\textcolor{#23d18b}{\tt{63}}$$

@greptile-apps

greptile-apps bot commented Mar 19, 2026

Greptile Summary

This PR adds HTTP session lifecycle management to all worker API clients: an idempotent close() method, a __del__ destructor safety net, try/finally cleanup blocks in task functions, a _owns_session flag to prevent singleton shared sessions from being torn down by individual client calls, wiring of API_CLIENT_POOL_SIZE into HTTPAdapter, and a periodic singleton reset via on_task_postrun. The changes are well-structured and largely additive with no behavioral changes to the default configuration.

Key findings:

  • Session leak in _extract_callback_parameters fast path (workers/callback/tasks.py ~line 702): when organization_id is present in kwargs, api_client = create_api_client(org_id_from_kwargs) is created inside a try block but is not closed in the except handler before re-raising. If get_workflow_execution or any downstream code raises, the client is leaked. The fallback path for the same function correctly uses try/finally for temp_api_client, making this inconsistency notable — especially for a PR whose stated goal is eliminating session/FD accumulation.
  • _cached_reset_threshold accessed outside _task_counter_lock (workers/shared/api/internal_client.py): The cache-miss read and write happen before acquiring the lock. Safe under CPython's GIL with a prefork pool, but susceptible to races in gevent/eventlet pools.
  • _last_reset_time is dead code (workers/shared/api/internal_client.py): The field is declared, cleared in reset_singleton(), and reset in the test fixture, but is never assigned a non-None value anywhere in production code.

Confidence Score: 3/5

  • Safe to merge with low risk — all changes are additive safety nets with default behavior unchanged, but a session leak in the callback fast-path error route should be addressed first.
  • The PR is well-intentioned and mostly correct: the base client changes, destructor, pool wiring, singleton reset, and worker shutdown hook are all clean. The score is reduced from 5 because (1) the fast-path error branch in _extract_callback_parameters can leak an API client — directly contradicting the PR's stated goal — and (2) _cached_reset_threshold is accessed outside the task counter lock, creating a latent race for non-prefork pools. Neither issue would cause visible breakage under default settings today, but they are quality gaps in new code introduced by this PR.
  • Pay close attention to workers/callback/tasks.py (fast-path leak in _extract_callback_parameters) and workers/shared/api/internal_client.py (_cached_reset_threshold lock gap and _last_reset_time dead code).

Important Files Changed

| Filename | Overview |
|---|---|
| `workers/shared/clients/base_client.py` | Core changes: adds `_owns_session` flag, idempotent `close()`, `__del__` destructor, context manager support, and wires `API_CLIENT_POOL_SIZE` into `HTTPAdapter`. Logic is sound; destructor correctly guards against missing attributes and never raises. |
| `workers/shared/api/internal_client.py` | Adds `reset_singleton()`, `increment_task_counter()`, `_owns_session=False` propagation to all shared clients, and singleton-aware `close()`. Two issues: `_cached_reset_threshold` is read/written outside `_task_counter_lock` (race in gevent/eventlet pools), and `_last_reset_time` is declared and cleared but never populated — appears to be dead code. |
| `workers/callback/tasks.py` | Adds try/finally with `api_client.close()` in `_process_batch_callback_core` and `process_batch_callback_api`. However, in `_extract_callback_parameters` the fast path creates `api_client` inside a `try` block without a corresponding `finally` — if an exception occurs after creation, the client is not explicitly closed before re-raising. |
| `workers/api-deployment/tasks.py` | Splits the monolithic try/except into a two-phase pattern: a dedicated setup try/except that returns early on failure, then a main try/finally that guarantees `api_client.close()` and `StateStore.clear_all()`. Clean and correct. |
| `workers/worker.py` | Adds `on_worker_process_shutdown` hook that calls `reset_singleton()` and `ClientFactory.reset_shared_state()`, and adds an early-return guard to `on_task_postrun` when singleton is disabled. Both changes are correct and non-breaking. |
| `workers/shared/infrastructure/config/worker_config.py` | Adds `api_client_pool_size` (default 10) and `singleton_reset_task_threshold` (default 1000) fields. Both use `os.getenv` with sensible defaults and are consistent with the existing dataclass pattern. |
| `workers/shared/tests/test_session_lifecycle.py` | 25 tests across 8 classes covering destructor behavior, idempotent close, pool size config, singleton reset, task counter threshold, postrun guard, and context manager cleanup. Good coverage of the happy paths; does not exercise the `_extract_callback_parameters` fast-path failure case. |
| `workers/shared/patterns/factory/client_factory.py` | Adds `reset_shared_state()` classmethod that closes the shared base client and clears class-level state. Correctly uses `_client_lock` and handles `close()` exceptions gracefully. |
| `workers/conftest.py` | New root conftest that pre-seeds required environment variables before any worker module is imported, preventing import-time failures in tests. Simple and correct. |
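The two-phase setup/cleanup pattern described for `workers/api-deployment/tasks.py` can be sketched like this. The function, `do_work()`, and the injected `create_api_client`/`state_store` collaborators are illustrative placeholders, not the PR's actual code:

```python
def run_api_deployment_task(org_id, create_api_client, state_store):
    # Phase 1: setup. A failure here returns early; nothing exists to clean up yet.
    try:
        api_client = create_api_client(org_id)
    except Exception as exc:
        return {"status": "error", "detail": f"setup failed: {exc}"}

    # Phase 2: main work. The finally block guarantees cleanup on every path —
    # success, exception, or early return.
    try:
        result = api_client.do_work()
        return {"status": "ok", "result": result}
    finally:
        api_client.close()
        state_store.clear_all()
```

Splitting setup from work avoids the classic pitfall of a single try/finally whose `finally` dereferences a client that was never created.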

Sequence Diagram

```mermaid
sequenceDiagram
    participant Task as Celery Task
    participant IAC as InternalAPIClient
    participant BAC as BaseAPIClient
    participant Session as requests.Session
    participant Signal as task_postrun signal

    Note over Task,Session: Traditional Mode (ENABLE_API_CLIENT_SINGLETON=false, default)
    Task->>IAC: create InternalAPIClient(config)
    IAC->>BAC: create BaseAPIClient(_owns_session=True)
    BAC->>Session: Session() + HTTPAdapter(pool_size)
    Task->>IAC: use client (API calls)
    Task->>IAC: close() [try/finally]
    IAC->>BAC: close() each sub-client
    BAC->>Session: session.close()

    Note over Task,Session: Singleton Mode (ENABLE_API_CLIENT_SINGLETON=true)
    Task->>IAC: create InternalAPIClient(config)
    alt First instantiation
        IAC->>BAC: create BaseAPIClient(_owns_session=False)
        BAC->>Session: shared Session stored in _shared_session
    else Subsequent instantiation
        IAC->>BAC: create BaseAPIClient, swap session with _shared_session
        BAC->>Session: new session.close(), then use shared session
    end
    Task->>IAC: close() [try/finally]
    IAC-->>Task: no-op (shared session preserved)
    Signal->>IAC: increment_task_counter()
    alt counter >= WORKER_SINGLETON_RESET_THRESHOLD
        IAC->>Session: _shared_session.close()
        IAC->>IAC: reset class state
    end

    Note over Task,Session: Worker Shutdown
    Task->>IAC: on_worker_process_shutdown
    IAC->>Session: reset_singleton() → session.close()
```

Comments Outside Diff (1)

  1. workers/callback/tasks.py, line 700-839 (link)

    P1 API client leaked on exception in fast path

    In the fast path (when org_id_from_kwargs is truthy), api_client is created at ~line 702 via create_api_client(org_id_from_kwargs). If any subsequent code in the try block raises an exception — for example, api_client.get_workflow_execution(...) failing — the except block at line 837 re-raises a ValueError without ever calling api_client.close(). The caller _process_batch_callback_core only has a finally block that closes context.api_client, but context is never populated when _extract_callback_parameters raises, so that cleanup never runs either.

    While BaseAPIClient.__del__ provides a GC-based safety net (one of the PR's explicit goals), this is exactly the kind of pattern the PR aims to eliminate — an explicitly-created client that may not be deterministically closed on the error path. The fallback path correctly uses a try/finally around temp_api_client, but the fast path lacks equivalent protection.

    Suggested fix — close the client on the fast path's error route:

    ```python
    if org_id_from_kwargs:
        api_client = create_api_client(org_id_from_kwargs)
        try:
            execution_response = api_client.get_workflow_execution(
                context.execution_id, file_execution=False
            )
            # ... rest of fast path ...
        except Exception:
            api_client.close()
            raise
    ```

    or restructure so a single try/finally covers both paths before assigning context.api_client.


Last reviewed commit: "Merge branch 'main' ..."

Comment on lines +289 to +293

```python
if cls._cached_reset_threshold is None:
    cls._cached_reset_threshold = WorkerConfig().singleton_reset_task_threshold

with cls._task_counter_lock:
    cls._task_counter += 1
```

P2 `_cached_reset_threshold` read and written outside the lock

The check and assignment on lines 289–290 happen before the `with cls._task_counter_lock:` block:

```python
if cls._cached_reset_threshold is None:
    cls._cached_reset_threshold = WorkerConfig().singleton_reset_task_threshold

with cls._task_counter_lock:
    cls._task_counter += 1
    ...
```

Meanwhile, `reset_singleton()` writes `cls._cached_reset_threshold = None` without holding `_task_counter_lock`. In the default prefork pool this is safe because CPython's GIL prevents true concurrent writes. However, the code comments in `reset_singleton()` explicitly note it's designed for gevent/eventlet compatibility too — in those pools, multiple greenlets can run concurrently and the unprotected read-modify-write of `_cached_reset_threshold` could result in two greenlets both seeing `None`, both constructing a `WorkerConfig()`, and writing the same value (harmless) or, more subtly, the threshold being reset to `None` between the check and the lock acquisition in another greenlet.

Moving the cache-miss read/write inside the lock would eliminate the race:

```python
with cls._task_counter_lock:
    if cls._cached_reset_threshold is None:
        cls._cached_reset_threshold = WorkerConfig().singleton_reset_task_threshold
    cls._task_counter += 1
    if (
        cls._cached_reset_threshold > 0
        and cls._task_counter >= cls._cached_reset_threshold
    ):
        ...
        cls.reset_singleton()
```

Comment on lines +91 to +94

```python
_task_counter: int = 0
_task_counter_lock: threading.Lock = threading.Lock()
_last_reset_time: float | None = None
_cached_reset_threshold: int | None = None
```

P2 _last_reset_time is declared but never populated

_last_reset_time: float | None = None is declared as a class variable and also appears in reset_singleton() as a cleared field and in the test fixture's teardown — but it is never set to a non-None value anywhere in the production code. There is no logic that reads it either.

This looks like an incomplete implementation, perhaps intended for rate-limiting consecutive resets (a "cooldown" guard). If that feature is planned, tracking it here would avoid confusion. If it's not needed, removing the declaration and the test fixture cleanup for _last_reset_time would reduce dead state.

```python
# Remove if unused:
_last_reset_time: float | None = None
```
