From 9462f5de16e9a7cd5fee8de18cf47f9c90f251d7 Mon Sep 17 00:00:00 2001 From: Sayak Maity Date: Thu, 5 Feb 2026 15:30:40 -0500 Subject: [PATCH] fix: reduce HTTPX connection limits to add backpressure Lower HTTPX connection pool limits from 1000 to configurable defaults (200 max connections, 100 max keepalive) to prevent flooding agent pods with concurrent requests during high load. Excess requests now queue at the pool level instead of all hitting OpenAI simultaneously. --- agentex/src/adapters/http/adapter_httpx.py | 37 +++++++++++++-------- agentex/src/config/environment_variables.py | 10 ++++++ 2 files changed, 34 insertions(+), 13 deletions(-) diff --git a/agentex/src/adapters/http/adapter_httpx.py b/agentex/src/adapters/http/adapter_httpx.py index 1fab0f0..ee8bf66 100644 --- a/agentex/src/adapters/http/adapter_httpx.py +++ b/agentex/src/adapters/http/adapter_httpx.py @@ -12,14 +12,6 @@ logger = make_logger(__name__) -# Global connection pool limits for better connection management -# Increased limits to handle streaming requests that hold connections longer -DEFAULT_LIMITS = Limits( - max_keepalive_connections=1000, # Max connections to keep alive (increased) - max_connections=1000, # Max total connections allowed (increased for streaming) - keepalive_expiry=30, # Seconds to keep connections alive -) - class HttpxGateway(HttpPort): # Class-level cached clients shared across all instances @@ -37,6 +29,11 @@ def _get_regular_client(cls) -> httpx.AsyncClient: """Get or create the shared regular HTTP client.""" if cls._regular_client is None: env = cls._environment_variables + limits = Limits( + max_connections=env.HTTPX_MAX_CONNECTIONS, + max_keepalive_connections=env.HTTPX_MAX_KEEPALIVE_CONNECTIONS, + keepalive_expiry=30, + ) timeout = Timeout( connect=env.HTTPX_CONNECT_TIMEOUT, read=env.HTTPX_READ_TIMEOUT, @@ -44,13 +41,15 @@ def _get_regular_client(cls) -> httpx.AsyncClient: pool=env.HTTPX_POOL_TIMEOUT, ) cls._regular_client = httpx.AsyncClient( - limits=DEFAULT_LIMITS, + limits=limits, timeout=timeout, http2=True, # Enable HTTP/2 for better connection reuse follow_redirects=True, ) logger.info( - f"Created shared regular httpx client (id: {id(cls._regular_client)})" + f"Created shared regular httpx client (id: {id(cls._regular_client)}, " + f"max_connections={env.HTTPX_MAX_CONNECTIONS}, " + f"max_keepalive={env.HTTPX_MAX_KEEPALIVE_CONNECTIONS})" ) return cls._regular_client @@ -59,6 +58,11 @@ def _get_streaming_client(cls) -> httpx.AsyncClient: """Get or create the shared streaming HTTP client.""" if cls._streaming_client is None: env = cls._environment_variables + limits = Limits( + max_connections=env.HTTPX_MAX_CONNECTIONS, + max_keepalive_connections=env.HTTPX_MAX_KEEPALIVE_CONNECTIONS, + keepalive_expiry=30, + ) # Use longer timeout for streaming streaming_timeout = Timeout( connect=env.HTTPX_CONNECT_TIMEOUT, @@ -67,13 +71,15 @@ def _get_streaming_client(cls) -> httpx.AsyncClient: pool=env.HTTPX_POOL_TIMEOUT, ) cls._streaming_client = httpx.AsyncClient( - limits=DEFAULT_LIMITS, + limits=limits, timeout=streaming_timeout, http2=True, # Enable HTTP/2 for better streaming follow_redirects=True, ) logger.info( - f"Created shared streaming httpx client (id: {id(cls._streaming_client)})" + f"Created shared streaming httpx client (id: {id(cls._streaming_client)}, " + f"max_connections={env.HTTPX_MAX_CONNECTIONS}, " + f"max_keepalive={env.HTTPX_MAX_KEEPALIVE_CONNECTIONS})" ) return cls._streaming_client @@ -252,9 +258,14 @@ def call( ) try: + limits = Limits( + max_connections=env.HTTPX_MAX_CONNECTIONS, + max_keepalive_connections=env.HTTPX_MAX_KEEPALIVE_CONNECTIONS, + keepalive_expiry=30, + ) # Use a client with connection pool limits for sync calls with httpx.Client( - limits=DEFAULT_LIMITS, + limits=limits, timeout=httpx_timeout, http2=True, # Enable HTTP/2 follow_redirects=True, diff --git a/agentex/src/config/environment_variables.py b/agentex/src/config/environment_variables.py index f82743b..10e217d 100644 --- a/agentex/src/config/environment_variables.py +++ b/agentex/src/config/environment_variables.py @@ -46,6 +46,8 @@ class EnvVarKeys(str, Enum): ALLOWED_ORIGINS = "ALLOWED_ORIGINS" DD_AGENT_HOST = "DD_AGENT_HOST" DD_STATSD_PORT = "DD_STATSD_PORT" + HTTPX_MAX_CONNECTIONS = "HTTPX_MAX_CONNECTIONS" + HTTPX_MAX_KEEPALIVE_CONNECTIONS = "HTTPX_MAX_KEEPALIVE_CONNECTIONS" HTTPX_CONNECT_TIMEOUT = "HTTPX_CONNECT_TIMEOUT" HTTPX_READ_TIMEOUT = "HTTPX_READ_TIMEOUT" HTTPX_WRITE_TIMEOUT = "HTTPX_WRITE_TIMEOUT" @@ -95,6 +97,8 @@ class EnvironmentVariables(BaseModel): IMAGE_PULL_SECRET_NAME: str | None = None AGENTEX_AUTH_URL: str | None = None ALLOWED_ORIGINS: str | None = None + HTTPX_MAX_CONNECTIONS: int = 200 # Max total connections allowed + HTTPX_MAX_KEEPALIVE_CONNECTIONS: int = 100 # Max connections to keep alive HTTPX_CONNECT_TIMEOUT: float = 10.0 # HTTPX connection timeout in seconds HTTPX_READ_TIMEOUT: float = 30.0 # HTTPX read timeout in seconds HTTPX_WRITE_TIMEOUT: float = 30.0 # HTTPX write timeout in seconds @@ -158,6 +162,12 @@ def refresh(cls, force_refresh: bool = False) -> EnvironmentVariables | None: ALLOWED_ORIGINS=os.environ.get(EnvVarKeys.ALLOWED_ORIGINS, "*"), DD_AGENT_HOST=os.environ.get(EnvVarKeys.DD_AGENT_HOST), DD_STATSD_PORT=os.environ.get(EnvVarKeys.DD_STATSD_PORT), + HTTPX_MAX_CONNECTIONS=int( + os.environ.get(EnvVarKeys.HTTPX_MAX_CONNECTIONS, "200") + ), + HTTPX_MAX_KEEPALIVE_CONNECTIONS=int( + os.environ.get(EnvVarKeys.HTTPX_MAX_KEEPALIVE_CONNECTIONS, "100") + ), HTTPX_CONNECT_TIMEOUT=float( os.environ.get(EnvVarKeys.HTTPX_CONNECT_TIMEOUT, "10.0") ),