diff --git a/src/secops/chronicle/client.py b/src/secops/chronicle/client.py index cdf63b2..2795f8b 100644 --- a/src/secops/chronicle/client.py +++ b/src/secops/chronicle/client.py @@ -873,7 +873,8 @@ def search_udm( max_attempts: int = 30, timeout: int = 30, debug: bool = False, - ) -> dict[str, Any]: + as_list: bool = False, + ) -> dict[str, Any] | list[dict[str, Any]]: """Search UDM events in Chronicle. Args: @@ -885,13 +886,13 @@ def search_udm( max_attempts: Maximum number of polling attempts (default: 30) timeout: Timeout in seconds for each API request (default: 30) debug: Print debug information during execution + as_list: If True, return a list of events instead of a dict + with events list and nextPageToken. Returns: - Dictionary with search results containing: - - events: List of UDM events with 'name' and 'udm' fields - - total_events: Number of events returned - - more_data_available: Boolean indicating - if more results are available + If as_list is True: List of Events. + If as_list is False: Dict with event list, total number of event and + flag to check if more data is available. Raises: APIError: If the API request fails @@ -906,6 +907,7 @@ def search_udm( max_attempts, timeout, debug, + as_list, ) def find_udm_field_values( diff --git a/src/secops/chronicle/search.py b/src/secops/chronicle/search.py index 037a6e5..32f0c7a 100644 --- a/src/secops/chronicle/search.py +++ b/src/secops/chronicle/search.py @@ -15,15 +15,19 @@ """UDM search functionality for Chronicle.""" from datetime import datetime -from typing import Any +from typing import Any, TYPE_CHECKING -import requests +from secops.chronicle.models import APIVersion +from secops.chronicle.utils.request_utils import ( + chronicle_request, +) -from secops.exceptions import APIError +if TYPE_CHECKING: + from secops.chronicle.client import ChronicleClient def search_udm( - client, + client: "ChronicleClient", query: str, start_time: datetime, end_time: datetime, @@ -32,7 +36,8 @@ def search_udm( max_attempts: int = 30, timeout: int = 30, debug: bool = False, -) -> dict[str, Any]: + as_list: bool = False, +) -> dict[str, Any] | list[dict[str, Any]]: """Perform a UDM search query using the Chronicle V1alpha API. Args: @@ -46,23 +51,19 @@ def search_udm( for backwards compatibility) timeout: Timeout in seconds for each API request (default: 30) debug: Print debug information during execution + as_list: Whether to return results as a list or dictionary Returns: - Dict containing the search results with events + If as_list is True: List of Events. + If as_list is False: Dict with event list, total number of event and + flag to check if more data is available. Raises: APIError: If the API request fails """ - # Unused parameters, kept for backward compatibility _ = (case_insensitive, max_attempts) - # Format the instance ID for the API call - instance = client.instance_id - - # Endpoint for UDM search - url = f"{client.base_url}/{instance}:udmSearch" - # Format times for the API start_time_str = start_time.strftime("%Y-%m-%dT%H:%M:%S.%fZ") end_time_str = end_time.strftime("%Y-%m-%dT%H:%M:%S.%fZ") @@ -79,40 +80,21 @@ def search_udm( print(f"Executing UDM search: {query}") print(f"Time range: {start_time_str} to {end_time_str}") - try: - response = client.session.get(url, params=params, timeout=timeout) - - if response.status_code != 200: - error_msg = ( - f"Error executing search: Status {response.status_code}, " - f"Response: {response.text}" - ) - if debug: - print(f"Error: {error_msg}") - raise APIError(error_msg) - - # Parse the response - response_data = response.json() - - # Extract events and metadata - events = response_data.get("events", []) - more_data_available = response_data.get("moreDataAvailable", False) - - if debug: - print(f"Found {len(events)} events") - print(f"More data available: {more_data_available}") - - # Build the result structure to match the expected format - result = { - "events": events, - "total_events": len(events), - "more_data_available": more_data_available, - } - - return result - - except requests.exceptions.RequestException as e: - error_msg = f"Request failed: {str(e)}" - if debug: - print(f"Error: {error_msg}") - raise APIError(error_msg) from e + result = chronicle_request( + client, + method="GET", + endpoint_path=":udmSearch", + api_version=APIVersion.V1ALPHA, + params=params, + timeout=timeout, + ) + + if as_list: + return result.get("events", []) + + events = result.get("events", []) + return { + "events": events, + "total_events": len(events), + "more_data_available": result.get("moreDataAvailable", False), + } diff --git a/tests/chronicle/test_client.py b/tests/chronicle/test_client.py index e9cb648..f826590 100644 --- a/tests/chronicle/test_client.py +++ b/tests/chronicle/test_client.py @@ -90,41 +90,6 @@ def test_chronicle_client_custom_session_user_agent(): assert client.session.headers.get("User-Agent") == "secops-wrapper-sdk" -def test_search_udm(chronicle_client): - """Test UDM search functionality.""" - # Mock the search request - mock_response = Mock() - mock_response.status_code = 200 - mock_response.json.return_value = { - "events": [ - { - "name": "projects/test-project/locations/us/instances/test-instance/events/event1", - "udm": { - "metadata": { - "eventTimestamp": "2024-01-01T00:00:00Z", - "eventType": "NETWORK_CONNECTION", - }, - "target": {"ip": "192.168.1.1", "hostname": "test-host"}, - }, - } - ], - "moreDataAvailable": False, - } - - with patch.object(chronicle_client.session, "get", return_value=mock_response): - result = chronicle_client.search_udm( - query='target.ip != ""', - start_time=datetime(2024, 1, 1, tzinfo=timezone.utc), - end_time=datetime(2024, 1, 2, tzinfo=timezone.utc), - max_events=10, - ) - - assert "events" in result - assert "total_events" in result - assert result["total_events"] == 1 - assert result["events"][0]["udm"]["target"]["ip"] == "192.168.1.1" - - @patch("secops.chronicle.entity._detect_value_type_for_query") @patch("secops.chronicle.entity._summarize_entity_by_id") def test_summarize_entity_ip(mock_summarize_by_id, mock_detect, chronicle_client): diff --git a/tests/chronicle/test_search.py b/tests/chronicle/test_search.py new file mode 100644 index 0000000..ef094b3 --- /dev/null +++ b/tests/chronicle/test_search.py @@ -0,0 +1,200 @@ +"""Tests for Chronicle UDM search functionality (search_udm).""" + +from __future__ import annotations + +import unittest +from datetime import datetime, timedelta, timezone +from unittest import mock + +from secops.chronicle.models import APIVersion +from secops.chronicle.search import search_udm + + +class TestChronicleUdmSearch(unittest.TestCase): + """Tests for Chronicle search functionality.""" + + def setUp(self) -> None: + self.client = mock.MagicMock() + self.start_time = datetime.now(tz=timezone.utc) - timedelta(days=1) + self.end_time = datetime.now(tz=timezone.utc) + + @mock.patch("secops.chronicle.search.chronicle_request") + def test_search_udm_returns_expected_shape( + self, mock_chronicle_request: mock.MagicMock + ) -> None: + mock_chronicle_request.return_value = { + "events": [{"id": 1}, {"id": 2}], + "moreDataAvailable": True, + } + + result = search_udm( + client=self.client, + query='metadata.event_type = "NETWORK_CONNECTION"', + start_time=self.start_time, + end_time=self.end_time, + max_events=500, + ) + + self.assertEqual(result["events"], [{"id": 1}, {"id": 2}]) + self.assertEqual(result["total_events"], 2) + self.assertTrue(result["more_data_available"]) + + mock_chronicle_request.assert_called_once() + _, kwargs = mock_chronicle_request.call_args + + self.assertEqual(kwargs["method"], "GET") + self.assertEqual(kwargs["endpoint_path"], ":udmSearch") + self.assertEqual(kwargs["api_version"], APIVersion.V1ALPHA) + + params = kwargs["params"] + self.assertEqual( + params["query"], 'metadata.event_type = "NETWORK_CONNECTION"' + ) + self.assertEqual(params["limit"], 500) + self.assertEqual( + params["timeRange.start_time"], + self.start_time.strftime("%Y-%m-%dT%H:%M:%S.%fZ"), + ) + self.assertEqual( + params["timeRange.end_time"], + self.end_time.strftime("%Y-%m-%dT%H:%M:%S.%fZ"), + ) + + @mock.patch("secops.chronicle.search.chronicle_request") + def test_search_udm_defaults_when_keys_missing( + self, mock_chronicle_request: mock.MagicMock + ) -> None: + # Missing "events" and "moreDataAvailable" should default safely + mock_chronicle_request.return_value = {} + + result = search_udm( + client=self.client, + query="q", + start_time=self.start_time, + end_time=self.end_time, + ) + + self.assertEqual(result["events"], []) + self.assertEqual(result["total_events"], 0) + self.assertFalse(result["more_data_available"]) + + @mock.patch("secops.chronicle.search.chronicle_request") + @mock.patch("builtins.print") + def test_search_udm_debug_prints( + self, mock_print: mock.MagicMock, mock_chronicle_request: mock.MagicMock + ) -> None: + mock_chronicle_request.return_value = {"events": []} + + search_udm( + client=self.client, + query="q", + start_time=self.start_time, + end_time=self.end_time, + debug=True, + ) + + # Two prints: query + time range + self.assertGreaterEqual(mock_print.call_count, 2) + + @mock.patch("secops.chronicle.search.chronicle_request") + def test_search_udm_propagates_api_error( + self, mock_chronicle_request: mock.MagicMock + ) -> None: + mock_chronicle_request.side_effect = Exception("boom") + + with self.assertRaises(Exception) as ctx: + search_udm( + client=self.client, + query="q", + start_time=self.start_time, + end_time=self.end_time, + ) + + self.assertIn("boom", str(ctx.exception)) + + @mock.patch("secops.chronicle.search.chronicle_request") + def test_search_udm_as_list_returns_list( + self, mock_chronicle_request: mock.MagicMock + ) -> None: + """Test that as_list=True returns a list of events.""" + mock_chronicle_request.return_value = { + "events": [{"id": 1}, {"id": 2}, {"id": 3}], + "moreDataAvailable": True, + } + + result = search_udm( + client=self.client, + query="test", + start_time=self.start_time, + end_time=self.end_time, + as_list=True, + ) + + self.assertIsInstance(result, list) + self.assertEqual(len(result), 3) + self.assertEqual(result, [{"id": 1}, {"id": 2}, {"id": 3}]) + + @mock.patch("secops.chronicle.search.chronicle_request") + def test_search_udm_as_list_with_missing_events( + self, mock_chronicle_request: mock.MagicMock + ) -> None: + """Test that as_list=True returns empty list when events missing.""" + mock_chronicle_request.return_value = { + "moreDataAvailable": False, + } + + result = search_udm( + client=self.client, + query="test", + start_time=self.start_time, + end_time=self.end_time, + as_list=True, + ) + + self.assertIsInstance(result, list) + self.assertEqual(len(result), 0) + self.assertEqual(result, []) + + @mock.patch("secops.chronicle.search.chronicle_request") + def test_search_udm_timeout_parameter_passed( + self, mock_chronicle_request: mock.MagicMock + ) -> None: + """Test that timeout parameter is correctly passed.""" + mock_chronicle_request.return_value = {"events": []} + + search_udm( + client=self.client, + query="test", + start_time=self.start_time, + end_time=self.end_time, + timeout=60, + ) + + mock_chronicle_request.assert_called_once() + _, kwargs = mock_chronicle_request.call_args + self.assertEqual(kwargs["timeout"], 60) + + @mock.patch("secops.chronicle.search.chronicle_request") + def test_search_udm_empty_events_list( + self, mock_chronicle_request: mock.MagicMock + ) -> None: + """Test handling of empty events list in response.""" + mock_chronicle_request.return_value = { + "events": [], + "moreDataAvailable": False, + } + + result = search_udm( + client=self.client, + query="test", + start_time=self.start_time, + end_time=self.end_time, + ) + + self.assertEqual(result["events"], []) + self.assertEqual(result["total_events"], 0) + self.assertFalse(result["more_data_available"]) + + +if __name__ == "__main__": + unittest.main()