From a669f5d0c4f652dcebc70503086b8f9d1513e740 Mon Sep 17 00:00:00 2001 From: zifeif2 Date: Mon, 13 Apr 2026 20:47:26 +0000 Subject: [PATCH 1/4] initial commit --- .../apache/spark/sql/internal/SQLConf.scala | 3 +- .../streaming/state/StateStoreConf.scala | 3 +- ...cksDBCheckpointFailureInjectionSuite.scala | 162 ++++---- .../state/RocksDBStateStoreSuite.scala | 366 +++++++++++++++++ .../streaming/state/RocksDBSuite.scala | 27 +- .../streaming/state/StateStoreSuite.scala | 379 +----------------- 6 files changed, 470 insertions(+), 470 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 05fdb6bca9320..409ecd2eddbed 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -3774,7 +3774,8 @@ object SQLConf { buildConf("spark.sql.streaming.checkpoint.fileChecksum.enabled") .internal() .doc("When true, checksum would be generated and verified for checkpoint files. " + - "This is used to detect file corruption.") + "This is used to detect file corruption. This is only enabled when " + + "STATE_STORE_CHECKPOINT_FORMAT_VERSION >= 2") .version("4.1.0") .booleanConf .createWithDefault(true) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala index 5a3e875541d02..937cf1ed684a0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala @@ -114,7 +114,8 @@ class StateStoreConf( val compressionCodec: String = sqlConf.stateStoreCompressionCodec /** Whether file checksum generation and verification is enabled. */ - val checkpointFileChecksumEnabled: Boolean = sqlConf.checkpointFileChecksumEnabled + val checkpointFileChecksumEnabled: Boolean = + sqlConf.checkpointFileChecksumEnabled && sqlConf.enableStateStoreCheckpointIds /** Number of threads for the file checksum thread pool (0 to disable). */ val fileChecksumThreadPoolSize: Int = sqlConf.stateStoreFileChecksumThreadPoolSize diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBCheckpointFailureInjectionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBCheckpointFailureInjectionSuite.scala index c48b492e27c27..b0220c3396261 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBCheckpointFailureInjectionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBCheckpointFailureInjectionSuite.scala @@ -634,9 +634,8 @@ class RocksDBCheckpointFailureInjectionSuite extends StreamTest * enabled/disabled and the different behavior is shown in the test */ + // File checksum is disabled for checkpoint format version 1, so we only test version 2. Seq( - FailureConf3(skipCreationIfFileMissingChecksum = false, checkpointFormatVersion = "1"), - FailureConf3(skipCreationIfFileMissingChecksum = true, checkpointFormatVersion = "1"), FailureConf3(skipCreationIfFileMissingChecksum = false, checkpointFormatVersion = "2"), FailureConf3(skipCreationIfFileMissingChecksum = true, checkpointFormatVersion = "2") ).foreach { failureConf => @@ -770,60 +769,25 @@ class RocksDBCheckpointFailureInjectionSuite extends StreamTest assert((new File(checkpointDir, "commits/1")).exists()) assert((new File(checkpointDir, "commits/2")).exists()) - val failureCase = - !failureConf.skipCreationIfFileMissingChecksum && - failureConf.checkpointFormatVersion == "1" + // With checkpointFormatVersion = 2, the changelog file checksum should be written + assert(verifyChangelogFileChecksumExists(2)) - if (failureCase) { - assert(verifyChangelogFileChecksumExists(2)) - - // The query does not succeed, since we load the old changelog file with the checksum from - // the new changelog file that did not overwrite the old one. This will lead to a checksum - // verification failure when we try to load the old changelog file with the checksum from - // the new changelog file that did not overwrite the old one. - testStream(aggregated2, Update)( - StartStream( - checkpointLocation = checkpointDir.getAbsolutePath, - additionalConfs = secondRunConfs), - AddData(inputData, 4), - ExpectFailure[SparkException] { ex => - ex.getMessage.contains("CHECKPOINT_FILE_CHECKSUM_VERIFICATION_FAILED") - ex.getMessage.contains("2.changelog") - } - ) - - // Verify that the commit file was not written - assert(!(new File(checkpointDir, "commits/3")).exists()) - } else { - if (failureConf.checkpointFormatVersion == "1") { - // With checkpointFormatVersion = 1, the changelog file checksum should not be written - assert(!verifyChangelogFileChecksumExists(2)) - } else { - // With checkpointFormatVersion = 2, the changelog file checksum should be written - assert(verifyChangelogFileChecksumExists(2)) - } - - // The query should restart successfully - testStream(aggregated2, Update)( - StartStream( - checkpointLocation = checkpointDir.getAbsolutePath, - additionalConfs = secondRunConfs), - AddData(inputData, 4), - CheckLastBatch((1004, 2)), - StopStream - ) + // The query should restart successfully + testStream(aggregated2, Update)( + StartStream( + checkpointLocation = checkpointDir.getAbsolutePath, + additionalConfs = secondRunConfs), + AddData(inputData, 4), + CheckLastBatch((1004, 2)), + StopStream + ) - // Verify again the 2.changelog file checksum exists or not - if (failureConf.checkpointFormatVersion == "1") { - assert(!verifyChangelogFileChecksumExists(2)) - } else { - assert(verifyChangelogFileChecksumExists(2)) - } + // Verify again the 2_.changelog file checksum exists + assert(verifyChangelogFileChecksumExists(2)) - assert(verifyChangelogFileExists(4)) - assert(verifyChangelogFileChecksumExists(4)) - assert((new File(checkpointDir, "commits/3")).exists()) - } + assert(verifyChangelogFileExists(4)) + assert(verifyChangelogFileChecksumExists(4)) + assert((new File(checkpointDir, "commits/3")).exists()) } } } @@ -832,54 +796,66 @@ class RocksDBCheckpointFailureInjectionSuite extends StreamTest * Test that verifies that when a task is interrupted, the store's rollback() method does not * throw an exception and the store can still be used after the rollback. */ - test("SPARK-54585: Interrupted task calling rollback does not throw an exception") { - val hadoopConf = new Configuration() - hadoopConf.set( - STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key, - fileManagerClassName - ) - withTempDirAllowFailureInjection { (remoteDir, _) => - val sqlConf = new SQLConf() - sqlConf.setConfString("spark.sql.streaming.checkpoint.fileChecksum.enabled", "true") - val rocksdbChangelogCheckpointingConfKey = - RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".changelogCheckpointing.enabled" - sqlConf.setConfString(rocksdbChangelogCheckpointingConfKey, "true") - val conf = RocksDBConf(StateStoreConf(sqlConf)) + Seq(true, false).foreach { enableStateStoreCheckpointIds => + val newTestName = s"SPARK-54585: Interrupted task calling rollback does not throw an " + + s"exception - with enableStateStoreCheckpointIds = $enableStateStoreCheckpointIds" + + val v2Confs = if (enableStateStoreCheckpointIds) { + Seq(SQLConf.STATE_STORE_CHECKPOINT_FORMAT_VERSION.key -> "2", + SQLConf.STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED.key -> "true") + } else { + Seq.empty + } + test(newTestName) { + val hadoopConf = new Configuration() + hadoopConf.set( + STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key, + fileManagerClassName + ) + withSQLConf(v2Confs: _*) { + withTempDirAllowFailureInjection { (remoteDir, _) => { + val sqlConf = new SQLConf() + val rocksdbChangelogCheckpointingConfKey = + RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".changelogCheckpointing.enabled" + sqlConf.setConfString(rocksdbChangelogCheckpointingConfKey, "true") + val conf = RocksDBConf(StateStoreConf(sqlConf)) - withDB( - remoteDir.getAbsolutePath, - version = 0, - conf = conf, - hadoopConf = hadoopConf - ) { db => - db.put("key0", "value0") - val checkpointId1 = commitAndGetCheckpointId(db) + withDB( + remoteDir.getAbsolutePath, + version = 0, + conf = conf, + hadoopConf = hadoopConf + ) { db => + db.put("key0", "value0") + val checkpointId1 = commitAndGetCheckpointId(db) - db.load(1, checkpointId1) - db.put("key1", "value1") - val checkpointId2 = commitAndGetCheckpointId(db) + db.load(1, checkpointId1) + db.put("key1", "value1") + val checkpointId2 = commitAndGetCheckpointId(db) - db.load(2, checkpointId2) - db.put("key2", "value2") + db.load(2, checkpointId2) + db.put("key2", "value2") - // Simulate what happens when a task is killed, the thread's interrupt flag is set. - // This replicates the scenario where TaskContext.markTaskFailed() is called and - // the task failure listener invokes RocksDBStateStore.abort() -> rollback(). - Thread.currentThread().interrupt() + // Simulate what happens when a task is killed, the thread's interrupt flag is set. + // This replicates the scenario where TaskContext.markTaskFailed() is called and + // the task failure listener invokes RocksDBStateStore.abort() -> rollback(). + Thread.currentThread().interrupt() - // rollback() should not throw an exception - db.rollback() + // rollback() should not throw an exception + db.rollback() - // Clear the interrupt flag for subsequent operations - Thread.interrupted() + // Clear the interrupt flag for subsequent operations + Thread.interrupted() - // Reload the store and insert a new value - db.load(2, checkpointId2) - db.put("key3", "value3") + // Reload the store and insert a new value + db.load(2, checkpointId2) + db.put("key3", "value3") - // Verify the store has the correct values - assert(db.iterator().map(toStr).toSet === - Set(("key0", "value0"), ("key1", "value1"), ("key3", "value3"))) + // Verify the store has the correct values + assert(db.iterator().map(toStr).toSet === + Set(("key0", "value0"), ("key1", "value1"), ("key3", "value3"))) + } + }} } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala index 3e4b4b7320f53..948d3c4f4ead9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala @@ -17,7 +17,9 @@ package org.apache.spark.sql.execution.streaming.state +import java.io.File import java.util.UUID +import java.util.concurrent.{ConcurrentLinkedQueue, CountDownLatch, TimeUnit} import scala.collection.immutable import scala.concurrent.{ExecutionContext, Future} @@ -25,6 +27,7 @@ import scala.util.Random import org.apache.avro.AvroTypeException import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path import org.scalatest.BeforeAndAfter import org.scalatest.PrivateMethodTester import org.scalatest.matchers.should.Matchers @@ -37,6 +40,7 @@ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection, UnsafeRow} import org.apache.spark.sql.catalyst.util.quietly +import org.apache.spark.sql.execution.streaming.checkpointing.{CheckpointFileManager, ChecksumCheckpointFileManager, ChecksumFile} import org.apache.spark.sql.execution.streaming.operators.stateful.StatefulOperatorStateInfo import org.apache.spark.sql.execution.streaming.runtime.StreamExecution import org.apache.spark.sql.internal.SQLConf @@ -3055,6 +3059,368 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid s"acquired thread should be curent thread ${Thread.currentThread().getId} " + s"after load but was $threadId") } + + private def withCheckpointFormatV2(f: => Unit): Unit = { + withSQLConf( + SQLConf.STATE_STORE_CHECKPOINT_FORMAT_VERSION.key -> "2") { + f + } + } + + private def verifyChecksumFiles( + dir: String, expectedNumFiles: Int, expectedNumChecksumFiles: Int): Unit = { + val allFiles = new File(dir) + // filter out dirs and local hdfs files + .listFiles().filter(f => f.isFile && !f.getName.startsWith(".")) + .map(f => new Path(f.toURI)).toSet + assert(allFiles.size == expectedNumFiles) + + val checksumFiles = allFiles.filter( + ChecksumCheckpointFileManager.isChecksumFile).map(ChecksumFile) + assert(checksumFiles.size == expectedNumChecksumFiles) + + // verify that no orphan checksum file i.e. the respective main file should be present + assert(checksumFiles.forall(c => allFiles.contains(c.mainFilePath))) + } + + /** + * Helper to put a value and commit, returning (newVersion, checkpointInfo). + * The checkpoint info carries the unique ID needed for V2 reloads. + */ + private def putAndCommitStore( + provider: RocksDBStateStoreProvider, + loadVersion: Long, + doMaintenance: Boolean, + uniqueId: Option[String] = None): (Long, StateStoreCheckpointInfo) = { + val store = provider.getStore(loadVersion, uniqueId) + put(store, loadVersion.toString, loadVersion.toInt, loadVersion.toInt * 100) + val newVersion = store.commit() + val ckptInfo = store.getStateStoreCheckpointInfo() + + if (doMaintenance) { + provider.doMaintenance() + } + + (newVersion, ckptInfo) + } + + testWithAllCodec("file checksum can be enabled and disabled for the same checkpoint") { _ => + withCheckpointFormatV2 { + val storeId = StateStoreId(newDir(), 0L, 1) + var version = 0L + var lastCkptId: Option[String] = None + + // Commit to store using file checksum + withSQLConf(SQLConf.STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED.key -> true.toString) { + tryWithProviderResource(newStoreProviderWithClonedConf(storeId)) { provider => + val store = provider.getStore(version) + put(store, "1", 11, 100) + put(store, "2", 22, 200) + version = store.commit() + lastCkptId = store.getStateStoreCheckpointInfo().stateStoreCkptId + } + } + + // Reload the store and commit without file checksum + withSQLConf(SQLConf.STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED.key -> false.toString) { + tryWithProviderResource(newStoreProviderWithClonedConf(storeId)) { provider => + assert(version == 1) + val store = provider.getStore(version, lastCkptId) + assert(get(store, "1", 11) === Some(100)) + assert(get(store, "2", 22) === Some(200)) + + put(store, "3", 33, 300) + put(store, "4", 44, 400) + version = store.commit() + lastCkptId = store.getStateStoreCheckpointInfo().stateStoreCkptId + } + } + + // Reload the store and commit with file checksum + withSQLConf(SQLConf.STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED.key -> true.toString) { + tryWithProviderResource(newStoreProviderWithClonedConf(storeId)) { provider => + assert(version == 2) + val store = provider.getStore(version, lastCkptId) + assert(get(store, "1", 11) === Some(100)) + assert(get(store, "2", 22) === Some(200)) + assert(get(store, "3", 33) === Some(300)) + assert(get(store, "4", 44) === Some(400)) + + put(store, "5", 55, 500) + version = store.commit() + } + } + } + } + + test("checksum files are also cleaned up during maintenance") { + withCheckpointFormatV2 { + val storeId = StateStoreId(newDir(), 0L, 1) + val numBatches = 6 + val minDeltas = 2 + // Adding 1 to ensure snapshot is uploaded. + // Snapshot upload might happen at minDeltas or minDeltas + 1, depending on the provider + val maintFrequency = minDeltas + 1 + var version = 0L + var lastCkptInfo: StateStoreCheckpointInfo = null + + withSQLConf(SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> minDeltas.toString, + SQLConf.MIN_BATCHES_TO_RETAIN.key -> "1", + // So that RocksDB will also generate changelog files + RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".changelogCheckpointing.enabled" -> + true.toString) { + + tryWithProviderResource(newStoreProviderWithClonedConf(storeId)) { provider => + var ckptId: Option[String] = None + (version + 1 to numBatches).foreach { i => + val (v, info) = putAndCommitStore( + provider, loadVersion = i - 1, doMaintenance = i % maintFrequency == 0, + uniqueId = ckptId) + version = v + ckptId = info.stateStoreCkptId + lastCkptInfo = info + } + + // For RocksDB State store, files left: + // With effective minVersionsToRetain = 2 and minVersionsToDelete = 1, we retain + // changelog versions 1-6 plus snapshots for 2 and 6 (each with checksum files). + // Files: 1-6.changelog (6) + 2.zip, 6.zip (2) => 8 main + 8 checksum. + // NOTE: 1.changelog (and associated checksum file) are retained because changelog + // files only get cleaned up after the preceding snapshot is deleted. Since there is + // no snapshot for version 0, the changelog file for version 1 will be retained till + // `2.zip` becomes eligible for deletion. + verifyChecksumFiles(storeId.storeCheckpointLocation().toString, + expectedNumFiles = 16, expectedNumChecksumFiles = 8) + } + + // turn off file checksum, and verify that the previously created checksum files + // will be deleted by maintenance + withSQLConf(SQLConf.STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED.key -> false.toString) { + tryWithProviderResource(newStoreProviderWithClonedConf(storeId)) { provider => + var ckptId = lastCkptInfo.stateStoreCkptId + (version + 1 to version + numBatches).foreach { i => + val (v, info) = putAndCommitStore( + provider, loadVersion = i - 1, doMaintenance = i % maintFrequency == 0, + uniqueId = ckptId) + version = v + ckptId = info.stateStoreCkptId + } + + // now verify no checksum files are left + // For RocksDB State store, files left: + // With effective minVersionsToRetain = 2, we retain versions 8-12 plus snapshots + // for 8 and 12 (no checksum files). + // Files: 8-12.changelog (5) + 8.zip, 12.zip (2) => 7 main files. + verifyChecksumFiles(storeId.storeCheckpointLocation().toString, + expectedNumFiles = 7, expectedNumChecksumFiles = 0) + } + } + } + } + } + + + test("Load spark 4.0 golden checkpoint written without checksum") { + withCheckpointFormatV2 { + // We have Spark 4.0 golden files for each of these operations and store names. + // Tuple(operation name, store name, key schema, value schema) + val stores = Seq( + ("agg", StateStoreId.DEFAULT_STORE_NAME, + StructType(Seq(StructField("_1", IntegerType, nullable = false))), + StructType(Seq(StructField("count", LongType, nullable = false)))), + ("dedup", StateStoreId.DEFAULT_STORE_NAME, + StructType(Seq(StructField("_1", IntegerType, nullable = false))), + StructType(Seq(StructField("__dummy__", NullType)))), + ("join1", "left-keyToNumValues", + StructType(Seq(StructField("field0", IntegerType, nullable = false))), + StructType(Seq(StructField("value", LongType)))), + ("join1", "left-keyWithIndexToValue", + StructType(Seq(StructField("field0", IntegerType, nullable = false))), + StructType(Seq(StructField("value", LongType)))), + ("join1", "right-keyToNumValues", + StructType(Seq(StructField("field0", IntegerType, nullable = false))), + StructType(Seq(StructField("value", LongType)))), + ("join1", "right-keyWithIndexToValue", + StructType(Seq(StructField("field0", IntegerType, nullable = false))), + StructType(Seq(StructField("value", LongType)))) + ) + + // now try to load the store with checksum enabled. + // Golden files are V1 format (no unique IDs), loaded via snapshots. + withSQLConf(SQLConf.STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED.key -> true.toString) { + stores.foreach { case (opName, storeName, keySchema, valueSchema) => + (1 to 4).foreach { version => + tryWithProviderResource(newStoreProviderNoInit()) { provider => + val checkpointPath = this.getClass.getResource( + s"/structured-streaming/checkpoint-version-4.0.0/rocksdb/$opName/state" + ).getPath + + val conf = new Configuration() + conf.set(StreamExecution.RUN_ID_KEY, UUID.randomUUID().toString) + provider.init( + StateStoreId(checkpointPath, 0, 0, storeName), + keySchema, + valueSchema, + NoPrefixKeyStateEncoderSpec(keySchema), + useColumnFamilies = false, + new StateStoreConf(cloneSQLConf()), + conf + ) + val store = provider.getStore(version) + store.abort() + } + } + } + } + } + } + + test("fileChecksumThreadPoolSize propagates to ChecksumCheckpointFileManager") { + withCheckpointFormatV2 { + Seq(0, 1, 6).foreach { numThreads => + val storeId = StateStoreId(newDir(), 0L, 0) + withSQLConf( + SQLConf.STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED.key -> "true", + SQLConf.STATE_STORE_FILE_CHECKSUM_THREAD_POOL_SIZE.key -> numThreads.toString) { + tryWithProviderResource(newStoreProviderWithClonedConf(storeId)) { provider => + val fmMethod = PrivateMethod[CheckpointFileManager](Symbol("fm")) + val fm = provider.rocksDB.fileManager invokePrivate fmMethod() + assert(fm.isInstanceOf[ChecksumCheckpointFileManager]) + assert(fm.asInstanceOf[ChecksumCheckpointFileManager].numThreads === numThreads) + } + } + } + } + } + + test("file checksum behavior respect checkpoint format version") { + val cases = Seq( + // (ckptFormatVersion, checksumConfEnabled, expectedResult) + // always disable file checksum with ckpt format v1 + (1, true, false), + (1, false, false), + (2, true, true), + (2, false, false) + ) + + cases.foreach { case (version, checksumConfEnabled, expected) => + val sqlConf = new SQLConf() + sqlConf.setConf(SQLConf.STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED, checksumConfEnabled) + sqlConf.setConf(SQLConf.STATE_STORE_CHECKPOINT_FORMAT_VERSION, version) + + val storeConf = StateStoreConf(sqlConf) + assert( + storeConf.checkpointFileChecksumEnabled == expected, + s"ckptVersion = $version, checksumConf = $checksumConfEnabled, expected = $expected," + + s" actual = ${storeConf.checkpointFileChecksumEnabled}" + ) + } + } + + test("STATE_STORE_FILE_CHECKSUM_THREAD_POOL_SIZE: invalid negative value is rejected") { + val sqlConf = SQLConf.get.clone() + val ex = intercept[IllegalArgumentException] { + sqlConf.setConfString(SQLConf.STATE_STORE_FILE_CHECKSUM_THREAD_POOL_SIZE.key, "-1") + } + assert(ex.getMessage.contains("Must be a non-negative integer")) + } + + test("fileChecksumThreadPoolSize = 0 supports sequential I/O (load, write, commit, reload)") { + withCheckpointFormatV2 { + val storeId = StateStoreId(newDir(), 0L, 0) + var lastCkptId: Option[String] = None + withSQLConf( + SQLConf.STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED.key -> "true", + SQLConf.STATE_STORE_FILE_CHECKSUM_THREAD_POOL_SIZE.key -> "0") { + // Write some state with sequential mode enabled + tryWithProviderResource(newStoreProviderWithClonedConf(storeId)) { provider => + val store = provider.getStore(0) + put(store, "a", 0, 1) + put(store, "b", 0, 2) + store.commit() + lastCkptId = store.getStateStoreCheckpointInfo().stateStoreCkptId + // Verify that main file and checksum file were both written to disk. + // RocksDB produces 2 files and 1 checksum file: 1.zip, 1.zip.crc + verifyChecksumFiles(storeId.storeCheckpointLocation().toString, + expectedNumFiles = 2, expectedNumChecksumFiles = 1) + } + + // Reload and verify state is intact + tryWithProviderResource(newStoreProviderWithClonedConf(storeId)) { provider => + val store = provider.getStore(1, lastCkptId) + assert(get(store, "a", 0) === Some(1)) + assert(get(store, "b", 0) === Some(2)) + store.abort() + } + } + } + } + + test( + "fileChecksumThreadPoolSize = 0: concurrent store commit and maintenance both complete") { + withCheckpointFormatV2 { + val storeId = StateStoreId(newDir(), 0L, 0) + var lastCkptId: Option[String] = None + withSQLConf( + SQLConf.STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED.key -> "true", + SQLConf.STATE_STORE_FILE_CHECKSUM_THREAD_POOL_SIZE.key -> "0", + SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1", + // So that RocksDB uploads a snapshot during maintenance rather than being a no-op. + RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".changelogCheckpointing.enabled" -> "true") { + tryWithProviderResource(newStoreProviderWithClonedConf(storeId)) { provider => + // Build up a few versions so maintenance has something to work with. + // With minDeltasForSnapshot=1 and changelog checkpointing enabled, doMaintenance() + // uploads a queued snapshot, exercising the checksum file manager concurrently. + var ckptId: Option[String] = None + (0L until 3L).foreach { version => + val (_, info) = putAndCommitStore(provider, version, + doMaintenance = false, uniqueId = ckptId) + ckptId = info.stateStoreCkptId + } + + // Load the store and prepare the write before maintenance starts, so that + // store.commit() (the actual file I/O) is what overlaps with doMaintenance(). + val store = provider.getStore(3, ckptId) + put(store, "3", 3, 300) + + val errors = new ConcurrentLinkedQueue[Throwable]() + val maintenanceStartedLatch = new CountDownLatch(1) + val maintenanceDoneLatch = new CountDownLatch(1) + + val maintenanceThread = new Thread(() => { + try { + maintenanceStartedLatch.countDown() + provider.doMaintenance() + } catch { + case t: Throwable => errors.add(t) + } finally { + maintenanceDoneLatch.countDown() + } + }) + maintenanceThread.setDaemon(true) + maintenanceThread.start() + + // Wait until maintenance is running, then commit to simulate concurrency. + assert(maintenanceStartedLatch.await(30, TimeUnit.SECONDS), + "Maintenance thread did not start within 30 seconds") + store.commit() + lastCkptId = store.getStateStoreCheckpointInfo().stateStoreCkptId + + assert(maintenanceDoneLatch.await(30, TimeUnit.SECONDS), + "Maintenance did not complete within 30 seconds") + assert(errors.isEmpty, + s"Maintenance failed with: ${Option(errors.peek()).map(_.getMessage).orNull}") + } + + // Verify state committed concurrently with maintenance is intact. + tryWithProviderResource(newStoreProviderWithClonedConf(storeId)) { provider => + val store = provider.getStore(4, lastCkptId) + assert(get(store, "3", 3) === Some(300)) + store.abort() + } + } + } + } } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala index 859aa7c094498..5fd016ac2ecee 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala @@ -264,7 +264,14 @@ trait AlsoTestWithRocksDBFeatures val newTestName = s"$testName - with enableStateStoreCheckpointIds = " + s"$enableStateStoreCheckpointIds" testWithColumnFamilies(newTestName, testMode, testTags: _*) { colFamiliesEnabled => - testBody(enableStateStoreCheckpointIds, colFamiliesEnabled) + val v2Confs = if (enableStateStoreCheckpointIds) { + Seq(SQLConf.STATE_STORE_CHECKPOINT_FORMAT_VERSION.key -> "2") + } else { + Seq.empty + } + withSQLConf(v2Confs: _*) { + testBody(enableStateStoreCheckpointIds, colFamiliesEnabled) + } } } } @@ -277,7 +284,14 @@ trait AlsoTestWithRocksDBFeatures val newTestName = s"$testName - with enableStateStoreCheckpointIds = " + s"$enableStateStoreCheckpointIds" test(newTestName, testTags: _*) { - testBody(enableStateStoreCheckpointIds) + val v2Confs = if (enableStateStoreCheckpointIds) { + Seq(SQLConf.STATE_STORE_CHECKPOINT_FORMAT_VERSION.key -> "2") + } else { + Seq.empty + } + withSQLConf(v2Confs: _*) { + testBody(enableStateStoreCheckpointIds) + } } } } @@ -290,7 +304,14 @@ trait AlsoTestWithRocksDBFeatures val newTestName = s"$testName - with enableStateStoreCheckpointIds = " + s"$enableStateStoreCheckpointIds" testWithChangelogCheckpointingDisabled(newTestName, testTags: _*) { - enableStateStoreCheckpointIds => testBody(enableStateStoreCheckpointIds) + val v2Confs = if (enableStateStoreCheckpointIds) { + Seq(SQLConf.STATE_STORE_CHECKPOINT_FORMAT_VERSION.key -> "2") + } else { + Seq.empty + } + withSQLConf(v2Confs: _*) { + testBody(enableStateStoreCheckpointIds) + } } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala index 7570cf942f061..1c98424987b63 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala @@ -43,7 +43,7 @@ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection, UnsafeRow} import org.apache.spark.sql.catalyst.util.quietly import org.apache.spark.sql.execution.streaming._ -import org.apache.spark.sql.execution.streaming.checkpointing.{CheckpointFileManager, ChecksumCheckpointFileManager, ChecksumFile} +import org.apache.spark.sql.execution.streaming.checkpointing.CheckpointFileManager import org.apache.spark.sql.execution.streaming.runtime.{MemoryStream, StreamExecution} import org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorSuite.withCoordinatorRef import org.apache.spark.sql.functions.count @@ -1195,13 +1195,13 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider] // Put should create a temp file put(store0, "a", 0, 1) - assert(numTempFiles === 2) + assert(numTempFiles === 1) assert(numDeltaFiles === 0) // Commit should remove temp file and create a delta file store0.commit() assert(numTempFiles === 0) - assert(numDeltaFiles === 2) + assert(numDeltaFiles === 1) // Remove should create a temp file val store1 = shouldNotCreateTempFile { @@ -1211,13 +1211,13 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider] version = 1, None, None, useColumnFamilies = false, storeConf, hadoopConf) } remove(store1, _._1 == "a") - assert(numTempFiles === 2) - assert(numDeltaFiles === 2) + assert(numTempFiles === 1) + assert(numDeltaFiles === 1) // Commit should remove temp file and create a delta file store1.commit() assert(numTempFiles === 0) - assert(numDeltaFiles === 4) + assert(numDeltaFiles === 2) // Commit without any updates should create a delta file val store2 = shouldNotCreateTempFile { @@ -1228,7 +1228,7 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider] } store2.commit() assert(numTempFiles === 0) - assert(numDeltaFiles === 6) + assert(numDeltaFiles === 3) } test("SPARK-21145: Restarted queries create new provider instances") { @@ -1463,7 +1463,6 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider] test("Auto snapshot repair") { withSQLConf( - SQLConf.STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED.key -> false.toString, SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1" // for hdfs means every 2 versions ) { val storeId = StateStoreId(newDir(), 0L, 1) @@ -2248,370 +2247,6 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider] } } - testWithAllCodec("file checksum can be enabled and disabled for the same checkpoint") { _ => - val storeId = StateStoreId(newDir(), 0L, 1) - var version = 0L - - // Commit to store using file checksum - withSQLConf(SQLConf.STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED.key -> true.toString) { - tryWithProviderResource(newStoreProviderWithClonedConf(storeId)) { provider => - val store = provider.getStore(version) - put(store, "1", 11, 100) - put(store, "2", 22, 200) - version = store.commit() - } - } - - // Reload the store and commit without file checksum - withSQLConf(SQLConf.STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED.key -> false.toString) { - tryWithProviderResource(newStoreProviderWithClonedConf(storeId)) { provider => - assert(version == 1) - val store = provider.getStore(version) - assert(get(store, "1", 11) === Some(100)) - assert(get(store, "2", 22) === Some(200)) - - put(store, "3", 33, 300) - put(store, "4", 44, 400) - version = store.commit() - } - } - - // Reload the store and commit with file checksum - withSQLConf(SQLConf.STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED.key -> true.toString) { - tryWithProviderResource(newStoreProviderWithClonedConf(storeId)) { provider => - assert(version == 2) - val store = provider.getStore(version) - assert(get(store, "1", 11) === Some(100)) - assert(get(store, "2", 22) === Some(200)) - assert(get(store, "3", 33) === Some(300)) - assert(get(store, "4", 44) === Some(400)) - - put(store, "5", 55, 500) - version = store.commit() - } - } - } - - test("checksum files are also cleaned up during maintenance") { - val storeId = StateStoreId(newDir(), 0L, 1) - val numBatches = 6 - val minDeltas = 2 - // Adding 1 to ensure snapshot is uploaded. - // Snapshot upload might happen at minDeltas or minDeltas + 1, depending on the provider - val maintFrequency = minDeltas + 1 - var version = 0L - - withSQLConf(SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> minDeltas.toString, - SQLConf.MIN_BATCHES_TO_RETAIN.key -> "1", - // So that RocksDB will also generate changelog files - RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".changelogCheckpointing.enabled" -> - true.toString) { - - tryWithProviderResource(newStoreProviderWithClonedConf(storeId)) { provider => - (version + 1 to numBatches).foreach { i => - version = putAndCommitStore( - provider, loadVersion = i - 1, doMaintenance = i % maintFrequency == 0) - } - - // This is because, hdfs and rocksdb old files detection logic is different - provider match { - case _: HDFSBackedStateStoreProvider => - // For HDFS State store, files left: - // 3.delta to 6.delta (+ checksum file) - // 3.snapshot (+ checksum file), 6.snapshot (+ checksum file) - verifyChecksumFiles(storeId.storeCheckpointLocation().toString, - expectedNumFiles = 12, expectedNumChecksumFiles = 6) - case _ => - // For RocksDB State store, files left: - // 6.changelog (+ checksum file), 6.zip (+ checksum file) - verifyChecksumFiles(storeId.storeCheckpointLocation().toString, - expectedNumFiles = 4, expectedNumChecksumFiles = 2) - } - } - - // turn off file checksum, and verify that the previously created checksum files - // will be deleted by maintenance - withSQLConf(SQLConf.STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED.key -> false.toString) { - tryWithProviderResource(newStoreProviderWithClonedConf(storeId)) { provider => - (version + 1 to version + numBatches).foreach { i => - version = putAndCommitStore( - provider, loadVersion = i - 1, doMaintenance = i % maintFrequency == 0) - } - - // now verify no checksum files are left - // This is because, hdfs and rocksdb old files detection logic is different - provider match { - case _: HDFSBackedStateStoreProvider => - // For HDFS State store, files left: - // 6.delta, 9.delta to 12.delta - // 9.snapshot, 12.snapshot - verifyChecksumFiles(storeId.storeCheckpointLocation().toString, - expectedNumFiles = 7, expectedNumChecksumFiles = 0) - case _ => - // For RocksDB State store, files left: - // 12.changelog, 12.zip - verifyChecksumFiles(storeId.storeCheckpointLocation().toString, - expectedNumFiles = 2, expectedNumChecksumFiles = 0) - } - } - } - } - } - - testWithAllCodec("overwrite state file without overwriting checksum file") { _ => - val storeId = StateStoreId(newDir(), 0L, 1) - val numBatches = 3 - val minDeltas = 2 - - withSQLConf(SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> minDeltas.toString, - // So that RocksDB will also generate changelog files - RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".changelogCheckpointing.enabled" -> - true.toString) { - - // First run with file checksum enabled. It will generate state and checksum files. - // Turn off file checksum, and regenerate only the state files - Seq(true, false).foreach { fileChecksumEnabled => - withSQLConf( - SQLConf.STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED.key -> fileChecksumEnabled.toString) { - tryWithProviderResource(newStoreProviderWithClonedConf(storeId)) { provider => - (1 to numBatches).foreach { i => - putAndCommitStore( - provider, loadVersion = i - 1, doMaintenance = false) - } - - // This should only create snapshot and no delete - provider.doMaintenance() - - // number of files should be the same. - // 3 delta/changelog files, 1 snapshot (with checksum files) - verifyChecksumFiles(storeId.storeCheckpointLocation().toString, - expectedNumFiles = 8, expectedNumChecksumFiles = 4) - } - } - } - - withSQLConf( - SQLConf.STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED.key -> true.toString) { - // now try to load the store with checksum enabled. - // It will verify the overwritten state files with the checksum files. - (1 to numBatches).foreach { i => - tryWithProviderResource(newStoreProviderWithClonedConf(storeId)) { provider => - // load from DFS should be successful - val store = provider.getStore(i) - store.abort() - } - } - } - } - } - - test("Load spark 4.0 golden checkpoint written without checksum") { - // We have Spark 4.0 golden files for each of these operations and store names. - // Tuple(operation name, store name, key schema, value schema) - val stores = Seq( - ("agg", StateStoreId.DEFAULT_STORE_NAME, - StructType(Seq(StructField("_1", IntegerType, nullable = false))), - StructType(Seq(StructField("count", LongType, nullable = false)))), - ("dedup", StateStoreId.DEFAULT_STORE_NAME, - StructType(Seq(StructField("_1", IntegerType, nullable = false))), - StructType(Seq(StructField("__dummy__", NullType)))), - ("join1", "left-keyToNumValues", - StructType(Seq(StructField("field0", IntegerType, nullable = false))), - StructType(Seq(StructField("value", LongType)))), - ("join1", "left-keyWithIndexToValue", - StructType(Seq(StructField("field0", IntegerType, nullable = false))), - StructType(Seq(StructField("value", LongType)))), - ("join1", "right-keyToNumValues", - StructType(Seq(StructField("field0", IntegerType, nullable = false))), - StructType(Seq(StructField("value", LongType)))), - ("join1", "right-keyWithIndexToValue", - StructType(Seq(StructField("field0", IntegerType, nullable = false))), - StructType(Seq(StructField("value", LongType)))) - ) - - // now try to load the store with checksum enabled. - withSQLConf(SQLConf.STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED.key -> true.toString) { - stores.foreach { case (opName, storeName, keySchema, valueSchema) => - (1 to 4).foreach { version => - tryWithProviderResource(newStoreProviderNoInit()) { provider => - // load from golden checkpoint should be successful - val providerName = provider match { - case _: HDFSBackedStateStoreProvider => "hdfs" - case _: RocksDBStateStoreProvider => "rocksdb" - case _ => throw new IllegalArgumentException("Unknown ProviderClass") - } - - val checkpointPath = this.getClass.getResource( - s"/structured-streaming/checkpoint-version-4.0.0/" + - s"$providerName/$opName/state" - ).getPath - - val conf = new Configuration() - conf.set(StreamExecution.RUN_ID_KEY, UUID.randomUUID().toString) - provider.init( - StateStoreId(checkpointPath, 0, 0, storeName), - keySchema, - valueSchema, - NoPrefixKeyStateEncoderSpec(keySchema), - useColumnFamilies = false, - new StateStoreConf(cloneSQLConf()), - conf - ) - val store = provider.getStore(version) - store.abort() - } - } - } - } - } - - test("fileChecksumThreadPoolSize propagates to ChecksumCheckpointFileManager") { - Seq(0, 1, 6).foreach { numThreads => - val storeId = StateStoreId(newDir(), 0L, 0) - withSQLConf( - SQLConf.STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED.key -> "true", - SQLConf.STATE_STORE_FILE_CHECKSUM_THREAD_POOL_SIZE.key -> numThreads.toString) { - tryWithProviderResource(newStoreProviderWithClonedConf(storeId)) { provider => - val fmMethod = PrivateMethod[CheckpointFileManager](Symbol("fm")) - val fm = provider match { - case hdfs: HDFSBackedStateStoreProvider => - hdfs.fm - case rocksdb: RocksDBStateStoreProvider => - rocksdb.rocksDB.fileManager invokePrivate fmMethod() - case _ => - fail(s"Unexpected provider type: ${provider.getClass.getName}") - } - assert(fm.isInstanceOf[ChecksumCheckpointFileManager]) - assert(fm.asInstanceOf[ChecksumCheckpointFileManager].numThreads === numThreads) - } - } - } - } - - test("STATE_STORE_FILE_CHECKSUM_THREAD_POOL_SIZE: invalid negative value is rejected") { - val sqlConf = SQLConf.get.clone() - val ex = intercept[IllegalArgumentException] { - sqlConf.setConfString(SQLConf.STATE_STORE_FILE_CHECKSUM_THREAD_POOL_SIZE.key, "-1") - } - assert(ex.getMessage.contains("Must be a non-negative integer")) - } - - test("fileChecksumThreadPoolSize = 0 supports sequential I/O (load, write, commit, reload)") { - val storeId = StateStoreId(newDir(), 0L, 0) - withSQLConf( - SQLConf.STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED.key -> "true", - SQLConf.STATE_STORE_FILE_CHECKSUM_THREAD_POOL_SIZE.key -> "0") { - // Write some state with sequential mode enabled - tryWithProviderResource(newStoreProviderWithClonedConf(storeId)) { provider => - val store = provider.getStore(0) - put(store, "a", 0, 1) - put(store, "b", 0, 2) - store.commit() - // Verify that main file and checksum file were both written to disk. - // Both HDFS and RocksDB produce 2 files and 1 checksum file: - // HDFS: 1.delta, 1.delta.checksum - // RocksDB: 1.zip, 1.zip.crc - verifyChecksumFiles(storeId.storeCheckpointLocation().toString, - expectedNumFiles = 2, expectedNumChecksumFiles = 1) - } - - // Reload and verify state is intact - tryWithProviderResource(newStoreProviderWithClonedConf(storeId)) { provider => - val store = provider.getStore(1) - assert(get(store, "a", 0) === Some(1)) - assert(get(store, "b", 0) === Some(2)) - store.abort() - } - } - } - - test("fileChecksumThreadPoolSize = 0: concurrent store commit and maintenance both complete") { - val storeId = StateStoreId(newDir(), 0L, 0) - withSQLConf( - SQLConf.STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED.key -> "true", - SQLConf.STATE_STORE_FILE_CHECKSUM_THREAD_POOL_SIZE.key -> "0", - SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1", - // So that RocksDB uploads a snapshot during maintenance rather than being a no-op. - RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".changelogCheckpointing.enabled" -> "true") { - tryWithProviderResource(newStoreProviderWithClonedConf(storeId)) { provider => - // Build up a few versions so maintenance has something to work with. - // With minDeltasForSnapshot=1 and changelog checkpointing enabled, doMaintenance() - // uploads a queued snapshot, exercising the checksum file manager concurrently. - (0L until 3L).foreach { version => - putAndCommitStore(provider, version, doMaintenance = false) - } - - // Load the store and prepare the write before maintenance starts, so that - // store.commit() (the actual file I/O) is what overlaps with doMaintenance(). - val store = provider.getStore(3) - put(store, "3", 3, 300) - - val errors = new ConcurrentLinkedQueue[Throwable]() - val maintenanceStartedLatch = new CountDownLatch(1) - val maintenanceDoneLatch = new CountDownLatch(1) - - val maintenanceThread = new Thread(() => { - try { - maintenanceStartedLatch.countDown() - provider.doMaintenance() - } catch { - case t: Throwable => errors.add(t) - } finally { - maintenanceDoneLatch.countDown() - } - }) - maintenanceThread.setDaemon(true) - maintenanceThread.start() - - // Wait until maintenance is running, then commit to simulate concurrency. - assert(maintenanceStartedLatch.await(30, TimeUnit.SECONDS), - "Maintenance thread did not start within 30 seconds") - store.commit() - - assert(maintenanceDoneLatch.await(30, TimeUnit.SECONDS), - "Maintenance did not complete within 30 seconds") - assert(errors.isEmpty, - s"Maintenance failed with: ${Option(errors.peek()).map(_.getMessage).orNull}") - } - - // Verify state committed concurrently with maintenance is intact. - tryWithProviderResource(newStoreProviderWithClonedConf(storeId)) { provider => - val store = provider.getStore(4) - assert(get(store, "3", 3) === Some(300)) - store.abort() - } - } - } - - private def verifyChecksumFiles( - dir: String, expectedNumFiles: Int, expectedNumChecksumFiles: Int): Unit = { - val allFiles = new File(dir) - // filter out dirs and local hdfs files - .listFiles().filter(f => f.isFile && !f.getName.startsWith(".")) - .map(f => new Path(f.toURI)).toSet - assert(allFiles.size == expectedNumFiles) - - val checksumFiles = allFiles.filter( - ChecksumCheckpointFileManager.isChecksumFile).map(ChecksumFile) - assert(checksumFiles.size == expectedNumChecksumFiles) - - // verify that no orphan checksum file i.e. the respective main file should be present - assert(checksumFiles.forall(c => allFiles.contains(c.mainFilePath))) - } - - private def putAndCommitStore( - provider: ProviderClass, loadVersion: Long, doMaintenance: Boolean): Long = { - val store = provider.getStore(loadVersion) - put(store, loadVersion.toString, loadVersion.toInt, loadVersion.toInt * 100) - val newVersion = store.commit() - - if (doMaintenance) { - provider.doMaintenance() - } - - newVersion - } - test("StateStore.get") { val conf = new SparkConf() .setMaster("local") From 4a321bd7fd1a7da0614a05d889b5d4f8e4c0b4b7 Mon Sep 17 00:00:00 2001 From: zifeif2 Date: Mon, 13 Apr 2026 21:56:26 +0000 Subject: [PATCH 2/4] use stateStoreCheckpointFormatVersion; --- .../spark/sql/execution/streaming/state/StateStoreConf.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala index 937cf1ed684a0..e19ac06732fa1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala @@ -115,7 +115,7 @@ class StateStoreConf( /** Whether file checksum generation and verification is enabled. */ val checkpointFileChecksumEnabled: Boolean = - sqlConf.checkpointFileChecksumEnabled && sqlConf.enableStateStoreCheckpointIds + sqlConf.checkpointFileChecksumEnabled && sqlConf.stateStoreCheckpointFormatVersion >= 2 /** Number of threads for the file checksum thread pool (0 to disable). */ val fileChecksumThreadPoolSize: Int = sqlConf.stateStoreFileChecksumThreadPoolSize From 9827534e1dd069290acd9c298dae68c4621d74fa Mon Sep 17 00:00:00 2001 From: zifeif2 Date: Tue, 14 Apr 2026 00:43:41 +0000 Subject: [PATCH 3/4] fix failing test --- .../streaming/state/RocksDBStateStoreSuite.scala | 16 ++++------------ .../execution/streaming/state/RocksDBSuite.scala | 14 ++++++++++---- 2 files changed, 14 insertions(+), 16 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala index 948d3c4f4ead9..302324043ec32 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala @@ -3182,15 +3182,9 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid } // For RocksDB State store, files left: - // With effective minVersionsToRetain = 2 and minVersionsToDelete = 1, we retain - // changelog versions 1-6 plus snapshots for 2 and 6 (each with checksum files). - // Files: 1-6.changelog (6) + 2.zip, 6.zip (2) => 8 main + 8 checksum. - // NOTE: 1.changelog (and associated checksum file) are retained because changelog - // files only get cleaned up after the preceding snapshot is deleted. Since there is - // no snapshot for version 0, the changelog file for version 1 will be retained till - // `2.zip` becomes eligible for deletion. + // 6.changelog (+ checksum file), 6.zip (+ checksum file) verifyChecksumFiles(storeId.storeCheckpointLocation().toString, - expectedNumFiles = 16, expectedNumChecksumFiles = 8) + expectedNumFiles = 4, expectedNumChecksumFiles = 2) } // turn off file checksum, and verify that the previously created checksum files @@ -3208,11 +3202,9 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid // now verify no checksum files are left // For RocksDB State store, files left: - // With effective minVersionsToRetain = 2, we retain versions 8-12 plus snapshots - // for 8 and 12 (no checksum files). - // Files: 8-12.changelog (5) + 8.zip, 12.zip (2) => 7 main files. + // 12.changelog, 12.zip verifyChecksumFiles(storeId.storeCheckpointLocation().toString, - expectedNumFiles = 7, expectedNumChecksumFiles = 0) + expectedNumFiles = 2, expectedNumChecksumFiles = 0) } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala index 5fd016ac2ecee..d0bd5e5ef9307 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala @@ -3571,7 +3571,9 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession } // reload version 2 - should succeed - withDB(remoteDir, version = 2, conf = conf) { db => + withDB(remoteDir, version = 2, conf = conf, + enableStateStoreCheckpointIds = enableStateStoreCheckpointIds, + versionToUniqueId = versionToUniqueId) { db => } } @@ -3605,7 +3607,9 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession db.commit() // create snapshot again // load version 1 - should succeed - withDB(remoteDir, version = 1, conf = conf, hadoopConf = hadoopConf) { db => + withDB(remoteDir, version = 1, conf = conf, hadoopConf = hadoopConf, + enableStateStoreCheckpointIds = enableStateStoreCheckpointIds, + versionToUniqueId = versionToUniqueId) { db => } // upload recently created snapshot @@ -3613,7 +3617,9 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession assert(snapshotVersionsPresent(remoteDir) == Seq(1)) // load version 1 again - should succeed - withDB(remoteDir, version = 1, conf = conf, hadoopConf = hadoopConf) { db => + withDB(remoteDir, version = 1, conf = conf, hadoopConf = hadoopConf, + enableStateStoreCheckpointIds = enableStateStoreCheckpointIds, + versionToUniqueId = versionToUniqueId) { db => } } } @@ -3746,7 +3752,7 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession if (inc > 1) { // Create changelog files in the gap for (j <- 1 to inc - 1) { - db2.load(curVer + j) + db2.load(curVer + j, versionToUniqueId.get(curVer + j)) db2.put("foo", "bar") db2.commit() } From f0b079cff7b18f91700699bcbb6388a67154185d Mon Sep 17 00:00:00 2001 From: zifeif2 Date: Tue, 14 Apr 2026 04:35:36 +0000 Subject: [PATCH 4/4] fix more tests --- pom.xml | 4 ++++ .../sql/execution/streaming/state/RocksDBSuite.scala | 8 +++++++- 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index d1a73594944f8..6142535753201 100644 --- a/pom.xml +++ b/pom.xml @@ -382,6 +382,10 @@ false + + databricks-maven-proxy + https://maven-proxy.dev.databricks.com + diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala index d0bd5e5ef9307..4fa19adf05031 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala @@ -3614,7 +3614,13 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession // upload recently created snapshot db.doMaintenance() - assert(snapshotVersionsPresent(remoteDir) == Seq(1)) + // With V2 checkpoint format, each commit gets a unique ID so the second + // version-1 snapshot has a different filename and both coexist on disk. + if (enableStateStoreCheckpointIds) { + assert(snapshotVersionsPresent(remoteDir) == Seq(1, 1)) + } else { + assert(snapshotVersionsPresent(remoteDir) == Seq(1)) + } // load version 1 again - should succeed withDB(remoteDir, version = 1, conf = conf, hadoopConf = hadoopConf,