[MEDIUM] Patch libarrow for CVE-2026-25087#16145
Open
durgajagadeesh wants to merge 3 commits intomicrosoft:3.0-devfrom
Open
[MEDIUM] Patch libarrow for CVE-2026-25087#16145durgajagadeesh wants to merge 3 commits intomicrosoft:3.0-devfrom
durgajagadeesh wants to merge 3 commits intomicrosoft:3.0-devfrom
Conversation
Contributor
Contributor
Author
|
Buddy build has License warning. I am working on to resolve this issue.. |
Contributor
Contributor
Author
|
|
0c425be to
43a2202
Compare
Contributor
Contributor
🔒 CVE Patch Review: CVE-2026-25087PR #16145 — [MEDIUM] Patch libarrow for CVE-2026-25087 Spec File Validation
Build Verification
🤖 AI Build Log Analysis
🧪 Test Log AnalysisNo test log found (package may not have a %check section). Patch Analysis
Raw diff (upstream vs PR)--- upstream
+++ pr
@@ -1,771 +1,848 @@
-From a4ae90929d6e959e9a1fb29f3907bbbf2799472e Mon Sep 17 00:00:00 2001
-From: Antoine Pitrou <antoine@python.org>
-Date: Wed, 21 Jan 2026 17:54:00 +0100
-Subject: [PATCH] GH-48924: [C++][CI] Fuzz IPC file metadata pre-buffering
-
----
- ci/scripts/cpp_test.sh | 9 +
- cpp/src/arrow/ipc/read_write_test.cc | 75 +++++----
- cpp/src/arrow/ipc/reader.cc | 222 ++++++++++++++++---------
- cpp/src/arrow/ipc/test_common.cc | 47 +++---
- cpp/src/arrow/type.h | 10 ++
- cpp/src/arrow/util/int_util_overflow.h | 33 ++++
- cpp/src/arrow/util/int_util_test.cc | 18 ++
- 7 files changed, 286 insertions(+), 128 deletions(-)
-
-diff --git a/ci/scripts/cpp_test.sh b/ci/scripts/cpp_test.sh
-index 0ad59bc308f..5d6d5e099ab 100755
---- a/ci/scripts/cpp_test.sh
-+++ b/ci/scripts/cpp_test.sh
-@@ -182,6 +182,15 @@ if [ "${ARROW_FUZZING}" == "ON" ]; then
- # Some fuzz regression files may trigger huge memory allocations,
- # let the allocator return null instead of aborting.
- export ASAN_OPTIONS="$ASAN_OPTIONS allocator_may_return_null=1"
-+ export ARROW_FUZZING_VERBOSITY=1
-+ # Run golden IPC integration files: these should ideally load without errors,
-+ # though some very old ones carry invalid data (such as decimal values
-+ # larger than their advertised precision).
-+ # shellcheck disable=SC2046
-+ "${binary_output_dir}/arrow-ipc-stream-fuzz" $(find "${ARROW_TEST_DATA}"/arrow-ipc-stream/integration -name "*.stream")
-+ # shellcheck disable=SC2046
-+ "${binary_output_dir}/arrow-ipc-file-fuzz" $(find "${ARROW_TEST_DATA}"/arrow-ipc-stream/integration -name "*.arrow_file")
-+ # Run known crash files
- "${binary_output_dir}/arrow-ipc-stream-fuzz" "${ARROW_TEST_DATA}"/arrow-ipc-stream/crash-*
- "${binary_output_dir}/arrow-ipc-stream-fuzz" "${ARROW_TEST_DATA}"/arrow-ipc-stream/*-testcase-*
- "${binary_output_dir}/arrow-ipc-file-fuzz" "${ARROW_TEST_DATA}"/arrow-ipc-file/*-testcase-*
-diff --git a/cpp/src/arrow/ipc/read_write_test.cc b/cpp/src/arrow/ipc/read_write_test.cc
-index 315d8bd07d9..9f7df541bd7 100644
---- a/cpp/src/arrow/ipc/read_write_test.cc
-+++ b/cpp/src/arrow/ipc/read_write_test.cc
-@@ -1252,40 +1252,55 @@ struct FileGeneratorWriterHelper : public FileWriterHelper {
- Status ReadBatches(const IpcReadOptions& options, RecordBatchVector* out_batches,
- ReadStats* out_stats = nullptr,
- MetadataVector* out_metadata_list = nullptr) override {
-- std::shared_ptr<io::RandomAccessFile> buf_reader;
-- if (kCoalesce) {
-- // Use a non-zero-copy enabled BufferReader so we can test paths properly
-- buf_reader = std::make_shared<NoZeroCopyBufferReader>(buffer_);
-- } else {
-- buf_reader = std::make_shared<io::BufferReader>(buffer_);
-- }
-- AsyncGenerator<std::shared_ptr<RecordBatch>> generator;
-+ // The generator doesn't track stats.
-+ EXPECT_EQ(nullptr, out_stats);
-
-- {
-- auto fut = RecordBatchFileReader::OpenAsync(buf_reader, footer_offset_, options);
-- // Do NOT assert OK since some tests check whether this fails properly
-- EXPECT_FINISHES(fut);
-- ARROW_ASSIGN_OR_RAISE(auto reader, fut.result());
-- EXPECT_EQ(num_batches_written_, reader->num_record_batches());
-- // Generator will keep reader alive internally
-- ARROW_ASSIGN_OR_RAISE(generator, reader->GetRecordBatchGenerator(kCoalesce));
-- }
-+ auto read_batches = [&](bool pre_buffer) -> Result<RecordBatchVector> {
-+ std::shared_ptr<io::RandomAccessFile> buf_reader;
-+ if (kCoalesce) {
-+ // Use a non-zero-copy enabled BufferReader so we can test paths properly
-+ buf_reader = std::make_shared<NoZeroCopyBufferReader>(buffer_);
-+ } else {
-+ buf_reader = std::make_shared<io::BufferReader>(buffer_);
-+ }
-+ AsyncGenerator<std::shared_ptr<RecordBatch>> generator;
+diff --git a/SPECS/libarrow/CVE-2026-25087.patch b/SPECS/libarrow/CVE-2026-25087.patch
+new file mode 100644
+index 00000000000..6dc36806b10
+--- /dev/null
++++ b/SPECS/libarrow/CVE-2026-25087.patch
+@@ -0,0 +1,842 @@
++From a4ae90929d6e959e9a1fb29f3907bbbf2799472e Mon Sep 17 00:00:00 2001
++From: Antoine Pitrou <antoine@python.org>
++Date: Wed, 21 Jan 2026 17:54:00 +0100
++Subject: [PATCH] GH-48924: [C++][CI] Fuzz IPC file metadata pre-buffering
+
-+ {
-+ auto fut = RecordBatchFileReader::OpenAsync(buf_reader, footer_offset_, options);
-+ ARROW_ASSIGN_OR_RAISE(auto reader, fut.result());
-+ EXPECT_EQ(num_batches_written_, reader->num_record_batches());
-+ if (pre_buffer) {
-+ RETURN_NOT_OK(reader->PreBufferMetadata(/*indices=*/{}));
-+ }
-+ // Generator will keep reader alive internally
-+ ARROW_ASSIGN_OR_RAISE(generator, reader->GetRecordBatchGenerator(kCoalesce));
-+ }
-
-- // Generator is async-reentrant
-- std::vector<Future<std::shared_ptr<RecordBatch>>> futures;
-+ // Generator is async-reentrant
-+ std::vector<Future<std::shared_ptr<RecordBatch>>> futures;
-+ for (int i = 0; i < num_batches_written_; ++i) {
-+ futures.push_back(generator());
-+ }
-+ auto fut = generator();
-+ ARROW_ASSIGN_OR_RAISE(auto final_batch, fut.result());
-+ EXPECT_EQ(nullptr, final_batch);
++Upstream Patch reference: https://patch-diff.githubusercontent.com/raw/apache/arrow/pull/48925.patch
+
-+ RecordBatchVector batches;
-+ for (auto& future : futures) {
-+ ARROW_ASSIGN_OR_RAISE(auto batch, future.result());
-+ EXPECT_NE(nullptr, batch);
-+ batches.push_back(batch);
-+ }
-+ return batches;
-+ };
++---
++ ci/scripts/cpp_test.sh | 12 ++
++ cpp/src/arrow/ipc/read_write_test.cc | 75 +++++---
++ cpp/src/arrow/ipc/reader.cc | 252 +++++++++++++++++--------
++ cpp/src/arrow/ipc/test_common.cc | 47 +++--
++ cpp/src/arrow/type.h | 10 +
++ cpp/src/arrow/util/int_util_overflow.h | 33 ++++
++ cpp/src/arrow/util/int_util_test.cc | 18 ++
++ 7 files changed, 316 insertions(+), 131 deletions(-)
+
-+ ARROW_ASSIGN_OR_RAISE(*out_batches, read_batches(/*pre_buffer=*/false));
-+ // Also read with pre-buffered metadata, and check the results are equal
-+ ARROW_ASSIGN_OR_RAISE(auto batches_pre_buffered, read_batches(/*pre_buffer=*/true));
- for (int i = 0; i < num_batches_written_; ++i) {
-- futures.push_back(generator());
-- }
-- auto fut = generator();
-- EXPECT_FINISHES_OK_AND_EQ(nullptr, fut);
-- for (auto& future : futures) {
-- EXPECT_FINISHES_OK_AND_ASSIGN(auto batch, future);
-- out_batches->push_back(batch);
-+ AssertBatchesEqual(*batches_pre_buffered[i], *(*out_batches)[i],
-+ /*check_metadata=*/true);
- }
--
-- // The generator doesn't track stats.
-- EXPECT_EQ(nullptr, out_stats);
--
- return Status::OK();
- }
- };
-diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc
-index 8e125fc5ede..f1571f76c24 100644
---- a/cpp/src/arrow/ipc/reader.cc
-+++ b/cpp/src/arrow/ipc/reader.cc
-@@ -54,6 +54,7 @@
- #include "arrow/util/compression.h"
- #include "arrow/util/endian.h"
- #include "arrow/util/fuzz_internal.h"
-+#include "arrow/util/int_util_overflow.h"
- #include "arrow/util/key_value_metadata.h"
- #include "arrow/util/logging_internal.h"
- #include "arrow/util/parallel.h"
-@@ -72,6 +73,7 @@ namespace arrow {
-
- namespace flatbuf = org::apache::arrow::flatbuf;
-
-+using internal::AddWithOverflow;
- using internal::checked_cast;
- using internal::checked_pointer_cast;
-
-@@ -177,14 +179,16 @@ class ArrayLoader {
-
- explicit ArrayLoader(const flatbuf::RecordBatch* metadata,
- MetadataVersion metadata_version, const IpcReadOptions& options,
-- int64_t file_offset)
-+ int64_t file_offset, int64_t file_length)
- : metadata_(metadata),
- metadata_version_(metadata_version),
- file_(nullptr),
- file_offset_(file_offset),
-+ file_length_(file_length),
- max_recursion_depth_(options.max_recursion_depth) {}
-
- Status ReadBuffer(int64_t offset, int64_t length, std::shared_ptr<Buffer>* out) {
-+ // This construct permits overriding GetBuffer at compile time
- if (skip_io_) {
- return Status::OK();
- }
-@@ -194,7 +198,10 @@ class ArrayLoader {
- if (length < 0) {
- return Status::Invalid("Negative length for reading buffer ", buffer_index_);
- }
-- // This construct permits overriding GetBuffer at compile time
-+ auto read_end = AddWithOverflow({offset, length});
-+ if (!read_end.has_value() || (file_length_.has_value() && read_end > file_length_)) {
-+ return Status::Invalid("Buffer ", buffer_index_, " exceeds IPC file area");
-+ }
- if (!bit_util::IsMultipleOf8(offset)) {
- return Status::Invalid("Buffer ", buffer_index_,
- " did not start on 8-byte aligned offset: ", offset);
-@@ -202,6 +209,9 @@ class ArrayLoader {
- if (file_) {
- return file_->ReadAt(offset, length).Value(out);
- } else {
-+ if (!AddWithOverflow({read_end.value(), file_offset_}).has_value()) {
-+ return Status::Invalid("Buffer ", buffer_index_, " exceeds IPC file area");
-+ }
- read_request_.RequestRange(offset + file_offset_, length, out);
- return Status::OK();
- }
-@@ -292,6 +302,16 @@ class ArrayLoader {
- // we can skip that buffer without reading from shared memory
- RETURN_NOT_OK(GetFieldMetadata(field_index_++, out_));
-
-+ if (::arrow::internal::has_variadic_buffers(type_id)) {
-+ ARROW_ASSIGN_OR_RAISE(auto data_buffer_count,
-+ GetVariadicCount(variadic_count_index_++));
-+ const int64_t start = static_cast<int64_t>(out_->buffers.size());
-+ // NOTE: this must be done before any other call to `GetBuffer` because
-+ // BatchDataReadRequest will keep pointers to `std::shared_ptr<Buffer>`
-+ // objects.
-+ out_->buffers.resize(start + data_buffer_count);
-+ }
++diff --git a/ci/scripts/cpp_test.sh b/ci/scripts/cpp_test.sh
++index 0c6e1c6..1110378 100755
++--- a/ci/scripts/cpp_test.sh
+++++ b/ci/scripts/cpp_test.sh
++@@ -107,6 +107,18 @@ fi
++
++ if [ "${ARROW_FUZZING}" == "ON" ]; then
++ # Fuzzing regression tests
+++ # Some fuzz regression files may trigger huge memory allocations,
+++ # let the allocator return null instead of aborting.
+++ export ASAN_OPTIONS="$ASAN_OPTIONS allocator_may_return_null=1"
+++ export ARROW_FUZZING_VERBOSITY=1
+++ # Run golden IPC integration files: these should ideally load without errors,
+++ # though some very old ones carry invalid data (such as decimal values
+++ # larger than their advertised precision).
+++ # shellcheck disable=SC2046
+++ "${binary_output_dir}/arrow-ipc-stream-fuzz" $(find "${ARROW_TEST_DATA}"/arrow-ipc-stream/integration -name "*.stream")
+++ # shellcheck disable=SC2046
+++ "${binary_output_dir}/arrow-ipc-file-fuzz" $(find "${ARROW_TEST_DATA}"/arrow-ipc-stream/integration -name "*.arrow_file")
+++ # Run known crash files
++ ${binary_output_dir}/arrow-ipc-stream-fuzz ${ARROW_TEST_DATA}/arrow-ipc-stream/crash-*
++ ${binary_output_dir}/arrow-ipc-stream-fuzz ${ARROW_TEST_DATA}/arrow-ipc-stream/*-testcase-*
++ ${binary_output_dir}/arrow-ipc-file-fuzz ${ARROW_TEST_DATA}/arrow-ipc-file/*-testcase-*
++diff --git a/cpp/src/arrow/ipc/read_write_test.cc b/cpp/src/arrow/ipc/read_write_test.cc
++index bd2c2b7..af749ec 100644
++--- a/cpp/src/arrow/ipc/read_write_test.cc
+++++ b/cpp/src/arrow/ipc/read_write_test.cc
++@@ -1220,40 +1220,55 @@ struct FileGeneratorWriterHelper : public FileWriterHelper {
++ Status ReadBatches(const IpcReadOptions& options, RecordBatchVector* out_batches,
++ ReadStats* out_stats = nullptr,
++ MetadataVector* out_metadata_list = nullptr) override {
++- std::shared_ptr<io::RandomAccessFile> buf_reader;
++- if (kCoalesce) {
++- // Use a non-zero-copy enabled BufferReader so we can test paths properly
++- buf_reader = std::make_shared<NoZeroCopyBufferReader>(buffer_);
++- } else {
++- buf_reader = std::make_shared<io::BufferReader>(buffer_);
++- }
++- AsyncGenerator<std::shared_ptr<RecordBatch>> generator;
+++ // The generator doesn't track stats.
+++ EXPECT_EQ(nullptr, out_stats);
++
++- {
++- auto fut = RecordBatchFileReader::OpenAsync(buf_reader, footer_offset_, options);
++- // Do NOT assert OK since some tests check whether this fails properly
++- EXPECT_FINISHES(fut);
++- ARROW_ASSIGN_OR_RAISE(auto reader, fut.result());
++- EXPECT_EQ(num_batches_written_, reader->num_record_batches());
++- // Generator will keep reader alive internally
++- ARROW_ASSIGN_OR_RAISE(generator, reader->GetRecordBatchGenerator(kCoalesce));
++- }
+++ auto read_batches = [&](bool pre_buffer) -> Result<RecordBatchVector> {
+++ std::shared_ptr<io::RandomAccessFile> buf_reader;
+++ if (kCoalesce) {
+++ // Use a non-zero-copy enabled BufferReader so we can test paths properly
+++ buf_reader = std::make_shared<NoZeroCopyBufferReader>(buffer_);
+++ } else {
+++ buf_reader = std::make_shared<io::BufferReader>(buffer_);
+++ }
+++ AsyncGenerator<std::shared_ptr<RecordBatch>> generator;
+++
+++ {
+++ auto fut = RecordBatchFileReader::OpenAsync(buf_reader, footer_offset_, options);
+++ ARROW_ASSIGN_OR_RAISE(auto reader, fut.result());
+++ EXPECT_EQ(num_batches_written_, reader->num_record_batches());
+++ if (pre_buffer) {
+++ RETURN_NOT_OK(reader->PreBufferMetadata(/*indices=*/{}));
+++ }
+++ // Generator will keep reader alive internally
+++ ARROW_ASSIGN_OR_RAISE(generator, reader->GetRecordBatchGenerator(kCoalesce));
+++ }
++
++- // Generator is async-reentrant
++- std::vector<Future<std::shared_ptr<RecordBatch>>> futures;
+++ // Generator is async-reentrant
+++ std::vector<Future<std::shared_ptr<RecordBatch>>> futures;
+++ for (int i = 0; i < num_batches_written_; ++i) {
+++ futures.push_back(generator());
+++ }
+++ auto fut = generator();
+++ ARROW_ASSIGN_OR_RAISE(auto final_batch, fut.result());
+++ EXPECT_EQ(nullptr, final_batch);
+++
+++ RecordBatchVector batches;
+++ for (auto& future : futures) {
+++ ARROW_ASSIGN_OR_RAISE(auto batch, future.result());
+++ EXPECT_NE(nullptr, batch);
+++ batches.push_back(batch);
+++ }
+++ return batches;
+++ };
+++
+++ ARROW_ASSIGN_OR_RAISE(*out_batches, read_batches(/*pre_buffer=*/false));
+++ // Also read with pre-buffered metadata, and check the results are equal
+++ ARROW_ASSIGN_OR_RAISE(auto batches_pre_buffered, read_batches(/*pre_buffer=*/true));
++ for (int i = 0; i < num_batches_written_; ++i) {
++- futures.push_back(generator());
++- }
++- auto fut = generator();
++- EXPECT_FINISHES_OK_AND_EQ(nullptr, fut);
++- for (auto& future : futures) {
++- EXPECT_FINISHES_OK_AND_ASSIGN(auto batch, future);
++- out_batches->push_back(batch);
+++ AssertBatchesEqual(*batches_pre_buffered[i], *(*out_batches)[i],
+++ /*check_metadata=*/true);
++ }
++-
++- // The generator doesn't track stats.
++- EXPECT_EQ(nullptr, out_stats);
++-
++ return Status::OK();
++ }
++ };
++diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc
++index d272c78..3a2987b 100644
++--- a/cpp/src/arrow/ipc/reader.cc
+++++ b/cpp/src/arrow/ipc/reader.cc
++@@ -52,6 +52,7 @@
++ #include "arrow/util/checked_cast.h"
++ #include "arrow/util/compression.h"
++ #include "arrow/util/endian.h"
+++#include "arrow/util/int_util_overflow.h"
++ #include "arrow/util/key_value_metadata.h"
++ #include "arrow/util/logging.h"
++ #include "arrow/util/parallel.h"
++@@ -73,6 +74,8 @@ namespace flatbuf = org::apache::arrow::flatbuf;
++ using internal::checked_cast;
++ using internal::checked_pointer_cast;
++
+++using internal::AddWithOverflow;
+++
++ namespace ipc {
++
++ using internal::FileBlock;
++@@ -166,23 +169,26 @@ class ArrayLoader {
++ public:
++ explicit ArrayLoader(const flatbuf::RecordBatch* metadata,
++ MetadataVersion metadata_version, const IpcReadOptions& options,
++- io::RandomAccessFile* file)
+++ io::RandomAccessFile* file, std::optional<int64_t> file_length)
++ : metadata_(metadata),
++ metadata_version_(metadata_version),
++ file_(file),
++ file_offset_(0),
+++ file_length_(file_length),
++ max_recursion_depth_(options.max_recursion_depth) {}
++
++ explicit ArrayLoader(const flatbuf::RecordBatch* metadata,
++ MetadataVersion metadata_version, const IpcReadOptions& options,
++- int64_t file_offset)
+++ int64_t file_offset, std::optional<int64_t> file_length)
++ : metadata_(metadata),
++ metadata_version_(metadata_version),
++ file_(nullptr),
++ file_offset_(file_offset),
+++ file_length_(file_length),
++ max_recursion_depth_(options.max_recursion_depth) {}
++
++ Status ReadBuffer(int64_t offset, int64_t length, std::shared_ptr<Buffer>* out) {
+++ // This construct permits overriding GetBuffer at compile time
++ if (skip_io_) {
++ return Status::OK();
++ }
++@@ -192,7 +198,10 @@ class ArrayLoader {
++ if (length < 0) {
++ return Status::Invalid("Negative length for reading buffer ", buffer_index_);
++ }
++- // This construct permits overriding GetBuffer at compile time
+++ auto read_end = AddWithOverflow({offset, length});
+++ if (!read_end.has_value() || (file_length_.has_value() && read_end > file_length_)) {
+++ return Status::Invalid("Buffer ", buffer_index_, " exceeds IPC file area");
+++ }
++ if (!bit_util::IsMultipleOf8(offset)) {
++ return Status::Invalid("Buffer ", buffer_index_,
++ " did not start on 8-byte aligned offset: ", offset);
++@@ -200,6 +209,9 @@ class ArrayLoader {
++ if (file_) {
++ return file_->ReadAt(offset, length).Value(out);
++ } else {
+++ if (!AddWithOverflow({read_end.value(), file_offset_}).has_value()) {
+++ return Status::Invalid("Buffer ", buffer_index_, " exceeds IPC file area");
+++ }
++ read_request_.RequestRange(offset + file_offset_, length, out);
++ return Status::OK();
++ }
++@@ -284,6 +296,16 @@ class ArrayLoader {
++ // we can skip that buffer without reading from shared memory
++ RETURN_NOT_OK(GetFieldMetadata(field_index_++, out_));
++
+++ if (::arrow::internal::has_variadic_buffers(type_id)) {
+++ ARROW_ASSIGN_OR_RAISE(auto data_buffer_count,
+++ GetVariadicCount(variadic_count_index_++));
+++ const int64_t start = static_cast<int64_t>(out_->buffers.size());
+++ // NOTE: this must be done before any other call to `GetBuffer` because
+++ // BatchDataReadRequest will keep pointers to `std::shared_ptr<Buffer>`
+++ // objects.
+++ out_->buffers.resize(start + data_buffer_count);
+++ }
+++
++ if (internal::HasValidityBitmap(type_id, metadata_version_)) {
++ // Extract null_bitmap which is common to all arrays except for unions
++ // and nulls.
++@@ -292,6 +314,7 @@ class ArrayLoader {
++ }
++ buffer_index_++;
++ }
+++
++ return Status::OK();
++ }
++
++@@ -390,14 +413,9 @@ class ArrayLoader {
++ Status Visit(const BinaryViewType& type) {
++ out_->buffers.resize(2);
++
++- RETURN_NOT_OK(LoadCommon(type.id()));
++- RETURN_NOT_OK(GetBuffer(buffer_index_++, &out_->buffers[1]));
++-
++- ARROW_ASSIGN_OR_RAISE(auto data_buffer_count,
++- GetVariadicCount(variadic_count_index_++));
++- out_->buffers.resize(data_buffer_count + 2);
++- for (size_t i = 0; i < data_buffer_count; ++i) {
++- RETURN_NOT_OK(GetBuffer(buffer_index_++, &out_->buffers[i + 2]));
+++ RETURN_NOT_OK(LoadCommon(type.id())); // also initializes variadic buffers
+++ for (int64_t i = 1; i < static_cast<int64_t>(out_->buffers.size()); ++i) {
+++ RETURN_NOT_OK(GetBuffer(buffer_index_++, &out_->buffers[i]));
++ }
++ return Status::OK();
++ }
++@@ -495,6 +513,7 @@ class ArrayLoader {
++ const MetadataVersion metadata_version_;
++ io::RandomAccessFile* file_;
++ int64_t file_offset_;
+++ std::optional<int64_t> file_length_;
++ int max_recursion_depth_;
++ int buffer_index_ = 0;
++ int field_index_ = 0;
++@@ -583,7 +602,12 @@ Result<std::shared_ptr<RecordBatch>> LoadRecordBatchSubset(
++ const flatbuf::RecordBatch* metadata, const std::shared_ptr<Schema>& schema,
++ const std::vector<bool>* inclusion_mask, const IpcReadContext& context,
++ io::RandomAccessFile* file) {
++- ArrayLoader loader(metadata, context.metadata_version, context.options, file);
+++ std::optional<int64_t> file_length;
+++ if (file) {
+++ ARROW_ASSIGN_OR_RAISE(file_length, file->GetSize());
+++ }
+++ ArrayLoader loader(metadata, context.metadata_version, context.options, file,
+++ file_length);
++
++ ArrayDataVector columns(schema->num_fields());
++ ArrayDataVector filtered_columns;
++@@ -832,8 +856,12 @@ Status ReadDictionary(const Buffer& metadata, const IpcReadContext& context,
++ ARROW_ASSIGN_OR_RAISE(auto value_type, context.dictionary_memo->GetDictionaryType(id));
++
++ // Load the dictionary data from the dictionary batch
+++ std::optional<int64_t> file_length;
+++ if (file) {
+++ ARROW_ASSIGN_OR_RAISE(file_length, file->GetSize());
+++ }
++ ArrayLoader loader(batch_meta, internal::GetMetadataVersion(message->version()),
++- context.options, file);
+++ context.options, file, file_length);
++ auto dict_data = std::make_shared<ArrayData>();
++ const Field dummy_field("", value_type);
++ RETURN_NOT_OK(loader.Load(&dummy_field, dict_data.get()));
++@@ -1152,8 +1180,19 @@ Result<std::shared_ptr<RecordBatchStreamReader>> RecordBatchStreamReader::Open(
++
++ // Common functions used in both the random-access file reader and the
++ // asynchronous generator
++-static inline FileBlock FileBlockFromFlatbuffer(const flatbuf::Block* block) {
++- return FileBlock{block->offset(), block->metaDataLength(), block->bodyLength()};
+++static inline Result<FileBlock> FileBlockFromFlatbuffer(const flatbuf::Block* fb_block,
+++ int64_t max_offset) {
+++ auto block =
+++ FileBlock{fb_block->offset(), fb_block->metaDataLength(), fb_block->bodyLength()};
+++ if (block.metadata_length < 0 || block.body_length < 0 || block.offset < 0) {
+++ return Status::IOError("Invalid Block in IPC file footer");
+++ }
+++ auto block_end =
+++ AddWithOverflow<int64_t>({block.offset, block.metadata_length, block.body_length});
+++ if (!block_end.has_value() || block_end > max_offset) {
+++ return Status::IOError("Invalid Block in IPC file footer");
+++ }
+++ return block;
++ }
++
++ Status CheckAligned(const FileBlock& block) {
++@@ -1267,7 +1306,11 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
++ const std::shared_ptr<Schema>& schema,
++ const std::vector<bool>* inclusion_mask,
++ MetadataVersion metadata_version = MetadataVersion::V5) {
++- ArrayLoader loader(metadata, metadata_version, options, file);
+++ std::optional<int64_t> file_length;
+++ if (file) {
+++ ARROW_ASSIGN_OR_RAISE(file_length, file->GetSize());
+++ }
+++ ArrayLoader loader(metadata, metadata_version, options, file, file_length);
++ for (int i = 0; i < schema->num_fields(); ++i) {
++ const Field& field = *schema->field(i);
++ if (!inclusion_mask || (*inclusion_mask)[i]) {
++@@ -1336,8 +1379,9 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
++ read_options, file, schema, &inclusion_mask);
++ };
++ }
+++ ARROW_ASSIGN_OR_RAISE(auto block, GetRecordBatchBlock(i));
++ ARROW_ASSIGN_OR_RAISE(auto message,
++- ReadMessageFromBlock(GetRecordBatchBlock(i), fields_loader));
+++ ReadMessageFromBlock(block, fields_loader));
++
++ CHECK_HAS_BODY(*message);
++ ARROW_ASSIGN_OR_RAISE(auto reader, Buffer::GetReader(message->body()));
++@@ -1353,8 +1397,9 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
++ Result<int64_t> CountRows() override {
++ int64_t total = 0;
++ for (int i = 0; i < num_record_batches(); i++) {
+++ ARROW_ASSIGN_OR_RAISE(auto block, GetRecordBatchBlock(i));
++ ARROW_ASSIGN_OR_RAISE(auto outer_message,
++- ReadMessageFromBlock(GetRecordBatchBlock(i)));
+++ ReadMessageFromBlock(block));
++ auto metadata = outer_message->metadata();
++ const flatbuf::Message* message = nullptr;
++ RETURN_NOT_OK(
++@@ -1468,13 +1513,13 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
++
++ Status DoPreBufferMetadata(const std::vector<int>& indices) {
++ RETURN_NOT_OK(CacheMetadata(indices));
++- EnsureDictionaryReadStarted();
+++ RETURN_NOT_OK(EnsureDictionaryReadStarted());
++ Future<> all_metadata_ready = WaitForMetadatas(indices);
++ for (int index : indices) {
++ Future<std::shared_ptr<Message>> metadata_loaded =
++ all_metadata_ready.Then([this, index]() -> Result<std::shared_ptr<Message>> {
++ stats_.num_messages.fetch_add(1, std::memory_order_relaxed);
++- FileBlock block = GetRecordBatchBlock(index);
+++ ARROW_ASSIGN_OR_RAISE(FileBlock block, GetRecordBatchBlock(index));
++ ARROW_ASSIGN_OR_RAISE(
++ std::shared_ptr<Buffer> metadata,
++ metadata_cache_->Read({block.offset, block.metadata_length}));
++@@ -1523,12 +1568,12 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
++ }
++ };
++
++- FileBlock GetRecordBatchBlock(int i) const {
++- return FileBlockFromFlatbuffer(footer_->recordBatches()->Get(i));
+++ Result<FileBlock> GetRecordBatchBlock(int i) const {
+++ return FileBlockFromFlatbuffer(footer_->recordBatches()->Get(i), footer_offset_);
++ }
++
++- FileBlock GetDictionaryBlock(int i) const {
++- return FileBlockFromFlatbuffer(footer_->dictionaries()->Get(i));
+++ Result<FileBlock> GetDictionaryBlock(int i) const {
+++ return FileBlockFromFlatbuffer(footer_->dictionaries()->Get(i), footer_offset_);
++ }
++
++ Result<std::unique_ptr<Message>> ReadMessageFromBlock(
++@@ -1541,16 +1586,26 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
++
++ Status ReadDictionaries() {
++ // Read all the dictionaries
+++ std::vector<std::shared_ptr<Message>> messages(num_dictionaries());
+++ for (int i = 0; i < num_dictionaries(); ++i) {
+++ ARROW_ASSIGN_OR_RAISE(FileBlock block, GetDictionaryBlock(i));
+++ ARROW_ASSIGN_OR_RAISE(messages[i], ReadMessageFromBlock(block));
+++ }
+++ return ReadDictionaries(messages);
+++ }
+++
+++ Status ReadDictionaries(
+++ const std::vector<std::shared_ptr<Message>>& dictionary_messages) {
+++ DCHECK_EQ(dictionary_messages.size(), static_cast<size_t>(num_dictionaries()));
++ IpcReadContext context(&dictionary_memo_, options_, swap_endian_);
++ for (int i = 0; i < num_dictionaries(); ++i) {
++- ARROW_ASSIGN_OR_RAISE(auto message, ReadMessageFromBlock(GetDictionaryBlock(i)));
++- RETURN_NOT_OK(ReadOneDictionary(message.get(), context));
++- stats_.num_dictionary_batches.fetch_add(1, std::memory_order_relaxed);
+++ RETURN_NOT_OK(ReadOneDictionary(i, dictionary_messages[i].get(), context));
++ }
++ return Status::OK();
++ }
++
++- Status ReadOneDictionary(Message* message, const IpcReadContext& context) {
+++ Status ReadOneDictionary(int dict_index, Message* message,
+++ const IpcReadContext& context) {
++ CHECK_HAS_BODY(*message);
++ ARROW_ASSIGN_OR_RAISE(auto reader, Buffer::GetReader(message->body()));
++ DictionaryKind kind;
++@@ -1560,44 +1615,48 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
++ } else if (kind == DictionaryKind::Delta) {
++ stats_.num_dictionary_deltas.fetch_add(1, std::memory_order_relaxed);
++ }
+++ stats_.num_dictionary_batches.fetch_add(1, std::memory_order_relaxed);
++ return Status::OK();
++ }
++
++- void AddDictionaryRanges(std::vector<io::ReadRange>* ranges) const {
+++ Status AddDictionaryRanges(std::vector<io::ReadRange>* ranges) const {
++ // Adds all dictionaries to the range cache
++ for (int i = 0; i < num_dictionaries(); ++i) {
++- FileBlock block = GetDictionaryBlock(i);
+++ ARROW_ASSIGN_OR_RAISE(FileBlock block, GetDictionaryBlock(i));
++ ranges->push_back({block.offset, block.metadata_length + block.body_length});
++ }
+++ return Status::OK();
++ }
++
++- void AddMetadataRanges(const std::vector<int>& indices,
++- std::vector<io::ReadRange>* ranges) {
+++ Status AddMetadataRanges(const std::vector<int>& indices,
+++ std::vector<io::ReadRange>* ranges) {
++ for (int index : indices) {
++- FileBlock block = GetRecordBatchBlock(static_cast<int>(index));
+++ ARROW_ASSIGN_OR_RAISE(FileBlock block, GetRecordBatchBlock(index));
++ ranges->push_back({block.offset, block.metadata_length});
++ }
+++ return Status::OK();
++ }
++
++ Status CacheMetadata(const std::vector<int>& indices) {
++ std::vector<io::ReadRange> ranges;
++ if (!read_dictionaries_) {
++- AddDictionaryRanges(&ranges);
+++ RETURN_NOT_OK(AddDictionaryRanges(&ranges));
++ }
++- AddMetadataRanges(indices, &ranges);
+++ RETURN_NOT_OK(AddMetadataRanges(indices, &ranges));
++ return metadata_cache_->Cache(std::move(ranges));
++ }
++
++- void EnsureDictionaryReadStarted() {
+++ Status EnsureDictionaryReadStarted() {
++ if (!dictionary_load_finished_.is_valid()) {
++ read_dictionaries_ = true;
++ std::vector<io::ReadRange> ranges;
++- AddDictionaryRanges(&ranges);
+++ RETURN_NOT_OK(AddDictionaryRanges(&ranges));
++ dictionary_load_finished_ =
++ metadata_cache_->WaitFor(std::move(ranges)).Then([this] {
++ return ReadDictionaries();
++ });
++ }
+++ return Status::OK();
++ }
++
++ Status WaitForDictionaryReadFinished() {
++@@ -1615,7 +1674,7 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
++
++ Future<> WaitForMetadatas(const std::vector<int>& indices) {
++ std::vector<io::ReadRange> ranges;
++- AddMetadataRanges(indices, &ranges);
+++ RETURN_NOT_OK(AddMetadataRanges(indices, &ranges));
++ return metadata_cache_->WaitFor(std::move(ranges));
++ }
++
++@@ -1659,12 +1718,13 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
++ const flatbuf::RecordBatch* batch,
++ IpcReadContext context, io::RandomAccessFile* file,
++ std::shared_ptr<io::RandomAccessFile> owned_file,
++- int64_t block_data_offset)
+++ int64_t block_data_offset, int64_t block_data_length)
++ : schema(std::move(sch)),
++ context(std::move(context)),
++ file(file),
++ owned_file(std::move(owned_file)),
++- loader(batch, context.metadata_version, context.options, block_data_offset),
+++ loader(batch, context.metadata_version, context.options, block_data_offset,
+++ block_data_length),
++ columns(schema->num_fields()),
++ cache(file, file->io_context(), io::CacheOptions::LazyDefaults()),
++ length(batch->length()) {}
++@@ -1763,14 +1823,15 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
++ return dictionary_load_finished_.Then([message_fut] { return message_fut; })
++ .Then([this, index](const std::shared_ptr<Message>& message_obj)
++ -> Future<std::shared_ptr<RecordBatch>> {
++- FileBlock block = GetRecordBatchBlock(index);
+++ ARROW_ASSIGN_OR_RAISE(auto block, GetRecordBatchBlock(index));
++ ARROW_ASSIGN_OR_RAISE(auto message, GetFlatbufMessage(message_obj));
++ ARROW_ASSIGN_OR_RAISE(auto batch, GetBatchFromMessage(message));
++ ARROW_ASSIGN_OR_RAISE(auto context, GetIpcReadContext(message, batch));
++
++ auto read_context = std::make_shared<CachedRecordBatchReadContext>(
++ schema_, batch, std::move(context), file_, owned_file_,
++- block.offset + static_cast<int64_t>(block.metadata_length));
+++ block.offset + static_cast<int64_t>(block.metadata_length),
+++ block.body_length);
++ RETURN_NOT_OK(read_context->CalculateLoadRequest());
++ return read_context->ReadAsync().Then(
++ [read_context] { return read_context->CreateRecordBatch(); });
++@@ -1958,25 +2019,31 @@ Future<WholeIpcFileRecordBatchGenerator::Item>
++ WholeIpcFileRecordBatchGenerator::operator()() {
++ auto state = state_;
++ if (!read_dictionaries_.is_valid()) {
++- std::vector<Future<std::shared_ptr<Message>>> messages(state->num_dictionaries());
++- for (int i = 0; i < state->num_dictionaries(); i++) {
++- auto block = FileBlockFromFlatbuffer(state->footer_->dictionaries()->Get(i));
++- messages[i] = ReadBlock(block);
++- }
++- auto read_messages = All(std::move(messages));
++- if (executor_) read_messages = executor_->Transfer(read_messages);
++- read_dictionaries_ = read_messages.Then(
++- [=](const std::vector<Result<std::shared_ptr<Message>>>& maybe_messages)
++- -> Status {
++- ARROW_ASSIGN_OR_RAISE(auto messages,
++- arrow::internal::UnwrapOrRaise(maybe_messages));
++- return ReadDictionaries(state.get(), std::move(messages));
++- });
+++ if (state->dictionary_load_finished_.is_valid()) {
+++ // PreBufferMetadata has started reading dictionaries in the background
+++ read_dictionaries_ = state->dictionary_load_finished_;
+++ } else {
+++ // Start reading dictionaries
+++ std::vector<Future<std::shared_ptr<Message>>> messages(state->num_dictionaries());
+++ for (int i = 0; i < state->num_dictionaries(); i++) {
+++ ARROW_ASSIGN_OR_RAISE(auto block, state->GetDictionaryBlock(i));
+++ messages[i] = ReadBlock(block);
+++ }
+++ auto read_messages = All(std::move(messages));
+++ if (executor_) read_messages = executor_->Transfer(read_messages);
+++ read_dictionaries_ = read_messages.Then(
+++ [=](const std::vector<Result<std::shared_ptr<Message>>>& maybe_messages)
+++ -> Status {
+++ ARROW_ASSIGN_OR_RAISE(auto messages,
+++ arrow::internal::UnwrapOrRaise(maybe_messages));
+++ return state->ReadDictionaries(messages);
+++ });
+++ }
++ }
++ if (index_ >= state_->num_record_batches()) {
++ return Future<Item>::MakeFinished(IterationTraits<Item>::End());
++ }
++- auto block = FileBlockFromFlatbuffer(state->footer_->recordBatches()->Get(index_++));
+++ ARROW_ASSIGN_OR_RAISE(auto block, state->GetRecordBatchBlock(index_++));
++ auto read_message = ReadBlock(block);
++ auto read_messages = read_dictionaries_.Then([read_message]() { return read_message; });
++ // Force transfer. This may be wasteful in some cases, but ensures we get off the
++@@ -2012,16 +2079,6 @@ Future<std::shared_ptr<Message>> WholeIpcFileRecordBatchGenerator::ReadBlock(
++ }
++ }
++
++-Status WholeIpcFileRecordBatchGenerator::ReadDictionaries(
++- RecordBatchFileReaderImpl* state,
++- std::vector<std::shared_ptr<Message>> dictionary_messages) {
++- IpcReadContext context(&state->dictionary_memo_, state->options_, state->swap_endian_);
++- for (const auto& message : dictionary_messages) {
++- RETURN_NOT_OK(state->ReadOneDictionary(message.get(), context));
++- }
++- return Status::OK();
++-}
++-
++ Result<std::shared_ptr<RecordBatch>> WholeIpcFileRecordBatchGenerator::ReadRecordBatch(
++ RecordBatchFileReaderImpl* state, Message* message) {
++ CHECK_HAS_BODY(*message);
++@@ -2598,23 +2655,37 @@ Status ValidateFuzzBatch(const RecordBatch& batch) {
++ return st;
++ }
++
+++Status ValidateFuzzBatch(const RecordBatchWithMetadata& batch) {
+++ if (batch.batch) {
+++ RETURN_NOT_OK(ValidateFuzzBatch(*batch.batch));
+++ }
+++ // XXX do something with custom metadata?
+++ return Status::OK();
+++}
+++
++ } // namespace
++
+++IpcReadOptions FuzzingOptions() {
+++ IpcReadOptions options;
+++ options.memory_pool = default_memory_pool();
+++ options.max_recursion_depth = 256;
+++ return options;
+++}
+++
++ Status FuzzIpcStream(const uint8_t* data, int64_t size) {
++ auto buffer = std::make_shared<Buffer>(data, size);
++ io::BufferReader buffer_reader(buffer);
++
++- std::shared_ptr<RecordBatchReader> batch_reader;
++- ARROW_ASSIGN_OR_RAISE(batch_reader, RecordBatchStreamReader::Open(&buffer_reader));
+++ ARROW_ASSIGN_OR_RAISE(auto batch_reader, RecordBatchStreamReader::Open(&buffer_reader));
++ Status st;
++
++ while (true) {
++- std::shared_ptr<arrow::RecordBatch> batch;
++- RETURN_NOT_OK(batch_reader->ReadNext(&batch));
++- if (batch == nullptr) {
+++ ARROW_ASSIGN_OR_RAISE(auto batch, batch_reader->ReadNext());
+++ if (!batch.batch && !batch.custom_metadata) {
+++ // EOS
++ break;
++ }
++- st &= ValidateFuzzBatch(*batch);
+++ st &= ValidateFuzzBatch(batch);
++ }
++
++ return st;
++@@ -2622,19 +2693,36 @@ Status FuzzIpcStream(const uint8_t* data, int64_t size) {
++
++ Status FuzzIpcFile(const uint8_t* data, int64_t size) {
++ auto buffer = std::make_shared<Buffer>(data, size);
++- io::BufferReader buffer_reader(buffer);
++
++- std::shared_ptr<RecordBatchFileReader> batch_reader;
++- ARROW_ASSIGN_OR_RAISE(batch_reader, RecordBatchFileReader::Open(&buffer_reader));
++- Status st;
+++ Status final_status;
+++
+++ auto do_read = [&](bool pre_buffer) {
+++ io::BufferReader buffer_reader(buffer);
+++ ARROW_ASSIGN_OR_RAISE(auto batch_reader,
+++ RecordBatchFileReader::Open(&buffer_reader, FuzzingOptions()));
+++ if (pre_buffer) {
+++ // Pre-buffer all record batches
+++ RETURN_NOT_OK(batch_reader->PreBufferMetadata(/*indices=*/{}));
+++ }
++
++- const int n_batches = batch_reader->num_record_batches();
++- for (int i = 0; i < n_batches; ++i) {
++- ARROW_ASSIGN_OR_RAISE(auto batch, batch_reader->ReadRecordBatch(i));
++- st &= ValidateFuzzBatch(*batch);
+++ const int n_batches = batch_reader->num_record_batches();
+++ for (int i = 0; i < n_batches; ++i) {
+++ RecordBatchWithMetadata batch;
+++ auto st = batch_reader->ReadRecordBatchWithCustomMetadata(i).Value(&batch);
+++ final_status &= st;
+++ if (!st.ok()) {
+++ continue;
+++ }
+++ final_status &= ValidateFuzzBatch(batch);
+++ }
+++ return Status::OK();
+++ };
+++
+++ for (const bool pre_buffer : {false, true}) {
+++ final_status &= do_read(pre_buffer);
++ }
++
++- return st;
+++ return final_status;
++ }
++
++ Status FuzzIpcTensorStream(const uint8_t* data, int64_t size) {
++diff --git a/cpp/src/arrow/ipc/test_common.cc b/cpp/src/arrow/ipc/test_common.cc
++index 87c02e2..3a632fe 100644
++--- a/cpp/src/arrow/ipc/test_common.cc
+++++ b/cpp/src/arrow/ipc/test_common.cc
++@@ -16,6 +16,7 @@
++ // under the License.
++
++ #include <algorithm>
+++#include <concepts>
++ #include <cstdint>
++ #include <functional>
++ #include <memory>
++@@ -362,19 +363,27 @@ Status MakeRandomStringArray(int64_t length, bool include_nulls, MemoryPool* poo
++ return builder.Finish(out);
++ }
++
++-template <class BuilderType>
++-static Status MakeBinaryArrayWithUniqueValues(int64_t length, bool include_nulls,
++- MemoryPool* pool,
++- std::shared_ptr<Array>* out) {
++- BuilderType builder(pool);
+++template <std::derived_from<ArrayBuilder> BuilderType>
+++static Result<std::shared_ptr<Array>> MakeBinaryArrayWithUniqueValues(
+++ BuilderType builder, int64_t length, bool include_nulls) {
+++ if constexpr (std::is_base_of_v<BinaryViewBuilder, BuilderType>) {
+++ // Try to emit several variadic buffers by choosing a small block size.
+++ builder.SetBlockSize(512);
+++ }
++ for (int64_t i = 0; i < length; ++i) {
++ if (include_nulls && (i % 7 == 0)) {
++ RETURN_NOT_OK(builder.AppendNull());
++ } else {
++- RETURN_NOT_OK(builder.Append(std::to_string(i)));
+++ // Make sure that some strings are long enough to have non-inline binary views
+++ const auto base = std::to_string(i);
+++ std::string value;
+++ for (int64_t j = 0; j < 3 * (i % 10); ++j) {
+++ value += base;
+++ }
+++ RETURN_NOT_OK(builder.Append(value));
++ }
++ }
++- return builder.Finish(out);
+++ return builder.Finish();
++ }
++
++ Status MakeStringTypesRecordBatch(std::shared_ptr<RecordBatch>* out, bool with_nulls,
++@@ -384,22 +393,22 @@ Status MakeStringTypesRecordBatch(std::shared_ptr<RecordBatch>* out, bool with_n
++ ArrayVector arrays;
++ FieldVector fields;
++
++- auto AppendColumn = [&](auto& MakeArray) {
++- arrays.emplace_back();
++- RETURN_NOT_OK(MakeArray(length, with_nulls, default_memory_pool(), &arrays.back()));
++-
++- const auto& type = arrays.back()->type();
++- fields.push_back(field(type->ToString(), type));
+++ auto AppendColumn = [&](auto builder) {
+++ ARROW_ASSIGN_OR_RAISE(auto array, MakeBinaryArrayWithUniqueValues(
+++ std::move(builder), length, with_nulls));
+++ arrays.push_back(array);
+++ fields.push_back(field(array->type()->ToString(), array->type()));
++ return Status::OK();
++ };
++
++- RETURN_NOT_OK(AppendColumn(MakeBinaryArrayWithUniqueValues<StringBuilder>));
++- RETURN_NOT_OK(AppendColumn(MakeBinaryArrayWithUniqueValues<BinaryBuilder>));
++- RETURN_NOT_OK(AppendColumn(MakeBinaryArrayWithUniqueValues<LargeStringBuilder>));
++- RETURN_NOT_OK(AppendColumn(MakeBinaryArrayWithUniqueValues<LargeBinaryBuilder>));
+++ auto pool = default_memory_pool();
+++ RETURN_NOT_OK(AppendColumn(StringBuilder(pool)));
+++ RETURN_NOT_OK(AppendColumn(BinaryBuilder(pool)));
+++ RETURN_NOT_OK(AppendColumn(LargeStringBuilder(pool)));
+++ RETURN_NOT_OK(AppendColumn(LargeBinaryBuilder(pool)));
++ if (with_view_types) {
++- RETURN_NOT_OK(AppendColumn(MakeBinaryArrayWithUniqueValues<StringViewBuilder>));
++- RETURN_NOT_OK(AppendColumn(MakeBinaryArrayWithUniqueValues<BinaryViewBuilder>));
+++ RETURN_NOT_OK(AppendColumn(StringViewBuilder(pool)));
+++ RETURN_NOT_OK(AppendColumn(BinaryViewBuilder(pool)));
++ }
++
++ *out = RecordBatch::Make(schema(std::move(fields)), length, std::move(arrays));
++diff --git a/cpp/src/arrow/type.h b/cpp/src/arrow/type.h
++index 5b1331a..42e83f6 100644
++--- a/cpp/src/arrow/type.h
+++++ b/cpp/src/arrow/type.h
++@@ -2494,6 +2494,16 @@ constexpr bool HasValidityBitmap(Type::type id) {
++ }
++ }
++
+++constexpr bool has_variadic_buffers(Type::type id) {
+++ switch (id) {
+++ case Type::BINARY_VIEW:
+++ case Type::STRING_VIEW:
+++ return true;
+++ default:
+++ return false;
+++ }
+++}
+++
++ ARROW_EXPORT
++ std::string ToString(Type::type id);
++
++diff --git a/cpp/src/arrow/util/int_util_overflow.h b/cpp/src/arrow/util/int_util_overflow.h
++index ffe78be..841d503 100644
++--- a/cpp/src/arrow/util/int_util_overflow.h
+++++ b/cpp/src/arrow/util/int_util_overflow.h
++@@ -18,7 +18,9 @@
++ #pragma once
++
++ #include <cstdint>
+++#include <initializer_list>
++ #include <limits>
+++#include <optional>
++ #include <type_traits>
++
++ #include "arrow/status.h"
++@@ -114,5 +116,36 @@ SignedInt SafeLeftShift(SignedInt u, Shift shift) {
++ return static_cast<SignedInt>(static_cast<UnsignedInt>(u) << shift);
++ }
++
+++// Convenience functions over an arbitrary number of arguments
+++template <typename Int>
+++std::optional<Int> AddWithOverflow(std::initializer_list<Int> vs) {
+++ if (vs.size() == 0) {
+++ return {};
+++ }
+++ auto it = vs.begin();
+++ Int v = *it++;
+++ while (it != vs.end()) {
+++ if (ARROW_PREDICT_FALSE(AddWithOverflow(v, *it++, &v))) {
+++ return {};
+++ }
+++ }
+++ return v;
+++}
+++
+++template <typename Int>
+++std::optional<Int> MultiplyWithOverflow(std::initializer_list<Int> vs) {
+++ if (vs.size() == 0) {
+++ return {};
+++ }
+++ auto it = vs.begin();
+++ Int v = *it++;
+++ while (it != vs.end()) {
+++ if (ARROW_PREDICT_FALSE(MultiplyWithOverflow(v, *it++, &v))) {
+++ return {};
+++ }
+++ }
+++ return v;
+++}
+++
++ } // namespace internal
++ } // namespace arrow
++diff --git a/cpp/src/arrow/util/int_util_test.cc b/cpp/src/arrow/util/int_util_test.cc
++index 7217c10..cffa4e9 100644
++--- a/cpp/src/arrow/util/int_util_test.cc
+++++ b/cpp/src/arrow/util/int_util_test.cc
++@@ -649,5 +649,23 @@ TYPED_TEST(TestAddWithOverflow, Basics) {
++ this->CheckOk(almost_min, almost_max + T{2}, T{1});
++ }
++
+++TEST(AddWithOverflow, Variadic) {
+++ ASSERT_EQ(AddWithOverflow<int>({}), std::nullopt);
+++ ASSERT_EQ(AddWithOverflow({1, 2, 3}), 6);
+++ ASSERT_EQ(AddWithOverflow<int8_t>({1, 2, 125}), std::nullopt);
+++ ASSERT_EQ(AddWithOverflow<int8_t>({125, 2, 1}), std::nullopt);
+++ ASSERT_EQ(AddWithOverflow<int16_t>({1, 2, 125}), 128);
+++ ASSERT_EQ(AddWithOverflow<int16_t>({125, 2, 1}), 128);
+++}
+++
+++TEST(MultiplyWithOverflow, Variadic) {
+++ ASSERT_EQ(MultiplyWithOverflow<int>({}), std::nullopt);
+++ ASSERT_EQ(MultiplyWithOverflow({1, 2, 3, 4}), 24);
+++ ASSERT_EQ(MultiplyWithOverflow<int8_t>({2, 2, 32}), std::nullopt);
+++ ASSERT_EQ(MultiplyWithOverflow<int8_t>({32, 4, 1}), std::nullopt);
+++ ASSERT_EQ(MultiplyWithOverflow<int16_t>({2, 2, 32}), 128);
+++ ASSERT_EQ(MultiplyWithOverflow<int16_t>({32, 4, 1}), 128);
+++}
+++
++ } // namespace internal
++ } // namespace arrow
++--
++2.45.4
+
- if (internal::HasValidityBitmap(type_id, metadata_version_)) {
- // Extract null_bitmap which is common to all arrays except for unions
- // and nulls.
-@@ -300,6 +320,7 @@ class ArrayLoader {
- }
- buffer_index_++;
- }
-+
- return Status::OK();
- }
-
-@@ -398,14 +419,9 @@ class ArrayLoader {
- Status Visit(const BinaryViewType& type) {
- out_->buffers.resize(2);
-
-- RETURN_NOT_OK(LoadCommon(type.id()));
-- RETURN_NOT_OK(GetBuffer(buffer_index_++, &out_->buffers[1]));
--
-- ARROW_ASSIGN_OR_RAISE(auto data_buffer_count,
-- GetVariadicCount(variadic_count_index_++));
-- out_->buffers.resize(data_buffer_count + 2);
-- for (int64_t i = 0; i < data_buffer_count; ++i) {
-- RETURN_NOT_OK(GetBuffer(buffer_index_++, &out_->buffers[i + 2]));
-+ RETURN_NOT_OK(LoadCommon(type.id())); // also initializes variadic buffers
-+ for (int64_t i = 1; i < static_cast<int64_t>(out_->buffers.size()); ++i) {
-+ RETURN_NOT_OK(GetBuffer(buffer_index_++, &out_->buffers[i]));
- }
- return Status::OK();
- }
-@@ -503,6 +519,7 @@ class ArrayLoader {
- const MetadataVersion metadata_version_;
- io::RandomAccessFile* file_;
- int64_t file_offset_;
-+ std::optional<int64_t> file_length_;
- int max_recursion_depth_;
- int buffer_index_ = 0;
- int field_index_ = 0;
-@@ -1173,8 +1190,19 @@ namespace {
-
- // Common functions used in both the random-access file reader and the
- // asynchronous generator
--inline FileBlock FileBlockFromFlatbuffer(const flatbuf::Block* block) {
-- return FileBlock{block->offset(), block->metaDataLength(), block->bodyLength()};
-+Result<FileBlock> FileBlockFromFlatbuffer(const flatbuf::Block* fb_block,
-+ int64_t max_offset) {
-+ auto block =
-+ FileBlock{fb_block->offset(), fb_block->metaDataLength(), fb_block->bodyLength()};
-+ if (block.metadata_length < 0 || block.body_length < 0 || block.offset < 0) {
-+ return Status::IOError("Invalid Block in IPC file footer");
-+ }
-+ auto block_end =
-+ AddWithOverflow<int64_t>({block.offset, block.metadata_length, block.body_length});
-+ if (!block_end.has_value() || block_end > max_offset) {
-+ return Status::IOError("Invalid Block in IPC file footer");
-+ }
-+ return block;
- }
-
- Status CheckAligned(const FileBlock& block) {
-@@ -1362,8 +1390,8 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
- read_options, file, schema, &inclusion_mask);
- };
- }
-- ARROW_ASSIGN_OR_RAISE(auto message,
-- ReadMessageFromBlock(GetRecordBatchBlock(i), fields_loader));
-+ ARROW_ASSIGN_OR_RAISE(auto block, GetRecordBatchBlock(i));
-+ ARROW_ASSIGN_OR_RAISE(auto message, ReadMessageFromBlock(block, fields_loader));
-
- CHECK_HAS_BODY(*message);
- ARROW_ASSIGN_OR_RAISE(auto reader, Buffer::GetReader(message->body()));
-@@ -1379,8 +1407,8 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
- Result<int64_t> CountRows() override {
- int64_t total = 0;
- for (int i = 0; i < num_record_batches(); i++) {
-- ARROW_ASSIGN_OR_RAISE(auto outer_message,
-- ReadMessageFromBlock(GetRecordBatchBlock(i)));
-+ ARROW_ASSIGN_OR_RAISE(auto block, GetRecordBatchBlock(i));
-+ ARROW_ASSIGN_OR_RAISE(auto outer_message, ReadMessageFromBlock(block));
- auto metadata = outer_message->metadata();
- const flatbuf::Message* message = nullptr;
- RETURN_NOT_OK(
-@@ -1494,13 +1522,13 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
-
- Status DoPreBufferMetadata(const std::vector<int>& indices) {
- RETURN_NOT_OK(CacheMetadata(indices));
-- EnsureDictionaryReadStarted();
-+ RETURN_NOT_OK(EnsureDictionaryReadStarted());
- Future<> all_metadata_ready = WaitForMetadatas(indices);
- for (int index : indices) {
- Future<std::shared_ptr<Message>> metadata_loaded =
- all_metadata_ready.Then([this, index]() -> Result<std::shared_ptr<Message>> {
- stats_.num_messages.fetch_add(1, std::memory_order_relaxed);
-- FileBlock block = GetRecordBatchBlock(index);
-+ ARROW_ASSIGN_OR_RAISE(FileBlock block, GetRecordBatchBlock(index));
- ARROW_ASSIGN_OR_RAISE(
- std::shared_ptr<Buffer> metadata,
- metadata_cache_->Read({block.offset, block.metadata_length}));
-@@ -1549,12 +1577,12 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
- }
- };
-
-- FileBlock GetRecordBatchBlock(int i) const {
-- return FileBlockFromFlatbuffer(footer_->recordBatches()->Get(i));
-+ Result<FileBlock> GetRecordBatchBlock(int i) const {
-+ return FileBlockFromFlatbuffer(footer_->recordBatches()->Get(i), footer_offset_);
- }
-
-- FileBlock GetDictionaryBlock(int i) const {
-- return FileBlockFromFlatbuffer(footer_->dictionaries()->Get(i));
-+ Result<FileBlock> GetDictionaryBlock(int i) const {
-+ return FileBlockFromFlatbuffer(footer_->dictionaries()->Get(i), footer_offset_);
- }
-
- Result<std::unique_ptr<Message>> ReadMessageFromBlock(
-@@ -1567,16 +1595,26 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
-
- Status ReadDictionaries() {
- // Read all the dictionaries
-+ std::vector<std::shared_ptr<Message>> messages(num_dictionaries());
-+ for (int i = 0; i < num_dictionaries(); ++i) {
-+ ARROW_ASSIGN_OR_RAISE(FileBlock block, GetDictionaryBlock(i));
-+ ARROW_ASSIGN_OR_RAISE(messages[i], ReadMessageFromBlock(block));
-+ }
-+ return ReadDictionaries(messages);
-+ }
-+
-+ Status ReadDictionaries(
-+ const std::vector<std::shared_ptr<Message>>& dictionary_messages) {
-+ DCHECK_EQ(dictionary_messages.size(), static_cast<size_t>(num_dictionaries()));
- IpcReadContext context(&dictionary_memo_, options_, swap_endian_);
- for (int i = 0; i < num_dictionaries(); ++i) {
-- ARROW_ASSIGN_OR_RAISE(auto message, ReadMessageFromBlock(GetDictionaryBlock(i)));
-- RETURN_NOT_OK(ReadOneDictionary(message.get(), context));
-- stats_.num_dictionary_batches.fetch_add(1, std::memory_order_relaxed);
-+ RETURN_NOT_OK(ReadOneDictionary(i, dictionary_messages[i].get(), context));
- }
- return Status::OK();
- }
-
-- Status ReadOneDictionary(Message* message, const IpcReadContext& context) {
-+ Status ReadOneDictionary(int dict_index, Message* message,
-+ const IpcReadContext& context) {
- CHECK_HAS_BODY(*message);
- ARROW_ASSIGN_OR_RAISE(auto reader, Buffer::GetReader(message->body()));
- DictionaryKind kind;
-@@ -1586,44 +1624,48 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
- } else if (kind == DictionaryKind::Delta) {
- stats_.num_dictionary_deltas.fetch_add(1, std::memory_order_relaxed);
- }
-+ stats_.num_dictionary_batches.fetch_add(1, std::memory_order_relaxed);
- return Status::OK();
- }
-
-- void AddDictionaryRanges(std::vector<io::ReadRange>* ranges) const {
-+ Status AddDictionaryRanges(std::vector<io::ReadRange>* ranges) const {
- // Adds all dictionaries to the range cache
- for (int i = 0; i < num_dictionaries(); ++i) {
-- FileBlock block = GetDictionaryBlock(i);
-+ ARROW_ASSIGN_OR_RAISE(FileBlock block, GetDictionaryBlock(i));
- ranges->push_back({block.offset, block.metadata_length + block.body_length});
- }
-+ return Status::OK();
- }
-
-- void AddMetadataRanges(const std::vector<int>& indices,
-- std::vector<io::ReadRange>* ranges) {
-+ Status AddMetadataRanges(const std::vector<int>& indices,
-+ std::vector<io::ReadRange>* ranges) {
- for (int index : indices) {
-- FileBlock block = GetRecordBatchBlock(static_cast<int>(index));
-+ ARROW_ASSIGN_OR_RAISE(FileBlock block, GetRecordBatchBlock(index));
- ranges->push_back({block.offset, block.metadata_length});
- }
-+ return Status::OK();
- }
-
- Status CacheMetadata(const std::vector<int>& indices) {
- std::vector<io::ReadRange> ranges;
- if (!read_dictionaries_) {
-- AddDictionaryRanges(&ranges);
-+ RETURN_NOT_OK(AddDictionaryRanges(&ranges));
- }
-- AddMetadataRanges(indices, &ranges);
-+ RETURN_NOT_OK(AddMetadataRanges(indices, &ranges));
- return metadata_cache_->Cache(std::move(ranges));
- }
-
-- void EnsureDictionaryReadStarted() {
-+ Status EnsureDictionaryReadStarted() {
- if (!dictionary_load_finished_.is_valid()) {
- read_dictionaries_ = true;
- std::vector<io::ReadRange> ranges;
-- AddDictionaryRanges(&ranges);
-+ RETURN_NOT_OK(AddDictionaryRanges(&ranges));
- dictionary_load_finished_ =
- metadata_cache_->WaitFor(std::move(ranges)).Then([this] {
- return ReadDictionaries();
- });
- }
-+ return Status::OK();
- }
-
- Status WaitForDictionaryReadFinished() {
-@@ -1641,7 +1683,7 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
-
- Future<> WaitForMetadatas(const std::vector<int>& indices) {
- std::vector<io::ReadRange> ranges;
-- AddMetadataRanges(indices, &ranges);
-+ RETURN_NOT_OK(AddMetadataRanges(indices, &ranges));
- return metadata_cache_->WaitFor(std::move(ranges));
- }
-
-@@ -1685,12 +1727,13 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
- const flatbuf::RecordBatch* batch,
- IpcReadContext context, io::RandomAccessFile* file,
- std::shared_ptr<io::RandomAccessFile> owned_file,
-- int64_t block_data_offset)
-+ int64_t block_data_offset, int64_t block_data_length)
- : schema(std::move(sch)),
- context(std::move(context)),
- file(file),
- owned_file(std::move(owned_file)),
-- loader(batch, context.metadata_version, context.options, block_data_offset),
-+ loader(batch, context.metadata_version, context.options, block_data_offset,
-+ block_data_length),
- columns(schema->num_fields()),
- cache(file, file->io_context(), io::CacheOptions::LazyDefaults()),
- length(batch->length()) {}
-@@ -1789,14 +1832,15 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
- return dictionary_load_finished_.Then([message_fut] { return message_fut; })
- .Then([this, index](const std::shared_ptr<Message>& message_obj)
- -> Future<std::shared_ptr<RecordBatch>> {
-- FileBlock block = GetRecordBatchBlock(index);
-+ ARROW_ASSIGN_OR_RAISE(auto block, GetRecordBatchBlock(index));
- ARROW_ASSIGN_OR_RAISE(auto message, GetFlatbufMessage(message_obj));
- ARROW_ASSIGN_OR_RAISE(auto batch, GetBatchFromMessage(message));
- ARROW_ASSIGN_OR_RAISE(auto context, GetIpcReadContext(message, batch));
-
- auto read_context = std::make_shared<CachedRecordBatchReadContext>(
- schema_, batch, std::move(context), file_, owned_file_,
-- block.offset + static_cast<int64_t>(block.metadata_length));
-+ block.offset + static_cast<int64_t>(block.metadata_length),
-+ block.body_length);
- RETURN_NOT_OK(read_context->CalculateLoadRequest());
- return read_context->ReadAsync().Then(
- [read_context] { return read_context->CreateRecordBatch(); });
-@@ -1915,25 +1959,31 @@ Future<WholeIpcFileRecordBatchGenerator::Item>
- WholeIpcFileRecordBatchGenerator::operator()() {
- auto state = state_;
- if (!read_dictionaries_.is_valid()) {
-- std::vector<Future<std::shared_ptr<Message>>> messages(state->num_dictionaries());
-- for (int i = 0; i < state->num_dictionaries(); i++) {
-- auto block = FileBlockFromFlatbuffer(state->footer_->dictionaries()->Get(i));
-- messages[i] = ReadBlock(block);
-- }
-- auto read_messages = All(std::move(messages));
-- if (executor_) read_messages = executor_->Transfer(read_messages);
-- read_dictionaries_ = read_messages.Then(
-- [=](const std::vector<Result<std::shared_ptr<Message>>>& maybe_messages)
-- -> Status {
-- ARROW_ASSIGN_OR_RAISE(auto messages,
-- arrow::internal::UnwrapOrRaise(maybe_messages));
-- return ReadDictionaries(state.get(), std::move(messages));
-- });
-+ if (state->dictionary_load_finished_.is_valid()) {
-+ // PreBufferMetadata has started reading dictionaries in the background
-+ read_dictionaries_ = state->dictionary_load_finished_;
-+ } else {
-+ // Start reading dictionaries
-+ std::vector<Future<std::shared_ptr<Message>>> messages(state->num_dictionaries());
-+ for (int i = 0; i < state->num_dictionaries(); i++) {
-+ ARROW_ASSIGN_OR_RAISE(auto block, state->GetDictionaryBlock(i));
-+ messages[i] = ReadBlock(block);
-+ }
-+ auto read_messages = All(std::move(messages));
-+ if (executor_) read_messages = executor_->Transfer(read_messages);
-+ read_dictionaries_ = read_messages.Then(
-+ [=](const std::vector<Result<std::shared_ptr<Message>>>& maybe_messages)
-+ -> Status {
-+ ARROW_ASSIGN_OR_RAISE(auto messages,
-+ arrow::internal::UnwrapOrRaise(maybe_messages));
-+ return state->ReadDictionaries(messages);
-+ });
-+ }
- }
- if (index_ >= state_->num_record_batches()) {
- return Future<Item>::MakeFinished(IterationTraits<Item>::End());
- }
-- auto block = FileBlockFromFlatbuffer(state->footer_->recordBatches()->Get(index_++));
-+ ARROW_ASSIGN_OR_RAISE(auto block, state->GetRecordBatchBlock(index_++));
- auto read_message = ReadBlock(block);
- auto read_messages = read_dictionaries_.Then([read_message]() { return read_message; });
- // Force transfer. This may be wasteful in some cases, but ensures we get off the
-@@ -1969,16 +2019,6 @@ Future<std::shared_ptr<Message>> WholeIpcFileRecordBatchGenerator::ReadBlock(
- }
- }
-
--Status WholeIpcFileRecordBatchGenerator::ReadDictionaries(
-- RecordBatchFileReaderImpl* state,
-- std::vector<std::shared_ptr<Message>> dictionary_messages) {
-- IpcReadContext context(&state->dictionary_memo_, state->options_, state->swap_endian_);
-- for (const auto& message : dictionary_messages) {
-- RETURN_NOT_OK(state->ReadOneDictionary(message.get(), context));
-- }
-- return Status::OK();
--}
--
- Result<std::shared_ptr<RecordBatch>> WholeIpcFileRecordBatchGenerator::ReadRecordBatch(
- RecordBatchFileReaderImpl* state, Message* message) {
- CHECK_HAS_BODY(*message);
-@@ -2630,6 +2670,14 @@ Status ValidateFuzzBatch(const RecordBatch& batch) {
- return st;
- }
-
-+Status ValidateFuzzBatch(const RecordBatchWithMetadata& batch) {
-+ if (batch.batch) {
-+ RETURN_NOT_OK(ValidateFuzzBatch(*batch.batch));
-+ }
-+ // XXX do something with custom metadata?
-+ return Status::OK();
-+}
-+
- IpcReadOptions FuzzingOptions() {
- IpcReadOptions options;
- options.memory_pool = ::arrow::internal::fuzzing_memory_pool();
-@@ -2648,12 +2696,12 @@ Status FuzzIpcStream(const uint8_t* data, int64_t size) {
- Status st;
-
- while (true) {
-- std::shared_ptr<arrow::RecordBatch> batch;
-- RETURN_NOT_OK(batch_reader->ReadNext(&batch));
-- if (batch == nullptr) {
-+ ARROW_ASSIGN_OR_RAISE(auto batch, batch_reader->ReadNext());
-+ if (!batch.batch && !batch.custom_metadata) {
-+ // EOS
- break;
- }
-- st &= ValidateFuzzBatch(*batch);
-+ st &= ValidateFuzzBatch(batch);
- }
-
- return st;
-@@ -2661,20 +2709,36 @@ Status FuzzIpcStream(const uint8_t* data, int64_t size) {
-
- Status FuzzIpcFile(const uint8_t* data, int64_t size) {
- auto buffer = std::make_shared<Buffer>(data, size);
-- io::BufferReader buffer_reader(buffer);
-
-- std::shared_ptr<RecordBatchFileReader> batch_reader;
-- ARROW_ASSIGN_OR_RAISE(batch_reader,
-- RecordBatchFileReader::Open(&buffer_reader, FuzzingOptions()));
-- Status st;
-+ Status final_status;
-
-- const int n_batches = batch_reader->num_record_batches();
-- for (int i = 0; i < n_batches; ++i) {
-- ARROW_ASSIGN_OR_RAISE(auto batch, batch_reader->ReadRecordBatch(i));
-- st &= ValidateFuzzBatch(*batch);
-+ auto do_read = [&](bool pre_buffer) {
-+ io::BufferReader buffer_reader(buffer);
-+ ARROW_ASSIGN_OR_RAISE(auto batch_reader,
-+ RecordBatchFileReader::Open(&buffer_reader, FuzzingOptions()));
-+ if (pre_buffer) {
-+ // Pre-buffer all record batches
-+ RETURN_NOT_OK(batch_reader->PreBufferMetadata(/*indices=*/{}));
-+ }
-+
-+ const int n_batches = batch_reader->num_record_batches();
-+ for (int i = 0; i < n_batches; ++i) {
-+ RecordBatchWithMetadata batch;
-+ auto st = batch_reader->ReadRecordBatchWithCustomMetadata(i).Value(&batch);
-+ final_status &= st;
-+ if (!st.ok()) {
-+ continue;
-+ }
-+ final_status &= ValidateFuzzBatch(batch);
-+ }
-+ return Status::OK();
-+ };
-+
-+ for (const bool pre_buffer : {false, true}) {
-+ final_status &= do_read(pre_buffer);
- }
-
-- return st;
-+ return final_status;
- }
-
- Status FuzzIpcTensorStream(const uint8_t* data, int64_t size) {
-diff --git a/cpp/src/arrow/ipc/test_common.cc b/cpp/src/arrow/ipc/test_common.cc
-index 02e6b816c0b..ceca6d9e434 100644
---- a/cpp/src/arrow/ipc/test_common.cc
-+++ b/cpp/src/arrow/ipc/test_common.cc
-@@ -16,6 +16,7 @@
- // under the License.
-
- #include <algorithm>
-+#include <concepts>
- #include <cstdint>
- #include <functional>
- #include <memory>
-@@ -368,19 +369,27 @@ Status MakeRandomStringArray(int64_t length, bool include_nulls, MemoryPool* poo
- return builder.Finish(out);
- }
-
--template <class BuilderType>
--static Status MakeBinaryArrayWithUniqueValues(int64_t length, bool include_nulls,
-- MemoryPool* pool,
-- std::shared_ptr<Array>* out) {
-- BuilderType builder(pool);
-+template <std::derived_from<ArrayBuilder> BuilderType>
-+static Result<std::shared_ptr<Array>> MakeBinaryArrayWithUniqueValues(
-+ BuilderType builder, int64_t length, bool include_nulls) {
-+ if constexpr (std::is_base_of_v<BinaryViewBuilder, BuilderType>) {
-+ // Try to emit several variadic buffers by choosing a small block size.
-+ builder.SetBlockSize(512);
-+ }
- for (int64_t i = 0; i < length; ++i) {
- if (include_nulls && (i % 7 == 0)) {
- RETURN_NOT_OK(builder.AppendNull());
- } else {
-- RETURN_NOT_OK(builder.Append(std::to_string(i)));
-+ // Make sure that some strings are long enough to have non-inline binary views
-+ const auto base = std::to_string(i);
-+ std::string value;
-+ for (int64_t j = 0; j < 3 * (i % 10); ++j) {
-+ value += base;
-+ }
-+ RETURN_NOT_OK(builder.Append(value));
- }
- }
-- return builder.Finish(out);
-+ return builder.Finish();
- }
-
- Status MakeStringTypesRecordBatch(std::shared_ptr<RecordBatch>* out, bool with_nulls,
-@@ -390,22 +399,22 @@ Status MakeStringTypesRecordBatch(std::shared_ptr<RecordBatch>* out, bool with_n
- ArrayVector arrays;
- FieldVector fields;
-
-- auto AppendColumn = [&](auto& MakeArray) {
-- arrays.emplace_back();
-- RETURN_NOT_OK(MakeArray(length, with_nulls, default_memory_pool(), &arrays.back()));
--
-- const auto& type = arrays.back()->type();
-- fields.push_back(field(type->ToString(), type));
-+ auto AppendColumn = [&](auto builder) {
-+ ARROW_ASSIGN_OR_RAISE(auto array, MakeBinaryArrayWithUniqueValues(
-+ std::move(builder), length, with_nulls));
-+ arrays.push_back(array);
-+ fields.push_back(field(array->type()->ToString(), array->type()));
- return Status::OK();
- };
-
-- RETURN_NOT_OK(AppendColumn(MakeBinaryArrayWithUniqueValues<StringBuilder>));
-- RETURN_NOT_OK(AppendColumn(MakeBinaryArrayWithUniqueValues<BinaryBuilder>));
-- RETURN_NOT_OK(AppendColumn(MakeBinaryArrayWithUniqueValues<LargeStringBuilder>));
-- RETURN_NOT_OK(AppendColumn(MakeBinaryArrayWithUniqueValues<LargeBinaryBuilder>));
-+ auto pool = default_memory_pool();
-+ RETURN_NOT_OK(AppendColumn(StringBuilder(pool)));
-+ RETURN_NOT_OK(AppendColumn(BinaryBuilder(pool)));
-+ RETURN_NOT_OK(AppendColumn(LargeStringBuilder(pool)));
-+ RETURN_NOT_OK(AppendColumn(LargeBinaryBuilder(pool)));
- if (with_view_types) {
-- RETURN_NOT_OK(AppendColumn(MakeBinaryArrayWithUniqueValues<StringViewBuilder>));
-- RETURN_NOT_OK(AppendColumn(MakeBinaryArrayWithUniqueValues<BinaryViewBuilder>));
-+ RETURN_NOT_OK(AppendColumn(StringViewBuilder(pool)));
-+ RETURN_NOT_OK(AppendColumn(BinaryViewBuilder(pool)));
- }
-
- *out = RecordBatch::Make(schema(std::move(fields)), length, std::move(arrays));
-diff --git a/cpp/src/arrow/type.h b/cpp/src/arrow/type.h
-index f68d2dcb619..e3582056ead 100644
---- a/cpp/src/arrow/type.h
-+++ b/cpp/src/arrow/type.h
-@@ -2575,6 +2575,16 @@ constexpr bool may_have_validity_bitmap(Type::type id) {
- }
- }
-
-+constexpr bool has_variadic_buffers(Type::type id) {
-+ switch (id) {
-+ case Type::BINARY_VIEW:
-+ case Type::STRING_VIEW:
-+ return true;
-+ default:
-+ return false;
-+ }
-+}
-+
- ARROW_DEPRECATED("Deprecated in 17.0.0. Use may_have_validity_bitmap() instead.")
- constexpr bool HasValidityBitmap(Type::type id) { return may_have_validity_bitmap(id); }
-
-diff --git a/cpp/src/arrow/util/int_util_overflow.h b/cpp/src/arrow/util/int_util_overflow.h
-index 93066fecafa..69714a935a4 100644
---- a/cpp/src/arrow/util/int_util_overflow.h
-+++ b/cpp/src/arrow/util/int_util_overflow.h
-@@ -18,7 +18,9 @@
- #pragma once
-
- #include <cstdint>
-+#include <initializer_list>
- #include <limits>
-+#include <optional>
- #include <type_traits>
-
- #include "arrow/status.h"
-@@ -162,6 +164,37 @@ NON_GENERIC_OPS_WITH_OVERFLOW(DivideWithOverflow)
- #undef NON_GENERIC_OPS_WITH_OVERFLOW
- #undef NON_GENERIC_OP_WITH_OVERFLOW
-
-+// Convenience functions over an arbitrary number of arguments
-+template <typename Int>
-+std::optional<Int> AddWithOverflow(std::initializer_list<Int> vs) {
-+ if (vs.size() == 0) {
-+ return {};
-+ }
-+ auto it = vs.begin();
-+ Int v = *it++;
-+ while (it != vs.end()) {
-+ if (ARROW_PREDICT_FALSE(AddWithOverflowGeneric(v, *it++, &v))) {
-+ return {};
-+ }
-+ }
-+ return v;
-+}
-+
-+template <typename Int>
-+std::optional<Int> MultiplyWithOverflow(std::initializer_list<Int> vs) {
-+ if (vs.size() == 0) {
-+ return {};
-+ }
-+ auto it = vs.begin();
-+ Int v = *it++;
-+ while (it != vs.end()) {
-+ if (ARROW_PREDICT_FALSE(MultiplyWithOverflowGeneric(v, *it++, &v))) {
-+ return {};
-+ }
-+ }
-+ return v;
-+}
-+
- // Define function NegateWithOverflow with the signature `bool(T u, T* out)`
- // where T is a signed integer type. On overflow, these functions return true.
- // Otherwise, false is returned and `out` is updated with the result of the
-diff --git a/cpp/src/arrow/util/int_util_test.cc b/cpp/src/arrow/util/int_util_test.cc
-index 7217c1097e4..cffa4e9d15e 100644
---- a/cpp/src/arrow/util/int_util_test.cc
-+++ b/cpp/src/arrow/util/int_util_test.cc
-@@ -649,5 +649,23 @@ TYPED_TEST(TestAddWithOverflow, Basics) {
- this->CheckOk(almost_min, almost_max + T{2}, T{1});
- }
-
-+TEST(AddWithOverflow, Variadic) {
-+ ASSERT_EQ(AddWithOverflow<int>({}), std::nullopt);
-+ ASSERT_EQ(AddWithOverflow({1, 2, 3}), 6);
-+ ASSERT_EQ(AddWithOverflow<int8_t>({1, 2, 125}), std::nullopt);
-+ ASSERT_EQ(AddWithOverflow<int8_t>({125, 2, 1}), std::nullopt);
-+ ASSERT_EQ(AddWithOverflow<int16_t>({1, 2, 125}), 128);
-+ ASSERT_EQ(AddWithOverflow<int16_t>({125, 2, 1}), 128);
-+}
-+
-+TEST(MultiplyWithOverflow, Variadic) {
-+ ASSERT_EQ(MultiplyWithOverflow<int>({}), std::nullopt);
-+ ASSERT_EQ(MultiplyWithOverflow({1, 2, 3, 4}), 24);
-+ ASSERT_EQ(MultiplyWithOverflow<int8_t>({2, 2, 32}), std::nullopt);
-+ ASSERT_EQ(MultiplyWithOverflow<int8_t>({32, 4, 1}), std::nullopt);
-+ ASSERT_EQ(MultiplyWithOverflow<int16_t>({2, 2, 32}), 128);
-+ ASSERT_EQ(MultiplyWithOverflow<int16_t>({32, 4, 1}), 128);
-+}
-+
- } // namespace internal
- } // namespace arrow
Verdict❌ CHANGES REQUESTED — Please address the issues flagged above. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.

Merge Checklist
All boxes should be checked before merging the PR (just tick any boxes which don't apply to this PR)
*-staticsubpackages, etc.) have had theirReleasetag incremented../cgmanifest.json,./toolkit/scripts/toolchain/cgmanifest.json,.github/workflows/cgmanifest.json)./LICENSES-AND-NOTICES/SPECS/data/licenses.json,./LICENSES-AND-NOTICES/SPECS/LICENSES-MAP.md,./LICENSES-AND-NOTICES/SPECS/LICENSE-EXCEPTIONS.PHOTON)*.signatures.jsonfilessudo make go-tidy-allandsudo make go-test-coveragepassSummary
What does the PR accomplish, why was it needed?
Patch libarrow for CVE-2026-25087
Astrolabe Patch Reference: apache/arrow#48925
Upstream patch reference: https://patch-diff.githubusercontent.com/raw/apache/arrow/pull/48925.patch
Patch Modified: Yes
reader.cc file has modified to align with upstream patch
Change Log
Does this affect the toolchain?
NO
Associated issues
Links to CVEs
Test Methodology
Installation:


Uninstallation:
