Skip to content

Commit 4e183f9

Browse files
committed
more than POc
Signed-off-by: Vladislav Oleshko <[email protected]>
1 parent 3f683e2 commit 4e183f9

File tree

7 files changed

+117
-42
lines changed

7 files changed

+117
-42
lines changed

src/core/compact_object.cc

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -882,13 +882,10 @@ CompactObjType CompactObj::ObjType() const {
882882
return OBJ_STRING;
883883

884884
if (taglen_ == EXTERNAL_TAG) {
885-
VLOG(0) << "My type is external";
886885
switch (static_cast<ExternalRep>(u_.ext_ptr.representation)) {
887886
case ExternalRep::STRING:
888-
VLOG(0) << "My type is a string";
889887
return OBJ_STRING;
890888
case ExternalRep::SERIALIZED_MAP:
891-
VLOG(0) << "Mype is a hash map";
892889
return OBJ_HASH;
893890
};
894891
}

src/core/detail/listpack_wrap.cc

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,16 +33,15 @@ void ListpackWrap::Iterator::Read() {
3333
next_ptr_ = lpNext(lp_, next_ptr_);
3434
}
3535

36-
ListpackWrap::~ListpackWrap() {
37-
DCHECK(!dirty_);
36+
ListpackWrap ListpackWrap::WithCapacity(size_t capacity) {
37+
return ListpackWrap{lpNew(capacity)};
3838
}
3939

4040
ListpackWrap ListpackWrap::WithCapacity(size_t capacity) {
4141
return ListpackWrap{lpNew(capacity)};
4242
}
4343

4444
uint8_t* ListpackWrap::GetPointer() {
45-
dirty_ = false;
4645
return lp_;
4746
}
4847

@@ -63,7 +62,6 @@ bool ListpackWrap::Delete(std::string_view key) {
6362
return false;
6463

6564
lp_ = lpDeleteRangeWithEntry(lp_, &ptr, 2);
66-
dirty_ = true;
6765
return true;
6866
}
6967

@@ -90,7 +88,6 @@ bool ListpackWrap::Insert(std::string_view key, std::string_view value, bool ski
9088
lp_ = lpReplace(lp_, &vptr, vsrc, value.size());
9189
DCHECK_EQ(0u, lpLength(lp_) % 2);
9290

93-
dirty_ = true;
9491
updated = true;
9592
}
9693
}
@@ -100,7 +97,6 @@ bool ListpackWrap::Insert(std::string_view key, std::string_view value, bool ski
10097
// TODO: we should at least allocate once for both elements
10198
lp_ = lpAppend(lp_, fsrc, key.size());
10299
lp_ = lpAppend(lp_, vsrc, value.size());
103-
dirty_ = true;
104100
}
105101

106102
return !updated;

src/core/detail/listpack_wrap.h

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,6 @@ struct ListpackWrap {
1515
using IntBuf = uint8_t[2][24];
1616

1717
public:
18-
~ListpackWrap();
19-
2018
struct Iterator {
2119
using iterator_category = std::forward_iterator_tag;
2220
using difference_type = std::ptrdiff_t;
@@ -66,7 +64,6 @@ struct ListpackWrap {
6664
private:
6765
uint8_t* lp_; // the listpack itself
6866
mutable IntBuf intbuf_; // buffer for integers decoded to strings
69-
bool dirty_ = false; // whether lp_ was updated, but never retrieved with GetPointer
7067
};
7168

7269
} // namespace dfly::detail

src/server/hset_family.cc

Lines changed: 73 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ struct HMapWrap {
7575
}
7676

7777
public:
78+
// Create from non-external prime value
7879
HMapWrap(const PrimeValue& pv, DbContext db_cntx) {
7980
DCHECK(!pv.IsExternal() || pv.IsCool());
8081
if (pv.Encoding() == kEncodingListPack)
@@ -83,6 +84,9 @@ struct HMapWrap {
8384
impl_ = GetStringMap(pv, db_cntx);
8485
}
8586

87+
explicit HMapWrap(detail::ListpackWrap lw) : impl_{std::move(lw)} {
88+
}
89+
8690
explicit HMapWrap(tiering::SerializedMap* sm) : impl_{sm} {
8791
}
8892

@@ -193,7 +197,12 @@ OpResult<T> ExecuteRO(Transaction* tx, F&& f) {
193197
using D = tiering::SerializedMapDecoder;
194198
util::fb2::Future<OpResult<T>> fut;
195199
auto read_cb = [fut, f = std::move(f)](io::Result<D*> res) mutable {
196-
HMapWrap hw{res.value()->Get()};
200+
// Create wrapper from different types
201+
Overloaded ov{
202+
[](tiering::SerializedMap* sm) { return HMapWrap{sm}; },
203+
[](detail::ListpackWrap* lw) { return HMapWrap{*lw}; },
204+
};
205+
auto hw = visit(ov, res.value()->Read());
197206
fut.Resolve(f(hw));
198207
};
199208

@@ -216,15 +225,34 @@ OpResult<T> ExecuteRO(Transaction* tx, F&& f) {
216225
}
217226

218227
// Wrap write handler
219-
template <typename F> auto WrapW(F&& f) {
220-
using RT = std::invoke_result_t<F, HMapWrap&>;
221-
return [f = std::forward<F>(f)](Transaction* t, EngineShard* es) -> RT {
228+
template <typename F> auto ExecuteW(Transaction* tx, F&& f) {
229+
using T = typename std::invoke_result_t<F, HMapWrap&>::Type;
230+
auto shard_cb = [f = std::forward<F>(f)](Transaction* t,
231+
EngineShard* es) -> OpResult<CbVariant<T>> {
232+
// Fetch value of hash type
222233
auto [key, op_args] = KeyAndArgs(t, es);
223234

224235
auto it_res = op_args.GetDbSlice().FindMutable(op_args.db_cntx, key, OBJ_HASH);
225236
RETURN_ON_BAD_STATUS(it_res);
226237
auto& pv = it_res->it->second;
227238

239+
// Enqueue read for future values
240+
if (pv.IsExternal() && !pv.IsCool()) {
241+
using D = tiering::SerializedMapDecoder;
242+
util::fb2::Future<OpResult<T>> fut;
243+
auto read_cb = [fut, f = std::move(f)](io::Result<D*> res) mutable {
244+
// Create wrapper from different types
245+
HMapWrap hw{*res.value()->Write()};
246+
fut.Resolve(f(hw));
247+
248+
// soak listpack wrapper back to get updated value
249+
*res.value()->Write() = *hw.Get<detail::ListpackWrap>();
250+
};
251+
252+
es->tiered_storage()->Read(op_args.db_cntx.db_index, key, pv, D{}, std::move(read_cb));
253+
return CbVariant<T>{std::move(fut)};
254+
}
255+
228256
// Remove document before modification
229257
op_args.shard->search_indices()->RemoveDoc(key, op_args.db_cntx, pv);
230258

@@ -240,8 +268,11 @@ template <typename F> auto WrapW(F&& f) {
240268
else
241269
op_args.shard->search_indices()->AddDoc(key, op_args.db_cntx, pv);
242270

243-
return res;
271+
RETURN_ON_BAD_STATUS(res);
272+
return CbVariant<T>{std::move(res).value()};
244273
};
274+
275+
return Unwrap(tx->ScheduleSingleHopT(std::move(shard_cb)));
245276
}
246277

247278
size_t EstimateListpackMinBytes(CmdArgList members) {
@@ -391,28 +422,27 @@ OpResult<vector<OptStr>> OpHMGet(const HMapWrap& hw, CmdArgList fields) {
391422
DCHECK(!fields.empty());
392423

393424
std::vector<OptStr> result(fields.size());
394-
if (auto lw = hw.Get<detail::ListpackWrap>(); lw) {
425+
if (auto sm = hw.Get<StringMap*>(); sm) {
426+
for (size_t i = 0; i < fields.size(); ++i) {
427+
if (auto it = (*sm)->Find(fields[i]); it != (*sm)->end()) {
428+
result[i].emplace(it->second, sdslen(it->second));
429+
}
430+
}
431+
} else {
395432
absl::flat_hash_map<string_view, absl::InlinedVector<size_t, 3>> reverse;
396433
reverse.reserve(fields.size() + 1);
397434
for (size_t i = 0; i < fields.size(); ++i) {
398435
reverse[ArgS(fields, i)].push_back(i); // map fields to their index.
399436
}
400437

401-
for (const auto [key, value] : *lw) {
438+
for (const auto [key, value] : hw.Range()) {
402439
if (auto it = reverse.find(key); it != reverse.end()) {
403440
for (size_t index : it->second) {
404441
DCHECK_LT(index, result.size());
405442
result[index].emplace(value);
406443
}
407444
}
408445
}
409-
} else {
410-
StringMap* sm = *hw.Get<StringMap*>();
411-
for (size_t i = 0; i < fields.size(); ++i) {
412-
if (auto it = sm->Find(fields[i]); it != sm->end()) {
413-
result[i].emplace(it->second, sdslen(it->second));
414-
}
415-
}
416446
}
417447

418448
return result;
@@ -424,8 +454,8 @@ struct OpSetParams {
424454
bool keepttl = false;
425455
};
426456

427-
OpResult<uint32_t> OpSet(const OpArgs& op_args, string_view key, CmdArgList values,
428-
const OpSetParams& op_sp = OpSetParams{}) {
457+
OpResult<CbVariant<uint32_t>> OpSet(const OpArgs& op_args, string_view key, CmdArgList values,
458+
const OpSetParams& op_sp = OpSetParams{}) {
429459
DCHECK(!values.empty() && 0 == values.size() % 2);
430460
VLOG(2) << "OpSet(" << key << ")";
431461

@@ -438,6 +468,26 @@ OpResult<uint32_t> OpSet(const OpArgs& op_args, string_view key, CmdArgList valu
438468
auto& it = add_res.it;
439469
PrimeValue& pv = it->second;
440470

471+
// If the value is external, enqueue read and modify it there
472+
if (pv.IsExternal() && !pv.IsCool()) {
473+
CHECK(op_sp.ttl == UINT32_MAX); // TODO: remove
474+
using D = tiering::SerializedMapDecoder;
475+
util::fb2::Future<OpResult<uint32_t>> fut;
476+
auto read_cb = [fut, values, &op_sp](io::Result<D*> res) mutable {
477+
// Create wrapper from different types
478+
auto& lw = *res.value()->Write();
479+
uint32_t created = 0;
480+
for (size_t i = 0; i < values.size(); i += 2) {
481+
created += lw.Insert(values[i], values[i + 1], op_sp.skip_if_exists);
482+
}
483+
fut.Resolve(created);
484+
};
485+
486+
op_args.shard->tiered_storage()->Read(op_args.db_cntx.db_index, key, pv, D{},
487+
std::move(read_cb));
488+
return CbVariant<uint32_t>{std::move(fut)};
489+
}
490+
441491
if (add_res.is_new) {
442492
if (op_sp.ttl == UINT32_MAX) {
443493
lp = lpNew(0);
@@ -495,7 +545,7 @@ OpResult<uint32_t> OpSet(const OpArgs& op_args, string_view key, CmdArgList valu
495545
if (auto* ts = op_args.shard->tiered_storage(); ts)
496546
ts->TryStash(op_args.db_cntx.db_index, key, &pv);
497547

498-
return created;
548+
return CbVariant<uint32_t>{created};
499549
}
500550

501551
void HGetGeneric(CmdArgList args, uint8_t getall_mask, Transaction* tx, SinkReplyBuilder* builder) {
@@ -587,7 +637,8 @@ void HSetEx(CmdArgList args, const CommandContext& cmd_cntx) {
587637
return OpSet(t->GetOpArgs(shard), key, fields, op_sp);
588638
};
589639

590-
OpResult<uint32_t> result = cmd_cntx.tx->ScheduleSingleHopT(std::move(cb));
640+
auto delayed_result = cmd_cntx.tx->ScheduleSingleHopT(std::move(cb));
641+
OpResult<uint32_t> result = Unwrap(std::move(delayed_result));
591642
if (result) {
592643
cmd_cntx.rb->SendLong(*result);
593644
} else {
@@ -618,7 +669,7 @@ void HSetFamily::HDel(CmdArgList args, const CommandContext& cmd_cntx) {
618669
deleted += hw.Erase(s);
619670
return deleted;
620671
};
621-
HSetReplies{cmd_cntx.rb}.Send(cmd_cntx.tx->ScheduleSingleHopT(WrapW(cb)));
672+
HSetReplies{cmd_cntx.rb}.Send(ExecuteW(cmd_cntx.tx, std::move(cb)));
622673
}
623674

624675
void HSetFamily::HExpire(CmdArgList args, const CommandContext& cmd_cntx) {
@@ -861,7 +912,8 @@ void HSetFamily::HSet(CmdArgList args, const CommandContext& cmd_cntx) {
861912
return OpSet(t->GetOpArgs(shard), key, args);
862913
};
863914

864-
OpResult<uint32_t> result = cmd_cntx.tx->ScheduleSingleHopT(std::move(cb));
915+
auto delayed_result = cmd_cntx.tx->ScheduleSingleHopT(std::move(cb));
916+
OpResult<uint32_t> result = Unwrap(std::move(delayed_result));
865917

866918
if (result && cmd == "HSET") {
867919
cmd_cntx.rb->SendLong(*result);
@@ -876,7 +928,7 @@ void HSetFamily::HSetNx(CmdArgList args, const CommandContext& cmd_cntx) {
876928
auto cb = [&](Transaction* t, EngineShard* shard) {
877929
return OpSet(t->GetOpArgs(shard), key, args.subspan(1), OpSetParams{.skip_if_exists = true});
878930
};
879-
HSetReplies{cmd_cntx.rb}.Send(cmd_cntx.tx->ScheduleSingleHopT(cb));
931+
HSetReplies{cmd_cntx.rb}.Send(Unwrap(cmd_cntx.tx->ScheduleSingleHopT(cb)));
880932
}
881933

882934
void StrVecEmplaceBack(StringVec& str_vec, const listpackEntry& lp) {

src/server/tiered_storage.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,6 @@ class TieredStorage::ShardOpManager : public tiering::OpManager {
196196
// Set value to be an in-memory type again. Update memory stats.
197197
void Upload(DbIndex dbid, string_view value, PrimeValue* pv) {
198198
DCHECK(!value.empty());
199-
200199
switch (pv->GetExternalRep()) {
201200
case CompactObj::ExternalRep::STRING:
202201
pv->Materialize(value, true);
@@ -322,6 +321,7 @@ bool TieredStorage::ShardOpManager::NotifyFetched(EntryId id, tiering::DiskSegme
322321
auto key = get<OpManager::KeyRef>(id);
323322
auto* pv = Find(key);
324323
if (pv && pv->IsExternal() && segment == pv->GetExternalSlice()) {
324+
VLOG(0) << "Touched? " << pv->WasTouched();
325325
if (metrics.modified || pv->WasTouched()) {
326326
++stats_.total_uploads;
327327
decoder->Upload(pv);

src/server/tiering/decoders.cc

Lines changed: 31 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,11 @@
44

55
#include "server/tiering/decoders.h"
66

7+
<<<<<<< HEAD
78
#include "base/logging.h"
9+
=======
10+
#include "core/compact_object.h"
11+
>>>>>>> fbcc8d59 (more than POc)
812
#include "core/detail/listpack_wrap.h"
913
#include "server/tiering/serialized_map.h"
1014

@@ -78,19 +82,40 @@ void SerializedMapDecoder::Initialize(std::string_view slice) {
7882
}
7983

8084
Decoder::UploadMetrics SerializedMapDecoder::GetMetrics() const {
81-
return UploadMetrics{.modified = false,
85+
return UploadMetrics{.modified = modified_,
8286
.estimated_mem_usage = map_->DataBytes() + map_->size() * 2 * 8};
8387
}
8488

8589
void SerializedMapDecoder::Upload(CompactObj* obj) {
90+
if (std::holds_alternative<std::unique_ptr<SerializedMap>>(map_))
91+
MakeOwned();
92+
93+
obj->InitRobj(OBJ_HASH, kEncodingListPack, Write()->GetPointer());
94+
}
95+
96+
std::variant<SerializedMap*, detail::ListpackWrap*> SerializedMapDecoder::Read() const {
97+
using RT = std::variant<SerializedMap*, detail::ListpackWrap*>;
98+
return std::visit([](auto& ptr) -> RT { return ptr.get(); }, map_);
99+
}
100+
101+
detail::ListpackWrap* SerializedMapDecoder::Write() {
102+
if (std::holds_alternative<std::unique_ptr<detail::ListpackWrap>>(map_))
103+
return std::get<std::unique_ptr<detail::ListpackWrap>>(map_).get();
104+
105+
// Convert SerializedMap to listpack
106+
MakeOwned();
107+
modified_ = true;
108+
return Write();
109+
}
110+
111+
void SerializedMapDecoder::MakeOwned() {
112+
auto& map = std::get<std::unique_ptr<SerializedMap>>(map_);
113+
86114
auto lw = detail::ListpackWrap::WithCapacity(GetMetrics().estimated_mem_usage);
87-
for (const auto& [key, value] : *map_)
115+
for (const auto& [key, value] : *map)
88116
lw.Insert(key, value, true);
89-
obj->InitRobj(OBJ_HASH, kEncodingListPack, lw.GetPointer());
90-
}
91117

92-
SerializedMap* SerializedMapDecoder::Get() const {
93-
return map_.get();
118+
map_ = std::make_unique<detail::ListpackWrap>(lw);
94119
}
95120

96121
} // namespace dfly::tiering

src/server/tiering/decoders.h

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,10 @@
1212
#include "core/compact_object.h"
1313
#include "core/string_or_view.h"
1414

15+
namespace dfly::detail {
16+
struct ListpackWrap;
17+
}
18+
1519
namespace dfly::tiering {
1620

1721
struct SerializedMap;
@@ -77,10 +81,14 @@ struct SerializedMapDecoder : public Decoder {
7781
UploadMetrics GetMetrics() const override;
7882
void Upload(CompactObj* obj) override;
7983

80-
SerializedMap* Get() const;
84+
std::variant<SerializedMap*, dfly::detail::ListpackWrap*> Read() const;
85+
dfly::detail::ListpackWrap* Write();
8186

8287
private:
83-
std::unique_ptr<SerializedMap> map_;
88+
void MakeOwned(); // Convert to listpack
89+
90+
bool modified_;
91+
std::variant<std::unique_ptr<SerializedMap>, std::unique_ptr<dfly::detail::ListpackWrap>> map_;
8492
};
8593

8694
} // namespace dfly::tiering

0 commit comments

Comments
 (0)