Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import org.apache.accumulo.core.compaction.thrift.CompactionCoordinatorService.Client;
import org.apache.accumulo.core.compaction.thrift.CompactorService;
import org.apache.accumulo.core.compaction.thrift.TCompactionState;
import org.apache.accumulo.core.compaction.thrift.TCompactionStatusUpdate;
import org.apache.accumulo.core.compaction.thrift.TExternalCompaction;
import org.apache.accumulo.core.compaction.thrift.TNextCompactionJob;
import org.apache.accumulo.core.compaction.thrift.UnknownCompactionIdException;
Expand Down Expand Up @@ -463,6 +464,21 @@ public void cancel(TInfo tinfo, TCredentials credentials, String externalCompact
cancel(externalCompactionId);
}

/**
* Send an update to the CompactionCoordinator for this job
*
* @param job compactionJob
* @param update status update
*/
protected void updateCompactionState(TExternalCompactionJob job, TCompactionStatusUpdate update) {
long updateTime = System.currentTimeMillis();
TExternalCompaction tec = JOB_HOLDER.getCurrentCompaction();
if (update.getState() == TCompactionState.STARTED) {
tec.setStartTime(updateTime);
}
tec.putToUpdates(updateTime, update);
}

/**
* Notify the CompactionCoordinator the job failed
*
Expand Down Expand Up @@ -584,6 +600,9 @@ protected FileCompactorRunnable createCompactionJob(final TExternalCompactionJob
public void initialize() throws RetriesExceededException {
LOG.info("Starting up compaction runnable for job: {}", job);
this.compactionStartTime = Timer.startNew();
TCompactionStatusUpdate update = new TCompactionStatusUpdate(TCompactionState.STARTED,
"Compaction started", -1, -1, -1, getCompactionAge().toNanos());
updateCompactionState(job, update);
final var extent = KeyExtent.fromThrift(job.getExtent());
final AccumuloConfiguration aConfig;
final TableConfiguration tConfig = getContext().getTableConfiguration(extent.tableId());
Expand Down Expand Up @@ -650,6 +669,11 @@ public void run() {
JOB_HOLDER.setStats(cs);

LOG.info("Compaction completed successfully {} ", job.getExternalCompactionId());
// Update state when completed
TCompactionStatusUpdate update2 = new TCompactionStatusUpdate(TCompactionState.SUCCEEDED,
"Compaction completed successfully", -1, -1, -1, this.getCompactionAge().toNanos());
updateCompactionState(job, update2);

} catch (FileCompactor.CompactionCanceledException cce) {
LOG.debug("Compaction canceled {}", job.getExternalCompactionId());
err.set(cce);
Expand Down Expand Up @@ -919,6 +943,10 @@ public void run() {
entriesRead, inputEntries, percentComplete, "%", entriesWritten);
watcher.run();
LOG.debug("Compaction progress: {}.", message);
TCompactionStatusUpdate update =
new TCompactionStatusUpdate(TCompactionState.IN_PROGRESS, message, inputEntries,
entriesRead, entriesWritten, fcr.getCompactionAge().toNanos());
updateCompactionState(job, update);
}
} else {
LOG.debug("Waiting on compaction thread to finish, but no RUNNING compaction");
Expand All @@ -939,6 +967,9 @@ public void run() {
if (compactionThread.isInterrupted() || JOB_HOLDER.isCancelled()
|| (err.get() != null && err.get().getClass().equals(InterruptedException.class))) {
LOG.warn("Compaction thread was interrupted");
TCompactionStatusUpdate update = new TCompactionStatusUpdate(TCompactionState.CANCELLED,
"Compaction cancelled", -1, -1, -1, fcr.getCompactionAge().toNanos());
updateCompactionState(job, update);
try {
updateCompactionFailed(job, TCompactionState.CANCELLED, "Compaction cancelled");
cancelled.incrementAndGet();
Expand All @@ -952,6 +983,10 @@ public void run() {
try {
LOG.info("Compaction failed: id: {}, extent: {}", job.getExternalCompactionId(),
fromThriftExtent);
TCompactionStatusUpdate update = new TCompactionStatusUpdate(TCompactionState.FAILED,
"Compaction failed due to: " + err.get().getMessage(), -1, -1, -1,
fcr.getCompactionAge().toNanos());
updateCompactionState(job, update);
updateCompactionFailed(job, TCompactionState.FAILED,
"Compaction failed due to: " + err.get().getMessage());
failed.incrementAndGet();
Expand Down