@@ -144,19 +144,39 @@ GroupChunkTranslator::GroupChunkTranslator(
144144 file_metas.size ());
145145 }
146146
147- meta_. num_rows_until_chunk_ . reserve (total_row_groups + 1 );
148- meta_. chunk_memory_size_ . reserve (total_row_groups) ;
149-
150- meta_. num_rows_until_chunk_ . push_back ( 0 );
147+ // Collect row group sizes and row counts
148+ std::vector< int64_t > row_group_row_counts ;
149+ row_group_sizes_. reserve (total_row_groups);
150+ row_group_row_counts. reserve (total_row_groups );
151151 for (const auto & row_group_meta : row_group_meta_list_) {
152152 for (int i = 0 ; i < row_group_meta.size (); ++i) {
153- meta_.num_rows_until_chunk_ .push_back (
154- meta_.num_rows_until_chunk_ .back () +
155- row_group_meta.Get (i).row_num ());
156- meta_.chunk_memory_size_ .push_back (
157- row_group_meta.Get (i).memory_size ());
153+ row_group_sizes_.push_back (row_group_meta.Get (i).memory_size ());
154+ row_group_row_counts.push_back (row_group_meta.Get (i).row_num ());
155+ }
156+ }
157+
158+ // Build cell mapping: each cell contains up to kRowGroupsPerCell row groups
159+ meta_.total_row_groups_ = total_row_groups;
160+ size_t num_cells =
161+ (total_row_groups + kRowGroupsPerCell - 1 ) / kRowGroupsPerCell ;
162+
163+ // Merge row groups into group chunks (cache cells)
164+ meta_.num_rows_until_chunk_ .reserve (num_cells + 1 );
165+ meta_.num_rows_until_chunk_ .push_back (0 );
166+ meta_.chunk_memory_size_ .reserve (num_cells);
167+
168+ int64_t cumulative_rows = 0 ;
169+ for (size_t cell_id = 0 ; cell_id < num_cells; ++cell_id) {
170+ auto [start, end] = meta_.get_row_group_range (cell_id);
171+ int64_t cell_size = 0 ;
172+ for (size_t i = start; i < end; ++i) {
173+ cumulative_rows += row_group_row_counts[i];
174+ cell_size += row_group_sizes_[i];
158175 }
176+ meta_.num_rows_until_chunk_ .push_back (cumulative_rows);
177+ meta_.chunk_memory_size_ .push_back (cell_size);
159178 }
179+
160180 AssertInfo (
161181 meta_.num_rows_until_chunk_ .back () == column_group_info_.row_count ,
162182 fmt::format (
@@ -165,6 +185,14 @@ GroupChunkTranslator::GroupChunkTranslator(
165185 column_group_info_.field_id ,
166186 meta_.num_rows_until_chunk_ .back (),
167187 column_group_info_.row_count ));
188+
189+ LOG_INFO (
190+ " [StorageV2] translator {} merged {} row groups into {} cells ({} "
191+ " row groups per cell)" ,
192+ key_,
193+ total_row_groups,
194+ num_cells,
195+ kRowGroupsPerCell );
168196}
169197
170198GroupChunkTranslator::~GroupChunkTranslator () {
@@ -184,10 +212,14 @@ std::pair<milvus::cachinglayer::ResourceUsage,
184212 milvus::cachinglayer::ResourceUsage>
185213GroupChunkTranslator::estimated_byte_size_of_cell (
186214 milvus::cachinglayer::cid_t cid) const {
187- auto [file_idx, row_group_idx] = get_file_and_row_group_index (cid);
188- auto & row_group_meta = row_group_meta_list_[file_idx].Get (row_group_idx);
215+ AssertInfo (cid < meta_.chunk_memory_size_ .size (),
216+ fmt::format (" [StorageV2] translator {} cid {} is out of range. "
217+ " Total cells: {}" ,
218+ key_,
219+ cid,
220+ meta_.chunk_memory_size_ .size ()));
189221
190- auto cell_sz = static_cast < int64_t >(row_group_meta. memory_size ()) ;
222+ auto cell_sz = meta_. chunk_memory_size_ [cid] ;
191223
192224 if (use_mmap_) {
193225 // why double the disk size for loading?
@@ -205,26 +237,29 @@ GroupChunkTranslator::key() const {
205237}
206238
207239std::pair<size_t , size_t >
208- GroupChunkTranslator::get_file_and_row_group_index (
209- milvus::cachinglayer:: cid_t cid ) const {
240+ GroupChunkTranslator::get_file_and_row_group_offset (
241+ size_t global_row_group_idx ) const {
210242 for (size_t file_idx = 0 ; file_idx < file_row_group_prefix_sum_.size () - 1 ;
211243 ++file_idx) {
212- if (cid < file_row_group_prefix_sum_[file_idx + 1 ]) {
213- return {file_idx, cid - file_row_group_prefix_sum_[file_idx]};
244+ if (global_row_group_idx < file_row_group_prefix_sum_[file_idx + 1 ]) {
245+ return {
246+ file_idx,
247+ global_row_group_idx - file_row_group_prefix_sum_[file_idx]};
214248 }
215249 }
216250
217- AssertInfo (false ,
218- fmt::format (" [StorageV2] translator {} cid {} is out of range. "
219- " Total row groups across all files: {}" ,
220- key_,
221- cid,
222- file_row_group_prefix_sum_.back ()));
251+ AssertInfo (
252+ false ,
253+ fmt::format (" [StorageV2] translator {} global_row_group_idx {} is out "
254+ " of range. Total row groups across all files: {}" ,
255+ key_,
256+ global_row_group_idx,
257+ file_row_group_prefix_sum_.back ()));
223258}
224259
225260milvus::cachinglayer::cid_t
226- GroupChunkTranslator::get_cid_from_file_and_row_group_index (
227- size_t file_idx, size_t row_group_idx) const {
261+ GroupChunkTranslator::get_global_row_group_idx ( size_t file_idx,
262+ size_t row_group_idx) const {
228263 AssertInfo (file_idx < file_row_group_prefix_sum_.size () - 1 ,
229264 fmt::format (" [StorageV2] translator {} file_idx {} is out of "
230265 " range. Total files: {}" ,
@@ -254,12 +289,31 @@ GroupChunkTranslator::get_cells(const std::vector<cachinglayer::cid_t>& cids) {
254289 cells;
255290 cells.reserve (cids.size ());
256291
257- // Create row group lists for requested cids
258- std::vector<std::vector<int64_t >> row_group_lists (insert_files_.size ());
292+ std::vector<cachinglayer::cid_t > sorted_cids (cids.begin (), cids.end ());
293+ std::sort (sorted_cids.begin (), sorted_cids.end ());
294+
295+ // Collect all row group indices needed for the requested cells
296+ std::vector<size_t > needed_row_group_indices;
297+ needed_row_group_indices.reserve (kRowGroupsPerCell * sorted_cids.size ());
298+ for (auto cid : sorted_cids) {
299+ AssertInfo (cid < meta_.chunk_memory_size_ .size (),
300+ fmt::format (" [StorageV2] translator {} cid {} is out of "
301+ " range. Total cells: {}" ,
302+ key_,
303+ cid,
304+ meta_.chunk_memory_size_ .size ()));
305+ auto [start, end] = meta_.get_row_group_range (cid);
306+ for (size_t i = start; i < end; ++i) {
307+ needed_row_group_indices.push_back (i);
308+ }
309+ }
259310
260- for (auto cid : cids) {
261- auto [file_idx, row_group_idx] = get_file_and_row_group_index (cid);
262- row_group_lists[file_idx].push_back (row_group_idx);
311+ // Create row group lists for file loading
312+ std::vector<std::vector<int64_t >> row_group_lists (insert_files_.size ());
313+ // No need to sort needed_row_group_indices: it is built from sorted_cids, so it is already in ascending order
314+ for (auto rg_idx : needed_row_group_indices) {
315+ auto [file_idx, row_group_off] = get_file_and_row_group_offset (rg_idx);
316+ row_group_lists[file_idx].push_back (row_group_off);
263317 }
264318
265319 auto parallel_degree =
@@ -288,57 +342,81 @@ GroupChunkTranslator::get_cells(const std::vector<cachinglayer::cid_t>& cids) {
288342 key_,
289343 column_group_info_.field_id );
290344
345+ // Collect loaded tables by row group index
346+ std::unordered_map<size_t , std::shared_ptr<arrow::Table>> row_group_tables;
347+ row_group_tables.reserve (needed_row_group_indices.size ());
348+
291349 std::shared_ptr<milvus::ArrowDataWrapper> r;
292- std::unordered_set<cachinglayer::cid_t > filled_cids;
293- filled_cids.reserve (cids.size ());
294350 while (channel->pop (r)) {
295351 for (const auto & table_info : r->arrow_tables ) {
296- // Convert file_index and row_group_index to global cid
297- auto cid = get_cid_from_file_and_row_group_index (
298- table_info.file_index , table_info.row_group_index );
299- cells.emplace_back (cid, load_group_chunk (table_info.table , cid));
300- filled_cids.insert (cid);
352+ // Convert file_index and row_group_index (an index within the file, not a global index) to a global row group index
353+ auto rg_idx = get_global_row_group_idx (table_info.file_index ,
354+ table_info.row_group_index );
355+ row_group_tables[rg_idx] = table_info.table ;
301356 }
302357 }
303358
304359 // access underlying future to get exception if any
305360 load_future.get ();
306361
307- // Verify all requested cids have been filled
308- for (auto cid : cids) {
309- AssertInfo (filled_cids.find (cid) != filled_cids.end (),
310- " [StorageV2] translator {} cid {} was not filled, missing "
311- " row group id {}" ,
312- key_,
313- cid,
314- cid);
362+ // Build cells from collected tables
363+ std::unordered_set<cachinglayer::cid_t > filled_cids;
364+ filled_cids.reserve (sorted_cids.size ());
365+ for (auto cid : sorted_cids) {
366+ if (filled_cids.count (cid) > 0 ) {
367+ continue ; // Already processed this cell (handles duplicates)
368+ }
369+
370+ auto [start, end] = meta_.get_row_group_range (cid);
371+ std::vector<std::shared_ptr<arrow::Table>> tables;
372+ tables.reserve (end - start);
373+
374+ for (size_t i = start; i < end; ++i) {
375+ auto it = row_group_tables.find (i);
376+ AssertInfo (it != row_group_tables.end (),
377+ fmt::format (" [StorageV2] translator {} row group {} "
378+ " for cell {} was not loaded" ,
379+ key_,
380+ i,
381+ cid));
382+ tables.push_back (it->second );
383+ }
384+
385+ cells.emplace_back (cid, load_group_chunk (tables, cid));
386+ filled_cids.insert (cid);
315387 }
388+
316389 return cells;
317390}
318391
319392std::unique_ptr<milvus::GroupChunk>
320393GroupChunkTranslator::load_group_chunk (
321- const std::shared_ptr<arrow::Table>& table ,
394+ const std::vector<std:: shared_ptr<arrow::Table>>& tables ,
322395 const milvus::cachinglayer::cid_t cid) {
323- AssertInfo (table != nullptr , " arrow table is nullptr" );
324- // Create chunks for each field in this batch
325- std::unordered_map<FieldId, std::shared_ptr<Chunk>> chunks;
326- // Iterate through field_id_list to get field_id and create chunk
396+ AssertInfo (!tables.empty (), " tables vector is empty" );
397+ for (const auto & table : tables) {
398+ AssertInfo (table != nullptr , " arrow table is nullptr" );
399+ }
400+
401+ // Use the first table's schema as reference for field iteration
402+ const auto & schema = tables[0 ]->schema ();
403+
404+ // Collect field info and merge array vectors from all tables
327405 std::vector<FieldId> field_ids;
328406 std::vector<FieldMeta> field_metas;
329407 std::vector<arrow::ArrayVector> array_vecs;
330- field_metas.reserve (table->schema ()->num_fields ());
331- array_vecs.reserve (table->schema ()->num_fields ());
408+ field_ids.reserve (schema->num_fields ());
409+ field_metas.reserve (schema->num_fields ());
410+ array_vecs.reserve (schema->num_fields ());
332411
333- for (int i = 0 ; i < table-> schema () ->num_fields (); ++i) {
334- AssertInfo (table-> schema () ->field (i)->metadata ()->Contains (
412+ for (int i = 0 ; i < schema->num_fields (); ++i) {
413+ AssertInfo (schema->field (i)->metadata ()->Contains (
335414 milvus_storage::ARROW_FIELD_ID_KEY),
336415 " [StorageV2] translator {} field id not found in metadata "
337416 " for field {}" ,
338417 key_,
339- table->schema ()->field (i)->name ());
340- auto field_id = std::stoll (table->schema ()
341- ->field (i)
418+ schema->field (i)->name ());
419+ auto field_id = std::stoll (schema->field (i)
342420 ->metadata ()
343421 ->Get (milvus_storage::ARROW_FIELD_ID_KEY)
344422 ->data ());
@@ -355,12 +433,22 @@ GroupChunkTranslator::load_group_chunk(
355433 key_,
356434 fid.get ());
357435 const auto & field_meta = it->second ;
358- const arrow::ArrayVector& array_vec = table->column (i)->chunks ();
436+
437+ // Merge array vectors from all tables for this field
438+ // All tables in a cell come from the same column group with consistent schema
439+ arrow::ArrayVector merged_array_vec;
440+ for (const auto & table : tables) {
441+ const arrow::ArrayVector& array_vec = table->column (i)->chunks ();
442+ merged_array_vec.insert (
443+ merged_array_vec.end (), array_vec.begin (), array_vec.end ());
444+ }
445+
359446 field_ids.push_back (fid);
360447 field_metas.push_back (field_meta);
361- array_vecs.push_back (array_vec );
448+ array_vecs.push_back (std::move (merged_array_vec) );
362449 }
363450
451+ std::unordered_map<FieldId, std::shared_ptr<Chunk>> chunks;
364452 if (!use_mmap_) {
365453 chunks = create_group_chunk (field_ids, field_metas, array_vecs);
366454 } else {
0 commit comments