
Commit 6409885

DataSourceV2ScanExecBase and "v2 Bucketing"
1 parent 9841744 commit 6409885

3 files changed: +154, -43 lines changed

docs/configuration-properties.md

Lines changed: 41 additions & 0 deletions
@@ -1262,6 +1262,47 @@ Use [SQLConf.OUTPUT_COMMITTER_CLASS](SQLConf.md#OUTPUT_COMMITTER_CLASS) to acces
!!! note
    `ParquetUtils` uses [spark.sql.parquet.output.committer.class](#spark.sql.parquet.output.committer.class) or the default `ParquetOutputCommitter` instead.

+### <span id="V2_BUCKETING_ENABLED"> v2.bucketing.enabled { #spark.sql.sources.v2.bucketing.enabled }
+
+**spark.sql.sources.v2.bucketing.enabled**
+
+Enables bucketing for [connectors](connector/index.md) (_V2 data sources_).
+
+When enabled, Spark recognizes the specific distribution reported by a V2 data source through [SupportsReportPartitioning](connector/SupportsReportPartitioning.md) and avoids a shuffle when possible.
+
+Similar to [spark.sql.sources.bucketing.enabled](#spark.sql.sources.bucketing.enabled)
+
+Use [SQLConf.v2BucketingEnabled](SQLConf.md#v2BucketingEnabled) for the current value
+
+Used when:
+
+* `DataSourceV2ScanExecBase` is requested to [groupPartitions](physical-operators/DataSourceV2ScanExecBase.md#groupPartitions)
+
+### <span id="V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED"> v2.bucketing.partiallyClusteredDistribution.enabled { #spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled }
+
+**spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled**
+
+During a Storage-Partitioned Join, whether to allow input partitions to be partially clustered when both sides of the join are of `KeyGroupedPartitioning`.
+
+Default: `false`
+
+At planning time, Spark picks the side with less data (based on table statistics) and groups and replicates its partitions to match the other side.
+
+This is an optimization for skewed joins and can help reduce data skew when certain partitions are assigned a large amount of data.
+
+Requires both [spark.sql.sources.v2.bucketing.enabled](#spark.sql.sources.v2.bucketing.enabled) and [spark.sql.sources.v2.bucketing.pushPartValues.enabled](#spark.sql.sources.v2.bucketing.pushPartValues.enabled) to be enabled
+
+### <span id="V2_BUCKETING_PUSH_PART_VALUES_ENABLED"> v2.bucketing.pushPartValues.enabled { #spark.sql.sources.v2.bucketing.pushPartValues.enabled }
+
+**spark.sql.sources.v2.bucketing.pushPartValues.enabled**
+
+Whether to push down common partition values when [spark.sql.sources.v2.bucketing.enabled](#spark.sql.sources.v2.bucketing.enabled) is enabled.
+
+Default: `false`
+
+When enabled, if both sides of a join are of `KeyGroupedPartitioning` and share compatible partition keys (even if their partition values are not identical), Spark calculates a superset of the partition values and pushes that information down to the scan nodes, which use empty partitions for the missing partition values on either side.
+This could help eliminate unnecessary shuffles.
+
## <span id="spark.sql.objectHashAggregate.sortBased.fallbackThreshold"> spark.sql.objectHashAggregate.sortBased.fallbackThreshold

**(internal)** The number of entries in an in-memory hash map (to store aggregation buffers per grouping keys) before [ObjectHashAggregateExec](physical-operators/ObjectHashAggregateExec.md) ([ObjectAggregationIterator](aggregations/ObjectAggregationIterator.md#processInputs), precisely) falls back to sort-based aggregation
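
As a quick, illustrative sketch (not part of the diff above), the three properties could be enabled on a `SparkSession` before joining two V2 tables whose scans report `KeyGroupedPartitioning`. The catalog and table names below are hypothetical, and whether the shuffle is actually avoided depends on the connector.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical session with the v2 bucketing properties enabled
// (the latter two only take effect with v2.bucketing.enabled).
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("v2-bucketing-demo")
  .config("spark.sql.sources.v2.bucketing.enabled", true)
  .config("spark.sql.sources.v2.bucketing.pushPartValues.enabled", true)
  .config("spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled", true)
  .getOrCreate()

// demo.db.orders and demo.db.customers are hypothetical V2 tables whose scans
// report KeyGroupedPartitioning over customer_id. With the properties above,
// the join below may be planned without a shuffle on either side.
val orders = spark.table("demo.db.orders")
val customers = spark.table("demo.db.customers")
orders.join(customers, Seq("customer_id")).explain()
```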

docs/connector/index.md

Lines changed: 1 addition & 1 deletion
@@ -1,6 +1,6 @@
# Connector API

-**Connector API** is a new API in Spark 3 for Spark SQL developers to create [connectors](../connectors/index.md) (_data sources_ or _providers_).
+**Connector API** is a new API in Spark 3 for Spark SQL developers to create [connectors](../connectors/index.md) (_V2 data sources_ or _providers_).

!!! note
    Connector API is meant to replace the older (deprecated) DataSource v1 and v2.

docs/physical-operators/DataSourceV2ScanExecBase.md

Lines changed: 112 additions & 42 deletions
@@ -32,18 +32,21 @@ See:

* [BatchScanExec](BatchScanExec.md#inputRDD)

-### keyGroupedPartitioning { #keyGroupedPartitioning }
+### Custom Partitioning Expressions { #keyGroupedPartitioning }

```scala
keyGroupedPartitioning: Option[Seq[Expression]]
```

-Optional partitioning [expression](../expressions/Expression.md)s
+Optional partitioning [expression](../expressions/Expression.md)s (provided by [connectors](../connector/index.md) using [SupportsReportPartitioning](../connector/SupportsReportPartitioning.md))

See:

* [BatchScanExec](BatchScanExec.md#keyGroupedPartitioning)

+??? note "Spark Structured Streaming Not Supported"
+    `ContinuousScanExec` and `MicroBatchScanExec` physical operators are not supported (and have `keyGroupedPartitioning` undefined (`None`)).
+
Used when:

* `DataSourceV2ScanExecBase` is requested to [groupedPartitions](#groupedPartitions), [groupPartitions](#groupPartitions), [outputPartitioning](#outputPartitioning)
8083
* `ContinuousScanExec` ([Spark Structured Streaming]({{ book.structured_streaming }}/physical-operators/ContinuousScanExec))
8184
* `MicroBatchScanExec` ([Spark Structured Streaming]({{ book.structured_streaming }}/physical-operators/MicroBatchScanExec))
8285

83-
## <span id="doExecute"> Executing Physical Operator
86+
## Executing Physical Operator { #doExecute }
8487

85-
```scala
86-
doExecute(): RDD[InternalRow]
87-
```
88+
??? note "SparkPlan"
8889

89-
`doExecute` is part of the [SparkPlan](SparkPlan.md#doExecute) abstraction.
90+
```scala
91+
doExecute(): RDD[InternalRow]
92+
```
9093

91-
---
94+
`doExecute` is part of the [SparkPlan](SparkPlan.md#doExecute) abstraction.
9295

9396
`doExecute`...FIXME
9497

95-
## <span id="doExecuteColumnar"> doExecuteColumnar
98+
## doExecuteColumnar { #doExecuteColumnar }
9699

97-
```scala
98-
doExecuteColumnar(): RDD[ColumnarBatch]
99-
```
100+
??? note "SparkPlan"
100101

101-
`doExecuteColumnar` is part of the [SparkPlan](SparkPlan.md#doExecuteColumnar) abstraction.
102+
```scala
103+
doExecuteColumnar(): RDD[ColumnarBatch]
104+
```
102105

103-
---
106+
`doExecuteColumnar` is part of the [SparkPlan](SparkPlan.md#doExecuteColumnar) abstraction.
104107

105108
`doExecuteColumnar`...FIXME
106109

107-
## <span id="metrics"> Performance Metrics
110+
## Performance Metrics { #metrics }
108111

109-
```scala
110-
metrics: Map[String, SQLMetric]
111-
```
112+
??? note "SparkPlan"
112113

113-
`metrics` is part of the [SparkPlan](SparkPlan.md#metrics) abstraction.
114+
```scala
115+
metrics: Map[String, SQLMetric]
116+
```
114117

115-
---
118+
`metrics` is part of the [SparkPlan](SparkPlan.md#metrics) abstraction.
116119

117120
`metrics` is the following [SQLMetric](../SQLMetric.md)s with the [customMetrics](#customMetrics):
118121

119122
Metric Name | web UI
120123
------------|--------
121124
`numOutputRows` | number of output rows
122125

123-
## <span id="outputPartitioning"> Output Data Partitioning Requirements
126+
## Output Data Partitioning { #outputPartitioning }
124127

125-
```scala
126-
outputPartitioning: physical.Partitioning
127-
```
128+
??? note "SparkPlan"
128129

129-
`outputPartitioning` is part of the [SparkPlan](SparkPlan.md#outputPartitioning) abstraction.
130+
```scala
131+
outputPartitioning: physical.Partitioning
132+
```
130133

131-
---
134+
`outputPartitioning` is part of the [SparkPlan](SparkPlan.md#outputPartitioning) abstraction.
132135

133136
`outputPartitioning`...FIXME
134137

135-
## <span id="simpleString"> Simple Node Description
138+
## Output Data Ordering { #outputOrdering }
136139

137-
```scala
138-
simpleString(
139-
maxFields: Int): String
140-
```
140+
??? note "QueryPlan"
141141

142-
`simpleString` is part of the [TreeNode](../catalyst/TreeNode.md#simpleString) abstraction.
142+
```scala
143+
outputOrdering: Seq[SortOrder]
144+
```
143145

144-
---
146+
`outputOrdering` is part of the [QueryPlan](../catalyst/QueryPlan.md#outputOrdering) abstraction.
147+
148+
`outputOrdering`...FIXME
149+
150+
## Simple Node Description { #simpleString }
151+
152+
??? note "TreeNode"
153+
154+
```scala
155+
simpleString(
156+
maxFields: Int): String
157+
```
158+
159+
`simpleString` is part of the [TreeNode](../catalyst/TreeNode.md#simpleString) abstraction.
145160

146161
`simpleString`...FIXME
147162

148-
## <span id="supportsColumnar"> supportsColumnar
163+
## supportsColumnar { #supportsColumnar }
149164

150-
```scala
151-
supportsColumnar: Boolean
152-
```
165+
??? note "SparkPlan"
153166

154-
`supportsColumnar` is part of the [SparkPlan](SparkPlan.md#supportsColumnar) abstraction.
167+
```scala
168+
supportsColumnar: Boolean
169+
```
155170

156-
---
171+
`supportsColumnar` is part of the [SparkPlan](SparkPlan.md#supportsColumnar) abstraction.
157172

158173
`supportsColumnar` is `true` if the [PartitionReaderFactory](#readerFactory) can [supportColumnarReads](../connector/PartitionReaderFactory.md#supportColumnarReads) for all the [input partitions](#inputPartitions). Otherwise, `supportsColumnar` is `false`.
159174
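
The all-or-nothing rule can be summarized with a small conceptual sketch (a simplification, not the exact Spark source); `readerFactory` and `inputPartitions` stand for the operator's readerFactory and inputPartitions.

```scala
import org.apache.spark.sql.connector.read.{InputPartition, PartitionReaderFactory}

// Conceptual sketch: every input partition must support columnar reads for
// supportsColumnar to be true; mixing row-based and columnar partitions is
// rejected (see the "Cannot mix..." requirement below).
def supportsColumnar(
    readerFactory: PartitionReaderFactory,
    inputPartitions: Seq[InputPartition]): Boolean = {
  val allColumnar = inputPartitions.forall(p => readerFactory.supportColumnarReads(p))
  val anyColumnar = inputPartitions.exists(p => readerFactory.supportColumnarReads(p))
  require(allColumnar || !anyColumnar,
    "Cannot mix row-based and columnar input partitions.")
  allColumnar
}
```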

@@ -165,7 +180,7 @@ supportsColumnar: Boolean
Cannot mix row-based and columnar input partitions.
```

-## <span id="customMetrics"> Custom Metrics
+## Custom Metrics { #customMetrics }

```scala
customMetrics: Map[String, SQLMetric]
@@ -187,9 +202,9 @@ customMetrics: Map[String, SQLMetric]
* `ContinuousScanExec` is requested for the `inputRDD`
* `MicroBatchScanExec` is requested for the `inputRDD` (that creates a [DataSourceRDD](../DataSourceRDD.md))

-## <span id="verboseStringWithOperatorId"> verboseStringWithOperatorId
+## verboseStringWithOperatorId { #verboseStringWithOperatorId }

-??? note "Signature"
+??? note "QueryPlan"

    ```scala
    verboseStringWithOperatorId(): String
@@ -209,3 +224,58 @@ In the end, `verboseStringWithOperatorId` is as follows (based on [formattedNode
Output: [output]
[metaDataStr]
```
+
+## Input Partitions { #partitions }
+
+```scala
+partitions: Seq[Seq[InputPartition]]
+```
+
+`partitions`...FIXME
+
+---
+
+`partitions` is used when:
+
+* `BatchScanExec` physical operator is requested to [filteredPartitions](BatchScanExec.md#filteredPartitions)
+* `ContinuousScanExec` physical operator ([Spark Structured Streaming]({{ book.structured_streaming }}/physical-operators/ContinuousScanExec)) is requested for the `inputRDD`
+* `MicroBatchScanExec` physical operator ([Spark Structured Streaming]({{ book.structured_streaming }}/physical-operators/MicroBatchScanExec)) is requested for the `inputRDD`
+
+## groupedPartitions { #groupedPartitions }
+
+```scala
+groupedPartitions: Option[Seq[(InternalRow, Seq[InputPartition])]]
+```
+
+??? note "Lazy Value"
+    `groupedPartitions` is a Scala **lazy value** to guarantee that the code to initialize it is executed once only (when accessed for the first time) and the computed value never changes afterwards.
+
+    Learn more in the [Scala Language Specification]({{ scala.spec }}/05-classes-and-objects.html#lazy).
+
+`groupedPartitions` takes the [keyGroupedPartitioning](#keyGroupedPartitioning), if specified, and [groups](#groupPartitions) the [input partitions](#inputPartitions).
+
+---
+
+`groupedPartitions` is used when:
+
+* `DataSourceV2ScanExecBase` physical operator is requested for the [output data ordering](#outputOrdering), [output data partitioning requirements](#outputPartitioning), [partitions](#partitions)
+
+## groupPartitions { #groupPartitions }
+
+```scala
+groupPartitions(
+  inputPartitions: Seq[InputPartition],
+  groupSplits: Boolean = !conf.v2BucketingPushPartValuesEnabled || !conf.v2BucketingPartiallyClusteredDistributionEnabled): Option[Seq[(InternalRow, Seq[InputPartition])]]
+```
+
+!!! note "Noop"
+    `groupPartitions` does nothing (and returns `None`) when called with [spark.sql.sources.v2.bucketing.enabled](../configuration-properties.md#spark.sql.sources.v2.bucketing.enabled) disabled.
+
+`groupPartitions`...FIXME
+
+---
+
+`groupPartitions` is used when:
+
+* `BatchScanExec` physical operator is requested for the [filtered input partitions](BatchScanExec.md#filteredPartitions) and [input RDD](BatchScanExec.md#inputRDD)
+* `DataSourceV2ScanExecBase` is requested for the [groupedPartitions](#groupedPartitions)
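
For context (not part of this diff), a simplified sketch of what grouping input partitions means here: splits that implement `HasPartitionKey` are grouped by the partition key they report. The helper name is hypothetical, the sketch assumes the partition-key rows compare by value, and the real `groupPartitions` additionally honors the v2 bucketing properties, ordering, and partially clustered distribution.

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.read.{HasPartitionKey, InputPartition}

// Hypothetical helper: group splits by their reported partition key.
// Returns None when any split does not expose a partition key.
def groupByPartitionKey(
    inputPartitions: Seq[InputPartition]): Option[Seq[(InternalRow, Seq[InputPartition])]] = {
  val keyed = inputPartitions.map {
    case p: HasPartitionKey => Some((p.partitionKey(), p: InputPartition))
    case _ => None
  }
  if (keyed.contains(None)) None
  else Some(
    keyed.flatten
      .groupBy { case (key, _) => key }
      .map { case (key, splits) => key -> splits.map(_._2) }
      .toSeq)
}
```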
