diff --git a/agentex/src/adapters/http/adapter_httpx.py b/agentex/src/adapters/http/adapter_httpx.py index 1fab0f0..ee8bf66 100644 --- a/agentex/src/adapters/http/adapter_httpx.py +++ b/agentex/src/adapters/http/adapter_httpx.py @@ -12,14 +12,6 @@ logger = make_logger(__name__) -# Global connection pool limits for better connection management -# Increased limits to handle streaming requests that hold connections longer -DEFAULT_LIMITS = Limits( - max_keepalive_connections=1000, # Max connections to keep alive (increased) - max_connections=1000, # Max total connections allowed (increased for streaming) - keepalive_expiry=30, # Seconds to keep connections alive -) - class HttpxGateway(HttpPort): # Class-level cached clients shared across all instances @@ -37,6 +29,11 @@ def _get_regular_client(cls) -> httpx.AsyncClient: """Get or create the shared regular HTTP client.""" if cls._regular_client is None: env = cls._environment_variables + limits = Limits( + max_connections=env.HTTPX_MAX_CONNECTIONS, + max_keepalive_connections=env.HTTPX_MAX_KEEPALIVE_CONNECTIONS, + keepalive_expiry=30, + ) timeout = Timeout( connect=env.HTTPX_CONNECT_TIMEOUT, read=env.HTTPX_READ_TIMEOUT, @@ -44,13 +41,15 @@ def _get_regular_client(cls) -> httpx.AsyncClient: pool=env.HTTPX_POOL_TIMEOUT, ) cls._regular_client = httpx.AsyncClient( - limits=DEFAULT_LIMITS, + limits=limits, timeout=timeout, http2=True, # Enable HTTP/2 for better connection reuse follow_redirects=True, ) logger.info( - f"Created shared regular httpx client (id: {id(cls._regular_client)})" + f"Created shared regular httpx client (id: {id(cls._regular_client)}, " + f"max_connections={env.HTTPX_MAX_CONNECTIONS}, " + f"max_keepalive={env.HTTPX_MAX_KEEPALIVE_CONNECTIONS})" ) return cls._regular_client @@ -59,6 +58,11 @@ def _get_streaming_client(cls) -> httpx.AsyncClient: """Get or create the shared streaming HTTP client.""" if cls._streaming_client is None: env = cls._environment_variables + limits = Limits( + max_connections=env.HTTPX_MAX_CONNECTIONS, + max_keepalive_connections=env.HTTPX_MAX_KEEPALIVE_CONNECTIONS, + keepalive_expiry=30, + ) # Use longer timeout for streaming streaming_timeout = Timeout( connect=env.HTTPX_CONNECT_TIMEOUT, @@ -67,13 +71,15 @@ def _get_streaming_client(cls) -> httpx.AsyncClient: pool=env.HTTPX_POOL_TIMEOUT, ) cls._streaming_client = httpx.AsyncClient( - limits=DEFAULT_LIMITS, + limits=limits, timeout=streaming_timeout, http2=True, # Enable HTTP/2 for better streaming follow_redirects=True, ) logger.info( - f"Created shared streaming httpx client (id: {id(cls._streaming_client)})" + f"Created shared streaming httpx client (id: {id(cls._streaming_client)}, " + f"max_connections={env.HTTPX_MAX_CONNECTIONS}, " + f"max_keepalive={env.HTTPX_MAX_KEEPALIVE_CONNECTIONS})" ) return cls._streaming_client @@ -252,9 +258,14 @@ def call( ) try: + limits = Limits( + max_connections=env.HTTPX_MAX_CONNECTIONS, + max_keepalive_connections=env.HTTPX_MAX_KEEPALIVE_CONNECTIONS, + keepalive_expiry=30, + ) # Use a client with connection pool limits for sync calls with httpx.Client( - limits=DEFAULT_LIMITS, + limits=limits, timeout=httpx_timeout, http2=True, # Enable HTTP/2 follow_redirects=True, diff --git a/agentex/src/config/environment_variables.py b/agentex/src/config/environment_variables.py index f82743b..10e217d 100644 --- a/agentex/src/config/environment_variables.py +++ b/agentex/src/config/environment_variables.py @@ -46,6 +46,8 @@ class EnvVarKeys(str, Enum): ALLOWED_ORIGINS = "ALLOWED_ORIGINS" DD_AGENT_HOST = "DD_AGENT_HOST" DD_STATSD_PORT = "DD_STATSD_PORT" + HTTPX_MAX_CONNECTIONS = "HTTPX_MAX_CONNECTIONS" + HTTPX_MAX_KEEPALIVE_CONNECTIONS = "HTTPX_MAX_KEEPALIVE_CONNECTIONS" HTTPX_CONNECT_TIMEOUT = "HTTPX_CONNECT_TIMEOUT" HTTPX_READ_TIMEOUT = "HTTPX_READ_TIMEOUT" HTTPX_WRITE_TIMEOUT = "HTTPX_WRITE_TIMEOUT" @@ -95,6 +97,8 @@ class EnvironmentVariables(BaseModel): IMAGE_PULL_SECRET_NAME: str | None = None AGENTEX_AUTH_URL: str | None = None ALLOWED_ORIGINS: str | None = None + HTTPX_MAX_CONNECTIONS: int = 200 # Max total connections allowed + HTTPX_MAX_KEEPALIVE_CONNECTIONS: int = 100 # Max connections to keep alive HTTPX_CONNECT_TIMEOUT: float = 10.0 # HTTPX connection timeout in seconds HTTPX_READ_TIMEOUT: float = 30.0 # HTTPX read timeout in seconds HTTPX_WRITE_TIMEOUT: float = 30.0 # HTTPX write timeout in seconds @@ -158,6 +162,12 @@ def refresh(cls, force_refresh: bool = False) -> EnvironmentVariables | None: ALLOWED_ORIGINS=os.environ.get(EnvVarKeys.ALLOWED_ORIGINS, "*"), DD_AGENT_HOST=os.environ.get(EnvVarKeys.DD_AGENT_HOST), DD_STATSD_PORT=os.environ.get(EnvVarKeys.DD_STATSD_PORT), + HTTPX_MAX_CONNECTIONS=int( + os.environ.get(EnvVarKeys.HTTPX_MAX_CONNECTIONS, "200") + ), + HTTPX_MAX_KEEPALIVE_CONNECTIONS=int( + os.environ.get(EnvVarKeys.HTTPX_MAX_KEEPALIVE_CONNECTIONS, "100") + ), HTTPX_CONNECT_TIMEOUT=float( os.environ.get(EnvVarKeys.HTTPX_CONNECT_TIMEOUT, "10.0") ),