Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,8 @@ public boolean contains(FateId fateId) {
}

}

public FateInstanceType getType() {
return start.getType();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -77,7 +76,6 @@
import org.apache.accumulo.core.fate.FateClient;
import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.FateInstanceType;
import org.apache.accumulo.core.fate.FatePartition;
import org.apache.accumulo.core.fate.FateStore;
import org.apache.accumulo.core.fate.TraceRepo;
import org.apache.accumulo.core.fate.user.UserFateStore;
Expand Down Expand Up @@ -122,11 +120,9 @@
import org.apache.accumulo.manager.fate.FateManager;
import org.apache.accumulo.manager.fate.FateWorker;
import org.apache.accumulo.manager.merge.FindMergeableRangeTask;
import org.apache.accumulo.manager.metrics.fate.FateExecutorMetricsProducer;
import org.apache.accumulo.manager.metrics.fate.meta.MetaFateMetrics;
import org.apache.accumulo.manager.metrics.fate.user.UserFateMetrics;
import org.apache.accumulo.manager.recovery.RecoveryManager;
import org.apache.accumulo.manager.split.FileRangeCache;
import org.apache.accumulo.manager.split.Splitter;
import org.apache.accumulo.manager.state.TableCounts;
import org.apache.accumulo.manager.tableOps.FateEnv;
Expand Down Expand Up @@ -178,10 +174,8 @@
* <p>
* The manager will also coordinate log recoveries and reports general status.
*/
// TODO create standalone PrimaryFateEnv class and pull everything into there relatated to
// FateEnv... this will make it much more clear the env is for metadata ops only
public class Manager extends AbstractServer
implements LiveTServerSet.Listener, FateEnv, PrimaryManagerThriftService {
implements LiveTServerSet.Listener, PrimaryManagerThriftService {

static final Logger log = LoggerFactory.getLogger(Manager.class);

Expand Down Expand Up @@ -227,8 +221,6 @@ public class Manager extends AbstractServer
private final CountDownLatch fateReadyLatch = new CountDownLatch(1);
private final AtomicReference<Map<FateInstanceType,FateClient<FateEnv>>> fateClients =
new AtomicReference<>();
private final AtomicReference<Map<FateInstanceType,Fate<FateEnv>>> fateRefs =
new AtomicReference<>();
private volatile FateManager fateManager;

static class TServerStatus {
Expand Down Expand Up @@ -286,7 +278,6 @@ void setTserverStatus(LiveTServersSnapshot snapshot,

private final long timeToCacheRecoveryWalExistence;
private ExecutorService tableInformationStatusPool = null;
private ThreadPoolExecutor tabletRefreshThreadPool;

private final TabletStateStore rootTabletStore;
private final TabletStateStore metadataTabletStore;
Expand Down Expand Up @@ -340,21 +331,15 @@ private void waitForFate() {
}

/**
* Retrieve the Fate object, blocking until it is ready. This could cause problems if Fate
* Retrieve the FateClient object, blocking until it is ready. This could cause problems if Fate
* operations are attempted to be used prior to the Manager being ready for them. If these
* operations are triggered by a client side request from a tserver or client, it should be safe
* to wait to handle those until Fate is ready, but if it occurs during an upgrade, or some other
* time in the Manager before Fate is started, that may result in a deadlock and will need to be
* fixed.
*
* @return the Fate object, only after the fate components are running and ready
* @return the FateClient object, only after the fate components are running and ready
*/
public Fate<FateEnv> fate(FateInstanceType type) {
waitForFate();
var fate = requireNonNull(fateRefs.get(), "fateRefs is not set yet").get(type);
return requireNonNull(fate, () -> "fate type " + type + " is not present");
}

public FateClient<FateEnv> fateClient(FateInstanceType type) {
waitForFate();
var client = requireNonNull(fateClients.get(), "fateClients is not set yet").get(type);
Expand Down Expand Up @@ -497,16 +482,10 @@ int displayUnassigned() {
return result;
}

@Override
public TableManager getTableManager() {
return getContext().getTableManager();
}

@Override
public ThreadPoolExecutor getTabletRefreshThreadPool() {
return tabletRefreshThreadPool;
}

public static void main(String[] args) throws Exception {
AbstractServer.startServer(new Manager(new ServerOpts(), ServerContext::new, args), log);
}
Expand Down Expand Up @@ -585,17 +564,11 @@ ManagerGoalState getManagerGoalState() {
}

private Splitter splitter;
private FileRangeCache fileRangeCache;

public Splitter getSplitter() {
return splitter;
}

@Override
public FileRangeCache getFileRangeCache() {
return fileRangeCache;
}

public UpgradeCoordinator.UpgradeStatus getUpgradeStatus() {
return upgradeCoordinator.getStatus();
}
Expand Down Expand Up @@ -717,14 +690,14 @@ public void run() {
case CLEAN_STOP:
switch (getManagerState()) {
case NORMAL:
fateManager.stop(Duration.ofMinutes(1));
fateManager.stop(FateInstanceType.USER, Duration.ofMinutes(1));
setManagerState(ManagerState.SAFE_MODE);
break;
case SAFE_MODE: {
// META fate stores its data in Zookeeper and its operations interact with
// metadata and root tablets, need to completely shut it down before unloading
// metadata and root tablets
fate(FateInstanceType.META).shutdown(1, MINUTES);
fateManager.stop(FateInstanceType.META, Duration.ofMinutes(1));
int count = nonMetaDataTabletsAssignedOrHosted();
log.debug(
String.format("There are %d non-metadata tablets assigned or hosted", count));
Expand Down Expand Up @@ -953,9 +926,6 @@ private void setupPrimaryMetrics() {
getConfiguration().getTimeInMillis(Property.MANAGER_FATE_METRICS_MIN_UPDATE_INTERVAL)));
metricsInfo.addMetricsProducers(new UserFateMetrics(getContext(),
getConfiguration().getTimeInMillis(Property.MANAGER_FATE_METRICS_MIN_UPDATE_INTERVAL)));
metricsInfo.addMetricsProducers(new FateExecutorMetricsProducer(getContext(),
fate(FateInstanceType.META).getFateExecutors(),
getConfiguration().getTimeInMillis(Property.MANAGER_FATE_METRICS_MIN_UPDATE_INTERVAL)));
metricsInfo.addMetricsProducers(this);
}

Expand Down Expand Up @@ -1045,11 +1015,6 @@ public void run() {
tableInformationStatusPool = ThreadPools.getServerThreadPools()
.createExecutorService(getConfiguration(), Property.MANAGER_STATUS_THREAD_POOL_SIZE, false);

tabletRefreshThreadPool = ThreadPools.getServerThreadPools().getPoolBuilder("Tablet refresh ")
.numCoreThreads(getConfiguration().getCount(Property.MANAGER_TABLET_REFRESH_MINTHREADS))
.numMaxThreads(getConfiguration().getCount(Property.MANAGER_TABLET_REFRESH_MAXTHREADS))
.build();

Thread statusThread = Threads.createCriticalThread("Status Thread", new StatusThread());
statusThread.start();

Expand Down Expand Up @@ -1195,7 +1160,6 @@ boolean canSuspendTablets() {

this.splitter = new Splitter(this);
this.splitter.start();
this.fileRangeCache = new FileRangeCache(context);

setupFate(context);

Expand Down Expand Up @@ -1283,8 +1247,8 @@ boolean canSuspendTablets() {
}

log.debug("Shutting down fate.");
fate(FateInstanceType.META).close();
fateManager.stop(Duration.ZERO);
fateManager.stop(FateInstanceType.USER, Duration.ZERO);
fateManager.stop(FateInstanceType.META, Duration.ZERO);

splitter.stop();

Expand All @@ -1296,7 +1260,6 @@ boolean canSuspendTablets() {
}

tableInformationStatusPool.shutdownNow();
tabletRefreshThreadPool.shutdownNow();

compactionCoordinator.shutdown();

Expand Down Expand Up @@ -1344,12 +1307,10 @@ private void setupFate(ServerContext context) {
lock -> ServiceLock.isLockHeld(context.getZooCache(), lock);
var metaStore = new MetaFateStore<FateEnv>(context.getZooSession(),
primaryManagerLock.getLockID(), isLockHeld);
var metaInstance = createFateInstance(this, metaStore, context);
// configure this instance to process all data
metaInstance.setPartitions(Set.of(FatePartition.all(FateInstanceType.META)));
var metaFateClient = new FateClient<>(metaStore, TraceRepo::toLogString);
var userStore = new UserFateStore<FateEnv>(context, SystemTables.FATE.tableName(),
managerLock.getLockID(), isLockHeld);
var userFateClient = new FateClient<FateEnv>(userStore, TraceRepo::toLogString);
var userFateClient = new FateClient<>(userStore, TraceRepo::toLogString);

var metaCleaner = new FateCleaner<>(metaStore, Duration.ofHours(8), this::getSteadyTime);
ThreadPools.watchCriticalScheduledTask(context.getScheduledExecutor()
Expand All @@ -1359,14 +1320,10 @@ private void setupFate(ServerContext context) {
.scheduleWithFixedDelay(userCleaner::ageOff, 10, 4 * 60, MINUTES));

if (!fateClients.compareAndSet(null,
Map.of(FateInstanceType.META, metaInstance, FateInstanceType.USER, userFateClient))) {
Map.of(FateInstanceType.META, metaFateClient, FateInstanceType.USER, userFateClient))) {
throw new IllegalStateException(
"Unexpected previous fateClient reference map already initialized");
}
if (!fateRefs.compareAndSet(null, Map.of(FateInstanceType.META, metaInstance))) {
throw new IllegalStateException(
"Unexpected previous fate reference map already initialized");
}

fateReadyLatch.countDown();
} catch (KeeperException | InterruptedException e) {
Expand Down Expand Up @@ -1522,11 +1479,6 @@ private void getAssistantManagerLock() throws KeeperException, InterruptedExcept
}
}

@Override
public ServiceLock getServiceLock() {
return primaryManagerLock;
}

private ServiceLockData getPrimaryManagerLock(final ServiceLockPath zManagerLoc)
throws KeeperException, InterruptedException {
var zooKeeper = getContext().getZooSession();
Expand Down Expand Up @@ -1647,7 +1599,6 @@ public Set<TableId> onlineTables() {
return result;
}

@Override
public Set<TServerInstance> onlineTabletServers() {
return tserverSet.getSnapshot().getTservers();
}
Expand All @@ -1660,12 +1611,6 @@ public EventCoordinator getEventCoordinator() {
return nextEvent;
}

@Override
public EventPublisher getEventPublisher() {
return nextEvent;
}

@Override
public VolumeManager getVolumeManager() {
return getContext().getVolumeManager();
}
Expand Down Expand Up @@ -1732,7 +1677,6 @@ public Set<TServerInstance> shutdownServers() {
* monotonic clock, which will be approximately consistent between different managers or different
* runs of the same manager. SteadyTime supports both nanoseconds and milliseconds.
*/
@Override
public SteadyTime getSteadyTime() {
return timeKeeper.getTime();
}
Expand Down Expand Up @@ -1760,14 +1704,4 @@ public void registerMetrics(MeterRegistry registry) {
public ServiceLock getLock() {
return managerLock;
}

/**
* Get Threads Pool instance which is used by blocked I/O
*
* @return {@link ExecutorService}
*/
@Override
public ExecutorService getRenamePool() {
return this.renamePool;
}
}
Loading