From ed5bce6d6dca30d3c4c641ab49c25246ccedde08 Mon Sep 17 00:00:00 2001 From: Ricardo Decal Date: Mon, 2 Feb 2026 19:34:25 -0800 Subject: [PATCH 01/10] initial draft --- beginner_source/batch_inference_tutorial.py | 487 ++++++++++++++++++++ 1 file changed, 487 insertions(+) create mode 100644 beginner_source/batch_inference_tutorial.py diff --git a/beginner_source/batch_inference_tutorial.py b/beginner_source/batch_inference_tutorial.py new file mode 100644 index 0000000000..780ab8c13f --- /dev/null +++ b/beginner_source/batch_inference_tutorial.py @@ -0,0 +1,487 @@ +""" +Offline batch inference at scale with PyTorch and Ray Data +========================================================== + +**Author:** `Ricardo Decal `__ + +This tutorial shows how to run batch inference using a pretrained PyTorch model +with Ray Data for scalable, production-ready data processing. + +.. grid:: 2 + + .. grid-item-card:: :octicon:`mortar-board;1em;` What you will learn + :class-card: card-prerequisites + + * How to create a production-ready PyTorch offline batch inference pipeline. We will cover two use cases: batch predictions and batch embeddings. + * How to scale the pipeline from your laptop to a cluster with thousands of nodes and GPUs with no code changes. + * Configure resource allocation (CPU/GPU) per worker, including fractional GPU usage + * Measure and benchmark throughput for batch inference pipelines + * How Ray Data can self-heal from failures with built-in fault tolerance + * Monitor batch jobs with the Ray dashboard for real-time insights + + .. grid-item-card:: :octicon:`list-unordered;1em;` Prerequisites + :class-card: card-prerequisites + + * PyTorch v2.9+ and ``torchvision`` + * Ray Data (``ray[data]``) v2.52.1+ + * A GPU is recommended for higher throughput but is not required + +`Ray Data `__ is a +scalable framework for data processing in production. +It's built on top of `Ray `__, +which is a unified framework for scaling AI and Python applications that +simplifies the complexities of distributed computing. Ray is also open +source and part of the PyTorch Foundation. + + +Setup +----- + +To install the dependencies: + +""" + +# %%bash +# pip install "ray[data]" torch torchvision + +###################################################################### +# Start by importing the required libraries: + +import os +import time + +import numpy as np +from PIL import Image +import ray +import torch +from torchvision.models import efficientnet_v2_s, EfficientNet_V2_S_Weights + +###################################################################### +# Load the dataset with Ray Data +# ------------------------------ +# +# Ray Data can read images directly from cloud storage (S3, GCS) or local paths. +# Here we use a subset of the ImageNette dataset hosted on S3: + +s3_uri = "s3://anonymous@air-example-data-2/imagenette2/train/n01440764/" + +ds = ray.data.read_images(s3_uri, mode="RGB") +print(ds) + +###################################################################### +# Under the hood, ``read_images()`` reads the data **lazily** and distributes +# the work across all available nodes. This approach leverages every node's +# network bandwidth and starts processing immediately without waiting for +# the entire dataset to download. +# +# After loading, Ray divides the data into **blocks** and dispatches them to +# workers. This block-based architecture enables streaming execution: as soon +# as a block finishes one stage, it can move to the next without waiting for +# the entire dataset. 
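+#
+# Because reads are lazy, you can also hint how finely the input should be split
+# at read time. The following is a minimal sketch, separate from the pipeline
+# above: ``override_num_blocks`` exists on read functions in recent Ray Data
+# releases, but verify the parameter name against your installed version, and
+# treat the value ``64`` as an assumption to tune for your cluster:
+#
+# .. code-block:: python
+#
+#    ds = ray.data.read_images(s3_uri, mode="RGB", override_num_blocks=64)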
+ +###################################################################### +# Ray Data provides useful methods to explore your data without loading it all into memory. +# The ``schema()`` method shows the column names and data types: + +print(ds.schema()) + +###################################################################### +# The ``take_batch()`` method lets you grab a small sample to inspect: + +sample_batch = ds.take_batch(5) +print(f"Batch keys: {sample_batch.keys()}") +print(f"Image shape: {sample_batch['image'][0].shape}") + +###################################################################### +# Let's visualize one of the images: + +img = Image.fromarray(sample_batch["image"][0]) +img.show() + +###################################################################### +# Part 1: Batch Predictions +# ========================= +# +# Define the preprocessing function +# --------------------------------- +# +# First, we define a preprocessing function that transforms raw images into tensors. +# This function operates on individual rows and will be applied lazily via ``ds.map()``. + +weights = EfficientNet_V2_S_Weights.DEFAULT +preprocess = weights.transforms() + + +def preprocess_image(row: dict[str, np.ndarray]) -> dict[str, np.ndarray]: + """Transform a raw image into a tensor suitable for the model.""" + # Convert numpy array to PIL Image for torchvision transforms + pil_image = Image.fromarray(row["image"]) + # Apply the model's preprocessing transforms and convert back to numpy + tensor = preprocess(pil_image) + return { + "original_image": row["image"], + "transformed_image": tensor.numpy(), + } + + +###################################################################### +# Apply the preprocessing with ``ds.map()``. This operation is **lazy**—Ray +# Data only executes the transformation when downstream operations demand +# the results. Lazy execution allows Ray to optimize the entire pipeline +# before any work begins. +# +# ``ds.map()`` applies the transformation to each record in parallel across +# the cluster. Whenever possible, Ray avoids transferring objects across +# network connections to take advantage of **zero-copy reads**, avoiding +# serialization and deserialization overhead. + +ds = ds.map(preprocess_image) +print(ds.schema()) + +###################################################################### +# Define the model class for batch inference +# ------------------------------------------ +# +# For batch inference, we wrap our model in a class. By passing a class to +# ``map_batches()``, Ray creates **Actor** processes that recycle state between +# batches. The model loads once when the Actor starts and remains warm for all +# subsequent batches—avoiding repeated model initialization overhead. +# +# Separating preprocessing (CPU) from model inference (GPU) is a key pattern +# for high-throughput pipelines. This **decoupling** prevents GPUs from +# blocking on CPU work and allows you to scale each stage independently +# based on where your bottlenecks are. 
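+#
+# As a hedged illustration of scaling the CPU stage on its own, you could
+# reserve more CPU cores for each preprocessing task; the value ``2`` below is
+# an assumption to tune for your workload, not a recommendation:
+#
+# .. code-block:: python
+#
+#    ds = ds.map(preprocess_image, num_cpus=2)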
+ +class EfficientNetClassifier: + """A callable class for batch image classification with EfficientNet.""" + + def __init__(self): + self.weights = EfficientNet_V2_S_Weights.DEFAULT + self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu") + self.model = efficientnet_v2_s(weights=self.weights).to(self.device) + self.model.eval() + + def __call__(self, batch: dict[str, np.ndarray]) -> dict[str, np.ndarray]: + """Run inference on a batch of preprocessed images.""" + # Stack the preprocessed images into a batch tensor + images = torch.tensor(batch["transformed_image"], device=self.device) + + with torch.inference_mode(): + logits = self.model(images) + predictions = logits.argmax(dim=1).cpu().numpy() + + # Map class indices to human-readable labels + categories = self.weights.meta["categories"] + predicted_labels = np.array([categories[idx] for idx in predictions]) + + return { + "predicted_label": predicted_labels, + "original_image": batch["original_image"], + } + + +###################################################################### +# Configure resource allocation and scaling +# ----------------------------------------- +# +# Ray Data allows you to specify **resource allocation** per worker, such as the +# number of CPUs or GPUs. Ray handles the orchestration of these resources across +# your cluster, automatically placing workers on nodes with available capacity. +# This **heterogeneous compute** support lets you mix different node types +# (CPU-only machines, different GPU models) in the same cluster, and Ray +# schedules work appropriately. +# +# Ray also supports `fractional +# GPUs `__, +# allowing multiple workers to share a single GPU when models are small +# enough to fit in memory together. +# +# For example, on a cluster of 10 machines with 4 GPUs each, setting +# ``num_gpus=0.5`` would schedule 2 workers per GPU, giving you 80 workers +# across the cluster. The same code that runs on your laptop with a single GPU +# scales to this multi-node setup with only a configuration change—no code +# modifications required. +# +# Run batch inference with map_batches +# ------------------------------------ +# +# The ``map_batches()`` method applies our model to batches of data in parallel. +# Key parameters: +# +# - ``compute``: Use ``ActorPoolStrategy`` for GPU inference to maintain persistent workers +# - ``num_gpus``: GPUs per model replica (set to 0 for CPU-only) +# - ``num_cpus``: CPUs per worker (useful for CPU-intensive preprocessing) +# - ``batch_size``: Number of images per batch (tune based on GPU memory) +# +# The ``num_gpus`` parameter tells Ray to place each replica on a node with an +# available GPU. If a worker fails, Ray automatically restarts the task on +# another node with the required resources. 
+ +num_gpus_per_worker = 1 # Set to 0 for CPU-only +num_cpus_per_worker = 1 +num_workers = 2 # Number of parallel workers + +ds = ds.map_batches( + EfficientNetClassifier, + num_gpus=num_gpus_per_worker, + num_cpus=num_cpus_per_worker, + batch_size=16, # Adjust based on available GPU memory +) + +###################################################################### +# Inspect the predictions: + +prediction_batch = ds.take_batch(5) + +for image, label in zip(prediction_batch["original_image"], prediction_batch["predicted_label"]): + img = Image.fromarray(image) + img.show() + print(f"Prediction: {label}") + + + +# Get the total number of images in the dataset +num_images = ds.count() +print(f"Total images in dataset: {num_images}") + + +###################################################################### +# Save predictions to disk +# ------------------------ +# +# Write results to Parquet format for downstream processing. The +# ``write_parquet()`` call triggers execution of the pipeline and streams +# results to disk as they become available. +# +# Ray Data automatically shards the output into multiple files for efficient +# parallel reads in downstream steps. For distributed workloads writing to +# shared storage (S3, GCS, NFS), all workers write in parallel. + + +# Write predictions to parquet to trigger execution +output_dir = os.path.join(os.getcwd(), "predictions") +os.makedirs(output_dir, exist_ok=True) + +# Drop original images now that we've inspected them +ds = ds.drop_columns(["original_image"]) +# Write predictions to parquet. This is a blocking operation that triggers the execution of the pipeline. +start_time = time.time() +# ds.write_parquet(f"local://{output_dir}") +ds.materialize() # FIXME + +###################################################################### +# Performance benchmarking +# ------------------------ +# +# Measuring throughput is important for understanding how your batch inference +# performs at scale. Ray Data provides built-in execution stats that show +# processing rates, resource utilization, and bottlenecks. +# +# Note that Ray Data uses **streaming execution**: blocks flow through the +# pipeline as soon as they're ready, rather than waiting for entire stages +# to complete. This means the first results appear quickly even on large +# datasets, and memory usage stays bounded since intermediate data doesn't +# accumulate. + +elapsed = time.time() - start_time + +print(f"Processed {num_images} images in {elapsed:.2f} seconds") +print(f"Throughput: {num_images/elapsed:.2f} images/second") + +# Display execution stats after write completes +print("\nExecution statistics:") +print(ds.stats()) + +# Clear ds for the next example +del ds + +###################################################################### +# The stats show important metrics like: +# +# - Wall time and CPU time per operation +# - Peak memory usage +# - Data throughput (MB/s) +# - Number of tasks and blocks processed +# +# This information helps identify bottlenecks and optimize your pipeline. +# For example, if preprocessing is slow, you might increase ``num_cpus`` or +# optimize your preprocessing function. + + + +###################################################################### +# Part 2: Batch Embeddings +# ======================== +# +# Embeddings are dense vector representations useful for similarity search, +# clustering, and downstream ML tasks. To extract embeddings, we modify the +# model to return the features before the final classification layer. 
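+#
+# For instance, once embeddings have been extracted, cosine similarity between
+# two of them gives a simple similarity score. A minimal NumPy sketch, where
+# ``a`` and ``b`` stand in for two hypothetical embedding vectors:
+#
+# .. code-block:: python
+#
+#    def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
+#        return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))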
+# +# Define the embedding model class +# -------------------------------- +# +# The key modification is replacing the classifier head with an Identity layer, +# so the model outputs the penultimate layer's features instead of class logits. + +class EfficientNetEmbedder: + """A callable class for extracting embeddings from EfficientNet.""" + + def __init__(self): + self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu") + self.model = efficientnet_v2_s(weights=EfficientNet_V2_S_Weights.DEFAULT).to(self.device) + + # Replace the classifier head with Identity to get embeddings + # EfficientNet_v2_s has a classifier attribute: Sequential(Dropout, Linear) + # The Linear layer outputs 1000 classes from 1280-dim features + self.model.classifier = torch.nn.Identity() + + self.model.eval() + self.embedding_dim = 1280 # EfficientNet_v2_s feature dimension + + def __call__(self, batch: dict[str, np.ndarray]) -> dict[str, np.ndarray]: + """Extract embeddings from a batch of preprocessed images.""" + images = torch.tensor(batch["transformed_image"], device=self.device) + + with torch.inference_mode(): + embeddings = self.model(images).cpu().numpy() + + return { + "embedding": embeddings, + } + + +###################################################################### +# Run batch embedding extraction: + +ds = ray.data.read_images(s3_uri, mode="RGB") +ds = ds.map(preprocess_image) +ds = ds.drop_columns(["original_image"]) +ds = ds.map_batches( + EfficientNetEmbedder, + num_gpus=1, + batch_size=16, +) + +###################################################################### +# Inspect the embeddings: + +embedding_batch = ds.take_batch(3) +print(f"Embedding shape: {embedding_batch['embedding'].shape}") +print(f"First embedding (truncated): {embedding_batch['embedding'][0][:10]}...") + +###################################################################### +# Save embeddings to disk: + +embeddings_output_dir = os.path.join(os.getcwd(), "embeddings") +os.makedirs(embeddings_output_dir, exist_ok=True) +ds.materialize() # FIXME +# ds.write_parquet(f"local://{embeddings_output_dir}") +print(f"Embeddings saved to: {embeddings_output_dir}") + +# Collect execution stats after write +print("\nExecution statistics for embeddings:") +print(ds.stats()) + + +###################################################################### +# Fault Tolerance +# --------------- +# +# In production, process and machine failures are inevitable during long-running +# batch jobs. Ray Data is designed to handle failures gracefully and continue +# processing without losing progress. +# +# Ray Data provides several fault tolerance mechanisms: +# +# - **Task retry**: If a task fails (e.g., due to an out-of-memory error or +# network issue), Ray automatically retries it on another worker. +# - **Actor reconstruction**: If a worker actor crashes, Ray creates a new +# actor and reassigns pending tasks to it. +# - **Lineage-based recovery**: Ray tracks the lineage of data transformations, +# so if a node fails, only the lost partitions need to be recomputed rather +# than restarting the entire job. +# +# Ray Data can recover from larger infrastructure failures, such as entire nodes +# failing. For very large batch jobs, you can enable checkpointing to save +# intermediate results and resume from the last checkpoint if the job fails. +# +# For more information about Ray Data's fault tolerance, see the +# `Ray Data fault tolerance guide `__. 
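+#
+# As a hedged sketch of one such knob: recent Ray Data releases can tolerate a
+# bounded number of failed blocks instead of failing the whole job. The
+# attribute name ``max_errored_blocks`` and the value ``5`` are assumptions to
+# verify against your Ray version:
+#
+# .. code-block:: python
+#
+#    ctx = ray.data.DataContext.get_current()
+#    ctx.max_errored_blocks = 5  # skip up to 5 bad blocks before failing the job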
+ +###################################################################### +# Monitor your batch jobs +# ----------------------- +# +# Monitoring is critical when running large-scale batch inference. The `Ray +# dashboard `__ +# displays Ray Data metrics like processing throughput, task status, and error +# rates. It also shows cluster resource usage (CPU, GPU, memory) and overall +# job health in real time. +# +# To access the dashboard: +# +# 1. Start Ray with ``ray start --head`` (if running on a cluster) +# 2. Open your browser to ``http://localhost:8265`` (default port) +# 3. Navigate to the "Jobs" tab to see your Ray Data job +# 4. Click on the job to see detailed metrics and task execution timeline +# +# The dashboard lets you: +# +# - Monitor progress of your batch job in real time +# - Inspect logs from individual workers across the cluster +# - Identify bottlenecks in your data pipeline +# - View resource utilization (CPU, GPU, memory) per worker +# - Debug failures with detailed error messages and stack traces +# +# For debugging, Ray offers `distributed debugging +# tools `__ +# that let you attach a debugger to running workers across the cluster. +# For more information, see the `Ray Data monitoring +# documentation `__. + +###################################################################### +# Conclusion +# ---------- +# +# In this tutorial, you learned how to: +# +# - Load image data with Ray Data from cloud storage using **distributed +# ingestion** that leverages all nodes' network bandwidth +# - Explore datasets using ``schema()`` and ``take_batch()`` +# - Separate CPU preprocessing from GPU inference to **maximize hardware +# utilization** and enable independent scaling of each stage +# - Configure **resource allocation** and **fractional GPU usage** to +# efficiently scale across heterogeneous clusters +# - Run scalable batch predictions with a pretrained EfficientNet model +# - Extract embeddings by modifying the model's classification head +# - Measure and benchmark throughput for batch inference pipelines +# - Understand Ray Data's **fault tolerance** mechanisms +# - Monitor batch jobs using the Ray dashboard +# +# The key advantage of Ray Data is that **the same code runs everywhere**: +# from a laptop to a multi-node cluster with heterogeneous GPU types. Ray +# handles parallelization, batching, resource management, and failure recovery +# automatically—you focus on your model and transformations while Ray handles +# the distributed systems complexity. +# +# Further Reading +# --------------- +# +# Ray Data has more production features that are out of scope for this +# tutorial but are worth checking out: +# +# - `Streaming batch inference `__ +# for processing datasets larger than cluster memory by streaming blocks through +# the pipeline with bounded memory usage. +# - `Advanced preprocessing `__ +# with vectorized operations and custom partitioning strategies for optimal performance. +# - `Integration with Ray Train `__ +# to build end-to-end training and inference pipelines. +# - `Checkpointing and resuming `__ +# for very large batch jobs that may span multiple hours or days. +# - `Custom data sources `__ +# to read from databases, APIs, or custom file formats. +# +# For more information, see the `Ray Data +# documentation `__ and +# `Ray Data examples `__. 
From 0990b568e5ea88fa80cc7dc17f532f4dff4f497c Mon Sep 17 00:00:00 2001 From: Ricardo Decal Date: Wed, 4 Feb 2026 10:32:38 -0800 Subject: [PATCH 02/10] 2nd pass --- beginner_source/batch_inference_tutorial.py | 205 +++++++++----------- 1 file changed, 91 insertions(+), 114 deletions(-) diff --git a/beginner_source/batch_inference_tutorial.py b/beginner_source/batch_inference_tutorial.py index 780ab8c13f..f247ec5bd9 100644 --- a/beginner_source/batch_inference_tutorial.py +++ b/beginner_source/batch_inference_tutorial.py @@ -12,12 +12,15 @@ .. grid-item-card:: :octicon:`mortar-board;1em;` What you will learn :class-card: card-prerequisites - * How to create a production-ready PyTorch offline batch inference pipeline. We will cover two use cases: batch predictions and batch embeddings. - * How to scale the pipeline from your laptop to a cluster with thousands of nodes and GPUs with no code changes. - * Configure resource allocation (CPU/GPU) per worker, including fractional GPU usage - * Measure and benchmark throughput for batch inference pipelines - * How Ray Data can self-heal from failures with built-in fault tolerance - * Monitor batch jobs with the Ray dashboard for real-time insights + * How to create a production-ready PyTorch offline batch inference pipeline. + We will cover two use cases: batch predictions and batch embeddings. + * How to scale the pipeline from your laptop to a cluster with thousands of nodes + and GPUs with no code changes. + * How Ray Data can process data that is much larger than the cluster's shared memory. + * How to configure resource allocation (CPU/GPU) and fractional resources. + * How to measure and benchmark throughput for batch inference pipelines + * How Ray Data can self-heal from failures with built-in fault tolerance. + * How to monitor batch jobs with the Ray dashboard for real-time insights. .. grid-item-card:: :octicon:`list-unordered;1em;` Prerequisites :class-card: card-prerequisites @@ -28,10 +31,10 @@ `Ray Data `__ is a scalable framework for data processing in production. -It's built on top of `Ray `__, -which is a unified framework for scaling AI and Python applications that -simplifies the complexities of distributed computing. Ray is also open -source and part of the PyTorch Foundation. +It's built on top of `Ray `__, a +unified framework for scaling AI and Python applications that +simplifies the complexities of distributed computing. Ray is also open-source +and part of the PyTorch Foundation. Setup @@ -48,7 +51,6 @@ # Start by importing the required libraries: import os -import time import numpy as np from PIL import Image @@ -69,15 +71,15 @@ print(ds) ###################################################################### -# Under the hood, ``read_images()`` reads the data **lazily** and distributes -# the work across all available nodes. This approach leverages every node's -# network bandwidth and starts processing immediately without waiting for -# the entire dataset to download. +# Under the hood, ``read_images()`` spreads the downloads across all available +# nodes, using all the network bandwidth available to the cluster. # -# After loading, Ray divides the data into **blocks** and dispatches them to -# workers. This block-based architecture enables streaming execution: as soon -# as a block finishes one stage, it can move to the next without waiting for -# the entire dataset. +# Ray divides the data into **blocks** and dispatches them to +# workers. 
This block-based architecture enables **streaming execution**: as soon
+# as a stage outputs a block, the next stage can begin processing it immediately without
+# waiting for previous stages to process the entire dataset. This allows you to utilize
+# all your cluster's resources and evict intermediate data from the cluster's shared memory
+# as soon as it's no longer needed, making room for more data to be processed.

######################################################################
# Ray Data provides useful methods to explore your data without loading it all into memory.
@@ -86,7 +88,7 @@
print(ds.schema())

######################################################################
-# The ``take_batch()`` method lets you grab a small sample to inspect:
+# The ``take_batch()`` method lets you copy a small sample for inspection:

sample_batch = ds.take_batch(5)
print(f"Batch keys: {sample_batch.keys()}")
@@ -105,8 +107,9 @@
# Define the preprocessing function
# ---------------------------------
#
-# First, we define a preprocessing function that transforms raw images into tensors.
-# This function operates on individual rows and will be applied lazily via ``ds.map()``.
+# First, we define a preprocessing function that transforms raw images into model-ready tensors.
+# We will use the same preprocessing function that the model used during training. In this case,
+# the EfficientNet preprocessing function includes resizing, normalization, and conversion to tensor.

weights = EfficientNet_V2_S_Weights.DEFAULT
preprocess = weights.transforms()
@@ -116,8 +119,9 @@ def preprocess_image(row: dict[str, np.ndarray]) -> dict[str, np.ndarray]:
    """Transform a raw image into a tensor suitable for the model."""
    # Convert numpy array to PIL Image for torchvision transforms
    pil_image = Image.fromarray(row["image"])
-    # Apply the model's preprocessing transforms and convert back to numpy
+    # Apply the model's preprocessing transforms
    tensor = preprocess(pil_image)
+    # Convert the tensor back to an ndarray (a zero-copy operation since the tensor is on the CPU).
    return {
        "original_image": row["image"],
        "transformed_image": tensor.numpy(),
@@ -125,15 +129,11 @@ def preprocess_image(row: dict[str, np.ndarray]) -> dict[str, np.ndarray]:


######################################################################
-# Apply the preprocessing with ``ds.map()``. This operation is **lazy**—Ray
-# Data only executes the transformation when downstream operations demand
-# the results. Lazy execution allows Ray to optimize the entire pipeline
+# Apply the preprocessing function with ``ds.map()``. This operation is **lazy**,
+# meaning that Ray Data will not begin this stage until a non-lazy operation
+# demands the results (in this case, when ``ds.write_parquet()`` is called).
+# Lazy execution allows Ray to intelligently optimize the entire pipeline
# before any work begins.
-#
-# ``ds.map()`` applies the transformation to each record in parallel across
-# the cluster. Whenever possible, Ray avoids transferring objects across
-# network connections to take advantage of **zero-copy reads**, avoiding
-# serialization and deserialization overhead.

ds = ds.map(preprocess_image)
print(ds.schema())
@@ -145,21 +145,22 @@ def preprocess_image(row: dict[str, np.ndarray]) -> dict[str, np.ndarray]:
# Define the model class for batch inference
# ------------------------------------------
#
# For batch inference, we wrap our model in a class. By passing a class to
# ``map_batches()``, Ray creates **Actor** processes that recycle state between
# batches.
The model loads once when the Actor starts and remains warm for all -# subsequent batches—avoiding repeated model initialization overhead. +# subsequent batches, avoiding repeated model initialization overhead. # # Separating preprocessing (CPU) from model inference (GPU) is a key pattern -# for high-throughput pipelines. This **decoupling** prevents GPUs from -# blocking on CPU work and allows you to scale each stage independently -# based on where your bottlenecks are. - -class EfficientNetClassifier: - """A callable class for batch image classification with EfficientNet.""" +# for high-throughput pipelines. This decoupling prevents GPUs from +# blocking on CPU work and allows you to scale stages independently +# or eliminate bottlenecks. Ray takes care of moving the data to a node +# with the appropriate resources if the current node doesn't have the +# required resources. +class Classifier: def __init__(self): self.weights = EfficientNet_V2_S_Weights.DEFAULT self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu") self.model = efficientnet_v2_s(weights=self.weights).to(self.device) self.model.eval() + self.categories = np.array(self.weights.meta["categories"]) def __call__(self, batch: dict[str, np.ndarray]) -> dict[str, np.ndarray]: """Run inference on a batch of preprocessed images.""" @@ -167,12 +168,12 @@ def __call__(self, batch: dict[str, np.ndarray]) -> dict[str, np.ndarray]: images = torch.tensor(batch["transformed_image"], device=self.device) with torch.inference_mode(): + # Process the whole batch at once logits = self.model(images) predictions = logits.argmax(dim=1).cpu().numpy() # Map class indices to human-readable labels - categories = self.weights.meta["categories"] - predicted_labels = np.array([categories[idx] for idx in predictions]) + predicted_labels = self.categories[predictions] return { "predicted_label": predicted_labels, @@ -186,50 +187,50 @@ def __call__(self, batch: dict[str, np.ndarray]) -> dict[str, np.ndarray]: # # Ray Data allows you to specify **resource allocation** per worker, such as the # number of CPUs or GPUs. Ray handles the orchestration of these resources across -# your cluster, automatically placing workers on nodes with available capacity. -# This **heterogeneous compute** support lets you mix different node types -# (CPU-only machines, different GPU models) in the same cluster, and Ray -# schedules work appropriately. +# your cluster, automatically placing workers on nodes with available capacity. This +# means that when you move your workload from your laptop to a large cluster, you +# don't need to change your code since Ray will automatically detect the resources +# available in the cluster and scale the workload accordingly. + +# This flexibility enables you to mix different node types into your cluster, such as +# different accelerators or CPU-only machines. This is useful for multi-modal workloads or +# when you want to optimize the hardware utilization of different stages of your pipeline. # # Ray also supports `fractional -# GPUs `__, +# resource allocation `__, # allowing multiple workers to share a single GPU when models are small # enough to fit in memory together. # # For example, on a cluster of 10 machines with 4 GPUs each, setting # ``num_gpus=0.5`` would schedule 2 workers per GPU, giving you 80 workers -# across the cluster. The same code that runs on your laptop with a single GPU -# scales to this multi-node setup with only a configuration change—no code -# modifications required. +# across the cluster. 
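+#
+# A minimal sketch of that fractional configuration, assuming two model
+# replicas fit in one GPU's memory; the ``concurrency`` value caps the actor
+# pool at the 80 workers from the example above:
+#
+# .. code-block:: python
+#
+#    ds = ds.map_batches(
+#        Classifier,
+#        num_gpus=0.5,  # two replicas share each physical GPU
+#        concurrency=80,
+#        batch_size=16,
+#    )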
#
# Run batch inference with map_batches
# ------------------------------------
#
# The ``map_batches()`` method applies our model to batches of data in parallel.
-# Key parameters:
-#
-# - ``compute``: Use ``ActorPoolStrategy`` for GPU inference to maintain persistent workers
-# - ``num_gpus``: GPUs per model replica (set to 0 for CPU-only)
-# - ``num_cpus``: CPUs per worker (useful for CPU-intensive preprocessing)
-# - ``batch_size``: Number of images per batch (tune based on GPU memory)
+# This enables you to speed up stages of your pipeline that benefit from vectorized operations,
+# which GPUs are particularly good at parallelizing.
#
# The ``num_gpus`` parameter tells Ray to place each replica on a node with an
# available GPU. If a worker fails, Ray automatically restarts the task on
-# another node with the required resources.
+# another node with the required resources. The ``batch_size`` parameter tells Ray how many
+# images to process at each invocation of the actor. If you run into CUDA out-of-memory errors,
+# you can try reducing the ``batch_size``, increasing the ``num_gpus`` per worker, or
+# using a GPU with more memory.

num_gpus_per_worker = 1  # Set to 0 for CPU-only
num_cpus_per_worker = 1
-num_workers = 2  # Number of parallel workers

ds = ds.map_batches(
-    EfficientNetClassifier,
+    Classifier,
    num_gpus=num_gpus_per_worker,
    num_cpus=num_cpus_per_worker,
    batch_size=16,  # Adjust based on available GPU memory
)

######################################################################
-# Inspect the predictions:
+# Inspect a few predictions:

prediction_batch = ds.take_batch(5)

@@ -246,17 +247,14 @@ def __call__(self, batch: dict[str, np.ndarray]) -> dict[str, np.ndarray]:


######################################################################
-# Save predictions to disk
-# ------------------------
+# Run the pipeline and save the predictions to disk
+# -------------------------------------------------
#
-# Write results to Parquet format for downstream processing. The
-# ``write_parquet()`` call triggers execution of the pipeline and streams
-# results to disk as they become available.
-#
-# Ray Data automatically shards the output into multiple files for efficient
-# parallel reads in downstream steps. For distributed workloads writing to
-# shared storage (S3, GCS, NFS), all workers write in parallel.
-
+# The ``write_parquet()`` method is a blocking operation that triggers the execution of the
+# pipeline we defined above. As the pipeline streams results, the ``write_parquet()`` method
+# writes them to shards. Sharding the results is desirable because downstream steps can then
+# read the shards in parallel. Writing to shared storage such as S3, GCS, or NFS is efficient
+# because different workers can upload shards in parallel, utilizing your cluster's upload bandwidth.

# Write predictions to parquet to trigger execution
output_dir = os.path.join(os.getcwd(), "predictions")
@@ -265,8 +263,8 @@ def __call__(self, batch: dict[str, np.ndarray]) -> dict[str, np.ndarray]:
# Drop original images now that we've inspected them
ds = ds.drop_columns(["original_image"])
# Write predictions to parquet. This is a blocking operation that triggers the execution of the pipeline.
-start_time = time.time()
# ds.write_parquet(f"local://{output_dir}")
+# print(f"Wrote {len(os.listdir(output_dir))} shards to {output_dir}")
ds.materialize()  # FIXME

######################################################################
# Performance benchmarking
# ------------------------
#
# Measuring throughput is important for understanding how your batch inference
# performs at scale. Ray Data provides built-in execution stats that show
# processing rates, resource utilization, and bottlenecks.
#
-# Note that Ray Data uses **streaming execution**: blocks flow through the
-# pipeline as soon as they're ready, rather than waiting for entire stages
-# to complete. This means the first results appear quickly even on large
-# datasets, and memory usage stays bounded since intermediate data doesn't
-# accumulate.
-
-elapsed = time.time() - start_time
-
-print(f"Processed {num_images} images in {elapsed:.2f} seconds")
-print(f"Throughput: {num_images/elapsed:.2f} images/second")

# Display execution stats after write completes
print("\nExecution statistics:")
print(ds.stats())
@@ -323,41 +311,36 @@ def __call__(self, batch: dict[str, np.ndarray]) -> dict[str, np.ndarray]:
# The key modification is replacing the classifier head with an Identity layer,
# so the model outputs the penultimate layer's features instead of class logits.

-class EfficientNetEmbedder:
-    """A callable class for extracting embeddings from EfficientNet."""
-
+class Embedder:
    def __init__(self):
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.model = efficientnet_v2_s(weights=EfficientNet_V2_S_Weights.DEFAULT).to(self.device)

        # Replace the classifier head with Identity to get embeddings
-        # EfficientNet_v2_s has a classifier attribute: Sequential(Dropout, Linear)
-        # The Linear layer outputs 1000 classes from 1280-dim features
        self.model.classifier = torch.nn.Identity()
-
        self.model.eval()
-        self.embedding_dim = 1280  # EfficientNet_v2_s feature dimension

    def __call__(self, batch: dict[str, np.ndarray]) -> dict[str, np.ndarray]:
        """Extract embeddings from a batch of preprocessed images."""
+        # Stack the preprocessed images into a batch tensor
        images = torch.tensor(batch["transformed_image"], device=self.device)

        with torch.inference_mode():
-            embeddings = self.model(images).cpu().numpy()
-
-        return {
-            "embedding": embeddings,
-        }
+            # Process the whole batch at once
+            embeddings = self.model(images)

+        # Return the embeddings as a numpy array
+        return {"embedding": embeddings.cpu().numpy()}

######################################################################
# Run batch embedding extraction:

ds = ray.data.read_images(s3_uri, mode="RGB")
+# TODO: map batches version
ds = ds.map(preprocess_image)
ds = ds.drop_columns(["original_image"])
ds = ds.map_batches(
-    EfficientNetEmbedder,
+    Embedder,
    num_gpus=1,
    batch_size=16,
)
@@ -393,20 +376,22 @@ def __call__(self, batch: dict[str, np.ndarray]) -> dict[str, np.ndarray]:
#
# Ray Data provides several fault tolerance mechanisms:
#
-# - **Task retry**: If a task fails (e.g., due to an out-of-memory error or
-#   network issue), Ray automatically retries it on another worker.
-# - **Actor reconstruction**: If a worker actor crashes, Ray creates a new
+# - **Backpressure**: Ray Data has multiple backpressure mechanisms to prevent a job from
+#   exhausting the cluster's shared memory. For instance, Ray Data can detect if a stage
+#   becomes a bottleneck and throttle upstream stages to prevent queue buildup
+#   and memory exhaustion.
+# - **Disk spilling**: If the cluster's shared memory is exhausted, Ray Data will spill data
+#   from RAM to disk to prevent the job from failing due to out-of-memory errors.
+# - **Task retry**: If a task fails (e.g., due to a network issue), Ray automatically
+#   retries it.
+# - **Actor reconstruction**: If an actor crashes, Ray creates a new
#   actor and reassigns pending tasks to it.
- **Lineage-based recovery**: Ray Data tracks the lineage of data transformations,
#   so if a node fails, it recomputes only the lost data rather
#   than restarting the entire job.
#
# Ray Data can recover from larger infrastructure failures, such as entire nodes
-# failing. For very large batch jobs, you can enable checkpointing to save
-# intermediate results and resume from the last checkpoint if the job fails.
-#
-# For more information about Ray Data's fault tolerance, see the
-# `Ray Data fault tolerance guide `__.
+# failing.

######################################################################
# Monitor your batch jobs
# -----------------------
@@ -425,6 +410,8 @@ def __call__(self, batch: dict[str, np.ndarray]) -> dict[str, np.ndarray]:
# 3. Navigate to the "Jobs" tab to see your Ray Data job
# 4. Click on the job to see detailed metrics and task execution timeline
#
+# TODO: screenshots of the dashboard
+#
# The dashboard lets you:
#
# - Monitor progress of your batch job in real time
@@ -470,18 +457,8 @@ def __call__(self, batch: dict[str, np.ndarray]) -> dict[str, np.ndarray]:
# Ray Data has more production features that are out of scope for this
# tutorial but are worth checking out:
#
-# - `Streaming batch inference `__
-#   for processing datasets larger than cluster memory by streaming blocks through
-#   the pipeline with bounded memory usage.
-# - `Advanced preprocessing `__
-#   with vectorized operations and custom partitioning strategies for optimal performance.
+# - `Custom aggregations `__
# - `Integration with Ray Train `__
#   to build end-to-end training and inference pipelines.
-# - `Checkpointing and resuming `__
-#   for very large batch jobs that may span multiple hours or days.
-# - `Custom data sources `__
-#   to read from databases, APIs, or custom file formats.
-#
-# For more information, see the `Ray Data
-# documentation `__ and
-# `Ray Data examples `__.
+# - `Reading and writing custom file types `__
+

From 0b60c82bf83fd939ef69250af1e42dd9dfbd6c40 Mon Sep 17 00:00:00 2001
From: Ricardo Decal
Date: Wed, 4 Feb 2026 11:28:23 -0800
Subject: [PATCH 03/10] add stats example

---
 beginner_source/batch_inference_tutorial.py | 30 ++++++++++++++-------
 1 file changed, 21 insertions(+), 9 deletions(-)

diff --git a/beginner_source/batch_inference_tutorial.py b/beginner_source/batch_inference_tutorial.py
index f247ec5bd9..2695ec18b9 100644
--- a/beginner_source/batch_inference_tutorial.py
+++ b/beginner_source/batch_inference_tutorial.py
@@ -272,11 +272,10 @@ def __call__(self, batch: dict[str, np.ndarray]) -> dict[str, np.ndarray]:
# ------------------------
#
# Measuring throughput is important for understanding how your batch inference
-# performs at scale. Ray Data provides built-in execution stats that show
-# processing rates, resource utilization, and bottlenecks.
+# performs at scale. Ray Data provides fine-grained execution statistics for both
+# the overall pipeline as well as individual operations with the ``stats()`` method.
# -# Display execution stats after write completes print("\nExecution statistics:") print(ds.stats()) @@ -284,16 +283,29 @@ def __call__(self, batch: dict[str, np.ndarray]) -> dict[str, np.ndarray]: del ds ###################################################################### -# The stats show important metrics like: -# -# - Wall time and CPU time per operation +# For a single stage, the report looks like this: +# +# Operator 3 Map(preprocess_image)->MapBatches(drop_columns): 58 tasks executed, 58 blocks produced in 9.65s +# * Remote wall time: 369.14ms min, 1.85s max, 634.59ms mean, 36.81s total +# * Remote cpu time: 369.57ms min, 696.42ms max, 551.0ms mean, 31.96s total +# * UDF time: 733.07ms min, 3.69s max, 1.26s mean, 73.33s total +# * Peak heap memory usage (MiB): 720.84 min, 1478.72 max, 1129 mean +# * Output num rows per block: 44 min, 54 max, 48 mean, 2794 total +# * Output size bytes per block: 77857120 min, 95551920 max, 85240122 mean, 4943927120 total +# * Output rows per task: 44 min, 54 max, 48 mean, 58 tasks used +# * Tasks per node: 8 min, 40 max, 19 mean; 3 nodes used +# * Operator throughput: +# * Total input num rows: 3358 rows +# * Total output num rows: 2794 rows +# * Ray Data throughput: 289.43 rows/s +# * Estimated single task throughput: 75.91 rows/s +# +# - Wall time and CPU time per operator # - Peak memory usage -# - Data throughput (MB/s) +# - Data throughput (rows/s) # - Number of tasks and blocks processed # # This information helps identify bottlenecks and optimize your pipeline. -# For example, if preprocessing is slow, you might increase ``num_cpus`` or -# optimize your preprocessing function. From d95ffc90f9428185dacd0bc79cd4f6c5b68a17e4 Mon Sep 17 00:00:00 2001 From: Ricardo Decal Date: Wed, 4 Feb 2026 11:30:56 -0800 Subject: [PATCH 04/10] increase bs, increase dset size, consolidate section --- beginner_source/batch_inference_tutorial.py | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/beginner_source/batch_inference_tutorial.py b/beginner_source/batch_inference_tutorial.py index 2695ec18b9..8faf6fd02a 100644 --- a/beginner_source/batch_inference_tutorial.py +++ b/beginner_source/batch_inference_tutorial.py @@ -65,7 +65,7 @@ # Ray Data can read images directly from cloud storage (S3, GCS) or local paths. 
# Here we use a subset of the ImageNette dataset hosted on S3: -s3_uri = "s3://anonymous@air-example-data-2/imagenette2/train/n01440764/" +s3_uri = "s3://anonymous@air-example-data-2/imagenette2/train/" ds = ray.data.read_images(s3_uri, mode="RGB") print(ds) @@ -91,13 +91,9 @@ # The ``take_batch()`` method lets you copy a small sample for inspection: sample_batch = ds.take_batch(5) -print(f"Batch keys: {sample_batch.keys()}") -print(f"Image shape: {sample_batch['image'][0].shape}") - -###################################################################### -# Let's visualize one of the images: - -img = Image.fromarray(sample_batch["image"][0]) +first_img = sample_batch["image"][0] +print(f"Image shape: {first_img.shape}") +img = Image.fromarray(first_img) img.show() ###################################################################### @@ -226,7 +222,7 @@ def __call__(self, batch: dict[str, np.ndarray]) -> dict[str, np.ndarray]: Classifier, num_gpus=num_gpus_per_worker, num_cpus=num_cpus_per_worker, - batch_size=16, # Adjust based on available GPU memory + batch_size=128, # Adjust based on available GPU memory ) ###################################################################### From 371e64d695a4c5f20d199385f01f1f7c1c1fafb6 Mon Sep 17 00:00:00 2001 From: Ricardo Decal Date: Wed, 4 Feb 2026 12:13:10 -0800 Subject: [PATCH 05/10] lint --- beginner_source/batch_inference_tutorial.py | 133 ++++++++++---------- 1 file changed, 66 insertions(+), 67 deletions(-) diff --git a/beginner_source/batch_inference_tutorial.py b/beginner_source/batch_inference_tutorial.py index 8faf6fd02a..f44f4eee93 100644 --- a/beginner_source/batch_inference_tutorial.py +++ b/beginner_source/batch_inference_tutorial.py @@ -31,12 +31,11 @@ `Ray Data `__ is a scalable framework for data processing in production. -It's built on top of `Ray `__, a +It's built on top of `Ray `__, a unified framework for scaling AI and Python applications that simplifies the complexities of distributed computing. Ray is also open-source and part of the PyTorch Foundation. - Setup ----- @@ -44,10 +43,9 @@ """ -# %%bash # pip install "ray[data]" torch torchvision -###################################################################### +############################################################################### # Start by importing the required libraries: import os @@ -58,7 +56,7 @@ import torch from torchvision.models import efficientnet_v2_s, EfficientNet_V2_S_Weights -###################################################################### +############################################################################### # Load the dataset with Ray Data # ------------------------------ # @@ -70,24 +68,23 @@ ds = ray.data.read_images(s3_uri, mode="RGB") print(ds) -###################################################################### +############################################################################### # Under the hood, ``read_images()`` spreads the downloads across all available # nodes, using all the network bandwidth available to the cluster. # # Ray divides the data into **blocks** and dispatches them to # workers. This block-based architecture enables **streaming execution**: as soon -# as a stage outputs a block, the next stage can begin processing immediately it without -# waiting for previous stages to process the entire dataset. This allows you to utilize +# as a stage outputs a block, the next stage can begin processing immediately it without +# waiting for previous stages to process the entire dataset. 
This allows you to utilize # all your cluster's resources and evict intermediate data from the cluster's shared memory # as soon as it's no longer needed, making room for more data to be processed. - -###################################################################### +# # Ray Data provides useful methods to explore your data without loading it all into memory. # The ``schema()`` method shows the column names and data types: print(ds.schema()) -###################################################################### +############################################################################### # The ``take_batch()`` method lets you copy a small sample for inspection: sample_batch = ds.take_batch(5) @@ -96,7 +93,7 @@ img = Image.fromarray(first_img) img.show() -###################################################################### +############################################################################### # Part 1: Batch Predictions # ========================= # @@ -124,9 +121,10 @@ def preprocess_image(row: dict[str, np.ndarray]) -> dict[str, np.ndarray]: } -###################################################################### + +############################################################################### # Apply the preprocessing function with ``ds.map()``. This operation is **lazy**, -# meaning that Ray Data will not begin this stage until a non-lazy operation +# meaning that Ray Data will not begin this stage until a non-lazy operation # demands the results (in this case, when ``ds.write_parquet()`` is called). # Lazy execution allows Ray to intelligently optimize the entire pipeline # before any work begins. @@ -134,7 +132,8 @@ def preprocess_image(row: dict[str, np.ndarray]) -> dict[str, np.ndarray]: ds = ds.map(preprocess_image) print(ds.schema()) -###################################################################### + +############################################################################### # Define the model class for batch inference # ------------------------------------------ # @@ -147,7 +146,7 @@ def preprocess_image(row: dict[str, np.ndarray]) -> dict[str, np.ndarray]: # for high-throughput pipelines. This decoupling prevents GPUs from # blocking on CPU work and allows you to scale stages independently # or eliminate bottlenecks. Ray takes care of moving the data to a node -# with the appropriate resources if the current node doesn't have the +# with the appropriate resources if the current node doesn't have the # required resources. class Classifier: @@ -177,7 +176,8 @@ def __call__(self, batch: dict[str, np.ndarray]) -> dict[str, np.ndarray]: } -###################################################################### + +############################################################################### # Configure resource allocation and scaling # ----------------------------------------- # @@ -185,9 +185,9 @@ def __call__(self, batch: dict[str, np.ndarray]) -> dict[str, np.ndarray]: # number of CPUs or GPUs. Ray handles the orchestration of these resources across # your cluster, automatically placing workers on nodes with available capacity. This # means that when you move your workload from your laptop to a large cluster, you -# don't need to change your code since Ray will automatically detect the resources +# don't need to change your code since Ray will automatically detect the resources # available in the cluster and scale the workload accordingly. 
- +# # This flexibility enables you to mix different node types into your cluster, such as # different accelerators or CPU-only machines. This is useful for multi-modal workloads or # when you want to optimize the hardware utilization of different stages of your pipeline. @@ -200,7 +200,8 @@ def __call__(self, batch: dict[str, np.ndarray]) -> dict[str, np.ndarray]: # For example, on a cluster of 10 machines with 4 GPUs each, setting # ``num_gpus=0.5`` would schedule 2 workers per GPU, giving you 80 workers # across the cluster. -# + +############################################################################### # Run batch inference with map_batches # ------------------------------------ # @@ -225,7 +226,7 @@ def __call__(self, batch: dict[str, np.ndarray]) -> dict[str, np.ndarray]: batch_size=128, # Adjust based on available GPU memory ) -###################################################################### +############################################################################### # Inspect a few predictions: prediction_batch = ds.take_batch(5) @@ -242,11 +243,11 @@ def __call__(self, batch: dict[str, np.ndarray]) -> dict[str, np.ndarray]: print(f"Total images in dataset: {num_images}") -###################################################################### +############################################################################### # Run the pipeline and save the predictions to disk -# ------------------------ +# ------------------------------------------------- # -# The ``write_parquet()`` method is a blocking operation that triggers the execution of the +# The ``write_parquet()`` method is a blocking operation that triggers the execution of the # pipeline we defined above. As the pipeline streams results, the ``write_parquet()`` method # writes them to shards. Sharding the results is desirable because afterwards you can read # the shards in parallel. Writing to shared storage such as S3, GCS, or NFS is efficient because @@ -263,14 +264,13 @@ def __call__(self, batch: dict[str, np.ndarray]) -> dict[str, np.ndarray]: # print(f"Wrote {len(os.listdir(output_dir))} shards to {output_dir}") ds.materialize() # FIXME -###################################################################### +############################################################################### # Performance benchmarking # ------------------------ # # Measuring throughput is important for understanding how your batch inference # performs at scale. Ray Data provides fine-grained execution statistics for both # the overall pipeline as well as invidivual operations with the ``stats()`` method. 
-# print("\nExecution statistics:") print(ds.stats()) @@ -278,10 +278,13 @@ def __call__(self, batch: dict[str, np.ndarray]) -> dict[str, np.ndarray]: # Clear ds for the next example del ds -###################################################################### + +############################################################################### # For a single stage, the report looks like this: # +# ```text # Operator 3 Map(preprocess_image)->MapBatches(drop_columns): 58 tasks executed, 58 blocks produced in 9.65s +# # * Remote wall time: 369.14ms min, 1.85s max, 634.59ms mean, 36.81s total # * Remote cpu time: 369.57ms min, 696.42ms max, 551.0ms mean, 31.96s total # * UDF time: 733.07ms min, 3.69s max, 1.26s mean, 73.33s total @@ -295,24 +298,19 @@ def __call__(self, batch: dict[str, np.ndarray]) -> dict[str, np.ndarray]: # * Total output num rows: 2794 rows # * Ray Data throughput: 289.43 rows/s # * Estimated single task throughput: 75.91 rows/s -# -# - Wall time and CPU time per operator -# - Peak memory usage -# - Data throughput (rows/s) -# - Number of tasks and blocks processed +# ``` # # This information helps identify bottlenecks and optimize your pipeline. - - -###################################################################### +############################################################################### # Part 2: Batch Embeddings # ======================== # # Embeddings are dense vector representations useful for similarity search, # clustering, and downstream ML tasks. To extract embeddings, we modify the # model to return the features before the final classification layer. -# + +############################################################################### # Define the embedding model class # -------------------------------- # @@ -340,7 +338,8 @@ def __call__(self, batch: dict[str, np.ndarray]) -> dict[str, np.ndarray]: return {"embedding": embeddings.cpu().numpy()} -###################################################################### + +############################################################################### # Run batch embedding extraction: ds = ray.data.read_images(s3_uri, mode="RGB") @@ -353,14 +352,14 @@ def __call__(self, batch: dict[str, np.ndarray]) -> dict[str, np.ndarray]: batch_size=16, ) -###################################################################### +############################################################################### # Inspect the embeddings: embedding_batch = ds.take_batch(3) print(f"Embedding shape: {embedding_batch['embedding'].shape}") print(f"First embedding (truncated): {embedding_batch['embedding'][0][:10]}...") -###################################################################### +############################################################################### # Save embeddings to disk: embeddings_output_dir = os.path.join(os.getcwd(), "embeddings") @@ -374,7 +373,7 @@ def __call__(self, batch: dict[str, np.ndarray]) -> dict[str, np.ndarray]: print(ds.stats()) -###################################################################### +############################################################################### # Fault Tolerance # --------------- # @@ -384,24 +383,24 @@ def __call__(self, batch: dict[str, np.ndarray]) -> dict[str, np.ndarray]: # # Ray Data provides several fault tolerance mechanisms: # -# - **Backpressure**: Ray Data has multiple backpressure mechanisms to prevent a job from -# exhausting the cluster's shared memory. 
For instance, Ray Data can detect if a stage
#   becomes a bottleneck and throttle upstream stages to prevent queue buildup
#   and memory exhaustion.
-# - **Disk spilling**: If the cluster's shared memory is exhausted, Ray Data will spill data
+# * **Disk spilling**: If the cluster's shared memory is exhausted, Ray Data will spill data
#   from RAM to disk to prevent the job from failing due to out-of-memory errors.
-# - **Task retry**: If a task fails (e.g., due to a network issue), Ray automatically
+# * **Task retry**: If a task fails (e.g., due to a network issue), Ray automatically
#   retries it.
-# - **Actor reconstruction**: If an actor crashes, Ray creates a new
+# * **Actor reconstruction**: If an actor crashes, Ray creates a new
#   actor and reassigns pending tasks to it.
-# - **Lineage-based recovery**: Ray Data tracks the lineage of data transformations,
+# * **Lineage-based recovery**: Ray Data tracks the lineage of data transformations,
#   so if a node fails, it recomputes only the lost data rather
#   than restarting the entire job.
#
# Ray Data can recover from larger infrastructure failures, such as entire nodes
# failing.

###############################################################################
# Monitor your batch jobs
# -----------------------
#
# Monitoring is critical when running large-scale batch inference. The `Ray
# dashboard `__
# displays Ray Data metrics like processing throughput, task status, and error
# rates. It also shows cluster resource usage (CPU, GPU, memory) and overall
# job health in real time.
#
# To access the dashboard:
#
# 1. Start Ray with ``ray start --head`` (if running on a cluster)
# 2. Open your browser to ``http://localhost:8265`` (default port)
# 3. Navigate to the "Jobs" tab to see your Ray Data job
# 4. Click on the job to see detailed metrics and task execution timeline
#
# TODO: screenshots of the dashboard
#
# The dashboard lets you:
#
-# - Monitor progress of your batch job in real time
-# - Inspect logs from individual workers across the cluster
-# - Identify bottlenecks in your data pipeline
-# - View resource utilization (CPU, GPU, memory) per worker
-# - Debug failures with detailed error messages and stack traces
+# * Monitor progress of your batch job in real time
+# * Inspect logs from individual workers across the cluster
+# * Identify bottlenecks in your data pipeline
+# * View resource utilization (CPU, GPU, memory) per worker
+# * Debug failures with detailed error messages and stack traces
#
# For debugging, Ray offers `distributed debugging
# tools `__
# that let you attach a debugger to running workers across the cluster.
# For more information, see the `Ray Data monitoring
# documentation `__.
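#
# As a hedged sketch, when starting Ray from Python rather than the CLI, the
# dashboard can be made reachable from other machines; ``dashboard_host`` and
# ``dashboard_port`` are standard ``ray.init`` options, but verify them against
# your Ray version before relying on them:
#
# .. code-block:: python
#
#    ray.init(dashboard_host="0.0.0.0", dashboard_port=8265)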
-###################################################################### +############################################################################### # Conclusion # ---------- # # In this tutorial, you learned how to: # -# - Load image data with Ray Data from cloud storage using **distributed +# * Load image data with Ray Data from cloud storage using **distributed # ingestion** that leverages all nodes' network bandwidth -# - Explore datasets using ``schema()`` and ``take_batch()`` -# - Separate CPU preprocessing from GPU inference to **maximize hardware +# * Explore datasets using ``schema()`` and ``take_batch()`` +# * Separate CPU preprocessing from GPU inference to **maximize hardware # utilization** and enable independent scaling of each stage -# - Configure **resource allocation** and **fractional GPU usage** to +# * Configure **resource allocation** and **fractional GPU usage** to # efficiently scale across heterogeneous clusters -# - Run scalable batch predictions with a pretrained EfficientNet model -# - Extract embeddings by modifying the model's classification head -# - Measure and benchmark throughput for batch inference pipelines -# - Understand Ray Data's **fault tolerance** mechanisms -# - Monitor batch jobs using the Ray dashboard +# * Run scalable batch predictions with a pretrained EfficientNet model +# * Extract embeddings by modifying the model's classification head +# * Measure and benchmark throughput for batch inference pipelines +# * Understand Ray Data's **fault tolerance** mechanisms +# * Monitor batch jobs using the Ray dashboard # # The key advantage of Ray Data is that **the same code runs everywhere**: # from a laptop to a multi-node cluster with heterogeneous GPU types. Ray # handles parallelization, batching, resource management, and failure recovery # automatically—you focus on your model and transformations while Ray handles # the distributed systems complexity. -# + +############################################################################### # Further Reading # --------------- # # Ray Data has more production features that are out of scope for this # tutorial but are worth checking out: # -# - `Custom aggregations `__ -# - `Integration with Ray Train `__ +# * `Custom aggregations `__ +# * `Integration with Ray Train `__ # to build end-to-end training and inference pipelines. -# - `Reading and writing custom file types `__ -# +# * `Reading and writing custom file types `__ From 83e49afc5051f3def063f43368fd8f5bc95991a6 Mon Sep 17 00:00:00 2001 From: Ricardo Decal Date: Wed, 4 Feb 2026 12:29:24 -0800 Subject: [PATCH 06/10] lint --- beginner_source/batch_inference_tutorial.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/beginner_source/batch_inference_tutorial.py b/beginner_source/batch_inference_tutorial.py index f44f4eee93..f8194c06a7 100644 --- a/beginner_source/batch_inference_tutorial.py +++ b/beginner_source/batch_inference_tutorial.py @@ -188,7 +188,7 @@ def __call__(self, batch: dict[str, np.ndarray]) -> dict[str, np.ndarray]: # don't need to change your code since Ray will automatically detect the resources # available in the cluster and scale the workload accordingly. # -# This flexibility enables you to mix different node types into your cluster, such as +# This flexibility enables you to mix different node types into your cluster, such as # different accelerators or CPU-only machines. 
This is useful for multi-modal workloads or # when you want to optimize the hardware utilization of different stages of your pipeline. # @@ -211,7 +211,7 @@ def __call__(self, batch: dict[str, np.ndarray]) -> dict[str, np.ndarray]: # # The ``num_gpus`` parameter tells Ray to place each replica on a node with an # available GPU. If a worker fails, Ray automatically restarts the task on -# another node with the required resources. The ``batch_size`` parameter tells Ray how many +# another node with the required resources. The ``batch_size`` parameter tells Ray how many # images to process at each invocation of the actor. If you run into CUDA out of memory errors, # you can try reducing the ``batch_size``, increasing the ``num_gpus`` per worker, or # using a GPU with more memory. From 6c48115688d4be5a3ae489cba3ec6aa53544bf69 Mon Sep 17 00:00:00 2001 From: Ricardo Decal Date: Wed, 4 Feb 2026 13:56:12 -0800 Subject: [PATCH 07/10] another pass --- beginner_source/batch_inference_tutorial.py | 191 ++++++++++---------- 1 file changed, 92 insertions(+), 99 deletions(-) diff --git a/beginner_source/batch_inference_tutorial.py b/beginner_source/batch_inference_tutorial.py index f8194c06a7..92112fbd3a 100644 --- a/beginner_source/batch_inference_tutorial.py +++ b/beginner_source/batch_inference_tutorial.py @@ -9,42 +9,39 @@ .. grid:: 2 - .. grid-item-card:: :octicon:`mortar-board;1em;` What you will learn + .. grid-item-card:: :octicon:`mortar-board;1em;` You will learn how to: :class-card: card-prerequisites - * How to create a production-ready PyTorch offline batch inference pipeline. - We will cover two use cases: batch predictions and batch embeddings. - * How to scale the pipeline from your laptop to a cluster with thousands of nodes + * Create a production-ready PyTorch offline batch inference pipeline. + * Scale the pipeline from your laptop to a cluster with thousands of nodes and GPUs with no code changes. - * How Ray Data can process data that is much larger than the cluster's shared memory. - * How to configure resource allocation (CPU/GPU) and fractional resources. - * How to measure and benchmark throughput for batch inference pipelines - * How Ray Data can self-heal from failures with built-in fault tolerance. - * How to monitor batch jobs with the Ray dashboard for real-time insights. + * Use Ray Data to process data that is much larger than the cluster's shared memory. + * Configure resource allocation (CPU/GPU) and fractional resources. + * Measure and benchmark throughput for batch inference pipelines. + * Use Ray Data fault tolerance to self-heal from failures. + * Monitor batch jobs with the Ray dashboard for real-time insights. .. grid-item-card:: :octicon:`list-unordered;1em;` Prerequisites :class-card: card-prerequisites * PyTorch v2.9+ and ``torchvision`` * Ray Data (``ray[data]``) v2.52.1+ - * A GPU is recommended for higher throughput but is not required + * A GPU is recommended for higher throughput but is not required. `Ray Data `__ is a scalable framework for data processing in production. It's built on top of `Ray `__, a unified framework for scaling AI and Python applications that -simplifies the complexities of distributed computing. Ray is also open-source +simplifies the complexities of distributed computing. Ray is also open source and part of the PyTorch Foundation. Setup ----- -To install the dependencies: +To install the dependencies, run `pip install "ray[data]" torch torchvision`. 
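+
+To sanity-check the environment (optional), you can print the installed
+versions; this is just a convenience check, not required by the tutorial:
+
+.. code-block:: python
+
+    import ray
+    import torch
+
+    print(ray.__version__, torch.__version__)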
""" -# pip install "ray[data]" torch torchvision - ############################################################################### # Start by importing the required libraries: @@ -60,8 +57,7 @@ # Load the dataset with Ray Data # ------------------------------ # -# Ray Data can read images directly from cloud storage (S3, GCS) or local paths. -# Here we use a subset of the ImageNette dataset hosted on S3: +# Ray Data can read image files directly from cloud storage such as Amazon S3 and Google Cloud Platform (GCP) Storage, or from local paths. This tutorial uses a subset of the ImageNette dataset stored on S3: s3_uri = "s3://anonymous@air-example-data-2/imagenette2/train/" @@ -69,18 +65,17 @@ print(ds) ############################################################################### -# Under the hood, ``read_images()`` spreads the downloads across all available +# Behind the scenes, ``read_images()`` spreads the downloads across all available # nodes, using all the network bandwidth available to the cluster. # # Ray divides the data into **blocks** and dispatches them to # workers. This block-based architecture enables **streaming execution**: as soon -# as a stage outputs a block, the next stage can begin processing immediately it without -# waiting for previous stages to process the entire dataset. This allows you to utilize -# all your cluster's resources and evict intermediate data from the cluster's shared memory -# as soon as it's no longer needed, making room for more data to be processed. +# as a stage outputs a block, the next stage can begin processing it immediately without +# waiting for previous stages to process the entire dataset. This is key to Ray Data's efficiency, +# because it prevents hardware from sitting idle +# or parking intermediate data in memory waiting to be processed. # -# Ray Data provides useful methods to explore your data without loading it all into memory. -# The ``schema()`` method shows the column names and data types: +# Ray Data provides useful methods to explore your data without loading it all into memory. For example, the ``schema()`` method shows the column names and data types: print(ds.schema()) @@ -88,20 +83,20 @@ # The ``take_batch()`` method lets you copy a small sample for inspection: sample_batch = ds.take_batch(5) -first_img = sample_batch["image"][0] -print(f"Image shape: {first_img.shape}") -img = Image.fromarray(first_img) -img.show() +first_img_array = sample_batch["image"][0] +print(f"Image shape: {first_img_array.shape}") +first_img = Image.fromarray(first_img_array) +first_img.show() ############################################################################### -# Part 1: Batch Predictions +# Part 1: Batch predictions # ========================= # # Define the preprocessing function # --------------------------------- # -# First, we define a preprocessing function that transforms raw images into preprocessed tensors. -# We will use the same preprocessing function that the model used during training. In this case, +# First, define a preprocessing function that transforms raw input image files into preprocessed tensors. +# Use the same preprocessing function that the model used during training. In this case, # the EfficientNet preprocessing function includes resizing, normalization, and conversion to tensor. 
weights = EfficientNet_V2_S_Weights.DEFAULT @@ -110,11 +105,11 @@ def preprocess_image(row: dict[str, np.ndarray]) -> dict[str, np.ndarray]: """Transform a raw image into a tensor suitable for the model.""" - # Convert numpy array to PIL Image for torchvision transforms + # Convert numpy array to a PIL image for torchvision transforms pil_image = Image.fromarray(row["image"]) # Apply the model's preprocessing transforms tensor = preprocess(pil_image) - # Convert the tensor back to ndarray (a zero-copy operation since the tensor is on CPU). + # Convert the tensor back to ndarray, a zero-copy operation since the tensor is on CPU. return { "original_image": row["image"], "transformed_image": tensor.numpy(), @@ -124,9 +119,9 @@ def preprocess_image(row: dict[str, np.ndarray]) -> dict[str, np.ndarray]: ############################################################################### # Apply the preprocessing function with ``ds.map()``. This operation is **lazy**, -# meaning that Ray Data will not begin this stage until a non-lazy operation -# demands the results (in this case, when ``ds.write_parquet()`` is called). -# Lazy execution allows Ray to intelligently optimize the entire pipeline +# meaning that Ray Data doesn't begin this stage until a non-lazy operation +# demands the results, such as when ``ds.write_parquet()`` runs. +# Lazy execution lets Ray intelligently optimize the entire pipeline # before any work begins. ds = ds.map(preprocess_image) @@ -137,14 +132,14 @@ def preprocess_image(row: dict[str, np.ndarray]) -> dict[str, np.ndarray]: # Define the model class for batch inference # ------------------------------------------ # -# For batch inference, we wrap our model in a class. By passing a class to +# For batch inference, wrap the model in a class. By passing a class to # ``map_batches()``, Ray creates **Actor** processes that recycle state between # batches. The model loads once when the Actor starts and remains warm for all # subsequent batches, avoiding repeated model initialization overhead. # # Separating preprocessing (CPU) from model inference (GPU) is a key pattern # for high-throughput pipelines. This decoupling prevents GPUs from -# blocking on CPU work and allows you to scale stages independently +# blocking on CPU work and lets you scale stages independently # or eliminate bottlenecks. Ray takes care of moving the data to a node # with the appropriate resources if the current node doesn't have the # required resources. @@ -181,39 +176,37 @@ def __call__(self, batch: dict[str, np.ndarray]) -> dict[str, np.ndarray]: # Configure resource allocation and scaling # ----------------------------------------- # -# Ray Data allows you to specify **resource allocation** per worker, such as the +# Ray Data lets you specify **resource allocation** per worker, such as the # number of CPUs or GPUs. Ray handles the orchestration of these resources across # your cluster, automatically placing workers on nodes with available capacity. This -# means that when you move your workload from your laptop to a large cluster, you -# don't need to change your code since Ray will automatically detect the resources -# available in the cluster and scale the workload accordingly. +# means that scaling a batch inference job from a laptop to a large cluster doesn't require code changes, since Ray automatically detects the resources available in the cluster and scales the job accordingly. 
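+#
+# One way to confirm what Ray detected is to print the cluster's resources.
+# A small sketch; the output depends on your machine or cluster:
+#
+# .. code-block:: python
+#
+#    import ray
+#
+#    ray.init(ignore_reinit_error=True)
+#    print(ray.cluster_resources())  # for example, {'CPU': 8.0, 'GPU': 1.0, ...}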
# # This flexibility enables you to mix different node types into your cluster, such as -# different accelerators or CPU-only machines. This is useful for multi-modal workloads or -# when you want to optimize the hardware utilization of different stages of your pipeline. +# different accelerators or CPU-only machines. This is useful for multi-modal data pipelines or +# when you want to optimize the hardware use of different stages of your pipeline. # # Ray also supports `fractional # resource allocation `__, -# allowing multiple workers to share a single GPU when models are small +# letting multiple workers share a single GPU when models are small # enough to fit in memory together. # # For example, on a cluster of 10 machines with 4 GPUs each, setting -# ``num_gpus=0.5`` would schedule 2 workers per GPU, giving you 80 workers +# ``num_gpus=0.5`` schedules 2 workers per GPU, giving you 80 workers # across the cluster. ############################################################################### # Run batch inference with map_batches # ------------------------------------ # -# The ``map_batches()`` method applies our model to batches of data in parallel. -# This enables you to speed up stages of your pipeline that can benefit from vectorized operations, +# The ``map_batches()`` method applies the model to batches of data in parallel. +# This speeds up stages of your pipeline that can benefit from vectorized operations, # which GPUs are particularly good at parallelizing. # # The ``num_gpus`` parameter tells Ray to place each replica on a node with an # available GPU. If a worker fails, Ray automatically restarts the task on # another node with the required resources. The ``batch_size`` parameter tells Ray how many -# images to process at each invocation of the actor. If you run into CUDA out of memory errors, -# you can try reducing the ``batch_size``, increasing the ``num_gpus`` per worker, or +# images to process at each invocation of the actor. If you run into CUDA out-of-memory errors, +# try reducing the ``batch_size``, increasing the ``num_gpus`` per worker, or # using a GPU with more memory. num_gpus_per_worker = 1 # Set to 0 for CPU-only @@ -231,14 +224,14 @@ def __call__(self, batch: dict[str, np.ndarray]) -> dict[str, np.ndarray]: prediction_batch = ds.take_batch(5) -for image, label in zip(prediction_batch["original_image"], prediction_batch["predicted_label"]): - img = Image.fromarray(image) +for img_array, label in zip(prediction_batch["original_image"], prediction_batch["predicted_label"]): + img = Image.fromarray(img_array) img.show() print(f"Prediction: {label}") -# Get the total number of images in the dataset +# Get the total number of input images in the dataset num_images = ds.count() print(f"Total images in dataset: {num_images}") @@ -248,16 +241,16 @@ def __call__(self, batch: dict[str, np.ndarray]) -> dict[str, np.ndarray]: # ------------------------------------------------- # # The ``write_parquet()`` method is a blocking operation that triggers the execution of the -# pipeline we defined above. As the pipeline streams results, the ``write_parquet()`` method +# pipeline defined earlier. As the pipeline streams results, the ``write_parquet()`` method # writes them to shards. Sharding the results is desirable because afterwards you can read -# the shards in parallel. Writing to shared storage such as S3, GCS, or NFS is efficient because -# different workers can upload shards in parallel and utilizes your cluster's upload bandwidth. +# the shards in parallel. 
Writing to shared storage such as Amazon S3, Google Cloud Platform (GCP) Storage, or a network file system (NFS) is efficient because
+# different workers can upload shards in parallel and use your cluster's upload bandwidth.
 
 # Write predictions to parquet to trigger execution
 output_dir = os.path.join(os.getcwd(), "predictions")
 os.makedirs(output_dir, exist_ok=True)
 
-# Drop original images now that we've inspected them
+# Drop original image data now that we've inspected it
 ds = ds.drop_columns(["original_image"])
 # Write predictions to parquet. This is a blocking operation that triggers the execution of the pipeline.
 # ds.write_parquet(f"local://{output_dir}")
@@ -270,7 +263,7 @@ def __call__(self, batch: dict[str, np.ndarray]) -> dict[str, np.ndarray]:
 #
 # Measuring throughput is important for understanding how your batch inference
 # performs at scale. Ray Data provides fine-grained execution statistics for both
-# the overall pipeline as well as invidivual operations with the ``stats()`` method.
+# the overall pipeline as well as individual operations with the ``stats()`` method.
 
 print("\nExecution statistics:")
 print(ds.stats())
@@ -303,19 +296,23 @@ def __call__(self, batch: dict[str, np.ndarray]) -> dict[str, np.ndarray]:
 # This information helps identify bottlenecks and optimize your pipeline.
 ###############################################################################
-# Part 2: Batch Embeddings
+# Part 2: Batch embeddings
 # ========================
 #
 # Embeddings are dense vector representations useful for similarity search,
-# clustering, and downstream ML tasks. To extract embeddings, we modify the
+# clustering, and downstream ML tasks. To extract embeddings, modify the
 # model to return the features before the final classification layer.
 
 ###############################################################################
 # Define the embedding model class
 # --------------------------------
 #
-# The key modification is replacing the classifier head with an Identity layer,
-# so the model outputs the penultimate layer's features instead of class logits.
+# ML models can also be used to extract internal representations of the data. These representations, sometimes called embeddings, latent representations,
+# or features, distill the data's semantic meaning into a lower-dimensional space. These
+# representations are useful for similarity search, clustering, and other ML tasks.
+#
+# To extract the penultimate layer's features, replace the model's final classification head with an identity layer. This layer is
+# essentially a no-op that passes the data through unchanged.
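+#
+# The swap itself is a single line. A sketch, assuming ``model`` is a
+# ``torchvision`` ``efficientnet_v2_s`` instance, whose head lives in
+# ``model.classifier``:
+#
+# .. code-block:: python
+#
+#    # After this, model(x) returns features instead of class logits.
+#    model.classifier = torch.nn.Identity()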
class Embedder: def __init__(self): @@ -356,8 +353,8 @@ def __call__(self, batch: dict[str, np.ndarray]) -> dict[str, np.ndarray]: # Inspect the embeddings: embedding_batch = ds.take_batch(3) -print(f"Embedding shape: {embedding_batch['embedding'].shape}") -print(f"First embedding (truncated): {embedding_batch['embedding'][0][:10]}...") +print(f"Embedding batch shape: {embedding_batch['embedding'].shape}") +print(f"First embedding vector (truncated): {embedding_batch['embedding'][0][:10]}...") ############################################################################### # Save embeddings to disk: @@ -374,28 +371,28 @@ def __call__(self, batch: dict[str, np.ndarray]) -> dict[str, np.ndarray]: ############################################################################### -# Fault Tolerance +# Fault tolerance # --------------- # -# In production, process and machine failures are inevitable during long-running -# batch jobs. Ray Data is designed to handle failures gracefully and continue +# In production, machine failures are inevitable during long-running +# batch jobs. Ray Data handles failures gracefully and continues # processing without losing progress. # # Ray Data provides several fault tolerance mechanisms: # # * **Backpressure**: Ray Data has multiple backpressure mechanisms to prevent a job from -# exhausting the cluster's shared memory. For instance, Ray Data can detect if a stage -# becomes a bottleneck, and throttle upstream stages to downstream to prevent queue buildup -# and exhausting memory. -# * **Disk spilling**: If the cluster's shared memory is exhaused, Ray Data will spill data +# exhausting the cluster's shared memory. For example, Ray Data can detect if a stage +# becomes a bottleneck and throttle upstream stages to prevent queue buildup +# and memory exhaustion. +# * **Disk spilling**: If the cluster's shared memory runs out, Ray Data spills data # from RAM to disk to prevent the job from failing due to out-of-memory errors. -# * **Task retry**: If a task fails (e.g., due to a network issue), Ray automatically +# * **Task retry**: If a task fails (for example, due to a network issue), Ray automatically # retries. # * **Actor reconstruction**: If an actor crashes, Ray creates a new # actor and reassigns pending tasks to it. # * **Lineage-based recovery**: Ray Data tracks the lineage of data transformations, -# so if a node fails, will recompute the lost data rather than -# than restarting the entire job. +# so if a node fails, Ray recomputes the lost data rather than +# restarting the entire job. # # Ray Data can recover from larger infrastructure failures, such as entire nodes # failing. @@ -407,24 +404,20 @@ def __call__(self, batch: dict[str, np.ndarray]) -> dict[str, np.ndarray]: # Monitoring is critical when running large-scale batch inference. The `Ray # dashboard `__ # displays Ray Data metrics like processing throughput, task status, and error -# rates. It also shows cluster resource usage (CPU, GPU, memory) and overall -# job health in real time. -# -# To access the dashboard: +# rates. It also shows cluster resource usage for CPU, GPU, and memory and overall +# job health. # -# 1. Start Ray with ``ray start --head`` (if running on a cluster) -# 2. Open your browser to ``http://localhost:8265`` (default port) -# 3. Navigate to the "Jobs" tab to see your Ray Data job -# 4. 
Click on the job to see detailed metrics and task execution timeline +# To view the dashboard, open the link printed in the logs after Ray initializes, typically +# ``http://localhost:8265``. # -# TODO: screenshots of the dashboard +# TODO: Add screenshots of the dashboard. # # The dashboard lets you: # -# * Monitor progress of your batch job in real time +# * Monitor the progress of your batch job # * Inspect logs from individual workers across the cluster # * Identify bottlenecks in your data pipeline -# * View resource utilization (CPU, GPU, memory) per worker +# * View resource usage for CPU, GPU, and memory per worker # * Debug failures with detailed error messages and stack traces # # For debugging, Ray offers `distributed debugging @@ -437,20 +430,20 @@ def __call__(self, batch: dict[str, np.ndarray]) -> dict[str, np.ndarray]: # Conclusion # ---------- # -# In this tutorial, you learned how to: -# -# * Load image data with Ray Data from cloud storage using **distributed -# ingestion** that leverages all nodes' network bandwidth -# * Explore datasets using ``schema()`` and ``take_batch()`` -# * Separate CPU preprocessing from GPU inference to **maximize hardware -# utilization** and enable independent scaling of each stage -# * Configure **resource allocation** and **fractional GPU usage** to -# efficiently scale across heterogeneous clusters -# * Run scalable batch predictions with a pretrained EfficientNet model -# * Extract embeddings by modifying the model's classification head -# * Measure and benchmark throughput for batch inference pipelines -# * Understand Ray Data's **fault tolerance** mechanisms -# * Monitor batch jobs using the Ray dashboard +# In this tutorial, you: +# +# * Loaded image data with Ray Data from cloud storage using **distributed +# ingestion** that leverages all nodes' network bandwidth. +# * Explored datasets using ``schema()`` and ``take_batch()``. +# * Separated CPU preprocessing from GPU inference to independently scale +# each stage, eliminating bottlenecks and maximizing hardware use. +# * Configured **resource allocation** to +# efficiently scale across heterogeneous clusters. +# * Ran scalable batch predictions with a pretrained EfficientNet model. +# * Extracted embeddings by modifying the model's classification head. +# * Measured and benchmarked throughput for batch inference pipelines. +# * Learned about Ray Data's **fault tolerance** mechanisms. +# * Monitored batch jobs using the Ray dashboard. # # The key advantage of Ray Data is that **the same code runs everywhere**: # from a laptop to a multi-node cluster with heterogeneous GPU types. Ray @@ -459,7 +452,7 @@ def __call__(self, batch: dict[str, np.ndarray]) -> dict[str, np.ndarray]: # the distributed systems complexity. 
###############################################################################
-# Further Reading
+# Further reading
 # ---------------
 #
 # Ray Data has more production features that are out of scope for this

From 2cd1d0eddf440c1c860a775ae7552550dddbc156 Mon Sep 17 00:00:00 2001
From: Ricardo Decal 
Date: Wed, 4 Feb 2026 16:06:51 -0800
Subject: [PATCH 08/10] checkpoint

---
 beginner_source/batch_inference_tutorial.py | 20 +++++++++-----------
 1 file changed, 9 insertions(+), 11 deletions(-)

diff --git a/beginner_source/batch_inference_tutorial.py b/beginner_source/batch_inference_tutorial.py
index 92112fbd3a..f0bc6bbdd2 100644
--- a/beginner_source/batch_inference_tutorial.py
+++ b/beginner_source/batch_inference_tutorial.py
@@ -19,7 +19,7 @@
        * Configure resource allocation (CPU/GPU) and fractional resources.
        * Measure and benchmark throughput for batch inference pipelines.
        * Use Ray Data fault tolerance to self-heal from failures.
-       * Monitor batch jobs with the Ray dashboard for real-time insights.
+       * Monitor batch jobs with the Ray dashboard.
 
     .. grid-item-card:: :octicon:`list-unordered;1em;` Prerequisites
        :class-card: card-prerequisites
@@ -30,7 +30,7 @@
 `Ray Data `__ is a
 scalable framework for data processing in production.
-It's built on top of `Ray `__, a
+Ray Data is built on top of `Ray `__, a
 unified framework for scaling AI and Python applications that
 simplifies the complexities of distributed computing. Ray is also open source
 and part of the PyTorch Foundation.
@@ -73,7 +73,7 @@
 # as a stage outputs a block, the next stage can begin processing it immediately without
 # waiting for previous stages to process the entire dataset. This is key to Ray Data's efficiency,
 # because it prevents hardware from sitting idle
-# or parking intermediate data in memory waiting to be processed.
+# and keeps intermediate data from piling up in memory.
 #
 # Ray Data provides useful methods to explore your data without loading it all into memory. For example, the ``schema()`` method shows the column names and data types:
 
 print(ds.schema())
@@ -263,7 +263,7 @@ def __call__(self, batch: dict[str, np.ndarray]) -> dict[str, np.ndarray]:
 #
 # Measuring throughput is important for understanding how your batch inference
 # performs at scale. Ray Data provides fine-grained execution statistics for both
-# the overall pipeline as well as individual operations with the ``stats()`` method.
+# the overall pipeline and individual operations with the ``stats()`` method.
 
 print("\nExecution statistics:")
 print(ds.stats())
@@ -307,7 +307,7 @@ def __call__(self, batch: dict[str, np.ndarray]) -> dict[str, np.ndarray]:
 # Define the embedding model class
 # --------------------------------
 #
-# ML models can also be used to extract internal representations of the data. These representations, sometimes called embeddings, latent representations,
+# ML models can also extract internal representations of the data. These representations, sometimes called embeddings, latent representations,
 # or features, distill the data's semantic meaning into a lower-dimensional space. These
 # representations are useful for similarity search, clustering, and other ML tasks.
@@ -407,7 +407,8 @@ def __call__(self, batch: dict[str, np.ndarray]) -> dict[str, np.ndarray]:
 # rates. It also shows cluster resource usage for CPU, GPU, and memory and overall
 # job health. 
 #
-# To view the dashboard, open the link printed in the logs after Ray initializes, typically
+# To view the dashboard, open the link printed in the logs after Ray initializes.
+# Typically, this link is
 # ``http://localhost:8265``.
 #
 # TODO: Add screenshots of the dashboard.
@@ -445,11 +446,8 @@ def __call__(self, batch: dict[str, np.ndarray]) -> dict[str, np.ndarray]:
 # * Learned about Ray Data's **fault tolerance** mechanisms.
 # * Monitored batch jobs using the Ray dashboard.
 #
-# The key advantage of Ray Data is that **the same code runs everywhere**:
-# from a laptop to a multi-node cluster with heterogeneous GPU types. Ray
-# handles parallelization, batching, resource management, and failure recovery
-# automatically—you focus on your model and transformations while Ray handles
-# the distributed systems complexity.
+# Ray Data handles the complexity of distributed systems and resource allocation
+# so that you can focus on defining your data pipeline.

From 467af1463216de65c8d82c4df3016f3be993330f Mon Sep 17 00:00:00 2001
From: Ricardo Decal 
Date: Thu, 5 Feb 2026 11:18:39 -0800
Subject: [PATCH 09/10] edit pass

---
 beginner_source/batch_inference_tutorial.py | 17 +++++++++--------
 1 file changed, 9 insertions(+), 8 deletions(-)

diff --git a/beginner_source/batch_inference_tutorial.py b/beginner_source/batch_inference_tutorial.py
index f0bc6bbdd2..945b0924e9 100644
--- a/beginner_source/batch_inference_tutorial.py
+++ b/beginner_source/batch_inference_tutorial.py
@@ -7,13 +7,14 @@
 This tutorial shows how to run batch inference using a pretrained PyTorch model
 with Ray Data for scalable, production-ready data processing.
 
+
 .. grid:: 2
 
     .. grid-item-card:: :octicon:`mortar-board;1em;` You will learn how to:
        :class-card: card-prerequisites
 
        * Create a production-ready PyTorch offline batch inference pipeline.
-       * Scale the pipeline from your laptop to a cluster with thousands of nodes 
+       * Scale the pipeline from your laptop to a cluster with thousands of nodes
          and GPUs with no code changes.
@@ -24,13 +25,13 @@
     .. grid-item-card:: :octicon:`list-unordered;1em;` Prerequisites
        :class-card: card-prerequisites
 
-       * PyTorch v2.9+ and ``torchvision``
-       * Ray Data (``ray[data]``) v2.52.1+
+       * PyTorch v2.9+ and ``torchvision``.
+       * Ray Data (``ray[data]``) v2.52.1+.
        * A GPU is recommended for higher throughput but is not required.
 
 `Ray Data `__ is a
 scalable framework for data processing in production.
-Ray Data is built on top of `Ray `__, a
+Ray Data builds on top of `Ray `__, a
 unified framework for scaling AI and Python applications that
 simplifies the complexities of distributed computing. Ray is also open source
 and part of the PyTorch Foundation.
@@ -38,7 +39,7 @@
 Setup
 -----
 
-To install the dependencies, run `pip install "ray[data]" torch torchvision`.
+To install the dependencies, run ``pip install "ray[data]" torch torchvision``.
 
 """
 
@@ -246,7 +247,7 @@ def __call__(self, batch: dict[str, np.ndarray]) -> dict[str, np.ndarray]:
 # the shards in parallel. Writing to shared storage such as Amazon S3, Google Cloud Platform (GCP) Storage, or a network file system (NFS) is efficient because
 # different workers can upload shards in parallel and use your cluster's upload bandwidth. 
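+#
+# When you need the predictions again later, the shards can be read back in
+# parallel. A minimal sketch, assuming the same local ``output_dir`` created
+# below:
+#
+# .. code-block:: python
+#
+#    predictions = ray.data.read_parquet(output_dir)
+#    print(predictions.count())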
-# Write predictions to parquet to trigger execution +# Write predictions to Parquet to trigger execution output_dir = os.path.join(os.getcwd(), "predictions") os.makedirs(output_dir, exist_ok=True) @@ -354,7 +355,7 @@ def __call__(self, batch: dict[str, np.ndarray]) -> dict[str, np.ndarray]: embedding_batch = ds.take_batch(3) print(f"Embedding batch shape: {embedding_batch['embedding'].shape}") -print(f"First embedding vector (truncated): {embedding_batch['embedding'][0][:10]}...") +print(f"First embedding vector: {embedding_batch['embedding'][0][:10]}...") ############################################################################### # Save embeddings to disk: @@ -411,7 +412,7 @@ def __call__(self, batch: dict[str, np.ndarray]) -> dict[str, np.ndarray]: # Typically, this link is # ``http://localhost:8265``. # -# TODO: Add screenshots of the dashboard. +# TODO: Add screenshots of the Ray dashboard. # # The dashboard lets you: # From a4ca298d0a283e689e3b95263cf406da1f9295ec Mon Sep 17 00:00:00 2001 From: Ricardo Decal Date: Thu, 5 Feb 2026 14:52:06 -0800 Subject: [PATCH 10/10] add tutorial to docs index; use GPU in CI; add ray data logo --- .jenkins/metadata.json | 5 +++++ _static/img/ray-data.png | Bin 0 -> 14798 bytes index.rst | 7 +++++++ 3 files changed, 12 insertions(+) create mode 100644 _static/img/ray-data.png diff --git a/.jenkins/metadata.json b/.jenkins/metadata.json index 86bda8fa1e..990bdb5738 100644 --- a/.jenkins/metadata.json +++ b/.jenkins/metadata.json @@ -13,6 +13,11 @@ "needs": "linux.g5.4xlarge.nvidia.gpu", "duration": 1800 }, + "beginner_source/batch_inference_tutorial.py": { + "needs": "linux.g5.4xlarge.nvidia.gpu", + "duration": 1200, + "_comment": "Requires GPU for batch inference with Ray Data and PyTorch" + }, "advanced_source/dynamic_quantization_tutorial.py": { "duration": 380 }, diff --git a/_static/img/ray-data.png b/_static/img/ray-data.png new file mode 100644 index 0000000000000000000000000000000000000000..415c2858daa2f4025a2cce8fe9edd865e12b4878 GIT binary patch literal 14798 zcmeHui8qw}`~N-jOlm5TDN1FHL6$5D*~(TjGfE3(D`bgC_H|N`Y)!~x8=^=lOQP&5 zj68)#36G^rkA2_w-*r#V=bYak@H@Zr{hsr=&*{X>d%Lddb-k|TeG4}x8*pys-;5v# z=ZWLGrxApMf*@GTCJy+d*GOUl{=>T+H@kr#TL|bs43d>b*uiD1z>A7=$eLQ$|+8%=na=*F^ z227{*ybXWe<$6nRRHV2sVgiSLb^J#v`X87?3lH`|pDVYCzkg7C2Zn)%y88#7{X=!+ z?@5TYz7LlD$w5(d_72FO+v)5NkiRzlcbWg{hK+^)Ke&0AzqqKiQqqI+3ZjjDN3h;q z*0lwL5)X3!<^;|(8Uf}2q-*mXE_YQB;z5h^~ z`h4X2dY5cyul4!uhs*8`tCWNxVrvWFu(9ZFh-vd#WaNk3`edGKi+9xvYb+4JXo z|88>v#B%-pbGqF-Laz|~Oukgo8K_jbBJR-QHP<%JJiFRC(l&12Ha9dhq)8=4r6rg2 zMXXYn7vq}}(xw!5eRkTV<*m6j=<>?Q;_8AsLH&H7evsCiO-CryTPr>5lfh$k$gkB} z?2aSG$c3}Pq0S+^E|)2N34=?8QGy}yu2+Jss#LE1u^<$1ZEV=9Tyu-%FZ+WjP2vmT zlV*8AV=HRk>#wW?cKU3qjP%C#Y9G?}?N1%=u3CB&J5w%z-C^N{G#7vJiT7E!Y!N&b ztZ8W#)mBorG5G1eh`!m&)2Us# ztp!|%j27o7mz>HqQ?cD?$8e#<7+`2Yqjh z`8j;_BYZCw4t)Y|j8OM~^aGycte#em#vXkrc)A#>X^^R0I>Vd{tRoeOuMM?iC;o3HL}G5NjMc^ z(u~VL2*_0id4=9~f=(_d2VZ4z@tp}VG4{Z|*_utHydUp-2Zu3355wX7^M}$$!7Qdk zGSyD*QBmy##t20X1ujDo?uc|GFr=dCP&CoH)c+s{z zyZ$g!QY|$K1a6e5QT6$3es==*W)oPTIscQ-YPWfTHRoHg>*6XK zNyrvB2pUkzLf*3cP6l%kOsjHBGDWsVwaWpkq6HKp1(r)E zB>U{@d*ahbkx-M-Z3zoUyTg&<9LvizZRr# z5V*3hs%nuC_81a-Of>h>N0G%!u>)#>0ka%^c`l+GEy|GWvm!C`Jrjx}aw1%!5>3ZJ za+CoFj?1KMFhr9NkV*&87b57n3_rYUgO@W7IFnJihAfv}9*icq|dgru~|+ zJyiayP4N(>oG;}LoWS7s3Aix#u5S%=$Qd|e*Y5AG(QPA!sGJMmJ63l;c)3+89Zxm% z!9L!P+Px5uF_bf8O?=B>+TK+MrRGnn{t_3Un<@B=X%a|CDuIqTvX$Q&@*(@^s|(U< zq%FTjH82A@;p+6Vrv1$BSe_IJF~`~JWqC>G`)_kRAFe(pDQ-8|Aw%}Y>=pYo&>;V} zAbqjPr9Q!hm5%S-dYh>2heASBt`0kHa5SQDN9tJGMUSMB=_bph#7Si1pYQs4RJRrg 
zl8}9H0uozSi=#?E!}a$cP}A@xuVfmDQoyP9X6XtnhYIVkfQ&wLb5iwNkzZ(F=~*8u z^RDdBOVM%(-emkUW_>zM3y*Aj9pxD$oO8!9^1U z0{eQN>#!DFbM>;h{-6dv+1(my&qW z5x2Sf;er?nu$-l4w9rnOf>ueaSJ+XXC2xi8qRxCU%@ zyFIB<6``&KBq<6 zo)(;abx^?Jq1Y61zuwfmVox4Fw)|=4)4JjFee34++sd^pU#ni)WZA#GN<4Z+Y)YkQ z;0z~CCz1RKGodaseq^!R;Q$1wmkrc4ccSQ~Hc`oTJOM8DX^`2&{JSXnt7o6L%cZ76bz?MV(j;sKs zQH!T-^}@$nh8&Uq{;Of|pWGNBH~Q2qeg0pA6x9YBe$!(=|7)o#$_{^|OF z!0oFXlCL+d``#yot49yLNu8T=7Tu^vr1lSj*UiErI5!d4Y7e?qTan@~ug#}5BQZL|l(vr&CRv zV*S#rsmHUzNc9{;WP)Jw##|(+)#uEms()<7SzlkF0Rzl~`!uKJ>{@y5u)O1Af6aw>M&Dz-Y{$uf zhcu!f4#|rhoMGk6$JuuHU-oE&8Hg|-L*Q%67N%rej3`CkTd;O8_Uq7Fpt-@03W4!Q z$w=>6u?M@-W$i`Clc0yGY+{01>zOSC9hGP32Kh<#0rre`fjA?pyJ>xy)+9sL^@pE3)D+~VQZyy8$Eku1JEXX- z3(V0yj`?6G_)(29&$PZE8eiCK9F)0GCY5-P++WUB84gU$&79T^-&@>wqsuHP+7UYt zHi7HqWN*OnN8Hk#Mdddtdf9>vt*x;xwOxa zCI4YJDPkbOw@#D3ys`IaR@3_%qepy|bOrRi_d%qa;ANj>r>PI(zk`hm1BvtTCZ4d*Nq*uh`VCpDPbHIgo2t z|L}0xN^`ACAh8Y_HM!ZZ&+Be$OLEu-5zj{)MU`X6^2Bx4{LMR$5@PcQiasd1W>pX& zqpUbUt3O;2Y2;5a%s&ju6ky9*aDT-;V9|ng?iQhII>AIFM(^HHlk*eG?r*khVpAJ; zh{)~yZ1054pJyTO2viM-ROuJxMAp}0xk64hoblkltZJt3{k4}Rj;JZ!sRx^<&CyaU z3dN}zcq>ijF0ar)o|KrmM#S5^$(?5+)<28X1SKIlSZ`Zm!;P;+#k6#nJ(# z-6CAX5^8rG!{%-yqiFsyt_e~kmF=K>kN#Lj<6pzS0JtJV?JSF9X8LrMw;N`}O&kJp zxNYtN8IBzF!MYto5mA3;VxI_+NO@?}(=cs2*zuw!oJQ2<%gTQPr!=o#F#6MR{6q_G zA^_A#fNWO#d?8&^e%cSyevfP-0pA7(_2(gD~ml6(aTso=8dW-}F-6J^+m_sQtprb< z1MS~!1QwY%Iw1{PD?(({<{L`*C4J*%f8?lI6}qf1pP6Z}P-Yo5i`1uanD*YQbyv@l5jqP!aA%fWV^u^zz;#Vgz3f}Nx;Eqs0FW>5Q#gc)>wN*{+=7~OT485k zzoDBf`zt4E?5`bdpLX51*r;8maOcBSyi6!Vhfu8utsq{ghpAp9i)9_ww56NuTIypt@=;~6KAZ1DNmiiSsw*;ICMyDZuXa|X^>RgC&mo49mh96 zQSxoH!z9e#D4IGcc{H{6KGBPZ9mMwvVW2Ta_>|Skpz3&wjU!Jju66nvFI3j z4x*u}sn3;4&tTrn8vEhGGJaoALW&?@U)|Dm2&1G6RuD$FJ)L#!y}`R!Ciorqu#E>_ z^_|E^Zh^Ajb+^qk>9nuSxMp71cmwKEsyZwi#}zyjC?B|e2Rb%f07rbS{K5h2a2xhz zpDj9vyCQ3Kd(#hAgCkhzgu3|Q4x+fqWm>Qtvj9~%*xL|w&fVU$NZ+>XM*|+G6X!NUom!NkE70j! 
z#TYUoA!hWAhcM~@*?bXHq<7ql3ts~G#irg|yvn9_eLC`jG2^TJp|suq3j#}ige8Sa z3WQP}&%Dnhxorl*eXiL~a}I8<8rVsl}Hv|5$4(5qjSEd=0V`K0QI%&!Mym$s=Qgv((t|A6!(lU z#XUVGiB2wp(c_&7N87R{=6ghAtVlKm=VaQ)uex-s>OCN6L|mW33EixvK-E&;p|5bo zLhzbYG|DmR=PJIh?F3Dmf!&jZRdTX~F5lfd^vdYrl{>ZI77tJ-5@06$a$)_vJ^aIv zMavpcYEUX|WDta&8>Lz4upT&a(Z#9fVJr_6g4$B;b9Cnr9Y7nUBBoG%`C3XZ_JI`5 zAd3(lFHZdce%XLpF>3v;d5XHKdANF^bo*+z$Grn4+&NeIj@xxGi^J7JkK1SI_FZDUx^7OqCFJ@4fP+dSY+ zRftkwOhy=D%%@q-qGbYhPS}H`S0S0YkJ|8@%bpn_6PhsdOUJs>22C@%cb<2d1)hql zYkIC$_)1qG@!iZ@qxIk(qwy^e?eC-Y0K@HXxb;n*$ zJPy1^ZiFU8?+mDS5voF6NHUB-Do#*n^^mhDS8exdJVO)`p3S0Rj6P-Loby)SVTUXd zi#CY+IlPrte`LCc_fWu$tf zI_4XsM>47O+)zg5as`py=ogTj(WHc{CrPQ1BF0N-VJ#Nb&mtdP+gDo@u708OS5gk% zTCvhfxdr;AuvW}%qAM+bKuOnMVZ$*t=-DD@BIJ=m}*=r>SF!@-fKo*a$ z?dHb$OTp)Peyt$)={URI!S;Cu-xe3 zH+5}mV@ZpUKV=HK1qf0-Eur?Jp4&H$Za_z#pB9M`r%*oII$;OA#=|Gjs}MRw0}@|q zowWp$5&Q9{Pj5yYcS*Emhpxbju_L~5gH^7ImBGp_!tAZkmX6f!t_>3bVm!NbriA&G zQ@3^L3s@*)00wZGG74F4j~|C4_Y5RO8@aI z>5NC-P#wolhpQ{x*<25WwJgPWGkYOCK7yu(8083LK3JT-?{F3=w zWJMK0R7|rnm|oJAwo>IO7e+f)Ni5@`oGh|IE~XL4G!F>v>-t(U<599W@}uF2d2Ro_ zq{hxbKJz;9muo972%?Q;K?#B^RnWoPK*?Gk@?^XqGUe*EDQ-1Nsffn zdBIt$5Ag7&mX3WVsZ4zskcAc`OF_3sAQ>qfxa^U@)>+Uq(Aj;+84pkYycyA;grc^l zq(z>D3`5lq(x#pE_-KDu{Uw`56`zX6zpIRm1k3%tiMz5R)t$IhY@LnOr@+}y9n zdrE>iuEY1Q_bn9gyp>Q}6*6gi3D|fG*f4uPE_`=zM~*mkC*+iH#27m%wXOqds|*wy zafIp|2w0o`sl2P61U1!Pj#$&*^s|z#E1RfxCgH)LQgke_ItL)~cAt)`R{a$a z==XhOQ2Ibz54W%RxK|kuGzg=Nfl0Z`qs6K5?6YV~sJ*_tH7<1WmnP~8{9^6}6?M+f{8$zt?$f{SJz)7RBbH`C*>&s5 z=t5zXdgK^3`Ybr1&&Ni_lZJnf!aAU4H*~xr&Il6=!WJk*(aY23{3bY5%qVo2s6IC`|rdMx3sU73u>Jid_kt0y7o9TGsU4M0?hr;BzDrtlzPlC+b>6`;cx z_;g*HZcbSn@7ach;NqGuhUT>!YcgJnjCq$WsF-%2T~T)bG{%9vxs5_VLua3_0lbRV3jKtp=N_Ao=2-o(sXjN)3a3Pk!1-8dJC#0oZU`N*ryo7@V!Z${+FHk0Ax_c zhGj5=Kut-?qbGq;+cgv9WS~o`5OvHY6m_An?I~>O{o}t$= zh?peY$dZ?D*PlQVVn?ShZ`uYORk&n;6=n=uas;gm1nksyZkSLOoBB>(KJuN}iaV$) z@M%PY?OJHLETOi~piQ}c)O-x;hnat@uTwvy6)_F7td%--;2Qrw*5jC0R zGF~>-r4z;6PkE?4dj9CisDz!As z41~aqkBkoX>Sl>k%NTvPj{kNVK5&|`9jTu$-!%u=fjaFreHI494LjO88L~wZu;c&Z z`dyy4-@V;AY=jQ!zMLpY$^_6L%3G+Z}}z>@;ewzYTv>^qBIALc+Ni6w8e ztf~?N1)A7Z_MD?V8;NCI5eOnXniq1G0f_u8*_k8YPoV-F+C43GSf@oEdH+Gp+C)aE zqD6lTUl2*?c=C3OkFfzgV25@SbUBwiR^;a#t8ek(>uGAf*kaUuXlCC5d zY)TzV zFrx=Uwk`ElW{c${q4A^vgcgc&?(-~e5!WMLC_JIU!>!V+^z$jq{dyx2=5B|%j~}=c z+_3{UgJGbqrg`*B3%{%3z$qjHA{wxdsJ=gj+6a>N(tFzhK?IQWL@ zT~f1Q#rbYmT17wK+~*^sp7OFT?Y%6gBWq=v_~?mOE{Jxe*e@B)SCVQi8OeUn;6mi% zytIQA-hmRRxh&1|W;4_}i@mJW`Sk^F?@nakyJ_5#lzX7rYOSemC&zPz%9u3`_oy+y z-~d6+miv=Uy`2oc%QZKYysL_BS4P_P1fZ^bl29`g>6%%3WRwdtn)72_PC*H?t8+vC zWv~RpQOd`*)7PDkGjVOJGRCEF9@lXm+i@n1<|+csXeg~gKd+r@PAI8hK_`vJ)g026 zQyxBXK!bMf{azgMv>%0I2qW+Zh zePZca&jW)1_0qahX8^Y9l3!+Ce%&Fs&e*k1wzKA;0qdvwV!aK!iw0+4TF zJaTSc9SUz1)JbRT_~qR8sIHn6W#Zg4g2dyI(~0E#Ygp5+HCZrZIBM>bRZ-=k4%7_7 z%5l6q&kltxhW1+)8Z}-yq4)#G;&na(4A$n$={B);De06 zDNo^iUJVAxVAL;sy#d|8-c6_mO{PqUX9>&X9jU#q&wYKpMLh?1uq2WG>O9(*6@v45 z$V}(NI^9){LoDq)_*g$?_OG-$v5~xy8+2FYRo3Og()3g?x3K6k*aORCr>NhA3x&K3 zpQgLYFx8gmDe@eR&diw*t#pqVvZmb2HSvWx^Q-5XS2^1H?9cAj`65jH%0JFbp0p#p z4N$kV;%IN3?-&=2NnZM^BJ_$N$RdEQQo*0%EK(xOf4^v8X4Z7mcaFix@scve&TsJ! z_UeK~&vZ4dmQ)`$Ehy%Oa?ctvaok)to4EO|IP+C2qVC}OI@Uj6QNn!>zm#7b!+ulY zQZ}Qun_ybyoJkzV5miY$KV z@174X7Hd@{A5ubnkS?TxLF(kLc{ZI0U%7yu$Dl=0%c870$M%SZ8T7J`UsYQzjWWN6 zzGi~BvPbu_4)--iINYLHB;3dwXuU1$4n4AO&!Nm|4snLqhbFZX{9sh;tz@Skldl}! 
z7Z>~tL{kALonod=TVKB^-ui6a@|5^g6!z0OkA6t+fBj@vhk0O1#N<-uoeh&QnsDhY zqqe*KD^;GFdxR3oc!JA^G;DVW?^ar4Et?+{(M_a22OqNJvpVf7(s_=zmSV_K#Jym6 z-D7}8(?FxcH$GV0xBMbbjfh4SY6J$H|ewKgAqNUuM*TB4a?2-d`RHyT5(Q&hrQ8V5ePG zajRYdzPm>^5asq9nKc#>)XSdRN{eW*$QNiw>y+lm$0VfWI=sY`eia4%#ZlWmd8ND7 z^onv4O^5_N;y0mAw6?a^fI8C1U%}yIaa}YIpNKl9R1Dm|lC(@({sq0Ble91 z^&iyB1owrB$7m+rD}8aw<8oBts0M%(mX-D993$K46RM}-GCB(#7-(XBI%cXZP<&k= zFjV|&LLc9cgqAGmNZcb^FqEfQ%S|5<-b3Bqo^2iC?$`Yaxu8o9{W_(SXn;t@#+KqL z=?XbDC&DWO_i`Fu@F=t``NKr};ppS@cxy+ose#cjf$ImupJ|ogmcdeW!py8X#AOihttR?u2C|-4RvM9q0Nrfj^6B+*#XH0Z zv?QipfxWFYvNGmv777yAmL`{;56)hhySg{)*L91t;Q>2R)lNqo?(6U3e%t;0_0@Sh zF&K|M61MELQW3fHJMN^PD(^L|h+;`K+EEP`DB&)6 zEFDBInC&`!OgQqIaT9R$e;-^+=ymDHa!E~r@=@2~zS$E+d0@kuvxfJ###=Oaxpw$ zO>*E=af?OIVax|C;Oqi8WxC0WaaG?lB7)20rT7=U3x_c&$t|&u;q{$QFv5^MBF!Ve zpLMDX!(Iw_)d|MkF*8R@>9 zOrC1!z}GWgR&vrBMbOYMv=dmnIj8jQit^Ga6avwtu8#IAbqUDlT83<4!*|@ou!!%Nl4FZHl@fQ|J;V^NZ4ItpI{gD5R9e3mK4L9GB z`n1q<)Dl`Q$q3o0G*20tHuv-3W}ASpH+NCT5uwMIK?whf$XskT!>YXIS+qTnbVC3t zlw`yZea*}C(C?uhyHbn{SvEa*vw6DhUfRa1_A~n>qu_oItR)KXu)anKHoLgR)$KHx z)vQ?P#fVqgFhagunr6syZ<|!ysC7tJ*1pRhSnV9@5gyOHUaUWB6Vd;Y!EF9P=I@s7 z?2dhgbCVLAk}r&S>pULzijBapfLDFLAb@)1ipd}yG16YX^owjy~q(9y{yQtJD z(avKJjv;G^s^Mw$;839a$}nW=51g|fT!Vzh6WN$Jmd5k;Z_=_%uR)f5t23vjeLJc- zS$%ZMl|5Y9q{uSPPs8Z{77>l3X#U^U#pAX=$h=bcB?%h0zXw zdzG(wZ6<|UTkVb@lj#%YRtB<%%euzYsP>6t2|RC|vjcf29z#GTva$FXHG-1!5lca` zKS`QP3$omOAb}sH8#iGNX$O`rcafm2%5KT4>_I{ER5)l8$&#a9g4V=6VZ?MSaz2?j zs&TWw(Z}ygC1FlpS5u4(*Ul+-(9<+?YmF%R?oJ8GAZY9o;q#~TE~1%=x(Q?^JVdZ4 znT+oLILK_J%V+f?ITkuOP2Blu(D+p@>$*m$j;&jM9Q~$KSN45TJ$8=v;yUY=3Z&j{ zr%~%}fA13rZy+hP8F-ne(Bb3_ZwR1DpE7XcEjP@e1PjH?b za(KyyjjaWo#yZz@f{Dwv^}fzp&eWBiuZwZNJJ_z$IT#zOZoipA%|yYZJCsl*9JH%p zc5TO>TWc;qfV8WwW>rM8N z@VYorK?cLCd8wMN?(G=$nthd{D=@3MDuKR|)x_=gre2(CB)ae9-rY+O-fp2<+4<*2 zu~|>Ir2SFoR`ogXP7bSJ_aA9dO=lMhCM%zcjsk{L{S$5WyI>cDqMgxf#-9M2Zt1Mk zQJHdOS_QAq-$PWPm@E*U%E>I#++zJwSCdR}-wf>Wa`;BS{E^|?L=U^^<>X8O{1))H@710?ORrRaA&CRMjN-=hmBio;fn4%F29yc?fdp$4}Jk`H|I^xmn9iCy}v~$`3XLxeq z1+F&-e^1Dl1!fHScLJ()^rYy6-h{~hX97yfe}c=vV+wCX<)g(PJo!Pen=pVK9-)l@ ze6RD{=l}h63;JuBy!yY(nXzx^VBD44?870*fB(*j-LD3f{P&Ny{-^(8oA2Rt(e+U*ofH(vNoQXD#6(REMgk#%1lvkv(`wx-*+ literal 0 HcmV?d00001 diff --git a/index.rst b/index.rst index a995d45ce7..393a1957b6 100644 --- a/index.rst +++ b/index.rst @@ -160,6 +160,13 @@ Welcome to PyTorch Tutorials :link: advanced/usb_semisup_learn.html :tags: Image/Video +.. customcarditem:: + :header: Offline Batch Inference + :card_description: Learn how to run production-ready batch inference using PyTorch and Ray Data. + :image: _static/img/ray-data.png + :link: beginner/batch_inference_tutorial.html + :tags: Image/Video,Model-Optimization,Best-Practice,Ray-Distributed,Parallel-and-Distributed-Training + .. Audio .. customcarditem::