diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java index 5a1ff0e1155..d8c2c1889d4 100644 --- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java +++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java @@ -66,6 +66,7 @@ import org.apache.accumulo.core.compaction.thrift.CompactionCoordinatorService.Client; import org.apache.accumulo.core.compaction.thrift.CompactorService; import org.apache.accumulo.core.compaction.thrift.TCompactionState; +import org.apache.accumulo.core.compaction.thrift.TCompactionStatusUpdate; import org.apache.accumulo.core.compaction.thrift.TExternalCompaction; import org.apache.accumulo.core.compaction.thrift.TNextCompactionJob; import org.apache.accumulo.core.compaction.thrift.UnknownCompactionIdException; @@ -463,6 +464,21 @@ public void cancel(TInfo tinfo, TCredentials credentials, String externalCompact cancel(externalCompactionId); } + /** + * Send an update to the CompactionCoordinator for this job + * + * @param job compactionJob + * @param update status update + */ + protected void updateCompactionState(TExternalCompactionJob job, TCompactionStatusUpdate update) { + long updateTime = System.currentTimeMillis(); + TExternalCompaction tec = JOB_HOLDER.getCurrentCompaction(); + if (update.getState() == TCompactionState.STARTED) { + tec.setStartTime(updateTime); + } + tec.putToUpdates(updateTime, update); + } + /** * Notify the CompactionCoordinator the job failed * @@ -584,6 +600,9 @@ protected FileCompactorRunnable createCompactionJob(final TExternalCompactionJob public void initialize() throws RetriesExceededException { LOG.info("Starting up compaction runnable for job: {}", job); this.compactionStartTime = Timer.startNew(); + TCompactionStatusUpdate update = new TCompactionStatusUpdate(TCompactionState.STARTED, + "Compaction started", -1, -1, -1, getCompactionAge().toNanos()); + updateCompactionState(job, update); final var extent = KeyExtent.fromThrift(job.getExtent()); final AccumuloConfiguration aConfig; final TableConfiguration tConfig = getContext().getTableConfiguration(extent.tableId()); @@ -650,6 +669,11 @@ public void run() { JOB_HOLDER.setStats(cs); LOG.info("Compaction completed successfully {} ", job.getExternalCompactionId()); + // Update state when completed + TCompactionStatusUpdate update2 = new TCompactionStatusUpdate(TCompactionState.SUCCEEDED, + "Compaction completed successfully", -1, -1, -1, this.getCompactionAge().toNanos()); + updateCompactionState(job, update2); + } catch (FileCompactor.CompactionCanceledException cce) { LOG.debug("Compaction canceled {}", job.getExternalCompactionId()); err.set(cce); @@ -919,6 +943,10 @@ public void run() { entriesRead, inputEntries, percentComplete, "%", entriesWritten); watcher.run(); LOG.debug("Compaction progress: {}.", message); + TCompactionStatusUpdate update = + new TCompactionStatusUpdate(TCompactionState.IN_PROGRESS, message, inputEntries, + entriesRead, entriesWritten, fcr.getCompactionAge().toNanos()); + updateCompactionState(job, update); } } else { LOG.debug("Waiting on compaction thread to finish, but no RUNNING compaction"); @@ -939,6 +967,9 @@ public void run() { if (compactionThread.isInterrupted() || JOB_HOLDER.isCancelled() || (err.get() != null && err.get().getClass().equals(InterruptedException.class))) { LOG.warn("Compaction thread was interrupted"); + TCompactionStatusUpdate update = new TCompactionStatusUpdate(TCompactionState.CANCELLED, + "Compaction cancelled", -1, -1, -1, fcr.getCompactionAge().toNanos()); + updateCompactionState(job, update); try { updateCompactionFailed(job, TCompactionState.CANCELLED, "Compaction cancelled"); cancelled.incrementAndGet(); @@ -952,6 +983,10 @@ public void run() { try { LOG.info("Compaction failed: id: {}, extent: {}", job.getExternalCompactionId(), fromThriftExtent); + TCompactionStatusUpdate update = new TCompactionStatusUpdate(TCompactionState.FAILED, + "Compaction failed due to: " + err.get().getMessage(), -1, -1, -1, + fcr.getCompactionAge().toNanos()); + updateCompactionState(job, update); updateCompactionFailed(job, TCompactionState.FAILED, "Compaction failed due to: " + err.get().getMessage()); failed.incrementAndGet();