From 7665bdc91aa6f98289eefdcbdb2def4467864b7a Mon Sep 17 00:00:00 2001 From: Yuan Wang Date: Wed, 25 Dec 2024 16:03:22 +0800 Subject: [PATCH] Offload `lookupCommand` into IO threads when threaded IO is enabled (#13696) From flame graph, we could see `lookupCommand` in main thread costs much CPU, so we can let IO threads to perform `lookupCommand`. To avoid race condition among multiple IO threads, made the following changes: - Pause all IO threads when register or unregister commands - Force a full rehashing of the command table dict when resizing --- src/dict.c | 6 ++++++ src/dict.h | 4 ++++ src/iothread.c | 4 ++-- src/module.c | 11 ++++++++++- src/networking.c | 4 +++- src/server.c | 6 ++++-- src/server.h | 1 + 7 files changed, 30 insertions(+), 6 deletions(-) diff --git a/src/dict.c b/src/dict.c index 3bfcc70170..bf183422ae 100644 --- a/src/dict.c +++ b/src/dict.c @@ -277,6 +277,12 @@ int _dictResize(dict *d, unsigned long size, int* malloc_failed) return DICT_OK; } + /* Force a full rehashing of the dictionary */ + if (d->type->force_full_rehash) { + while (dictRehash(d, 1000)) { + /* Continue rehashing */ + } + } return DICT_OK; } diff --git a/src/dict.h b/src/dict.h index 12a5c99185..fc4554ae23 100644 --- a/src/dict.h +++ b/src/dict.h @@ -62,6 +62,10 @@ typedef struct dictType { unsigned int keys_are_odd:1; /* TODO: Add a 'keys_are_even' flag and use a similar optimization if that * flag is set. */ + + /* Ensures that the entire hash table is rehashed at once if set. */ + unsigned int force_full_rehash:1; + /* Sometimes we want the ability to store a key in a given way inside the hash * function, and lookup it in some other way without resorting to any kind of * conversion. For instance the key may be stored as a structure also diff --git a/src/iothread.c b/src/iothread.c index 2e5c98a285..e3da683d50 100644 --- a/src/iothread.c +++ b/src/iothread.c @@ -195,7 +195,7 @@ static int PausedIOThreads[IO_THREADS_MAX_NUM] = {0}; /* Pause the specific range of io threads, and wait for them to be paused. */ void pauseIOThreadsRange(int start, int end) { - if (server.io_threads_num <= 1) return; + if (!server.io_threads_active) return; serverAssert(start >= 1 && end < server.io_threads_num && start <= end); serverAssert(pthread_equal(pthread_self(), server.main_thread_id)); @@ -227,7 +227,7 @@ void pauseIOThreadsRange(int start, int end) { /* Resume the specific range of io threads, and wait for them to be resumed. */ void resumeIOThreadsRange(int start, int end) { - if (server.io_threads_num <= 1) return; + if (!server.io_threads_active) return; serverAssert(start >= 1 && end < server.io_threads_num && start <= end); serverAssert(pthread_equal(pthread_self(), server.main_thread_id)); diff --git a/src/module.c b/src/module.c index f12d03b479..f662ebb308 100644 --- a/src/module.c +++ b/src/module.c @@ -664,7 +664,7 @@ void moduleReleaseTempClient(client *c) { c->bufpos = 0; c->flags = CLIENT_MODULE; c->user = NULL; /* Root user */ - c->cmd = c->lastcmd = c->realcmd = NULL; + c->cmd = c->lastcmd = c->realcmd = c->iolookedcmd = NULL; if (c->bstate.async_rm_call_handle) { RedisModuleAsyncRMCallPromise *promise = c->bstate.async_rm_call_handle; promise->c = NULL; /* Remove the client from the promise so it will no longer be possible to abort it. */ @@ -1276,8 +1276,11 @@ int RM_CreateCommand(RedisModuleCtx *ctx, const char *name, RedisModuleCmdFunc c RedisModuleCommand *cp = moduleCreateCommandProxy(ctx->module, declared_name, sdsdup(declared_name), cmdfunc, flags, firstkey, lastkey, keystep); cp->rediscmd->arity = cmdfunc ? -1 : -2; /* Default value, can be changed later via dedicated API */ + pauseAllIOThreads(); serverAssert(dictAdd(server.commands, sdsdup(declared_name), cp->rediscmd) == DICT_OK); serverAssert(dictAdd(server.orig_commands, sdsdup(declared_name), cp->rediscmd) == DICT_OK); + resumeAllIOThreads(); + cp->rediscmd->id = ACLGetCommandID(declared_name); /* ID used for ACL. */ return REDISMODULE_OK; } @@ -10905,6 +10908,10 @@ void moduleCallCommandFilters(client *c) { f->callback(&filter); } + /* If the filter sets a new command, including command or subcommand, + * the command looked up in IO threads will be invalid. */ + c->iolookedcmd = NULL; + c->argv = filter.argv; c->argv_len = filter.argv_len; c->argc = filter.argc; @@ -12321,6 +12328,7 @@ int moduleFreeCommand(struct RedisModule *module, struct redisCommand *cmd) { } void moduleUnregisterCommands(struct RedisModule *module) { + pauseAllIOThreads(); /* Unregister all the commands registered by this module. */ dictIterator *di = dictGetSafeIterator(server.commands); dictEntry *de; @@ -12335,6 +12343,7 @@ void moduleUnregisterCommands(struct RedisModule *module) { zfree(cmd); } dictReleaseIterator(di); + resumeAllIOThreads(); } /* We parse argv to add sds "NAME VALUE" pairs to the server.module_configs_queue list of configs. diff --git a/src/networking.c b/src/networking.c index 8fb37af081..e3d0d25e4b 100644 --- a/src/networking.c +++ b/src/networking.c @@ -159,7 +159,7 @@ client *createClient(connection *conn) { c->argv_len_sum = 0; c->original_argc = 0; c->original_argv = NULL; - c->cmd = c->lastcmd = c->realcmd = NULL; + c->cmd = c->lastcmd = c->realcmd = c->iolookedcmd = NULL; c->cur_script = NULL; c->multibulklen = 0; c->bulklen = -1; @@ -1456,6 +1456,7 @@ static inline void freeClientArgvInternal(client *c, int free_argv) { decrRefCount(c->argv[j]); c->argc = 0; c->cmd = NULL; + c->iolookedcmd = NULL; c->argv_len_sum = 0; if (free_argv) { c->argv_len = 0; @@ -2777,6 +2778,7 @@ int processInputBuffer(client *c) { * as one that needs to process the command. */ if (c->running_tid != IOTHREAD_MAIN_THREAD_ID) { c->io_flags |= CLIENT_IO_PENDING_COMMAND; + c->iolookedcmd = lookupCommand(c->argv, c->argc); enqueuePendingClientsToMainThread(c, 0); break; } diff --git a/src/server.c b/src/server.c index 0b4c95ce88..a9b2b637ce 100644 --- a/src/server.c +++ b/src/server.c @@ -521,7 +521,8 @@ dictType commandTableDictType = { dictSdsKeyCaseCompare, /* key compare */ dictSdsDestructor, /* key destructor */ NULL, /* val destructor */ - NULL /* allow to expand */ + NULL, /* allow to expand */ + .force_full_rehash = 1, /* force full rehashing */ }; /* Hash type hash table (note that small hashes are represented with listpacks) */ @@ -3988,7 +3989,8 @@ int processCommand(client *c) { * In case we are reprocessing a command after it was blocked, * we do not have to repeat the same checks */ if (!client_reprocessing_command) { - c->cmd = c->lastcmd = c->realcmd = lookupCommand(c->argv,c->argc); + c->cmd = c->lastcmd = c->realcmd = c->iolookedcmd ? c->iolookedcmd : + lookupCommand(c->argv,c->argc); sds err; if (!commandCheckExistence(c, &err)) { rejectCommandSds(c, err); diff --git a/src/server.h b/src/server.h index bc965999e4..3fe5621840 100644 --- a/src/server.h +++ b/src/server.h @@ -1215,6 +1215,7 @@ typedef struct client { robj **original_argv; /* Arguments of original command if arguments were rewritten. */ size_t argv_len_sum; /* Sum of lengths of objects in argv list. */ struct redisCommand *cmd, *lastcmd; /* Last command executed. */ + struct redisCommand *iolookedcmd; /* Command looked up in IO threads. */ struct redisCommand *realcmd; /* The original command that was executed by the client, Used to update error stats in case the c->cmd was modified during the command invocation (like on GEOADD for example). */