Skip to content

Commit 03d7bf5

Browse files
committed
enhance: add translator for zero-copy aggregation of manifest translator to reduce io operations
Signed-off-by: Ted Xu <[email protected]>
1 parent 2ef18c5 commit 03d7bf5

File tree

6 files changed

+875
-34
lines changed

6 files changed

+875
-34
lines changed

internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@
6868
#include "segcore/storagev1translator/InterimSealedIndexTranslator.h"
6969
#include "segcore/storagev1translator/TextMatchIndexTranslator.h"
7070
#include "segcore/storagev2translator/ManifestGroupTranslator.h"
71+
#include "segcore/storagev2translator/AggregateTranslator.h"
7172
#include "storage/Util.h"
7273
#include "storage/ThreadPools.h"
7374
#include "storage/MmapManager.h"
@@ -401,11 +402,11 @@ ChunkedSegmentSealedImpl::LoadColumnGroup(
401402

402403
auto chunk_reader = std::move(chunk_reader_result).ValueOrDie();
403404

404-
LOG_INFO("[StorageV2] segment {} loads manifest cg index {}",
405+
LOG_INFO("[StorageV2] segment {} loads manifest cg index {} with aggregation factor 4",
405406
this->get_segment_id(),
406407
index);
407408

408-
auto translator =
409+
auto manifest_translator =
409410
std::make_unique<storagev2translator::ManifestGroupTranslator>(
410411
get_segment_id(),
411412
index,
@@ -414,6 +415,14 @@ ChunkedSegmentSealedImpl::LoadColumnGroup(
414415
false, // TODO
415416
column_group->columns.size(),
416417
segment_load_info_.priority());
418+
419+
// Wrap with AggregateTranslator for zero-copy aggregation
420+
auto translator =
421+
std::make_unique<storagev2translator::AggregateTranslator>(
422+
std::move(manifest_translator),
423+
4 // Default aggregation factor
424+
);
425+
417426
auto chunked_column_group =
418427
std::make_shared<ChunkedColumnGroup>(std::move(translator));
419428

Lines changed: 196 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,196 @@
1+
// Licensed to the LF AI & Data foundation under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing, software
12+
// distributed under the License is distributed on an "AS IS" BASIS,
13+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
// See the License for the specific language governing permissions and
15+
// limitations under the License.
16+
17+
#include "segcore/storagev2translator/AggregateTranslator.h"
18+
19+
#include <algorithm>
20+
#include <cmath>
21+
#include <memory>
22+
#include <unordered_set>
23+
24+
#include <arrow/api.h>
25+
#include <arrow/array.h>
26+
#include <fmt/format.h>
27+
28+
#include "common/Chunk.h"
29+
#include "common/Exception.h"
30+
#include "common/FieldData.h"
31+
#include "segcore/storagev2translator/ManifestGroupTranslator.h"
32+
33+
namespace milvus::segcore::storagev2translator {
34+
35+
AggregateTranslator::AggregateTranslator(
36+
std::unique_ptr<ManifestGroupTranslator> underlying_translator,
37+
size_t aggregation_factor)
38+
: underlying_translator_(std::move(underlying_translator)),
39+
aggregation_factor_(aggregation_factor) {
40+
AssertInfo(aggregation_factor_ > 1,
41+
"Aggregation factor must be greater than 1, got: {}",
42+
aggregation_factor_);
43+
AssertInfo(underlying_translator_ != nullptr,
44+
"Underlying translator cannot be null");
45+
46+
key_ = fmt::format(
47+
"agg_{}_{}", aggregation_factor_, underlying_translator_->key());
48+
}
49+
50+
size_t
51+
AggregateTranslator::num_cells() const {
52+
size_t underlying_num_cells = underlying_translator_->num_cells();
53+
return (underlying_num_cells + aggregation_factor_ - 1) /
54+
aggregation_factor_;
55+
}
56+
57+
milvus::cachinglayer::cid_t
58+
AggregateTranslator::cell_id_of(milvus::cachinglayer::uid_t uid) const {
59+
// First map through the underlying translator
60+
auto underlying_cid = underlying_translator_->cell_id_of(uid);
61+
// Then aggregate by dividing by the factor
62+
return underlying_cid / aggregation_factor_;
63+
}
64+
65+
std::pair<milvus::cachinglayer::ResourceUsage,
66+
milvus::cachinglayer::ResourceUsage>
67+
AggregateTranslator::estimated_byte_size_of_cell(
68+
milvus::cachinglayer::cid_t cid) const {
69+
auto underlying_cids = get_underlying_cell_ids(cid);
70+
71+
milvus::cachinglayer::ResourceUsage total_loading_usage = {0, 0};
72+
milvus::cachinglayer::ResourceUsage total_loaded_usage = {0, 0};
73+
74+
for (auto underlying_cid : underlying_cids) {
75+
auto [loading_usage, loaded_usage] =
76+
underlying_translator_->estimated_byte_size_of_cell(underlying_cid);
77+
total_loading_usage += loading_usage;
78+
total_loaded_usage += loaded_usage;
79+
}
80+
81+
return {total_loading_usage, total_loaded_usage};
82+
}
83+
84+
const std::string&
85+
AggregateTranslator::key() const {
86+
return key_;
87+
}
88+
89+
std::vector<
90+
std::pair<milvus::cachinglayer::cid_t, std::unique_ptr<milvus::GroupChunk>>>
91+
AggregateTranslator::get_cells(
92+
const std::vector<milvus::cachinglayer::cid_t>& cids) {
93+
// Collect all underlying cell IDs we need to fetch
94+
std::vector<milvus::cachinglayer::cid_t> all_underlying_cids;
95+
std::unordered_map<milvus::cachinglayer::cid_t,
96+
std::vector<milvus::cachinglayer::cid_t>>
97+
aggregate_to_underlying_map;
98+
99+
for (auto aggregate_cid : cids) {
100+
auto underlying_cids = get_underlying_cell_ids(aggregate_cid);
101+
aggregate_to_underlying_map[aggregate_cid] = underlying_cids;
102+
all_underlying_cids.insert(all_underlying_cids.end(),
103+
underlying_cids.begin(),
104+
underlying_cids.end());
105+
}
106+
107+
// Read all RecordBatches in one batch (zero-copy read phase)
108+
auto record_batches =
109+
underlying_translator_->read_cells(all_underlying_cids);
110+
111+
// Create a map for quick lookup of RecordBatches
112+
std::unordered_map<milvus::cachinglayer::cid_t,
113+
std::shared_ptr<arrow::RecordBatch>>
114+
record_batch_map;
115+
for (auto& [cid, batch] : record_batches) {
116+
record_batch_map[cid] = batch;
117+
}
118+
119+
// Build the result by loading aggregate cells from RecordBatches
120+
std::vector<std::pair<milvus::cachinglayer::cid_t,
121+
std::unique_ptr<milvus::GroupChunk>>>
122+
result;
123+
result.reserve(cids.size());
124+
125+
for (auto aggregate_cid : cids) {
126+
const auto& underlying_cids =
127+
aggregate_to_underlying_map[aggregate_cid];
128+
129+
// Collect the RecordBatches for this aggregate cell
130+
std::vector<std::shared_ptr<arrow::RecordBatch>> batches_to_load;
131+
batches_to_load.reserve(underlying_cids.size());
132+
133+
for (auto underlying_cid : underlying_cids) {
134+
auto it = record_batch_map.find(underlying_cid);
135+
if (it != record_batch_map.end() && it->second != nullptr) {
136+
batches_to_load.push_back(it->second);
137+
}
138+
}
139+
140+
if (batches_to_load.empty()) {
141+
ThrowInfo(ErrorCode::UnexpectedError,
142+
"No underlying batches found for aggregate cell {}",
143+
aggregate_cid);
144+
}
145+
146+
// Load the aggregate cell from multiple batches (zero-copy load phase)
147+
// The concatenation happens at Arrow array level inside load_group_chunk
148+
auto merged_chunk = underlying_translator_->load_group_chunk(
149+
batches_to_load, aggregate_cid);
150+
result.emplace_back(aggregate_cid, std::move(merged_chunk));
151+
}
152+
153+
return result;
154+
}
155+
156+
int64_t
157+
AggregateTranslator::cells_storage_bytes(
158+
const std::vector<milvus::cachinglayer::cid_t>& cids) const {
159+
std::vector<milvus::cachinglayer::cid_t> all_underlying_cids;
160+
161+
for (auto aggregate_cid : cids) {
162+
auto underlying_cids = get_underlying_cell_ids(aggregate_cid);
163+
all_underlying_cids.insert(all_underlying_cids.end(),
164+
underlying_cids.begin(),
165+
underlying_cids.end());
166+
}
167+
168+
return underlying_translator_->cells_storage_bytes(all_underlying_cids);
169+
}
170+
171+
std::vector<milvus::cachinglayer::cid_t>
172+
AggregateTranslator::get_underlying_cell_ids(
173+
milvus::cachinglayer::cid_t aggregate_cid) const {
174+
size_t underlying_num_cells = underlying_translator_->num_cells();
175+
size_t start_cid = aggregate_cid * aggregation_factor_;
176+
size_t end_cid =
177+
std::min(start_cid + aggregation_factor_, underlying_num_cells);
178+
179+
AssertInfo(start_cid < underlying_num_cells,
180+
"Aggregate cell ID {} out of range (start_cid={} >= "
181+
"underlying_num_cells={})",
182+
aggregate_cid,
183+
start_cid,
184+
underlying_num_cells);
185+
186+
std::vector<milvus::cachinglayer::cid_t> result;
187+
result.reserve(end_cid - start_cid);
188+
189+
for (size_t i = start_cid; i < end_cid; ++i) {
190+
result.push_back(i);
191+
}
192+
193+
return result;
194+
}
195+
196+
} // namespace milvus::segcore::storagev2translator
Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
// Licensed to the LF AI & Data foundation under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing, software
12+
// distributed under the License is distributed on an "AS IS" BASIS,
13+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
// See the License for the specific language governing permissions and
15+
// limitations under the License.
16+
#pragma once
17+
18+
#include <memory>
19+
#include <string>
20+
#include <vector>
21+
22+
#include "cachinglayer/Translator.h"
23+
#include "common/GroupChunk.h"
24+
#include "segcore/storagev2translator/ManifestGroupTranslator.h"
25+
26+
namespace milvus::segcore::storagev2translator {
27+
28+
/**
29+
* @brief Aggregating proxy translator that merges consecutive cells into coarse-grain cells
30+
*
31+
* This class implements a proxy pattern that wraps a ManifestGroupTranslator and aggregates
32+
* consecutive cells into larger cells using zero-copy optimization. For example, with an
33+
* aggregation factor of 2, every 2 consecutive row groups from the underlying translator
34+
* are merged into a single cell in this translator.
35+
*
36+
* This is useful for:
37+
* - Reducing cache slot overhead by having fewer, larger cells
38+
* - Batching multiple small chunks into larger units for more efficient processing
39+
* - Adjusting granularity of data access patterns
40+
*
41+
* Example:
42+
* If the underlying translator has 10 cells (row groups 0-9) and aggregation_factor=2:
43+
* - Aggregate cell 0 contains underlying cells 0,1
44+
* - Aggregate cell 1 contains underlying cells 2,3
45+
* - Aggregate cell 2 contains underlying cells 4,5
46+
* - Aggregate cell 3 contains underlying cells 6,7
47+
* - Aggregate cell 4 contains underlying cells 8,9
48+
* - Total aggregate cells: ceil(10/2) = 5
49+
*
50+
* Note: This translator specifically requires ManifestGroupTranslator as the underlying
51+
* translator to enable zero-copy aggregation through the read_cells/load_group_chunk pattern.
52+
*/
53+
class AggregateTranslator
54+
: public milvus::cachinglayer::Translator<milvus::GroupChunk> {
55+
public:
56+
/**
57+
* @brief Construct an aggregating translator
58+
*
59+
* @param underlying_translator The ManifestGroupTranslator to wrap and aggregate
60+
* @param aggregation_factor Number of consecutive underlying cells to merge into one aggregate cell.
61+
* Must be > 1. Use the underlying translator directly if no aggregation is needed.
62+
*/
63+
explicit AggregateTranslator(
64+
std::unique_ptr<ManifestGroupTranslator> underlying_translator,
65+
size_t aggregation_factor);
66+
67+
~AggregateTranslator() override = default;
68+
69+
/**
70+
* @brief Get the total number of aggregate cells
71+
*
72+
* Returns ceil(underlying_num_cells / aggregation_factor)
73+
*
74+
* @return Number of aggregate cells
75+
*/
76+
size_t
77+
num_cells() const override;
78+
79+
/**
80+
* @brief Map a unit ID to its corresponding aggregate cell ID
81+
*
82+
* The mapping divides the unit ID by the aggregation factor.
83+
*
84+
* @param uid Unit ID to map
85+
* @return Corresponding aggregate cell ID
86+
*/
87+
milvus::cachinglayer::cid_t
88+
cell_id_of(milvus::cachinglayer::uid_t uid) const override;
89+
90+
/**
91+
* @brief Estimate memory and disk usage for an aggregate cell
92+
*
93+
* Sums the resource usage of all underlying cells that comprise this aggregate cell.
94+
*
95+
* @param cid Aggregate cell ID to estimate
96+
* @return Pair of (memory_usage, disk_usage) for loading and storage
97+
*/
98+
std::pair<milvus::cachinglayer::ResourceUsage,
99+
milvus::cachinglayer::ResourceUsage>
100+
estimated_byte_size_of_cell(milvus::cachinglayer::cid_t cid) const override;
101+
102+
/**
103+
* @brief Get the cache key for this translator
104+
*
105+
* Returns a key that includes the aggregation factor and underlying key
106+
*
107+
* @return Cache key in format "agg_{factor}_{underlying_key}"
108+
*/
109+
const std::string&
110+
key() const override;
111+
112+
/**
113+
* @brief Load specified aggregate cells
114+
*
115+
* For each aggregate cell ID, loads the corresponding underlying cells,
116+
* merges their GroupChunks together, and returns the merged result.
117+
*
118+
* @param cids List of aggregate cell IDs to load
119+
* @return Vector of (aggregate_cell_id, merged_GroupChunk) pairs
120+
*/
121+
std::vector<std::pair<milvus::cachinglayer::cid_t,
122+
std::unique_ptr<milvus::GroupChunk>>>
123+
get_cells(const std::vector<milvus::cachinglayer::cid_t>& cids) override;
124+
125+
/**
126+
* @brief Get the metadata object from the underlying translator
127+
*
128+
* @return Pointer to the underlying translator's metadata
129+
*/
130+
milvus::cachinglayer::Meta*
131+
meta() override {
132+
return underlying_translator_->meta();
133+
}
134+
135+
/**
136+
* @brief Calculate total storage bytes needed for loading aggregate cells
137+
*
138+
* Sums up the storage size for all underlying cells that comprise the requested aggregate cells.
139+
*
140+
* @param cids List of aggregate cell IDs
141+
* @return Total storage bytes required
142+
*/
143+
int64_t
144+
cells_storage_bytes(
145+
const std::vector<milvus::cachinglayer::cid_t>& cids) const override;
146+
147+
private:
148+
/**
149+
* @brief Get the underlying cell IDs that comprise an aggregate cell
150+
*
151+
* For aggregate cell cid, returns the range of underlying cell IDs:
152+
* [cid * aggregation_factor, min((cid + 1) * aggregation_factor, underlying_num_cells))
153+
*
154+
* @param aggregate_cid The aggregate cell ID
155+
* @return Vector of underlying cell IDs
156+
*/
157+
std::vector<milvus::cachinglayer::cid_t>
158+
get_underlying_cell_ids(milvus::cachinglayer::cid_t aggregate_cid) const;
159+
160+
std::unique_ptr<ManifestGroupTranslator> underlying_translator_;
161+
size_t aggregation_factor_;
162+
std::string key_;
163+
};
164+
165+
} // namespace milvus::segcore::storagev2translator

0 commit comments

Comments
 (0)