Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions sql/api/src/main/scala/org/apache/spark/sql/functions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3168,6 +3168,32 @@ object functions {
struct((colName +: colNames).map(col): _*)
}

/**
 * Returns true if the given struct is non-null and all of its fields are null.
 * Returns null if the struct itself is null.
 *
 * A non-null struct with zero fields is treated as (vacuously) empty, so the result is true.
 * The check is shallow: a nested struct field that is itself non-null counts as a non-null
 * field, even when all of the nested struct's own fields are null.
 *
 * This is useful for detecting "empty" structs produced by permissive deserialization
 * handlers (e.g. Avro PermissiveRecordExceptionHandler) without resorting to expensive
 * serialization like `to_json`.
 *
 * @param e
 *   a column of struct type
 * @return
 *   a boolean column produced by the `is_struct_empty` SQL function
 *
 * @group struct_funcs
 * @since 4.1.0
 */
def is_struct_empty(e: Column): Column = Column.fn("is_struct_empty", e)

/**
 * Returns true if the given struct is non-null and at least one field is non-null.
 * Returns null if the struct itself is null.
 *
 * A non-null struct with zero fields has no non-null field, so the result is false.
 * The check is shallow: a nested struct field that is itself non-null counts as a non-null
 * field, even when all of the nested struct's own fields are null.
 *
 * This is useful for filtering out "empty" structs produced by permissive deserialization
 * handlers (e.g. Avro PermissiveRecordExceptionHandler) without resorting to expensive
 * serialization like `to_json`.
 *
 * @param e
 *   a column of struct type
 * @return
 *   a boolean column produced by the `is_struct_non_empty` SQL function
 *
 * @group struct_funcs
 * @since 4.1.0
 */
def is_struct_non_empty(e: Column): Column = Column.fn("is_struct_non_empty", e)

/**
* Evaluates a list of conditions and returns one of multiple possible result expressions. If
* otherwise is not defined at the end, null is returned for unmatched conditions.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -832,6 +832,8 @@ object FunctionRegistry {
expression[Get]("get"),

CreateStruct.registryEntry,
expression[IsStructEmpty]("is_struct_empty"),
expression[IsStructNonEmpty]("is_struct_non_empty"),

// misc functions
expression[AssertTrue]("assert_true"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -832,3 +832,230 @@ object UpdateFields {
}
}
}

// scalastyle:off line.size.limit
/**
 * Returns true if a non-null struct has all null fields.
 *
 * This is useful in Structured Streaming pipelines where deserialization failure handlers
 * (e.g. Avro PermissiveRecordExceptionHandler) produce structs that are non-null but have
 * all fields set to null. This expression provides an efficient, schema-agnostic way to
 * detect such "empty" structs without resorting to serialization (e.g. to_json).
 *
 * Semantics:
 *   - NULL struct -> NULL
 *   - Non-null struct with all null fields -> TRUE
 *   - Non-null struct with at least one non-null field -> FALSE
 *   - Non-null struct with zero fields (empty schema) -> TRUE (vacuously empty)
 *
 * Only performs a shallow check: nested structs that are themselves non-null (even if all
 * their children are null) count as non-null at the parent level.
 *
 * Null handling is delegated to the [[UnaryExpression]] templates (`nullSafeEval` and
 * `nullSafeCodeGen`), so this class only implements the field scan for a non-null struct.
 *
 * @param child the struct-typed expression to inspect
 */
// scalastyle:on line.size.limit
@ExpressionDescription(
  usage = """
    _FUNC_(struct) - Returns true if the struct is non-null and all of its fields are null.
      Returns null if the struct itself is null.
  """,
  arguments = """
    Arguments:
      * struct - a STRUCT expression
  """,
  examples = """
    Examples:
      > SELECT _FUNC_(named_struct('a', 1, 'b', 2));
       false
      > SELECT _FUNC_(named_struct('a', CAST(NULL AS INT), 'b', CAST(NULL AS STRING)));
       true
      > SELECT _FUNC_(CAST(NULL AS STRUCT<a: INT, b: STRING>));
       NULL
  """,
  since = "4.1.0",
  group = "struct_funcs")
case class IsStructEmpty(child: Expression)
  extends UnaryExpression with Predicate with ExpectsInputTypes {

  override def inputTypes: Seq[AbstractDataType] = Seq(StructType)

  // The result is null exactly when the input struct is null.
  override def nullable: Boolean = child.nullable
  override def nullIntolerant: Boolean = true

  // Field count of the input struct type; the cast is safe once inputTypes has been enforced.
  @transient
  private lazy val numFields: Int = child.dataType.asInstanceOf[StructType].fields.length

  override def prettyName: String = "is_struct_empty"

  /**
   * Scans the fields of a non-null struct and reports whether every one of them is null.
   * Only invoked for non-null input; the inherited `eval` returns null for a null struct.
   */
  override protected def nullSafeEval(input: Any): Any = {
    val row = input.asInstanceOf[InternalRow]
    var i = 0
    // Stop at the first non-null field; reaching the end means all fields were null.
    while (i < numFields && row.isNullAt(i)) {
      i += 1
    }
    i == numFields
  }

  /**
   * Generates the Java field scan. `nullSafeCodeGen` declares `ev.value`, wraps the scan in
   * a null guard only when the child is nullable, and otherwise marks the result as never
   * null.
   */
  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
    nullSafeCodeGen(ctx, ev, struct => {
      if (numFields == 0) {
        // Zero-field struct: vacuously empty.
        s"${ev.value} = true;"
      } else if (numFields <= 8) {
        // Narrow structs: unroll into a short-circuiting AND chain to avoid loop overhead.
        val checks = (0 until numFields).map(i => s"$struct.isNullAt($i)")
        s"${ev.value} = ${checks.mkString(" && ")};"
      } else {
        // Wide structs: loop with early exit to keep the generated code small.
        val idx = ctx.freshName("i")
        s"""
           |${ev.value} = true;
           |for (int $idx = 0; $idx < $numFields; $idx++) {
           |  if (!$struct.isNullAt($idx)) {
           |    ${ev.value} = false;
           |    break;
           |  }
           |}
         """.stripMargin
      }
    })
  }

  override def sql: String = s"${prettyName}(${child.sql})"

  override protected def withNewChildInternal(newChild: Expression): IsStructEmpty =
    copy(child = newChild)
}

// scalastyle:off line.size.limit
/**
 * Returns true if a non-null struct has at least one non-null field.
 *
 * This is the logical complement of [[IsStructEmpty]] for non-null inputs.
 * When the input struct is NULL, both functions return NULL (three-valued SQL logic).
 *
 * Semantics:
 *   - NULL struct -> NULL
 *   - Non-null struct with all null fields -> FALSE
 *   - Non-null struct with at least one non-null field -> TRUE
 *   - Non-null struct with zero fields (empty schema) -> FALSE (vacuously empty)
 *
 * Only performs a shallow check: a nested struct field that is itself non-null counts as a
 * non-null field regardless of its own children.
 *
 * Null handling is delegated to the [[UnaryExpression]] templates (`nullSafeEval` and
 * `nullSafeCodeGen`), so this class only implements the field scan for a non-null struct.
 *
 * @param child the struct-typed expression to inspect
 */
// scalastyle:on line.size.limit
@ExpressionDescription(
  usage = """
    _FUNC_(struct) - Returns true if the struct is non-null and at least one field is non-null.
      Returns null if the struct itself is null.
  """,
  arguments = """
    Arguments:
      * struct - a STRUCT expression
  """,
  examples = """
    Examples:
      > SELECT _FUNC_(named_struct('a', 1, 'b', 2));
       true
      > SELECT _FUNC_(named_struct('a', CAST(NULL AS INT), 'b', CAST(NULL AS STRING)));
       false
      > SELECT _FUNC_(CAST(NULL AS STRUCT<a: INT, b: STRING>));
       NULL
  """,
  since = "4.1.0",
  group = "struct_funcs")
case class IsStructNonEmpty(child: Expression)
  extends UnaryExpression with Predicate with ExpectsInputTypes {

  override def inputTypes: Seq[AbstractDataType] = Seq(StructType)

  // The result is null exactly when the input struct is null.
  override def nullable: Boolean = child.nullable
  override def nullIntolerant: Boolean = true

  // Field count of the input struct type; the cast is safe once inputTypes has been enforced.
  @transient
  private lazy val numFields: Int = child.dataType.asInstanceOf[StructType].fields.length

  override def prettyName: String = "is_struct_non_empty"

  /**
   * Scans the fields of a non-null struct and reports whether any of them is non-null.
   * Only invoked for non-null input; the inherited `eval` returns null for a null struct.
   */
  override protected def nullSafeEval(input: Any): Any = {
    val row = input.asInstanceOf[InternalRow]
    var i = 0
    // Stop at the first non-null field; stopping before the end means one was found.
    while (i < numFields && row.isNullAt(i)) {
      i += 1
    }
    i < numFields
  }

  /**
   * Generates the Java field scan. `nullSafeCodeGen` declares `ev.value`, wraps the scan in
   * a null guard only when the child is nullable, and otherwise marks the result as never
   * null.
   */
  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
    nullSafeCodeGen(ctx, ev, struct => {
      if (numFields == 0) {
        // Zero-field struct: vacuously empty, so it is never non-empty.
        s"${ev.value} = false;"
      } else if (numFields <= 8) {
        // Narrow structs: unroll into a short-circuiting OR chain to avoid loop overhead.
        val checks = (0 until numFields).map(i => s"!$struct.isNullAt($i)")
        s"${ev.value} = ${checks.mkString(" || ")};"
      } else {
        // Wide structs: loop with early exit to keep the generated code small.
        val idx = ctx.freshName("i")
        s"""
           |${ev.value} = false;
           |for (int $idx = 0; $idx < $numFields; $idx++) {
           |  if (!$struct.isNullAt($idx)) {
           |    ${ev.value} = true;
           |    break;
           |  }
           |}
         """.stripMargin
      }
    })
  }

  override def sql: String = s"${prettyName}(${child.sql})"

  override protected def withNewChildInternal(newChild: Expression): IsStructNonEmpty =
    copy(child = newChild)
}
Loading