# feat: [iceberg] CometExecRDD supports per-partition plan data, Iceberg native scan with DPP #3349
**Codecov Report**

❌ Patch coverage details and impacted files:

```diff
@@            Coverage Diff             @@
##             main    #3349      +/-   ##
============================================
+ Coverage    56.12%   60.17%   +4.05%
- Complexity     976     1491     +515
============================================
  Files          119      174      +55
  Lines        11743    16348    +4605
  Branches      2251     2713     +462
============================================
+ Hits          6591     9838    +3247
- Misses        4012     5129    +1117
- Partials      1140     1381     +241
```
…oned columns) and run a representative test.
```rust
// Include generated modules from .proto files.
#[allow(missing_docs)]
#[allow(clippy::large_enum_variant)]
```
We should probably have a follow-up issue to revisit the choice to ignore this warning.
Previously:

1. `findAllIcebergSplitData()` collected `perPartitionByLocation` (all partitions' data).
2. This map was captured in the `createCometExecIter` closure.
3. `ZippedPartitionsRDD` serialized that closure to every task.
4. Each task received ALL partitions' data (925 bytes to both tasks).

Instead we now use `CometIcebergSplitRDD`, which puts per-partition data in `Partition` objects.
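For illustration, a minimal sketch of that pattern (`PlanDataPartition` and `PerPartitionDataRDD` are hypothetical names, not the PR's actual `CometIcebergSplitRDD`):

```scala
import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// Each Partition object carries only its own slice of the plan data, so the
// task closure stays small and nothing partition-specific is captured in it.
class PlanDataPartition(override val index: Int, val planData: Array[Byte])
  extends Partition

class PerPartitionDataRDD(sc: SparkContext, perPartitionData: Array[Array[Byte]])
  extends RDD[Array[Byte]](sc, Nil) {

  override protected def getPartitions: Array[Partition] =
    Array.tabulate[Partition](perPartitionData.length) { i =>
      new PlanDataPartition(i, perPartitionData(i))
    }

  // Spark ships each Partition object only to the task that computes it, so a
  // task sees its own bytes instead of the whole perPartitionByLocation map.
  override def compute(split: Partition, context: TaskContext): Iterator[Array[Byte]] =
    Iterator.single(split.asInstanceOf[PlanDataPartition].planData)
}
```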
Even though CI is green, gonna mark this as draft since I might still keep refactoring a bit to clean things up and don't want it merged early.
…hat every partition doesn't come over.
… columns), fixes TestRuntimeFiltering Iceberg Java tests with column renames. CometIcebergSplitRDD registers subqueries so native code can look them up, fixes TestViews Iceberg Java tests with rewritten filter.
… assertion at index lookup, and defensive fallback if future Spark behavior changes.
# Conflicts:
#	.github/workflows/iceberg_spark_test.yml
Continues efforts from #3295, #3297, and #3301, building on #3295's diff. I asked Claude to summarize the diff and draft the PR description for me:
## Which issue does this PR close?

Closes #.
## Rationale for this change
This PR continues the work from #3295, addressing two problems with Iceberg native scan:
1. **Serialization overhead**: All Iceberg `FileScanTask` data was serialized into the protobuf plan at planning time and distributed to every executor. For tables with many files, this creates significant overhead, since every task receives the full plan containing all partitions' tasks.
2. **No DPP support**: Dynamic Partition Pruning couldn't work because partition data was serialized at planning time, before DPP subqueries execute. Now that we're deferring some parts of plan generation, that opens the door for DPP.
## What changes are included in this PR?
### Partition data distribution (solves the plan bloat problem)
- `IcebergScan` is serialized with only `metadata_location` (used for matching)
- `serializePartitions()` runs at execution time, after DPP resolves, producing:
  - `commonData`: shared across partitions (catalog properties, schema, pools)
  - `perPartitionData`: an array of serialized `FileScanTask` data, one entry per partition
- `CometExecPartition` carries only its own partition's data; `PlanDataInjector` injects it into the operator tree at execution time (see the sketch after this list)
- `commonData` parsing avoids repeated allocations for `PartitionData` and `PartitionValue`
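To illustrate the injection step, here is a minimal sketch under assumed plan-node shapes (`PlanNode`, `IcebergScanNode`, and `OpNode` are hypothetical stand-ins for the PR's protobuf types):

```scala
// Hypothetical plan-node shapes; the actual protobuf types differ.
sealed trait PlanNode { def children: Seq[PlanNode] }
case class IcebergScanNode(
    metadataLocation: String,
    taskData: Option[Array[Byte]],
    children: Seq[PlanNode] = Nil) extends PlanNode
case class OpNode(name: String, children: Seq[PlanNode]) extends PlanNode

object PlanDataInjectorSketch {
  // Walk the operator tree and attach this partition's serialized task data to
  // the scan whose metadata_location matches, leaving other nodes untouched.
  def inject(node: PlanNode, location: String, data: Array[Byte]): PlanNode =
    node match {
      case scan: IcebergScanNode if scan.metadataLocation == location =>
        scan.copy(taskData = Some(data))
      case op: OpNode =>
        op.copy(children = op.children.map(inject(_, location, data)))
      case other => other
    }
}
```

At execution time each task would run something like `inject(plan, metadataLocation, partition.planData)` before handing the tree to the native side.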
### Unified RDD (replaces ZippedPartitionsRDD)

- `ZippedPartitionsRDD` is removed; `CometExecRDD` now handles all cases (a sketch follows this list):
  - with input RDDs, it zips their `inputPartitions` (equivalent to `ZippedPartitionsBaseRDD`)
  - with no inputs, it uses `numPartitions` to create partitions (the standalone scan case)
- `getPartitions` creates a `CometExecPartition` with an `inputPartitions` array taken from each input RDD at that index
- `compute` iterates each input RDD with its corresponding partition (same as `ZippedPartitionsRDD.compute`)
- `getDependencies` returns a `OneToOneDependency` for each input (same dependency semantics)
- `getPreferredLocations` uses intersection-then-union logic (duplicated from `ZippedPartitionsBaseRDD`)
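A minimal sketch of such a unified RDD, assuming the behaviors listed above (`UnifiedExecRDD` and `ExecPartition` are illustrative names, not the PR's exact `CometExecRDD`):

```scala
import org.apache.spark.{Dependency, OneToOneDependency, Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.vectorized.ColumnarBatch

// Hypothetical partition: its index plus the matching partition of each input RDD.
class ExecPartition(override val index: Int, val inputPartitions: Array[Partition])
  extends Partition

class UnifiedExecRDD(
    sc: SparkContext,
    inputs: Seq[RDD[ColumnarBatch]],
    numPartitions: Int)
  extends RDD[ColumnarBatch](sc, Nil) {

  // With inputs, zip them (all inputs assumed to have equal partition counts,
  // as ZippedPartitionsBaseRDD requires); with none, create numPartitions
  // partitions for the standalone scan case.
  override protected def getPartitions: Array[Partition] = {
    val n = if (inputs.nonEmpty) inputs.head.partitions.length else numPartitions
    Array.tabulate[Partition](n) { i =>
      new ExecPartition(i, inputs.map(_.partitions(i)).toArray)
    }
  }

  // Same dependency semantics as ZippedPartitionsRDD: one-to-one per input.
  override protected def getDependencies: Seq[Dependency[_]] =
    inputs.map(new OneToOneDependency(_))

  // Intersection-then-union: prefer hosts wanted by every input, otherwise
  // fall back to the union of all inputs' preferences.
  override protected def getPreferredLocations(split: Partition): Seq[String] = {
    val part = split.asInstanceOf[ExecPartition]
    val prefs = inputs.zip(part.inputPartitions).map { case (rdd, p) =>
      rdd.preferredLocations(p)
    }
    val common = prefs.reduceOption(_ intersect _).getOrElse(Nil)
    if (common.nonEmpty) common else prefs.flatten.distinct
  }

  // Iterate each input with its corresponding partition; a real implementation
  // would feed these iterators to the native plan rather than concatenate them.
  override def compute(split: Partition, context: TaskContext): Iterator[ColumnarBatch] = {
    val part = split.asInstanceOf[ExecPartition]
    inputs.iterator.zip(part.inputPartitions.iterator).flatMap { case (rdd, p) =>
      rdd.iterator(p, context)
    }
  }
}
```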
### Dynamic Partition Pruning support

- `CometIcebergNativeScanExec.doPrepare()` triggers DPP subquery preparation
- `serializedPartitionData` is lazy: it waits for DPP values, then serializes only the filtered partitions (see the sketch after this list)
- handles `SubqueryAdaptiveBroadcastExec` via reflection to set `InSubqueryExec.result`
- handles `SubqueryAdaptiveBroadcastExec.index` vs. `indices` (SPARK-46946)
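A self-contained sketch of the laziness described above; every name here is hypothetical, not the PR's API. The point is only the ordering: nothing is serialized until first access, which happens after the DPP values resolve.

```scala
// Hypothetical stand-in for an Iceberg FileScanTask plus its serialized form.
final case class FileTask(partitionValue: Int, payload: Array[Byte])

class NativeScanSketch(allTasks: Seq[FileTask], dppValues: () => Option[Set[Int]]) {
  // Evaluated on first use, i.e. at execution time, after doPrepare()-style
  // subquery preparation has produced the DPP values.
  lazy val serializedPartitionData: Array[Array[Byte]] = {
    val tasks = dppValues() match {
      case Some(keep) => allTasks.filter(t => keep.contains(t.partitionValue))
      case None       => allTasks // no DPP filter: keep every partition
    }
    tasks.map(_.payload).toArray
  }
}
```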
### CometNativeExec iterator creation

- iterator creation moves into `CometExecRDD.compute()`
- `findAllPlanData` traverses the plan tree to collect planning data, stopping at stage boundaries (see the sketch after this list)
- `collectSubqueries` gathers `ScalarSubquery` expressions for registration with `CometScalarSubquery`
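An illustrative sketch of a stage-bounded traversal in that spirit (not the PR's exact helper; the choice of `Exchange`/`QueryStageExec` as boundary nodes is an assumption):

```scala
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.adaptive.QueryStageExec
import org.apache.spark.sql.execution.exchange.Exchange

object PlanTraversalSketch {
  // Collect values from the plan tree, but do not descend past stage
  // boundaries (exchanges and query stages), whose data belongs to other stages.
  def findAllPlanData[T](plan: SparkPlan)(collect: PartialFunction[SparkPlan, T]): Seq[T] = {
    val here = collect.lift(plan).toSeq
    plan match {
      case _: Exchange | _: QueryStageExec => here // stage boundary: stop here
      case _ => here ++ plan.children.flatMap(child => findAllPlanData(child)(collect))
    }
  }
}
```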
## How are these changes tested?

- `CometIcebergNativeSuite` with DPP tests:
  - `runtime filtering - join with dynamic partition pruning`: verifies DPP prunes partitions (3 → 1)
  - `runtime filtering - multiple DPP filters on two partition columns`: tests multi-column DPP
- Existing `TestRuntimeFiltering` tests
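For flavor, a hedged sketch of what such a DPP assertion might look like; this is not the suite's actual helper code. It assumes a `SparkSession` named `spark` with Comet enabled and illustrative Iceberg tables `db.fact` (partitioned on `part_col`) and `db.dim`.

```scala
import org.apache.spark.sql.catalyst.expressions.DynamicPruningExpression

val df = spark.sql(
  """SELECT f.id FROM db.fact f
    |JOIN db.dim d ON f.part_col = d.part_col
    |WHERE d.flag = true""".stripMargin)
df.collect() // force execution so the DPP subqueries actually run

// DPP shows up as a DynamicPruningExpression in the executed plan; the suite's
// real check (3 -> 1 partitions scanned) is stronger than this presence test.
val dppNodes = df.queryExecution.executedPlan.collect {
  case p if p.expressions.exists(
      _.find(_.isInstanceOf[DynamicPruningExpression]).isDefined) => p
}
assert(dppNodes.nonEmpty, "expected dynamic partition pruning in the plan")
```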