
Conversation

@mbutrovich (Contributor) commented Jan 30, 2026

Continues efforts from #3295, #3297, and #3301, building on #3295's diff. I asked Claude to summarize the diff and draft the PR description for me:

Which issue does this PR close?

Closes #.

Rationale for this change

This PR continues the work from #3295, addressing two problems with Iceberg native scan:

  1. Serialization overhead: All Iceberg FileScanTask data was serialized into the protobuf plan at planning time and distributed to every executor. For tables with many files, this creates significant overhead since every task receives the full plan containing all partitions' tasks.

  2. No DPP support: Dynamic Partition Pruning couldn't work because partition data was serialized at planning time, before DPP subqueries execute. Now that we're deferring parts of plan generation to execution time, the door is open for DPP.

What changes are included in this PR?

Partition data distribution (solves plan bloat problem)

  • Planning now creates a placeholder IcebergScan with only metadata_location (for matching)
  • serializePartitions() runs at execution time, after DPP resolves, producing:
      • commonData: shared across all partitions (catalog properties, schema, pools)
      • perPartitionData: an array of serialized FileScanTask data, one entry per partition
  • CometExecPartition carries only its own partition's data; PlanDataInjector injects it into the operator tree at execution time (see the sketch after this list)
  • The commonData parsing is cached to eliminate repeated allocations for PartitionData and PartitionValue
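Schematically, the split looks something like the following minimal Scala sketch. SerializedPlanData and buildPartitionPayloads are hypothetical names introduced here for illustration; only commonData, perPartitionData, and CometExecPartition come from the change itself:

```scala
import org.apache.spark.Partition

// Output of serializePartitions(): one shared blob plus one slice per partition.
case class SerializedPlanData(
    commonData: Array[Byte], // catalog properties, schema, pools; parsed once, then cached
    perPartitionData: Array[Array[Byte]]) // serialized FileScanTask data, one entry per partition

// Each Spark partition carries only its own slice, so per-task serialization
// cost no longer scales with the total number of files in the table.
case class CometExecPartition(
    index: Int,
    commonData: Array[Byte],
    partitionData: Array[Byte])
  extends Partition

// Hypothetical helper: fan the serialized plan out into Spark partitions.
def buildPartitionPayloads(plan: SerializedPlanData): Array[Partition] =
  plan.perPartitionData.zipWithIndex.map { case (bytes, i) =>
    CometExecPartition(i, plan.commonData, bytes): Partition
  }
```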

Unified RDD (replaces ZippedPartitionsRDD)

  • ZippedPartitionsRDD is removed; CometExecRDD now handles all cases:
      • With inputs: zips them via inputPartitions (equivalent to ZippedPartitionsBaseRDD)
      • Without inputs: uses numPartitions to create partitions (the standalone scan case)
  • This is equivalent because (a sketch follows this list):
      • getPartitions creates a CometExecPartition whose inputPartitions array holds each input RDD's partition at that index
      • compute iterates each input RDD with its corresponding partition (same as ZippedPartitionsRDD.compute)
      • getDependencies returns a OneToOneDependency for each input (same dependency semantics)
      • getPreferredLocations uses the intersection-then-union logic (duplicated from ZippedPartitionsBaseRDD)
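To make the equivalence concrete, here is a hedged, self-contained sketch of the four overrides. UnifiedExecRDD is an illustrative name, the element type is simplified to Int rather than Comet's columnar batches, and the real CometExecRDD differs in detail:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.{Dependency, OneToOneDependency, Partition, SparkContext, TaskContext}

class UnifiedExecRDD(sc: SparkContext, inputs: Seq[RDD[Int]], numPartitions: Int)
  extends RDD[Int](sc, Nil) {

  // One OneToOneDependency per input: the same dependency semantics as
  // ZippedPartitionsBaseRDD.
  override protected def getDependencies: Seq[Dependency[_]] =
    inputs.map(new OneToOneDependency(_))

  // With inputs, the (equal-length) inputs drive the partition count;
  // without inputs, numPartitions does (the standalone scan case).
  override protected def getPartitions: Array[Partition] = {
    val n = if (inputs.nonEmpty) inputs.head.partitions.length else numPartitions
    Array.tabulate(n)(i => new Partition { override val index: Int = i })
  }

  // Prefer hosts where every input is local; fall back to the union of all
  // inputs' hosts (the intersection-then-union logic).
  override protected def getPreferredLocations(split: Partition): Seq[String] =
    if (inputs.isEmpty) Nil
    else {
      val prefs = inputs.map(rdd => rdd.preferredLocations(rdd.partitions(split.index)))
      val exact = prefs.reduce(_ intersect _)
      if (exact.nonEmpty) exact else prefs.flatten.distinct
    }

  // Iterate each input with its partition at the same index, exactly as
  // ZippedPartitionsRDD.compute did; handing the iterators to the native
  // plan is elided behind a trivial placeholder here.
  override def compute(split: Partition, ctx: TaskContext): Iterator[Int] = {
    val inputIters = inputs.map(rdd => rdd.iterator(rdd.partitions(split.index), ctx))
    if (inputIters.isEmpty) Iterator.empty // native scan produces its own batches
    else inputIters.reduce(_ ++ _) // placeholder for the native iterator
  }
}
```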

Dynamic Partition Pruning support

  • CometIcebergNativeScanExec.doPrepare() triggers DPP subquery preparation
  • serializedPartitionData is lazy: it waits for DPP values, then serializes only the surviving partitions (a sketch follows this list)
  • Handles SubqueryAdaptiveBroadcastExec via reflection to set InSubqueryExec.result
  • Added Spark version shims for SubqueryAdaptiveBroadcastExec.index vs .indices (SPARK-46946)
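The laziness is the crux, so here is a self-contained sketch of the idea. All names below are stand-ins invented for illustration; in the PR the filter is Spark's InSubqueryExec, resolved via the reflection and version shims mentioned above:

```scala
// Stand-in for an InSubqueryExec whose result is only populated once the
// DPP subquery has executed.
final case class DynamicPruningFilter(result: Option[Set[Any]])

final class LazyDppScan(
    filters: Seq[DynamicPruningFilter],
    tasksByPartitionKey: Map[Any, Array[Byte]]) {

  // lazy val: first touched at execution time, after doPrepare() has let the
  // DPP subqueries run, so only surviving partitions are ever serialized.
  lazy val serializedPartitionData: Array[Array[Byte]] =
    filters.flatMap(_.result).reduceOption(_ intersect _) match {
      case Some(keep) =>
        tasksByPartitionKey.collect { case (k, bytes) if keep(k) => bytes }.toArray
      case None =>
        tasksByPartitionKey.values.toArray // no resolved DPP filter: keep everything
    }
}
```

The lazy val is what moves the serialization from planning time to execution time; doPrepare() only has to guarantee that the subqueries finish before the first access.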

CometNativeExec iterator creation

  • Moved from inline closure to CometExecRDD.compute()
  • findAllPlanData traverses the plan tree to collect planning data, stopping at stage boundaries (sketched after this list)
  • collectSubqueries gathers ScalarSubquery expressions for registration with CometScalarSubquery
  • Both are equivalent to the previous behavior but structured for the new RDD
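The traversal shape, as a sketch with hypothetical PlanNode/StageBoundary/Exec types standing in for Spark plan nodes and Comet's stage boundaries:

```scala
sealed trait PlanNode {
  def children: Seq[PlanNode]
  def planData: Option[Array[Byte]] = None
}
// A shuffle/stage boundary: the child stage serializes its own plan data.
final case class StageBoundary(children: Seq[PlanNode]) extends PlanNode
final case class Exec(children: Seq[PlanNode], data: Option[Array[Byte]]) extends PlanNode {
  override def planData: Option[Array[Byte]] = data
}

// Depth-first collection that stops descending at stage boundaries, so each
// stage's RDD carries only the plan data belonging to its own stage.
def findAllPlanData(node: PlanNode): Seq[Array[Byte]] = node match {
  case _: StageBoundary => Nil
  case n => n.planData.toSeq ++ n.children.flatMap(findAllPlanData)
}
```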

How are these changes tested?

  • Extended CometIcebergNativeSuite with DPP tests (the first test's rough shape is sketched below):
      • runtime filtering - join with dynamic partition pruning: verifies DPP prunes partitions (3 → 1)
      • runtime filtering - multiple DPP filters on two partition columns: tests multi-column DPP
  • CI runs the full test suite, including the Iceberg Java TestRuntimeFiltering tests
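For orientation, the first test might look roughly like this. This is a hedged sketch, not the suite's actual code: table names and data are illustrative, and a SparkSession with the Iceberg extensions already configured is assumed:

```scala
import org.apache.spark.sql.{Row, SparkSession}

val spark = SparkSession.active // assumes an Iceberg-enabled session exists
import spark.sql

sql("CREATE TABLE fact (id INT, part INT) USING iceberg PARTITIONED BY (part)")
sql("INSERT INTO fact VALUES (1, 0), (2, 1), (3, 2)") // three partitions, one row each
sql("CREATE TABLE dim (part INT) USING iceberg")
sql("INSERT INTO dim VALUES (1)")

// Joining on the partition column lets DPP prune fact from 3 partitions to 1.
val rows = sql("SELECT f.id FROM fact f JOIN dim d ON f.part = d.part").collect()
assert(rows.toSeq == Seq(Row(2)))
```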

@codecov-commenter commented Jan 30, 2026

Codecov Report

❌ Patch coverage is 81.06796% with 78 lines in your changes missing coverage. Please review.
✅ Project coverage is 60.17%. Comparing base (f09f8af) to head (9cc541a).
⚠️ Report is 918 commits behind head on main.

Files with missing lines                               Patch %   Lines
...e/spark/sql/comet/CometIcebergNativeScanExec.scala   61.53%   10 Missing and 20 partials
...n/scala/org/apache/comet/rules/CometScanRule.scala   21.73%   11 Missing and 7 partials
.../comet/serde/operator/CometIcebergNativeScan.scala   89.79%   13 Missing and 2 partials
...n/scala/org/apache/spark/sql/comet/operators.scala   87.67%   2 Missing and 7 partials
...cala/org/apache/spark/sql/comet/CometExecRDD.scala   93.33%   3 Missing and 3 partials
Additional details and impacted files
@@             Coverage Diff              @@
##               main    #3349      +/-   ##
============================================
+ Coverage     56.12%   60.17%   +4.05%     
- Complexity      976     1491     +515     
============================================
  Files           119      174      +55     
  Lines         11743    16348    +4605     
  Branches       2251     2713     +462     
============================================
+ Hits           6591     9838    +3247     
- Misses         4012     5129    +1117     
- Partials       1140     1381     +241     


@mbutrovich changed the title from "feat: [WIP] Per-Partition Plan Building for Native Iceberg Scans with DPP [iceberg-rust]" to "feat: Per-Partition Plan Building for Native Iceberg Scans with DPP [iceberg-rust]" Jan 30, 2026
@mbutrovich changed the title from "feat: Per-Partition Plan Building for Native Iceberg Scans with DPP [iceberg-rust]" to "feat: [iceberg-rust] Iceberg native scan does per-partition task serialization with DPP" Jan 30, 2026

// Include generated modules from .proto files.
#[allow(missing_docs)]
#[allow(clippy::large_enum_variant)]
A reviewer (Member) commented on this snippet:

we should probably have a follow-up issue to revisit the choice to ignore this warning

1. findAllIcebergSplitData() collected perPartitionByLocation (all partitions' data)
2. This map was captured in the createCometExecIter closure
3. ZippedPartitionsRDD serialized that closure to every task
4. Each task received ALL partitions' data (925 bytes to both tasks)

Instead, we now use CometIcebergSplitRDD, which puts per-partition data in Partition objects.
@mbutrovich (Contributor, Author) commented:

Even though CI is green, gonna mark this as draft since I might still keep refactoring a bit to clean things up and don't want it merged early.

@mbutrovich marked this pull request as draft January 30, 2026 21:28
… columns), fixes TestRuntimeFiltering Iceberg Java tests with column renames.

CometIcebergSplitRDD registers subqueries so native code can look them up, fixes TestViews Iceberg Java tests with rewritten filter.
… assertion at index lookup, and defensive fallback if future Spark behavior changes.
# Conflicts:
#	.github/workflows/iceberg_spark_test.yml
@mbutrovich changed the title from "feat: [iceberg-rust] Iceberg native scan does per-partition task serialization with DPP" to "feat: [iceberg] Iceberg native scan does per-partition task serialization with DPP" Feb 1, 2026
@mbutrovich changed the title from "feat: [iceberg] Iceberg native scan does per-partition task serialization with DPP" to "feat: [iceberg] Refactor CometExecRDD to support per-partition plan data, Iceberg native scan with DPP" Feb 1, 2026
@mbutrovich changed the title from "feat: [iceberg] Refactor CometExecRDD to support per-partition plan data, Iceberg native scan with DPP" to "feat: [iceberg] CometExecRDD supports per-partition plan data, Iceberg native scan with DPP" Feb 1, 2026
@mbutrovich marked this pull request as ready for review February 1, 2026 16:50