
[MEDIUM] Patch libarrow for CVE-2026-25087#16145

Open
durgajagadeesh wants to merge 3 commits into microsoft:3.0-dev from durgajagadeesh:topic_libarrow-3.0

Conversation

@durgajagadeesh
Contributor

@durgajagadeesh durgajagadeesh commented Mar 8, 2026

Merge Checklist

All boxes should be checked before merging the PR (just tick any boxes which don't apply to this PR)

  • The toolchain has been rebuilt successfully (or no changes were made to it)
  • The toolchain/worker package manifests are up-to-date
  • Any updated packages successfully build (or no packages were changed)
  • Packages depending on static components modified in this PR (Golang, *-static subpackages, etc.) have had their Release tag incremented.
  • Package tests (%check section) have been verified with RUN_CHECK=y for existing SPEC files, or added to new SPEC files
  • All package sources are available
  • cgmanifest files are up-to-date and sorted (./cgmanifest.json, ./toolkit/scripts/toolchain/cgmanifest.json, .github/workflows/cgmanifest.json)
  • LICENSE-MAP files are up-to-date (./LICENSES-AND-NOTICES/SPECS/data/licenses.json, ./LICENSES-AND-NOTICES/SPECS/LICENSES-MAP.md, ./LICENSES-AND-NOTICES/SPECS/LICENSE-EXCEPTIONS.PHOTON)
  • All source files have up-to-date hashes in the *.signatures.json files
  • sudo make go-tidy-all and sudo make go-test-coverage pass
  • Documentation has been updated to match any changes to the build system
  • Ready to merge

Summary

What does the PR accomplish, why was it needed?
Patch libarrow for CVE-2026-25087

Astrolabe Patch Reference: apache/arrow#48925
Upstream patch reference: https://patch-diff.githubusercontent.com/raw/apache/arrow/pull/48925.patch
Patch Modified: Yes
The reader.cc file has been modified to align with the upstream patch.
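For context on the class of fix: the upstream patch guards every buffer read in reader.cc with an overflow-safe bounds check so that a crafted IPC file cannot request a read past the end of the file (or wrap `offset + length` around `INT64_MAX`). A minimal standalone sketch of the idea, using the compiler builtin `__builtin_add_overflow` in place of Arrow's internal `AddWithOverflow` helper (names here are illustrative, not Arrow's API):

```cpp
#include <cstdint>
#include <optional>

// Overflow-safe addition of two signed 64-bit values; nullopt on wrap-around.
std::optional<int64_t> add_with_overflow(int64_t a, int64_t b) {
    int64_t result;
    if (__builtin_add_overflow(a, b, &result)) {
        return std::nullopt;
    }
    return result;
}

// Reject an (offset, length) read request that is negative, overflows, or
// runs past the end of the file — mirroring the check the patch adds to
// ArrayLoader::ReadBuffer before any I/O is issued.
bool read_in_bounds(int64_t offset, int64_t length, int64_t file_length) {
    if (offset < 0 || length < 0) return false;
    auto read_end = add_with_overflow(offset, length);
    return read_end.has_value() && *read_end <= file_length;
}
```

The key design point is that the validation happens before the read is enqueued, so both the synchronous (`file_->ReadAt`) and coalesced (`read_request_.RequestRange`) paths are covered.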

Change Log
Does this affect the toolchain?

NO

Associated issues
  • #xxxx
Links to CVEs
Test Methodology
  • Pipeline build id: xxxx
  • Local VM build summary:
Patch applies cleanly (screenshot).

Installation: (screenshots)

Uninstallation: (screenshot)

@durgajagadeesh durgajagadeesh requested a review from a team as a code owner March 8, 2026 20:04
@microsoft-github-policy-service microsoft-github-policy-service bot added Packaging 3.0-dev PRs Destined for AzureLinux 3.0 labels Mar 8, 2026
@durgajagadeesh durgajagadeesh marked this pull request as draft March 8, 2026 20:05
@Kanishk-Bansal
Contributor

Buddy Build

@durgajagadeesh
Contributor Author

Buddy Build

The buddy build has a license warning. I am working on resolving this issue.
WARN: (libarrow-doc-15.0.0-8.azl3.noarch.rpm) has license warnings:
bad %doc files:
/usr/share/doc/libarrow-doc/NOTICE.txt

@durgajagadeesh durgajagadeesh marked this pull request as ready for review March 10, 2026 12:18
@Kanishk-Bansal
Contributor

Buddy Build

@durgajagadeesh
Contributor Author

durgajagadeesh commented Mar 11, 2026

Buddy Build

The buddy build has passed.

@durgajagadeesh durgajagadeesh force-pushed the topic_libarrow-3.0 branch 2 times, most recently from 0c425be to 43a2202 Compare March 11, 2026 09:59
@Kanishk-Bansal
Contributor

Buddy Build

@microsoft microsoft deleted a comment from azurelinux-security Mar 12, 2026
@azurelinux-security
Contributor

🔒 CVE Patch Review: CVE-2026-25087

PR #16145 — [MEDIUM] Patch libarrow for CVE-2026-25087
Package: libarrow | Branch: 3.0-dev


Spec File Validation

  • Release bump: Release bumped 7 → 8
  • Patch entry: Patch entries added: ['CVE-2026-25087.patch'] (covers ['CVE-2026-25087'])
  • Patch application: %autosetup found in full spec — patches applied automatically
  • Changelog: Changelog entry looks good
  • Signatures: No source tarball changes — signatures N/A
  • Manifests: Not a toolchain PR — manifests N/A
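For readers unfamiliar with the mechanism the "Patch application" check refers to: a `PatchN:` entry in the spec is applied automatically by `%autosetup` during `%prep`, so no explicit `%patch` line is needed. A simplified, hypothetical excerpt (not the actual libarrow spec) would look like:

```spec
Name:    libarrow
Version: 15.0.0
Release: 8%{?dist}
# Backport of the upstream fix from apache/arrow#48925
Patch0:  CVE-2026-25087.patch

%prep
# -p1 strips one leading path component while applying all PatchN entries
%autosetup -p1
```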

Build Verification

  • Build status: ❌ FAILED
  • Artifact downloaded:
  • CVE applied during build:
  • Errors (16):
    • L66: time="2026-03-11T19:41:39Z" level=debug msg="Error: Failed to synchronize cache for repo 'Azure Linux Official Base 3.0 x86_64'"
    • L80: time="2026-03-11T19:41:43Z" level=debug msg="Error: Failed to synchronize cache for repo 'Azure Linux Official Base 3.0 x86_64'"
    • L1420: time="2026-03-11T19:46:53Z" level=debug msg="warning: File not found: /usr/src/azl/BUILDROOT/libarrow-15.0.0-8.azl3.x86_64/usr/include/arrow-flight-glib"
    • L1421: time="2026-03-11T19:46:53Z" level=debug msg="warning: File not found: /usr/src/azl/BUILDROOT/libarrow-15.0.0-8.azl3.x86_64/usr/lib/cmake/Arrow/FindORC.cmake"
    • L1422: time="2026-03-11T19:46:53Z" level=debug msg="warning: File not found: /usr/src/azl/BUILDROOT/libarrow-15.0.0-8.azl3.x86_64/usr/lib/cmake/Arrow/FindorcAlt.cmake"
    • L1423: time="2026-03-11T19:46:53Z" level=debug msg="warning: File not found: /usr/src/azl/BUILDROOT/libarrow-15.0.0-8.azl3.x86_64/usr/lib/cmake/Arrow/FindgRPCAlt.cmake"
    • L1424: time="2026-03-11T19:46:53Z" level=debug msg="warning: File not found: /usr/src/azl/BUILDROOT/libarrow-15.0.0-8.azl3.x86_64/usr/lib/cmake/Arrow/FindzstdAlt.cmake"
    • L1425: time="2026-03-11T19:46:53Z" level=debug msg="warning: File not found: /usr/src/azl/BUILDROOT/libarrow-15.0.0-8.azl3.x86_64/usr/lib/cmake/Arrow/FindThriftAlt.cmake"
    • L1426: time="2026-03-11T19:46:53Z" level=debug msg="warning: File not found: /usr/src/azl/BUILDROOT/libarrow-15.0.0-8.azl3.x86_64/usr/lib/cmake/Arrow/FindProtobufAlt.cmake"
    • L1488: time="2026-03-11T19:46:55Z" level=debug msg=" File not found: /usr/src/azl/BUILDROOT/libarrow-15.0.0-8.azl3.x86_64/usr/include/arrow-flight-glib"
    • … and 6 more
  • Warnings (3):
    • L637: time="2026-03-11T19:41:47Z" level=debug msg="CMake Warning:"
    • L762: time="2026-03-11T19:42:29Z" level=debug msg="/usr/include/c++/13.2.0/bits/shared_ptr_base.h:1532:9: warning: '*(std::__shared_ptr<arrow::Buffer, __gnu_cxx::_S_atomic>*)((char*)&next + offsetof(arrow::TransformFlow<std::shared_ptr<arrow::Buffer> >,arrow::TransformFlow<std::shared_ptr<arrow::Buffer> >::yield_value_.std::optional<std::shared_ptr<arrow::Buffer> >::<unnamed>.std::_Optional_base<std::shared_ptr<arrow::Buffer>, false, false>::<unnamed>)).std::__shared_ptr<arrow::Buffer, __gnu_cxx::_S_atomic>::_M_ptr' may be used uninitialized [-Wmaybe-uninitialized]"
    • L784: time="2026-03-11T19:42:29Z" level=debug msg="/usr/include/c++/13.2.0/bits/shared_ptr_base.h:1071:28: warning: '((std::__shared_count<__gnu_cxx::_S_atomic>*)((char*)&next + offsetof(arrow::TransformFlow<std::shared_ptr<arrow::Buffer> >,arrow::TransformFlow<std::shared_ptr<arrow::Buffer> >::yield_value_.std::optional<std::shared_ptr<arrow::Buffer> >::<unnamed>.std::_Optional_base<std::shared_ptr<arrow::Buffer>, false, false>::_M_payload.std::_Optional_payload<std::shared_ptr<arrow::Buffer>, false, false, false>::<unnamed>.std::_Optional_payload<std::shared_ptr<arrow::Buffer>, true, false, false>::<unnamed>.std::_Optional_payload_base<std::shared_ptr<arrow::Buffer> >::_M_payload)))[1].std::__shared_count<>::_M_pi' may be used uninitialized [-Wmaybe-uninitialized]"

🤖 AI Build Log Analysis

  • Risk: medium
  • Summary: The libarrow 15.0.0-8.azl3 build completed successfully and produced all expected binary, devel, debuginfo, doc, and parquet RPMs. The CVE-2026-25087 patch was applied during %prep with patch -p1 --fuzz=0 and no patch failures were reported. Compilation and installation finished without errors; however, the build emitted several packaging-related warnings (missing/conditional files and "installed but unpackaged" items) and network repo sync errors that did not affect the build. Tests were disabled (--nocheck).
  • AI-detected warnings:
    • Network repo sync issues: tdnf reported 'Error(1207) : Could not resolve hostname' and 'Failed to synchronize cache for repo', but required packages were already present so the build continued.
    • rpmbuild: 'Could not canonicalize hostname' message during build environment setup.
    • Packaging: Directory not found and file not found warnings for disabled components (e.g., arrow/flight headers and CMake Find*.cmake files for ORC/gRPC/zstd/Thrift/Protobuf).
    • Packaging: 'Installed (but unpackaged) file(s) found' including ArrowAcero and ArrowDataset libraries and CMake config files, .pc files, and a gdb auto-load script, indicating the spec may not list these artifacts.
    • CMake: clang-tidy and clang-format not found; infer not found (non-fatal tooling warnings).

🧪 Test Log Analysis

No test log found (package may not have a %check section).


Patch Analysis

  • Match type: unknown
  • Risk assessment: unknown
  • Summary:
Raw diff (upstream vs PR)
--- upstream
+++ pr
@@ -1,771 +1,848 @@
-From a4ae90929d6e959e9a1fb29f3907bbbf2799472e Mon Sep 17 00:00:00 2001
-From: Antoine Pitrou <antoine@python.org>
-Date: Wed, 21 Jan 2026 17:54:00 +0100
-Subject: [PATCH] GH-48924: [C++][CI] Fuzz IPC file metadata pre-buffering
-
----
- ci/scripts/cpp_test.sh                 |   9 +
- cpp/src/arrow/ipc/read_write_test.cc   |  75 +++++----
- cpp/src/arrow/ipc/reader.cc            | 222 ++++++++++++++++---------
- cpp/src/arrow/ipc/test_common.cc       |  47 +++---
- cpp/src/arrow/type.h                   |  10 ++
- cpp/src/arrow/util/int_util_overflow.h |  33 ++++
- cpp/src/arrow/util/int_util_test.cc    |  18 ++
- 7 files changed, 286 insertions(+), 128 deletions(-)
-
-diff --git a/ci/scripts/cpp_test.sh b/ci/scripts/cpp_test.sh
-index 0ad59bc308f..5d6d5e099ab 100755
---- a/ci/scripts/cpp_test.sh
-+++ b/ci/scripts/cpp_test.sh
-@@ -182,6 +182,15 @@ if [ "${ARROW_FUZZING}" == "ON" ]; then
-     # Some fuzz regression files may trigger huge memory allocations,
-     # let the allocator return null instead of aborting.
-     export ASAN_OPTIONS="$ASAN_OPTIONS allocator_may_return_null=1"
-+    export ARROW_FUZZING_VERBOSITY=1
-+    # Run golden IPC integration files: these should ideally load without errors,
-+    # though some very old ones carry invalid data (such as decimal values
-+    # larger than their advertised precision).
-+    # shellcheck disable=SC2046
-+    "${binary_output_dir}/arrow-ipc-stream-fuzz" $(find "${ARROW_TEST_DATA}"/arrow-ipc-stream/integration -name "*.stream")
-+    # shellcheck disable=SC2046
-+    "${binary_output_dir}/arrow-ipc-file-fuzz" $(find "${ARROW_TEST_DATA}"/arrow-ipc-stream/integration -name "*.arrow_file")
-+    # Run known crash files
-     "${binary_output_dir}/arrow-ipc-stream-fuzz" "${ARROW_TEST_DATA}"/arrow-ipc-stream/crash-*
-     "${binary_output_dir}/arrow-ipc-stream-fuzz" "${ARROW_TEST_DATA}"/arrow-ipc-stream/*-testcase-*
-     "${binary_output_dir}/arrow-ipc-file-fuzz" "${ARROW_TEST_DATA}"/arrow-ipc-file/*-testcase-*
-diff --git a/cpp/src/arrow/ipc/read_write_test.cc b/cpp/src/arrow/ipc/read_write_test.cc
-index 315d8bd07d9..9f7df541bd7 100644
---- a/cpp/src/arrow/ipc/read_write_test.cc
-+++ b/cpp/src/arrow/ipc/read_write_test.cc
-@@ -1252,40 +1252,55 @@ struct FileGeneratorWriterHelper : public FileWriterHelper {
-   Status ReadBatches(const IpcReadOptions& options, RecordBatchVector* out_batches,
-                      ReadStats* out_stats = nullptr,
-                      MetadataVector* out_metadata_list = nullptr) override {
--    std::shared_ptr<io::RandomAccessFile> buf_reader;
--    if (kCoalesce) {
--      // Use a non-zero-copy enabled BufferReader so we can test paths properly
--      buf_reader = std::make_shared<NoZeroCopyBufferReader>(buffer_);
--    } else {
--      buf_reader = std::make_shared<io::BufferReader>(buffer_);
--    }
--    AsyncGenerator<std::shared_ptr<RecordBatch>> generator;
-+    // The generator doesn't track stats.
-+    EXPECT_EQ(nullptr, out_stats);
- 
--    {
--      auto fut = RecordBatchFileReader::OpenAsync(buf_reader, footer_offset_, options);
--      // Do NOT assert OK since some tests check whether this fails properly
--      EXPECT_FINISHES(fut);
--      ARROW_ASSIGN_OR_RAISE(auto reader, fut.result());
--      EXPECT_EQ(num_batches_written_, reader->num_record_batches());
--      // Generator will keep reader alive internally
--      ARROW_ASSIGN_OR_RAISE(generator, reader->GetRecordBatchGenerator(kCoalesce));
--    }
-+    auto read_batches = [&](bool pre_buffer) -> Result<RecordBatchVector> {
-+      std::shared_ptr<io::RandomAccessFile> buf_reader;
-+      if (kCoalesce) {
-+        // Use a non-zero-copy enabled BufferReader so we can test paths properly
-+        buf_reader = std::make_shared<NoZeroCopyBufferReader>(buffer_);
-+      } else {
-+        buf_reader = std::make_shared<io::BufferReader>(buffer_);
-+      }
-+      AsyncGenerator<std::shared_ptr<RecordBatch>> generator;
+diff --git a/SPECS/libarrow/CVE-2026-25087.patch b/SPECS/libarrow/CVE-2026-25087.patch
+new file mode 100644
+index 00000000000..6dc36806b10
+--- /dev/null
++++ b/SPECS/libarrow/CVE-2026-25087.patch
+@@ -0,0 +1,842 @@
++From a4ae90929d6e959e9a1fb29f3907bbbf2799472e Mon Sep 17 00:00:00 2001
++From: Antoine Pitrou <antoine@python.org>
++Date: Wed, 21 Jan 2026 17:54:00 +0100
++Subject: [PATCH] GH-48924: [C++][CI] Fuzz IPC file metadata pre-buffering
 +
-+      {
-+        auto fut = RecordBatchFileReader::OpenAsync(buf_reader, footer_offset_, options);
-+        ARROW_ASSIGN_OR_RAISE(auto reader, fut.result());
-+        EXPECT_EQ(num_batches_written_, reader->num_record_batches());
-+        if (pre_buffer) {
-+          RETURN_NOT_OK(reader->PreBufferMetadata(/*indices=*/{}));
-+        }
-+        // Generator will keep reader alive internally
-+        ARROW_ASSIGN_OR_RAISE(generator, reader->GetRecordBatchGenerator(kCoalesce));
-+      }
- 
--    // Generator is async-reentrant
--    std::vector<Future<std::shared_ptr<RecordBatch>>> futures;
-+      // Generator is async-reentrant
-+      std::vector<Future<std::shared_ptr<RecordBatch>>> futures;
-+      for (int i = 0; i < num_batches_written_; ++i) {
-+        futures.push_back(generator());
-+      }
-+      auto fut = generator();
-+      ARROW_ASSIGN_OR_RAISE(auto final_batch, fut.result());
-+      EXPECT_EQ(nullptr, final_batch);
++Upstream Patch reference: https://patch-diff.githubusercontent.com/raw/apache/arrow/pull/48925.patch
 +
-+      RecordBatchVector batches;
-+      for (auto& future : futures) {
-+        ARROW_ASSIGN_OR_RAISE(auto batch, future.result());
-+        EXPECT_NE(nullptr, batch);
-+        batches.push_back(batch);
-+      }
-+      return batches;
-+    };
++---
++ ci/scripts/cpp_test.sh                 |  12 ++
++ cpp/src/arrow/ipc/read_write_test.cc   |  75 +++++---
++ cpp/src/arrow/ipc/reader.cc            | 252 +++++++++++++++++--------
++ cpp/src/arrow/ipc/test_common.cc       |  47 +++--
++ cpp/src/arrow/type.h                   |  10 +
++ cpp/src/arrow/util/int_util_overflow.h |  33 ++++
++ cpp/src/arrow/util/int_util_test.cc    |  18 ++
++ 7 files changed, 316 insertions(+), 131 deletions(-)
 +
-+    ARROW_ASSIGN_OR_RAISE(*out_batches, read_batches(/*pre_buffer=*/false));
-+    // Also read with pre-buffered metadata, and check the results are equal
-+    ARROW_ASSIGN_OR_RAISE(auto batches_pre_buffered, read_batches(/*pre_buffer=*/true));
-     for (int i = 0; i < num_batches_written_; ++i) {
--      futures.push_back(generator());
--    }
--    auto fut = generator();
--    EXPECT_FINISHES_OK_AND_EQ(nullptr, fut);
--    for (auto& future : futures) {
--      EXPECT_FINISHES_OK_AND_ASSIGN(auto batch, future);
--      out_batches->push_back(batch);
-+      AssertBatchesEqual(*batches_pre_buffered[i], *(*out_batches)[i],
-+                         /*check_metadata=*/true);
-     }
--
--    // The generator doesn't track stats.
--    EXPECT_EQ(nullptr, out_stats);
--
-     return Status::OK();
-   }
- };
-diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc
-index 8e125fc5ede..f1571f76c24 100644
---- a/cpp/src/arrow/ipc/reader.cc
-+++ b/cpp/src/arrow/ipc/reader.cc
-@@ -54,6 +54,7 @@
- #include "arrow/util/compression.h"
- #include "arrow/util/endian.h"
- #include "arrow/util/fuzz_internal.h"
-+#include "arrow/util/int_util_overflow.h"
- #include "arrow/util/key_value_metadata.h"
- #include "arrow/util/logging_internal.h"
- #include "arrow/util/parallel.h"
-@@ -72,6 +73,7 @@ namespace arrow {
- 
- namespace flatbuf = org::apache::arrow::flatbuf;
- 
-+using internal::AddWithOverflow;
- using internal::checked_cast;
- using internal::checked_pointer_cast;
- 
-@@ -177,14 +179,16 @@ class ArrayLoader {
- 
-   explicit ArrayLoader(const flatbuf::RecordBatch* metadata,
-                        MetadataVersion metadata_version, const IpcReadOptions& options,
--                       int64_t file_offset)
-+                       int64_t file_offset, int64_t file_length)
-       : metadata_(metadata),
-         metadata_version_(metadata_version),
-         file_(nullptr),
-         file_offset_(file_offset),
-+        file_length_(file_length),
-         max_recursion_depth_(options.max_recursion_depth) {}
- 
-   Status ReadBuffer(int64_t offset, int64_t length, std::shared_ptr<Buffer>* out) {
-+    // This construct permits overriding GetBuffer at compile time
-     if (skip_io_) {
-       return Status::OK();
-     }
-@@ -194,7 +198,10 @@ class ArrayLoader {
-     if (length < 0) {
-       return Status::Invalid("Negative length for reading buffer ", buffer_index_);
-     }
--    // This construct permits overriding GetBuffer at compile time
-+    auto read_end = AddWithOverflow({offset, length});
-+    if (!read_end.has_value() || (file_length_.has_value() && read_end > file_length_)) {
-+      return Status::Invalid("Buffer ", buffer_index_, " exceeds IPC file area");
-+    }
-     if (!bit_util::IsMultipleOf8(offset)) {
-       return Status::Invalid("Buffer ", buffer_index_,
-                              " did not start on 8-byte aligned offset: ", offset);
-@@ -202,6 +209,9 @@ class ArrayLoader {
-     if (file_) {
-       return file_->ReadAt(offset, length).Value(out);
-     } else {
-+      if (!AddWithOverflow({read_end.value(), file_offset_}).has_value()) {
-+        return Status::Invalid("Buffer ", buffer_index_, " exceeds IPC file area");
-+      }
-       read_request_.RequestRange(offset + file_offset_, length, out);
-       return Status::OK();
-     }
-@@ -292,6 +302,16 @@ class ArrayLoader {
-     // we can skip that buffer without reading from shared memory
-     RETURN_NOT_OK(GetFieldMetadata(field_index_++, out_));
- 
-+    if (::arrow::internal::has_variadic_buffers(type_id)) {
-+      ARROW_ASSIGN_OR_RAISE(auto data_buffer_count,
-+                            GetVariadicCount(variadic_count_index_++));
-+      const int64_t start = static_cast<int64_t>(out_->buffers.size());
-+      // NOTE: this must be done before any other call to `GetBuffer` because
-+      // BatchDataReadRequest will keep pointers to `std::shared_ptr<Buffer>`
-+      // objects.
-+      out_->buffers.resize(start + data_buffer_count);
-+    }
++diff --git a/ci/scripts/cpp_test.sh b/ci/scripts/cpp_test.sh
++index 0c6e1c6..1110378 100755
++--- a/ci/scripts/cpp_test.sh
+++++ b/ci/scripts/cpp_test.sh
++@@ -107,6 +107,18 @@ fi
++ 
++ if [ "${ARROW_FUZZING}" == "ON" ]; then
++     # Fuzzing regression tests
+++    # Some fuzz regression files may trigger huge memory allocations,
+++    # let the allocator return null instead of aborting.
+++    export ASAN_OPTIONS="$ASAN_OPTIONS allocator_may_return_null=1"
+++    export ARROW_FUZZING_VERBOSITY=1
+++    # Run golden IPC integration files: these should ideally load without errors,
+++    # though some very old ones carry invalid data (such as decimal values
+++    # larger than their advertised precision).
+++    # shellcheck disable=SC2046
+++    "${binary_output_dir}/arrow-ipc-stream-fuzz" $(find "${ARROW_TEST_DATA}"/arrow-ipc-stream/integration -name "*.stream")
+++    # shellcheck disable=SC2046
+++    "${binary_output_dir}/arrow-ipc-file-fuzz" $(find "${ARROW_TEST_DATA}"/arrow-ipc-stream/integration -name "*.arrow_file")
+++    # Run known crash files
++     ${binary_output_dir}/arrow-ipc-stream-fuzz ${ARROW_TEST_DATA}/arrow-ipc-stream/crash-*
++     ${binary_output_dir}/arrow-ipc-stream-fuzz ${ARROW_TEST_DATA}/arrow-ipc-stream/*-testcase-*
++     ${binary_output_dir}/arrow-ipc-file-fuzz ${ARROW_TEST_DATA}/arrow-ipc-file/*-testcase-*
++diff --git a/cpp/src/arrow/ipc/read_write_test.cc b/cpp/src/arrow/ipc/read_write_test.cc
++index bd2c2b7..af749ec 100644
++--- a/cpp/src/arrow/ipc/read_write_test.cc
+++++ b/cpp/src/arrow/ipc/read_write_test.cc
++@@ -1220,40 +1220,55 @@ struct FileGeneratorWriterHelper : public FileWriterHelper {
++   Status ReadBatches(const IpcReadOptions& options, RecordBatchVector* out_batches,
++                      ReadStats* out_stats = nullptr,
++                      MetadataVector* out_metadata_list = nullptr) override {
++-    std::shared_ptr<io::RandomAccessFile> buf_reader;
++-    if (kCoalesce) {
++-      // Use a non-zero-copy enabled BufferReader so we can test paths properly
++-      buf_reader = std::make_shared<NoZeroCopyBufferReader>(buffer_);
++-    } else {
++-      buf_reader = std::make_shared<io::BufferReader>(buffer_);
++-    }
++-    AsyncGenerator<std::shared_ptr<RecordBatch>> generator;
+++    // The generator doesn't track stats.
+++    EXPECT_EQ(nullptr, out_stats);
++ 
++-    {
++-      auto fut = RecordBatchFileReader::OpenAsync(buf_reader, footer_offset_, options);
++-      // Do NOT assert OK since some tests check whether this fails properly
++-      EXPECT_FINISHES(fut);
++-      ARROW_ASSIGN_OR_RAISE(auto reader, fut.result());
++-      EXPECT_EQ(num_batches_written_, reader->num_record_batches());
++-      // Generator will keep reader alive internally
++-      ARROW_ASSIGN_OR_RAISE(generator, reader->GetRecordBatchGenerator(kCoalesce));
++-    }
+++    auto read_batches = [&](bool pre_buffer) -> Result<RecordBatchVector> {
+++      std::shared_ptr<io::RandomAccessFile> buf_reader;
+++      if (kCoalesce) {
+++        // Use a non-zero-copy enabled BufferReader so we can test paths properly
+++        buf_reader = std::make_shared<NoZeroCopyBufferReader>(buffer_);
+++      } else {
+++        buf_reader = std::make_shared<io::BufferReader>(buffer_);
+++      }
+++      AsyncGenerator<std::shared_ptr<RecordBatch>> generator;
+++
+++      {
+++        auto fut = RecordBatchFileReader::OpenAsync(buf_reader, footer_offset_, options);
+++        ARROW_ASSIGN_OR_RAISE(auto reader, fut.result());
+++        EXPECT_EQ(num_batches_written_, reader->num_record_batches());
+++        if (pre_buffer) {
+++          RETURN_NOT_OK(reader->PreBufferMetadata(/*indices=*/{}));
+++        }
+++        // Generator will keep reader alive internally
+++        ARROW_ASSIGN_OR_RAISE(generator, reader->GetRecordBatchGenerator(kCoalesce));
+++      }
++ 
++-    // Generator is async-reentrant
++-    std::vector<Future<std::shared_ptr<RecordBatch>>> futures;
+++      // Generator is async-reentrant
+++      std::vector<Future<std::shared_ptr<RecordBatch>>> futures;
+++      for (int i = 0; i < num_batches_written_; ++i) {
+++        futures.push_back(generator());
+++      }
+++      auto fut = generator();
+++      ARROW_ASSIGN_OR_RAISE(auto final_batch, fut.result());
+++      EXPECT_EQ(nullptr, final_batch);
+++
+++      RecordBatchVector batches;
+++      for (auto& future : futures) {
+++        ARROW_ASSIGN_OR_RAISE(auto batch, future.result());
+++        EXPECT_NE(nullptr, batch);
+++        batches.push_back(batch);
+++      }
+++      return batches;
+++    };
+++
+++    ARROW_ASSIGN_OR_RAISE(*out_batches, read_batches(/*pre_buffer=*/false));
+++    // Also read with pre-buffered metadata, and check the results are equal
+++    ARROW_ASSIGN_OR_RAISE(auto batches_pre_buffered, read_batches(/*pre_buffer=*/true));
++     for (int i = 0; i < num_batches_written_; ++i) {
++-      futures.push_back(generator());
++-    }
++-    auto fut = generator();
++-    EXPECT_FINISHES_OK_AND_EQ(nullptr, fut);
++-    for (auto& future : futures) {
++-      EXPECT_FINISHES_OK_AND_ASSIGN(auto batch, future);
++-      out_batches->push_back(batch);
+++      AssertBatchesEqual(*batches_pre_buffered[i], *(*out_batches)[i],
+++                         /*check_metadata=*/true);
++     }
++-
++-    // The generator doesn't track stats.
++-    EXPECT_EQ(nullptr, out_stats);
++-
++     return Status::OK();
++   }
++ };
++diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc
++index d272c78..3a2987b 100644
++--- a/cpp/src/arrow/ipc/reader.cc
+++++ b/cpp/src/arrow/ipc/reader.cc
++@@ -52,6 +52,7 @@
++ #include "arrow/util/checked_cast.h"
++ #include "arrow/util/compression.h"
++ #include "arrow/util/endian.h"
+++#include "arrow/util/int_util_overflow.h"
++ #include "arrow/util/key_value_metadata.h"
++ #include "arrow/util/logging.h"
++ #include "arrow/util/parallel.h"
++@@ -73,6 +74,8 @@ namespace flatbuf = org::apache::arrow::flatbuf;
++ using internal::checked_cast;
++ using internal::checked_pointer_cast;
++ 
+++using internal::AddWithOverflow;
+++
++ namespace ipc {
++ 
++ using internal::FileBlock;
++@@ -166,23 +169,26 @@ class ArrayLoader {
++  public:
++   explicit ArrayLoader(const flatbuf::RecordBatch* metadata,
++                        MetadataVersion metadata_version, const IpcReadOptions& options,
++-                       io::RandomAccessFile* file)
+++                       io::RandomAccessFile* file, std::optional<int64_t> file_length)
++       : metadata_(metadata),
++         metadata_version_(metadata_version),
++         file_(file),
++         file_offset_(0),
+++        file_length_(file_length),
++         max_recursion_depth_(options.max_recursion_depth) {}
++ 
++   explicit ArrayLoader(const flatbuf::RecordBatch* metadata,
++                        MetadataVersion metadata_version, const IpcReadOptions& options,
++-                       int64_t file_offset)
+++                       int64_t file_offset, std::optional<int64_t> file_length)
++       : metadata_(metadata),
++         metadata_version_(metadata_version),
++         file_(nullptr),
++         file_offset_(file_offset),
+++        file_length_(file_length),
++         max_recursion_depth_(options.max_recursion_depth) {}
++ 
++   Status ReadBuffer(int64_t offset, int64_t length, std::shared_ptr<Buffer>* out) {
+++    // This construct permits overriding GetBuffer at compile time
++     if (skip_io_) {
++       return Status::OK();
++     }
++@@ -192,7 +198,10 @@ class ArrayLoader {
++     if (length < 0) {
++       return Status::Invalid("Negative length for reading buffer ", buffer_index_);
++     }
++-    // This construct permits overriding GetBuffer at compile time
+++    auto read_end = AddWithOverflow({offset, length});
+++    if (!read_end.has_value() || (file_length_.has_value() && read_end > file_length_)) {
+++      return Status::Invalid("Buffer ", buffer_index_, " exceeds IPC file area");
+++    }
++     if (!bit_util::IsMultipleOf8(offset)) {
++       return Status::Invalid("Buffer ", buffer_index_,
++                              " did not start on 8-byte aligned offset: ", offset);
++@@ -200,6 +209,9 @@ class ArrayLoader {
++     if (file_) {
++       return file_->ReadAt(offset, length).Value(out);
++     } else {
+++      if (!AddWithOverflow({read_end.value(), file_offset_}).has_value()) {
+++        return Status::Invalid("Buffer ", buffer_index_, " exceeds IPC file area");
+++      }
++       read_request_.RequestRange(offset + file_offset_, length, out);
++       return Status::OK();
++     }
++@@ -284,6 +296,16 @@ class ArrayLoader {
++     // we can skip that buffer without reading from shared memory
++     RETURN_NOT_OK(GetFieldMetadata(field_index_++, out_));
++ 
+++    if (::arrow::internal::has_variadic_buffers(type_id)) {
+++      ARROW_ASSIGN_OR_RAISE(auto data_buffer_count,
+++                            GetVariadicCount(variadic_count_index_++));
+++      const int64_t start = static_cast<int64_t>(out_->buffers.size());
+++      // NOTE: this must be done before any other call to `GetBuffer` because
+++      // BatchDataReadRequest will keep pointers to `std::shared_ptr<Buffer>`
+++      // objects.
+++      out_->buffers.resize(start + data_buffer_count);
+++    }
+++
++     if (internal::HasValidityBitmap(type_id, metadata_version_)) {
++       // Extract null_bitmap which is common to all arrays except for unions
++       // and nulls.
++@@ -292,6 +314,7 @@ class ArrayLoader {
++       }
++       buffer_index_++;
++     }
+++
++     return Status::OK();
++   }
++ 
++@@ -390,14 +413,9 @@ class ArrayLoader {
++   Status Visit(const BinaryViewType& type) {
++     out_->buffers.resize(2);
++ 
++-    RETURN_NOT_OK(LoadCommon(type.id()));
++-    RETURN_NOT_OK(GetBuffer(buffer_index_++, &out_->buffers[1]));
++-
++-    ARROW_ASSIGN_OR_RAISE(auto data_buffer_count,
++-                          GetVariadicCount(variadic_count_index_++));
++-    out_->buffers.resize(data_buffer_count + 2);
++-    for (size_t i = 0; i < data_buffer_count; ++i) {
++-      RETURN_NOT_OK(GetBuffer(buffer_index_++, &out_->buffers[i + 2]));
+++    RETURN_NOT_OK(LoadCommon(type.id()));  // also initializes variadic buffers
+++    for (int64_t i = 1; i < static_cast<int64_t>(out_->buffers.size()); ++i) {
+++      RETURN_NOT_OK(GetBuffer(buffer_index_++, &out_->buffers[i]));
++     }
++     return Status::OK();
++   }
++@@ -495,6 +513,7 @@ class ArrayLoader {
++   const MetadataVersion metadata_version_;
++   io::RandomAccessFile* file_;
++   int64_t file_offset_;
+++  std::optional<int64_t> file_length_;
++   int max_recursion_depth_;
++   int buffer_index_ = 0;
++   int field_index_ = 0;
++@@ -583,7 +602,12 @@ Result<std::shared_ptr<RecordBatch>> LoadRecordBatchSubset(
++     const flatbuf::RecordBatch* metadata, const std::shared_ptr<Schema>& schema,
++     const std::vector<bool>* inclusion_mask, const IpcReadContext& context,
++     io::RandomAccessFile* file) {
++-  ArrayLoader loader(metadata, context.metadata_version, context.options, file);
+++  std::optional<int64_t> file_length;
+++  if (file) {
+++    ARROW_ASSIGN_OR_RAISE(file_length, file->GetSize());
+++  }
+++  ArrayLoader loader(metadata, context.metadata_version, context.options, file,
+++                     file_length);
++ 
++   ArrayDataVector columns(schema->num_fields());
++   ArrayDataVector filtered_columns;
++@@ -832,8 +856,12 @@ Status ReadDictionary(const Buffer& metadata, const IpcReadContext& context,
++   ARROW_ASSIGN_OR_RAISE(auto value_type, context.dictionary_memo->GetDictionaryType(id));
++ 
++   // Load the dictionary data from the dictionary batch
+++  std::optional<int64_t> file_length;
+++  if (file) {
+++    ARROW_ASSIGN_OR_RAISE(file_length, file->GetSize());
+++  }
++   ArrayLoader loader(batch_meta, internal::GetMetadataVersion(message->version()),
++-                     context.options, file);
+++                     context.options, file, file_length);
++   auto dict_data = std::make_shared<ArrayData>();
++   const Field dummy_field("", value_type);
++   RETURN_NOT_OK(loader.Load(&dummy_field, dict_data.get()));
++@@ -1152,8 +1180,19 @@ Result<std::shared_ptr<RecordBatchStreamReader>> RecordBatchStreamReader::Open(
++ 
++ // Common functions used in both the random-access file reader and the
++ // asynchronous generator
++-static inline FileBlock FileBlockFromFlatbuffer(const flatbuf::Block* block) {
++-  return FileBlock{block->offset(), block->metaDataLength(), block->bodyLength()};
+++static inline Result<FileBlock> FileBlockFromFlatbuffer(const flatbuf::Block* fb_block,
+++                                                        int64_t max_offset) {
+++  auto block =
+++      FileBlock{fb_block->offset(), fb_block->metaDataLength(), fb_block->bodyLength()};
+++  if (block.metadata_length < 0 || block.body_length < 0 || block.offset < 0) {
+++    return Status::IOError("Invalid Block in IPC file footer");
+++  }
+++  auto block_end =
+++      AddWithOverflow<int64_t>({block.offset, block.metadata_length, block.body_length});
+++  if (!block_end.has_value() || block_end > max_offset) {
+++    return Status::IOError("Invalid Block in IPC file footer");
+++  }
+++  return block;
++ }
++ 
++ Status CheckAligned(const FileBlock& block) {
++@@ -1267,7 +1306,11 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
++                                  const std::shared_ptr<Schema>& schema,
++                                  const std::vector<bool>* inclusion_mask,
++                                  MetadataVersion metadata_version = MetadataVersion::V5) {
++-    ArrayLoader loader(metadata, metadata_version, options, file);
+++    std::optional<int64_t> file_length;
+++    if (file) {
+++      ARROW_ASSIGN_OR_RAISE(file_length, file->GetSize());
+++    }
+++    ArrayLoader loader(metadata, metadata_version, options, file, file_length);
++     for (int i = 0; i < schema->num_fields(); ++i) {
++       const Field& field = *schema->field(i);
++       if (!inclusion_mask || (*inclusion_mask)[i]) {
++@@ -1336,8 +1379,9 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
++                                 read_options, file, schema, &inclusion_mask);
++       };
++     }
+++    ARROW_ASSIGN_OR_RAISE(auto block, GetRecordBatchBlock(i));
++     ARROW_ASSIGN_OR_RAISE(auto message,
++-                          ReadMessageFromBlock(GetRecordBatchBlock(i), fields_loader));
+++                          ReadMessageFromBlock(block, fields_loader));
++ 
++     CHECK_HAS_BODY(*message);
++     ARROW_ASSIGN_OR_RAISE(auto reader, Buffer::GetReader(message->body()));
++@@ -1353,8 +1397,9 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
++   Result<int64_t> CountRows() override {
++     int64_t total = 0;
++     for (int i = 0; i < num_record_batches(); i++) {
+++      ARROW_ASSIGN_OR_RAISE(auto block, GetRecordBatchBlock(i));
++       ARROW_ASSIGN_OR_RAISE(auto outer_message,
++-                            ReadMessageFromBlock(GetRecordBatchBlock(i)));
+++                            ReadMessageFromBlock(block));
++       auto metadata = outer_message->metadata();
++       const flatbuf::Message* message = nullptr;
++       RETURN_NOT_OK(
++@@ -1468,13 +1513,13 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
++ 
++   Status DoPreBufferMetadata(const std::vector<int>& indices) {
++     RETURN_NOT_OK(CacheMetadata(indices));
++-    EnsureDictionaryReadStarted();
+++    RETURN_NOT_OK(EnsureDictionaryReadStarted());
++     Future<> all_metadata_ready = WaitForMetadatas(indices);
++     for (int index : indices) {
++       Future<std::shared_ptr<Message>> metadata_loaded =
++           all_metadata_ready.Then([this, index]() -> Result<std::shared_ptr<Message>> {
++             stats_.num_messages.fetch_add(1, std::memory_order_relaxed);
++-            FileBlock block = GetRecordBatchBlock(index);
+++            ARROW_ASSIGN_OR_RAISE(FileBlock block, GetRecordBatchBlock(index));
++             ARROW_ASSIGN_OR_RAISE(
++                 std::shared_ptr<Buffer> metadata,
++                 metadata_cache_->Read({block.offset, block.metadata_length}));
++@@ -1523,12 +1568,12 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
++     }
++   };
++ 
++-  FileBlock GetRecordBatchBlock(int i) const {
++-    return FileBlockFromFlatbuffer(footer_->recordBatches()->Get(i));
+++  Result<FileBlock> GetRecordBatchBlock(int i) const {
+++    return FileBlockFromFlatbuffer(footer_->recordBatches()->Get(i), footer_offset_);
++   }
++ 
++-  FileBlock GetDictionaryBlock(int i) const {
++-    return FileBlockFromFlatbuffer(footer_->dictionaries()->Get(i));
+++  Result<FileBlock> GetDictionaryBlock(int i) const {
+++    return FileBlockFromFlatbuffer(footer_->dictionaries()->Get(i), footer_offset_);
++   }
++ 
++   Result<std::unique_ptr<Message>> ReadMessageFromBlock(
++@@ -1541,16 +1586,26 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
++ 
++   Status ReadDictionaries() {
++     // Read all the dictionaries
+++    std::vector<std::shared_ptr<Message>> messages(num_dictionaries());
+++    for (int i = 0; i < num_dictionaries(); ++i) {
+++      ARROW_ASSIGN_OR_RAISE(FileBlock block, GetDictionaryBlock(i));
+++      ARROW_ASSIGN_OR_RAISE(messages[i], ReadMessageFromBlock(block));
+++    }
+++    return ReadDictionaries(messages);
+++  }
+++
+++  Status ReadDictionaries(
+++      const std::vector<std::shared_ptr<Message>>& dictionary_messages) {
+++    DCHECK_EQ(dictionary_messages.size(), static_cast<size_t>(num_dictionaries()));
++     IpcReadContext context(&dictionary_memo_, options_, swap_endian_);
++     for (int i = 0; i < num_dictionaries(); ++i) {
++-      ARROW_ASSIGN_OR_RAISE(auto message, ReadMessageFromBlock(GetDictionaryBlock(i)));
++-      RETURN_NOT_OK(ReadOneDictionary(message.get(), context));
++-      stats_.num_dictionary_batches.fetch_add(1, std::memory_order_relaxed);
+++      RETURN_NOT_OK(ReadOneDictionary(i, dictionary_messages[i].get(), context));
++     }
++     return Status::OK();
++   }
++ 
++-  Status ReadOneDictionary(Message* message, const IpcReadContext& context) {
+++  Status ReadOneDictionary(int dict_index, Message* message,
+++                           const IpcReadContext& context) {
++     CHECK_HAS_BODY(*message);
++     ARROW_ASSIGN_OR_RAISE(auto reader, Buffer::GetReader(message->body()));
++     DictionaryKind kind;
++@@ -1560,44 +1615,48 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
++     } else if (kind == DictionaryKind::Delta) {
++       stats_.num_dictionary_deltas.fetch_add(1, std::memory_order_relaxed);
++     }
+++    stats_.num_dictionary_batches.fetch_add(1, std::memory_order_relaxed);
++     return Status::OK();
++   }
++ 
++-  void AddDictionaryRanges(std::vector<io::ReadRange>* ranges) const {
+++  Status AddDictionaryRanges(std::vector<io::ReadRange>* ranges) const {
++     // Adds all dictionaries to the range cache
++     for (int i = 0; i < num_dictionaries(); ++i) {
++-      FileBlock block = GetDictionaryBlock(i);
+++      ARROW_ASSIGN_OR_RAISE(FileBlock block, GetDictionaryBlock(i));
++       ranges->push_back({block.offset, block.metadata_length + block.body_length});
++     }
+++    return Status::OK();
++   }
++ 
++-  void AddMetadataRanges(const std::vector<int>& indices,
++-                         std::vector<io::ReadRange>* ranges) {
+++  Status AddMetadataRanges(const std::vector<int>& indices,
+++                           std::vector<io::ReadRange>* ranges) {
++     for (int index : indices) {
++-      FileBlock block = GetRecordBatchBlock(static_cast<int>(index));
+++      ARROW_ASSIGN_OR_RAISE(FileBlock block, GetRecordBatchBlock(index));
++       ranges->push_back({block.offset, block.metadata_length});
++     }
+++    return Status::OK();
++   }
++ 
++   Status CacheMetadata(const std::vector<int>& indices) {
++     std::vector<io::ReadRange> ranges;
++     if (!read_dictionaries_) {
++-      AddDictionaryRanges(&ranges);
+++      RETURN_NOT_OK(AddDictionaryRanges(&ranges));
++     }
++-    AddMetadataRanges(indices, &ranges);
+++    RETURN_NOT_OK(AddMetadataRanges(indices, &ranges));
++     return metadata_cache_->Cache(std::move(ranges));
++   }
++ 
++-  void EnsureDictionaryReadStarted() {
+++  Status EnsureDictionaryReadStarted() {
++     if (!dictionary_load_finished_.is_valid()) {
++       read_dictionaries_ = true;
++       std::vector<io::ReadRange> ranges;
++-      AddDictionaryRanges(&ranges);
+++      RETURN_NOT_OK(AddDictionaryRanges(&ranges));
++       dictionary_load_finished_ =
++           metadata_cache_->WaitFor(std::move(ranges)).Then([this] {
++             return ReadDictionaries();
++           });
++     }
+++    return Status::OK();
++   }
++ 
++   Status WaitForDictionaryReadFinished() {
++@@ -1615,7 +1674,7 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
++ 
++   Future<> WaitForMetadatas(const std::vector<int>& indices) {
++     std::vector<io::ReadRange> ranges;
++-    AddMetadataRanges(indices, &ranges);
+++    RETURN_NOT_OK(AddMetadataRanges(indices, &ranges));
++     return metadata_cache_->WaitFor(std::move(ranges));
++   }
++ 
++@@ -1659,12 +1718,13 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
++                                  const flatbuf::RecordBatch* batch,
++                                  IpcReadContext context, io::RandomAccessFile* file,
++                                  std::shared_ptr<io::RandomAccessFile> owned_file,
++-                                 int64_t block_data_offset)
+++                                 int64_t block_data_offset, int64_t block_data_length)
++         : schema(std::move(sch)),
++           context(std::move(context)),
++           file(file),
++           owned_file(std::move(owned_file)),
++-          loader(batch, context.metadata_version, context.options, block_data_offset),
+++          loader(batch, context.metadata_version, context.options, block_data_offset,
+++                 block_data_length),
++           columns(schema->num_fields()),
++           cache(file, file->io_context(), io::CacheOptions::LazyDefaults()),
++           length(batch->length()) {}
++@@ -1763,14 +1823,15 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
++     return dictionary_load_finished_.Then([message_fut] { return message_fut; })
++         .Then([this, index](const std::shared_ptr<Message>& message_obj)
++                   -> Future<std::shared_ptr<RecordBatch>> {
++-          FileBlock block = GetRecordBatchBlock(index);
+++          ARROW_ASSIGN_OR_RAISE(auto block, GetRecordBatchBlock(index));
++           ARROW_ASSIGN_OR_RAISE(auto message, GetFlatbufMessage(message_obj));
++           ARROW_ASSIGN_OR_RAISE(auto batch, GetBatchFromMessage(message));
++           ARROW_ASSIGN_OR_RAISE(auto context, GetIpcReadContext(message, batch));
++ 
++           auto read_context = std::make_shared<CachedRecordBatchReadContext>(
++               schema_, batch, std::move(context), file_, owned_file_,
++-              block.offset + static_cast<int64_t>(block.metadata_length));
+++              block.offset + static_cast<int64_t>(block.metadata_length),
+++              block.body_length);
++           RETURN_NOT_OK(read_context->CalculateLoadRequest());
++           return read_context->ReadAsync().Then(
++               [read_context] { return read_context->CreateRecordBatch(); });
++@@ -1958,25 +2019,31 @@ Future<WholeIpcFileRecordBatchGenerator::Item>
++ WholeIpcFileRecordBatchGenerator::operator()() {
++   auto state = state_;
++   if (!read_dictionaries_.is_valid()) {
++-    std::vector<Future<std::shared_ptr<Message>>> messages(state->num_dictionaries());
++-    for (int i = 0; i < state->num_dictionaries(); i++) {
++-      auto block = FileBlockFromFlatbuffer(state->footer_->dictionaries()->Get(i));
++-      messages[i] = ReadBlock(block);
++-    }
++-    auto read_messages = All(std::move(messages));
++-    if (executor_) read_messages = executor_->Transfer(read_messages);
++-    read_dictionaries_ = read_messages.Then(
++-        [=](const std::vector<Result<std::shared_ptr<Message>>>& maybe_messages)
++-            -> Status {
++-          ARROW_ASSIGN_OR_RAISE(auto messages,
++-                                arrow::internal::UnwrapOrRaise(maybe_messages));
++-          return ReadDictionaries(state.get(), std::move(messages));
++-        });
+++    if (state->dictionary_load_finished_.is_valid()) {
+++      // PreBufferMetadata has started reading dictionaries in the background
+++      read_dictionaries_ = state->dictionary_load_finished_;
+++    } else {
+++      // Start reading dictionaries
+++      std::vector<Future<std::shared_ptr<Message>>> messages(state->num_dictionaries());
+++      for (int i = 0; i < state->num_dictionaries(); i++) {
+++        ARROW_ASSIGN_OR_RAISE(auto block, state->GetDictionaryBlock(i));
+++        messages[i] = ReadBlock(block);
+++      }
+++      auto read_messages = All(std::move(messages));
+++      if (executor_) read_messages = executor_->Transfer(read_messages);
+++      read_dictionaries_ = read_messages.Then(
+++          [=](const std::vector<Result<std::shared_ptr<Message>>>& maybe_messages)
+++              -> Status {
+++            ARROW_ASSIGN_OR_RAISE(auto messages,
+++                                  arrow::internal::UnwrapOrRaise(maybe_messages));
+++            return state->ReadDictionaries(messages);
+++          });
+++    }
++   }
++   if (index_ >= state_->num_record_batches()) {
++     return Future<Item>::MakeFinished(IterationTraits<Item>::End());
++   }
++-  auto block = FileBlockFromFlatbuffer(state->footer_->recordBatches()->Get(index_++));
+++  ARROW_ASSIGN_OR_RAISE(auto block, state->GetRecordBatchBlock(index_++));
++   auto read_message = ReadBlock(block);
++   auto read_messages = read_dictionaries_.Then([read_message]() { return read_message; });
++   // Force transfer. This may be wasteful in some cases, but ensures we get off the
++@@ -2012,16 +2079,6 @@ Future<std::shared_ptr<Message>> WholeIpcFileRecordBatchGenerator::ReadBlock(
++   }
++ }
++ 
++-Status WholeIpcFileRecordBatchGenerator::ReadDictionaries(
++-    RecordBatchFileReaderImpl* state,
++-    std::vector<std::shared_ptr<Message>> dictionary_messages) {
++-  IpcReadContext context(&state->dictionary_memo_, state->options_, state->swap_endian_);
++-  for (const auto& message : dictionary_messages) {
++-    RETURN_NOT_OK(state->ReadOneDictionary(message.get(), context));
++-  }
++-  return Status::OK();
++-}
++-
++ Result<std::shared_ptr<RecordBatch>> WholeIpcFileRecordBatchGenerator::ReadRecordBatch(
++     RecordBatchFileReaderImpl* state, Message* message) {
++   CHECK_HAS_BODY(*message);
++@@ -2598,23 +2655,37 @@ Status ValidateFuzzBatch(const RecordBatch& batch) {
++   return st;
++ }
++ 
+++Status ValidateFuzzBatch(const RecordBatchWithMetadata& batch) {
+++  if (batch.batch) {
+++    RETURN_NOT_OK(ValidateFuzzBatch(*batch.batch));
+++  }
+++  // XXX do something with custom metadata?
+++  return Status::OK();
+++}
+++
++ }  // namespace
++ 
+++IpcReadOptions FuzzingOptions() {
+++  IpcReadOptions options;
+++  options.memory_pool = default_memory_pool();
+++  options.max_recursion_depth = 256;
+++  return options;
+++}
+++
++ Status FuzzIpcStream(const uint8_t* data, int64_t size) {
++   auto buffer = std::make_shared<Buffer>(data, size);
++   io::BufferReader buffer_reader(buffer);
++ 
++-  std::shared_ptr<RecordBatchReader> batch_reader;
++-  ARROW_ASSIGN_OR_RAISE(batch_reader, RecordBatchStreamReader::Open(&buffer_reader));
+++  ARROW_ASSIGN_OR_RAISE(auto batch_reader, RecordBatchStreamReader::Open(&buffer_reader));
++   Status st;
++ 
++   while (true) {
++-    std::shared_ptr<arrow::RecordBatch> batch;
++-    RETURN_NOT_OK(batch_reader->ReadNext(&batch));
++-    if (batch == nullptr) {
+++    ARROW_ASSIGN_OR_RAISE(auto batch, batch_reader->ReadNext());
+++    if (!batch.batch && !batch.custom_metadata) {
+++      // EOS
++       break;
++     }
++-    st &= ValidateFuzzBatch(*batch);
+++    st &= ValidateFuzzBatch(batch);
++   }
++ 
++   return st;
++@@ -2622,19 +2693,36 @@ Status FuzzIpcStream(const uint8_t* data, int64_t size) {
++ 
++ Status FuzzIpcFile(const uint8_t* data, int64_t size) {
++   auto buffer = std::make_shared<Buffer>(data, size);
++-  io::BufferReader buffer_reader(buffer);
++ 
++-  std::shared_ptr<RecordBatchFileReader> batch_reader;
++-  ARROW_ASSIGN_OR_RAISE(batch_reader, RecordBatchFileReader::Open(&buffer_reader));
++-  Status st;
+++  Status final_status;
+++
+++  auto do_read = [&](bool pre_buffer) {
+++    io::BufferReader buffer_reader(buffer);
+++    ARROW_ASSIGN_OR_RAISE(auto batch_reader,
+++                          RecordBatchFileReader::Open(&buffer_reader, FuzzingOptions()));
+++    if (pre_buffer) {
+++      // Pre-buffer all record batches
+++      RETURN_NOT_OK(batch_reader->PreBufferMetadata(/*indices=*/{}));
+++    }
++ 
++-  const int n_batches = batch_reader->num_record_batches();
++-  for (int i = 0; i < n_batches; ++i) {
++-    ARROW_ASSIGN_OR_RAISE(auto batch, batch_reader->ReadRecordBatch(i));
++-    st &= ValidateFuzzBatch(*batch);
+++    const int n_batches = batch_reader->num_record_batches();
+++    for (int i = 0; i < n_batches; ++i) {
+++      RecordBatchWithMetadata batch;
+++      auto st = batch_reader->ReadRecordBatchWithCustomMetadata(i).Value(&batch);
+++      final_status &= st;
+++      if (!st.ok()) {
+++        continue;
+++      }
+++      final_status &= ValidateFuzzBatch(batch);
+++    }
+++    return Status::OK();
+++  };
+++
+++  for (const bool pre_buffer : {false, true}) {
+++    final_status &= do_read(pre_buffer);
++   }
++ 
++-  return st;
+++  return final_status;
++ }
++ 
++ Status FuzzIpcTensorStream(const uint8_t* data, int64_t size) {
++diff --git a/cpp/src/arrow/ipc/test_common.cc b/cpp/src/arrow/ipc/test_common.cc
++index 87c02e2..3a632fe 100644
++--- a/cpp/src/arrow/ipc/test_common.cc
+++++ b/cpp/src/arrow/ipc/test_common.cc
++@@ -16,6 +16,7 @@
++ // under the License.
++ 
++ #include <algorithm>
+++#include <concepts>
++ #include <cstdint>
++ #include <functional>
++ #include <memory>
++@@ -362,19 +363,27 @@ Status MakeRandomStringArray(int64_t length, bool include_nulls, MemoryPool* poo
++   return builder.Finish(out);
++ }
++ 
++-template <class BuilderType>
++-static Status MakeBinaryArrayWithUniqueValues(int64_t length, bool include_nulls,
++-                                              MemoryPool* pool,
++-                                              std::shared_ptr<Array>* out) {
++-  BuilderType builder(pool);
+++template <std::derived_from<ArrayBuilder> BuilderType>
+++static Result<std::shared_ptr<Array>> MakeBinaryArrayWithUniqueValues(
+++    BuilderType builder, int64_t length, bool include_nulls) {
+++  if constexpr (std::is_base_of_v<BinaryViewBuilder, BuilderType>) {
+++    // Try to emit several variadic buffers by choosing a small block size.
+++    builder.SetBlockSize(512);
+++  }
++   for (int64_t i = 0; i < length; ++i) {
++     if (include_nulls && (i % 7 == 0)) {
++       RETURN_NOT_OK(builder.AppendNull());
++     } else {
++-      RETURN_NOT_OK(builder.Append(std::to_string(i)));
+++      // Make sure that some strings are long enough to have non-inline binary views
+++      const auto base = std::to_string(i);
+++      std::string value;
+++      for (int64_t j = 0; j < 3 * (i % 10); ++j) {
+++        value += base;
+++      }
+++      RETURN_NOT_OK(builder.Append(value));
++     }
++   }
++-  return builder.Finish(out);
+++  return builder.Finish();
++ }
++ 
++ Status MakeStringTypesRecordBatch(std::shared_ptr<RecordBatch>* out, bool with_nulls,
++@@ -384,22 +393,22 @@ Status MakeStringTypesRecordBatch(std::shared_ptr<RecordBatch>* out, bool with_n
++   ArrayVector arrays;
++   FieldVector fields;
++ 
++-  auto AppendColumn = [&](auto& MakeArray) {
++-    arrays.emplace_back();
++-    RETURN_NOT_OK(MakeArray(length, with_nulls, default_memory_pool(), &arrays.back()));
++-
++-    const auto& type = arrays.back()->type();
++-    fields.push_back(field(type->ToString(), type));
+++  auto AppendColumn = [&](auto builder) {
+++    ARROW_ASSIGN_OR_RAISE(auto array, MakeBinaryArrayWithUniqueValues(
+++                                          std::move(builder), length, with_nulls));
+++    arrays.push_back(array);
+++    fields.push_back(field(array->type()->ToString(), array->type()));
++     return Status::OK();
++   };
++ 
++-  RETURN_NOT_OK(AppendColumn(MakeBinaryArrayWithUniqueValues<StringBuilder>));
++-  RETURN_NOT_OK(AppendColumn(MakeBinaryArrayWithUniqueValues<BinaryBuilder>));
++-  RETURN_NOT_OK(AppendColumn(MakeBinaryArrayWithUniqueValues<LargeStringBuilder>));
++-  RETURN_NOT_OK(AppendColumn(MakeBinaryArrayWithUniqueValues<LargeBinaryBuilder>));
+++  auto pool = default_memory_pool();
+++  RETURN_NOT_OK(AppendColumn(StringBuilder(pool)));
+++  RETURN_NOT_OK(AppendColumn(BinaryBuilder(pool)));
+++  RETURN_NOT_OK(AppendColumn(LargeStringBuilder(pool)));
+++  RETURN_NOT_OK(AppendColumn(LargeBinaryBuilder(pool)));
++   if (with_view_types) {
++-    RETURN_NOT_OK(AppendColumn(MakeBinaryArrayWithUniqueValues<StringViewBuilder>));
++-    RETURN_NOT_OK(AppendColumn(MakeBinaryArrayWithUniqueValues<BinaryViewBuilder>));
+++    RETURN_NOT_OK(AppendColumn(StringViewBuilder(pool)));
+++    RETURN_NOT_OK(AppendColumn(BinaryViewBuilder(pool)));
++   }
++ 
++   *out = RecordBatch::Make(schema(std::move(fields)), length, std::move(arrays));
++diff --git a/cpp/src/arrow/type.h b/cpp/src/arrow/type.h
++index 5b1331a..42e83f6 100644
++--- a/cpp/src/arrow/type.h
+++++ b/cpp/src/arrow/type.h
++@@ -2494,6 +2494,16 @@ constexpr bool HasValidityBitmap(Type::type id) {
++   }
++ }
++ 
+++constexpr bool has_variadic_buffers(Type::type id) {
+++  switch (id) {
+++    case Type::BINARY_VIEW:
+++    case Type::STRING_VIEW:
+++      return true;
+++    default:
+++      return false;
+++  }
+++}
+++
++ ARROW_EXPORT
++ std::string ToString(Type::type id);
++ 
++diff --git a/cpp/src/arrow/util/int_util_overflow.h b/cpp/src/arrow/util/int_util_overflow.h
++index ffe78be..841d503 100644
++--- a/cpp/src/arrow/util/int_util_overflow.h
+++++ b/cpp/src/arrow/util/int_util_overflow.h
++@@ -18,7 +18,9 @@
++ #pragma once
++ 
++ #include <cstdint>
+++#include <initializer_list>
++ #include <limits>
+++#include <optional>
++ #include <type_traits>
++ 
++ #include "arrow/status.h"
++@@ -114,5 +116,36 @@ SignedInt SafeLeftShift(SignedInt u, Shift shift) {
++   return static_cast<SignedInt>(static_cast<UnsignedInt>(u) << shift);
++ }
++ 
+++// Convenience functions over an arbitrary number of arguments
+++template <typename Int>
+++std::optional<Int> AddWithOverflow(std::initializer_list<Int> vs) {
+++  if (vs.size() == 0) {
+++    return {};
+++  }
+++  auto it = vs.begin();
+++  Int v = *it++;
+++  while (it != vs.end()) {
+++    if (ARROW_PREDICT_FALSE(AddWithOverflow(v, *it++, &v))) {
+++      return {};
+++    }
+++  }
+++  return v;
+++}
+++
+++template <typename Int>
+++std::optional<Int> MultiplyWithOverflow(std::initializer_list<Int> vs) {
+++  if (vs.size() == 0) {
+++    return {};
+++  }
+++  auto it = vs.begin();
+++  Int v = *it++;
+++  while (it != vs.end()) {
+++    if (ARROW_PREDICT_FALSE(MultiplyWithOverflow(v, *it++, &v))) {
+++      return {};
+++    }
+++  }
+++  return v;
+++}
+++
++ }  // namespace internal
++ }  // namespace arrow
++diff --git a/cpp/src/arrow/util/int_util_test.cc b/cpp/src/arrow/util/int_util_test.cc
++index 7217c10..cffa4e9 100644
++--- a/cpp/src/arrow/util/int_util_test.cc
+++++ b/cpp/src/arrow/util/int_util_test.cc
++@@ -649,5 +649,23 @@ TYPED_TEST(TestAddWithOverflow, Basics) {
++   this->CheckOk(almost_min, almost_max + T{2}, T{1});
++ }
++ 
+++TEST(AddWithOverflow, Variadic) {
+++  ASSERT_EQ(AddWithOverflow<int>({}), std::nullopt);
+++  ASSERT_EQ(AddWithOverflow({1, 2, 3}), 6);
+++  ASSERT_EQ(AddWithOverflow<int8_t>({1, 2, 125}), std::nullopt);
+++  ASSERT_EQ(AddWithOverflow<int8_t>({125, 2, 1}), std::nullopt);
+++  ASSERT_EQ(AddWithOverflow<int16_t>({1, 2, 125}), 128);
+++  ASSERT_EQ(AddWithOverflow<int16_t>({125, 2, 1}), 128);
+++}
+++
+++TEST(MultiplyWithOverflow, Variadic) {
+++  ASSERT_EQ(MultiplyWithOverflow<int>({}), std::nullopt);
+++  ASSERT_EQ(MultiplyWithOverflow({1, 2, 3, 4}), 24);
+++  ASSERT_EQ(MultiplyWithOverflow<int8_t>({2, 2, 32}), std::nullopt);
+++  ASSERT_EQ(MultiplyWithOverflow<int8_t>({32, 4, 1}), std::nullopt);
+++  ASSERT_EQ(MultiplyWithOverflow<int16_t>({2, 2, 32}), 128);
+++  ASSERT_EQ(MultiplyWithOverflow<int16_t>({32, 4, 1}), 128);
+++}
+++
++ }  // namespace internal
++ }  // namespace arrow
++-- 
++2.45.4
 +
-     if (internal::HasValidityBitmap(type_id, metadata_version_)) {
-       // Extract null_bitmap which is common to all arrays except for unions
-       // and nulls.
-@@ -300,6 +320,7 @@ class ArrayLoader {
-       }
-       buffer_index_++;
-     }
-+
-     return Status::OK();
-   }
- 
-@@ -398,14 +419,9 @@ class ArrayLoader {
-   Status Visit(const BinaryViewType& type) {
-     out_->buffers.resize(2);
- 
--    RETURN_NOT_OK(LoadCommon(type.id()));
--    RETURN_NOT_OK(GetBuffer(buffer_index_++, &out_->buffers[1]));
--
--    ARROW_ASSIGN_OR_RAISE(auto data_buffer_count,
--                          GetVariadicCount(variadic_count_index_++));
--    out_->buffers.resize(data_buffer_count + 2);
--    for (int64_t i = 0; i < data_buffer_count; ++i) {
--      RETURN_NOT_OK(GetBuffer(buffer_index_++, &out_->buffers[i + 2]));
-+    RETURN_NOT_OK(LoadCommon(type.id()));  // also initializes variadic buffers
-+    for (int64_t i = 1; i < static_cast<int64_t>(out_->buffers.size()); ++i) {
-+      RETURN_NOT_OK(GetBuffer(buffer_index_++, &out_->buffers[i]));
-     }
-     return Status::OK();
-   }
-@@ -503,6 +519,7 @@ class ArrayLoader {
-   const MetadataVersion metadata_version_;
-   io::RandomAccessFile* file_;
-   int64_t file_offset_;
-+  std::optional<int64_t> file_length_;
-   int max_recursion_depth_;
-   int buffer_index_ = 0;
-   int field_index_ = 0;
-@@ -1173,8 +1190,19 @@ namespace {
- 
- // Common functions used in both the random-access file reader and the
- // asynchronous generator
--inline FileBlock FileBlockFromFlatbuffer(const flatbuf::Block* block) {
--  return FileBlock{block->offset(), block->metaDataLength(), block->bodyLength()};
-+Result<FileBlock> FileBlockFromFlatbuffer(const flatbuf::Block* fb_block,
-+                                          int64_t max_offset) {
-+  auto block =
-+      FileBlock{fb_block->offset(), fb_block->metaDataLength(), fb_block->bodyLength()};
-+  if (block.metadata_length < 0 || block.body_length < 0 || block.offset < 0) {
-+    return Status::IOError("Invalid Block in IPC file footer");
-+  }
-+  auto block_end =
-+      AddWithOverflow<int64_t>({block.offset, block.metadata_length, block.body_length});
-+  if (!block_end.has_value() || block_end > max_offset) {
-+    return Status::IOError("Invalid Block in IPC file footer");
-+  }
-+  return block;
- }
- 
- Status CheckAligned(const FileBlock& block) {
-@@ -1362,8 +1390,8 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
-                                 read_options, file, schema, &inclusion_mask);
-       };
-     }
--    ARROW_ASSIGN_OR_RAISE(auto message,
--                          ReadMessageFromBlock(GetRecordBatchBlock(i), fields_loader));
-+    ARROW_ASSIGN_OR_RAISE(auto block, GetRecordBatchBlock(i));
-+    ARROW_ASSIGN_OR_RAISE(auto message, ReadMessageFromBlock(block, fields_loader));
- 
-     CHECK_HAS_BODY(*message);
-     ARROW_ASSIGN_OR_RAISE(auto reader, Buffer::GetReader(message->body()));
-@@ -1379,8 +1407,8 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
-   Result<int64_t> CountRows() override {
-     int64_t total = 0;
-     for (int i = 0; i < num_record_batches(); i++) {
--      ARROW_ASSIGN_OR_RAISE(auto outer_message,
--                            ReadMessageFromBlock(GetRecordBatchBlock(i)));
-+      ARROW_ASSIGN_OR_RAISE(auto block, GetRecordBatchBlock(i));
-+      ARROW_ASSIGN_OR_RAISE(auto outer_message, ReadMessageFromBlock(block));
-       auto metadata = outer_message->metadata();
-       const flatbuf::Message* message = nullptr;
-       RETURN_NOT_OK(
-@@ -1494,13 +1522,13 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
- 
-   Status DoPreBufferMetadata(const std::vector<int>& indices) {
-     RETURN_NOT_OK(CacheMetadata(indices));
--    EnsureDictionaryReadStarted();
-+    RETURN_NOT_OK(EnsureDictionaryReadStarted());
-     Future<> all_metadata_ready = WaitForMetadatas(indices);
-     for (int index : indices) {
-       Future<std::shared_ptr<Message>> metadata_loaded =
-           all_metadata_ready.Then([this, index]() -> Result<std::shared_ptr<Message>> {
-             stats_.num_messages.fetch_add(1, std::memory_order_relaxed);
--            FileBlock block = GetRecordBatchBlock(index);
-+            ARROW_ASSIGN_OR_RAISE(FileBlock block, GetRecordBatchBlock(index));
-             ARROW_ASSIGN_OR_RAISE(
-                 std::shared_ptr<Buffer> metadata,
-                 metadata_cache_->Read({block.offset, block.metadata_length}));
-@@ -1549,12 +1577,12 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
-     }
-   };
- 
--  FileBlock GetRecordBatchBlock(int i) const {
--    return FileBlockFromFlatbuffer(footer_->recordBatches()->Get(i));
-+  Result<FileBlock> GetRecordBatchBlock(int i) const {
-+    return FileBlockFromFlatbuffer(footer_->recordBatches()->Get(i), footer_offset_);
-   }
- 
--  FileBlock GetDictionaryBlock(int i) const {
--    return FileBlockFromFlatbuffer(footer_->dictionaries()->Get(i));
-+  Result<FileBlock> GetDictionaryBlock(int i) const {
-+    return FileBlockFromFlatbuffer(footer_->dictionaries()->Get(i), footer_offset_);
-   }
- 
-   Result<std::unique_ptr<Message>> ReadMessageFromBlock(
-@@ -1567,16 +1595,26 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
- 
-   Status ReadDictionaries() {
-     // Read all the dictionaries
-+    std::vector<std::shared_ptr<Message>> messages(num_dictionaries());
-+    for (int i = 0; i < num_dictionaries(); ++i) {
-+      ARROW_ASSIGN_OR_RAISE(FileBlock block, GetDictionaryBlock(i));
-+      ARROW_ASSIGN_OR_RAISE(messages[i], ReadMessageFromBlock(block));
-+    }
-+    return ReadDictionaries(messages);
-+  }
-+
-+  Status ReadDictionaries(
-+      const std::vector<std::shared_ptr<Message>>& dictionary_messages) {
-+    DCHECK_EQ(dictionary_messages.size(), static_cast<size_t>(num_dictionaries()));
-     IpcReadContext context(&dictionary_memo_, options_, swap_endian_);
-     for (int i = 0; i < num_dictionaries(); ++i) {
--      ARROW_ASSIGN_OR_RAISE(auto message, ReadMessageFromBlock(GetDictionaryBlock(i)));
--      RETURN_NOT_OK(ReadOneDictionary(message.get(), context));
--      stats_.num_dictionary_batches.fetch_add(1, std::memory_order_relaxed);
-+      RETURN_NOT_OK(ReadOneDictionary(i, dictionary_messages[i].get(), context));
-     }
-     return Status::OK();
-   }
- 
--  Status ReadOneDictionary(Message* message, const IpcReadContext& context) {
-+  Status ReadOneDictionary(int dict_index, Message* message,
-+                           const IpcReadContext& context) {
-     CHECK_HAS_BODY(*message);
-     ARROW_ASSIGN_OR_RAISE(auto reader, Buffer::GetReader(message->body()));
-     DictionaryKind kind;
-@@ -1586,44 +1624,48 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
-     } else if (kind == DictionaryKind::Delta) {
-       stats_.num_dictionary_deltas.fetch_add(1, std::memory_order_relaxed);
-     }
-+    stats_.num_dictionary_batches.fetch_add(1, std::memory_order_relaxed);
-     return Status::OK();
-   }
- 
--  void AddDictionaryRanges(std::vector<io::ReadRange>* ranges) const {
-+  Status AddDictionaryRanges(std::vector<io::ReadRange>* ranges) const {
-     // Adds all dictionaries to the range cache
-     for (int i = 0; i < num_dictionaries(); ++i) {
--      FileBlock block = GetDictionaryBlock(i);
-+      ARROW_ASSIGN_OR_RAISE(FileBlock block, GetDictionaryBlock(i));
-       ranges->push_back({block.offset, block.metadata_length + block.body_length});
-     }
-+    return Status::OK();
-   }
- 
--  void AddMetadataRanges(const std::vector<int>& indices,
--                         std::vector<io::ReadRange>* ranges) {
-+  Status AddMetadataRanges(const std::vector<int>& indices,
-+                           std::vector<io::ReadRange>* ranges) {
-     for (int index : indices) {
--      FileBlock block = GetRecordBatchBlock(static_cast<int>(index));
-+      ARROW_ASSIGN_OR_RAISE(FileBlock block, GetRecordBatchBlock(index));
-       ranges->push_back({block.offset, block.metadata_length});
-     }
-+    return Status::OK();
-   }
- 
-   Status CacheMetadata(const std::vector<int>& indices) {
-     std::vector<io::ReadRange> ranges;
-     if (!read_dictionaries_) {
--      AddDictionaryRanges(&ranges);
-+      RETURN_NOT_OK(AddDictionaryRanges(&ranges));
-     }
--    AddMetadataRanges(indices, &ranges);
-+    RETURN_NOT_OK(AddMetadataRanges(indices, &ranges));
-     return metadata_cache_->Cache(std::move(ranges));
-   }
- 
--  void EnsureDictionaryReadStarted() {
-+  Status EnsureDictionaryReadStarted() {
-     if (!dictionary_load_finished_.is_valid()) {
-       read_dictionaries_ = true;
-       std::vector<io::ReadRange> ranges;
--      AddDictionaryRanges(&ranges);
-+      RETURN_NOT_OK(AddDictionaryRanges(&ranges));
-       dictionary_load_finished_ =
-           metadata_cache_->WaitFor(std::move(ranges)).Then([this] {
-             return ReadDictionaries();
-           });
-     }
-+    return Status::OK();
-   }
- 
-   Status WaitForDictionaryReadFinished() {
-@@ -1641,7 +1683,7 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
- 
-   Future<> WaitForMetadatas(const std::vector<int>& indices) {
-     std::vector<io::ReadRange> ranges;
--    AddMetadataRanges(indices, &ranges);
-+    RETURN_NOT_OK(AddMetadataRanges(indices, &ranges));
-     return metadata_cache_->WaitFor(std::move(ranges));
-   }
- 
-@@ -1685,12 +1727,13 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
-                                  const flatbuf::RecordBatch* batch,
-                                  IpcReadContext context, io::RandomAccessFile* file,
-                                  std::shared_ptr<io::RandomAccessFile> owned_file,
--                                 int64_t block_data_offset)
-+                                 int64_t block_data_offset, int64_t block_data_length)
-         : schema(std::move(sch)),
-           context(std::move(context)),
-           file(file),
-           owned_file(std::move(owned_file)),
--          loader(batch, context.metadata_version, context.options, block_data_offset),
-+          loader(batch, context.metadata_version, context.options, block_data_offset,
-+                 block_data_length),
-           columns(schema->num_fields()),
-           cache(file, file->io_context(), io::CacheOptions::LazyDefaults()),
-           length(batch->length()) {}
-@@ -1789,14 +1832,15 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
-     return dictionary_load_finished_.Then([message_fut] { return message_fut; })
-         .Then([this, index](const std::shared_ptr<Message>& message_obj)
-                   -> Future<std::shared_ptr<RecordBatch>> {
--          FileBlock block = GetRecordBatchBlock(index);
-+          ARROW_ASSIGN_OR_RAISE(auto block, GetRecordBatchBlock(index));
-           ARROW_ASSIGN_OR_RAISE(auto message, GetFlatbufMessage(message_obj));
-           ARROW_ASSIGN_OR_RAISE(auto batch, GetBatchFromMessage(message));
-           ARROW_ASSIGN_OR_RAISE(auto context, GetIpcReadContext(message, batch));
- 
-           auto read_context = std::make_shared<CachedRecordBatchReadContext>(
-               schema_, batch, std::move(context), file_, owned_file_,
--              block.offset + static_cast<int64_t>(block.metadata_length));
-+              block.offset + static_cast<int64_t>(block.metadata_length),
-+              block.body_length);
-           RETURN_NOT_OK(read_context->CalculateLoadRequest());
-           return read_context->ReadAsync().Then(
-               [read_context] { return read_context->CreateRecordBatch(); });
-@@ -1915,25 +1959,31 @@ Future<WholeIpcFileRecordBatchGenerator::Item>
- WholeIpcFileRecordBatchGenerator::operator()() {
-   auto state = state_;
-   if (!read_dictionaries_.is_valid()) {
--    std::vector<Future<std::shared_ptr<Message>>> messages(state->num_dictionaries());
--    for (int i = 0; i < state->num_dictionaries(); i++) {
--      auto block = FileBlockFromFlatbuffer(state->footer_->dictionaries()->Get(i));
--      messages[i] = ReadBlock(block);
--    }
--    auto read_messages = All(std::move(messages));
--    if (executor_) read_messages = executor_->Transfer(read_messages);
--    read_dictionaries_ = read_messages.Then(
--        [=](const std::vector<Result<std::shared_ptr<Message>>>& maybe_messages)
--            -> Status {
--          ARROW_ASSIGN_OR_RAISE(auto messages,
--                                arrow::internal::UnwrapOrRaise(maybe_messages));
--          return ReadDictionaries(state.get(), std::move(messages));
--        });
-+    if (state->dictionary_load_finished_.is_valid()) {
-+      // PreBufferMetadata has started reading dictionaries in the background
-+      read_dictionaries_ = state->dictionary_load_finished_;
-+    } else {
-+      // Start reading dictionaries
-+      std::vector<Future<std::shared_ptr<Message>>> messages(state->num_dictionaries());
-+      for (int i = 0; i < state->num_dictionaries(); i++) {
-+        ARROW_ASSIGN_OR_RAISE(auto block, state->GetDictionaryBlock(i));
-+        messages[i] = ReadBlock(block);
-+      }
-+      auto read_messages = All(std::move(messages));
-+      if (executor_) read_messages = executor_->Transfer(read_messages);
-+      read_dictionaries_ = read_messages.Then(
-+          [=](const std::vector<Result<std::shared_ptr<Message>>>& maybe_messages)
-+              -> Status {
-+            ARROW_ASSIGN_OR_RAISE(auto messages,
-+                                  arrow::internal::UnwrapOrRaise(maybe_messages));
-+            return state->ReadDictionaries(messages);
-+          });
-+    }
-   }
-   if (index_ >= state_->num_record_batches()) {
-     return Future<Item>::MakeFinished(IterationTraits<Item>::End());
-   }
--  auto block = FileBlockFromFlatbuffer(state->footer_->recordBatches()->Get(index_++));
-+  ARROW_ASSIGN_OR_RAISE(auto block, state->GetRecordBatchBlock(index_++));
-   auto read_message = ReadBlock(block);
-   auto read_messages = read_dictionaries_.Then([read_message]() { return read_message; });
-   // Force transfer. This may be wasteful in some cases, but ensures we get off the
-@@ -1969,16 +2019,6 @@ Future<std::shared_ptr<Message>> WholeIpcFileRecordBatchGenerator::ReadBlock(
-   }
- }
- 
--Status WholeIpcFileRecordBatchGenerator::ReadDictionaries(
--    RecordBatchFileReaderImpl* state,
--    std::vector<std::shared_ptr<Message>> dictionary_messages) {
--  IpcReadContext context(&state->dictionary_memo_, state->options_, state->swap_endian_);
--  for (const auto& message : dictionary_messages) {
--    RETURN_NOT_OK(state->ReadOneDictionary(message.get(), context));
--  }
--  return Status::OK();
--}
--
- Result<std::shared_ptr<RecordBatch>> WholeIpcFileRecordBatchGenerator::ReadRecordBatch(
-     RecordBatchFileReaderImpl* state, Message* message) {
-   CHECK_HAS_BODY(*message);
-@@ -2630,6 +2670,14 @@ Status ValidateFuzzBatch(const RecordBatch& batch) {
-   return st;
- }
- 
-+Status ValidateFuzzBatch(const RecordBatchWithMetadata& batch) {
-+  if (batch.batch) {
-+    RETURN_NOT_OK(ValidateFuzzBatch(*batch.batch));
-+  }
-+  // XXX do something with custom metadata?
-+  return Status::OK();
-+}
-+
- IpcReadOptions FuzzingOptions() {
-   IpcReadOptions options;
-   options.memory_pool = ::arrow::internal::fuzzing_memory_pool();
-@@ -2648,12 +2696,12 @@ Status FuzzIpcStream(const uint8_t* data, int64_t size) {
-   Status st;
- 
-   while (true) {
--    std::shared_ptr<arrow::RecordBatch> batch;
--    RETURN_NOT_OK(batch_reader->ReadNext(&batch));
--    if (batch == nullptr) {
-+    ARROW_ASSIGN_OR_RAISE(auto batch, batch_reader->ReadNext());
-+    if (!batch.batch && !batch.custom_metadata) {
-+      // EOS
-       break;
-     }
--    st &= ValidateFuzzBatch(*batch);
-+    st &= ValidateFuzzBatch(batch);
-   }
- 
-   return st;
-@@ -2661,20 +2709,36 @@ Status FuzzIpcStream(const uint8_t* data, int64_t size) {
- 
- Status FuzzIpcFile(const uint8_t* data, int64_t size) {
-   auto buffer = std::make_shared<Buffer>(data, size);
--  io::BufferReader buffer_reader(buffer);
- 
--  std::shared_ptr<RecordBatchFileReader> batch_reader;
--  ARROW_ASSIGN_OR_RAISE(batch_reader,
--                        RecordBatchFileReader::Open(&buffer_reader, FuzzingOptions()));
--  Status st;
-+  Status final_status;
- 
--  const int n_batches = batch_reader->num_record_batches();
--  for (int i = 0; i < n_batches; ++i) {
--    ARROW_ASSIGN_OR_RAISE(auto batch, batch_reader->ReadRecordBatch(i));
--    st &= ValidateFuzzBatch(*batch);
-+  auto do_read = [&](bool pre_buffer) {
-+    io::BufferReader buffer_reader(buffer);
-+    ARROW_ASSIGN_OR_RAISE(auto batch_reader,
-+                          RecordBatchFileReader::Open(&buffer_reader, FuzzingOptions()));
-+    if (pre_buffer) {
-+      // Pre-buffer all record batches
-+      RETURN_NOT_OK(batch_reader->PreBufferMetadata(/*indices=*/{}));
-+    }
-+
-+    const int n_batches = batch_reader->num_record_batches();
-+    for (int i = 0; i < n_batches; ++i) {
-+      RecordBatchWithMetadata batch;
-+      auto st = batch_reader->ReadRecordBatchWithCustomMetadata(i).Value(&batch);
-+      final_status &= st;
-+      if (!st.ok()) {
-+        continue;
-+      }
-+      final_status &= ValidateFuzzBatch(batch);
-+    }
-+    return Status::OK();
-+  };
-+
-+  for (const bool pre_buffer : {false, true}) {
-+    final_status &= do_read(pre_buffer);
-   }
- 
--  return st;
-+  return final_status;
- }
- 
- Status FuzzIpcTensorStream(const uint8_t* data, int64_t size) {
-diff --git a/cpp/src/arrow/ipc/test_common.cc b/cpp/src/arrow/ipc/test_common.cc
-index 02e6b816c0b..ceca6d9e434 100644
---- a/cpp/src/arrow/ipc/test_common.cc
-+++ b/cpp/src/arrow/ipc/test_common.cc
-@@ -16,6 +16,7 @@
- // under the License.
- 
- #include <algorithm>
-+#include <concepts>
- #include <cstdint>
- #include <functional>
- #include <memory>
-@@ -368,19 +369,27 @@ Status MakeRandomStringArray(int64_t length, bool include_nulls, MemoryPool* poo
-   return builder.Finish(out);
- }
- 
--template <class BuilderType>
--static Status MakeBinaryArrayWithUniqueValues(int64_t length, bool include_nulls,
--                                              MemoryPool* pool,
--                                              std::shared_ptr<Array>* out) {
--  BuilderType builder(pool);
-+template <std::derived_from<ArrayBuilder> BuilderType>
-+static Result<std::shared_ptr<Array>> MakeBinaryArrayWithUniqueValues(
-+    BuilderType builder, int64_t length, bool include_nulls) {
-+  if constexpr (std::is_base_of_v<BinaryViewBuilder, BuilderType>) {
-+    // Try to emit several variadic buffers by choosing a small block size.
-+    builder.SetBlockSize(512);
-+  }
-   for (int64_t i = 0; i < length; ++i) {
-     if (include_nulls && (i % 7 == 0)) {
-       RETURN_NOT_OK(builder.AppendNull());
-     } else {
--      RETURN_NOT_OK(builder.Append(std::to_string(i)));
-+      // Make sure that some strings are long enough to have non-inline binary views
-+      const auto base = std::to_string(i);
-+      std::string value;
-+      for (int64_t j = 0; j < 3 * (i % 10); ++j) {
-+        value += base;
-+      }
-+      RETURN_NOT_OK(builder.Append(value));
-     }
-   }
--  return builder.Finish(out);
-+  return builder.Finish();
- }
- 
- Status MakeStringTypesRecordBatch(std::shared_ptr<RecordBatch>* out, bool with_nulls,
-@@ -390,22 +399,22 @@ Status MakeStringTypesRecordBatch(std::shared_ptr<RecordBatch>* out, bool with_n
-   ArrayVector arrays;
-   FieldVector fields;
- 
--  auto AppendColumn = [&](auto& MakeArray) {
--    arrays.emplace_back();
--    RETURN_NOT_OK(MakeArray(length, with_nulls, default_memory_pool(), &arrays.back()));
--
--    const auto& type = arrays.back()->type();
--    fields.push_back(field(type->ToString(), type));
-+  auto AppendColumn = [&](auto builder) {
-+    ARROW_ASSIGN_OR_RAISE(auto array, MakeBinaryArrayWithUniqueValues(
-+                                          std::move(builder), length, with_nulls));
-+    arrays.push_back(array);
-+    fields.push_back(field(array->type()->ToString(), array->type()));
-     return Status::OK();
-   };
- 
--  RETURN_NOT_OK(AppendColumn(MakeBinaryArrayWithUniqueValues<StringBuilder>));
--  RETURN_NOT_OK(AppendColumn(MakeBinaryArrayWithUniqueValues<BinaryBuilder>));
--  RETURN_NOT_OK(AppendColumn(MakeBinaryArrayWithUniqueValues<LargeStringBuilder>));
--  RETURN_NOT_OK(AppendColumn(MakeBinaryArrayWithUniqueValues<LargeBinaryBuilder>));
-+  auto pool = default_memory_pool();
-+  RETURN_NOT_OK(AppendColumn(StringBuilder(pool)));
-+  RETURN_NOT_OK(AppendColumn(BinaryBuilder(pool)));
-+  RETURN_NOT_OK(AppendColumn(LargeStringBuilder(pool)));
-+  RETURN_NOT_OK(AppendColumn(LargeBinaryBuilder(pool)));
-   if (with_view_types) {
--    RETURN_NOT_OK(AppendColumn(MakeBinaryArrayWithUniqueValues<StringViewBuilder>));
--    RETURN_NOT_OK(AppendColumn(MakeBinaryArrayWithUniqueValues<BinaryViewBuilder>));
-+    RETURN_NOT_OK(AppendColumn(StringViewBuilder(pool)));
-+    RETURN_NOT_OK(AppendColumn(BinaryViewBuilder(pool)));
-   }
- 
-   *out = RecordBatch::Make(schema(std::move(fields)), length, std::move(arrays));
-diff --git a/cpp/src/arrow/type.h b/cpp/src/arrow/type.h
-index f68d2dcb619..e3582056ead 100644
---- a/cpp/src/arrow/type.h
-+++ b/cpp/src/arrow/type.h
-@@ -2575,6 +2575,16 @@ constexpr bool may_have_validity_bitmap(Type::type id) {
-   }
- }
- 
-+constexpr bool has_variadic_buffers(Type::type id) {
-+  switch (id) {
-+    case Type::BINARY_VIEW:
-+    case Type::STRING_VIEW:
-+      return true;
-+    default:
-+      return false;
-+  }
-+}
-+
- ARROW_DEPRECATED("Deprecated in 17.0.0. Use may_have_validity_bitmap() instead.")
- constexpr bool HasValidityBitmap(Type::type id) { return may_have_validity_bitmap(id); }
- 
-diff --git a/cpp/src/arrow/util/int_util_overflow.h b/cpp/src/arrow/util/int_util_overflow.h
-index 93066fecafa..69714a935a4 100644
---- a/cpp/src/arrow/util/int_util_overflow.h
-+++ b/cpp/src/arrow/util/int_util_overflow.h
-@@ -18,7 +18,9 @@
- #pragma once
- 
- #include <cstdint>
-+#include <initializer_list>
- #include <limits>
-+#include <optional>
- #include <type_traits>
- 
- #include "arrow/status.h"
-@@ -162,6 +164,37 @@ NON_GENERIC_OPS_WITH_OVERFLOW(DivideWithOverflow)
- #undef NON_GENERIC_OPS_WITH_OVERFLOW
- #undef NON_GENERIC_OP_WITH_OVERFLOW
- 
-+// Convenience functions over an arbitrary number of arguments
-+template <typename Int>
-+std::optional<Int> AddWithOverflow(std::initializer_list<Int> vs) {
-+  if (vs.size() == 0) {
-+    return {};
-+  }
-+  auto it = vs.begin();
-+  Int v = *it++;
-+  while (it != vs.end()) {
-+    if (ARROW_PREDICT_FALSE(AddWithOverflowGeneric(v, *it++, &v))) {
-+      return {};
-+    }
-+  }
-+  return v;
-+}
-+
-+template <typename Int>
-+std::optional<Int> MultiplyWithOverflow(std::initializer_list<Int> vs) {
-+  if (vs.size() == 0) {
-+    return {};
-+  }
-+  auto it = vs.begin();
-+  Int v = *it++;
-+  while (it != vs.end()) {
-+    if (ARROW_PREDICT_FALSE(MultiplyWithOverflowGeneric(v, *it++, &v))) {
-+      return {};
-+    }
-+  }
-+  return v;
-+}
-+
- // Define function NegateWithOverflow with the signature `bool(T u, T* out)`
- // where T is a signed integer type.  On overflow, these functions return true.
- // Otherwise, false is returned and `out` is updated with the result of the
-diff --git a/cpp/src/arrow/util/int_util_test.cc b/cpp/src/arrow/util/int_util_test.cc
-index 7217c1097e4..cffa4e9d15e 100644
---- a/cpp/src/arrow/util/int_util_test.cc
-+++ b/cpp/src/arrow/util/int_util_test.cc
-@@ -649,5 +649,23 @@ TYPED_TEST(TestAddWithOverflow, Basics) {
-   this->CheckOk(almost_min, almost_max + T{2}, T{1});
- }
- 
-+TEST(AddWithOverflow, Variadic) {
-+  ASSERT_EQ(AddWithOverflow<int>({}), std::nullopt);
-+  ASSERT_EQ(AddWithOverflow({1, 2, 3}), 6);
-+  ASSERT_EQ(AddWithOverflow<int8_t>({1, 2, 125}), std::nullopt);
-+  ASSERT_EQ(AddWithOverflow<int8_t>({125, 2, 1}), std::nullopt);
-+  ASSERT_EQ(AddWithOverflow<int16_t>({1, 2, 125}), 128);
-+  ASSERT_EQ(AddWithOverflow<int16_t>({125, 2, 1}), 128);
-+}
-+
-+TEST(MultiplyWithOverflow, Variadic) {
-+  ASSERT_EQ(MultiplyWithOverflow<int>({}), std::nullopt);
-+  ASSERT_EQ(MultiplyWithOverflow({1, 2, 3, 4}), 24);
-+  ASSERT_EQ(MultiplyWithOverflow<int8_t>({2, 2, 32}), std::nullopt);
-+  ASSERT_EQ(MultiplyWithOverflow<int8_t>({32, 4, 1}), std::nullopt);
-+  ASSERT_EQ(MultiplyWithOverflow<int16_t>({2, 2, 32}), 128);
-+  ASSERT_EQ(MultiplyWithOverflow<int16_t>({32, 4, 1}), 128);
-+}
-+
- }  // namespace internal
- }  // namespace arrow

Verdict

CHANGES REQUESTED — Please address the issues flagged above.


Labels

3.0-dev PRs Destined for AzureLinux 3.0 Packaging security
