35 changes: 33 additions & 2 deletions paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
@@ -34,6 +34,7 @@
import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
@@ -62,6 +63,8 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.file.Files;
import java.nio.file.Paths;
@@ -72,6 +75,7 @@
import java.util.UUID;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.apache.paimon.CoreOptions.BUCKET;
import static org.apache.paimon.CoreOptions.DATA_EVOLUTION_ENABLED;
@@ -88,6 +92,8 @@
/** Mixed language overwrite test for Java and Python interoperability. */
public class JavaPyE2ETest {

private static final Logger LOG = LoggerFactory.getLogger(JavaPyE2ETest.class);

java.nio.file.Path tempDir = Paths.get("../paimon-python/pypaimon/tests/e2e").toAbsolutePath();

// Fields from TableTestBase that we need
@@ -131,7 +137,7 @@ public void testReadAppendTable() throws Exception {
read,
splits,
row -> DataFormatTestUtil.toStringNoRowKind(row, table.rowType()));
System.out.println(res);
LOG.info("Read append table: {} row(s)", res.size());
}
}

@@ -391,7 +397,7 @@ public void testReadPkTable() throws Exception {
TableRead read = fileStoreTable.newRead();
List<String> res =
getResult(read, splits, row -> rowToStringWithStruct(row, table.rowType()));
System.out.println("Result for " + format + " : " + res);
LOG.info("Result for {}: {} row(s)", format, res.size());
assertThat(table.rowType().getFieldTypes().get(4)).isEqualTo(DataTypes.TIMESTAMP());
assertThat(table.rowType().getFieldTypes().get(5))
.isEqualTo(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE());
@@ -406,6 +412,31 @@ public void testReadPkTable() throws Exception {
"+I[4, Broccoli, Vegetable, 1.2, 1970-01-01T00:16:40.003, 1970-01-01T00:33:20.003, (store2, 1004, (Seoul, Korea))]",
"+I[5, Chicken, Meat, 5.0, 1970-01-01T00:16:40.004, 1970-01-01T00:33:20.004, (store3, 1005, (NewYork, USA))]",
"+I[6, Beef, Meat, 8.0, 1970-01-01T00:16:40.005, 1970-01-01T00:33:20.005, (store3, 1006, (London, UK))]");

PredicateBuilder predicateBuilder = new PredicateBuilder(table.rowType());
int[] ids = {1, 2, 3, 4, 5, 6};
String[] names = {"Apple", "Banana", "Carrot", "Broccoli", "Chicken", "Beef"};
for (int i = 0; i < ids.length; i++) {
int id = ids[i];
String expectedName = names[i];
Predicate predicate = predicateBuilder.equal(0, id);
ReadBuilder readBuilder = fileStoreTable.newReadBuilder().withFilter(predicate);
List<String> byKey =
getResult(
readBuilder.newRead(),
readBuilder.newScan().plan().splits(),
row -> rowToStringWithStruct(row, table.rowType()));
List<String> matching =
byKey.stream()
.filter(s -> s.trim().startsWith("+I[" + id + ", "))
.collect(Collectors.toList());
assertThat(matching)
.as(
"Python wrote row id=%d; Java read with predicate id=%d should return it.",
id, id)
.hasSize(1);
assertThat(matching.get(0)).contains(String.valueOf(id)).contains(expectedName);
}
}
}

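A note on the per-id checks added above: the same lookup can be mirrored from the Python side with pypaimon's read builder. The sketch below is not part of this change; it assumes a `table` handle already obtained from a catalog, and that pypaimon's ReadBuilder exposes new_predicate_builder()/with_filter() with an equal() method analogous to the Java PredicateBuilder used here (those method names are assumptions, not taken from this diff).

# Sketch only (not in this PR): per-id predicate reads from Python, mirroring
# the Java loop in testReadPkTable. `table` is assumed to be a pypaimon table
# handle; new_predicate_builder()/equal()/with_filter() are assumed API names.
expected_names = {1: 'Apple', 2: 'Banana', 3: 'Carrot',
                  4: 'Broccoli', 5: 'Chicken', 6: 'Beef'}
for row_id, expected_name in expected_names.items():
    read_builder = table.new_read_builder()
    predicate = read_builder.new_predicate_builder().equal('id', row_id)
    read_builder = read_builder.with_filter(predicate)
    splits = read_builder.new_scan().plan().splits()
    result = read_builder.new_read().to_arrow(splits).to_pandas()
    # The filter may only prune files, so re-filter the rows by id before asserting.
    matching = result[result['id'] == row_id]
    assert len(matching) == 1
    assert matching.iloc[0]['name'] == expected_name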
12 changes: 6 additions & 6 deletions paimon-python/dev/run_mixed_tests.sh
@@ -117,12 +117,12 @@ run_python_read_test() {

# Function to run Python Write test for Python-Write-Java-Read scenario
run_python_write_test() {
echo -e "${YELLOW}=== Step 3: Running Python Write Test (JavaPyReadWriteTest.test_py_write_read_pk_table) ===${NC}"
echo -e "${YELLOW}=== Step 3: Running Python Write Test (test_py_write_read_pk_table) ===${NC}"

cd "$PAIMON_PYTHON_DIR"

# Run the parameterized Python test method for writing data (runs for both Parquet/Orc/Avro and Lance)
echo "Running Python test for JavaPyReadWriteTest.test_py_write_read_pk_table (Python Write)..."
# Run the parameterized Python test method for writing data (pk table, includes bucket num assertion)
echo "Running Python test for JavaPyReadWriteTest (test_py_write_read_pk_table)..."
if python -m pytest java_py_read_write_test.py::JavaPyReadWriteTest -k "test_py_write_read_pk_table" -v; then
echo -e "${GREEN}✓ Python write test completed successfully${NC}"
return 0
@@ -134,7 +134,7 @@ run_python_write_test() {

# Function to run Java Read test for Python-Write-Java-Read scenario
run_java_read_test() {
echo -e "${YELLOW}=== Step 4: Running Java Read Test (JavaPyE2ETest.testReadPkTable for Parquet/Orc/Avro, JavaPyLanceE2ETest.testReadPkTableLance for Lance) ===${NC}"
echo -e "${YELLOW}=== Step 4: Running Java Read Test (testReadPkTable, testReadPkTableLance) ===${NC}"

cd "$PROJECT_ROOT"

@@ -361,9 +361,9 @@ main() {
fi

if [[ $python_write_result -eq 0 ]]; then
echo -e "${GREEN}✓ Python Write Test (JavaPyReadWriteTest.test_py_write_read_pk_table): PASSED${NC}"
echo -e "${GREEN}✓ Python Write Test (test_py_write_read_pk_table): PASSED${NC}"
else
echo -e "${RED}✗ Python Write Test (JavaPyReadWriteTest.test_py_write_read_pk_table): FAILED${NC}"
echo -e "${RED}✗ Python Write Test (test_py_write_read_pk_table): FAILED${NC}"
fi

if [[ $java_read_result -eq 0 ]]; then
11 changes: 10 additions & 1 deletion paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
@@ -146,7 +146,7 @@ def test_py_write_read_pk_table(self, file_format):
primary_keys=['id'],
options={
'dynamic-partition-overwrite': 'false',
'bucket': '2',
'bucket': '4',
'file.format': file_format,
"orc.timestamp-ltz.legacy.type": "false"
}
@@ -208,6 +208,15 @@ def test_py_write_read_pk_table(self, file_format):
actual_names = set(initial_result['name'].tolist())
self.assertEqual(actual_names, expected_names)

from pypaimon.write.row_key_extractor import FixedBucketRowKeyExtractor
expected_bucket_first_row = 2
first_row = initial_data.head(1)
batch = pa.RecordBatch.from_pandas(first_row, schema=pa_schema)
extractor = FixedBucketRowKeyExtractor(table.table_schema)
_, buckets = extractor.extract_partition_bucket_batch(batch)
self.assertEqual(buckets[0], expected_bucket_first_row,
"bucket for first row (id=1) with num_buckets=4 must be %d" % expected_bucket_first_row)

@parameterized.expand(get_file_format_params())
def test_read_pk_table(self, file_format):
# Skip ORC format for Python < 3.8 due to pyarrow limitation with TIMESTAMP_INSTANT
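The new assertion above pins the bucket of a single row (id=1 must land in bucket 2 when bucket=4). To see how the extractor spreads the whole batch across the four buckets, the same calls used in the test can be reused. A minimal sketch, assuming `table`, `initial_data` (the pandas DataFrame written by the test) and `pa_schema` are set up as in test_py_write_read_pk_table:

import pyarrow as pa
from pypaimon.write.row_key_extractor import FixedBucketRowKeyExtractor

# Sketch only: group row ids by assigned bucket for the full batch.
# `table`, `initial_data` and `pa_schema` come from the surrounding test.
batch = pa.RecordBatch.from_pandas(initial_data, schema=pa_schema)
extractor = FixedBucketRowKeyExtractor(table.table_schema)
_, buckets = extractor.extract_partition_bucket_batch(batch)

by_bucket = {}
for row_id, bucket in zip(initial_data['id'].tolist(), buckets):
    by_bucket.setdefault(bucket, []).append(row_id)
print(by_bucket)  # row id=1 is expected under bucket 2, per the assertion above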
8 changes: 4 additions & 4 deletions paimon-python/pypaimon/tests/py36/ao_simple_test.py
@@ -147,10 +147,10 @@ def test_with_shard_ao_fixed_bucket(self):
splits = read_builder.new_scan().with_shard(0, 3).plan().splits()
actual = table_sort_by(table_read.to_arrow(splits), 'user_id')
expected = pa.Table.from_pydict({
'user_id': [1, 2, 3, 5, 8, 12],
'item_id': [1001, 1002, 1003, 1005, 1008, 1012],
'behavior': ['a', 'b', 'c', 'd', 'g', 'k'],
'dt': ['p1', 'p1', 'p2', 'p2', 'p1', 'p1'],
'user_id': [1, 2, 3, 4, 5, 13],
'item_id': [1001, 1002, 1003, 1004, 1005, 1013],
'behavior': ['a', 'b', 'c', None, 'd', 'l'],
'dt': ['p1', 'p1', 'p2', 'p1', 'p2', 'p2'],
}, schema=self.pa_schema)
self.assertEqual(actual, expected)

18 changes: 9 additions & 9 deletions paimon-python/pypaimon/tests/rest/rest_simple_test.py
@@ -205,10 +205,10 @@ def test_with_shard_ao_fixed_bucket(self):
splits = read_builder.new_scan().with_shard(0, 3).plan().splits()
actual = table_sort_by(table_read.to_arrow(splits), 'user_id')
expected = pa.Table.from_pydict({
'user_id': [1, 2, 3, 5, 8, 12],
'item_id': [1001, 1002, 1003, 1005, 1008, 1012],
'behavior': ['a', 'b', 'c', 'd', 'g', 'k'],
'dt': ['p1', 'p1', 'p2', 'p2', 'p1', 'p1'],
'user_id': [1, 2, 3, 4, 5, 13],
'item_id': [1001, 1002, 1003, 1004, 1005, 1013],
'behavior': ['a', 'b', 'c', None, 'd', 'l'],
'dt': ['p1', 'p1', 'p2', 'p1', 'p2', 'p2'],
}, schema=self.pa_schema)
self.assertEqual(actual, expected)

@@ -636,13 +636,13 @@ def test_with_shard_pk_fixed_bucket(self):
table_read = read_builder.new_read()
actual = table_read.to_arrow(splits)
data_expected = {
'user_id': [4, 6, 2, 10, 8],
'item_id': [1002, 1003, 1001, 1005, 1004],
'behavior': ['b', 'c', 'a', 'e', 'd'],
'dt': ['2025-08-10', '2025-08-11', '2000-10-10', '2025-08-13', '2025-08-12']
'user_id': [4, 6, 2, 8, 10],
'item_id': [1002, 1003, 1001, 1004, 1005],
'behavior': ['b', 'c', 'a', 'd', 'e'],
'dt': ['2025-08-10', '2025-08-11', '2000-10-10', '2025-08-12', '2025-08-13']
}
expected = pa.Table.from_pydict(data_expected, schema=self.pa_schema)
self.assertEqual(actual, expected)
self.assertEqual(actual.combine_chunks(), expected.combine_chunks())

def test_with_shard_uniform_division(self):
schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'])
80 changes: 68 additions & 12 deletions paimon-python/pypaimon/write/row_key_extractor.py
@@ -16,8 +16,8 @@
# limitations under the License.
################################################################################

import hashlib
import json
import math
import struct
from abc import ABC, abstractmethod
from typing import List, Tuple

@@ -26,6 +26,56 @@
from pypaimon.common.options.core_options import CoreOptions
from pypaimon.schema.table_schema import TableSchema
from pypaimon.table.bucket_mode import BucketMode
from pypaimon.table.row.generic_row import GenericRow, GenericRowSerializer
from pypaimon.table.row.internal_row import RowKind

_MURMUR_C1 = 0xCC9E2D51
_MURMUR_C2 = 0x1B873593
_DEFAULT_SEED = 42


def _mix_k1(k1: int) -> int:
k1 = (k1 * _MURMUR_C1) & 0xFFFFFFFF
k1 = ((k1 << 15) | (k1 >> 17)) & 0xFFFFFFFF
k1 = (k1 * _MURMUR_C2) & 0xFFFFFFFF
return k1


def _mix_h1(h1: int, k1: int) -> int:
h1 = (h1 ^ k1) & 0xFFFFFFFF
h1 = ((h1 << 13) | (h1 >> 19)) & 0xFFFFFFFF
h1 = (h1 * 5 + 0xE6546B64) & 0xFFFFFFFF
return h1


def _fmix(h1: int, length: int) -> int:
h1 = (h1 ^ length) & 0xFFFFFFFF
h1 ^= h1 >> 16
h1 = (h1 * 0x85EBCA6B) & 0xFFFFFFFF
h1 ^= h1 >> 13
h1 = (h1 * 0xC2B2AE35) & 0xFFFFFFFF
h1 ^= h1 >> 16
return h1


def _hash_bytes_by_words(data: bytes, seed: int = _DEFAULT_SEED) -> int:
n = len(data)
length_aligned = n - (n % 4)
h1 = seed
for i in range(0, length_aligned, 4):
k1 = struct.unpack_from("<I", data, i)[0]
k1 = _mix_k1(k1)
h1 = _mix_h1(h1, k1)
return _fmix(h1, n)


def _bucket_from_hash(hash_unsigned: int, num_buckets: int) -> int:
if hash_unsigned >= 0x80000000:
hash_signed = hash_unsigned - 0x100000000
else:
hash_signed = hash_unsigned
rem = hash_signed - math.trunc(hash_signed / num_buckets) * num_buckets
return abs(rem)


class RowKeyExtractor(ABC):
@@ -82,19 +82,25 @@ def __init__(self, table_schema: TableSchema):
if pk not in table_schema.partition_keys]

self.bucket_key_indices = self._get_field_indices(self.bucket_keys)
field_map = {f.name: f for f in table_schema.fields}
self._bucket_key_fields = [
field_map[name] for name in self.bucket_keys if name in field_map
]

def _extract_buckets_batch(self, data: pa.RecordBatch) -> List[int]:
columns = [data.column(i) for i in self.bucket_key_indices]
hashes = []
for row_idx in range(data.num_rows):
row_values = tuple(col[row_idx].as_py() for col in columns)
hashes.append(self.hash(row_values))
return [abs(hash_val) % self.num_buckets for hash_val in hashes]
return [
_bucket_from_hash(
self._binary_row_hash_code(tuple(col[row_idx].as_py() for col in columns)),
self.num_buckets,
)
for row_idx in range(data.num_rows)
]

@staticmethod
def hash(data) -> int:
data_json = json.dumps(data)
return int(hashlib.md5(data_json.encode()).hexdigest(), 16)
def _binary_row_hash_code(self, row_values: Tuple) -> int:
row = GenericRow(list(row_values), self._bucket_key_fields, RowKind.INSERT)
serialized = GenericRowSerializer.to_bytes(row)
return _hash_bytes_by_words(serialized[4:])


class UnawareBucketRowKeyExtractor(RowKeyExtractor):
@@ -126,7 +182,7 @@ def __init__(self, table_schema: 'TableSchema'):
f"Only 'bucket' = '-1' is allowed for 'DynamicBucketRowKeyExtractor', but found: {num_buckets}"
)

def _extract_buckets_batch(self, data: pa.RecordBatch) -> int:
def _extract_buckets_batch(self, data: pa.RecordBatch) -> List[int]:
raise ValueError("Can't extract bucket from row in dynamic bucket mode")


Expand Down