mirror of
https://github.com/redis/redis.git
synced 2026-04-21 03:01:35 -04:00
Offload lookupCommand into IO threads when threaded IO is enabled (#13696)
From flame graph, we could see `lookupCommand` in main thread costs much CPU, so we can let IO threads to perform `lookupCommand`. To avoid race condition among multiple IO threads, made the following changes: - Pause all IO threads when register or unregister commands - Force a full rehashing of the command table dict when resizing
This commit is contained in:
@@ -277,6 +277,12 @@ int _dictResize(dict *d, unsigned long size, int* malloc_failed)
|
||||
return DICT_OK;
|
||||
}
|
||||
|
||||
/* Force a full rehashing of the dictionary */
|
||||
if (d->type->force_full_rehash) {
|
||||
while (dictRehash(d, 1000)) {
|
||||
/* Continue rehashing */
|
||||
}
|
||||
}
|
||||
return DICT_OK;
|
||||
}
|
||||
|
||||
|
||||
@@ -62,6 +62,10 @@ typedef struct dictType {
|
||||
unsigned int keys_are_odd:1;
|
||||
/* TODO: Add a 'keys_are_even' flag and use a similar optimization if that
|
||||
* flag is set. */
|
||||
|
||||
/* Ensures that the entire hash table is rehashed at once if set. */
|
||||
unsigned int force_full_rehash:1;
|
||||
|
||||
/* Sometimes we want the ability to store a key in a given way inside the hash
|
||||
* function, and lookup it in some other way without resorting to any kind of
|
||||
* conversion. For instance the key may be stored as a structure also
|
||||
|
||||
@@ -195,7 +195,7 @@ static int PausedIOThreads[IO_THREADS_MAX_NUM] = {0};
|
||||
|
||||
/* Pause the specific range of io threads, and wait for them to be paused. */
|
||||
void pauseIOThreadsRange(int start, int end) {
|
||||
if (server.io_threads_num <= 1) return;
|
||||
if (!server.io_threads_active) return;
|
||||
serverAssert(start >= 1 && end < server.io_threads_num && start <= end);
|
||||
serverAssert(pthread_equal(pthread_self(), server.main_thread_id));
|
||||
|
||||
@@ -227,7 +227,7 @@ void pauseIOThreadsRange(int start, int end) {
|
||||
|
||||
/* Resume the specific range of io threads, and wait for them to be resumed. */
|
||||
void resumeIOThreadsRange(int start, int end) {
|
||||
if (server.io_threads_num <= 1) return;
|
||||
if (!server.io_threads_active) return;
|
||||
serverAssert(start >= 1 && end < server.io_threads_num && start <= end);
|
||||
serverAssert(pthread_equal(pthread_self(), server.main_thread_id));
|
||||
|
||||
|
||||
11
src/module.c
11
src/module.c
@@ -664,7 +664,7 @@ void moduleReleaseTempClient(client *c) {
|
||||
c->bufpos = 0;
|
||||
c->flags = CLIENT_MODULE;
|
||||
c->user = NULL; /* Root user */
|
||||
c->cmd = c->lastcmd = c->realcmd = NULL;
|
||||
c->cmd = c->lastcmd = c->realcmd = c->iolookedcmd = NULL;
|
||||
if (c->bstate.async_rm_call_handle) {
|
||||
RedisModuleAsyncRMCallPromise *promise = c->bstate.async_rm_call_handle;
|
||||
promise->c = NULL; /* Remove the client from the promise so it will no longer be possible to abort it. */
|
||||
@@ -1276,8 +1276,11 @@ int RM_CreateCommand(RedisModuleCtx *ctx, const char *name, RedisModuleCmdFunc c
|
||||
RedisModuleCommand *cp = moduleCreateCommandProxy(ctx->module, declared_name, sdsdup(declared_name), cmdfunc, flags, firstkey, lastkey, keystep);
|
||||
cp->rediscmd->arity = cmdfunc ? -1 : -2; /* Default value, can be changed later via dedicated API */
|
||||
|
||||
pauseAllIOThreads();
|
||||
serverAssert(dictAdd(server.commands, sdsdup(declared_name), cp->rediscmd) == DICT_OK);
|
||||
serverAssert(dictAdd(server.orig_commands, sdsdup(declared_name), cp->rediscmd) == DICT_OK);
|
||||
resumeAllIOThreads();
|
||||
|
||||
cp->rediscmd->id = ACLGetCommandID(declared_name); /* ID used for ACL. */
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
@@ -10905,6 +10908,10 @@ void moduleCallCommandFilters(client *c) {
|
||||
f->callback(&filter);
|
||||
}
|
||||
|
||||
/* If the filter sets a new command, including command or subcommand,
|
||||
* the command looked up in IO threads will be invalid. */
|
||||
c->iolookedcmd = NULL;
|
||||
|
||||
c->argv = filter.argv;
|
||||
c->argv_len = filter.argv_len;
|
||||
c->argc = filter.argc;
|
||||
@@ -12321,6 +12328,7 @@ int moduleFreeCommand(struct RedisModule *module, struct redisCommand *cmd) {
|
||||
}
|
||||
|
||||
void moduleUnregisterCommands(struct RedisModule *module) {
|
||||
pauseAllIOThreads();
|
||||
/* Unregister all the commands registered by this module. */
|
||||
dictIterator *di = dictGetSafeIterator(server.commands);
|
||||
dictEntry *de;
|
||||
@@ -12335,6 +12343,7 @@ void moduleUnregisterCommands(struct RedisModule *module) {
|
||||
zfree(cmd);
|
||||
}
|
||||
dictReleaseIterator(di);
|
||||
resumeAllIOThreads();
|
||||
}
|
||||
|
||||
/* We parse argv to add sds "NAME VALUE" pairs to the server.module_configs_queue list of configs.
|
||||
|
||||
@@ -159,7 +159,7 @@ client *createClient(connection *conn) {
|
||||
c->argv_len_sum = 0;
|
||||
c->original_argc = 0;
|
||||
c->original_argv = NULL;
|
||||
c->cmd = c->lastcmd = c->realcmd = NULL;
|
||||
c->cmd = c->lastcmd = c->realcmd = c->iolookedcmd = NULL;
|
||||
c->cur_script = NULL;
|
||||
c->multibulklen = 0;
|
||||
c->bulklen = -1;
|
||||
@@ -1456,6 +1456,7 @@ static inline void freeClientArgvInternal(client *c, int free_argv) {
|
||||
decrRefCount(c->argv[j]);
|
||||
c->argc = 0;
|
||||
c->cmd = NULL;
|
||||
c->iolookedcmd = NULL;
|
||||
c->argv_len_sum = 0;
|
||||
if (free_argv) {
|
||||
c->argv_len = 0;
|
||||
@@ -2777,6 +2778,7 @@ int processInputBuffer(client *c) {
|
||||
* as one that needs to process the command. */
|
||||
if (c->running_tid != IOTHREAD_MAIN_THREAD_ID) {
|
||||
c->io_flags |= CLIENT_IO_PENDING_COMMAND;
|
||||
c->iolookedcmd = lookupCommand(c->argv, c->argc);
|
||||
enqueuePendingClientsToMainThread(c, 0);
|
||||
break;
|
||||
}
|
||||
|
||||
@@ -521,7 +521,8 @@ dictType commandTableDictType = {
|
||||
dictSdsKeyCaseCompare, /* key compare */
|
||||
dictSdsDestructor, /* key destructor */
|
||||
NULL, /* val destructor */
|
||||
NULL /* allow to expand */
|
||||
NULL, /* allow to expand */
|
||||
.force_full_rehash = 1, /* force full rehashing */
|
||||
};
|
||||
|
||||
/* Hash type hash table (note that small hashes are represented with listpacks) */
|
||||
@@ -3988,7 +3989,8 @@ int processCommand(client *c) {
|
||||
* In case we are reprocessing a command after it was blocked,
|
||||
* we do not have to repeat the same checks */
|
||||
if (!client_reprocessing_command) {
|
||||
c->cmd = c->lastcmd = c->realcmd = lookupCommand(c->argv,c->argc);
|
||||
c->cmd = c->lastcmd = c->realcmd = c->iolookedcmd ? c->iolookedcmd :
|
||||
lookupCommand(c->argv,c->argc);
|
||||
sds err;
|
||||
if (!commandCheckExistence(c, &err)) {
|
||||
rejectCommandSds(c, err);
|
||||
|
||||
@@ -1215,6 +1215,7 @@ typedef struct client {
|
||||
robj **original_argv; /* Arguments of original command if arguments were rewritten. */
|
||||
size_t argv_len_sum; /* Sum of lengths of objects in argv list. */
|
||||
struct redisCommand *cmd, *lastcmd; /* Last command executed. */
|
||||
struct redisCommand *iolookedcmd; /* Command looked up in IO threads. */
|
||||
struct redisCommand *realcmd; /* The original command that was executed by the client,
|
||||
Used to update error stats in case the c->cmd was modified
|
||||
during the command invocation (like on GEOADD for example). */
|
||||
|
||||
Reference in New Issue
Block a user