[SPARK-56438][SQL][CORE] Optimize VectorizedPlainValuesReader.readBinary for direct ByteBuffer by eliminating intermediate byte[] copy #55296
Conversation
```java
/**
 * Returns the native memory address of a direct {@link ByteBuffer}.
 * The buffer must be direct; passing a heap buffer produces an undefined result.
 */
public static long getDirectBufferAddress(ByteBuffer buffer) {
```
If this PR is accepted, a corresponding benchmark can be added to PlatformBenchmark at a later time.
+1. It'd be great to have some benchmark results!
Already added in this PR.
sunchao left a comment
Looks reasonable to me. Left some comments.
```java
  Platform.copyMemory(src.array(), Platform.BYTE_ARRAY_OFFSET + src.arrayOffset() + srcIndex,
      null, data + rowId, count);
} else {
  long srcAddr = Platform.getDirectBufferAddress(src) + srcIndex;
```
I think `hasArray()` returning false does not necessarily mean the buffer is a direct buffer. We can perhaps strengthen this by also checking `src.isDirect()`?
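The distinction can be demonstrated with plain `java.nio` (a minimal, self-contained sketch; the class name is illustrative): a read-only heap buffer has no accessible backing array and no native address, so an `else` branch guarded only by `!hasArray()` would mis-handle it.

```java
import java.nio.ByteBuffer;

public class BufferKindCheck {
    public static void main(String[] args) {
        // A direct buffer: no backing array, isDirect() == true.
        ByteBuffer direct = ByteBuffer.allocateDirect(16);
        System.out.println(direct.hasArray() + " " + direct.isDirect()); // false true

        // A read-only heap buffer: hasArray() is also false, yet it is NOT direct,
        // so taking its native address would be invalid.
        ByteBuffer readOnlyHeap = ByteBuffer.allocate(16).asReadOnlyBuffer();
        System.out.println(readOnlyHeap.hasArray() + " " + readOnlyHeap.isDirect()); // false false
    }
}
```

This is why a `src.isDirect()` check (with a per-byte fallback for the remaining case) is safer than treating `!hasArray()` as "direct".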
```java
/**
 * Copies {@code count} bytes from a {@link ByteBuffer} starting at absolute position
 * {@code srcIndex} into this column at {@code rowId}. Does not modify the buffer's position.
 */
public abstract void putBytes(int rowId, int count, ByteBuffer src, int srcIndex);
```
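For intuition, the absolute bulk get that can back the on-heap implementation of this API behaves as in this self-contained sketch (the method and class names are illustrative, not Spark's code):

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

public class SingleCopySketch {
    // Simplified stand-in for OnHeapColumnVector's backing storage.
    static byte[] data = new byte[32];

    // Illustrative analogue of putBytes(rowId, count, ByteBuffer, srcIndex):
    // one bulk copy, and the source buffer's position is left untouched.
    static void putBytesFromBuffer(int rowId, int count, ByteBuffer src, int srcIndex) {
        src.get(srcIndex, data, rowId, count);  // absolute bulk get (JDK 13+)
    }

    public static void main(String[] args) {
        ByteBuffer direct = ByteBuffer.allocateDirect(16);
        direct.put(new byte[] {1, 2, 3, 4, 5, 6, 7, 8});  // position is now 8

        putBytesFromBuffer(4, 4, direct, 2);  // copy bytes {3,4,5,6} into data[4..7]

        System.out.println(Arrays.toString(Arrays.copyOfRange(data, 4, 8))); // [3, 4, 5, 6]
        System.out.println(direct.position()); // still 8: absolute get leaves position alone
    }
}
```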
Is there any way we can add some test cases for this new API?
```scala
  verifyPutByteArray(testVector)
}

testVectors("putBytes from ByteBuffer", 16, ByteType) { testVector =>
```
@sunchao Added new tests for both new APIs.
```
DirectBuffer Address Access:   Best Time(ms)   Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------
getLong (baseline)                       156            159           2       640.4           1.6       1.0X
getDirectBufferAddress                   325            327           2       308.1           3.2       0.5X
```
With 8 million distinct objects/addresses per iteration, array cycling causes CPU cache misses in both scenarios. However, getDirectBufferAddress incurs additional overhead because it dereferences a Java object (ByteBuffer) to read its address field, whereas the baseline getLong reads directly from a raw address with no object indirection.
This is a fair comparison under real, cache-cold conditions. The 0.5x performance ratio therefore reflects the cost of object field access compared to raw pointer reads — which is the true extra overhead in a non-cached scenario. In real-world usage, readBinary reuses the same VectorizedPlainValuesReader instance (and the same ByteBuffer) in a loop. As a result, the hot-cache performance is far better than the current numbers: reusing the ByteBuffer yields 5–8x the baseline performance due to cached objects. Still, the cache-cold benchmark numbers are more defensible for a formal benchmark.
What changes were proposed in this pull request?
This PR optimizes `VectorizedPlainValuesReader.readBinary` for the direct (non-heap) `ByteBuffer` path by eliminating an intermediate `byte[]` copy.

Previously, when reading binary/string values from a direct `ByteBuffer`, each value required:

- allocating a temporary `byte[len]`
- copying from the `ByteBuffer` into the temp array
- copying the temp array into the column vector

This PR adds `ByteBuffer`-aware `putBytes`/`putByteArray` overloads to `WritableColumnVector`, enabling a single-copy path:

- `OnHeapColumnVector`: uses `ByteBuffer.get(index, byte[], offset, length)` (absolute bulk get) to copy directly into the backing `byte[]`: one native-to-heap copy.
- `OffHeapColumnVector`: uses `Platform.copyMemory` with the direct buffer's native address: one native-to-native copy.

A `Platform.getDirectBufferAddress(ByteBuffer)` helper is added to read a `DirectByteBuffer`'s native address via the `Buffer.address` field offset, consistent with `Platform`'s existing `Unsafe`-based accessor pattern.

Why are the changes needed?
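The general technique for such a helper looks like the following self-contained sketch (this is the common `sun.misc.Unsafe` pattern, not necessarily the PR's exact code; it assumes a HotSpot JDK where `theUnsafe` reflection is permitted):

```java
import java.lang.reflect.Field;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import sun.misc.Unsafe;

public class DirectAddressSketch {
    private static final Unsafe UNSAFE;
    private static final long ADDRESS_OFFSET;

    static {
        try {
            Field f = Unsafe.class.getDeclaredField("theUnsafe");
            f.setAccessible(true);
            UNSAFE = (Unsafe) f.get(null);
            // java.nio.Buffer.address holds the native base address of a direct buffer.
            ADDRESS_OFFSET = UNSAFE.objectFieldOffset(Buffer.class.getDeclaredField("address"));
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    // Undefined result for heap buffers (their address field is 0).
    static long getDirectBufferAddress(ByteBuffer buffer) {
        return UNSAFE.getLong(buffer, ADDRESS_OFFSET);
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocateDirect(8).order(ByteOrder.nativeOrder());
        buf.putLong(0, 0x1122334455667788L);
        long addr = getDirectBufferAddress(buf);
        // Reading through the raw address sees the same bytes the buffer wrote.
        System.out.println(UNSAFE.getLong(addr) == 0x1122334455667788L); // true
    }
}
```

Reading the field once per call is cheap, but it still dereferences the `ByteBuffer` object, which is the object-indirection overhead the benchmark above measures against a raw `getLong`.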
In Spark's vectorized Parquet reader, binary/string columns from memory-mapped (direct) `ByteBuffer` sources incur two full `memcpy` operations per value. The intermediate `byte[]` allocation also adds GC pressure.

Eliminating one copy per value yields a 10–22% improvement on the default `DIRECT`/`ON_HEAP` path across JDK 17, 21, and 25.

Does this PR introduce any user-facing change?
No.
How was this patch tested?
Benchmark Code (click to expand)
Run `build/sbt "sql/Test/runMain org.apache.spark.sql.execution.datasources.parquet.VectorizedPlainValuesReaderBenchmark"` to reproduce the benchmark.

Benchmark results (click to expand)
Across JDK 17, 21, and 25, the default `DIRECT`/`ON_HEAP` path achieves a performance improvement of 10%–22%, and `HEAP` path results are unchanged as expected (that code path is not affected).

Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Code