diff --git a/examples/raw_log_search_example.py b/examples/raw_log_search_example.py new file mode 100644 index 0000000..b233c07 --- /dev/null +++ b/examples/raw_log_search_example.py @@ -0,0 +1,79 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +"""Example usage of raw log search functionality.""" + +import argparse +from datetime import datetime, timedelta +from pprint import pprint + +from secops.chronicle import ChronicleClient +from secops.exceptions import APIError + + +def main(): + """Run raw log search example.""" + parser = argparse.ArgumentParser(description="Chronicle Raw Log Search Example") + parser.add_argument("--project_id", required=True, help="GCP Project ID") + parser.add_argument("--customer_id", required=True, help="Chronicle Customer ID") + parser.add_argument("--region", default="us", help="Chronicle Region") + parser.add_argument("--query", default="user = \"user\"", help="Raw log search query") + parser.add_argument("--days", type=int, default=1, help="Search time range in days") + + args = parser.parse_args() + + client = ChronicleClient( + project_id=args.project_id, + customer_id=args.customer_id, + region=args.region, + ) + + end_time = datetime.utcnow() + start_time = end_time - timedelta(days=args.days) + + print(f"Searching raw logs from {start_time} to {end_time}") + print(f"Query: {args.query}") + + try: + # Example 1: Basic Search + results = client.search_raw_logs( + query=args.query, + start_time=start_time, + end_time=end_time, + page_size=10, + ) + + print("\nResults:") + pprint(results) + + # Example 2: Filtering by Log Type (if available) + # Note: Replace 'OKTA' with a valid log type in your environment + # print("\nSearching with Log Type filter:") + # results_filtered = client.search_raw_logs( + # query=args.query, + # start_time=start_time, + # end_time=end_time, + # page_size=10, + # log_types=["OKTA"] + # ) + # pprint(results_filtered) + + except APIError as e: + print(f"API Error: {e}") + except Exception as e: + print(f"Error: {e}") + + +if __name__ == "__main__": + main() diff --git a/src/secops/chronicle/__init__.py b/src/secops/chronicle/__init__.py index f38fcf2..e87c5a8 100644 --- a/src/secops/chronicle/__init__.py +++ b/src/secops/chronicle/__init__.py @@ -184,6 +184,7 @@ RowLogFormat, generate_udm_key_value_mappings, ) +from secops.chronicle.raw_log_search import search_raw_logs from secops.chronicle.udm_search import ( fetch_udm_search_csv, fetch_udm_search_view, @@ -210,6 +211,7 @@ "validate_query", "get_stats", "search_udm", + "search_raw_logs", # Natural Language Search "translate_nl_to_udm", # Entity diff --git a/src/secops/chronicle/client.py b/src/secops/chronicle/client.py index cdf63b2..9c718f1 100644 --- a/src/secops/chronicle/client.py +++ b/src/secops/chronicle/client.py @@ -309,6 +309,7 @@ ) from secops.chronicle.rule_validation import validate_rule as _validate_rule from secops.chronicle.search import search_udm as _search_udm +from secops.chronicle.raw_log_search import search_raw_logs as _search_raw_logs from secops.chronicle.stats import get_stats as _get_stats from secops.chronicle.udm_mapping import RowLogFormat from secops.chronicle.udm_mapping import ( @@ -908,6 +909,50 @@ def search_udm( debug, ) + def search_raw_logs( + self, + query: str, + start_time: datetime, + end_time: datetime, + allow_partial_results: bool = False, + snapshot_query: str | None = None, + case_sensitive: bool = False, + log_types: list[str] | None = None, + max_aggregations_per_field: int | None = None, + page_size: int | None = None, + ) -> dict[str, Any]: + """Search for raw logs in Chronicle. + + Args: + query: Query to search for raw logs. + start_time: Search start time (inclusive). + end_time: Search end time (exclusive). + allow_partial_results: Optional. Whether to allow partial results. + snapshot_query: Optional. Query to filter results. + case_sensitive: Optional. Whether search is case-sensitive. + log_types: Optional. Limit results to specific log types by display name (e.g. ["OKTA"]). + max_aggregations_per_field: Optional. Max values for a UDM field. + page_size: Optional. Maximum number of results to return. + + Returns: + Dictionary containing search results. + + Raises: + APIError: If the API request fails. + """ + return _search_raw_logs( + self, + query=query, + start_time=start_time, + end_time=end_time, + snapshot_query=snapshot_query, + case_sensitive=case_sensitive, + log_types=log_types, + max_aggregations_per_field=max_aggregations_per_field, + page_size=page_size, + allow_partial_results=allow_partial_results, + ) + def find_udm_field_values( self, query: str, page_size: int | None = None ) -> dict[str, Any]: diff --git a/src/secops/chronicle/raw_log_search.py b/src/secops/chronicle/raw_log_search.py new file mode 100644 index 0000000..718a6fe --- /dev/null +++ b/src/secops/chronicle/raw_log_search.py @@ -0,0 +1,90 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +"""Raw log search functionality for Chronicle.""" + +from datetime import datetime +from typing import TYPE_CHECKING, Any + +from secops.chronicle.models import APIVersion +from secops.chronicle.utils.request_utils import chronicle_request + +if TYPE_CHECKING: + from secops.chronicle.client import ChronicleClient + + +def search_raw_logs( + client: "ChronicleClient", + query: str, + start_time: datetime, + end_time: datetime, + snapshot_query: str | None = None, + case_sensitive: bool = False, + log_types: list[str] | None = None, + max_aggregations_per_field: int | None = None, + page_size: int | None = None, + allow_partial_results: bool = False, +) -> dict[str, Any]: + """Search for raw logs in Chronicle. + + Args: + client: The ChronicleClient instance. + query: Query to search for raw logs. + start_time: Search start time (inclusive). + end_time: Search end time (exclusive). + snapshot_query: Optional. Query to filter results. + case_sensitive: Optional. Whether search is case-sensitive. + log_types: Optional. Limit results to specific log types (e.g. ["OKTA"]). + max_aggregations_per_field: Optional. Max values for a UDM field. + page_size: Optional. Maximum number of results to return. + allow_partial_results: Optional. Whether to allow partial results. + + Returns: + Dictionary containing search results. + + Raises: + APIError: If the API request fails. + """ + search_query: dict[str, Any] = { + "baselineQuery": query, + "baselineTimeRange": { + "startTime": start_time.strftime("%Y-%m-%dT%H:%M:%S.%fZ"), + "endTime": end_time.strftime("%Y-%m-%dT%H:%M:%S.%fZ"), + }, + "caseSensitive": case_sensitive, + } + + if snapshot_query: + search_query["snapshotQuery"] = snapshot_query + + if log_types: + # The API expects a list of LogType objects, filtering by displayName + search_query["logTypes"] = [{"displayName": lt} for lt in log_types] + + if max_aggregations_per_field is not None: + search_query["maxAggregationsPerField"] = max_aggregations_per_field + + if page_size is not None: + search_query["pageSize"] = page_size + + if allow_partial_results: + search_query["allowPartialResults"] = allow_partial_results + + return chronicle_request( + client, + method="POST", + endpoint_path=":searchRawLogs", + api_version=APIVersion.V1ALPHA, + json=search_query, + ) diff --git a/tests/chronicle/test_raw_log_search.py b/tests/chronicle/test_raw_log_search.py new file mode 100644 index 0000000..b9bca7c --- /dev/null +++ b/tests/chronicle/test_raw_log_search.py @@ -0,0 +1,78 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +from datetime import datetime +from unittest.mock import Mock, patch +import pytest + +from secops.chronicle.raw_log_search import search_raw_logs +from secops.chronicle.models import APIVersion + + +@pytest.fixture +def client(): + return Mock() + + +def test_search_raw_logs_calls_chronicle_request_correctly(client): + start_time = datetime(2023, 1, 1, 12, 0, 0) + end_time = datetime(2023, 1, 2, 12, 0, 0) + query = 'user = "foo"' + + with patch("secops.chronicle.raw_log_search.chronicle_request") as mock_request: + search_raw_logs( + client, + query=query, + start_time=start_time, + end_time=end_time, + log_types=["OKTA", "AWS"], + case_sensitive=True, + allow_partial_results=True + ) + + mock_request.assert_called_once() + kwargs = mock_request.call_args.kwargs + json_body = kwargs["json"] + + assert kwargs["method"] == "POST" + assert kwargs["endpoint_path"] == ":searchRawLogs" + assert kwargs["api_version"] == APIVersion.V1ALPHA + + assert json_body["baselineQuery"] == query + assert json_body["baselineTimeRange"]["startTime"] == "2023-01-01T12:00:00.000000Z" + assert json_body["baselineTimeRange"]["endTime"] == "2023-01-02T12:00:00.000000Z" + assert json_body["caseSensitive"] is True + assert json_body["logTypes"] == [{"displayName": "OKTA"}, {"displayName": "AWS"}] + assert json_body["allowPartialResults"] is True + +def test_search_raw_logs_defaults(client): + start_time = datetime(2023, 1, 1, 12, 0, 0) + end_time = datetime(2023, 1, 2, 12, 0, 0) + query = 'user = "foo"' + + with patch("secops.chronicle.raw_log_search.chronicle_request") as mock_request: + search_raw_logs( + client, + query=query, + start_time=start_time, + end_time=end_time, + ) + + mock_request.assert_called_once() + kwargs = mock_request.call_args.kwargs + json_body = kwargs["json"] + + assert json_body["caseSensitive"] is False + assert "logTypes" not in json_body + assert "allowPartialResults" not in json_body