diff --git a/aws-proxy/AGENTS.md b/aws-proxy/AGENTS.md
index f963adb..09ae3d8 100644
--- a/aws-proxy/AGENTS.md
+++ b/aws-proxy/AGENTS.md
@@ -38,3 +38,9 @@ When adding new integration tests, consider the following:
 * Make sure to either use fixtures (preferred), or reliable cleanups for removing the resources; several fixtures for creating AWS resources are available in the `localstack.testing.pytest.fixtures` module
 * If a test uses multiple resources with interdependencies (e.g., an SQS queue connected to an SNS topic), then the test needs to ensure that both resource types are proxied (i.e., created in real AWS), to avoid a situation where a resource in AWS is attempting to reference a local resource in LocalStack (using account ID `000000000000` in their ARN).
 * When waiting for the creation status of a resource, use the `localstack.utils.sync.retry(..)` utility function, rather than a manual `for` loop.
+
+## Fixing or Enhancing Logic in the Proxy
+
+Notes:
+* The AWS proxy runs as a LocalStack Extension, and the tests assume that the container is running with the Extension in dev mode. Hence, to apply changes to the proxy logic, the LocalStack main container needs to be restarted. You can either ask me (the user) to restart the container whenever you change the core logic, or alternatively remove the `localstack-main` container and run `EXTENSION_DEV_MODE=1 DEBUG=1 localstack start -d` again to restart it; the fresh startup may also surface error logs, stack traces, etc.
+* If the proxy raises errors or something seems off, you can fetch and inspect the output of the LocalStack container via `localstack logs`.
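The restart-and-inspect cycle described above can also be scripted; a minimal sketch, assuming the `localstack` CLI and Docker are available on the PATH:

```python
# Minimal sketch of the dev-mode restart cycle: remove the main container,
# restart LocalStack with the extension in dev mode, then dump the logs.
import os
import subprocess

subprocess.run(["docker", "rm", "-f", "localstack-main"], check=False)
subprocess.run(
    ["localstack", "start", "-d"],
    env={**os.environ, "EXTENSION_DEV_MODE": "1", "DEBUG": "1"},
    check=True,
)
print(subprocess.run(["localstack", "logs"], capture_output=True, text=True).stdout)
```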
diff --git a/aws-proxy/aws_proxy/client/auth_proxy.py b/aws-proxy/aws_proxy/client/auth_proxy.py
index 5426029..845d8fb 100644
--- a/aws-proxy/aws_proxy/client/auth_proxy.py
+++ b/aws-proxy/aws_proxy/client/auth_proxy.py
@@ -1,3 +1,4 @@
+# Note/disclosure: This file has been partially modified by an AI agent.
 import json
 import logging
 import os
@@ -13,7 +14,6 @@
 from botocore.awsrequest import AWSPreparedRequest
 from botocore.model import OperationModel
 from localstack import config as localstack_config
-from localstack.aws.spec import load_service
 from localstack.config import external_service_url
 from localstack.constants import (
     AWS_REGION_US_EAST_1,
@@ -51,6 +51,11 @@
 if localstack_config.DEBUG:
     LOG.setLevel(logging.DEBUG)
 
+# Mapping from AWS service signing names to boto3 client names
+SERVICE_NAME_MAPPING = {
+    "monitoring": "cloudwatch",
+}
+
 # TODO make configurable
 CLI_PIP_PACKAGE = "localstack-extension-aws-proxy"
 # note: enable the line below temporarily for testing:
@@ -86,6 +91,8 @@ def proxy_request(self, request: Request, data: bytes) -> Response:
         if not parsed:
             return requests_response("", status_code=400)
         region_name, service_name = parsed
+        # Map AWS signing names to boto3 client names
+        service_name = SERVICE_NAME_MAPPING.get(service_name, service_name)
 
         query_string = to_str(request.query_string or "")
         LOG.debug(
@@ -97,10 +104,12 @@
             query_string,
         )
 
+        # Convert Quart headers to a dict for the LocalStack Request
+        headers_dict = dict(request.headers)
         request = Request(
             body=data,
             method=request.method,
-            headers=request.headers,
+            headers=headers_dict,
             path=request.path,
             query_string=query_string,
         )
@@ -172,12 +181,28 @@ def register_in_instance(self):
             )
             raise
 
+    def deregister_from_instance(self):
+        """Deregister this proxy from the LocalStack instance."""
+        port = getattr(self, "port", None)
+        if not port:
+            return
+        url = f"{external_service_url()}{HANDLER_PATH_PROXIES}/{port}"
+        LOG.debug("Deregistering proxy from main container via: %s", url)
+        try:
+            response = requests.delete(url)
+            return response
+        except Exception as e:
+            LOG.debug("Unable to deregister auth proxy: %s", e)
+
     def _parse_aws_request(
         self, request: Request, service_name: str, region_name: str, client
     ) -> Tuple[OperationModel, AWSPreparedRequest, Dict]:
         from localstack.aws.protocol.parser import create_parser
 
-        parser = create_parser(load_service(service_name))
+        # Use botocore's service model to ensure protocol compatibility
+        # (LocalStack's load_service may return newer protocol versions that don't match the client)
+        service_model = self._get_botocore_service_model(service_name)
+        parser = create_parser(service_model)
         operation_model, parsed_request = parser.parse(request)
         request_context = {
             "client_region": region_name,
@@ -315,6 +340,22 @@ def _query_account_id_from_aws(self) -> str:
         result = sts_client.get_caller_identity()
         return result["Account"]
 
+    @staticmethod
+    @cache
+    def _get_botocore_service_model(service_name: str):
+        """
+        Get the botocore service model for a service. This is used instead of LocalStack's
+        load_service() to ensure protocol compatibility, as LocalStack may use newer protocol
+        versions (e.g., smithy-rpc-v2-cbor) while clients use older protocols (e.g., query).
+        """
+        import botocore.session
+        from botocore.model import ServiceModel
+
+        session = botocore.session.get_session()
+        loader = session.get_component("data_loader")
+        api_data = loader.load_service_model(service_name, "service-2")
+        return ServiceModel(api_data)
+
 
 def start_aws_auth_proxy(config: ProxyConfig, port: int = None) -> AuthProxyAWS:
     setup_logging()
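For context on `SERVICE_NAME_MAPPING` and `_get_botocore_service_model` above: the SigV4 credential scope carries the service's *signing* name (`monitoring` for CloudWatch), while botocore's data loader is keyed by the *client* name (`cloudwatch`). A small sketch, assuming botocore is installed; the printed values reflect the bundled CloudWatch service definition:

```python
# Sketch: load the CloudWatch service definition the same way
# _get_botocore_service_model does, and inspect its metadata.
import botocore.session

session = botocore.session.get_session()
loader = session.get_component("data_loader")
api_data = loader.load_service_model("cloudwatch", "service-2")
print(api_data["metadata"]["endpointPrefix"])  # "monitoring" - the signing name
print(api_data["metadata"]["protocol"])  # "query" - the wire protocol the parser needs
```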
+ """ + import botocore.session + from botocore.model import ServiceModel + + session = botocore.session.get_session() + loader = session.get_component("data_loader") + api_data = loader.load_service_model(service_name, "service-2") + return ServiceModel(api_data) + def start_aws_auth_proxy(config: ProxyConfig, port: int = None) -> AuthProxyAWS: setup_logging() diff --git a/aws-proxy/aws_proxy/server/aws_request_forwarder.py b/aws-proxy/aws_proxy/server/aws_request_forwarder.py index 6b4cdac..567aa6e 100644 --- a/aws-proxy/aws_proxy/server/aws_request_forwarder.py +++ b/aws-proxy/aws_proxy/server/aws_request_forwarder.py @@ -1,9 +1,12 @@ +# Note/disclosure: This file has been partially modified by an AI agent. import json import logging import re from typing import Dict, Optional +from urllib.parse import urlencode import requests +from botocore.serialize import create_serializer from localstack.aws.api import RequestContext from localstack.aws.chain import Handler, HandlerChain from localstack.constants import APPLICATION_JSON, LOCALHOST, LOCALHOST_HOSTNAME @@ -134,7 +137,32 @@ def _request_matches_resource( secret_id, account_id=context.account_id, region_name=context.region ) return bool(re.match(resource_name_pattern, secret_arn)) - # TODO: add more resource patterns + if service_name == "cloudwatch": + # CloudWatch alarm ARN format: arn:aws:cloudwatch:{region}:{account}:alarm:{alarm_name} + alarm_name = context.service_request.get("AlarmName") or "" + alarm_names = context.service_request.get("AlarmNames") or [] + if alarm_name: + alarm_names = [alarm_name] + if alarm_names: + for name in alarm_names: + alarm_arn = f"arn:aws:cloudwatch:{context.region}:{context.account_id}:alarm:{name}" + if re.match(resource_name_pattern, alarm_arn): + return True + return False + # For metric operations without alarm names, check if pattern is generic + return bool(re.match(resource_name_pattern, ".*")) + if service_name == "logs": + # CloudWatch Logs ARN format: arn:aws:logs:{region}:{account}:log-group:{name}:* + log_group_name = context.service_request.get("logGroupName") or "" + log_group_prefix = ( + context.service_request.get("logGroupNamePrefix") or "" + ) + name = log_group_name or log_group_prefix + if name: + log_group_arn = f"arn:aws:logs:{context.region}:{context.account_id}:log-group:{name}:*" + return bool(re.match(resource_name_pattern, log_group_arn)) + # No log group name specified - check if pattern is generic + return bool(re.match(resource_name_pattern, ".*")) except re.error as e: raise Exception( "Error evaluating regular expression - please verify proxy configuration" @@ -168,6 +196,12 @@ def forward_request( data = request.form elif request.data: data = request.data + + # Fallback: if data is empty and we have parsed service_request, + # reconstruct the request body (handles cases where form data was consumed) + if not data and context.service_request: + data = self._reconstruct_request_body(context, ctype) + LOG.debug( "Forward request: %s %s - %s - %s", request.method, @@ -256,6 +290,35 @@ def _get_resource_names(cls, service_config: ProxyServiceConfig) -> list[str]: @classmethod def _get_canonical_service_name(cls, service_name: str) -> str: - if service_name == "sqs-query": - return "sqs" - return service_name + # Map internal/signing service names to boto3 client names + mapping = { + "sqs-query": "sqs", + "monitoring": "cloudwatch", + } + return mapping.get(service_name, service_name) + + def _reconstruct_request_body( + self, context: RequestContext, content_type: str 
diff --git a/aws-proxy/aws_proxy/server/request_handler.py b/aws-proxy/aws_proxy/server/request_handler.py
index 0de720f..481da6d 100644
--- a/aws-proxy/aws_proxy/server/request_handler.py
+++ b/aws-proxy/aws_proxy/server/request_handler.py
@@ -1,3 +1,4 @@
+# Note/disclosure: This file has been partially modified by an AI agent.
 import json
 import logging
 import os.path
@@ -43,6 +44,11 @@ def add_proxy(self, request: Request, **kwargs):
         result = handle_proxies_request(req)
         return result or {}
 
+    @route(f"{HANDLER_PATH_PROXIES}/<int:port>", methods=["DELETE"])
+    def delete_proxy(self, request: Request, port: int, **kwargs):
+        removed = AwsProxyHandler.PROXY_INSTANCES.pop(port, None)
+        return {"removed": removed is not None}
+
     @route(f"{HANDLER_PATH_PROXIES}/status", methods=["GET"])
     def get_status(self, request: Request, **kwargs):
         containers = get_proxy_containers()
diff --git a/aws-proxy/tests/conftest.py b/aws-proxy/tests/conftest.py
index 47d5b02..d686c8a 100644
--- a/aws-proxy/tests/conftest.py
+++ b/aws-proxy/tests/conftest.py
@@ -1,3 +1,4 @@
+# Note/disclosure: This file has been partially modified by an AI agent.
 import os
 
 import pytest
@@ -51,4 +52,6 @@ def _start(config: dict = None):
     yield _start
 
     for proxy in proxies:
+        # Deregister from the LocalStack instance before shutting down
+        proxy.deregister_from_instance()
         proxy.shutdown()
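The new DELETE endpoint mirrors `deregister_from_instance()` and can also be called directly; a hypothetical sketch (the import location of `HANDLER_PATH_PROXIES` and the port value are assumptions for illustration):

```python
# Hypothetical manual deregistration of a proxy instance; the import path of
# HANDLER_PATH_PROXIES and the port value are illustrative assumptions.
import requests
from localstack.config import external_service_url

from aws_proxy.server.request_handler import HANDLER_PATH_PROXIES

port = 12345  # port of a previously registered proxy instance
response = requests.delete(f"{external_service_url()}{HANDLER_PATH_PROXIES}/{port}")
print(response.json())  # {"removed": true} if a proxy was registered on that port
```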
+ """ + namespace = f"TestNamespace/{short_uid()}" + metric_name = f"TestMetric-{short_uid()}" + + # start proxy - forwarding requests for CloudWatch metrics + config = ProxyConfig(services={"cloudwatch": {"resources": [".*"]}}) + start_aws_proxy(config) + + # create clients + cw_client = connect_to().cloudwatch + cw_client_aws = boto3.client("cloudwatch") + + # put metric data to AWS directly + timestamp = datetime.now(timezone.utc) + cw_client_aws.put_metric_data( + Namespace=namespace, + MetricData=[ + { + "MetricName": metric_name, + "Value": 42.0, + "Unit": "Count", + "Timestamp": timestamp, + } + ], + ) + + # put metric data through local client (proxied) to verify proxy forwards correctly + metric_name_2 = f"TestMetric2-{short_uid()}" + cw_client.put_metric_data( + Namespace=namespace, + MetricData=[ + { + "MetricName": metric_name_2, + "Value": 100.0, + "Unit": "Count", + "Timestamp": datetime.now(timezone.utc), + } + ], + ) + + # verify metrics exist via list_metrics (faster than get_metric_data for new metrics) + def _verify_metrics_aws(): + response = cw_client_aws.list_metrics(Namespace=namespace) + metric_names = [m["MetricName"] for m in response.get("Metrics", [])] + if metric_name not in metric_names or metric_name_2 not in metric_names: + raise AssertionError(f"Metrics not found in AWS yet. Found: {metric_names}") + return response + + metrics_aws = retry(_verify_metrics_aws, retries=20, sleep=5) + metric_names = [m["MetricName"] for m in metrics_aws["Metrics"]] + assert metric_name in metric_names + assert metric_name_2 in metric_names # This proves the proxy forwarded the request + + # verify list_metrics through proxy returns the same data + metrics_local = cw_client.list_metrics(Namespace=namespace) + metric_names_local = [m["MetricName"] for m in metrics_local.get("Metrics", [])] + assert metric_name in metric_names_local + assert metric_name_2 in metric_names_local + + +def test_cloudwatch_alarm_operations(start_aws_proxy, cleanups): + """Test CloudWatch alarm operations with proxy.""" + test_id = short_uid() + alarm_name = f"test-alarm-{test_id}" + namespace = f"TestNamespace/{short_uid()}" + metric_name = f"TestMetric-{short_uid()}" + + # start proxy - forwarding requests for CloudWatch alarms matching "test-alarm-{test_id}*" + config = ProxyConfig( + services={"cloudwatch": {"resources": [f".*:alarm:test-alarm-{test_id}.*"]}} + ) + start_aws_proxy(config) + + # create clients + cw_client = connect_to().cloudwatch + cw_client_aws = boto3.client("cloudwatch") + + # create alarm in AWS + cw_client_aws.put_metric_alarm( + AlarmName=alarm_name, + MetricName=metric_name, + Namespace=namespace, + Statistic="Average", + Period=60, + EvaluationPeriods=1, + Threshold=50.0, + ComparisonOperator="GreaterThanThreshold", + ) + cleanups.append(lambda: cw_client_aws.delete_alarms(AlarmNames=[alarm_name])) + + # describe alarm through local client (proxied) + alarms_local = cw_client.describe_alarms(AlarmNames=[alarm_name]) + alarms_aws = cw_client_aws.describe_alarms(AlarmNames=[alarm_name]) + + assert len(alarms_local["MetricAlarms"]) == 1 + assert len(alarms_aws["MetricAlarms"]) == 1 + assert alarms_local["MetricAlarms"][0]["AlarmName"] == alarm_name + assert ( + alarms_local["MetricAlarms"][0]["AlarmArn"] + == alarms_aws["MetricAlarms"][0]["AlarmArn"] + ) + + # create alarm through local client (proxied) + alarm_name_2 = f"test-alarm-{test_id}-2" + cw_client.put_metric_alarm( + AlarmName=alarm_name_2, + MetricName=metric_name, + Namespace=namespace, + Statistic="Sum", + 
Period=300, + EvaluationPeriods=2, + Threshold=100.0, + ComparisonOperator="LessThanThreshold", + ) + cleanups.append(lambda: cw_client_aws.delete_alarms(AlarmNames=[alarm_name_2])) + + # verify alarm exists in AWS + alarms_aws_2 = cw_client_aws.describe_alarms(AlarmNames=[alarm_name_2]) + assert len(alarms_aws_2["MetricAlarms"]) == 1 + assert alarms_aws_2["MetricAlarms"][0]["AlarmName"] == alarm_name_2 + + +def test_cloudwatch_readonly_operations(start_aws_proxy, cleanups): + """Test CloudWatch operations in read-only proxy mode.""" + alarm_name = f"test-readonly-alarm-{short_uid()}" + namespace = f"TestNamespace/{short_uid()}" + metric_name = f"TestMetric-{short_uid()}" + + # start proxy - forwarding requests for CloudWatch in read-only mode + config = ProxyConfig( + services={ + "cloudwatch": {"resources": [f".*:alarm:{alarm_name}"], "read_only": True} + } + ) + start_aws_proxy(config) + + # create clients + cw_client = connect_to().cloudwatch + cw_client_aws = boto3.client("cloudwatch") + + # create alarm in AWS (this should succeed as it's direct AWS client) + cw_client_aws.put_metric_alarm( + AlarmName=alarm_name, + MetricName=metric_name, + Namespace=namespace, + Statistic="Average", + Period=60, + EvaluationPeriods=1, + Threshold=50.0, + ComparisonOperator="GreaterThanThreshold", + ) + cleanups.append(lambda: cw_client_aws.delete_alarms(AlarmNames=[alarm_name])) + + # assert that local call for describe_alarms is proxied and results are consistent + alarms_local = cw_client.describe_alarms(AlarmNames=[alarm_name]) + alarms_aws = cw_client_aws.describe_alarms(AlarmNames=[alarm_name]) + assert ( + alarms_local["MetricAlarms"][0]["AlarmArn"] + == alarms_aws["MetricAlarms"][0]["AlarmArn"] + ) + + # Negative test: attempt write operations with proxied client in read-only mode + # Create a new alarm using the proxied client (should succeed in LocalStack locally) + new_alarm_name = f"no-proxy-alarm-{short_uid()}" + cw_client.put_metric_alarm( + AlarmName=new_alarm_name, + MetricName=metric_name, + Namespace=namespace, + Statistic="Average", + Period=60, + EvaluationPeriods=1, + Threshold=50.0, + ComparisonOperator="GreaterThanThreshold", + ) + cleanups.append(lambda: cw_client.delete_alarms(AlarmNames=[new_alarm_name])) + + # Verify that this new alarm does NOT exist in real AWS + alarms_aws_new = cw_client_aws.describe_alarms(AlarmNames=[new_alarm_name]) + assert len(alarms_aws_new["MetricAlarms"]) == 0 + + +def test_cloudwatch_resource_name_matching(start_aws_proxy, cleanups): + """Test that proxy forwards requests for specific CloudWatch alarms matching ARN pattern.""" + alarm_name_match = f"proxy-alarm-{short_uid()}" + alarm_name_nomatch = f"local-alarm-{short_uid()}" + namespace = f"TestNamespace/{short_uid()}" + metric_name = f"TestMetric-{short_uid()}" + + # start proxy - only forwarding requests for alarms starting with "proxy-" + config = ProxyConfig(services={"cloudwatch": {"resources": ".*:alarm:proxy-.*"}}) + start_aws_proxy(config) + + # create clients + cw_client = connect_to().cloudwatch + cw_client_aws = boto3.client("cloudwatch") + + # create alarm in AWS that matches the pattern + cw_client_aws.put_metric_alarm( + AlarmName=alarm_name_match, + MetricName=metric_name, + Namespace=namespace, + Statistic="Average", + Period=60, + EvaluationPeriods=1, + Threshold=50.0, + ComparisonOperator="GreaterThanThreshold", + ) + cleanups.append(lambda: cw_client_aws.delete_alarms(AlarmNames=[alarm_name_match])) + + # assert that the matching alarm is proxied + alarms_local = 
cw_client.describe_alarms(AlarmNames=[alarm_name_match]) + alarms_aws = cw_client_aws.describe_alarms(AlarmNames=[alarm_name_match]) + assert len(alarms_local["MetricAlarms"]) == 1 + assert ( + alarms_local["MetricAlarms"][0]["AlarmArn"] + == alarms_aws["MetricAlarms"][0]["AlarmArn"] + ) + + # create a local alarm that doesn't match the pattern + cw_client.put_metric_alarm( + AlarmName=alarm_name_nomatch, + MetricName=metric_name, + Namespace=namespace, + Statistic="Average", + Period=60, + EvaluationPeriods=1, + Threshold=50.0, + ComparisonOperator="GreaterThanThreshold", + ) + cleanups.append(lambda: cw_client.delete_alarms(AlarmNames=[alarm_name_nomatch])) + + # verify that the non-matching alarm was created locally but NOT in AWS + alarms_aws_nomatch = cw_client_aws.describe_alarms(AlarmNames=[alarm_name_nomatch]) + assert len(alarms_aws_nomatch["MetricAlarms"]) == 0 + + +# ============================================================================= +# CloudWatch Logs Tests +# ============================================================================= + + +def test_logs_group_operations(start_aws_proxy, cleanups): + """Test basic CloudWatch Logs group operations with proxy.""" + log_group_name = f"/test/logs/{short_uid()}" + + # start proxy - forwarding requests for CloudWatch Logs + config = ProxyConfig( + services={"logs": {"resources": [f".*:log-group:{log_group_name}:.*"]}} + ) + start_aws_proxy(config) + + # create clients + logs_client = connect_to().logs + logs_client_aws = boto3.client("logs") + + # create log group in AWS + logs_client_aws.create_log_group(logGroupName=log_group_name) + cleanups.append( + lambda: logs_client_aws.delete_log_group(logGroupName=log_group_name) + ) + + # describe log groups through local client (proxied) + groups_local = logs_client.describe_log_groups(logGroupNamePrefix=log_group_name) + groups_aws = logs_client_aws.describe_log_groups(logGroupNamePrefix=log_group_name) + + assert len(groups_local["logGroups"]) == 1 + assert len(groups_aws["logGroups"]) == 1 + assert groups_local["logGroups"][0]["logGroupName"] == log_group_name + assert groups_local["logGroups"][0]["arn"] == groups_aws["logGroups"][0]["arn"] + + +def test_logs_stream_and_events(start_aws_proxy, cleanups): + """Test CloudWatch Logs stream and event operations with proxy.""" + log_group_name = f"/test/logs/{short_uid()}" + log_stream_name = f"test-stream-{short_uid()}" + + # start proxy - forwarding requests for CloudWatch Logs + config = ProxyConfig( + services={"logs": {"resources": [f".*:log-group:{log_group_name}:.*"]}} + ) + start_aws_proxy(config) + + # create clients + logs_client = connect_to().logs + logs_client_aws = boto3.client("logs") + + # create log group and stream in AWS + logs_client_aws.create_log_group(logGroupName=log_group_name) + cleanups.append( + lambda: logs_client_aws.delete_log_group(logGroupName=log_group_name) + ) + + logs_client_aws.create_log_stream( + logGroupName=log_group_name, logStreamName=log_stream_name + ) + + # put log events through AWS client + timestamp = int(time.time() * 1000) + logs_client_aws.put_log_events( + logGroupName=log_group_name, + logStreamName=log_stream_name, + logEvents=[ + {"timestamp": timestamp, "message": "Test message from AWS"}, + ], + ) + + # get log events through local client (proxied) + def _get_log_events(): + response = logs_client.get_log_events( + logGroupName=log_group_name, + logStreamName=log_stream_name, + startFromHead=True, + ) + if not response["events"]: + raise AssertionError("Log events not 
available yet") + return response + + events_local = retry(_get_log_events, retries=10, sleep=2) + assert len(events_local["events"]) >= 1 + assert events_local["events"][0]["message"] == "Test message from AWS" + + # put log events through local client (proxied) + timestamp_2 = int(time.time() * 1000) + logs_client.put_log_events( + logGroupName=log_group_name, + logStreamName=log_stream_name, + logEvents=[ + {"timestamp": timestamp_2, "message": "Test message from LocalStack"}, + ], + ) + + # verify via AWS client + def _verify_events_aws(): + response = logs_client_aws.get_log_events( + logGroupName=log_group_name, + logStreamName=log_stream_name, + startFromHead=True, + ) + messages = [e["message"] for e in response["events"]] + if "Test message from LocalStack" not in messages: + raise AssertionError("Log event from LocalStack not found in AWS") + return response + + events_aws = retry(_verify_events_aws, retries=10, sleep=2) + messages = [e["message"] for e in events_aws["events"]] + assert "Test message from AWS" in messages + assert "Test message from LocalStack" in messages + + +def test_logs_readonly_operations(start_aws_proxy, cleanups): + """Test CloudWatch Logs operations in read-only proxy mode.""" + log_group_name = f"/test/readonly/{short_uid()}" + log_stream_name = f"test-stream-{short_uid()}" + + # start proxy - forwarding requests for CloudWatch Logs in read-only mode + config = ProxyConfig( + services={ + "logs": { + "resources": [f".*:log-group:{log_group_name}:.*"], + "read_only": True, + } + } + ) + start_aws_proxy(config) + + # create clients + logs_client = connect_to().logs + logs_client_aws = boto3.client("logs") + + # create log group and stream in AWS (this should succeed as it's direct AWS client) + logs_client_aws.create_log_group(logGroupName=log_group_name) + cleanups.append( + lambda: logs_client_aws.delete_log_group(logGroupName=log_group_name) + ) + + logs_client_aws.create_log_stream( + logGroupName=log_group_name, logStreamName=log_stream_name + ) + + # put log events through AWS client + timestamp = int(time.time() * 1000) + logs_client_aws.put_log_events( + logGroupName=log_group_name, + logStreamName=log_stream_name, + logEvents=[ + {"timestamp": timestamp, "message": "Test message from AWS"}, + ], + ) + + # assert that local call for describe_log_groups is proxied and results are consistent + groups_local = logs_client.describe_log_groups(logGroupNamePrefix=log_group_name) + groups_aws = logs_client_aws.describe_log_groups(logGroupNamePrefix=log_group_name) + assert groups_local["logGroups"][0]["arn"] == groups_aws["logGroups"][0]["arn"] + + # get log events through local client (proxied) - should work in read-only mode + def _get_log_events(): + response = logs_client.get_log_events( + logGroupName=log_group_name, + logStreamName=log_stream_name, + startFromHead=True, + ) + if not response["events"]: + raise AssertionError("Log events not available yet") + return response + + events_local = retry(_get_log_events, retries=10, sleep=2) + assert events_local["events"][0]["message"] == "Test message from AWS" + + # Negative test: attempt write operations with proxied client in read-only mode + # Create a new log group using the proxied client (should succeed in LocalStack locally) + new_log_group_name = f"/test/no-proxy/{short_uid()}" + logs_client.create_log_group(logGroupName=new_log_group_name) + cleanups.append( + lambda: logs_client.delete_log_group(logGroupName=new_log_group_name) + ) + + # Verify that this new log group does NOT exist in real 
AWS + groups_aws_new = logs_client_aws.describe_log_groups( + logGroupNamePrefix=new_log_group_name + ) + assert len(groups_aws_new["logGroups"]) == 0 + + +def test_logs_resource_name_matching(start_aws_proxy, cleanups): + """Test that proxy forwards requests for specific log groups matching ARN pattern.""" + log_group_match = f"/proxy/logs/{short_uid()}" + log_group_nomatch = f"/local/logs/{short_uid()}" + + # start proxy - only forwarding requests for log groups starting with "/proxy/" + config = ProxyConfig(services={"logs": {"resources": ".*:log-group:/proxy/.*"}}) + start_aws_proxy(config) + + # create clients + logs_client = connect_to().logs + logs_client_aws = boto3.client("logs") + + # create log group in AWS that matches the pattern + logs_client_aws.create_log_group(logGroupName=log_group_match) + cleanups.append( + lambda: logs_client_aws.delete_log_group(logGroupName=log_group_match) + ) + + # assert that the matching log group is proxied + groups_local = logs_client.describe_log_groups(logGroupNamePrefix=log_group_match) + groups_aws = logs_client_aws.describe_log_groups(logGroupNamePrefix=log_group_match) + assert len(groups_local["logGroups"]) == 1 + assert groups_local["logGroups"][0]["arn"] == groups_aws["logGroups"][0]["arn"] + + # create a local log group that doesn't match the pattern + logs_client.create_log_group(logGroupName=log_group_nomatch) + cleanups.append( + lambda: logs_client.delete_log_group(logGroupName=log_group_nomatch) + ) + + # verify that the non-matching log group was created locally but NOT in AWS + groups_aws_nomatch = logs_client_aws.describe_log_groups( + logGroupNamePrefix=log_group_nomatch + ) + assert len(groups_aws_nomatch["logGroups"]) == 0 + + +def test_logs_filter_log_events(start_aws_proxy, cleanups): + """Test CloudWatch Logs filter_log_events operation with proxy.""" + log_group_name = f"/test/filter/{short_uid()}" + log_stream_name_1 = f"stream-1-{short_uid()}" + log_stream_name_2 = f"stream-2-{short_uid()}" + + # start proxy - forwarding requests for CloudWatch Logs + config = ProxyConfig( + services={"logs": {"resources": [f".*:log-group:{log_group_name}:.*"]}} + ) + start_aws_proxy(config) + + # create clients + logs_client = connect_to().logs + logs_client_aws = boto3.client("logs") + + # create log group and streams in AWS + logs_client_aws.create_log_group(logGroupName=log_group_name) + cleanups.append( + lambda: logs_client_aws.delete_log_group(logGroupName=log_group_name) + ) + + logs_client_aws.create_log_stream( + logGroupName=log_group_name, logStreamName=log_stream_name_1 + ) + logs_client_aws.create_log_stream( + logGroupName=log_group_name, logStreamName=log_stream_name_2 + ) + + # put log events with different messages + timestamp = int(time.time() * 1000) + logs_client_aws.put_log_events( + logGroupName=log_group_name, + logStreamName=log_stream_name_1, + logEvents=[ + {"timestamp": timestamp, "message": "ERROR: Something went wrong"}, + {"timestamp": timestamp + 1, "message": "INFO: Normal operation"}, + ], + ) + logs_client_aws.put_log_events( + logGroupName=log_group_name, + logStreamName=log_stream_name_2, + logEvents=[ + {"timestamp": timestamp + 2, "message": "ERROR: Another error"}, + {"timestamp": timestamp + 3, "message": "DEBUG: Debug info"}, + ], + ) + + # filter log events through local client (proxied) + def _filter_log_events(): + response = logs_client.filter_log_events( + logGroupName=log_group_name, + filterPattern="ERROR", + ) + if len(response["events"]) < 2: + raise AssertionError("Not all error 
events found yet") + return response + + filtered_local = retry(_filter_log_events, retries=15, sleep=2) + + # verify only ERROR messages are returned + assert len(filtered_local["events"]) >= 2 + for event in filtered_local["events"]: + assert "ERROR" in event["message"] + + # compare with AWS client results + filtered_aws = logs_client_aws.filter_log_events( + logGroupName=log_group_name, + filterPattern="ERROR", + ) + assert len(filtered_local["events"]) == len(filtered_aws["events"])