diff --git a/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java b/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
index 507b3a8b6985..ca5e2ef72c2e 100644
--- a/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
@@ -34,6 +34,7 @@
 import org.apache.paimon.manifest.IndexManifestEntry;
 import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.options.Options;
+import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaManager;
@@ -62,6 +63,8 @@
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.nio.file.Files;
 import java.nio.file.Paths;
@@ -72,6 +75,7 @@
 import java.util.UUID;
 import java.util.function.Consumer;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 
 import static org.apache.paimon.CoreOptions.BUCKET;
 import static org.apache.paimon.CoreOptions.DATA_EVOLUTION_ENABLED;
@@ -88,6 +92,8 @@
 /** Mixed language overwrite test for Java and Python interoperability. */
 public class JavaPyE2ETest {
 
+    private static final Logger LOG = LoggerFactory.getLogger(JavaPyE2ETest.class);
+
     java.nio.file.Path tempDir = Paths.get("../paimon-python/pypaimon/tests/e2e").toAbsolutePath();
 
     // Fields from TableTestBase that we need
@@ -131,7 +137,7 @@ public void testReadAppendTable() throws Exception {
                             read,
                             splits,
                             row -> DataFormatTestUtil.toStringNoRowKind(row, table.rowType()));
-            System.out.println(res);
+            LOG.info("Read append table: {} row(s)", res.size());
         }
     }
 
@@ -391,7 +397,7 @@ public void testReadPkTable() throws Exception {
             TableRead read = fileStoreTable.newRead();
             List<String> res =
                     getResult(read, splits, row -> rowToStringWithStruct(row, table.rowType()));
-            System.out.println("Result for " + format + " : " + res);
+            LOG.info("Result for {}: {} row(s)", format, res.size());
             assertThat(table.rowType().getFieldTypes().get(4)).isEqualTo(DataTypes.TIMESTAMP());
             assertThat(table.rowType().getFieldTypes().get(5))
                     .isEqualTo(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE());
@@ -406,6 +412,31 @@ public void testReadPkTable() throws Exception {
                             "+I[4, Broccoli, Vegetable, 1.2, 1970-01-01T00:16:40.003, 1970-01-01T00:33:20.003, (store2, 1004, (Seoul, Korea))]",
                             "+I[5, Chicken, Meat, 5.0, 1970-01-01T00:16:40.004, 1970-01-01T00:33:20.004, (store3, 1005, (NewYork, USA))]",
                             "+I[6, Beef, Meat, 8.0, 1970-01-01T00:16:40.005, 1970-01-01T00:33:20.005, (store3, 1006, (London, UK))]");
+
+            PredicateBuilder predicateBuilder = new PredicateBuilder(table.rowType());
+            int[] ids = {1, 2, 3, 4, 5, 6};
+            String[] names = {"Apple", "Banana", "Carrot", "Broccoli", "Chicken", "Beef"};
+            for (int i = 0; i < ids.length; i++) {
+                int id = ids[i];
+                String expectedName = names[i];
+                Predicate predicate = predicateBuilder.equal(0, id);
+                ReadBuilder readBuilder = fileStoreTable.newReadBuilder().withFilter(predicate);
+                List<String> byKey =
+                        getResult(
+                                readBuilder.newRead(),
+                                readBuilder.newScan().plan().splits(),
+                                row -> rowToStringWithStruct(row, table.rowType()));
+                List<String> matching =
+                        byKey.stream()
+                                .filter(s -> s.trim().startsWith("+I[" + id + ", "))
+                                .collect(Collectors.toList());
+                assertThat(matching)
+                        .as(
+                                "Python wrote row id=%d; Java read with predicate id=%d should return it.",
+                                id, id)
+                        .hasSize(1);
+                assertThat(matching.get(0)).contains(String.valueOf(id)).contains(expectedName);
+            }
         }
     }
 
diff --git a/paimon-python/dev/run_mixed_tests.sh b/paimon-python/dev/run_mixed_tests.sh
index ed7f54e0146a..c6cd49e1ec31 100755
--- a/paimon-python/dev/run_mixed_tests.sh
+++ b/paimon-python/dev/run_mixed_tests.sh
@@ -117,12 +117,12 @@ run_python_read_test() {
 
 # Function to run Python Write test for Python-Write-Java-Read scenario
 run_python_write_test() {
-    echo -e "${YELLOW}=== Step 3: Running Python Write Test (JavaPyReadWriteTest.test_py_write_read_pk_table) ===${NC}"
+    echo -e "${YELLOW}=== Step 3: Running Python Write Test (test_py_write_read_pk_table) ===${NC}"
 
     cd "$PAIMON_PYTHON_DIR"
 
-    # Run the parameterized Python test method for writing data (runs for both Parquet/Orc/Avro and Lance)
-    echo "Running Python test for JavaPyReadWriteTest.test_py_write_read_pk_table (Python Write)..."
+    # Run the parameterized Python test method for writing data (pk table, includes bucket num assertion)
+    echo "Running Python test for JavaPyReadWriteTest (test_py_write_read_pk_table)..."
     if python -m pytest java_py_read_write_test.py::JavaPyReadWriteTest -k "test_py_write_read_pk_table" -v; then
         echo -e "${GREEN}✓ Python write test completed successfully${NC}"
         return 0
@@ -134,7 +134,7 @@ run_python_write_test() {
 
 # Function to run Java Read test for Python-Write-Java-Read scenario
 run_java_read_test() {
-    echo -e "${YELLOW}=== Step 4: Running Java Read Test (JavaPyE2ETest.testReadPkTable for Parquet/Orc/Avro, JavaPyLanceE2ETest.testReadPkTableLance for Lance) ===${NC}"
+    echo -e "${YELLOW}=== Step 4: Running Java Read Test (testReadPkTable, testReadPkTableLance) ===${NC}"
 
     cd "$PROJECT_ROOT"
 
@@ -361,9 +361,9 @@ main() {
     fi
 
     if [[ $python_write_result -eq 0 ]]; then
-        echo -e "${GREEN}✓ Python Write Test (JavaPyReadWriteTest.test_py_write_read_pk_table): PASSED${NC}"
+        echo -e "${GREEN}✓ Python Write Test (test_py_write_read_pk_table): PASSED${NC}"
     else
-        echo -e "${RED}✗ Python Write Test (JavaPyReadWriteTest.test_py_write_read_pk_table): FAILED${NC}"
+        echo -e "${RED}✗ Python Write Test (test_py_write_read_pk_table): FAILED${NC}"
     fi
 
     if [[ $java_read_result -eq 0 ]]; then
diff --git a/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py b/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
index 3180d7b69913..71c7e43cb44b 100644
--- a/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
+++ b/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
@@ -146,7 +146,7 @@ def test_py_write_read_pk_table(self, file_format):
             primary_keys=['id'],
             options={
                 'dynamic-partition-overwrite': 'false',
-                'bucket': '2',
+                'bucket': '4',
                 'file.format': file_format,
                 "orc.timestamp-ltz.legacy.type": "false"
             }
@@ -208,6 +208,15 @@ def test_py_write_read_pk_table(self, file_format):
         actual_names = set(initial_result['name'].tolist())
         self.assertEqual(actual_names, expected_names)
 
+        from pypaimon.write.row_key_extractor import FixedBucketRowKeyExtractor
+        expected_bucket_first_row = 2
+        first_row = initial_data.head(1)
+        batch = pa.RecordBatch.from_pandas(first_row, schema=pa_schema)
+        extractor = FixedBucketRowKeyExtractor(table.table_schema)
+        _, buckets = extractor.extract_partition_bucket_batch(batch)
+        self.assertEqual(buckets[0], expected_bucket_first_row,
+                         "bucket for first row (id=1) with num_buckets=4 must be %d" % expected_bucket_first_row)
+
     @parameterized.expand(get_file_format_params())
     def test_read_pk_table(self, file_format):
         # Skip ORC format for Python < 3.8 due to pyarrow limitation with TIMESTAMP_INSTANT
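A note on the pinned value above: `expected_bucket_first_row = 2` encodes the Murmur-based routing for this exact fixture (schema, bucket key fields, `'bucket': '4'`). If the fixture changes, the expectation can be re-derived with the same extractor the writer uses. Below is a minimal sketch; `bucket_assignments` is a hypothetical helper (not part of pypaimon), and `table`, `initial_data`, and `pa_schema` are assumed from the test above.

```python
import pyarrow as pa

from pypaimon.write.row_key_extractor import FixedBucketRowKeyExtractor


def bucket_assignments(table, df, pa_schema):
    """Return the bucket chosen for each row of `df` by the fixed-bucket extractor."""
    batch = pa.RecordBatch.from_pandas(df, schema=pa_schema)
    extractor = FixedBucketRowKeyExtractor(table.table_schema)
    _, buckets = extractor.extract_partition_bucket_batch(batch)
    return list(buckets)


# Usage inside the test above (names taken from that test):
# for row_id, bucket in zip(initial_data['id'], bucket_assignments(table, initial_data, pa_schema)):
#     print(row_id, '->', bucket)
```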
diff --git a/paimon-python/pypaimon/tests/py36/ao_simple_test.py b/paimon-python/pypaimon/tests/py36/ao_simple_test.py
index f6bcfa976711..2aba35427ca2 100644
--- a/paimon-python/pypaimon/tests/py36/ao_simple_test.py
+++ b/paimon-python/pypaimon/tests/py36/ao_simple_test.py
@@ -147,10 +147,10 @@ def test_with_shard_ao_fixed_bucket(self):
         splits = read_builder.new_scan().with_shard(0, 3).plan().splits()
         actual = table_sort_by(table_read.to_arrow(splits), 'user_id')
         expected = pa.Table.from_pydict({
-            'user_id': [1, 2, 3, 5, 8, 12],
-            'item_id': [1001, 1002, 1003, 1005, 1008, 1012],
-            'behavior': ['a', 'b', 'c', 'd', 'g', 'k'],
-            'dt': ['p1', 'p1', 'p2', 'p2', 'p1', 'p1'],
+            'user_id': [1, 2, 3, 4, 5, 13],
+            'item_id': [1001, 1002, 1003, 1004, 1005, 1013],
+            'behavior': ['a', 'b', 'c', None, 'd', 'l'],
+            'dt': ['p1', 'p1', 'p2', 'p1', 'p2', 'p2'],
         }, schema=self.pa_schema)
         self.assertEqual(actual, expected)
 
diff --git a/paimon-python/pypaimon/tests/rest/rest_simple_test.py b/paimon-python/pypaimon/tests/rest/rest_simple_test.py
index 3e29f950df2b..c00979ffd40d 100644
--- a/paimon-python/pypaimon/tests/rest/rest_simple_test.py
+++ b/paimon-python/pypaimon/tests/rest/rest_simple_test.py
@@ -205,10 +205,10 @@ def test_with_shard_ao_fixed_bucket(self):
         splits = read_builder.new_scan().with_shard(0, 3).plan().splits()
         actual = table_sort_by(table_read.to_arrow(splits), 'user_id')
         expected = pa.Table.from_pydict({
-            'user_id': [1, 2, 3, 5, 8, 12],
-            'item_id': [1001, 1002, 1003, 1005, 1008, 1012],
-            'behavior': ['a', 'b', 'c', 'd', 'g', 'k'],
-            'dt': ['p1', 'p1', 'p2', 'p2', 'p1', 'p1'],
+            'user_id': [1, 2, 3, 4, 5, 13],
+            'item_id': [1001, 1002, 1003, 1004, 1005, 1013],
+            'behavior': ['a', 'b', 'c', None, 'd', 'l'],
+            'dt': ['p1', 'p1', 'p2', 'p1', 'p2', 'p2'],
         }, schema=self.pa_schema)
         self.assertEqual(actual, expected)
 
@@ -636,13 +636,13 @@ def test_with_shard_pk_fixed_bucket(self):
         table_read = read_builder.new_read()
         actual = table_read.to_arrow(splits)
         data_expected = {
-            'user_id': [4, 6, 2, 10, 8],
-            'item_id': [1002, 1003, 1001, 1005, 1004],
-            'behavior': ['b', 'c', 'a', 'e', 'd'],
-            'dt': ['2025-08-10', '2025-08-11', '2000-10-10', '2025-08-13', '2025-08-12']
+            'user_id': [4, 6, 2, 8, 10],
+            'item_id': [1002, 1003, 1001, 1004, 1005],
+            'behavior': ['b', 'c', 'a', 'd', 'e'],
+            'dt': ['2025-08-10', '2025-08-11', '2000-10-10', '2025-08-12', '2025-08-13']
         }
         expected = pa.Table.from_pydict(data_expected, schema=self.pa_schema)
-        self.assertEqual(actual, expected)
+        self.assertEqual(actual.combine_chunks(), expected.combine_chunks())
 
     def test_with_shard_uniform_division(self):
         schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'])
diff --git a/paimon-python/pypaimon/write/row_key_extractor.py b/paimon-python/pypaimon/write/row_key_extractor.py
index 7deebdb5b53d..9ff23404c3f5 100644
--- a/paimon-python/pypaimon/write/row_key_extractor.py
+++ b/paimon-python/pypaimon/write/row_key_extractor.py
@@ -16,8 +16,8 @@
 # limitations under the License.
 ################################################################################
 
-import hashlib
-import json
+import math
+import struct
 from abc import ABC, abstractmethod
 from typing import List, Tuple
 
@@ -26,6 +26,56 @@
 from pypaimon.common.options.core_options import CoreOptions
 from pypaimon.schema.table_schema import TableSchema
 from pypaimon.table.bucket_mode import BucketMode
+from pypaimon.table.row.generic_row import GenericRow, GenericRowSerializer
+from pypaimon.table.row.internal_row import RowKind
+
+_MURMUR_C1 = 0xCC9E2D51
+_MURMUR_C2 = 0x1B873593
+_DEFAULT_SEED = 42
+
+
+def _mix_k1(k1: int) -> int:
+    k1 = (k1 * _MURMUR_C1) & 0xFFFFFFFF
+    k1 = ((k1 << 15) | (k1 >> 17)) & 0xFFFFFFFF
+    k1 = (k1 * _MURMUR_C2) & 0xFFFFFFFF
+    return k1
+
+
+def _mix_h1(h1: int, k1: int) -> int:
+    h1 = (h1 ^ k1) & 0xFFFFFFFF
+    h1 = ((h1 << 13) | (h1 >> 19)) & 0xFFFFFFFF
+    h1 = (h1 * 5 + 0xE6546B64) & 0xFFFFFFFF
+    return h1
+
+
+def _fmix(h1: int, length: int) -> int:
+    h1 = (h1 ^ length) & 0xFFFFFFFF
+    h1 ^= h1 >> 16
+    h1 = (h1 * 0x85EBCA6B) & 0xFFFFFFFF
+    h1 ^= h1 >> 13
+    h1 = (h1 * 0xC2B2AE35) & 0xFFFFFFFF
+    h1 ^= h1 >> 16
+    return h1
+
+
+def _hash_bytes_by_words(data: bytes, seed: int = _DEFAULT_SEED) -> int:
+    n = len(data)
+    length_aligned = n - (n % 4)
+    h1 = seed
+    for i in range(0, length_aligned, 4):
+        k1 = struct.unpack_from("<I", data, i)[0]
+        h1 = _mix_h1(h1, _mix_k1(k1))
+    return _fmix(h1, length_aligned)
+
+
+def _bucket_from_hash(hash_unsigned: int, num_buckets: int) -> int:
+    if hash_unsigned >= 0x80000000:
+        hash_signed = hash_unsigned - 0x100000000
+    else:
+        hash_signed = hash_unsigned
+    rem = hash_signed - math.trunc(hash_signed / num_buckets) * num_buckets
+    return abs(rem)
 
 
 class RowKeyExtractor(ABC):
@@ -82,19 +132,25 @@ def __init__(self, table_schema: TableSchema):
                             if pk not in table_schema.partition_keys]
         self.bucket_key_indices = self._get_field_indices(self.bucket_keys)
+        field_map = {f.name: f for f in table_schema.fields}
+        self._bucket_key_fields = [
+            field_map[name] for name in self.bucket_keys if name in field_map
+        ]
 
     def _extract_buckets_batch(self, data: pa.RecordBatch) -> List[int]:
         columns = [data.column(i) for i in self.bucket_key_indices]
-        hashes = []
-        for row_idx in range(data.num_rows):
-            row_values = tuple(col[row_idx].as_py() for col in columns)
-            hashes.append(self.hash(row_values))
-        return [abs(hash_val) % self.num_buckets for hash_val in hashes]
+        return [
+            _bucket_from_hash(
+                self._binary_row_hash_code(tuple(col[row_idx].as_py() for col in columns)),
+                self.num_buckets,
+            )
+            for row_idx in range(data.num_rows)
+        ]
 
-    @staticmethod
-    def hash(data) -> int:
-        data_json = json.dumps(data)
-        return int(hashlib.md5(data_json.encode()).hexdigest(), 16)
+    def _binary_row_hash_code(self, row_values: Tuple) -> int:
+        row = GenericRow(list(row_values), self._bucket_key_fields, RowKind.INSERT)
+        serialized = GenericRowSerializer.to_bytes(row)
+        return _hash_bytes_by_words(serialized[4:])
 
 
 class UnawareBucketRowKeyExtractor(RowKeyExtractor):
@@ -126,7 +182,7 @@ def __init__(self, table_schema: 'TableSchema'):
                 f"Only 'bucket' = '-1' is allowed for 'DynamicBucketRowKeyExtractor', but found: {num_buckets}"
             )
 
-    def _extract_buckets_batch(self, data: pa.RecordBatch) -> int:
+    def _extract_buckets_batch(self, data: pa.RecordBatch) -> List[int]:
         raise ValueError("Can't extract bucket from row in dynamic bucket mode")
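For reviewers who want to sanity-check the new routing outside pypaimon, here is a minimal standalone sketch of the word-wise Murmur3 mixing and the Java-style signed-remainder bucket mapping added above. The constants mirror `row_key_extractor.py`; the toy 4-byte key and the assumption that finalization uses the word-aligned length are illustrative only, since real keys are the `GenericRowSerializer` bytes with the first 4 bytes dropped (see `_binary_row_hash_code`).

```python
import struct

# Constants assumed to mirror the helpers added in row_key_extractor.py.
C1, C2, SEED, MASK = 0xCC9E2D51, 0x1B873593, 42, 0xFFFFFFFF


def rotl32(x: int, r: int) -> int:
    """Rotate a 32-bit value left by r bits."""
    return ((x << r) | (x >> (32 - r))) & MASK


def murmur_words(data: bytes, seed: int = SEED) -> int:
    """Murmur3-style mixing over whole 4-byte little-endian words."""
    aligned = len(data) - len(data) % 4
    h1 = seed
    for i in range(0, aligned, 4):
        k1 = struct.unpack_from("<I", data, i)[0]
        k1 = (rotl32((k1 * C1) & MASK, 15) * C2) & MASK
        h1 = (rotl32(h1 ^ k1, 13) * 5 + 0xE6546B64) & MASK
    # Finalization ("fmix"); feeding in the word-aligned length is an assumption.
    h1 ^= aligned
    h1 ^= h1 >> 16
    h1 = (h1 * 0x85EBCA6B) & MASK
    h1 ^= h1 >> 13
    h1 = (h1 * 0xC2B2AE35) & MASK
    h1 ^= h1 >> 16
    return h1


def bucket_of(hash32: int, num_buckets: int) -> int:
    """Java-style abs(hash % numBuckets): treat the unsigned 32-bit hash as signed first."""
    signed = hash32 - 0x100000000 if hash32 >= 0x80000000 else hash32
    return abs(signed - int(signed / num_buckets) * num_buckets)


if __name__ == "__main__":
    key = struct.pack("<i", 1)  # toy 4-byte key, not a real BinaryRow payload
    print(bucket_of(murmur_words(key), 4))
```

The signed reinterpretation matters because Java's `%` keeps the dividend's sign; converting the unsigned 32-bit hash to a signed value before taking the truncated remainder keeps the Python writer and the Java reader agreeing on the bucket for every key.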