Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 112 additions & 13 deletions autogen.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,17 @@
OPENAPI_URL = "https://api.vantage.sh/v2/oas_v3.json"
OUTPUT_DIR = Path(__file__).parent / "src" / "vantage"

# Maps a substring found in a response description to the internal client method
# that should handle it, and the Python return type to emit.
# Checked against each HTTP status code's description during endpoint parsing.
RESPONSE_HANDLERS: list[tuple[str, str, str]] = [
(
"will be available at the location specified in the Location header",
"_request_for_location",
"str",
),
]


@dataclass
class Parameter:
Expand Down Expand Up @@ -48,6 +59,8 @@ class Endpoint:
request_body_type: str | None = None
response_type: str | None = None
is_multipart: bool = False
response_handler: str | None = None # internal client method to call, if not the default
response_handler_return_type: str | None = None


@dataclass
Expand Down Expand Up @@ -292,13 +305,27 @@ def parse_endpoints(schema: dict[str, Any]) -> list[Endpoint]:

response_type = extract_response_type(spec.get("responses", {}), schemas)

description = spec.get("description")

response_handler = None
response_handler_return_type = None
for resp_desc in spec.get("responses", {}).values():
text = resp_desc.get("description", "")
for phrase, handler, return_type in RESPONSE_HANDLERS:
if phrase in text:
response_handler = handler
response_handler_return_type = return_type
break
if response_handler:
break

endpoints.append(
Endpoint(
path=path,
method=method.upper(),
operation_id=operation_id,
summary=spec.get("summary"),
description=spec.get("description"),
description=description,
deprecated=spec.get("deprecated", False),
parameters=parameters,
request_body_required=request_body.get("required", False)
Expand All @@ -307,6 +334,8 @@ def parse_endpoints(schema: dict[str, Any]) -> list[Endpoint]:
request_body_type=body_type,
response_type=response_type,
is_multipart=is_multipart,
response_handler=response_handler,
response_handler_return_type=response_handler_return_type,
)
)

Expand Down Expand Up @@ -523,6 +552,18 @@ def generate_pydantic_models(schema: dict[str, Any]) -> str:
return "\n".join(lines)


def _collect_handler_routes(resources: dict[str, Resource]) -> dict[str, list[tuple[str, str]]]:
"""Scan all endpoints and group (method, path) pairs by their response_handler."""
handler_routes: dict[str, list[tuple[str, str]]] = {}
for resource in resources.values():
for endpoint in resource.endpoints:
if endpoint.response_handler:
handler_routes.setdefault(endpoint.response_handler, []).append(
(endpoint.method, endpoint.path)
)
return handler_routes


def generate_sync_client(resources: dict[str, Resource]) -> str:
"""Generate synchronous client code."""
lines = [
Expand Down Expand Up @@ -618,13 +659,30 @@ def generate_sync_client(resources: dict[str, Resource]) -> str:
" body=response.text,",
" )",
"",
]
)

# Inject generated routing: one if-block per handler, checking (method, path)
handler_routes = _collect_handler_routes(resources)
for handler, routes in sorted(handler_routes.items()):
route_set = "{" + ", ".join(f'("{m}", "{p}")' for m, p in sorted(routes)) + "}"
lines.append(f" if (method, path) in {route_set}:")
lines.append(f" return self.{handler}(response)")
lines.append("")

lines.extend(
[
" try:",
" data = response.json()",
" except Exception:",
" data = None",
"",
" return data",
"",
" def _request_for_location(self, response: Any) -> str:",
' """Extract the Location header from a response."""',
' return response.headers["Location"]',
"",
"",
]
)
Expand Down Expand Up @@ -699,7 +757,10 @@ def generate_sync_method(endpoint: Endpoint, method_name: str) -> list[str]:

# Method signature
param_str = ", ".join(["self"] + params) if params else "self"
return_type = endpoint.response_type or "Any"
if endpoint.response_handler:
return_type = endpoint.response_handler_return_type or "Any"
else:
return_type = endpoint.response_type or "None"
lines.append(f" def {method_name}({param_str}) -> {return_type}:")

# Docstring
Expand Down Expand Up @@ -740,11 +801,20 @@ def generate_sync_method(endpoint: Endpoint, method_name: str) -> list[str]:
lines.append(" body_data = None")

# Make request and coerce response payload into typed models where possible
lines.append(
f' data = self._client.request("{endpoint.method}", path, params=params, body=body_data)'
)
_append_response_mapping(lines, return_type, "data")
lines.append(" return data")
if endpoint.response_handler:
lines.append(
f' return self._client.request("{endpoint.method}", path, params=params, body=body_data)'
)
elif endpoint.response_type is None:
lines.append(
f' self._client.request("{endpoint.method}", path, params=params, body=body_data)'
)
else:
lines.append(
f' data = self._client.request("{endpoint.method}", path, params=params, body=body_data)'
)
_append_response_mapping(lines, return_type, "data")
lines.append(" return data")

return lines

Expand Down Expand Up @@ -844,13 +914,30 @@ def generate_async_client(resources: dict[str, Resource]) -> str:
" body=response.text,",
" )",
"",
]
)

# Inject generated routing: one if-block per handler, checking (method, path)
handler_routes = _collect_handler_routes(resources)
for handler, routes in sorted(handler_routes.items()):
route_set = "{" + ", ".join(f'("{m}", "{p}")' for m, p in sorted(routes)) + "}"
lines.append(f" if (method, path) in {route_set}:")
lines.append(f" return self.{handler}(response)")
lines.append("")

lines.extend(
[
" try:",
" data = response.json()",
" except Exception:",
" data = None",
"",
" return data",
"",
" def _request_for_location(self, response: Any) -> str:",
' """Extract the Location header from a response."""',
' return response.headers["Location"]',
"",
"",
]
)
Expand Down Expand Up @@ -925,7 +1012,10 @@ def generate_async_method(endpoint: Endpoint, method_name: str) -> list[str]:

# Method signature
param_str = ", ".join(["self"] + params) if params else "self"
return_type = endpoint.response_type or "Any"
if endpoint.response_handler:
return_type = endpoint.response_handler_return_type or "Any"
else:
return_type = endpoint.response_type or "None"
lines.append(f" async def {method_name}({param_str}) -> {return_type}:")

# Docstring
Expand Down Expand Up @@ -966,11 +1056,20 @@ def generate_async_method(endpoint: Endpoint, method_name: str) -> list[str]:
lines.append(" body_data = None")

# Make request and coerce response payload into typed models where possible
lines.append(
f' data = await self._client.request("{endpoint.method}", path, params=params, body=body_data)'
)
_append_response_mapping(lines, return_type, "data")
lines.append(" return data")
if endpoint.response_handler:
lines.append(
f' return await self._client.request("{endpoint.method}", path, params=params, body=body_data)'
)
elif endpoint.response_type is None:
lines.append(
f' await self._client.request("{endpoint.method}", path, params=params, body=body_data)'
)
else:
lines.append(
f' data = await self._client.request("{endpoint.method}", path, params=params, body=body_data)'
)
_append_response_mapping(lines, return_type, "data")
lines.append(" return data")

return lines

Expand Down
Loading