Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/query/sql/src/planner/optimizer/ir/stats/column_stat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ pub struct ColumnStat {
/// Count of null values
pub null_count: u64,

pub num_rows: u64,
pub origin_ndv: f64,

/// Histogram of column
pub histogram: Option<Histogram>,
}
Expand Down
4 changes: 4 additions & 0 deletions src/query/sql/src/planner/optimizer/optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,10 @@ pub async fn optimize_query(opt_ctx: Arc<OptimizerContext>, s_expr: SExpr) -> Re
.add(SingleToInnerOptimizer::new())
// 12. Deduplicate join conditions.
.add(DeduplicateJoinConditionOptimizer::new())
.add(RecursiveRuleOptimizer::new(
opt_ctx.clone(),
[RuleID::PushDownAntiJoin].as_slice(),
))
// 13. Apply join commutativity to further optimize join ordering
.add_if(
opt_ctx.get_enable_join_reorder(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ use crate::optimizer::optimizers::rule::RulePushDownRankLimitAggregate;
use crate::optimizer::optimizers::rule::RulePushDownSortEvalScalar;
use crate::optimizer::optimizers::rule::RulePushDownSortFilterScan;
use crate::optimizer::optimizers::rule::RulePushDownSortScan;
use crate::optimizer::optimizers::rule::RulePushdownAntiJoin;
use crate::optimizer::optimizers::rule::RuleSemiToInnerJoin;
use crate::optimizer::optimizers::rule::RuleSplitAggregate;
use crate::optimizer::optimizers::rule::RuleTryApplyAggIndex;
Expand Down Expand Up @@ -130,6 +131,7 @@ impl RuleFactory {
RuleID::MergeFilterIntoMutation => {
Ok(Box::new(RuleMergeFilterIntoMutation::new(metadata)))
}
RuleID::PushDownAntiJoin => Ok(Box::new(RulePushdownAntiJoin::new())),
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ pub fn try_push_down_filter_join(s_expr: &SExpr, metadata: MetadataRef) -> Resul
right_push_down = vec![];
}
}

let join_prop = JoinProperty::new(&left_prop.output_columns, &right_prop.output_columns);
let mut infer_filter = InferFilterOptimizer::new(Some(join_prop));
push_down_predicates = infer_filter.optimize(push_down_predicates)?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@ mod push_down_filter_join;
mod rule_commute_join;
mod rule_commute_join_base_table;
mod rule_left_exchange_join;
mod rule_push_down_anti_join;
mod rule_semi_to_inner_join;
mod util;

pub use push_down_filter_join::*;
pub use rule_commute_join::RuleCommuteJoin;
pub use rule_commute_join_base_table::RuleCommuteJoinBaseTable;
pub use rule_left_exchange_join::RuleLeftExchangeJoin;
pub use rule_push_down_anti_join::RulePushdownAntiJoin;
pub use rule_semi_to_inner_join::RuleSemiToInnerJoin;
pub use util::get_join_predicates;
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use databend_common_exception::Result;

use crate::binder::JoinPredicate;
use crate::optimizer::ir::Matcher;
use crate::optimizer::ir::RelExpr;
use crate::optimizer::ir::SExpr;
use crate::optimizer::optimizers::rule::Rule;
use crate::optimizer::optimizers::rule::RuleID;
use crate::optimizer::optimizers::rule::TransformResult;
use crate::plans::Join;
use crate::plans::JoinType;
use crate::plans::RelOp;
use crate::plans::RelOperator;
use crate::ColumnSet;

/// Optimizer rule that pushes a `LeftSemi` / `LeftAnti` join underneath an
/// inner join found on its left input, so that fewer rows stay in the join tree.
///
/// The rule matches every `Join` operator, but `apply` only rewrites joins whose
/// type is `JoinType::LeftAnti` or `JoinType::LeftSemi`; all other join types
/// (including the right-hand semi/anti variants) are left untouched.
pub struct RulePushdownAntiJoin {
/// Identifier returned by `Rule::id`; set to `RuleID::PushDownAntiJoin` by `new`.
id: RuleID,
/// Patterns returned by `Rule::matchers`: a single `Join` with two leaf children.
matchers: Vec<Matcher>,
}

impl RulePushdownAntiJoin {
    /// Creates the rule with a single pattern: a `Join` operator whose two
    /// children may be any expression.
    pub fn new() -> Self {
        let join_pattern = Matcher::MatchOp {
            op_type: RelOp::Join,
            children: vec![Matcher::Leaf, Matcher::Leaf],
        };

        Self {
            id: RuleID::PushDownAntiJoin,
            matchers: vec![join_pattern],
        }
    }

    /// Tries to move `join` (currently joining `left` with `right`) below the
    /// inner join reachable from `left` through zero or more `Filter` operators.
    ///
    /// The left-hand side of every equi condition is classified against the two
    /// children of that inner join. When all of them are classified as
    /// `JoinPredicate::Left` the join is attached to the inner join's first
    /// child; otherwise, when all are `JoinPredicate::Right`, to its second
    /// child. The rewrite additionally requires every non-equi condition to use
    /// only columns produced by `right` or by the chosen child.
    ///
    /// Returns `Ok(None)` whenever any of these requirements is not met.
    fn try_push_down(&self, left: &SExpr, right: &SExpr, join: Join) -> Result<Option<SExpr>> {
        let Some(inner_join) = extract_inner_join(left)? else {
            return Ok(None);
        };

        let inner_rel = RelExpr::with_s_expr(&inner_join);
        let inner_left_prop = inner_rel.derive_relational_prop_child(0)?;
        let inner_right_prop = inner_rel.derive_relational_prop_child(1)?;

        // Classify the outer-side key of each equi condition against the
        // inner join's two inputs.
        let mut classified = Vec::with_capacity(join.equi_conditions.len());
        for condition in &join.equi_conditions {
            classified.push(JoinPredicate::new(
                &condition.left,
                &inner_left_prop,
                &inner_right_prop,
            ));
        }

        // The left side is checked first; the right side is only considered
        // when the left-side classification does not hold.
        let (target_idx, target_prop) = if classified.iter().all(left_predicate) {
            (0, &inner_left_prop)
        } else if classified.iter().all(right_predicate) {
            (1, &inner_right_prop)
        } else {
            return Ok(None);
        };

        // Columns available to the pushed-down join: its own right input plus
        // the inner join child it is going to sit on.
        let right_prop = RelExpr::with_s_expr(right).derive_relational_prop()?;
        let mut visible_columns = ColumnSet::new();
        visible_columns.extend(right_prop.output_columns.clone());
        visible_columns.extend(target_prop.output_columns.clone());

        let non_equi_covered = join
            .non_equi_conditions
            .iter()
            .all(|condition| condition.used_columns().is_subset(&visible_columns));
        if !non_equi_covered {
            return Ok(None);
        }

        let pushed = Arc::new(SExpr::create_binary(
            RelOperator::Join(join.clone()),
            inner_join.child(target_idx)?.clone(),
            right.clone(),
        ));
        let untouched = Arc::new(inner_join.child(1 - target_idx)?.clone());

        // Keep the inner join's children in their original order.
        let children = if target_idx == 0 {
            [pushed, untouched]
        } else {
            [untouched, pushed]
        };

        replace_inner_join(left, inner_join.replace_children(children))
    }
}

impl Rule for RulePushdownAntiJoin {
    /// Returns the identifier stored at construction time.
    fn id(&self) -> RuleID {
        self.id
    }

    /// Applies the rule to `s_expr`, whose root must be a `Join`.
    ///
    /// Only `LeftAnti` and `LeftSemi` joins are considered. When the push-down
    /// succeeds, the rewritten expression is tagged with this rule's id and
    /// added to `state`; otherwise `state` is left unchanged.
    fn apply(&self, s_expr: &SExpr, state: &mut TransformResult) -> Result<()> {
        let join: Join = s_expr.plan().clone().try_into()?;

        if !matches!(join.join_type, JoinType::LeftAnti | JoinType::LeftSemi) {
            return Ok(());
        }

        let outer_left = s_expr.child(0)?;
        let outer_right = s_expr.child(1)?;
        let Some(mut rewritten) = self.try_push_down(outer_left, outer_right, join)? else {
            return Ok(());
        };

        rewritten.set_applied_rule(&self.id);
        state.add_result(rewritten);
        Ok(())
    }

    /// Returns the patterns this rule can be applied to.
    fn matchers(&self) -> &[Matcher] {
        self.matchers.as_slice()
    }
}

impl Default for RulePushdownAntiJoin {
fn default() -> Self {
Self::new()
}
}

/// Rebuilds `expr` with the inner join at the bottom of its `Filter` chain
/// replaced by `new_join`.
///
/// `expr` may be the inner join itself or any number of `Filter` operators
/// stacked on top of it. Returns `Ok(None)` when the chain ends in anything
/// other than an inner join.
fn replace_inner_join(expr: &SExpr, new_join: SExpr) -> Result<Option<SExpr>> {
    let rebuilt = match expr.plan() {
        RelOperator::Join(join) if join.join_type == JoinType::Inner => Some(new_join),
        // Recurse through the filter and re-attach it on top of the new child.
        RelOperator::Filter(_) => replace_inner_join(expr.child(0)?, new_join)?
            .map(|new_child| expr.replace_children([Arc::new(new_child)])),
        _ => None,
    };

    Ok(rebuilt)
}

/// Walks down from `expr` through consecutive `Filter` operators and returns a
/// clone of the first inner join it reaches.
///
/// Returns `Ok(None)` as soon as an operator that is neither a `Filter` nor an
/// inner join is encountered.
fn extract_inner_join(expr: &SExpr) -> Result<Option<SExpr>> {
    let mut current = expr;
    loop {
        match current.plan() {
            RelOperator::Join(join) if join.join_type == JoinType::Inner => {
                return Ok(Some(current.clone()));
            }
            RelOperator::Filter(_) => current = current.child(0)?,
            _ => return Ok(None),
        }
    }
}

/// Returns `true` when `tuple` is the `JoinPredicate::Left` variant.
fn left_predicate(tuple: &JoinPredicate) -> bool {
    match tuple {
        JoinPredicate::Left(_) => true,
        _ => false,
    }
}

/// Returns `true` when `tuple` is the `JoinPredicate::Right` variant.
fn right_predicate(tuple: &JoinPredicate) -> bool {
    match tuple {
        JoinPredicate::Right(_) => true,
        _ => false,
    }
}
2 changes: 2 additions & 0 deletions src/query/sql/src/planner/optimizer/optimizers/rule/rule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ pub enum RuleID {
PushDownSortFilterScan,
PushDownLimitFilterScan,
SemiToInnerJoin,
PushDownAntiJoin,
EliminateEvalScalar,
EliminateFilter,
EliminateSort,
Expand Down Expand Up @@ -194,6 +195,7 @@ impl Display for RuleID {
RuleID::EliminateUnion => write!(f, "EliminateUnion"),

RuleID::MergeFilterIntoMutation => write!(f, "MergeFilterIntoMutation"),
RuleID::PushDownAntiJoin => write!(f, "PushDownAntiJoin"),
}
}
}
2 changes: 2 additions & 0 deletions src/query/sql/src/planner/plans/constant_table_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,8 @@ impl Operator for ConstantTableScan {
max,
ndv: ndv as f64,
null_count,
num_rows: self.num_rows as u64,
origin_ndv: ndv as f64,
histogram,
};
column_stats.insert(*index, column_stat);
Expand Down
88 changes: 84 additions & 4 deletions src/query/sql/src/planner/plans/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -507,10 +507,43 @@ impl Join {
+ f64::max(right_cardinality, inner_join_cardinality)
- inner_join_cardinality
}
JoinType::LeftSemi => f64::min(left_cardinality, inner_join_cardinality),
JoinType::RightSemi => f64::min(right_cardinality, inner_join_cardinality),
JoinType::LeftSingle | JoinType::RightMark | JoinType::LeftAnti => left_cardinality,
JoinType::RightSingle | JoinType::LeftMark | JoinType::RightAnti => right_cardinality,
JoinType::LeftSemi => {
let left_exprs = self
.equi_conditions
.iter()
.map(|x| &x.left)
.collect::<Vec<_>>();

self.semi_cardinality(left_cardinality, &left_statistics, &left_exprs)
}
JoinType::RightSemi => {
let right_exprs = self
.equi_conditions
.iter()
.map(|x| &x.right)
.collect::<Vec<_>>();

self.semi_cardinality(right_cardinality, &right_statistics, &right_exprs)
}
JoinType::LeftAnti => {
let left_exprs = self
.equi_conditions
.iter()
.map(|x| &x.left)
.collect::<Vec<_>>();
self.anti_cardinality(left_cardinality, &left_statistics, &left_exprs)
}
JoinType::RightAnti => {
let right_exprs = self
.equi_conditions
.iter()
.map(|x| &x.right)
.collect::<Vec<_>>();

self.anti_cardinality(right_cardinality, &right_statistics, &right_exprs)
}
JoinType::LeftSingle | JoinType::RightMark => left_cardinality,
JoinType::RightSingle | JoinType::LeftMark => right_cardinality,
};
// Derive column statistics
let column_stats = if cardinality == 0.0 {
Expand Down Expand Up @@ -558,6 +591,53 @@ impl Join {
.iter()
.any(|expr| expr.has_subquery())
}

/// Estimates the output cardinality of an anti join for its preserved input.
///
/// * `cardinality` - estimated row count of the preserved input.
/// * `statistics` - statistics of that input, looked up by column index.
/// * `exprs` - the equi-condition key expressions belonging to that input.
///
/// Every key that references exactly one column with usable statistics yields
/// an estimate of `cardinality - cardinality * ndv / origin_ndv`, which is never
/// allowed to drop below 30% of the input. The smallest per-column estimate
/// wins. Keys that use zero or several columns, that have no statistics, or
/// whose `origin_ndv` is not strictly positive are ignored.
///
/// The result is floored at one row, but it never exceeds `cardinality`, so an
/// empty input is still estimated as empty.
fn anti_cardinality(
    &self,
    cardinality: f64,
    statistics: &Statistics,
    exprs: &[&ScalarExpr],
) -> f64 {
    // An anti join is assumed to keep at least this share of its input.
    const MIN_ANTI_SELECTIVITY: f64 = 0.3;

    // An anti join cannot emit more rows than it receives, so the usual
    // one-row floor must not be applied to inputs smaller than one row.
    let floor = 1_f64.min(cardinality);

    let mut anti_cardinality = cardinality;
    for expr in exprs {
        let mut used_columns = expr.used_columns();

        // Only keys made of exactly one column can be looked up in the stats.
        let (Some(column), None) = (used_columns.pop_first(), used_columns.pop_first()) else {
            continue;
        };

        let Some(column_stat) = statistics.column_stats.get(&column) else {
            continue;
        };

        // `ndv / origin_ndv` is undefined without a positive denominator.
        // Treat the column like one without statistics instead of relying on
        // NaN/inf being swallowed by `f64::min` / `f64::max`.
        if column_stat.origin_ndv <= 0.0 || column_stat.origin_ndv.is_nan() {
            continue;
        }

        let semi_cardinality = cardinality * column_stat.ndv / column_stat.origin_ndv;
        let column_cardinality =
            (cardinality - semi_cardinality).max(cardinality * MIN_ANTI_SELECTIVITY);
        anti_cardinality = column_cardinality.min(anti_cardinality).max(floor);
    }

    anti_cardinality
}

/// Estimates the output cardinality of a semi join for its preserved input.
///
/// * `cardinality` - estimated row count of the preserved input.
/// * `statistics` - statistics of that input, looked up by column index.
/// * `exprs` - the equi-condition key expressions belonging to that input.
///
/// Every key that references exactly one column with usable statistics yields
/// an estimate of `cardinality * ndv / origin_ndv`; the smallest per-column
/// estimate wins. Keys that use zero or several columns, that have no
/// statistics, or whose `origin_ndv` is not strictly positive are ignored.
///
/// The result is floored at one row, but it never exceeds `cardinality`, so an
/// empty input is still estimated as empty.
fn semi_cardinality(
    &self,
    cardinality: f64,
    statistics: &Statistics,
    exprs: &[&ScalarExpr],
) -> f64 {
    // A semi join cannot emit more rows than it receives, so the usual
    // one-row floor must not be applied to inputs smaller than one row.
    let floor = 1_f64.min(cardinality);

    let mut semi_cardinality = cardinality;
    for expr in exprs {
        let mut used_columns = expr.used_columns();

        // Only keys made of exactly one column can be looked up in the stats.
        let (Some(column), None) = (used_columns.pop_first(), used_columns.pop_first()) else {
            continue;
        };

        let Some(column_stat) = statistics.column_stats.get(&column) else {
            continue;
        };

        // `ndv / origin_ndv` is undefined without a positive denominator.
        // Skip the column instead of relying on NaN/inf being swallowed by
        // `f64::min` / `f64::max`.
        if column_stat.origin_ndv <= 0.0 || column_stat.origin_ndv.is_nan() {
            continue;
        }

        let column_cardinality = cardinality * column_stat.ndv / column_stat.origin_ndv;
        semi_cardinality = column_cardinality.min(semi_cardinality).max(floor);
    }

    semi_cardinality
}
}

impl Operator for Join {
Expand Down
4 changes: 4 additions & 0 deletions src/query/sql/src/planner/plans/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ impl Operator for Scan {
.unwrap_or(0);

let mut column_stats: ColumnStatSet = Default::default();

for (k, v) in &self.statistics.column_stats {
// No need to cal histogram for unused columns
if !used_columns.contains(k) {
Expand Down Expand Up @@ -311,11 +312,14 @@ impl Operator for Scan {
None
}
};

let column_stat = ColumnStat {
min,
max,
ndv: ndv as f64,
null_count: col_stat.null_count,
origin_ndv: ndv as f64,
num_rows,
histogram,
};
column_stats.insert(*k as IndexType, column_stat);
Expand Down
Loading
Loading