Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 24 additions & 13 deletions agentex/src/adapters/http/adapter_httpx.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,6 @@

logger = make_logger(__name__)

# Global connection pool limits for better connection management
# Increased limits to handle streaming requests that hold connections longer
DEFAULT_LIMITS = Limits(
max_keepalive_connections=1000, # Max connections to keep alive (increased)
max_connections=1000, # Max total connections allowed (increased for streaming)
keepalive_expiry=30, # Seconds to keep connections alive
)


class HttpxGateway(HttpPort):
# Class-level cached clients shared across all instances
Expand All @@ -37,20 +29,27 @@ def _get_regular_client(cls) -> httpx.AsyncClient:
"""Get or create the shared regular HTTP client."""
if cls._regular_client is None:
env = cls._environment_variables
limits = Limits(
max_connections=env.HTTPX_MAX_CONNECTIONS,
max_keepalive_connections=env.HTTPX_MAX_KEEPALIVE_CONNECTIONS,
keepalive_expiry=30,
)
timeout = Timeout(
connect=env.HTTPX_CONNECT_TIMEOUT,
read=env.HTTPX_READ_TIMEOUT,
write=env.HTTPX_WRITE_TIMEOUT,
pool=env.HTTPX_POOL_TIMEOUT,
)
cls._regular_client = httpx.AsyncClient(
limits=DEFAULT_LIMITS,
limits=limits,
timeout=timeout,
http2=True, # Enable HTTP/2 for better connection reuse
follow_redirects=True,
)
logger.info(
f"Created shared regular httpx client (id: {id(cls._regular_client)})"
f"Created shared regular httpx client (id: {id(cls._regular_client)}, "
f"max_connections={env.HTTPX_MAX_CONNECTIONS}, "
f"max_keepalive={env.HTTPX_MAX_KEEPALIVE_CONNECTIONS})"
)
return cls._regular_client

Expand All @@ -59,6 +58,11 @@ def _get_streaming_client(cls) -> httpx.AsyncClient:
"""Get or create the shared streaming HTTP client."""
if cls._streaming_client is None:
env = cls._environment_variables
limits = Limits(
max_connections=env.HTTPX_MAX_CONNECTIONS,
max_keepalive_connections=env.HTTPX_MAX_KEEPALIVE_CONNECTIONS,
keepalive_expiry=30,
)
# Use a longer timeout for streaming
streaming_timeout = Timeout(
connect=env.HTTPX_CONNECT_TIMEOUT,
Expand All @@ -67,13 +71,15 @@ def _get_streaming_client(cls) -> httpx.AsyncClient:
pool=env.HTTPX_POOL_TIMEOUT,
)
cls._streaming_client = httpx.AsyncClient(
limits=DEFAULT_LIMITS,
limits=limits,
timeout=streaming_timeout,
http2=True, # Enable HTTP/2 for better streaming
follow_redirects=True,
)
logger.info(
f"Created shared streaming httpx client (id: {id(cls._streaming_client)})"
f"Created shared streaming httpx client (id: {id(cls._streaming_client)}, "
f"max_connections={env.HTTPX_MAX_CONNECTIONS}, "
f"max_keepalive={env.HTTPX_MAX_KEEPALIVE_CONNECTIONS})"
)
return cls._streaming_client

Expand Down Expand Up @@ -252,9 +258,14 @@ def call(
)

try:
limits = Limits(
max_connections=env.HTTPX_MAX_CONNECTIONS,
max_keepalive_connections=env.HTTPX_MAX_KEEPALIVE_CONNECTIONS,
keepalive_expiry=30,
)
# Use a client with connection pool limits for sync calls
with httpx.Client(
limits=DEFAULT_LIMITS,
limits=limits,
timeout=httpx_timeout,
http2=True, # Enable HTTP/2
follow_redirects=True,
Expand Down
10 changes: 10 additions & 0 deletions agentex/src/config/environment_variables.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ class EnvVarKeys(str, Enum):
ALLOWED_ORIGINS = "ALLOWED_ORIGINS"
DD_AGENT_HOST = "DD_AGENT_HOST"
DD_STATSD_PORT = "DD_STATSD_PORT"
HTTPX_MAX_CONNECTIONS = "HTTPX_MAX_CONNECTIONS"
HTTPX_MAX_KEEPALIVE_CONNECTIONS = "HTTPX_MAX_KEEPALIVE_CONNECTIONS"
HTTPX_CONNECT_TIMEOUT = "HTTPX_CONNECT_TIMEOUT"
HTTPX_READ_TIMEOUT = "HTTPX_READ_TIMEOUT"
HTTPX_WRITE_TIMEOUT = "HTTPX_WRITE_TIMEOUT"
Expand Down Expand Up @@ -95,6 +97,8 @@ class EnvironmentVariables(BaseModel):
IMAGE_PULL_SECRET_NAME: str | None = None
AGENTEX_AUTH_URL: str | None = None
ALLOWED_ORIGINS: str | None = None
HTTPX_MAX_CONNECTIONS: int = 200 # Max total connections allowed
HTTPX_MAX_KEEPALIVE_CONNECTIONS: int = 100 # Max connections to keep alive
HTTPX_CONNECT_TIMEOUT: float = 10.0 # HTTPX connection timeout in seconds
HTTPX_READ_TIMEOUT: float = 30.0 # HTTPX read timeout in seconds
HTTPX_WRITE_TIMEOUT: float = 30.0 # HTTPX write timeout in seconds
Expand Down Expand Up @@ -158,6 +162,12 @@ def refresh(cls, force_refresh: bool = False) -> EnvironmentVariables | None:
ALLOWED_ORIGINS=os.environ.get(EnvVarKeys.ALLOWED_ORIGINS, "*"),
DD_AGENT_HOST=os.environ.get(EnvVarKeys.DD_AGENT_HOST),
DD_STATSD_PORT=os.environ.get(EnvVarKeys.DD_STATSD_PORT),
HTTPX_MAX_CONNECTIONS=int(
os.environ.get(EnvVarKeys.HTTPX_MAX_CONNECTIONS, "200")
),
HTTPX_MAX_KEEPALIVE_CONNECTIONS=int(
os.environ.get(EnvVarKeys.HTTPX_MAX_KEEPALIVE_CONNECTIONS, "100")
),
HTTPX_CONNECT_TIMEOUT=float(
os.environ.get(EnvVarKeys.HTTPX_CONNECT_TIMEOUT, "10.0")
),
Expand Down
Loading