diff --git a/CHANGES.txt b/CHANGES.txt
index 0f61587cd198..1d0f44e63f3d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,7 +1,8 @@
 5.1
+ * Wire compaction_read_disk_access_mode through cursor-based compaction (CASSANDRA-21147)
  * Introduce created_at column to system_distributed.compression_dictionaries (CASSANDRA-21178)
  * Be able to detect and remove orphaned compression dictionaries (CASSANDRA-21157)
- * Fix BigTableVerifier to only read a data file during extended verification (CASSANDRA-21150)
+ * Fix BigTableVerifier to only read a data file during extended verification (CASSANDRA-21150)
  * Reduce memory allocation during transformation of BatchStatement to Mutation (CASSANDRA-21141)
  * Direct I/O support for compaction reads (CASSANDRA-19987)
  * Support custom StartupCheck implementations via SPI (CASSANDRA-21093)
diff --git a/src/java/org/apache/cassandra/db/compaction/CursorCompactor.java b/src/java/org/apache/cassandra/db/compaction/CursorCompactor.java
index 3b4819c15670..7d528b5e2d9d 100644
--- a/src/java/org/apache/cassandra/db/compaction/CursorCompactor.java
+++ b/src/java/org/apache/cassandra/db/compaction/CursorCompactor.java
@@ -27,11 +27,11 @@
 import java.util.function.LongPredicate;
 
 import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.UnmodifiableIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.config.Config.DiskAccessMode;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.AbstractCompactionController;
 import org.apache.cassandra.db.ClusteringComparator;
@@ -70,6 +70,7 @@
 import org.apache.cassandra.schema.CompactionParams;
 import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.utils.Throwables;
 import org.apache.cassandra.utils.TimeUUID;
 
 import static org.apache.cassandra.db.ClusteringPrefix.Kind.EXCL_END_BOUND;
@@ -294,15 +295,8 @@ private CursorCompactor(OperationType type,
          * {@link CompactionIterator#CompactionIterator(OperationType, List, AbstractCompactionController, long, TimeUUID, ActiveCompactionsTracker)}
          */
 
-        // Convert Readers to Cursors
-        this.sstableCursors = new StatefulCursor[sstables.size()];
+        this.sstableCursors = convertScannersToCursors(scanners, sstables, DatabaseDescriptor.getCompactionReadDiskAccessMode());
         this.sstableCursorsEqualsNext = new boolean[sstables.size()];
-        UnmodifiableIterator<SSTableReader> iterator = sstables.iterator();
-        for (int i = 0; i < this.sstableCursors.length; i++)
-        {
-            SSTableReader ssTableReader = iterator.next();
-            this.sstableCursors[i] = new StatefulCursor(ssTableReader);
-        }
 
         this.enforceStrictLiveness = controller.cfs.metadata.get().enforceStrictLiveness();
         purger = new Purger(type, controller, nowInSec);
@@ -1553,6 +1547,33 @@ private static String mergeHistogramToString(long[] histogram)
         return sb.toString();
     }
 
+    /**
+     * Closes scanner-opened readers before opening cursor-specific readers with the configured disk access mode.
+     * In cursor compaction, scanners are only used for metadata; closing them avoids holding redundant file
+     * descriptors and prevents conflicts when scan and non-scan readers for the same file share thread-local
+     * buffer state on the same thread.
+     */
+    private static StatefulCursor[] convertScannersToCursors(List<ISSTableScanner> scanners, ImmutableSet<SSTableReader> sstables,
+                                                             DiskAccessMode diskAccessMode)
+    {
+        for (ISSTableScanner scanner : scanners)
+            scanner.close();
+
+        StatefulCursor[] cursors = new StatefulCursor[sstables.size()];
+        int i = 0;
+        try
+        {
+            for (SSTableReader reader : sstables)
+                cursors[i++] = new StatefulCursor(reader, diskAccessMode);
+            return cursors;
+        }
+        catch (RuntimeException | Error e)
+        {
+            Throwables.closeNonNullAndAddSuppressed(e, cursors);
+            throw e;
+        }
+    }
+
     public void close()
     {
         try
diff --git a/src/java/org/apache/cassandra/db/compaction/StatefulCursor.java b/src/java/org/apache/cassandra/db/compaction/StatefulCursor.java
index c28d218ac321..f0d81a26c182 100644
--- a/src/java/org/apache/cassandra/db/compaction/StatefulCursor.java
+++ b/src/java/org/apache/cassandra/db/compaction/StatefulCursor.java
@@ -19,6 +19,7 @@
 package org.apache.cassandra.db.compaction;
 
 import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.Config.DiskAccessMode;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.ReusableLivenessInfo;
@@ -55,9 +56,9 @@ class StatefulCursor extends SSTableCursorReader
 
     private boolean isOpenRangeTombstonePresent = false;
 
-    public StatefulCursor(SSTableReader reader)
+    public StatefulCursor(SSTableReader reader, DiskAccessMode diskAccessMode)
     {
-        super(reader);
+        super(reader, diskAccessMode);
         currPartition = new PartitionDescriptor(reader.getPartitioner().createReusableKey(0));
         prevPartition = new PartitionDescriptor(reader.getPartitioner().createReusableKey(0));
         unfiltered = new UnfilteredDescriptor(reader.header.clusteringTypes().toArray(AbstractType[]::new));
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableCursorReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableCursorReader.java
index ccf94ce6dac8..9d6a2b990faa 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableCursorReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableCursorReader.java
@@ -22,6 +22,7 @@
 
 import com.google.common.collect.ImmutableList;
 
+import org.apache.cassandra.config.Config.DiskAccessMode;
 import org.apache.cassandra.db.ClusteringPrefix;
 import org.apache.cassandra.db.Columns;
 import org.apache.cassandra.db.DeletionTime;
@@ -197,15 +198,20 @@ public static SSTableCursorReader fromDescriptor(Descriptor desc) throws IOException
     {
         TableMetadata metadata = Util.metadataFromSSTable(desc);
         SSTableReader reader = SSTableReader.openNoValidation(null, desc, TableMetadataRef.forOfflineTools(metadata));
-        return new SSTableCursorReader(reader, metadata, reader.ref());
+        return new SSTableCursorReader(reader, metadata, reader.ref(), null);
     }
 
     public SSTableCursorReader(SSTableReader reader)
     {
-        this(reader, reader.metadata(), null);
+        this(reader, reader.metadata(), null, null);
     }
 
-    private SSTableCursorReader(SSTableReader reader, TableMetadata metadata, Ref<SSTableReader> readerRef)
+    public SSTableCursorReader(SSTableReader reader, DiskAccessMode diskAccessMode)
+    {
+        this(reader, reader.metadata(), null, diskAccessMode);
+    }
+
+    private SSTableCursorReader(SSTableReader reader, TableMetadata metadata, Ref<SSTableReader> readerRef, DiskAccessMode diskAccessMode)
     {
         ssTableReader = reader;
         ssTableReaderRef = readerRef;
@@ -221,7 +227,7 @@ private SSTableCursorReader(SSTableReader reader, TableMetadata metadata, Ref<SSTableReader> readerRef, DiskAccessMode diskAccessMode)
+    public static Collection<Object[]> params()
+    {
+        return Arrays.asList(new Object[]{ DiskAccessMode.standard, true },
+                             new Object[]{ DiskAccessMode.standard, false },
+                             new Object[]{ DiskAccessMode.direct, true },
+                             new Object[]{ DiskAccessMode.direct, false });
+    }
+
+    private DiskAccessMode originalDiskAccessMode;
+    private boolean originalCursorCompactionEnabled;
+
+    @Before
+    public void setCompactionParams()
+    {
+        originalDiskAccessMode = DatabaseDescriptor.getCompactionReadDiskAccessMode();
+        originalCursorCompactionEnabled = DatabaseDescriptor.cursorCompactionEnabled();
+        DatabaseDescriptor.setCompactionReadDiskAccessMode(compactionReadDiskAccessMode);
+        DatabaseDescriptor.setCursorCompactionEnabled(cursorCompactionEnabled);
+    }
+
+    @After
+    public void restoreCompactionParams()
+    {
+        DatabaseDescriptor.setCompactionReadDiskAccessMode(originalDiskAccessMode);
+        DatabaseDescriptor.setCursorCompactionEnabled(originalCursorCompactionEnabled);
+    }
+
     @BeforeClass
     public static void defineSchema() throws ConfigurationException
     {
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
index 8d9a8dcaa777..31ae05045d16 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
@@ -19,6 +19,7 @@
 package org.apache.cassandra.db.compaction;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -26,13 +27,19 @@
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
+import org.junit.After;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
 import org.apache.cassandra.config.CassandraRelevantProperties;
+import org.apache.cassandra.config.Config.DiskAccessMode;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.ColumnIdentifier;
 import org.apache.cassandra.db.Clustering;
 import org.apache.cassandra.db.ColumnFamilyStore;
@@ -81,6 +88,7 @@
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+@RunWith(Parameterized.class)
 public class CompactionsTest
 {
     private static final String KEYSPACE1 = "Keyspace1";
@@ -89,6 +97,41 @@ public class CompactionsTest
     private static final String CF_STANDARD2 = "Standard2";
     private static final String CF_STANDARD3 = "Standard3";
     private static final String CF_STANDARD4 = "Standard4";
+
+    @Parameterized.Parameter(0)
+    public DiskAccessMode compactionReadDiskAccessMode;
+
+    @Parameterized.Parameter(1)
+    public boolean cursorCompactionEnabled;
+
+    @Parameterized.Parameters(name = "diskAccessMode={0},cursor={1}")
+    public static Collection<Object[]> params()
+    {
+        return Arrays.asList(new Object[]{ DiskAccessMode.standard, true },
+                             new Object[]{ DiskAccessMode.standard, false },
+                             new Object[]{ DiskAccessMode.direct, true },
+                             new Object[]{ DiskAccessMode.direct, false });
+    }
+
+    private DiskAccessMode originalDiskAccessMode;
+    private boolean originalCursorCompactionEnabled;
+
+    @Before
+    public void setCompactionParams()
+    {
+        originalDiskAccessMode = DatabaseDescriptor.getCompactionReadDiskAccessMode();
+        originalCursorCompactionEnabled = DatabaseDescriptor.cursorCompactionEnabled();
+        DatabaseDescriptor.setCompactionReadDiskAccessMode(compactionReadDiskAccessMode);
+        DatabaseDescriptor.setCursorCompactionEnabled(cursorCompactionEnabled);
+    }
+
+    @After
+    public void restoreCompactionParams()
+    {
+        DatabaseDescriptor.setCompactionReadDiskAccessMode(originalDiskAccessMode);
+        DatabaseDescriptor.setCursorCompactionEnabled(originalCursorCompactionEnabled);
+    }
+
     @BeforeClass
     public static void defineSchema() throws ConfigurationException
     {
@@ -260,8 +303,9 @@ public static void assertMaxTimestamp(ColumnFamilyStore cfs, long maxTimestampExpected)
     public void testUserDefinedCompaction() throws Exception
     {
         Keyspace keyspace = Keyspace.open(KEYSPACE1);
-        final String cfname = "Standard3"; // use clean(no sstable) CF
+        final String cfname = "Standard3";
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfname);
+        cfs.clearUnsafe();
         TableMetadata table = cfs.metadata();
 
         // disable compaction while flushing
diff --git a/test/unit/org/apache/cassandra/db/compaction/simple/SimpleCompactionTest.java b/test/unit/org/apache/cassandra/db/compaction/simple/SimpleCompactionTest.java
index 27c7ba7e17a4..d5f7eedf6203 100644
--- a/test/unit/org/apache/cassandra/db/compaction/simple/SimpleCompactionTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/simple/SimpleCompactionTest.java
@@ -19,18 +19,61 @@
 package org.apache.cassandra.db.compaction.simple;
 
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.concurrent.ExecutionException;
 
+import org.junit.After;
 import org.junit.AfterClass;
+import org.junit.Before;
 import org.junit.Ignore;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
+import org.apache.cassandra.config.Config.DiskAccessMode;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.CQLTester;
 import org.apache.cassandra.utils.TestHelper;
 
 @Ignore
+@RunWith(Parameterized.class)
 public abstract class SimpleCompactionTest extends CQLTester
 {
+    @Parameterized.Parameter(0)
+    public DiskAccessMode compactionReadDiskAccessMode;
+
+    @Parameterized.Parameter(1)
+    public boolean cursorCompactionEnabled;
+
+    @Parameterized.Parameters(name = "diskAccessMode={0},cursor={1}")
+    public static Collection<Object[]> params()
+    {
+        return Arrays.asList(new Object[]{ DiskAccessMode.standard, true },
+                             new Object[]{ DiskAccessMode.standard, false },
+                             new Object[]{ DiskAccessMode.direct, true },
+                             new Object[]{ DiskAccessMode.direct, false });
+    }
+
+    private DiskAccessMode originalDiskAccessMode;
+    private boolean originalCursorCompactionEnabled;
+
+    @Before
+    public void setCompactionParams()
+    {
+        originalDiskAccessMode = DatabaseDescriptor.getCompactionReadDiskAccessMode();
+        originalCursorCompactionEnabled = DatabaseDescriptor.cursorCompactionEnabled();
+        DatabaseDescriptor.setCompactionReadDiskAccessMode(compactionReadDiskAccessMode);
+        DatabaseDescriptor.setCursorCompactionEnabled(cursorCompactionEnabled);
+    }
+
+    @After
+    public void restoreCompactionParams()
+    {
+        DatabaseDescriptor.setCompactionReadDiskAccessMode(originalDiskAccessMode);
+        DatabaseDescriptor.setCursorCompactionEnabled(originalCursorCompactionEnabled);
+    }
+
     @AfterClass
     public static void teardown() throws IOException, InterruptedException, ExecutionException
     {
diff --git a/test/unit/org/apache/cassandra/io/sstable/format/SSTableReaderDataReaderTest.java b/test/unit/org/apache/cassandra/io/sstable/format/SSTableReaderDataReaderTest.java
new file mode 100644
index 000000000000..49165b6577c4
--- /dev/null
+++ b/test/unit/org/apache/cassandra/io/sstable/format/SSTableReaderDataReaderTest.java
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.format;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.Config.DiskAccessMode;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.RowUpdateBuilder;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.io.util.ChannelProxy;
+import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.schema.CompressionParams;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.Throwables;
+import org.apache.cassandra.utils.concurrent.Ref;
+
+import static org.apache.cassandra.db.ColumnFamilyStore.FlushReason.UNIT_TESTS;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@code SSTableReader#canReuseDfile} / {@code SSTableReader#openDataReaderInternal}.
+ */
+public class SSTableReaderDataReaderTest
+{
+    private static final String KEYSPACE = "SSTableReaderDataReaderTest";
+    private static final String CF_UNCOMPRESSED = "Uncompressed";
+    private static final String CF_COMPRESSED = "Compressed";
+
+    private static DiskAccessMode originalDiskAccessMode;
+    private final List<Ref<SSTableReader>> refsToRelease = new ArrayList<>();
+
+    @BeforeClass
+    public static void defineSchema() throws Exception
+    {
+        SchemaLoader.prepareServer();
+        originalDiskAccessMode = DatabaseDescriptor.getDiskAccessMode();
+        DatabaseDescriptor.setDiskAccessMode(DiskAccessMode.standard);
+        SchemaLoader.createKeyspace(KEYSPACE,
+                                    KeyspaceParams.simple(1),
+                                    SchemaLoader.standardCFMD(KEYSPACE, CF_UNCOMPRESSED)
+                                                .compression(CompressionParams.noCompression()),
+                                    SchemaLoader.standardCFMD(KEYSPACE, CF_COMPRESSED)
+                                                .compression(CompressionParams.DEFAULT));
+        CompactionManager.instance.disableAutoCompaction();
+    }
+
+    @AfterClass
+    public static void restoreConfiguration()
+    {
+        DatabaseDescriptor.setDiskAccessMode(originalDiskAccessMode);
+    }
+
+    @After
+    public void teardown()
+    {
+        Throwable exceptions = null;
+        for (Ref<SSTableReader> ref : refsToRelease)
+        {
+            try
+            {
+                ref.close();
+            }
+            catch (Throwable t)
+            {
+                exceptions = Throwables.merge(exceptions, t);
+            }
+        }
+        refsToRelease.clear();
+        Throwables.maybeFail(exceptions);
+    }
+
+    @Test
+    public void testNullModeReusesExistingDfile()
+    {
+        SSTableReader sstable = createSSTable(CF_UNCOMPRESSED);
+
+        try (RandomAccessReader reader = sstable.openDataReader())
+        {
+            assertReaderSharesDfileChannel(sstable, reader);
+        }
+    }
+
+    @Test
+    public void testSameModeReusesExistingDfile()
+    {
+        SSTableReader sstable = createSSTable(CF_UNCOMPRESSED);
+
+        try (RandomAccessReader reader = sstable.openDataReader(sstable.dfile.diskAccessMode()))
+        {
+            assertReaderSharesDfileChannel(sstable, reader);
+        }
+    }
+
+    @Test
+    public void testDirectOnUnsupportedFallsBackToReuse()
+    {
+        SSTableReader sstable = createSSTable(CF_UNCOMPRESSED);
+        assertFalse("Uncompressed SSTable should not support direct IO",
+                    sstable.dfile.supportsDirectIO());
+
+        try (RandomAccessReader reader = sstable.openDataReader(DiskAccessMode.direct))
+        {
+            assertReaderSharesDfileChannel(sstable, reader);
+        }
+    }
+
+    @Test
+    public void testDirectOnCompressedCreatesNewHandle()
+    {
+        SSTableReader sstable = createSSTable(CF_COMPRESSED);
+
+        try (RandomAccessReader reader = sstable.openDataReader(DiskAccessMode.direct))
+        {
+            assertReaderHasOwnChannel(sstable, reader);
+        }
+    }
+
+    @Test
+    public void testNewHandleCloseDoesNotAffectOriginalDfile()
+    {
+        SSTableReader sstable = createSSTable(CF_COMPRESSED);
+
+        RandomAccessReader reader = sstable.openDataReader(DiskAccessMode.direct);
+        ChannelProxy newChannel = reader.getChannel();
+        assertNotSame(sstable.dfile.channel, newChannel);
+
+        reader.close();
+
+        assertTrue("New handle's channel should be cleaned up after reader close",
+                   newChannel.isCleanedUp());
+        assertFalse("Original dfile channel should not be affected",
+                    sstable.dfile.channel.isCleanedUp());
+
+        try (RandomAccessReader reader2 = sstable.openDataReader())
+        {
+            assertReaderSharesDfileChannel(sstable, reader2);
+        }
+    }
+
+    @Test
+    public void testReusedReaderCloseDoesNotAffectDfile()
+    {
+        SSTableReader sstable = createSSTable(CF_UNCOMPRESSED);
+
+        RandomAccessReader reader = sstable.openDataReader();
+        ChannelProxy channel = reader.getChannel();
+        assertSame(sstable.dfile.channel, channel);
+
+        reader.close();
+
+        assertFalse("Dfile channel should not be cleaned up after reused reader close",
close", + channel.isCleanedUp()); + + try (RandomAccessReader reader2 = sstable.openDataReader()) + { + assertReaderSharesDfileChannel(sstable, reader2); + } + } + + @Test + public void testMultipleNewHandleReadersDoNotLeakResources() + { + SSTableReader sstable = createSSTable(CF_COMPRESSED); + + ChannelProxy[] newChannels = new ChannelProxy[3]; + for (int i = 0; i < 3; i++) + { + try (RandomAccessReader reader = sstable.openDataReader(DiskAccessMode.direct)) + { + newChannels[i] = reader.getChannel(); + assertNotSame(sstable.dfile.channel, newChannels[i]); + } + } + + for (int i = 0; i < 3; i++) + assertTrue("New handle channel " + i + " should be cleaned up", + newChannels[i].isCleanedUp()); + + assertFalse(sstable.dfile.channel.isCleanedUp()); + try (RandomAccessReader reader = sstable.openDataReader()) + { + assertReaderSharesDfileChannel(sstable, reader); + assertEquals(0, reader.getFilePointer()); + } + } + + @Test + public void testForScanReusesWithNullMode() + { + SSTableReader sstable = createSSTable(CF_UNCOMPRESSED); + + try (RandomAccessReader reader = sstable.openDataReaderForScan()) + { + assertReaderSharesDfileChannel(sstable, reader); + } + } + + @Test + public void testForScanCreatesNewHandleWithDirect() + { + SSTableReader sstable = createSSTable(CF_COMPRESSED); + + try (RandomAccessReader reader = sstable.openDataReaderForScan(DiskAccessMode.direct)) + { + assertReaderHasOwnChannel(sstable, reader); + } + } + + private void assertReaderSharesDfileChannel(SSTableReader sstable, RandomAccessReader reader) + { + assertNotNull(reader); + assertSame("Reader should share the dfile's channel (reuse path)", + sstable.dfile.channel, reader.getChannel()); + } + + private void assertReaderHasOwnChannel(SSTableReader sstable, RandomAccessReader reader) + { + assertNotNull(reader); + assertNotSame("Reader should have its own channel (new handle path)", + sstable.dfile.channel, reader.getChannel()); + } + + private SSTableReader createSSTable(String cf) + { + Keyspace keyspace = Keyspace.open(KEYSPACE); + ColumnFamilyStore store = keyspace.getColumnFamilyStore(cf); + store.clearUnsafe(); + + long timestamp = System.currentTimeMillis(); + for (int i = 0; i < 10; i++) + { + new RowUpdateBuilder(store.metadata(), timestamp, String.valueOf(i)) + .clustering("col") + .add("val", ByteBufferUtil.EMPTY_BYTE_BUFFER) + .build() + .applyUnsafe(); + } + store.forceBlockingFlush(UNIT_TESTS); + + SSTableReader sstable = store.getLiveSSTables().iterator().next(); + refsToRelease.add(sstable.selfRef()); + return sstable; + } +} diff --git a/test/unit/org/apache/cassandra/io/util/DirectThreadLocalReadAheadBufferTest.java b/test/unit/org/apache/cassandra/io/util/DirectThreadLocalReadAheadBufferTest.java index 84b60bc5c8da..46b3091f1167 100644 --- a/test/unit/org/apache/cassandra/io/util/DirectThreadLocalReadAheadBufferTest.java +++ b/test/unit/org/apache/cassandra/io/util/DirectThreadLocalReadAheadBufferTest.java @@ -17,8 +17,12 @@ */ package org.apache.cassandra.io.util; +import java.lang.management.BufferPoolMXBean; +import java.lang.management.ManagementFactory; import java.util.Arrays; +import java.util.List; +import org.junit.Assert; import org.junit.Test; import org.apache.cassandra.utils.Pair; @@ -61,4 +65,49 @@ private void testReads(InputData propertyInputs, int bufferSize, int blockSize) tlrab.close(); } } + + @Test + public void testDirectMemoryIsCleanedOnClose() + { + BufferPoolMXBean directPool = getDirectBufferPool(); + int blockSize = FileUtils.getFileBlockSize(files[0]); + int 
+        int bufferSize = 64 * 1024 * 1024; // 64MB - large enough to reliably detect the release
+
+        try (ChannelProxy channel = new ChannelProxy(files[0], ChannelProxy.IOMode.DIRECT))
+        {
+            DirectThreadLocalReadAheadBuffer tlrab =
+                new DirectThreadLocalReadAheadBuffer(channel, bufferSize, blockSize);
+
+            // Force buffer allocation
+            tlrab.allocateBuffer();
+
+            long memoryUsedBefore = directPool.getMemoryUsed();
+
+            // Close should clean the direct memory
+            tlrab.close();
+
+            long memoryUsedAfter = directPool.getMemoryUsed();
+
+            // Memory should decrease by approximately buffer size (+ alignment overhead)
+            long expectedDecrease = bufferSize;
+            long actualDecrease = memoryUsedBefore - memoryUsedAfter;
+
+            Assert.assertTrue(
+                "Direct memory should decrease after close(). " +
+                "Before: " + memoryUsedBefore + ", After: " + memoryUsedAfter +
+                ", Expected decrease: ~" + expectedDecrease + ", Actual: " + actualDecrease,
+                actualDecrease >= expectedDecrease * 0.9); // 10% tolerance for alignment
+        }
+    }
+
+    private static BufferPoolMXBean getDirectBufferPool()
+    {
+        List<BufferPoolMXBean> pools = ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class);
+        for (BufferPoolMXBean pool : pools)
+            if (pool.getName().equals("direct"))
+                return pool;
+
+        throw new IllegalStateException("Direct buffer pool not found");
+    }
 }
\ No newline at end of file
diff --git a/test/unit/org/apache/cassandra/transport/WriteBytesTest.java b/test/unit/org/apache/cassandra/transport/WriteBytesTest.java
index 19a70e999fd6..987ec78c52aa 100644
--- a/test/unit/org/apache/cassandra/transport/WriteBytesTest.java
+++ b/test/unit/org/apache/cassandra/transport/WriteBytesTest.java
@@ -18,6 +18,8 @@
 
 package org.apache.cassandra.transport;
 
+import java.nio.ByteBuffer;
+
 import org.assertj.core.api.Assertions;
 import org.junit.Test;
 
@@ -26,6 +28,7 @@
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
+import sun.nio.ch.DirectBuffer;
 
 import static org.quicktheories.QuickTheory.qt;
 
@@ -52,7 +55,11 @@ public void test()
             Assertions.assertThat(buf.writerIndex()).isEqualTo(size);
             for (int i = 0; i < size; i++)
                 Assertions.assertThat(buf.getByte(buf.readerIndex() + i)).describedAs("byte mismatch at index %d", i).isEqualTo(bb.get(bb.position() + i));
-            MemoryUtil.clean(bb);
+
+            if (bb.isDirect()) {
+                Object attachment = ((DirectBuffer) bb).attachment();
+                MemoryUtil.clean(attachment == null ? bb : (ByteBuffer) attachment);
+            }
         });
     }
 
diff --git a/test/unit/org/apache/cassandra/utils/memory/MemoryUtilTest.java b/test/unit/org/apache/cassandra/utils/memory/MemoryUtilTest.java
index 842823a2d54c..1b06274daeed 100644
--- a/test/unit/org/apache/cassandra/utils/memory/MemoryUtilTest.java
+++ b/test/unit/org/apache/cassandra/utils/memory/MemoryUtilTest.java
@@ -50,35 +50,31 @@ public void testClean()
     }
 
     @Test
-    public void testCleanViewDoesNotThrow()
+    public void testCleanSliceThrows()
     {
-        // Use a large buffer to likely get mmap'd memory from the OS. This ensures that if cleaning a view incorrectly
-        // unmaps the original buffer's memory, subsequent access to 'original' would more reliably fail.
-        // For context: glibc's mmap threshold is 32MB on 64-bit systems
-        ByteBuffer original = ByteBuffer.allocateDirect(64 * 1024 * 1024);
-
+        ByteBuffer original = ByteBuffer.allocateDirect(1024);
         ByteBuffer slice = original.slice();
-        MemoryUtil.clean(slice);
-        try
-        {
-            original.putInt(10);
-        }
-        catch (Exception exc)
-        {
-            Assertions.fail("Unable to write to original buffer after cleaning (slice). " + exc.getMessage(), exc);
" + exc.getMessage(), exc); - } + Assertions.assertThatThrownBy(() -> MemoryUtil.clean(slice)) + .isInstanceOf(IllegalArgumentException.class); + + // original should still be usable after the rejected clean + original.putInt(10); + MemoryUtil.clean(original); + } + + @Test + public void testCleanDuplicateThrows() + { + ByteBuffer original = ByteBuffer.allocateDirect(1024); ByteBuffer duplicate = original.duplicate(); - MemoryUtil.clean(duplicate); - try - { - original.putInt(10); - } - catch (Exception exc) - { - Assertions.fail("Unable to write to original buffer after cleaning (duplicate). " + exc.getMessage(), exc); - } + Assertions.assertThatThrownBy(() -> MemoryUtil.clean(duplicate)) + .isInstanceOf(IllegalArgumentException.class); + + // original should still be usable after the rejected clean + original.putInt(10); + MemoryUtil.clean(original); } @Test