Commit d0b20e7

BloomFilterAggregate and Bloom Filter Join
1 parent 53410bb commit d0b20e7

4 files changed (+122, −10 lines)


docs/aggregations/SortBasedAggregationIterator.md

Lines changed: 1 addition & 1 deletion
````diff
@@ -29,7 +29,7 @@
 initialize(): Unit
 ```
 
-!!! note "Procedure"
+!!! warning "Procedure"
     `initialize` returns `Unit` (_nothing_) and whatever happens inside stays inside (just like in Las Vegas, _doesn't it?!_ 😉)
 
 `initialize`...FIXME
````
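The "Procedure" admonition above (a method that returns `Unit` and only mutates internal state) can be illustrated with a trivial Scala sketch (a hypothetical `Counter` class, not Spark's iterator):

```scala
// A trivial sketch of a "procedure": it returns Unit, so nothing comes back
// to the caller -- the only effect is mutation of internal state.
class Counter {
  private var n = 0
  def initialize(): Unit = { n = 0 }   // whatever happens inside stays inside
  def increment(): Unit = { n += 1 }
  def value: Int = n                   // the state has to be read separately
}
```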

docs/expressions/BloomFilterAggregate.md

Lines changed: 22 additions & 4 deletions
````diff
@@ -20,13 +20,13 @@ title: BloomFilterAggregate
 
 * `InjectRuntimeFilter` logical optimization is requested to [inject a BloomFilter](../logical-optimizations/InjectRuntimeFilter.md#injectBloomFilter)
 
-### <span id="estimatedNumItemsExpression"> Estimated Number of Items Expression
+### Estimated Number of Items Expression { #estimatedNumItemsExpression }
 
 `BloomFilterAggregate` can be given **Estimated Number of Items** (as an [Expression](Expression.md)) when [created](#creating-instance).
 
 Unless given, `BloomFilterAggregate` uses the [spark.sql.optimizer.runtime.bloomFilter.expectedNumItems](../configuration-properties.md#spark.sql.optimizer.runtime.bloomFilter.expectedNumItems) configuration property.
 
-### <span id="numBitsExpression"> Number of Bits Expression
+### Number of Bits Expression { #numBitsExpression }
 
 `BloomFilterAggregate` can be given **Number of Bits** (as an [Expression](Expression.md)) when [created](#creating-instance).
 
@@ -36,7 +36,7 @@ The maximum value for the number of bits is [spark.sql.optimizer.runtime.bloomFi
 
 The number of bits expression is the [third](#third) expression (in this `TernaryLike` tree node).
 
-## <span id="numBits"> Number of Bits
+## Number of Bits { #numBits }
 
 ```scala
 numBits: Long
@@ -76,4 +76,22 @@ The `numBits` value [must be a positive value](#checkInputDataTypes).
 
 `eval` is part of the [TypedImperativeAggregate](TypedImperativeAggregate.md#eval) abstraction.
 
-`eval`...FIXME
+`eval` [serializes](#serialize) the given `buffer` (unless the [cardinality](../bloom-filter-join/BloomFilter.md#cardinality) of this `BloomFilter` is `0`, in which case `eval` returns `null`).
+
+??? note "FIXME Why does `eval` return `null`?"
+
+## Serializing Aggregate Buffer { #serialize }
+
+??? note "TypedImperativeAggregate"
+
+    ```scala
+    serialize(
+      obj: BloomFilter): Array[Byte]
+    ```
+
+    `serialize` is part of the [TypedImperativeAggregate](TypedImperativeAggregate.md#serialize) abstraction.
+
+??? note "Two `serialize`s"
+    There is another `serialize` (in the `BloomFilterAggregate` companion object) that just makes unit testing easier.
+
+`serialize`...FIXME
````
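The `eval`/`serialize` behavior added above (return `null` for an empty filter, the serialized bytes otherwise) can be sketched outside Spark with a toy, hypothetical `SimpleBloomFilter` (a single-hash bit set; this is not Spark's `org.apache.spark.util.sketch.BloomFilter`):

```scala
import java.io.{ByteArrayOutputStream, DataOutputStream}

// Hypothetical, simplified stand-in for a bloom filter: a fixed-size bit set
// plus a counter of inserted items (the "cardinality").
final class SimpleBloomFilter(numBits: Int) {
  private val bits = new java.util.BitSet(numBits)
  private var inserted = 0L

  def put(item: Long): Unit = {
    // One hash function only, to keep the sketch short.
    bits.set(((item % numBits + numBits) % numBits).toInt)
    inserted += 1
  }

  def cardinality: Long = inserted

  // Mirrors the serialize(obj): Array[Byte] contract: flatten the filter
  // into a byte array suitable for a binary aggregation buffer.
  def serialize: Array[Byte] = {
    val bos = new ByteArrayOutputStream()
    val out = new DataOutputStream(bos)
    out.writeLong(inserted)
    out.write(bits.toByteArray)
    out.flush()
    bos.toByteArray
  }
}

// Mirrors the eval behavior: null for an empty filter, bytes otherwise.
def eval(buffer: SimpleBloomFilter): Array[Byte] =
  if (buffer.cardinality == 0) null else buffer.serialize
```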

docs/expressions/TypedImperativeAggregate.md

Lines changed: 26 additions & 4 deletions
````diff
@@ -74,7 +74,7 @@ Used when:
 
 * `TypedImperativeAggregate` is requested to [merge](#merge-Expression) and [mergeBuffersObjects](#mergeBuffersObjects)
 
-### serialize { #serialize }
+### Serializing Aggregate Buffer { #serialize }
 
 ```scala
 serialize(
@@ -88,7 +88,7 @@ See:
 
 Used when:
 
-* `TypedImperativeAggregate` is requested to [serializeAggregateBufferInPlace](#serializeAggregateBufferInPlace)
+* `TypedImperativeAggregate` is requested to [serialize the aggregate buffer in-place](#serializeAggregateBufferInPlace)
 
 ### update { #update }
 
@@ -126,7 +126,7 @@ Used when:
 
 `eval` [takes the buffer object](#getBufferObject) out of the given [InternalRow](../InternalRow.md) and [evaluates the result](#eval).
 
-## Aggregation Buffer Attributes { #aggBufferAttributes }
+## Aggregate Buffer Attributes { #aggBufferAttributes }
 
 ??? note "AggregateFunction"
 
@@ -142,7 +142,7 @@ Used when:
 -------|---------
 `buf` | [BinaryType](../types/DataType.md#BinaryType)
 
-## Accessing Buffer Object { #getBufferObject }
+## Extracting Aggregate Buffer Object { #getBufferObject }
 
 ```scala
 getBufferObject(
@@ -171,3 +171,25 @@ anyObjectType: ObjectType
 When created, `TypedImperativeAggregate` creates an `ObjectType` of a value of Scala `AnyRef` type.
 
 The `ObjectType` is used in [getBufferObject](#getBufferObject).
+
+## Serializing Aggregate Buffer In-Place { #serializeAggregateBufferInPlace }
+
+```scala
+serializeAggregateBufferInPlace(
+  buffer: InternalRow): Unit
+```
+
+??? warning "Procedure"
+    `serializeAggregateBufferInPlace` is a procedure (returns `Unit`) so _whatever happens inside, stays inside_ (paraphrasing the [former advertising slogan of Las Vegas, Nevada](https://idioms.thefreedictionary.com/what+happens+in+Vegas+stays+in+Vegas)).
+
+`serializeAggregateBufferInPlace` [gets the aggregate buffer](#getBufferObject) from the given `buffer` and [serializes it](#serialize).
+
+In the end, `serializeAggregateBufferInPlace` stores the serialized aggregate buffer back in the given `buffer` at [mutableAggBufferOffset](ImperativeAggregate.md#mutableAggBufferOffset).
+
+---
+
+`serializeAggregateBufferInPlace` is used when:
+
+* `AggregatingAccumulator` is requested to [withBufferSerialized](../AggregatingAccumulator.md#withBufferSerialized)
+* `AggregationIterator` is requested to [generateResultProjection](../aggregations/AggregationIterator.md#generateResultProjection)
+* `ObjectAggregationMap` is requested to [dumpToExternalSorter](../aggregations/ObjectAggregationMap.md#dumpToExternalSorter)
````
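The in-place step documented for `serializeAggregateBufferInPlace` (read the live buffer object out of a row slot, serialize it, write the bytes back into the same slot) can be sketched with plain Scala types. All names here are hypothetical, and an `Array[Any]` stands in for `InternalRow`:

```scala
import java.nio.charset.StandardCharsets

// Toy aggregation state (hypothetical; the real buffer object is
// aggregate-specific, e.g. a BloomFilter).
final case class AggState(value: String)

class ToyTypedImperativeAggregate(mutableAggBufferOffset: Int) {
  // getBufferObject analogue: read the live aggregation state from the slot.
  private def getBufferObject(buffer: Array[Any]): AggState =
    buffer(mutableAggBufferOffset).asInstanceOf[AggState]

  // serialize analogue: flatten the state to bytes (BinaryType in Spark).
  private def serialize(obj: AggState): Array[Byte] =
    obj.value.getBytes(StandardCharsets.UTF_8)

  // The in-place step: replace the object in the slot with its bytes.
  def serializeAggregateBufferInPlace(buffer: Array[Any]): Unit =
    buffer(mutableAggBufferOffset) = serialize(getBufferObject(buffer))
}
```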

docs/logical-optimizations/InjectRuntimeFilter.md

Lines changed: 73 additions & 1 deletion
````diff
@@ -4,6 +4,9 @@
 
 `InjectRuntimeFilter` is part of the [InjectRuntimeFilter](../SparkOptimizer.md#InjectRuntimeFilter) fixed-point batch of rules.
 
+!!! note "Runtime Filter"
+    A **Runtime Filter** can be a [BloomFilter](#hasBloomFilter) (with [spark.sql.optimizer.runtime.bloomFilter.enabled](../configuration-properties.md#spark.sql.optimizer.runtime.bloomFilter.enabled) enabled) or an [InSubquery](#hasInSubquery) filter.
+
 !!! note "Noop"
     `InjectRuntimeFilter` is a _noop_ (and does nothing) for the following cases:
 
@@ -107,7 +110,30 @@ injectInSubqueryFilter(
   filterCreationSidePlan: LogicalPlan): LogicalPlan
 ```
 
-`injectInSubqueryFilter`...FIXME
+!!! note "The same `DataType`s"
+    `injectInSubqueryFilter` requires that the [DataType](../expressions/Expression.md#dataType)s of the given `filterApplicationSideExp` and `filterCreationSideExp` are the same.
+
+`injectInSubqueryFilter` creates an [Aggregate](../logical-operators/Aggregate.md) logical operator with the following:
+
+Property | Value
+---------|------
+[Grouping Expressions](../logical-operators/Aggregate.md#groupingExpressions) | The given `filterCreationSideExp` expression
+[Aggregate Expressions](../logical-operators/Aggregate.md#aggregateExpressions) | An `Alias` expression for the `filterCreationSideExp` expression (possibly [wrapped in a hash](#mayWrapWithHash))
+[Child Logical Operator](../logical-operators/Aggregate.md#child) | The given `filterCreationSidePlan` logical plan
+
+`injectInSubqueryFilter` executes the [ColumnPruning](../logical-optimizations/ColumnPruning.md) logical optimization on the `Aggregate` logical operator.
+
+Unless the `Aggregate` logical operator [can be broadcast by size](../JoinSelectionHelper.md#canBroadcastBySize), `injectInSubqueryFilter` returns the given `filterApplicationSidePlan` logical plan (and effectively throws away all the work so far).
+
+!!! note
+    `injectInSubqueryFilter` skips the `InSubquery` filter when the size of the `Aggregate` is beyond the [broadcast join threshold](../JoinSelectionHelper.md#canBroadcastBySize) and the semi-join would be a shuffle join, which is not worthwhile.
+
+`injectInSubqueryFilter` creates an `InSubquery` expression with the following:
+
+* The given `filterApplicationSideExp` (possibly [wrapped in a hash](#mayWrapWithHash))
+* A [ListQuery](../expressions/ListQuery.md) expression with the `Aggregate`
+
+In the end, `injectInSubqueryFilter` creates a `Filter` logical operator with the `InSubquery` expression (as the condition) and the given `filterApplicationSidePlan` logical plan (as the child).
 
 !!! note
     `injectInSubqueryFilter` is used when `InjectRuntimeFilter` is requested to [injectFilter](#injectFilter) with the [spark.sql.optimizer.runtime.bloomFilter.enabled](../configuration-properties.md#spark.sql.optimizer.runtime.bloomFilter.enabled) configuration property disabled and [spark.sql.optimizer.runtimeFilter.semiJoinReduction.enabled](../configuration-properties.md#spark.sql.optimizer.runtimeFilter.semiJoinReduction.enabled) enabled.
````
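At a high level, the documented `injectInSubqueryFilter` steps boil down to: collect the distinct join keys of the creation side, give up if that set is too big to broadcast, and otherwise filter the application side with an in-subquery membership test. A collection-level Scala sketch (hypothetical names and a size threshold; Spark does this with logical operators, not `Seq`s):

```scala
// Simulates the in-subquery runtime filter on plain collections.
def injectInSubqueryFilter[K](
    applicationSide: Seq[(K, String)],  // rows keyed by the join key
    creationSide: Seq[K],               // join keys on the creation side
    broadcastThreshold: Int): Seq[(K, String)] = {
  // "Aggregate": distinct join keys of the filter creation side.
  val keys = creationSide.distinct.toSet
  if (keys.size > broadcastThreshold)
    applicationSide                     // too big to broadcast: skip the filter
  else
    applicationSide.filter { case (k, _) => keys(k) }  // Filter(InSubquery(...))
}
```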
````diff
@@ -128,3 +154,49 @@ isSimpleExpression(
 * `LIKE_FAMLIY`
 * `REGEXP_EXTRACT_FAMILY`
 * `REGEXP_REPLACE`
+
+## hasDynamicPruningSubquery { #hasDynamicPruningSubquery }
+
+```scala
+hasDynamicPruningSubquery(
+  left: LogicalPlan,
+  right: LogicalPlan,
+  leftKey: Expression,
+  rightKey: Expression): Boolean
+```
+
+`hasDynamicPruningSubquery` checks whether there is a `Filter` logical operator with a [DynamicPruningSubquery](../expressions/DynamicPruningSubquery.md) expression on the `left` or `right` side (of a join).
+
+## hasRuntimeFilter { #hasRuntimeFilter }
+
+```scala
+hasRuntimeFilter(
+  left: LogicalPlan,
+  right: LogicalPlan,
+  leftKey: Expression,
+  rightKey: Expression): Boolean
+```
+
+`hasRuntimeFilter` checks whether there is a [bloom filter](#hasBloomFilter) (with [spark.sql.optimizer.runtime.bloomFilter.enabled](../configuration-properties.md#spark.sql.optimizer.runtime.bloomFilter.enabled) enabled) or an [in-subquery](#hasInSubquery) filter on the `left` or `right` side (of a join).
+
+## hasBloomFilter { #hasBloomFilter }
+
+```scala
+hasBloomFilter(
+  left: LogicalPlan,
+  right: LogicalPlan,
+  leftKey: Expression,
+  rightKey: Expression): Boolean
+```
+
+`hasBloomFilter` checks whether [a bloom filter can be found](#findBloomFilterWithExp) on the `left` or `right` side (of a join).
+
+## findBloomFilterWithExp { #findBloomFilterWithExp }
+
+```scala
+findBloomFilterWithExp(
+  plan: LogicalPlan,
+  key: Expression): Boolean
+```
+
+`findBloomFilterWithExp` tries to find a `Filter` logical operator with a [BloomFilterMightContain](../expressions/BloomFilterMightContain.md) expression (possibly over an `XxHash64` hash of the given `key`) among the nodes of the given [LogicalPlan](../logical-operators/LogicalPlan.md).
````
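The tree search that `findBloomFilterWithExp` performs (walk a plan, look for a `Filter` whose condition contains a bloom-filter membership expression) can be sketched with a toy plan/expression ADT. All types here are hypothetical stand-ins for Spark's `LogicalPlan`/`Expression` trees:

```scala
// Toy plan and expression trees (hypothetical ADT, not Spark's).
sealed trait Plan { def children: Seq[Plan] }
final case class Scan(name: String) extends Plan { def children = Nil }
final case class FilterNode(condition: Expr, child: Plan) extends Plan {
  def children = Seq(child)
}

sealed trait Expr { def children: Seq[Expr] }
final case class MightContain(key: String) extends Expr { def children = Nil }
final case class And(l: Expr, r: Expr) extends Expr { def children = Seq(l, r) }
final case class Equals(a: String, b: String) extends Expr { def children = Nil }

// Does any sub-expression satisfy the predicate?
def existsExpr(e: Expr)(p: Expr => Boolean): Boolean =
  p(e) || e.children.exists(existsExpr(_)(p))

// findBloomFilterWithExp analogue: is there a Filter whose condition
// contains a MightContain expression anywhere in the plan?
def findBloomFilter(plan: Plan): Boolean = plan match {
  case FilterNode(cond, child) =>
    existsExpr(cond) { case MightContain(_) => true; case _ => false } ||
      findBloomFilter(child)
  case other => other.children.exists(findBloomFilter)
}
```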
