diff --git a/pom.xml b/pom.xml
index d1a73594944f..614253575320 100644
--- a/pom.xml
+++ b/pom.xml
@@ -382,6 +382,10 @@
false
+
+ databricks-maven-proxy
+ https://maven-proxy.dev.databricks.com
+
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 05fdb6bca932..409ecd2eddbe 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -3774,7 +3774,8 @@ object SQLConf {
buildConf("spark.sql.streaming.checkpoint.fileChecksum.enabled")
.internal()
.doc("When true, checksum would be generated and verified for checkpoint files. " +
- "This is used to detect file corruption.")
+ "This is used to detect file corruption. This is only enabled when " +
+ "STATE_STORE_CHECKPOINT_FORMAT_VERSION >= 2.")
.version("4.1.0")
.booleanConf
.createWithDefault(true)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala
index 5a3e875541d0..e19ac06732fa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala
@@ -114,7 +114,8 @@ class StateStoreConf(
val compressionCodec: String = sqlConf.stateStoreCompressionCodec
/** Whether file checksum generation and verification is enabled. */
- val checkpointFileChecksumEnabled: Boolean = sqlConf.checkpointFileChecksumEnabled
+ val checkpointFileChecksumEnabled: Boolean =
+ sqlConf.checkpointFileChecksumEnabled && sqlConf.stateStoreCheckpointFormatVersion >= 2
/** Number of threads for the file checksum thread pool (0 to disable). */
val fileChecksumThreadPoolSize: Int = sqlConf.stateStoreFileChecksumThreadPoolSize
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBCheckpointFailureInjectionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBCheckpointFailureInjectionSuite.scala
index c48b492e27c2..b0220c339626 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBCheckpointFailureInjectionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBCheckpointFailureInjectionSuite.scala
@@ -634,9 +634,8 @@ class RocksDBCheckpointFailureInjectionSuite extends StreamTest
* enabled/disabled and the different behavior is shown in the test
*/
+ // File checksum is disabled for checkpoint format version 1, so we only test version 2.
Seq(
- FailureConf3(skipCreationIfFileMissingChecksum = false, checkpointFormatVersion = "1"),
- FailureConf3(skipCreationIfFileMissingChecksum = true, checkpointFormatVersion = "1"),
FailureConf3(skipCreationIfFileMissingChecksum = false, checkpointFormatVersion = "2"),
FailureConf3(skipCreationIfFileMissingChecksum = true, checkpointFormatVersion = "2")
).foreach { failureConf =>
@@ -770,60 +769,25 @@ class RocksDBCheckpointFailureInjectionSuite extends StreamTest
assert((new File(checkpointDir, "commits/1")).exists())
assert((new File(checkpointDir, "commits/2")).exists())
- val failureCase =
- !failureConf.skipCreationIfFileMissingChecksum &&
- failureConf.checkpointFormatVersion == "1"
+ // With checkpointFormatVersion = 2, the changelog file checksum should be written
+ assert(verifyChangelogFileChecksumExists(2))
- if (failureCase) {
- assert(verifyChangelogFileChecksumExists(2))
-
- // The query does not succeed, since we load the old changelog file with the checksum from
- // the new changelog file that did not overwrite the old one. This will lead to a checksum
- // verification failure when we try to load the old changelog file with the checksum from
- // the new changelog file that did not overwrite the old one.
- testStream(aggregated2, Update)(
- StartStream(
- checkpointLocation = checkpointDir.getAbsolutePath,
- additionalConfs = secondRunConfs),
- AddData(inputData, 4),
- ExpectFailure[SparkException] { ex =>
- ex.getMessage.contains("CHECKPOINT_FILE_CHECKSUM_VERIFICATION_FAILED")
- ex.getMessage.contains("2.changelog")
- }
- )
-
- // Verify that the commit file was not written
- assert(!(new File(checkpointDir, "commits/3")).exists())
- } else {
- if (failureConf.checkpointFormatVersion == "1") {
- // With checkpointFormatVersion = 1, the changelog file checksum should not be written
- assert(!verifyChangelogFileChecksumExists(2))
- } else {
- // With checkpointFormatVersion = 2, the changelog file checksum should be written
- assert(verifyChangelogFileChecksumExists(2))
- }
-
- // The query should restart successfully
- testStream(aggregated2, Update)(
- StartStream(
- checkpointLocation = checkpointDir.getAbsolutePath,
- additionalConfs = secondRunConfs),
- AddData(inputData, 4),
- CheckLastBatch((1004, 2)),
- StopStream
- )
+ // The query should restart successfully
+ testStream(aggregated2, Update)(
+ StartStream(
+ checkpointLocation = checkpointDir.getAbsolutePath,
+ additionalConfs = secondRunConfs),
+ AddData(inputData, 4),
+ CheckLastBatch((1004, 2)),
+ StopStream
+ )
- // Verify again the 2.changelog file checksum exists or not
- if (failureConf.checkpointFormatVersion == "1") {
- assert(!verifyChangelogFileChecksumExists(2))
- } else {
- assert(verifyChangelogFileChecksumExists(2))
- }
+ // Verify again that the 2.changelog file checksum exists
+ assert(verifyChangelogFileChecksumExists(2))
- assert(verifyChangelogFileExists(4))
- assert(verifyChangelogFileChecksumExists(4))
- assert((new File(checkpointDir, "commits/3")).exists())
- }
+ assert(verifyChangelogFileExists(4))
+ assert(verifyChangelogFileChecksumExists(4))
+ assert((new File(checkpointDir, "commits/3")).exists())
}
}
}
@@ -832,54 +796,66 @@ class RocksDBCheckpointFailureInjectionSuite extends StreamTest
* Test that verifies that when a task is interrupted, the store's rollback() method does not
* throw an exception and the store can still be used after the rollback.
*/
- test("SPARK-54585: Interrupted task calling rollback does not throw an exception") {
- val hadoopConf = new Configuration()
- hadoopConf.set(
- STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key,
- fileManagerClassName
- )
- withTempDirAllowFailureInjection { (remoteDir, _) =>
- val sqlConf = new SQLConf()
- sqlConf.setConfString("spark.sql.streaming.checkpoint.fileChecksum.enabled", "true")
- val rocksdbChangelogCheckpointingConfKey =
- RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".changelogCheckpointing.enabled"
- sqlConf.setConfString(rocksdbChangelogCheckpointingConfKey, "true")
- val conf = RocksDBConf(StateStoreConf(sqlConf))
+ Seq(true, false).foreach { enableStateStoreCheckpointIds =>
+ val newTestName = s"SPARK-54585: Interrupted task calling rollback does not throw an " +
+ s"exception - with enableStateStoreCheckpointIds = $enableStateStoreCheckpointIds"
+
+ val v2Confs = if (enableStateStoreCheckpointIds) {
+ Seq(SQLConf.STATE_STORE_CHECKPOINT_FORMAT_VERSION.key -> "2",
+ SQLConf.STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED.key -> "true")
+ } else {
+ Seq.empty
+ }
+ test(newTestName) {
+ val hadoopConf = new Configuration()
+ hadoopConf.set(
+ STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key,
+ fileManagerClassName
+ )
+ withSQLConf(v2Confs: _*) {
+ withTempDirAllowFailureInjection { (remoteDir, _) => {
+ val sqlConf = new SQLConf()
+ val rocksdbChangelogCheckpointingConfKey =
+ RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".changelogCheckpointing.enabled"
+ sqlConf.setConfString(rocksdbChangelogCheckpointingConfKey, "true")
+ val conf = RocksDBConf(StateStoreConf(sqlConf))
- withDB(
- remoteDir.getAbsolutePath,
- version = 0,
- conf = conf,
- hadoopConf = hadoopConf
- ) { db =>
- db.put("key0", "value0")
- val checkpointId1 = commitAndGetCheckpointId(db)
+ withDB(
+ remoteDir.getAbsolutePath,
+ version = 0,
+ conf = conf,
+ hadoopConf = hadoopConf
+ ) { db =>
+ db.put("key0", "value0")
+ val checkpointId1 = commitAndGetCheckpointId(db)
- db.load(1, checkpointId1)
- db.put("key1", "value1")
- val checkpointId2 = commitAndGetCheckpointId(db)
+ db.load(1, checkpointId1)
+ db.put("key1", "value1")
+ val checkpointId2 = commitAndGetCheckpointId(db)
- db.load(2, checkpointId2)
- db.put("key2", "value2")
+ db.load(2, checkpointId2)
+ db.put("key2", "value2")
- // Simulate what happens when a task is killed, the thread's interrupt flag is set.
- // This replicates the scenario where TaskContext.markTaskFailed() is called and
- // the task failure listener invokes RocksDBStateStore.abort() -> rollback().
- Thread.currentThread().interrupt()
+ // Simulate what happens when a task is killed, the thread's interrupt flag is set.
+ // This replicates the scenario where TaskContext.markTaskFailed() is called and
+ // the task failure listener invokes RocksDBStateStore.abort() -> rollback().
+ Thread.currentThread().interrupt()
- // rollback() should not throw an exception
- db.rollback()
+ // rollback() should not throw an exception
+ db.rollback()
- // Clear the interrupt flag for subsequent operations
- Thread.interrupted()
+ // Clear the interrupt flag for subsequent operations
+ Thread.interrupted()
- // Reload the store and insert a new value
- db.load(2, checkpointId2)
- db.put("key3", "value3")
+ // Reload the store and insert a new value
+ db.load(2, checkpointId2)
+ db.put("key3", "value3")
- // Verify the store has the correct values
- assert(db.iterator().map(toStr).toSet ===
- Set(("key0", "value0"), ("key1", "value1"), ("key3", "value3")))
+ // Verify the store has the correct values
+ assert(db.iterator().map(toStr).toSet ===
+ Set(("key0", "value0"), ("key1", "value1"), ("key3", "value3")))
+ }
+ }}
}
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
index 3e4b4b7320f5..302324043ec3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
@@ -17,7 +17,9 @@
package org.apache.spark.sql.execution.streaming.state
+import java.io.File
import java.util.UUID
+import java.util.concurrent.{ConcurrentLinkedQueue, CountDownLatch, TimeUnit}
import scala.collection.immutable
import scala.concurrent.{ExecutionContext, Future}
@@ -25,6 +27,7 @@ import scala.util.Random
import org.apache.avro.AvroTypeException
import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
import org.scalatest.BeforeAndAfter
import org.scalatest.PrivateMethodTester
import org.scalatest.matchers.should.Matchers
@@ -37,6 +40,7 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.catalyst.util.quietly
+import org.apache.spark.sql.execution.streaming.checkpointing.{CheckpointFileManager, ChecksumCheckpointFileManager, ChecksumFile}
import org.apache.spark.sql.execution.streaming.operators.stateful.StatefulOperatorStateInfo
import org.apache.spark.sql.execution.streaming.runtime.StreamExecution
import org.apache.spark.sql.internal.SQLConf
@@ -3055,6 +3059,360 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
s"acquired thread should be curent thread ${Thread.currentThread().getId} " +
s"after load but was $threadId")
}
+
+ private def withCheckpointFormatV2(f: => Unit): Unit = {
+ withSQLConf(
+ SQLConf.STATE_STORE_CHECKPOINT_FORMAT_VERSION.key -> "2") {
+ f
+ }
+ }
+
+ private def verifyChecksumFiles(
+ dir: String, expectedNumFiles: Int, expectedNumChecksumFiles: Int): Unit = {
+ val allFiles = new File(dir)
+ // filter out dirs and local hdfs files
+ .listFiles().filter(f => f.isFile && !f.getName.startsWith("."))
+ .map(f => new Path(f.toURI)).toSet
+ assert(allFiles.size == expectedNumFiles)
+
+ val checksumFiles = allFiles.filter(
+ ChecksumCheckpointFileManager.isChecksumFile).map(ChecksumFile)
+ assert(checksumFiles.size == expectedNumChecksumFiles)
+
+ // verify that there are no orphan checksum files, i.e. each checksum file's main file exists
+ assert(checksumFiles.forall(c => allFiles.contains(c.mainFilePath)))
+ }
+
+ /**
+ * Helper to put a value and commit, returning (newVersion, checkpointInfo).
+ * The checkpoint info carries the unique ID needed for V2 reloads.
+ */
+ private def putAndCommitStore(
+ provider: RocksDBStateStoreProvider,
+ loadVersion: Long,
+ doMaintenance: Boolean,
+ uniqueId: Option[String] = None): (Long, StateStoreCheckpointInfo) = {
+ val store = provider.getStore(loadVersion, uniqueId)
+ put(store, loadVersion.toString, loadVersion.toInt, loadVersion.toInt * 100)
+ val newVersion = store.commit()
+ val ckptInfo = store.getStateStoreCheckpointInfo()
+
+ if (doMaintenance) {
+ provider.doMaintenance()
+ }
+
+ (newVersion, ckptInfo)
+ }
+
+ testWithAllCodec("file checksum can be enabled and disabled for the same checkpoint") { _ =>
+ withCheckpointFormatV2 {
+ val storeId = StateStoreId(newDir(), 0L, 1)
+ var version = 0L
+ var lastCkptId: Option[String] = None
+
+ // Commit to store using file checksum
+ withSQLConf(SQLConf.STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED.key -> true.toString) {
+ tryWithProviderResource(newStoreProviderWithClonedConf(storeId)) { provider =>
+ val store = provider.getStore(version)
+ put(store, "1", 11, 100)
+ put(store, "2", 22, 200)
+ version = store.commit()
+ lastCkptId = store.getStateStoreCheckpointInfo().stateStoreCkptId
+ }
+ }
+
+ // Reload the store and commit without file checksum
+ withSQLConf(SQLConf.STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED.key -> false.toString) {
+ tryWithProviderResource(newStoreProviderWithClonedConf(storeId)) { provider =>
+ assert(version == 1)
+ val store = provider.getStore(version, lastCkptId)
+ assert(get(store, "1", 11) === Some(100))
+ assert(get(store, "2", 22) === Some(200))
+
+ put(store, "3", 33, 300)
+ put(store, "4", 44, 400)
+ version = store.commit()
+ lastCkptId = store.getStateStoreCheckpointInfo().stateStoreCkptId
+ }
+ }
+
+ // Reload the store and commit with file checksum
+ withSQLConf(SQLConf.STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED.key -> true.toString) {
+ tryWithProviderResource(newStoreProviderWithClonedConf(storeId)) { provider =>
+ assert(version == 2)
+ val store = provider.getStore(version, lastCkptId)
+ assert(get(store, "1", 11) === Some(100))
+ assert(get(store, "2", 22) === Some(200))
+ assert(get(store, "3", 33) === Some(300))
+ assert(get(store, "4", 44) === Some(400))
+
+ put(store, "5", 55, 500)
+ version = store.commit()
+ }
+ }
+ }
+ }
+
+ test("checksum files are also cleaned up during maintenance") {
+ withCheckpointFormatV2 {
+ val storeId = StateStoreId(newDir(), 0L, 1)
+ val numBatches = 6
+ val minDeltas = 2
+ // Adding 1 to ensure snapshot is uploaded.
+ // Snapshot upload might happen at minDeltas or minDeltas + 1, depending on the provider
+ val maintFrequency = minDeltas + 1
+ var version = 0L
+ var lastCkptInfo: StateStoreCheckpointInfo = null
+
+ withSQLConf(SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> minDeltas.toString,
+ SQLConf.MIN_BATCHES_TO_RETAIN.key -> "1",
+ // So that RocksDB will also generate changelog files
+ RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".changelogCheckpointing.enabled" ->
+ true.toString) {
+
+ tryWithProviderResource(newStoreProviderWithClonedConf(storeId)) { provider =>
+ var ckptId: Option[String] = None
+ (version + 1 to numBatches).foreach { i =>
+ val (v, info) = putAndCommitStore(
+ provider, loadVersion = i - 1, doMaintenance = i % maintFrequency == 0,
+ uniqueId = ckptId)
+ version = v
+ ckptId = info.stateStoreCkptId
+ lastCkptInfo = info
+ }
+
+ // For RocksDB State store, files left:
+ // 6.changelog (+ checksum file), 6.zip (+ checksum file)
+ verifyChecksumFiles(storeId.storeCheckpointLocation().toString,
+ expectedNumFiles = 4, expectedNumChecksumFiles = 2)
+ }
+
+ // turn off file checksum, and verify that the previously created checksum files
+ // will be deleted by maintenance
+ withSQLConf(SQLConf.STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED.key -> false.toString) {
+ tryWithProviderResource(newStoreProviderWithClonedConf(storeId)) { provider =>
+ var ckptId = lastCkptInfo.stateStoreCkptId
+ (version + 1 to version + numBatches).foreach { i =>
+ val (v, info) = putAndCommitStore(
+ provider, loadVersion = i - 1, doMaintenance = i % maintFrequency == 0,
+ uniqueId = ckptId)
+ version = v
+ ckptId = info.stateStoreCkptId
+ }
+
+ // now verify no checksum files are left
+ // For RocksDB State store, files left:
+ // 12.changelog, 12.zip
+ verifyChecksumFiles(storeId.storeCheckpointLocation().toString,
+ expectedNumFiles = 2, expectedNumChecksumFiles = 0)
+ }
+ }
+ }
+ }
+ }
+
+
+ test("Load spark 4.0 golden checkpoint written without checksum") {
+ withCheckpointFormatV2 {
+ // We have Spark 4.0 golden files for each of these operations and store names.
+ // Tuple(operation name, store name, key schema, value schema)
+ val stores = Seq(
+ ("agg", StateStoreId.DEFAULT_STORE_NAME,
+ StructType(Seq(StructField("_1", IntegerType, nullable = false))),
+ StructType(Seq(StructField("count", LongType, nullable = false)))),
+ ("dedup", StateStoreId.DEFAULT_STORE_NAME,
+ StructType(Seq(StructField("_1", IntegerType, nullable = false))),
+ StructType(Seq(StructField("__dummy__", NullType)))),
+ ("join1", "left-keyToNumValues",
+ StructType(Seq(StructField("field0", IntegerType, nullable = false))),
+ StructType(Seq(StructField("value", LongType)))),
+ ("join1", "left-keyWithIndexToValue",
+ StructType(Seq(StructField("field0", IntegerType, nullable = false))),
+ StructType(Seq(StructField("value", LongType)))),
+ ("join1", "right-keyToNumValues",
+ StructType(Seq(StructField("field0", IntegerType, nullable = false))),
+ StructType(Seq(StructField("value", LongType)))),
+ ("join1", "right-keyWithIndexToValue",
+ StructType(Seq(StructField("field0", IntegerType, nullable = false))),
+ StructType(Seq(StructField("value", LongType))))
+ )
+
+ // now try to load the store with checksum enabled.
+ // Golden files are V1 format (no unique IDs), loaded via snapshots.
+ withSQLConf(SQLConf.STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED.key -> true.toString) {
+ stores.foreach { case (opName, storeName, keySchema, valueSchema) =>
+ (1 to 4).foreach { version =>
+ tryWithProviderResource(newStoreProviderNoInit()) { provider =>
+ val checkpointPath = this.getClass.getResource(
+ s"/structured-streaming/checkpoint-version-4.0.0/rocksdb/$opName/state"
+ ).getPath
+
+ val conf = new Configuration()
+ conf.set(StreamExecution.RUN_ID_KEY, UUID.randomUUID().toString)
+ provider.init(
+ StateStoreId(checkpointPath, 0, 0, storeName),
+ keySchema,
+ valueSchema,
+ NoPrefixKeyStateEncoderSpec(keySchema),
+ useColumnFamilies = false,
+ new StateStoreConf(cloneSQLConf()),
+ conf
+ )
+ val store = provider.getStore(version)
+ store.abort()
+ }
+ }
+ }
+ }
+ }
+ }
+
+ test("fileChecksumThreadPoolSize propagates to ChecksumCheckpointFileManager") {
+ withCheckpointFormatV2 {
+ Seq(0, 1, 6).foreach { numThreads =>
+ val storeId = StateStoreId(newDir(), 0L, 0)
+ withSQLConf(
+ SQLConf.STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED.key -> "true",
+ SQLConf.STATE_STORE_FILE_CHECKSUM_THREAD_POOL_SIZE.key -> numThreads.toString) {
+ tryWithProviderResource(newStoreProviderWithClonedConf(storeId)) { provider =>
+ val fmMethod = PrivateMethod[CheckpointFileManager](Symbol("fm"))
+ val fm = provider.rocksDB.fileManager invokePrivate fmMethod()
+ assert(fm.isInstanceOf[ChecksumCheckpointFileManager])
+ assert(fm.asInstanceOf[ChecksumCheckpointFileManager].numThreads === numThreads)
+ }
+ }
+ }
+ }
+ }
+
+ test("file checksum behavior respect checkpoint format version") {
+ val cases = Seq(
+ // (ckptFormatVersion, checksumConfEnabled, expectedResult)
+ // always disable file checksum with ckpt format v1
+ (1, true, false),
+ (1, false, false),
+ (2, true, true),
+ (2, false, false)
+ )
+
+ cases.foreach { case (version, checksumConfEnabled, expected) =>
+ val sqlConf = new SQLConf()
+ sqlConf.setConf(SQLConf.STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED, checksumConfEnabled)
+ sqlConf.setConf(SQLConf.STATE_STORE_CHECKPOINT_FORMAT_VERSION, version)
+
+ val storeConf = StateStoreConf(sqlConf)
+ assert(
+ storeConf.checkpointFileChecksumEnabled == expected,
+ s"ckptVersion = $version, checksumConf = $checksumConfEnabled, expected = $expected," +
+ s" actual = ${storeConf.checkpointFileChecksumEnabled}"
+ )
+ }
+ }
+
+ test("STATE_STORE_FILE_CHECKSUM_THREAD_POOL_SIZE: invalid negative value is rejected") {
+ val sqlConf = SQLConf.get.clone()
+ val ex = intercept[IllegalArgumentException] {
+ sqlConf.setConfString(SQLConf.STATE_STORE_FILE_CHECKSUM_THREAD_POOL_SIZE.key, "-1")
+ }
+ assert(ex.getMessage.contains("Must be a non-negative integer"))
+ }
+
+ test("fileChecksumThreadPoolSize = 0 supports sequential I/O (load, write, commit, reload)") {
+ withCheckpointFormatV2 {
+ val storeId = StateStoreId(newDir(), 0L, 0)
+ var lastCkptId: Option[String] = None
+ withSQLConf(
+ SQLConf.STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED.key -> "true",
+ SQLConf.STATE_STORE_FILE_CHECKSUM_THREAD_POOL_SIZE.key -> "0") {
+ // Write some state with sequential mode enabled
+ tryWithProviderResource(newStoreProviderWithClonedConf(storeId)) { provider =>
+ val store = provider.getStore(0)
+ put(store, "a", 0, 1)
+ put(store, "b", 0, 2)
+ store.commit()
+ lastCkptId = store.getStateStoreCheckpointInfo().stateStoreCkptId
+ // Verify that main file and checksum file were both written to disk.
+ // RocksDB produces 2 files, 1 of which is a checksum file: 1.zip, 1.zip.crc
+ verifyChecksumFiles(storeId.storeCheckpointLocation().toString,
+ expectedNumFiles = 2, expectedNumChecksumFiles = 1)
+ }
+
+ // Reload and verify state is intact
+ tryWithProviderResource(newStoreProviderWithClonedConf(storeId)) { provider =>
+ val store = provider.getStore(1, lastCkptId)
+ assert(get(store, "a", 0) === Some(1))
+ assert(get(store, "b", 0) === Some(2))
+ store.abort()
+ }
+ }
+ }
+ }
+
+ test(
+ "fileChecksumThreadPoolSize = 0: concurrent store commit and maintenance both complete") {
+ withCheckpointFormatV2 {
+ val storeId = StateStoreId(newDir(), 0L, 0)
+ var lastCkptId: Option[String] = None
+ withSQLConf(
+ SQLConf.STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED.key -> "true",
+ SQLConf.STATE_STORE_FILE_CHECKSUM_THREAD_POOL_SIZE.key -> "0",
+ SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
+ // So that RocksDB uploads a snapshot during maintenance rather than being a no-op.
+ RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".changelogCheckpointing.enabled" -> "true") {
+ tryWithProviderResource(newStoreProviderWithClonedConf(storeId)) { provider =>
+ // Build up a few versions so maintenance has something to work with.
+ // With minDeltasForSnapshot=1 and changelog checkpointing enabled, doMaintenance()
+ // uploads a queued snapshot, exercising the checksum file manager concurrently.
+ var ckptId: Option[String] = None
+ (0L until 3L).foreach { version =>
+ val (_, info) = putAndCommitStore(provider, version,
+ doMaintenance = false, uniqueId = ckptId)
+ ckptId = info.stateStoreCkptId
+ }
+
+ // Load the store and prepare the write before maintenance starts, so that
+ // store.commit() (the actual file I/O) is what overlaps with doMaintenance().
+ val store = provider.getStore(3, ckptId)
+ put(store, "3", 3, 300)
+
+ val errors = new ConcurrentLinkedQueue[Throwable]()
+ val maintenanceStartedLatch = new CountDownLatch(1)
+ val maintenanceDoneLatch = new CountDownLatch(1)
+
+ val maintenanceThread = new Thread(() => {
+ try {
+ maintenanceStartedLatch.countDown()
+ provider.doMaintenance()
+ } catch {
+ case t: Throwable => errors.add(t)
+ } finally {
+ maintenanceDoneLatch.countDown()
+ }
+ })
+ maintenanceThread.setDaemon(true)
+ maintenanceThread.start()
+
+ // Wait until maintenance is running, then commit to simulate concurrency.
+ assert(maintenanceStartedLatch.await(30, TimeUnit.SECONDS),
+ "Maintenance thread did not start within 30 seconds")
+ store.commit()
+ lastCkptId = store.getStateStoreCheckpointInfo().stateStoreCkptId
+
+ assert(maintenanceDoneLatch.await(30, TimeUnit.SECONDS),
+ "Maintenance did not complete within 30 seconds")
+ assert(errors.isEmpty,
+ s"Maintenance failed with: ${Option(errors.peek()).map(_.getMessage).orNull}")
+ }
+
+ // Verify state committed concurrently with maintenance is intact.
+ tryWithProviderResource(newStoreProviderWithClonedConf(storeId)) { provider =>
+ val store = provider.getStore(4, lastCkptId)
+ assert(get(store, "3", 3) === Some(300))
+ store.abort()
+ }
+ }
+ }
+ }
}
/**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
index 859aa7c09449..4fa19adf0503 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
@@ -264,7 +264,14 @@ trait AlsoTestWithRocksDBFeatures
val newTestName = s"$testName - with enableStateStoreCheckpointIds = " +
s"$enableStateStoreCheckpointIds"
testWithColumnFamilies(newTestName, testMode, testTags: _*) { colFamiliesEnabled =>
- testBody(enableStateStoreCheckpointIds, colFamiliesEnabled)
+ val v2Confs = if (enableStateStoreCheckpointIds) {
+ Seq(SQLConf.STATE_STORE_CHECKPOINT_FORMAT_VERSION.key -> "2")
+ } else {
+ Seq.empty
+ }
+ withSQLConf(v2Confs: _*) {
+ testBody(enableStateStoreCheckpointIds, colFamiliesEnabled)
+ }
}
}
}
@@ -277,7 +284,14 @@ trait AlsoTestWithRocksDBFeatures
val newTestName = s"$testName - with enableStateStoreCheckpointIds = " +
s"$enableStateStoreCheckpointIds"
test(newTestName, testTags: _*) {
- testBody(enableStateStoreCheckpointIds)
+ val v2Confs = if (enableStateStoreCheckpointIds) {
+ Seq(SQLConf.STATE_STORE_CHECKPOINT_FORMAT_VERSION.key -> "2")
+ } else {
+ Seq.empty
+ }
+ withSQLConf(v2Confs: _*) {
+ testBody(enableStateStoreCheckpointIds)
+ }
}
}
}
@@ -290,7 +304,14 @@ trait AlsoTestWithRocksDBFeatures
val newTestName = s"$testName - with enableStateStoreCheckpointIds = " +
s"$enableStateStoreCheckpointIds"
testWithChangelogCheckpointingDisabled(newTestName, testTags: _*) {
- enableStateStoreCheckpointIds => testBody(enableStateStoreCheckpointIds)
+ val v2Confs = if (enableStateStoreCheckpointIds) {
+ Seq(SQLConf.STATE_STORE_CHECKPOINT_FORMAT_VERSION.key -> "2")
+ } else {
+ Seq.empty
+ }
+ withSQLConf(v2Confs: _*) {
+ testBody(enableStateStoreCheckpointIds)
+ }
}
}
}
@@ -3550,7 +3571,9 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession
}
// reload version 2 - should succeed
- withDB(remoteDir, version = 2, conf = conf) { db =>
+ withDB(remoteDir, version = 2, conf = conf,
+ enableStateStoreCheckpointIds = enableStateStoreCheckpointIds,
+ versionToUniqueId = versionToUniqueId) { db =>
}
}
@@ -3584,15 +3607,25 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession
db.commit() // create snapshot again
// load version 1 - should succeed
- withDB(remoteDir, version = 1, conf = conf, hadoopConf = hadoopConf) { db =>
+ withDB(remoteDir, version = 1, conf = conf, hadoopConf = hadoopConf,
+ enableStateStoreCheckpointIds = enableStateStoreCheckpointIds,
+ versionToUniqueId = versionToUniqueId) { db =>
}
// upload recently created snapshot
db.doMaintenance()
- assert(snapshotVersionsPresent(remoteDir) == Seq(1))
+ // With V2 checkpoint format, each commit gets a unique ID so the second
+ // version-1 snapshot has a different filename and both coexist on disk.
+ if (enableStateStoreCheckpointIds) {
+ assert(snapshotVersionsPresent(remoteDir) == Seq(1, 1))
+ } else {
+ assert(snapshotVersionsPresent(remoteDir) == Seq(1))
+ }
// load version 1 again - should succeed
- withDB(remoteDir, version = 1, conf = conf, hadoopConf = hadoopConf) { db =>
+ withDB(remoteDir, version = 1, conf = conf, hadoopConf = hadoopConf,
+ enableStateStoreCheckpointIds = enableStateStoreCheckpointIds,
+ versionToUniqueId = versionToUniqueId) { db =>
}
}
}
@@ -3725,7 +3758,7 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession
if (inc > 1) {
// Create changelog files in the gap
for (j <- 1 to inc - 1) {
- db2.load(curVer + j)
+ db2.load(curVer + j, versionToUniqueId.get(curVer + j))
db2.put("foo", "bar")
db2.commit()
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
index 7570cf942f06..1c98424987b6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
@@ -43,7 +43,7 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.catalyst.util.quietly
import org.apache.spark.sql.execution.streaming._
-import org.apache.spark.sql.execution.streaming.checkpointing.{CheckpointFileManager, ChecksumCheckpointFileManager, ChecksumFile}
+import org.apache.spark.sql.execution.streaming.checkpointing.CheckpointFileManager
import org.apache.spark.sql.execution.streaming.runtime.{MemoryStream, StreamExecution}
import org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorSuite.withCoordinatorRef
import org.apache.spark.sql.functions.count
@@ -1195,13 +1195,13 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
// Put should create a temp file
put(store0, "a", 0, 1)
- assert(numTempFiles === 2)
+ assert(numTempFiles === 1)
assert(numDeltaFiles === 0)
// Commit should remove temp file and create a delta file
store0.commit()
assert(numTempFiles === 0)
- assert(numDeltaFiles === 2)
+ assert(numDeltaFiles === 1)
// Remove should create a temp file
val store1 = shouldNotCreateTempFile {
@@ -1211,13 +1211,13 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
version = 1, None, None, useColumnFamilies = false, storeConf, hadoopConf)
}
remove(store1, _._1 == "a")
- assert(numTempFiles === 2)
- assert(numDeltaFiles === 2)
+ assert(numTempFiles === 1)
+ assert(numDeltaFiles === 1)
// Commit should remove temp file and create a delta file
store1.commit()
assert(numTempFiles === 0)
- assert(numDeltaFiles === 4)
+ assert(numDeltaFiles === 2)
// Commit without any updates should create a delta file
val store2 = shouldNotCreateTempFile {
@@ -1228,7 +1228,7 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
}
store2.commit()
assert(numTempFiles === 0)
- assert(numDeltaFiles === 6)
+ assert(numDeltaFiles === 3)
}
test("SPARK-21145: Restarted queries create new provider instances") {
@@ -1463,7 +1463,6 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
test("Auto snapshot repair") {
withSQLConf(
- SQLConf.STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED.key -> false.toString,
SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1" // for hdfs means every 2 versions
) {
val storeId = StateStoreId(newDir(), 0L, 1)
@@ -2248,370 +2247,6 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]
}
}
- testWithAllCodec("file checksum can be enabled and disabled for the same checkpoint") { _ =>
- val storeId = StateStoreId(newDir(), 0L, 1)
- var version = 0L
-
- // Commit to store using file checksum
- withSQLConf(SQLConf.STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED.key -> true.toString) {
- tryWithProviderResource(newStoreProviderWithClonedConf(storeId)) { provider =>
- val store = provider.getStore(version)
- put(store, "1", 11, 100)
- put(store, "2", 22, 200)
- version = store.commit()
- }
- }
-
- // Reload the store and commit without file checksum
- withSQLConf(SQLConf.STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED.key -> false.toString) {
- tryWithProviderResource(newStoreProviderWithClonedConf(storeId)) { provider =>
- assert(version == 1)
- val store = provider.getStore(version)
- assert(get(store, "1", 11) === Some(100))
- assert(get(store, "2", 22) === Some(200))
-
- put(store, "3", 33, 300)
- put(store, "4", 44, 400)
- version = store.commit()
- }
- }
-
- // Reload the store and commit with file checksum
- withSQLConf(SQLConf.STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED.key -> true.toString) {
- tryWithProviderResource(newStoreProviderWithClonedConf(storeId)) { provider =>
- assert(version == 2)
- val store = provider.getStore(version)
- assert(get(store, "1", 11) === Some(100))
- assert(get(store, "2", 22) === Some(200))
- assert(get(store, "3", 33) === Some(300))
- assert(get(store, "4", 44) === Some(400))
-
- put(store, "5", 55, 500)
- version = store.commit()
- }
- }
- }
-
- test("checksum files are also cleaned up during maintenance") {
- val storeId = StateStoreId(newDir(), 0L, 1)
- val numBatches = 6
- val minDeltas = 2
- // Adding 1 to ensure snapshot is uploaded.
- // Snapshot upload might happen at minDeltas or minDeltas + 1, depending on the provider
- val maintFrequency = minDeltas + 1
- var version = 0L
-
- withSQLConf(SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> minDeltas.toString,
- SQLConf.MIN_BATCHES_TO_RETAIN.key -> "1",
- // So that RocksDB will also generate changelog files
- RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".changelogCheckpointing.enabled" ->
- true.toString) {
-
- tryWithProviderResource(newStoreProviderWithClonedConf(storeId)) { provider =>
- (version + 1 to numBatches).foreach { i =>
- version = putAndCommitStore(
- provider, loadVersion = i - 1, doMaintenance = i % maintFrequency == 0)
- }
-
- // This is because, hdfs and rocksdb old files detection logic is different
- provider match {
- case _: HDFSBackedStateStoreProvider =>
- // For HDFS State store, files left:
- // 3.delta to 6.delta (+ checksum file)
- // 3.snapshot (+ checksum file), 6.snapshot (+ checksum file)
- verifyChecksumFiles(storeId.storeCheckpointLocation().toString,
- expectedNumFiles = 12, expectedNumChecksumFiles = 6)
- case _ =>
- // For RocksDB State store, files left:
- // 6.changelog (+ checksum file), 6.zip (+ checksum file)
- verifyChecksumFiles(storeId.storeCheckpointLocation().toString,
- expectedNumFiles = 4, expectedNumChecksumFiles = 2)
- }
- }
-
- // turn off file checksum, and verify that the previously created checksum files
- // will be deleted by maintenance
- withSQLConf(SQLConf.STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED.key -> false.toString) {
- tryWithProviderResource(newStoreProviderWithClonedConf(storeId)) { provider =>
- (version + 1 to version + numBatches).foreach { i =>
- version = putAndCommitStore(
- provider, loadVersion = i - 1, doMaintenance = i % maintFrequency == 0)
- }
-
- // now verify no checksum files are left
- // This is because, hdfs and rocksdb old files detection logic is different
- provider match {
- case _: HDFSBackedStateStoreProvider =>
- // For HDFS State store, files left:
- // 6.delta, 9.delta to 12.delta
- // 9.snapshot, 12.snapshot
- verifyChecksumFiles(storeId.storeCheckpointLocation().toString,
- expectedNumFiles = 7, expectedNumChecksumFiles = 0)
- case _ =>
- // For RocksDB State store, files left:
- // 12.changelog, 12.zip
- verifyChecksumFiles(storeId.storeCheckpointLocation().toString,
- expectedNumFiles = 2, expectedNumChecksumFiles = 0)
- }
- }
- }
- }
- }
-
- testWithAllCodec("overwrite state file without overwriting checksum file") { _ =>
- val storeId = StateStoreId(newDir(), 0L, 1)
- val numBatches = 3
- val minDeltas = 2
-
- withSQLConf(SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> minDeltas.toString,
- // So that RocksDB will also generate changelog files
- RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".changelogCheckpointing.enabled" ->
- true.toString) {
-
- // First run with file checksum enabled. It will generate state and checksum files.
- // Turn off file checksum, and regenerate only the state files
- Seq(true, false).foreach { fileChecksumEnabled =>
- withSQLConf(
- SQLConf.STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED.key -> fileChecksumEnabled.toString) {
- tryWithProviderResource(newStoreProviderWithClonedConf(storeId)) { provider =>
- (1 to numBatches).foreach { i =>
- putAndCommitStore(
- provider, loadVersion = i - 1, doMaintenance = false)
- }
-
- // This should only create snapshot and no delete
- provider.doMaintenance()
-
- // number of files should be the same.
- // 3 delta/changelog files, 1 snapshot (with checksum files)
- verifyChecksumFiles(storeId.storeCheckpointLocation().toString,
- expectedNumFiles = 8, expectedNumChecksumFiles = 4)
- }
- }
- }
-
- withSQLConf(
- SQLConf.STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED.key -> true.toString) {
- // now try to load the store with checksum enabled.
- // It will verify the overwritten state files with the checksum files.
- (1 to numBatches).foreach { i =>
- tryWithProviderResource(newStoreProviderWithClonedConf(storeId)) { provider =>
- // load from DFS should be successful
- val store = provider.getStore(i)
- store.abort()
- }
- }
- }
- }
- }
-
- test("Load spark 4.0 golden checkpoint written without checksum") {
- // We have Spark 4.0 golden files for each of these operations and store names.
- // Tuple(operation name, store name, key schema, value schema)
- val stores = Seq(
- ("agg", StateStoreId.DEFAULT_STORE_NAME,
- StructType(Seq(StructField("_1", IntegerType, nullable = false))),
- StructType(Seq(StructField("count", LongType, nullable = false)))),
- ("dedup", StateStoreId.DEFAULT_STORE_NAME,
- StructType(Seq(StructField("_1", IntegerType, nullable = false))),
- StructType(Seq(StructField("__dummy__", NullType)))),
- ("join1", "left-keyToNumValues",
- StructType(Seq(StructField("field0", IntegerType, nullable = false))),
- StructType(Seq(StructField("value", LongType)))),
- ("join1", "left-keyWithIndexToValue",
- StructType(Seq(StructField("field0", IntegerType, nullable = false))),
- StructType(Seq(StructField("value", LongType)))),
- ("join1", "right-keyToNumValues",
- StructType(Seq(StructField("field0", IntegerType, nullable = false))),
- StructType(Seq(StructField("value", LongType)))),
- ("join1", "right-keyWithIndexToValue",
- StructType(Seq(StructField("field0", IntegerType, nullable = false))),
- StructType(Seq(StructField("value", LongType))))
- )
-
- // now try to load the store with checksum enabled.
- withSQLConf(SQLConf.STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED.key -> true.toString) {
- stores.foreach { case (opName, storeName, keySchema, valueSchema) =>
- (1 to 4).foreach { version =>
- tryWithProviderResource(newStoreProviderNoInit()) { provider =>
- // load from golden checkpoint should be successful
- val providerName = provider match {
- case _: HDFSBackedStateStoreProvider => "hdfs"
- case _: RocksDBStateStoreProvider => "rocksdb"
- case _ => throw new IllegalArgumentException("Unknown ProviderClass")
- }
-
- val checkpointPath = this.getClass.getResource(
- s"/structured-streaming/checkpoint-version-4.0.0/" +
- s"$providerName/$opName/state"
- ).getPath
-
- val conf = new Configuration()
- conf.set(StreamExecution.RUN_ID_KEY, UUID.randomUUID().toString)
- provider.init(
- StateStoreId(checkpointPath, 0, 0, storeName),
- keySchema,
- valueSchema,
- NoPrefixKeyStateEncoderSpec(keySchema),
- useColumnFamilies = false,
- new StateStoreConf(cloneSQLConf()),
- conf
- )
- val store = provider.getStore(version)
- store.abort()
- }
- }
- }
- }
- }
-
- test("fileChecksumThreadPoolSize propagates to ChecksumCheckpointFileManager") {
- Seq(0, 1, 6).foreach { numThreads =>
- val storeId = StateStoreId(newDir(), 0L, 0)
- withSQLConf(
- SQLConf.STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED.key -> "true",
- SQLConf.STATE_STORE_FILE_CHECKSUM_THREAD_POOL_SIZE.key -> numThreads.toString) {
- tryWithProviderResource(newStoreProviderWithClonedConf(storeId)) { provider =>
- val fmMethod = PrivateMethod[CheckpointFileManager](Symbol("fm"))
- val fm = provider match {
- case hdfs: HDFSBackedStateStoreProvider =>
- hdfs.fm
- case rocksdb: RocksDBStateStoreProvider =>
- rocksdb.rocksDB.fileManager invokePrivate fmMethod()
- case _ =>
- fail(s"Unexpected provider type: ${provider.getClass.getName}")
- }
- assert(fm.isInstanceOf[ChecksumCheckpointFileManager])
- assert(fm.asInstanceOf[ChecksumCheckpointFileManager].numThreads === numThreads)
- }
- }
- }
- }
-
- test("STATE_STORE_FILE_CHECKSUM_THREAD_POOL_SIZE: invalid negative value is rejected") {
- val sqlConf = SQLConf.get.clone()
- val ex = intercept[IllegalArgumentException] {
- sqlConf.setConfString(SQLConf.STATE_STORE_FILE_CHECKSUM_THREAD_POOL_SIZE.key, "-1")
- }
- assert(ex.getMessage.contains("Must be a non-negative integer"))
- }
-
- test("fileChecksumThreadPoolSize = 0 supports sequential I/O (load, write, commit, reload)") {
- val storeId = StateStoreId(newDir(), 0L, 0)
- withSQLConf(
- SQLConf.STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED.key -> "true",
- SQLConf.STATE_STORE_FILE_CHECKSUM_THREAD_POOL_SIZE.key -> "0") {
- // Write some state with sequential mode enabled
- tryWithProviderResource(newStoreProviderWithClonedConf(storeId)) { provider =>
- val store = provider.getStore(0)
- put(store, "a", 0, 1)
- put(store, "b", 0, 2)
- store.commit()
- // Verify that main file and checksum file were both written to disk.
- // Both HDFS and RocksDB produce 2 files and 1 checksum file:
- // HDFS: 1.delta, 1.delta.checksum
- // RocksDB: 1.zip, 1.zip.crc
- verifyChecksumFiles(storeId.storeCheckpointLocation().toString,
- expectedNumFiles = 2, expectedNumChecksumFiles = 1)
- }
-
- // Reload and verify state is intact
- tryWithProviderResource(newStoreProviderWithClonedConf(storeId)) { provider =>
- val store = provider.getStore(1)
- assert(get(store, "a", 0) === Some(1))
- assert(get(store, "b", 0) === Some(2))
- store.abort()
- }
- }
- }
-
- test("fileChecksumThreadPoolSize = 0: concurrent store commit and maintenance both complete") {
- val storeId = StateStoreId(newDir(), 0L, 0)
- withSQLConf(
- SQLConf.STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED.key -> "true",
- SQLConf.STATE_STORE_FILE_CHECKSUM_THREAD_POOL_SIZE.key -> "0",
- SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
- // So that RocksDB uploads a snapshot during maintenance rather than being a no-op.
- RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".changelogCheckpointing.enabled" -> "true") {
- tryWithProviderResource(newStoreProviderWithClonedConf(storeId)) { provider =>
- // Build up a few versions so maintenance has something to work with.
- // With minDeltasForSnapshot=1 and changelog checkpointing enabled, doMaintenance()
- // uploads a queued snapshot, exercising the checksum file manager concurrently.
- (0L until 3L).foreach { version =>
- putAndCommitStore(provider, version, doMaintenance = false)
- }
-
- // Load the store and prepare the write before maintenance starts, so that
- // store.commit() (the actual file I/O) is what overlaps with doMaintenance().
- val store = provider.getStore(3)
- put(store, "3", 3, 300)
-
- val errors = new ConcurrentLinkedQueue[Throwable]()
- val maintenanceStartedLatch = new CountDownLatch(1)
- val maintenanceDoneLatch = new CountDownLatch(1)
-
- val maintenanceThread = new Thread(() => {
- try {
- maintenanceStartedLatch.countDown()
- provider.doMaintenance()
- } catch {
- case t: Throwable => errors.add(t)
- } finally {
- maintenanceDoneLatch.countDown()
- }
- })
- maintenanceThread.setDaemon(true)
- maintenanceThread.start()
-
- // Wait until maintenance is running, then commit to simulate concurrency.
- assert(maintenanceStartedLatch.await(30, TimeUnit.SECONDS),
- "Maintenance thread did not start within 30 seconds")
- store.commit()
-
- assert(maintenanceDoneLatch.await(30, TimeUnit.SECONDS),
- "Maintenance did not complete within 30 seconds")
- assert(errors.isEmpty,
- s"Maintenance failed with: ${Option(errors.peek()).map(_.getMessage).orNull}")
- }
-
- // Verify state committed concurrently with maintenance is intact.
- tryWithProviderResource(newStoreProviderWithClonedConf(storeId)) { provider =>
- val store = provider.getStore(4)
- assert(get(store, "3", 3) === Some(300))
- store.abort()
- }
- }
- }
-
- private def verifyChecksumFiles(
- dir: String, expectedNumFiles: Int, expectedNumChecksumFiles: Int): Unit = {
- val allFiles = new File(dir)
- // filter out dirs and local hdfs files
- .listFiles().filter(f => f.isFile && !f.getName.startsWith("."))
- .map(f => new Path(f.toURI)).toSet
- assert(allFiles.size == expectedNumFiles)
-
- val checksumFiles = allFiles.filter(
- ChecksumCheckpointFileManager.isChecksumFile).map(ChecksumFile)
- assert(checksumFiles.size == expectedNumChecksumFiles)
-
- // verify that no orphan checksum file i.e. the respective main file should be present
- assert(checksumFiles.forall(c => allFiles.contains(c.mainFilePath)))
- }
-
- private def putAndCommitStore(
- provider: ProviderClass, loadVersion: Long, doMaintenance: Boolean): Long = {
- val store = provider.getStore(loadVersion)
- put(store, loadVersion.toString, loadVersion.toInt, loadVersion.toInt * 100)
- val newVersion = store.commit()
-
- if (doMaintenance) {
- provider.doMaintenance()
- }
-
- newVersion
- }
-
test("StateStore.get") {
val conf = new SparkConf()
.setMaster("local")