From 19feeee57f90a4373b78f934461c134868e98e75 Mon Sep 17 00:00:00 2001 From: Dave Marion Date: Fri, 27 Mar 2026 20:48:14 +0000 Subject: [PATCH] Added job update tracking back to Compactor The changes in #6252 removed code to report state back to the Coordinator and removed the code that was tracking the state of the compaction in the Compactor. The latter is still needed. This puts it back. --- .../apache/accumulo/compactor/Compactor.java | 35 +++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java index 5a1ff0e1155..d8c2c1889d4 100644 --- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java +++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java @@ -66,6 +66,7 @@ import org.apache.accumulo.core.compaction.thrift.CompactionCoordinatorService.Client; import org.apache.accumulo.core.compaction.thrift.CompactorService; import org.apache.accumulo.core.compaction.thrift.TCompactionState; +import org.apache.accumulo.core.compaction.thrift.TCompactionStatusUpdate; import org.apache.accumulo.core.compaction.thrift.TExternalCompaction; import org.apache.accumulo.core.compaction.thrift.TNextCompactionJob; import org.apache.accumulo.core.compaction.thrift.UnknownCompactionIdException; @@ -463,6 +464,21 @@ public void cancel(TInfo tinfo, TCredentials credentials, String externalCompact cancel(externalCompactionId); } + /** + * Send an update to the CompactionCoordinator for this job + * + * @param job compactionJob + * @param update status update + */ + protected void updateCompactionState(TExternalCompactionJob job, TCompactionStatusUpdate update) { + long updateTime = System.currentTimeMillis(); + TExternalCompaction tec = JOB_HOLDER.getCurrentCompaction(); + if (update.getState() == TCompactionState.STARTED) { + tec.setStartTime(updateTime); + } + tec.putToUpdates(updateTime, update); + } + /** * Notify the CompactionCoordinator the job failed * @@ -584,6 +600,9 @@ protected FileCompactorRunnable createCompactionJob(final TExternalCompactionJob public void initialize() throws RetriesExceededException { LOG.info("Starting up compaction runnable for job: {}", job); this.compactionStartTime = Timer.startNew(); + TCompactionStatusUpdate update = new TCompactionStatusUpdate(TCompactionState.STARTED, + "Compaction started", -1, -1, -1, getCompactionAge().toNanos()); + updateCompactionState(job, update); final var extent = KeyExtent.fromThrift(job.getExtent()); final AccumuloConfiguration aConfig; final TableConfiguration tConfig = getContext().getTableConfiguration(extent.tableId()); @@ -650,6 +669,11 @@ public void run() { JOB_HOLDER.setStats(cs); LOG.info("Compaction completed successfully {} ", job.getExternalCompactionId()); + // Update state when completed + TCompactionStatusUpdate update2 = new TCompactionStatusUpdate(TCompactionState.SUCCEEDED, + "Compaction completed successfully", -1, -1, -1, this.getCompactionAge().toNanos()); + updateCompactionState(job, update2); + } catch (FileCompactor.CompactionCanceledException cce) { LOG.debug("Compaction canceled {}", job.getExternalCompactionId()); err.set(cce); @@ -919,6 +943,10 @@ public void run() { entriesRead, inputEntries, percentComplete, "%", entriesWritten); watcher.run(); LOG.debug("Compaction progress: {}.", message); + TCompactionStatusUpdate update = + new TCompactionStatusUpdate(TCompactionState.IN_PROGRESS, message, inputEntries, + entriesRead, entriesWritten, fcr.getCompactionAge().toNanos()); + updateCompactionState(job, update); } } else { LOG.debug("Waiting on compaction thread to finish, but no RUNNING compaction"); @@ -939,6 +967,9 @@ public void run() { if (compactionThread.isInterrupted() || JOB_HOLDER.isCancelled() || (err.get() != null && err.get().getClass().equals(InterruptedException.class))) { LOG.warn("Compaction thread was interrupted"); + TCompactionStatusUpdate update = new TCompactionStatusUpdate(TCompactionState.CANCELLED, + "Compaction cancelled", -1, -1, -1, fcr.getCompactionAge().toNanos()); + updateCompactionState(job, update); try { updateCompactionFailed(job, TCompactionState.CANCELLED, "Compaction cancelled"); cancelled.incrementAndGet(); @@ -952,6 +983,10 @@ public void run() { try { LOG.info("Compaction failed: id: {}, extent: {}", job.getExternalCompactionId(), fromThriftExtent); + TCompactionStatusUpdate update = new TCompactionStatusUpdate(TCompactionState.FAILED, + "Compaction failed due to: " + err.get().getMessage(), -1, -1, -1, + fcr.getCompactionAge().toNanos()); + updateCompactionState(job, update); updateCompactionFailed(job, TCompactionState.FAILED, "Compaction failed due to: " + err.get().getMessage()); failed.incrementAndGet();