Skip to content

Commit fef969f

Browse files
committed
refactor: Pre-deserialization of Bitmap
1 parent 21b6f03 commit fef969f

File tree

40 files changed

+629
-515
lines changed

40 files changed

+629
-515
lines changed

src/common/io/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ micromarshal = { workspace = true }
2727
roaring = { workspace = true }
2828
scroll = { workspace = true }
2929
serde = { workspace = true }
30-
smallvec = { workspace = true }
30+
smallvec = { workspace = true, features = ["serde"] }
3131
wkt = { workspace = true }
3232

3333
[dev-dependencies]

src/common/io/src/bitmap.rs

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,17 +14,24 @@
1414

1515
use std::fmt;
1616
use std::io;
17+
use std::io::Read;
18+
use std::io::Write;
1719
use std::iter::FromIterator;
1820
use std::mem;
21+
use std::mem::size_of;
1922
use std::ops::BitAndAssign;
2023
use std::ops::BitOrAssign;
2124
use std::ops::BitXorAssign;
2225
use std::ops::SubAssign;
2326

27+
use borsh::BorshDeserialize;
28+
use borsh::BorshSerialize;
2429
use databend_common_exception::ErrorCode;
2530
use databend_common_exception::Result;
2631
use roaring::treemap::Iter;
2732
use roaring::RoaringTreemap;
33+
use serde::Deserialize;
34+
use serde::Serialize;
2835
use smallvec::SmallVec;
2936

3037
// https://github.com/ClickHouse/ClickHouse/blob/516a6ed6f8bd8c5f6eed3a10e9037580b2fb6152/src/AggregateFunctions/AggregateFunctionGroupBitmapData.h#L914
@@ -38,7 +45,7 @@ pub const HYBRID_HEADER_LEN: usize = 4;
3845
type SmallBitmap = SmallVec<[u64; LARGE_THRESHOLD]>;
3946

4047
#[allow(clippy::large_enum_variant)]
41-
#[derive(Clone, PartialEq)]
48+
#[derive(Clone, PartialEq, Serialize, Deserialize)]
4249
pub enum HybridBitmap {
4350
Small(SmallBitmap),
4451
Large(RoaringTreemap),
@@ -62,6 +69,15 @@ impl HybridBitmap {
6269
}
6370
}
6471

72+
pub fn memory_size(&self) -> usize {
73+
match self {
74+
HybridBitmap::Small(set) => {
75+
size_of::<SmallBitmap>() + set.capacity() * size_of::<u64>()
76+
}
77+
HybridBitmap::Large(tree) => tree.serialized_size(),
78+
}
79+
}
80+
6581
pub fn is_empty(&self) -> bool {
6682
self.len() == 0
6783
}
@@ -447,6 +463,33 @@ impl fmt::Debug for HybridBitmap {
447463
}
448464
}
449465

466+
impl BorshSerialize for HybridBitmap {
467+
fn serialize<W: Write>(&self, writer: &mut W) -> std::io::Result<()> {
468+
let mut buf = Vec::new();
469+
self.serialize_into(&mut buf)?;
470+
let len = u64::try_from(buf.len()).map_err(|_| {
471+
io::Error::new(
472+
io::ErrorKind::InvalidData,
473+
format!("hybrid bitmap serialized size overflow: {}", buf.len()),
474+
)
475+
})?;
476+
writer.write_all(&len.to_le_bytes())?;
477+
writer.write_all(&buf)
478+
}
479+
}
480+
481+
impl BorshDeserialize for HybridBitmap {
482+
fn deserialize_reader<R: Read>(reader: &mut R) -> std::io::Result<Self> {
483+
let mut len_buf = [0u8; 8];
484+
reader.read_exact(&mut len_buf)?;
485+
let len = u64::from_le_bytes(len_buf) as usize;
486+
let mut buf = vec![0u8; len];
487+
reader.read_exact(&mut buf)?;
488+
deserialize_bitmap(&buf)
489+
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string()))
490+
}
491+
}
492+
450493
pub fn parse_bitmap(buf: &[u8]) -> Result<HybridBitmap> {
451494
std::str::from_utf8(buf)
452495
.map_err(|_| ())

src/common/native/src/read/array/binary.rs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,15 @@
1515
use std::io::Cursor;
1616

1717
use databend_common_column::binary::BinaryColumn;
18+
use databend_common_expression::types::bitmap::BitmapColumn;
1819
use databend_common_expression::types::Bitmap;
1920
use databend_common_expression::types::Buffer;
2021
use databend_common_expression::types::GeographyColumn;
2122
use databend_common_expression::Column;
2223
use databend_common_expression::TableDataType;
2324

2425
use crate::compression::binary::decompress_binary;
26+
use crate::error::Error;
2527
use crate::error::Result;
2628
use crate::nested::InitNested;
2729
use crate::nested::NestedState;
@@ -132,25 +134,27 @@ fn try_new_binary_column(
132134
validity: Option<Bitmap>,
133135
) -> Result<Column> {
134136
let column = BinaryColumn::new(values, offsets);
135-
Ok(binary_column_to_column(data_type, column, validity))
137+
binary_column_to_column(data_type, column, validity)
136138
}
137139

138140
fn binary_column_to_column(
139141
data_type: &TableDataType,
140142
column: BinaryColumn,
141143
validity: Option<Bitmap>,
142-
) -> Column {
144+
) -> Result<Column> {
143145
let col = match data_type.remove_nullable() {
144146
TableDataType::Binary => Column::Binary(column),
145-
TableDataType::Bitmap => Column::Bitmap(column),
147+
TableDataType::Bitmap => Column::Bitmap(Box::new(
148+
BitmapColumn::from_binary(column).map_err(|e| Error::OutOfSpec(e.to_string()))?,
149+
)),
146150
TableDataType::Variant => Column::Variant(column),
147151
TableDataType::Geometry => Column::Geometry(column),
148152
TableDataType::Geography => Column::Geography(GeographyColumn(column)),
149153
_ => unreachable!(),
150154
};
151155
if data_type.is_nullable() {
152-
col.wrap_nullable(validity)
156+
Ok(col.wrap_nullable(validity))
153157
} else {
154-
col
158+
Ok(col)
155159
}
156160
}

src/common/native/src/write/serialize.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -184,11 +184,14 @@ impl<'a, W: Write> ValueVisitor for WriteVisitor<'a, W> {
184184
)
185185
}
186186

187-
Column::Binary(b)
188-
| Column::Bitmap(b)
189-
| Column::Variant(b)
190-
| Column::Geography(GeographyColumn(b))
191-
| Column::Geometry(b) => write_binary::<W>(
187+
Column::Binary(b) | Column::Variant(b) | Column::Geometry(b) => write_binary::<W>(
188+
self.w,
189+
&b,
190+
self.validity.clone(),
191+
self.write_options.clone(),
192+
self.scratch,
193+
),
194+
Column::Geography(GeographyColumn(b)) => write_binary::<W>(
192195
self.w,
193196
&b,
194197
self.validity.clone(),

src/query/expression/src/aggregate/group_hash.rs

Lines changed: 36 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use databend_common_column::types::Index;
1919
use databend_common_exception::ErrorCode;
2020
use databend_common_exception::Result;
2121

22+
use crate::types::bitmap::BitmapColumn;
2223
use crate::types::i256;
2324
use crate::types::number::Number;
2425
use crate::types::AccessType;
@@ -47,7 +48,6 @@ use crate::types::StringType;
4748
use crate::types::TimestampType;
4849
use crate::types::ValueType;
4950
use crate::types::VariantType;
50-
use crate::utils::bitmap::normalize_bitmap_column;
5151
use crate::visitor::ValueVisitor;
5252
use crate::with_decimal_mapped_type;
5353
use crate::with_number_mapped_type;
@@ -167,33 +167,40 @@ impl<const IS_FIRST: bool> ValueVisitor for HashVisitor<'_, IS_FIRST> {
167167
}
168168

169169
fn visit_binary(&mut self, column: BinaryColumn) -> Result<()> {
170-
self.combine_group_hash_string_column::<BinaryType>(&column);
170+
self.combine_group_hash_string_column::<BinaryType, _>(&column, |x| Ok(x.agg_hash()))?;
171171
Ok(())
172172
}
173173

174174
fn visit_string(&mut self, column: StringColumn) -> Result<()> {
175-
self.combine_group_hash_string_column::<StringType>(&column);
175+
self.combine_group_hash_string_column::<StringType, _>(&column, |x| {
176+
Ok(x.as_bytes().agg_hash())
177+
})?;
176178
Ok(())
177179
}
178180

179-
fn visit_bitmap(&mut self, column: BinaryColumn) -> Result<()> {
180-
let column = normalize_bitmap_column(&column);
181-
self.combine_group_hash_string_column::<BitmapType>(column.as_ref());
181+
fn visit_bitmap(&mut self, column: BitmapColumn) -> Result<()> {
182+
let mut bytes = Vec::new();
183+
self.combine_group_hash_string_column::<BitmapType, _>(&column, |x| {
184+
x.serialize_into(&mut bytes).unwrap();
185+
let hash = bytes.agg_hash();
186+
bytes.clear();
187+
Ok(hash)
188+
})?;
182189
Ok(())
183190
}
184191

185192
fn visit_variant(&mut self, column: BinaryColumn) -> Result<()> {
186-
self.combine_group_hash_string_column::<VariantType>(&column);
193+
self.combine_group_hash_string_column::<VariantType, _>(&column, |x| Ok(x.agg_hash()))?;
187194
Ok(())
188195
}
189196

190197
fn visit_geometry(&mut self, column: BinaryColumn) -> Result<()> {
191-
self.combine_group_hash_string_column::<GeometryType>(&column);
198+
self.combine_group_hash_string_column::<GeometryType, _>(&column, |x| Ok(x.agg_hash()))?;
192199
Ok(())
193200
}
194201

195202
fn visit_geography(&mut self, column: GeographyColumn) -> Result<()> {
196-
self.combine_group_hash_string_column::<GeographyType>(&column);
203+
self.combine_group_hash_string_column::<GeographyType, _>(&column, |x| Ok(x.0.agg_hash()))?;
197204
Ok(())
198205
}
199206

@@ -225,20 +232,25 @@ impl<const IS_FIRST: bool> HashVisitor<'_, IS_FIRST> {
225232
}
226233
}
227234

228-
fn combine_group_hash_string_column<T>(&mut self, col: &T::Column)
235+
fn combine_group_hash_string_column<T, F>(
236+
&mut self,
237+
col: &T::Column,
238+
mut agg_hash: F,
239+
) -> Result<()>
229240
where
230241
T: ValueType,
231-
for<'a> T::ScalarRef<'a>: AsRef<[u8]>,
242+
for<'a> F: FnMut(&T::ScalarRef<'a>) -> Result<u64>,
232243
{
233244
if IS_FIRST {
234245
for (x, val) in T::iter_column(col).zip(self.values.iter_mut()) {
235-
*val = x.as_ref().agg_hash();
246+
*val = agg_hash(&x)?;
236247
}
237248
} else {
238249
for (x, val) in T::iter_column(col).zip(self.values.iter_mut()) {
239-
*val = (*val).wrapping_mul(NULL_HASH_VAL) ^ x.as_ref().agg_hash();
250+
*val = (*val).wrapping_mul(NULL_HASH_VAL) ^ agg_hash(&x)?;
240251
}
241252
}
253+
Ok(())
242254
}
243255
}
244256

@@ -359,11 +371,14 @@ where I: Index
359371
self.visit_binary(column)
360372
}
361373

362-
fn visit_bitmap(&mut self, column: crate::types::BinaryColumn) -> Result<()> {
363-
let column = normalize_bitmap_column(&column);
374+
fn visit_bitmap(&mut self, column: BitmapColumn) -> Result<()> {
375+
let mut bytes = Vec::new();
364376
self.visit_indices(|i| {
365-
let value = column.as_ref().index(i.to_usize()).unwrap();
366-
value.agg_hash()
377+
let value = column.index(i.to_usize()).unwrap();
378+
value.serialize_into(&mut bytes).unwrap();
379+
let hash = bytes.agg_hash();
380+
bytes.clear();
381+
hash
367382
})
368383
}
369384

@@ -429,8 +444,8 @@ where I: Index
429444
impl<const IS_FIRST: bool, I> IndexHashVisitor<'_, '_, IS_FIRST, I>
430445
where I: Index
431446
{
432-
fn visit_indices<F>(&mut self, do_hash: F) -> Result<()>
433-
where F: Fn(&I) -> u64 {
447+
fn visit_indices<F>(&mut self, mut do_hash: F) -> Result<()>
448+
where F: FnMut(&I) -> u64 {
434449
self.visit_indices_update(|i, val| {
435450
let hash = do_hash(i);
436451
if IS_FIRST {
@@ -441,8 +456,8 @@ where I: Index
441456
})
442457
}
443458

444-
fn visit_indices_update<F>(&mut self, update: F) -> Result<()>
445-
where F: Fn(&I, &mut u64) {
459+
fn visit_indices_update<F>(&mut self, mut update: F) -> Result<()>
460+
where F: FnMut(&I, &mut u64) {
446461
for i in self.indices {
447462
let val = &mut self.target[i.to_usize()];
448463
update(i, val);

src/query/expression/src/aggregate/payload_flush.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,6 @@ impl Payload {
239239
DataType::Date => self.flush_type_column::<DateType>(col_offset, state),
240240
DataType::Binary => Column::Binary(self.flush_binary_column(col_offset, state)),
241241
DataType::String => Column::String(self.flush_string_column(col_offset, state)),
242-
DataType::Bitmap => Column::Bitmap(self.flush_binary_column(col_offset, state)),
243242
DataType::Variant => Column::Variant(self.flush_binary_column(col_offset, state)),
244243
DataType::Geometry => Column::Geometry(self.flush_binary_column(col_offset, state)),
245244
DataType::Nullable(_) => unreachable!(),

src/query/expression/src/aggregate/payload_row.rs

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,14 @@ use databend_common_io::prelude::bincode_serialize_into_buf;
2020
use super::probe_state::ProbeState;
2121
use super::row_ptr::RowLayout;
2222
use super::row_ptr::RowPtr;
23+
#[cfg(test)]
24+
use crate::types::bitmap::BitmapColumn;
2325
use crate::types::decimal::DecimalColumn;
2426
use crate::types::i256;
2527
use crate::types::AccessType;
2628
use crate::types::AnyType;
2729
use crate::types::BinaryType;
30+
use crate::types::BitmapType;
2831
use crate::types::BooleanType;
2932
use crate::types::DataType;
3033
use crate::types::DateType;
@@ -34,7 +37,6 @@ use crate::types::NumberColumn;
3437
use crate::types::NumberType;
3538
use crate::types::StringType;
3639
use crate::types::TimestampType;
37-
use crate::utils::bitmap::normalize_bitmap_column;
3840
use crate::with_decimal_mapped_type;
3941
use crate::with_number_mapped_type;
4042
use crate::Column;
@@ -122,11 +124,12 @@ pub(super) unsafe fn serialize_column_to_rowformat(
122124
}
123125
}
124126
Column::Bitmap(v) => {
125-
let normalized = normalize_bitmap_column(v);
126-
let col = normalized.as_ref();
127+
let mut bytes = Vec::new();
127128
for &index in select_vector {
128-
let data = arena.alloc_slice_copy(unsafe { col.index_unchecked(index) });
129-
address[index].write_bytes(offset, data);
129+
let map = unsafe { v.index_unchecked(index) };
130+
map.serialize_into(bytes.as_mut_slice()).unwrap();
131+
address[index].write_bytes(offset, &bytes);
132+
bytes.clear();
130133
}
131134
}
132135
Column::Binary(v) | Column::Variant(v) | Column::Geometry(v) => {
@@ -287,15 +290,17 @@ impl ProbeState {
287290
},
288291
),
289292
Column::Bitmap(v) => {
290-
let normalized = normalize_bitmap_column(v);
291-
let col = normalized.as_ref();
293+
let mut bytes = Vec::new();
292294
self.row_match_column_generic(
293295
validity,
294296
validity_offset,
295297
(count, no_match_count),
296298
|idx, row_ptr| unsafe {
297-
let value = BinaryType::index_column_unchecked(col, idx);
298-
row_ptr.is_bytes_eq(col_offset, value)
299+
let value = BitmapType::index_column_unchecked(v, idx);
300+
value.serialize_into(&mut bytes).unwrap();
301+
let is_eq = row_ptr.is_bytes_eq(col_offset, &bytes);
302+
bytes.clear();
303+
is_eq
299304
},
300305
)
301306
}
@@ -346,10 +351,10 @@ impl ProbeState {
346351
validity: Option<&Bitmap>,
347352
validity_offset: usize,
348353
(count, mut no_match_count): (usize, usize),
349-
compare_fn: F,
354+
mut compare_fn: F,
350355
) -> (usize, usize)
351356
where
352-
F: Fn(usize, &RowPtr) -> bool,
357+
F: FnMut(usize, &RowPtr) -> bool,
353358
{
354359
let mut temp = self.get_temp();
355360
temp.reserve(count);
@@ -451,7 +456,9 @@ mod tests {
451456
builder.commit_row();
452457
builder.put_slice(&legacy_bytes);
453458
builder.commit_row();
454-
let column = Column::Bitmap(builder.build());
459+
let column = Column::Bitmap(
460+
BitmapColumn::from_binary(builder.build()).expect("valid bitmap column"),
461+
);
455462

456463
let arena = Bump::new();
457464
let row_size = rowformat_size(&DataType::Bitmap);

0 commit comments

Comments
 (0)