diff --git a/.github/workflows/integration-test.yml b/.github/workflows/integration-test.yml index 3a96e093e9..6eb1fb6ee5 100644 --- a/.github/workflows/integration-test.yml +++ b/.github/workflows/integration-test.yml @@ -106,11 +106,24 @@ jobs: "python3 integration_test/scripts/runner.py integration_test/seidb/state_store_test.yaml", ], }, + { + name: "EVM Module (FlatKV Import)", + scripts: [ + "docker exec sei-node-0 integration_test/contracts/deploy_flatkv_evm_fixture.sh", + "python3 integration_test/scripts/runner.py integration_test/seidb/flatkv_evm_test.yaml", + "./integration_test/contracts/import_flatkv_evm_cluster.sh", + "python3 integration_test/scripts/runner.py integration_test/seidb/flatkv_evm_test.yaml", + "docker exec sei-node-0 integration_test/contracts/verify_flatkv_evm_store.sh", + ], + }, { name: "EVM Module", env: "GIGA_STORAGE=true", scripts: [ "./integration_test/evm_module/scripts/evm_tests.sh", + "docker exec sei-node-0 integration_test/contracts/deploy_flatkv_evm_fixture.sh", + "python3 integration_test/scripts/runner.py integration_test/seidb/flatkv_evm_test.yaml", + "docker exec sei-node-0 integration_test/contracts/verify_flatkv_evm_store.sh", ] }, { diff --git a/integration_test/contracts/deploy_flatkv_evm_fixture.sh b/integration_test/contracts/deploy_flatkv_evm_fixture.sh new file mode 100755 index 0000000000..13de35ff86 --- /dev/null +++ b/integration_test/contracts/deploy_flatkv_evm_fixture.sh @@ -0,0 +1,206 @@ +#!/bin/bash + +set -euo pipefail + +export PATH="$PATH:/root/.foundry/bin:/root/go/bin" + +RPC_URL=${EVM_RPC_URL:-http://localhost:8545} +FROM=${FLATKV_EVM_FIXTURE_FROM:-admin} +PASSWORD=${FLATKV_EVM_FIXTURE_PASSWORD:-12345678} +CHAIN_ID=${FLATKV_EVM_FIXTURE_CHAIN_ID:-sei} +RECIPIENT_ADDR=${FLATKV_EVM_FIXTURE_RECIPIENT:-0x70997970C51812dc3A010C7d01b50e0d17dc79C8} +MISSING_ADDR=${FLATKV_EVM_FIXTURE_MISSING:-0xc1cadaffffffffffffffffffffffffffffffffff} +TRANSFER_VALUE_WEI=${FLATKV_EVM_FIXTURE_TRANSFER_VALUE_WEI:-1} 
+KEYRING_ARGS=() +if [ -n "${FLATKV_EVM_FIXTURE_KEYRING_BACKEND:-}" ]; then + KEYRING_ARGS+=(--keyring-backend "$FLATKV_EVM_FIXTURE_KEYRING_BACKEND") +fi + +# Constructor: +# sstore(0, 42) +# return runtime bytecode that returns 42 for any call. +STORAGE_CONTRACT_INIT_CODE=0x602a600055600a6011600039600a6000f3602a60005260206000f3 +STORAGE_SLOT_ZERO=0x0000000000000000000000000000000000000000000000000000000000000000 + +seihome=$(git rev-parse --show-toplevel) +out_dir="$seihome/integration_test/contracts" + +write_fixture() { + local name=$1 + local value=$2 + printf "%s\n" "$value" > "$out_dir/$name" +} + +run_seid() { + printf "%s\n" "$PASSWORD" | seid "$@" +} + +wait_for_evm_rpc() { + local timeout=120 + local elapsed=0 + until cast block-number --rpc-url "$RPC_URL" >/dev/null 2>&1; do + if [ "$elapsed" -ge "$timeout" ]; then + echo "EVM RPC did not become ready within ${timeout}s" >&2 + exit 1 + fi + sleep 2 + elapsed=$((elapsed + 2)) + done +} + +block_number() { + cast block-number --rpc-url "$RPC_URL" +} + +query_balance() { + cast balance "$1" --block "$2" --rpc-url "$RPC_URL" +} + +query_balance_hex() { + cast to-hex "$(query_balance "$1" "$2")" +} + +query_storage() { + cast storage "$1" "$2" --block "$3" --rpc-url "$RPC_URL" +} + +query_code() { + cast code "$1" --block "$2" --rpc-url "$RPC_URL" +} + +extract_tx_hash() { + grep -oE '0x[a-fA-F0-9]{64}' | head -1 +} + +wait_for_receipt() { + local tx_hash=$1 + local timeout=${2:-60} + local elapsed=0 + local response + + until [ "$elapsed" -ge "$timeout" ]; do + response=$(curl -s -X POST -H "Content-Type: application/json" \ + -d "{\"jsonrpc\":\"2.0\",\"id\":1,\"method\":\"eth_getTransactionReceipt\",\"params\":[\"$tx_hash\"]}" \ + "$RPC_URL" || true) + if printf "%s\n" "$response" | jq -e '.result != null' >/dev/null 2>&1; then + printf "%s\n" "$response" + return 0 + fi + sleep 1 + elapsed=$((elapsed + 1)) + done + + echo "Timed out waiting for EVM receipt $tx_hash" >&2 + return 1 +} + 
+require_success_receipt() { + local name=$1 + local receipt=$2 + local status + status=$(printf "%s\n" "$receipt" | jq -r '.result.status // empty') + if [ "$status" != "0x1" ] && [ "$status" != "1" ]; then + echo "FlatKV EVM $name failed:" >&2 + printf "%s\n" "$receipt" >&2 + exit 1 + fi +} + +echo "Generating FlatKV EVM historical fixture via $RPC_URL..." +wait_for_evm_rpc + +initial_height=$(block_number) +write_fixture "flatkv_evm_initial_block_height.txt" "$initial_height" +write_fixture "flatkv_evm_recipient_addr.txt" "$RECIPIENT_ADDR" +write_fixture "flatkv_evm_missing_addr.txt" "$MISSING_ADDR" +write_fixture "flatkv_evm_storage_slot.txt" "$STORAGE_SLOT_ZERO" + +run_seid tx evm associate-address \ + --from "$FROM" \ + "${KEYRING_ARGS[@]}" \ + --chain-id "$CHAIN_ID" \ + -b block \ + -y >/tmp/flatkv_evm_associate.out 2>&1 || true + +echo "Sending native EVM transfer to create/update recipient account..." +if ! transfer_out=$(run_seid tx evm send "$RECIPIENT_ADDR" "$TRANSFER_VALUE_WEI" \ + --from "$FROM" \ + "${KEYRING_ARGS[@]}" \ + --chain-id "$CHAIN_ID" \ + --evm-rpc "$RPC_URL" \ + -b sync \ + -y 2>&1); then + echo "FlatKV EVM transfer command failed:" >&2 + printf "%s\n" "$transfer_out" >&2 + exit 1 +fi +printf "%s\n" "$transfer_out" >/tmp/flatkv_evm_transfer.out +transfer_tx=$(printf "%s\n" "$transfer_out" | extract_tx_hash || true) +if [ -z "$transfer_tx" ]; then + echo "Failed to extract FlatKV EVM transfer tx hash:" >&2 + printf "%s\n" "$transfer_out" >&2 + exit 1 +fi +transfer_receipt=$(wait_for_receipt "$transfer_tx") +require_success_receipt "transfer" "$transfer_receipt" +printf "%s\n" "$transfer_receipt" >/tmp/flatkv_evm_transfer_receipt.json + +balance_height=$(block_number) +balance_expected=$(query_balance_hex "$RECIPIENT_ADDR" "$balance_height") +write_fixture "flatkv_evm_balance_block_height.txt" "$balance_height" +write_fixture "flatkv_evm_balance_expected.txt" "$balance_expected" + +echo "Deploying storage/code fixture contract..." 
+contract_hex_file=/tmp/flatkv_evm_storage_contract.hex +printf "%s" "${STORAGE_CONTRACT_INIT_CODE#0x}" > "$contract_hex_file" +if ! deploy_out=$(run_seid tx evm deploy "$contract_hex_file" \ + --from "$FROM" \ + "${KEYRING_ARGS[@]}" \ + --chain-id "$CHAIN_ID" \ + --evm-rpc "$RPC_URL" \ + -b sync \ + -y 2>&1); then + echo "FlatKV EVM deploy command failed:" >&2 + printf "%s\n" "$deploy_out" >&2 + exit 1 +fi +deploy_tx=$(printf "%s\n" "$deploy_out" | extract_tx_hash || true) +if [ -z "$deploy_tx" ]; then + echo "Failed to extract FlatKV EVM deploy tx hash:" >&2 + printf "%s\n" "$deploy_out" >&2 + exit 1 +fi +deploy_receipt=$(wait_for_receipt "$deploy_tx") +require_success_receipt "contract deployment" "$deploy_receipt" +printf "%s\n" "$deploy_receipt" > "$out_dir/flatkv_evm_deploy_receipt.json" + +contract_addr=$(printf "%s\n" "$deploy_receipt" | jq -r '.result.contractAddress // empty') +if [ -z "$contract_addr" ] || [ "$contract_addr" = "null" ]; then + contract_addr=$(printf "%s\n" "$deploy_out" | sed -n 's/^Deployed to: //p' | tail -1) +fi +if [ -z "$contract_addr" ] || [ "$contract_addr" = "null" ]; then + echo "Failed to extract contract address from deploy receipt:" >&2 + printf "%s\n" "$deploy_receipt" >&2 + exit 1 +fi + +contract_height=$(block_number) +storage_expected=$(query_storage "$contract_addr" "$STORAGE_SLOT_ZERO" "$contract_height") +code_expected=$(query_code "$contract_addr" "$contract_height") + +write_fixture "flatkv_evm_contract_addr.txt" "$contract_addr" +write_fixture "flatkv_evm_contract_block_height.txt" "$contract_height" +write_fixture "flatkv_evm_storage_expected.txt" "$storage_expected" +write_fixture "flatkv_evm_code_expected.txt" "$code_expected" + +missing_balance_expected=$(query_balance_hex "$MISSING_ADDR" "$contract_height") +missing_storage_expected=$(query_storage "$MISSING_ADDR" "$STORAGE_SLOT_ZERO" "$contract_height") +write_fixture "flatkv_evm_missing_balance_expected.txt" "$missing_balance_expected" +write_fixture 
"flatkv_evm_missing_storage_expected.txt" "$missing_storage_expected" + +latest_height=$(block_number) +write_fixture "flatkv_evm_latest_fixture_block_height.txt" "$latest_height" + +echo "FlatKV EVM fixture generated:" +echo " recipient=$RECIPIENT_ADDR balance_height=$balance_height balance=$balance_expected" +echo " contract=$contract_addr contract_height=$contract_height storage=$storage_expected" diff --git a/integration_test/contracts/import_flatkv_evm_cluster.sh b/integration_test/contracts/import_flatkv_evm_cluster.sh new file mode 100755 index 0000000000..7ab07ed8d8 --- /dev/null +++ b/integration_test/contracts/import_flatkv_evm_cluster.sh @@ -0,0 +1,145 @@ +#!/bin/bash + +set -euo pipefail + +PROJECT_ROOT=$(git rev-parse --show-toplevel) +NODE_COUNT=${FLATKV_EVM_IMPORT_NODE_COUNT:-4} + +dump_node_log() { + local node=$1 + echo "==================== ${node} seid log (last 200 lines) ====================" >&2 + local node_id=${node#sei-node-} + docker exec "$node" tail -200 "/sei-protocol/sei-chain/build/generated/logs/seid-${node_id}.log" >&2 || true + echo "==================== ${node} end log ====================" >&2 +} + +wait_for_height() { + local min_height=$1 + local timeout=${2:-180} + local elapsed=0 + local height=0 + + until [ "$elapsed" -ge "$timeout" ]; do + height=$(docker exec sei-node-0 build/seid status 2>/dev/null | jq -r ".SyncInfo.latest_block_height // 0" || echo 0) + if [ "$height" -gt "$min_height" ]; then + echo "sei-node-0 reached height $height" + return 0 + fi + echo "Still waiting for sei-node-0 to advance past height $min_height (height=$height elapsed=${elapsed}s/${timeout}s)" + sleep 5 + elapsed=$((elapsed + 5)) + done + + echo "Timed out waiting for sei-node-0 to advance past height $min_height (last height: $height)" >&2 + for i in $(seq 0 $((NODE_COUNT - 1))); do + dump_node_log "sei-node-$i" + done + return 1 +} + +# wait_for_evm_rpc polls each node's EVM HTTP endpoint until it responds, so +# the post-restart 
flatkv_evm_test.yaml run can't race the seid restart and +# hit connection refused on http://localhost:8545. Tendermint typically +# advances a height or two before the in-process EVM RPC server finishes +# binding 8545, so wait_for_height alone is not a sufficient readiness gate +# for the next test phase. +wait_for_evm_rpc() { + local timeout=${1:-120} + for i in $(seq 0 $((NODE_COUNT - 1))); do + local node="sei-node-$i" + local elapsed=0 + until docker exec "$node" bash -lc 'curl -sf -o /dev/null -X POST -H "Content-Type: application/json" -d "{\"jsonrpc\":\"2.0\",\"id\":1,\"method\":\"eth_blockNumber\",\"params\":[]}" http://localhost:8545'; do + if [ "$elapsed" -ge "$timeout" ]; then + echo "EVM RPC on $node did not become ready within ${timeout}s after restart" >&2 + dump_node_log "$node" + return 1 + fi + echo "Waiting for EVM RPC on $node (elapsed=${elapsed}s/${timeout}s)" + sleep 3 + elapsed=$((elapsed + 3)) + done + echo "EVM RPC on $node is responding" + done +} + +echo "Building seidb import tool..." +# Go lives at /usr/local/go/bin/go in the container (see docker/localnode/Dockerfile) +# but is not on the default PATH for non-interactive shells, so call it absolutely. +GO_BIN=${GO_BIN:-/usr/local/go/bin/go} +docker exec -e GOPROXY="${GOPROXY:-https://proxy.golang.org,direct}" sei-node-0 bash -c "cd /sei-protocol/sei-chain && $GO_BIN build -o build/seidb ./sei-db/tools/cmd/seidb" + +start_height=$(docker exec sei-node-0 build/seid status | jq -r ".SyncInfo.latest_block_height") +echo "Stopping seid processes at height $start_height..." +for i in $(seq 0 $((NODE_COUNT - 1))); do + docker exec "sei-node-$i" pkill -f "seid start" >/dev/null 2>&1 || true +done + +echo "Waiting for seid processes to stop..." +for i in $(seq 0 $((NODE_COUNT - 1))); do + stopped=false + for _ in $(seq 1 30); do + if ! 
docker exec "sei-node-$i" pgrep -f "seid start" >/dev/null 2>&1; then + stopped=true + break + fi + sleep 1 + done + if [ "$stopped" != "true" ]; then + echo "sei-node-$i did not stop within 30s" >&2 + exit 1 + fi +done + +echo "Importing evm module from memiavl into FlatKV on all validators..." +for i in $(seq 0 $((NODE_COUNT - 1))); do + docker exec "sei-node-$i" bash -lc "cd /sei-protocol/sei-chain && build/seidb import-flatkv-from-memiavl --modules=evm --data-dir /root/.sei/data" +done + +echo "Applying GIGA_STORAGE config and restarting seid processes..." +for i in $(seq 0 $((NODE_COUNT - 1))); do + docker exec -e GIGA_STORAGE=true "sei-node-$i" /usr/bin/config_override.sh + # The import tool moves only SC-layer EVM data into FlatKV. SS history + # for EVM stays in the existing combined cosmos pebbledb, so we must keep + # evm-ss-split=false to avoid the rootmulti startup panic: + # "EVM SS directory ... does not exist but Cosmos SS already has history". + # Switching the SS layer to split mode mid-life requires a separate state-sync + # workflow which is out of scope for this SC import test. + docker exec "sei-node-$i" sed -i 's/evm-ss-split = true/evm-ss-split = false/' /root/.sei/config/app.toml + # Lattice hash must also stay off across the import boundary. Pre-import + # the chain ran without FlatKV, so tendermint persisted AppHash = memiavl-only + # for all blocks up to the import height. Turning sc-enable-lattice-hash + # on now would fold the FlatKV LtHash into the AppHash and the replay check + # at startup would fail with "state.AppHash does not match AppHash after replay". + # dual_write does not require lattice hash (see sei-db/config/toml_test.go); + # only split_write does. A real production rollout would coordinate this + # transition via a chain upgrade at an agreed height. 
+ docker exec "sei-node-$i" sed -i 's/sc-enable-lattice-hash = true/sc-enable-lattice-hash = false/' /root/.sei/config/app.toml +done +# `docker exec -d` is required: start_sei.sh backgrounds seid then exits, and a +# non-detached docker exec session would close stdout/stderr, killing seid. +# See integration_test/autobahn/autobahn_test.go::restartNode for the precedent. +for i in $(seq 0 $((NODE_COUNT - 1))); do + docker exec -d -e "ID=$i" "sei-node-$i" /usr/bin/start_sei.sh +done + +# Confirm each seid actually came up before waiting on block production, so a +# crash on startup is reported promptly instead of after the 4 minute timeout. +sleep 5 +for i in $(seq 0 $((NODE_COUNT - 1))); do + if ! docker exec "sei-node-$i" pgrep -f "seid start" >/dev/null 2>&1; then + echo "ERROR: sei-node-$i did not stay running after restart" >&2 + dump_node_log "sei-node-$i" + exit 1 + fi +done + +wait_for_height "$start_height" 240 + +# Tendermint advancing past start_height does NOT imply the in-process EVM +# RPC HTTP server has finished binding 8545. The downstream +# integration_test/seidb/flatkv_evm_test.yaml docker-execs `cast` against +# http://localhost:8545; gate on that endpoint explicitly so it can't race +# the seid restart. 
+wait_for_evm_rpc 120 + +echo "FlatKV EVM import completed for $NODE_COUNT validators in $PROJECT_ROOT" diff --git a/integration_test/contracts/verify_flatkv_evm_store.sh b/integration_test/contracts/verify_flatkv_evm_store.sh new file mode 100755 index 0000000000..cbd520bc06 --- /dev/null +++ b/integration_test/contracts/verify_flatkv_evm_store.sh @@ -0,0 +1,67 @@ +#!/bin/bash + +set -euo pipefail + +export PATH="$PATH:/root/go/bin:/usr/local/go/bin" + +seihome=$(git rev-parse --show-toplevel) +flatkv_dir=${FLATKV_DIR:-/root/.sei/data/state_commit/flatkv} +dump_dir=${FLATKV_EVM_DUMP_DIR:-/tmp/flatkv-evm-dump} +storage_dump="$dump_dir/storage" +contract_addr_file="$seihome/integration_test/contracts/flatkv_evm_contract_addr.txt" + +cd "$seihome" + +if [ ! -x build/seidb ]; then + echo "Building seidb for FlatKV smoke verification..." + GOPROXY=${GOPROXY:-https://proxy.golang.org,direct} go build -o build/seidb ./sei-db/tools/cmd/seidb +fi + +rm -rf "$dump_dir" +mkdir -p "$dump_dir" + +echo "Dumping FlatKV storage bucket from $flatkv_dir..." +build/seidb dump-flatkv --db-dir "$flatkv_dir" --output-dir "$dump_dir" --bucket storage + +if [ ! -s "$storage_dump" ]; then + echo "FlatKV storage dump is missing or empty: $storage_dump" >&2 + exit 1 +fi + +if ! grep -q '^Key:' "$storage_dump"; then + echo "FlatKV storage dump has no key/value rows: $storage_dump" >&2 + exit 1 +fi + +if [ ! -s "$contract_addr_file" ]; then + echo "Missing FlatKV EVM fixture contract address: $contract_addr_file" >&2 + exit 1 +fi + +contract_hex=$(tail -1 "$contract_addr_file") +contract_hex=${contract_hex#0x} +contract_hex=$(printf "%s" "$contract_hex" | tr '[:lower:]' '[:upper:]') +if [ -z "$contract_hex" ]; then + echo "FlatKV EVM fixture contract address is empty: $contract_addr_file" >&2 + exit 1 +fi + +if ! grep -q "$contract_hex" "$storage_dump"; then + echo "FlatKV storage dump does not contain fixture contract address $contract_hex: $storage_dump" >&2 + exit 1 +fi + +if ! 
awk ' + /^Key:/ { + split($0, parts, "Value: ") + if (length(parts[2]) == 82) { + found = 1 + } + } + END { exit found ? 0 : 1 } +' "$storage_dump"; then + echo "FlatKV storage dump has no row with a 41-byte serialized storage value: $storage_dump" >&2 + exit 1 +fi + +echo "FlatKV storage bucket smoke verification passed: $storage_dump" diff --git a/integration_test/seidb/flatkv_evm_test.yaml b/integration_test/seidb/flatkv_evm_test.yaml new file mode 100644 index 0000000000..4cf716837f --- /dev/null +++ b/integration_test/seidb/flatkv_evm_test.yaml @@ -0,0 +1,95 @@ +- name: Test FlatKV EVM historical balance query + inputs: + - cmd: tail -1 integration_test/contracts/flatkv_evm_recipient_addr.txt + env: RECIPIENT_ADDR + - cmd: tail -1 integration_test/contracts/flatkv_evm_balance_block_height.txt + env: BALANCE_HEIGHT + - cmd: tail -1 integration_test/contracts/flatkv_evm_balance_expected.txt + env: EXPECTED_BALANCE + - cmd: cast to-hex $(cast balance $RECIPIENT_ADDR --block $BALANCE_HEIGHT --rpc-url http://localhost:8545) + env: ACTUAL_BALANCE + verifiers: + - type: eval + expr: ACTUAL_BALANCE == EXPECTED_BALANCE + +- name: Test FlatKV EVM historical storage query + inputs: + - cmd: tail -1 integration_test/contracts/flatkv_evm_contract_addr.txt + env: CONTRACT_ADDR + - cmd: tail -1 integration_test/contracts/flatkv_evm_storage_slot.txt + env: STORAGE_SLOT + - cmd: tail -1 integration_test/contracts/flatkv_evm_contract_block_height.txt + env: CONTRACT_HEIGHT + - cmd: tail -1 integration_test/contracts/flatkv_evm_storage_expected.txt + env: EXPECTED_STORAGE + - cmd: cast storage $CONTRACT_ADDR $STORAGE_SLOT --block $CONTRACT_HEIGHT --rpc-url http://localhost:8545 + env: ACTUAL_STORAGE + verifiers: + - type: eval + expr: ACTUAL_STORAGE == EXPECTED_STORAGE + +- name: Test FlatKV EVM historical code query + inputs: + - cmd: tail -1 integration_test/contracts/flatkv_evm_contract_addr.txt + env: CONTRACT_ADDR + - cmd: tail -1 
integration_test/contracts/flatkv_evm_contract_block_height.txt + env: CONTRACT_HEIGHT + - cmd: tail -1 integration_test/contracts/flatkv_evm_code_expected.txt + env: EXPECTED_CODE + - cmd: cast code $CONTRACT_ADDR --block $CONTRACT_HEIGHT --rpc-url http://localhost:8545 + env: ACTUAL_CODE + verifiers: + - type: eval + expr: ACTUAL_CODE == EXPECTED_CODE + +- name: Test FlatKV EVM missing account and storage queries + inputs: + - cmd: tail -1 integration_test/contracts/flatkv_evm_missing_addr.txt + env: MISSING_ADDR + - cmd: tail -1 integration_test/contracts/flatkv_evm_storage_slot.txt + env: STORAGE_SLOT + - cmd: tail -1 integration_test/contracts/flatkv_evm_contract_block_height.txt + env: CONTRACT_HEIGHT + - cmd: tail -1 integration_test/contracts/flatkv_evm_missing_balance_expected.txt + env: EXPECTED_MISSING_BALANCE + - cmd: tail -1 integration_test/contracts/flatkv_evm_missing_storage_expected.txt + env: EXPECTED_MISSING_STORAGE + - cmd: cast to-hex $(cast balance $MISSING_ADDR --block $CONTRACT_HEIGHT --rpc-url http://localhost:8545) + env: ACTUAL_MISSING_BALANCE + - cmd: cast storage $MISSING_ADDR $STORAGE_SLOT --block $CONTRACT_HEIGHT --rpc-url http://localhost:8545 + env: ACTUAL_MISSING_STORAGE + verifiers: + - type: eval + expr: ACTUAL_MISSING_BALANCE == EXPECTED_MISSING_BALANCE + - type: eval + expr: ACTUAL_MISSING_STORAGE == EXPECTED_MISSING_STORAGE + +- name: Test FlatKV EVM latest query still sees migrated contract fixture + inputs: + - cmd: tail -1 integration_test/contracts/flatkv_evm_contract_addr.txt + env: CONTRACT_ADDR + - cmd: tail -1 integration_test/contracts/flatkv_evm_storage_slot.txt + env: STORAGE_SLOT + - cmd: tail -1 integration_test/contracts/flatkv_evm_storage_expected.txt + env: EXPECTED_STORAGE + - cmd: tail -1 integration_test/contracts/flatkv_evm_code_expected.txt + env: EXPECTED_CODE + - cmd: cast storage $CONTRACT_ADDR $STORAGE_SLOT --block latest --rpc-url http://localhost:8545 + env: ACTUAL_LATEST_STORAGE + - cmd: cast code 
$CONTRACT_ADDR --block latest --rpc-url http://localhost:8545 + env: ACTUAL_LATEST_CODE + verifiers: + - type: eval + expr: ACTUAL_LATEST_STORAGE == EXPECTED_STORAGE + - type: eval + expr: ACTUAL_LATEST_CODE == EXPECTED_CODE + +- name: Test non-EVM module remains queryable + inputs: + - cmd: printf "12345678\n" | seid keys list --output json | jq -r ".[0].address" + env: SEI_ADDR + - cmd: seid q bank balances $SEI_ADDR --output json | jq -r ".balances | length" + env: BANK_BALANCE_COUNT + verifiers: + - type: eval + expr: BANK_BALANCE_COUNT > 0 diff --git a/sei-db/state_db/sc/flatkv/import_translator.go b/sei-db/state_db/sc/flatkv/import_translator.go new file mode 100644 index 0000000000..2e3055cc14 --- /dev/null +++ b/sei-db/state_db/sc/flatkv/import_translator.go @@ -0,0 +1,169 @@ +package flatkv + +import ( + "fmt" + + "github.com/sei-protocol/sei-chain/sei-db/common/keys" + "github.com/sei-protocol/sei-chain/sei-db/proto" + "github.com/sei-protocol/sei-chain/sei-db/state_db/sc/flatkv/vtype" +) + +// PhysicalKVPair is a (physical_key, serialized_value) pair already encoded +// in FlatKV's on-disk layout, ready for direct insertion into KVImporter +// (e.g. via types.SnapshotNode). +type PhysicalKVPair struct { + Key []byte + Value []byte +} + +// ImportTranslator converts raw EVM/non-EVM changesets into physically-encoded +// pairs ready for FlatKV bulk import. +// +// It applies the same translation logic that CommitStore.ApplyChangeSets uses +// (classifyAndPrefix + processStorageChanges + processCodeChanges + +// processLegacyChanges + mergeAccountUpdates), but assumes the import target +// is empty so it does not merge with prior DB values. +// +// Storage / code / legacy / non-EVM pairs are emitted directly from each +// Translate call. Account-related entries (nonce, codehash) are buffered +// across all Translate calls so that each address is written exactly once +// with its fully-merged AccountData; flush them by calling Finalize. 
+// +// Deletes are dropped: importing into a fresh store has no prior values to +// remove. +// +// ImportTranslator is not safe for concurrent use. +type ImportTranslator struct { + blockHeight int64 + pendingAccts map[string]*vtype.PendingAccountWrite +} + +// NewImportTranslator creates a translator that stamps blockHeight onto every +// emitted value. blockHeight should match the memiavl version that the import +// is sourced from. +func NewImportTranslator(blockHeight int64) *ImportTranslator { + return &ImportTranslator{ + blockHeight: blockHeight, + pendingAccts: make(map[string]*vtype.PendingAccountWrite), + } +} + +// Translate returns the storage / code / legacy / non-EVM physical pairs +// encoded from cs. Account fragments (nonce, codehash) are buffered +// internally; flush them via Finalize after all changesets have been fed in. +// +// nil or empty changesets return (nil, nil). +func (t *ImportTranslator) Translate(cs *proto.NamedChangeSet) ([]PhysicalKVPair, error) { + if cs == nil || len(cs.Changeset.Pairs) == 0 { + return nil, nil + } + + // Drop deletes up front: import targets an empty store, so deleting a + // non-existent key is a no-op. This also keeps mergeAccountUpdates + // from interpreting nil values as "set field to zero". 
+ filteredPairs := make([]*proto.KVPair, 0, len(cs.Changeset.Pairs)) + for _, p := range cs.Changeset.Pairs { + if p == nil || p.Delete { + continue + } + filteredPairs = append(filteredPairs, p) + } + if len(filteredPairs) == 0 { + return nil, nil + } + filteredCS := &proto.NamedChangeSet{ + Name: cs.Name, + Changeset: proto.ChangeSet{Pairs: filteredPairs}, + } + + changesByType, err := classifyAndPrefix([]*proto.NamedChangeSet{filteredCS}) + if err != nil { + return nil, err + } + + out := make([]PhysicalKVPair, 0, len(filteredPairs)) + + storageChanges, err := processStorageChanges(changesByType[keys.EVMKeyStorage], t.blockHeight) + if err != nil { + return nil, fmt.Errorf("failed to process storage changes: %w", err) + } + for k, v := range storageChanges { + if v.IsDelete() { + continue + } + out = append(out, PhysicalKVPair{Key: []byte(k), Value: v.Serialize()}) + } + + codeChanges, err := processCodeChanges(changesByType[keys.EVMKeyCode], t.blockHeight) + if err != nil { + return nil, fmt.Errorf("failed to process code changes: %w", err) + } + for k, v := range codeChanges { + if v.IsDelete() { + continue + } + out = append(out, PhysicalKVPair{Key: []byte(k), Value: v.Serialize()}) + } + + legacyChanges, err := processLegacyChanges(changesByType[keys.EVMKeyLegacy], t.blockHeight) + if err != nil { + return nil, fmt.Errorf("failed to process legacy changes: %w", err) + } + for k, v := range legacyChanges { + if v.IsDelete() { + continue + } + out = append(out, PhysicalKVPair{Key: []byte(k), Value: v.Serialize()}) + } + + // Accumulate nonce + codeHash entries from this batch into the + // translator-level pending account map. Multiple Translate calls + // naturally fold updates for the same address together: the SetXxx + // methods on PendingAccountWrite mutate the pointer in place when the + // receiver is non-nil. 
+ batchAccts, err := mergeAccountUpdates( + changesByType[keys.EVMKeyNonce], + changesByType[keys.EVMKeyCodeHash], + nil, // TODO: balance, when balance key kind is introduced + ) + if err != nil { + return nil, fmt.Errorf("failed to merge account changes: %w", err) + } + for addr, batchUpdate := range batchAccts { + existing, ok := t.pendingAccts[addr] + if !ok || existing == nil { + t.pendingAccts[addr] = batchUpdate + continue + } + if batchUpdate.IsNonceSet() { + existing.SetNonce(batchUpdate.GetNonce()) + } + if batchUpdate.IsCodeHashSet() { + existing.SetCodeHash(batchUpdate.GetCodeHash()) + } + if batchUpdate.IsBalanceSet() { + existing.SetBalance(batchUpdate.GetBalance()) + } + } + + return out, nil +} + +// Finalize flushes the buffered account writes as physically-encoded pairs. +// Each accumulated address is merged into a fresh AccountData (no base, since +// the import target is empty) and serialized. +// +// Call once after all Translate calls. Translate must not be called after +// Finalize. 
+func (t *ImportTranslator) Finalize() []PhysicalKVPair { + out := make([]PhysicalKVPair, 0, len(t.pendingAccts)) + for addr, pending := range t.pendingAccts { + merged := pending.Merge(nil, t.blockHeight) + if merged.IsDelete() { + continue + } + out = append(out, PhysicalKVPair{Key: []byte(addr), Value: merged.Serialize()}) + } + t.pendingAccts = nil + return out +} diff --git a/sei-db/state_db/sc/flatkv/import_translator_test.go b/sei-db/state_db/sc/flatkv/import_translator_test.go new file mode 100644 index 0000000000..20282d83ae --- /dev/null +++ b/sei-db/state_db/sc/flatkv/import_translator_test.go @@ -0,0 +1,297 @@ +package flatkv + +import ( + "testing" + + "github.com/sei-protocol/sei-chain/sei-db/common/keys" + "github.com/sei-protocol/sei-chain/sei-db/proto" + "github.com/sei-protocol/sei-chain/sei-db/state_db/sc/flatkv/ktype" + "github.com/sei-protocol/sei-chain/sei-db/state_db/sc/flatkv/vtype" + "github.com/stretchr/testify/require" +) + +const importBlockHeight = int64(42) + +func findPair(t *testing.T, pairs []PhysicalKVPair, key []byte) PhysicalKVPair { + t.Helper() + for _, p := range pairs { + if string(p.Key) == string(key) { + return p + } + } + t.Fatalf("pair with key %x not found", key) + return PhysicalKVPair{} +} + +func TestImportTranslator_NilOrEmptyChangeSet(t *testing.T) { + tr := NewImportTranslator(importBlockHeight) + + pairs, err := tr.Translate(nil) + require.NoError(t, err) + require.Empty(t, pairs) + + emptyCS := &proto.NamedChangeSet{Name: keys.EVMStoreKey} + pairs, err = tr.Translate(emptyCS) + require.NoError(t, err) + require.Empty(t, pairs) + + require.Empty(t, tr.Finalize()) +} + +func TestImportTranslator_StorageEntry(t *testing.T) { + addr := addrN(0x42) + slot := slotN(0x07) + val := padLeft32(0x2A) + + tr := NewImportTranslator(importBlockHeight) + pairs, err := tr.Translate(namedCS(storagePair(addr, slot, []byte{0x2A}))) + require.NoError(t, err) + require.Len(t, pairs, 1) + + expectedKey := storagePhysKey(addr, slot) + 
require.Equal(t, expectedKey, pairs[0].Key) + + got, err := vtype.DeserializeStorageData(pairs[0].Value) + require.NoError(t, err) + require.Equal(t, importBlockHeight, got.GetBlockHeight()) + require.Equal(t, val, got.GetValue()[:]) + require.False(t, got.IsDelete()) + + require.Empty(t, tr.Finalize()) +} + +func TestImportTranslator_CodeEntry(t *testing.T) { + addr := addrN(0x42) + bytecode := []byte{0x60, 0x2A, 0x60, 0x00, 0x52, 0x60, 0x20, 0x60, 0x00, 0xF3} + + tr := NewImportTranslator(importBlockHeight) + pairs, err := tr.Translate(namedCS(codePair(addr, bytecode))) + require.NoError(t, err) + require.Len(t, pairs, 1) + + expectedKey := ktype.EVMPhysicalKey(keys.EVMKeyCode, addr[:]) + require.Equal(t, expectedKey, pairs[0].Key) + + got, err := vtype.DeserializeCodeData(pairs[0].Value) + require.NoError(t, err) + require.Equal(t, importBlockHeight, got.GetBlockHeight()) + require.Equal(t, bytecode, got.GetBytecode()) + + require.Empty(t, tr.Finalize()) +} + +func TestImportTranslator_LegacyEntryWithinEVMModule(t *testing.T) { + addr := addrN(0x42) + rawKey := append([]byte{0x09}, addr[:]...) 
+ rawValue := []byte{0xAA, 0xBB} + + tr := NewImportTranslator(importBlockHeight) + cs := &proto.NamedChangeSet{ + Name: keys.EVMStoreKey, + Changeset: proto.ChangeSet{Pairs: []*proto.KVPair{ + {Key: rawKey, Value: rawValue}, + }}, + } + pairs, err := tr.Translate(cs) + require.NoError(t, err) + require.Len(t, pairs, 1) + + expectedKey := ktype.ModulePhysicalKey(keys.EVMStoreKey, rawKey) + require.Equal(t, expectedKey, pairs[0].Key) + + got, err := vtype.DeserializeLegacyData(pairs[0].Value) + require.NoError(t, err) + require.Equal(t, importBlockHeight, got.GetBlockHeight()) + require.Equal(t, rawValue, got.GetValue()) + require.False(t, got.IsDelete()) + + require.Empty(t, tr.Finalize()) +} + +func TestImportTranslator_NonEVMModuleRoutesToLegacy(t *testing.T) { + rawKey := []byte("custom-key") + rawValue := []byte("custom-value") + + tr := NewImportTranslator(importBlockHeight) + cs := &proto.NamedChangeSet{ + Name: "bank", + Changeset: proto.ChangeSet{Pairs: []*proto.KVPair{ + {Key: rawKey, Value: rawValue}, + }}, + } + pairs, err := tr.Translate(cs) + require.NoError(t, err) + require.Len(t, pairs, 1) + + expectedKey := ktype.ModulePhysicalKey("bank", rawKey) + require.Equal(t, expectedKey, pairs[0].Key) + + got, err := vtype.DeserializeLegacyData(pairs[0].Value) + require.NoError(t, err) + require.Equal(t, rawValue, got.GetValue()) + + require.Empty(t, tr.Finalize()) +} + +func TestImportTranslator_NonceOnlyAccountEmittedByFinalize(t *testing.T) { + addr := addrN(0x42) + + tr := NewImportTranslator(importBlockHeight) + pairs, err := tr.Translate(namedCS(noncePair(addr, 7))) + require.NoError(t, err) + require.Empty(t, pairs, "account fragments must be buffered, not emitted by Translate") + + finalized := tr.Finalize() + require.Len(t, finalized, 1) + + expectedKey := accountPhysKey(addr) + require.Equal(t, expectedKey, finalized[0].Key) + + got, err := vtype.DeserializeAccountData(finalized[0].Value) + require.NoError(t, err) + require.Equal(t, uint64(7), 
got.GetNonce()) + require.Equal(t, importBlockHeight, got.GetBlockHeight()) + + var zero vtype.CodeHash + require.Equal(t, zero, *got.GetCodeHash(), "code hash must default to zero for EOA") +} + +func TestImportTranslator_CodeHashOnlyAccountEmittedByFinalize(t *testing.T) { + addr := addrN(0x44) + ch := codeHashN(0xCD) + + tr := NewImportTranslator(importBlockHeight) + pairs, err := tr.Translate(namedCS(codeHashPair(addr, ch))) + require.NoError(t, err) + require.Empty(t, pairs) + + finalized := tr.Finalize() + require.Len(t, finalized, 1) + + got, err := vtype.DeserializeAccountData(finalized[0].Value) + require.NoError(t, err) + require.Equal(t, ch, *got.GetCodeHash()) + require.Equal(t, uint64(0), got.GetNonce(), "nonce must default to zero") +} + +func TestImportTranslator_NonceAndCodeHashSameCallMerge(t *testing.T) { + addr := addrN(0x42) + ch := codeHashN(0xAB) + + tr := NewImportTranslator(importBlockHeight) + pairs, err := tr.Translate(namedCS( + noncePair(addr, 9), + codeHashPair(addr, ch), + )) + require.NoError(t, err) + require.Empty(t, pairs) + + finalized := tr.Finalize() + require.Len(t, finalized, 1) + + got, err := vtype.DeserializeAccountData(finalized[0].Value) + require.NoError(t, err) + require.Equal(t, uint64(9), got.GetNonce()) + require.Equal(t, ch, *got.GetCodeHash()) +} + +func TestImportTranslator_NonceAndCodeHashCrossCallMerge(t *testing.T) { + addr := addrN(0x42) + ch := codeHashN(0xAB) + + tr := NewImportTranslator(importBlockHeight) + _, err := tr.Translate(namedCS(noncePair(addr, 9))) + require.NoError(t, err) + + _, err = tr.Translate(namedCS(codeHashPair(addr, ch))) + require.NoError(t, err) + + finalized := tr.Finalize() + require.Len(t, finalized, 1, "fragments split across calls must merge into one account") + + got, err := vtype.DeserializeAccountData(finalized[0].Value) + require.NoError(t, err) + require.Equal(t, uint64(9), got.GetNonce()) + require.Equal(t, ch, *got.GetCodeHash()) +} + +func 
TestImportTranslator_DropsDeletes(t *testing.T) { + addr := addrN(0x42) + slot := slotN(0x01) + + tr := NewImportTranslator(importBlockHeight) + pairs, err := tr.Translate(namedCS( + storageDeletePair(addr, slot), + codeDeletePair(addr), + nonceDeletePair(addr), + codeHashDeletePair(addr), + )) + require.NoError(t, err) + require.Empty(t, pairs) + require.Empty(t, tr.Finalize(), "deletes must not produce any account either") +} + +func TestImportTranslator_RejectsEmptyKey(t *testing.T) { + tr := NewImportTranslator(importBlockHeight) + cs := &proto.NamedChangeSet{ + Name: keys.EVMStoreKey, + Changeset: proto.ChangeSet{Pairs: []*proto.KVPair{ + {Key: nil, Value: []byte{0x01}}, + }}, + } + _, err := tr.Translate(cs) + require.Error(t, err) +} + +func TestImportTranslator_RejectsInvalidNonce(t *testing.T) { + addr := addrN(0x42) + tr := NewImportTranslator(importBlockHeight) + cs := &proto.NamedChangeSet{ + Name: keys.EVMStoreKey, + Changeset: proto.ChangeSet{Pairs: []*proto.KVPair{ + {Key: keys.BuildEVMKey(keys.EVMKeyNonce, addr[:]), Value: []byte{0x01, 0x02}}, + }}, + } + _, err := tr.Translate(cs) + require.Error(t, err) + require.Contains(t, err.Error(), "nonce") +} + +func TestImportTranslator_StorageAndAccountInOneCall(t *testing.T) { + addr := addrN(0x42) + slot := slotN(0x07) + ch := codeHashN(0xAB) + + tr := NewImportTranslator(importBlockHeight) + pairs, err := tr.Translate(namedCS( + storagePair(addr, slot, []byte{0x2A}), + noncePair(addr, 7), + codeHashPair(addr, ch), + )) + require.NoError(t, err) + require.Len(t, pairs, 1, "storage emitted immediately; account fragments buffered") + + storagePair := findPair(t, pairs, storagePhysKey(addr, slot)) + storageGot, err := vtype.DeserializeStorageData(storagePair.Value) + require.NoError(t, err) + require.Equal(t, padLeft32(0x2A), storageGot.GetValue()[:]) + + finalized := tr.Finalize() + require.Len(t, finalized, 1) + acctGot, err := vtype.DeserializeAccountData(finalized[0].Value) + require.NoError(t, err) + 
require.Equal(t, uint64(7), acctGot.GetNonce()) + require.Equal(t, ch, *acctGot.GetCodeHash()) +} + +func TestImportTranslator_FinalizeClearsBuffer(t *testing.T) { + addr := addrN(0x42) + tr := NewImportTranslator(importBlockHeight) + _, err := tr.Translate(namedCS(noncePair(addr, 1))) + require.NoError(t, err) + + first := tr.Finalize() + require.Len(t, first, 1) + second := tr.Finalize() + require.Empty(t, second, "Finalize must be idempotent on an exhausted translator") +} diff --git a/sei-db/state_db/sc/flatkv/importer.go b/sei-db/state_db/sc/flatkv/importer.go index aef128b359..e3bb23a66a 100644 --- a/sei-db/state_db/sc/flatkv/importer.go +++ b/sei-db/state_db/sc/flatkv/importer.go @@ -2,6 +2,7 @@ package flatkv import ( "context" + "errors" "fmt" "sync" "sync/atomic" @@ -21,6 +22,12 @@ const ( var _ types.Importer = (*KVImporter)(nil) +// flushHookForTest, when set by tests in this package, is invoked at the +// start of every dbWorker flush. It exists solely for whitebox tests of +// the backpressure / fail-fast paths (see importer_test.go) and is nil in +// production. The cost on the hot path is a single nil load per flush. +var flushHookForTest func(dir string) + // dbWorker owns a single PebbleDB and its LtHash accumulation. It reads // key/value pairs from its channel, buffers them into a PebbleDB batch, // and flushes (commit + LtHash update) when the buffer is full or the @@ -87,6 +94,9 @@ func (w *dbWorker) flush() (err error) { if len(w.ltPairs) == 0 { return nil } + if hook := flushHookForTest; hook != nil { + hook(w.dir) + } start := time.Now() pairCount := len(w.ltPairs) defer func() { @@ -125,9 +135,11 @@ type KVImporter struct { // done is closed on the first pipeline error so that AddNode, // the dispatcher, and all workers bail immediately. 
- done chan struct{} - closeOnce sync.Once - firstErr atomic.Pointer[error] + done chan struct{} + closeOnce sync.Once + firstErr atomic.Pointer[error] + finishOnce sync.Once + finishErr error } func NewKVImporter(store *CommitStore, version int64) types.Importer { @@ -214,6 +226,10 @@ func (imp *KVImporter) getErr() error { return *p } +func (imp *KVImporter) Err() error { + return imp.getErr() +} + func (imp *KVImporter) AddModule(_ string) error { return nil } @@ -228,53 +244,86 @@ func (imp *KVImporter) AddNode(node *types.SnapshotNode) { } } -func (imp *KVImporter) Close() (err error) { - start := time.Now() - defer func() { - otelMetrics.ImportLatency.Record(imp.store.ctx, secondsSince(start), - metric.WithAttributes(successAttr(err))) - flushes, pairs := imp.importStats() - if err == nil { - otelMetrics.CurrentVersion.Record(imp.store.ctx, imp.store.committedVersion) - otelMetrics.CurrentSnapshotHeight.Record(imp.store.ctx, imp.store.committedVersion) - logger.Info("FlatKV import complete", - "version", imp.version, - "flushes", flushes, - "pairs", pairs, - "elapsed", time.Since(start)) - } else { - logger.Error("FlatKV import failed", - "version", imp.version, - "flushes", flushes, - "pairs", pairs, - "elapsed", time.Since(start), - "err", err) - } - }() +// Abort tears down the worker pipeline without finalizing the import. +// It records reason as the first pipeline error (so any in-flight worker +// also bails fast) and then runs Close, which observes the non-nil error +// and skips FinalizeImport / WriteSnapshot. The on-disk FlatKV directory +// is left at its pre-import committed version, allowing the operator to +// retry without --force. +// +// Use this when an external error (context cancellation, exporter +// failure, translator failure, etc.) makes the in-progress import +// unsafe to commit. Abort is idempotent and safe to interleave with +// Close: whichever runs first wins; later calls are no-ops. 
+func (imp *KVImporter) Abort(reason error) error { + if reason == nil { + reason = errors.New("flatkv import aborted") + } + imp.setErr(reason) + return imp.Close() +} - close(imp.ingestCh) - imp.wg.Wait() +// Close is idempotent: the first call drains workers, finalizes the import, +// and writes a snapshot; subsequent calls just return the cached result. +// Idempotency is required because the import-from-memiavl tool may invoke +// Close on both the success and error paths. +// +// If the first pipeline error has already been recorded (either by a +// worker or by Abort), Close skips FinalizeImport / WriteSnapshot so the +// store stays at its pre-import version. +func (imp *KVImporter) Close() error { + imp.finishOnce.Do(func() { + start := time.Now() + var err error + defer func() { + otelMetrics.ImportLatency.Record(imp.store.ctx, secondsSince(start), + metric.WithAttributes(successAttr(err))) + flushes, pairs := imp.importStats() + if err == nil { + otelMetrics.CurrentVersion.Record(imp.store.ctx, imp.store.committedVersion) + otelMetrics.CurrentSnapshotHeight.Record(imp.store.ctx, imp.store.committedVersion) + logger.Info("FlatKV import complete", + "version", imp.version, + "flushes", flushes, + "pairs", pairs, + "elapsed", time.Since(start)) + } else { + logger.Error("FlatKV import failed", + "version", imp.version, + "flushes", flushes, + "pairs", pairs, + "elapsed", time.Since(start), + "err", err) + } + imp.finishErr = err + }() - if err := imp.getErr(); err != nil { - return err - } + close(imp.ingestCh) + imp.wg.Wait() - for _, w := range imp.workers { - imp.store.perDBWorkingLtHash[w.dir] = w.ltHash - } + if err = imp.getErr(); err != nil { + return + } - if err := imp.store.FinalizeImport(imp.version); err != nil { - return fmt.Errorf("failed to finalize import: %w", err) - } + for _, w := range imp.workers { + imp.store.perDBWorkingLtHash[w.dir] = w.ltHash + } - // Write a snapshot so the imported data survives store reopen / restart. 
- // Import bypasses the WAL, so without a snapshot the next LoadVersion - // would clone from the pre-import snapshot and lose all imported data. - if err := imp.store.WriteSnapshot(""); err != nil { - return fmt.Errorf("failed to import when writing snapshot: %w", err) - } + if err = imp.store.FinalizeImport(imp.version); err != nil { + err = fmt.Errorf("failed to finalize import: %w", err) + return + } - return nil + // Write a snapshot so the imported data survives store reopen / restart. + // Import bypasses the WAL, so without a snapshot the next LoadVersion + // would clone from the pre-import snapshot and lose all imported data. + if err = imp.store.WriteSnapshot(""); err != nil { + err = fmt.Errorf("failed to import when writing snapshot: %w", err) + return + } + }) + + return imp.finishErr } func (imp *KVImporter) importStats() (flushes int64, pairs int64) { diff --git a/sei-db/state_db/sc/flatkv/importer_test.go b/sei-db/state_db/sc/flatkv/importer_test.go new file mode 100644 index 0000000000..4a1fc19095 --- /dev/null +++ b/sei-db/state_db/sc/flatkv/importer_test.go @@ -0,0 +1,387 @@ +package flatkv + +import ( + "errors" + "sync/atomic" + "testing" + "time" + + "github.com/sei-protocol/sei-chain/sei-db/state_db/sc/flatkv/ktype" + "github.com/sei-protocol/sei-chain/sei-db/state_db/sc/types" + "github.com/stretchr/testify/require" +) + +// ============================================================================= +// KVImporter concurrency / lifecycle tests +// +// These tests exercise paths that the higher-level Export → Import round-trip +// tests in import_export_test.go don't reach: +// * Close idempotency (finishOnce) +// * Err() return value across the success / error / post-Close lifecycle +// * setErr fail-fast atomicity (firstErr CAS + closeOnce(done)) +// * AddNode after the done channel is closed (must not block) +// * Multi-flush behavior under load larger than importBatchSize +// 
============================================================================= + +func newKVImporterForTest(t *testing.T, version int64) (*CommitStore, *KVImporter) { + t.Helper() + s := setupTestStore(t) + imp, err := s.Importer(version) + require.NoError(t, err) + kvi, ok := imp.(*KVImporter) + require.True(t, ok, "expected *KVImporter, got %T", imp) + return s, kvi +} + +// TestKVImporter_CloseIdempotent_HappyPath verifies that Close can be called +// multiple times after a successful import without panicking on a re-close of +// ingestCh and that every call returns the same (nil) finishErr. +func TestKVImporter_CloseIdempotent_HappyPath(t *testing.T) { + s, imp := newKVImporterForTest(t, 1) + defer func() { require.NoError(t, s.Close()) }() + + imp.AddNode(&types.SnapshotNode{ + Key: storagePhysKey(addrN(0x01), slotN(0x01)), + Value: padLeft32(0x11), + Version: 1, + }) + + require.NoError(t, imp.Close()) + require.NoError(t, imp.Close(), "second Close must not panic and must return the same nil result") + require.NoError(t, imp.Close(), "third Close must remain idempotent") + require.NoError(t, imp.Err(), "Err() should report no error after a successful import") +} + +// TestKVImporter_CloseIdempotent_AfterError verifies double-Close after a +// fail-fast error: the first Close drains the pipeline and surfaces the error; +// subsequent Close calls must return the cached finishErr without re-closing +// ingestCh (which would panic). 
+func TestKVImporter_CloseIdempotent_AfterError(t *testing.T) { + s, imp := newKVImporterForTest(t, 1) + defer func() { require.NoError(t, s.Close()) }() + + imp.AddNode(&types.SnapshotNode{ + Key: []byte{0xDE, 0xAD}, + Value: []byte{0x01}, + Version: 1, + }) + + first := imp.Close() + require.Error(t, first) + require.Contains(t, first.Error(), "route key") + + second := imp.Close() + require.Error(t, second) + require.Equal(t, first, second, "subsequent Close must return the same cached error") + + third := imp.Close() + require.Equal(t, first, third) +} + +// TestKVImporter_ErrLifecycle locks in the contract that Err() returns the +// first pipeline error as soon as it propagates, before Close is invoked. +// This is the path the seidb tool relies on to short-circuit a failing import +// without forcing a full Close. +func TestKVImporter_ErrLifecycle(t *testing.T) { + s, imp := newKVImporterForTest(t, 1) + defer func() { require.NoError(t, s.Close()) }() + + require.NoError(t, imp.Err(), "Err() should be nil before any pipeline error") + + imp.AddNode(&types.SnapshotNode{ + Key: []byte{0xDE, 0xAD}, + Value: []byte{0x01}, + Version: 1, + }) + + deadline := time.Now().Add(2 * time.Second) + for time.Now().Before(deadline) { + if imp.Err() != nil { + break + } + time.Sleep(5 * time.Millisecond) + } + require.Error(t, imp.Err(), "Err() must surface the route-key error from the dispatcher") + require.Contains(t, imp.Err().Error(), "route key") + + closeErr := imp.Close() + require.ErrorIs(t, closeErr, imp.Err(), + "Close result must mirror Err() once the pipeline has already failed") + + require.Equal(t, closeErr, imp.Err(), + "Err() must remain stable after Close; it returns the cached firstErr, not finishErr") +} + +// TestKVImporter_SetErrAtomicCAS exercises setErr directly to lock the +// CompareAndSwap-based fail-fast invariant: only the first error is recorded, +// and the done channel is closed exactly once even if setErr races. 
Without +// this, a worker that errors out after another worker already did would +// clobber firstErr and double-close done (panic). +func TestKVImporter_SetErrAtomicCAS(t *testing.T) { + s, imp := newKVImporterForTest(t, 1) + defer func() { require.NoError(t, s.Close()) }() + + first := errors.New("first error") + second := errors.New("second error") + + imp.setErr(first) + require.ErrorIs(t, imp.Err(), first) + + imp.setErr(second) + require.ErrorIs(t, imp.Err(), first, "subsequent setErr calls must not overwrite firstErr") + + select { + case <-imp.done: + default: + t.Fatalf("done channel must be closed after the first setErr") + } + + imp.setErr(errors.New("third error")) +} + +// TestKVImporter_AddNodeAfterDoneDoesNotBlock guards the AddNode select arm: +// once setErr fires and closes done, AddNode must exit via <-imp.done instead +// of blocking on a full ingestCh. We saturate ingestCh first by sending more +// pairs than its buffer, then trip the error and assert that further AddNode +// calls return promptly. +func TestKVImporter_AddNodeAfterDoneDoesNotBlock(t *testing.T) { + s, imp := newKVImporterForTest(t, 1) + defer func() { require.NoError(t, s.Close()) }() + + imp.setErr(errors.New("synthetic test error")) + + done := make(chan struct{}) + go func() { + defer close(done) + for i := 0; i < ingestChanSize+1024; i++ { + imp.AddNode(&types.SnapshotNode{ + Key: storagePhysKey(addrN(0x01), slotN(0x01)), + Value: padLeft32(0x11), + Version: 1, + }) + } + }() + + select { + case <-done: + case <-time.After(5 * time.Second): + t.Fatalf("AddNode blocked after done was closed; fail-fast path is broken") + } +} + +// TestKVImporter_LargeImportTriggersMultipleFlushes drives more than +// importBatchSize pairs through a single worker so that flush() is invoked +// repeatedly. Without this, the existing happy-path tests only ever hit +// flush once (at Close), which masks any regression in the +// pairs >= importBatchSize branch. 
+func TestKVImporter_LargeImportTriggersMultipleFlushes(t *testing.T) { + if testing.Short() { + t.Skip("skipping large-import test in -short mode") + } + + const totalPairs = importBatchSize*3 + 100 + s, imp := newKVImporterForTest(t, 1) + defer func() { require.NoError(t, s.Close()) }() + + for i := 0; i < totalPairs; i++ { + var addr ktype.Address + addr[16] = byte(i >> 16) + addr[17] = byte(i >> 8) + addr[18] = byte(i) + var slot ktype.Slot + slot[29] = byte(i >> 16) + slot[30] = byte(i >> 8) + slot[31] = byte(i) + imp.AddNode(&types.SnapshotNode{ + Key: storagePhysKey(addr, slot), + Value: padLeft32(byte(i & 0xFF)), + Version: 1, + }) + } + + require.NoError(t, imp.Close()) + + flushes, pairs := imp.importStats() + require.Equal(t, int64(totalPairs), pairs, "all pairs must be accounted for in importStats") + require.GreaterOrEqual(t, flushes, int64(3), + "importBatchSize=%d * 3 + 100 storage pairs must trigger at least 3 mid-pipeline flushes (got %d)", + importBatchSize, flushes) +} + +// TestKVImporter_AbortSkipsFinalize locks in the contract that Abort tears +// down the worker pipeline WITHOUT finalizing the import: the underlying +// CommitStore must remain at its pre-import committed version, so a +// failed offline migration can be retried without --force. +// +// Without this guarantee, the seidb import-flatkv-from-memiavl tool's +// deferred Close would commit whatever pairs happened to be buffered when +// an external error (ctx cancellation, exporter failure, translator +// failure) tripped, leaving FlatKV at the target version with only a +// partial copy of the source state. +func TestKVImporter_AbortSkipsFinalize(t *testing.T) { + s, imp := newKVImporterForTest(t, 1) + defer func() { require.NoError(t, s.Close()) }() + + preVersion := s.Version() + + // Add a couple of valid pairs so there is real buffered work that the + // happy-path Close would have committed. 
+ imp.AddNode(&types.SnapshotNode{ + Key: storagePhysKey(addrN(0x01), slotN(0x01)), + Value: padLeft32(0x11), + Version: 1, + }) + imp.AddNode(&types.SnapshotNode{ + Key: storagePhysKey(addrN(0x02), slotN(0x02)), + Value: padLeft32(0x22), + Version: 1, + }) + + abortReason := errors.New("synthetic external abort") + require.ErrorIs(t, imp.Abort(abortReason), abortReason, + "Abort must surface the supplied reason") + + require.ErrorIs(t, imp.Err(), abortReason, + "Err() must report the abort reason") + require.ErrorIs(t, imp.Close(), abortReason, + "Close after Abort must be a no-op returning the cached error") + + require.Equal(t, preVersion, s.Version(), + "Abort must not advance the store's committed version (no FinalizeImport)") +} + +// TestKVImporter_AbortNilReasonStillAborts ensures Abort with nil substitutes +// a generic reason rather than silently no-op'ing into a finalize. +func TestKVImporter_AbortNilReasonStillAborts(t *testing.T) { + s, imp := newKVImporterForTest(t, 1) + defer func() { require.NoError(t, s.Close()) }() + + preVersion := s.Version() + + imp.AddNode(&types.SnapshotNode{ + Key: storagePhysKey(addrN(0x01), slotN(0x01)), + Value: padLeft32(0x11), + Version: 1, + }) + + require.Error(t, imp.Abort(nil), "Abort(nil) must still surface a non-nil reason") + require.Error(t, imp.Err()) + require.Equal(t, preVersion, s.Version(), + "Abort(nil) must not finalize the import") +} + +// TestKVImporter_AbortAfterCloseIsNoop confirms the finishOnce contract: +// once Close has finalized successfully, a later Abort cannot retroactively +// invalidate the committed state. The store stays advanced; the abort +// reason is not surfaced through Err(). 
+func TestKVImporter_AbortAfterCloseIsNoop(t *testing.T) { + s, imp := newKVImporterForTest(t, 1) + defer func() { require.NoError(t, s.Close()) }() + + imp.AddNode(&types.SnapshotNode{ + Key: storagePhysKey(addrN(0x01), slotN(0x01)), + Value: padLeft32(0x11), + Version: 1, + }) + + require.NoError(t, imp.Close()) + postCloseVersion := s.Version() + require.Equal(t, int64(1), postCloseVersion, "successful Close must advance the store") + + require.NoError(t, imp.Abort(errors.New("too late")), + "Abort after a successful Close must return the cached nil result") + require.Equal(t, postCloseVersion, s.Version(), + "Abort cannot rewind a committed version") +} + +// TestKVImporter_BackpressureBlocksProducerUntilWorkersDrain explicitly +// exercises the backpressure path. It gates every dbWorker.flush() on a +// release channel via flushHookForTest, sends enough pairs to overflow +// ingestCh + worker.ch + the in-flight worker batch, and asserts that: +// +// 1. While flushes are gated, the producer goroutine is observably blocked +// (i.e. AddNode is sitting on its <-imp.ingestCh send arm) — the +// producer does NOT finish even after a soak period. +// 2. After the gate is released, the producer drains, Close succeeds, and +// every pair is persisted. +// +// Without this test the only coverage of true backpressure is incidental +// (TestImportMemiavlModulesToFlatKVHandlesLargeDataset). A regression that +// broke AddNode's <-imp.done arm or the dispatcher's worker.ch select +// would silently pass as long as data still landed correctly. 
+func TestKVImporter_BackpressureBlocksProducerUntilWorkersDrain(t *testing.T) { + if testing.Short() { + t.Skip("skipping backpressure test in -short mode") + } + + release := make(chan struct{}) + var flushObserved atomic.Int64 + flushHookForTest = func(string) { + flushObserved.Add(1) + <-release + } + t.Cleanup(func() { flushHookForTest = nil }) + + s, imp := newKVImporterForTest(t, 1) + defer func() { require.NoError(t, s.Close()) }() + + // Producer can advance at most ingestChanSize + workerChanSize + + // importBatchSize pairs before any worker drains. Send strictly more + // than that so AddNode is forced to block once flushes are gated. + const totalPairs = ingestChanSize + workerChanSize + importBatchSize + 8192 + + producerDone := make(chan struct{}) + go func() { + defer close(producerDone) + for i := 0; i < totalPairs; i++ { + var addr ktype.Address + addr[16] = byte(i >> 16) + addr[17] = byte(i >> 8) + addr[18] = byte(i) + var slot ktype.Slot + slot[29] = byte(i >> 16) + slot[30] = byte(i >> 8) + slot[31] = byte(i) + imp.AddNode(&types.SnapshotNode{ + Key: storagePhysKey(addr, slot), + Value: padLeft32(byte(i & 0xFF)), + Version: 1, + }) + } + }() + + // Wait for the first flush to hit the gate. By this point the storage + // worker has consumed importBatchSize pairs and the producer is racing + // ahead to fill ingestCh. + deadline := time.Now().Add(5 * time.Second) + for flushObserved.Load() == 0 { + if time.Now().After(deadline) { + t.Fatalf("no flush observed within 5s; worker pipeline not running") + } + time.Sleep(time.Millisecond) + } + + // Soak: give the producer ample time to fill ingestCh and block on + // AddNode. If backpressure works, producerDone must NOT be closed yet. 
+ time.Sleep(200 * time.Millisecond) + select { + case <-producerDone: + t.Fatalf("producer finished while flushes were gated; backpressure was not exercised") + default: + } + + close(release) + + select { + case <-producerDone: + case <-time.After(30 * time.Second): + t.Fatalf("producer never finished after gate release; pipeline deadlocked") + } + + require.NoError(t, imp.Close()) + + flushes, pairs := imp.importStats() + require.Equal(t, int64(totalPairs), pairs, "every pair must be persisted") + require.GreaterOrEqual(t, flushes, int64(2), + "expected multiple flushes for %d storage pairs (got %d)", totalPairs, flushes) +} diff --git a/sei-db/tools/cmd/seidb/main.go b/sei-db/tools/cmd/seidb/main.go index 8a026d59f9..1b19e0a0f1 100644 --- a/sei-db/tools/cmd/seidb/main.go +++ b/sei-db/tools/cmd/seidb/main.go @@ -26,6 +26,7 @@ func main() { operations.DumpIAVLCmd(), operations.DumpFlatKVCmd(), operations.StateSizeCmd(), + operations.ImportFlatKVFromMemiavlCmd(), operations.ReplayChangelogCmd(), operations.TraceProfileReportCmd()) if err := rootCmd.Execute(); err != nil { diff --git a/sei-db/tools/cmd/seidb/operations/import_flatkv_from_memiavl.go b/sei-db/tools/cmd/seidb/operations/import_flatkv_from_memiavl.go new file mode 100644 index 0000000000..b42a4a38dd --- /dev/null +++ b/sei-db/tools/cmd/seidb/operations/import_flatkv_from_memiavl.go @@ -0,0 +1,305 @@ +package operations + +import ( + "context" + "errors" + "fmt" + "math" + "os" + "path/filepath" + "strings" + + errorutils "github.com/sei-protocol/sei-chain/sei-db/common/errors" + "github.com/sei-protocol/sei-chain/sei-db/common/keys" + "github.com/sei-protocol/sei-chain/sei-db/common/utils" + "github.com/sei-protocol/sei-chain/sei-db/proto" + "github.com/sei-protocol/sei-chain/sei-db/state_db/sc/flatkv" + flatkvconfig "github.com/sei-protocol/sei-chain/sei-db/state_db/sc/flatkv/config" + "github.com/sei-protocol/sei-chain/sei-db/state_db/sc/memiavl" + sctypes 
"github.com/sei-protocol/sei-chain/sei-db/state_db/sc/types" + "github.com/spf13/cobra" +) + +// importBatchSize bounds how many memiavl key/value pairs we hand to a single +// flatkv.ImportTranslator.Translate call. Batching amortizes the per-call +// classifyAndPrefix map allocations across many keys without growing +// ImportTranslator's account-buffer memory beyond what an unbatched stream +// would already need. +const importBatchSize = 2048 + +// ImportFlatKVFromMemiavlCmd imports selected memiavl modules into FlatKV. +// +// Initial production scope is intentionally narrow: only the evm module is +// accepted. Non-EVM modules remain in memiavl and are not copied into FlatKV. +// Importing resets FlatKV and replaces it with the selected memiavl data; the +// CLI refuses to run over existing FlatKV data unless --force is supplied. +func ImportFlatKVFromMemiavlCmd() *cobra.Command { + cmd := &cobra.Command{ + Use: "import-flatkv-from-memiavl", + Short: "Import selected memiavl modules into FlatKV", + Long: strings.TrimSpace(`Import selected memiavl modules into FlatKV. + +WARNING: this restore-style import resets the FlatKV directory before loading +the imported rows. If FlatKV already has committed data, the command refuses to +run unless --force is supplied.`), + RunE: func(cmd *cobra.Command, args []string) error { + homeDir, _ := cmd.Flags().GetString("home") + dataDir, _ := cmd.Flags().GetString("data-dir") + modules, _ := cmd.Flags().GetStringSlice("modules") + height, _ := cmd.Flags().GetInt64("height") + force, _ := cmd.Flags().GetBool("force") + + resolvedHome, err := resolveSeiHome(homeDir, dataDir) + if err != nil { + return err + } + modules, err = normalizeImportModules(modules) + if err != nil { + return err + } + if height < 0 { + return fmt.Errorf("height %d out of range", height) + } + + return importMemiavlModulesToFlatKV(cmd.Context(), resolvedHome, modules, height, force) + }, + } + cmd.Flags().String("home", "", "Sei home directory. 
Defaults to $HOME/.sei") + cmd.Flags().String("data-dir", "", "Sei data directory or home directory. If the basename is data, its parent is used as home") + cmd.Flags().StringSlice("modules", []string{keys.EVMStoreKey}, "Comma-separated module names to import. Initial production scope supports only evm") + cmd.Flags().Int64("height", 0, "memiavl version to import. 0 means latest") + cmd.Flags().Bool("force", false, "Overwrite existing committed FlatKV data") + return cmd +} + +func resolveSeiHome(homeDir, dataDir string) (string, error) { + if homeDir != "" { + return filepath.Abs(homeDir) + } + if dataDir != "" { + clean := filepath.Clean(dataDir) + if filepath.Base(clean) == "data" { + return filepath.Abs(filepath.Dir(clean)) + } + return filepath.Abs(clean) + } + home, err := os.UserHomeDir() + if err != nil { + return "", fmt.Errorf("failed to resolve user home: %w", err) + } + return filepath.Join(home, ".sei"), nil +} + +func normalizeImportModules(modules []string) ([]string, error) { + if len(modules) == 0 { + modules = []string{keys.EVMStoreKey} + } + seen := make(map[string]struct{}, len(modules)) + normalized := make([]string, 0, len(modules)) + for _, module := range modules { + for _, part := range strings.Split(module, ",") { + name := strings.TrimSpace(part) + if name == "" { + continue + } + if name != keys.EVMStoreKey { + return nil, fmt.Errorf("module %q is not supported yet; initial import scope is evm-only", name) + } + if _, ok := seen[name]; ok { + continue + } + seen[name] = struct{}{} + normalized = append(normalized, name) + } + } + if len(normalized) == 0 { + return nil, errors.New("at least one module must be specified") + } + return normalized, nil +} + +// importerErr surfaces any pipeline error the FlatKV importer's worker +// goroutines have already recorded, so the import loop can fail-fast +// between exporter reads instead of waiting until Close. 
Err() is only +// defined on *flatkv.KVImporter (the only concrete Importer this CLI +// hands data to); other Importer implementations don't have an async +// pipeline that could surface mid-stream errors. +func importerErr(importer sctypes.Importer) error { + if kvi, ok := importer.(*flatkv.KVImporter); ok { + return kvi.Err() + } + return nil +} + +// emitPairs forwards translator output to the FlatKV importer, returning the +// number of pairs written. +func emitPairs(importer sctypes.Importer, pairs []flatkv.PhysicalKVPair, height int64) int64 { + for _, p := range pairs { + importer.AddNode(&sctypes.SnapshotNode{ + Key: p.Key, + Value: p.Value, + Version: height, + Height: 0, + }) + } + return int64(len(pairs)) +} + +func importMemiavlModulesToFlatKV(ctx context.Context, homeDir string, modules []string, height int64, force bool) (err error) { + cosmosDir := utils.GetCosmosSCStorePath(homeDir) + if height == 0 { + latest, err := memiavl.GetLatestVersion(cosmosDir) + if err != nil { + return fmt.Errorf("failed to resolve latest memiavl version from %s: %w", cosmosDir, err) + } + height = latest + } + if height <= 0 { + return fmt.Errorf("height must be positive after resolution, got %d", height) + } + if height > math.MaxUint32 { + return fmt.Errorf("height %d out of range", height) + } + + moduleSet := make(map[string]struct{}, len(modules)) + for _, module := range modules { + moduleSet[module] = struct{}{} + } + + cfg := flatkvconfig.DefaultConfig() + cfg.DataDir = utils.GetFlatKVPath(homeDir) + store, err := flatkv.NewCommitStore(ctx, cfg) + if err != nil { + return fmt.Errorf("failed to create FlatKV store: %w", err) + } + defer func() { _ = store.Close() }() + if _, err := store.LoadVersion(0, false); err != nil { + return fmt.Errorf("failed to open FlatKV store: %w", err) + } + + if store.Version() > 0 { + if !force { + return fmt.Errorf("FlatKV store at %s already has committed version %d; rerun with --force to overwrite it", + cfg.DataDir, 
store.Version()) + } + fmt.Printf("WARNING: --force set; overwriting existing FlatKV store at %s (current version %d)\n", + cfg.DataDir, store.Version()) + } + + exporter, err := memiavl.NewMultiTreeExporter(cosmosDir, uint32(height), false) //nolint:gosec // height range checked above + if err != nil { + return fmt.Errorf("failed to open memiavl exporter at height %d: %w", height, err) + } + defer func() { _ = exporter.Close() }() + + importer, err := store.Importer(height) + if err != nil { + return fmt.Errorf("failed to create FlatKV importer at height %d: %w", height, err) + } + // On the failure path we must NOT finalize: KVImporter.Close otherwise + // commits whatever pairs were already buffered, leaving FlatKV at the + // target version with only a partial copy of the source state. Route + // errors through Abort instead, which records the failure on the + // importer and then drains workers without writing a snapshot. On the + // success path the explicit Close below has already run, so the + // deferred Close here is just an idempotent safety net. 
+ defer func() { + if err != nil { + if kvi, ok := importer.(*flatkv.KVImporter); ok { + _ = kvi.Abort(err) + return + } + } + _ = importer.Close() + }() + + translator := flatkv.NewImportTranslator(height) + batch := &proto.NamedChangeSet{ + Changeset: proto.ChangeSet{Pairs: make([]*proto.KVPair, 0, importBatchSize)}, + } + var written int64 + flush := func() error { + if len(batch.Changeset.Pairs) == 0 { + return nil + } + pairs, err := translator.Translate(batch) + if err != nil { + return fmt.Errorf("translate batch (module=%s): %w", batch.Name, err) + } + written += emitPairs(importer, pairs, height) + batch.Changeset.Pairs = batch.Changeset.Pairs[:0] + return nil + } + + var currentModule string + var imported int64 + moduleCounts := make(map[string]int64, len(modules)) + for { + if err := ctx.Err(); err != nil { + return fmt.Errorf("import interrupted: %w", err) + } + if err := importerErr(importer); err != nil { + return fmt.Errorf("FlatKV import failed: %w", err) + } + + item, err := exporter.Next() + if err != nil { + if errors.Is(err, errorutils.ErrorExportDone) { + break + } + return fmt.Errorf("failed to export memiavl data: %w", err) + } + switch v := item.(type) { + case string: + if err := flush(); err != nil { + return err + } + currentModule = v + batch.Name = currentModule + if _, ok := moduleSet[currentModule]; ok { + if err := importer.AddModule(keys.FlatKVStoreKey); err != nil { + return fmt.Errorf("failed to add FlatKV import module: %w", err) + } + } + case *sctypes.SnapshotNode: + if _, ok := moduleSet[currentModule]; !ok { + continue + } + if v == nil || v.Height != 0 || v.Value == nil { + continue + } + batch.Changeset.Pairs = append(batch.Changeset.Pairs, &proto.KVPair{ + Key: v.Key, + Value: v.Value, + }) + imported++ + moduleCounts[currentModule]++ + if len(batch.Changeset.Pairs) >= importBatchSize { + if err := flush(); err != nil { + return err + } + } + default: + return fmt.Errorf("unexpected export item type %T", item) + } + } + 
if err := flush(); err != nil { + return err + } + + if err := ctx.Err(); err != nil { + return fmt.Errorf("import interrupted: %w", err) + } + if err := importerErr(importer); err != nil { + return fmt.Errorf("FlatKV import failed: %w", err) + } + + written += emitPairs(importer, translator.Finalize(), height) + + if err := importer.Close(); err != nil { + return fmt.Errorf("failed to finalize FlatKV import: %w", err) + } + fmt.Printf("Imported %d memiavl key/value pairs into %d FlatKV rows from modules %v at height %d (per-module: %v)\n", + imported, written, modules, height, moduleCounts) + return nil +} diff --git a/sei-db/tools/cmd/seidb/operations/import_flatkv_from_memiavl_test.go b/sei-db/tools/cmd/seidb/operations/import_flatkv_from_memiavl_test.go new file mode 100644 index 0000000000..44d5b8aea7 --- /dev/null +++ b/sei-db/tools/cmd/seidb/operations/import_flatkv_from_memiavl_test.go @@ -0,0 +1,300 @@ +package operations + +import ( + "context" + "math" + "testing" + + "github.com/sei-protocol/sei-chain/sei-db/common/keys" + "github.com/sei-protocol/sei-chain/sei-db/common/utils" + "github.com/sei-protocol/sei-chain/sei-db/proto" + "github.com/sei-protocol/sei-chain/sei-db/state_db/sc/flatkv" + flatkvconfig "github.com/sei-protocol/sei-chain/sei-db/state_db/sc/flatkv/config" + "github.com/sei-protocol/sei-chain/sei-db/state_db/sc/flatkv/ktype" + "github.com/sei-protocol/sei-chain/sei-db/state_db/sc/memiavl" + "github.com/stretchr/testify/require" +) + +func TestImportMemiavlModulesToFlatKVEncodesEVMValues(t *testing.T) { + homeDir := t.TempDir() + addr := addrN(0x42) + eoaAddr := addrN(0x43) + contractOnlyAddr := addrN(0x44) + slot := slotN(0x07) + codeHash := codeHashOf(0xAB) + contractOnlyCodeHash := codeHashOf(0xCD) + bytecode := []byte{0x60, 0x2A, 0x60, 0x00, 0x52, 0x60, 0x20, 0x60, 0x00, 0xF3} + storageValue := padLeft32(0x2A) + nonceValue := uint64(7) + eoaNonceValue := uint64(1) + legacyKey := append([]byte{0x09}, addr[:]...) 
+ legacyValue := []byte{0x00, 0x03} + + memStore := newTestMemiavlStore(t, homeDir) + require.NoError(t, memStore.ApplyChangeSets([]*proto.NamedChangeSet{{ + Name: keys.EVMStoreKey, + Changeset: proto.ChangeSet{Pairs: []*proto.KVPair{ + storagePair(addr, slot, 0x2A), + codePair(addr, bytecode), + noncePair(addr, nonceValue), + codeHashPair(addr, codeHash), + noncePair(eoaAddr, eoaNonceValue), + codeHashPair(contractOnlyAddr, contractOnlyCodeHash), + {Key: legacyKey, Value: legacyValue}, + }}, + }})) + version, err := memStore.Commit() + require.NoError(t, err) + require.Equal(t, int64(1), version) + require.NoError(t, memStore.Close()) + + require.NoError(t, importMemiavlModulesToFlatKV(context.Background(), homeDir, []string{keys.EVMStoreKey}, 0, false)) + + flatStore := openImportedFlatKVStore(t, homeDir) + defer func() { require.NoError(t, flatStore.Close()) }() + + gotStorage, found := flatStore.Get(keys.EVMStoreKey, keys.BuildEVMKey(keys.EVMKeyStorage, ktype.StorageKey(addr, slot))) + require.True(t, found) + require.Equal(t, storageValue, gotStorage) + + gotCode, found := flatStore.Get(keys.EVMStoreKey, keys.BuildEVMKey(keys.EVMKeyCode, addr[:])) + require.True(t, found) + require.Equal(t, bytecode, gotCode) + + gotNonce, found := flatStore.Get(keys.EVMStoreKey, keys.BuildEVMKey(keys.EVMKeyNonce, addr[:])) + require.True(t, found) + require.Equal(t, nonceBytesBE(nonceValue), gotNonce) + + gotCodeHash, found := flatStore.Get(keys.EVMStoreKey, keys.BuildEVMKey(keys.EVMKeyCodeHash, addr[:])) + require.True(t, found) + require.Equal(t, codeHash[:], gotCodeHash) + + gotEOANonce, found := flatStore.Get(keys.EVMStoreKey, keys.BuildEVMKey(keys.EVMKeyNonce, eoaAddr[:])) + require.True(t, found) + require.Equal(t, nonceBytesBE(eoaNonceValue), gotEOANonce) + + gotContractOnlyCodeHash, found := flatStore.Get(keys.EVMStoreKey, keys.BuildEVMKey(keys.EVMKeyCodeHash, contractOnlyAddr[:])) + require.True(t, found) + require.Equal(t, contractOnlyCodeHash[:], 
gotContractOnlyCodeHash) + + gotLegacy, found := flatStore.Get(keys.EVMStoreKey, legacyKey) + require.True(t, found) + require.Equal(t, legacyValue, gotLegacy) +} + +func TestImportMemiavlModulesToFlatKVRefusesExistingFlatKVWithoutForce(t *testing.T) { + homeDir := t.TempDir() + oldAddr := addrN(0x11) + newAddr := addrN(0x22) + + memStore := newTestMemiavlStore(t, homeDir) + require.NoError(t, memStore.ApplyChangeSets([]*proto.NamedChangeSet{{ + Name: keys.EVMStoreKey, + Changeset: proto.ChangeSet{Pairs: []*proto.KVPair{ + noncePair(newAddr, 7), + }}, + }})) + _, err := memStore.Commit() + require.NoError(t, err) + require.NoError(t, memStore.Close()) + + flatStore := newTestFlatKVStoreAtHome(t, homeDir) + require.NoError(t, flatStore.ApplyChangeSets([]*proto.NamedChangeSet{{ + Name: keys.EVMStoreKey, + Changeset: proto.ChangeSet{Pairs: []*proto.KVPair{ + noncePair(oldAddr, 9), + }}, + }})) + _, err = flatStore.Commit() + require.NoError(t, err) + require.NoError(t, flatStore.Close()) + + err = importMemiavlModulesToFlatKV(context.Background(), homeDir, []string{keys.EVMStoreKey}, 0, false) + require.Error(t, err) + require.Contains(t, err.Error(), "already has committed version") + require.Contains(t, err.Error(), "--force") + + flatStore = openImportedFlatKVStore(t, homeDir) + gotOldNonce, found := flatStore.Get(keys.EVMStoreKey, keys.BuildEVMKey(keys.EVMKeyNonce, oldAddr[:])) + require.True(t, found) + require.Equal(t, nonceBytesBE(9), gotOldNonce) + _, found = flatStore.Get(keys.EVMStoreKey, keys.BuildEVMKey(keys.EVMKeyNonce, newAddr[:])) + require.False(t, found) + require.NoError(t, flatStore.Close()) + + require.NoError(t, importMemiavlModulesToFlatKV(context.Background(), homeDir, []string{keys.EVMStoreKey}, 0, true)) + + flatStore = openImportedFlatKVStore(t, homeDir) + defer func() { require.NoError(t, flatStore.Close()) }() + _, found = flatStore.Get(keys.EVMStoreKey, keys.BuildEVMKey(keys.EVMKeyNonce, oldAddr[:])) + require.False(t, found) + 
gotNewNonce, found := flatStore.Get(keys.EVMStoreKey, keys.BuildEVMKey(keys.EVMKeyNonce, newAddr[:])) + require.True(t, found) + require.Equal(t, nonceBytesBE(7), gotNewNonce) +} + +func TestImportMemiavlModulesToFlatKVRejectsOutOfRangeResolvedHeight(t *testing.T) { + err := importMemiavlModulesToFlatKV(context.Background(), t.TempDir(), []string{keys.EVMStoreKey}, math.MaxUint32+1, false) + require.Error(t, err) + require.Contains(t, err.Error(), "out of range") +} + +// The CLI failure-path contract — that an interrupted import must NOT +// finalize partial state — is locked in at the unit level by the +// TestKVImporter_AbortSkipsFinalize / AbortNilReasonStillAborts / +// AbortAfterCloseIsNoop trio in sei-db/state_db/sc/flatkv/importer_test.go. +// importMemiavlModulesToFlatKV's defer just routes any non-nil return +// through (*flatkv.KVImporter).Abort, which those tests cover directly. +// +// A CLI-level test that exercises this end-to-end (e.g. ctx canceled +// mid-import) trips an unrelated pre-existing race in flatkv.LoadVersion's +// pebble-recovery / dbcache pool interaction; tracking it here would make +// this test brittle and is out of scope for the bug under fix. Once that +// race is addressed, this is a good spot to add an end-to-end variant. 

// TestImportMemiavlModulesToFlatKVHandlesLargeDataset exercises the
// memiavl→FlatKV pipeline at a scale large enough to:
//   - cross the importBatchSize threshold inside KVImporter so that
//     dbWorker.flush() fires multiple times instead of just once at Close
//   - exercise dispatcher → worker channel backpressure with a steady
//     stream of pairs across all four FlatKV bucket types
//   - exercise the translator's cross-batch account merge buffer
//     (nonce/codeHash for the same address may land in different
//     translator batches at this volume)
//
// The smaller TestImportMemiavlModulesToFlatKVEncodesEVMValues only writes
// ~7 pairs so the batching/backpressure paths are never hit; this test
// fills that gap without a docker cluster.
//
// Sized to run in a few seconds: ~70K total source pairs (40K of them
// storage; see the const block's accounting below) is enough to cross the
// 20K-pair flush threshold on the storage worker multiple times while
// keeping CI cost low.
func TestImportMemiavlModulesToFlatKVHandlesLargeDataset(t *testing.T) {
	if testing.Short() {
		t.Skip("skipping large-dataset import test in -short mode")
	}

	const (
		numAddrs       = 10000
		storagePerAddr = 4
		// totalPairs ≈ numAddrs*(nonce+codeHash+code) + numAddrs*storagePerAddr
		//            = 10000*3 + 10000*4 = 70000 source pairs (storage dominates).
	)
	homeDir := t.TempDir()

	// makeAddr packs i big-endian into the low 4 bytes of a ktype.Address,
	// giving every index a distinct, reproducible address.
	makeAddr := func(i int) ktype.Address {
		var a ktype.Address
		a[16] = byte(i >> 24)
		a[17] = byte(i >> 16)
		a[18] = byte(i >> 8)
		a[19] = byte(i)
		return a
	}
	// makeSlot does the same for the low 4 bytes of a ktype.Slot.
	makeSlot := func(i int) ktype.Slot {
		var s ktype.Slot
		s[28] = byte(i >> 24)
		s[29] = byte(i >> 16)
		s[30] = byte(i >> 8)
		s[31] = byte(i)
		return s
	}

	// Helpers used below force every codeHash and storage value to have a
	// non-zero low byte. flatkv treats an all-zero codeHash or storage value
	// as a tombstone (Get returns false; IsDelete is true), which would
	// silently drop legitimate test fixtures.
	nonzeroByte := func(i int) byte { return byte((i & 0x7F) | 0x80) }

	// Build the full fixture changeset up front: 3 account-level pairs plus
	// storagePerAddr storage pairs per address.
	pairs := make([]*proto.KVPair, 0, numAddrs*(3+storagePerAddr))
	for i := 0; i < numAddrs; i++ {
		addr := makeAddr(i)
		pairs = append(pairs,
			noncePair(addr, uint64(i+1)),
			codeHashPair(addr, codeHashOf(nonzeroByte(i))),
			codePair(addr, []byte{0x60, byte(i & 0xFF), 0x00, 0x52, 0x60, 0x20, 0x60, 0x00, 0xF3}),
		)
		for j := 0; j < storagePerAddr; j++ {
			pairs = append(pairs, storagePair(addr, makeSlot(i*storagePerAddr+j), nonzeroByte(i+j)))
		}
	}

	// Commit the fixture into a fresh memiavl store at version 1, then close
	// it so the importer can open the same directory.
	memStore := newTestMemiavlStore(t, homeDir)
	require.NoError(t, memStore.ApplyChangeSets([]*proto.NamedChangeSet{{
		Name:      keys.EVMStoreKey,
		Changeset: proto.ChangeSet{Pairs: pairs},
	}}))
	version, err := memStore.Commit()
	require.NoError(t, err)
	require.Equal(t, int64(1), version)
	require.NoError(t, memStore.Close())

	// height=0 resolves to the latest memiavl version (1); force=false is fine
	// because no FlatKV store exists yet under homeDir.
	require.NoError(t, importMemiavlModulesToFlatKV(context.Background(), homeDir, []string{keys.EVMStoreKey}, 0, false))

	flatStore := openImportedFlatKVStore(t, homeDir)
	defer func() { require.NoError(t, flatStore.Close()) }()

	// Spot-check several addresses across the dataset to catch any
	// boundary issues (first / middle / last batch) in the translator's
	// cross-call account merge buffer and the importer's batched writes.
	checkpoints := []int{0, 1, numAddrs / 4, numAddrs / 2, numAddrs - 1}
	for _, i := range checkpoints {
		addr := makeAddr(i)

		gotNonce, found := flatStore.Get(keys.EVMStoreKey, keys.BuildEVMKey(keys.EVMKeyNonce, addr[:]))
		require.Truef(t, found, "nonce for addr index %d missing", i)
		require.Equalf(t, nonceBytesBE(uint64(i+1)), gotNonce, "nonce mismatch for addr index %d", i)

		want := codeHashOf(nonzeroByte(i))
		gotCodeHash, found := flatStore.Get(keys.EVMStoreKey, keys.BuildEVMKey(keys.EVMKeyCodeHash, addr[:]))
		require.Truef(t, found, "codehash for addr index %d missing", i)
		require.Equalf(t, want[:], gotCodeHash, "codehash mismatch for addr index %d", i)

		expectedCode := []byte{0x60, byte(i & 0xFF), 0x00, 0x52, 0x60, 0x20, 0x60, 0x00, 0xF3}
		gotCode, found := flatStore.Get(keys.EVMStoreKey, keys.BuildEVMKey(keys.EVMKeyCode, addr[:]))
		require.Truef(t, found, "code for addr index %d missing", i)
		require.Equalf(t, expectedCode, gotCode, "code mismatch for addr index %d", i)

		for j := 0; j < storagePerAddr; j++ {
			slot := makeSlot(i*storagePerAddr + j)
			expectedStorage := padLeft32(nonzeroByte(i + j))
			gotStorage, found := flatStore.Get(keys.EVMStoreKey, keys.BuildEVMKey(keys.EVMKeyStorage, ktype.StorageKey(addr, slot)))
			require.Truef(t, found, "storage[%d][%d] missing", i, j)
			require.Equalf(t, expectedStorage, gotStorage, "storage[%d][%d] mismatch", i, j)
		}
	}

	// A non-existent address must still miss after the bulk import: this
	// is the regression knob that catches a translator that accidentally
	// emits zero-default rows for unseen account fields when the buffer
	// grows past one batch.
	missingAddr := makeAddr(numAddrs + 1)
	_, found := flatStore.Get(keys.EVMStoreKey, keys.BuildEVMKey(keys.EVMKeyNonce, missingAddr[:]))
	require.False(t, found, "synthetic-out-of-range address must not exist")
}

// newTestMemiavlStore opens a memiavl CommitStore rooted at homeDir with the
// EVM module initialized. AsyncCommitBuffer is set to 0 so commits do not go
// through an async buffer (presumably making Commit effectively synchronous —
// confirm against the memiavl config docs). The caller owns closing the store.
func newTestMemiavlStore(t *testing.T, homeDir string) *memiavl.CommitStore {
	t.Helper()
	cfg := memiavl.DefaultConfig()
	cfg.AsyncCommitBuffer = 0
	store := memiavl.NewCommitStore(homeDir, cfg)
	store.Initialize([]string{keys.EVMStoreKey})
	_, err := store.LoadVersion(0, false)
	require.NoError(t, err)
	return store
}

// openImportedFlatKVStore reopens the FlatKV store a prior import wrote under
// homeDir; it is a readability alias for newTestFlatKVStoreAtHome, and the
// caller owns closing the returned store.
func openImportedFlatKVStore(t *testing.T, homeDir string) *flatkv.CommitStore {
	t.Helper()
	return newTestFlatKVStoreAtHome(t, homeDir)
}

// newTestFlatKVStoreAtHome opens a FlatKV CommitStore at homeDir's FlatKV path
// (the same path importMemiavlModulesToFlatKV writes to, via
// utils.GetFlatKVPath) using the default test config. The caller owns closing
// the returned store.
func newTestFlatKVStoreAtHome(t *testing.T, homeDir string) *flatkv.CommitStore {
	t.Helper()
	cfg := flatkvconfig.DefaultTestConfig(t)
	cfg.DataDir = utils.GetFlatKVPath(homeDir)
	store, err := flatkv.NewCommitStore(context.Background(), cfg)
	require.NoError(t, err)
	_, err = store.LoadVersion(0, false)
	require.NoError(t, err)
	return store
}