Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 79 additions & 0 deletions examples/raw_log_search_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Example usage of raw log search functionality."""

import argparse
from datetime import datetime, timedelta
from pprint import pprint

from secops.chronicle import ChronicleClient
from secops.exceptions import APIError


def main():
"""Run raw log search example."""
parser = argparse.ArgumentParser(description="Chronicle Raw Log Search Example")
parser.add_argument("--project_id", required=True, help="GCP Project ID")
parser.add_argument("--customer_id", required=True, help="Chronicle Customer ID")
parser.add_argument("--region", default="us", help="Chronicle Region")
parser.add_argument("--query", default="user = \"user\"", help="Raw log search query")
parser.add_argument("--days", type=int, default=1, help="Search time range in days")

args = parser.parse_args()

client = ChronicleClient(
project_id=args.project_id,
customer_id=args.customer_id,
region=args.region,
)

end_time = datetime.utcnow()
start_time = end_time - timedelta(days=args.days)

print(f"Searching raw logs from {start_time} to {end_time}")
print(f"Query: {args.query}")

try:
# Example 1: Basic Search
results = client.search_raw_logs(
query=args.query,
start_time=start_time,
end_time=end_time,
page_size=10,
)

print("\nResults:")
pprint(results)

# Example 2: Filtering by Log Type (if available)
# Note: Replace 'OKTA' with a valid log type in your environment
# print("\nSearching with Log Type filter:")
# results_filtered = client.search_raw_logs(
# query=args.query,
# start_time=start_time,
# end_time=end_time,
# page_size=10,
# log_types=["OKTA"]
# )
# pprint(results_filtered)

except APIError as e:
print(f"API Error: {e}")
except Exception as e:
print(f"Error: {e}")


if __name__ == "__main__":
main()
2 changes: 2 additions & 0 deletions src/secops/chronicle/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@
RowLogFormat,
generate_udm_key_value_mappings,
)
from secops.chronicle.raw_log_search import search_raw_logs
from secops.chronicle.udm_search import (
fetch_udm_search_csv,
fetch_udm_search_view,
Expand All @@ -210,6 +211,7 @@
"validate_query",
"get_stats",
"search_udm",
"search_raw_logs",
# Natural Language Search
"translate_nl_to_udm",
# Entity
Expand Down
45 changes: 45 additions & 0 deletions src/secops/chronicle/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,7 @@
)
from secops.chronicle.rule_validation import validate_rule as _validate_rule
from secops.chronicle.search import search_udm as _search_udm
from secops.chronicle.raw_log_search import search_raw_logs as _search_raw_logs
from secops.chronicle.stats import get_stats as _get_stats
from secops.chronicle.udm_mapping import RowLogFormat
from secops.chronicle.udm_mapping import (
Expand Down Expand Up @@ -908,6 +909,50 @@ def search_udm(
debug,
)

def search_raw_logs(
self,
query: str,
start_time: datetime,
end_time: datetime,
allow_partial_results: bool = False,
snapshot_query: str | None = None,
case_sensitive: bool = False,
log_types: list[str] | None = None,
max_aggregations_per_field: int | None = None,
page_size: int | None = None,
) -> dict[str, Any]:
"""Search for raw logs in Chronicle.

Args:
query: Query to search for raw logs.
start_time: Search start time (inclusive).
end_time: Search end time (exclusive).
allow_partial_results: Optional. Whether to allow partial results.
snapshot_query: Optional. Query to filter results.
case_sensitive: Optional. Whether search is case-sensitive.
log_types: Optional. Limit results to specific log types by display name (e.g. ["OKTA"]).
max_aggregations_per_field: Optional. Max values for a UDM field.
page_size: Optional. Maximum number of results to return.

Returns:
Dictionary containing search results.

Raises:
APIError: If the API request fails.
"""
return _search_raw_logs(
self,
query=query,
start_time=start_time,
end_time=end_time,
snapshot_query=snapshot_query,
case_sensitive=case_sensitive,
log_types=log_types,
max_aggregations_per_field=max_aggregations_per_field,
page_size=page_size,
allow_partial_results=allow_partial_results,
)

def find_udm_field_values(
self, query: str, page_size: int | None = None
) -> dict[str, Any]:
Expand Down
90 changes: 90 additions & 0 deletions src/secops/chronicle/raw_log_search.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
# Copyright 2026 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Raw log search functionality for Chronicle."""

from datetime import datetime
from typing import TYPE_CHECKING, Any

from secops.chronicle.models import APIVersion
from secops.chronicle.utils.request_utils import chronicle_request

if TYPE_CHECKING:
from secops.chronicle.client import ChronicleClient


def search_raw_logs(
client: "ChronicleClient",
query: str,
start_time: datetime,
end_time: datetime,
snapshot_query: str | None = None,
case_sensitive: bool = False,
log_types: list[str] | None = None,
max_aggregations_per_field: int | None = None,
page_size: int | None = None,
allow_partial_results: bool = False,
) -> dict[str, Any]:
"""Search for raw logs in Chronicle.

Args:
client: The ChronicleClient instance.
query: Query to search for raw logs.
start_time: Search start time (inclusive).
end_time: Search end time (exclusive).
snapshot_query: Optional. Query to filter results.
case_sensitive: Optional. Whether search is case-sensitive.
log_types: Optional. Limit results to specific log types (e.g. ["OKTA"]).
max_aggregations_per_field: Optional. Max values for a UDM field.
page_size: Optional. Maximum number of results to return.
allow_partial_results: Optional. Whether to allow partial results.

Returns:
Dictionary containing search results.

Raises:
APIError: If the API request fails.
"""
search_query: dict[str, Any] = {
"baselineQuery": query,
"baselineTimeRange": {
"startTime": start_time.strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
"endTime": end_time.strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
},
"caseSensitive": case_sensitive,
}

if snapshot_query:
search_query["snapshotQuery"] = snapshot_query

if log_types:
# The API expects a list of LogType objects, filtering by displayName
search_query["logTypes"] = [{"displayName": lt} for lt in log_types]

if max_aggregations_per_field is not None:
search_query["maxAggregationsPerField"] = max_aggregations_per_field

if page_size is not None:
search_query["pageSize"] = page_size

if allow_partial_results:
search_query["allowPartialResults"] = allow_partial_results

return chronicle_request(
client,
method="POST",
endpoint_path=":searchRawLogs",
api_version=APIVersion.V1ALPHA,
json=search_query,
)
78 changes: 78 additions & 0 deletions tests/chronicle/test_raw_log_search.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from datetime import datetime
from unittest.mock import Mock, patch
import pytest

from secops.chronicle.raw_log_search import search_raw_logs
from secops.chronicle.models import APIVersion


@pytest.fixture
def client():
return Mock()


def test_search_raw_logs_calls_chronicle_request_correctly(client):
start_time = datetime(2023, 1, 1, 12, 0, 0)
end_time = datetime(2023, 1, 2, 12, 0, 0)
query = 'user = "foo"'

with patch("secops.chronicle.raw_log_search.chronicle_request") as mock_request:
search_raw_logs(
client,
query=query,
start_time=start_time,
end_time=end_time,
log_types=["OKTA", "AWS"],
case_sensitive=True,
allow_partial_results=True
)

mock_request.assert_called_once()
kwargs = mock_request.call_args.kwargs
json_body = kwargs["json"]

assert kwargs["method"] == "POST"
assert kwargs["endpoint_path"] == ":searchRawLogs"
assert kwargs["api_version"] == APIVersion.V1ALPHA

assert json_body["baselineQuery"] == query
assert json_body["baselineTimeRange"]["startTime"] == "2023-01-01T12:00:00.000000Z"
assert json_body["baselineTimeRange"]["endTime"] == "2023-01-02T12:00:00.000000Z"
assert json_body["caseSensitive"] is True
assert json_body["logTypes"] == [{"displayName": "OKTA"}, {"displayName": "AWS"}]
assert json_body["allowPartialResults"] is True

def test_search_raw_logs_defaults(client):
start_time = datetime(2023, 1, 1, 12, 0, 0)
end_time = datetime(2023, 1, 2, 12, 0, 0)
query = 'user = "foo"'

with patch("secops.chronicle.raw_log_search.chronicle_request") as mock_request:
search_raw_logs(
client,
query=query,
start_time=start_time,
end_time=end_time,
)

mock_request.assert_called_once()
kwargs = mock_request.call_args.kwargs
json_body = kwargs["json"]

assert json_body["caseSensitive"] is False
assert "logTypes" not in json_body
assert "allowPartialResults" not in json_body
Loading