Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ class BalancerClusterState {
private int[] regionServerIndexWithBestRegionCachedRatio;
// Maps regionName -> oldServerName -> cache ratio of the region on the old server
Map<String, Pair<ServerName, Float>> regionCacheRatioOnOldServerMap;
// cache free space available on each server, aligned to the "servers" array indices;
long[] serverBlockCacheFreeSize;

private final Supplier<List<Integer>> shuffledServerIndicesSupplier =
Suppliers.memoizeWithExpiration(() -> {
Expand All @@ -148,20 +150,23 @@ public String getRack(ServerName server) {
BalancerClusterState(Map<ServerName, List<RegionInfo>> clusterState,
Map<String, Deque<BalancerRegionLoad>> loads, RegionHDFSBlockLocationFinder regionFinder,
RackManager rackManager) {
this(null, clusterState, loads, regionFinder, rackManager, null);
this(null, clusterState, loads, regionFinder, rackManager, null, null);
}

protected BalancerClusterState(Map<ServerName, List<RegionInfo>> clusterState,
Map<String, Deque<BalancerRegionLoad>> loads, RegionHDFSBlockLocationFinder regionFinder,
RackManager rackManager, Map<String, Pair<ServerName, Float>> oldRegionServerRegionCacheRatio) {
this(null, clusterState, loads, regionFinder, rackManager, oldRegionServerRegionCacheRatio);
RackManager rackManager, Map<String, Pair<ServerName, Float>> oldRegionServerRegionCacheRatio,
Map<ServerName, Long> serverBlockCacheFreeByServer) {
this(null, clusterState, loads, regionFinder, rackManager, oldRegionServerRegionCacheRatio,
serverBlockCacheFreeByServer);
}

@SuppressWarnings("unchecked")
BalancerClusterState(Collection<RegionInfo> unassignedRegions,
Map<ServerName, List<RegionInfo>> clusterState, Map<String, Deque<BalancerRegionLoad>> loads,
RegionHDFSBlockLocationFinder regionFinder, RackManager rackManager,
Map<String, Pair<ServerName, Float>> oldRegionServerRegionCacheRatio) {
Map<String, Pair<ServerName, Float>> oldRegionServerRegionCacheRatio,
Map<ServerName, Long> serverBlockCacheFreeByServer) {
if (unassignedRegions == null) {
unassignedRegions = Collections.emptyList();
}
Expand Down Expand Up @@ -394,6 +399,15 @@ protected BalancerClusterState(Map<ServerName, List<RegionInfo>> clusterState,
populateRegionPerLocationFromServer(regionsPerRack, colocatedReplicaCountsPerRack,
serversPerRack);
}

this.serverBlockCacheFreeSize = new long[numServers];
if (serverBlockCacheFreeByServer != null) {
for (int i = 0; i < numServers; i++) {
ServerName sn = servers[i];
this.serverBlockCacheFreeSize[i] =
sn == null ? 0L : serverBlockCacheFreeByServer.getOrDefault(sn, 0L);
}
}
}

private void populateRegionPerLocationFromServer(int[][] regionsPerLocation,
Expand Down Expand Up @@ -714,6 +728,18 @@ public int[] getOrComputeServerWithBestRegionCachedRatio() {
return regionServerIndexWithBestRegionCachedRatio;
}

/**
* Finds and return the latest reported cache ratio for the region on the RegionServer it's
* currently online.
*/
float getObservedRegionCacheRatio(int region) {
Deque<BalancerRegionLoad> dq = regionLoads[region];
if (dq == null || dq.isEmpty()) {
return 0.0f;
}
return dq.getLast().getCurrentRegionCacheRatio();
}

/**
* Maps region index to rack index
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ private boolean isViolating(RegionPlan regionPlan) {
private RegionPlanConditional createConditional(Class<? extends RegionPlanConditional> clazz,
BalancerClusterState cluster) {
if (cluster == null) {
cluster = new BalancerClusterState(Collections.emptyMap(), null, null, null, null);
cluster = new BalancerClusterState(Collections.emptyMap(), null, null, null, null, null);
}
try {
Constructor<? extends RegionPlanConditional> ctor =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ private BalancerClusterState createCluster(List<ServerName> servers,
}
}
return new BalancerClusterState(regions, clusterState, null, this.regionFinder, rackManager,
null);
null, null);
}

private List<ServerName> findIdleServers(List<ServerName> servers) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.hadoop.hbase.Size;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.master.RackManager;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
Expand All @@ -58,6 +59,30 @@ public class CacheAwareLoadBalancer extends StochasticLoadBalancer {
"hbase.master.balancer.stochastic.throttling.cacheRatio";
public static final float CACHE_RATIO_THRESHOLD_DEFAULT = 0.8f;

/**
* Below this cache ratio on the current host, a move may be considered for the free-space
* heuristic.
*/
public static final String LOW_CACHE_RATIO_FOR_RELOCATION_KEY =
"hbase.master.balancer.cacheaware.lowCacheRatioThreshold";
public static final float LOW_CACHE_RATIO_FOR_RELOCATION_DEFAULT = 0.35f;

/**
* Optimistic region cache ratio assumed for cost purposes when a better host has free cache space
* (actual warmup is not modeled).
*/
public static final String POTENTIAL_CACHE_RATIO_AFTER_MOVE_KEY =
"hbase.master.balancer.cacheaware.potentialCacheRatioAfterMove";
public static final float POTENTIAL_CACHE_RATIO_AFTER_MOVE_DEFAULT = 0.95f;

/**
* Minimum free block cache on a target server, as a multiple of the region's on-disk size in
* bytes, required to count that server as a relocation opportunity.
*/
public static final String MIN_FREE_CACHE_SPACE_FACTOR_KEY =
"hbase.master.balancer.cacheaware.minFreeCacheSpaceFactor";
public static final float MIN_FREE_CACHE_SPACE_FACTOR_DEFAULT = 1.0f;

public Float ratioThreshold;

private Long sleepTime;
Expand Down Expand Up @@ -109,6 +134,23 @@ public void updateClusterMetrics(ClusterMetrics clusterMetrics) {
updateRegionLoad();
}

protected Map<ServerName, Long> getServerBlockCacheFreeBytes() {
if (clusterStatus == null) {
return null;
}
Map<ServerName, Long> map = new HashMap<>();
clusterStatus.getLiveServerMetrics().forEach((sn, sm) -> map.put(sn, sm.getCacheFreeSize()));
return map;
}

@Override
protected BalancerClusterState createState(Map<ServerName, List<RegionInfo>> clusterState,
Map<String, Deque<BalancerRegionLoad>> loads, RegionHDFSBlockLocationFinder finder,
RackManager rackManager) {
return new BalancerClusterState(clusterState, loads, finder, rackManager,
regionCacheRatioOnOldServerMap, getServerBlockCacheFreeBytes());
}

/**
* Collect the amount of region cached for all the regions from all the active region servers.
*/
Expand Down Expand Up @@ -149,8 +191,16 @@ private void updateRegionLoad() {
if (!ServerName.isSameAddress(currentServer, sn)) {
int regionSizeMB =
regionCacheRatioOnCurrentServerMap.get(regionEncodedName).getSecond();
// The coldDataSize accounts for data size classified as "cold" by DataTieringManager,
// which should be kept out of cache. We sum cold region size in the cache ratio, as we
// don't want to move regions with low cache ratio due to data classified as cold.
float regionCacheRatioOnOldServer =
regionSizeMB == 0 ? 0.0f : (float) regionSizeInCache / regionSizeMB;
regionSizeMB
== 0
? 0.0f
: (float) (regionSizeInCache
+ sm.getRegionColdDataSize().getOrDefault(regionEncodedName, 0))
/ regionSizeMB;
regionCacheRatioOnOldServerMap.put(regionEncodedName,
new Pair<>(sn, regionCacheRatioOnOldServer));
}
Expand Down Expand Up @@ -473,6 +523,9 @@ static class CacheAwareCostFunction extends CostFunction {
private static final String CACHE_COST_KEY = "hbase.master.balancer.stochastic.cacheCost";
private double cacheRatio;
private double bestCacheRatio;
private final float lowCacheRatioThreshold;
private final float potentialCacheRatioAfterMove;
private final float minFreeCacheSpaceFactor;

private static final float DEFAULT_CACHE_COST = 20;

Expand All @@ -483,25 +536,90 @@ static class CacheAwareCostFunction extends CostFunction {
!isPersistentCache ? 0.0f : conf.getFloat(CACHE_COST_KEY, DEFAULT_CACHE_COST));
bestCacheRatio = 0.0;
cacheRatio = 0.0;
lowCacheRatioThreshold =
conf.getFloat(LOW_CACHE_RATIO_FOR_RELOCATION_KEY, LOW_CACHE_RATIO_FOR_RELOCATION_DEFAULT);
potentialCacheRatioAfterMove = Math.min(1.0f, conf
.getFloat(POTENTIAL_CACHE_RATIO_AFTER_MOVE_KEY, POTENTIAL_CACHE_RATIO_AFTER_MOVE_DEFAULT));
minFreeCacheSpaceFactor =
conf.getFloat(MIN_FREE_CACHE_SPACE_FACTOR_KEY, MIN_FREE_CACHE_SPACE_FACTOR_DEFAULT);
}

@Override
void prepare(BalancerClusterState cluster) {
super.prepare(cluster);
cacheRatio = 0.0;
bestCacheRatio = 0.0;
recomputeCacheRatio(cluster);
if (LOG.isDebugEnabled()) {
LOG.debug("CacheAwareCostFunction: Cost: {}", 1 - cacheRatio);
}
}

private void recomputeCacheRatio(BalancerClusterState cluster) {
double[] currentWeighted = computeCurrentWeightedContributions(cluster);
double currentSum = 0.0;
double bestCacheSum = 0.0;
for (int region = 0; region < cluster.numRegions; region++) {
cacheRatio += cluster.getOrComputeWeightedRegionCacheRatio(region,
cluster.regionIndexToServerIndex[region]);
bestCacheRatio += cluster.getOrComputeWeightedRegionCacheRatio(region,
getServerWithBestCacheRatioForRegion(region));
currentSum += currentWeighted[region];
// here we only get the server index where this region cache ratio is the highest
int serverIndexBestCache = cluster.getOrComputeServerWithBestRegionCachedRatio()[region];
double currentHighestCache =
cluster.getOrComputeWeightedRegionCacheRatio(region, serverIndexBestCache);
// Get a hypothetical best cache ratio for this region if any server has enough free cache
// to host it.
double potentialHighestCache =
potentialBestWeightedFromFreeCache(cluster, region, currentHighestCache);
double actualHighest = Math.max(currentHighestCache, potentialHighestCache);
bestCacheSum += actualHighest;
}
bestCacheRatio = bestCacheSum;
if (bestCacheSum <= 0.0) {
cacheRatio = cluster.numRegions == 0 ? 1.0 : 0.0;
} else {
cacheRatio = Math.min(1.0, currentSum / bestCacheSum);
}
}

cacheRatio = bestCacheRatio == 0 ? 1.0 : cacheRatio / bestCacheRatio;
if (LOG.isDebugEnabled()) {
LOG.debug("CacheAwareCostFunction: Cost: {}", 1 - cacheRatio);
private double[] computeCurrentWeightedContributions(BalancerClusterState cluster) {
int totalRegions = cluster.numRegions;
double[] contrib = new double[totalRegions];
for (int r = 0; r < totalRegions; r++) {
int s = cluster.regionIndexToServerIndex[r];
int sizeMb = cluster.getTotalRegionHFileSizeMB(r);
if (sizeMb <= 0) {
contrib[r] = 0.0;
continue;
}
contrib[r] = cluster.getOrComputeWeightedRegionCacheRatio(r, s);
}
return contrib;
}

/*
* If this region is cold in metrics and at least one RS (including its current host) reports
* enough free block cache to hold it, return an optimistic weighted cache score ({@link
* #potentialCacheRatioAfterMove} * region MB) so placement is not considered optimal solely
* from low ratios when capacity exists somewhere in the cluster.
*/
private double potentialBestWeightedFromFreeCache(BalancerClusterState cluster, int region,
double currentHighestCache) {
if (cluster.serverBlockCacheFreeSize == null) {
return 0.0;
}
float observedRatio = cluster.getObservedRegionCacheRatio(region);
if (observedRatio >= lowCacheRatioThreshold) {
return 0.0;
}
int regionSizeMb = cluster.getTotalRegionHFileSizeMB(region);
if (regionSizeMb <= 0) {
return 0.0;
}
long regionSizeBytes = (long) regionSizeMb * 1024L * 1024L;
long requiredFree = (long) (regionSizeBytes * minFreeCacheSpaceFactor);
for (int s = 0; s < cluster.numServers; s++) {
if (cluster.serverBlockCacheFreeSize[s] >= requiredFree) {
return Math.max(currentHighestCache, regionSizeMb * potentialCacheRatioAfterMove);
}
}
return 0.0;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,15 +321,20 @@ private void updateBalancerTableLoadInfo(TableName tableName,
if ((this.localityCost != null) || (this.rackLocalityCost != null)) {
finder = this.regionFinder;
}
BalancerClusterState cluster =
new BalancerClusterState(loadOfOneTable, loads, finder, rackManager);
BalancerClusterState cluster = createState(loadOfOneTable, loads, finder, rackManager);

initCosts(cluster);
curOverallCost = computeCost(cluster, Double.MAX_VALUE);
System.arraycopy(tempFunctionCosts, 0, curFunctionCosts, 0, curFunctionCosts.length);
updateStochasticCosts(tableName, curOverallCost, curFunctionCosts);
}

protected BalancerClusterState createState(Map<ServerName, List<RegionInfo>> clusterState,
Map<String, Deque<BalancerRegionLoad>> loads, RegionHDFSBlockLocationFinder finder,
RackManager rackManager) {
return new BalancerClusterState(clusterState, loads, finder, rackManager);
}

@Override
public void
updateBalancerLoadInfo(Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfAllTable) {
Expand Down Expand Up @@ -577,8 +582,7 @@ protected List<RegionPlan> balanceTable(TableName tableName,
// The clusterState that is given to this method contains the state
// of all the regions in the table(s) (that's true today)
// Keep track of servers to iterate through them.
BalancerClusterState cluster = new BalancerClusterState(loadOfOneTable, loads, finder,
rackManager, regionCacheRatioOnOldServerMap);
BalancerClusterState cluster = createState(loadOfOneTable, loads, finder, rackManager);

long startTime = EnvironmentEdgeManager.currentTime();
cluster.setStopRequestedAt(startTime + maxRunningTime);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ static StochasticLoadBalancer buildStochasticLoadBalancer(BalancerClusterState c

static BalancerClusterState
createMockBalancerClusterState(Map<ServerName, List<RegionInfo>> serverToRegions) {
return new BalancerClusterState(serverToRegions, null, null, null, null);
return new BalancerClusterState(serverToRegions, null, null, null, null, null);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ private static class DummyBalancerClusterState extends BalancerClusterState {
private final RegionInfo[] testRegions;

DummyBalancerClusterState(BalancerClusterState bcs) {
super(bcs.clusterState, null, null, null, null);
super(bcs.clusterState, null, null, null, null, null);
this.testRegions = bcs.regions;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,4 +112,19 @@ default String getVersion() {
* rounded to MB
*/
Map<String, Integer> getRegionCachedInfo();

/**
* The available cache space on this region server (bytes), if reported in the server load.
*/
default long getCacheFreeSize() {
return 0L;
}

/**
* Returns the region cold data information for the regions hosted on this server. Here, cold data
* refers only to region data that is classified as cold by the DataTieringManager according to
* the configured priority logic. These data should be kept out of block cache.
* @return map of region encoded name and its total cold data size, rounded to MB
*/
Map<String, Integer> getRegionColdDataSize();
}
Loading
Loading