@@ -139,6 +139,9 @@ public abstract class AbstractDatadogSparkListener extends SparkListener {
private boolean lastJobFailed = false;
private String lastJobFailedMessage;
private String lastJobFailedStackTrace;
private boolean lastSqlFailed = false;
private String lastSqlFailedMessage;
private String lastSqlFailedStackTrace;
private int jobCount = 0;
private int currentExecutorCount = 0;
private int maxExecutorCount = 0;
@@ -310,6 +313,23 @@ private void captureOpenlineageContextIfPresent(
builder.withTag("openlineage_root_parent_run_id", context.getRootParentRunId());
}

/**
* Called by SparkSqlFailureAdvice when a SQL call (e.g. SparkSession.sql()) throws an exception
* during Catalyst analysis, before any Spark job is submitted. This ensures finishApplication()
* has an error signal even when no job/stage/task events fire.
*/
public synchronized void onSqlFailure(Throwable throwable) {
if (applicationEnded) {
return;
}
lastSqlFailed = true;
lastSqlFailedMessage = throwable.getMessage();

StringWriter sw = new StringWriter();
throwable.printStackTrace(new PrintWriter(sw));
lastSqlFailedStackTrace = sw.toString();
}

@Override
public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) {
log.info(
@@ -356,6 +376,11 @@ public synchronized void finishApplication(
applicationSpan.setTag(DDTags.ERROR_TYPE, "Spark Application Failed");
applicationSpan.setTag(DDTags.ERROR_MSG, lastJobFailedMessage);
applicationSpan.setTag(DDTags.ERROR_STACK, lastJobFailedStackTrace);
} else if (lastSqlFailed) {
applicationSpan.setError(true);
applicationSpan.setTag(DDTags.ERROR_TYPE, "Spark SQL Failed");
applicationSpan.setTag(DDTags.ERROR_MSG, lastSqlFailedMessage);
applicationSpan.setTag(DDTags.ERROR_STACK, lastSqlFailedStackTrace);
}

applicationMetrics.setSpanMetrics(applicationSpan);
@@ -544,6 +569,7 @@ public synchronized void onJobEnd(SparkListenerJobEnd jobEnd) {
}
} else {
lastJobFailed = false;
lastSqlFailed = false;
}

SparkAggregatedTaskMetrics metrics = jobMetrics.remove(jobEnd.jobId());
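For context, a minimal sketch of the failure mode this listener change covers (illustrative only, not part of the diff; the app name and query are made up). A query that fails Catalyst analysis throws from inside the sql() call itself, before any job is submitted, so no SparkListenerJobEnd ever reports the failure and only the new onSqlFailure() hook records it:

import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession.builder()
    .master("local[2]")
    .appName("sql-failure-example") // hypothetical app name
    .getOrCreate();
try {
  // Throws AnalysisException during analysis; no job/stage/task events are emitted.
  spark.sql("SELECT * FROM missing_table");
} catch (Exception e) {
  // The new SparkSqlFailureAdvice forwards this exception to onSqlFailure().
} finally {
  spark.stop(); // fires onApplicationEnd, which calls finishApplication()
}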
@@ -39,7 +39,8 @@ public String[] knownMatchingTypes() {
"org.apache.spark.util.Utils",
"org.apache.spark.util.SparkClassUtils",
"org.apache.spark.scheduler.LiveListenerBus",
"org.apache.spark.sql.execution.SparkPlanInfo$"
"org.apache.spark.sql.execution.SparkPlanInfo$",
"org.apache.spark.sql.SparkSession"
};
}

@@ -67,6 +68,15 @@ public void methodAdvice(MethodTransformer transformer) {
.and(isDeclaredBy(named("org.apache.spark.deploy.yarn.ApplicationMaster"))),
AbstractSparkInstrumentation.class.getName() + "$YarnFinishAdvice");

// SparkSession.sql(String, ...) — catch AnalysisException failures that fire during Catalyst
// analysis before any Spark job is submitted and are invisible to the listener bus
transformer.applyAdvice(
isMethod()
.and(named("sql"))
.and(takesArgument(0, String.class))
.and(isDeclaredBy(named("org.apache.spark.sql.SparkSession"))),
AbstractSparkInstrumentation.class.getName() + "$SparkSqlFailureAdvice");

// LiveListenerBus class is used to manage spark listeners
transformer.applyAdvice(
isMethod()
@@ -122,6 +132,15 @@ public static void enter(@Advice.Argument(1) int exitCode, @Advice.Argument(2) S
}
}

public static class SparkSqlFailureAdvice {
@Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class)
public static void exit(@Advice.Thrown Throwable throwable) {
if (throwable != null && AbstractDatadogSparkListener.listener != null) {
AbstractDatadogSparkListener.listener.onSqlFailure(throwable);
}
}
}

public static class LiveListenerBusAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class, skipOn = Advice.OnNonDefaultValue.class)
// If OL is disabled in tracer config but user set it up manually don't interfere
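Putting the two files above together, the intended call path is sketched below (assumes the advice has been applied and the static listener field is set; the exception value is fabricated for illustration). The Throwable observed when sql() exits abnormally is handed to the registered listener, which holds the message and stack trace until finishApplication() tags the application span:

// What SparkSqlFailureAdvice.exit() effectively does when sql() throws:
Throwable analysisError = new RuntimeException("[TABLE_OR_VIEW_NOT_FOUND] missing_table"); // made-up example
if (AbstractDatadogSparkListener.listener != null) {
  AbstractDatadogSparkListener.listener.onSqlFailure(analysisError);
}
// Later, on SparkListenerApplicationEnd, finishApplication() sets
// error.type = "Spark SQL Failed" on the application span, unless a job
// failure (lastJobFailed) was recorded, which takes precedence.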
@@ -522,6 +522,75 @@ abstract class AbstractSparkListenerTest extends InstrumentationSpecification {
}
}

def "test SQL analysis failure marks application span as error"() {
setup:
def listener = getTestDatadogSparkListener()
listener.onApplicationStart(applicationStartEvent(1000L))

// Simulate a SQL failure during Catalyst analysis (before any Spark job is submitted)
def analysisException = new RuntimeException("[TABLE_OR_VIEW_NOT_FOUND] The table or view `missing_table` cannot be found.")
listener.onSqlFailure(analysisException)

listener.onApplicationEnd(new SparkListenerApplicationEnd(2000L))

expect:
assertTraces(1) {
trace(1) {
span {
operationName "spark.application"
spanType "spark"
errored true
parent()
assert span.tags["error.type"] == "Spark SQL Failed"
assert span.tags["error.message"] == "[TABLE_OR_VIEW_NOT_FOUND] The table or view `missing_table` cannot be found."
assert span.tags["error.stack"] != null
}
}
}
}

def "test SQL analysis failure does not override job failure"() {
setup:
def listener = getTestDatadogSparkListener()
listener.onApplicationStart(applicationStartEvent(1000L))

// SQL failure happens first
listener.onSqlFailure(new RuntimeException("SQL error"))

// Then a job runs and fails — job failure should take precedence
listener.onJobStart(jobStartEvent(1, 1500L, [1]))
listener.onStageSubmitted(stageSubmittedEvent(1, 1500L))
listener.onStageCompleted(stageCompletedEvent(1, 1800L))
listener.onJobEnd(jobFailedEvent(1, 2000L, "Job aborted due to NullPointerException"))

listener.onApplicationEnd(new SparkListenerApplicationEnd(3000L))

expect:
assertTraces(1) {
trace(3) {
span {
operationName "spark.application"
spanType "spark"
errored true
parent()
// Job failure should take precedence over SQL failure
assert span.tags["error.type"] == "Spark Application Failed"
}
span {
operationName "spark.job"
spanType "spark"
errored true
childOf(span(0))
}
span {
operationName "spark.stage"
spanType "spark"
childOf(span(1))
}
}
}
}

def "test setupOpenLineage gets service name"(boolean serviceNameSetByUser, String serviceName, String sparkAppName) {
setup:
SparkConf sparkConf = new SparkConf()
@@ -193,6 +193,36 @@ abstract class AbstractSparkTest extends InstrumentationSpecification {
}
}

def "sql analysis failure on missing table marks application span as error"() {
setup:
def sparkSession = SparkSession.builder()
.config("spark.master", "local[2]")
.getOrCreate()

try {
sparkSession.sql("SELECT * FROM missing_table").show()
} catch (Exception ignored) {
// Expected: AnalysisException thrown by Catalyst before any Spark job is submitted
}
sparkSession.stop()

expect:
assertTraces(1) {
trace(1) {
span {
operationName "spark.application"
resourceName "spark.application"
spanType "spark"
errored true
parent()
assert span.tags["error.type"] == "Spark SQL Failed"
assert span.tags["error.message"] =~ /(?i).*missing_table.*/
assert span.tags["error.stack"] =~ /(?s).*AnalysisException.*/
}
}
}
}

def "capture SparkSubmit.runMain() errors"() {
setup:
def sparkSession = SparkSession.builder()