From ad00fac2681484a9d4410aab45a774e380720782 Mon Sep 17 00:00:00 2001 From: JingsongLi Date: Tue, 3 Feb 2026 21:26:38 +0800 Subject: [PATCH 1/4] [python] Data Evolution with_slice should use row id to do slice --- .../scanner/data_evolution_split_generator.py | 165 +++++++++++------- 1 file changed, 102 insertions(+), 63 deletions(-) diff --git a/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py b/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py index f52efb7ae19f..a6a16d1160c1 100644 --- a/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py +++ b/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py @@ -24,7 +24,6 @@ from pypaimon.manifest.schema.manifest_entry import ManifestEntry from pypaimon.read.scanner.split_generator import AbstractSplitGenerator from pypaimon.read.split import DataSplit, Split -from pypaimon.read.sliced_split import SlicedSplit class DataEvolutionSplitGenerator(AbstractSplitGenerator): @@ -65,17 +64,14 @@ def sort_key(manifest_entry: ManifestEntry) -> tuple: for entry in sorted_entries: partitioned_files[(tuple(entry.partition.values), entry.bucket)].append(entry) - plan_start_pos = 0 - plan_end_pos = 0 + slice_row_ranges = None # Row ID ranges for slice-based filtering if self.start_pos_of_this_subtask is not None: - # shard data range: [plan_start_pos, plan_end_pos) - partitioned_files, plan_start_pos, plan_end_pos = \ - self._filter_by_row_range( - partitioned_files, - self.start_pos_of_this_subtask, - self.end_pos_of_this_subtask - ) + # Calculate Row ID range for slice-based filtering + slice_row_ranges = self._calculate_slice_row_ranges(partitioned_files) + if slice_row_ranges: + # Filter files by Row ID range + partitioned_files = self._filter_files_by_row_ranges(partitioned_files, slice_row_ranges) elif self.idx_of_this_subtask is not None: partitioned_files = self._filter_by_shard( partitioned_files, self.idx_of_this_subtask, self.number_of_para_subtasks @@ -109,11 +105,15 @@ def weight_func(file_list: List[DataFileMeta]) -> int: flatten_packed_files, packed_files, sorted_entries_list ) - if self.start_pos_of_this_subtask is not None: - splits = self._wrap_to_sliced_splits(splits, plan_start_pos, plan_end_pos) - # Wrap splits with IndexedSplit if row_ranges is provided - if self.row_ranges: - splits = self._wrap_to_indexed_splits(splits) + # merge slice_row_ranges and self.row_ranges + if slice_row_ranges is None: + slice_row_ranges = self.row_ranges + elif self.row_ranges is not None: + slice_row_ranges = Range.and_(slice_row_ranges, self.row_ranges) + + # Wrap splits with IndexedSplit for slice-based filtering or row_ranges + if slice_row_ranges: + splits = self._wrap_to_indexed_splits(splits, slice_row_ranges) return splits @@ -171,76 +171,115 @@ def _build_split_from_pack_for_data_evolution( splits.append(split) return splits - def _wrap_to_sliced_splits(self, splits: List[Split], plan_start_pos: int, plan_end_pos: int) -> List[Split]: + def _calculate_slice_row_ranges(self, partitioned_files: defaultdict) -> List[Range]: """ - Wrap splits with SlicedSplit to add file-level slicing information. + Calculate Row ID ranges for slice-based filtering based on start_pos and end_pos. 
""" - sliced_splits = [] - file_end_pos = 0 # end row position of current file in all splits data + # Collect all Row ID ranges from files + list_ranges = [] + for file_entries in partitioned_files.values(): + for entry in file_entries: + first_row_id = entry.file.first_row_id + # Range is inclusive [from_, to], so use row_count - 1 + list_ranges.append(Range(first_row_id, first_row_id + entry.file.row_count - 1)) - for split in splits: - # Compute file index map for both data and blob files - # Blob files share the same row position tracking as data files - shard_file_idx_map = self._compute_slice_split_file_idx_map( - plan_start_pos, plan_end_pos, split, file_end_pos - ) - file_end_pos = shard_file_idx_map[self.NEXT_POS_KEY] - del shard_file_idx_map[self.NEXT_POS_KEY] - - if shard_file_idx_map: - sliced_splits.append(SlicedSplit(split, shard_file_idx_map)) - else: - sliced_splits.append(split) + # Merge overlapping ranges + sorted_ranges = Range.sort_and_merge_overlap(list_ranges, True, False) + + # Calculate the Row ID range for this slice + start_range, end_range = self._divide_ranges_by_position(sorted_ranges) + if start_range is None or end_range is None: + return [] - return sliced_splits + # Return the range for this slice + return [Range(start_range.from_, end_range.to)] + + def _divide_ranges_by_position(self, sorted_ranges: List[Range]) -> Tuple[Optional[Range], Optional[Range]]: + """ + Divide ranges by position (start_pos, end_pos) to get the Row ID range for this slice. + """ + if not sorted_ranges: + return None, None - def _filter_by_row_range( + total_row_count = sum(r.count() for r in sorted_ranges) + start_pos = self.start_pos_of_this_subtask + end_pos = self.end_pos_of_this_subtask + + if start_pos >= total_row_count: + return None, None + + # Find the start Row ID + current_pos = 0 + start_row_id = None + end_row_id = None + + for r in sorted_ranges: + range_end_pos = current_pos + r.count() + + # Find start Row ID + if start_row_id is None and start_pos < range_end_pos: + offset = start_pos - current_pos + start_row_id = r.from_ + offset + + # Find end Row ID + if end_pos <= range_end_pos: + offset = end_pos - current_pos + end_row_id = r.from_ + offset - 1 # -1 because end_pos is exclusive + break + + current_pos = range_end_pos + + if start_row_id is None: + return None, None + if end_row_id is None: + end_row_id = sorted_ranges[-1].to + + return Range(start_row_id, start_row_id), Range(end_row_id, end_row_id) + + def _filter_files_by_row_ranges( self, partitioned_files: defaultdict, - start_pos: int, - end_pos: int - ) -> tuple: + row_ranges: List[Range] + ) -> defaultdict: """ - Filter file entries by row range for data evolution tables. + Filter files by Row ID ranges. Keep files that overlap with the given ranges. 
""" - plan_start_pos = 0 - plan_end_pos = 0 - entry_end_pos = 0 # end row position of current file in all data - splits_start_pos = 0 filtered_partitioned_files = defaultdict(list) - # Iterate through all file entries to find files that overlap with current shard range for key, file_entries in partitioned_files.items(): filtered_entries = [] - blob_added = False # If it is true, all blobs corresponding to this data file are added + blob_added = False + for entry in file_entries: if self._is_blob_file(entry.file.file_name): if blob_added: filtered_entries.append(entry) continue blob_added = False - entry_begin_pos = entry_end_pos # Starting row position of current file in all data - entry_end_pos += entry.file.row_count # Update to row position after current file - # If current file is completely after shard range, stop iteration - if entry_begin_pos >= end_pos: - break - # If current file is completely before shard range, skip it - if entry_end_pos <= start_pos: + first_row_id = entry.file.first_row_id + if first_row_id is None: + # Files without first_row_id are kept + filtered_entries.append(entry) continue - if entry_begin_pos <= start_pos < entry_end_pos: - splits_start_pos = entry_begin_pos - plan_start_pos = start_pos - entry_begin_pos - # If shard end position is within current file, record relative end position - if entry_begin_pos < end_pos <= entry_end_pos: - plan_end_pos = end_pos - splits_start_pos - # Add files that overlap with shard range to result - filtered_entries.append(entry) - blob_added = True + + file_range = Range(first_row_id, first_row_id + entry.file.row_count - 1) + + # Check if file overlaps with any of the row ranges + overlaps = False + for r in row_ranges: + if Range.and_([file_range], [r]): + overlaps = True + break + + if overlaps: + filtered_entries.append(entry) + blob_added = True + if filtered_entries: filtered_partitioned_files[key] = filtered_entries - return filtered_partitioned_files, plan_start_pos, plan_end_pos + return filtered_partitioned_files def _filter_by_shard(self, partitioned_files: defaultdict, sub_task_id: int, total_tasks: int) -> defaultdict: list_ranges = [] @@ -413,7 +452,7 @@ def _compute_slice_split_file_idx_map( shard_file_idx_map[self.NEXT_POS_KEY] = next_pos return shard_file_idx_map - def _wrap_to_indexed_splits(self, splits: List[Split]) -> List[Split]: + def _wrap_to_indexed_splits(self, splits: List[Split], row_ranges: List[Range]) -> List[Split]: """ Wrap splits with IndexedSplit for row range filtering. 
""" @@ -438,7 +477,7 @@ def _wrap_to_indexed_splits(self, splits: List[Split]) -> List[Split]: file_ranges = Range.merge_sorted_as_possible(file_ranges) # Intersect with row_ranges from global index - expected = Range.and_(file_ranges, self.row_ranges) + expected = Range.and_(file_ranges, row_ranges) if not expected: # No intersection, skip this split From bf884851a75e0d2058e22864329d25cf3d2dbe07 Mon Sep 17 00:00:00 2001 From: JingsongLi Date: Tue, 3 Feb 2026 21:40:44 +0800 Subject: [PATCH 2/4] fix --- .../scanner/data_evolution_split_generator.py | 22 +++---------------- 1 file changed, 3 insertions(+), 19 deletions(-) diff --git a/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py b/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py index a6a16d1160c1..6d396286355d 100644 --- a/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py +++ b/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py @@ -236,11 +236,8 @@ def _divide_ranges_by_position(self, sorted_ranges: List[Range]) -> Tuple[Option return Range(start_row_id, start_row_id), Range(end_row_id, end_row_id) - def _filter_files_by_row_ranges( - self, - partitioned_files: defaultdict, - row_ranges: List[Range] - ) -> defaultdict: + @staticmethod + def _filter_files_by_row_ranges(partitioned_files: defaultdict, row_ranges: List[Range]) -> defaultdict: """ Filter files by Row ID ranges. Keep files that overlap with the given ranges. """ @@ -248,33 +245,20 @@ def _filter_files_by_row_ranges( for key, file_entries in partitioned_files.items(): filtered_entries = [] - blob_added = False for entry in file_entries: - if self._is_blob_file(entry.file.file_name): - if blob_added: - filtered_entries.append(entry) - continue - blob_added = False - first_row_id = entry.file.first_row_id - if first_row_id is None: - # Files without first_row_id are kept - filtered_entries.append(entry) - continue - file_range = Range(first_row_id, first_row_id + entry.file.row_count - 1) # Check if file overlaps with any of the row ranges overlaps = False for r in row_ranges: - if Range.and_([file_range], [r]): + if r.overlaps(file_range): overlaps = True break if overlaps: filtered_entries.append(entry) - blob_added = True if filtered_entries: filtered_partitioned_files[key] = filtered_entries From 542c44b589b5aff363eada614177dde7194df735 Mon Sep 17 00:00:00 2001 From: JingsongLi Date: Tue, 3 Feb 2026 22:45:12 +0800 Subject: [PATCH 3/4] fix --- paimon-python/pypaimon/read/split_read.py | 27 ++++++----------------- 1 file changed, 7 insertions(+), 20 deletions(-) diff --git a/paimon-python/pypaimon/read/split_read.py b/paimon-python/pypaimon/read/split_read.py index e9b2f398774c..8bc02728bdb9 100644 --- a/paimon-python/pypaimon/read/split_read.py +++ b/paimon-python/pypaimon/read/split_read.py @@ -592,27 +592,14 @@ def _create_union_reader(self, need_merge_files: List[DataFileMeta]) -> RecordRe def _create_file_reader(self, file: DataFileMeta, read_fields: [str]) -> Optional[RecordReader]: """Create a file reader for a single file.""" - def create_record_reader(): - record_reader = self.file_reader_supplier( - file=file, - for_merge_read=False, - read_fields=read_fields, - row_tracking_enabled=True) - if self.row_ranges is not None: - record_reader = RowIdFilterRecordBatchReader(record_reader, file.first_row_id, self.row_ranges) + record_reader = self.file_reader_supplier( + file=file, + for_merge_read=False, + read_fields=read_fields, + row_tracking_enabled=True) + if self.row_ranges is None: 
return record_reader - - shard_file_idx_map = ( - self.split.shard_file_idx_map() if isinstance(self.split, SlicedSplit) else {} - ) - if file.file_name in shard_file_idx_map: - (begin_pos, end_pos) = shard_file_idx_map[file.file_name] - if (begin_pos, end_pos) == (-1, -1): - return None - else: - return ShardBatchReader(create_record_reader(), begin_pos, end_pos) - else: - return create_record_reader() + return RowIdFilterRecordBatchReader(record_reader, file.first_row_id, self.row_ranges) def _split_field_bunches(self, need_merge_files: List[DataFileMeta]) -> List[FieldBunch]: """Split files into field bunches.""" From 80af2aea178546d51b3116e559a909946d5d8fa1 Mon Sep 17 00:00:00 2001 From: JingsongLi Date: Wed, 4 Feb 2026 00:07:55 +0800 Subject: [PATCH 4/4] fix --- .../read/reader/iface/record_batch_reader.py | 9 ++++++++ paimon-python/pypaimon/read/split_read.py | 22 ++++++++++++------- 2 files changed, 23 insertions(+), 8 deletions(-) diff --git a/paimon-python/pypaimon/read/reader/iface/record_batch_reader.py b/paimon-python/pypaimon/read/reader/iface/record_batch_reader.py index ec3a1bc424dc..34a1f5dbbcf6 100644 --- a/paimon-python/pypaimon/read/reader/iface/record_batch_reader.py +++ b/paimon-python/pypaimon/read/reader/iface/record_batch_reader.py @@ -126,3 +126,12 @@ def replace_iterator(self, iterator: Iterator[tuple]) -> Optional[RecordIterator def __next__(self): return self.next() + + +class EmptyRecordBatchReader(RecordBatchReader): + + def read_arrow_batch(self) -> Optional[RecordBatch]: + return None + + def close(self): + return None diff --git a/paimon-python/pypaimon/read/split_read.py b/paimon-python/pypaimon/read/split_read.py index 8bc02728bdb9..2088310aa4c0 100644 --- a/paimon-python/pypaimon/read/split_read.py +++ b/paimon-python/pypaimon/read/split_read.py @@ -25,6 +25,7 @@ from pypaimon.common.predicate import Predicate from pypaimon.deletionvectors import ApplyDeletionVectorReader from pypaimon.deletionvectors.deletion_vector import DeletionVector +from pypaimon.globalindex import Range from pypaimon.manifest.schema.data_file_meta import DataFileMeta from pypaimon.read.interval_partition import IntervalPartition, SortedRun from pypaimon.read.partition_info import PartitionInfo @@ -44,7 +45,7 @@ from pypaimon.read.reader.format_lance_reader import FormatLanceReader from pypaimon.read.reader.format_pyarrow_reader import FormatPyArrowReader from pypaimon.read.reader.iface.record_batch_reader import (RecordBatchReader, - RowPositionReader) + RowPositionReader, EmptyRecordBatchReader) from pypaimon.read.reader.iface.record_reader import RecordReader from pypaimon.read.reader.key_value_unwrap_reader import \ KeyValueUnwrapRecordReader @@ -592,14 +593,19 @@ def _create_union_reader(self, need_merge_files: List[DataFileMeta]) -> RecordRe def _create_file_reader(self, file: DataFileMeta, read_fields: [str]) -> Optional[RecordReader]: """Create a file reader for a single file.""" - record_reader = self.file_reader_supplier( - file=file, - for_merge_read=False, - read_fields=read_fields, - row_tracking_enabled=True) + def create_record_reader(): + return self.file_reader_supplier( + file=file, + for_merge_read=False, + read_fields=read_fields, + row_tracking_enabled=True) if self.row_ranges is None: - return record_reader - return RowIdFilterRecordBatchReader(record_reader, file.first_row_id, self.row_ranges) + return create_record_reader() + file_range = Range(file.first_row_id, file.first_row_id + file.row_count - 1) + row_ranges = Range.and_(self.row_ranges, 
[file_range]) + if len(row_ranges) == 0: + return EmptyRecordBatchReader() + return RowIdFilterRecordBatchReader(create_record_reader(), file.first_row_id, row_ranges) def _split_field_bunches(self, need_merge_files: List[DataFileMeta]) -> List[FieldBunch]: """Split files into field bunches."""
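
Note on the core logic of this series: patch 1 replaces file-position slicing with row-id slicing, and the key step is mapping a logical slice [start_pos, end_pos) over the concatenated, sorted row-id ranges onto an inclusive row-id interval (see _divide_ranges_by_position). The following is a minimal standalone sketch of that mapping for reference only; the Range type below is a simplified stand-in with inclusive bounds, and slice_to_row_ids is an illustrative name, not pypaimon's Range API or the committed method.

    from dataclasses import dataclass
    from typing import List, Optional, Tuple


    @dataclass
    class Range:
        start: int  # inclusive first row id covered by a file
        end: int    # inclusive last row id covered by a file

        def count(self) -> int:
            return self.end - self.start + 1


    def slice_to_row_ids(sorted_ranges: List[Range],
                         start_pos: int,
                         end_pos: int) -> Optional[Tuple[int, int]]:
        """Map a slice [start_pos, end_pos) over the concatenated ranges
        to an inclusive (start_row_id, end_row_id) pair, or None if empty."""
        total = sum(r.count() for r in sorted_ranges)
        if not sorted_ranges or start_pos >= total:
            return None
        start_row_id = None
        end_row_id = None
        current = 0
        for r in sorted_ranges:
            nxt = current + r.count()
            # First range whose position interval contains start_pos.
            if start_row_id is None and start_pos < nxt:
                start_row_id = r.start + (start_pos - current)
            # First range whose position interval contains end_pos.
            if end_pos <= nxt:
                end_row_id = r.start + (end_pos - current) - 1  # end_pos is exclusive
                break
            current = nxt
        if end_row_id is None:
            end_row_id = sorted_ranges[-1].end
        return start_row_id, end_row_id


    # Two files covering row ids 0..9 and 100..109: the slice [8, 12)
    # resolves to row ids 8..101, spanning the gap between the files.
    assert slice_to_row_ids([Range(0, 9), Range(100, 109)], 8, 12) == (8, 101)

Under these assumptions, the slice [8, 12) yields a single gap-spanning row-id interval, which matches what _calculate_slice_row_ranges returns before intersecting it with self.row_ranges and wrapping the splits as IndexedSplit.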