Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
"""add_tasks_metadata_gin_index

Revision ID: e9c4ff9e6542
Revises: 9ff3ee32c81b
Create Date: 2026-05-04 11:11:35.017451

"""
from typing import Sequence, Union

from alembic import op


revision: str = 'e9c4ff9e6542'
down_revision: Union[str, None] = '9ff3ee32c81b'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
op.execute(
"CREATE INDEX IF NOT EXISTS ix_tasks_metadata_gin "
"ON tasks USING GIN (task_metadata jsonb_path_ops)"
)


def downgrade() -> None:
op.execute("DROP INDEX IF EXISTS ix_tasks_metadata_gin")
3 changes: 2 additions & 1 deletion agentex/database/migrations/migration_history.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
57c5ed4f59ae -> 9ff3ee32c81b (head), uppercase deployment status
9ff3ee32c81b -> e9c4ff9e6542 (head), add_tasks_metadata_gin_index
57c5ed4f59ae -> 9ff3ee32c81b, uppercase deployment status enum labels
4a9b7787ccd7 -> 57c5ed4f59ae, add_task_id_to_spans
d1a6cde41b3f -> 4a9b7787ccd7, deployments
d024851e790c -> d1a6cde41b3f, add_langgraph_checkpoint_tables
Expand Down
48 changes: 47 additions & 1 deletion agentex/src/api/routes/tasks.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import json
from typing import Annotated, Any

from fastapi import APIRouter, Query
from fastapi import APIRouter, HTTPException, Query
from fastapi.responses import StreamingResponse

from src.adapters.temporal.adapter_temporal import DTemporalAdapter
Expand All @@ -14,9 +15,11 @@
Task,
TaskRelationships,
TaskResponse,
TaskStatus,
TaskStatusReasonRequest,
UpdateTaskRequest,
)
from src.domain.entities.tasks import TaskStatus as DomainTaskStatus
from src.domain.services.authorization_service import DAuthorizationService
from src.domain.use_cases.streams_use_case import DStreamsUseCase
from src.domain.use_cases.tasks_use_case import DTaskUseCase
Expand Down Expand Up @@ -79,18 +82,61 @@ async def list_tasks(
authorized_ids: DAuthorizedResourceIds(AgentexResourceType.task),
agent_id: str | None = None,
agent_name: str | None = None,
status: Annotated[
TaskStatus | None,
Query(description="Filter tasks by status (e.g. RUNNING, COMPLETED)."),
] = None,
task_metadata: Annotated[
str | None,
Query(
description=(
"JSON-encoded object used to filter tasks via JSONB containment. "
'Example: {"created_by_user_id": "abc-123"}.'
)
),
] = None,
limit: int = 50,
page_number: int = 1,
order_by: str | None = None,
order_direction: str = "desc",
relationships: Annotated[list[TaskRelationships], Query()] = None,
):
"""List all tasks."""
parsed_metadata: dict | None = None
if task_metadata is not None:
try:
parsed_metadata = json.loads(task_metadata)
except json.JSONDecodeError as exc:
raise HTTPException(
status_code=400,
detail=f"Invalid JSON in task_metadata query parameter: {exc.msg}",
) from exc
if not isinstance(parsed_metadata, dict):
raise HTTPException(
status_code=400,
detail="task_metadata must decode to a JSON object.",
)
if not parsed_metadata:
raise HTTPException(
status_code=400,
detail="task_metadata cannot be empty; omit the parameter to skip filtering.",
)

if status == TaskStatus.DELETED:
# list_tasks always excludes DELETED rows at the repository layer, so
# filtering on it would silently return an empty list. Reject explicitly.
raise HTTPException(
status_code=400,
detail="Cannot filter by DELETED status; deleted tasks are not returned by list_tasks.",
)
domain_status = DomainTaskStatus(status.value) if status is not None else None

task_entities = await task_use_case.list_tasks(
id=authorized_ids,
agent_id=agent_id,
agent_name=agent_name,
status=domain_status,
task_metadata=parsed_metadata,
limit=limit,
page_number=page_number,
order_by=order_by,
Expand Down
8 changes: 8 additions & 0 deletions agentex/src/api/schemas/agents_rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,14 @@ class CreateTaskRequest(BaseModel):
params: dict[str, Any] | None = Field(
None, description="The parameters for the task"
)
task_metadata: dict[str, Any] | None = Field(
None,
description=(
"Caller-provided metadata to persist on the task row. Only applied at "
"task creation; ignored if a task with this name already exists. "
"Forwarded to the agent inside the ACP payload for backward compatibility."
),
)
Comment thread
declan-scale marked this conversation as resolved.


class CancelTaskRequest(BaseModel):
Expand Down
9 changes: 9 additions & 0 deletions agentex/src/domain/entities/agents_rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,14 @@ class CreateTaskRequestEntity(BaseModel):
params: dict[str, Any] | None = Field(
None, description="The parameters for the task"
)
task_metadata: dict[str, Any] | None = Field(
None,
description=(
"Caller-provided metadata to persist on the task row. Only applied at "
"task creation; ignored if a task with this name already exists. "
"Forwarded to the agent inside the ACP payload for backward compatibility."
),
)


class CancelTaskRequestEntity(BaseModel):
Expand Down Expand Up @@ -184,6 +192,7 @@ def from_api_request(cls, request: AgentRPCRequest) -> Self:
params = CreateTaskRequestEntity(
name=request.params.root.name,
params=request.params.root.params,
task_metadata=request.params.root.task_metadata,
)
elif request.method == AgentRPCMethod.TASK_CANCEL and isinstance(
request.params.root, CancelTaskRequest
Expand Down
5 changes: 5 additions & 0 deletions agentex/src/domain/repositories/task_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ async def list_with_join(
| None = None,
agent_id: str | None = None,
agent_name: str | None = None,
task_metadata: dict | None = None,
order_by: str | None = None,
order_direction: Literal["asc", "desc"] = "desc",
limit: int | None = None,
Expand All @@ -62,6 +63,8 @@ async def list_with_join(
- task_filters: Filters on the task table itself
- agent_id: Filter tasks by agent ID using the join table
- agent_name: Filter tasks by agent name
- task_metadata: JSONB containment filter on `task_metadata`. Returns
tasks whose metadata is a JSON superset of the provided dict.
- order_by: Column to order by
- order_direction: Direction to order by
- limit: Maximum number of results to return
Expand All @@ -78,6 +81,8 @@ async def list_with_join(
).where(AgentORM.name == agent_name)
if agent_id:
query = query.where(TaskAgentORM.agent_id == agent_id)
if task_metadata is not None:
query = query.where(TaskORM.task_metadata.contains(task_metadata))
query = query.where(TaskORM.status != TaskStatus.DELETED)
return await self.list(
filters=task_filters,
Expand Down
14 changes: 13 additions & 1 deletion agentex/src/domain/services/task_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ async def create_task(
agent: AgentEntity,
task_name: str | None = None,
task_params: dict[str, Any] | None = None,
task_metadata: dict[str, Any] | None = None,
) -> TaskEntity:
"""
Create a new task record in the repository with single agent (maintains existing interface).
Expand All @@ -53,6 +54,8 @@ async def create_task(
agent: The agent to create the task for
task_name: The name of the task to be created
task_params: The parameters for the task
task_metadata: Caller-provided metadata to persist on the task row.
Not forwarded to the agent.
Returns:
Task containing the created task info
"""
Expand All @@ -65,6 +68,7 @@ async def create_task(
status=TaskStatus.RUNNING,
status_reason="Task created, forwarding to ACP server",
params=task_params,
task_metadata=task_metadata,
),
)
return task_entity
Expand Down Expand Up @@ -220,18 +224,26 @@ async def list_tasks(
id: str | list[str] | None = None,
agent_id: str | None = None,
agent_name: str | None = None,
status: TaskStatus | list[TaskStatus] | None = None,
task_metadata: dict | None = None,
order_by: str | None = None,
order_direction: str = "desc",
relationships: list[TaskRelationships] | None = None,
) -> list[TaskEntity]:
"""
List all tasks from the repository.
"""
task_filters: dict = {}
if id is not None:
task_filters["id"] = id
if status is not None:
task_filters["status"] = status

return await self.task_repository.list_with_join(
task_filters={"id": id} if id is not None else None,
task_filters=task_filters or None,
agent_id=agent_id,
agent_name=agent_name,
task_metadata=task_metadata,
order_by=order_by,
order_direction=order_direction,
limit=limit,
Expand Down
14 changes: 11 additions & 3 deletions agentex/src/domain/use_cases/agents_acp_use_case.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,7 @@ async def _get_or_create_task(
task_id: str | None = None,
task_name: str | None = None,
task_params: dict[str, Any] | None = None,
task_metadata: dict[str, Any] | None = None,
) -> TaskEntity:
"""Return the existing task if *task_id* is provided, otherwise create a new one.

Expand Down Expand Up @@ -303,7 +304,10 @@ async def _get_or_create_task(

# Create a new task if it doesn't exist
task = await self.task_service.create_task(
agent=agent, task_name=task_name, task_params=task_params
agent=agent,
task_name=task_name,
task_params=task_params,
task_metadata=task_metadata,
)
logger.info(f"[agent_id={agent.id}] Created task {task.id}")
await self.grant_with_retry(task)
Expand Down Expand Up @@ -408,9 +412,13 @@ async def _handle_task_create(
Returns:
Task containing the created task info
"""
# This creates the task record then forwards the message to the ACP server
# This creates the task record then forwards the message to the ACP server.
# task_metadata is persisted on the task row but never forwarded to the agent.
task = await self._get_or_create_task(
agent=agent, task_name=params.name, task_params=params.params
agent=agent,
task_name=params.name,
task_params=params.params,
task_metadata=params.task_metadata,
)

if agent.acp_type in [ACPType.AGENTIC, ACPType.ASYNC]:
Expand Down
4 changes: 4 additions & 0 deletions agentex/src/domain/use_cases/tasks_use_case.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ async def list_tasks(
id: str | list[str] | None = None,
agent_id: str | None = None,
agent_name: str | None = None,
status: TaskStatus | list[TaskStatus] | None = None,
task_metadata: dict | None = None,
order_by: str | None = None,
order_direction: str = "desc",
relationships: list[TaskRelationships] | None = None,
Expand All @@ -78,6 +80,8 @@ async def list_tasks(
id=id,
agent_id=agent_id,
agent_name=agent_name,
status=status,
task_metadata=task_metadata,
limit=limit,
page_number=page_number,
order_by=order_by,
Expand Down
103 changes: 103 additions & 0 deletions agentex/tests/integration/api/tasks/test_tasks_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,109 @@ async def test_list_tasks_with_both_agent_id_and_agent_name_filter(
assert len(tasks) == 1
assert tasks[0]["id"] == target_task.id

async def test_list_tasks_with_task_metadata_filter(
self, isolated_client, isolated_repositories
):
"""list_tasks?task_metadata={...} should return only matching tasks."""
agent_repo = isolated_repositories["agent_repository"]
agent = AgentEntity(
id=orm_id(),
name="metadata-filter-agent",
description="agent for metadata filter test",
acp_url="http://test-acp:8000",
acp_type=ACPType.SYNC,
)
await agent_repo.create(agent)

task_repo = isolated_repositories["task_repository"]
matching = TaskEntity(
id=orm_id(),
name="matching-task",
status=TaskStatus.RUNNING,
task_metadata={"created_by_user_id": "user-a"},
)
other = TaskEntity(
id=orm_id(),
name="other-task",
status=TaskStatus.RUNNING,
task_metadata={"created_by_user_id": "user-b"},
)
await task_repo.create(agent_id=agent.id, task=matching)
await task_repo.create(agent_id=agent.id, task=other)

response = await isolated_client.get(
"/tasks",
params={"task_metadata": '{"created_by_user_id": "user-a"}'},
)
assert response.status_code == 200
ids = {t["id"] for t in response.json()}
assert matching.id in ids
assert other.id not in ids

async def test_list_tasks_rejects_malformed_task_metadata(self, isolated_client):
"""Malformed JSON in task_metadata should yield a 400."""
response = await isolated_client.get(
"/tasks", params={"task_metadata": "not-json"}
)
assert response.status_code == 400

async def test_list_tasks_rejects_empty_task_metadata(self, isolated_client):
"""Empty JSON object in task_metadata should yield a 400."""
response = await isolated_client.get("/tasks", params={"task_metadata": "{}"})
assert response.status_code == 400

async def test_list_tasks_rejects_non_object_task_metadata(self, isolated_client):
"""Non-object JSON in task_metadata should yield a 400."""
response = await isolated_client.get(
"/tasks", params={"task_metadata": '"some-string"'}
)
assert response.status_code == 400

async def test_list_tasks_with_status_filter(
self, isolated_client, isolated_repositories
):
"""list_tasks?status=RUNNING should return only RUNNING tasks."""
agent_repo = isolated_repositories["agent_repository"]
agent = AgentEntity(
id=orm_id(),
name="status-filter-agent",
description="agent for status filter test",
acp_url="http://test-acp:8000",
acp_type=ACPType.SYNC,
)
await agent_repo.create(agent)

task_repo = isolated_repositories["task_repository"]
running = TaskEntity(
id=orm_id(),
name="status-filter-running",
status=TaskStatus.RUNNING,
)
completed = TaskEntity(
id=orm_id(),
name="status-filter-completed",
status=TaskStatus.COMPLETED,
)
await task_repo.create(agent_id=agent.id, task=running)
await task_repo.create(agent_id=agent.id, task=completed)

response = await isolated_client.get("/tasks", params={"status": "RUNNING"})
assert response.status_code == 200
ids = {t["id"] for t in response.json()}
assert running.id in ids
assert completed.id not in ids

async def test_list_tasks_rejects_invalid_status(self, isolated_client):
"""Invalid status enum value should yield a 422."""
response = await isolated_client.get("/tasks", params={"status": "BOGUS"})
assert response.status_code == 422

async def test_list_tasks_rejects_deleted_status(self, isolated_client):
"""status=DELETED is contradictory with the always-on DELETED exclusion;
rejecting at the route avoids silently returning an empty list."""
response = await isolated_client.get("/tasks", params={"status": "DELETED"})
assert response.status_code == 400

#
async def test_get_task_by_id_returns_correct_task(
self, isolated_client, test_task
Expand Down
Loading
Loading