@@ -126,3 +126,12 @@ def replace_iterator(self, iterator: Iterator[tuple]) -> Optional[RecordIterator

def __next__(self):
return self.next()


class EmptyRecordBatchReader(RecordBatchReader):

def read_arrow_batch(self) -> Optional[RecordBatch]:
return None

def close(self):
return None
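
A hedged usage sketch of the new reader (the drain helper below is hypothetical, not part of pypaimon): read_arrow_batch() returns None immediately, so any caller that loops until None sees zero batches.

def drain(reader):
    # Collect batches until the reader signals end of data with None.
    batches = []
    while True:
        batch = reader.read_arrow_batch()
        if batch is None:
            break
        batches.append(batch)
    reader.close()
    return batches

# drain(EmptyRecordBatchReader()) -> []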
167 changes: 95 additions & 72 deletions paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py
@@ -24,7 +24,6 @@
from pypaimon.manifest.schema.manifest_entry import ManifestEntry
from pypaimon.read.scanner.split_generator import AbstractSplitGenerator
from pypaimon.read.split import DataSplit, Split
from pypaimon.read.sliced_split import SlicedSplit


class DataEvolutionSplitGenerator(AbstractSplitGenerator):
@@ -65,17 +64,14 @@ def sort_key(manifest_entry: ManifestEntry) -> tuple:
for entry in sorted_entries:
partitioned_files[(tuple(entry.partition.values), entry.bucket)].append(entry)

plan_start_pos = 0
plan_end_pos = 0
slice_row_ranges = None # Row ID ranges for slice-based filtering

if self.start_pos_of_this_subtask is not None:
# shard data range: [plan_start_pos, plan_end_pos)
partitioned_files, plan_start_pos, plan_end_pos = \
self._filter_by_row_range(
partitioned_files,
self.start_pos_of_this_subtask,
self.end_pos_of_this_subtask
)
# Calculate Row ID range for slice-based filtering
slice_row_ranges = self._calculate_slice_row_ranges(partitioned_files)
if slice_row_ranges:
# Filter files by Row ID range
partitioned_files = self._filter_files_by_row_ranges(partitioned_files, slice_row_ranges)
elif self.idx_of_this_subtask is not None:
partitioned_files = self._filter_by_shard(
partitioned_files, self.idx_of_this_subtask, self.number_of_para_subtasks
@@ -109,11 +105,15 @@ def weight_func(file_list: List[DataFileMeta]) -> int:
flatten_packed_files, packed_files, sorted_entries_list
)

if self.start_pos_of_this_subtask is not None:
splits = self._wrap_to_sliced_splits(splits, plan_start_pos, plan_end_pos)
# Wrap splits with IndexedSplit if row_ranges is provided
if self.row_ranges:
splits = self._wrap_to_indexed_splits(splits)
# merge slice_row_ranges and self.row_ranges
if slice_row_ranges is None:
slice_row_ranges = self.row_ranges
elif self.row_ranges is not None:
slice_row_ranges = Range.and_(slice_row_ranges, self.row_ranges)

# Wrap splits with IndexedSplit for slice-based filtering or row_ranges
if slice_row_ranges:
splits = self._wrap_to_indexed_splits(splits, slice_row_ranges)

return splits
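
The merge above relies on Range.and_ intersecting two lists of Row ID ranges. A minimal sketch of that intersection, assuming inclusive [lo, hi] bounds and sorted, non-overlapping inputs (plain tuples stand in for pypaimon's Range):

def intersect_ranges(a, b):
    # Intersect two sorted lists of inclusive (lo, hi) ranges.
    out, i, j = [], 0, 0
    while i < len(a) and j < len(b):
        lo = max(a[i][0], b[j][0])
        hi = min(a[i][1], b[j][1])
        if lo <= hi:
            out.append((lo, hi))
        # Advance whichever range ends first.
        if a[i][1] < b[j][1]:
            i += 1
        else:
            j += 1
    return out

# intersect_ranges([(0, 99)], [(50, 149)]) -> [(50, 99)]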

@@ -171,76 +171,99 @@ def _build_split_from_pack_for_data_evolution(
splits.append(split)
return splits

def _wrap_to_sliced_splits(self, splits: List[Split], plan_start_pos: int, plan_end_pos: int) -> List[Split]:
def _calculate_slice_row_ranges(self, partitioned_files: defaultdict) -> List[Range]:
"""
Wrap splits with SlicedSplit to add file-level slicing information.
Calculate Row ID ranges for slice-based filtering based on start_pos and end_pos.
"""
sliced_splits = []
file_end_pos = 0 # end row position of current file in all splits data
# Collect all Row ID ranges from files
list_ranges = []
for file_entries in partitioned_files.values():
for entry in file_entries:
first_row_id = entry.file.first_row_id
# Range is inclusive [from_, to], so use row_count - 1
list_ranges.append(Range(first_row_id, first_row_id + entry.file.row_count - 1))

Contributor: blob is included in the entry here.

Contributor Author: What does a blob have to do with it?

Contributor (@XiaoHongbo-Hope, Feb 3, 2026), quoting "What does a blob have to do with it?": It should be ok, I misunderstood.

for split in splits:
# Compute file index map for both data and blob files
# Blob files share the same row position tracking as data files
shard_file_idx_map = self._compute_slice_split_file_idx_map(
plan_start_pos, plan_end_pos, split, file_end_pos
)
file_end_pos = shard_file_idx_map[self.NEXT_POS_KEY]
del shard_file_idx_map[self.NEXT_POS_KEY]

if shard_file_idx_map:
sliced_splits.append(SlicedSplit(split, shard_file_idx_map))
else:
sliced_splits.append(split)
# Merge overlapping ranges
sorted_ranges = Range.sort_and_merge_overlap(list_ranges, True, False)

return sliced_splits
# Calculate the Row ID range for this slice
start_range, end_range = self._divide_ranges_by_position(sorted_ranges)
if start_range is None or end_range is None:
return []

def _filter_by_row_range(
self,
partitioned_files: defaultdict,
start_pos: int,
end_pos: int
) -> tuple:
# Return the range for this slice
return [Range(start_range.from_, end_range.to)]

def _divide_ranges_by_position(self, sorted_ranges: List[Range]) -> Tuple[Optional[Range], Optional[Range]]:
"""
Filter file entries by row range for data evolution tables.
Divide ranges by position (start_pos, end_pos) to get the Row ID range for this slice.
"""
if not sorted_ranges:
return None, None

total_row_count = sum(r.count() for r in sorted_ranges)
start_pos = self.start_pos_of_this_subtask
end_pos = self.end_pos_of_this_subtask

if start_pos >= total_row_count:
return None, None

# Find the start Row ID
current_pos = 0
start_row_id = None
end_row_id = None

for r in sorted_ranges:
range_end_pos = current_pos + r.count()

# Find start Row ID
if start_row_id is None and start_pos < range_end_pos:
offset = start_pos - current_pos
start_row_id = r.from_ + offset

# Find end Row ID
if end_pos <= range_end_pos:
offset = end_pos - current_pos
end_row_id = r.from_ + offset - 1 # -1 because end_pos is exclusive
break

current_pos = range_end_pos

if start_row_id is None:
return None, None
if end_row_id is None:
end_row_id = sorted_ranges[-1].to

return Range(start_row_id, start_row_id), Range(end_row_id, end_row_id)
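
A worked sketch of the positional translation above, using plain inclusive (lo, hi) tuples instead of Range: the window [start_pos, end_pos) counts rows across the sorted ranges and is mapped back to Row ID bounds.

def slice_row_id_bounds(ranges, start_pos, end_pos):
    # ranges: sorted, non-overlapping inclusive (lo, hi) Row ID ranges.
    current_pos = 0
    start_row_id = end_row_id = None
    for lo, hi in ranges:
        range_end_pos = current_pos + (hi - lo + 1)
        if start_row_id is None and start_pos < range_end_pos:
            start_row_id = lo + (start_pos - current_pos)
        if end_pos <= range_end_pos:
            end_row_id = lo + (end_pos - current_pos) - 1  # end_pos is exclusive
            break
        current_pos = range_end_pos
    if start_row_id is None:
        return None
    if end_row_id is None:
        end_row_id = ranges[-1][1]
    return start_row_id, end_row_id

# Ranges cover Row IDs 0-9 and 100-109 (20 rows total); positions 5..15 map to
# Row IDs 5 through 104:
# slice_row_id_bounds([(0, 9), (100, 109)], 5, 15) -> (5, 104)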

@staticmethod
def _filter_files_by_row_ranges(partitioned_files: defaultdict, row_ranges: List[Range]) -> defaultdict:
"""
Filter files by Row ID ranges. Keep files that overlap with the given ranges.
"""
plan_start_pos = 0
plan_end_pos = 0
entry_end_pos = 0 # end row position of current file in all data
splits_start_pos = 0
filtered_partitioned_files = defaultdict(list)

# Iterate through all file entries to find files that overlap with current shard range
for key, file_entries in partitioned_files.items():
filtered_entries = []
blob_added = False # If it is true, all blobs corresponding to this data file are added

for entry in file_entries:
if self._is_blob_file(entry.file.file_name):
if blob_added:
filtered_entries.append(entry)
continue
blob_added = False
entry_begin_pos = entry_end_pos # Starting row position of current file in all data
entry_end_pos += entry.file.row_count # Update to row position after current file

# If current file is completely after shard range, stop iteration
if entry_begin_pos >= end_pos:
break
# If current file is completely before shard range, skip it
if entry_end_pos <= start_pos:
continue
if entry_begin_pos <= start_pos < entry_end_pos:
splits_start_pos = entry_begin_pos
plan_start_pos = start_pos - entry_begin_pos
# If shard end position is within current file, record relative end position
if entry_begin_pos < end_pos <= entry_end_pos:
plan_end_pos = end_pos - splits_start_pos
# Add files that overlap with shard range to result
filtered_entries.append(entry)
blob_added = True
first_row_id = entry.file.first_row_id
file_range = Range(first_row_id, first_row_id + entry.file.row_count - 1)

# Check if file overlaps with any of the row ranges
overlaps = False
for r in row_ranges:
if r.overlaps(file_range):
overlaps = True
break

if overlaps:
filtered_entries.append(entry)

if filtered_entries:
filtered_partitioned_files[key] = filtered_entries

return filtered_partitioned_files, plan_start_pos, plan_end_pos
return filtered_partitioned_files
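
A minimal sketch of what Range.overlaps is assumed to compute for inclusive bounds; the overlap test reduces to two comparisons.

def overlaps(a_lo, a_hi, b_lo, b_hi):
    # Two inclusive ranges intersect iff each starts no later than the other ends.
    return a_lo <= b_hi and b_lo <= a_hi

# overlaps(0, 99, 50, 149) -> True
# overlaps(0, 49, 50, 99)  -> False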

def _filter_by_shard(self, partitioned_files: defaultdict, sub_task_id: int, total_tasks: int) -> defaultdict:
list_ranges = []
@@ -413,7 +436,7 @@ def _compute_slice_split_file_idx_map(
shard_file_idx_map[self.NEXT_POS_KEY] = next_pos
return shard_file_idx_map

def _wrap_to_indexed_splits(self, splits: List[Split]) -> List[Split]:
def _wrap_to_indexed_splits(self, splits: List[Split], row_ranges: List[Range]) -> List[Split]:
"""
Wrap splits with IndexedSplit for row range filtering.
"""
@@ -438,7 +461,7 @@ def _wrap_to_indexed_splits(self, splits: List[Split]) -> List[Split]:
file_ranges = Range.merge_sorted_as_possible(file_ranges)

# Intersect with row_ranges from global index
expected = Range.and_(file_ranges, self.row_ranges)
expected = Range.and_(file_ranges, row_ranges)

if not expected:
# No intersection, skip this split
25 changes: 9 additions & 16 deletions paimon-python/pypaimon/read/split_read.py
@@ -25,6 +25,7 @@
from pypaimon.common.predicate import Predicate
from pypaimon.deletionvectors import ApplyDeletionVectorReader
from pypaimon.deletionvectors.deletion_vector import DeletionVector
from pypaimon.globalindex import Range
from pypaimon.manifest.schema.data_file_meta import DataFileMeta
from pypaimon.read.interval_partition import IntervalPartition, SortedRun
from pypaimon.read.partition_info import PartitionInfo
@@ -44,7 +45,7 @@
from pypaimon.read.reader.format_lance_reader import FormatLanceReader
from pypaimon.read.reader.format_pyarrow_reader import FormatPyArrowReader
from pypaimon.read.reader.iface.record_batch_reader import (RecordBatchReader,
RowPositionReader)
RowPositionReader, EmptyRecordBatchReader)
from pypaimon.read.reader.iface.record_reader import RecordReader
from pypaimon.read.reader.key_value_unwrap_reader import \
KeyValueUnwrapRecordReader
@@ -593,26 +594,18 @@ def _create_union_reader(self, need_merge_files: List[DataFileMeta]) -> RecordRe
def _create_file_reader(self, file: DataFileMeta, read_fields: [str]) -> Optional[RecordReader]:
"""Create a file reader for a single file."""
def create_record_reader():
record_reader = self.file_reader_supplier(
return self.file_reader_supplier(
file=file,
for_merge_read=False,
read_fields=read_fields,
row_tracking_enabled=True)
if self.row_ranges is not None:
record_reader = RowIdFilterRecordBatchReader(record_reader, file.first_row_id, self.row_ranges)
return record_reader

shard_file_idx_map = (
self.split.shard_file_idx_map() if isinstance(self.split, SlicedSplit) else {}
)
if file.file_name in shard_file_idx_map:
(begin_pos, end_pos) = shard_file_idx_map[file.file_name]
if (begin_pos, end_pos) == (-1, -1):
return None
else:
return ShardBatchReader(create_record_reader(), begin_pos, end_pos)
else:
if self.row_ranges is None:
return create_record_reader()
file_range = Range(file.first_row_id, file.first_row_id + file.row_count - 1)
row_ranges = Range.and_(self.row_ranges, [file_range])
if len(row_ranges) == 0:
return EmptyRecordBatchReader()
return RowIdFilterRecordBatchReader(create_record_reader(), file.first_row_id, row_ranges)
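
How the wrapped reader is assumed to behave: the sketch below is not the actual RowIdFilterRecordBatchReader, and filter_batch_by_row_ids is a hypothetical helper; it only illustrates filtering an Arrow batch to the rows whose Row ID (assigned sequentially from the file's first_row_id) falls inside one of the kept inclusive ranges.

import pyarrow as pa
import pyarrow.compute as pc

def filter_batch_by_row_ids(batch, first_row_id, ranges):
    # Row IDs for this batch, assuming the batch starts at first_row_id and
    # rows are numbered sequentially.
    row_ids = pa.array(range(first_row_id, first_row_id + batch.num_rows))
    keep = pa.array([False] * batch.num_rows)
    for lo, hi in ranges:
        in_range = pc.and_(pc.greater_equal(row_ids, lo), pc.less_equal(row_ids, hi))
        keep = pc.or_(keep, in_range)
    return batch.filter(keep)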

def _split_field_bunches(self, need_merge_files: List[DataFileMeta]) -> List[FieldBunch]:
"""Split files into field bunches."""