diff --git a/imednet/core/endpoint/mixins/create.py b/imednet/core/endpoint/mixins/create.py index 147459f0..73a13790 100644 --- a/imednet/core/endpoint/mixins/create.py +++ b/imednet/core/endpoint/mixins/create.py @@ -4,6 +4,7 @@ import httpx +from imednet.core.endpoint.operations.create import CreateOperation from imednet.core.protocols import AsyncRequestorProtocol, RequestorProtocol T_RESP = TypeVar("T_RESP") @@ -12,6 +13,8 @@ class CreateEndpointMixin(Generic[T_RESP]): """Mixin implementing creation logic.""" + CREATE_OPERATION_CLS: type[CreateOperation[T_RESP]] = CreateOperation + def _prepare_kwargs( self, json: Any = None, @@ -65,9 +68,15 @@ def _create_sync( """ Execute a synchronous creation request (POST). """ - kwargs = self._prepare_kwargs(json=json, data=data, headers=headers) - response = client.post(path, **kwargs) - return self._process_response(response, parse_func) + op = self.CREATE_OPERATION_CLS(self) + return op.execute_sync( + client, + path, + json=json, + data=data, + headers=headers, + parse_func=parse_func, + ) async def _create_async( self, @@ -82,6 +91,12 @@ async def _create_async( """ Execute an asynchronous creation request (POST). """ - kwargs = self._prepare_kwargs(json=json, data=data, headers=headers) - response = await client.post(path, **kwargs) - return self._process_response(response, parse_func) + op = self.CREATE_OPERATION_CLS(self) + return await op.execute_async( + client, + path, + json=json, + data=data, + headers=headers, + parse_func=parse_func, + ) diff --git a/imednet/core/endpoint/mixins/get.py b/imednet/core/endpoint/mixins/get.py index 616f9c20..5a63ff6c 100644 --- a/imednet/core/endpoint/mixins/get.py +++ b/imednet/core/endpoint/mixins/get.py @@ -3,6 +3,7 @@ from typing import Any, Dict, Iterable, List, Optional from imednet.core.endpoint.abc import EndpointABC +from imednet.core.endpoint.operations.get import FilterGetOperation, PathGetOperation from imednet.core.paginator import AsyncPaginator, Paginator from imednet.core.protocols import AsyncRequestorProtocol, RequestorProtocol @@ -15,6 +16,7 @@ class FilterGetEndpointMixin(EndpointABC[T]): # MODEL and _id_param are inherited from EndpointABC as abstract or properties PAGINATOR_CLS: type[Paginator] = Paginator ASYNC_PAGINATOR_CLS: type[AsyncPaginator] = AsyncPaginator + FILTER_GET_OPERATION_CLS: type[FilterGetOperation[T]] = FilterGetOperation def _require_sync_client(self) -> RequestorProtocol: """Return the configured sync client.""" @@ -64,15 +66,8 @@ def _get_sync( study_key: Optional[str], item_id: Any, ) -> T: - filters = {self._id_param: item_id} - result = self._list_sync( - client, - paginator_cls, - study_key=study_key, - refresh=True, - **filters, - ) - return self._validate_get_result(result, study_key, item_id) + op = self.FILTER_GET_OPERATION_CLS(self) + return op.execute_sync(client, paginator_cls, study_key, item_id) async def _get_async( self, @@ -82,15 +77,8 @@ async def _get_async( study_key: Optional[str], item_id: Any, ) -> T: - filters = {self._id_param: item_id} - result = await self._list_async( - client, - paginator_cls, - study_key=study_key, - refresh=True, - **filters, - ) - return self._validate_get_result(result, study_key, item_id) + op = self.FILTER_GET_OPERATION_CLS(self) + return await op.execute_async(client, paginator_cls, study_key, item_id) def get(self, study_key: Optional[str], item_id: Any) -> T: """Get an item by ID using filtering.""" @@ -115,6 +103,7 @@ class PathGetEndpointMixin(ParsingMixin[T], EndpointABC[T]): """Mixin implementing ``get`` via direct path.""" # PATH is inherited from EndpointABC as abstract + PATH_GET_OPERATION_CLS: type[PathGetOperation[T]] = PathGetOperation def _require_sync_client(self) -> RequestorProtocol: """Return the configured sync client.""" @@ -153,9 +142,8 @@ def _get_path_sync( study_key: Optional[str], item_id: Any, ) -> T: - path = self._get_path_for_id(study_key, item_id) - response = client.get(path) - return self._process_response(response, study_key, item_id) + op = self.PATH_GET_OPERATION_CLS(self) + return op.execute_sync(client, study_key, item_id) async def _get_path_async( self, @@ -164,9 +152,8 @@ async def _get_path_async( study_key: Optional[str], item_id: Any, ) -> T: - path = self._get_path_for_id(study_key, item_id) - response = await client.get(path) - return self._process_response(response, study_key, item_id) + op = self.PATH_GET_OPERATION_CLS(self) + return await op.execute_async(client, study_key, item_id) def get(self, study_key: Optional[str], item_id: Any) -> T: """Get an item by ID using direct path.""" diff --git a/imednet/core/endpoint/mixins/list.py b/imednet/core/endpoint/mixins/list.py index f391a13e..b7d7369b 100644 --- a/imednet/core/endpoint/mixins/list.py +++ b/imednet/core/endpoint/mixins/list.py @@ -4,6 +4,7 @@ from imednet.constants import DEFAULT_PAGE_SIZE from imednet.core.endpoint.abc import EndpointABC +from imednet.core.endpoint.operations.list import ListOperation from imednet.core.endpoint.structs import ListRequestState from imednet.core.paginator import AsyncPaginator, Paginator from imednet.core.parsing import get_model_parser @@ -20,6 +21,7 @@ class ListEndpointMixin(ParamMixin, CacheMixin, ParsingMixin[T], EndpointABC[T]) PAGE_SIZE: int = DEFAULT_PAGE_SIZE PAGINATOR_CLS: type[Paginator] = Paginator ASYNC_PAGINATOR_CLS: type[AsyncPaginator] = AsyncPaginator + LIST_OPERATION_CLS: type[ListOperation[T]] = ListOperation def _require_sync_client(self) -> RequestorProtocol: """Return the configured sync client.""" @@ -65,28 +67,6 @@ def _process_list_result( self._update_local_cache(result, study, has_filters, cache) return result - async def _execute_async_list( - self, - paginator: AsyncPaginator, - parse_func: Callable[[Any], T], - study: Optional[str], - has_filters: bool, - cache: Any, - ) -> List[T]: - result = [parse_func(item) async for item in paginator] - return self._process_list_result(result, study, has_filters, cache) - - def _execute_sync_list( - self, - paginator: Paginator, - parse_func: Callable[[Any], T], - study: Optional[str], - has_filters: bool, - cache: Any, - ) -> List[T]: - result = [parse_func(item) for item in paginator] - return self._process_list_result(result, study, has_filters, cache) - def _prepare_list_request( self, study_key: Optional[str], @@ -146,7 +126,8 @@ def _list_sync( paginator = paginator_cls(client, state.path, params=state.params, page_size=self.PAGE_SIZE) parse_func = self._resolve_parse_func() - return self._execute_sync_list( + op = self.LIST_OPERATION_CLS(self) + return op.execute_sync( paginator, parse_func, state.study, @@ -172,7 +153,8 @@ async def _list_async( paginator = paginator_cls(client, state.path, params=state.params, page_size=self.PAGE_SIZE) parse_func = self._resolve_parse_func() - return await self._execute_async_list( + op = self.LIST_OPERATION_CLS(self) + return await op.execute_async( paginator, parse_func, state.study, diff --git a/imednet/core/endpoint/operations/__init__.py b/imednet/core/endpoint/operations/__init__.py new file mode 100644 index 00000000..5a25c3b5 --- /dev/null +++ b/imednet/core/endpoint/operations/__init__.py @@ -0,0 +1,17 @@ +""" +Operation implementations for API endpoints. + +This package contains classes that encapsulate the execution logic for various +endpoint operations, such as listing, getting, and creating resources. +""" + +from .create import CreateOperation +from .get import FilterGetOperation, PathGetOperation +from .list import ListOperation + +__all__ = [ + "CreateOperation", + "FilterGetOperation", + "ListOperation", + "PathGetOperation", +] diff --git a/imednet/core/endpoint/operations/create.py b/imednet/core/endpoint/operations/create.py new file mode 100644 index 00000000..ddf432ca --- /dev/null +++ b/imednet/core/endpoint/operations/create.py @@ -0,0 +1,57 @@ +""" +Operation logic for creating resources. + +This module defines the CreateOperation class, which encapsulates the execution +logic for synchronous and asynchronous create operations (POST). +""" + +from __future__ import annotations + +from typing import Any, Callable, Dict, Generic, Optional, TypeVar + +from imednet.core.endpoint.protocols import CreateEndpointProtocol +from imednet.core.protocols import AsyncRequestorProtocol, RequestorProtocol + +T_RESP = TypeVar("T_RESP") + + +class CreateOperation(Generic[T_RESP]): + """ + Encapsulates execution logic for create operations. + + Handles construction of request arguments, execution of POST requests, + and processing of responses. + """ + + def __init__(self, endpoint: CreateEndpointProtocol[T_RESP]) -> None: + self.endpoint = endpoint + + def execute_sync( + self, + client: RequestorProtocol, + path: str, + *, + json: Any = None, + data: Any = None, + headers: Optional[Dict[str, str]] = None, + parse_func: Optional[Callable[[Any], T_RESP]] = None, + ) -> T_RESP: + """Execute synchronous create request.""" + kwargs = self.endpoint._prepare_kwargs(json=json, data=data, headers=headers) + response = client.post(path, **kwargs) + return self.endpoint._process_response(response, parse_func) + + async def execute_async( + self, + client: AsyncRequestorProtocol, + path: str, + *, + json: Any = None, + data: Any = None, + headers: Optional[Dict[str, str]] = None, + parse_func: Optional[Callable[[Any], T_RESP]] = None, + ) -> T_RESP: + """Execute asynchronous create request.""" + kwargs = self.endpoint._prepare_kwargs(json=json, data=data, headers=headers) + response = await client.post(path, **kwargs) + return self.endpoint._process_response(response, parse_func) diff --git a/imednet/core/endpoint/operations/get.py b/imednet/core/endpoint/operations/get.py new file mode 100644 index 00000000..c2b2675e --- /dev/null +++ b/imednet/core/endpoint/operations/get.py @@ -0,0 +1,101 @@ +""" +Operation logic for getting single resources. + +This module defines operations for retrieving individual items, either via +filtering a list (FilterGetOperation) or via a direct path (PathGetOperation). +""" + +from __future__ import annotations + +from typing import Any, Generic, Optional, TypeVar + +from imednet.core.endpoint.protocols import ( + FilterGetEndpointProtocol, + PathGetEndpointProtocol, +) +from imednet.core.paginator import AsyncPaginator, Paginator +from imednet.core.protocols import AsyncRequestorProtocol, RequestorProtocol +from imednet.models.json_base import JsonModel + +T = TypeVar("T", bound=JsonModel) + + +class FilterGetOperation(Generic[T]): + """ + Encapsulates execution logic for getting an item via filtering. + + This operation relies on the endpoint's list capability to find an item + by its ID parameter. + """ + + def __init__(self, endpoint: FilterGetEndpointProtocol[T]) -> None: + self.endpoint = endpoint + + def execute_sync( + self, + client: RequestorProtocol, + paginator_cls: type[Paginator], + study_key: Optional[str], + item_id: Any, + ) -> T: + """Execute synchronous get via filter.""" + filters = {self.endpoint._id_param: item_id} + result = self.endpoint._list_sync( + client, + paginator_cls, + study_key=study_key, + refresh=True, + **filters, + ) + return self.endpoint._validate_get_result(result, study_key, item_id) + + async def execute_async( + self, + client: AsyncRequestorProtocol, + paginator_cls: type[AsyncPaginator], + study_key: Optional[str], + item_id: Any, + ) -> T: + """Execute asynchronous get via filter.""" + filters = {self.endpoint._id_param: item_id} + result = await self.endpoint._list_async( + client, + paginator_cls, + study_key=study_key, + refresh=True, + **filters, + ) + return self.endpoint._validate_get_result(result, study_key, item_id) + + +class PathGetOperation(Generic[T]): + """ + Encapsulates execution logic for getting an item via direct path. + + This operation constructs a specific URL for the item ID and fetches it directly. + """ + + def __init__(self, endpoint: PathGetEndpointProtocol[T]) -> None: + self.endpoint = endpoint + + def execute_sync( + self, + client: RequestorProtocol, + study_key: Optional[str], + item_id: Any, + ) -> T: + """Execute synchronous get via path.""" + path = self.endpoint._get_path_for_id(study_key, item_id) + response = client.get(path) + return self.endpoint._process_response(response, study_key, item_id) + + async def execute_async( + self, + client: AsyncRequestorProtocol, + study_key: Optional[str], + item_id: Any, + ) -> T: + """Execute asynchronous get via path.""" + path = self.endpoint._get_path_for_id(study_key, item_id) + response = await client.get(path) + return self.endpoint._process_response(response, study_key, item_id) diff --git a/imednet/core/endpoint/operations/list.py b/imednet/core/endpoint/operations/list.py new file mode 100644 index 00000000..b3a59add --- /dev/null +++ b/imednet/core/endpoint/operations/list.py @@ -0,0 +1,83 @@ +""" +Operation logic for listing resources. + +This module defines the ListOperation class, which encapsulates the execution +logic for synchronous and asynchronous list operations, separating it from +the endpoint configuration. +""" + +from __future__ import annotations + +from typing import Any, Callable, Generic, List, Optional, TypeVar + +from imednet.core.endpoint.protocols import ListEndpointProtocol +from imednet.core.paginator import AsyncPaginator, Paginator +from imednet.models.json_base import JsonModel + +T = TypeVar("T", bound=JsonModel) + + +class ListOperation(Generic[T]): + """ + Encapsulates execution logic for list operations. + + Handles iteration over paginated results and parsing of items, + delegating state management and result processing back to the endpoint. + """ + + def __init__(self, endpoint: ListEndpointProtocol[T]) -> None: + self.endpoint = endpoint + + def execute_sync( + self, + paginator: Paginator, + parse_func: Callable[[Any], T], + study: Optional[str], + has_filters: bool, + cache: Any, + ) -> List[T]: + """ + Execute synchronous list retrieval. + + Args: + paginator: The paginator to iterate over. + parse_func: The function to parse raw items into models. + study: The study context. + has_filters: Whether filters were applied. + cache: The cache object. + + Returns: + The list of parsed items. + """ + # Execute iteration + result = [parse_func(item) for item in paginator] + + # Process result (update cache, etc.) + return self.endpoint._process_list_result(result, study, has_filters, cache) + + async def execute_async( + self, + paginator: AsyncPaginator, + parse_func: Callable[[Any], T], + study: Optional[str], + has_filters: bool, + cache: Any, + ) -> List[T]: + """ + Execute asynchronous list retrieval. + + Args: + paginator: The async paginator to iterate over. + parse_func: The function to parse raw items into models. + study: The study context. + has_filters: Whether filters were applied. + cache: The cache object. + + Returns: + The list of parsed items. + """ + # Execute iteration + result = [parse_func(item) async for item in paginator] + + # Process result (update cache, etc.) + return self.endpoint._process_list_result(result, study, has_filters, cache) diff --git a/imednet/core/endpoint/protocols.py b/imednet/core/endpoint/protocols.py index e8009569..ca282e49 100644 --- a/imednet/core/endpoint/protocols.py +++ b/imednet/core/endpoint/protocols.py @@ -1,5 +1,22 @@ -from typing import Any, Dict, Protocol, Type, runtime_checkable +from __future__ import annotations +from typing import ( + Any, + Awaitable, + Callable, + Dict, + List, + Optional, + Protocol, + Type, + TypeVar, + runtime_checkable, +) + +from httpx import Response + +from imednet.core.paginator import AsyncPaginator, Paginator +from imednet.core.protocols import AsyncRequestorProtocol, RequestorProtocol from imednet.models.json_base import JsonModel @@ -23,3 +40,97 @@ def _auto_filter(self, filters: Dict[str, Any]) -> Dict[str, Any]: def _build_path(self, *segments: Any) -> str: """Build the API path.""" ... + + +T = TypeVar("T", bound=JsonModel) +T_RESP = TypeVar("T_RESP") + + +@runtime_checkable +class ListEndpointProtocol(Protocol[T]): + """Protocol for endpoints supporting list operations.""" + + def _process_list_result( + self, + result: List[T], + study: Optional[str], + has_filters: bool, + cache: Any, + ) -> List[T]: + """Process the list result (e.g., update cache).""" + ... + + +@runtime_checkable +class FilterGetEndpointProtocol(Protocol[T]): + """Protocol for endpoints supporting get operations via filtering.""" + + @property + def _id_param(self) -> str: + """The query parameter name for the ID.""" + ... + + def _list_sync( + self, + client: RequestorProtocol, + paginator_cls: type[Paginator], + *, + study_key: Optional[str] = None, + refresh: bool = False, + extra_params: Optional[Dict[str, Any]] = None, + **filters: Any, + ) -> List[T]: + """Execute synchronous list retrieval.""" + ... + + def _list_async( + self, + client: AsyncRequestorProtocol, + paginator_cls: type[AsyncPaginator], + *, + study_key: Optional[str] = None, + refresh: bool = False, + extra_params: Optional[Dict[str, Any]] = None, + **filters: Any, + ) -> Awaitable[List[T]]: + """Execute asynchronous list retrieval.""" + ... + + def _validate_get_result(self, items: List[T], study_key: Optional[str], item_id: Any) -> T: + """Validate the result of a get operation.""" + ... + + +@runtime_checkable +class PathGetEndpointProtocol(Protocol[T]): + """Protocol for endpoints supporting get operations via direct path.""" + + def _get_path_for_id(self, study_key: Optional[str], item_id: Any) -> str: + """Get the path for a specific item ID.""" + ... + + def _process_response(self, response: Response, study_key: Optional[str], item_id: Any) -> T: + """Process the response from a direct get request.""" + ... + + +@runtime_checkable +class CreateEndpointProtocol(Protocol[T_RESP]): + """Protocol for endpoints supporting create operations.""" + + def _prepare_kwargs( + self, + json: Any = None, + data: Any = None, + headers: Optional[Dict[str, str]] = None, + ) -> Dict[str, Any]: + """Prepare keyword arguments for the request.""" + ... + + def _process_response( + self, + response: Response, + parse_func: Optional[Callable[[Any], T_RESP]] = None, + ) -> T_RESP: + """Process the response from a create request.""" + ...