diff --git a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java
index 7fe4acf402e..08adbfb1cbc 100644
--- a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java
+++ b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java
@@ -139,6 +139,9 @@ public abstract class AbstractDatadogSparkListener extends SparkListener {
   private boolean lastJobFailed = false;
   private String lastJobFailedMessage;
   private String lastJobFailedStackTrace;
+  private boolean lastSqlFailed = false;
+  private String lastSqlFailedMessage;
+  private String lastSqlFailedStackTrace;
   private int jobCount = 0;
   private int currentExecutorCount = 0;
   private int maxExecutorCount = 0;
@@ -310,6 +313,23 @@ private void captureOpenlineageContextIfPresent(
     builder.withTag("openlineage_root_parent_run_id", context.getRootParentRunId());
   }
 
+  /**
+   * Called by SparkSqlFailureAdvice when a SQL call (e.g. SparkSession.sql()) throws an exception
+   * during Catalyst analysis, before any Spark job is submitted. This ensures finishApplication()
+   * has an error signal even when no job/stage/task events fire.
+   */
+  public synchronized void onSqlFailure(Throwable throwable) {
+    if (applicationEnded) {
+      return;
+    }
+    lastSqlFailed = true;
+    lastSqlFailedMessage = throwable.getMessage();
+
+    StringWriter sw = new StringWriter();
+    throwable.printStackTrace(new PrintWriter(sw));
+    lastSqlFailedStackTrace = sw.toString();
+  }
+
   @Override
   public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) {
     log.info(
@@ -356,6 +376,11 @@ public synchronized void finishApplication(
       applicationSpan.setTag(DDTags.ERROR_TYPE, "Spark Application Failed");
       applicationSpan.setTag(DDTags.ERROR_MSG, lastJobFailedMessage);
       applicationSpan.setTag(DDTags.ERROR_STACK, lastJobFailedStackTrace);
+    } else if (lastSqlFailed) {
+      applicationSpan.setError(true);
+      applicationSpan.setTag(DDTags.ERROR_TYPE, "Spark SQL Failed");
+      applicationSpan.setTag(DDTags.ERROR_MSG, lastSqlFailedMessage);
+      applicationSpan.setTag(DDTags.ERROR_STACK, lastSqlFailedStackTrace);
     }
 
     applicationMetrics.setSpanMetrics(applicationSpan);
@@ -544,6 +569,7 @@ public synchronized void onJobEnd(SparkListenerJobEnd jobEnd) {
       }
     } else {
       lastJobFailed = false;
+      lastSqlFailed = false;
     }
 
     SparkAggregatedTaskMetrics metrics = jobMetrics.remove(jobEnd.jobId());
diff --git a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkInstrumentation.java b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkInstrumentation.java
index ea2c1080a06..0afe0db4721 100644
--- a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkInstrumentation.java
+++ b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkInstrumentation.java
@@ -39,7 +39,8 @@ public String[] knownMatchingTypes() {
       "org.apache.spark.util.Utils",
       "org.apache.spark.util.SparkClassUtils",
       "org.apache.spark.scheduler.LiveListenerBus",
-      "org.apache.spark.sql.execution.SparkPlanInfo$"
+      "org.apache.spark.sql.execution.SparkPlanInfo$",
+      "org.apache.spark.sql.SparkSession"
     };
   }
 
@@ -67,6 +68,15 @@ public void methodAdvice(MethodTransformer transformer) {
             .and(isDeclaredBy(named("org.apache.spark.deploy.yarn.ApplicationMaster"))),
         AbstractSparkInstrumentation.class.getName() + "$YarnFinishAdvice");
 
+    // SparkSession.sql(String, ...) — catch AnalysisException failures that fire during Catalyst
+    // analysis before any Spark job is submitted and are invisible to the listener bus
+    transformer.applyAdvice(
+        isMethod()
+            .and(named("sql"))
+            .and(takesArgument(0, String.class))
+            .and(isDeclaredBy(named("org.apache.spark.sql.SparkSession"))),
+        AbstractSparkInstrumentation.class.getName() + "$SparkSqlFailureAdvice");
+
     // LiveListenerBus class is used to manage spark listeners
     transformer.applyAdvice(
         isMethod()
@@ -122,6 +132,15 @@ public static void enter(@Advice.Argument(1) int exitCode, @Advice.Argument(2) S
     }
   }
 
+  public static class SparkSqlFailureAdvice {
+    @Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class)
+    public static void exit(@Advice.Thrown Throwable throwable) {
+      if (throwable != null && AbstractDatadogSparkListener.listener != null) {
+        AbstractDatadogSparkListener.listener.onSqlFailure(throwable);
+      }
+    }
+  }
+
   public static class LiveListenerBusAdvice {
     @Advice.OnMethodEnter(suppress = Throwable.class, skipOn = Advice.OnNonDefaultValue.class)
     // If OL is disabled in tracer config but user set it up manually don't interfere
diff --git a/dd-java-agent/instrumentation/spark/spark-common/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkListenerTest.groovy b/dd-java-agent/instrumentation/spark/spark-common/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkListenerTest.groovy
index 16bd10d6ec1..0ccb4071659 100644
--- a/dd-java-agent/instrumentation/spark/spark-common/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkListenerTest.groovy
+++ b/dd-java-agent/instrumentation/spark/spark-common/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkListenerTest.groovy
@@ -522,6 +522,75 @@
     }
   }
 
+  def "test SQL analysis failure marks application span as error"() {
+    setup:
+    def listener = getTestDatadogSparkListener()
+    listener.onApplicationStart(applicationStartEvent(1000L))
+
+    // Simulate a SQL failure during Catalyst analysis (before any Spark job is submitted)
+    def analysisException = new RuntimeException("[TABLE_OR_VIEW_NOT_FOUND] The table or view `missing_table` cannot be found.")
+    listener.onSqlFailure(analysisException)
+
+    listener.onApplicationEnd(new SparkListenerApplicationEnd(2000L))
+
+    expect:
+    assertTraces(1) {
+      trace(1) {
+        span {
+          operationName "spark.application"
+          spanType "spark"
+          errored true
+          parent()
+          assert span.tags["error.type"] == "Spark SQL Failed"
+          assert span.tags["error.message"] == "[TABLE_OR_VIEW_NOT_FOUND] The table or view `missing_table` cannot be found."
+          assert span.tags["error.stack"] != null
+        }
+      }
+    }
+  }
+
+  def "test SQL analysis failure does not override job failure"() {
+    setup:
+    def listener = getTestDatadogSparkListener()
+    listener.onApplicationStart(applicationStartEvent(1000L))
+
+    // SQL failure happens first
+    listener.onSqlFailure(new RuntimeException("SQL error"))
+
+    // Then a job runs and fails — job failure should take precedence
+    listener.onJobStart(jobStartEvent(1, 1500L, [1]))
+    listener.onStageSubmitted(stageSubmittedEvent(1, 1500L))
+    listener.onStageCompleted(stageCompletedEvent(1, 1800L))
+    listener.onJobEnd(jobFailedEvent(1, 2000L, "Job aborted due to NullPointerException"))
+
+    listener.onApplicationEnd(new SparkListenerApplicationEnd(3000L))
+
+    expect:
+    assertTraces(1) {
+      trace(3) {
+        span {
+          operationName "spark.application"
+          spanType "spark"
+          errored true
+          parent()
+          // Job failure should take precedence over SQL failure
+          assert span.tags["error.type"] == "Spark Application Failed"
+        }
+        span {
+          operationName "spark.job"
+          spanType "spark"
+          errored true
+          childOf(span(0))
+        }
+        span {
+          operationName "spark.stage"
+          spanType "spark"
+          childOf(span(1))
+        }
+      }
+    }
+  }
+
   def "test setupOpenLineage gets service name"(boolean serviceNameSetByUser, String serviceName, String sparkAppName) {
     setup:
     SparkConf sparkConf = new SparkConf()
diff --git a/dd-java-agent/instrumentation/spark/spark-common/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkTest.groovy b/dd-java-agent/instrumentation/spark/spark-common/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkTest.groovy
index c39f42f473c..9b551da2fa5 100644
--- a/dd-java-agent/instrumentation/spark/spark-common/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkTest.groovy
+++ b/dd-java-agent/instrumentation/spark/spark-common/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkTest.groovy
@@ -193,6 +193,36 @@
     }
   }
 
+  def "sql analysis failure on missing table marks application span as error"() {
+    setup:
+    def sparkSession = SparkSession.builder()
+      .config("spark.master", "local[2]")
+      .getOrCreate()
+
+    try {
+      sparkSession.sql("SELECT * FROM missing_table").show()
+    } catch (Exception ignored) {
+      // Expected: AnalysisException thrown by Catalyst before any Spark job is submitted
+    }
+    sparkSession.stop()
+
+    expect:
+    assertTraces(1) {
+      trace(1) {
+        span {
+          operationName "spark.application"
+          resourceName "spark.application"
+          spanType "spark"
+          errored true
+          parent()
+          assert span.tags["error.type"] == "Spark SQL Failed"
+          assert span.tags["error.message"] =~ /(?i).*missing_table.*/
+          assert span.tags["error.stack"] =~ /(?s).*AnalysisException.*/
+        }
+      }
+    }
+  }
+
   def "capture SparkSubmit.runMain() errors"() {
     setup:
     def sparkSession = SparkSession.builder()