diff --git a/Cargo.lock b/Cargo.lock index 8f07c5b62a3e1..be9c069769cda 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3669,6 +3669,7 @@ dependencies = [ "rand 0.8.5", "rand_distr", "recursive", + "roaring", "rust_decimal", "rustls 0.23.27", "serde", diff --git a/src/common/io/src/bitmap.rs b/src/common/io/src/bitmap.rs index 289ced1821be0..cdb924acecb7e 100644 --- a/src/common/io/src/bitmap.rs +++ b/src/common/io/src/bitmap.rs @@ -29,12 +29,12 @@ use roaring::treemap::Iter; use smallvec::SmallVec; // https://github.com/ClickHouse/ClickHouse/blob/516a6ed6f8bd8c5f6eed3a10e9037580b2fb6152/src/AggregateFunctions/AggregateFunctionGroupBitmapData.h#L914 -const LARGE_THRESHOLD: usize = 32; -const HYBRID_MAGIC: [u8; 2] = *b"HB"; -const HYBRID_VERSION: u8 = 1; -const HYBRID_KIND_SMALL: u8 = 0; -const HYBRID_KIND_LARGE: u8 = 1; -const HYBRID_HEADER_LEN: usize = 4; +pub const LARGE_THRESHOLD: usize = 32; +pub const HYBRID_MAGIC: [u8; 2] = *b"HB"; +pub const HYBRID_VERSION: u8 = 1; +pub const HYBRID_KIND_SMALL: u8 = 0; +pub const HYBRID_KIND_LARGE: u8 = 1; +pub const HYBRID_HEADER_LEN: usize = 4; type SmallBitmap = SmallVec<[u64; LARGE_THRESHOLD]>; @@ -43,7 +43,7 @@ type SmallBitmap = SmallVec<[u64; LARGE_THRESHOLD]>; /// - Calculations may frequently create new Bitmaps; reusing them as much as possible can effectively improve performance. /// - do not use Box to construct HybridBitmap #[allow(clippy::large_enum_variant)] -#[derive(Clone)] +#[derive(Clone, PartialEq)] pub enum HybridBitmap { Small(SmallBitmap), Large(RoaringTreemap), diff --git a/src/common/io/src/lib.rs b/src/common/io/src/lib.rs index 4c7b97442e6a4..14379062b080a 100644 --- a/src/common/io/src/lib.rs +++ b/src/common/io/src/lib.rs @@ -46,7 +46,13 @@ mod stat_buffer; pub mod interval; pub mod wkb; +pub use bitmap::HYBRID_HEADER_LEN; +pub use bitmap::HYBRID_KIND_LARGE; +pub use bitmap::HYBRID_KIND_SMALL; +pub use bitmap::HYBRID_MAGIC; +pub use bitmap::HYBRID_VERSION; pub use bitmap::HybridBitmap; +pub use bitmap::LARGE_THRESHOLD; pub use bitmap::deserialize_bitmap; pub use bitmap::parse_bitmap; pub use decimal::display_decimal_128; diff --git a/src/query/expression/Cargo.toml b/src/query/expression/Cargo.toml index 2825e58c73f4d..a9d8469478fc1 100644 --- a/src/query/expression/Cargo.toml +++ b/src/query/expression/Cargo.toml @@ -70,6 +70,7 @@ goldenfile = { workspace = true } pretty_assertions = { workspace = true } proptest = { workspace = true } rand = { workspace = true } +roaring = { workspace = true } [[bench]] name = "bench" diff --git a/src/query/expression/src/aggregate/group_hash.rs b/src/query/expression/src/aggregate/group_hash.rs index 090aae48b88d4..cfb29554f3620 100644 --- a/src/query/expression/src/aggregate/group_hash.rs +++ b/src/query/expression/src/aggregate/group_hash.rs @@ -26,7 +26,10 @@ use crate::Scalar; use crate::ScalarRef; use crate::Value; use crate::types::decimal::Decimal; +use crate::types::i256; +use crate::types::number::Number; use crate::types::*; +use crate::utils::bitmap::normalize_bitmap_column; use crate::visitor::ValueVisitor; use crate::with_decimal_mapped_type; use crate::with_number_mapped_type; @@ -229,7 +232,8 @@ impl ValueVisitor for HashVisitor<'_, IS_FIRST> { } fn visit_bitmap(&mut self, column: BinaryColumn) -> Result<()> { - self.combine_group_hash_string_column::(&column); + let column = normalize_bitmap_column(&column); + self.combine_group_hash_string_column::(column.as_ref()); Ok(()) } @@ -411,7 +415,11 @@ where I: Index } fn visit_bitmap(&mut self, column: crate::types::BinaryColumn) -> Result<()> { - self.visit_binary(column) + let column = normalize_bitmap_column(&column); + self.visit_indices(|i| { + let value = column.as_ref().index(i.to_usize()).unwrap(); + value.agg_hash() + }) } fn visit_string(&mut self, column: crate::types::StringColumn) -> Result<()> { @@ -628,6 +636,8 @@ mod tests { use databend_common_column::bitmap::Bitmap; use databend_common_column::types::months_days_micros; use databend_common_column::types::timestamp_tz; + use databend_common_io::HybridBitmap; + use roaring::RoaringTreemap; use super::*; use crate::BlockEntry; @@ -636,8 +646,11 @@ mod tests { use crate::ProjectedBlock; use crate::Value; use crate::types::ArgType; + use crate::types::BitmapType; use crate::types::DecimalSize; + use crate::types::Int32Type; use crate::types::NullableColumn; + use crate::types::NullableType; use crate::types::NumberScalar; use crate::types::OpaqueScalar; use crate::types::VectorDataType; @@ -864,4 +877,33 @@ mod tests { } Ok(()) } + + #[test] + fn test_bitmap_group_hash_legacy_bytes_normalized() -> Result<()> { + let values = [1_u64, 5, 42]; + + let mut hybrid = HybridBitmap::new(); + for v in values { + hybrid.insert(v); + } + let mut hybrid_bytes = Vec::new(); + hybrid.serialize_into(&mut hybrid_bytes).unwrap(); + + let mut tree = RoaringTreemap::new(); + for v in values { + tree.insert(v); + } + let mut legacy_bytes = Vec::new(); + tree.serialize_into(&mut legacy_bytes).unwrap(); + + let bitmap_column = BitmapType::from_data(vec![hybrid_bytes, legacy_bytes]); + let block = DataBlock::new(vec![bitmap_column.into()], 2); + + let mut hashes = vec![0_u64; block.num_rows()]; + group_hash_entries(ProjectedBlock::from(block.columns()), &mut hashes); + + // Legacy-encoded bitmap should hash identically to hybrid-encoded bitmap. + assert_eq!(hashes[0], hashes[1]); + Ok(()) + } } diff --git a/src/query/expression/src/aggregate/payload_row.rs b/src/query/expression/src/aggregate/payload_row.rs index a9cf316f15b13..794a2c3803713 100644 --- a/src/query/expression/src/aggregate/payload_row.rs +++ b/src/query/expression/src/aggregate/payload_row.rs @@ -15,6 +15,7 @@ use bumpalo::Bump; use databend_common_base::hints::assume; use databend_common_column::bitmap::Bitmap; +use databend_common_io::deserialize_bitmap; use databend_common_io::prelude::bincode_deserialize_from_slice; use databend_common_io::prelude::bincode_serialize_into_buf; @@ -42,6 +43,7 @@ use crate::types::TimestampType; use crate::types::decimal::Decimal; use crate::types::decimal::DecimalColumn; use crate::types::i256; +use crate::utils::bitmap::is_hybrid_encoding; use crate::with_decimal_mapped_type; use crate::with_number_mapped_type; @@ -128,11 +130,33 @@ pub(super) unsafe fn serialize_column_to_rowformat( } } } - Column::Binary(v) | Column::Bitmap(v) | Column::Variant(v) | Column::Geometry(v) => { - for row in select_vector { - let data = arena.alloc_slice_copy(unsafe { v.index_unchecked(row.to_usize()) }); + Column::Bitmap(v) => { + for &index in select_vector { + let value = unsafe { v.index_unchecked(index.to_usize()) }; + let normalized = if is_hybrid_encoding(value) { + value + } else { + match deserialize_bitmap(value) { + Ok(bitmap) => { + scratch.clear(); + // Safe unwrap: serialize_into writes into Vec. + bitmap.serialize_into(&mut *scratch).unwrap(); + scratch.as_slice() + } + Err(_) => value, + } + }; + let data = arena.alloc_slice_copy(normalized); unsafe { - address[*row].write_bytes(offset, data); + address[index].write_bytes(offset, data); + } + } + } + Column::Binary(v) | Column::Variant(v) | Column::Geometry(v) => { + for &index in select_vector { + let data = arena.alloc_slice_copy(unsafe { v.index_unchecked(index.to_usize()) }); + unsafe { + address[index].write_bytes(offset, data); } } } @@ -570,3 +594,71 @@ impl<'s> CompareState<'s> { } } } + +#[cfg(test)] +mod tests { + use databend_common_column::binary::BinaryColumnBuilder; + use databend_common_io::HybridBitmap; + use databend_common_io::deserialize_bitmap; + use roaring::RoaringTreemap; + + use super::*; + + #[test] + fn serialize_bitmap_rowformat_normalizes_legacy_bytes() { + let values = [1_u64, 5, 42]; + + let mut hybrid = HybridBitmap::new(); + for v in values { + hybrid.insert(v); + } + let mut hybrid_bytes = Vec::new(); + hybrid.serialize_into(&mut hybrid_bytes).unwrap(); + + let mut tree = RoaringTreemap::new(); + for v in values { + tree.insert(v); + } + let mut legacy_bytes = Vec::new(); + tree.serialize_into(&mut legacy_bytes).unwrap(); + + let mut builder = + BinaryColumnBuilder::with_capacity(2, hybrid_bytes.len() + legacy_bytes.len()); + builder.put_slice(&hybrid_bytes); + builder.commit_row(); + builder.put_slice(&legacy_bytes); + builder.commit_row(); + let column = Column::Bitmap(builder.build()); + + let arena = Bump::new(); + let row_size = rowformat_size(&DataType::Bitmap); + + let mut row0 = vec![0u8; row_size]; + let mut row1 = vec![0u8; row_size]; + let mut addresses = [RowPtr::null(); BATCH_SIZE]; + addresses[0] = RowPtr::new(row0.as_mut_ptr()); + addresses[1] = RowPtr::new(row1.as_mut_ptr()); + + let select_vector = [RowID::from(0), RowID::from(1)]; + let mut scratch = Vec::new(); + unsafe { + serialize_column_to_rowformat( + &arena, + &column, + &select_vector, + &mut addresses, + 0, + &mut scratch, + ); + } + + let bytes0 = unsafe { addresses[0].read_bytes(0) }; + let bytes1 = unsafe { addresses[1].read_bytes(0) }; + + assert_eq!(bytes0, bytes1); + assert!(bytes0.starts_with(b"HB")); + + let decoded = deserialize_bitmap(bytes0).unwrap(); + assert_eq!(decoded.iter().collect::>(), values); + } +} diff --git a/src/query/expression/src/kernels/group_by_hash/method_single_string.rs b/src/query/expression/src/kernels/group_by_hash/method_single_string.rs index 5ea9890de5cfd..f25871c1b9787 100644 --- a/src/query/expression/src/kernels/group_by_hash/method_single_string.rs +++ b/src/query/expression/src/kernels/group_by_hash/method_single_string.rs @@ -22,6 +22,7 @@ use crate::KeysState; use crate::ProjectedBlock; use crate::types::BinaryColumn; use crate::types::binary::BinaryColumnIter; +use crate::utils::bitmap::normalize_bitmap_column; #[derive(Debug, Clone, Default, PartialEq, Eq)] pub struct HashMethodSingleBinary {} @@ -36,7 +37,13 @@ impl HashMethod for HashMethodSingleBinary { } fn build_keys_state(&self, group_columns: ProjectedBlock, _rows: usize) -> Result { - Ok(KeysState::Column(group_columns[0].to_column())) + Ok(KeysState::Column(match group_columns[0].to_column() { + Column::Bitmap(col) => match normalize_bitmap_column(&col) { + std::borrow::Cow::Borrowed(_) => Column::Bitmap(col), + std::borrow::Cow::Owned(col) => Column::Bitmap(col), + }, + column => column, + })) } fn build_keys_iter<'a>(&self, keys_state: &'a KeysState) -> Result> { diff --git a/src/query/expression/src/kernels/group_by_hash/utils.rs b/src/query/expression/src/kernels/group_by_hash/utils.rs index da5ce4809e70f..bb58651902ce6 100644 --- a/src/query/expression/src/kernels/group_by_hash/utils.rs +++ b/src/query/expression/src/kernels/group_by_hash/utils.rs @@ -17,11 +17,14 @@ use databend_common_base::vec_ext::VecU8Ext; use crate::Column; use crate::ProjectedBlock; +use crate::types::AnyType; use crate::types::BinaryColumn; +use crate::types::NullableColumn; use crate::types::NumberColumn; use crate::types::binary::BinaryColumnBuilder; use crate::types::decimal::DecimalColumn; use crate::types::vector::VectorScalarRef; +use crate::utils::bitmap::normalize_bitmap_column; use crate::with_decimal_type; use crate::with_number_mapped_type; use crate::with_vector_number_type; @@ -32,12 +35,16 @@ pub fn serialize_group_columns( num_rows: usize, serialize_size: usize, ) -> BinaryColumn { + let columns: Vec = columns + .iter() + .map(|entry| normalize_bitmap_in_column(entry.to_column())) + .collect(); let mut builder = BinaryColumnBuilder::with_capacity(num_rows, serialize_size); for i in 0..num_rows { - for entry in columns.iter() { + for column in columns.iter() { unsafe { - serialize_column_binary(&entry.to_column(), i, &mut builder.data); + serialize_column_binary(column, i, &mut builder.data); } } builder.commit_row(); @@ -132,3 +139,26 @@ pub unsafe fn serialize_column_binary(column: &Column, row: usize, row_space: &m } } } + +fn normalize_bitmap_in_column(column: Column) -> Column { + match column { + Column::Bitmap(col) => match normalize_bitmap_column(&col) { + std::borrow::Cow::Borrowed(_) => Column::Bitmap(col), + std::borrow::Cow::Owned(col) => Column::Bitmap(col), + }, + Column::Nullable(box nullable) => { + let (col, validity) = nullable.destructure(); + Column::Nullable(Box::new(NullableColumn:: { + column: normalize_bitmap_in_column(col), + validity, + })) + } + Column::Tuple(columns) => Column::Tuple( + columns + .into_iter() + .map(normalize_bitmap_in_column) + .collect(), + ), + other => other, + } +} diff --git a/src/query/expression/src/utils/bitmap.rs b/src/query/expression/src/utils/bitmap.rs new file mode 100644 index 0000000000000..aeeb5e874a224 --- /dev/null +++ b/src/query/expression/src/utils/bitmap.rs @@ -0,0 +1,81 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::borrow::Cow; + +use databend_common_column::binary::BinaryColumn; +use databend_common_column::binary::BinaryColumnBuilder; +use databend_common_io::HYBRID_KIND_LARGE; +use databend_common_io::HYBRID_KIND_SMALL; +use databend_common_io::HYBRID_MAGIC; +use databend_common_io::HYBRID_VERSION; +use databend_common_io::deserialize_bitmap; +#[inline] +pub fn is_hybrid_encoding(bytes: &[u8]) -> bool { + if bytes.is_empty() { + return true; + } + if bytes.len() < 4 { + return false; + } + bytes[0] == HYBRID_MAGIC[0] + && bytes[1] == HYBRID_MAGIC[1] + && bytes[2] == HYBRID_VERSION + && (bytes[3] == HYBRID_KIND_SMALL || bytes[3] == HYBRID_KIND_LARGE) +} + +/// Ensure bitmap bytes are encoded in the current hybrid format. +/// - If all rows are already in the new format, returns a borrowed column (zero copy). +/// - Once a legacy row is seen, the whole column is rewritten with normalized bytes. +pub fn normalize_bitmap_column(column: &BinaryColumn) -> Cow<'_, BinaryColumn> { + if column.is_empty() { + return Cow::Borrowed(column); + } + + let mut has_legacy = false; + for value in column.iter() { + if !is_hybrid_encoding(value) { + has_legacy = true; + break; + } + } + + if !has_legacy { + return Cow::Borrowed(column); + } + + let mut builder = BinaryColumnBuilder::with_capacity(column.len(), column.data().len()); + for value in column.iter() { + if is_hybrid_encoding(value) { + builder.put_slice(value); + builder.commit_row(); + continue; + } + + match deserialize_bitmap(value) { + Ok(bitmap) => { + // Safe unwrap: serialize_into writes into Vec. + bitmap.serialize_into(&mut builder.data).unwrap(); + builder.commit_row(); + } + Err(_) => { + // Keep original bytes if they cannot be decoded; this preserves prior behavior. + builder.put_slice(value); + builder.commit_row(); + } + } + } + + Cow::Owned(builder.build()) +} diff --git a/src/query/expression/src/utils/mod.rs b/src/query/expression/src/utils/mod.rs index b06a708884a3f..d7d50e644ba87 100644 --- a/src/query/expression/src/utils/mod.rs +++ b/src/query/expression/src/utils/mod.rs @@ -14,6 +14,7 @@ pub mod arithmetics_type; pub mod arrow; +pub mod bitmap; pub mod block_debug; pub mod block_thresholds; mod column_from; diff --git a/src/query/expression/src/values.rs b/src/query/expression/src/values.rs index eb1b8cb8c9aba..c21e21315a02a 100755 --- a/src/query/expression/src/values.rs +++ b/src/query/expression/src/values.rs @@ -35,6 +35,7 @@ use databend_common_exception::Result; use databend_common_frozen_api::FrozenAPI; use databend_common_frozen_api::frozen_api; use databend_common_io::HybridBitmap; +use databend_common_io::deserialize_bitmap; use databend_common_io::prelude::BinaryRead; use enum_as_inner::EnumAsInner; use geo::Geometry; @@ -50,6 +51,7 @@ use serde::Serializer; use serde::de::Visitor; use string::StringColumnBuilder; +use crate::bitmap::is_hybrid_encoding; use crate::property::Domain; use crate::types::array::ArrayColumn; use crate::types::array::ArrayColumnBuilder; @@ -941,7 +943,19 @@ impl PartialOrd for Scalar { (Scalar::Interval(i1), Scalar::Interval(i2)) => i1.partial_cmp(i2), (Scalar::Array(a1), Scalar::Array(a2)) => a1.partial_cmp(a2), (Scalar::Map(m1), Scalar::Map(m2)) => m1.partial_cmp(m2), - (Scalar::Bitmap(b1), Scalar::Bitmap(b2)) => b1.partial_cmp(b2), + (Scalar::Bitmap(b1), Scalar::Bitmap(b2)) => { + // Bitmap only allows PartialEq + if is_hybrid_encoding(b1) == is_hybrid_encoding(b2) && b1 == b2 { + return Some(Ordering::Equal); + } + let Ok(map_1) = deserialize_bitmap(b1) else { + return None; + }; + let Ok(map_2) = deserialize_bitmap(b2) else { + return None; + }; + map_1.eq(&map_2).then_some(Ordering::Equal) + } (Scalar::Tuple(t1), Scalar::Tuple(t2)) => t1.partial_cmp(t2), (Scalar::Variant(v1), Scalar::Variant(v2)) => { let left_jsonb = RawJsonb::new(v1);