Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 48 additions & 2 deletions src/query/catalog/src/sbbf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ impl Sbbf {

/// Create a new [Sbbf] with given number of bytes, the exact number of bytes will be adjusted
/// to the next power of two bounded by [BITSET_MIN_LENGTH] and [BITSET_MAX_LENGTH].
pub(crate) fn new_with_num_of_bytes(num_bytes: usize) -> Self {
pub fn new_with_num_of_bytes(num_bytes: usize) -> Self {
let num_bytes = optimal_num_of_bytes(num_bytes);
assert_eq!(num_bytes % size_of::<Block>(), 0);
let num_blocks = num_bytes / size_of::<Block>();
Expand Down Expand Up @@ -307,6 +307,52 @@ impl Sbbf {
pub fn estimated_memory_size(&self) -> usize {
self.0.capacity() * std::mem::size_of::<Block>()
}

/// Serialize the bloom filter into a little-endian byte array.
/// The layout is a contiguous sequence of blocks, each block consisting
/// of 8 u32 values in little-endian order.
pub fn to_bytes(&self) -> Vec<u8> {
let mut bytes = Vec::with_capacity(self.0.len() * size_of::<Block>());
for block in &self.0 {
for value in block.0 {
bytes.extend_from_slice(&value.to_le_bytes());
}
}
bytes
}

/// Deserialize a bloom filter from bytes produced by `to_bytes`.
pub fn from_bytes(bytes: &[u8]) -> Result<Self, String> {
if bytes.len() % size_of::<Block>() != 0 {
return Err(format!(
"Invalid bloom filter bytes length {}, expected multiple of {}",
bytes.len(),
size_of::<Block>()
));
}

let num_blocks = bytes.len() / size_of::<Block>();
if num_blocks == 0 {
return Ok(Sbbf(Vec::new()));
}

let mut blocks = Vec::with_capacity(num_blocks);
let mut offset = 0;
for _ in 0..num_blocks {
let mut arr = [0u32; 8];
for value in &mut arr {
let end = offset + size_of::<u32>();
let chunk = bytes
.get(offset..end)
.ok_or_else(|| "Invalid bloom filter bytes".to_string())?;
*value = u32::from_le_bytes(chunk.try_into().unwrap());
offset = end;
}
blocks.push(Block(arr));
}

Ok(Sbbf(blocks))
}
}

impl SbbfAtomic {
Expand All @@ -320,7 +366,7 @@ impl SbbfAtomic {
Ok(Self::new_with_num_of_bytes(num_bits / 8))
}

pub(crate) fn new_with_num_of_bytes(num_bytes: usize) -> Self {
pub fn new_with_num_of_bytes(num_bytes: usize) -> Self {
let num_bytes = optimal_num_of_bytes(num_bytes);
assert_eq!(size_of::<BlockAtomic>(), size_of::<Block>());
assert_eq!(num_bytes % size_of::<BlockAtomic>(), 0);
Expand Down
40 changes: 36 additions & 4 deletions src/query/service/src/physical_plans/runtime_filter/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ use std::sync::Arc;
use databend_common_catalog::table_context::TableContext;
use databend_common_exception::Result;
use databend_common_expression::types::DataType;
use databend_common_expression::Expr;
use databend_common_expression::RemoteExpr;
use databend_common_functions::BUILTIN_FUNCTIONS;
use databend_common_sql::optimizer::ir::ColumnStatSet;
use databend_common_sql::optimizer::ir::SExpr;
use databend_common_sql::plans::Exchange;
use databend_common_sql::plans::Join;
Expand Down Expand Up @@ -113,6 +115,11 @@ pub async fn build_runtime_filter(

let mut filters = Vec::new();

// Derive statistics for the build side to estimate NDV of join keys.
let build_rel_expr = databend_common_sql::optimizer::ir::RelExpr::with_s_expr(build_side);
let build_stat_info = build_rel_expr.derive_cardinality()?;
let build_column_stats = &build_stat_info.statistics.column_stats;

let probe_side = s_expr.probe_side_child();

// Process each probe key that has runtime filter information
Expand Down Expand Up @@ -144,10 +151,17 @@ pub async fn build_runtime_filter(
let build_table_rows =
get_build_table_rows(ctx.clone(), metadata, build_table_index).await?;

let data_type = build_key
.as_expr(&BUILTIN_FUNCTIONS)
.data_type()
.remove_nullable();
let build_key_expr = build_key.as_expr(&BUILTIN_FUNCTIONS);

// Estimate NDV for the build side join key using optimizer statistics.
// Handles all RemoteExpr variants by looking at the column references inside
// the expression. If the expression is constant, NDV is 1. If it contains
// exactly one column reference, reuse that column's NDV. Otherwise, fall
// back to the overall build-side cardinality.
let build_key_ndv = estimate_build_key_ndv(&build_key_expr, build_column_stats)
.unwrap_or_else(|| build_stat_info.cardinality.ceil() as u64);

let data_type = build_key_expr.data_type().remove_nullable();
let id = metadata.write().next_runtime_filter_id();

let enable_bloom_runtime_filter = is_type_supported_for_bloom_filter(&data_type);
Expand All @@ -159,6 +173,7 @@ pub async fn build_runtime_filter(
id,
build_key: build_key.clone(),
probe_targets,
build_key_ndv,
build_table_rows,
enable_bloom_runtime_filter,
enable_inlist_runtime_filter: true,
Expand All @@ -170,6 +185,23 @@ pub async fn build_runtime_filter(
Ok(PhysicalRuntimeFilters { filters })
}

fn estimate_build_key_ndv(
build_key: &Expr<IndexType>,
build_column_stats: &ColumnStatSet,
) -> Option<u64> {
let mut column_refs = build_key.column_refs();
if column_refs.is_empty() {
return Some(1);
}

if column_refs.len() == 1 {
let (id, _) = column_refs.drain().next().unwrap();
build_column_stats.get(&id).map(|s| s.ndv.ceil() as u64)
} else {
None
}
}

async fn get_build_table_rows(
ctx: Arc<dyn TableContext>,
metadata: &MetadataRef,
Expand Down
3 changes: 3 additions & 0 deletions src/query/service/src/physical_plans/runtime_filter/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ pub struct PhysicalRuntimeFilter {
/// All probe targets in this list are in the same equivalence class
pub probe_targets: Vec<(RemoteExpr<String>, usize)>,

/// Estimated NDV of the build side join key, derived from optimizer statistics.
pub build_key_ndv: u64,

pub build_table_rows: Option<u64>,

/// Enable bloom filter for this runtime filter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ pub struct RuntimeFilterDesc {
pub id: usize,
pub build_key: Expr,
pub probe_targets: Vec<(Expr<String>, usize)>,
pub build_key_ndv: u64,
pub build_table_rows: Option<u64>,
pub enable_bloom_runtime_filter: bool,
pub enable_inlist_runtime_filter: bool,
Expand Down Expand Up @@ -98,6 +99,7 @@ impl From<&PhysicalRuntimeFilter> for RuntimeFilterDesc {
.iter()
.map(|(probe_key, scan_id)| (probe_key.as_expr(&BUILTIN_FUNCTIONS), *scan_id))
.collect(),
build_key_ndv: runtime_filter.build_key_ndv,
build_table_rows: runtime_filter.build_table_rows,
enable_bloom_runtime_filter: runtime_filter.enable_bloom_runtime_filter,
enable_inlist_runtime_filter: runtime_filter.enable_inlist_runtime_filter,
Expand Down
Loading
Loading