Skip to content

Commit 697b442

Browse files
authored
chore: to move some of the command-specific state into CommandContext (#6199)
Main change is around command latency measurement, where we concentrate the logic in one central place: CommandContext::RecordLatency. Related to #6196 Signed-off-by: Roman Gershman <[email protected]>
1 parent 90d41bf commit 697b442

File tree

11 files changed

+146
-117
lines changed

11 files changed

+146
-117
lines changed

src/server/command_registry.cc

Lines changed: 63 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ using namespace facade;
4141

4242
using absl::AsciiStrToUpper;
4343
using absl::GetFlag;
44+
using absl::StrCat;
4445
using absl::StrSplit;
4546

4647
namespace {
@@ -128,16 +129,56 @@ constexpr int32_t kLatencyHistogramPrecision = 2;
128129

129130
} // namespace
130131

132+
void CommandContext::RecordLatency(facade::ArgSlice tail_args) const {
133+
DCHECK_GT(start_time_ns, 0u);
134+
int64_t after = absl::GetCurrentTimeNanos();
135+
136+
ServerState* ss = ServerState::tlocal(); // Might have migrated thread, read after invocation
137+
int64_t execution_time_usec = (after - start_time_ns) / 1000;
138+
139+
cid->RecordLatency(ss->thread_index(), execution_time_usec);
140+
141+
DCHECK(conn_cntx != nullptr);
142+
143+
// TODO: we should probably discard more commands here,
144+
// not just the blocking ones
145+
const auto* conn = conn_cntx->conn();
146+
if (conn_cntx->conn_state.squashing_info) {
147+
// We run from the squashed transaction.
148+
conn = conn_cntx->conn_state.squashing_info->owner->conn();
149+
}
150+
151+
if (!(cid->opt_mask() & CO::BLOCKING) && conn != nullptr &&
152+
// Use SafeTLocal() to avoid accessing the wrong thread local instance
153+
ServerState::SafeTLocal()->ShouldLogSlowCmd(execution_time_usec)) {
154+
vector<string> aux_params;
155+
CmdArgVec aux_slices;
156+
157+
if (tail_args.empty() && cid->name() == "EXEC") {
158+
// abuse tail_args to pass more information about the slow EXEC.
159+
aux_params.emplace_back(StrCat("CMDCOUNT/", exec_body_len));
160+
aux_slices.emplace_back(aux_params.back());
161+
tail_args = absl::MakeSpan(aux_slices);
162+
}
163+
ServerState::SafeTLocal()->GetSlowLog().Add(cid->name(), tail_args, conn->GetName(),
164+
conn->RemoteEndpointStr(), execution_time_usec,
165+
absl::GetCurrentTimeNanos() / 1000);
166+
}
167+
}
168+
131169
CommandId::CommandId(const char* name, uint32_t mask, int8_t arity, int8_t first_key,
132170
int8_t last_key, std::optional<uint32_t> acl_categories)
133171
: facade::CommandId(name, ImplicitCategories(mask), arity, first_key, last_key,
134172
acl_categories.value_or(ImplicitAclCategories(mask))) {
135173
implicit_acl_ = !acl_categories.has_value();
136-
hdr_histogram* hist = nullptr;
137-
const int init_result = hdr_init(kLatencyHistogramMinValue, kLatencyHistogramMaxValue,
138-
kLatencyHistogramPrecision, &hist);
139-
CHECK_EQ(init_result, 0) << "failed to initialize histogram for command " << name;
140-
latency_histogram_ = hist;
174+
bool is_latency_tracked = GetFlag(FLAGS_latency_tracking);
175+
if (is_latency_tracked) {
176+
hdr_histogram* hist = nullptr;
177+
const int init_result = hdr_init(kLatencyHistogramMinValue, kLatencyHistogramMaxValue,
178+
kLatencyHistogramPrecision, &hist);
179+
CHECK_EQ(init_result, 0) << "failed to initialize histogram for command " << name;
180+
latency_histogram_ = hist;
181+
}
141182

142183
if (name_.rfind("EVAL", 0) == 0)
143184
kind_multi_ctr_ = CO::MultiControlKind::EVAL;
@@ -149,6 +190,7 @@ CommandId::CommandId(const char* name, uint32_t mask, int8_t arity, int8_t first
149190
kind_pubsub_ = CO::PubSubKind::PATTERN;
150191
else if (base::_in(name_, {"SPUBLISH", "SSUBSCRIBE", "SUNSUBSCRIBE"}))
151192
kind_pubsub_ = CO::PubSubKind::SHARDED;
193+
can_be_monitored_ = (opt_mask_ & CO::ADMIN) == 0 && name_ != "EXEC";
152194
}
153195

154196
CommandId::~CommandId() {
@@ -169,8 +211,10 @@ CommandId CommandId::Clone(const std::string_view name) const {
169211

170212
// explicit sharing of the object since it's an alias we can do that.
171213
// I am assuming that the source object's lifetime is at least as long as that of the cloned object.
172-
hdr_close(cloned.latency_histogram_); // Free the histogram in the cloned object.
173-
cloned.latency_histogram_ = static_cast<hdr_histogram*>(latency_histogram_);
214+
if (cloned.latency_histogram_) {
215+
hdr_close(cloned.latency_histogram_); // Free the histogram in the cloned object.
216+
cloned.latency_histogram_ = static_cast<hdr_histogram*>(latency_histogram_);
217+
}
174218
return cloned;
175219
}
176220

@@ -189,28 +233,6 @@ bool CommandId::IsMultiTransactional() const {
189233
return kind_multi_ctr_.has_value();
190234
}
191235

192-
uint64_t CommandId::Invoke(CmdArgList args, const CommandContext& cmd_cntx) const {
193-
uint64_t before = cmd_cntx.conn_cntx->conn_state.cmd_start_time_ns;
194-
DCHECK_GT(before, 0u);
195-
handler_(args, cmd_cntx);
196-
int64_t after = absl::GetCurrentTimeNanos();
197-
198-
ServerState* ss = ServerState::tlocal(); // Might have migrated thread, read after invocation
199-
int64_t execution_time_usec = (after - before) / 1000;
200-
201-
auto& ent = command_stats_[ss->thread_index()];
202-
203-
++ent.first;
204-
ent.second += execution_time_usec;
205-
static const bool is_latency_tracked = GetFlag(FLAGS_latency_tracking);
206-
if (is_latency_tracked) {
207-
if (hdr_histogram* cmd_histogram = latency_histogram_; cmd_histogram != nullptr) {
208-
hdr_record_value(cmd_histogram, execution_time_usec);
209-
}
210-
}
211-
return execution_time_usec;
212-
}
213-
214236
optional<facade::ErrorReply> CommandId::Validate(CmdArgList tail_args) const {
215237
if ((arity() > 0 && tail_args.size() + 1 != size_t(arity())) ||
216238
(arity() < 0 && tail_args.size() + 1 < size_t(-arity()))) {
@@ -238,8 +260,15 @@ void CommandId::ResetStats(unsigned thread_index) {
238260
}
239261
}
240262

241-
hdr_histogram* CommandId::LatencyHist() const {
242-
return latency_histogram_;
263+
void CommandId::RecordLatency(unsigned tid, uint64_t latency_usec) const {
264+
auto& ent = command_stats_[tid];
265+
266+
++ent.first;
267+
ent.second += latency_usec;
268+
269+
if (latency_histogram_) {
270+
hdr_record_value(latency_histogram_, latency_usec);
271+
}
243272
}
244273

245274
CommandRegistry::CommandRegistry() {
@@ -280,7 +309,7 @@ CommandRegistry& CommandRegistry::operator<<(CommandId cmd) {
280309
if (it->second.empty()) {
281310
return *this; // In case of an empty string we want to remove the command from the registry.
282311
}
283-
k = is_sub_command ? absl::StrCat(it->second, " ", maybe_subcommand[1]) : it->second;
312+
k = is_sub_command ? StrCat(it->second, " ", maybe_subcommand[1]) : it->second;
284313
}
285314

286315
if (restricted_cmds_.find(k) != restricted_cmds_.end()) {
@@ -333,7 +362,7 @@ std::pair<const CommandId*, ParsedArgs> CommandRegistry::FindExtended(string_vie
333362
}
334363

335364
auto second_cmd = absl::AsciiStrToUpper(tail_args.Front());
336-
string full_cmd = absl::StrCat(cmd, " ", second_cmd);
365+
string full_cmd = StrCat(cmd, " ", second_cmd);
337366

338367
return {Find(full_cmd), tail_args.Tail()};
339368
}
@@ -355,7 +384,7 @@ absl::flat_hash_map<std::string, hdr_histogram*> CommandRegistry::LatencyMap() c
355384
absl::flat_hash_map<std::string, hdr_histogram*> cmd_latencies;
356385
cmd_latencies.reserve(cmd_map_.size());
357386
for (const auto& [cmd_name, cmd] : cmd_map_) {
358-
cmd_latencies.insert({absl::AsciiStrToLower(cmd_name), cmd.LatencyHist()});
387+
cmd_latencies.insert({absl::AsciiStrToLower(cmd_name), cmd.GetLatencyHist()});
359388
}
360389
return cmd_latencies;
361390
}

src/server/command_registry.h

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -70,14 +70,25 @@ enum class MultiControlKind : uint8_t {
7070
// Per thread vector of command stats. Each entry is {cmd_calls, cmd_latency_agg in usec}.
7171
using CmdCallStats = std::pair<uint64_t, uint64_t>;
7272

73+
class CommandId;
74+
7375
struct CommandContext {
74-
CommandContext(Transaction* _tx, facade::SinkReplyBuilder* _rb, ConnectionContext* cntx)
75-
: tx(_tx), rb(_rb), conn_cntx(cntx) {
76+
CommandContext(const CommandId* _cid, Transaction* _tx, facade::SinkReplyBuilder* _rb,
77+
ConnectionContext* cntx)
78+
: cid(_cid), tx(_tx), rb(_rb), conn_cntx(cntx) {
7679
}
7780

81+
const CommandId* cid;
7882
Transaction* tx;
7983
facade::SinkReplyBuilder* rb;
8084
ConnectionContext* conn_cntx;
85+
86+
uint64_t start_time_ns = 0;
87+
88+
// number of commands in the last exec body.
89+
unsigned exec_body_len = 0;
90+
91+
void RecordLatency(facade::ArgSlice tail_args) const;
8192
};
8293

8394
// TODO: move it to helio
@@ -103,7 +114,7 @@ template <typename T> class MoveOnly {
103114
}
104115

105116
private:
106-
T value_;
117+
T value_{};
107118
};
108119

109120
class CommandId : public facade::CommandId {
@@ -131,7 +142,9 @@ class CommandId : public facade::CommandId {
131142
std::optional<facade::ErrorReply>(CmdArgList) const>;
132143

133144
// Invokes the command handler with the given arguments and command context.
134-
uint64_t Invoke(CmdArgList args, const CommandContext& cmd_cntx) const;
145+
void Invoke(CmdArgList args, const CommandContext& cmd_cntx) const {
146+
handler_(args, cmd_cntx);
147+
}
135148

136149
// Returns error if validation failed, otherwise nullopt
137150
std::optional<facade::ErrorReply> Validate(CmdArgList tail_args) const;
@@ -152,6 +165,10 @@ class CommandId : public facade::CommandId {
152165
return opt_mask_ & CO::BLOCKING;
153166
}
154167

168+
bool CanBeMonitored() const {
169+
return can_be_monitored_;
170+
}
171+
155172
CommandId&& SetHandler(Handler3 f) && {
156173
handler_ = std::move(f);
157174
return std::move(*this);
@@ -181,7 +198,9 @@ class CommandId : public facade::CommandId {
181198
return is_alias_;
182199
}
183200

184-
hdr_histogram* LatencyHist() const;
201+
hdr_histogram* GetLatencyHist() const {
202+
return latency_histogram_;
203+
}
185204

186205
std::optional<CO::PubSubKind> PubSubKind() const {
187206
return kind_pubsub_;
@@ -192,13 +211,17 @@ class CommandId : public facade::CommandId {
192211
return kind_multi_ctr_;
193212
}
194213

214+
void RecordLatency(unsigned tid, uint64_t latency_usec) const;
215+
195216
private:
196217
std::optional<CO::PubSubKind> kind_pubsub_;
197218
std::optional<CO::MultiControlKind> kind_multi_ctr_;
198219

199220
// The following fields must be copied manually in the move constructor.
200221
bool implicit_acl_;
201222
bool is_alias_{false};
223+
bool can_be_monitored_{true};
224+
202225
std::unique_ptr<CmdCallStats[]> command_stats_;
203226
Handler3 handler_;
204227
ArgValidator validator_;

src/server/conn_context.h

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -284,7 +284,6 @@ struct ConnectionState {
284284
std::unique_ptr<ScriptInfo> script_info;
285285
std::unique_ptr<SubscribeInfo> subscribe_info;
286286
ClientTracking tracking_info_;
287-
uint64_t cmd_start_time_ns = 0; // time when the last command started executing
288287
};
289288

290289
class ConnectionContext : public facade::ConnectionContext {
@@ -295,17 +294,14 @@ class ConnectionContext : public facade::ConnectionContext {
295294
struct DebugInfo {
296295
uint32_t shards_count = 0;
297296
TxClock clock = 0;
298-
299-
// number of commands in the last exec body.
300-
unsigned exec_body_len = 0;
301297
};
302298

303299
DebugInfo last_command_debug;
304300

305301
// TODO: to introduce proper accessors.
306302
Namespace* ns = nullptr;
307303
Transaction* transaction = nullptr;
308-
const CommandId* cid = nullptr;
304+
// const CommandId* cid = nullptr;
309305

310306
ConnectionState conn_state;
311307

src/server/debugcmd.cc

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1632,8 +1632,8 @@ void DebugCmd::DoPopulateBatch(const PopulateOptions& options, const PopulateBat
16321632
crb.SetReplyMode(ReplyMode::NONE);
16331633
stub_tx->InitByArgs(cntx_->ns, local_cntx.conn_state.db_index, args_span);
16341634

1635-
sf_.service().InvokeCmd(cid, args_span,
1636-
CommandContext{local_cntx.transaction, &crb, &local_cntx});
1635+
CommandContext cmd_cntx{cid, local_cntx.transaction, &crb, &local_cntx};
1636+
sf_.service().InvokeCmd(args_span, &cmd_cntx);
16371637
}
16381638

16391639
if (options.expire_ttl_range.has_value()) {
@@ -1653,8 +1653,8 @@ void DebugCmd::DoPopulateBatch(const PopulateOptions& options, const PopulateBat
16531653
crb.SetReplyMode(ReplyMode::NONE);
16541654
stub_tx->MultiSwitchCmd(cid);
16551655
stub_tx->InitByArgs(cntx_->ns, local_cntx.conn_state.db_index, args_span);
1656-
sf_.service().InvokeCmd(cid, args_span,
1657-
CommandContext{local_cntx.transaction, &crb, &local_cntx});
1656+
CommandContext cmd_cntx{cid, local_cntx.transaction, &crb, &local_cntx};
1657+
sf_.service().InvokeCmd(args_span, &cmd_cntx);
16581658
}
16591659
}
16601660

src/server/dflycmd.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -592,7 +592,7 @@ void DflyCmd::TakeOver(CmdArgList args, RedisReplyBuilder* rb, ConnectionContext
592592
VLOG(1) << "Takeover accepted, shutting down.";
593593
std::string save_arg = "NOSAVE";
594594
MutableSlice sargs(save_arg);
595-
sf_->ShutdownCmd(CmdArgList(&sargs, 1), CommandContext{nullptr, rb, nullptr});
595+
sf_->ShutdownCmd(CmdArgList(&sargs, 1), CommandContext{nullptr, nullptr, rb, nullptr});
596596
return;
597597
}
598598

src/server/hset_family.cc

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -547,7 +547,7 @@ void HSetEx(CmdArgList args, const CommandContext& cmd_cntx) {
547547
OpSetParams op_sp;
548548

549549
const auto option_already_set = [&cmd_cntx] {
550-
return cmd_cntx.rb->SendError(WrongNumArgsError(cmd_cntx.conn_cntx->cid->name()));
550+
return cmd_cntx.rb->SendError(WrongNumArgsError(cmd_cntx.cid->name()));
551551
};
552552

553553
while (true) {
@@ -580,8 +580,7 @@ void HSetEx(CmdArgList args, const CommandContext& cmd_cntx) {
580580
CmdArgList fields = parser.Tail();
581581

582582
if (fields.size() % 2 != 0) {
583-
return cmd_cntx.rb->SendError(facade::WrongNumArgsError(cmd_cntx.conn_cntx->cid->name()),
584-
kSyntaxErrType);
583+
return cmd_cntx.rb->SendError(facade::WrongNumArgsError(cmd_cntx.cid->name()), kSyntaxErrType);
585584
}
586585

587586
auto cb = [&](Transaction* t, EngineShard* shard) {
@@ -851,7 +850,7 @@ void HSetFamily::HScan(CmdArgList args, const CommandContext& cmd_cntx) {
851850
void HSetFamily::HSet(CmdArgList args, const CommandContext& cmd_cntx) {
852851
string_view key = ArgS(args, 0);
853852

854-
string_view cmd{cmd_cntx.conn_cntx->cid->name()};
853+
string_view cmd{cmd_cntx.cid->name()};
855854

856855
if (args.size() % 2 != 1) {
857856
return cmd_cntx.rb->SendError(facade::WrongNumArgsError(cmd), kSyntaxErrType);

0 commit comments

Comments
 (0)