diff --git a/src/core/detail/listpack_wrap.cc b/src/core/detail/listpack_wrap.cc index 091a091a77cc..7d86c8d73c1d 100644 --- a/src/core/detail/listpack_wrap.cc +++ b/src/core/detail/listpack_wrap.cc @@ -33,16 +33,11 @@ void ListpackWrap::Iterator::Read() { next_ptr_ = lpNext(lp_, next_ptr_); } -ListpackWrap::~ListpackWrap() { - DCHECK(!dirty_); -} - ListpackWrap ListpackWrap::WithCapacity(size_t capacity) { return ListpackWrap{lpNew(capacity)}; } uint8_t* ListpackWrap::GetPointer() { - dirty_ = false; return lp_; } @@ -63,7 +58,6 @@ bool ListpackWrap::Delete(std::string_view key) { return false; lp_ = lpDeleteRangeWithEntry(lp_, &ptr, 2); - dirty_ = true; return true; } @@ -90,7 +84,6 @@ bool ListpackWrap::Insert(std::string_view key, std::string_view value, bool ski lp_ = lpReplace(lp_, &vptr, vsrc, value.size()); DCHECK_EQ(0u, lpLength(lp_) % 2); - dirty_ = true; updated = true; } } @@ -100,7 +93,6 @@ bool ListpackWrap::Insert(std::string_view key, std::string_view value, bool ski // TODO: we should at least allocate once for both elements lp_ = lpAppend(lp_, fsrc, key.size()); lp_ = lpAppend(lp_, vsrc, value.size()); - dirty_ = true; } return !updated; @@ -110,6 +102,10 @@ size_t ListpackWrap::size() const { return lpLength(lp_) / 2; } +size_t ListpackWrap::DataBytes() const { + return lpBytes(lp_); +} + ListpackWrap::Iterator ListpackWrap::begin() const { return Iterator{lp_, lpFirst(lp_), intbuf_}; } diff --git a/src/core/detail/listpack_wrap.h b/src/core/detail/listpack_wrap.h index 06e035061209..80f5e9484b1b 100644 --- a/src/core/detail/listpack_wrap.h +++ b/src/core/detail/listpack_wrap.h @@ -15,8 +15,6 @@ struct ListpackWrap { using IntBuf = uint8_t[2][24]; public: - ~ListpackWrap(); - struct Iterator { using iterator_category = std::forward_iterator_tag; using difference_type = std::ptrdiff_t; @@ -60,13 +58,14 @@ struct ListpackWrap { Iterator end() const; size_t size() const; // number of entries + size_t DataBytes() const; + // Get view from raw listpack iterator static std::string_view GetView(uint8_t* lp_it, uint8_t int_buf[]); private: uint8_t* lp_; // the listpack itself mutable IntBuf intbuf_; // buffer for integers decoded to strings - bool dirty_ = false; // whether lp_ was updated, but never retrieved with GetPointer }; } // namespace dfly::detail diff --git a/src/server/hset_family.cc b/src/server/hset_family.cc index 7dca43b84d51..3d7ef0a7eccd 100644 --- a/src/server/hset_family.cc +++ b/src/server/hset_family.cc @@ -75,6 +75,7 @@ struct HMapWrap { } public: + // Create from non-external prime value HMapWrap(const PrimeValue& pv, DbContext db_cntx) { DCHECK(!pv.IsExternal() || pv.IsCool()); if (pv.Encoding() == kEncodingListPack) @@ -83,6 +84,9 @@ struct HMapWrap { impl_ = GetStringMap(pv, db_cntx); } + explicit HMapWrap(detail::ListpackWrap lw) : impl_{std::move(lw)} { + } + explicit HMapWrap(tiering::SerializedMap* sm) : impl_{sm} { } @@ -134,6 +138,14 @@ struct HMapWrap { VisitMut(ov); } + void Launder(tiering::SerializedMapDecoder* dec) { + Overloaded ov{ + [](StringMap* s) {}, + [&](detail::ListpackWrap& lw) { *dec->Write() = lw; }, + }; + VisitMut(ov); + } + template optional Get() const { if (holds_alternative(impl_)) return get(impl_); @@ -193,7 +205,12 @@ OpResult ExecuteRO(Transaction* tx, F&& f) { using D = tiering::SerializedMapDecoder; util::fb2::Future> fut; auto read_cb = [fut, f = std::move(f)](io::Result res) mutable { - HMapWrap hw{res.value()->Get()}; + // Create wrapper from different types + Overloaded ov{ + [](tiering::SerializedMap* sm) { return HMapWrap{sm}; }, + [](detail::ListpackWrap* lw) { return HMapWrap{*lw}; }, + }; + auto hw = visit(ov, res.value()->Read()); fut.Resolve(f(hw)); }; @@ -216,15 +233,32 @@ OpResult ExecuteRO(Transaction* tx, F&& f) { } // Wrap write handler -template auto WrapW(F&& f) { - using RT = std::invoke_result_t; - return [f = std::forward(f)](Transaction* t, EngineShard* es) -> RT { +template auto ExecuteW(Transaction* tx, F&& f) { + using T = typename std::invoke_result_t::Type; + auto shard_cb = [f = std::forward(f)](Transaction* t, + EngineShard* es) -> OpResult> { + // Fetch value of hash type auto [key, op_args] = KeyAndArgs(t, es); auto it_res = op_args.GetDbSlice().FindMutable(op_args.db_cntx, key, OBJ_HASH); RETURN_ON_BAD_STATUS(it_res); auto& pv = it_res->it->second; + // Enqueue read for future values + if (pv.IsExternal() && !pv.IsCool()) { + using D = tiering::SerializedMapDecoder; + util::fb2::Future> fut; + auto read_cb = [fut, f = std::move(f)](io::Result res) mutable { + // Create wrapper from different types + HMapWrap hw{*res.value()->Write()}; + fut.Resolve(f(hw)); + hw.Launder(*res); + }; + + es->tiered_storage()->Read(op_args.db_cntx.db_index, key, pv, D{}, std::move(read_cb)); + return CbVariant{std::move(fut)}; + } + // Remove document before modification op_args.shard->search_indices()->RemoveDoc(key, op_args.db_cntx, pv); @@ -240,8 +274,11 @@ template auto WrapW(F&& f) { else op_args.shard->search_indices()->AddDoc(key, op_args.db_cntx, pv); - return res; + RETURN_ON_BAD_STATUS(res); + return CbVariant{std::move(res).value()}; }; + + return Unwrap(tx->ScheduleSingleHopT(std::move(shard_cb))); } size_t EstimateListpackMinBytes(CmdArgList members) { @@ -299,6 +336,10 @@ OpStatus OpIncrBy(const OpArgs& op_args, string_view key, string_view field, Inc auto& add_res = *op_res; PrimeValue& pv = add_res.it->second; + + if (pv.IsExternal() && !pv.IsCool()) + return OpStatus::CANCELLED; // Not supported for offloaded values + if (add_res.is_new) { pv.InitRobj(OBJ_HASH, kEncodingListPack, lpNew(0)); } else { @@ -391,14 +432,20 @@ OpResult> OpHMGet(const HMapWrap& hw, CmdArgList fields) { DCHECK(!fields.empty()); std::vector result(fields.size()); - if (auto lw = hw.Get(); lw) { + if (auto sm = hw.Get(); sm) { + for (size_t i = 0; i < fields.size(); ++i) { + if (auto it = (*sm)->Find(fields[i]); it != (*sm)->end()) { + result[i].emplace(it->second, sdslen(it->second)); + } + } + } else { absl::flat_hash_map> reverse; reverse.reserve(fields.size() + 1); for (size_t i = 0; i < fields.size(); ++i) { reverse[ArgS(fields, i)].push_back(i); // map fields to their index. } - for (const auto [key, value] : *lw) { + for (const auto [key, value] : hw.Range()) { if (auto it = reverse.find(key); it != reverse.end()) { for (size_t index : it->second) { DCHECK_LT(index, result.size()); @@ -406,13 +453,6 @@ OpResult> OpHMGet(const HMapWrap& hw, CmdArgList fields) { } } } - } else { - StringMap* sm = *hw.Get(); - for (size_t i = 0; i < fields.size(); ++i) { - if (auto it = sm->Find(fields[i]); it != sm->end()) { - result[i].emplace(it->second, sdslen(it->second)); - } - } } return result; @@ -422,10 +462,12 @@ struct OpSetParams { bool skip_if_exists = false; uint32_t ttl = UINT32_MAX; bool keepttl = false; + + optional>* backpressure = nullptr; }; -OpResult OpSet(const OpArgs& op_args, string_view key, CmdArgList values, - const OpSetParams& op_sp = OpSetParams{}) { +OpResult> OpSet(const OpArgs& op_args, string_view key, CmdArgList values, + const OpSetParams& op_sp = OpSetParams{}) { DCHECK(!values.empty() && 0 == values.size() % 2); VLOG(2) << "OpSet(" << key << ")"; @@ -438,6 +480,27 @@ OpResult OpSet(const OpArgs& op_args, string_view key, CmdArgList valu auto& it = add_res.it; PrimeValue& pv = it->second; + // If the value is external, enqueue read and modify it there + if (pv.IsExternal() && !pv.IsCool()) { + if (op_sp.ttl != UINT32_MAX) + return OpStatus::CANCELLED; // Don't support expiry with offloaded hashes + + using D = tiering::SerializedMapDecoder; + util::fb2::Future> fut; + auto read_cb = [fut, values, op_sp](io::Result res) mutable { + auto& lw = *res.value()->Write(); + uint32_t created = 0; + for (size_t i = 0; i < values.size(); i += 2) { + created += lw.Insert(values[i], values[i + 1], op_sp.skip_if_exists); + } + fut.Resolve(created); + }; + + op_args.shard->tiered_storage()->Read(op_args.db_cntx.db_index, key, pv, D{}, + std::move(read_cb)); + return CbVariant{std::move(fut)}; + } + if (add_res.is_new) { if (op_sp.ttl == UINT32_MAX) { lp = lpNew(0); @@ -492,10 +555,13 @@ OpResult OpSet(const OpArgs& op_args, string_view key, CmdArgList valu op_args.shard->search_indices()->AddDoc(key, op_args.db_cntx, pv); - if (auto* ts = op_args.shard->tiered_storage(); ts) - ts->TryStash(op_args.db_cntx.db_index, key, &pv); + if (auto* ts = op_args.shard->tiered_storage(); ts) { + auto bp = ts->TryStash(op_args.db_cntx.db_index, key, &pv, true); + if (bp && op_sp.backpressure) + *op_sp.backpressure = std::move(*bp); + } - return created; + return CbVariant{created}; } void HGetGeneric(CmdArgList args, uint8_t getall_mask, Transaction* tx, SinkReplyBuilder* builder) { @@ -587,7 +653,8 @@ void HSetEx(CmdArgList args, const CommandContext& cmd_cntx) { return OpSet(t->GetOpArgs(shard), key, fields, op_sp); }; - OpResult result = cmd_cntx.tx->ScheduleSingleHopT(std::move(cb)); + auto delayed_result = cmd_cntx.tx->ScheduleSingleHopT(std::move(cb)); + OpResult result = Unwrap(std::move(delayed_result)); if (result) { cmd_cntx.rb->SendLong(*result); } else { @@ -618,7 +685,7 @@ void HSetFamily::HDel(CmdArgList args, const CommandContext& cmd_cntx) { deleted += hw.Erase(s); return deleted; }; - HSetReplies{cmd_cntx.rb}.Send(cmd_cntx.tx->ScheduleSingleHopT(WrapW(cb))); + HSetReplies{cmd_cntx.rb}.Send(ExecuteW(cmd_cntx.tx, std::move(cb))); } void HSetFamily::HExpire(CmdArgList args, const CommandContext& cmd_cntx) { @@ -856,12 +923,19 @@ void HSetFamily::HSet(CmdArgList args, const CommandContext& cmd_cntx) { return cmd_cntx.rb->SendError(facade::WrongNumArgsError(cmd), kSyntaxErrType); } + optional> tiered_backpressure; + OpSetParams params{.backpressure = &tiered_backpressure}; + args.remove_prefix(1); auto cb = [&](Transaction* t, EngineShard* shard) { - return OpSet(t->GetOpArgs(shard), key, args); + return OpSet(t->GetOpArgs(shard), key, args, params); }; - OpResult result = cmd_cntx.tx->ScheduleSingleHopT(std::move(cb)); + auto delayed_result = cmd_cntx.tx->ScheduleSingleHopT(std::move(cb)); + OpResult result = Unwrap(std::move(delayed_result)); + + if (tiered_backpressure) + tiered_backpressure->GetFor(10ms); if (result && cmd == "HSET") { cmd_cntx.rb->SendLong(*result); @@ -876,7 +950,7 @@ void HSetFamily::HSetNx(CmdArgList args, const CommandContext& cmd_cntx) { auto cb = [&](Transaction* t, EngineShard* shard) { return OpSet(t->GetOpArgs(shard), key, args.subspan(1), OpSetParams{.skip_if_exists = true}); }; - HSetReplies{cmd_cntx.rb}.Send(cmd_cntx.tx->ScheduleSingleHopT(cb)); + HSetReplies{cmd_cntx.rb}.Send(Unwrap(cmd_cntx.tx->ScheduleSingleHopT(cb))); } void StrVecEmplaceBack(StringVec& str_vec, const listpackEntry& lp) { diff --git a/src/server/tiered_storage.cc b/src/server/tiered_storage.cc index e24bc99eb1c4..9fc8f4b53b55 100644 --- a/src/server/tiered_storage.cc +++ b/src/server/tiered_storage.cc @@ -196,7 +196,6 @@ class TieredStorage::ShardOpManager : public tiering::OpManager { // Set value to be an in-memory type again. Update memory stats. void Upload(DbIndex dbid, string_view value, PrimeValue* pv) { DCHECK(!value.empty()); - switch (pv->GetExternalRep()) { case CompactObj::ExternalRep::STRING: pv->Materialize(value, true); diff --git a/src/server/tiered_storage_test.cc b/src/server/tiered_storage_test.cc index 4d4c215df5e7..2cb03e305965 100644 --- a/src/server/tiered_storage_test.cc +++ b/src/server/tiered_storage_test.cc @@ -493,8 +493,7 @@ TEST_F(PureDiskTSTest, Dump) { TEST_P(LatentCoolingTSTest, SimpleHash) { absl::FlagSaver saver; absl::SetFlag(&FLAGS_tiered_experimental_hash_support, true); - // For now, never upload as its not implemented yet - absl::SetFlag(&FLAGS_tiered_upload_threshold, 0.0); + absl::SetFlag(&FLAGS_tiered_upload_threshold, 0.0); // never upload UpdateFromFlags(); const size_t kNUM = 100; @@ -516,9 +515,9 @@ TEST_P(LatentCoolingTSTest, SimpleHash) { // Wait for all to be stashed or in end up in bins ExpectConditionWithinTimeout([=] { auto metrics = GetMetrics(); - return metrics.tiered_stats.total_stashes + - metrics.tiered_stats.small_bins_filling_entries_cnt == - kNUM; + size_t sum = + metrics.tiered_stats.total_stashes + metrics.tiered_stats.small_bins_filling_entries_cnt; + return sum == kNUM; }); // Verify correctness @@ -530,6 +529,37 @@ TEST_P(LatentCoolingTSTest, SimpleHash) { auto v = string{31, 'x'} + 'f'; EXPECT_EQ(resp, v); } + + // Start offloading + SetFlag(&FLAGS_tiered_offload_threshold, 1.0); + UpdateFromFlags(); + auto wait_offloaded = [=] { + auto metrics = GetMetrics(); + size_t sum = + metrics.db_stats[0].tiered_entries + metrics.tiered_stats.small_bins_filling_entries_cnt; + return sum == kNUM; + }; + + // Wait for all offloads again + ExpectConditionWithinTimeout(wait_offloaded); + + // HDEL + for (size_t i = 0; i < kNUM; i++) { + string key = absl::StrCat("k", i); + EXPECT_THAT(Run({"HDEL", key, string{1, 'c'}}), IntArg(1)); + EXPECT_THAT(Run({"HLEN", key}), IntArg(25)); + } + + // Wait for all offloads again + ExpectConditionWithinTimeout(wait_offloaded); + + // HSET new field + for (size_t i = 0; i < kNUM; i++) { + string key = absl::StrCat("k", i); + EXPECT_THAT(Run({"HSET", key, string{1, 'c'}, "Some new value"}), IntArg(1)); + EXPECT_THAT(Run({"HLEN", key}), IntArg(26)); + EXPECT_EQ(Run({"HGET", key, string{1, 'c'}}), "Some new value"); + } } } // namespace dfly diff --git a/src/server/tiering/decoders.cc b/src/server/tiering/decoders.cc index 7268cca207c5..920184f8f4f6 100644 --- a/src/server/tiering/decoders.cc +++ b/src/server/tiering/decoders.cc @@ -4,8 +4,9 @@ #include "server/tiering/decoders.h" -#include "base/logging.h" +#include "core/compact_object.h" #include "core/detail/listpack_wrap.h" +#include "core/overloaded.h" #include "server/tiering/serialized_map.h" extern "C" { @@ -78,19 +79,44 @@ void SerializedMapDecoder::Initialize(std::string_view slice) { } Decoder::UploadMetrics SerializedMapDecoder::GetMetrics() const { - return UploadMetrics{.modified = false, - .estimated_mem_usage = map_->DataBytes() + map_->size() * 2 * 8}; + Overloaded ov{ + [](const SerializedMap& sm) { return sm.DataBytes() + sm.size() * 8; }, + [](const detail::ListpackWrap& lw) { return lw.DataBytes(); }, + }; + size_t bytes = visit(Overloaded{ov, [&](const auto& ptr) { return ov(*ptr); }}, map_); + return UploadMetrics{.modified = modified_, .estimated_mem_usage = bytes}; } void SerializedMapDecoder::Upload(CompactObj* obj) { + if (std::holds_alternative>(map_)) + MakeOwned(); + + obj->InitRobj(OBJ_HASH, kEncodingListPack, Write()->GetPointer()); +} + +std::variant SerializedMapDecoder::Read() const { + using RT = std::variant; + return std::visit([](auto& ptr) -> RT { return ptr.get(); }, map_); +} + +detail::ListpackWrap* SerializedMapDecoder::Write() { + if (std::holds_alternative>(map_)) + return std::get>(map_).get(); + + // Convert SerializedMap to listpack + MakeOwned(); + modified_ = true; + return Write(); +} + +void SerializedMapDecoder::MakeOwned() { + auto& map = std::get>(map_); + auto lw = detail::ListpackWrap::WithCapacity(GetMetrics().estimated_mem_usage); - for (const auto& [key, value] : *map_) + for (const auto& [key, value] : *map) lw.Insert(key, value, true); - obj->InitRobj(OBJ_HASH, kEncodingListPack, lw.GetPointer()); -} -SerializedMap* SerializedMapDecoder::Get() const { - return map_.get(); + map_ = std::make_unique(lw); } } // namespace dfly::tiering diff --git a/src/server/tiering/decoders.h b/src/server/tiering/decoders.h index 66149f93aade..d42c7d46a047 100644 --- a/src/server/tiering/decoders.h +++ b/src/server/tiering/decoders.h @@ -12,6 +12,10 @@ #include "core/compact_object.h" #include "core/string_or_view.h" +namespace dfly::detail { +struct ListpackWrap; +} + namespace dfly::tiering { struct SerializedMap; @@ -77,10 +81,14 @@ struct SerializedMapDecoder : public Decoder { UploadMetrics GetMetrics() const override; void Upload(CompactObj* obj) override; - SerializedMap* Get() const; + std::variant Read() const; + dfly::detail::ListpackWrap* Write(); private: - std::unique_ptr map_; + void MakeOwned(); // Convert to listpack + + bool modified_; + std::variant, std::unique_ptr> map_; }; } // namespace dfly::tiering