Skip to content
Merged
2 changes: 2 additions & 0 deletions core/src/main/java/org/apache/accumulo/core/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ public class Constants {
public static final String ZMANAGER_ASSISTANT_LOCK = ZMANAGERS + "/assistants";
public static final String ZMANAGER_GOAL_STATE = ZMANAGERS + "/goal_state";
public static final String ZMANAGER_TICK = ZMANAGERS + "/tick";
public static final String ZMANAGER_ASSIGNMENTS = ZMANAGERS + "/assignments";
public static final String ZMANAGER_FATE_ASSIGNMENTS = ZMANAGER_ASSIGNMENTS + "/fate";

public static final String ZGC = "/gc";
public static final String ZGC_LOCK = ZGC + "/lock";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import java.util.Collections;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
Expand Down Expand Up @@ -1328,15 +1327,12 @@ public void clearTabletLocationCache() {
}

private static Set<String> createPersistentWatcherPaths() {
Set<String> pathsToWatch = new HashSet<>();
for (String path : Set.of(Constants.ZCOMPACTORS, Constants.ZDEADTSERVERS, Constants.ZGC_LOCK,
return Set.of(Constants.ZCOMPACTORS, Constants.ZDEADTSERVERS, Constants.ZGC_LOCK,
Constants.ZMANAGER_LOCK, Constants.ZMINI_LOCK, Constants.ZMONITOR_LOCK,
Constants.ZNAMESPACES, Constants.ZRECOVERY, Constants.ZSSERVERS, Constants.ZTABLES,
Constants.ZTSERVERS, Constants.ZUSERS, RootTable.ZROOT_TABLET, Constants.ZTEST_LOCK,
Constants.ZMANAGER_ASSISTANT_LOCK, Constants.ZRESOURCEGROUPS)) {
pathsToWatch.add(path);
}
return pathsToWatch;
Constants.ZMANAGER_ASSISTANT_LOCK, Constants.ZRESOURCEGROUPS,
Constants.ZMANAGER_ASSIGNMENTS);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.accumulo.server.conf.store.ResourceGroupPropKey;
import org.apache.accumulo.server.conf.store.SystemPropKey;
import org.apache.accumulo.server.log.WalStateManager;
import org.apache.accumulo.server.manager.FateLocations;
import org.apache.accumulo.server.metadata.RootGcCandidates;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
Expand Down Expand Up @@ -180,6 +181,10 @@ void initialize(final ServerContext context, final String rootTabletDirName,
ZooUtil.NodeExistsPolicy.FAIL);
zrwChroot.putPersistentData(Constants.ZSHUTTING_DOWN_TSERVERS, EMPTY_BYTE_ARRAY,
ZooUtil.NodeExistsPolicy.FAIL);
zrwChroot.putPersistentData(Constants.ZMANAGER_ASSIGNMENTS, EMPTY_BYTE_ARRAY,
ZooUtil.NodeExistsPolicy.FAIL);
FateLocations.storeLocations(zrwChroot, Map.of(), ZooUtil.NodeExistsPolicy.FAIL);

}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.server.manager;

import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.stream.Collectors.toUnmodifiableSet;
import static org.apache.accumulo.core.util.LazySingletons.GSON;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.FatePartition;
import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy;
import org.apache.accumulo.server.ServerContext;
import org.apache.zookeeper.KeeperException;

import com.google.common.base.Preconditions;
import com.google.common.net.HostAndPort;
import com.google.common.reflect.TypeToken;

public class FateLocations {

private final ServerContext context;

private long lastUpdateCount;
private Map<HostAndPort,Set<FatePartition>> lastLocations = null;

public FateLocations(ServerContext context) {
this.context = context;
}

public synchronized Map<HostAndPort,Set<FatePartition>> getLocations() {

var zooCache = context.getZooCache();

if (lastLocations == null || lastUpdateCount != zooCache.getUpdateCount()) {
lastUpdateCount = zooCache.getUpdateCount();
var json = new String(context.getZooCache().get(Constants.ZMANAGER_FATE_ASSIGNMENTS), UTF_8);
var type = new TypeToken<Map<String,List<List<String>>>>() {}.getType();
Map<String,List<List<String>>> stringMap = GSON.get().fromJson(json, type);
Map<HostAndPort,Set<FatePartition>> locations = new HashMap<>();
stringMap.forEach((hp, parts) -> {
var partsSet = parts.stream().peek(part -> Preconditions.checkArgument(part.size() == 2))
.map(part -> new FatePartition(FateId.from(part.get(0)), FateId.from(part.get(1))))
.collect(toUnmodifiableSet());
locations.put(HostAndPort.fromString(hp), partsSet);
});
lastLocations = Map.copyOf(locations);
}

return lastLocations;
}

private static byte[] serialize(Map<HostAndPort,Set<FatePartition>> assignments) {
Map<String,List<List<String>>> jsonMap = new HashMap<>();
assignments.forEach((hp, parts) -> {
var listParts = parts.stream()
.map(part -> List.of(part.start().canonical(), part.end().canonical())).toList();
jsonMap.put(hp.toString(), listParts);
});

var json = GSON.get().toJson(jsonMap);
return json.getBytes(UTF_8);
}

public static void storeLocations(ZooReaderWriter zoo,
Map<HostAndPort,Set<FatePartition>> assignments, NodeExistsPolicy nodeExistsPolicy) {
try {
zoo.putPersistentData(Constants.ZMANAGER_FATE_ASSIGNMENTS, serialize(assignments),
nodeExistsPolicy);
} catch (KeeperException | InterruptedException e) {
throw new IllegalStateException("Unable to set fate locations in zookeeper", e);
}
}

public static void storeLocations(ServerContext context,
Map<HostAndPort,Set<FatePartition>> assignments) {
try {
context.getZooSession().setData(Constants.ZMANAGER_FATE_ASSIGNMENTS, serialize(assignments),
-1);
} catch (KeeperException | InterruptedException e) {
throw new IllegalStateException("Unable to set fate locations in zookeeper", e);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,15 @@
import java.util.SortedMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiFunction;
import java.util.function.Predicate;
import java.util.function.Supplier;
Expand Down Expand Up @@ -118,6 +119,7 @@
import org.apache.accumulo.core.zookeeper.ZcStat;
import org.apache.accumulo.manager.compaction.coordinator.CompactionCoordinator;
import org.apache.accumulo.manager.fate.FateManager;
import org.apache.accumulo.manager.fate.FateNotifier;
import org.apache.accumulo.manager.fate.FateWorker;
import org.apache.accumulo.manager.merge.FindMergeableRangeTask;
import org.apache.accumulo.manager.metrics.fate.meta.MetaFateMetrics;
Expand All @@ -129,6 +131,7 @@
import org.apache.accumulo.manager.upgrade.UpgradeCoordinator;
import org.apache.accumulo.manager.upgrade.UpgradeCoordinator.UpgradeStatus;
import org.apache.accumulo.server.AbstractServer;
import org.apache.accumulo.server.AccumuloDataVersion;
import org.apache.accumulo.server.PrimaryManagerThriftService;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.compaction.CompactionConfigStorage;
Expand Down Expand Up @@ -215,10 +218,6 @@ public class Manager extends AbstractServer

private ManagerState state = ManagerState.INITIAL;

// fateReadyLatch and fateRefs go together; when this latch is ready, then the fate references
// should already have been set; ConcurrentHashMap will guarantee that all threads will see
// the initialized fate references after the latch is ready
private final CountDownLatch fateReadyLatch = new CountDownLatch(1);
private final AtomicReference<Map<FateInstanceType,FateClient<FateEnv>>> fateClients =
new AtomicReference<>();
private volatile FateManager fateManager;
Expand Down Expand Up @@ -307,28 +306,7 @@ public boolean stillManager() {
return getManagerState() != ManagerState.STOP;
}

private void waitForFate() {
try {
// block up to 30 seconds until it's ready; if it's still not ready, introduce some logging
if (!fateReadyLatch.await(30, SECONDS)) {
String msgPrefix = "Unexpected use of fate in thread " + Thread.currentThread().getName()
+ " at time " + System.currentTimeMillis();
// include stack trace so we know where it's coming from, in case we need to troubleshoot it
log.warn("{} blocked until fate starts", msgPrefix,
new IllegalStateException("Attempted fate action before manager finished starting up; "
+ "if this doesn't make progress, please report it as a bug to the developers"));
int minutes = 0;
while (!fateReadyLatch.await(5, MINUTES)) {
minutes += 5;
log.warn("{} still blocked after {} minutes; this is getting weird", msgPrefix, minutes);
}
log.debug("{} no longer blocked", msgPrefix);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException("Thread was interrupted; cannot proceed");
}
}
private final Lock fateSetupLock = new ReentrantLock();

/**
* Retrieve the FateClient object, blocking until it is ready. This could cause problems if Fate
Expand All @@ -341,7 +319,23 @@ private void waitForFate() {
* @return the FateClient object, only after the fate components are running and ready
*/
public FateClient<FateEnv> fateClient(FateInstanceType type) {
waitForFate();
if (fateClients.get() == null) {
// only want one thread trying to setup fate, lots of threads could call this before its setup
fateSetupLock.lock();
try {
// check to see if another thread setup fate while we were waiting on the lock
if (fateClients.get() == null) {
// wait for upgrade to be complete
while (AccumuloDataVersion.getCurrentVersion(getContext()) < AccumuloDataVersion.get()) {
log.info("Attempted use of fate before upgrade complete, waiting for upgrade");
UtilWaitThread.sleep(5000);
}
setupFate(getContext());
}
} finally {
fateSetupLock.unlock();
}
}
var client = requireNonNull(fateClients.get(), "fateClients is not set yet").get(type);
return requireNonNull(client, () -> "fate client type " + type + " is not present");
}
Expand Down Expand Up @@ -921,7 +915,6 @@ private void setupPrimaryMetrics() {
watchers.forEach(watcher -> metricsInfo.addMetricsProducers(watcher.getMetrics()));
metricsInfo.addMetricsProducers(requireNonNull(compactionCoordinator));
// ensure fate is completely setup
Preconditions.checkState(fateReadyLatch.getCount() == 0);
metricsInfo.addMetricsProducers(new MetaFateMetrics(getContext(),
getConfiguration().getTimeInMillis(Property.MANAGER_FATE_METRICS_MIN_UPDATE_INTERVAL)));
metricsInfo.addMetricsProducers(new UserFateMetrics(getContext(),
Expand Down Expand Up @@ -1165,7 +1158,7 @@ boolean canSuspendTablets() {

fateManager = new FateManager(getContext());
fateManager.start();
fateClient(FateInstanceType.USER).setSeedingConsumer(fateManager::notifySeeded);
startFateMaintenance();

setupPrimaryMetrics();

Expand Down Expand Up @@ -1303,29 +1296,51 @@ protected Fate<FateEnv> createFateInstance(FateEnv env, FateStore<FateEnv> store

private void setupFate(ServerContext context) {
try {

Predicate<ZooUtil.LockID> isLockHeld =
lock -> ServiceLock.isLockHeld(context.getZooCache(), lock);
var metaStore = new MetaFateStore<FateEnv>(context.getZooSession(),
primaryManagerLock.getLockID(), isLockHeld);
var metaStore =
new MetaFateStore<FateEnv>(context.getZooSession(), managerLock.getLockID(), isLockHeld);
var metaFateClient = new FateClient<>(metaStore, TraceRepo::toLogString);
var userStore = new UserFateStore<FateEnv>(context, SystemTables.FATE.tableName(),
managerLock.getLockID(), isLockHeld);
var userFateClient = new FateClient<>(userStore, TraceRepo::toLogString);

var metaCleaner = new FateCleaner<>(metaStore, Duration.ofHours(8), this::getSteadyTime);
ThreadPools.watchCriticalScheduledTask(context.getScheduledExecutor()
.scheduleWithFixedDelay(metaCleaner::ageOff, 10, 4 * 60, MINUTES));
var userCleaner = new FateCleaner<>(userStore, Duration.ofHours(8), this::getSteadyTime);
ThreadPools.watchCriticalScheduledTask(context.getScheduledExecutor()
.scheduleWithFixedDelay(userCleaner::ageOff, 10, 4 * 60, MINUTES));
// wire up notifying the correct manager when a fate operation is seeded
FateNotifier fateNotifier = new FateNotifier(context);
fateNotifier.start();
metaFateClient.setSeedingConsumer(fateNotifier::notifySeeded);
userFateClient.setSeedingConsumer(fateNotifier::notifySeeded);

if (!fateClients.compareAndSet(null,
Map.of(FateInstanceType.META, metaFateClient, FateInstanceType.USER, userFateClient))) {
throw new IllegalStateException(
"Unexpected previous fateClient reference map already initialized");
}
} catch (KeeperException | InterruptedException e) {
throw new IllegalStateException("Exception setting up Fate clients", e);
}
}

/**
* Run fate maintenance task that only run in the primary manager.
*/
private void startFateMaintenance() {
try {
Predicate<ZooUtil.LockID> isLockHeld =
lock -> ServiceLock.isLockHeld(getContext().getZooCache(), lock);

var metaStore = new MetaFateStore<FateEnv>(getContext().getZooSession(),
managerLock.getLockID(), isLockHeld);
var userStore = new UserFateStore<FateEnv>(getContext(), SystemTables.FATE.tableName(),
managerLock.getLockID(), isLockHeld);

fateReadyLatch.countDown();
var metaCleaner = new FateCleaner<>(metaStore, Duration.ofHours(8), this::getSteadyTime);
ThreadPools.watchCriticalScheduledTask(getContext().getScheduledExecutor()
.scheduleWithFixedDelay(metaCleaner::ageOff, 10, 4 * 60, MINUTES));
var userCleaner = new FateCleaner<>(userStore, Duration.ofHours(8), this::getSteadyTime);
ThreadPools.watchCriticalScheduledTask(getContext().getScheduledExecutor()
.scheduleWithFixedDelay(userCleaner::ageOff, 10, 4 * 60, MINUTES));
} catch (KeeperException | InterruptedException e) {
throw new IllegalStateException("Exception setting up FaTE cleanup thread", e);
}
Expand Down
Loading