From 824aceb219fe4b0f0637d4e3bac47f2c07665180 Mon Sep 17 00:00:00 2001 From: Adrien Boitreaud Date: Fri, 27 Mar 2026 14:08:23 +0100 Subject: [PATCH 1/5] Track Spark SQL analysis failures to mark application spans as ERROR Add lastSqlFailed tracking to AbstractDatadogSparkListener when SQL calls (e.g. SparkSession.sql()) throw exceptions during Catalyst analysis, before any Spark job is submitted. This ensures finishApplication() can mark the application span as ERROR even when no job/stage/task events fire. The error priority in finishApplication() is: throwable (from caller) > exitCode != 0 > lastJobFailed > lastSqlFailed Add unit tests to verify SQL failures mark application spans as ERROR, and that job failures take precedence over SQL failures. Fixes: Spark application traces marked SUCCESS when SQL analysis fails --- .../spark/AbstractDatadogSparkListener.java | 25 +++++++ .../spark/AbstractSparkListenerTest.groovy | 69 +++++++++++++++++++ 2 files changed, 94 insertions(+) diff --git a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java index 7fe4acf402e..9d6a53ca469 100644 --- a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java +++ b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java @@ -139,6 +139,9 @@ public abstract class AbstractDatadogSparkListener extends SparkListener { private boolean lastJobFailed = false; private String lastJobFailedMessage; private String lastJobFailedStackTrace; + private boolean lastSqlFailed = false; + private String lastSqlFailedMessage; + private String lastSqlFailedStackTrace; private int jobCount = 0; private int currentExecutorCount = 0; private int maxExecutorCount = 0; @@ -310,6 +313,23 @@ private void captureOpenlineageContextIfPresent( builder.withTag("openlineage_root_parent_run_id", context.getRootParentRunId()); } + /** + * Called by SparkSqlFailureAdvice when a SQL call (e.g. SparkSession.sql()) throws an exception + * during Catalyst analysis, before any Spark job is submitted. This ensures finishApplication() + * has an error signal even when no job/stage/task events fire. + */ + public synchronized void onSqlFailure(Throwable throwable) { + if (applicationEnded) { + return; + } + lastSqlFailed = true; + lastSqlFailedMessage = throwable.getMessage(); + + StringWriter sw = new StringWriter(); + throwable.printStackTrace(new PrintWriter(sw)); + lastSqlFailedStackTrace = sw.toString(); + } + @Override public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) { log.info( @@ -356,6 +376,11 @@ public synchronized void finishApplication( applicationSpan.setTag(DDTags.ERROR_TYPE, "Spark Application Failed"); applicationSpan.setTag(DDTags.ERROR_MSG, lastJobFailedMessage); applicationSpan.setTag(DDTags.ERROR_STACK, lastJobFailedStackTrace); + } else if (lastSqlFailed) { + applicationSpan.setError(true); + applicationSpan.setTag(DDTags.ERROR_TYPE, "Spark SQL Failed"); + applicationSpan.setTag(DDTags.ERROR_MSG, lastSqlFailedMessage); + applicationSpan.setTag(DDTags.ERROR_STACK, lastSqlFailedStackTrace); } applicationMetrics.setSpanMetrics(applicationSpan); diff --git a/dd-java-agent/instrumentation/spark/spark-common/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkListenerTest.groovy b/dd-java-agent/instrumentation/spark/spark-common/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkListenerTest.groovy index 16bd10d6ec1..0ccb4071659 100644 --- a/dd-java-agent/instrumentation/spark/spark-common/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkListenerTest.groovy +++ b/dd-java-agent/instrumentation/spark/spark-common/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkListenerTest.groovy @@ -522,6 +522,75 @@ abstract class AbstractSparkListenerTest extends InstrumentationSpecification { } } + def "test SQL analysis failure marks application span as error"() { + setup: + def listener = getTestDatadogSparkListener() + listener.onApplicationStart(applicationStartEvent(1000L)) + + // Simulate a SQL failure during Catalyst analysis (before any Spark job is submitted) + def analysisException = new RuntimeException("[TABLE_OR_VIEW_NOT_FOUND] The table or view `missing_table` cannot be found.") + listener.onSqlFailure(analysisException) + + listener.onApplicationEnd(new SparkListenerApplicationEnd(2000L)) + + expect: + assertTraces(1) { + trace(1) { + span { + operationName "spark.application" + spanType "spark" + errored true + parent() + assert span.tags["error.type"] == "Spark SQL Failed" + assert span.tags["error.message"] == "[TABLE_OR_VIEW_NOT_FOUND] The table or view `missing_table` cannot be found." + assert span.tags["error.stack"] != null + } + } + } + } + + def "test SQL analysis failure does not override job failure"() { + setup: + def listener = getTestDatadogSparkListener() + listener.onApplicationStart(applicationStartEvent(1000L)) + + // SQL failure happens first + listener.onSqlFailure(new RuntimeException("SQL error")) + + // Then a job runs and fails — job failure should take precedence + listener.onJobStart(jobStartEvent(1, 1500L, [1])) + listener.onStageSubmitted(stageSubmittedEvent(1, 1500L)) + listener.onStageCompleted(stageCompletedEvent(1, 1800L)) + listener.onJobEnd(jobFailedEvent(1, 2000L, "Job aborted due to NullPointerException")) + + listener.onApplicationEnd(new SparkListenerApplicationEnd(3000L)) + + expect: + assertTraces(1) { + trace(3) { + span { + operationName "spark.application" + spanType "spark" + errored true + parent() + // Job failure should take precedence over SQL failure + assert span.tags["error.type"] == "Spark Application Failed" + } + span { + operationName "spark.job" + spanType "spark" + errored true + childOf(span(0)) + } + span { + operationName "spark.stage" + spanType "spark" + childOf(span(1)) + } + } + } + } + def "test setupOpenLineage gets service name"(boolean serviceNameSetByUser, String serviceName, String sparkAppName) { setup: SparkConf sparkConf = new SparkConf() From 0910238ee1502ac9e161401d9a5df21af340c56f Mon Sep 17 00:00:00 2001 From: Adrien Boitreaud Date: Fri, 27 Mar 2026 14:08:38 +0100 Subject: [PATCH 2/5] Instrument SparkSession.sql() to capture analysis failures Add SparkSqlFailureAdvice that intercepts SparkSession.sql() method calls and propagates any exceptions (e.g. AnalysisException) to the listener via the new onSqlFailure() callback. This ensures SQL analysis failures that occur before any Spark job is submitted are captured and can be reported as ERROR in the application span. --- .../spark/AbstractSparkInstrumentation.java | 21 ++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkInstrumentation.java b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkInstrumentation.java index ea2c1080a06..0afe0db4721 100644 --- a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkInstrumentation.java +++ b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkInstrumentation.java @@ -39,7 +39,8 @@ public String[] knownMatchingTypes() { "org.apache.spark.util.Utils", "org.apache.spark.util.SparkClassUtils", "org.apache.spark.scheduler.LiveListenerBus", - "org.apache.spark.sql.execution.SparkPlanInfo$" + "org.apache.spark.sql.execution.SparkPlanInfo$", + "org.apache.spark.sql.SparkSession" }; } @@ -67,6 +68,15 @@ public void methodAdvice(MethodTransformer transformer) { .and(isDeclaredBy(named("org.apache.spark.deploy.yarn.ApplicationMaster"))), AbstractSparkInstrumentation.class.getName() + "$YarnFinishAdvice"); + // SparkSession.sql(String, ...) — catch AnalysisException failures that fire during Catalyst + // analysis before any Spark job is submitted and are invisible to the listener bus + transformer.applyAdvice( + isMethod() + .and(named("sql")) + .and(takesArgument(0, String.class)) + .and(isDeclaredBy(named("org.apache.spark.sql.SparkSession"))), + AbstractSparkInstrumentation.class.getName() + "$SparkSqlFailureAdvice"); + // LiveListenerBus class is used to manage spark listeners transformer.applyAdvice( isMethod() @@ -122,6 +132,15 @@ public static void enter(@Advice.Argument(1) int exitCode, @Advice.Argument(2) S } } + public static class SparkSqlFailureAdvice { + @Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class) + public static void exit(@Advice.Thrown Throwable throwable) { + if (throwable != null && AbstractDatadogSparkListener.listener != null) { + AbstractDatadogSparkListener.listener.onSqlFailure(throwable); + } + } + } + public static class LiveListenerBusAdvice { @Advice.OnMethodEnter(suppress = Throwable.class, skipOn = Advice.OnNonDefaultValue.class) // If OL is disabled in tracer config but user set it up manually don't interfere From 53b46db7b08ebea56181f043fc5f08869dd7beea Mon Sep 17 00:00:00 2001 From: Adrien Boitreaud Date: Fri, 27 Mar 2026 14:57:37 +0100 Subject: [PATCH 3/5] reset on success --- .../instrumentation/spark/AbstractDatadogSparkListener.java | 1 + 1 file changed, 1 insertion(+) diff --git a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java index 9d6a53ca469..08adbfb1cbc 100644 --- a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java +++ b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java @@ -569,6 +569,7 @@ public synchronized void onJobEnd(SparkListenerJobEnd jobEnd) { } } else { lastJobFailed = false; + lastSqlFailed = false; } SparkAggregatedTaskMetrics metrics = jobMetrics.remove(jobEnd.jobId()); From 28234053ea564bf72b5e10af6913ff320b4642e2 Mon Sep 17 00:00:00 2001 From: Adrien Boitreaud Date: Mon, 30 Mar 2026 12:30:41 +0200 Subject: [PATCH 4/5] Test app status when SQL analysis fails --- .../spark/AbstractSparkTest.groovy | 31 +++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/dd-java-agent/instrumentation/spark/spark-common/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkTest.groovy b/dd-java-agent/instrumentation/spark/spark-common/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkTest.groovy index c39f42f473c..700a086c581 100644 --- a/dd-java-agent/instrumentation/spark/spark-common/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkTest.groovy +++ b/dd-java-agent/instrumentation/spark/spark-common/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkTest.groovy @@ -193,6 +193,37 @@ abstract class AbstractSparkTest extends InstrumentationSpecification { } } + def "sql analysis failure on missing table marks application span as error"() { + setup: + def sparkSession = SparkSession.builder() + .config("spark.master", "local[2]") + .getOrCreate() + + try { + sparkSession.sql("SELECT * FROM missing_table").show() + } catch (Exception ignored) { + // Expected: AnalysisException thrown by Catalyst before any Spark job is submitted + } + sparkSession.stop() + + expect: + assertTraces(1) { + trace(1) { + span { + operationName "spark.application" + resourceName "spark.application" + spanType "spark" + errored true + parent() + assert span.tags["error.type"] == "Spark SQL Failed" + assert span.tags["error.message"] =~ /(?i).*missing_table.*/ + assert span.tags["error.stack"] =~ /(?s).*AnalysisException.*/ + } + } + } + + } + def "capture SparkSubmit.runMain() errors"() { setup: def sparkSession = SparkSession.builder() From f497183167b4b7e59a86a3851acec16b5c16148b Mon Sep 17 00:00:00 2001 From: Adrien Boitreaud Date: Tue, 31 Mar 2026 10:04:52 +0200 Subject: [PATCH 5/5] fix spotless --- .../datadog/trace/instrumentation/spark/AbstractSparkTest.groovy | 1 - 1 file changed, 1 deletion(-) diff --git a/dd-java-agent/instrumentation/spark/spark-common/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkTest.groovy b/dd-java-agent/instrumentation/spark/spark-common/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkTest.groovy index 700a086c581..9b551da2fa5 100644 --- a/dd-java-agent/instrumentation/spark/spark-common/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkTest.groovy +++ b/dd-java-agent/instrumentation/spark/spark-common/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkTest.groovy @@ -221,7 +221,6 @@ abstract class AbstractSparkTest extends InstrumentationSpecification { } } } - } def "capture SparkSubmit.runMain() errors"() {