Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/query/catalog/src/sbbf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ pub struct Sbbf(Vec<Block>);
pub struct SbbfAtomic(Vec<BlockAtomic>);

pub(crate) const BITSET_MIN_LENGTH: usize = 32;
pub(crate) const BITSET_MAX_LENGTH: usize = 128 * 1024 * 1024;
pub(crate) const BITSET_MAX_LENGTH: usize = 512 * 1024 * 1024;

#[inline]
fn hash_to_block_index_for_blocks(hash: u64, num_blocks: usize) -> usize {
Expand Down
26 changes: 18 additions & 8 deletions src/query/service/src/physical_plans/runtime_filter/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use super::types::PhysicalRuntimeFilters;
/// Type alias for probe keys with runtime filter information
/// Contains: (RemoteExpr, scan_id, table_index, column_idx)
type ProbeKeysWithRuntimeFilter = Vec<Option<(RemoteExpr<String>, usize, usize, IndexType)>>;
/// Type alias for a probe target paired with the table index of its probe column
/// Contains: (RemoteExpr, scan_id, table_index), where table_index may be absent
type ProbeTargetWithTable = (RemoteExpr<String>, usize, Option<IndexType>);

/// Check if a data type is supported for bloom filter
///
Expand Down Expand Up @@ -138,11 +139,17 @@ pub async fn build_runtime_filter(
_ => continue,
}

let probe_targets =
let probe_targets_with_tables =
find_probe_targets(metadata, probe_side, &probe_key, scan_id, column_idx)?;
let mut probe_targets = Vec::with_capacity(probe_targets_with_tables.len());
let mut probe_table_rows = Vec::with_capacity(probe_targets_with_tables.len());
for (probe_expr, probe_scan_id, table_index) in probe_targets_with_tables {
probe_targets.push((probe_expr, probe_scan_id));
let table_rows = get_table_rows(ctx.clone(), metadata, table_index).await?;
probe_table_rows.push(table_rows);
}

let build_table_rows =
get_build_table_rows(ctx.clone(), metadata, build_table_index).await?;
let build_table_rows = get_table_rows(ctx.clone(), metadata, build_table_index).await?;

let data_type = build_key
.as_expr(&BUILTIN_FUNCTIONS)
Expand All @@ -159,6 +166,7 @@ pub async fn build_runtime_filter(
id,
build_key: build_key.clone(),
probe_targets,
probe_table_rows,
build_table_rows,
enable_bloom_runtime_filter,
enable_inlist_runtime_filter: true,
Expand All @@ -170,12 +178,12 @@ pub async fn build_runtime_filter(
Ok(PhysicalRuntimeFilters { filters })
}

async fn get_build_table_rows(
async fn get_table_rows(
ctx: Arc<dyn TableContext>,
metadata: &MetadataRef,
build_table_index: Option<IndexType>,
table_index: Option<IndexType>,
) -> Result<Option<u64>> {
if let Some(table_index) = build_table_index {
if let Some(table_index) = table_index {
let table = {
let metadata_read = metadata.read();
metadata_read.table(table_index).table().clone()
Expand All @@ -194,7 +202,7 @@ fn find_probe_targets(
probe_key: &RemoteExpr<String>,
probe_scan_id: usize,
probe_key_col_idx: IndexType,
) -> Result<Vec<(RemoteExpr<String>, usize)>> {
) -> Result<Vec<ProbeTargetWithTable>> {
let mut uf = UnionFind::new();
let mut column_to_remote: HashMap<IndexType, (RemoteExpr<String>, usize)> = HashMap::new();
column_to_remote.insert(probe_key_col_idx, (probe_key.clone(), probe_scan_id));
Expand All @@ -217,9 +225,11 @@ fn find_probe_targets(
let equiv_class = uf.get_equivalence_class(probe_key_col_idx);

let mut result = Vec::new();
let metadata_read = metadata.read();
for idx in equiv_class {
if let Some((remote_expr, scan_id)) = column_to_remote.get(&idx) {
result.push((remote_expr.clone(), *scan_id));
let table_index = metadata_read.column(idx).table_index();
result.push((remote_expr.clone(), *scan_id, table_index));
}
}

Expand Down
2 changes: 2 additions & 0 deletions src/query/service/src/physical_plans/runtime_filter/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ pub struct PhysicalRuntimeFilter {
/// All probe targets in this list are in the same equivalence class
pub probe_targets: Vec<(RemoteExpr<String>, usize)>,

pub probe_table_rows: Vec<Option<u64>>,

pub build_table_rows: Option<u64>,

/// Enable bloom filter for this runtime filter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ pub struct RuntimeFilterDesc {
pub id: usize,
pub build_key: Expr,
pub probe_targets: Vec<(Expr<String>, usize)>,
pub probe_table_rows: Vec<Option<u64>>,
pub build_table_rows: Option<u64>,
pub enable_bloom_runtime_filter: bool,
pub enable_inlist_runtime_filter: bool,
Expand Down Expand Up @@ -98,6 +99,7 @@ impl From<&PhysicalRuntimeFilter> for RuntimeFilterDesc {
.iter()
.map(|(probe_key, scan_id)| (probe_key.as_expr(&BUILTIN_FUNCTIONS), *scan_id))
.collect(),
probe_table_rows: runtime_filter.probe_table_rows.clone(),
build_table_rows: runtime_filter.build_table_rows,
enable_bloom_runtime_filter: runtime_filter.enable_bloom_runtime_filter,
enable_inlist_runtime_filter: runtime_filter.enable_inlist_runtime_filter,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ use parking_lot::RwLock;
use super::concat_buffer::ConcatBuffer;
use super::desc::RuntimeFilterDesc;
use super::runtime_filter::JoinRuntimeFilterPacket;
use super::runtime_filter::RuntimeFilterBuildLimit;
use crate::pipelines::memory_settings::MemorySettingsExt;
use crate::pipelines::processors::transforms::hash_join::common::wrap_true_validity;
use crate::pipelines::processors::transforms::hash_join::desc::MARKER_KIND_FALSE;
Expand Down Expand Up @@ -106,6 +107,7 @@ pub struct HashJoinBuildState {
pub(crate) broadcast_id: Option<u32>,
pub(crate) is_runtime_filter_added: AtomicBool,
runtime_filter_packets: Mutex<Vec<JoinRuntimeFilterPacket>>,
runtime_filter_build_limit: Mutex<Option<Arc<RuntimeFilterBuildLimit>>>,
}

impl HashJoinBuildState {
Expand Down Expand Up @@ -157,6 +159,7 @@ impl HashJoinBuildState {
broadcast_id,
is_runtime_filter_added: AtomicBool::new(false),
runtime_filter_packets: Mutex::new(Vec::new()),
runtime_filter_build_limit: Mutex::new(None),
}))
}

Expand Down Expand Up @@ -878,6 +881,24 @@ impl HashJoinBuildState {
&self.hash_join_state.hash_join_desc.runtime_filter.filters
}

/// Returns the shared [`RuntimeFilterBuildLimit`] for this build state, creating it on first use.
///
/// On the first call the limit is built from this join's runtime filter descriptors and the
/// given thresholds, then cached. Later calls return the cached instance and ignore the
/// `selectivity_threshold` / `probe_ratio_threshold` arguments passed in.
pub fn runtime_filter_build_limit(
    &self,
    selectivity_threshold: u64,
    probe_ratio_threshold: f64,
) -> Arc<RuntimeFilterBuildLimit> {
    // Keep the lock for both the lookup and the initialization of the cached value.
    let mut cached = self.runtime_filter_build_limit.lock();
    let limit = cached.get_or_insert_with(|| {
        Arc::new(RuntimeFilterBuildLimit::from_descs(
            self.runtime_filter_desc(),
            selectivity_threshold,
            probe_ratio_threshold,
        ))
    });
    Arc::clone(limit)
}

/// Appends `packet` to the runtime filter packets collected by this build state.
pub fn add_runtime_filter_packet(&self, packet: JoinRuntimeFilterPacket) {
    let mut collected = self.runtime_filter_packets.lock();
    collected.push(packet);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ struct JoinRuntimeFilterPacketBuilder<'a> {
bloom_threshold: usize,
min_max_threshold: usize,
selectivity_threshold: u64,
probe_ratio_threshold: f64,
}

impl<'a> JoinRuntimeFilterPacketBuilder<'a> {
Expand All @@ -53,6 +54,7 @@ impl<'a> JoinRuntimeFilterPacketBuilder<'a> {
bloom_threshold: usize,
min_max_threshold: usize,
selectivity_threshold: u64,
probe_ratio_threshold: f64,
) -> Result<Self> {
let build_key_column = Self::eval_build_key_column(data_blocks, func_ctx, build_key)?;
Ok(Self {
Expand All @@ -62,13 +64,15 @@ impl<'a> JoinRuntimeFilterPacketBuilder<'a> {
bloom_threshold,
min_max_threshold,
selectivity_threshold,
probe_ratio_threshold,
})
}
fn build(&self, desc: &RuntimeFilterDesc) -> Result<RuntimeFilterPacket> {
if !should_enable_runtime_filter(
desc,
self.build_key_column.len(),
self.selectivity_threshold,
self.probe_ratio_threshold,
) {
return Ok(RuntimeFilterPacket {
id: desc.id,
Expand Down Expand Up @@ -201,11 +205,37 @@ pub(super) fn should_enable_runtime_filter(
desc: &RuntimeFilterDesc,
build_num_rows: usize,
selectivity_threshold: u64,
probe_ratio_threshold: f64,
) -> bool {
if build_num_rows == 0 {
return false;
}

let mut max_probe_ratio = 0.0;
for rows in desc.probe_table_rows.iter().flatten() {
if *rows == 0 {
continue;
}
let ratio = *rows as f64 / build_num_rows as f64;
if ratio > max_probe_ratio {
max_probe_ratio = ratio;
}
if ratio > probe_ratio_threshold {
break;
}
}
if max_probe_ratio <= probe_ratio_threshold {
log::info!(
"RUNTIME-FILTER: Disable runtime filter {} - insufficient probe/build ratio: {:.2} <= {:.2} (build_rows={}, probe_table_rows={:?})",
desc.id,
max_probe_ratio,
probe_ratio_threshold,
build_num_rows,
desc.probe_table_rows
);
return false;
}

let Some(build_table_rows) = desc.build_table_rows else {
log::info!(
"RUNTIME-FILTER: Disable runtime filter {} - no build table statistics available",
Expand Down Expand Up @@ -248,6 +278,7 @@ pub fn build_runtime_filter_packet(
bloom_threshold: usize,
min_max_threshold: usize,
selectivity_threshold: u64,
probe_ratio_threshold: f64,
is_spill_happened: bool,
) -> Result<JoinRuntimeFilterPacket> {
if is_spill_happened {
Expand All @@ -274,6 +305,7 @@ pub fn build_runtime_filter_packet(
bloom_threshold,
min_max_threshold,
selectivity_threshold,
probe_ratio_threshold,
)?
.build(rf)?,
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ pub async fn build_runtime_filter_infos(
packet: JoinRuntimeFilterPacket,
runtime_filter_descs: HashMap<usize, &RuntimeFilterDesc>,
selectivity_threshold: u64,
probe_ratio_threshold: f64,
max_threads: usize,
) -> Result<HashMap<usize, RuntimeFilterInfo>> {
let total_build_rows = packet.build_rows;
Expand All @@ -63,7 +64,12 @@ pub async fn build_runtime_filter_infos(
// Iterate over all runtime filter packets
for packet in packets.into_values() {
let desc = runtime_filter_descs.get(&packet.id).unwrap();
let enabled = should_enable_runtime_filter(desc, total_build_rows, selectivity_threshold);
let enabled = should_enable_runtime_filter(
desc,
total_build_rows,
selectivity_threshold,
probe_ratio_threshold,
);

// Apply this single runtime filter to all probe targets (scan_id, probe_key pairs)
// This implements the design goal: "one runtime filter built once, pushed down to multiple scans"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,17 @@ pub async fn build_and_push_down_runtime_filter(
.ctx
.get_settings()
.get_join_runtime_filter_selectivity_threshold()?;
let probe_ratio_threshold = join
.ctx
.get_settings()
.get_join_runtime_filter_probe_ratio_threshold()? as f64;
let max_threads = join.ctx.get_settings().get_max_threads()? as usize;
let build_rows = packet.build_rows;
let runtime_filter_infos = build_runtime_filter_infos(
packet,
runtime_filter_descs,
selectivity_threshold,
probe_ratio_threshold,
max_threads,
)
.await?;
Expand Down
Loading
Loading