
Commit 6e70f76

Storage-Partitioned Joins (supported in Iceberg, not in Delta)
1 parent 0a990a4 commit 6e70f76

6 files changed: +84 −7 lines changed

docs/SQLConf.md

Lines changed: 8 additions & 0 deletions

@@ -1053,6 +1053,14 @@ Used when `CacheManager` is requested to [cache a structured query](CacheManager
 
 Used when [Aggregation](execution-planning-strategies/Aggregation.md) execution planning strategy is executed (and uses `AggUtils` to [create an aggregation physical operator](aggregations/AggUtils.md#createAggregate)).
 
+## <span id="V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED"> v2BucketingPartiallyClusteredDistributionEnabled { #v2BucketingPartiallyClusteredDistributionEnabled }
+
+[spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled](configuration-properties.md#spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled)
+
+## <span id="V2_BUCKETING_PUSH_PART_VALUES_ENABLED"> v2BucketingPushPartValuesEnabled { #v2BucketingPushPartValuesEnabled }
+
+[spark.sql.sources.v2.bucketing.pushPartValues.enabled](configuration-properties.md#spark.sql.sources.v2.bucketing.pushPartValues.enabled)
+
 ## <span id="VARIABLE_SUBSTITUTE_ENABLED"><span id="variableSubstituteEnabled"><span id="spark.sql.variable.substitute"> variableSubstituteEnabled
 
 [spark.sql.variable.substitute](configuration-properties.md#spark.sql.variable.substitute)

docs/configuration-properties.md

Lines changed: 17 additions & 1 deletion

@@ -1282,7 +1282,7 @@ Used when:
 
 **spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled**
 
-During a Storage-Partitioned Join, whether to allow input partitions to be partially clustered, when both sides of the join are of `KeyGroupedPartitioning`.
+During a [Storage-Partitioned Join](storage-partitioned-joins/index.md), whether to allow input partitions to be partially clustered, when both sides of the join are of [KeyGroupedPartitioning](connector/KeyGroupedPartitioning.md).
 
 Default: `false`
 
@@ -1292,6 +1292,14 @@ This is an optimization on skew join and can help to reduce data skewness when c
 
 Requires both [spark.sql.sources.v2.bucketing.enabled](#spark.sql.sources.v2.bucketing.enabled) and [spark.sql.sources.v2.bucketing.pushPartValues.enabled](#spark.sql.sources.v2.bucketing.pushPartValues.enabled) to be enabled
 
+Use [SQLConf.v2BucketingPartiallyClusteredDistributionEnabled](SQLConf.md#v2BucketingPartiallyClusteredDistributionEnabled) for the current value
+
+Used when:
+
+* `BatchScanExec` physical operator is requested for the [input RDD](physical-operators/BatchScanExec.md#inputRDD)
+* `DataSourceV2ScanExecBase` physical operator is requested for [groupPartitions](physical-operators/DataSourceV2ScanExecBase.md#groupPartitions)
+* [EnsureRequirements](physical-optimizations/EnsureRequirements.md) physical optimization is executed (to [checkKeyGroupCompatible](physical-optimizations/EnsureRequirements.md#checkKeyGroupCompatible))
+
 ### <span id="V2_BUCKETING_PUSH_PART_VALUES_ENABLED"> v2.bucketing.pushPartValues.enabled { #spark.sql.sources.v2.bucketing.pushPartValues.enabled }
 
 **spark.sql.sources.v2.bucketing.pushPartValues.enabled**
@@ -1303,6 +1311,14 @@ Default: `false`
 When enabled, if both sides of a join are of `KeyGroupedPartitioning` and they share compatible partition keys, even if they don't have the exact same partition values, Spark will calculate a superset of partition values and push that information down to the scan nodes, which will use empty partitions for the missing partition values on either side.
 This could help to eliminate unnecessary shuffles.
 
+Use [SQLConf.v2BucketingPushPartValuesEnabled](SQLConf.md#v2BucketingPushPartValuesEnabled) for the current value
+
+Used when:
+
+* `DataSourceV2ScanExecBase` physical operator is requested to [groupPartitions](physical-operators/DataSourceV2ScanExecBase.md#groupPartitions)
+* `BatchScanExec` physical operator is requested for the [inputRDD](physical-operators/BatchScanExec.md#inputRDD)
+* `EnsureRequirements` physical optimization is requested to [checkKeyGroupCompatible](physical-optimizations/EnsureRequirements.md#checkKeyGroupCompatible)
+
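The "superset of partition values" behavior described above can be sketched in plain Scala. This is a hypothetical model for illustration only (not Spark's actual implementation): each side's partitions are aligned to the union of both sides' partition values, with empty partitions filling the gaps so the sides stay co-partitioned.

```scala
// Hypothetical model of spark.sql.sources.v2.bucketing.pushPartValues.enabled
// (not Spark's internals): align both join sides to the superset of their
// partition values, using empty partitions for values missing on either side.
object PushPartValuesSketch {
  def alignPartitions[K: Ordering, R](
      left: Map[K, Seq[R]],
      right: Map[K, Seq[R]]): Seq[(K, Seq[R], Seq[R])] = {
    // Superset of the partition values seen on either side
    val allValues = (left.keySet ++ right.keySet).toSeq.sorted
    // A value missing on one side becomes an empty partition there,
    // so the join can proceed partition-by-partition without a shuffle.
    allValues.map(k => (k, left.getOrElse(k, Nil), right.getOrElse(k, Nil)))
  }
}
```

For example, with left partition values {1, 2} and right partition values {2, 3}, both sides end up aligned on {1, 2, 3}, with an empty right partition for 1 and an empty left partition for 3.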
 ## <span id="spark.sql.objectHashAggregate.sortBased.fallbackThreshold"> spark.sql.objectHashAggregate.sortBased.fallbackThreshold
 
 **(internal)** The number of entries in an in-memory hash map (to store aggregation buffers per grouping keys) before [ObjectHashAggregateExec](physical-operators/ObjectHashAggregateExec.md) ([ObjectAggregationIterator](aggregations/ObjectAggregationIterator.md#processInputs), precisely) falls back to sort-based aggregation

docs/physical-operators/SortMergeJoinExec.md

Lines changed: 4 additions & 0 deletions

@@ -1,3 +1,7 @@
+---
+title: SortMergeJoinExec
+---
+
 # SortMergeJoinExec Physical Operator
 
 `SortMergeJoinExec` is a [shuffle-based join physical operator](ShuffledJoin.md) for [sort-merge join](#doExecute) (with the [left join keys](#leftKeys) being [orderable](../expressions/RowOrdering.md#isorderable)).

docs/physical-optimizations/EnsureRequirements.md

Lines changed: 22 additions & 0 deletions

@@ -83,6 +83,28 @@ ensureDistributionAndOrdering(
 
 `ensureDistributionAndOrdering` is...FIXME
 
+### checkKeyGroupCompatible { #checkKeyGroupCompatible }
+
+```scala
+checkKeyGroupCompatible(
+  left: SparkPlan,
+  right: SparkPlan,
+  joinType: JoinType,
+  requiredChildDistribution: Seq[Distribution]): Option[Seq[SparkPlan]]
+checkKeyGroupCompatible(
+  parent: SparkPlan,
+  left: SparkPlan,
+  right: SparkPlan,
+  requiredChildDistribution: Seq[Distribution]): Option[Seq[SparkPlan]] // (1)!
+```
+
+1. Uses the `JoinType` of either [SortMergeJoinExec](../physical-operators/SortMergeJoinExec.md) or [ShuffledHashJoinExec](../physical-operators/ShuffledHashJoinExec.md) physical operator
+
+!!! note
+    Only [SortMergeJoinExec](../physical-operators/SortMergeJoinExec.md) and [ShuffledHashJoinExec](../physical-operators/ShuffledHashJoinExec.md) physical operators are considered.
+
+`checkKeyGroupCompatible`...FIXME
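A toy model may help fix intuition for what this check decides. The sketch below is hypothetical (the names `Partitioning`, `KeyGrouped`, and `keyGroupCompatible` are made up; the real `checkKeyGroupCompatible` inspects `SparkPlan` children and required distributions): based on the property descriptions, both join sides must report key-grouped partitioning over compatible grouping expressions for the join to qualify.

```scala
// Hypothetical, simplified model of a key-group compatibility check
// (illustration only; not Spark's EnsureRequirements implementation).
sealed trait Partitioning
case class KeyGrouped(expressions: Seq[String]) extends Partitioning
case object Unknown extends Partitioning

def keyGroupCompatible(left: Partitioning, right: Partitioning): Boolean =
  (left, right) match {
    // Both sides key-grouped over the same expressions: shuffle can be avoided
    case (KeyGrouped(l), KeyGrouped(r)) => l == r
    // Anything else: fall back to requiring an exchange
    case _ => false
  }
```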
 ## OptimizeSkewedJoin { #OptimizeSkewedJoin }
 
 `EnsureRequirements` is used to create an [OptimizeSkewedJoin](OptimizeSkewedJoin.md) physical optimization.
docs/storage-partitioned-joins/index.md

Lines changed: 30 additions & 3 deletions

@@ -1,12 +1,39 @@
 # Storage-Partitioned Joins
 
-**Storage-Partitioned Joins** (_SPJ_) are a new type of [join](../joins.md) in Spark SQL that use the existing storage layout for a partitioned join to avoid expensive shuffles (similarly to [Bucketing](../bucketing/index.md)).
+**Storage-Partitioned Join** (_SPJ_) is a new type of [join](../joins.md) in Spark SQL that uses the existing storage layout for a partitioned join to avoid expensive shuffles (similarly to [Bucketing](../bucketing/index.md)).
 
 !!! note
     The Storage-Partitioned Joins feature was added in Apache Spark 3.3.0 ([\[SPARK-37375\] Umbrella: Storage Partitioned Join (SPJ)]({{ spark.jira }}/SPARK-37375)).
 
-Storage-Partitioned Join is meant mainly, if not exclusively, for [Spark SQL connectors](../connector/index.md) (_v2 data sources_).
+Storage-Partitioned Join is based on [KeyGroupedPartitioning](../connector/KeyGroupedPartitioning.md) to determine partitions.
+
+Out of the available built-in [DataSourceV2ScanExecBase](../physical-operators/DataSourceV2ScanExecBase.md) physical operators, only [BatchScanExec](../physical-operators/BatchScanExec.md) supports storage-partitioned joins.
+
+Storage-Partitioned Join is meant for [Spark SQL connectors](../connector/index.md) (yet there are none built-in at the moment).
 
 Storage-Partitioned Join was proposed in this [SPIP](https://docs.google.com/document/d/1foTkDSM91VxKgkEcBMsuAvEjNybjja-uHk-r3vtXWFE).
 
-Storage-Partitioned Join uses [KeyGroupedPartitioning](../connector/KeyGroupedPartitioning.md) to determine partitions.
+!!! note
+    It [appears](../physical-optimizations/EnsureRequirements.md#checkKeyGroupCompatible) that [SortMergeJoinExec](../physical-operators/SortMergeJoinExec.md) and [ShuffledHashJoinExec](../physical-operators/ShuffledHashJoinExec.md) physical operators are the only candidates for Storage-Partitioned Joins.
+
+## Configuration Properties
+
+* [spark.sql.sources.v2.bucketing.enabled](../configuration-properties.md#spark.sql.sources.v2.bucketing.enabled)
+* [spark.sql.sources.v2.bucketing.pushPartValues.enabled](../configuration-properties.md#spark.sql.sources.v2.bucketing.pushPartValues.enabled)
+* [spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled](../configuration-properties.md#spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled)
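These properties are typically enabled together before running a storage-partitioned join. A minimal sketch follows; the `demo` catalog and table names are hypothetical, and it assumes a DataSource V2 connector whose scans report `KeyGroupedPartitioning` (e.g. Apache Iceberg, per the section below):

```scala
// Hypothetical setup (illustration only): catalog and table names are made up.
spark.conf.set("spark.sql.sources.v2.bucketing.enabled", "true")
spark.conf.set("spark.sql.sources.v2.bucketing.pushPartValues.enabled", "true")
spark.conf.set("spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled", "true")

// If both scans report KeyGroupedPartitioning over compatible partition keys,
// the join below can be planned without an exchange (shuffle).
val joined = spark.table("demo.db.orders")
  .join(spark.table("demo.db.customers"), Seq("customer_id"))
joined.explain() // inspect the plan for the absence of Exchange nodes
```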
+## Apache Iceberg
+
+Storage-Partitioned Join is supported in [Apache Iceberg 1.2.0](https://iceberg.apache.org/releases/#121-release):
+
+> Added support for storage partition joins to improve read and write performance ([#6371](https://github.com/apache/iceberg/pull/6371))
+
+## Delta Lake
+
+Storage-Partitioned Join is not supported in Delta Lake yet (as per [this feature request](https://github.com/delta-io/delta/issues/1698)).
+
+## Learn More
+
+1. [What's new in Apache Spark 3.3 - joins](https://www.waitingforcode.com/apache-spark-sql/what-new-apache-spark-3.3-joins/read) by Bartosz Konieczny
+1. (video) [Storage-Partitioned Join for Apache Spark](https://youtu.be/ioLeHZDMSuU)
+1. (video) [Eliminating Shuffles in Delete Update, and Merge](https://youtu.be/AIZjy6_K0ws)

mkdocs.yml

Lines changed: 3 additions & 3 deletions

@@ -163,7 +163,6 @@ nav:
   - ... | bloom-filter-join/**.md
   - ... | bucketing/**.md
   - ... | cache-serialization/**.md
-  - ... | storage-partitioned-joins/**.md
   - Catalog Plugin API:
     - connector/catalog/index.md
     - CatalogExtension: connector/catalog/CatalogExtension.md
@@ -224,10 +223,11 @@ nav:
   - Partition File Metadata Caching:
     - partition-file-metadata-caching/index.md
   # FIXME Rename to spark-connect?
-  - ... | connect/**.md
   - ... | runtime-filtering/**.md
+  - ... | connect/**.md
   - ... | thrift-server/**.md
-  - Statistics: new-and-noteworthy/statistics.md
+  - new-and-noteworthy/statistics.md
+  - ... | storage-partitioned-joins/**.md
   - ... | subexpression-elimination/**.md
   - ... | subqueries/**.md
   - ... | table-valued-functions/**.md
