diff --git a/core/src/main/java/org/apache/accumulo/core/Constants.java b/core/src/main/java/org/apache/accumulo/core/Constants.java index d2c556f2c2a..bb6e49eda45 100644 --- a/core/src/main/java/org/apache/accumulo/core/Constants.java +++ b/core/src/main/java/org/apache/accumulo/core/Constants.java @@ -52,6 +52,8 @@ public class Constants { public static final String ZMANAGER_ASSISTANT_LOCK = ZMANAGERS + "/assistants"; public static final String ZMANAGER_GOAL_STATE = ZMANAGERS + "/goal_state"; public static final String ZMANAGER_TICK = ZMANAGERS + "/tick"; + public static final String ZMANAGER_ASSIGNMENTS = ZMANAGERS + "/assignments"; + public static final String ZMANAGER_FATE_ASSIGNMENTS = ZMANAGER_ASSIGNMENTS + "/fate"; public static final String ZGC = "/gc"; public static final String ZGC_LOCK = ZGC + "/lock"; diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java index 8b65091c491..c9a1fb4d610 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java @@ -37,7 +37,6 @@ import java.util.Collections; import java.util.EnumMap; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -1328,15 +1327,12 @@ public void clearTabletLocationCache() { } private static Set createPersistentWatcherPaths() { - Set pathsToWatch = new HashSet<>(); - for (String path : Set.of(Constants.ZCOMPACTORS, Constants.ZDEADTSERVERS, Constants.ZGC_LOCK, + return Set.of(Constants.ZCOMPACTORS, Constants.ZDEADTSERVERS, Constants.ZGC_LOCK, Constants.ZMANAGER_LOCK, Constants.ZMINI_LOCK, Constants.ZMONITOR_LOCK, Constants.ZNAMESPACES, Constants.ZRECOVERY, Constants.ZSSERVERS, Constants.ZTABLES, Constants.ZTSERVERS, Constants.ZUSERS, RootTable.ZROOT_TABLET, Constants.ZTEST_LOCK, - Constants.ZMANAGER_ASSISTANT_LOCK, Constants.ZRESOURCEGROUPS)) { - pathsToWatch.add(path); - } - return pathsToWatch; + Constants.ZMANAGER_ASSISTANT_LOCK, Constants.ZRESOURCEGROUPS, + Constants.ZMANAGER_ASSIGNMENTS); } } diff --git a/server/base/src/main/java/org/apache/accumulo/server/init/ZooKeeperInitializer.java b/server/base/src/main/java/org/apache/accumulo/server/init/ZooKeeperInitializer.java index b23c6746803..9d6a59607d3 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/init/ZooKeeperInitializer.java +++ b/server/base/src/main/java/org/apache/accumulo/server/init/ZooKeeperInitializer.java @@ -50,6 +50,7 @@ import org.apache.accumulo.server.conf.store.ResourceGroupPropKey; import org.apache.accumulo.server.conf.store.SystemPropKey; import org.apache.accumulo.server.log.WalStateManager; +import org.apache.accumulo.server.manager.FateLocations; import org.apache.accumulo.server.metadata.RootGcCandidates; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooDefs; @@ -180,6 +181,10 @@ void initialize(final ServerContext context, final String rootTabletDirName, ZooUtil.NodeExistsPolicy.FAIL); zrwChroot.putPersistentData(Constants.ZSHUTTING_DOWN_TSERVERS, EMPTY_BYTE_ARRAY, ZooUtil.NodeExistsPolicy.FAIL); + zrwChroot.putPersistentData(Constants.ZMANAGER_ASSIGNMENTS, EMPTY_BYTE_ARRAY, + ZooUtil.NodeExistsPolicy.FAIL); + FateLocations.storeLocations(zrwChroot, Map.of(), ZooUtil.NodeExistsPolicy.FAIL); + } /** diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/FateLocations.java b/server/base/src/main/java/org/apache/accumulo/server/manager/FateLocations.java new file mode 100644 index 00000000000..4a71ff2e085 --- /dev/null +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/FateLocations.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.server.manager; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static java.util.stream.Collectors.toUnmodifiableSet; +import static org.apache.accumulo.core.util.LazySingletons.GSON; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.fate.FateId; +import org.apache.accumulo.core.fate.FatePartition; +import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; +import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy; +import org.apache.accumulo.server.ServerContext; +import org.apache.zookeeper.KeeperException; + +import com.google.common.base.Preconditions; +import com.google.common.net.HostAndPort; +import com.google.common.reflect.TypeToken; + +public class FateLocations { + + private final ServerContext context; + + private long lastUpdateCount; + private Map> lastLocations = null; + + public FateLocations(ServerContext context) { + this.context = context; + } + + public synchronized Map> getLocations() { + + var zooCache = context.getZooCache(); + + if (lastLocations == null || lastUpdateCount != zooCache.getUpdateCount()) { + lastUpdateCount = zooCache.getUpdateCount(); + var json = new String(context.getZooCache().get(Constants.ZMANAGER_FATE_ASSIGNMENTS), UTF_8); + var type = new TypeToken>>>() {}.getType(); + Map>> stringMap = GSON.get().fromJson(json, type); + Map> locations = new HashMap<>(); + stringMap.forEach((hp, parts) -> { + var partsSet = parts.stream().peek(part -> Preconditions.checkArgument(part.size() == 2)) + .map(part -> new FatePartition(FateId.from(part.get(0)), FateId.from(part.get(1)))) + .collect(toUnmodifiableSet()); + locations.put(HostAndPort.fromString(hp), partsSet); + }); + lastLocations = Map.copyOf(locations); + } + + return lastLocations; + } + + private static byte[] serialize(Map> assignments) { + Map>> jsonMap = new HashMap<>(); + assignments.forEach((hp, parts) -> { + var listParts = parts.stream() + .map(part -> List.of(part.start().canonical(), part.end().canonical())).toList(); + jsonMap.put(hp.toString(), listParts); + }); + + var json = GSON.get().toJson(jsonMap); + return json.getBytes(UTF_8); + } + + public static void storeLocations(ZooReaderWriter zoo, + Map> assignments, NodeExistsPolicy nodeExistsPolicy) { + try { + zoo.putPersistentData(Constants.ZMANAGER_FATE_ASSIGNMENTS, serialize(assignments), + nodeExistsPolicy); + } catch (KeeperException | InterruptedException e) { + throw new IllegalStateException("Unable to set fate locations in zookeeper", e); + } + } + + public static void storeLocations(ServerContext context, + Map> assignments) { + try { + context.getZooSession().setData(Constants.ZMANAGER_FATE_ASSIGNMENTS, serialize(assignments), + -1); + } catch (KeeperException | InterruptedException e) { + throw new IllegalStateException("Unable to set fate locations in zookeeper", e); + } + } + +} diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index 204de1d79d9..9fb59deb181 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -46,7 +46,6 @@ import java.util.SortedMap; import java.util.UUID; import java.util.concurrent.ConcurrentSkipListMap; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; @@ -54,6 +53,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.BiFunction; import java.util.function.Predicate; import java.util.function.Supplier; @@ -118,6 +119,7 @@ import org.apache.accumulo.core.zookeeper.ZcStat; import org.apache.accumulo.manager.compaction.coordinator.CompactionCoordinator; import org.apache.accumulo.manager.fate.FateManager; +import org.apache.accumulo.manager.fate.FateNotifier; import org.apache.accumulo.manager.fate.FateWorker; import org.apache.accumulo.manager.merge.FindMergeableRangeTask; import org.apache.accumulo.manager.metrics.fate.meta.MetaFateMetrics; @@ -129,6 +131,7 @@ import org.apache.accumulo.manager.upgrade.UpgradeCoordinator; import org.apache.accumulo.manager.upgrade.UpgradeCoordinator.UpgradeStatus; import org.apache.accumulo.server.AbstractServer; +import org.apache.accumulo.server.AccumuloDataVersion; import org.apache.accumulo.server.PrimaryManagerThriftService; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.compaction.CompactionConfigStorage; @@ -215,10 +218,6 @@ public class Manager extends AbstractServer private ManagerState state = ManagerState.INITIAL; - // fateReadyLatch and fateRefs go together; when this latch is ready, then the fate references - // should already have been set; ConcurrentHashMap will guarantee that all threads will see - // the initialized fate references after the latch is ready - private final CountDownLatch fateReadyLatch = new CountDownLatch(1); private final AtomicReference>> fateClients = new AtomicReference<>(); private volatile FateManager fateManager; @@ -307,28 +306,7 @@ public boolean stillManager() { return getManagerState() != ManagerState.STOP; } - private void waitForFate() { - try { - // block up to 30 seconds until it's ready; if it's still not ready, introduce some logging - if (!fateReadyLatch.await(30, SECONDS)) { - String msgPrefix = "Unexpected use of fate in thread " + Thread.currentThread().getName() - + " at time " + System.currentTimeMillis(); - // include stack trace so we know where it's coming from, in case we need to troubleshoot it - log.warn("{} blocked until fate starts", msgPrefix, - new IllegalStateException("Attempted fate action before manager finished starting up; " - + "if this doesn't make progress, please report it as a bug to the developers")); - int minutes = 0; - while (!fateReadyLatch.await(5, MINUTES)) { - minutes += 5; - log.warn("{} still blocked after {} minutes; this is getting weird", msgPrefix, minutes); - } - log.debug("{} no longer blocked", msgPrefix); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new IllegalStateException("Thread was interrupted; cannot proceed"); - } - } + private final Lock fateSetupLock = new ReentrantLock(); /** * Retrieve the FateClient object, blocking until it is ready. This could cause problems if Fate @@ -341,7 +319,23 @@ private void waitForFate() { * @return the FateClient object, only after the fate components are running and ready */ public FateClient fateClient(FateInstanceType type) { - waitForFate(); + if (fateClients.get() == null) { + // only want one thread trying to setup fate, lots of threads could call this before its setup + fateSetupLock.lock(); + try { + // check to see if another thread setup fate while we were waiting on the lock + if (fateClients.get() == null) { + // wait for upgrade to be complete + while (AccumuloDataVersion.getCurrentVersion(getContext()) < AccumuloDataVersion.get()) { + log.info("Attempted use of fate before upgrade complete, waiting for upgrade"); + UtilWaitThread.sleep(5000); + } + setupFate(getContext()); + } + } finally { + fateSetupLock.unlock(); + } + } var client = requireNonNull(fateClients.get(), "fateClients is not set yet").get(type); return requireNonNull(client, () -> "fate client type " + type + " is not present"); } @@ -921,7 +915,6 @@ private void setupPrimaryMetrics() { watchers.forEach(watcher -> metricsInfo.addMetricsProducers(watcher.getMetrics())); metricsInfo.addMetricsProducers(requireNonNull(compactionCoordinator)); // ensure fate is completely setup - Preconditions.checkState(fateReadyLatch.getCount() == 0); metricsInfo.addMetricsProducers(new MetaFateMetrics(getContext(), getConfiguration().getTimeInMillis(Property.MANAGER_FATE_METRICS_MIN_UPDATE_INTERVAL))); metricsInfo.addMetricsProducers(new UserFateMetrics(getContext(), @@ -1165,7 +1158,7 @@ boolean canSuspendTablets() { fateManager = new FateManager(getContext()); fateManager.start(); - fateClient(FateInstanceType.USER).setSeedingConsumer(fateManager::notifySeeded); + startFateMaintenance(); setupPrimaryMetrics(); @@ -1303,29 +1296,51 @@ protected Fate createFateInstance(FateEnv env, FateStore store private void setupFate(ServerContext context) { try { + Predicate isLockHeld = lock -> ServiceLock.isLockHeld(context.getZooCache(), lock); - var metaStore = new MetaFateStore(context.getZooSession(), - primaryManagerLock.getLockID(), isLockHeld); + var metaStore = + new MetaFateStore(context.getZooSession(), managerLock.getLockID(), isLockHeld); var metaFateClient = new FateClient<>(metaStore, TraceRepo::toLogString); var userStore = new UserFateStore(context, SystemTables.FATE.tableName(), managerLock.getLockID(), isLockHeld); var userFateClient = new FateClient<>(userStore, TraceRepo::toLogString); - var metaCleaner = new FateCleaner<>(metaStore, Duration.ofHours(8), this::getSteadyTime); - ThreadPools.watchCriticalScheduledTask(context.getScheduledExecutor() - .scheduleWithFixedDelay(metaCleaner::ageOff, 10, 4 * 60, MINUTES)); - var userCleaner = new FateCleaner<>(userStore, Duration.ofHours(8), this::getSteadyTime); - ThreadPools.watchCriticalScheduledTask(context.getScheduledExecutor() - .scheduleWithFixedDelay(userCleaner::ageOff, 10, 4 * 60, MINUTES)); + // wire up notifying the correct manager when a fate operation is seeded + FateNotifier fateNotifier = new FateNotifier(context); + fateNotifier.start(); + metaFateClient.setSeedingConsumer(fateNotifier::notifySeeded); + userFateClient.setSeedingConsumer(fateNotifier::notifySeeded); if (!fateClients.compareAndSet(null, Map.of(FateInstanceType.META, metaFateClient, FateInstanceType.USER, userFateClient))) { throw new IllegalStateException( "Unexpected previous fateClient reference map already initialized"); } + } catch (KeeperException | InterruptedException e) { + throw new IllegalStateException("Exception setting up Fate clients", e); + } + } + + /** + * Run fate maintenance task that only run in the primary manager. + */ + private void startFateMaintenance() { + try { + Predicate isLockHeld = + lock -> ServiceLock.isLockHeld(getContext().getZooCache(), lock); + + var metaStore = new MetaFateStore(getContext().getZooSession(), + managerLock.getLockID(), isLockHeld); + var userStore = new UserFateStore(getContext(), SystemTables.FATE.tableName(), + managerLock.getLockID(), isLockHeld); - fateReadyLatch.countDown(); + var metaCleaner = new FateCleaner<>(metaStore, Duration.ofHours(8), this::getSteadyTime); + ThreadPools.watchCriticalScheduledTask(getContext().getScheduledExecutor() + .scheduleWithFixedDelay(metaCleaner::ageOff, 10, 4 * 60, MINUTES)); + var userCleaner = new FateCleaner<>(userStore, Duration.ofHours(8), this::getSteadyTime); + ThreadPools.watchCriticalScheduledTask(getContext().getScheduledExecutor() + .scheduleWithFixedDelay(userCleaner::ageOff, 10, 4 * 60, MINUTES)); } catch (KeeperException | InterruptedException e) { throw new IllegalStateException("Exception setting up FaTE cleanup thread", e); } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/fate/FateManager.java b/server/manager/src/main/java/org/apache/accumulo/manager/fate/FateManager.java index a791da32298..7ae13cbe816 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/fate/FateManager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/fate/FateManager.java @@ -26,7 +26,6 @@ import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import org.apache.accumulo.core.fate.FateId; @@ -45,16 +44,14 @@ import org.apache.accumulo.core.util.threads.Threads; import org.apache.accumulo.manager.tableOps.FateEnv; import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.server.manager.FateLocations; import org.apache.thrift.TException; import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; -import com.google.common.collect.Range; -import com.google.common.collect.RangeMap; import com.google.common.collect.Sets; -import com.google.common.collect.TreeRangeMap; import com.google.common.net.HostAndPort; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; @@ -80,17 +77,10 @@ public FateManager(ServerContext context) { private final AtomicBoolean stop = new AtomicBoolean(false); - record FateHostPartition(HostAndPort hostPort, FatePartition partition) { - } - - private final AtomicReference> stableAssignments = - new AtomicReference<>(TreeRangeMap.create()); - - private final Map> pendingNotifications = new HashMap<>(); - private void manageAssistants() { log.debug("Started Fate Manager"); long stableCount = 0; + long unstableCount = 0; outer: while (!stop.get()) { try { long sleepTime = Math.min(stableCount * 100, 5_000); @@ -113,18 +103,18 @@ private void manageAssistants() { computeDesiredAssignments(currentAssignments, desiredParititions); if (desired.equals(currentAssignments)) { - RangeMap rangeMap = TreeRangeMap.create(); - currentAssignments.forEach((hostAndPort, partitions) -> { - partitions.forEach(partition -> { - rangeMap.put(Range.closed(partition.start(), partition.end()), - new FateHostPartition(hostAndPort, partition)); - }); - }); - stableAssignments.set(rangeMap); + if (stableCount == 0) { + FateLocations.storeLocations(context, currentAssignments); + } stableCount++; + unstableCount = 0; + continue; } else { - stableAssignments.set(TreeRangeMap.create()); + if (unstableCount == 0) { + FateLocations.storeLocations(context, Map.of()); + } stableCount = 0; + unstableCount++; } // are there any workers with extra partitions? If so need to unload those first. @@ -175,18 +165,13 @@ private void manageAssistants() { } private Thread assignmentThread = null; - private Thread ntfyThread = null; public synchronized void start() { Preconditions.checkState(assignmentThread == null); - Preconditions.checkState(ntfyThread == null); Preconditions.checkState(!stop.get()); assignmentThread = Threads.createCriticalThread("Fate Manager", this::manageAssistants); assignmentThread.start(); - - ntfyThread = Threads.createCriticalThread("Fate Notify", new NotifyTask()); - ntfyThread.start(); } @SuppressFBWarnings(value = "SWL_SLEEP_WITH_LOCK_HELD", @@ -203,9 +188,6 @@ public synchronized void stop(FateInstanceType fateType, Duration timeout) { if (assignmentThread != null) { assignmentThread.join(); } - if (ntfyThread != null) { - ntfyThread.join(); - } } catch (InterruptedException e) { throw new IllegalStateException(e); } @@ -232,8 +214,6 @@ public synchronized void stop(FateInstanceType fateType, Duration timeout) { } } - stableAssignments.set(TreeRangeMap.create()); - if (!timer.isExpired()) { FateStore store = switch (fateType) { case USER -> new UserFateStore(context, SystemTables.FATE.tableName(), null, null); @@ -261,58 +241,6 @@ public synchronized void stop(FateInstanceType fateType, Duration timeout) { } } - /** - * Makes a best effort to notify this fate operation was seeded. - */ - public void notifySeeded(FateId fateId) { - var hostPartition = stableAssignments.get().get(fateId); - if (hostPartition != null) { - synchronized (pendingNotifications) { - pendingNotifications.computeIfAbsent(hostPartition.hostPort(), k -> new HashSet<>()) - .add(hostPartition.partition()); - pendingNotifications.notify(); - } - } - } - - private class NotifyTask implements Runnable { - - @Override - public void run() { - while (!stop.get()) { - try { - Map> copy; - synchronized (pendingNotifications) { - if (pendingNotifications.isEmpty()) { - pendingNotifications.wait(100); - } - copy = Map.copyOf(pendingNotifications); - pendingNotifications.clear(); - } - - for (var entry : copy.entrySet()) { - HostAndPort address = entry.getKey(); - Set partitions = entry.getValue(); - FateWorkerService.Client client = - ThriftUtil.getClient(ThriftClientTypes.FATE_WORKER, address, context); - try { - log.trace("Notifying about seeding {} {}", address, partitions); - client.seeded(TraceUtil.traceInfo(), context.rpcCreds(), - partitions.stream().map(FatePartition::toThrift).toList()); - } finally { - ThriftUtil.returnClient(client, context); - } - } - - } catch (InterruptedException e) { - throw new IllegalStateException(e); - } catch (TException e) { - log.warn("Failed to send notification that fate was seeded", e); - } - } - } - } - /** * Sets the complete set of partitions an assistant manager should work on. It will only succeed * if the update id is valid. The update id avoids race conditions w/ previously queued network diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/fate/FateNotifier.java b/server/manager/src/main/java/org/apache/accumulo/manager/fate/FateNotifier.java new file mode 100644 index 00000000000..5789832277a --- /dev/null +++ b/server/manager/src/main/java/org/apache/accumulo/manager/fate/FateNotifier.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.manager.fate; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.accumulo.core.fate.FateId; +import org.apache.accumulo.core.fate.FatePartition; +import org.apache.accumulo.core.manager.thrift.FateWorkerService; +import org.apache.accumulo.core.rpc.ThriftUtil; +import org.apache.accumulo.core.rpc.clients.ThriftClientTypes; +import org.apache.accumulo.core.trace.TraceUtil; +import org.apache.accumulo.core.util.threads.Threads; +import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.server.manager.FateLocations; +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Range; +import com.google.common.collect.RangeMap; +import com.google.common.collect.TreeRangeMap; +import com.google.common.net.HostAndPort; + +/** + * Responsible for sending notifications that fate operations were seeded between managers. These + * notifications are best effort, it's ok if they are lost. When lost it means fate operations will + * not be as responsive, but they will still eventually run. These notifications are important for + * interactive use of Accumulo where something like create table run in the shell should be + * responsive. Responsiveness is also important for Accumulo's integration test. + */ +public class FateNotifier { + + private static final Logger log = LoggerFactory.getLogger(FateNotifier.class); + + private final Map> pendingNotifications = new HashMap<>(); + private final ServerContext context; + private final AtomicBoolean stop = new AtomicBoolean(); + private final FateLocations fateLocations; + + private Map> lastLocations; + private RangeMap hostMapping; + + private Thread ntfyThread; + + public FateNotifier(ServerContext context) { + this.context = context; + this.fateLocations = new FateLocations(context); + } + + public synchronized void start() { + Preconditions.checkState(ntfyThread == null); + ntfyThread = Threads.createCriticalThread("Fate Notification Sender", new NotifyTask()); + ntfyThread.start(); + } + + record FateHostPartition(HostAndPort hostPort, FatePartition partition) { + } + + private synchronized RangeMap getHostMapping() { + + if (hostMapping == null || lastLocations != fateLocations.getLocations()) { + lastLocations = fateLocations.getLocations(); + RangeMap rangeMap = TreeRangeMap.create(); + lastLocations.forEach((hostAndPort, partitions) -> { + partitions.forEach(partition -> { + rangeMap.put(Range.closed(partition.start(), partition.end()), + new FateHostPartition(hostAndPort, partition)); + }); + }); + hostMapping = rangeMap; + } + + return hostMapping; + } + + /** + * Makes a best effort to notify the appropriate manager this fate operation was seeded. + */ + public void notifySeeded(FateId fateId) { + var hostPartition = getHostMapping().get(fateId); + if (hostPartition != null) { + synchronized (pendingNotifications) { + pendingNotifications.computeIfAbsent(hostPartition.hostPort(), k -> new HashSet<>()) + .add(hostPartition.partition()); + pendingNotifications.notify(); + } + } + } + + private class NotifyTask implements Runnable { + + @Override + public void run() { + while (!stop.get()) { + try { + Map> copy; + synchronized (pendingNotifications) { + if (pendingNotifications.isEmpty()) { + pendingNotifications.wait(100); + } + copy = Map.copyOf(pendingNotifications); + pendingNotifications.clear(); + } + + for (var entry : copy.entrySet()) { + HostAndPort address = entry.getKey(); + Set partitions = entry.getValue(); + FateWorkerService.Client client = + ThriftUtil.getClient(ThriftClientTypes.FATE_WORKER, address, context); + try { + log.trace("Notifying about seeding {} {}", address, partitions); + client.seeded(TraceUtil.traceInfo(), context.rpcCreds(), + partitions.stream().map(FatePartition::toThrift).toList()); + } finally { + ThriftUtil.returnClient(client, context); + } + } + + } catch (InterruptedException e) { + throw new IllegalStateException(e); + } catch (TException e) { + log.warn("Failed to send notification that fate was seeded", e); + } + } + } + } + +}