Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
# Copyright 2026 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""In-memory LRU cache for bucket metadata supporting App-centric Observability (ACO)."""

import logging
import threading

from google.api_core import exceptions as api_exceptions
from google.cloud.exceptions import NotFound
from google.cloud.storage._lru_cache import LRUCache

logger = logging.getLogger(__name__)


class BucketMetadataCache:
    """Thread-safe LRU cache for storing GCS bucket metadata (project number and location).

    Supports singleflight asynchronous background fetching to prevent
    stampedes on cache misses: at most one background fetch (and one
    background existence check) per bucket is in flight at any time.
    """

    def __init__(self, client, max_size=10000):
        """Initialize the cache.

        :param client: Storage client used for background metadata fetches.
        :param max_size: Maximum number of bucket entries retained in the LRU.
        """
        self._client = client
        self._cache = LRUCache(max_size)
        self._lock = threading.Lock()
        # Bucket names with a metadata fetch currently in flight (singleflight guard).
        self._inflight_fetches = set()
        # Bucket names with an existence verification currently in flight.
        self._inflight_checks = set()

    def get_or_queue_fetch(self, bucket_name):
        """Retrieve bucket metadata or queue a background fetch on cache miss.

        Returns the cached ``(destination_id, location)`` tuple on a hit.
        Returns None immediately on a miss so the caller never blocks; a
        background fetch is queued at most once per bucket.
        """
        fetch_needed = False
        with self._lock:
            if bucket_name in self._cache:
                return self._cache.get(bucket_name)
            if bucket_name not in self._inflight_fetches:
                # Singleflight: mark in flight while holding the lock so that
                # under a thundering herd of `n` concurrent cache misses only
                # one thread launches the fetch; the other `n - 1` see the
                # in-flight marker and return None immediately.
                self._inflight_fetches.add(bucket_name)
                fetch_needed = True
        if fetch_needed:
            # Start the thread outside the lock to keep the critical section short.
            threading.Thread(
                target=self._fetch_background, args=(bucket_name,), daemon=True
            ).start()
        return None

    def check_and_evict(self, bucket_name):
        """Asynchronously verify if a bucket exists on 404 and evict if deleted."""
        check_needed = False
        with self._lock:
            # Only verify buckets we actually cache, and only once at a time.
            if bucket_name in self._cache and bucket_name not in self._inflight_checks:
                self._inflight_checks.add(bucket_name)
                check_needed = True
        if check_needed:
            # Start the thread outside the lock to keep the critical section short.
            threading.Thread(
                target=self._verify_existence_background,
                args=(bucket_name,),
                daemon=True,
            ).start()

    def _verify_existence_background(self, bucket_name):
        """Background worker: evict the cache entry if the bucket no longer exists."""
        try:
            bucket = self._client.bucket(bucket_name)
            if not bucket.exists():
                self.evict(bucket_name)
        except Exception as e:
            # Best-effort verification; never let a background failure propagate.
            # Lazy %-args avoid formatting the message unless debug logging is on.
            logger.debug(
                "Background verification for bucket existence failed for %s: %s",
                bucket_name,
                e,
            )
        finally:
            with self._lock:
                self._inflight_checks.discard(bucket_name)

    def _fetch_background(self, bucket_name):
        """Asynchronously fetch bucket metadata and update the cache."""
        try:
            bucket = self._client.get_bucket(bucket_name, timeout=10.0)
            self.update_from_bucket(bucket)
        except (NotFound, api_exceptions.NotFound):
            # Bucket is gone; drop any stale entry.
            self.evict(bucket_name)
        except api_exceptions.Forbidden:
            # On 403 (Forbidden), cache fallback values permanently to avoid retry storms
            self.update_cache(
                bucket_name, f"projects/_/buckets/{bucket_name}", "global"
            )
        except Exception as e:
            # Best-effort fetch; a transient failure leaves the entry uncached
            # so a later lookup can retry.
            logger.debug(
                "Background fetch for bucket metadata failed for %s: %s",
                bucket_name,
                e,
            )
        finally:
            with self._lock:
                self._inflight_fetches.discard(bucket_name)

    def update_from_bucket(self, bucket):
        """Update cache from a Bucket instance.

        Derives the destination id from the project number (falling back to
        ``projects/_`` when unknown) and normalizes the location; multi- and
        dual-region buckets are recorded as ``global``.
        """
        if not bucket or not bucket.name:
            return

        project_number = getattr(bucket, "project_number", None)
        location = (getattr(bucket, "location", None) or "global").lower()
        location_type = (getattr(bucket, "location_type", None) or "region").lower()

        if location_type in ("multi-region", "dual-region"):
            location = "global"

        if project_number:
            destination_id = f"projects/{project_number}/buckets/{bucket.name}"
        else:
            destination_id = f"projects/_/buckets/{bucket.name}"

        self.update_cache(bucket.name, destination_id, location)

    def update_cache(self, bucket_name, destination_id, location):
        """Thread-safely update or insert a cache entry with bounded size."""
        with self._lock:
            self._cache.put(bucket_name, (destination_id, location))

    def evict(self, bucket_name):
        """Remove a bucket from the cache (e.g., on 404)."""
        with self._lock:
            self._cache.delete(bucket_name)

    def clear(self):
        """Clear all cached metadata and in-flight markers."""
        with self._lock:
            self._cache.clear()
            self._inflight_fetches.clear()
            self._inflight_checks.clear()
100 changes: 100 additions & 0 deletions packages/google-cloud-storage/google/cloud/storage/_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,30 @@

import base64
import datetime
import logging
import os
import secrets
import sys
from contextlib import contextmanager
from hashlib import md5
from urllib.parse import urlsplit, urlunsplit
from uuid import uuid4

from google.api_core import exceptions as api_exceptions
from google.cloud.exceptions import NotFound

from google.auth import environment_vars

from google.cloud.storage.constants import _DEFAULT_TIMEOUT
from google.cloud.storage.retry import (
DEFAULT_RETRY,
DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED,
)
from google.cloud.storage._opentelemetry_tracing import (
create_trace_span as _base_create_trace_span,
)

_logger = logging.getLogger(__name__)

STORAGE_EMULATOR_ENV_VAR = "STORAGE_EMULATOR_HOST" # Despite name, includes scheme.
"""Environment variable defining host for Storage emulator."""
Expand Down Expand Up @@ -185,6 +195,96 @@ def _require_client(self, client):
client = self.client
return client

def _get_aco_attributes(self):
    """Resolve ACO (App-centric Observability) span attributes for this resource.

    Looks up the owning client's bucket metadata cache and, on a cache
    hit, returns the destination id/location span attributes.  Returns
    an empty dict on a cache miss (a background fetch is queued) or when
    no cache is configured.

    :raises TypeError: if called on an object that is neither a Bucket
        nor a Blob.
    """
    from google.cloud.storage.blob import Blob
    from google.cloud.storage.bucket import Bucket

    if isinstance(self, Bucket):
        cache = getattr(self.client, "_bucket_metadata_cache", None)
        bucket_name = self.name
    elif isinstance(self, Blob):
        bucket = getattr(self, "bucket", None)
        cache = (
            getattr(bucket.client, "_bucket_metadata_cache", None)
            if bucket and hasattr(bucket, "client")
            else None
        )
        bucket_name = getattr(bucket, "name", None) if bucket else None
    else:
        raise TypeError(
            f"Unexpected type for ACO attribute retrieval: {type(self)}"
        )

    # NOTE(review): ``name`` is resolved defensively in case it is exposed as
    # a callable on some objects — confirm which callers need this.
    if callable(bucket_name):
        try:
            bucket_name = bucket_name()
        except Exception as e:
            # Lazy %-args avoid formatting the message unless debug logging is on.
            _logger.debug(
                "Failed callable bucket_name resolution in _get_aco_attributes: %s",
                e,
            )

    if cache and bucket_name and isinstance(bucket_name, str):
        try:
            cached = cache.get_or_queue_fetch(bucket_name)
            if cached and isinstance(cached, tuple) and len(cached) == 2:
                dest_id, loc = cached
                return {
                    "gcp.resource.destination.id": dest_id,
                    "gcp.resource.destination.location": loc,
                }
        except Exception as e:
            # ACO attributes are best-effort; never fail the caller's operation.
            _logger.debug(
                "Failed cache.get_or_queue_fetch in _get_aco_attributes: %s", e
            )
    return {}

@contextmanager
def _create_trace_span(self, name, attributes=None, **kwargs):
    """Create a trace span enriched with ACO attributes.

    Wraps the base span creation, merging cached bucket destination
    attributes into ``attributes``.  If the wrapped operation raises a
    404 (NotFound), an asynchronous existence check is queued so a
    deleted bucket is eventually evicted from the metadata cache; the
    exception is always re-raised.
    """
    from google.cloud.storage.blob import Blob
    from google.cloud.storage.bucket import Bucket

    aco_attrs = self._get_aco_attributes()
    # Merge into a copy so a caller-supplied ``attributes`` dict is never
    # mutated as a side effect of starting a span.
    attributes = dict(attributes) if attributes else {}
    attributes.update(aco_attrs)
    with _base_create_trace_span(name, attributes=attributes, **kwargs) as span:
        try:
            yield span
        except (NotFound, api_exceptions.NotFound):
            # Resolve the metadata cache and bucket name for this resource so
            # the (possibly deleted) bucket can be verified and evicted.
            if isinstance(self, Bucket):
                cache = getattr(self.client, "_bucket_metadata_cache", None)
                bucket_name = self.name
            elif isinstance(self, Blob):
                bucket = getattr(self, "bucket", None)
                cache = (
                    getattr(bucket.client, "_bucket_metadata_cache", None)
                    if bucket and hasattr(bucket, "client")
                    else None
                )
                bucket_name = getattr(bucket, "name", None) if bucket else None
            else:
                cache = None
                bucket_name = None

            # NOTE(review): defensive resolution of a callable ``name`` —
            # mirrors _get_aco_attributes; confirm which callers need this.
            if callable(bucket_name):
                try:
                    bucket_name = bucket_name()
                except Exception as e:
                    # Lazy %-args avoid formatting unless debug logging is on.
                    _logger.debug(
                        "Failed callable bucket_name resolution on 404 in _create_trace_span: %s",
                        e,
                    )

            if cache and bucket_name and isinstance(bucket_name, str):
                try:
                    cache.check_and_evict(bucket_name)
                except Exception as e:
                    # Eviction is best-effort; the 404 is re-raised regardless.
                    _logger.debug(
                        "Failed cache.check_and_evict on 404 in _create_trace_span: %s",
                        e,
                    )
            raise

def _encryption_headers(self):
"""Return any encryption headers needed to fetch the object.

Expand Down
44 changes: 42 additions & 2 deletions packages/google-cloud-storage/google/cloud/storage/_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,17 @@
"""Create / interact with Google Cloud Storage connections."""

import functools
import logging
import re

from google.api_core import exceptions as api_exceptions
from google.cloud import _http
from google.cloud.exceptions import NotFound
from google.cloud.storage import __version__, _helpers
from google.cloud.storage._opentelemetry_tracing import create_trace_span

logger = logging.getLogger(__name__)


class Connection(_http.JSONConnection):
"""A connection to Google Cloud Storage via the JSON REST API.
Expand Down Expand Up @@ -71,11 +77,28 @@ def api_request(self, *args, **kwargs):
span_attributes = {
"gccl-invocation-id": invocation_id,
}
client = self._client
if hasattr(client, "_bucket_metadata_cache") and client._bucket_metadata_cache:
match = re.search(r"/b/([^/?#]+)", kwargs.get("path", ""))
if match:
try:
cached = client._bucket_metadata_cache.get_or_queue_fetch(
match.group(1)
)
if cached and isinstance(cached, tuple) and len(cached) == 2:
dest_id, loc = cached
span_attributes["gcp.resource.destination.id"] = dest_id
span_attributes["gcp.resource.destination.location"] = loc
except Exception as e:
logger.debug(
f"Failed cache.get_or_queue_fetch in api_request: {e}"
)

call = functools.partial(super(Connection, self).api_request, *args, **kwargs)
with create_trace_span(
name="Storage.Connection.api_request",
attributes=span_attributes,
client=self._client,
client=client,
api_request=kwargs,
retry=retry,
):
Expand All @@ -87,4 +110,21 @@ def api_request(self, *args, **kwargs):
pass
if retry:
call = retry(call)
return call()
try:
return call()
except (NotFound, api_exceptions.NotFound):
if (
hasattr(client, "_bucket_metadata_cache")
and client._bucket_metadata_cache
):
match = re.search(r"/b/([^/?#]+)", kwargs.get("path", ""))
if match:
try:
client._bucket_metadata_cache.check_and_evict(
match.group(1)
)
except Exception as e:
logger.debug(
f"Failed cache.check_and_evict on 404 in api_request: {e}"
)
raise
Loading
Loading