Skip to content

Commit

Permalink
feat(conn): move time-consuming ops out of ExecuteCommand for context (
Browse files Browse the repository at this point in the history
  • Loading branch information
PragmaTwice authored Oct 19, 2024
1 parent 4709734 commit 6ee4190
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 32 deletions.
58 changes: 31 additions & 27 deletions src/server/redis_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,6 @@ Status Connection::ExecuteCommand(engine::Context &ctx, const std::string &cmd_n

srv_->SlowlogPushEntryIfNeeded(&cmd_tokens, duration, this);
srv_->stats.IncrLatency(static_cast<uint64_t>(duration), cmd_name);
srv_->FeedMonitorConns(this, cmd_tokens);
return s;
}

Expand Down Expand Up @@ -498,36 +497,41 @@ void Connection::ExecuteCommands(std::deque<CommandTokens> *to_process_cmds) {
continue;
}

engine::Context ctx(srv_->storage);
// TODO: transaction support for index recording
std::vector<GlobalIndexer::RecordResult> index_records;
if (!srv_->index_mgr.index_map.empty() && IsCmdForIndexing(attributes) && !config->cluster_enabled) {
attributes->ForEachKeyRange(
[&, this](const std::vector<std::string> &args, const CommandKeyRange &key_range) {
key_range.ForEachKey(
[&, this](const std::string &key) {
auto res = srv_->indexer.Record(ctx, key, ns_);
if (res.IsOK()) {
index_records.push_back(*res);
} else if (!res.Is<Status::NoPrefixMatched>() && !res.Is<Status::TypeMismatched>()) {
LOG(WARNING) << "index recording failed for key: " << key;
}
},
args);
},
cmd_tokens);
}

SetLastCmd(cmd_name);
s = ExecuteCommand(ctx, cmd_name, cmd_tokens, current_cmd.get(), &reply);
// TODO: transaction support for index updating
for (const auto &record : index_records) {
auto s = GlobalIndexer::Update(ctx, record);
if (!s.IsOK() && !s.Is<Status::TypeMismatched>()) {
LOG(WARNING) << "index updating failed for key: " << record.key;
{
engine::Context ctx(srv_->storage);

// TODO: transaction support for index recording
std::vector<GlobalIndexer::RecordResult> index_records;
if (!srv_->index_mgr.index_map.empty() && IsCmdForIndexing(attributes) && !config->cluster_enabled) {
attributes->ForEachKeyRange(
[&, this](const std::vector<std::string> &args, const CommandKeyRange &key_range) {
key_range.ForEachKey(
[&, this](const std::string &key) {
auto res = srv_->indexer.Record(ctx, key, ns_);
if (res.IsOK()) {
index_records.push_back(*res);
} else if (!res.Is<Status::NoPrefixMatched>() && !res.Is<Status::TypeMismatched>()) {
LOG(WARNING) << "index recording failed for key: " << key;
}
},
args);
},
cmd_tokens);
}

s = ExecuteCommand(ctx, cmd_name, cmd_tokens, current_cmd.get(), &reply);
// TODO: transaction support for index updating
for (const auto &record : index_records) {
auto s = GlobalIndexer::Update(ctx, record);
if (!s.IsOK() && !s.Is<Status::TypeMismatched>()) {
LOG(WARNING) << "index updating failed for key: " << record.key;
}
}
}

srv_->FeedMonitorConns(this, cmd_tokens);

// Break the execution loop when occurring the blocking command like BLPOP or BRPOP,
// it will suspend the connection and wait for the wakeup signal.
if (s.Is<Status::BlockingCmd>()) {
Expand Down
14 changes: 9 additions & 5 deletions src/storage/scripting.cc
Original file line number Diff line number Diff line change
Expand Up @@ -814,13 +814,17 @@ int RedisGenericCommand(lua_State *lua, int raise_error) {

std::string output;
// TODO: make it possible for multiple redis commands in lua script to use the same txn context.
engine::Context ctx(srv->storage);
s = conn->ExecuteCommand(ctx, cmd_name, args, cmd.get(), &output);
if (!s) {
PushError(lua, s.Msg().data());
return raise_error ? RaiseError(lua) : 1;
{
engine::Context ctx(srv->storage);
s = conn->ExecuteCommand(ctx, cmd_name, args, cmd.get(), &output);
if (!s) {
PushError(lua, s.Msg().data());
return raise_error ? RaiseError(lua) : 1;
}
}

srv->FeedMonitorConns(conn, args);

RedisProtocolToLuaType(lua, output.data());
return 1;
}
Expand Down

0 comments on commit 6ee4190

Please sign in to comment.