Skip to content

Commit 2607bda

Browse files
committed
enhance: map multi row groups into one cache cell
Signed-off-by: Shawn Wang <[email protected]>
1 parent 6e2872c commit 2607bda

File tree

6 files changed

+331
-147
lines changed

6 files changed

+331
-147
lines changed

internal/core/src/segcore/storagev2translator/GroupCTMeta.h

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,21 @@
1919

2020
namespace milvus::segcore::storagev2translator {
2121

22+
// Number of row groups (parquet row groups) merged into one cache cell,
23+
// for now it is a constant.
24+
// herierchy: 1 group chunk <-> 1 cache cell <-> kRowGroupsPerCell row groups
25+
constexpr size_t kRowGroupsPerCell = 4;
26+
2227
struct GroupCTMeta : public milvus::cachinglayer::Meta {
28+
// num_rows_until_chunk_[i] = total rows(prefix sum) in cells [0, i-1]
29+
// the size of num_rows_until_chunk_ is num_cells + 1
2330
std::vector<int64_t> num_rows_until_chunk_;
31+
// memory size for each group chunk(cache cell)
2432
std::vector<int64_t> chunk_memory_size_;
2533
size_t num_fields_;
34+
// total number of row groups
35+
size_t total_row_groups_;
36+
2637
GroupCTMeta(size_t num_fields,
2738
milvus::cachinglayer::StorageType storage_type,
2839
milvus::cachinglayer::CellIdMappingMode cell_id_mapping_mode,
@@ -34,7 +45,16 @@ struct GroupCTMeta : public milvus::cachinglayer::Meta {
3445
cell_data_type,
3546
cache_warmup_policy,
3647
support_eviction),
37-
num_fields_(num_fields) {
48+
num_fields_(num_fields),
49+
total_row_groups_(0) {
50+
}
51+
52+
// Get the range of row groups for a cell [start, end)
53+
std::pair<size_t, size_t>
54+
get_row_group_range(size_t cid) const {
55+
size_t start = cid * kRowGroupsPerCell;
56+
size_t end = std::min(start + kRowGroupsPerCell, total_row_groups_);
57+
return {start, end};
3858
}
3959
};
4060

internal/core/src/segcore/storagev2translator/GroupChunkTranslator.cpp

Lines changed: 129 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -144,19 +144,39 @@ GroupChunkTranslator::GroupChunkTranslator(
144144
file_metas.size());
145145
}
146146

147-
meta_.num_rows_until_chunk_.reserve(total_row_groups + 1);
148-
meta_.chunk_memory_size_.reserve(total_row_groups);
149-
150-
meta_.num_rows_until_chunk_.push_back(0);
147+
// Collect row group sizes and row counts
148+
std::vector<int64_t> row_group_row_counts;
149+
row_group_sizes_.reserve(total_row_groups);
150+
row_group_row_counts.reserve(total_row_groups);
151151
for (const auto& row_group_meta : row_group_meta_list_) {
152152
for (int i = 0; i < row_group_meta.size(); ++i) {
153-
meta_.num_rows_until_chunk_.push_back(
154-
meta_.num_rows_until_chunk_.back() +
155-
row_group_meta.Get(i).row_num());
156-
meta_.chunk_memory_size_.push_back(
157-
row_group_meta.Get(i).memory_size());
153+
row_group_sizes_.push_back(row_group_meta.Get(i).memory_size());
154+
row_group_row_counts.push_back(row_group_meta.Get(i).row_num());
155+
}
156+
}
157+
158+
// Build cell mapping: each cell contains up to kRowGroupsPerCell row groups
159+
meta_.total_row_groups_ = total_row_groups;
160+
size_t num_cells =
161+
(total_row_groups + kRowGroupsPerCell - 1) / kRowGroupsPerCell;
162+
163+
// Merge row groups into group chunks(cache cells)
164+
meta_.num_rows_until_chunk_.reserve(num_cells + 1);
165+
meta_.num_rows_until_chunk_.push_back(0);
166+
meta_.chunk_memory_size_.reserve(num_cells);
167+
168+
int64_t cumulative_rows = 0;
169+
for (size_t cell_id = 0; cell_id < num_cells; ++cell_id) {
170+
auto [start, end] = meta_.get_row_group_range(cell_id);
171+
int64_t cell_size = 0;
172+
for (size_t i = start; i < end; ++i) {
173+
cumulative_rows += row_group_row_counts[i];
174+
cell_size += row_group_sizes_[i];
158175
}
176+
meta_.num_rows_until_chunk_.push_back(cumulative_rows);
177+
meta_.chunk_memory_size_.push_back(cell_size);
159178
}
179+
160180
AssertInfo(
161181
meta_.num_rows_until_chunk_.back() == column_group_info_.row_count,
162182
fmt::format(
@@ -165,6 +185,14 @@ GroupChunkTranslator::GroupChunkTranslator(
165185
column_group_info_.field_id,
166186
meta_.num_rows_until_chunk_.back(),
167187
column_group_info_.row_count));
188+
189+
LOG_INFO(
190+
"[StorageV2] translator {} merged {} row groups into {} cells ({} "
191+
"row groups per cell)",
192+
key_,
193+
total_row_groups,
194+
num_cells,
195+
kRowGroupsPerCell);
168196
}
169197

170198
GroupChunkTranslator::~GroupChunkTranslator() {
@@ -184,10 +212,14 @@ std::pair<milvus::cachinglayer::ResourceUsage,
184212
milvus::cachinglayer::ResourceUsage>
185213
GroupChunkTranslator::estimated_byte_size_of_cell(
186214
milvus::cachinglayer::cid_t cid) const {
187-
auto [file_idx, row_group_idx] = get_file_and_row_group_index(cid);
188-
auto& row_group_meta = row_group_meta_list_[file_idx].Get(row_group_idx);
215+
AssertInfo(cid < meta_.chunk_memory_size_.size(),
216+
fmt::format("[StorageV2] translator {} cid {} is out of range. "
217+
"Total cells: {}",
218+
key_,
219+
cid,
220+
meta_.chunk_memory_size_.size()));
189221

190-
auto cell_sz = static_cast<int64_t>(row_group_meta.memory_size());
222+
auto cell_sz = meta_.chunk_memory_size_[cid];
191223

192224
if (use_mmap_) {
193225
// why double the disk size for loading?
@@ -205,7 +237,7 @@ GroupChunkTranslator::key() const {
205237
}
206238

207239
std::pair<size_t, size_t>
208-
GroupChunkTranslator::get_file_and_row_group_index(
240+
GroupChunkTranslator::get_file_and_row_group_offset(
209241
milvus::cachinglayer::cid_t cid) const {
210242
for (size_t file_idx = 0; file_idx < file_row_group_prefix_sum_.size() - 1;
211243
++file_idx) {
@@ -223,8 +255,8 @@ GroupChunkTranslator::get_file_and_row_group_index(
223255
}
224256

225257
milvus::cachinglayer::cid_t
226-
GroupChunkTranslator::get_cid_from_file_and_row_group_index(
227-
size_t file_idx, size_t row_group_idx) const {
258+
GroupChunkTranslator::get_global_row_group_idx(size_t file_idx,
259+
size_t row_group_idx) const {
228260
AssertInfo(file_idx < file_row_group_prefix_sum_.size() - 1,
229261
fmt::format("[StorageV2] translator {} file_idx {} is out of "
230262
"range. Total files: {}",
@@ -254,12 +286,27 @@ GroupChunkTranslator::get_cells(const std::vector<cachinglayer::cid_t>& cids) {
254286
cells;
255287
cells.reserve(cids.size());
256288

257-
// Create row group lists for requested cids
258-
std::vector<std::vector<int64_t>> row_group_lists(insert_files_.size());
259-
289+
// Collect all row group indices needed for the requested cells
290+
std::vector<size_t> needed_row_group_indices;
291+
needed_row_group_indices.reserve(kRowGroupsPerCell * cids.size());
260292
for (auto cid : cids) {
261-
auto [file_idx, row_group_idx] = get_file_and_row_group_index(cid);
262-
row_group_lists[file_idx].push_back(row_group_idx);
293+
AssertInfo(cid < meta_.chunk_memory_size_.size(),
294+
fmt::format("[StorageV2] translator {} cid {} is out of "
295+
"range. Total cells: {}",
296+
key_,
297+
cid,
298+
meta_.chunk_memory_size_.size()));
299+
auto [start, end] = meta_.get_row_group_range(cid);
300+
for (size_t i = start; i < end; ++i) {
301+
needed_row_group_indices.push_back(i);
302+
}
303+
}
304+
305+
// Create row group lists for file loading
306+
std::vector<std::vector<int64_t>> row_group_lists(insert_files_.size());
307+
for (auto rg_idx : needed_row_group_indices) {
308+
auto [file_idx, row_group_off] = get_file_and_row_group_offset(rg_idx);
309+
row_group_lists[file_idx].push_back(row_group_off);
263310
}
264311

265312
auto parallel_degree =
@@ -288,57 +335,81 @@ GroupChunkTranslator::get_cells(const std::vector<cachinglayer::cid_t>& cids) {
288335
key_,
289336
column_group_info_.field_id);
290337

338+
// Collect loaded tables by row group index
339+
std::unordered_map<size_t, std::shared_ptr<arrow::Table>> row_group_tables;
340+
row_group_tables.reserve(needed_row_group_indices.size());
341+
291342
std::shared_ptr<milvus::ArrowDataWrapper> r;
292-
std::unordered_set<cachinglayer::cid_t> filled_cids;
293-
filled_cids.reserve(cids.size());
294343
while (channel->pop(r)) {
295344
for (const auto& table_info : r->arrow_tables) {
296-
// Convert file_index and row_group_index to global cid
297-
auto cid = get_cid_from_file_and_row_group_index(
298-
table_info.file_index, table_info.row_group_index);
299-
cells.emplace_back(cid, load_group_chunk(table_info.table, cid));
300-
filled_cids.insert(cid);
345+
// Convert file_index and row_group_index (file inner index, not global index) to global row group index
346+
auto rg_idx = get_global_row_group_idx(table_info.file_index,
347+
table_info.row_group_index);
348+
row_group_tables[rg_idx] = table_info.table;
301349
}
302350
}
303351

304352
// access underlying feature to get exception if any
305353
load_future.get();
306354

307-
// Verify all requested cids have been filled
355+
// Build cells from collected tables
356+
std::unordered_set<cachinglayer::cid_t> filled_cids;
357+
filled_cids.reserve(cids.size());
308358
for (auto cid : cids) {
309-
AssertInfo(filled_cids.find(cid) != filled_cids.end(),
310-
"[StorageV2] translator {} cid {} was not filled, missing "
311-
"row group id {}",
312-
key_,
313-
cid,
314-
cid);
359+
if (filled_cids.count(cid) > 0) {
360+
continue; // Already processed this cell
361+
}
362+
363+
auto [start, end] = meta_.get_row_group_range(cid);
364+
std::vector<std::shared_ptr<arrow::Table>> tables;
365+
tables.reserve(end - start);
366+
367+
for (size_t i = start; i < end; ++i) {
368+
auto it = row_group_tables.find(i);
369+
AssertInfo(it != row_group_tables.end(),
370+
fmt::format("[StorageV2] translator {} row group {} "
371+
"for cell {} was not loaded",
372+
key_,
373+
i,
374+
cid));
375+
tables.push_back(it->second);
376+
}
377+
378+
cells.emplace_back(cid, load_group_chunk(tables, cid));
379+
filled_cids.insert(cid);
315380
}
381+
316382
return cells;
317383
}
318384

319385
std::unique_ptr<milvus::GroupChunk>
320386
GroupChunkTranslator::load_group_chunk(
321-
const std::shared_ptr<arrow::Table>& table,
387+
const std::vector<std::shared_ptr<arrow::Table>>& tables,
322388
const milvus::cachinglayer::cid_t cid) {
323-
AssertInfo(table != nullptr, "arrow table is nullptr");
324-
// Create chunks for each field in this batch
325-
std::unordered_map<FieldId, std::shared_ptr<Chunk>> chunks;
326-
// Iterate through field_id_list to get field_id and create chunk
389+
AssertInfo(!tables.empty(), "tables vector is empty");
390+
for (const auto& table : tables) {
391+
AssertInfo(table != nullptr, "arrow table is nullptr");
392+
}
393+
394+
// Use the first table's schema as reference for field iteration
395+
const auto& schema = tables[0]->schema();
396+
397+
// Collect field info and merge array vectors from all tables
327398
std::vector<FieldId> field_ids;
328399
std::vector<FieldMeta> field_metas;
329400
std::vector<arrow::ArrayVector> array_vecs;
330-
field_metas.reserve(table->schema()->num_fields());
331-
array_vecs.reserve(table->schema()->num_fields());
401+
field_ids.reserve(schema->num_fields());
402+
field_metas.reserve(schema->num_fields());
403+
array_vecs.reserve(schema->num_fields());
332404

333-
for (int i = 0; i < table->schema()->num_fields(); ++i) {
334-
AssertInfo(table->schema()->field(i)->metadata()->Contains(
405+
for (int i = 0; i < schema->num_fields(); ++i) {
406+
AssertInfo(schema->field(i)->metadata()->Contains(
335407
milvus_storage::ARROW_FIELD_ID_KEY),
336408
"[StorageV2] translator {} field id not found in metadata "
337409
"for field {}",
338410
key_,
339-
table->schema()->field(i)->name());
340-
auto field_id = std::stoll(table->schema()
341-
->field(i)
411+
schema->field(i)->name());
412+
auto field_id = std::stoll(schema->field(i)
342413
->metadata()
343414
->Get(milvus_storage::ARROW_FIELD_ID_KEY)
344415
->data());
@@ -355,12 +426,22 @@ GroupChunkTranslator::load_group_chunk(
355426
key_,
356427
fid.get());
357428
const auto& field_meta = it->second;
358-
const arrow::ArrayVector& array_vec = table->column(i)->chunks();
429+
430+
// Merge array vectors from all tables for this field
431+
// All tables in a cell come from the same column group with consistent schema
432+
arrow::ArrayVector merged_array_vec;
433+
for (const auto& table : tables) {
434+
const arrow::ArrayVector& array_vec = table->column(i)->chunks();
435+
merged_array_vec.insert(
436+
merged_array_vec.end(), array_vec.begin(), array_vec.end());
437+
}
438+
359439
field_ids.push_back(fid);
360440
field_metas.push_back(field_meta);
361-
array_vecs.push_back(array_vec);
441+
array_vecs.push_back(std::move(merged_array_vec));
362442
}
363443

444+
std::unordered_map<FieldId, std::shared_ptr<Chunk>> chunks;
364445
if (!use_mmap_) {
365446
chunks = create_group_chunk(field_ids, field_metas, array_vecs);
366447
} else {

internal/core/src/segcore/storagev2translator/GroupChunkTranslator.h

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -66,11 +66,10 @@ class GroupChunkTranslator
6666
get_cells(const std::vector<milvus::cachinglayer::cid_t>& cids) override;
6767

6868
std::pair<size_t, size_t>
69-
get_file_and_row_group_index(milvus::cachinglayer::cid_t cid) const;
69+
get_file_and_row_group_offset(milvus::cachinglayer::cid_t cid) const;
7070

7171
milvus::cachinglayer::cid_t
72-
get_cid_from_file_and_row_group_index(size_t file_idx,
73-
size_t row_group_idx) const;
72+
get_global_row_group_idx(size_t file_idx, size_t row_group_idx) const;
7473

7574
milvus::cachinglayer::Meta*
7675
meta() override {
@@ -83,12 +82,8 @@ class GroupChunkTranslator
8382
constexpr int64_t MIN_STORAGE_BYTES = 1 * 1024 * 1024;
8483
int64_t total_size = 0;
8584
for (auto cid : cids) {
86-
auto [file_idx, row_group_idx] = get_file_and_row_group_index(cid);
87-
auto& row_group_meta =
88-
row_group_meta_list_[file_idx].Get(row_group_idx);
8985
total_size +=
90-
std::max(static_cast<int64_t>(row_group_meta.memory_size()),
91-
MIN_STORAGE_BYTES);
86+
std::max(meta_.chunk_memory_size_[cid], MIN_STORAGE_BYTES);
9287
}
9388
return total_size;
9489
}
@@ -104,8 +99,9 @@ class GroupChunkTranslator
10499
}
105100

106101
private:
102+
// Load a single cell which may contain multiple row groups
107103
std::unique_ptr<milvus::GroupChunk>
108-
load_group_chunk(const std::shared_ptr<arrow::Table>& table,
104+
load_group_chunk(const std::vector<std::shared_ptr<arrow::Table>>& tables,
109105
const milvus::cachinglayer::cid_t cid);
110106

111107
int64_t segment_id_;
@@ -127,6 +123,9 @@ class GroupChunkTranslator
127123
milvus::proto::common::LoadPriority::HIGH};
128124
std::vector<std::shared_ptr<parquet::FileMetaData>> parquet_file_metadata_;
129125
std::map<int64_t, milvus_storage::ColumnOffset> field_id_mapping_;
126+
127+
// Row group sizes before cell merging
128+
std::vector<int64_t> row_group_sizes_;
130129
};
131130

132131
} // namespace milvus::segcore::storagev2translator

0 commit comments

Comments
 (0)