Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crowdsec_tracker_api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class Server(Enum):
'GetCVEIPsResponsePage',
'GetCVEResponse',
'GetCVESubscribedIntegrationsResponsePage',
'GetCVEsFilterBy',
'GetCVEsResponsePage',
'GetCVEsSortBy',
'GetCVEsSortOrder',
Expand Down
Binary file modified crowdsec_tracker_api/__pycache__/base_model.cpython-311.pyc
Binary file not shown.
Binary file modified crowdsec_tracker_api/__pycache__/http_client.cpython-311.pyc
Binary file not shown.
28 changes: 27 additions & 1 deletion crowdsec_tracker_api/models.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: <stdin>
# timestamp: 2026-01-06T15:34:27+00:00
# timestamp: 2026-01-29T15:05:21+00:00

from __future__ import annotations

Expand Down Expand Up @@ -161,6 +161,15 @@ class CVEResponseBase(BaseModelSdk):
title='Crowdsec Score',
),
]
opportunity_score: Annotated[
Optional[int],
Field(
description="Opportunity score indicating if it's an opportunistic(0) or targeted(5) attack (between 0-5)",
ge=0,
le=5,
title='Opportunity Score',
),
] = 0
first_seen: Annotated[
Optional[datetime], Field(description='First seen date', title='First Seen')
] = None
Expand Down Expand Up @@ -251,6 +260,15 @@ class GetCVEResponse(BaseModelSdk):
title='Crowdsec Score',
),
]
opportunity_score: Annotated[
Optional[int],
Field(
description="Opportunity score indicating if it's an opportunistic(0) or targeted(5) attack (between 0-5)",
ge=0,
le=5,
title='Opportunity Score',
),
] = 0
first_seen: Annotated[
Optional[datetime], Field(description='First seen date', title='First Seen')
] = None
Expand Down Expand Up @@ -302,6 +320,10 @@ class GetCVEResponse(BaseModelSdk):
] = None


class GetCVEsFilterBy(StrEnum):
IS_PUBLIC = 'is_public'


class GetCVEsResponsePage(BaseModelSdk):
items: Annotated[List[CVEResponseBase], Field(title='Items')]
total: Annotated[int, Field(ge=0, title='Total')]
Expand Down Expand Up @@ -538,6 +560,10 @@ class CvesGetCvesQueryParameters(BaseModelSdk):
Optional[GetCVEsSortOrder],
Field(description='Sort order: ascending or descending', title='Sort Order'),
] = 'desc'
filters: Annotated[
Optional[List[GetCVEsFilterBy]],
Field(description='Filters to apply on the CVE list', title='Filters'),
] = None
page: Annotated[
Optional[int], Field(description='Page number', ge=1, title='Page')
] = 1
Expand Down
Binary file not shown.
120 changes: 62 additions & 58 deletions crowdsec_tracker_api/services/cves.py
Original file line number Diff line number Diff line change
@@ -1,89 +1,90 @@
import json
from types import NoneType
from typing import Annotated, Optional, Union
from typing import Optional, Union, Annotated

from httpx import Auth
from ..models import *
from ..base_model import Page, Service
from pydantic import BaseModel, Field
from pydantic.fields import FieldInfo

from ..base_model import Page, Service
from httpx import Auth
from ..http_client import HttpClient
from ..models import *


class Cves(Service):
def __init__(
self, auth: Auth, base_url: str = "https://admin.api.crowdsec.net/v1"
) -> None:
super().__init__(
base_url=base_url, auth=auth, user_agent="crowdsec_tracker_api/1.95.2"
)

def __init__(self, auth: Auth, base_url: str = "https://admin.api.crowdsec.net/v1") -> None:
super().__init__(base_url=base_url, auth=auth, user_agent="crowdsec_tracker_api/1.102.2")

def get_cves(
self,
query: Optional[str] = None,
sort_by: Optional[GetCVEsSortBy] = GetCVEsSortBy("rule_release_date"),
sort_order: Optional[GetCVEsSortOrder] = GetCVEsSortOrder("desc"),
filters: Optional[list[GetCVEsFilterBy]] = None,
page: int = 1,
size: int = 50,
) -> GetCVEsResponsePage:
)-> GetCVEsResponsePage:
endpoint_url = "/cves"
loc = locals()
headers = {}
params = json.loads(
CvesGetCvesQueryParameters(**loc).model_dump_json(exclude_none=True)
CvesGetCvesQueryParameters(**loc).model_dump_json(
exclude_none=True
)
)
path_params = {}

response = self.http_client.get(
url=endpoint_url, path_params=path_params, params=params, headers=headers
)

return GetCVEsResponsePage(_client=self, **response.json())

def get_cve(
self,
cve_id: str,
) -> GetCVEResponse:
)-> GetCVEResponse:
endpoint_url = "/cves/{cve_id}"
loc = locals()
headers = {}
params = {}
path_params = json.loads(
CvesGetCvePathParameters(**loc).model_dump_json(exclude_none=True)
CvesGetCvePathParameters(**loc).model_dump_json(
exclude_none=True
)
)

response = self.http_client.get(
url=endpoint_url, path_params=path_params, params=params, headers=headers
)

return GetCVEResponse(**response.json())

def download_cve_ips(
self,
cve_id: str,
) -> str:
)-> str:
endpoint_url = "/cves/{cve_id}/ips-download"
loc = locals()
headers = {}
params = {}
path_params = json.loads(
CvesDownloadCveIpsPathParameters(**loc).model_dump_json(exclude_none=True)
CvesDownloadCveIpsPathParameters(**loc).model_dump_json(
exclude_none=True
)
)

response = self.http_client.get(
url=endpoint_url, path_params=path_params, params=params, headers=headers
)

return response.text

def get_cve_ips_details(
self,
cve_id: str,
since: Optional[str] = "14d",
page: int = 1,
size: int = 50,
) -> GetCVEIPsResponsePage:
)-> GetCVEIPsResponsePage:
endpoint_url = "/cves/{cve_id}/ips-details"
loc = locals()
headers = {}
Expand All @@ -93,21 +94,23 @@ def get_cve_ips_details(
)
)
path_params = json.loads(
CvesGetCveIpsDetailsPathParameters(**loc).model_dump_json(exclude_none=True)
CvesGetCveIpsDetailsPathParameters(**loc).model_dump_json(
exclude_none=True
)
)

response = self.http_client.get(
url=endpoint_url, path_params=path_params, params=params, headers=headers
)

return GetCVEIPsResponsePage(_client=self, **response.json())

def get_cve_subscribed_integrations(
self,
cve_id: str,
page: int = 1,
size: int = 50,
) -> GetCVESubscribedIntegrationsResponsePage:
)-> GetCVESubscribedIntegrationsResponsePage:
endpoint_url = "/cves/{cve_id}/integrations"
loc = locals()
headers = {}
Expand All @@ -121,13 +124,13 @@ def get_cve_subscribed_integrations(
exclude_none=True
)
)

response = self.http_client.get(
url=endpoint_url, path_params=path_params, params=params, headers=headers
)

return GetCVESubscribedIntegrationsResponsePage(_client=self, **response.json())

def subscribe_integration_to_cve(
self,
request: SubscribeCVEIntegrationRequest,
Expand All @@ -142,22 +145,18 @@ def subscribe_integration_to_cve(
exclude_none=True
)
)

payload = (
json.loads(request.model_dump_json(exclude_none=True))
if "request" in loc
else None
)
payload = json.loads(
request.model_dump_json(
exclude_none=True
)
) if "request" in loc else None
response = self.http_client.post(
url=endpoint_url,
path_params=path_params,
params=params,
headers=headers,
json=payload,
url=endpoint_url, path_params=path_params, params=params, headers=headers, json=payload
)

return None

def unsubscribe_integration_from_cve(
self,
cve_id: str,
Expand All @@ -172,30 +171,35 @@ def unsubscribe_integration_from_cve(
exclude_none=True
)
)

response = self.http_client.delete(
url=endpoint_url, path_params=path_params, params=params, headers=headers
)

return None

def get_cve_timeline(
self,
cve_id: str,
since_days: SinceOptions,
) -> list[TimelineItem]:
)-> list[TimelineItem]:
endpoint_url = "/cves/{cve_id}/timeline"
loc = locals()
headers = {}
params = json.loads(
CvesGetCveTimelineQueryParameters(**loc).model_dump_json(exclude_none=True)
CvesGetCveTimelineQueryParameters(**loc).model_dump_json(
exclude_none=True
)
)
path_params = json.loads(
CvesGetCveTimelinePathParameters(**loc).model_dump_json(exclude_none=True)
CvesGetCveTimelinePathParameters(**loc).model_dump_json(
exclude_none=True
)
)

response = self.http_client.get(
url=endpoint_url, path_params=path_params, params=params, headers=headers
)

return [TimelineItem(**item) for item in response.json()]

Loading