Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 99 additions & 8 deletions tests/perf/microbenchmarks/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,24 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Any, List
from typing import Any, List, Optional
import statistics
import io
import os
import socket
import psutil

_C4_STANDARD_192_NIC = "ens3" # can be fetched via ip link show

def publish_benchmark_extra_info(
benchmark: Any,
params: Any,
benchmark_group: str = "read",
true_times: List[float] = [],
download_bytes_list: Optional[List[int]] = None,
duration: Optional[int] = None,
) -> None:

"""
Helper function to publish benchmark parameters to the extra_info property.
"""
Expand All @@ -41,13 +47,23 @@ def publish_benchmark_extra_info(
benchmark.extra_info["processes"] = params.num_processes
benchmark.group = benchmark_group

object_size = params.file_size_bytes
num_files = params.num_files
total_uploaded_mib = object_size / (1024 * 1024) * num_files
min_throughput = total_uploaded_mib / benchmark.stats["max"]
max_throughput = total_uploaded_mib / benchmark.stats["min"]
mean_throughput = total_uploaded_mib / benchmark.stats["mean"]
median_throughput = total_uploaded_mib / benchmark.stats["median"]
if download_bytes_list is not None:
assert duration is not None, "Duration must be provided if total_bytes_transferred is provided."
throughputs_list = [x / duration / (1024 * 1024) for x in download_bytes_list]
min_throughput = min(throughputs_list)
max_throughput = max(throughputs_list)
mean_throughput = statistics.mean(throughputs_list)
median_throughput = statistics.median(throughputs_list)


else:
object_size = params.file_size_bytes
num_files = params.num_files
total_uploaded_mib = object_size / (1024 * 1024) * num_files
min_throughput = total_uploaded_mib / benchmark.stats["max"]
max_throughput = total_uploaded_mib / benchmark.stats["min"]
mean_throughput = total_uploaded_mib / benchmark.stats["mean"]
median_throughput = total_uploaded_mib / benchmark.stats["median"]

benchmark.extra_info["throughput_MiB_s_min"] = min_throughput
benchmark.extra_info["throughput_MiB_s_max"] = max_throughput
Expand Down Expand Up @@ -165,3 +181,78 @@ def seek(self, offset, whence=io.SEEK_SET):
# Clamp position to valid range [0, size]
self._pos = max(0, min(new_pos, self._size))
return self._pos


def get_nic_pci(nic):
    """Resolve the PCI address backing the network interface *nic*.

    Reads the interface's sysfs device symlink; the basename of the link
    target is the PCI address (e.g. "0000:00:04.0"). Raises OSError if the
    interface does not exist.
    """
    device_link = f"/sys/class/net/{nic}/device"
    link_target = os.readlink(device_link)
    return os.path.basename(link_target)


def get_irqs_for_pci(pci):
    """Return the IRQ numbers (as strings) whose /proc/interrupts row mentions *pci*."""
    with open("/proc/interrupts") as interrupts:
        # The IRQ number is the token before the first ":" on each matching row.
        return [row.split(":")[0].strip() for row in interrupts if pci in row]


def get_affinity(irq):
    """Return the smp_affinity_list string for *irq*, or "N/A" when unavailable."""
    try:
        affinity_file = open(f"/proc/irq/{irq}/smp_affinity_list")
    except FileNotFoundError:
        # Unknown IRQ (or non-Linux host): no affinity file to read.
        return "N/A"
    with affinity_file:
        return affinity_file.read().strip()


def get_primary_interface_name():
    """Best-effort name of the NIC that carries the default route.

    connect()s a UDP socket to a public address (no packet is actually sent
    for a UDP connect) so the kernel performs route resolution and picks the
    outbound source IP, then matches that IP against psutil's per-interface
    address list. Returns None when route resolution fails or when no
    interface owns the resolved IP.
    """
    probe = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        # Force route resolution toward a public IP (Google DNS).
        probe.connect(('8.8.8.8', 80))
        primary_ip = probe.getsockname()[0]
    except Exception:
        # No usable route (e.g. offline host).
        return None
    finally:
        probe.close()

    if primary_ip:
        for if_name, if_addrs in psutil.net_if_addrs().items():
            if any(addr.address == primary_ip for addr in if_addrs):
                return if_name
    return None


def get_irq_affinity():
"""Gets the set of CPUs for a given network interface."""
nic = get_primary_interface_name()
# if not nic:
# nic = _C4_STANDARD_192_NIC

pci = get_nic_pci(nic)
irqs = get_irqs_for_pci(pci)
cpus = set()
for irq in irqs:
affinity_str = get_affinity(irq)
if affinity_str != "N/A":
for part in affinity_str.split(','):
if not '-' in part:
cpus.add(int(part))
return cpus

if __name__ == "__main__":
    # Ad-hoc smoke check: print the CPUs servicing network IRQs on this host.
    active_cpus = sorted(get_irq_affinity())
    print(f"CPUs handling network IRQs: {active_cpus}")
21 changes: 21 additions & 0 deletions tests/perf/microbenchmarks/time_based/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Copyright 2026 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pytest


@pytest.fixture
def workload_params(request):
    """Yield the parametrized workload params plus the fio-style object names.

    One object name per benchmark process, matching the fio naming scheme
    used to pre-populate the bucket.
    """
    params = request.param
    files_names = [
        f'fio-go_storage_fio.0.{worker}' for worker in range(params.num_processes)
    ]
    return params, files_names
106 changes: 106 additions & 0 deletions tests/perf/microbenchmarks/time_based/reads/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
# Copyright 2026 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import itertools
import os
from typing import Dict, List

import yaml

try:
from tests.perf.microbenchmarks.time_based.reads.parameters import (
TimeBasedReadParameters,
)
except ModuleNotFoundError:
from reads.parameters import TimeBasedReadParameters


def _get_params() -> Dict[str, List[TimeBasedReadParameters]]:
    """Build the benchmark parameter grid for time-based read workloads.

    Loads config.yaml (sitting next to this module), crosses the common axes
    (bucket type, file size, chunk size, range count) with each workload's
    process/coroutine counts, and groups the resulting
    TimeBasedReadParameters by workload name.
    """
    config_file = os.path.join(os.path.dirname(__file__), "config.yaml")
    with open(config_file, "r") as stream:
        config = yaml.safe_load(stream)

    common = config["common"]
    rounds = common["rounds"]
    duration = common["duration"]
    warmup_duration = common["warmup_duration"]

    # Bucket names may be overridden through the environment; the YAML
    # "defaults" section provides the fallback values.
    bucket_map = {
        "zonal": os.environ.get(
            "DEFAULT_RAPID_ZONAL_BUCKET",
            config["defaults"]["DEFAULT_RAPID_ZONAL_BUCKET"],
        ),
        "regional": os.environ.get(
            "DEFAULT_STANDARD_BUCKET", config["defaults"]["DEFAULT_STANDARD_BUCKET"]
        ),
    }

    params: Dict[str, List[TimeBasedReadParameters]] = {}
    for workload in config["workload"]:
        workload_name = workload["name"]
        pattern = workload["pattern"]

        # Full cross-product of the shared axes with this workload's knobs.
        combos = itertools.product(
            common["bucket_types"],
            common["file_sizes_mib"],
            common["chunk_sizes_kib"],
            common["num_ranges"],
            workload["processes"],
            workload["coros"],
        )

        entries: List[TimeBasedReadParameters] = []
        for (
            bucket_type,
            file_size_mib,
            chunk_size_kib,
            num_ranges_val,
            num_processes,
            num_coros,
        ) in combos:
            # Descriptive name identifying this parameter combination.
            name = (
                f"{pattern}_{bucket_type}_{num_processes}p_"
                f"{file_size_mib}MiB_{chunk_size_kib}KiB_{num_ranges_val}ranges"
            )
            entries.append(
                TimeBasedReadParameters(
                    name=name,
                    workload_name=workload_name,
                    pattern=pattern,
                    bucket_name=bucket_map[bucket_type],
                    bucket_type=bucket_type,
                    num_coros=num_coros,
                    num_processes=num_processes,
                    num_files=num_processes * num_coros,
                    rounds=rounds,
                    chunk_size_bytes=chunk_size_kib * 1024,
                    file_size_bytes=file_size_mib * 1024 * 1024,
                    duration=duration,
                    warmup_duration=warmup_duration,
                    num_ranges=num_ranges_val,
                )
            )
        params[workload_name] = entries
    return params
28 changes: 28 additions & 0 deletions tests/perf/microbenchmarks/time_based/reads/config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
common:
bucket_types:
- "regional"
- "zonal"
file_sizes_mib:
- 10240 # 10GiB
  chunk_sizes_kib: [64] # 64KiB
num_ranges: [1]
rounds: 1
duration: 30 # seconds
warmup_duration: 5 # seconds

workload:
############# multi process multi coroutine #########
- name: "read_seq_multi_process"
pattern: "seq"
coros: [1]
processes: [96]


- name: "read_rand_multi_process"
pattern: "rand"
coros: [1]
processes: [1]

defaults:
DEFAULT_RAPID_ZONAL_BUCKET: "chandrasiri-benchmarks-zb"
DEFAULT_STANDARD_BUCKET: "chandrasiri-benchmarks-rb"
23 changes: 23 additions & 0 deletions tests/perf/microbenchmarks/time_based/reads/parameters.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Copyright 2026 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from dataclasses import dataclass
from tests.perf.microbenchmarks.parameters import IOBenchmarkParameters


@dataclass
class TimeBasedReadParameters(IOBenchmarkParameters):
    """Parameters for a time-based (fixed-duration) read benchmark run.

    Extends IOBenchmarkParameters with the knobs specific to time-based
    reads; instances are built from config.yaml by the config module.
    """

    # Access pattern: "seq" (sequential) or "rand" (random), per config.yaml.
    pattern: str
    # Measured benchmark window, in seconds.
    duration: int
    # Warm-up window before measurement, in seconds.
    warmup_duration: int
    # NOTE(review): presumably the number of byte ranges per read request —
    # confirm against the benchmark driver.
    num_ranges: int
Loading