diff --git a/src/core/compact_object.cc b/src/core/compact_object.cc index 6859991cc68d..585e4e33e859 100644 --- a/src/core/compact_object.cc +++ b/src/core/compact_object.cc @@ -598,91 +598,6 @@ bool RobjWrapper::DefragIfNeeded(PageUsage* page_usage) { return false; } -int RobjWrapper::ZsetAdd(double score, std::string_view ele, int in_flags, int* out_flags, - double* newscore) { - *out_flags = 0; /* We'll return our response flags. */ - double curscore; - - /* NaN as input is an error regardless of all the other parameters. */ - if (isnan(score)) { - *out_flags = ZADD_OUT_NAN; - return 0; - } - - /* Update the sorted set according to its encoding. */ - if (encoding_ == OBJ_ENCODING_LISTPACK) { - /* Turn options into simple to check vars. */ - bool incr = (in_flags & ZADD_IN_INCR) != 0; - bool nx = (in_flags & ZADD_IN_NX) != 0; - bool xx = (in_flags & ZADD_IN_XX) != 0; - bool gt = (in_flags & ZADD_IN_GT) != 0; - bool lt = (in_flags & ZADD_IN_LT) != 0; - - uint8_t* lp = (uint8_t*)inner_obj_; - uint8_t* eptr = ZzlFind(lp, ele, &curscore); - if (eptr != NULL) { - /* NX? Return, same element already exists. */ - if (nx) { - *out_flags |= ZADD_OUT_NOP; - return 1; - } - - /* Prepare the score for the increment if needed. */ - if (incr) { - score += curscore; - if (isnan(score)) { - *out_flags |= ZADD_OUT_NAN; - return 0; - } - } - - /* GT/LT? Only update if score is greater/less than current. */ - if ((lt && score >= curscore) || (gt && score <= curscore)) { - *out_flags |= ZADD_OUT_NOP; - return 1; - } - - if (newscore) - *newscore = score; - - /* Remove and re-insert when score changed. */ - if (score != curscore) { - lp = lpDeleteRangeWithEntry(lp, &eptr, 2); - lp = detail::ZzlInsert(lp, ele, score); - inner_obj_ = lp; - *out_flags |= ZADD_OUT_UPDATED; - } - - return 1; - } else if (!xx) { - unsigned zl_len = lpLength(lp) / 2; - - /* check if the element is too large or the list - * becomes too long *before* executing zzlInsert. */ - if (zl_len >= server.zset_max_listpack_entries || - ele.size() > server.zset_max_listpack_value) { - inner_obj_ = SortedMap::FromListPack(tl.local_mr, lp); - lpFree(lp); - encoding_ = OBJ_ENCODING_SKIPLIST; - } else { - lp = detail::ZzlInsert(lp, ele, score); - inner_obj_ = lp; - if (newscore) - *newscore = score; - *out_flags |= ZADD_OUT_ADDED; - return 1; - } - } else { - *out_flags |= ZADD_OUT_NOP; - return 1; - } - } - - CHECK_EQ(encoding_, OBJ_ENCODING_SKIPLIST); - SortedMap* ss = (SortedMap*)inner_obj_; - return ss->AddElem(score, ele, in_flags, out_flags, newscore); -} - void RobjWrapper::ReallocateString(MemoryResource* mr) { DCHECK_EQ(type(), OBJ_STRING); void* old_ptr = inner_obj_; diff --git a/src/core/compact_object.h b/src/core/compact_object.h index 50f1797d9c93..be9e22d34c25 100644 --- a/src/core/compact_object.h +++ b/src/core/compact_object.h @@ -75,9 +75,6 @@ class RobjWrapper { // Returns true if re-allocated. bool DefragIfNeeded(PageUsage* page_usage); - // as defined in zset.h - int ZsetAdd(double score, std::string_view ele, int in_flags, int* out_flags, double* newscore); - private: void ReallocateString(MemoryResource* mr); @@ -298,15 +295,6 @@ class CompactObj { void SetInt(int64_t val); std::optional TryGetInt() const; - // We temporary expose this function to avoid passing around robj objects. - detail::RobjWrapper* GetRobjWrapper() { - return &u_.r_obj; - } - - const detail::RobjWrapper* GetRobjWrapper() const { - return &u_.r_obj; - } - // For STR object. void SetString(std::string_view str, bool is_key); void SetValue(std::string_view val) { diff --git a/src/server/container_utils.cc b/src/server/container_utils.cc index 95529eef7239..1774617158d2 100644 --- a/src/server/container_utils.cc +++ b/src/server/container_utils.cc @@ -180,9 +180,9 @@ bool IterateSet(const PrimeValue& pv, const IterateFunc& func) { return success; } -bool IterateSortedSet(const detail::RobjWrapper* robj_wrapper, const IterateSortedFunc& func, - int32_t start, int32_t end, bool reverse, bool use_score) { - unsigned long llen = robj_wrapper->Size(); +bool IterateSortedSet(const PrimeValue& pv, const IterateSortedFunc& func, int32_t start, + int32_t end, bool reverse, bool use_score) { + unsigned long llen = pv.Size(); if (end < 0 || unsigned(end) >= llen) end = llen - 1; @@ -191,8 +191,8 @@ bool IterateSortedSet(const detail::RobjWrapper* robj_wrapper, const IterateSort unsigned rangelen = unsigned(end - start) + 1; - if (robj_wrapper->encoding() == OBJ_ENCODING_LISTPACK) { - uint8_t* zl = static_cast(robj_wrapper->inner_obj()); + if (pv.Encoding() == OBJ_ENCODING_LISTPACK) { + uint8_t* zl = static_cast(pv.RObjPtr()); uint8_t *eptr, *sptr; uint8_t* vstr; unsigned int vlen; @@ -231,8 +231,8 @@ bool IterateSortedSet(const detail::RobjWrapper* robj_wrapper, const IterateSort } return success; } else { - CHECK_EQ(robj_wrapper->encoding(), OBJ_ENCODING_SKIPLIST); - detail::SortedMap* smap = (detail::SortedMap*)robj_wrapper->inner_obj(); + CHECK_EQ(pv.Encoding(), OBJ_ENCODING_SKIPLIST); + auto* smap = static_cast(pv.RObjPtr()); return smap->Iterate(start, rangelen, reverse, [&](sds ele, double score) { return func(ContainerEntry{ele, sdslen(ele)}, score); }); diff --git a/src/server/container_utils.h b/src/server/container_utils.h index 9177bce328be..851c1a99ab4b 100644 --- a/src/server/container_utils.h +++ b/src/server/container_utils.h @@ -65,9 +65,8 @@ bool IterateSet(const PrimeValue& pv, const IterateFunc& func); // Iterate over all values and call func(val). Iteration stops as soon // as func return false. Returns true if it successfully processed all elements // without stopping. -bool IterateSortedSet(const detail::RobjWrapper* robj_wrapper, const IterateSortedFunc& func, - int32_t start = 0, int32_t end = -1, bool reverse = false, - bool use_score = false); +bool IterateSortedSet(const PrimeValue& pv, const IterateSortedFunc& func, int32_t start = 0, + int32_t end = -1, bool reverse = false, bool use_score = false); bool IterateMap(const PrimeValue& pv, const IterateKVFunc& func); diff --git a/src/server/debugcmd.cc b/src/server/debugcmd.cc index 61949e466ff3..56fd3e1b5621 100644 --- a/src/server/debugcmd.cc +++ b/src/server/debugcmd.cc @@ -179,8 +179,7 @@ void AddObjHist(PrimeIterator it, ObjHist* hist) { val_len = ql->MallocUsed(true); } } else if (pv.ObjType() == OBJ_ZSET) { - IterateSortedSet(pv.GetRobjWrapper(), - [&](ContainerEntry entry, double) { return per_entry_cb(entry); }); + IterateSortedSet(pv, [&](ContainerEntry entry, double) { return per_entry_cb(entry); }); val_len = 0; // reset - will be calculated below. if (pv.Encoding() == OBJ_ENCODING_LISTPACK) { hist->listpack.Add(pv.MallocUsed()); @@ -312,7 +311,7 @@ void DoComputeHist(CompactObjType type, EngineShard* shard, ConnectionContext* c } } else if (type == OBJ_ZSET && it->second.ObjType() == OBJ_ZSET) { container_utils::IterateSortedSet( - it->second.GetRobjWrapper(), [&](container_utils::ContainerEntry entry, double) { + it->second, [&](container_utils::ContainerEntry entry, double) { ++steps; if (entry.value) { HIST_add(dest->hist.data(), entry.value, entry.length); diff --git a/src/server/generic_family.cc b/src/server/generic_family.cc index 5387ef988dc9..9d8ebfd0d1ba 100644 --- a/src/server/generic_family.cc +++ b/src/server/generic_family.cc @@ -1437,8 +1437,7 @@ template bool Iterate(const PrimeValue& pv, F&& func) { return container_utils::IterateSet(pv, cb); case OBJ_ZSET: return container_utils::IterateSortedSet( - pv.GetRobjWrapper(), - [&cb](container_utils::ContainerEntry ce, double) { return cb(ce); }); + pv, [&cb](container_utils::ContainerEntry ce, double) { return cb(ce); }); default: return false; } diff --git a/src/server/journal/cmd_serializer.cc b/src/server/journal/cmd_serializer.cc index da767215934c..b58235087ef5 100644 --- a/src/server/journal/cmd_serializer.cc +++ b/src/server/journal/cmd_serializer.cc @@ -170,7 +170,7 @@ size_t CmdSerializer::SerializeZSet(string_view key, const PrimeValue& pv) { size_t commands = 0; container_utils::IterateSortedSet( - pv.GetRobjWrapper(), + pv, [&](container_utils::ContainerEntry ce, double score) { aggregator.AddArg(absl::StrCat(score), CommandAggregator::CommitMode::kNoCommit); commands += aggregator.AddArg(ce.ToString()); diff --git a/src/server/rdb_save.cc b/src/server/rdb_save.cc index 1952e8255336..9a54177d5061 100644 --- a/src/server/rdb_save.cc +++ b/src/server/rdb_save.cc @@ -467,9 +467,8 @@ error_code RdbSerializer::SaveHSetObject(const PrimeValue& pv) { error_code RdbSerializer::SaveZSetObject(const PrimeValue& pv) { DCHECK_EQ(OBJ_ZSET, pv.ObjType()); - const detail::RobjWrapper* robj_wrapper = pv.GetRobjWrapper(); if (pv.Encoding() == OBJ_ENCODING_SKIPLIST) { - detail::SortedMap* zs = (detail::SortedMap*)robj_wrapper->inner_obj(); + auto* zs = static_cast(pv.RObjPtr()); RETURN_ON_ERR(SaveLen(zs->Size())); std::error_code ec; @@ -497,7 +496,7 @@ error_code RdbSerializer::SaveZSetObject(const PrimeValue& pv) { }); } else { CHECK_EQ(pv.Encoding(), unsigned(OBJ_ENCODING_LISTPACK)); - uint8_t* lp = (uint8_t*)robj_wrapper->inner_obj(); + uint8_t* lp = (uint8_t*)pv.RObjPtr(); size_t lp_bytes = lpBytes(lp); RETURN_ON_ERR(SaveString((uint8_t*)lp, lp_bytes)); diff --git a/src/server/zset_family.cc b/src/server/zset_family.cc index 17706e5fe23b..d92b346c278c 100644 --- a/src/server/zset_family.cc +++ b/src/server/zset_family.cc @@ -96,8 +96,8 @@ zlexrangespec GetLexRange(bool reverse, const ZSetFamily::LexInterval& li) { return range; } -bool IsListPack(const detail::RobjWrapper* robj_wrapper) { - return robj_wrapper->encoding() == OBJ_ENCODING_LISTPACK; +bool IsListPack(const PrimeValue& pv) { + return pv.Encoding() == OBJ_ENCODING_LISTPACK; } /* Delete the element 'ele' from the sorted set, returning 1 if the element @@ -105,17 +105,17 @@ bool IsListPack(const detail::RobjWrapper* robj_wrapper) { * taken from t_zset.c */ -int ZsetDel(detail::RobjWrapper* robj_wrapper, std::string_view ele) { - if (IsListPack(robj_wrapper)) { - uint8_t* lp = (uint8_t*)robj_wrapper->inner_obj(); +int ZsetDel(PrimeValue* pv, std::string_view ele) { + if (IsListPack(*pv)) { + uint8_t* lp = (uint8_t*)pv->RObjPtr(); unsigned char* eptr = detail::ZzlFind(lp, ele, nullptr); if (eptr) { lp = lpDeleteRangeWithEntry(lp, &eptr, 2); - robj_wrapper->set_inner_obj(lp); + pv->SetRObjPtr(lp); return 1; } - } else if (robj_wrapper->encoding() == OBJ_ENCODING_SKIPLIST) { - detail::SortedMap* zs = (detail::SortedMap*)robj_wrapper->inner_obj(); + } else if (pv->Encoding() == OBJ_ENCODING_SKIPLIST) { + detail::SortedMap* zs = (detail::SortedMap*)pv->RObjPtr(); if (zs->Delete(ele)) return 1; } @@ -123,17 +123,16 @@ int ZsetDel(detail::RobjWrapper* robj_wrapper, std::string_view ele) { } // taken from t_zset.c -std::optional GetZsetScore(const detail::RobjWrapper* robj_wrapper, - std::string_view member) { - if (IsListPack(robj_wrapper)) { +std::optional GetZsetScore(const PrimeValue& pv, std::string_view member) { + if (IsListPack(pv)) { double score; - if (detail::ZzlFind((uint8_t*)robj_wrapper->inner_obj(), member, &score) == NULL) + if (detail::ZzlFind((uint8_t*)pv.RObjPtr(), member, &score) == NULL) return std::nullopt; return score; } - if (robj_wrapper->encoding() == OBJ_ENCODING_SKIPLIST) { - detail::SortedMap* zs = (detail::SortedMap*)robj_wrapper->inner_obj(); + if (pv.Encoding() == OBJ_ENCODING_SKIPLIST) { + detail::SortedMap* zs = (detail::SortedMap*)pv.RObjPtr(); return zs->GetScore(member); } @@ -141,6 +140,90 @@ std::optional GetZsetScore(const detail::RobjWrapper* robj_wrapper, return 0; } +int ZsetAdd(PrimeValue* pv, double score, std::string_view ele, int in_flags, int* out_flags, + double* newscore) { + *out_flags = 0; /* We'll return our response flags. */ + double curscore; + + /* NaN as input is an error regardless of all the other parameters. */ + if (isnan(score)) { + *out_flags = ZADD_OUT_NAN; + return 0; + } + + /* Update the sorted set according to its encoding. */ + if (pv->Encoding() == OBJ_ENCODING_LISTPACK) { + /* Turn options into simple to check vars. */ + bool incr = (in_flags & ZADD_IN_INCR) != 0; + bool nx = (in_flags & ZADD_IN_NX) != 0; + bool xx = (in_flags & ZADD_IN_XX) != 0; + bool gt = (in_flags & ZADD_IN_GT) != 0; + bool lt = (in_flags & ZADD_IN_LT) != 0; + + uint8_t* lp = (uint8_t*)pv->RObjPtr(); + uint8_t* eptr = detail::ZzlFind(lp, ele, &curscore); + if (eptr != NULL) { + /* NX? Return, same element already exists. */ + if (nx) { + *out_flags |= ZADD_OUT_NOP; + return 1; + } + + /* Prepare the score for the increment if needed. */ + if (incr) { + score += curscore; + if (isnan(score)) { + *out_flags |= ZADD_OUT_NAN; + return 0; + } + } + + /* GT/LT? Only update if score is greater/less than current. */ + if ((lt && score >= curscore) || (gt && score <= curscore)) { + *out_flags |= ZADD_OUT_NOP; + return 1; + } + + if (newscore) + *newscore = score; + + /* Remove and re-insert when score changed. */ + if (score != curscore) { + lp = lpDeleteRangeWithEntry(lp, &eptr, 2); + lp = detail::ZzlInsert(lp, ele, score); + pv->SetRObjPtr(lp); + *out_flags |= ZADD_OUT_UPDATED; + } + + return 1; + } else if (!xx) { + unsigned zl_len = lpLength(lp) / 2; + + /* check if the element is too large or the list + * becomes too long *before* executing zzlInsert. */ + if (zl_len >= server.zset_max_listpack_entries || + ele.size() > server.zset_max_listpack_value) { + auto* ptr = detail::SortedMap::FromListPack(pv->memory_resource(), lp); + pv->InitRobj(OBJ_ZSET, OBJ_ENCODING_SKIPLIST, ptr); + } else { + lp = detail::ZzlInsert(lp, ele, score); + pv->SetRObjPtr(lp); + if (newscore) + *newscore = score; + *out_flags |= ZADD_OUT_ADDED; + return 1; + } + } else { + *out_flags |= ZADD_OUT_NOP; + return 1; + } + } + + CHECK_EQ(pv->Encoding(), OBJ_ENCODING_SKIPLIST); + detail::SortedMap* ss = (detail::SortedMap*)pv->RObjPtr(); + return ss->AddElem(score, ele, in_flags, out_flags, newscore); +} + void OutputScoredArrayResult(const OpResult& result, SinkReplyBuilder* builder) { if (result.status() == OpStatus::WRONG_TYPE) { return builder->SendError(kWrongTypeErr); @@ -199,7 +282,7 @@ enum class Action : uint8_t { RANGE = 0, REMOVE = 1, POP = 2 }; class IntervalVisitor { public: IntervalVisitor(Action action, const ZSetFamily::RangeParams& params, PrimeValue* pv) - : action_(action), params_(params), robj_wrapper_(pv->GetRobjWrapper()) { + : action_(action), params_(params), pv_(pv) { } void operator()(const ZSetFamily::IndexInterval& ii); @@ -255,14 +338,14 @@ class IntervalVisitor { Action action_; ZSetFamily::RangeParams params_; - detail::RobjWrapper* robj_wrapper_; + PrimeValue* pv_; ScoredArray result_; unsigned removed_ = 0; }; void IntervalVisitor::operator()(const ZSetFamily::IndexInterval& ii) { - unsigned long llen = robj_wrapper_->Size(); + unsigned long llen = pv_->Size(); int64_t start = ii.first; int64_t end = ii.second; @@ -342,7 +425,7 @@ void IntervalVisitor::ActionRange(unsigned start, unsigned end) { end = static_cast(min(1ULL * start + params_.limit - 1, 1ULL * end)); container_utils::IterateSortedSet( - robj_wrapper_, + *pv_, [this](container_utils::ContainerEntry ce, double score) { result_.emplace_back(ce.ToString(), score); return true; @@ -351,78 +434,78 @@ void IntervalVisitor::ActionRange(unsigned start, unsigned end) { } void IntervalVisitor::ActionRange(const zrangespec& range) { - if (IsListPack(robj_wrapper_)) { + if (IsListPack(*pv_)) { ExtractListPack(range); } else { - CHECK_EQ(robj_wrapper_->encoding(), OBJ_ENCODING_SKIPLIST); + CHECK_EQ(pv_->Encoding(), OBJ_ENCODING_SKIPLIST); ExtractSkipList(range); } } void IntervalVisitor::ActionRange(const zlexrangespec& range) { - if (IsListPack(robj_wrapper_)) { + if (IsListPack(*pv_)) { ExtractListPack(range); } else { - CHECK_EQ(robj_wrapper_->encoding(), OBJ_ENCODING_SKIPLIST); + CHECK_EQ(pv_->Encoding(), OBJ_ENCODING_SKIPLIST); ExtractSkipList(range); } } void IntervalVisitor::ActionRem(unsigned start, unsigned end) { - if (IsListPack(robj_wrapper_)) { - uint8_t* zl = (uint8_t*)robj_wrapper_->inner_obj(); + if (IsListPack(*pv_)) { + uint8_t* zl = (uint8_t*)pv_->RObjPtr(); removed_ = (end - start) + 1; zl = lpDeleteRange(zl, 2 * start, 2 * removed_); - robj_wrapper_->set_inner_obj(zl); + pv_->SetRObjPtr(zl); } else { - CHECK_EQ(OBJ_ENCODING_SKIPLIST, robj_wrapper_->encoding()); - detail::SortedMap* zs = (detail::SortedMap*)robj_wrapper_->inner_obj(); + CHECK_EQ(OBJ_ENCODING_SKIPLIST, pv_->Encoding()); + detail::SortedMap* zs = (detail::SortedMap*)pv_->RObjPtr(); removed_ = zs->DeleteRangeByRank(start, end); } } void IntervalVisitor::ActionRem(const zrangespec& range) { - if (IsListPack(robj_wrapper_)) { - uint8_t* zl = (uint8_t*)robj_wrapper_->inner_obj(); + if (IsListPack(*pv_)) { + uint8_t* zl = (uint8_t*)pv_->RObjPtr(); unsigned long deleted = 0; zl = detail::ZzlDeleteRangeByScore(zl, &range, &deleted); - robj_wrapper_->set_inner_obj(zl); + pv_->SetRObjPtr(zl); removed_ = deleted; } else { - CHECK_EQ(OBJ_ENCODING_SKIPLIST, robj_wrapper_->encoding()); - detail::SortedMap* zs = (detail::SortedMap*)robj_wrapper_->inner_obj(); + CHECK_EQ(OBJ_ENCODING_SKIPLIST, pv_->Encoding()); + detail::SortedMap* zs = (detail::SortedMap*)pv_->RObjPtr(); removed_ = zs->DeleteRangeByScore(range); } } void IntervalVisitor::ActionRem(const zlexrangespec& range) { - if (IsListPack(robj_wrapper_)) { - uint8_t* zl = (uint8_t*)robj_wrapper_->inner_obj(); + if (IsListPack(*pv_)) { + uint8_t* zl = (uint8_t*)pv_->RObjPtr(); unsigned long deleted = 0; zl = detail::ZzlDeleteRangeByLex(zl, &range, &deleted); - robj_wrapper_->set_inner_obj(zl); + pv_->SetRObjPtr(zl); removed_ = deleted; } else { - CHECK_EQ(OBJ_ENCODING_SKIPLIST, robj_wrapper_->encoding()); - detail::SortedMap* zs = (detail::SortedMap*)robj_wrapper_->inner_obj(); + CHECK_EQ(OBJ_ENCODING_SKIPLIST, pv_->Encoding()); + detail::SortedMap* zs = (detail::SortedMap*)pv_->RObjPtr(); removed_ = zs->DeleteRangeByLex(range); } } void IntervalVisitor::ActionPop(ZSetFamily::TopNScored sc) { if (sc > 0) { - if (IsListPack(robj_wrapper_)) { + if (IsListPack(*pv_)) { PopListPack(sc); } else { - CHECK_EQ(robj_wrapper_->encoding(), OBJ_ENCODING_SKIPLIST); + CHECK_EQ(pv_->Encoding(), OBJ_ENCODING_SKIPLIST); PopSkipList(sc); } } } void IntervalVisitor::ExtractListPack(const zrangespec& range) { - uint8_t* zl = (uint8_t*)robj_wrapper_->inner_obj(); + uint8_t* zl = (uint8_t*)pv_->RObjPtr(); uint8_t *eptr, *sptr; uint8_t* vstr; unsigned int vlen = 0; @@ -466,7 +549,7 @@ void IntervalVisitor::ExtractListPack(const zrangespec& range) { } void IntervalVisitor::ExtractSkipList(const zrangespec& range) { - detail::SortedMap* zs = (detail::SortedMap*)robj_wrapper_->inner_obj(); + detail::SortedMap* zs = (detail::SortedMap*)pv_->RObjPtr(); unsigned offset = params_.offset; unsigned limit = params_.limit; @@ -475,7 +558,7 @@ void IntervalVisitor::ExtractSkipList(const zrangespec& range) { } void IntervalVisitor::ExtractListPack(const zlexrangespec& range) { - uint8_t* zl = (uint8_t*)robj_wrapper_->inner_obj(); + uint8_t* zl = (uint8_t*)pv_->RObjPtr(); uint8_t *eptr, *sptr = nullptr; uint8_t* vstr = nullptr; unsigned int vlen = 0; @@ -523,14 +606,14 @@ void IntervalVisitor::ExtractListPack(const zlexrangespec& range) { } void IntervalVisitor::ExtractSkipList(const zlexrangespec& range) { - detail::SortedMap* zs = (detail::SortedMap*)robj_wrapper_->inner_obj(); + detail::SortedMap* zs = (detail::SortedMap*)pv_->RObjPtr(); unsigned offset = params_.offset; unsigned limit = params_.limit; result_ = zs->GetLexRange(range, offset, limit, params_.reverse); } void IntervalVisitor::PopListPack(ZSetFamily::TopNScored sc) { - uint8_t* zl = (uint8_t*)robj_wrapper_->inner_obj(); + uint8_t* zl = (uint8_t*)pv_->RObjPtr(); uint8_t *eptr, *sptr; uint8_t* vstr; unsigned int vlen = 0; @@ -565,11 +648,11 @@ void IntervalVisitor::PopListPack(ZSetFamily::TopNScored sc) { } /* We can finally delete the elements */ - robj_wrapper_->set_inner_obj(lpDeleteRange(zl, start, 2 * sc)); + pv_->SetRObjPtr(lpDeleteRange(zl, start, 2 * sc)); } void IntervalVisitor::PopSkipList(ZSetFamily::TopNScored sc) { - detail::SortedMap* zs = (detail::SortedMap*)robj_wrapper_->inner_obj(); + detail::SortedMap* zs = (detail::SortedMap*)pv_->RObjPtr(); /* We start from the header, or the tail if reversed. */ result_ = zs->PopTopScores(sc, params_.reverse); @@ -1009,7 +1092,7 @@ ScoredArray OpBZPop(Transaction* t, EngineShard* shard, std::string_view key, bo DVLOG(2) << "popping from " << key << " " << t->DebugId(); PrimeValue& pv = it->second; - CHECK_GT(pv.Size(), 0u) << key << " " << pv.GetRobjWrapper()->encoding(); + CHECK_GT(pv.Size(), 0u) << key << " " << pv.Encoding(); IntervalVisitor iv{Action::POP, range_spec.params, &pv}; std::visit(iv, range_spec.interval); @@ -1019,8 +1102,8 @@ ScoredArray OpBZPop(Transaction* t, EngineShard* shard, std::string_view key, bo auto res = iv.PopResult(); // We don't store empty keys - CHECK(!res.empty()) << key << " failed to pop from type " << pv.GetRobjWrapper()->encoding() - << " now size is " << pv.Size(); + CHECK(!res.empty()) << key << " failed to pop from type " << pv.Encoding() << " now size is " + << pv.Size(); auto zlen = pv.Size(); if (zlen == 0) { @@ -1209,9 +1292,9 @@ OpResult OpRank(const OpArgs& op_args, string_view key, string_view if (!res_it) return res_it.status(); - const detail::RobjWrapper* robj_wrapper = res_it.value()->second.GetRobjWrapper(); - if (IsListPack(robj_wrapper)) { - unsigned char* zl = (uint8_t*)robj_wrapper->inner_obj(); + auto& pv = res_it.value()->second; + if (IsListPack(pv)) { + unsigned char* zl = (uint8_t*)pv.RObjPtr(); unsigned char *eptr, *sptr; eptr = lpSeek(zl, 0); @@ -1240,8 +1323,8 @@ OpResult OpRank(const OpArgs& op_args, string_view key, string_view } return res; } - DCHECK_EQ(robj_wrapper->encoding(), OBJ_ENCODING_SKIPLIST); - detail::SortedMap* ss = (detail::SortedMap*)robj_wrapper->inner_obj(); + DCHECK_EQ(pv.Encoding(), OBJ_ENCODING_SKIPLIST); + detail::SortedMap* ss = (detail::SortedMap*)pv.RObjPtr(); RankResult res{}; @@ -1269,7 +1352,7 @@ OpResult OpCount(const OpArgs& op_args, std::string_view key, if (!res_it) return res_it.status(); - const detail::RobjWrapper* robj_wrapper = res_it.value()->second.GetRobjWrapper(); + auto& pv = res_it.value()->second; zrangespec range = GetZrangeSpec(false, interval); unsigned count = 0; @@ -1277,8 +1360,8 @@ OpResult OpCount(const OpArgs& op_args, std::string_view key, return 0; } - if (IsListPack(robj_wrapper)) { - uint8_t* zl = (uint8_t*)robj_wrapper->inner_obj(); + if (IsListPack(pv)) { + uint8_t* zl = (uint8_t*)pv.RObjPtr(); uint8_t *eptr, *sptr; double score; @@ -1309,8 +1392,8 @@ OpResult OpCount(const OpArgs& op_args, std::string_view key, } } } else { - CHECK_EQ(unsigned(OBJ_ENCODING_SKIPLIST), robj_wrapper->encoding()); - detail::SortedMap* zs = (detail::SortedMap*)robj_wrapper->inner_obj(); + CHECK_EQ(unsigned(OBJ_ENCODING_SKIPLIST), pv.Encoding()); + detail::SortedMap* zs = (detail::SortedMap*)pv.RObjPtr(); count = zs->Count(range); } @@ -1325,10 +1408,10 @@ OpResult OpLexCount(const OpArgs& op_args, string_view key, zlexrangespec range = GetLexRange(false, interval); unsigned count = 0; - const detail::RobjWrapper* robj_wrapper = res_it.value()->second.GetRobjWrapper(); - if (IsListPack(robj_wrapper)) { - uint8_t* zl = (uint8_t*)robj_wrapper->inner_obj(); + auto& pv = res_it.value()->second; + if (IsListPack(pv)) { + uint8_t* zl = (uint8_t*)pv.RObjPtr(); uint8_t *eptr, *sptr; /* Use the first element in range as the starting point */ @@ -1351,8 +1434,8 @@ OpResult OpLexCount(const OpArgs& op_args, string_view key, } } } else { - DCHECK_EQ(OBJ_ENCODING_SKIPLIST, robj_wrapper->encoding()); - detail::SortedMap* zs = (detail::SortedMap*)robj_wrapper->inner_obj(); + DCHECK_EQ(OBJ_ENCODING_SKIPLIST, pv.Encoding()); + detail::SortedMap* zs = (detail::SortedMap*)pv.RObjPtr(); count = zs->LexCount(range); } @@ -1366,12 +1449,12 @@ OpResult OpRem(const OpArgs& op_args, string_view key, const facade::A if (!res_it) return res_it.status(); - detail::RobjWrapper* robj_wrapper = res_it->it->second.GetRobjWrapper(); + auto& pv = res_it->it->second; unsigned deleted = 0; for (string_view member : members) - deleted += ZsetDel(robj_wrapper, member); + deleted += ZsetDel(&pv, member); - auto zlen = robj_wrapper->Size(); + auto zlen = pv.Size(); res_it->post_updater.Run(); if (zlen == 0) { @@ -1396,11 +1479,10 @@ OpResult OpMScore(const OpArgs& op_args, string_view key, MScoreResponse scores(members.Size()); - const detail::RobjWrapper* robj_wrapper = res_it.value()->second.GetRobjWrapper(); - + auto& pv = res_it.value()->second; size_t i = 0; for (string_view member : members.Range()) - scores[i++] = GetZsetScore(robj_wrapper, member); + scores[i++] = GetZsetScore(pv, member); return scores; } @@ -1418,7 +1500,7 @@ OpResult OpScan(const OpArgs& op_args, std::string_view key, uint64_t StringVec res; char buf[128]; - if (IsListPack(pv.GetRobjWrapper())) { + if (IsListPack(pv)) { ZSetFamily::RangeParams params; params.with_scores = true; IntervalVisitor iv{Action::RANGE, params, const_cast(&pv)}; @@ -1996,28 +2078,28 @@ OpResult ZSetFamily::OpAdd(const OpArgs& op_args, OpStatus op_status = OpStatus::OK; AddResult aresult; - detail::RobjWrapper* robj_wrapper = res_it->it->second.GetRobjWrapper(); - bool is_list_pack = IsListPack(robj_wrapper); + auto& pv = res_it->it->second; + bool is_list_pack = IsListPack(pv); // opportunistically reserve space if multiple entries are about to be added. if ((zparams.flags & ZADD_IN_XX) == 0 && members.size() > 2) { if (is_list_pack) { - uint8_t* zl = (uint8_t*)robj_wrapper->inner_obj(); + uint8_t* zl = (uint8_t*)pv.RObjPtr(); size_t malloc_reserved = zmalloc_size(zl); size_t min_sz = EstimateListpackMinBytes(members); if (min_sz > malloc_reserved) { zl = (uint8_t*)zrealloc(zl, min_sz); - robj_wrapper->set_inner_obj(zl); + pv.SetRObjPtr(zl); } } else { - detail::SortedMap* sm = (detail::SortedMap*)robj_wrapper->inner_obj(); + detail::SortedMap* sm = (detail::SortedMap*)pv.RObjPtr(); sm->Reserve(members.size()); } } for (size_t j = 0; j < members.size(); j++) { const auto& m = members[j]; - int retval = robj_wrapper->ZsetAdd(m.first, m.second, zparams.flags, &retflags, &new_score); + int retval = ZsetAdd(&pv, m.first, m.second, zparams.flags, &retflags, &new_score); if (zparams.flags & ZADD_IN_INCR) { if (retval == 0) { @@ -2080,8 +2162,7 @@ OpResult ZSetFamily::OpScore(const OpArgs& op_args, string_view key, str return res_it.status(); const PrimeValue& pv = res_it.value()->second; - const detail::RobjWrapper* robj_wrapper = pv.GetRobjWrapper(); - auto res = GetZsetScore(robj_wrapper, member); + auto res = GetZsetScore(pv, member); if (!res) { return OpStatus::MEMBER_NOTFOUND; }