diff --git a/core/src/main/java/org/apache/accumulo/core/fate/FatePartition.java b/core/src/main/java/org/apache/accumulo/core/fate/FatePartition.java index 33cbbc97240..44907cb788c 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/FatePartition.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/FatePartition.java @@ -60,4 +60,8 @@ public boolean contains(FateId fateId) { } } + + public FateInstanceType getType() { + return start.getType(); + } } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index b1edc708576..204de1d79d9 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -50,7 +50,6 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -77,7 +76,6 @@ import org.apache.accumulo.core.fate.FateClient; import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.FateInstanceType; -import org.apache.accumulo.core.fate.FatePartition; import org.apache.accumulo.core.fate.FateStore; import org.apache.accumulo.core.fate.TraceRepo; import org.apache.accumulo.core.fate.user.UserFateStore; @@ -122,11 +120,9 @@ import org.apache.accumulo.manager.fate.FateManager; import org.apache.accumulo.manager.fate.FateWorker; import org.apache.accumulo.manager.merge.FindMergeableRangeTask; -import org.apache.accumulo.manager.metrics.fate.FateExecutorMetricsProducer; import org.apache.accumulo.manager.metrics.fate.meta.MetaFateMetrics; import org.apache.accumulo.manager.metrics.fate.user.UserFateMetrics; import org.apache.accumulo.manager.recovery.RecoveryManager; 
-import org.apache.accumulo.manager.split.FileRangeCache; import org.apache.accumulo.manager.split.Splitter; import org.apache.accumulo.manager.state.TableCounts; import org.apache.accumulo.manager.tableOps.FateEnv; @@ -178,10 +174,8 @@ *

* The manager will also coordinate log recoveries and reports general status. */ -// TODO create standalone PrimaryFateEnv class and pull everything into there relatated to -// FateEnv... this will make it much more clear the env is for metadata ops only public class Manager extends AbstractServer - implements LiveTServerSet.Listener, FateEnv, PrimaryManagerThriftService { + implements LiveTServerSet.Listener, PrimaryManagerThriftService { static final Logger log = LoggerFactory.getLogger(Manager.class); @@ -227,8 +221,6 @@ public class Manager extends AbstractServer private final CountDownLatch fateReadyLatch = new CountDownLatch(1); private final AtomicReference>> fateClients = new AtomicReference<>(); - private final AtomicReference>> fateRefs = - new AtomicReference<>(); private volatile FateManager fateManager; static class TServerStatus { @@ -286,7 +278,6 @@ void setTserverStatus(LiveTServersSnapshot snapshot, private final long timeToCacheRecoveryWalExistence; private ExecutorService tableInformationStatusPool = null; - private ThreadPoolExecutor tabletRefreshThreadPool; private final TabletStateStore rootTabletStore; private final TabletStateStore metadataTabletStore; @@ -340,21 +331,15 @@ private void waitForFate() { } /** - * Retrieve the Fate object, blocking until it is ready. This could cause problems if Fate + * Retrieve the FateClient object, blocking until it is ready. This could cause problems if Fate * operations are attempted to be used prior to the Manager being ready for them. If these * operations are triggered by a client side request from a tserver or client, it should be safe * to wait to handle those until Fate is ready, but if it occurs during an upgrade, or some other * time in the Manager before Fate is started, that may result in a deadlock and will need to be * fixed. 
* - * @return the Fate object, only after the fate components are running and ready + * @return the FateClient object, only after the fate components are running and ready */ - public Fate fate(FateInstanceType type) { - waitForFate(); - var fate = requireNonNull(fateRefs.get(), "fateRefs is not set yet").get(type); - return requireNonNull(fate, () -> "fate type " + type + " is not present"); - } - public FateClient fateClient(FateInstanceType type) { waitForFate(); var client = requireNonNull(fateClients.get(), "fateClients is not set yet").get(type); @@ -497,16 +482,10 @@ int displayUnassigned() { return result; } - @Override public TableManager getTableManager() { return getContext().getTableManager(); } - @Override - public ThreadPoolExecutor getTabletRefreshThreadPool() { - return tabletRefreshThreadPool; - } - public static void main(String[] args) throws Exception { AbstractServer.startServer(new Manager(new ServerOpts(), ServerContext::new, args), log); } @@ -585,17 +564,11 @@ ManagerGoalState getManagerGoalState() { } private Splitter splitter; - private FileRangeCache fileRangeCache; public Splitter getSplitter() { return splitter; } - @Override - public FileRangeCache getFileRangeCache() { - return fileRangeCache; - } - public UpgradeCoordinator.UpgradeStatus getUpgradeStatus() { return upgradeCoordinator.getStatus(); } @@ -717,14 +690,14 @@ public void run() { case CLEAN_STOP: switch (getManagerState()) { case NORMAL: - fateManager.stop(Duration.ofMinutes(1)); + fateManager.stop(FateInstanceType.USER, Duration.ofMinutes(1)); setManagerState(ManagerState.SAFE_MODE); break; case SAFE_MODE: { // META fate stores its data in Zookeeper and its operations interact with // metadata and root tablets, need to completely shut it down before unloading // metadata and root tablets - fate(FateInstanceType.META).shutdown(1, MINUTES); + fateManager.stop(FateInstanceType.META, Duration.ofMinutes(1)); int count = nonMetaDataTabletsAssignedOrHosted(); log.debug( 
String.format("There are %d non-metadata tablets assigned or hosted", count)); @@ -953,9 +926,6 @@ private void setupPrimaryMetrics() { getConfiguration().getTimeInMillis(Property.MANAGER_FATE_METRICS_MIN_UPDATE_INTERVAL))); metricsInfo.addMetricsProducers(new UserFateMetrics(getContext(), getConfiguration().getTimeInMillis(Property.MANAGER_FATE_METRICS_MIN_UPDATE_INTERVAL))); - metricsInfo.addMetricsProducers(new FateExecutorMetricsProducer(getContext(), - fate(FateInstanceType.META).getFateExecutors(), - getConfiguration().getTimeInMillis(Property.MANAGER_FATE_METRICS_MIN_UPDATE_INTERVAL))); metricsInfo.addMetricsProducers(this); } @@ -1045,11 +1015,6 @@ public void run() { tableInformationStatusPool = ThreadPools.getServerThreadPools() .createExecutorService(getConfiguration(), Property.MANAGER_STATUS_THREAD_POOL_SIZE, false); - tabletRefreshThreadPool = ThreadPools.getServerThreadPools().getPoolBuilder("Tablet refresh ") - .numCoreThreads(getConfiguration().getCount(Property.MANAGER_TABLET_REFRESH_MINTHREADS)) - .numMaxThreads(getConfiguration().getCount(Property.MANAGER_TABLET_REFRESH_MAXTHREADS)) - .build(); - Thread statusThread = Threads.createCriticalThread("Status Thread", new StatusThread()); statusThread.start(); @@ -1195,7 +1160,6 @@ boolean canSuspendTablets() { this.splitter = new Splitter(this); this.splitter.start(); - this.fileRangeCache = new FileRangeCache(context); setupFate(context); @@ -1283,8 +1247,8 @@ boolean canSuspendTablets() { } log.debug("Shutting down fate."); - fate(FateInstanceType.META).close(); - fateManager.stop(Duration.ZERO); + fateManager.stop(FateInstanceType.USER, Duration.ZERO); + fateManager.stop(FateInstanceType.META, Duration.ZERO); splitter.stop(); @@ -1296,7 +1260,6 @@ boolean canSuspendTablets() { } tableInformationStatusPool.shutdownNow(); - tabletRefreshThreadPool.shutdownNow(); compactionCoordinator.shutdown(); @@ -1344,12 +1307,10 @@ private void setupFate(ServerContext context) { lock -> 
ServiceLock.isLockHeld(context.getZooCache(), lock); var metaStore = new MetaFateStore(context.getZooSession(), primaryManagerLock.getLockID(), isLockHeld); - var metaInstance = createFateInstance(this, metaStore, context); - // configure this instance to process all data - metaInstance.setPartitions(Set.of(FatePartition.all(FateInstanceType.META))); + var metaFateClient = new FateClient<>(metaStore, TraceRepo::toLogString); var userStore = new UserFateStore(context, SystemTables.FATE.tableName(), managerLock.getLockID(), isLockHeld); - var userFateClient = new FateClient(userStore, TraceRepo::toLogString); + var userFateClient = new FateClient<>(userStore, TraceRepo::toLogString); var metaCleaner = new FateCleaner<>(metaStore, Duration.ofHours(8), this::getSteadyTime); ThreadPools.watchCriticalScheduledTask(context.getScheduledExecutor() @@ -1359,14 +1320,10 @@ private void setupFate(ServerContext context) { .scheduleWithFixedDelay(userCleaner::ageOff, 10, 4 * 60, MINUTES)); if (!fateClients.compareAndSet(null, - Map.of(FateInstanceType.META, metaInstance, FateInstanceType.USER, userFateClient))) { + Map.of(FateInstanceType.META, metaFateClient, FateInstanceType.USER, userFateClient))) { throw new IllegalStateException( "Unexpected previous fateClient reference map already initialized"); } - if (!fateRefs.compareAndSet(null, Map.of(FateInstanceType.META, metaInstance))) { - throw new IllegalStateException( - "Unexpected previous fate reference map already initialized"); - } fateReadyLatch.countDown(); } catch (KeeperException | InterruptedException e) { @@ -1522,11 +1479,6 @@ private void getAssistantManagerLock() throws KeeperException, InterruptedExcept } } - @Override - public ServiceLock getServiceLock() { - return primaryManagerLock; - } - private ServiceLockData getPrimaryManagerLock(final ServiceLockPath zManagerLoc) throws KeeperException, InterruptedException { var zooKeeper = getContext().getZooSession(); @@ -1647,7 +1599,6 @@ public Set onlineTables() { 
return result; } - @Override public Set onlineTabletServers() { return tserverSet.getSnapshot().getTservers(); } @@ -1660,12 +1611,6 @@ public EventCoordinator getEventCoordinator() { return nextEvent; } - @Override - public EventPublisher getEventPublisher() { - return nextEvent; - } - - @Override public VolumeManager getVolumeManager() { return getContext().getVolumeManager(); } @@ -1732,7 +1677,6 @@ public Set shutdownServers() { * monotonic clock, which will be approximately consistent between different managers or different * runs of the same manager. SteadyTime supports both nanoseconds and milliseconds. */ - @Override public SteadyTime getSteadyTime() { return timeKeeper.getTime(); } @@ -1760,14 +1704,4 @@ public void registerMetrics(MeterRegistry registry) { public ServiceLock getLock() { return managerLock; } - - /** - * Get Threads Pool instance which is used by blocked I/O - * - * @return {@link ExecutorService} - */ - @Override - public ExecutorService getRenamePool() { - return this.renamePool; - } } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/fate/FateManager.java b/server/manager/src/main/java/org/apache/accumulo/manager/fate/FateManager.java index f5724a0f9dc..a791da32298 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/fate/FateManager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/fate/FateManager.java @@ -32,7 +32,9 @@ import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.FateInstanceType; import org.apache.accumulo.core.fate.FatePartition; +import org.apache.accumulo.core.fate.FateStore; import org.apache.accumulo.core.fate.user.UserFateStore; +import org.apache.accumulo.core.fate.zookeeper.MetaFateStore; import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector; import org.apache.accumulo.core.manager.thrift.FateWorkerService; import org.apache.accumulo.core.metadata.SystemTables; @@ -44,6 +46,7 @@ import 
org.apache.accumulo.manager.tableOps.FateEnv; import org.apache.accumulo.server.ServerContext; import org.apache.thrift.TException; +import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -103,7 +106,8 @@ private void manageAssistants() { } Map> currentAssignments = new HashMap<>(); currentPartitions.forEach((k, v) -> currentAssignments.put(k, v.partitions())); - Set desiredParititions = getDesiredPartitions(currentAssignments.size()); + Map> desiredParititions = + getDesiredPartitions(currentAssignments.size()); Map> desired = computeDesiredAssignments(currentAssignments, desiredParititions); @@ -188,7 +192,7 @@ public synchronized void start() { @SuppressFBWarnings(value = "SWL_SLEEP_WITH_LOCK_HELD", justification = "Sleep is okay. Can hold the lock as long as needed, as we are shutting down." + " Don't need or want other operations to run.") - public synchronized void stop(Duration timeout) { + public synchronized void stop(FateInstanceType fateType, Duration timeout) { if (!stop.compareAndSet(false, true)) { return; } @@ -219,7 +223,9 @@ public synchronized void stop(Duration timeout) { var currentPartitions = entry.getValue(); if (!currentPartitions.partitions.isEmpty()) { try { - setPartitions(hostPort, currentPartitions.updateId(), Set.of()); + var copy = new HashSet<>(currentPartitions.partitions); + copy.removeIf(fp -> fp.getType() == fateType); + setPartitions(hostPort, currentPartitions.updateId(), copy); } catch (TException e) { log.warn("Failed to unassign fate partitions {}", hostPort, e); } @@ -229,8 +235,16 @@ public synchronized void stop(Duration timeout) { stableAssignments.set(TreeRangeMap.create()); if (!timer.isExpired()) { - var store = new UserFateStore(context, SystemTables.FATE.tableName(), null, null); - + FateStore store = switch (fateType) { + case USER -> new UserFateStore(context, SystemTables.FATE.tableName(), null, null); + case META -> { + try { + yield new 
MetaFateStore<>(context.getZooSession(), null, null); + } catch (KeeperException | InterruptedException e) { + throw new IllegalStateException(e); + } + } + }; var reserved = store.getActiveReservations(Set.of(FatePartition.all(FateInstanceType.USER))); while (!reserved.isEmpty() && !timer.isExpired()) { if (log.isTraceEnabled()) { @@ -333,28 +347,41 @@ private boolean setPartitions(HostAndPort address, long updateId, Set> computeDesiredAssignments( Map> currentAssignments, - Set desiredParititions) { + Map> desiredParititions) { - Preconditions.checkArgument(currentAssignments.size() == desiredParititions.size()); Map> desiredAssignments = new HashMap<>(); - var copy = new HashSet<>(desiredParititions); + currentAssignments.keySet().forEach(hp -> { + desiredAssignments.put(hp, new HashSet<>()); + }); + + desiredParititions.forEach((fateType, desiredForType) -> { + // This code can not handle more than one partition per host + Preconditions.checkState(desiredForType.size() <= currentAssignments.size()); + + var added = new HashSet(); + + currentAssignments.forEach((hp, partitions) -> { + var hostAssignments = desiredAssignments.get(hp); + partitions.forEach(partition -> { + if (desiredForType.contains(partition) + && hostAssignments.stream().noneMatch(fp -> fp.getType() == fateType) + && !added.contains(partition)) { + hostAssignments.add(partition); + Preconditions.checkState(added.add(partition)); + } + }); + }); - currentAssignments.forEach((hp, partitions) -> { - if (!partitions.isEmpty()) { - var firstPart = partitions.iterator().next(); - if (copy.contains(firstPart)) { - desiredAssignments.put(hp, Set.of(firstPart)); - copy.remove(firstPart); + var iter = Sets.difference(desiredForType, added).iterator(); + currentAssignments.forEach((hp, partitions) -> { + var hostAssignments = desiredAssignments.get(hp); + if (iter.hasNext() && hostAssignments.stream().noneMatch(fp -> fp.getType() == fateType)) { + hostAssignments.add(iter.next()); } - } - }); + }); - var 
iter = copy.iterator(); - currentAssignments.forEach((hp, partitions) -> { - if (!desiredAssignments.containsKey(hp)) { - desiredAssignments.put(hp, Set.of(iter.next())); - } + Preconditions.checkState(!iter.hasNext()); }); if (log.isTraceEnabled()) { @@ -363,7 +390,6 @@ private Map> computeDesiredAssignments( log.trace(" desired {} {} {}", hp, parts.size(), parts); }); } - return desiredAssignments; } @@ -371,15 +397,23 @@ private Map> computeDesiredAssignments( * Computes a single partition for each worker such that the partition cover all possible UUIDs * and evenly divide the UUIDs. */ - private Set getDesiredPartitions(int numWorkers) { + private Map> getDesiredPartitions(int numWorkers) { Preconditions.checkArgument(numWorkers >= 0); if (numWorkers == 0) { - return Set.of(); + return Map.of(FateInstanceType.META, Set.of(), FateInstanceType.USER, Set.of()); } // create a single partition per worker that equally divides the space - HashSet desired = new HashSet<>(); + Map> desired = new HashMap<>(); + + // meta fate will never see much activity, so give it a single partition. + desired.put(FateInstanceType.META, + Set.of(new FatePartition(FateId.from(FateInstanceType.META, new UUID(0, 0)), + FateId.from(FateInstanceType.META, new UUID(-1, -1))))); + + Set desiredUser = new HashSet<>(); + // All the shifting is because java does not have unsigned integers. Want to evenly partition // [0,2^64) into numWorker ranges, but can not directly do that. Work w/ 60 bit unsigned // integers to partition the space and then shift over by 4. 
Used 60 bits instead of 63 so it @@ -392,7 +426,7 @@ private Set getDesiredPartitions(int numWorkers) { UUID startUuid = new UUID(start, 0); UUID endUuid = new UUID(end, 0); - desired.add(new FatePartition(FateId.from(FateInstanceType.USER, startUuid), + desiredUser.add(new FatePartition(FateId.from(FateInstanceType.USER, startUuid), FateId.from(FateInstanceType.USER, endUuid))); } @@ -400,9 +434,11 @@ private Set getDesiredPartitions(int numWorkers) { UUID startUuid = new UUID(start, 0); // last partition has a special end uuid that is all f nibbles. UUID endUuid = new UUID(-1, -1); - desired.add(new FatePartition(FateId.from(FateInstanceType.USER, startUuid), + desiredUser.add(new FatePartition(FateId.from(FateInstanceType.USER, startUuid), FateId.from(FateInstanceType.USER, endUuid))); + desired.put(FateInstanceType.USER, desiredUser); + return desired; } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/fate/FateWorker.java b/server/manager/src/main/java/org/apache/accumulo/manager/fate/FateWorker.java index a1409af98ee..f60fc453c0a 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/fate/FateWorker.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/fate/FateWorker.java @@ -20,8 +20,11 @@ import static org.apache.accumulo.core.util.LazySingletons.RANDOM; +import java.util.Arrays; import java.util.List; -import java.util.concurrent.TimeUnit; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -31,9 +34,11 @@ import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.fate.Fate; +import org.apache.accumulo.core.fate.FateInstanceType; import org.apache.accumulo.core.fate.FatePartition; import org.apache.accumulo.core.fate.FateStore; import org.apache.accumulo.core.fate.user.UserFateStore; +import 
org.apache.accumulo.core.fate.zookeeper.MetaFateStore; import org.apache.accumulo.core.fate.zookeeper.ZooUtil; import org.apache.accumulo.core.lock.ServiceLock; import org.apache.accumulo.core.manager.thrift.FateWorkerService; @@ -49,6 +54,7 @@ import org.apache.accumulo.server.manager.LiveTServerSet; import org.apache.accumulo.server.security.AuditedSecurityOperation; import org.apache.thrift.TException; +import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,8 +67,7 @@ public class FateWorker implements FateWorkerService.Iface { private final AuditedSecurityOperation security; private final LiveTServerSet liveTserverSet; private final FateFactory fateFactory; - private Fate fate; - private FateWorkerEnv fateWorkerEnv; + private final Map> fates = new ConcurrentHashMap<>(); public interface FateFactory { Fate create(FateEnv env, FateStore store, ServerContext context); @@ -71,17 +76,23 @@ public interface FateFactory { public FateWorker(ServerContext ctx, LiveTServerSet liveTServerSet, FateFactory fateFactory) { this.context = ctx; this.security = ctx.getSecurityOperation(); - this.fate = null; this.liveTserverSet = liveTServerSet; this.fateFactory = fateFactory; } public synchronized void setLock(ServiceLock lock) { - fateWorkerEnv = new FateWorkerEnv(context, lock, liveTserverSet); + FateWorkerEnv fateWorkerEnv = new FateWorkerEnv(context, lock, liveTserverSet); Predicate isLockHeld = l -> ServiceLock.isLockHeld(context.getZooCache(), l); + try { + MetaFateStore metaStore = + new MetaFateStore<>(context.getZooSession(), lock.getLockID(), isLockHeld); + this.fates.put(FateInstanceType.META, fateFactory.create(fateWorkerEnv, metaStore, context)); + } catch (KeeperException | InterruptedException e) { + throw new IllegalStateException(e); + } UserFateStore store = new UserFateStore<>(context, SystemTables.FATE.tableName(), lock.getLockID(), isLockHeld); - this.fate = fateFactory.create(fateWorkerEnv, store, 
context); + this.fates.put(FateInstanceType.USER, fateFactory.create(fateWorkerEnv, store, context)); } private Long expectedUpdateId = null; @@ -105,12 +116,8 @@ public TFatePartitions getPartitions(TInfo tinfo, TCredentials credentials) // id expectedUpdateId = updateId; - if (fate == null) { - return new TFatePartitions(updateId, List.of()); - } else { - return new TFatePartitions(updateId, - fate.getPartitions().stream().map(FatePartition::toThrift).toList()); - } + return new TFatePartitions(updateId, fates.values().stream() + .flatMap(fate -> fate.getPartitions().stream()).map(FatePartition::toThrift).toList()); } } @@ -137,16 +144,22 @@ public boolean setPartitions(TInfo tinfo, TCredentials credentials, long updateI synchronized (this) { // The primary manager should not assign any fate partitions until after upgrade is complete. Preconditions.checkState(isUpgradeComplete()); - if (fate != null && expectedUpdateId != null && updateId == expectedUpdateId) { + + if (expectedUpdateId != null && updateId == expectedUpdateId) { // Set to null which makes it so that an update id can only be used once. 
expectedUpdateId = null; - var desiredSet = desired.stream().map(FatePartition::from).collect(Collectors.toSet()); - var oldPartitions = fate.setPartitions(desiredSet); - log.info("Changed partitions from {} to {}", oldPartitions, desiredSet); + for (var fateType : FateInstanceType.values()) { + var fate = fates.get(fateType); + var desiredSet = desired.stream().map(FatePartition::from) + .filter(fp -> fp.getType() == fateType).collect(Collectors.toSet()); + var oldPartitions = fate.setPartitions(desiredSet); + log.info("Changed partitions for {} from {} to {}", fateType, oldPartitions, desiredSet); + } + return true; } else { - log.debug("Did not change partitions to {} expectedUpdateId:{} updateId:{} fate==null:{}", - desired, expectedUpdateId, updateId, fate == null); + log.debug("Did not change partitions to {} expectedUpdateId:{} updateId:{} fates:{}", + desired, expectedUpdateId, updateId, fates.keySet()); return false; } } @@ -161,28 +174,24 @@ public void seeded(TInfo tinfo, TCredentials credentials, List t SecurityErrorCode.PERMISSION_DENIED).asThriftException(); } - Fate localFate; - synchronized (this) { - localFate = fate; - } + Map> partitions = + tpartitions.stream().map(FatePartition::from) + .collect(Collectors.groupingBy(FatePartition::getType, Collectors.toSet())); - if (localFate != null) { - localFate.seeded(tpartitions.stream().map(FatePartition::from).collect(Collectors.toSet())); - } - } - - public synchronized void stop() { - fate.shutdown(1, TimeUnit.MINUTES); - fate.close(); - fateWorkerEnv.stop(); - fate = null; - fateWorkerEnv = null; + partitions.forEach((fateType, typePartitions) -> { + var fate = fates.get(fateType); + if (fate != null) { + fate.seeded(typePartitions); + } + }); } public synchronized MetricsProducer[] getMetricsProducers() { - Preconditions.checkState(fate != null, "Not started yet"); - return new MetricsProducer[] { - new FateExecutorMetricsProducer(context, fate.getFateExecutors(), context.getConfiguration() - 
.getTimeInMillis(Property.MANAGER_FATE_METRICS_MIN_UPDATE_INTERVAL))}; + Preconditions.checkState(!fates.isEmpty(), "Not started yet"); + return Arrays.stream(FateInstanceType.values()).map(fates::get) + .map(fate -> new FateExecutorMetricsProducer(context, fate.getFateExecutors(), + context.getConfiguration() + .getTimeInMillis(Property.MANAGER_FATE_METRICS_MIN_UPDATE_INTERVAL))) + .toArray(MetricsProducer[]::new); } } diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriverTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriverTest.java index c95dcd9ad6a..8d407d763dc 100644 --- a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriverTest.java +++ b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriverTest.java @@ -38,7 +38,7 @@ import org.apache.accumulo.core.fate.FateInstanceType; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.zookeeper.ZooSession; -import org.apache.accumulo.manager.Manager; +import org.apache.accumulo.manager.tableOps.FateEnv; import org.apache.accumulo.manager.tableOps.delete.PreDeleteTable; import org.apache.accumulo.server.ServerContext; import org.apache.zookeeper.data.Stat; @@ -74,24 +74,24 @@ protected boolean isCancelled(FateId fateId, ServerContext context) { } } - private Manager manager; + private FateEnv fateEnv; private ServerContext ctx; private ZooSession zk; @BeforeEach public void setup() { - manager = createMock(Manager.class); + fateEnv = createMock(FateEnv.class); ctx = createMock(ServerContext.class); zk = createMock(ZooSession.class); expect(ctx.getInstanceID()).andReturn(instance).anyTimes(); expect(ctx.getZooSession()).andReturn(zk).anyTimes(); expect(zk.asReaderWriter()).andReturn(new ZooReaderWriter(zk)).anyTimes(); - expect(manager.getContext()).andReturn(ctx).anyTimes(); + 
expect(fateEnv.getContext()).andReturn(ctx).anyTimes(); } @AfterEach public void teardown() { - verify(manager, ctx, zk); + verify(fateEnv, ctx, zk); } @Test @@ -107,9 +107,9 @@ public void testTableBeingDeleted() throws Exception { } private void runDriver(CompactionDriver driver, String expectedMessage) { - replay(manager, ctx, zk); + replay(fateEnv, ctx, zk); var e = assertThrows(AcceptableThriftTableOperationException.class, - () -> driver.isReady(FateId.from(FateInstanceType.USER, UUID.randomUUID()), manager)); + () -> driver.isReady(FateId.from(FateInstanceType.USER, UUID.randomUUID()), fateEnv)); assertEquals(e.getTableId(), tableId.toString()); assertEquals(e.getOp(), TableOperation.COMPACT); assertEquals(e.getType(), TableOperationExceptionType.OTHER); diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/merge/MergeTabletsTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/merge/MergeTabletsTest.java index 9e3037dbe01..7bd6824f2f6 100644 --- a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/merge/MergeTabletsTest.java +++ b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/merge/MergeTabletsTest.java @@ -84,7 +84,7 @@ import org.apache.accumulo.core.metadata.schema.UnSplittableMetadata; import org.apache.accumulo.core.tabletserver.log.LogEntry; import org.apache.accumulo.core.util.time.SteadyTime; -import org.apache.accumulo.manager.Manager; +import org.apache.accumulo.manager.tableOps.FateEnv; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.gc.AllVolumesDirectory; import org.apache.accumulo.server.metadata.ConditionalTabletMutatorImpl; @@ -425,7 +425,7 @@ private static void testMerge(List inputTablets, TableId tableId end == null ? 
null : end.getBytes(UTF_8), MergeInfo.Operation.MERGE); MergeTablets mergeTablets = new MergeTablets(mergeInfo); - Manager manager = EasyMock.mock(Manager.class); + FateEnv fateEnv = EasyMock.mock(FateEnv.class); ServerContext context = EasyMock.mock(ServerContext.class); Ample ample = EasyMock.mock(Ample.class); TabletsMetadata.Builder tabletBuilder = EasyMock.mock(TabletsMetadata.Builder.class); @@ -438,7 +438,7 @@ private static void testMerge(List inputTablets, TableId tableId EasyMock.expect(context.getServiceLock()).andReturn(managerLock).anyTimes(); // setup reading the tablets - EasyMock.expect(manager.getContext()).andReturn(context).atLeastOnce(); + EasyMock.expect(fateEnv.getContext()).andReturn(context).atLeastOnce(); EasyMock.expect(context.getAmple()).andReturn(ample).atLeastOnce(); EasyMock.expect(ample.readTablets()).andReturn(tabletBuilder).once(); EasyMock.expect(tabletBuilder.forTable(tableId)).andReturn(tabletBuilder).once(); @@ -478,12 +478,12 @@ private static void testMerge(List inputTablets, TableId tableId ample.putGcFileAndDirCandidates(tableId, dirs); EasyMock.expectLastCall().once(); - EasyMock.replay(manager, context, ample, tabletBuilder, tabletsMetadata, tabletsMutator, + EasyMock.replay(fateEnv, context, ample, tabletBuilder, tabletsMetadata, tabletsMutator, tabletMutator, cr, managerLock); - mergeTablets.call(fateId, manager); + mergeTablets.call(fateId, fateEnv); - EasyMock.verify(manager, context, ample, tabletBuilder, tabletsMetadata, tabletsMutator, + EasyMock.verify(fateEnv, context, ample, tabletBuilder, tabletsMetadata, tabletsMutator, tabletMutator, cr, managerLock); } } diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/split/UpdateTabletsTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/split/UpdateTabletsTest.java index 6ceac3d6810..dad40a0cb13 100644 --- a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/split/UpdateTabletsTest.java +++ 
b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/split/UpdateTabletsTest.java @@ -66,8 +66,8 @@ import org.apache.accumulo.core.metadata.schema.UnSplittableMetadata; import org.apache.accumulo.core.tabletserver.log.LogEntry; import org.apache.accumulo.core.util.time.SteadyTime; -import org.apache.accumulo.manager.Manager; import org.apache.accumulo.manager.split.FileRangeCache; +import org.apache.accumulo.manager.tableOps.FateEnv; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.metadata.ConditionalTabletMutatorImpl; import org.apache.hadoop.fs.Path; @@ -233,9 +233,9 @@ public void testManyColumns() throws Exception { String dir1 = "dir1"; String dir2 = "dir2"; - Manager manager = EasyMock.mock(Manager.class); + FateEnv fateEnv = EasyMock.mock(FateEnv.class); ServerContext context = EasyMock.mock(ServerContext.class); - EasyMock.expect(manager.getContext()).andReturn(context).atLeastOnce(); + EasyMock.expect(fateEnv.getContext()).andReturn(context).atLeastOnce(); Ample ample = EasyMock.mock(Ample.class); EasyMock.expect(context.getAmple()).andReturn(ample).atLeastOnce(); FileRangeCache fileRangeCache = EasyMock.mock(FileRangeCache.class); @@ -247,8 +247,8 @@ public void testManyColumns() throws Exception { .andReturn(newFileInfo("d", "f")); EasyMock.expect(fileRangeCache.getCachedFileInfo(tableId, file4)) .andReturn(newFileInfo("d", "j")); - EasyMock.expect(manager.getFileRangeCache()).andReturn(fileRangeCache).atLeastOnce(); - EasyMock.expect(manager.getSteadyTime()).andReturn(SteadyTime.from(100_000, TimeUnit.SECONDS)) + EasyMock.expect(fateEnv.getFileRangeCache()).andReturn(fileRangeCache).atLeastOnce(); + EasyMock.expect(fateEnv.getSteadyTime()).andReturn(SteadyTime.from(100_000, TimeUnit.SECONDS)) .atLeastOnce(); ServiceLock managerLock = EasyMock.mock(ServiceLock.class); @@ -394,7 +394,7 @@ public void testManyColumns() throws Exception { tabletsMutator.close(); EasyMock.expectLastCall().anyTimes(); - 
EasyMock.replay(manager, context, ample, tabletMeta, fileRangeCache, tabletsMutator, + EasyMock.replay(fateEnv, context, ample, tabletMeta, fileRangeCache, tabletsMutator, tablet1Mutator, tablet2Mutator, tablet3Mutator, cr, compactions); // Now we can actually test the split code that writes the new tablets with a bunch columns in // the original tablet @@ -404,9 +404,9 @@ public void testManyColumns() throws Exception { dirNames.add(dir2); UpdateTablets updateTablets = new UpdateTablets( new SplitInfo(origExtent, TabletMergeabilityUtil.systemDefaultSplits(splits)), dirNames); - updateTablets.call(fateId, manager); + updateTablets.call(fateId, fateEnv); - EasyMock.verify(manager, context, ample, tabletMeta, fileRangeCache, tabletsMutator, + EasyMock.verify(fateEnv, context, ample, tabletMeta, fileRangeCache, tabletsMutator, tablet1Mutator, tablet2Mutator, tablet3Mutator, cr, compactions); } @@ -469,15 +469,15 @@ public void testErrors() throws Exception { private static void testError(KeyExtent origExtent, TabletMetadata tm1, FateId fateId) throws Exception { - Manager manager = EasyMock.mock(Manager.class); + FateEnv fateEnv = EasyMock.mock(FateEnv.class); ServerContext context = EasyMock.mock(ServerContext.class); - EasyMock.expect(manager.getContext()).andReturn(context).atLeastOnce(); + EasyMock.expect(fateEnv.getContext()).andReturn(context).atLeastOnce(); Ample ample = EasyMock.mock(Ample.class); EasyMock.expect(context.getAmple()).andReturn(ample).atLeastOnce(); EasyMock.expect(ample.readTablet(origExtent)).andReturn(tm1); - EasyMock.replay(manager, context, ample); + EasyMock.replay(fateEnv, context, ample); // Now we can actually test the split code that writes the new tablets with a bunch columns in // the original tablet SortedSet splits = new TreeSet<>(List.of(new Text("c"))); @@ -485,8 +485,8 @@ private static void testError(KeyExtent origExtent, TabletMetadata tm1, FateId f dirNames.add("d1"); var updateTablets = new UpdateTablets( new 
SplitInfo(origExtent, TabletMergeabilityUtil.systemDefaultSplits(splits)), dirNames); - updateTablets.call(fateId, manager); + updateTablets.call(fateId, fateEnv); - EasyMock.verify(manager, context, ample); + EasyMock.verify(fateEnv, context, ample); } } diff --git a/test/src/main/java/org/apache/accumulo/test/MultipleManagerFateIT.java b/test/src/main/java/org/apache/accumulo/test/MultipleManagerFateIT.java index 5e825efbac6..dce79c99535 100644 --- a/test/src/main/java/org/apache/accumulo/test/MultipleManagerFateIT.java +++ b/test/src/main/java/org/apache/accumulo/test/MultipleManagerFateIT.java @@ -84,7 +84,6 @@ * */ public class MultipleManagerFateIT extends ConfigurableMacBase { - // A manager that will quickly clean up fate reservations held by dead managers public static class FastFateCleanupManager extends Manager { protected FastFateCleanupManager(ServerOpts opts, String[] args) throws IOException { @@ -111,6 +110,10 @@ protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSit cfg.getClusterServerConfiguration().setNumDefaultCompactors(8); // Set this lower so that locks timeout faster cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s"); + // This test could kill a manager after it has written a compaction to the metadata table, but + // before it returns it to the compactor via RPC which creates a dead compaction. Need to speed + // up the dead compaction detection to handle this or else the test will hang. 
+ cfg.setProperty(Property.COMPACTION_COORDINATOR_DEAD_COMPACTOR_CHECK_INTERVAL, "5s"); cfg.setServerClass(ServerType.MANAGER, r -> FastFateCleanupManager.class); super.configure(cfg, hadoopCoreSite); } diff --git a/test/src/main/java/org/apache/accumulo/test/ample/TestAmpleUtil.java b/test/src/main/java/org/apache/accumulo/test/ample/TestAmpleUtil.java index 8b4283f31db..834e9a10bdc 100644 --- a/test/src/main/java/org/apache/accumulo/test/ample/TestAmpleUtil.java +++ b/test/src/main/java/org/apache/accumulo/test/ample/TestAmpleUtil.java @@ -23,29 +23,29 @@ import java.time.Duration; import org.apache.accumulo.core.util.time.SteadyTime; -import org.apache.accumulo.manager.Manager; +import org.apache.accumulo.manager.tableOps.FateEnv; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.test.ample.metadata.TestAmple.TestServerAmpleImpl; import org.easymock.EasyMock; public class TestAmpleUtil { - public static Manager mockWithAmple(ServerContext context, TestServerAmpleImpl ample) { - Manager manager = EasyMock.mock(Manager.class); - EasyMock.expect(manager.getContext()).andReturn(testAmpleServerContext(context, ample)) + public static FateEnv mockWithAmple(ServerContext context, TestServerAmpleImpl ample) { + FateEnv fateEnv = EasyMock.mock(FateEnv.class); + EasyMock.expect(fateEnv.getContext()).andReturn(testAmpleServerContext(context, ample)) .atLeastOnce(); - EasyMock.replay(manager); - return manager; + EasyMock.replay(fateEnv); + return fateEnv; } - public static Manager mockWithAmple(ServerContext context, TestServerAmpleImpl ample, + public static FateEnv mockWithAmple(ServerContext context, TestServerAmpleImpl ample, Duration currentTime) { - Manager manager = EasyMock.mock(Manager.class); - EasyMock.expect(manager.getContext()).andReturn(testAmpleServerContext(context, ample)) + FateEnv fateEnv = EasyMock.mock(FateEnv.class); + EasyMock.expect(fateEnv.getContext()).andReturn(testAmpleServerContext(context, ample)) .atLeastOnce(); - 
EasyMock.expect(manager.getSteadyTime()).andReturn(SteadyTime.from(currentTime)).anyTimes(); - EasyMock.replay(manager); - return manager; + EasyMock.expect(fateEnv.getSteadyTime()).andReturn(SteadyTime.from(currentTime)).anyTimes(); + EasyMock.replay(fateEnv); + return fateEnv; } } diff --git a/test/src/main/java/org/apache/accumulo/test/fate/ManagerRepoIT_SimpleSuite.java b/test/src/main/java/org/apache/accumulo/test/fate/ManagerRepoIT_SimpleSuite.java index 1178f2d8ad8..a36f8704460 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/ManagerRepoIT_SimpleSuite.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/ManagerRepoIT_SimpleSuite.java @@ -75,9 +75,9 @@ import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.core.util.time.SteadyTime; import org.apache.accumulo.harness.SharedMiniClusterBase; -import org.apache.accumulo.manager.Manager; import org.apache.accumulo.manager.merge.FindMergeableRangeTask.UnmergeableReason; import org.apache.accumulo.manager.tableOps.AbstractFateOperation; +import org.apache.accumulo.manager.tableOps.FateEnv; import org.apache.accumulo.manager.tableOps.compact.CompactionDriver; import org.apache.accumulo.manager.tableOps.merge.DeleteRows; import org.apache.accumulo.manager.tableOps.merge.MergeInfo; @@ -133,7 +133,7 @@ public void testNoWalsMergeRepos(MergeInfo.Operation operation) throws Exception TestServerAmpleImpl testAmple = (TestServerAmpleImpl) TestAmple .create(getCluster().getServerContext(), Map.of(DataLevel.USER, metadataTable)); testAmple.createMetadataFromExisting(client, tableId); - Manager manager = mockWithAmple(getCluster().getServerContext(), testAmple); + FateEnv fateEnv = mockWithAmple(getCluster().getServerContext(), testAmple); // Create a test operation and fate id for testing merge and delete rows // and add operation to test metadata for the tablet @@ -148,15 +148,15 @@ public void testNoWalsMergeRepos(MergeInfo.Operation operation) throws Exception // Build either 
MergeTablets or DeleteRows repo for testing no WALs, both should check this // condition final MergeInfo mergeInfo = new MergeInfo(tableId, - manager.getContext().getNamespaceId(tableId), null, null, operation); + fateEnv.getContext().getNamespaceId(tableId), null, null, operation); final AbstractFateOperation repo = operation == Operation.MERGE ? new MergeTablets(mergeInfo) : new DeleteRows(mergeInfo); // Also test ReserveTablets isReady() final AbstractFateOperation reserve = new ReserveTablets(mergeInfo); // First, check no errors with the default case - assertEquals(0, reserve.isReady(fateId, manager)); - assertNotNull(repo.call(fateId, manager)); + assertEquals(0, reserve.isReady(fateId, fateEnv)); + assertNotNull(repo.call(fateId, fateEnv)); // Write a WAL to the test metadata and then re-run the repo to check for an error try (TabletsMutator tm = testAmple.mutateTablets()) { @@ -165,10 +165,10 @@ public void testNoWalsMergeRepos(MergeInfo.Operation operation) throws Exception } // Should not be ready due to the presence of a WAL - assertTrue(reserve.isReady(fateId, manager) > 0); + assertTrue(reserve.isReady(fateId, fateEnv) > 0); // Repo should throw an exception due to the WAL existence - var thrown = assertThrows(IllegalStateException.class, () -> repo.call(fateId, manager)); + var thrown = assertThrows(IllegalStateException.class, () -> repo.call(fateId, fateEnv)); assertTrue(thrown.getMessage().contains("has unexpected walogs")); } } @@ -200,57 +200,57 @@ public void testVerifyMergeability() throws Exception { TestServerAmpleImpl testAmple = (TestServerAmpleImpl) TestAmple .create(getCluster().getServerContext(), Map.of(DataLevel.USER, metadataTable)); testAmple.createMetadataFromExisting(client, tableId); - Manager manager = + FateEnv fateEnv = mockWithAmple(getCluster().getServerContext(), testAmple, Duration.ofDays(1)); // Create a test fate id var fateId = FateId.from(FateInstanceType.USER, UUID.randomUUID()); // Tablet c is set to never merge - 
MergeInfo mergeInfo = new MergeInfo(tableId, manager.getContext().getNamespaceId(tableId), + MergeInfo mergeInfo = new MergeInfo(tableId, fateEnv.getContext().getNamespaceId(tableId), null, new Text("c").getBytes(), Operation.SYSTEM_MERGE); - var repo = new VerifyMergeability(mergeInfo).call(fateId, manager); + var repo = new VerifyMergeability(mergeInfo).call(fateId, fateEnv); assertInstanceOf(UnreserveSystemMerge.class, repo); assertEquals(UnmergeableReason.TABLET_MERGEABILITY, ((UnreserveSystemMerge) repo).getReason()); // Tablets a and b are always merge - mergeInfo = new MergeInfo(tableId, manager.getContext().getNamespaceId(tableId), null, + mergeInfo = new MergeInfo(tableId, fateEnv.getContext().getNamespaceId(tableId), null, new Text("b").getBytes(), Operation.SYSTEM_MERGE); - assertInstanceOf(MergeTablets.class, new VerifyMergeability(mergeInfo).call(fateId, manager)); + assertInstanceOf(MergeTablets.class, new VerifyMergeability(mergeInfo).call(fateId, fateEnv)); - var context = manager.getContext(); + var context = fateEnv.getContext(); // split threshold is 10k so default max merge size is 2500 bytes. 
// this adds 6 files of 450 each which puts the tablets over the 2500 threshold addFileMetadata(context, tableId, null, new Text("c"), 3, 450); // Data written to the first two tablets totals 2700 bytes and is too large - repo = new VerifyMergeability(mergeInfo).call(fateId, manager); + repo = new VerifyMergeability(mergeInfo).call(fateId, fateEnv); assertInstanceOf(UnreserveSystemMerge.class, repo); assertEquals(UnmergeableReason.MAX_TOTAL_SIZE, ((UnreserveSystemMerge) repo).getReason()); // Not enough time has passed for Tablet, should be able to merge d and e - mergeInfo = new MergeInfo(tableId, manager.getContext().getNamespaceId(tableId), + mergeInfo = new MergeInfo(tableId, fateEnv.getContext().getNamespaceId(tableId), new Text("c").getBytes(), new Text("e").getBytes(), Operation.SYSTEM_MERGE); - repo = new VerifyMergeability(mergeInfo).call(fateId, manager); + repo = new VerifyMergeability(mergeInfo).call(fateId, fateEnv); assertInstanceOf(UnreserveSystemMerge.class, repo); assertEquals(UnmergeableReason.TABLET_MERGEABILITY, ((UnreserveSystemMerge) repo).getReason()); // update time to 3 days so enough time has passed - manager = mockWithAmple(getCluster().getServerContext(), testAmple, Duration.ofDays(3)); - assertInstanceOf(MergeTablets.class, new VerifyMergeability(mergeInfo).call(fateId, manager)); + fateEnv = mockWithAmple(getCluster().getServerContext(), testAmple, Duration.ofDays(3)); + assertInstanceOf(MergeTablets.class, new VerifyMergeability(mergeInfo).call(fateId, fateEnv)); // last 3 tablets should total 9 files which is < max of 10 - mergeInfo = new MergeInfo(tableId, manager.getContext().getNamespaceId(tableId), + mergeInfo = new MergeInfo(tableId, fateEnv.getContext().getNamespaceId(tableId), new Text("c").getBytes(), null, Operation.SYSTEM_MERGE); addFileMetadata(context, tableId, new Text("c"), null, 3, 10); - assertInstanceOf(MergeTablets.class, new VerifyMergeability(mergeInfo).call(fateId, manager)); + 
assertInstanceOf(MergeTablets.class, new VerifyMergeability(mergeInfo).call(fateId, fateEnv)); // last 3 tablets should total 12 files which is > max of 10 addFileMetadata(context, tableId, new Text("c"), null, 4, 10); - repo = new VerifyMergeability(mergeInfo).call(fateId, manager); + repo = new VerifyMergeability(mergeInfo).call(fateId, fateEnv); assertInstanceOf(UnreserveSystemMerge.class, repo); assertEquals(UnmergeableReason.MAX_FILE_COUNT, ((UnreserveSystemMerge) repo).getReason()); } @@ -306,7 +306,7 @@ public void testSplitOffline() throws Exception { testAmple.mutateTablet(extent) .putOperation(TabletOperationId.from(TabletOperationType.SPLITTING, fateId)).mutate(); - Manager manager = mockWithAmple(getCluster().getServerContext(), testAmple); + FateEnv fateEnv = mockWithAmple(getCluster().getServerContext(), testAmple); assertEquals(opid, testAmple.readTablet(extent).getOperationId()); @@ -314,7 +314,7 @@ public void testSplitOffline() throws Exception { TabletMergeabilityUtil.systemDefaultSplits(new TreeSet<>(List.of(new Text("sp1")))))); // The repo should delete the opid and throw an exception - assertThrows(ThriftTableOperationException.class, () -> eoRepo.call(fateId, manager)); + assertThrows(ThriftTableOperationException.class, () -> eoRepo.call(fateId, fateEnv)); // the operation id should have been cleaned up before the exception was thrown assertNull(testAmple.readTablet(extent).getOperationId()); @@ -347,11 +347,11 @@ public void testFindSplitsUnsplittable() throws Exception { not(SplitColumnFamily.UNSPLITTABLE_COLUMN)); KeyExtent extent = new KeyExtent(tableId, null, null); - Manager manager = mockWithAmple(getCluster().getServerContext(), testAmple); + FateEnv fateEnv = mockWithAmple(getCluster().getServerContext(), testAmple); FindSplits findSplits = new FindSplits(extent); PreSplit preSplit = (PreSplit) findSplits - .call(FateId.from(FateInstanceType.USER, UUID.randomUUID()), manager); + .call(FateId.from(FateInstanceType.USER, 
UUID.randomUUID()), fateEnv); // The table should not need splitting assertNull(preSplit); @@ -366,7 +366,7 @@ public void testFindSplitsUnsplittable() throws Exception { findSplits = new FindSplits(extent); preSplit = (PreSplit) findSplits.call(FateId.from(FateInstanceType.USER, UUID.randomUUID()), - manager); + fateEnv); // The table SHOULD now need splitting assertNotNull(preSplit); @@ -408,11 +408,11 @@ public void testFindSplitsDeleteUnsplittable() throws Exception { not(SplitColumnFamily.UNSPLITTABLE_COLUMN)); KeyExtent extent = new KeyExtent(tableId, null, null); - Manager manager = mockWithAmple(getCluster().getServerContext(), testAmple); + FateEnv fateEnv = mockWithAmple(getCluster().getServerContext(), testAmple); FindSplits findSplits = new FindSplits(extent); PreSplit preSplit = (PreSplit) findSplits - .call(FateId.from(FateInstanceType.USER, UUID.randomUUID()), manager); + .call(FateId.from(FateInstanceType.USER, UUID.randomUUID()), fateEnv); // The table should not need splitting assertNull(preSplit); @@ -428,7 +428,7 @@ public void testFindSplitsDeleteUnsplittable() throws Exception { findSplits = new FindSplits(extent); preSplit = (PreSplit) findSplits.call(FateId.from(FateInstanceType.USER, UUID.randomUUID()), - manager); + fateEnv); // The table SHOULD not need splitting assertNull(preSplit); @@ -462,8 +462,8 @@ public void testCompactionDriverCleanup(Pair rangeText) throws Except TestServerAmpleImpl testAmple = (TestServerAmpleImpl) TestAmple .create(getCluster().getServerContext(), Map.of(DataLevel.USER, metadataTable)); testAmple.createMetadataFromExisting(client, tableId); - Manager manager = mockWithAmple(getCluster().getServerContext(), testAmple); - var ctx = manager.getContext(); + FateEnv fateEnv = mockWithAmple(getCluster().getServerContext(), testAmple); + var ctx = fateEnv.getContext(); // Create the CompactionDriver to test with the given range passed into the method final AbstractFateOperation repo = new 
CompactionDriver(ctx.getNamespaceId(tableId), tableId, @@ -489,7 +489,7 @@ public void testCompactionDriverCleanup(Pair rangeText) throws Except assertEquals(4, extents.size()); // First call undo using the second fateId and verify there's still metadata for the first one - repo.undo(fateId2, manager); + repo.undo(fateId2, fateEnv); try (TabletsMetadata tabletsMetadata = testAmple.readTablets().forTable(tableId).build()) { tabletsMetadata.forEach(tm -> { assertHasCompactionMetadata(fateId1, tm); @@ -499,7 +499,7 @@ public void testCompactionDriverCleanup(Pair rangeText) throws Except // Now call undo on the first fateId which would clean up all the metadata for all the // tablets that overlap with the given range that was provided to the CompactionDriver // during the creation of the repo - repo.undo(fateId1, manager); + repo.undo(fateId1, fateEnv); // First, iterate over only the overlapping tablets and verify that those tablets // were cleaned up and remove any visited tablets from the extents set