diff --git a/sql/api/src/main/scala/org/apache/spark/sql/functions.scala b/sql/api/src/main/scala/org/apache/spark/sql/functions.scala index fce3662c36674..245c3a2ca1233 100644 --- a/sql/api/src/main/scala/org/apache/spark/sql/functions.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/functions.scala @@ -3168,6 +3168,32 @@ object functions { struct((colName +: colNames).map(col): _*) } + /** + * Returns true if the given struct is non-null and all of its fields are null. + * Returns null if the struct itself is null. + * + * This is useful for detecting "empty" structs produced by permissive deserialization + * handlers (e.g. Avro PermissiveRecordExceptionHandler) without resorting to expensive + * serialization like `to_json`. + * + * @group struct_funcs + * @since 4.1.0 + */ + def is_struct_empty(e: Column): Column = Column.fn("is_struct_empty", e) + + /** + * Returns true if the given struct is non-null and at least one field is non-null. + * Returns null if the struct itself is null. + * + * This is useful for filtering out "empty" structs produced by permissive deserialization + * handlers (e.g. Avro PermissiveRecordExceptionHandler) without resorting to expensive + * serialization like `to_json`. + * + * @group struct_funcs + * @since 4.1.0 + */ + def is_struct_non_empty(e: Column): Column = Column.fn("is_struct_non_empty", e) + /** * Evaluates a list of conditions and returns one of multiple possible result expressions. If * otherwise is not defined at the end, null is returned for unmatched conditions. 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala index cadb7750c2460..8f018adc68baf 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala @@ -832,6 +832,8 @@ object FunctionRegistry { expression[Get]("get"), CreateStruct.registryEntry, + expression[IsStructEmpty]("is_struct_empty"), + expression[IsStructNonEmpty]("is_struct_non_empty"), // misc functions expression[AssertTrue]("assert_true"), diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala index b062d9430b83b..244e3e677d2ec 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala @@ -832,3 +832,230 @@ object UpdateFields { } } } + +// scalastyle:off line.size.limit +/** + * Returns true if a non-null struct has all null fields. + * + * This is useful in Structured Streaming pipelines where deserialization failure handlers + * (e.g. Avro PermissiveRecordExceptionHandler) produce structs that are non-null but have + * all fields set to null. This expression provides an efficient, schema-agnostic way to + * detect such "empty" structs without resorting to serialization (e.g. to_json). 
+ *
+ * Semantics:
+ * - NULL struct -> NULL
+ * - Non-null struct with all null fields -> TRUE
+ * - Non-null struct with at least one non-null field -> FALSE
+ * - Non-null struct with zero fields (empty schema) -> TRUE (vacuously empty)
+ *
+ * Only performs a shallow check: nested structs that are themselves non-null (even if all
+ * their children are null) count as non-null at the parent level.
+ */
+// scalastyle:on line.size.limit
+@ExpressionDescription(
+  usage = """
+    _FUNC_(struct) - Returns true if the struct is non-null and all of its fields are null.
+      Returns null if the struct itself is null.
+  """,
+  arguments = """
+    Arguments:
+      * struct - a STRUCT expression
+  """,
+  examples = """
+    Examples:
+      > SELECT _FUNC_(named_struct('a', 1, 'b', 2));
+       false
+      > SELECT _FUNC_(named_struct('a', CAST(NULL AS INT), 'b', CAST(NULL AS STRING)));
+       true
+      > SELECT _FUNC_(CAST(NULL AS STRUCT<a: INT>));
+       NULL
+  """,
+  since = "4.1.0",
+  group = "struct_funcs")
+case class IsStructEmpty(child: Expression)
+  extends UnaryExpression with Predicate with ExpectsInputTypes {
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(StructType)
+
+  // Returns null when the input struct is null
+  override def nullable: Boolean = child.nullable
+  override def nullIntolerant: Boolean = true
+
+  @transient
+  private lazy val numFields: Int = child.dataType.asInstanceOf[StructType].fields.length
+
+  override def prettyName: String = "is_struct_empty"
+
+  override def eval(input: InternalRow): Any = {
+    val value = child.eval(input)
+    if (value == null) {
+      null
+    } else {
+      val row = value.asInstanceOf[InternalRow]
+      var i = 0
+      while (i < numFields) {
+        if (!row.isNullAt(i)) return false
+        i += 1
+      }
+      true
+    }
+  }
+
+  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+    val childGen = child.genCode(ctx)
+    val structEval = childGen.value
+
+    if (numFields == 0) {
+      // Zero-field struct: vacuously empty
+      ev.copy(code = code"""
+        ${childGen.code}
+        boolean
${ev.isNull} = ${childGen.isNull}; + boolean ${ev.value} = true; + """) + } else { + val result = ctx.freshName("isStructEmpty") + // For narrow structs, unroll the AND chain to avoid loop overhead. + // For wide structs, use a loop with early exit. + val fieldChecks = if (numFields <= 8) { + val checks = (0 until numFields).map(i => s"$structEval.isNullAt($i)") + s"boolean $result = ${checks.mkString(" && ")};" + } else { + val idx = ctx.freshName("i") + s""" + boolean $result = true; + for (int $idx = 0; $idx < $numFields; $idx++) { + if (!$structEval.isNullAt($idx)) { + $result = false; + break; + } + } + """ + } + + ev.copy(code = code""" + ${childGen.code} + boolean ${ev.isNull} = ${childGen.isNull}; + boolean ${ev.value} = false; + if (!${ev.isNull}) { + $fieldChecks + ${ev.value} = $result; + } + """) + } + } + + override def sql: String = s"${prettyName}(${child.sql})" + + override protected def withNewChildInternal(newChild: Expression): IsStructEmpty = + copy(child = newChild) +} + +// scalastyle:off line.size.limit +/** + * Returns true if a non-null struct has at least one non-null field. + * + * This is the logical complement of [[IsStructEmpty]] for non-null inputs. + * When the input struct is NULL, both functions return NULL (three-valued SQL logic). + * + * Semantics: + * - NULL struct -> NULL + * - Non-null struct with all null fields -> FALSE + * - Non-null struct with at least one non-null field -> TRUE + * - Non-null struct with zero fields (empty schema) -> FALSE (vacuously empty) + */ +// scalastyle:on line.size.limit +@ExpressionDescription( + usage = """ + _FUNC_(struct) - Returns true if the struct is non-null and at least one field is non-null. + Returns null if the struct itself is null. 
+ """, + arguments = """ + Arguments: + * struct - a STRUCT expression + """, + examples = """ + Examples: + > SELECT _FUNC_(named_struct('a', 1, 'b', 2)); + true + > SELECT _FUNC_(named_struct('a', CAST(NULL AS INT), 'b', CAST(NULL AS STRING))); + false + > SELECT _FUNC_(CAST(NULL AS STRUCT)); + NULL + """, + since = "4.1.0", + group = "struct_funcs") +case class IsStructNonEmpty(child: Expression) + extends UnaryExpression with Predicate with ExpectsInputTypes { + + override def inputTypes: Seq[AbstractDataType] = Seq(StructType) + + // Returns null when the input struct is null + override def nullable: Boolean = child.nullable + override def nullIntolerant: Boolean = true + + @transient + private lazy val numFields: Int = child.dataType.asInstanceOf[StructType].fields.length + + override def prettyName: String = "is_struct_non_empty" + + override def eval(input: InternalRow): Any = { + val value = child.eval(input) + if (value == null) { + null + } else { + val row = value.asInstanceOf[InternalRow] + var i = 0 + while (i < numFields) { + if (!row.isNullAt(i)) return true + i += 1 + } + false + } + } + + override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { + val childGen = child.genCode(ctx) + val structEval = childGen.value + + if (numFields == 0) { + // Zero-field struct: vacuously empty -> non_empty is false + ev.copy(code = code""" + ${childGen.code} + boolean ${ev.isNull} = ${childGen.isNull}; + boolean ${ev.value} = false; + """) + } else { + val result = ctx.freshName("isStructNonEmpty") + val fieldChecks = if (numFields <= 8) { + val checks = (0 until numFields).map(i => s"!$structEval.isNullAt($i)") + s"boolean $result = ${checks.mkString(" || ")};" + } else { + val idx = ctx.freshName("i") + s""" + boolean $result = false; + for (int $idx = 0; $idx < $numFields; $idx++) { + if (!$structEval.isNullAt($idx)) { + $result = true; + break; + } + } + """ + } + + ev.copy(code = code""" + ${childGen.code} + boolean ${ev.isNull} = 
${childGen.isNull}; + boolean ${ev.value} = false; + if (!${ev.isNull}) { + $fieldChecks + ${ev.value} = $result; + } + """) + } + } + + override def sql: String = s"${prettyName}(${child.sql})" + + override protected def withNewChildInternal(newChild: Expression): IsStructNonEmpty = + copy(child = newChild) +} diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ComplexTypeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ComplexTypeSuite.scala index 0c66372c1c3ad..7ac76c3f47f52 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ComplexTypeSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ComplexTypeSuite.scala @@ -658,6 +658,132 @@ class ComplexTypeSuite extends SparkFunSuite with ExpressionEvalHelper { assert(m1.semanticEquals(m2)) } + test("IsStructEmpty") { + // Basic: non-null struct with non-null fields -> false + val type1 = StructType(StructField("a", IntegerType) :: StructField("b", StringType) :: Nil) + val struct1 = Literal.create(create_row(1, "hello"), type1) + checkEvaluation(IsStructEmpty(struct1), false) + + // Non-null struct with all null fields -> true + val struct2 = Literal.create(create_row(null, null), type1) + checkEvaluation(IsStructEmpty(struct2), true) + + // Null struct -> null + val nullStruct = Literal.create(null, type1) + checkEvaluation(IsStructEmpty(nullStruct), null) + + // Mixed: some fields null, some non-null -> false + val struct3 = Literal.create(create_row(1, null), type1) + checkEvaluation(IsStructEmpty(struct3), false) + + val struct4 = Literal.create(create_row(null, "hello"), type1) + checkEvaluation(IsStructEmpty(struct4), false) + + // Single field struct, non-null -> false + val type2 = StructType(StructField("x", IntegerType) :: Nil) + val struct5 = Literal.create(create_row(42), type2) + checkEvaluation(IsStructEmpty(struct5), false) + + // Single field struct, null field -> true + 
val struct6 = Literal.create(create_row(null), type2) + checkEvaluation(IsStructEmpty(struct6), true) + + // Empty schema (zero fields) -> vacuously true + val emptyType = StructType(Nil) + val emptyStruct = Literal.create(create_row(), emptyType) + checkEvaluation(IsStructEmpty(emptyStruct), true) + + // Nested struct: inner struct is non-null (even if all its fields are null) -> false + val innerType = StructType(StructField("c", IntegerType) :: Nil) + val outerType = StructType(StructField("inner", innerType) :: Nil) + val nestedStruct = Literal.create(create_row(create_row(null)), outerType) + checkEvaluation(IsStructEmpty(nestedStruct), false) + + // Nested struct: inner struct is null -> true (parent's field is null) + val nestedNull = Literal.create(create_row(null), outerType) + checkEvaluation(IsStructEmpty(nestedNull), true) + + // Struct with array field: non-null empty array counts as non-null -> false + val typeWithArray = StructType(StructField("arr", ArrayType(IntegerType)) :: Nil) + val structWithEmptyArray = Literal.create(create_row(Seq.empty[Int]), typeWithArray) + checkEvaluation(IsStructEmpty(structWithEmptyArray), false) + + // Struct with map field: non-null empty map counts as non-null -> false + val typeWithMap = StructType(StructField("m", MapType(StringType, IntegerType)) :: Nil) + val structWithEmptyMap = Literal.create(create_row(Map.empty[String, Int]), typeWithMap) + checkEvaluation(IsStructEmpty(structWithEmptyMap), false) + + // Type check: non-struct input should fail + assert(IsStructEmpty(Literal(1)).checkInputDataTypes().isFailure) + assert(IsStructEmpty(Literal("hello")).checkInputDataTypes().isFailure) + + // Nullable semantics: nullable follows child nullability + assert(!IsStructEmpty(struct1).nullable) // non-null literal -> not nullable + assert(IsStructEmpty(nullStruct).nullable) // null literal -> nullable + assert(IsStructEmpty(struct1).dataType == BooleanType) + } + + test("IsStructNonEmpty") { + val type1 = 
StructType(StructField("a", IntegerType) :: StructField("b", StringType) :: Nil) + + // Non-null struct with non-null fields -> true + val struct1 = Literal.create(create_row(1, "hello"), type1) + checkEvaluation(IsStructNonEmpty(struct1), true) + + // Non-null struct with all null fields -> false + val struct2 = Literal.create(create_row(null, null), type1) + checkEvaluation(IsStructNonEmpty(struct2), false) + + // Null struct -> null + val nullStruct = Literal.create(null, type1) + checkEvaluation(IsStructNonEmpty(nullStruct), null) + + // Mixed -> true + val struct3 = Literal.create(create_row(1, null), type1) + checkEvaluation(IsStructNonEmpty(struct3), true) + + val struct4 = Literal.create(create_row(null, "hello"), type1) + checkEvaluation(IsStructNonEmpty(struct4), true) + + // Single field struct, non-null -> true + val type2 = StructType(StructField("x", IntegerType) :: Nil) + val struct5 = Literal.create(create_row(42), type2) + checkEvaluation(IsStructNonEmpty(struct5), true) + + // Single field struct, null field -> false + val struct6 = Literal.create(create_row(null), type2) + checkEvaluation(IsStructNonEmpty(struct6), false) + + // Empty schema (zero fields) -> false (vacuously empty) + val emptyType = StructType(Nil) + val emptyStruct = Literal.create(create_row(), emptyType) + checkEvaluation(IsStructNonEmpty(emptyStruct), false) + + // Nested struct: inner struct is non-null but all its fields null -> true + val innerType = StructType(StructField("c", IntegerType) :: Nil) + val outerType = StructType(StructField("inner", innerType) :: Nil) + val nestedStruct = Literal.create(create_row(create_row(null)), outerType) + checkEvaluation(IsStructNonEmpty(nestedStruct), true) + + // Nested struct: inner struct is null -> false + val nestedNull = Literal.create(create_row(null), outerType) + checkEvaluation(IsStructNonEmpty(nestedNull), false) + + // Complementarity: for non-null structs, isEmpty and isNonEmpty are logical inverses + 
checkEvaluation(IsStructEmpty(struct1), false) + checkEvaluation(IsStructNonEmpty(struct1), true) + checkEvaluation(IsStructEmpty(struct2), true) + checkEvaluation(IsStructNonEmpty(struct2), false) + + // Type check: non-struct input should fail + assert(IsStructNonEmpty(Literal(1)).checkInputDataTypes().isFailure) + + // Nullable semantics: nullable follows child nullability + assert(!IsStructNonEmpty(struct1).nullable) + assert(IsStructNonEmpty(nullStruct).nullable) + assert(IsStructNonEmpty(struct1).dataType == BooleanType) + } + test("SPARK-40315: Literals of ArrayBasedMapData should have deterministic hashCode.") { val keys = new Array[UTF8String](1) val values1 = new Array[UTF8String](1) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameComplexTypeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameComplexTypeSuite.scala index 984ff8fc51dd7..96bd100058079 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameComplexTypeSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameComplexTypeSuite.scala @@ -333,6 +333,100 @@ class DataFrameComplexTypeSuite extends QueryTest with SharedSparkSession { sourceDF.cache() checkResult() } + + test("is_struct_empty and is_struct_non_empty - Avro permissive handler") { + // Simulate what ABRiS PermissiveRecordExceptionHandler produces: + // - On success: a non-null struct with real field values + // - On deserialization failure: a non-null struct with ALL fields null + // - Possible: a null struct (if upstream produces null) + val avroSchema = StructType(Seq( + StructField("id", IntegerType, nullable = true), + StructField("name", StringType, nullable = true), + StructField("amount", DoubleType, nullable = true) + )) + val valueSchema = StructType(Seq( + StructField("value", avroSchema, nullable = true) + )) + + val rows = new java.util.ArrayList[Row]() + // Row 0: successful deserialization + rows.add(Row(Row(1, "alice", 100.0))) + // Row 1: failed 
deserialization -> GenericData.Record with all nulls + rows.add(Row(Row(null, null, null))) + // Row 2: successful deserialization + rows.add(Row(Row(2, "bob", 200.0))) + // Row 3: failed deserialization -> all nulls again + rows.add(Row(Row(null, null, null))) + // Row 4: null struct (e.g. tombstone record or upstream null) + rows.add(Row(null)) + // Row 5: partial nulls (valid record, some optional fields missing) + rows.add(Row(Row(3, null, null))) + + val df = spark.createDataFrame(rows, valueSchema) + + // -- Test is_struct_empty -- + checkAnswer( + df.select(is_struct_empty(col("value"))), + Seq(Row(false), Row(true), Row(false), Row(true), Row(null), Row(false)) + ) + + // -- Test is_struct_non_empty -- + checkAnswer( + df.select(is_struct_non_empty(col("value"))), + Seq(Row(true), Row(false), Row(true), Row(false), Row(null), Row(true)) + ) + + // -- The real use case: filter out malformed records -- + // New approach: is_struct_non_empty (no serialization) + val filtered = df + .filter(col("value").isNotNull && is_struct_non_empty(col("value"))) + .select("value.*") + + checkAnswer( + filtered, + Seq(Row(1, "alice", 100.0), Row(2, "bob", 200.0), Row(3, null, null)) + ) + + // Old approach with to_json should produce the same results + val filteredOld = df + .filter(col("value").isNotNull.and(to_json(col("value")) =!= "{}")) + .select("value.*") + + checkAnswer(filteredOld, filtered.collect().toSeq) + + // -- SQL syntax works too -- + df.createOrReplaceTempView("kafka_messages") + checkAnswer( + spark.sql( + """SELECT value.* + |FROM kafka_messages + |WHERE value IS NOT NULL AND is_struct_non_empty(value)""".stripMargin), + Seq(Row(1, "alice", 100.0), Row(2, "bob", 200.0), Row(3, null, null)) + ) + } + + test("is_struct_empty - wide struct triggers loop codegen path") { + // Structs with > 8 fields use the loop codegen path instead of unrolled AND chain. + // This tests that path with a 12-field struct. 
+ val fields = (1 to 12).map(i => StructField(s"f$i", IntegerType, nullable = true)) + val wideSchema = StructType(Seq(StructField("s", StructType(fields), nullable = true))) + + val allNull = Row((1 to 12).map(_ => null): _*) + val firstNonNull = Row(Seq(42) ++ (2 to 12).map(_ => null): _*) + val lastNonNull = Row((1 to 11).map(_ => null) ++ Seq(99): _*) + + val rows = new java.util.ArrayList[Row]() + rows.add(Row(allNull)) + rows.add(Row(firstNonNull)) + rows.add(Row(lastNonNull)) + + val df = spark.createDataFrame(rows, wideSchema) + + checkAnswer( + df.select(is_struct_empty(col("s")), is_struct_non_empty(col("s"))), + Seq(Row(true, false), Row(false, true), Row(false, true)) + ) + } } class S100(