[SPARK-56337][SQL] Propagate analyzed sizeInBytes to V2 FileScan estimates #55322
Draft · LuciferYang wants to merge 14 commits into apache:master
…Frame API writes and delete FallBackFileSourceV2

Key changes:
- `FileWrite`: added `partitionSchema`, `customPartitionLocations`, `dynamicPartitionOverwrite`, `isTruncate`; path creation and truncate logic; dynamic partition overwrite via `FileCommitProtocol`
- `FileTable`: `createFileWriteBuilder` with `SupportsDynamicOverwrite` and `SupportsTruncate`; capabilities now include `TRUNCATE` and `OVERWRITE_DYNAMIC`; `fileIndex` skips file existence checks when `userSpecifiedSchema` is provided (write path)
- All file format writes (Parquet, ORC, CSV, JSON, Text, Avro) use `createFileWriteBuilder` with partition/truncate/overwrite support
- `DataFrameWriter.lookupV2Provider`: enabled `FileDataSourceV2` for non-partitioned Append and Overwrite via `df.write.save(path)`
- `DataFrameWriter.insertInto`: V1 fallback for file sources (TODO: SPARK-56175)
- `DataFrameWriter.saveAsTable`: V1 fallback for file sources (TODO: SPARK-56230, needs `StagingTableCatalog`)
- `DataSourceV2Utils.getTableProvider`: V1 fallback for file sources (TODO: SPARK-56175)
- Removed the `FallBackFileSourceV2` rule
- `V2SessionCatalog.createTable`: V1 `FileFormat` data type validation
…catalog table loading, and gate removal

Key changes:
- `FileTable` extends `SupportsPartitionManagement` with `createPartition`, `dropPartition`, `listPartitionIdentifiers`, `partitionSchema`
- Partition operations sync to the catalog metastore (best-effort)
- `V2SessionCatalog.loadTable` returns `FileTable` instead of `V1Table`, sets `catalogTable` and `useCatalogFileIndex` on `FileTable`
- `V2SessionCatalog.getDataSourceOptions` includes `storage.properties` for proper option propagation (header, ORC bloom filter, etc.)
- `V2SessionCatalog.createTable` validates data types via `FileTable`
- `FileTable.columns()` restores NOT NULL constraints from `catalogTable`
- `FileTable.partitioning()` falls back to `userSpecifiedPartitioning` or catalog partition columns
- `FileTable.fileIndex` uses `CatalogFileIndex` when the catalog has registered partitions (custom partition locations)
- `FileTable.schema` checks column name duplication for non-catalog tables only
- `DataSourceV2Utils.getTableProvider`: removed the `FileDataSourceV2` gate
- `DataFrameWriter.insertInto`: enabled V2 for file sources
- `DataFrameWriter.saveAsTable`: V1 fallback (TODO: SPARK-56230)
- `ResolveSessionCatalog`: V1 fallback for `FileTable`-backed commands (AnalyzeTable, AnalyzeColumn, TruncateTable, TruncatePartition, ShowPartitions, RecoverPartitions, AddPartitions, RenamePartitions, DropPartitions, SetTableLocation, CREATE TABLE validation, REPLACE TABLE blocking)
- `FindDataSourceTable`: streaming V1 fallback for `FileTable` (TODO: SPARK-56233)
- `DataSource.planForWritingFileFormat`: graceful V2 handling
Enable bucketed writes for V2 file tables via the catalog `BucketSpec`.

Key changes:
- `FileWrite`: add a `bucketSpec` field; use `V1WritesUtils.getWriterBucketSpec()` instead of a hardcoded `None`
- `FileTable`: `createFileWriteBuilder` passes `catalogTable.bucketSpec` to the write pipeline
- `FileDataSourceV2`: `getTable` uses `collect` to skip `BucketTransform` (handled via `catalogTable.bucketSpec` instead)
- `FileWriterFactory`: use `DynamicPartitionDataConcurrentWriter` for bucketed writes, since V2's `RequiresDistributionAndOrdering` cannot express hash-based ordering
- All 6 format Write/Table classes updated with a `BucketSpec` parameter

Note: bucket pruning and bucket joins (read-path optimizations) are not included in this patch (tracked under SPARK-56231).
Add `RepairTableExec` to sync filesystem partition directories with the catalog metastore for V2 file tables.

Key changes:
- New `RepairTableExec`: scans filesystem partitions via `FileTable.listPartitionIdentifiers()`, compares them with the catalog, registers missing partitions, and drops orphaned entries
- `DataSourceV2Strategy`: route `RepairTable` and `RecoverPartitions` for `FileTable` to the new V2 exec node
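The reconciliation at the heart of a repair is a two-way set difference between what the filesystem reports and what the catalog knows. A minimal plain-Scala sketch of that step (the function name is hypothetical; the real exec node works on `InternalRow` partition identifiers, not strings):

```scala
// Two-way diff between filesystem partition directories and catalog entries.
// In the real RepairTableExec the filesystem side would come from
// FileTable.listPartitionIdentifiers(); here both sides are modeled as
// plain partition-spec strings like "p=1".
def repairPlan(
    fsParts: Set[String],
    catalogParts: Set[String]): (Set[String], Set[String]) = {
  val toRegister = fsParts.diff(catalogParts) // on disk but missing from catalog
  val toDrop     = catalogParts.diff(fsParts) // in catalog but gone from disk
  (toRegister, toDrop)
}
```

The two resulting sets map directly onto the "registers missing partitions" and "drops orphaned entries" actions described above.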
Implement `SupportsOverwriteV2` for V2 file tables to support static partition overwrite (`INSERT OVERWRITE TABLE t PARTITION(p=1) SELECT ...`).

Key changes:
- `FileTable`: replace `SupportsTruncate` with `SupportsOverwriteV2` on the `WriteBuilder`; implement `overwrite(predicates)`
- `FileWrite`: extend `toBatch()` to delete only the matching partition directory, ordered by `partitionSchema`
- `FileTable.CAPABILITIES`: add `OVERWRITE_BY_FILTER`
- All 6 format Write/Table classes: plumb the `overwritePredicates` parameter

This is a prerequisite for SPARK-56304 (`ifPartitionNotExists`).
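Deleting "only the matching partition directory, ordered by `partitionSchema`" amounts to rebuilding the Hive-style path suffix from the equality predicates, in partition-column order rather than predicate order. A hedged sketch with hypothetical names (the real code works on `Predicate` objects, not a string map):

```scala
// Build the partition directory suffix (e.g. "p=1/q=2") targeted by a static
// partition overwrite. Ordering follows partitionSchema, so the same suffix
// comes out regardless of the order the predicates arrived in.
def partitionDirSuffix(
    partitionSchema: Seq[String],
    eqPredicates: Map[String, String]): String =
  partitionSchema
    .flatMap(col => eqPredicates.get(col).map(v => s"$col=$v"))
    .mkString("/")
```

Appending this suffix to the table root yields the single directory that `toBatch()` deletes before the overwrite lands.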
…EAD)

### What changes were proposed in this pull request?

Implements `MicroBatchStream` support for V2 file tables, enabling structured streaming reads through the V2 path instead of falling back to V1 `FileStreamSource`.

Key changes:
- New `FileMicroBatchStream` class implementing `MicroBatchStream`, `SupportsAdmissionControl`, and `SupportsTriggerAvailableNow`; handles file discovery, offset management, rate limiting, and partition planning
- Override `FileScan.toMicroBatchStream()` to return `FileMicroBatchStream`
- Add a `withFileIndex` method to `FileScan` and all 6 concrete scans for creating batch-specific scans
- Add `MICRO_BATCH_READ` to `FileTable.CAPABILITIES`
- Update `ResolveDataSource` to allow `FileDataSourceV2` into the V2 streaming path (respects `USE_V1_SOURCE_LIST` for backward compatibility)
- Remove the `FileTable` streaming fallback in `FindDataSourceTable`
- Reuses V1 infrastructure (`FileStreamSourceLog`, `FileStreamSourceOffset`, `SeenFilesMap`) for checkpoint compatibility

### Why are the changes needed?

V2 file tables cannot be fully adopted until streaming reads are supported. Without this, the V1 `FileStreamSource` fallback prevents deprecation of the V1 file source code.

### Does this PR introduce _any_ user-facing change?

No. By default, `USE_V1_SOURCE_LIST` includes all file formats, so streaming reads still use V1. Users can opt into V2 by clearing the list. Existing checkpoints are compatible.

### How was this patch tested?

New `FileStreamV2ReadSuite` with 6 E2E tests. Existing `FileStreamSourceSuite` (76 tests) passes with V1 forced via `USE_V1_SOURCE_LIST`.
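The `SupportsAdmissionControl` rate limiting mentioned above comes down to capping how many newly discovered files each micro-batch may admit (the V1 `maxFilesPerTrigger` semantics). A simplified sketch; the function and parameter names are illustrative, not the actual `FileMicroBatchStream` API:

```scala
// Admit at most `maxFilesPerTrigger` of the not-yet-processed files into the
// next micro-batch. None means no rate limit: admit everything that is new.
def admitFiles(
    unprocessed: Seq[String],
    maxFilesPerTrigger: Option[Int]): Seq[String] =
  maxFilesPerTrigger.fold(unprocessed)(n => unprocessed.take(n))
```

The files left behind stay "unprocessed" and are offered again at the next trigger, which is what makes the offset bookkeeping monotonic.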
…ITE)

### What changes were proposed in this pull request?

Implements `StreamingWrite` support for V2 file tables, enabling structured streaming writes through the V2 path instead of falling back to V1 `FileStreamSink`.

Key changes:
- New `FileStreamingWrite` class implementing `StreamingWrite`; uses `ManifestFileCommitProtocol` for file commits and `FileStreamSinkLog` for metadata tracking
- New `FileStreamingWriterFactory` bridging `DataWriterFactory` to `StreamingDataWriterFactory`
- Override `FileWrite.toStreaming()` to return `FileStreamingWrite`
- Add `STREAMING_WRITE` to `FileTable.CAPABILITIES`
- Idempotent `commit(epochId, messages)`: skips already-committed batches
- Supports the `retention` option for metadata log cleanup (V1 parity)
- Checkpoint compatible with the V1 `FileStreamSink` (same `_spark_metadata` format)

### Why are the changes needed?

V2 file tables cannot be fully adopted until streaming writes are supported. Without this, the V1 `FileStreamSink` fallback prevents deprecation of the V1 file source code. Together with SPARK-56232 (streaming read), this completes the streaming support needed for V1 deprecation.

### Does this PR introduce _any_ user-facing change?

No. By default, `USE_V1_SOURCE_LIST` includes all file formats, so streaming writes still use V1. Users can opt into V2 by clearing the list. Existing checkpoints are compatible.

### How was this patch tested?

New `FileStreamV2WriteSuite` with 4 E2E tests. Existing `FileStreamSinkV1Suite` passes. All 108 streaming file tests pass.
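The idempotent `commit(epochId, messages)` behaviour can be modeled as a skip-if-already-committed guard: replayed micro-batches (e.g., after a driver restart) must not double-write. A minimal sketch, not the actual `FileStreamingWrite` code, which tracks committed batches via `FileStreamSinkLog`:

```scala
// Skip batches at or below the highest committed epoch, so a replayed
// micro-batch is a no-op rather than a duplicate write.
final class IdempotentCommitter(private var latestCommitted: Long = -1L) {
  /** Returns true if the batch was committed, false if it was skipped. */
  def commit(epochId: Long)(doCommit: => Unit): Boolean =
    if (epochId <= latestCommitted) {
      false // already committed: skip
    } else {
      doCommit
      latestCommitted = epochId
      true
    }
}
```

In the real sink, "highest committed epoch" is recovered from the `_spark_metadata` log on restart, which is what makes the idempotence survive process failure.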
Exposes the V1-compatible `_metadata` struct column (`file_path`, `file_name`, `file_size`, `file_block_start`, `file_block_length`, `file_modification_time`) on V2 file-based tables, so that queries like ``SELECT _metadata.file_path FROM parquet.`<path>` `` work against the V2 scan path instead of forcing a V1 fallback.

The wiring is:
* `FileTable` implements `SupportsMetadataColumns.metadataColumns()` and returns a single `_metadata` struct column whose fields come from `FileFormat.BASE_METADATA_FIELDS`. Formats may extend `metadataSchemaFields` later to expose additional fields (e.g., Parquet's `row_index`, tracked in SPARK-56371).
* `FileScanBuilder.pruneColumns` intercepts the `_metadata` field from the required schema, stores the pruned metadata struct on `requestedMetadataFields`, and keeps it out of `readDataSchema` so the format-specific reader stays unchanged.
* `FileScan.readSchema` re-exposes `_metadata` as a trailing struct field when metadata is requested, so `V2ScanRelationPushDown` can rebind the downstream attribute reference back to the scan output.
* A new `MetadataAppendingFilePartitionReaderFactory` wraps the format-specific reader factory and appends a single `_metadata` struct value (via `JoinedRow` + an inner `GenericInternalRow`) to each row. Columnar reads are disabled while metadata is requested, since `ConstantColumnVector` is scalar and cannot represent a struct column; such queries fall back to the row path.
* All six concrete scans (Parquet/ORC/CSV/JSON/Text/Avro) take `requestedMetadataFields` as a trailing default-valued case-class parameter and call the new `wrapWithMetadataIfNeeded` helper when constructing their reader factory. Their `ScanBuilder.build()` implementations pass the field through from `FileScanBuilder`.

Parquet's generated `row_index` metadata field is intentionally out of scope; follow-up work is tracked in SPARK-56371.
Before this change, `_metadata` on a DSv2 file table was unresolvable and the query fell back to the V1 `FileSourceScanExec` path, which is one of the remaining blockers for deprecating the V1 file sources (SPARK-56170).

User-facing change: yes. `_metadata.*` queries now work against the V2 file sources with the same semantics as V1.

Tested: new `FileMetadataColumnsV2Suite` exercises read and projection paths for Parquet/ORC/JSON/CSV/Text, forcing the V2 path via `useV1SourceList`, and asserts the metadata struct values against the underlying file's `java.io.File` stats. All 16 tests pass.
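The `pruneColumns` interception described above can be pictured as partitioning the required top-level fields: `_metadata` is captured separately while everything else flows on to the format reader untouched. A plain-Scala model (field types elided; the real code works on `StructType`, and `PrunedSchemas` is a name invented for this sketch):

```scala
// Model of the schema split in FileScanBuilder.pruneColumns: the `_metadata`
// struct is captured on `requestedMetadataFields` and kept out of the data
// schema handed to the format-specific reader. Each required field is a
// (name, requestedSubFields) pair; data columns carry no sub-fields.
case class PrunedSchemas(
    readDataSchema: Seq[String],
    requestedMetadataFields: Seq[String])

def pruneColumns(required: Seq[(String, Seq[String])]): PrunedSchemas = {
  val (meta, data) = required.partition { case (name, _) => name == "_metadata" }
  PrunedSchemas(data.map(_._1), meta.flatMap(_._2))
}
```

Keeping `_metadata` out of `readDataSchema` is the design choice that lets all six format readers stay unchanged: only the wrapping reader factory knows metadata was requested.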
Adds support for the Parquet-specific generated `row_index` field on the V2
`_metadata` struct, completing V1 metadata-column parity for V2 Parquet tables.
This is the follow-up to SPARK-56335 (constant metadata fields).
The implementation also restores vectorized columnar reads for any V2 file
metadata query (SPARK-56335 had to disable them because `ConstantColumnVector`
cannot represent a struct column; the new `CompositeStructColumnVector` lifts
that restriction).
* `CompositeStructColumnVector` (Java) - a minimal struct-typed `ColumnVector`
that wraps a fixed array of arbitrary child column vectors. Used by the
metadata wrapper to compose `_metadata` columnar batches whose children are
a mix of `ConstantColumnVector` (for constant fields like `file_path`) and
per-row vectors supplied by the format reader (e.g., Parquet's
`_tmp_metadata_row_index`).
* `ParquetTable.metadataSchemaFields` - overrides the V2 `FileTable` extension
point to append `ParquetFileFormat.ROW_INDEX_FIELD`, mirroring V1
`ParquetFileFormat.metadataSchemaFields`.
* `FileScanBuilder.pruneColumns` - now inspects each requested `_metadata`
sub-field. Constant fields continue to flow through `requestedMetadataFields`
unchanged; for generated fields (matched via
`FileSourceGeneratedMetadataStructField`), the corresponding internal column
(e.g., `_tmp_metadata_row_index`) is appended to `requiredSchema` so the
format reader populates it. Internal columns are added with `nullable = true`
so the Parquet reader treats them as synthetic via `missingColumns` /
`ParquetRowIndexUtil` rather than failing the required-column check.
* `FileScan.readSchema` - hides internal columns from the user-visible scan
output. They live inside `readDataSchema` for the format reader, but must not
appear in `readSchema()`: V2's `PushDownUtils.toOutputAttrs` looks each output
column up by name in the relation output and the internal name is not a real
column.
* `MetadataAppendingFilePartitionReaderFactory` - rewritten:
- Row path uses `UnsafeProjection.create` over `BoundReference`s and
`CreateNamedStruct`. Constant metadata values are baked in as `Literal`s
for the split; generated values come from `BoundReference`s into the
base row at the position of the internal column.
- Columnar path (newly enabled) takes the input `ColumnarBatch`, drops the
internal columns from the top-level column array, and appends a
`CompositeStructColumnVector` for `_metadata` whose children are
`ConstantColumnVector`s (constants) and direct references to the format
reader's column vectors (generated). Zero-copy.
- `supportColumnarReads` now delegates to the wrapped factory.
* `wrapWithMetadataIfNeeded` takes the read data schema as a parameter so the
wrapper can compute the visible/internal column split. ParquetScan passes
`effectiveReadDataSchema` (variant pushdown aware); other scans pass their
`readDataSchema`.
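The visible/internal column split that `wrapWithMetadataIfNeeded` computes can be sketched as index partitioning over the read data schema. The name-prefix predicate below stands in for the real `FileSourceGeneratedMetadataStructField` matching, and the function name is hypothetical:

```scala
// Split the read data schema into user-visible column indices and internal
// metadata column indices (e.g. _tmp_metadata_row_index), preserving order.
// The visible indices drive the top-level output; the internal indices feed
// the children of the _metadata struct and are hidden from readSchema().
def splitColumns(
    readDataSchema: Seq[String],
    isInternal: String => Boolean): (Seq[Int], Seq[Int]) = {
  val (internal, visible) =
    readDataSchema.zipWithIndex.partition { case (name, _) => isInternal(name) }
  (visible.map(_._2), internal.map(_._2))
}
```

Working in indices rather than names is what makes the columnar path zero-copy: the wrapper just rearranges references to the format reader's existing column vectors.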
`_metadata.row_index` works on V1 Parquet but was unresolved on V2 Parquet
tables, forcing fallback to the V1 path. This blocks deprecating the V1 file
sources (SPARK-56170). With this change, `SELECT _metadata.row_index FROM t`
works against V2 Parquet with the same semantics as V1.
The vectorized restoration also fixes the performance regression SPARK-56335
introduced for plain `_metadata.file_path`-style queries.
Yes:
1. `_metadata.row_index` is now available on V2 Parquet tables.
2. Queries that select any `_metadata.*` columns on V2 file tables now use
vectorized reads when the underlying format supports them, instead of
falling back to the row-based path.
* New `ParquetMetadataRowIndexV2Suite` (8 tests):
- per-row values via vectorized + row-based readers
- row_index resets per file across multiple files
- combined constant + generated metadata fields in one query
- filter on `_metadata.row_index`
- metadata-only projection (no data columns)
- row_index with partitioned table
- EXPLAIN shows row_index in the MetadataColumns entry
* Existing suites still pass: `FileMetadataColumnsV2Suite` (24, SPARK-56335),
`FileMetadataStructSuite` (V1, ~100), `MetadataColumnSuite` (~4). 136 tests
total across these suites.
* Scalastyle: `sql`, `sql/Test`, `avro` clean.
Builds on top of SPARK-56335 (constant metadata column support for V2 file
tables).
…mates

### What changes were proposed in this pull request?

Extends SPARK-56176's V2-native ANALYZE TABLE stats propagation pattern by adding `FileTable.SIZE_IN_BYTES_KEY` alongside the existing `NUM_ROWS_KEY`, and teaching `FileScan.estimateStatistics().sizeInBytes()` to prefer the catalog-injected size over the file-listing-based estimate.

* `FileTable.mergedOptions` now injects `CatalogTable.stats.sizeInBytes` (when present) into the scan-time options under `SIZE_IN_BYTES_KEY`, alongside the `NUM_ROWS_KEY` injection added by SPARK-56176.
* `FileScan` exposes a new `storedSizeInBytes` helper that mirrors `storedNumRows`. `estimateStatistics()` prefers it over the existing file-index-based estimate when present, falling back to the listing-based computation when absent.

### Why are the changes needed?

SPARK-56176 introduced `numRows` propagation but did not cover `sizeInBytes` for non-partitioned catalog file source tables:

* Partitioned catalog tables that use `useCatalogFileIndex` already see the analyzed `sizeInBytes` because `CatalogFileIndex.sizeInBytes` returns the catalog-supplied value directly.
* Non-partitioned catalog tables use `InMemoryFileIndex`, whose `sizeInBytes` is computed from file listing rather than from `CatalogTable.stats`.

The result is that `ANALYZE TABLE COMPUTE STATISTICS` on a non-partitioned V2 file catalog table populates `CatalogTable.stats.sizeInBytes`, but the V2 scan keeps using the listing-based estimate, so CBO decisions (broadcast join threshold, filter selectivity) silently diverge from V1 and from partitioned V2 tables. This change closes that gap symmetrically with how SPARK-56176 closed the `numRows` gap.

### Does this PR introduce _any_ user-facing change?

Yes (CBO-affecting). For non-partitioned V2 file source catalog tables that have analyzed stats, the V2 scan's `sizeInBytes` estimate now matches the analyzed value. Workloads that rely on `ANALYZE TABLE` to tune broadcast thresholds against non-partitioned V2 catalog tables will see the same plan shapes V1 produced.

### How was this patch tested?

New `V2FileScanSizeInBytesSuite` (3 tests):

* `Analyzed sizeInBytes flows into V2 scan estimate (non-partitioned)`: populates `CatalogTable.stats` directly via `alterTableStats` (sidesteps the ANALYZE-via-table-properties bridge that requires `HiveExternalCatalog`) and asserts the V2 scan's optimized-plan stats reflect the analyzed value.
* `Analyzed sizeInBytes flows into V2 scan estimate (partitioned)`: guards that the partitioned path keeps producing analyzed stats end to end (no regression vs `CatalogFileIndex.sizeInBytes`).
* `Without ANALYZE TABLE, sizeInBytes falls back to file-listing estimate`: guards the fallback path: when `CatalogTable.stats` is empty, the V2 scan produces a positive listing-based estimate (no zero-or-default regression).

Existing suites verified passing:
* `FileMetadataColumnsV2Suite` (24 tests, SPARK-56335)
* `ParquetMetadataRowIndexV2Suite` (9 tests, SPARK-56371)

Total: 36 tests verified passing. Scalastyle clean.
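The injection-plus-preference logic can be sketched in isolation. The key value and helper names below are hypothetical stand-ins for `FileTable.SIZE_IN_BYTES_KEY` and `storedSizeInBytes`; the real code works with `CaseInsensitiveStringMap` options and returns `OptionalLong` from `estimateStatistics()`:

```scala
// Stand-in for FileTable.SIZE_IN_BYTES_KEY (the actual key string is internal).
val SizeInBytesKey = "sizeInBytes"

// mergedOptions side: inject the analyzed size into the scan-time options
// when CatalogTable.stats carries one.
def mergedOptions(
    base: Map[String, String],
    analyzedSize: Option[BigInt]): Map[String, String] =
  analyzedSize.fold(base)(s => base + (SizeInBytesKey -> s.toString))

// estimateStatistics side: prefer the catalog-injected size; fall back to
// the (lazily evaluated) file-listing estimate only when it is absent.
def estimatedSizeInBytes(
    options: Map[String, String],
    listingEstimate: => Long): Long =
  options.get(SizeInBytesKey).map(_.toLong).getOrElse(listingEstimate)
```

The by-name `listingEstimate` parameter mirrors why the fallback matters for cost: when the analyzed value is present, the potentially expensive file-listing computation never runs.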
What changes were proposed in this pull request?

Extends SPARK-56176's V2-native ANALYZE TABLE stats propagation pattern by adding `FileTable.SIZE_IN_BYTES_KEY` alongside the existing `NUM_ROWS_KEY`, and teaching `FileScan.estimateStatistics().sizeInBytes()` to prefer the catalog-injected size over the file-listing-based estimate.

- `FileTable.mergedOptions` now injects `CatalogTable.stats.sizeInBytes` (when present) into the scan-time options under `SIZE_IN_BYTES_KEY`, alongside the `NUM_ROWS_KEY` injection added by SPARK-56176.
- `FileScan` exposes a new `storedSizeInBytes` helper that mirrors `storedNumRows`. `estimateStatistics()` prefers it over the existing file-index-based estimate when present, falling back to the listing-based computation when absent.

Why are the changes needed?

SPARK-56176 introduced `numRows` propagation but did not cover `sizeInBytes` for non-partitioned catalog file source tables:

- Partitioned catalog tables that use `useCatalogFileIndex` already see the analyzed `sizeInBytes` because `CatalogFileIndex.sizeInBytes` returns the catalog-supplied value directly.
- Non-partitioned catalog tables use `InMemoryFileIndex`, whose `sizeInBytes` is computed from file listing rather than from `CatalogTable.stats`.

The result is that `ANALYZE TABLE COMPUTE STATISTICS` on a non-partitioned V2 file catalog table populates `CatalogTable.stats.sizeInBytes`, but the V2 scan keeps using the listing-based estimate, so CBO decisions (broadcast join threshold, filter selectivity) silently diverge from V1 and from partitioned V2 tables. This change closes that gap symmetrically with how SPARK-56176 closed the `numRows` gap.

Does this PR introduce any user-facing change?

Yes (CBO-affecting). For non-partitioned V2 file source catalog tables that have analyzed stats, the V2 scan's `sizeInBytes` estimate now matches the analyzed value. Workloads that rely on `ANALYZE TABLE` to tune broadcast thresholds against non-partitioned V2 catalog tables will see the same plan shapes V1 produced.

How was this patch tested?

New `V2FileScanSizeInBytesSuite` (3 tests):

- `Analyzed sizeInBytes flows into V2 scan estimate (non-partitioned)`: populates `CatalogTable.stats` directly via `alterTableStats` (sidesteps the ANALYZE-via-table-properties bridge that requires `HiveExternalCatalog`) and asserts the V2 scan's optimized-plan stats reflect the analyzed value.
- `Analyzed sizeInBytes flows into V2 scan estimate (partitioned)`: guards that the partitioned path keeps producing analyzed stats end to end (no regression vs `CatalogFileIndex.sizeInBytes`).
- `Without ANALYZE TABLE, sizeInBytes falls back to file-listing estimate`: guards the fallback path: when `CatalogTable.stats` is empty, the V2 scan produces a positive listing-based estimate (no zero-or-default regression).

Existing suites verified passing:

- `FileMetadataColumnsV2Suite` (24 tests, SPARK-56335)
- `ParquetMetadataRowIndexV2Suite` (9 tests, SPARK-56371)

Total: 36 tests across the stacked-branch suites. Scalastyle clean.
Dependencies
Builds on top of SPARK-56371 (row_index metadata for V2 Parquet).
Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Code