diff --git a/src/server/redis_connection.cc b/src/server/redis_connection.cc index e742b1683c4..6647c308528 100644 --- a/src/server/redis_connection.cc +++ b/src/server/redis_connection.cc @@ -358,7 +358,6 @@ Status Connection::ExecuteCommand(engine::Context &ctx, const std::string &cmd_n srv_->SlowlogPushEntryIfNeeded(&cmd_tokens, duration, this); srv_->stats.IncrLatency(static_cast(duration), cmd_name); - srv_->FeedMonitorConns(this, cmd_tokens); return s; } @@ -498,36 +497,41 @@ void Connection::ExecuteCommands(std::deque *to_process_cmds) { continue; } - engine::Context ctx(srv_->storage); - // TODO: transaction support for index recording - std::vector index_records; - if (!srv_->index_mgr.index_map.empty() && IsCmdForIndexing(attributes) && !config->cluster_enabled) { - attributes->ForEachKeyRange( - [&, this](const std::vector &args, const CommandKeyRange &key_range) { - key_range.ForEachKey( - [&, this](const std::string &key) { - auto res = srv_->indexer.Record(ctx, key, ns_); - if (res.IsOK()) { - index_records.push_back(*res); - } else if (!res.Is() && !res.Is()) { - LOG(WARNING) << "index recording failed for key: " << key; - } - }, - args); - }, - cmd_tokens); - } - SetLastCmd(cmd_name); - s = ExecuteCommand(ctx, cmd_name, cmd_tokens, current_cmd.get(), &reply); - // TODO: transaction support for index updating - for (const auto &record : index_records) { - auto s = GlobalIndexer::Update(ctx, record); - if (!s.IsOK() && !s.Is()) { - LOG(WARNING) << "index updating failed for key: " << record.key; + { + engine::Context ctx(srv_->storage); + + // TODO: transaction support for index recording + std::vector index_records; + if (!srv_->index_mgr.index_map.empty() && IsCmdForIndexing(attributes) && !config->cluster_enabled) { + attributes->ForEachKeyRange( + [&, this](const std::vector &args, const CommandKeyRange &key_range) { + key_range.ForEachKey( + [&, this](const std::string &key) { + auto res = srv_->indexer.Record(ctx, key, ns_); + if (res.IsOK()) { + index_records.push_back(*res); + } else if (!res.Is() && !res.Is()) { + LOG(WARNING) << "index recording failed for key: " << key; + } + }, + args); + }, + cmd_tokens); + } + + s = ExecuteCommand(ctx, cmd_name, cmd_tokens, current_cmd.get(), &reply); + // TODO: transaction support for index updating + for (const auto &record : index_records) { + auto s = GlobalIndexer::Update(ctx, record); + if (!s.IsOK() && !s.Is()) { + LOG(WARNING) << "index updating failed for key: " << record.key; + } } } + srv_->FeedMonitorConns(this, cmd_tokens); + // Break the execution loop when occurring the blocking command like BLPOP or BRPOP, // it will suspend the connection and wait for the wakeup signal. if (s.Is()) { diff --git a/src/storage/scripting.cc b/src/storage/scripting.cc index 6e05e05db6d..f38d942237f 100644 --- a/src/storage/scripting.cc +++ b/src/storage/scripting.cc @@ -814,13 +814,17 @@ int RedisGenericCommand(lua_State *lua, int raise_error) { std::string output; // TODO: make it possible for multiple redis commands in lua script to use the same txn context. - engine::Context ctx(srv->storage); - s = conn->ExecuteCommand(ctx, cmd_name, args, cmd.get(), &output); - if (!s) { - PushError(lua, s.Msg().data()); - return raise_error ? RaiseError(lua) : 1; + { + engine::Context ctx(srv->storage); + s = conn->ExecuteCommand(ctx, cmd_name, args, cmd.get(), &output); + if (!s) { + PushError(lua, s.Msg().data()); + return raise_error ? RaiseError(lua) : 1; + } } + srv->FeedMonitorConns(conn, args); + RedisProtocolToLuaType(lua, output.data()); return 1; }