Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,10 @@
<enabled>false</enabled>
</snapshots>
</repository>
<repository>
<id>databricks-maven-proxy</id>
<url>https://maven-proxy.dev.databricks.com</url>
</repository>
</repositories>
<pluginRepositories>
<pluginRepository>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3774,7 +3774,8 @@ object SQLConf {
buildConf("spark.sql.streaming.checkpoint.fileChecksum.enabled")
.internal()
.doc("When true, checksum would be generated and verified for checkpoint files. " +
"This is used to detect file corruption.")
"This is used to detect file corruption. This is only enabled when " +
"STATE_STORE_CHECKPOINT_FORMAT_VERSION >= 2")
.version("4.1.0")
.booleanConf
.createWithDefault(true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ class StateStoreConf(
val compressionCodec: String = sqlConf.stateStoreCompressionCodec

/** Whether file checksum generation and verification is enabled. */
val checkpointFileChecksumEnabled: Boolean = sqlConf.checkpointFileChecksumEnabled
val checkpointFileChecksumEnabled: Boolean =
sqlConf.checkpointFileChecksumEnabled && sqlConf.stateStoreCheckpointFormatVersion >= 2

/** Number of threads for the file checksum thread pool (0 to disable). */
val fileChecksumThreadPoolSize: Int = sqlConf.stateStoreFileChecksumThreadPoolSize
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -634,9 +634,8 @@ class RocksDBCheckpointFailureInjectionSuite extends StreamTest
* enabled/disabled and the different behavior is shown in the test
*/

// File checksum is disabled for checkpoint format version 1, so we only test version 2.
Seq(
FailureConf3(skipCreationIfFileMissingChecksum = false, checkpointFormatVersion = "1"),
FailureConf3(skipCreationIfFileMissingChecksum = true, checkpointFormatVersion = "1"),
FailureConf3(skipCreationIfFileMissingChecksum = false, checkpointFormatVersion = "2"),
FailureConf3(skipCreationIfFileMissingChecksum = true, checkpointFormatVersion = "2")
).foreach { failureConf =>
Expand Down Expand Up @@ -770,60 +769,25 @@ class RocksDBCheckpointFailureInjectionSuite extends StreamTest
assert((new File(checkpointDir, "commits/1")).exists())
assert((new File(checkpointDir, "commits/2")).exists())

val failureCase =
!failureConf.skipCreationIfFileMissingChecksum &&
failureConf.checkpointFormatVersion == "1"
// With checkpointFormatVersion = 2, the changelog file checksum should be written
assert(verifyChangelogFileChecksumExists(2))

if (failureCase) {
assert(verifyChangelogFileChecksumExists(2))

// The query does not succeed: we load the old changelog file using the checksum from
// the new changelog file (which did not overwrite the old one), and this mismatch
// causes a checksum verification failure.
testStream(aggregated2, Update)(
StartStream(
checkpointLocation = checkpointDir.getAbsolutePath,
additionalConfs = secondRunConfs),
AddData(inputData, 4),
ExpectFailure[SparkException] { ex =>
ex.getMessage.contains("CHECKPOINT_FILE_CHECKSUM_VERIFICATION_FAILED")
ex.getMessage.contains("2.changelog")
}
)

// Verify that the commit file was not written
assert(!(new File(checkpointDir, "commits/3")).exists())
} else {
if (failureConf.checkpointFormatVersion == "1") {
// With checkpointFormatVersion = 1, the changelog file checksum should not be written
assert(!verifyChangelogFileChecksumExists(2))
} else {
// With checkpointFormatVersion = 2, the changelog file checksum should be written
assert(verifyChangelogFileChecksumExists(2))
}

// The query should restart successfully
testStream(aggregated2, Update)(
StartStream(
checkpointLocation = checkpointDir.getAbsolutePath,
additionalConfs = secondRunConfs),
AddData(inputData, 4),
CheckLastBatch((1004, 2)),
StopStream
)
// The query should restart successfully
testStream(aggregated2, Update)(
StartStream(
checkpointLocation = checkpointDir.getAbsolutePath,
additionalConfs = secondRunConfs),
AddData(inputData, 4),
CheckLastBatch((1004, 2)),
StopStream
)

// Verify again the 2.changelog file checksum exists or not
if (failureConf.checkpointFormatVersion == "1") {
assert(!verifyChangelogFileChecksumExists(2))
} else {
assert(verifyChangelogFileChecksumExists(2))
}
// Verify again the 2_<uuid>.changelog file checksum exists
assert(verifyChangelogFileChecksumExists(2))

assert(verifyChangelogFileExists(4))
assert(verifyChangelogFileChecksumExists(4))
assert((new File(checkpointDir, "commits/3")).exists())
}
assert(verifyChangelogFileExists(4))
assert(verifyChangelogFileChecksumExists(4))
assert((new File(checkpointDir, "commits/3")).exists())
}
}
}
Expand All @@ -832,54 +796,66 @@ class RocksDBCheckpointFailureInjectionSuite extends StreamTest
* Test that verifies that when a task is interrupted, the store's rollback() method does not
* throw an exception and the store can still be used after the rollback.
*/
test("SPARK-54585: Interrupted task calling rollback does not throw an exception") {
val hadoopConf = new Configuration()
hadoopConf.set(
STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key,
fileManagerClassName
)
withTempDirAllowFailureInjection { (remoteDir, _) =>
val sqlConf = new SQLConf()
sqlConf.setConfString("spark.sql.streaming.checkpoint.fileChecksum.enabled", "true")
val rocksdbChangelogCheckpointingConfKey =
RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".changelogCheckpointing.enabled"
sqlConf.setConfString(rocksdbChangelogCheckpointingConfKey, "true")
val conf = RocksDBConf(StateStoreConf(sqlConf))
Seq(true, false).foreach { enableStateStoreCheckpointIds =>
val newTestName = s"SPARK-54585: Interrupted task calling rollback does not throw an " +
s"exception - with enableStateStoreCheckpointIds = $enableStateStoreCheckpointIds"

val v2Confs = if (enableStateStoreCheckpointIds) {
Seq(SQLConf.STATE_STORE_CHECKPOINT_FORMAT_VERSION.key -> "2",
SQLConf.STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED.key -> "true")
} else {
Seq.empty
}
test(newTestName) {
val hadoopConf = new Configuration()
hadoopConf.set(
STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key,
fileManagerClassName
)
withSQLConf(v2Confs: _*) {
withTempDirAllowFailureInjection { (remoteDir, _) => {
val sqlConf = new SQLConf()
val rocksdbChangelogCheckpointingConfKey =
RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".changelogCheckpointing.enabled"
sqlConf.setConfString(rocksdbChangelogCheckpointingConfKey, "true")
val conf = RocksDBConf(StateStoreConf(sqlConf))

withDB(
remoteDir.getAbsolutePath,
version = 0,
conf = conf,
hadoopConf = hadoopConf
) { db =>
db.put("key0", "value0")
val checkpointId1 = commitAndGetCheckpointId(db)
withDB(
remoteDir.getAbsolutePath,
version = 0,
conf = conf,
hadoopConf = hadoopConf
) { db =>
db.put("key0", "value0")
val checkpointId1 = commitAndGetCheckpointId(db)

db.load(1, checkpointId1)
db.put("key1", "value1")
val checkpointId2 = commitAndGetCheckpointId(db)
db.load(1, checkpointId1)
db.put("key1", "value1")
val checkpointId2 = commitAndGetCheckpointId(db)

db.load(2, checkpointId2)
db.put("key2", "value2")
db.load(2, checkpointId2)
db.put("key2", "value2")

// Simulate what happens when a task is killed, the thread's interrupt flag is set.
// This replicates the scenario where TaskContext.markTaskFailed() is called and
// the task failure listener invokes RocksDBStateStore.abort() -> rollback().
Thread.currentThread().interrupt()
// Simulate what happens when a task is killed, the thread's interrupt flag is set.
// This replicates the scenario where TaskContext.markTaskFailed() is called and
// the task failure listener invokes RocksDBStateStore.abort() -> rollback().
Thread.currentThread().interrupt()

// rollback() should not throw an exception
db.rollback()
// rollback() should not throw an exception
db.rollback()

// Clear the interrupt flag for subsequent operations
Thread.interrupted()
// Clear the interrupt flag for subsequent operations
Thread.interrupted()

// Reload the store and insert a new value
db.load(2, checkpointId2)
db.put("key3", "value3")
// Reload the store and insert a new value
db.load(2, checkpointId2)
db.put("key3", "value3")

// Verify the store has the correct values
assert(db.iterator().map(toStr).toSet ===
Set(("key0", "value0"), ("key1", "value1"), ("key3", "value3")))
// Verify the store has the correct values
assert(db.iterator().map(toStr).toSet ===
Set(("key0", "value0"), ("key1", "value1"), ("key3", "value3")))
}
}}
}
}
}
Expand Down
Loading