From 159375290ac8617e5a0166ed86da45b4648493c1 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Tue, 3 Feb 2026 17:27:06 +0800 Subject: [PATCH 1/7] [python/hotfix] fix bucket hash calculation --- .../java/org/apache/paimon/JavaPyE2ETest.java | 38 +++- paimon-python/dev/run_mixed_tests.sh | 30 +++- .../tests/e2e/java_py_read_write_test.py | 49 ++++++ .../pypaimon/write/row_key_extractor.py | 162 ++++++++++++++++-- 4 files changed, 257 insertions(+), 22 deletions(-) diff --git a/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java b/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java index 507b3a8b6985..8adefa789b9c 100644 --- a/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java +++ b/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java @@ -34,6 +34,7 @@ import org.apache.paimon.manifest.IndexManifestEntry; import org.apache.paimon.options.MemorySize; import org.apache.paimon.options.Options; +import org.apache.paimon.predicate.Predicate; import org.apache.paimon.predicate.PredicateBuilder; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaManager; @@ -62,6 +63,8 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.condition.EnabledIfSystemProperty; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.nio.file.Files; import java.nio.file.Paths; @@ -88,6 +91,8 @@ /** Mixed language overwrite test for Java and Python interoperability. */ public class JavaPyE2ETest { + private static final Logger LOG = LoggerFactory.getLogger(JavaPyE2ETest.class); + java.nio.file.Path tempDir = Paths.get("../paimon-python/pypaimon/tests/e2e").toAbsolutePath(); // Fields from TableTestBase that we need @@ -131,7 +136,7 @@ public void testReadAppendTable() throws Exception { read, splits, row -> DataFormatTestUtil.toStringNoRowKind(row, table.rowType())); - System.out.println(res); + LOG.info("Read append table: {} row(s)", res.size()); } } @@ -391,7 +396,7 @@ public void testReadPkTable() throws Exception { TableRead read = fileStoreTable.newRead(); List res = getResult(read, splits, row -> rowToStringWithStruct(row, table.rowType())); - System.out.println("Result for " + format + " : " + res); + LOG.info("Result for {}: {} row(s)", format, res.size()); assertThat(table.rowType().getFieldTypes().get(4)).isEqualTo(DataTypes.TIMESTAMP()); assertThat(table.rowType().getFieldTypes().get(5)) .isEqualTo(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()); @@ -409,6 +414,35 @@ public void testReadPkTable() throws Exception { } } + @Test + @EnabledIfSystemProperty(named = "run.e2e.tests", matches = "true") + public void testReadPkTableBucketNumCalculate() throws Exception { + for (String format : Arrays.asList("parquet", "orc", "avro")) { + Identifier identifier = + identifier("mixed_test_pk_table_bucket_num_calculate_" + format); + Table table = catalog.getTable(identifier); + PredicateBuilder predicateBuilder = new PredicateBuilder(table.rowType()); + Predicate predicate = + PredicateBuilder.and( + predicateBuilder.equal(0, BinaryString.fromString("e2e_pk_001")), + predicateBuilder.equal(1, BinaryString.fromString("e2e_suite_001")), + predicateBuilder.equal(2, 1)); + ReadBuilder readBuilder = table.newReadBuilder().withFilter(predicate); + List res = + getResult( + readBuilder.newRead(), + readBuilder.newScan().plan().splits(), + row -> rowToStringWithStruct(row, table.rowType())); + LOG.info("Read bucket_num_calculate table {}: {} row(s)", format, res.size()); + assertThat(res) + .as( 
+ "Python wrote 1 row (pk_str_a=e2e_pk_001, pk_str_b=e2e_suite_001," + + " pk_int=1); Java read with predicate should return it.).") + .hasSize(1); + assertThat(res.get(0)).contains("e2e_pk_001").contains("e2e_suite_001").contains("1"); + } + } + @Test @EnabledIfSystemProperty(named = "run.e2e.tests", matches = "true") public void testBtreeIndexWrite() throws Exception { diff --git a/paimon-python/dev/run_mixed_tests.sh b/paimon-python/dev/run_mixed_tests.sh index ed7f54e0146a..4aa83cd7023e 100755 --- a/paimon-python/dev/run_mixed_tests.sh +++ b/paimon-python/dev/run_mixed_tests.sh @@ -117,12 +117,12 @@ run_python_read_test() { # Function to run Python Write test for Python-Write-Java-Read scenario run_python_write_test() { - echo -e "${YELLOW}=== Step 3: Running Python Write Test (JavaPyReadWriteTest.test_py_write_read_pk_table) ===${NC}" + echo -e "${YELLOW}=== Step 3: Running Python Write Test (test_py_write_read_pk_table, test_py_write_read_pk_table_bucket_num_calculate) ===${NC}" cd "$PAIMON_PYTHON_DIR" - # Run the parameterized Python test method for writing data (runs for both Parquet/Orc/Avro and Lance) - echo "Running Python test for JavaPyReadWriteTest.test_py_write_read_pk_table (Python Write)..." + # Run the parameterized Python test method for writing data (pk table + bucket num calculate for Python/Java mismatch repro) + echo "Running Python test for JavaPyReadWriteTest (test_py_write_read_pk_table, test_py_write_read_pk_table_bucket_num_calculate)..." if python -m pytest java_py_read_write_test.py::JavaPyReadWriteTest -k "test_py_write_read_pk_table" -v; then echo -e "${GREEN}✓ Python write test completed successfully${NC}" return 0 @@ -134,7 +134,7 @@ run_python_write_test() { # Function to run Java Read test for Python-Write-Java-Read scenario run_java_read_test() { - echo -e "${YELLOW}=== Step 4: Running Java Read Test (JavaPyE2ETest.testReadPkTable for Parquet/Orc/Avro, JavaPyLanceE2ETest.testReadPkTableLance for Lance) ===${NC}" + echo -e "${YELLOW}=== Step 4: Running Java Read Test (testReadPkTable, testReadPkTableBucketNumCalculate, testReadPkTableLance) ===${NC}" cd "$PROJECT_ROOT" @@ -154,6 +154,18 @@ run_java_read_test() { echo "" + # Run Java test for bucket_num_calculate table (Python write, Java read with predicate; repro then fix in Python) + echo "Running Maven test for JavaPyE2ETest.testReadPkTableBucketNumCalculate (reads mixed_test_pk_table_bucket_num_calculate_*)..." + local bucket_num_calculate_result=0 + if mvn test -Dtest=org.apache.paimon.JavaPyE2ETest#testReadPkTableBucketNumCalculate -pl paimon-core -Drun.e2e.tests=true -Dpython.version="$PYTHON_VERSION"; then + echo -e "${GREEN}✓ Java read bucket_num_calculate test completed successfully${NC}" + else + echo -e "${RED}✗ Java read bucket_num_calculate test failed (expected until Python bucket hash aligns with Java)${NC}" + bucket_num_calculate_result=1 + fi + + echo "" + # Run Java test for lance format in paimon-lance echo "Running Maven test for JavaPyLanceE2ETest.testReadPkTableLance (Java Read Lance)..." echo "Note: Maven may download dependencies on first run, this may take a while..." 
@@ -165,7 +177,7 @@ run_java_read_test() { lance_result=1 fi - if [[ $parquet_result -eq 0 && $lance_result -eq 0 ]]; then + if [[ $parquet_result -eq 0 && $bucket_num_calculate_result -eq 0 && $lance_result -eq 0 ]]; then return 0 else return 1 @@ -361,15 +373,15 @@ main() { fi if [[ $python_write_result -eq 0 ]]; then - echo -e "${GREEN}✓ Python Write Test (JavaPyReadWriteTest.test_py_write_read_pk_table): PASSED${NC}" + echo -e "${GREEN}✓ Python Write Test (test_py_write_read_pk_table, test_py_write_read_pk_table_bucket_num_calculate): PASSED${NC}" else - echo -e "${RED}✗ Python Write Test (JavaPyReadWriteTest.test_py_write_read_pk_table): FAILED${NC}" + echo -e "${RED}✗ Python Write Test (test_py_write_read_pk_table, test_py_write_read_pk_table_bucket_num_calculate): FAILED${NC}" fi if [[ $java_read_result -eq 0 ]]; then - echo -e "${GREEN}✓ Java Read Test (Parquet/Orc/Avro + Lance): PASSED${NC}" + echo -e "${GREEN}✓ Java Read Test (Parquet/Orc/Avro + bucket_num_calculate + Lance): PASSED${NC}" else - echo -e "${RED}✗ Java Read Test (Parquet/Orc/Avro + Lance): FAILED${NC}" + echo -e "${RED}✗ Java Read Test (Parquet/Orc/Avro + bucket_num_calculate + Lance): FAILED${NC}" fi if [[ $pk_dv_result -eq 0 ]]; then diff --git a/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py b/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py index 3180d7b69913..8083cc0a277c 100644 --- a/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py +++ b/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py @@ -208,6 +208,55 @@ def test_py_write_read_pk_table(self, file_format): actual_names = set(initial_result['name'].tolist()) self.assertEqual(actual_names, expected_names) + @parameterized.expand([('parquet',), ('orc',), ('avro',)]) + def test_py_write_read_pk_table_bucket_num_calculate(self, file_format): + from pypaimon.write.row_key_extractor import FixedBucketRowKeyExtractor + pa_schema = pa.schema([ + ('pk_str_a', pa.string()), + ('pk_str_b', pa.string()), + ('pk_int', pa.int32()), + ('value', pa.int64()), + ]) + table_name = f'default.mixed_test_pk_table_bucket_num_calculate_{file_format}' + schema = Schema.from_pyarrow_schema( + pa_schema, + primary_keys=['pk_str_a', 'pk_str_b', 'pk_int'], + options={'bucket': '4', 'file.format': file_format}, + ) + try: + existing = self.catalog.get_table(table_name) + table_path = self.catalog.get_table_path(existing.identifier) + if self.catalog.file_io.exists(table_path): + self.catalog.file_io.delete(table_path, recursive=True) + except Exception: + pass + self.catalog.create_table(table_name, schema, False) + table = self.catalog.get_table(table_name) + pk_key = ('e2e_pk_001', 'e2e_suite_001', 1) + initial_data = pd.DataFrame({ + 'pk_str_a': [pk_key[0]], + 'pk_str_b': [pk_key[1]], + 'pk_int': [pk_key[2]], + 'value': [100], + }) + write_builder = table.new_batch_write_builder() + table_write = write_builder.new_write() + table_commit = write_builder.new_commit() + table_write.write_pandas(initial_data) + table_commit.commit(table_write.prepare_commit()) + table_write.close() + table_commit.close() + batch = pa.RecordBatch.from_pydict( + {'pk_str_a': [pk_key[0]], 'pk_str_b': [pk_key[1]], 'pk_int': [pk_key[2]], 'value': [100]}, + schema=pa_schema + ) + extractor = FixedBucketRowKeyExtractor(table.table_schema) + _, buckets = extractor.extract_partition_bucket_batch(batch) + expected_bucket = buckets[0] + splits = table.new_read_builder().new_scan().plan().splits() + self.assertEqual(len(splits), 1, "one row => one split") + 
self.assertEqual(splits[0].bucket, expected_bucket, "split bucket must match extractor hash")
+
     @parameterized.expand(get_file_format_params())
     def test_read_pk_table(self, file_format):
         # Skip ORC format for Python < 3.8 due to pyarrow limitation with TIMESTAMP_INSTANT
diff --git a/paimon-python/pypaimon/write/row_key_extractor.py b/paimon-python/pypaimon/write/row_key_extractor.py
index 7deebdb5b53d..73e7e643f404 100644
--- a/paimon-python/pypaimon/write/row_key_extractor.py
+++ b/paimon-python/pypaimon/write/row_key_extractor.py
@@ -15,11 +15,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 ################################################################################
-
-import hashlib
-import json
+import struct
 from abc import ABC, abstractmethod
-from typing import List, Tuple
+from typing import Any, List, Tuple
 
 import pyarrow as pa
 
@@ -27,6 +25,144 @@
 from pypaimon.schema.table_schema import TableSchema
 from pypaimon.table.bucket_mode import BucketMode
 
+_MURMUR_C1 = 0xCC9E2D51
+_MURMUR_C2 = 0x1B873593
+_DEFAULT_SEED = 42
+_HEADER_SIZE_IN_BITS = 8
+
+
+def _round_number_of_bytes_to_nearest_word(num_bytes: int) -> int:
+    remainder = num_bytes & 0x07
+    return num_bytes if remainder == 0 else num_bytes + (8 - remainder)
+
+
+def _calculate_bit_set_width_in_bytes(arity: int) -> int:
+    return ((arity + 63 + _HEADER_SIZE_IN_BITS) // 64) * 8
+
+
+def _get_fixed_length_part_size(arity: int) -> int:
+    return _calculate_bit_set_width_in_bytes(arity) + 8 * arity
+
+
+def _mix_k1(k1: int) -> int:
+    k1 = (k1 * _MURMUR_C1) & 0xFFFFFFFF
+    k1 = ((k1 << 15) | (k1 >> 17)) & 0xFFFFFFFF
+    k1 = (k1 * _MURMUR_C2) & 0xFFFFFFFF
+    return k1
+
+
+def _mix_h1(h1: int, k1: int) -> int:
+    h1 = (h1 ^ k1) & 0xFFFFFFFF
+    h1 = ((h1 << 13) | (h1 >> 19)) & 0xFFFFFFFF
+    h1 = (h1 * 5 + 0xE6546B64) & 0xFFFFFFFF
+    return h1
+
+
+def _fmix(h1: int, length: int) -> int:
+    # Finalization mix - force all bits of a hash block to avalanche
+    h1 = (h1 ^ length) & 0xFFFFFFFF
+    h1 ^= h1 >> 16
+    h1 = (h1 * 0x85EBCA6B) & 0xFFFFFFFF
+    h1 ^= h1 >> 13
+    h1 = (h1 * 0xC2B2AE35) & 0xFFFFFFFF
+    h1 ^= h1 >> 16
+    return h1
+
+
+def _hash_bytes_by_words(data: bytes, seed: int = _DEFAULT_SEED) -> int:
+    n = len(data)
+    length_aligned = n - (n % 4)
+    h1 = seed
+    for i in range(0, length_aligned, 4):
+        k1 = struct.unpack_from("<i", data, i)[0] & 0xFFFFFFFF
+        h1 = _mix_h1(h1, _mix_k1(k1))
+    return _fmix(h1, n)
+
+
+def _type_name_for_bucket(field_type: Any) -> str:
+    if hasattr(field_type, "type") and isinstance(getattr(field_type, "type"), str):
+        return getattr(field_type, "type").upper()
+    s = str(field_type).upper()
+    if "BIGINT" in s or "LONG" in s:
+        return "BIGINT"
+    if "TINYINT" in s:
+        return "TINYINT"
+    if "SMALLINT" in s:
+        return "SMALLINT"
+    if "INT" in s or "INTEGER" in s:
+        return "INT"
+    if "STRING" in s or "VARCHAR" in s or "CHAR" in s:
+        return "STRING"
+    if "FLOAT" in s:
+        return "FLOAT"
+    if "DOUBLE" in s:
+        return "DOUBLE"
+    if "BOOLEAN" in s or "BOOL" in s:
+        return "BOOLEAN"
+    return s
+
+
+def _to_binary_row_bytes(type_names: List[str], values: Tuple[Any, ...]) -> bytes:
+    arity = len(type_names)
+    assert arity == len(values)
+    null_size = _calculate_bit_set_width_in_bytes(arity)
+    fixed_size = _get_fixed_length_part_size(arity)
+    buf = bytearray(fixed_size)
+    buf[0] = 0
+    var_cursor = fixed_size
+    var_parts = []
+    for pos in range(arity):
+        type_name = type_names[pos]
+        value = values[pos]
+        field_offset = null_size + 8 * pos
+        if value is None:
+            byte_idx = (pos + _HEADER_SIZE_IN_BITS) // 8
+            bit_idx = (pos + _HEADER_SIZE_IN_BITS) % 8
+            buf[byte_idx] |= 1 << bit_idx
+            continue
+        if type_name
in ("STRING", "VARCHAR", "CHAR"): + b = value.encode("utf-8") if isinstance(value, str) else value + length = len(b) + if length <= 7: + buf[field_offset:field_offset + length] = b + buf[field_offset + 7] = 0x80 | length + else: + rounded = _round_number_of_bytes_to_nearest_word(length) + var_parts.append((b, length, rounded)) + struct.pack_into(" List[int]: columns = [data.column(i) for i in self.bucket_key_indices] hashes = [] for row_idx in range(data.num_rows): row_values = tuple(col[row_idx].as_py() for col in columns) - hashes.append(self.hash(row_values)) - return [abs(hash_val) % self.num_buckets for hash_val in hashes] + hashes.append(self._binary_row_hash_code(row_values)) + return [abs(h) % self.num_buckets for h in hashes] - @staticmethod - def hash(data) -> int: - data_json = json.dumps(data) - return int(hashlib.md5(data_json.encode()).hexdigest(), 16) + def _binary_row_hash_code(self, row_values: Tuple[Any, ...]) -> int: + row_bytes = _to_binary_row_bytes(self._bucket_key_type_names, row_values) + return _hash_bytes_by_words(row_bytes) class UnawareBucketRowKeyExtractor(RowKeyExtractor): @@ -126,7 +266,7 @@ def __init__(self, table_schema: 'TableSchema'): f"Only 'bucket' = '-1' is allowed for 'DynamicBucketRowKeyExtractor', but found: {num_buckets}" ) - def _extract_buckets_batch(self, data: pa.RecordBatch) -> int: + def _extract_buckets_batch(self, data: pa.RecordBatch) -> List[int]: raise ValueError("Can't extract bucket from row in dynamic bucket mode") From 0605cbb7054155ff35d4c995deb6ce75f424ffc0 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Tue, 3 Feb 2026 17:55:41 +0800 Subject: [PATCH 2/7] fix --- .../java/org/apache/paimon/JavaPyE2ETest.java | 56 +++++++++++++------ .../tests/e2e/java_py_read_write_test.py | 34 +++++++---- .../pypaimon/write/row_key_extractor.py | 12 +++- 3 files changed, 73 insertions(+), 29 deletions(-) diff --git a/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java b/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java index 8adefa789b9c..af11bff1070a 100644 --- a/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java +++ b/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java @@ -414,6 +414,12 @@ public void testReadPkTable() throws Exception { } } + private static final Object[][] BUCKET_TEST_KEYS = { + {"e2e_pk_001", "e2e_suite_001", 1}, + {"k", "v", 0}, + {"e2e_pk_002", "e2e_suite_002", 2}, + }; + @Test @EnabledIfSystemProperty(named = "run.e2e.tests", matches = "true") public void testReadPkTableBucketNumCalculate() throws Exception { @@ -422,24 +428,38 @@ public void testReadPkTableBucketNumCalculate() throws Exception { identifier("mixed_test_pk_table_bucket_num_calculate_" + format); Table table = catalog.getTable(identifier); PredicateBuilder predicateBuilder = new PredicateBuilder(table.rowType()); - Predicate predicate = - PredicateBuilder.and( - predicateBuilder.equal(0, BinaryString.fromString("e2e_pk_001")), - predicateBuilder.equal(1, BinaryString.fromString("e2e_suite_001")), - predicateBuilder.equal(2, 1)); - ReadBuilder readBuilder = table.newReadBuilder().withFilter(predicate); - List res = - getResult( - readBuilder.newRead(), - readBuilder.newScan().plan().splits(), - row -> rowToStringWithStruct(row, table.rowType())); - LOG.info("Read bucket_num_calculate table {}: {} row(s)", format, res.size()); - assertThat(res) - .as( - "Python wrote 1 row (pk_str_a=e2e_pk_001, pk_str_b=e2e_suite_001," - + " pk_int=1); Java read with predicate should return it.).") - .hasSize(1); - 
assertThat(res.get(0)).contains("e2e_pk_001").contains("e2e_suite_001").contains("1"); + for (Object[] key : BUCKET_TEST_KEYS) { + String pkStrA = (String) key[0]; + String pkStrB = (String) key[1]; + int pkInt = (Integer) key[2]; + Predicate predicate = + PredicateBuilder.and( + predicateBuilder.equal(0, BinaryString.fromString(pkStrA)), + predicateBuilder.equal(1, BinaryString.fromString(pkStrB)), + predicateBuilder.equal(2, pkInt)); + ReadBuilder readBuilder = table.newReadBuilder().withFilter(predicate); + List res = + getResult( + readBuilder.newRead(), + readBuilder.newScan().plan().splits(), + row -> rowToStringWithStruct(row, table.rowType())); + LOG.info( + "Read bucket_num_calculate table {} key ({}, {}, {}): {} row(s)", + format, + pkStrA, + pkStrB, + pkInt, + res.size()); + assertThat(res) + .as( + "Python wrote row (pk_str_a=%s, pk_str_b=%s, pk_int=%s); " + + "Java read with predicate should return it.", + pkStrA, + pkStrB, + pkInt) + .hasSize(1); + assertThat(res.get(0)).contains(pkStrA).contains(pkStrB).contains(String.valueOf(pkInt)); + } } } diff --git a/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py b/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py index 8083cc0a277c..a4d5785b8856 100644 --- a/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py +++ b/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py @@ -208,6 +208,12 @@ def test_py_write_read_pk_table(self, file_format): actual_names = set(initial_result['name'].tolist()) self.assertEqual(actual_names, expected_names) + _BUCKET_TEST_KEYS = [ + ('e2e_pk_001', 'e2e_suite_001', 1), + ('k', 'v', 0), + ('e2e_pk_002', 'e2e_suite_002', 2), + ] + @parameterized.expand([('parquet',), ('orc',), ('avro',)]) def test_py_write_read_pk_table_bucket_num_calculate(self, file_format): from pypaimon.write.row_key_extractor import FixedBucketRowKeyExtractor @@ -232,12 +238,12 @@ def test_py_write_read_pk_table_bucket_num_calculate(self, file_format): pass self.catalog.create_table(table_name, schema, False) table = self.catalog.get_table(table_name) - pk_key = ('e2e_pk_001', 'e2e_suite_001', 1) + test_keys = self._BUCKET_TEST_KEYS initial_data = pd.DataFrame({ - 'pk_str_a': [pk_key[0]], - 'pk_str_b': [pk_key[1]], - 'pk_int': [pk_key[2]], - 'value': [100], + 'pk_str_a': [k[0] for k in test_keys], + 'pk_str_b': [k[1] for k in test_keys], + 'pk_int': [k[2] for k in test_keys], + 'value': [100 + i for i in range(len(test_keys))], }) write_builder = table.new_batch_write_builder() table_write = write_builder.new_write() @@ -247,15 +253,23 @@ def test_py_write_read_pk_table_bucket_num_calculate(self, file_format): table_write.close() table_commit.close() batch = pa.RecordBatch.from_pydict( - {'pk_str_a': [pk_key[0]], 'pk_str_b': [pk_key[1]], 'pk_int': [pk_key[2]], 'value': [100]}, - schema=pa_schema + { + 'pk_str_a': [k[0] for k in test_keys], + 'pk_str_b': [k[1] for k in test_keys], + 'pk_int': [k[2] for k in test_keys], + 'value': [100 + i for i in range(len(test_keys))], + }, + schema=pa_schema, ) extractor = FixedBucketRowKeyExtractor(table.table_schema) _, buckets = extractor.extract_partition_bucket_batch(batch) - expected_bucket = buckets[0] splits = table.new_read_builder().new_scan().plan().splits() - self.assertEqual(len(splits), 1, "one row => one split") - self.assertEqual(splits[0].bucket, expected_bucket, "split bucket must match extractor hash") + split_buckets = {s.bucket for s in splits} + expected_buckets = set(buckets) + self.assertEqual(split_buckets, expected_buckets, "split buckets 
must match extractor") + self.assertGreaterEqual( + len(expected_buckets), 2, + "test keys should span at least 2 buckets (low-bit and high-bit hash)") @parameterized.expand(get_file_format_params()) def test_read_pk_table(self, file_format): diff --git a/paimon-python/pypaimon/write/row_key_extractor.py b/paimon-python/pypaimon/write/row_key_extractor.py index 73e7e643f404..0f091463c64d 100644 --- a/paimon-python/pypaimon/write/row_key_extractor.py +++ b/paimon-python/pypaimon/write/row_key_extractor.py @@ -15,6 +15,7 @@ # See the License for the specific language governing permissions and # limitations under the License. ################################################################################ +import math import struct from abc import ABC, abstractmethod from typing import Any, List, Tuple @@ -80,6 +81,15 @@ def _hash_bytes_by_words(data: bytes, seed: int = _DEFAULT_SEED) -> int: return _fmix(h1, n) +def _bucket_from_hash(hash_unsigned: int, num_buckets: int) -> int: + if hash_unsigned >= 0x80000000: + hash_signed = hash_unsigned - 0x100000000 + else: + hash_signed = hash_unsigned + rem = hash_signed - math.trunc(hash_signed / num_buckets) * num_buckets + return abs(rem) + + def _type_name_for_bucket(field_type: Any) -> str: if hasattr(field_type, "type") and isinstance(getattr(field_type, "type"), str): return getattr(field_type, "type").upper() @@ -230,7 +240,7 @@ def _extract_buckets_batch(self, data: pa.RecordBatch) -> List[int]: for row_idx in range(data.num_rows): row_values = tuple(col[row_idx].as_py() for col in columns) hashes.append(self._binary_row_hash_code(row_values)) - return [abs(h) % self.num_buckets for h in hashes] + return [_bucket_from_hash(h, self.num_buckets) for h in hashes] def _binary_row_hash_code(self, row_values: Tuple[Any, ...]) -> int: row_bytes = _to_binary_row_bytes(self._bucket_key_type_names, row_values) From 7eb4a20cf9c4972afe07423bdb263601255f0129 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Tue, 3 Feb 2026 18:27:36 +0800 Subject: [PATCH 3/7] fix code format --- .../src/test/java/org/apache/paimon/JavaPyE2ETest.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java b/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java index af11bff1070a..6298aa36ad88 100644 --- a/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java +++ b/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java @@ -454,11 +454,12 @@ public void testReadPkTableBucketNumCalculate() throws Exception { .as( "Python wrote row (pk_str_a=%s, pk_str_b=%s, pk_int=%s); " + "Java read with predicate should return it.", - pkStrA, - pkStrB, - pkInt) + pkStrA, pkStrB, pkInt) .hasSize(1); - assertThat(res.get(0)).contains(pkStrA).contains(pkStrB).contains(String.valueOf(pkInt)); + assertThat(res.get(0)) + .contains(pkStrA) + .contains(pkStrB) + .contains(String.valueOf(pkInt)); } } } From d5b20b43d1e9b7ff4913134e8c5832b7ee90d4aa Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Tue, 3 Feb 2026 19:39:32 +0800 Subject: [PATCH 4/7] fix test case failure because of bucket hash method update --- .../pypaimon/tests/py36/ao_simple_test.py | 8 ++++---- .../pypaimon/tests/rest/rest_simple_test.py | 18 +++++++++--------- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/paimon-python/pypaimon/tests/py36/ao_simple_test.py b/paimon-python/pypaimon/tests/py36/ao_simple_test.py index f6bcfa976711..2aba35427ca2 100644 --- a/paimon-python/pypaimon/tests/py36/ao_simple_test.py +++ 
b/paimon-python/pypaimon/tests/py36/ao_simple_test.py @@ -147,10 +147,10 @@ def test_with_shard_ao_fixed_bucket(self): splits = read_builder.new_scan().with_shard(0, 3).plan().splits() actual = table_sort_by(table_read.to_arrow(splits), 'user_id') expected = pa.Table.from_pydict({ - 'user_id': [1, 2, 3, 5, 8, 12], - 'item_id': [1001, 1002, 1003, 1005, 1008, 1012], - 'behavior': ['a', 'b', 'c', 'd', 'g', 'k'], - 'dt': ['p1', 'p1', 'p2', 'p2', 'p1', 'p1'], + 'user_id': [1, 2, 3, 4, 5, 13], + 'item_id': [1001, 1002, 1003, 1004, 1005, 1013], + 'behavior': ['a', 'b', 'c', None, 'd', 'l'], + 'dt': ['p1', 'p1', 'p2', 'p1', 'p2', 'p2'], }, schema=self.pa_schema) self.assertEqual(actual, expected) diff --git a/paimon-python/pypaimon/tests/rest/rest_simple_test.py b/paimon-python/pypaimon/tests/rest/rest_simple_test.py index 3e29f950df2b..c00979ffd40d 100644 --- a/paimon-python/pypaimon/tests/rest/rest_simple_test.py +++ b/paimon-python/pypaimon/tests/rest/rest_simple_test.py @@ -205,10 +205,10 @@ def test_with_shard_ao_fixed_bucket(self): splits = read_builder.new_scan().with_shard(0, 3).plan().splits() actual = table_sort_by(table_read.to_arrow(splits), 'user_id') expected = pa.Table.from_pydict({ - 'user_id': [1, 2, 3, 5, 8, 12], - 'item_id': [1001, 1002, 1003, 1005, 1008, 1012], - 'behavior': ['a', 'b', 'c', 'd', 'g', 'k'], - 'dt': ['p1', 'p1', 'p2', 'p2', 'p1', 'p1'], + 'user_id': [1, 2, 3, 4, 5, 13], + 'item_id': [1001, 1002, 1003, 1004, 1005, 1013], + 'behavior': ['a', 'b', 'c', None, 'd', 'l'], + 'dt': ['p1', 'p1', 'p2', 'p1', 'p2', 'p2'], }, schema=self.pa_schema) self.assertEqual(actual, expected) @@ -636,13 +636,13 @@ def test_with_shard_pk_fixed_bucket(self): table_read = read_builder.new_read() actual = table_read.to_arrow(splits) data_expected = { - 'user_id': [4, 6, 2, 10, 8], - 'item_id': [1002, 1003, 1001, 1005, 1004], - 'behavior': ['b', 'c', 'a', 'e', 'd'], - 'dt': ['2025-08-10', '2025-08-11', '2000-10-10', '2025-08-13', '2025-08-12'] + 'user_id': [4, 6, 2, 8, 10], + 'item_id': [1002, 1003, 1001, 1004, 1005], + 'behavior': ['b', 'c', 'a', 'd', 'e'], + 'dt': ['2025-08-10', '2025-08-11', '2000-10-10', '2025-08-12', '2025-08-13'] } expected = pa.Table.from_pydict(data_expected, schema=self.pa_schema) - self.assertEqual(actual, expected) + self.assertEqual(actual.combine_chunks(), expected.combine_chunks()) def test_with_shard_uniform_division(self): schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt']) From 0d7c3840c3ef9129c25eaf9460c1e43f3aab3773 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Tue, 3 Feb 2026 22:04:41 +0800 Subject: [PATCH 5/7] remove duplicate code row ser with GenericRowSerializer --- .../java/org/apache/paimon/JavaPyE2ETest.java | 72 ++++------ paimon-python/dev/run_mixed_tests.sh | 30 ++--- .../tests/e2e/java_py_read_write_test.py | 66 +-------- .../pypaimon/write/row_key_extractor.py | 127 +++--------------- 4 files changed, 54 insertions(+), 241 deletions(-) diff --git a/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java b/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java index 6298aa36ad88..1cd881b9cf6f 100644 --- a/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java +++ b/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java @@ -75,6 +75,7 @@ import java.util.UUID; import java.util.function.Consumer; import java.util.function.Function; +import java.util.stream.Collectors; import static org.apache.paimon.CoreOptions.BUCKET; import static org.apache.paimon.CoreOptions.DATA_EVOLUTION_ENABLED; @@ 
-412,55 +413,30 @@ public void testReadPkTable() throws Exception { "+I[5, Chicken, Meat, 5.0, 1970-01-01T00:16:40.004, 1970-01-01T00:33:20.004, (store3, 1005, (NewYork, USA))]", "+I[6, Beef, Meat, 8.0, 1970-01-01T00:16:40.005, 1970-01-01T00:33:20.005, (store3, 1006, (London, UK))]"); } - } - - private static final Object[][] BUCKET_TEST_KEYS = { - {"e2e_pk_001", "e2e_suite_001", 1}, - {"k", "v", 0}, - {"e2e_pk_002", "e2e_suite_002", 2}, - }; - @Test - @EnabledIfSystemProperty(named = "run.e2e.tests", matches = "true") - public void testReadPkTableBucketNumCalculate() throws Exception { - for (String format : Arrays.asList("parquet", "orc", "avro")) { - Identifier identifier = - identifier("mixed_test_pk_table_bucket_num_calculate_" + format); - Table table = catalog.getTable(identifier); - PredicateBuilder predicateBuilder = new PredicateBuilder(table.rowType()); - for (Object[] key : BUCKET_TEST_KEYS) { - String pkStrA = (String) key[0]; - String pkStrB = (String) key[1]; - int pkInt = (Integer) key[2]; - Predicate predicate = - PredicateBuilder.and( - predicateBuilder.equal(0, BinaryString.fromString(pkStrA)), - predicateBuilder.equal(1, BinaryString.fromString(pkStrB)), - predicateBuilder.equal(2, pkInt)); - ReadBuilder readBuilder = table.newReadBuilder().withFilter(predicate); - List res = - getResult( - readBuilder.newRead(), - readBuilder.newScan().plan().splits(), - row -> rowToStringWithStruct(row, table.rowType())); - LOG.info( - "Read bucket_num_calculate table {} key ({}, {}, {}): {} row(s)", - format, - pkStrA, - pkStrB, - pkInt, - res.size()); - assertThat(res) - .as( - "Python wrote row (pk_str_a=%s, pk_str_b=%s, pk_int=%s); " - + "Java read with predicate should return it.", - pkStrA, pkStrB, pkInt) - .hasSize(1); - assertThat(res.get(0)) - .contains(pkStrA) - .contains(pkStrB) - .contains(String.valueOf(pkInt)); - } + PredicateBuilder predicateBuilder = new PredicateBuilder(table.rowType()); + int[] ids = {1, 2, 3, 4, 5, 6}; + String[] names = {"Apple", "Banana", "Carrot", "Broccoli", "Chicken", "Beef"}; + for (int i = 0; i < ids.length; i++) { + int id = ids[i]; + String expectedName = names[i]; + Predicate predicate = predicateBuilder.equal(0, id); + ReadBuilder readBuilder = fileStoreTable.newReadBuilder().withFilter(predicate); + List byKey = + getResult( + readBuilder.newRead(), + readBuilder.newScan().plan().splits(), + row -> rowToStringWithStruct(row, table.rowType())); + List matching = + byKey.stream() + .filter(s -> s.trim().startsWith("+I[" + id + ", ")) + .collect(Collectors.toList()); + assertThat(matching) + .as( + "Python wrote row id=%d; Java read with predicate id=%d should return it.", + id, id) + .hasSize(1); + assertThat(matching.get(0)).contains(String.valueOf(id)).contains(expectedName); } } diff --git a/paimon-python/dev/run_mixed_tests.sh b/paimon-python/dev/run_mixed_tests.sh index 4aa83cd7023e..c6cd49e1ec31 100755 --- a/paimon-python/dev/run_mixed_tests.sh +++ b/paimon-python/dev/run_mixed_tests.sh @@ -117,12 +117,12 @@ run_python_read_test() { # Function to run Python Write test for Python-Write-Java-Read scenario run_python_write_test() { - echo -e "${YELLOW}=== Step 3: Running Python Write Test (test_py_write_read_pk_table, test_py_write_read_pk_table_bucket_num_calculate) ===${NC}" + echo -e "${YELLOW}=== Step 3: Running Python Write Test (test_py_write_read_pk_table) ===${NC}" cd "$PAIMON_PYTHON_DIR" - # Run the parameterized Python test method for writing data (pk table + bucket num calculate for Python/Java mismatch repro) - echo "Running 
Python test for JavaPyReadWriteTest (test_py_write_read_pk_table, test_py_write_read_pk_table_bucket_num_calculate)..." + # Run the parameterized Python test method for writing data (pk table, includes bucket num assertion) + echo "Running Python test for JavaPyReadWriteTest (test_py_write_read_pk_table)..." if python -m pytest java_py_read_write_test.py::JavaPyReadWriteTest -k "test_py_write_read_pk_table" -v; then echo -e "${GREEN}✓ Python write test completed successfully${NC}" return 0 @@ -134,7 +134,7 @@ run_python_write_test() { # Function to run Java Read test for Python-Write-Java-Read scenario run_java_read_test() { - echo -e "${YELLOW}=== Step 4: Running Java Read Test (testReadPkTable, testReadPkTableBucketNumCalculate, testReadPkTableLance) ===${NC}" + echo -e "${YELLOW}=== Step 4: Running Java Read Test (testReadPkTable, testReadPkTableLance) ===${NC}" cd "$PROJECT_ROOT" @@ -154,18 +154,6 @@ run_java_read_test() { echo "" - # Run Java test for bucket_num_calculate table (Python write, Java read with predicate; repro then fix in Python) - echo "Running Maven test for JavaPyE2ETest.testReadPkTableBucketNumCalculate (reads mixed_test_pk_table_bucket_num_calculate_*)..." - local bucket_num_calculate_result=0 - if mvn test -Dtest=org.apache.paimon.JavaPyE2ETest#testReadPkTableBucketNumCalculate -pl paimon-core -Drun.e2e.tests=true -Dpython.version="$PYTHON_VERSION"; then - echo -e "${GREEN}✓ Java read bucket_num_calculate test completed successfully${NC}" - else - echo -e "${RED}✗ Java read bucket_num_calculate test failed (expected until Python bucket hash aligns with Java)${NC}" - bucket_num_calculate_result=1 - fi - - echo "" - # Run Java test for lance format in paimon-lance echo "Running Maven test for JavaPyLanceE2ETest.testReadPkTableLance (Java Read Lance)..." echo "Note: Maven may download dependencies on first run, this may take a while..." 
@@ -177,7 +165,7 @@ run_java_read_test() { lance_result=1 fi - if [[ $parquet_result -eq 0 && $bucket_num_calculate_result -eq 0 && $lance_result -eq 0 ]]; then + if [[ $parquet_result -eq 0 && $lance_result -eq 0 ]]; then return 0 else return 1 @@ -373,15 +361,15 @@ main() { fi if [[ $python_write_result -eq 0 ]]; then - echo -e "${GREEN}✓ Python Write Test (test_py_write_read_pk_table, test_py_write_read_pk_table_bucket_num_calculate): PASSED${NC}" + echo -e "${GREEN}✓ Python Write Test (test_py_write_read_pk_table): PASSED${NC}" else - echo -e "${RED}✗ Python Write Test (test_py_write_read_pk_table, test_py_write_read_pk_table_bucket_num_calculate): FAILED${NC}" + echo -e "${RED}✗ Python Write Test (test_py_write_read_pk_table): FAILED${NC}" fi if [[ $java_read_result -eq 0 ]]; then - echo -e "${GREEN}✓ Java Read Test (Parquet/Orc/Avro + bucket_num_calculate + Lance): PASSED${NC}" + echo -e "${GREEN}✓ Java Read Test (Parquet/Orc/Avro + Lance): PASSED${NC}" else - echo -e "${RED}✗ Java Read Test (Parquet/Orc/Avro + bucket_num_calculate + Lance): FAILED${NC}" + echo -e "${RED}✗ Java Read Test (Parquet/Orc/Avro + Lance): FAILED${NC}" fi if [[ $pk_dv_result -eq 0 ]]; then diff --git a/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py b/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py index a4d5785b8856..71c7e43cb44b 100644 --- a/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py +++ b/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py @@ -146,7 +146,7 @@ def test_py_write_read_pk_table(self, file_format): primary_keys=['id'], options={ 'dynamic-partition-overwrite': 'false', - 'bucket': '2', + 'bucket': '4', 'file.format': file_format, "orc.timestamp-ltz.legacy.type": "false" } @@ -208,68 +208,14 @@ def test_py_write_read_pk_table(self, file_format): actual_names = set(initial_result['name'].tolist()) self.assertEqual(actual_names, expected_names) - _BUCKET_TEST_KEYS = [ - ('e2e_pk_001', 'e2e_suite_001', 1), - ('k', 'v', 0), - ('e2e_pk_002', 'e2e_suite_002', 2), - ] - - @parameterized.expand([('parquet',), ('orc',), ('avro',)]) - def test_py_write_read_pk_table_bucket_num_calculate(self, file_format): from pypaimon.write.row_key_extractor import FixedBucketRowKeyExtractor - pa_schema = pa.schema([ - ('pk_str_a', pa.string()), - ('pk_str_b', pa.string()), - ('pk_int', pa.int32()), - ('value', pa.int64()), - ]) - table_name = f'default.mixed_test_pk_table_bucket_num_calculate_{file_format}' - schema = Schema.from_pyarrow_schema( - pa_schema, - primary_keys=['pk_str_a', 'pk_str_b', 'pk_int'], - options={'bucket': '4', 'file.format': file_format}, - ) - try: - existing = self.catalog.get_table(table_name) - table_path = self.catalog.get_table_path(existing.identifier) - if self.catalog.file_io.exists(table_path): - self.catalog.file_io.delete(table_path, recursive=True) - except Exception: - pass - self.catalog.create_table(table_name, schema, False) - table = self.catalog.get_table(table_name) - test_keys = self._BUCKET_TEST_KEYS - initial_data = pd.DataFrame({ - 'pk_str_a': [k[0] for k in test_keys], - 'pk_str_b': [k[1] for k in test_keys], - 'pk_int': [k[2] for k in test_keys], - 'value': [100 + i for i in range(len(test_keys))], - }) - write_builder = table.new_batch_write_builder() - table_write = write_builder.new_write() - table_commit = write_builder.new_commit() - table_write.write_pandas(initial_data) - table_commit.commit(table_write.prepare_commit()) - table_write.close() - table_commit.close() - batch = pa.RecordBatch.from_pydict( - { - 
'pk_str_a': [k[0] for k in test_keys], - 'pk_str_b': [k[1] for k in test_keys], - 'pk_int': [k[2] for k in test_keys], - 'value': [100 + i for i in range(len(test_keys))], - }, - schema=pa_schema, - ) + expected_bucket_first_row = 2 + first_row = initial_data.head(1) + batch = pa.RecordBatch.from_pandas(first_row, schema=pa_schema) extractor = FixedBucketRowKeyExtractor(table.table_schema) _, buckets = extractor.extract_partition_bucket_batch(batch) - splits = table.new_read_builder().new_scan().plan().splits() - split_buckets = {s.bucket for s in splits} - expected_buckets = set(buckets) - self.assertEqual(split_buckets, expected_buckets, "split buckets must match extractor") - self.assertGreaterEqual( - len(expected_buckets), 2, - "test keys should span at least 2 buckets (low-bit and high-bit hash)") + self.assertEqual(buckets[0], expected_bucket_first_row, + "bucket for first row (id=1) with num_buckets=4 must be %d" % expected_bucket_first_row) @parameterized.expand(get_file_format_params()) def test_read_pk_table(self, file_format): diff --git a/paimon-python/pypaimon/write/row_key_extractor.py b/paimon-python/pypaimon/write/row_key_extractor.py index 0f091463c64d..cbdc907cdd8b 100644 --- a/paimon-python/pypaimon/write/row_key_extractor.py +++ b/paimon-python/pypaimon/write/row_key_extractor.py @@ -15,34 +15,23 @@ # See the License for the specific language governing permissions and # limitations under the License. ################################################################################ + import math import struct from abc import ABC, abstractmethod -from typing import Any, List, Tuple +from typing import List, Tuple import pyarrow as pa from pypaimon.common.options.core_options import CoreOptions from pypaimon.schema.table_schema import TableSchema from pypaimon.table.bucket_mode import BucketMode +from pypaimon.table.row.generic_row import GenericRow, GenericRowSerializer +from pypaimon.table.row.internal_row import RowKind _MURMUR_C1 = 0xCC9E2D51 _MURMUR_C2 = 0x1B873593 _DEFAULT_SEED = 42 -_HEADER_SIZE_IN_BITS = 8 - - -def _round_number_of_bytes_to_nearest_word(num_bytes: int) -> int: - remainder = num_bytes & 0x07 - return num_bytes if remainder == 0 else num_bytes + (8 - remainder) - - -def _calculate_bit_set_width_in_bytes(arity: int) -> int: - return ((arity + 63 + _HEADER_SIZE_IN_BITS) // 64) * 8 - - -def _get_fixed_length_part_size(arity: int) -> int: - return _calculate_bit_set_width_in_bytes(arity) + 8 * arity def _mix_k1(k1: int) -> int: @@ -60,7 +49,6 @@ def _mix_h1(h1: int, k1: int) -> int: def _fmix(h1: int, length: int) -> int: - # Finalization mix - force all bits of a hash block to avalanche h1 = (h1 ^ length) & 0xFFFFFFFF h1 ^= h1 >> 16 h1 = (h1 * 0x85EBCA6B) & 0xFFFFFFFF @@ -90,90 +78,6 @@ def _bucket_from_hash(hash_unsigned: int, num_buckets: int) -> int: return abs(rem) -def _type_name_for_bucket(field_type: Any) -> str: - if hasattr(field_type, "type") and isinstance(getattr(field_type, "type"), str): - return getattr(field_type, "type").upper() - s = str(field_type).upper() - if "BIGINT" in s or "LONG" in s: - return "BIGINT" - if "TINYINT" in s: - return "TINYINT" - if "SMALLINT" in s: - return "SMALLINT" - if "INT" in s or "INTEGER" in s: - return "INT" - if "STRING" in s or "VARCHAR" in s or "CHAR" in s: - return "STRING" - if "FLOAT" in s: - return "FLOAT" - if "DOUBLE" in s: - return "DOUBLE" - if "BOOLEAN" in s or "BOOL" in s: - return "BOOLEAN" - return s - - -def _to_binary_row_bytes(type_names: List[str], values: Tuple[Any, ...]) -> bytes: - 
arity = len(type_names) - assert arity == len(values) - null_size = _calculate_bit_set_width_in_bytes(arity) - fixed_size = _get_fixed_length_part_size(arity) - buf = bytearray(fixed_size) - buf[0] = 0 - var_cursor = fixed_size - var_parts = [] - for pos in range(arity): - type_name = type_names[pos] - value = values[pos] - field_offset = null_size + 8 * pos - if value is None: - byte_idx = (pos + _HEADER_SIZE_IN_BITS) // 8 - bit_idx = (pos + _HEADER_SIZE_IN_BITS) % 8 - buf[byte_idx] |= 1 << bit_idx - continue - if type_name in ("STRING", "VARCHAR", "CHAR"): - b = value.encode("utf-8") if isinstance(value, str) else value - length = len(b) - if length <= 7: - buf[field_offset:field_offset + length] = b - buf[field_offset + 7] = 0x80 | length - else: - rounded = _round_number_of_bytes_to_nearest_word(length) - var_parts.append((b, length, rounded)) - struct.pack_into(" List[int]: columns = [data.column(i) for i in self.bucket_key_indices] - hashes = [] - for row_idx in range(data.num_rows): - row_values = tuple(col[row_idx].as_py() for col in columns) - hashes.append(self._binary_row_hash_code(row_values)) - return [_bucket_from_hash(h, self.num_buckets) for h in hashes] + return [ + _bucket_from_hash(self._binary_row_hash_code(tuple(col[row_idx].as_py() for col in columns)), self.num_buckets) + for row_idx in range(data.num_rows) + ] - def _binary_row_hash_code(self, row_values: Tuple[Any, ...]) -> int: - row_bytes = _to_binary_row_bytes(self._bucket_key_type_names, row_values) - return _hash_bytes_by_words(row_bytes) + def _binary_row_hash_code(self, row_values: Tuple) -> int: + row = GenericRow(list(row_values), self._bucket_key_fields, RowKind.INSERT) + serialized = GenericRowSerializer.to_bytes(row) + return _hash_bytes_by_words(serialized[4:]) class UnawareBucketRowKeyExtractor(RowKeyExtractor): @@ -290,4 +193,4 @@ def __init__(self, table_schema: TableSchema): raise ValueError(f"Postpone bucket mode requires bucket = -2, got {num_buckets}") def _extract_buckets_batch(self, data: pa.RecordBatch) -> List[int]: - return [BucketMode.POSTPONE_BUCKET.value] * data.num_rows + return [BucketMode.POSTPONE_BUCKET.value] * data.num_rows \ No newline at end of file From 0d75c56dd7ff23ff4e45105d0001c7614ab8b019 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Tue, 3 Feb 2026 22:11:23 +0800 Subject: [PATCH 6/7] fix code format --- .../java/org/apache/paimon/JavaPyE2ETest.java | 48 +++++++++---------- 1 file changed, 24 insertions(+), 24 deletions(-) diff --git a/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java b/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java index 1cd881b9cf6f..ca5e2ef72c2e 100644 --- a/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java +++ b/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java @@ -412,31 +412,31 @@ public void testReadPkTable() throws Exception { "+I[4, Broccoli, Vegetable, 1.2, 1970-01-01T00:16:40.003, 1970-01-01T00:33:20.003, (store2, 1004, (Seoul, Korea))]", "+I[5, Chicken, Meat, 5.0, 1970-01-01T00:16:40.004, 1970-01-01T00:33:20.004, (store3, 1005, (NewYork, USA))]", "+I[6, Beef, Meat, 8.0, 1970-01-01T00:16:40.005, 1970-01-01T00:33:20.005, (store3, 1006, (London, UK))]"); - } - PredicateBuilder predicateBuilder = new PredicateBuilder(table.rowType()); - int[] ids = {1, 2, 3, 4, 5, 6}; - String[] names = {"Apple", "Banana", "Carrot", "Broccoli", "Chicken", "Beef"}; - for (int i = 0; i < ids.length; i++) { - int id = ids[i]; - String expectedName = names[i]; - Predicate predicate = predicateBuilder.equal(0, id); 
- ReadBuilder readBuilder = fileStoreTable.newReadBuilder().withFilter(predicate); - List byKey = - getResult( - readBuilder.newRead(), - readBuilder.newScan().plan().splits(), - row -> rowToStringWithStruct(row, table.rowType())); - List matching = - byKey.stream() - .filter(s -> s.trim().startsWith("+I[" + id + ", ")) - .collect(Collectors.toList()); - assertThat(matching) - .as( - "Python wrote row id=%d; Java read with predicate id=%d should return it.", - id, id) - .hasSize(1); - assertThat(matching.get(0)).contains(String.valueOf(id)).contains(expectedName); + PredicateBuilder predicateBuilder = new PredicateBuilder(table.rowType()); + int[] ids = {1, 2, 3, 4, 5, 6}; + String[] names = {"Apple", "Banana", "Carrot", "Broccoli", "Chicken", "Beef"}; + for (int i = 0; i < ids.length; i++) { + int id = ids[i]; + String expectedName = names[i]; + Predicate predicate = predicateBuilder.equal(0, id); + ReadBuilder readBuilder = fileStoreTable.newReadBuilder().withFilter(predicate); + List byKey = + getResult( + readBuilder.newRead(), + readBuilder.newScan().plan().splits(), + row -> rowToStringWithStruct(row, table.rowType())); + List matching = + byKey.stream() + .filter(s -> s.trim().startsWith("+I[" + id + ", ")) + .collect(Collectors.toList()); + assertThat(matching) + .as( + "Python wrote row id=%d; Java read with predicate id=%d should return it.", + id, id) + .hasSize(1); + assertThat(matching.get(0)).contains(String.valueOf(id)).contains(expectedName); + } } } From c9a036b4c42c552eaa4b29eab5628ac23bf552bb Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Tue, 3 Feb 2026 22:31:34 +0800 Subject: [PATCH 7/7] fix code format --- paimon-python/pypaimon/write/row_key_extractor.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/paimon-python/pypaimon/write/row_key_extractor.py b/paimon-python/pypaimon/write/row_key_extractor.py index cbdc907cdd8b..9ff23404c3f5 100644 --- a/paimon-python/pypaimon/write/row_key_extractor.py +++ b/paimon-python/pypaimon/write/row_key_extractor.py @@ -140,7 +140,10 @@ def __init__(self, table_schema: TableSchema): def _extract_buckets_batch(self, data: pa.RecordBatch) -> List[int]: columns = [data.column(i) for i in self.bucket_key_indices] return [ - _bucket_from_hash(self._binary_row_hash_code(tuple(col[row_idx].as_py() for col in columns)), self.num_buckets) + _bucket_from_hash( + self._binary_row_hash_code(tuple(col[row_idx].as_py() for col in columns)), + self.num_buckets, + ) for row_idx in range(data.num_rows) ] @@ -193,4 +196,4 @@ def __init__(self, table_schema: TableSchema): raise ValueError(f"Postpone bucket mode requires bucket = -2, got {num_buckets}") def _extract_buckets_batch(self, data: pa.RecordBatch) -> List[int]: - return [BucketMode.POSTPONE_BUCKET.value] * data.num_rows \ No newline at end of file + return [BucketMode.POSTPONE_BUCKET.value] * data.num_rows
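Note on the bucket computation this series converges on: the Python writer must land each row in the same bucket as Java's FixedBucketRowKeyExtractor, i.e. Math.abs(hash % numBuckets) where hash is a Murmur3-style word hash (seed 42) over the serialized bucket-key bytes. The sketch below is illustrative only: the helper bodies mirror the patch's _mix_k1/_mix_h1/_fmix/_bucket_from_hash logic, but the standalone function names and the sample input bytes are made up for this example and are not part of the change.

    import math
    import struct


    def mix_k1(k1: int) -> int:
        # Murmur3 x86_32 constants, as used in the patch.
        k1 = (k1 * 0xCC9E2D51) & 0xFFFFFFFF
        k1 = ((k1 << 15) | (k1 >> 17)) & 0xFFFFFFFF
        return (k1 * 0x1B873593) & 0xFFFFFFFF


    def mix_h1(h1: int, k1: int) -> int:
        h1 = (h1 ^ k1) & 0xFFFFFFFF
        h1 = ((h1 << 13) | (h1 >> 19)) & 0xFFFFFFFF
        return (h1 * 5 + 0xE6546B64) & 0xFFFFFFFF


    def fmix(h1: int, length: int) -> int:
        # Final avalanche step.
        h1 = (h1 ^ length) & 0xFFFFFFFF
        h1 ^= h1 >> 16
        h1 = (h1 * 0x85EBCA6B) & 0xFFFFFFFF
        h1 ^= h1 >> 13
        h1 = (h1 * 0xC2B2AE35) & 0xFFFFFFFF
        return h1 ^ (h1 >> 16)


    def hash_bytes_by_words(data: bytes, seed: int = 42) -> int:
        # Hash 4-byte little-endian words; serialized row bytes are 8-byte
        # aligned, so there is no tail to process.
        h1 = seed
        for i in range(0, len(data) - (len(data) % 4), 4):
            k1 = struct.unpack_from("<i", data, i)[0] & 0xFFFFFFFF
            h1 = mix_h1(h1, mix_k1(k1))
        return fmix(h1, len(data))


    def bucket_from_hash(hash_unsigned: int, num_buckets: int) -> int:
        # Reinterpret the 32-bit result as a signed Java int, then apply
        # Math.abs(hash % numBuckets) with Java's truncate-toward-zero remainder.
        h = hash_unsigned - 0x100000000 if hash_unsigned >= 0x80000000 else hash_unsigned
        rem = h - math.trunc(h / num_buckets) * num_buckets
        return abs(rem)


    if __name__ == "__main__":
        # Made-up 16-byte payload (8-byte aligned), purely for illustration.
        sample_row_bytes = b"\x00" * 8 + b"e2e_pk_0"
        h = hash_bytes_by_words(sample_row_bytes)
        print("hash:", h, "bucket of 4:", bucket_from_hash(h, 4))

The Java-style remainder matters because Python's % always returns a non-negative result for a negative left operand, so for keys whose 32-bit hash is negative a plain abs(h) % num_buckets in Python can pick a different bucket than Java; converting to a signed int and truncating toward zero first keeps the Python writer and the Java reader in agreement.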