From 810958dc9b948aa45a36daa5e90501f25d393a1d Mon Sep 17 00:00:00 2001 From: Gabriel Igliozzi Date: Sun, 3 May 2026 11:49:47 +0200 Subject: [PATCH 1/2] Implement Logic Plan extension rule for time travel session level --- docs/docs/spark-configuration.md | 1 + .../IcebergSparkSessionExtensions.scala | 2 + .../analysis/ResolveDefaultTimestamp.scala | 83 +++++ .../TestSessionPropertyTimeTravel.java | 316 ++++++++++++++++++ .../iceberg/spark/SparkSQLProperties.java | 4 + 5 files changed, 406 insertions(+) create mode 100644 spark/v4.1/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultTimestamp.scala create mode 100644 spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSessionPropertyTimeTravel.java diff --git a/docs/docs/spark-configuration.md b/docs/docs/spark-configuration.md index e8e4f7e3c8c1..56d11881b8dc 100644 --- a/docs/docs/spark-configuration.md +++ b/docs/docs/spark-configuration.md @@ -206,6 +206,7 @@ val spark = SparkSession.builder() | spark.sql.iceberg.merge-schema | false | Enables modifying the table schema to match the write schema. Only adds columns missing columns | | spark.sql.iceberg.report-column-stats | true | Report Puffin Table Statistics if available to Spark's Cost Based Optimizer. CBO must be enabled for this to be effective | | spark.sql.iceberg.async-micro-batch-planning-enabled | false | Enables asynchronous microbatch planning to reduce planning latency by pre-fetching file scan tasks | +| spark.sql.iceberg.read.as-of-timestamp | null | Default timestamp in milliseconds for time-travel queries; applies to all reads when no explicit snapshot-id, as-of-timestamp, branch, or tag is specified | ### Read options diff --git a/spark/v4.1/spark-extensions/src/main/scala/org/apache/iceberg/spark/extensions/IcebergSparkSessionExtensions.scala b/spark/v4.1/spark-extensions/src/main/scala/org/apache/iceberg/spark/extensions/IcebergSparkSessionExtensions.scala index 81824e05e92d..e89126ca3117 100644 --- a/spark/v4.1/spark-extensions/src/main/scala/org/apache/iceberg/spark/extensions/IcebergSparkSessionExtensions.scala +++ b/spark/v4.1/spark-extensions/src/main/scala/org/apache/iceberg/spark/extensions/IcebergSparkSessionExtensions.scala @@ -21,6 +21,7 @@ package org.apache.iceberg.spark.extensions import org.apache.spark.sql.SparkSessionExtensions import org.apache.spark.sql.catalyst.analysis.CheckViews import org.apache.spark.sql.catalyst.analysis.ResolveBranch +import org.apache.spark.sql.catalyst.analysis.ResolveDefaultTimestamp import org.apache.spark.sql.catalyst.analysis.ResolveViews import org.apache.spark.sql.catalyst.optimizer.ReplaceStaticInvoke import org.apache.spark.sql.catalyst.parser.extensions.IcebergSparkSqlExtensionsParser @@ -35,6 +36,7 @@ class IcebergSparkSessionExtensions extends (SparkSessionExtensions => Unit) { // analyzer extensions extensions.injectResolutionRule { spark => ResolveViews(spark) } extensions.injectPostHocResolutionRule { spark => ResolveBranch(spark) } + extensions.injectPostHocResolutionRule { spark => ResolveDefaultTimestamp(spark) } extensions.injectCheckRule(_ => CheckViews) // optimizer extensions diff --git a/spark/v4.1/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultTimestamp.scala b/spark/v4.1/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultTimestamp.scala new file mode 100644 index 000000000000..a90f65b36fbb --- /dev/null +++ b/spark/v4.1/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultTimestamp.scala @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.spark.sql.catalyst.analysis + +import org.apache.iceberg.spark.PathIdentifier +import org.apache.iceberg.spark.SparkSQLProperties +import org.apache.iceberg.spark.TimeTravel +import org.apache.iceberg.spark.source.SparkTable +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.connector.catalog.Identifier +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation + +case class ResolveDefaultTimestamp(spark: SparkSession) extends Rule[LogicalPlan] { + + override def apply(plan: LogicalPlan): LogicalPlan = { + val sessionTimestamp = spark.conf.getOption(SparkSQLProperties.AS_OF_TIMESTAMP) + if (sessionTimestamp.isEmpty) return plan + + val timestampMillis = sessionTimestamp.get.toLong + + plan resolveOperators { + // SQL-level time travel (TIMESTAMP AS OF / VERSION AS OF branch) conflicts with session property + case DataSourceV2Relation(table: SparkTable, _, _, _, _, Some(_)) if table.branch() != null => + throw new IllegalArgumentException( + s"Cannot override ref, already set snapshot id=${table.snapshotId()}") + + // SQL-level time travel (TIMESTAMP AS OF / VERSION AS OF snapshot or tag) conflicts + case DataSourceV2Relation(_: SparkTable, _, _, _, _, Some(_)) => + throw new IllegalArgumentException( + "Cannot set both snapshot-id and as-of-timestamp to select which table snapshot to scan") + + // Catalog-level branch selection (e.g. branch_X identifier set by ResolveBranch) conflicts + case DataSourceV2Relation(table: SparkTable, _, _, _, _, None) if table.branch() != null => + throw new IllegalArgumentException( + s"Cannot override ref, already set snapshot id=${table.snapshotId()}") + + // Catalog-level snapshot/tag/timestamp selector (e.g. snapshot_id_X, tag_X) conflicts + case DataSourceV2Relation(table: SparkTable, _, _, ident, _, None) + if table.branch() == null && hasExplicitSelector(ident) => + throw new IllegalArgumentException( + "Cannot set both snapshot-id and as-of-timestamp to select which table snapshot to scan") + + // Plain read: apply session timestamp and encode the resolved snapshot in the identifier + case r @ DataSourceV2Relation(table: SparkTable, _, _, ident, _, None) + if table.branch() == null => + val newTable = SparkTable.create(table.table(), TimeTravel.timestampMillis(timestampMillis)) + val snapshotSelector = s"snapshot_id_${newTable.snapshotId()}" + val newIdent = ident.map { + case path: PathIdentifier if path.location.contains("#") => + new PathIdentifier(path.location + "," + snapshotSelector) + case path: PathIdentifier => + new PathIdentifier(path.location + "#" + snapshotSelector) + case i => + Identifier.of(i.namespace() :+ i.name(), snapshotSelector) + } + r.copy(table = newTable, identifier = newIdent) + } + } + + private def hasExplicitSelector(ident: Option[Identifier]): Boolean = + ident.exists { i => + val name = i.name() + name.startsWith("snapshot_id_") || name.startsWith("at_timestamp_") || name.startsWith("tag_") + } +} diff --git a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSessionPropertyTimeTravel.java b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSessionPropertyTimeTravel.java new file mode 100644 index 000000000000..dd186eb4cc44 --- /dev/null +++ b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSessionPropertyTimeTravel.java @@ -0,0 +1,316 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.extensions; + +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.List; +import java.util.concurrent.TimeUnit; +import org.apache.iceberg.Parameter; +import org.apache.iceberg.ParameterizedTestExtension; +import org.apache.iceberg.Parameters; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.spark.SparkCatalogConfig; +import org.apache.iceberg.spark.SparkReadOptions; +import org.apache.iceberg.spark.SparkSQLProperties; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; + +@ExtendWith(ParameterizedTestExtension.class) +public class TestSessionPropertyTimeTravel extends ExtensionsTestBase { + + @Parameter(index = 3) + private int formatVersion; + + @Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}, formatVersion = {3}") + protected static Object[][] parameters() { + return new Object[][] { + { + SparkCatalogConfig.HIVE.catalogName(), + SparkCatalogConfig.HIVE.implementation(), + SparkCatalogConfig.HIVE.properties(), + 2 + }, + { + SparkCatalogConfig.HADOOP.catalogName(), + SparkCatalogConfig.HADOOP.implementation(), + SparkCatalogConfig.HADOOP.properties(), + 2 + }, + { + SparkCatalogConfig.SPARK_SESSION.catalogName(), + SparkCatalogConfig.SPARK_SESSION.implementation(), + SparkCatalogConfig.SPARK_SESSION.properties(), + 2 + } + }; + } + + @BeforeEach + public void createTable() { + sql( + "CREATE TABLE %s (id bigint, data string, float float) USING iceberg" + + " TBLPROPERTIES ('format-version'='%s')", + tableName, formatVersion); + sql( + "INSERT INTO %s VALUES (1, 'a', 1.0), (2, 'b', 2.0), (3, 'c', CAST('NaN' AS FLOAT))", + tableName); + } + + @AfterEach + public void removeTables() { + spark.conf().unset(SparkSQLProperties.AS_OF_TIMESTAMP); + sql("DROP TABLE IF EXISTS %s", tableName); + } + + @TestTemplate + public void testSessionPropertyTimeTravel() { + Table table = validationCatalog.loadTable(tableIdent); + long timestampBeforeAnySnapshots = 1L; + long snapshotTs = table.currentSnapshot().timestampMillis(); + long timestamp = waitUntilAfter(snapshotTs + 2); + + List expected = sql("SELECT * FROM %s ORDER BY id", tableName); + + // create a second snapshot + sql("INSERT INTO %s VALUES (4, 'd', 4.0), (5, 'e', 5.0)", tableName); + + // session property with a valid timestamp works for simple SQL queries + spark.conf().set(SparkSQLProperties.AS_OF_TIMESTAMP, String.valueOf(timestamp)); + List actualWithSessionProperty = sql("SELECT * FROM %s ORDER BY id", tableName); + assertEquals("Should time-travel using session property", expected, actualWithSessionProperty); + + // session property with a valid timestamp works for DataFrame reads + Dataset dfSession = spark.read().format("iceberg").load(tableName).orderBy("id"); + assertEquals( + "DataFrame should time-travel using session property", + expected, + rowsToJava(dfSession.collectAsList())); + + // session property with timestamp before any snapshots raises exception + spark + .conf() + .set(SparkSQLProperties.AS_OF_TIMESTAMP, String.valueOf(timestampBeforeAnySnapshots)); + assertThatThrownBy(() -> sql("SELECT * FROM %s", tableName)) + .hasMessageContaining("Cannot find a snapshot older than") + .isInstanceOf(IllegalArgumentException.class); + } + + @TestTemplate + public void testSessionPropertyWithTableSelectorThrowsException() { + Table table = validationCatalog.loadTable(tableIdent); + long snapshotId = table.currentSnapshot().snapshotId(); + String tagName = "test_tag"; + String branchName = "test_branch"; + + table.manageSnapshots().createTag(tagName, snapshotId).commit(); + table.manageSnapshots().createBranch(branchName, snapshotId).commit(); + + long snapshotTs = waitUntilAfter(table.currentSnapshot().timestampMillis() + 1000); + long timestampInSeconds = TimeUnit.MILLISECONDS.toSeconds(snapshotTs); + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + String formattedDate = sdf.format(new Date(snapshotTs)); + + // create a second snapshot + sql("INSERT INTO %s VALUES (4, 'd', 4.0), (5, 'e', 5.0)", tableName); + + // set session property so it is active for all assertions below + spark.conf().set(SparkSQLProperties.AS_OF_TIMESTAMP, String.valueOf(snapshotTs)); + + // snapshot_id_ SQL prefix combined with session property raises exception + assertThatThrownBy(() -> sql("SELECT * FROM %s.snapshot_id_%s", tableName, snapshotId)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining( + "Cannot set both snapshot-id and as-of-timestamp to select which table snapshot to scan"); + + // at_timestamp_ SQL prefix combined with session property raises exception + assertThatThrownBy(() -> sql("SELECT * FROM %s.at_timestamp_%s", tableName, snapshotTs)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining( + "Cannot set both snapshot-id and as-of-timestamp to select which table snapshot to scan"); + + // VERSION AS OF snapshot SQL combined with session property raises exception + assertThatThrownBy(() -> sql("SELECT * FROM %s VERSION AS OF %s", tableName, snapshotId)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining( + "Cannot set both snapshot-id and as-of-timestamp to select which table snapshot to scan"); + + // VERSION_AS_OF DataFrame option combined with session property raises exception + assertThatThrownBy( + () -> + spark + .read() + .format("iceberg") + .option(SparkReadOptions.VERSION_AS_OF, snapshotId) + .load(tableName) + .collectAsList()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining( + "Cannot set both snapshot-id and as-of-timestamp to select which table snapshot to scan"); + + // VERSION AS OF tag SQL combined with session property raises exception + assertThatThrownBy(() -> sql("SELECT * FROM %s VERSION AS OF '%s'", tableName, tagName)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining( + "Cannot set both snapshot-id and as-of-timestamp to select which table snapshot to scan"); + + if (!"spark_catalog".equals(catalogName)) { + // tag_ SQL prefix combined with session property raises exception + assertThatThrownBy(() -> sql("SELECT * FROM %s.tag_%s", tableName, tagName)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining( + "Cannot set both snapshot-id and as-of-timestamp to select which table snapshot to scan"); + } + + // VERSION AS OF branch SQL combined with session property raises exception + assertThatThrownBy(() -> sql("SELECT * FROM %s VERSION AS OF '%s'", tableName, branchName)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Cannot override ref, already set snapshot id=%s", snapshotId); + + if (!"spark_catalog".equals(catalogName)) { + // branch_ SQL prefix combined with session property raises exception + assertThatThrownBy(() -> sql("SELECT * FROM %s.branch_%s", tableName, branchName)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Cannot override ref, already set snapshot id=%s", snapshotId); + } + + // BRANCH DataFrame option combined with session property raises exception + assertThatThrownBy( + () -> + spark + .read() + .format("iceberg") + .option(SparkReadOptions.BRANCH, branchName) + .load(tableName) + .collectAsList()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Cannot override ref, already set snapshot id=%s", snapshotId); + + // TIMESTAMP AS OF SQL combined with session property raises exception + assertThatThrownBy( + () -> sql("SELECT * FROM %s TIMESTAMP AS OF %s", tableName, timestampInSeconds)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining( + "Cannot set both snapshot-id and as-of-timestamp to select which table snapshot to scan"); + + // TIMESTAMP_AS_OF DataFrame option combined with session property raises exception + assertThatThrownBy( + () -> + spark + .read() + .format("iceberg") + .option(SparkReadOptions.TIMESTAMP_AS_OF, formattedDate) + .load(tableName) + .collectAsList()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining( + "Cannot set both snapshot-id and as-of-timestamp to select which table snapshot to scan"); + } + + @TestTemplate + public void testSessionPropertyWithMultiTableJoin() { + String table1Name = tableName("table1"); + TableIdentifier table1Identifier = TableIdentifier.of(Namespace.of("default"), "table1"); + String table2Name = tableName("table2"); + TableIdentifier table2Identifier = TableIdentifier.of(Namespace.of("default"), "table2"); + + sql("CREATE OR REPLACE TABLE %s (id bigint, data string) USING iceberg", table1Name); + sql("INSERT INTO %s VALUES (1, 'a'), (2, 'b')", table1Name); + + sql("CREATE OR REPLACE TABLE %s (id bigint, value string) USING iceberg", table2Name); + sql("INSERT INTO %s VALUES (1, 'x'), (2, 'y')", table2Name); + + long timestamp1 = + waitUntilAfter( + validationCatalog.loadTable(table1Identifier).currentSnapshot().timestampMillis() + + 1000); + long timestamp2 = + waitUntilAfter( + validationCatalog.loadTable(table2Identifier).currentSnapshot().timestampMillis() + + 1000); + long timestampSnapshot1 = Long.max(timestamp1, timestamp2); + + List expectedJoinSnapshot1 = ImmutableList.of(row(1L, "a", "x"), row(2L, "b", "y")); + + sql("INSERT INTO %s VALUES (3, 'c'), (4, 'd')", table1Name); + sql("INSERT INTO %s VALUES (3, 'z'), (4, 'w')", table2Name); + + long timestamp3 = + waitUntilAfter( + validationCatalog.loadTable(table1Identifier).currentSnapshot().timestampMillis() + + 1000); + long timestamp4 = + waitUntilAfter( + validationCatalog.loadTable(table2Identifier).currentSnapshot().timestampMillis() + + 1000); + long timestampSnapshot2 = Long.max(timestamp3, timestamp4); + + List expectedJoinSnapshot2 = + ImmutableList.of( + row(1L, "a", "x"), row(2L, "b", "y"), row(3L, "c", "z"), row(4L, "d", "w")); + + List actual1 = + sql( + "SELECT t1.id, t1.data, t2.value FROM %s t1 FULL OUTER JOIN %s t2 ON t1.id = t2.id ORDER BY t1.id", + table1Name, table2Name); + assertEquals( + "Without session property, last snapshot for both table should be read.", + expectedJoinSnapshot2, + actual1); + + spark.conf().set(SparkSQLProperties.AS_OF_TIMESTAMP, String.valueOf(timestampSnapshot1)); + + List actual2 = + sql( + "SELECT t1.id, t1.data, t2.value FROM %s t1 FULL OUTER JOIN %s t2 ON t1.id = t2.id ORDER BY t1.id", + table1Name, table2Name); + assertEquals( + "Session property should apply to both tables in join", expectedJoinSnapshot1, actual2); + + assertThatThrownBy( + () -> + sql( + "SELECT t1.id, t1.data, t2.value FROM %s TIMESTAMP AS OF %s t1 FULL OUTER JOIN %s t2 ON t1.id = t2.id ORDER BY t1.id", + table1Name, timestampSnapshot2 / 1000, table2Name)) + .hasMessageContaining( + "Cannot set both snapshot-id and as-of-timestamp to select which table snapshot to scan") + .isInstanceOf(IllegalArgumentException.class); + + assertThatThrownBy( + () -> + sql( + "SELECT t2.id, t1.data, t2.value FROM %s t1 FULL OUTER JOIN %s TIMESTAMP AS OF %s t2 ON t1.id = t2.id ORDER BY t2.id", + table1Name, table2Name, timestampSnapshot2 / 1000)) + .hasMessageContaining( + "Cannot set both snapshot-id and as-of-timestamp to select which table snapshot to scan") + .isInstanceOf(IllegalArgumentException.class); + + sql("DROP TABLE IF EXISTS %s", table1Name); + sql("DROP TABLE IF EXISTS %s", table2Name); + } +} diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java index 161f09d53e2c..960020edea14 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java @@ -114,4 +114,8 @@ private SparkSQLProperties() {} public static final String ASYNC_MICRO_BATCH_PLANNING_ENABLED = "spark.sql.iceberg.async-micro-batch-planning-enabled"; public static final boolean ASYNC_MICRO_BATCH_PLANNING_ENABLED_DEFAULT = false; + + // Session-level time-travel property; a timestamp in milliseconds. + // The snapshot used will be the snapshot current at this time. + public static final String AS_OF_TIMESTAMP = "spark.sql.iceberg.read.as-of-timestamp"; } From 97493a0c48542103ed9104bc6b33040cbc450669 Mon Sep 17 00:00:00 2001 From: Gabriel Igliozzi Date: Wed, 13 May 2026 16:46:00 +0200 Subject: [PATCH 2/2] spotless --- .../main/java/org/apache/iceberg/spark/SparkSQLProperties.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java index 2e5c5bdc7e83..d23ab0277acb 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java @@ -118,7 +118,7 @@ private SparkSQLProperties() {} // Session-level time-travel property; a timestamp in milliseconds. // The snapshot used will be the snapshot current at this time. public static final String AS_OF_TIMESTAMP = "spark.sql.iceberg.read.as-of-timestamp"; - + // Controls whether to shred variant columns during write operations public static final String SHRED_VARIANTS = "spark.sql.iceberg.shred-variants";