Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapreduce;

import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;

/**
* {@link RowKeyProgress} implementation for hex-encoded row keys (e.g. MD5/SHA prefixes). Non-hex
* bytes contribute zero.
*/
@InterfaceAudience.Public
public class HexStringRowKeyProgress implements RowKeyProgress {
/**
* Cap on hex characters interpreted. A {@code double} mantissa carries ~53 bits (~13 hex chars);
* reading more adds no information and risks precision loss.
*/
private static final int MAX_PREFIX_LENGTH = 13;

/**
* Hex characters past the start/stop divergence point to include for resolution. 4 hex chars =
* 65,536 buckets, finer than any progress bar can display.
*/
private static final int RESOLUTION_PADDING = 4;

private int prefixLength;
private double start;
private double stop;

@Override
public void setStartStopRows(byte[] startRow, byte[] stopRow) {
int common = commonPrefixLength(startRow, stopRow);
this.prefixLength = Math.min(common + RESOLUTION_PADDING, MAX_PREFIX_LENGTH);
this.start = hexPrefixToDouble(startRow);
this.stop = hexPrefixToDouble(stopRow);
}

@Override
public float getProgress(byte[] currentRow) {
if (currentRow == null || stop <= start) {
return 0.0f;
}
double current = hexPrefixToDouble(currentRow);
float progress = (float) ((current - start) / (stop - start));
return Math.min(1.0f, Math.max(0.0f, progress));
}

private static int commonPrefixLength(byte[] a, byte[] b) {
if (a == null || b == null) {
return 0;
}
return Bytes.findCommonPrefix(a, b, a.length, b.length, 0, 0);
}

private double hexPrefixToDouble(byte[] row) {
if (row == null) {
return 0;
}
int len = Math.min(prefixLength, row.length);
double d = 0;
for (int i = 0; i < prefixLength; i++) {
d *= 16;
if (i < len) {
d += hexCharToInt(row[i]);
}
}
return d;
}

private static int hexCharToInt(byte b) {
if (b >= '0' && b <= '9') {
return b - '0';
}
if (b >= 'a' && b <= 'f') {
return 10 + (b - 'a');
}
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that handling [A-F] would be useful.

if (b >= 'A' && b <= 'F') {
return 10 + (b - 'A');
}
return 0;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapreduce;

import org.apache.yetus.audience.InterfaceAudience;

/**
 * Estimates scan progress based on row key positions within a start/stop range. Custom
 * implementations can be plugged in via {@link RowKeyProgress#PROGRESS_CLASS_KEY}.
 */
@InterfaceAudience.Public
public interface RowKeyProgress {
  /**
   * Configuration key naming the {@link RowKeyProgress} implementation class to use. The
   * configured class is created via Hadoop's {@code ReflectionUtils.newInstance}, then initialized
   * with {@link #setStartStopRows(byte[], byte[])} before any progress queries.
   */
  String PROGRESS_CLASS_KEY = "hbase.mapreduce.rowkey.progress.class";

  /**
   * Initialize the progress estimator with the start and stop row keys.
   * @param startRow the start row of the scan (inclusive), may be null or empty
   * @param stopRow the stop row of the scan (exclusive), may be null or empty
   */
  void setStartStopRows(byte[] startRow, byte[] stopRow);

  /**
   * Estimate progress as a fraction between 0.0 and 1.0 based on where {@code currentRow} falls in
   * the range.
   * @param currentRow the last successfully read row key, or null if no row has been read yet
   * @return estimated progress between 0.0 and 1.0, or 0.0 if progress cannot be estimated
   */
  float getProgress(byte[] currentRow);
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.ConnectionConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
Expand All @@ -34,6 +35,7 @@
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
Expand Down Expand Up @@ -66,6 +68,7 @@ public class TableRecordReaderImpl {
private int rowcount;
private boolean logScannerActivity = false;
private int logPerRowCount = 100;
private RowKeyProgress rowKeyProgress = null;

/**
* Restart from survivable exceptions by creating a new scanner.
Expand Down Expand Up @@ -141,9 +144,67 @@ public void initialize(InputSplit inputsplit, TaskAttemptContext context)
if (context != null) {
this.context = context;
}
initProgressBounds();
restart(scan.getStartRow());
}

/**
 * Set up the {@link RowKeyProgress} estimator with the effective start/stop keys of this split.
 * The TableInputFormat splitter derives start and stop keys from region boundaries, so an empty
 * start key only occurs for the table's first region and an empty stop key only for its last
 * region. In those two cases the table is probed for the actual first or last row key as an
 * approximation of the bound.
 */
private void initProgressBounds() {
  if (context == null) {
    return;
  }
  byte[] lower = scan.getStartRow();
  if (lower == null || lower.length == 0) {
    lower = probeFirstRow();
  }
  byte[] upper = scan.getStopRow();
  if (upper == null || upper.length == 0) {
    upper = probeLastRow();
  }
  Configuration conf = context.getConfiguration();
  Class<? extends RowKeyProgress> progressClass = conf.getClass(RowKeyProgress.PROGRESS_CLASS_KEY,
    UniformRowKeyProgress.class, RowKeyProgress.class);
  rowKeyProgress = ReflectionUtils.newInstance(progressClass, conf);
  rowKeyProgress.setStartStopRows(lower, upper);
}

private byte[] probeFirstRow() {
try {
Scan probeScan = new Scan(scan);
probeScan.setOneRowLimit();
try (ResultScanner probeScanner = htable.getScanner(probeScan)) {
Result result = probeScanner.next();
return result != null ? result.getRow() : null;
}
} catch (IOException e) {
LOG.warn("Failed to probe first row for progress estimation", e);
return null;
Comment on lines +185 to +186
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there are any issues with the scan, this will simply report 0 progress.

}
}

private byte[] probeLastRow() {
try {
Scan probeScan = new Scan(scan);
// Only called for the last region, so swap row bounds for the reversed scan.
probeScan.withStartRow(HConstants.EMPTY_START_ROW);
probeScan.withStopRow(scan.getStartRow(), scan.includeStartRow());
probeScan.setReversed(true);
probeScan.setOneRowLimit();
try (ResultScanner probeScanner = htable.getScanner(probeScan)) {
Result result = probeScanner.next();
return result != null ? result.getRow() : null;
}
Comment on lines +190 to +201
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice catch. Fixed this part.

} catch (IOException e) {
LOG.warn("Failed to probe last row for progress estimation", e);
return null;
}
}

/**
* Closes the split.
*/
Expand Down Expand Up @@ -318,8 +379,10 @@ protected static void updateCounters(ScanMetrics scanMetrics, long numScannerRes
* @return A number between 0.0 and 1.0, the fraction of the data read.
*/
public float getProgress() {
// Depends on the total number of tuples
return 0;
if (rowKeyProgress == null) {
return 0;
}
return rowKeyProgress.getProgress(lastSuccessfulRow);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapreduce;

import org.apache.yetus.audience.InterfaceAudience;

/**
* {@link RowKeyProgress} implementation that treats row keys as raw byte sequences. Converts the
* leading bytes to a big-endian unsigned numeric value and computes progress as a linear fraction
* of the key space.
*/
@InterfaceAudience.Public
public class UniformRowKeyProgress implements RowKeyProgress {
private static final int BYTES_FOR_PROGRESS = Double.BYTES;

private double start;
private double stop;

@Override
public void setStartStopRows(byte[] startRow, byte[] stopRow) {
this.start = rowKeyToDouble(startRow);
this.stop = rowKeyToDouble(stopRow);
}

@Override
public float getProgress(byte[] currentRow) {
if (currentRow == null || stop <= start) {
return 0.0f;
}
double current = rowKeyToDouble(currentRow);
float progress = (float) ((current - start) / (stop - start));
return Math.min(1.0f, Math.max(0.0f, progress));
}

/**
* Interpret the leading bytes of a row key as an unsigned big-endian value. Keys shorter than
* {@link #BYTES_FOR_PROGRESS} bytes are treated as if right-padded with zeros.
*/
private static double rowKeyToDouble(byte[] row) {
if (row == null) {
return 0;
}
double d = 0;
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

double avoids unsigned arithmetic issues. Java has no unsigned long, so interpreting row key bytes as a raw long would be cumbersome. With double, all values are naturally non-negative and standard arithmetic just works.

for (int i = 0; i < BYTES_FOR_PROGRESS; i++) {
d *= 256; // shift left by one byte (2^8) to build a big-endian base-256 number
if (i < row.length) {
d += (row[i] & 0xFF);
}
}
return d;
}
}
Loading