diff --git a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java index 786f506f218..7fe4acf402e 100644 --- a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java +++ b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java @@ -128,13 +128,7 @@ public abstract class AbstractDatadogSparkListener extends SparkListener { protected final HashMap sqlPlans = new HashMap<>(); private final HashMap liveExecutors = new HashMap<>(); - // There is no easy way to know if an accumulator is not useful anymore (meaning it is not part of - // an active SQL query) so capping the size of the collection storing them - // TODO (CY): Is this potentially the reason why some Spark Plans aren't showing up consistently? - // If we know we don't need the accumulator values, can we drop all associated data and just map - // stage ID -> accumulator ID? Put this behind some FF - private final Map accumulators = - new RemoveEldestHashMap<>(MAX_ACCUMULATOR_SIZE); + private final Map accumulatorToStageID = new HashMap<>(); private volatile boolean isStreamingJob = false; private final boolean isRunningOnDatabricks; @@ -650,7 +644,7 @@ public synchronized void onStageCompleted(SparkListenerStageCompleted stageCompl for (AccumulableInfo info : JavaConverters.asJavaCollection(stageInfo.accumulables().values())) { - accumulators.put(info.id(), new SparkSQLUtils.AccumulatorWithStage(stageId, info)); + accumulatorToStageID.put(info.id(), stageId); } Properties prop = stageProperties.remove(stageSpanKey); @@ -682,7 +676,8 @@ public synchronized void onStageCompleted(SparkListenerStageCompleted stageCompl SparkPlanInfo sqlPlan = sqlPlans.get(sqlExecutionId); if (sqlPlan != null) { - SparkSQLUtils.addSQLPlanToStageSpan(span, sqlPlan, accumulators, stageMetric, stageId); + SparkSQLUtils.addSQLPlanToStageSpan( + span, sqlPlan, accumulatorToStageID, stageMetric, stageId); } span.finish(completionTimeMs * 1000); diff --git a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkSQLUtils.java b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkSQLUtils.java index 7eff461a00a..eb5875f2a38 100644 --- a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkSQLUtils.java +++ b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkSQLUtils.java @@ -23,12 +23,12 @@ public class SparkSQLUtils { public static void addSQLPlanToStageSpan( AgentSpan span, SparkPlanInfo plan, - Map accumulators, + Map accumulatorToStageID, SparkAggregatedTaskMetrics stageMetric, int stageId) { Set parentStageIds = new HashSet<>(); SparkPlanInfoForStage planForStage = - computeStageInfoForStage(plan, accumulators, stageId, parentStageIds, false); + computeStageInfoForStage(plan, accumulatorToStageID, stageId, parentStageIds, false); span.setTag("_dd.spark.sql_parent_stage_ids", parentStageIds.toString()); @@ -40,11 +40,11 @@ public static void addSQLPlanToStageSpan( public static SparkPlanInfoForStage computeStageInfoForStage( SparkPlanInfo plan, - Map accumulators, + Map accumulatorToStageID, int stageId, Set parentStageIds, boolean foundStage) { - Set stageIds = stageIdsForPlan(plan, accumulators); + Set stageIds = stageIdsForPlan(plan, accumulatorToStageID); boolean hasStageInfo = !stageIds.isEmpty(); boolean isForStage = stageIds.contains(stageId); @@ -64,7 +64,7 @@ public static SparkPlanInfoForStage computeStageInfoForStage( List childrenForStage = new ArrayList<>(); for (SparkPlanInfo child : children) { SparkPlanInfoForStage planForStage = - computeStageInfoForStage(child, accumulators, stageId, parentStageIds, true); + computeStageInfoForStage(child, accumulatorToStageID, stageId, parentStageIds, true); if (planForStage != null) { childrenForStage.add(planForStage); @@ -76,7 +76,7 @@ public static SparkPlanInfoForStage computeStageInfoForStage( // The expected stage was not found yet, searching in the children nodes for (SparkPlanInfo child : children) { SparkPlanInfoForStage planForStage = - computeStageInfoForStage(child, accumulators, stageId, parentStageIds, false); + computeStageInfoForStage(child, accumulatorToStageID, stageId, parentStageIds, false); if (planForStage != null) { // Early stopping if the stage was found, no need to keep searching @@ -89,17 +89,17 @@ public static SparkPlanInfoForStage computeStageInfoForStage( } private static Set stageIdsForPlan( - SparkPlanInfo info, Map accumulators) { + SparkPlanInfo info, Map accumulatorToStageID) { Set stageIds = new HashSet<>(); Collection metrics = AbstractDatadogSparkListener.listener.getPlanInfoMetrics(info); for (SQLMetricInfo metric : metrics) { // Using the accumulators to associate a plan with its stage - AccumulatorWithStage acc = accumulators.get(metric.accumulatorId()); + Integer stageId = accumulatorToStageID.get(metric.accumulatorId()); - if (acc != null) { - stageIds.add(acc.stageId); + if (stageId != null) { + stageIds.add(stageId); } }