diff --git a/src/defrag.c b/src/defrag.c index 4766e16370..3668da2b77 100644 --- a/src/defrag.c +++ b/src/defrag.c @@ -124,6 +124,12 @@ typedef struct { } defragPubSubCtx; static_assert(offsetof(defragPubSubCtx, kvstate) == 0, "defragStageKvstoreHelper requires this"); +typedef struct { + sds module_name; + RedisModuleDefragCtx *module_ctx; + unsigned long cursor; +} defragModuleCtx; + /* this method was added to jemalloc in order to help us understand which * pointers are worthwhile moving and which aren't */ int je_get_defrag_hint(void* ptr); @@ -1211,11 +1217,33 @@ static doneStatus defragLuaScripts(void *ctx, monotime endtime) { return DEFRAG_DONE; } +/* Handles defragmentation of module global data. This is a stage function + * that gets called periodically during the active defragmentation process. */ static doneStatus defragModuleGlobals(void *ctx, monotime endtime) { - UNUSED(endtime); - UNUSED(ctx); - moduleDefragGlobals(); - return DEFRAG_DONE; + defragModuleCtx *defrag_module_ctx = ctx; + + RedisModule *module = moduleGetHandleByName(defrag_module_ctx->module_name); + if (!module) { + /* Module has been unloaded, nothing to defrag. */ + return DEFRAG_DONE; + } + + /* Set up context for the module's defrag callback. */ + defrag_module_ctx->module_ctx->endtime = endtime; + defrag_module_ctx->module_ctx->cursor = &defrag_module_ctx->cursor; + + /* Call appropriate version of module's defrag callback: + * 1. Version 2 (defrag_cb_2): Supports incremental defrag and returns whether more work is needed + * 2. Version 1 (defrag_cb): Legacy version, performs all work in one call. + * Note: V1 doesn't support incremental defragmentation, may block for longer periods. */ + if (module->defrag_cb_2) { + return module->defrag_cb_2(defrag_module_ctx->module_ctx) ? 
DEFRAG_NOT_DONE : DEFRAG_DONE; + } else if (module->defrag_cb) { + module->defrag_cb(defrag_module_ctx->module_ctx); + return DEFRAG_DONE; + } else { + redis_unreachable(); + } } static void freeDefragKeysContext(void *ctx) { @@ -1226,6 +1254,14 @@ static void freeDefragKeysContext(void *ctx) { zfree(defrag_keys_ctx); } +/* Free callback passed to addDefragStage for module stages: releases the module name, the module defrag context and the stage context itself. */ +static void freeDefragModuleContext(void *ctx) { + defragModuleCtx *defrag_module_ctx = ctx; + sdsfree(defrag_module_ctx->module_name); + zfree(defrag_module_ctx->module_ctx); + zfree(defrag_module_ctx); +} + static void freeDefragContext(void *ptr) { StageDescriptor *stage = ptr; if (stage->ctx_free_fn) @@ -1508,7 +1544,22 @@ static void beginDefragCycle(void) { addDefragStage(defragStagePubsubKvstore, zfree, defrag_pubsubshard_ctx); addDefragStage(defragLuaScripts, NULL, NULL); - addDefragStage(defragModuleGlobals, NULL, NULL); + + /* Add stages for modules. */ + dictIterator *di = dictGetIterator(modules); + dictEntry *de; + while ((de = dictNext(di)) != NULL) { + struct RedisModule *module = dictGetVal(de); + if (module->defrag_cb || module->defrag_cb_2) { + defragModuleCtx *ctx = zmalloc(sizeof(defragModuleCtx)); + ctx->cursor = 0; + ctx->module_name = sdsnew(module->name); + ctx->module_ctx = zcalloc(sizeof(RedisModuleDefragCtx)); + ctx->module_ctx->dbid = -1; /* Global data has no key/db: keep the -1 'unknown' dbid instead of zcalloc's 0. */ + addDefragStage(defragModuleGlobals, freeDefragModuleContext, ctx); + } + } + dictReleaseIterator(di); defrag.current_stage = NULL; defrag.start_cycle = getMonotonicUs(); diff --git a/src/module.c b/src/module.c index 8ecd23862b..4fb0eba58b 100644 --- a/src/module.c +++ b/src/module.c @@ -2309,6 +2309,7 @@ void RM_SetModuleAttribs(RedisModuleCtx *ctx, const char *name, int ver, int api module->options = 0; module->info_cb = 0; module->defrag_cb = 0; + module->defrag_cb_2 = 0; module->defrag_start_cb = 0; module->defrag_end_cb = 0; module->loadmod = NULL; @@ -13783,16 +13784,6 @@ const char *RM_GetCurrentCommandName(RedisModuleCtx *ctx) { * ## Defrag API * 
-------------------------------------------------------------------------- */ -/* The defrag context, used to manage state during calls to the data type - * defrag callback. - */ -struct RedisModuleDefragCtx { - monotime endtime; - unsigned long *cursor; - struct redisObject *key; /* Optional name of key processed, NULL when unknown. */ - int dbid; /* The dbid of the key being processed, -1 when unknown. */ -}; - /* Register a defrag callback for global data, i.e. anything that the module * may allocate that is not tied to a specific data type. */ @@ -13801,6 +13792,17 @@ int RM_RegisterDefragFunc(RedisModuleCtx *ctx, RedisModuleDefragFunc cb) { return REDISMODULE_OK; } +/* Register a defrag callback for global data, i.e. anything that the module + * may allocate that is not tied to a specific data type. + * This is a more advanced version of RM_RegisterDefragFunc: it takes a callback + * that returns a value, so the callback can call RM_DefragShouldStop and return + * non-zero to be called again later, or return 0 when it is done. + */ +int RM_RegisterDefragFunc2(RedisModuleCtx *ctx, RedisModuleDefragFunc2 cb) { + ctx->module->defrag_cb_2 = cb; + return REDISMODULE_OK; +} + /* Register a defrag callbacks that will be called when defrag operation starts and ends. 
* * The callbacks are the same as `RM_RegisterDefragFunc` but the user @@ -13992,16 +13994,6 @@ int moduleDefragValue(robj *key, robj *value, int dbid) { return 1; } -/* Call registered module API defrag functions */ -void moduleDefragGlobals(void) { - dictForEach(modules, struct RedisModule, module, - if (module->defrag_cb) { - RedisModuleDefragCtx defrag_ctx = { 0, NULL, NULL, -1}; - module->defrag_cb(&defrag_ctx); - } - ); -} - /* Call registered module API defrag start functions */ void moduleDefragStart(void) { dictForEach(modules, struct RedisModule, module, @@ -14381,6 +14373,7 @@ void moduleRegisterCoreAPI(void) { REGISTER_API(GetCurrentCommandName); REGISTER_API(GetTypeMethodVersion); REGISTER_API(RegisterDefragFunc); + REGISTER_API(RegisterDefragFunc2); REGISTER_API(RegisterDefragCallbacks); REGISTER_API(DefragAlloc); REGISTER_API(DefragAllocRaw); diff --git a/src/redismodule.h b/src/redismodule.h index 54f778315a..f7d8cc76af 100644 --- a/src/redismodule.h +++ b/src/redismodule.h @@ -840,6 +840,7 @@ typedef struct RedisModuleDefragCtx RedisModuleDefragCtx; * exposed since you can't cast a function pointer to (void *). 
*/ typedef void (*RedisModuleInfoFunc)(RedisModuleInfoCtx *ctx, int for_crash_report); typedef void (*RedisModuleDefragFunc)(RedisModuleDefragCtx *ctx); +typedef int (*RedisModuleDefragFunc2)(RedisModuleDefragCtx *ctx); typedef void (*RedisModuleUserChangedFunc) (uint64_t client_id, void *privdata); /* ------------------------- End of common defines ------------------------ */ @@ -1305,6 +1306,7 @@ REDISMODULE_API int *(*RedisModule_GetCommandKeys)(RedisModuleCtx *ctx, RedisMod REDISMODULE_API int *(*RedisModule_GetCommandKeysWithFlags)(RedisModuleCtx *ctx, RedisModuleString **argv, int argc, int *num_keys, int **out_flags) REDISMODULE_ATTR; REDISMODULE_API const char *(*RedisModule_GetCurrentCommandName)(RedisModuleCtx *ctx) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_RegisterDefragFunc)(RedisModuleCtx *ctx, RedisModuleDefragFunc func) REDISMODULE_ATTR; +REDISMODULE_API int (*RedisModule_RegisterDefragFunc2)(RedisModuleCtx *ctx, RedisModuleDefragFunc2 func) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_RegisterDefragCallbacks)(RedisModuleCtx *ctx, RedisModuleDefragFunc start, RedisModuleDefragFunc end) REDISMODULE_ATTR; REDISMODULE_API void *(*RedisModule_DefragAlloc)(RedisModuleDefragCtx *ctx, void *ptr) REDISMODULE_ATTR; REDISMODULE_API void *(*RedisModule_DefragAllocRaw)(RedisModuleDefragCtx *ctx, size_t size) REDISMODULE_ATTR; @@ -1678,6 +1680,7 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int REDISMODULE_GET_API(GetCommandKeysWithFlags); REDISMODULE_GET_API(GetCurrentCommandName); REDISMODULE_GET_API(RegisterDefragFunc); + REDISMODULE_GET_API(RegisterDefragFunc2); REDISMODULE_GET_API(RegisterDefragCallbacks); REDISMODULE_GET_API(DefragAlloc); REDISMODULE_GET_API(DefragAllocRaw); diff --git a/src/server.h b/src/server.h index a98f1aa8a2..d133907220 100644 --- a/src/server.h +++ b/src/server.h @@ -891,6 +891,7 @@ struct RedisModule { int blocked_clients; /* Count of RedisModuleBlockedClient in this module. 
*/ RedisModuleInfoFunc info_cb; /* Callback for module to add INFO fields. */ RedisModuleDefragFunc defrag_cb; /* Callback for global data defrag. */ + RedisModuleDefragFunc2 defrag_cb_2; /* Version 2 callback for global data defrag. */ RedisModuleDefragFunc defrag_start_cb; /* Callback indicating defrag started. */ RedisModuleDefragFunc defrag_end_cb; /* Callback indicating defrag ended. */ struct moduleLoadQueueEntry *loadmod; /* Module load arguments for config rewrite. */ @@ -900,6 +901,16 @@ struct RedisModule { }; typedef struct RedisModule RedisModule; +/* The defrag context, used to manage state during calls to the data type + * defrag callback. + */ +struct RedisModuleDefragCtx { + monotime endtime; + unsigned long *cursor; + struct redisObject *key; /* Optional name of key processed, NULL when unknown. */ + int dbid; /* The dbid of the key being processed, -1 when unknown. */ +}; + /* This is a wrapper for the 'rio' streams used inside rdb.c in Redis, so that * the user does not have to take the total count of the written bytes nor * to care about error conditions. 
*/ @@ -2675,7 +2686,6 @@ size_t moduleGetMemUsage(robj *key, robj *val, size_t sample_size, int dbid); robj *moduleTypeDupOrReply(client *c, robj *fromkey, robj *tokey, int todb, robj *value); int moduleDefragValue(robj *key, robj *obj, int dbid); int moduleLateDefrag(robj *key, robj *value, unsigned long *cursor, monotime endtime, int dbid); -void moduleDefragGlobals(void); void moduleDefragStart(void); void moduleDefragEnd(void); void *moduleGetHandleByName(char *modulename); diff --git a/tests/modules/defragtest.c b/tests/modules/defragtest.c index 597b5aa79f..d436e56b07 100644 --- a/tests/modules/defragtest.c +++ b/tests/modules/defragtest.c @@ -21,34 +21,45 @@ unsigned long int datatype_defragged = 0; unsigned long int datatype_raw_defragged = 0; unsigned long int datatype_resumes = 0; unsigned long int datatype_wrong_cursor = 0; -unsigned long int global_attempts = 0; unsigned long int defrag_started = 0; unsigned long int defrag_ended = 0; -unsigned long int global_defragged = 0; +unsigned long int global_strings_attempts = 0; +unsigned long int global_strings_defragged = 0; +unsigned long int global_strings_pauses = 0; -int global_strings_len = 0; +unsigned long global_strings_len = 0; RedisModuleString **global_strings = NULL; -static void createGlobalStrings(RedisModuleCtx *ctx, int count) +static void createGlobalStrings(RedisModuleCtx *ctx, unsigned long count) { global_strings_len = count; global_strings = RedisModule_Alloc(sizeof(RedisModuleString *) * count); - for (int i = 0; i < count; i++) { + for (unsigned long i = 0; i < count; i++) { global_strings[i] = RedisModule_CreateStringFromLongLong(ctx, i); } } -static void defragGlobalStrings(RedisModuleDefragCtx *ctx) +static int defragGlobalStrings(RedisModuleDefragCtx *ctx) { - for (int i = 0; i < global_strings_len; i++) { - RedisModuleString *new = RedisModule_DefragRedisModuleString(ctx, global_strings[i]); - global_attempts++; + unsigned long cursor = 0; + RedisModule_DefragCursorGet(ctx, 
&cursor); + RedisModule_Assert(global_strings_len == 0 || cursor < global_strings_len); + for (; cursor < global_strings_len; cursor++) { + RedisModuleString *new = RedisModule_DefragRedisModuleString(ctx, global_strings[cursor]); + global_strings_attempts++; if (new != NULL) { - global_strings[i] = new; - global_defragged++; + global_strings[cursor] = new; + global_strings_defragged++; + } + + if (RedisModule_DefragShouldStop(ctx)) { + global_strings_pauses++; + RedisModule_DefragCursorSet(ctx, cursor); /* Saved index stays below global_strings_len, as the assert above requires. */ + return 1; } } + return 0; } static void defragStart(RedisModuleDefragCtx *ctx) { @@ -70,8 +81,9 @@ static void FragInfo(RedisModuleInfoCtx *ctx, int for_crash_report) { RedisModule_InfoAddFieldLongLong(ctx, "datatype_raw_defragged", datatype_raw_defragged); RedisModule_InfoAddFieldLongLong(ctx, "datatype_resumes", datatype_resumes); RedisModule_InfoAddFieldLongLong(ctx, "datatype_wrong_cursor", datatype_wrong_cursor); - RedisModule_InfoAddFieldLongLong(ctx, "global_attempts", global_attempts); - RedisModule_InfoAddFieldLongLong(ctx, "global_defragged", global_defragged); + RedisModule_InfoAddFieldLongLong(ctx, "global_strings_attempts", global_strings_attempts); + RedisModule_InfoAddFieldLongLong(ctx, "global_strings_defragged", global_strings_defragged); + RedisModule_InfoAddFieldLongLong(ctx, "global_strings_pauses", global_strings_pauses); RedisModule_InfoAddFieldLongLong(ctx, "defrag_started", defrag_started); RedisModule_InfoAddFieldLongLong(ctx, "defrag_ended", defrag_ended); } @@ -99,8 +111,9 @@ static int fragResetStatsCommand(RedisModuleCtx *ctx, RedisModuleString **argv, datatype_raw_defragged = 0; datatype_resumes = 0; datatype_wrong_cursor = 0; - global_attempts = 0; - global_defragged = 0; + global_strings_attempts = 0; + global_strings_defragged = 0; + global_strings_pauses = 0; defrag_started = 0; defrag_ended = 0; @@ -258,7 +271,7 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) return REDISMODULE_ERR; RedisModule_RegisterInfoFunc(ctx, 
FragInfo); - RedisModule_RegisterDefragFunc(ctx, defragGlobalStrings); + RedisModule_RegisterDefragFunc2(ctx, defragGlobalStrings); RedisModule_RegisterDefragCallbacks(ctx, defragStart, defragEnd); return REDISMODULE_OK; diff --git a/tests/unit/moduleapi/defrag.tcl b/tests/unit/moduleapi/defrag.tcl index 3f36ee1912..9e7efb673c 100644 --- a/tests/unit/moduleapi/defrag.tcl +++ b/tests/unit/moduleapi/defrag.tcl @@ -1,7 +1,7 @@ set testmodule [file normalize tests/modules/defragtest.so] start_server {tags {"modules"} overrides {{save ""}}} { - r module load $testmodule 10000 + r module load $testmodule 50000 r config set hz 100 r config set active-defrag-ignore-bytes 1 r config set active-defrag-threshold-lower 0 @@ -46,7 +46,8 @@ start_server {tags {"modules"} overrides {{save ""}}} { after 2000 set info [r info defragtest_stats] - assert {[getInfoProperty $info defragtest_global_attempts] > 0} + assert {[getInfoProperty $info defragtest_global_strings_attempts] > 0} + assert {[getInfoProperty $info defragtest_global_strings_pauses] > 0} assert_morethan [getInfoProperty $info defragtest_defrag_started] 0 assert_morethan [getInfoProperty $info defragtest_defrag_ended] 0 }