diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoSchemaEvolutionExtraSQLTests.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoSchemaEvolutionExtraSQLTests.scala
index 8565c0b31c0c..caa7f878708a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoSchemaEvolutionExtraSQLTests.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoSchemaEvolutionExtraSQLTests.scala
@@ -95,6 +95,60 @@ trait MergeIntoSchemaEvolutionExtraSQLTests extends RowLevelOperationSuiteBase {
     }
   }
 
+  Seq(true, false).foreach { withSchemaEvolution =>
+    test(s"source is subquery with projection and alias - INSERT * -" +
+      s" schema evolution=$withSchemaEvolution") {
+      withTempView("source") {
+        createAndInitTable(
+          "pk INT NOT NULL, salary INT, dep STRING",
+          """{ "pk": 1, "salary": 100, "dep": "hr" }
+            |{ "pk": 2, "salary": 200, "dep": "software" }""".stripMargin)
+
+        Seq((3, 300), (4, 400)).toDF("pk", "salary")
+          .createOrReplaceTempView("source")
+
+        val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA EVOLUTION" else ""
+        val mergeStmt =
+          s"""MERGE $schemaEvolutionClause
+             |INTO $tableNameAsString t
+             |USING (SELECT pk, salary, 'new' AS dep, true AS active FROM source) AS s
+             |ON t.pk = s.pk
+             |WHEN NOT MATCHED THEN INSERT *
+             |""".stripMargin
+
+        if (withSchemaEvolution) {
+          sql(mergeStmt)
+          val result = sql(s"SELECT * FROM $tableNameAsString")
+          checkAnswer(result,
+            Seq(
+              Row(1, 100, "hr", null),
+              Row(2, 200, "software", null),
+              Row(3, 300, "new", true),
+              Row(4, 400, "new", true)))
+          assert(result.schema === StructType(Seq(
+            StructField("pk", IntegerType, nullable = false),
+            StructField("salary", IntegerType),
+            StructField("dep", StringType),
+            StructField("active", BooleanType))))
+        } else {
+          sql(mergeStmt)
+          val result = sql(s"SELECT * FROM $tableNameAsString")
+          checkAnswer(result,
+            Seq(
+              Row(1, 100, "hr"),
+              Row(2, 200, "software"),
+              Row(3, 300, "new"),
+              Row(4, 400, "new")))
+          assert(result.schema === StructType(Seq(
+            StructField("pk", IntegerType, nullable = false),
+            StructField("salary", IntegerType),
+            StructField("dep", StringType))))
+        }
+      }
+      sql(s"DROP TABLE IF EXISTS $tableNameAsString")
+    }
+  }
+
   test("error when catalog ignores schema changes in MERGE WITH SCHEMA EVOLUTION") {
     spark.conf.set("spark.sql.catalog.cat", classOf[PartialSchemaEvolutionCatalog].getName)
     spark.sessionState.catalogManager.reset()