Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 21 additions & 6 deletions imednet/core/endpoint/mixins/create.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import httpx

from imednet.core.endpoint.operations.create import CreateOperation
from imednet.core.protocols import AsyncRequestorProtocol, RequestorProtocol

T_RESP = TypeVar("T_RESP")
Expand All @@ -12,6 +13,8 @@
class CreateEndpointMixin(Generic[T_RESP]):
"""Mixin implementing creation logic."""

CREATE_OPERATION_CLS: type[CreateOperation[T_RESP]] = CreateOperation

def _prepare_kwargs(
self,
json: Any = None,
Expand Down Expand Up @@ -65,9 +68,15 @@ def _create_sync(
"""
Execute a synchronous creation request (POST).
"""
kwargs = self._prepare_kwargs(json=json, data=data, headers=headers)
response = client.post(path, **kwargs)
return self._process_response(response, parse_func)
op = self.CREATE_OPERATION_CLS(self)
return op.execute_sync(
client,
path,
json=json,
data=data,
headers=headers,
parse_func=parse_func,
)

async def _create_async(
self,
Expand All @@ -82,6 +91,12 @@ async def _create_async(
"""
Execute an asynchronous creation request (POST).
"""
kwargs = self._prepare_kwargs(json=json, data=data, headers=headers)
response = await client.post(path, **kwargs)
return self._process_response(response, parse_func)
op = self.CREATE_OPERATION_CLS(self)
return await op.execute_async(
client,
path,
json=json,
data=data,
headers=headers,
parse_func=parse_func,
)
35 changes: 11 additions & 24 deletions imednet/core/endpoint/mixins/get.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from typing import Any, Dict, Iterable, List, Optional

from imednet.core.endpoint.abc import EndpointABC
from imednet.core.endpoint.operations.get import FilterGetOperation, PathGetOperation
from imednet.core.paginator import AsyncPaginator, Paginator
from imednet.core.protocols import AsyncRequestorProtocol, RequestorProtocol

Expand All @@ -15,6 +16,7 @@ class FilterGetEndpointMixin(EndpointABC[T]):
# MODEL and _id_param are inherited from EndpointABC as abstract or properties
PAGINATOR_CLS: type[Paginator] = Paginator
ASYNC_PAGINATOR_CLS: type[AsyncPaginator] = AsyncPaginator
FILTER_GET_OPERATION_CLS: type[FilterGetOperation[T]] = FilterGetOperation

def _require_sync_client(self) -> RequestorProtocol:
"""Return the configured sync client."""
Expand Down Expand Up @@ -64,15 +66,8 @@ def _get_sync(
study_key: Optional[str],
item_id: Any,
) -> T:
filters = {self._id_param: item_id}
result = self._list_sync(
client,
paginator_cls,
study_key=study_key,
refresh=True,
**filters,
)
return self._validate_get_result(result, study_key, item_id)
op = self.FILTER_GET_OPERATION_CLS(self)
return op.execute_sync(client, paginator_cls, study_key, item_id)

async def _get_async(
self,
Expand All @@ -82,15 +77,8 @@ async def _get_async(
study_key: Optional[str],
item_id: Any,
) -> T:
filters = {self._id_param: item_id}
result = await self._list_async(
client,
paginator_cls,
study_key=study_key,
refresh=True,
**filters,
)
return self._validate_get_result(result, study_key, item_id)
op = self.FILTER_GET_OPERATION_CLS(self)
return await op.execute_async(client, paginator_cls, study_key, item_id)

def get(self, study_key: Optional[str], item_id: Any) -> T:
"""Get an item by ID using filtering."""
Expand All @@ -115,6 +103,7 @@ class PathGetEndpointMixin(ParsingMixin[T], EndpointABC[T]):
"""Mixin implementing ``get`` via direct path."""

# PATH is inherited from EndpointABC as abstract
PATH_GET_OPERATION_CLS: type[PathGetOperation[T]] = PathGetOperation

def _require_sync_client(self) -> RequestorProtocol:
"""Return the configured sync client."""
Expand Down Expand Up @@ -153,9 +142,8 @@ def _get_path_sync(
study_key: Optional[str],
item_id: Any,
) -> T:
path = self._get_path_for_id(study_key, item_id)
response = client.get(path)
return self._process_response(response, study_key, item_id)
op = self.PATH_GET_OPERATION_CLS(self)
return op.execute_sync(client, study_key, item_id)

async def _get_path_async(
self,
Expand All @@ -164,9 +152,8 @@ async def _get_path_async(
study_key: Optional[str],
item_id: Any,
) -> T:
path = self._get_path_for_id(study_key, item_id)
response = await client.get(path)
return self._process_response(response, study_key, item_id)
op = self.PATH_GET_OPERATION_CLS(self)
return await op.execute_async(client, study_key, item_id)

def get(self, study_key: Optional[str], item_id: Any) -> T:
"""Get an item by ID using direct path."""
Expand Down
30 changes: 6 additions & 24 deletions imednet/core/endpoint/mixins/list.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from imednet.constants import DEFAULT_PAGE_SIZE
from imednet.core.endpoint.abc import EndpointABC
from imednet.core.endpoint.operations.list import ListOperation
from imednet.core.endpoint.structs import ListRequestState
from imednet.core.paginator import AsyncPaginator, Paginator
from imednet.core.parsing import get_model_parser
Expand All @@ -20,6 +21,7 @@ class ListEndpointMixin(ParamMixin, CacheMixin, ParsingMixin[T], EndpointABC[T])
PAGE_SIZE: int = DEFAULT_PAGE_SIZE
PAGINATOR_CLS: type[Paginator] = Paginator
ASYNC_PAGINATOR_CLS: type[AsyncPaginator] = AsyncPaginator
LIST_OPERATION_CLS: type[ListOperation[T]] = ListOperation

def _require_sync_client(self) -> RequestorProtocol:
"""Return the configured sync client."""
Expand Down Expand Up @@ -65,28 +67,6 @@ def _process_list_result(
self._update_local_cache(result, study, has_filters, cache)
return result

async def _execute_async_list(
self,
paginator: AsyncPaginator,
parse_func: Callable[[Any], T],
study: Optional[str],
has_filters: bool,
cache: Any,
) -> List[T]:
result = [parse_func(item) async for item in paginator]
return self._process_list_result(result, study, has_filters, cache)

def _execute_sync_list(
self,
paginator: Paginator,
parse_func: Callable[[Any], T],
study: Optional[str],
has_filters: bool,
cache: Any,
) -> List[T]:
result = [parse_func(item) for item in paginator]
return self._process_list_result(result, study, has_filters, cache)

def _prepare_list_request(
self,
study_key: Optional[str],
Expand Down Expand Up @@ -146,7 +126,8 @@ def _list_sync(
paginator = paginator_cls(client, state.path, params=state.params, page_size=self.PAGE_SIZE)
parse_func = self._resolve_parse_func()

return self._execute_sync_list(
op = self.LIST_OPERATION_CLS(self)
return op.execute_sync(
paginator,
parse_func,
state.study,
Expand All @@ -172,7 +153,8 @@ async def _list_async(
paginator = paginator_cls(client, state.path, params=state.params, page_size=self.PAGE_SIZE)
parse_func = self._resolve_parse_func()

return await self._execute_async_list(
op = self.LIST_OPERATION_CLS(self)
return await op.execute_async(
paginator,
parse_func,
state.study,
Expand Down
17 changes: 17 additions & 0 deletions imednet/core/endpoint/operations/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
"""
Operation implementations for API endpoints.

This package contains classes that encapsulate the execution logic for various
endpoint operations, such as listing, getting, and creating resources.
"""

from .create import CreateOperation
from .get import FilterGetOperation, PathGetOperation
from .list import ListOperation

__all__ = [
"CreateOperation",
"FilterGetOperation",
"ListOperation",
"PathGetOperation",
]
57 changes: 57 additions & 0 deletions imednet/core/endpoint/operations/create.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
"""
Operation logic for creating resources.

This module defines the CreateOperation class, which encapsulates the execution
logic for synchronous and asynchronous create operations (POST).
"""

from __future__ import annotations

from typing import Any, Callable, Dict, Generic, Optional, TypeVar

from imednet.core.endpoint.protocols import CreateEndpointProtocol
from imednet.core.protocols import AsyncRequestorProtocol, RequestorProtocol

T_RESP = TypeVar("T_RESP")


class CreateOperation(Generic[T_RESP]):
"""
Encapsulates execution logic for create operations.

Handles construction of request arguments, execution of POST requests,
and processing of responses.
"""

def __init__(self, endpoint: CreateEndpointProtocol[T_RESP]) -> None:
self.endpoint = endpoint

def execute_sync(
self,
client: RequestorProtocol,
path: str,
*,
json: Any = None,
data: Any = None,
headers: Optional[Dict[str, str]] = None,
parse_func: Optional[Callable[[Any], T_RESP]] = None,
) -> T_RESP:
"""Execute synchronous create request."""
kwargs = self.endpoint._prepare_kwargs(json=json, data=data, headers=headers)
response = client.post(path, **kwargs)
return self.endpoint._process_response(response, parse_func)

async def execute_async(
self,
client: AsyncRequestorProtocol,
path: str,
*,
json: Any = None,
data: Any = None,
headers: Optional[Dict[str, str]] = None,
parse_func: Optional[Callable[[Any], T_RESP]] = None,
) -> T_RESP:
"""Execute asynchronous create request."""
kwargs = self.endpoint._prepare_kwargs(json=json, data=data, headers=headers)
response = await client.post(path, **kwargs)
return self.endpoint._process_response(response, parse_func)
101 changes: 101 additions & 0 deletions imednet/core/endpoint/operations/get.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
"""
Operation logic for getting single resources.

This module defines operations for retrieving individual items, either via
filtering a list (FilterGetOperation) or via a direct path (PathGetOperation).
"""

from __future__ import annotations

from typing import Any, Generic, Optional, TypeVar

from imednet.core.endpoint.protocols import (
FilterGetEndpointProtocol,
PathGetEndpointProtocol,
)
from imednet.core.paginator import AsyncPaginator, Paginator
from imednet.core.protocols import AsyncRequestorProtocol, RequestorProtocol
from imednet.models.json_base import JsonModel

T = TypeVar("T", bound=JsonModel)


class FilterGetOperation(Generic[T]):
"""
Encapsulates execution logic for getting an item via filtering.

This operation relies on the endpoint's list capability to find an item
by its ID parameter.
"""

def __init__(self, endpoint: FilterGetEndpointProtocol[T]) -> None:
self.endpoint = endpoint

def execute_sync(
self,
client: RequestorProtocol,
paginator_cls: type[Paginator],
study_key: Optional[str],
item_id: Any,
) -> T:
"""Execute synchronous get via filter."""
filters = {self.endpoint._id_param: item_id}
result = self.endpoint._list_sync(
client,
paginator_cls,
study_key=study_key,
refresh=True,
**filters,
)
return self.endpoint._validate_get_result(result, study_key, item_id)

async def execute_async(
self,
client: AsyncRequestorProtocol,
paginator_cls: type[AsyncPaginator],
study_key: Optional[str],
item_id: Any,
) -> T:
"""Execute asynchronous get via filter."""
filters = {self.endpoint._id_param: item_id}
result = await self.endpoint._list_async(
client,
paginator_cls,
study_key=study_key,
refresh=True,
**filters,
)
return self.endpoint._validate_get_result(result, study_key, item_id)


class PathGetOperation(Generic[T]):
"""
Encapsulates execution logic for getting an item via direct path.

This operation constructs a specific URL for the item ID and fetches it directly.
"""

def __init__(self, endpoint: PathGetEndpointProtocol[T]) -> None:
self.endpoint = endpoint

def execute_sync(
self,
client: RequestorProtocol,
study_key: Optional[str],
item_id: Any,
) -> T:
"""Execute synchronous get via path."""
path = self.endpoint._get_path_for_id(study_key, item_id)
response = client.get(path)
return self.endpoint._process_response(response, study_key, item_id)

async def execute_async(
self,
client: AsyncRequestorProtocol,
study_key: Optional[str],
item_id: Any,
) -> T:
"""Execute asynchronous get via path."""
path = self.endpoint._get_path_for_id(study_key, item_id)
response = await client.get(path)
return self.endpoint._process_response(response, study_key, item_id)
Loading
Loading