Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file.
53 changes: 46 additions & 7 deletions bigtable-dataflow-parent/bigtable-beam-import/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,33 @@ limitations under the License.
<type>pom</type>
<scope>import</scope>
</dependency>

<!-- Version alignment -->
<!-- Mark all annotations as provided. They don't affect the runtime of the pipeline so
there is no need to try to version align them -->
<dependency>
<groupId>org.checkerframework</groupId>
<artifactId>checker-qual</artifactId>
<version>3.31.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.errorprone</groupId>
<artifactId>error_prone_annotations</artifactId>
<version>2.18.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.codehaus.mojo</groupId>
<artifactId>animal-sniffer-annotations</artifactId>
<version>1.22</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>jakarta.annotation</groupId>
<artifactId>jakarta.annotation-api</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
</dependencyManagement>

Expand Down Expand Up @@ -118,6 +145,10 @@ limitations under the License.
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs-client</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-shaded-client</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
Expand All @@ -134,7 +165,11 @@ limitations under the License.
<artifactId>beam-runners-direct-java</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-bigtable-emulator-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-shaded-testing-util</artifactId>
Expand All @@ -148,11 +183,6 @@ limitations under the License.
</exclusions>
</dependency>

<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-bigtable-emulator-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.cloud.bigtable</groupId>
<artifactId>bigtable-internal-test-helper</artifactId>
Expand Down Expand Up @@ -186,7 +216,7 @@ limitations under the License.
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<artifactId>mockito-inline</artifactId>
<version>${mockito.version}</version>
<scope>test</scope>
</dependency>
Expand Down Expand Up @@ -221,6 +251,14 @@ limitations under the License.


<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<fork>true</fork>
</configuration>
</plugin>

<plugin>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
Expand Down Expand Up @@ -287,6 +325,7 @@ limitations under the License.
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/services/java.net.spi.InetAddressResolverProvider</exclude>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
Expand Down
139 changes: 139 additions & 0 deletions bigtable-dataflow-parent/bigtable-beam-import/run-snapshot-import.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
#!/bin/bash

# This script runs a range of Dataflow snapshot import jobs sequentially.
# It should be executed from the 'bigtable-dataflow-parent/bigtable-beam-import' directory.
#
# Usage: ./run-snapshot-import.sh <start_shard> <end_shard>
#    Or: ./run-snapshot-import.sh --all
# Example: ./run-snapshot-import.sh 0 3
# Example: ./run-snapshot-import.sh --all
#
# You can override default configurations by setting environment variables in your terminal.
# Example: TABLE_NAME="my-table" SNAPSHOT_NAME="my-snap" ./run-snapshot-import.sh 0 3
#
# NOTE: If you are running on a newer JDK (Java 21+) and hit ByteBuddy errors,
# you can add '-Dnet.bytebuddy.experimental=true' to the java command lines below.
#
# --- Manual Parallel Execution ---
# To run shards in parallel groups of 4 (assuming 20 shards total), you can run 5 instances of this script.
#
# IMPORTANT: Shard 0 performs the restore step. You MUST run the first group (including shard 0)
# first and let it complete the restore step before launching other groups in parallel,
# otherwise they will fail because the restored files won't exist yet!
#
# Example for manual parallel execution:
#   ./run-snapshot-import.sh 0 3 &   # Run this first!
#   # Wait for shard 0 to finish restore, then run the rest:
#   ./run-snapshot-import.sh 4 7 &
#   ./run-snapshot-import.sh 8 11 &
#   ./run-snapshot-import.sh 12 15 &
#   ./run-snapshot-import.sh 16 19 &
#
# --- Automated Parallel Execution ---
# Alternatively, use the --all flag to automatically handle the restore step and launch all groups:
#   ./run-snapshot-import.sh --all

usage() {
  echo "Usage: $0 <start_shard> <end_shard>"
  echo "   Or: $0 --all"
  exit 1
}

MODE="${1:-}"

# Range mode requires exactly two non-negative integer shard indices in order.
if [ "${MODE}" != "--all" ]; then
  if [ "$#" -ne 2 ]; then
    usage
  fi
  case "$1$2" in
    ''|*[!0-9]*)
      echo "Error: shard indices must be non-negative integers."
      usage
      ;;
  esac
  if [ "$1" -gt "$2" ]; then
    echo "Error: <start_shard> must be <= <end_shard>."
    usage
  fi
fi

START_SHARD="${1:-}"
END_SHARD="${2:-}"

# Configurations (Uses environment variables if set, otherwise defaults)
export PROJECT_ID="${PROJECT_ID:-google.com:cloud-bigtable-dev}"
export INSTANCE_ID="${INSTANCE_ID:-tianlei-test-inst}"
export BUCKET="${BUCKET:-tianlei-beam-test-bucket}"
export REGION="${REGION:-us-central1}"

export TABLE_NAME="${TABLE_NAME:-validation_test}"
export SNAPSHOT_NAME="${SNAPSHOT_NAME:-validation_test_20200929}"
export SERVICE_ACCOUNT="${SERVICE_ACCOUNT:-295490517436-compute@developer.gserviceaccount.com}"

export NUM_SHARDS="${NUM_SHARDS:-20}"

export NETWORK="${NETWORK:-tianlei-network}"
export SUBNETWORK="${SUBNETWORK:-regions/us-central1/subnetworks/tianlei-network}"

JAR_PATH="target/bigtable-beam-import-2.17.0-shaded.jar"

# Fail early with a clear message instead of an opaque java error.
if [ ! -f "${JAR_PATH}" ]; then
  echo "Error: ${JAR_PATH} not found. Build the shaded jar first (e.g. 'mvn package')."
  exit 1
fi

# --- AUTO-PARALLEL MODE ---
if [ "${MODE}" == "--all" ]; then
  echo "🚀 Starting fully automated snapshot import..."

  # Step 1: Perform ONLY the restore step. This MUST succeed before any
  # import shard is launched, otherwise the restored files won't exist.
  echo "Step 1/2: Performing snapshot restore (blocking)..."
  if ! java -jar "${JAR_PATH}" importsnapshot \
      --runner=DataflowRunner \
      --project="${PROJECT_ID}" \
      --bigtableInstanceId="${INSTANCE_ID}" \
      --bigtableTableId="${TABLE_NAME}" \
      --importConfigFilePath=import-config-test.json \
      --stagingLocation="gs://${BUCKET}/dataflow/staging" \
      --tempLocation="gs://${BUCKET}/dataflow/temp" \
      --region="${REGION}" \
      --performOnlyRestoreStep=true \
      --jobName="restore-job" \
      --network="${NETWORK}" \
      --subnetwork="${SUBNETWORK}"; then
    echo "Error: snapshot restore failed. Aborting before launching import shards."
    exit 1
  fi

  echo "Restore completed. Proceeding to data import."

  # Step 2: Launch parallel groups of 4 shards, each handled by a recursive
  # invocation of this script in range mode.
  echo "Step 2/2: Launching parallel groups of 4 shards..."
  SHARDS_PER_GROUP=4
  group_pids=""

  for (( start=0; start<NUM_SHARDS; start+=SHARDS_PER_GROUP )); do
    end=$((start + SHARDS_PER_GROUP - 1))
    [ "$end" -ge "$NUM_SHARDS" ] && end=$((NUM_SHARDS - 1))

    echo "Launching group: shards $start to $end in background"
    # Call ourselves with the range. FORCE_SKIP_RESTORE tells the child that
    # Step 1 already performed the restore, so shard 0 must NOT restore again.
    FORCE_SKIP_RESTORE=true "$0" "$start" "$end" &
    group_pids="${group_pids} $!"
  done

  echo "All groups launched. Waiting for all background jobs to finish..."
  failed=0
  for pid in ${group_pids}; do
    wait "$pid" || failed=1
  done
  if [ "$failed" -ne 0 ]; then
    echo "Error: one or more import groups failed. Check the logs above."
    exit 1
  fi
  echo "🎉 All import jobs completed!"
  exit 0
fi
# ----------------------------------------

# Standard Range Mode
failed_shards=""
for i in $(seq "${START_SHARD}" "${END_SHARD}"); do
  echo "Submitting Dataflow job for shardIndex: $i"

  # Shard 0 performs the restore step, unless a parent --all invocation
  # already did it (it sets FORCE_SKIP_RESTORE=true when recursing).
  SKIP_RESTORE="true"
  if [ "$i" -eq 0 ] && [ "${FORCE_SKIP_RESTORE:-false}" != "true" ]; then
    SKIP_RESTORE="false"
  fi

  JOB="job-${i}"
  if ! java -jar "${JAR_PATH}" importsnapshot \
      --runner=DataflowRunner \
      --project="${PROJECT_ID}" \
      --bigtableInstanceId="${INSTANCE_ID}" \
      --bigtableTableId="${TABLE_NAME}" \
      --importConfigFilePath=import-config-test.json \
      --stagingLocation="gs://${BUCKET}/dataflow/staging" \
      --tempLocation="gs://${BUCKET}/dataflow/temp" \
      --workerMachineType=n1-highmem-4 \
      --diskSizeGb=500 \
      --maxNumWorkers=10 \
      --region="${REGION}" \
      --serviceAccount="${SERVICE_ACCOUNT}" \
      --usePublicIps=false \
      --enableSnappy=true \
      --skipRestoreStep="${SKIP_RESTORE}" \
      --numShards="${NUM_SHARDS}" \
      --shardIndex="$i" \
      --jobName="${JOB}" \
      --network="${NETWORK}" \
      --subnetwork="${SUBNETWORK}"; then
    echo "Warning: shard $i failed; continuing with remaining shards."
    failed_shards="${failed_shards} $i"
  fi

  # Sequential within this script instance
done

if [ -n "${failed_shards}" ]; then
  echo "Error: the following shards failed:${failed_shards}"
  exit 1
fi
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import com.google.bigtable.repackaged.com.google.api.core.InternalApi;
import com.google.bigtable.repackaged.com.google.api.core.InternalExtensionOnly;
import com.google.cloud.bigtable.beam.hbasesnapshots.HBaseSnapshotRestoreTool;
import com.google.cloud.bigtable.beam.hbasesnapshots.ImportJobFromHbaseSnapshot;
import com.google.cloud.bigtable.beam.sequencefiles.CreateTableHelper;
import com.google.cloud.bigtable.beam.sequencefiles.ExportJob;
Expand Down Expand Up @@ -51,6 +52,9 @@ public static void main(String[] args) throws Exception {
case "importsnapshot":
ImportJobFromHbaseSnapshot.main(subArgs);
break;
case "restoresnapshot":
HBaseSnapshotRestoreTool.main(subArgs);
break;
case "create-table":
CreateTableHelper.main(subArgs);
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import com.google.cloud.bigtable.beam.sequencefiles.ImportJob.ImportOptions;
import com.google.cloud.bigtable.beam.validation.SyncTableJob.SyncTableOptions;
import com.google.cloud.bigtable.hbase.BigtableOptionsFactory;
import com.google.cloud.bigtable.hbase.wrappers.BigtableHBaseSettings;
import java.util.concurrent.TimeUnit;
import org.apache.beam.sdk.options.ValueProvider;

/**
Expand All @@ -44,7 +46,11 @@ public static CloudBigtableTableConfiguration buildImportConfig(
.withProjectId(opts.getBigtableProject())
.withInstanceId(opts.getBigtableInstanceId())
.withTableId(opts.getBigtableTableId())
.withConfiguration(BigtableOptionsFactory.CUSTOM_USER_AGENT_KEY, customUserAgent);
.withConfiguration(BigtableOptionsFactory.CUSTOM_USER_AGENT_KEY, customUserAgent)
.withConfiguration(BigtableOptionsFactory.MAX_INFLIGHT_RPCS_KEY, "100")
.withConfiguration(
BigtableHBaseSettings.BULK_MUTATION_CLOSE_TIMEOUT_MILLISECONDS,
Long.toString(TimeUnit.MINUTES.toMillis(30)));
if (opts.getBigtableAppProfileId() != null) {
builder.withAppProfileId(opts.getBigtableAppProfileId());
}
Expand Down
Loading
Loading