diff --git a/src/query/catalog/src/sbbf.rs b/src/query/catalog/src/sbbf.rs index 368a5f4ac2893..9c90ea5d6df74 100644 --- a/src/query/catalog/src/sbbf.rs +++ b/src/query/catalog/src/sbbf.rs @@ -200,7 +200,7 @@ pub struct Sbbf(Vec); pub struct SbbfAtomic(Vec); pub(crate) const BITSET_MIN_LENGTH: usize = 32; -pub(crate) const BITSET_MAX_LENGTH: usize = 128 * 1024 * 1024; +pub(crate) const BITSET_MAX_LENGTH: usize = 512 * 1024 * 1024; #[inline] fn hash_to_block_index_for_blocks(hash: u64, num_blocks: usize) -> usize { diff --git a/src/query/service/src/physical_plans/runtime_filter/builder.rs b/src/query/service/src/physical_plans/runtime_filter/builder.rs index 60cf42e3685b5..37077c165b889 100644 --- a/src/query/service/src/physical_plans/runtime_filter/builder.rs +++ b/src/query/service/src/physical_plans/runtime_filter/builder.rs @@ -38,6 +38,7 @@ use super::types::PhysicalRuntimeFilters; /// Type alias for probe keys with runtime filter information /// Contains: (RemoteExpr, scan_id, table_index, column_idx) type ProbeKeysWithRuntimeFilter = Vec, usize, usize, IndexType)>>; +type ProbeTargetWithTable = (RemoteExpr, usize, Option); /// Check if a data type is supported for bloom filter /// @@ -138,11 +139,17 @@ pub async fn build_runtime_filter( _ => continue, } - let probe_targets = + let probe_targets_with_tables = find_probe_targets(metadata, probe_side, &probe_key, scan_id, column_idx)?; + let mut probe_targets = Vec::with_capacity(probe_targets_with_tables.len()); + let mut probe_table_rows = Vec::with_capacity(probe_targets_with_tables.len()); + for (probe_expr, probe_scan_id, table_index) in probe_targets_with_tables { + probe_targets.push((probe_expr, probe_scan_id)); + let table_rows = get_table_rows(ctx.clone(), metadata, table_index).await?; + probe_table_rows.push(table_rows); + } - let build_table_rows = - get_build_table_rows(ctx.clone(), metadata, build_table_index).await?; + let build_table_rows = get_table_rows(ctx.clone(), metadata, build_table_index).await?; let data_type = build_key .as_expr(&BUILTIN_FUNCTIONS) @@ -159,6 +166,7 @@ pub async fn build_runtime_filter( id, build_key: build_key.clone(), probe_targets, + probe_table_rows, build_table_rows, enable_bloom_runtime_filter, enable_inlist_runtime_filter: true, @@ -170,12 +178,12 @@ pub async fn build_runtime_filter( Ok(PhysicalRuntimeFilters { filters }) } -async fn get_build_table_rows( +async fn get_table_rows( ctx: Arc, metadata: &MetadataRef, - build_table_index: Option, + table_index: Option, ) -> Result> { - if let Some(table_index) = build_table_index { + if let Some(table_index) = table_index { let table = { let metadata_read = metadata.read(); metadata_read.table(table_index).table().clone() @@ -194,7 +202,7 @@ fn find_probe_targets( probe_key: &RemoteExpr, probe_scan_id: usize, probe_key_col_idx: IndexType, -) -> Result, usize)>> { +) -> Result> { let mut uf = UnionFind::new(); let mut column_to_remote: HashMap, usize)> = HashMap::new(); column_to_remote.insert(probe_key_col_idx, (probe_key.clone(), probe_scan_id)); @@ -217,9 +225,11 @@ fn find_probe_targets( let equiv_class = uf.get_equivalence_class(probe_key_col_idx); let mut result = Vec::new(); + let metadata_read = metadata.read(); for idx in equiv_class { if let Some((remote_expr, scan_id)) = column_to_remote.get(&idx) { - result.push((remote_expr.clone(), *scan_id)); + let table_index = metadata_read.column(idx).table_index(); + result.push((remote_expr.clone(), *scan_id, table_index)); } } diff --git a/src/query/service/src/physical_plans/runtime_filter/types.rs b/src/query/service/src/physical_plans/runtime_filter/types.rs index 11a7a9992f5c6..835868f5723be 100644 --- a/src/query/service/src/physical_plans/runtime_filter/types.rs +++ b/src/query/service/src/physical_plans/runtime_filter/types.rs @@ -42,6 +42,8 @@ pub struct PhysicalRuntimeFilter { /// All probe targets in this list are in the same equivalence class pub probe_targets: Vec<(RemoteExpr, usize)>, + pub probe_table_rows: Vec>, + pub build_table_rows: Option, /// Enable bloom filter for this runtime filter diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/desc.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/desc.rs index ae2a25d06733b..1848878ce15bc 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/desc.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/desc.rs @@ -70,6 +70,7 @@ pub struct RuntimeFilterDesc { pub id: usize, pub build_key: Expr, pub probe_targets: Vec<(Expr, usize)>, + pub probe_table_rows: Vec>, pub build_table_rows: Option, pub enable_bloom_runtime_filter: bool, pub enable_inlist_runtime_filter: bool, @@ -98,6 +99,7 @@ impl From<&PhysicalRuntimeFilter> for RuntimeFilterDesc { .iter() .map(|(probe_key, scan_id)| (probe_key.as_expr(&BUILTIN_FUNCTIONS), *scan_id)) .collect(), + probe_table_rows: runtime_filter.probe_table_rows.clone(), build_table_rows: runtime_filter.build_table_rows, enable_bloom_runtime_filter: runtime_filter.enable_bloom_runtime_filter, enable_inlist_runtime_filter: runtime_filter.enable_inlist_runtime_filter, diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_build_state.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_build_state.rs index c41364a50856e..d6e50bb9ff159 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_build_state.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_build_state.rs @@ -60,6 +60,7 @@ use parking_lot::RwLock; use super::concat_buffer::ConcatBuffer; use super::desc::RuntimeFilterDesc; use super::runtime_filter::JoinRuntimeFilterPacket; +use super::runtime_filter::RuntimeFilterBuildLimit; use crate::pipelines::memory_settings::MemorySettingsExt; use crate::pipelines::processors::transforms::hash_join::common::wrap_true_validity; use crate::pipelines::processors::transforms::hash_join::desc::MARKER_KIND_FALSE; @@ -106,6 +107,7 @@ pub struct HashJoinBuildState { pub(crate) broadcast_id: Option, pub(crate) is_runtime_filter_added: AtomicBool, runtime_filter_packets: Mutex>, + runtime_filter_build_limit: Mutex>>, } impl HashJoinBuildState { @@ -157,6 +159,7 @@ impl HashJoinBuildState { broadcast_id, is_runtime_filter_added: AtomicBool::new(false), runtime_filter_packets: Mutex::new(Vec::new()), + runtime_filter_build_limit: Mutex::new(None), })) } @@ -878,6 +881,24 @@ impl HashJoinBuildState { &self.hash_join_state.hash_join_desc.runtime_filter.filters } + pub fn runtime_filter_build_limit( + &self, + selectivity_threshold: u64, + probe_ratio_threshold: f64, + ) -> Arc { + let mut guard = self.runtime_filter_build_limit.lock(); + if let Some(limit) = guard.as_ref() { + return limit.clone(); + } + let limit = Arc::new(RuntimeFilterBuildLimit::from_descs( + self.runtime_filter_desc(), + selectivity_threshold, + probe_ratio_threshold, + )); + *guard = Some(limit.clone()); + limit + } + pub fn add_runtime_filter_packet(&self, packet: JoinRuntimeFilterPacket) { self.runtime_filter_packets.lock().push(packet); } diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/builder.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/builder.rs index 9af1bec4b542b..480aac9429cbd 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/builder.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/builder.rs @@ -42,6 +42,7 @@ struct JoinRuntimeFilterPacketBuilder<'a> { bloom_threshold: usize, min_max_threshold: usize, selectivity_threshold: u64, + probe_ratio_threshold: f64, } impl<'a> JoinRuntimeFilterPacketBuilder<'a> { @@ -53,6 +54,7 @@ impl<'a> JoinRuntimeFilterPacketBuilder<'a> { bloom_threshold: usize, min_max_threshold: usize, selectivity_threshold: u64, + probe_ratio_threshold: f64, ) -> Result { let build_key_column = Self::eval_build_key_column(data_blocks, func_ctx, build_key)?; Ok(Self { @@ -62,6 +64,7 @@ impl<'a> JoinRuntimeFilterPacketBuilder<'a> { bloom_threshold, min_max_threshold, selectivity_threshold, + probe_ratio_threshold, }) } fn build(&self, desc: &RuntimeFilterDesc) -> Result { @@ -69,6 +72,7 @@ impl<'a> JoinRuntimeFilterPacketBuilder<'a> { desc, self.build_key_column.len(), self.selectivity_threshold, + self.probe_ratio_threshold, ) { return Ok(RuntimeFilterPacket { id: desc.id, @@ -201,11 +205,37 @@ pub(super) fn should_enable_runtime_filter( desc: &RuntimeFilterDesc, build_num_rows: usize, selectivity_threshold: u64, + probe_ratio_threshold: f64, ) -> bool { if build_num_rows == 0 { return false; } + let mut max_probe_ratio = 0.0; + for rows in desc.probe_table_rows.iter().flatten() { + if *rows == 0 { + continue; + } + let ratio = *rows as f64 / build_num_rows as f64; + if ratio > max_probe_ratio { + max_probe_ratio = ratio; + } + if ratio > probe_ratio_threshold { + break; + } + } + if max_probe_ratio <= probe_ratio_threshold { + log::info!( + "RUNTIME-FILTER: Disable runtime filter {} - insufficient probe/build ratio: {:.2} <= {:.2} (build_rows={}, probe_table_rows={:?})", + desc.id, + max_probe_ratio, + probe_ratio_threshold, + build_num_rows, + desc.probe_table_rows + ); + return false; + } + let Some(build_table_rows) = desc.build_table_rows else { log::info!( "RUNTIME-FILTER: Disable runtime filter {} - no build table statistics available", @@ -248,6 +278,7 @@ pub fn build_runtime_filter_packet( bloom_threshold: usize, min_max_threshold: usize, selectivity_threshold: u64, + probe_ratio_threshold: f64, is_spill_happened: bool, ) -> Result { if is_spill_happened { @@ -274,6 +305,7 @@ pub fn build_runtime_filter_packet( bloom_threshold, min_max_threshold, selectivity_threshold, + probe_ratio_threshold, )? .build(rf)?, ); diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/convert.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/convert.rs index b6c2e5f33cfd6..a5c723137ba80 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/convert.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/convert.rs @@ -52,6 +52,7 @@ pub async fn build_runtime_filter_infos( packet: JoinRuntimeFilterPacket, runtime_filter_descs: HashMap, selectivity_threshold: u64, + probe_ratio_threshold: f64, max_threads: usize, ) -> Result> { let total_build_rows = packet.build_rows; @@ -63,7 +64,12 @@ pub async fn build_runtime_filter_infos( // Iterate over all runtime filter packets for packet in packets.into_values() { let desc = runtime_filter_descs.get(&packet.id).unwrap(); - let enabled = should_enable_runtime_filter(desc, total_build_rows, selectivity_threshold); + let enabled = should_enable_runtime_filter( + desc, + total_build_rows, + selectivity_threshold, + probe_ratio_threshold, + ); // Apply this single runtime filter to all probe targets (scan_id, probe_key pairs) // This implements the design goal: "one runtime filter built once, pushed down to multiple scans" diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/interface.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/interface.rs index d0f1fb003fb1e..979beff43a5f9 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/interface.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/interface.rs @@ -47,12 +47,17 @@ pub async fn build_and_push_down_runtime_filter( .ctx .get_settings() .get_join_runtime_filter_selectivity_threshold()?; + let probe_ratio_threshold = join + .ctx + .get_settings() + .get_join_runtime_filter_probe_ratio_threshold()? as f64; let max_threads = join.ctx.get_settings().get_max_threads()? as usize; let build_rows = packet.build_rows; let runtime_filter_infos = build_runtime_filter_infos( packet, runtime_filter_descs, selectivity_threshold, + probe_ratio_threshold, max_threads, ) .await?; diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/limit.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/limit.rs new file mode 100644 index 0000000000000..796345f883c9f --- /dev/null +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/limit.rs @@ -0,0 +1,138 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::atomic::AtomicBool; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering; + +use crate::pipelines::processors::transforms::hash_join::desc::RuntimeFilterDesc; + +#[derive(Debug)] +pub struct RuntimeFilterBuildLimit { + limit: Option, + total_rows: AtomicUsize, + disabled: AtomicBool, +} + +impl RuntimeFilterBuildLimit { + pub fn new(limit: Option) -> Self { + let disabled = limit.is_some_and(|value| value == 0); + Self { + limit, + total_rows: AtomicUsize::new(0), + disabled: AtomicBool::new(disabled), + } + } + + pub fn from_descs( + descs: &[RuntimeFilterDesc], + selectivity_threshold: u64, + probe_ratio_threshold: f64, + ) -> Self { + let limit = compute_max_build_rows(descs, selectivity_threshold, probe_ratio_threshold); + Self::new(limit) + } + + pub fn try_add_rows(&self, rows: usize) -> bool { + if self.disabled.load(Ordering::Relaxed) { + return false; + } + let Some(limit) = self.limit else { + self.total_rows.fetch_add(rows, Ordering::Relaxed); + return true; + }; + + loop { + let current = self.total_rows.load(Ordering::Relaxed); + if current >= limit { + self.disabled.store(true, Ordering::Relaxed); + return false; + } + if current.saturating_add(rows) > limit { + self.disabled.store(true, Ordering::Relaxed); + return false; + } + let new_total = current + rows; + if self + .total_rows + .compare_exchange(current, new_total, Ordering::Relaxed, Ordering::Relaxed) + .is_ok() + { + if new_total >= limit { + self.disabled.store(true, Ordering::Relaxed); + } + return true; + } + } + } + + pub fn is_disabled(&self) -> bool { + self.disabled.load(Ordering::Relaxed) + } +} + +fn compute_max_build_rows( + descs: &[RuntimeFilterDesc], + selectivity_threshold: u64, + probe_ratio_threshold: f64, +) -> Option { + descs + .iter() + .filter_map(|desc| { + let selectivity_limit = selectivity_limit(desc, selectivity_threshold); + let ratio_limit = probe_ratio_limit(desc, probe_ratio_threshold); + match (selectivity_limit, ratio_limit) { + (Some(a), Some(b)) => Some(a.min(b)), + (Some(a), None) => Some(a), + (None, Some(b)) => Some(b), + (None, None) => None, + } + }) + .min() +} + +fn selectivity_limit(desc: &RuntimeFilterDesc, selectivity_threshold: u64) -> Option { + let build_table_rows = desc.build_table_rows?; + if selectivity_threshold == 0 { + return Some(0); + } + let numerator = (selectivity_threshold as u128) + .saturating_mul(build_table_rows as u128) + .saturating_sub(1); + let limit = numerator / 100; + Some(limit.min(usize::MAX as u128) as usize) +} + +fn probe_ratio_limit(desc: &RuntimeFilterDesc, probe_ratio_threshold: f64) -> Option { + if probe_ratio_threshold <= 0.0 { + return None; + } + let ratio = probe_ratio_threshold.floor() as u128; + if ratio == 0 { + return None; + } + let mut min_limit: Option = None; + for probe_rows in desc.probe_table_rows.iter().flatten() { + if *probe_rows == 0 { + return Some(0); + } + let numerator = (*probe_rows as u128).saturating_sub(1); + let limit = (numerator / ratio).min(usize::MAX as u128) as usize; + min_limit = Some(match min_limit { + Some(current) => current.min(limit), + None => limit, + }); + } + min_limit +} diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/local_builder.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/local_builder.rs index e91b51ce1ffcf..62e45815eca1e 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/local_builder.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/local_builder.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::Arc; + use databend_common_exception::Result; use databend_common_expression::types::DataType; use databend_common_expression::BlockEntry; @@ -26,6 +28,7 @@ use databend_common_expression::RawExpr; use databend_common_expression::Scalar; use databend_common_functions::BUILTIN_FUNCTIONS; +use super::RuntimeFilterBuildLimit; use crate::pipelines::processors::transforms::hash_join::desc::RuntimeFilterDesc; use crate::pipelines::processors::transforms::hash_join::runtime_filter::packet::JoinRuntimeFilterPacket; use crate::pipelines::processors::transforms::hash_join::runtime_filter::packet::RuntimeFilterPacket; @@ -169,6 +172,8 @@ pub struct RuntimeFilterLocalBuilder { builders: Vec, total_rows: usize, runtime_filters: Vec, + build_limit: Arc, + disabled: bool, } impl RuntimeFilterLocalBuilder { @@ -178,6 +183,7 @@ impl RuntimeFilterLocalBuilder { inlist_threshold: usize, bloom_threshold: usize, min_max_threshold: usize, + build_limit: Arc, ) -> Result> { if descs.is_empty() { return Ok(None); @@ -198,14 +204,26 @@ impl RuntimeFilterLocalBuilder { builders, total_rows: 0, runtime_filters: descs, + build_limit, + disabled: false, })) } pub fn add_block(&mut self, block: &DataBlock) -> Result<()> { + if self.disabled || self.build_limit.is_disabled() { + self.disabled = true; + return Ok(()); + } + if block.is_empty() { return Ok(()); } + if !self.build_limit.try_add_rows(block.num_rows()) { + self.disabled = true; + return Ok(()); + } + let evaluator = Evaluator::new(block, &self.func_ctx, &BUILTIN_FUNCTIONS); for (builder, desc) in self.builders.iter_mut().zip(self.runtime_filters.iter()) { @@ -220,11 +238,18 @@ impl RuntimeFilterLocalBuilder { } pub fn finish(self, spill_happened: bool) -> Result { - let total_rows = self.total_rows; + let RuntimeFilterLocalBuilder { + func_ctx, + builders, + total_rows, + runtime_filters, + build_limit, + disabled, + } = self; - if spill_happened { + if spill_happened || disabled || build_limit.is_disabled() { return Ok(JoinRuntimeFilterPacket::disable_all( - &self.runtime_filters, + &runtime_filters, total_rows, )); } @@ -236,14 +261,10 @@ impl RuntimeFilterLocalBuilder { }); } - let packets: Vec<_> = self - .builders - .into_iter() - .map(|b| { - let id = b.id; - b.finish(&self.func_ctx).map(|p| (id, p)) - }) - .collect::>()?; + let mut packets = Vec::with_capacity(builders.len()); + for (builder, desc) in builders.into_iter().zip(runtime_filters.into_iter()) { + packets.push((desc.id, builder.finish(&func_ctx)?)); + } Ok(JoinRuntimeFilterPacket { packets: Some(packets.into_iter().collect()), diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/mod.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/mod.rs index 618512a3f5f79..b5425a4f0c2a0 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/mod.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/mod.rs @@ -16,6 +16,7 @@ mod builder; mod convert; mod global; mod interface; +mod limit; mod local_builder; mod merge; mod packet; @@ -24,6 +25,7 @@ pub use builder::build_runtime_filter_packet; pub use convert::build_runtime_filter_infos; pub use global::get_global_runtime_filter_packet; pub use interface::build_and_push_down_runtime_filter; +pub use limit::RuntimeFilterBuildLimit; pub use local_builder::RuntimeFilterLocalBuilder; pub use merge::merge_join_runtime_filter_packets; pub use packet::JoinRuntimeFilterPacket; diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/transform_hash_join_build.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/transform_hash_join_build.rs index 2b32d4b47e179..71b3b061f43e6 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/transform_hash_join_build.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/transform_hash_join_build.rs @@ -131,12 +131,18 @@ impl TransformHashJoinBuild { let runtime_filter_builder = { let descs = build_state.runtime_filter_desc().to_vec(); let settings = build_state.ctx.get_settings(); + let selectivity_threshold = settings.get_join_runtime_filter_selectivity_threshold()?; + let probe_ratio_threshold = + settings.get_join_runtime_filter_probe_ratio_threshold()? as f64; + let build_limit = build_state + .runtime_filter_build_limit(selectivity_threshold, probe_ratio_threshold); RuntimeFilterLocalBuilder::try_create( &build_state.func_ctx, descs, settings.get_inlist_runtime_filter_threshold()? as usize, settings.get_bloom_runtime_filter_threshold()? as usize, settings.get_min_max_runtime_filter_threshold()? as usize, + build_limit, )? }; diff --git a/src/query/service/src/pipelines/processors/transforms/new_hash_join/runtime_filter.rs b/src/query/service/src/pipelines/processors/transforms/new_hash_join/runtime_filter.rs index 4757d0510a7d9..433fb0adba538 100644 --- a/src/query/service/src/pipelines/processors/transforms/new_hash_join/runtime_filter.rs +++ b/src/query/service/src/pipelines/processors/transforms/new_hash_join/runtime_filter.rs @@ -24,6 +24,7 @@ use crate::physical_plans::HashJoin; use crate::pipelines::processors::transforms::build_runtime_filter_infos; use crate::pipelines::processors::transforms::get_global_runtime_filter_packet; use crate::pipelines::processors::transforms::JoinRuntimeFilterPacket; +use crate::pipelines::processors::transforms::RuntimeFilterBuildLimit; use crate::pipelines::processors::transforms::RuntimeFilterDesc; use crate::sessions::QueryContext; @@ -35,6 +36,8 @@ pub struct RuntimeFiltersDesc { pub inlist_threshold: usize, pub min_max_threshold: usize, pub selectivity_threshold: u64, + pub probe_ratio_threshold: f64, + pub build_limit: Arc, broadcast_id: Option, pub filters_desc: Vec, @@ -48,6 +51,8 @@ impl RuntimeFiltersDesc { let inlist_threshold = settings.get_inlist_runtime_filter_threshold()? as usize; let min_max_threshold = settings.get_min_max_runtime_filter_threshold()? as usize; let selectivity_threshold = settings.get_join_runtime_filter_selectivity_threshold()?; + let probe_ratio_threshold = + settings.get_join_runtime_filter_probe_ratio_threshold()? as f64; let func_ctx = ctx.get_function_context()?; let mut filters_desc = Vec::with_capacity(join.runtime_filter.filters.len()); @@ -67,6 +72,12 @@ impl RuntimeFiltersDesc { filters_desc.push(filter_desc); } + let build_limit = Arc::new(RuntimeFilterBuildLimit::from_descs( + &filters_desc, + selectivity_threshold, + probe_ratio_threshold, + )); + Ok(Arc::new(RuntimeFiltersDesc { func_ctx, filters_desc, @@ -74,6 +85,8 @@ impl RuntimeFiltersDesc { inlist_threshold, min_max_threshold, selectivity_threshold, + probe_ratio_threshold, + build_limit, runtime_filters_ready, ctx: ctx.clone(), broadcast_id: join.broadcast_id, @@ -90,6 +103,7 @@ impl RuntimeFiltersDesc { packet, runtime_filter_descs, self.selectivity_threshold, + self.probe_ratio_threshold, self.ctx.get_settings().get_max_threads()? as usize, ) .await?; diff --git a/src/query/service/src/pipelines/processors/transforms/new_hash_join/transform_hash_join.rs b/src/query/service/src/pipelines/processors/transforms/new_hash_join/transform_hash_join.rs index 68fbaf0fd8ade..105b39b12bf7a 100644 --- a/src/query/service/src/pipelines/processors/transforms/new_hash_join/transform_hash_join.rs +++ b/src/query/service/src/pipelines/processors/transforms/new_hash_join/transform_hash_join.rs @@ -62,6 +62,7 @@ impl TransformHashJoin { rf_desc.inlist_threshold, rf_desc.bloom_threshold, rf_desc.min_max_threshold, + rf_desc.build_limit.clone(), )?; Ok(ProcessorPtr::create(Box::new(TransformHashJoin { diff --git a/src/query/settings/src/settings_default.rs b/src/query/settings/src/settings_default.rs index aeaece5d38c0a..3f8f436d59666 100644 --- a/src/query/settings/src/settings_default.rs +++ b/src/query/settings/src/settings_default.rs @@ -516,6 +516,13 @@ impl DefaultSettings { scope: SettingScope::Both, range: Some(SettingRange::Numeric(1..=u64::MAX)), }), + ("join_runtime_filter_probe_ratio_threshold", DefaultSettingValue { + value: UserSettingValue::UInt64(3), + desc: "Probe/build rows ratio threshold for join runtime filters. Filters are enabled only when probe_rows / build_rows > threshold for any probe target.", + mode: SettingMode::Both, + scope: SettingScope::Both, + range: Some(SettingRange::Numeric(1..=u64::MAX)), + }), ("max_execute_time_in_seconds", DefaultSettingValue { value: UserSettingValue::UInt64(0), desc: "Sets the maximum query execution time in seconds. Setting it to 0 means no limit.", diff --git a/src/query/settings/src/settings_getter_setter.rs b/src/query/settings/src/settings_getter_setter.rs index 6057058270e25..cfd8417a69fca 100644 --- a/src/query/settings/src/settings_getter_setter.rs +++ b/src/query/settings/src/settings_getter_setter.rs @@ -392,6 +392,10 @@ impl Settings { self.try_get_u64("join_runtime_filter_selectivity_threshold") } + pub fn get_join_runtime_filter_probe_ratio_threshold(&self) -> Result { + self.try_get_u64("join_runtime_filter_probe_ratio_threshold") + } + pub fn get_prefer_broadcast_join(&self) -> Result { Ok(self.try_get_u64("prefer_broadcast_join")? != 0) } diff --git a/src/query/storages/fuse/src/operations/read/fuse_source.rs b/src/query/storages/fuse/src/operations/read/fuse_source.rs index a6a594fb7cb2a..770a1697a99d3 100644 --- a/src/query/storages/fuse/src/operations/read/fuse_source.rs +++ b/src/query/storages/fuse/src/operations/read/fuse_source.rs @@ -167,14 +167,14 @@ pub fn build_fuse_parquet_source_pipeline( } let unfinished_processors_count = Arc::new(AtomicU64::new(pipeline.output_len() as u64)); - pipeline.add_transform(|input, output| { - Ok(TransformRuntimeFilterWait::create( - ctx.clone(), - plan.scan_id, - input, - output, - )) - })?; + // pipeline.add_transform(|input, output| { + // Ok(TransformRuntimeFilterWait::create( + // ctx.clone(), + // plan.scan_id, + // input, + // output, + // )) + // })?; pipeline.add_transform(|input, output| { ReadParquetDataTransform::create(