Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 0 additions & 85 deletions src/core/compact_object.cc
Original file line number Diff line number Diff line change
Expand Up @@ -598,91 +598,6 @@ bool RobjWrapper::DefragIfNeeded(PageUsage* page_usage) {
return false;
}

int RobjWrapper::ZsetAdd(double score, std::string_view ele, int in_flags, int* out_flags,
double* newscore) {
*out_flags = 0; /* We'll return our response flags. */
double curscore;

/* NaN as input is an error regardless of all the other parameters. */
if (isnan(score)) {
*out_flags = ZADD_OUT_NAN;
return 0;
}

/* Update the sorted set according to its encoding. */
if (encoding_ == OBJ_ENCODING_LISTPACK) {
/* Turn options into simple to check vars. */
bool incr = (in_flags & ZADD_IN_INCR) != 0;
bool nx = (in_flags & ZADD_IN_NX) != 0;
bool xx = (in_flags & ZADD_IN_XX) != 0;
bool gt = (in_flags & ZADD_IN_GT) != 0;
bool lt = (in_flags & ZADD_IN_LT) != 0;

uint8_t* lp = (uint8_t*)inner_obj_;
uint8_t* eptr = ZzlFind(lp, ele, &curscore);
if (eptr != NULL) {
/* NX? Return, same element already exists. */
if (nx) {
*out_flags |= ZADD_OUT_NOP;
return 1;
}

/* Prepare the score for the increment if needed. */
if (incr) {
score += curscore;
if (isnan(score)) {
*out_flags |= ZADD_OUT_NAN;
return 0;
}
}

/* GT/LT? Only update if score is greater/less than current. */
if ((lt && score >= curscore) || (gt && score <= curscore)) {
*out_flags |= ZADD_OUT_NOP;
return 1;
}

if (newscore)
*newscore = score;

/* Remove and re-insert when score changed. */
if (score != curscore) {
lp = lpDeleteRangeWithEntry(lp, &eptr, 2);
lp = detail::ZzlInsert(lp, ele, score);
inner_obj_ = lp;
*out_flags |= ZADD_OUT_UPDATED;
}

return 1;
} else if (!xx) {
unsigned zl_len = lpLength(lp) / 2;

/* check if the element is too large or the list
* becomes too long *before* executing zzlInsert. */
if (zl_len >= server.zset_max_listpack_entries ||
ele.size() > server.zset_max_listpack_value) {
inner_obj_ = SortedMap::FromListPack(tl.local_mr, lp);
lpFree(lp);
encoding_ = OBJ_ENCODING_SKIPLIST;
} else {
lp = detail::ZzlInsert(lp, ele, score);
inner_obj_ = lp;
if (newscore)
*newscore = score;
*out_flags |= ZADD_OUT_ADDED;
return 1;
}
} else {
*out_flags |= ZADD_OUT_NOP;
return 1;
}
}

CHECK_EQ(encoding_, OBJ_ENCODING_SKIPLIST);
SortedMap* ss = (SortedMap*)inner_obj_;
return ss->AddElem(score, ele, in_flags, out_flags, newscore);
}

void RobjWrapper::ReallocateString(MemoryResource* mr) {
DCHECK_EQ(type(), OBJ_STRING);
void* old_ptr = inner_obj_;
Expand Down
12 changes: 0 additions & 12 deletions src/core/compact_object.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,6 @@ class RobjWrapper {
// Returns true if re-allocated.
bool DefragIfNeeded(PageUsage* page_usage);

// as defined in zset.h
int ZsetAdd(double score, std::string_view ele, int in_flags, int* out_flags, double* newscore);

private:
void ReallocateString(MemoryResource* mr);

Expand Down Expand Up @@ -298,15 +295,6 @@ class CompactObj {
void SetInt(int64_t val);
std::optional<int64_t> TryGetInt() const;

// We temporary expose this function to avoid passing around robj objects.
detail::RobjWrapper* GetRobjWrapper() {
return &u_.r_obj;
}

const detail::RobjWrapper* GetRobjWrapper() const {
return &u_.r_obj;
}

// For STR object.
void SetString(std::string_view str, bool is_key);
void SetValue(std::string_view val) {
Expand Down
14 changes: 7 additions & 7 deletions src/server/container_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -180,9 +180,9 @@ bool IterateSet(const PrimeValue& pv, const IterateFunc& func) {
return success;
}

bool IterateSortedSet(const detail::RobjWrapper* robj_wrapper, const IterateSortedFunc& func,
int32_t start, int32_t end, bool reverse, bool use_score) {
unsigned long llen = robj_wrapper->Size();
bool IterateSortedSet(const PrimeValue& pv, const IterateSortedFunc& func, int32_t start,
int32_t end, bool reverse, bool use_score) {
unsigned long llen = pv.Size();
if (end < 0 || unsigned(end) >= llen)
end = llen - 1;

Expand All @@ -191,8 +191,8 @@ bool IterateSortedSet(const detail::RobjWrapper* robj_wrapper, const IterateSort

unsigned rangelen = unsigned(end - start) + 1;

if (robj_wrapper->encoding() == OBJ_ENCODING_LISTPACK) {
uint8_t* zl = static_cast<uint8_t*>(robj_wrapper->inner_obj());
if (pv.Encoding() == OBJ_ENCODING_LISTPACK) {
uint8_t* zl = static_cast<uint8_t*>(pv.RObjPtr());
uint8_t *eptr, *sptr;
uint8_t* vstr;
unsigned int vlen;
Expand Down Expand Up @@ -231,8 +231,8 @@ bool IterateSortedSet(const detail::RobjWrapper* robj_wrapper, const IterateSort
}
return success;
} else {
CHECK_EQ(robj_wrapper->encoding(), OBJ_ENCODING_SKIPLIST);
detail::SortedMap* smap = (detail::SortedMap*)robj_wrapper->inner_obj();
CHECK_EQ(pv.Encoding(), OBJ_ENCODING_SKIPLIST);
auto* smap = static_cast<detail::SortedMap*>(pv.RObjPtr());
return smap->Iterate(start, rangelen, reverse, [&](sds ele, double score) {
return func(ContainerEntry{ele, sdslen(ele)}, score);
});
Expand Down
5 changes: 2 additions & 3 deletions src/server/container_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,8 @@ bool IterateSet(const PrimeValue& pv, const IterateFunc& func);
// Iterate over all values and call func(val). Iteration stops as soon
// as func return false. Returns true if it successfully processed all elements
// without stopping.
bool IterateSortedSet(const detail::RobjWrapper* robj_wrapper, const IterateSortedFunc& func,
int32_t start = 0, int32_t end = -1, bool reverse = false,
bool use_score = false);
bool IterateSortedSet(const PrimeValue& pv, const IterateSortedFunc& func, int32_t start = 0,
int32_t end = -1, bool reverse = false, bool use_score = false);

bool IterateMap(const PrimeValue& pv, const IterateKVFunc& func);

Expand Down
5 changes: 2 additions & 3 deletions src/server/debugcmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -179,8 +179,7 @@ void AddObjHist(PrimeIterator it, ObjHist* hist) {
val_len = ql->MallocUsed(true);
}
} else if (pv.ObjType() == OBJ_ZSET) {
IterateSortedSet(pv.GetRobjWrapper(),
[&](ContainerEntry entry, double) { return per_entry_cb(entry); });
IterateSortedSet(pv, [&](ContainerEntry entry, double) { return per_entry_cb(entry); });
val_len = 0; // reset - will be calculated below.
if (pv.Encoding() == OBJ_ENCODING_LISTPACK) {
hist->listpack.Add(pv.MallocUsed());
Expand Down Expand Up @@ -312,7 +311,7 @@ void DoComputeHist(CompactObjType type, EngineShard* shard, ConnectionContext* c
}
} else if (type == OBJ_ZSET && it->second.ObjType() == OBJ_ZSET) {
container_utils::IterateSortedSet(
it->second.GetRobjWrapper(), [&](container_utils::ContainerEntry entry, double) {
it->second, [&](container_utils::ContainerEntry entry, double) {
++steps;
if (entry.value) {
HIST_add(dest->hist.data(), entry.value, entry.length);
Expand Down
3 changes: 1 addition & 2 deletions src/server/generic_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1437,8 +1437,7 @@ template <typename F> bool Iterate(const PrimeValue& pv, F&& func) {
return container_utils::IterateSet(pv, cb);
case OBJ_ZSET:
return container_utils::IterateSortedSet(
pv.GetRobjWrapper(),
[&cb](container_utils::ContainerEntry ce, double) { return cb(ce); });
pv, [&cb](container_utils::ContainerEntry ce, double) { return cb(ce); });
default:
return false;
}
Expand Down
2 changes: 1 addition & 1 deletion src/server/journal/cmd_serializer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ size_t CmdSerializer::SerializeZSet(string_view key, const PrimeValue& pv) {

size_t commands = 0;
container_utils::IterateSortedSet(
pv.GetRobjWrapper(),
pv,
[&](container_utils::ContainerEntry ce, double score) {
aggregator.AddArg(absl::StrCat(score), CommandAggregator::CommitMode::kNoCommit);
commands += aggregator.AddArg(ce.ToString());
Expand Down
5 changes: 2 additions & 3 deletions src/server/rdb_save.cc
Original file line number Diff line number Diff line change
Expand Up @@ -467,9 +467,8 @@ error_code RdbSerializer::SaveHSetObject(const PrimeValue& pv) {

error_code RdbSerializer::SaveZSetObject(const PrimeValue& pv) {
DCHECK_EQ(OBJ_ZSET, pv.ObjType());
const detail::RobjWrapper* robj_wrapper = pv.GetRobjWrapper();
if (pv.Encoding() == OBJ_ENCODING_SKIPLIST) {
detail::SortedMap* zs = (detail::SortedMap*)robj_wrapper->inner_obj();
auto* zs = static_cast<detail::SortedMap*>(pv.RObjPtr());

RETURN_ON_ERR(SaveLen(zs->Size()));
std::error_code ec;
Expand Down Expand Up @@ -497,7 +496,7 @@ error_code RdbSerializer::SaveZSetObject(const PrimeValue& pv) {
});
} else {
CHECK_EQ(pv.Encoding(), unsigned(OBJ_ENCODING_LISTPACK));
uint8_t* lp = (uint8_t*)robj_wrapper->inner_obj();
uint8_t* lp = (uint8_t*)pv.RObjPtr();
size_t lp_bytes = lpBytes(lp);

RETURN_ON_ERR(SaveString((uint8_t*)lp, lp_bytes));
Expand Down
Loading
Loading