mirror of
https://github.com/redis/redis.git
synced 2026-04-21 03:01:35 -04:00
Add support for incremental defragmentation of global module data (#13815)
## Description Currently, when performing defragmentation on non-key data within the module, we cannot process the defragmentation incrementally. This limitation affects the efficiency and flexibility of defragmentation in certain scenarios. The primary goal of this PR is to introduce support for incremental defragmentation of global module data. ## Interface Change New module API `RegisterDefragFunc2` This is a more advanced version of `RM_RegisterDefragFunc`, in that it takes a new callbacks(`RegisterDefragFunc2`) that has a return value, and can use RM_DefragShouldStop in and indicate that it should be called again later, or is it done (returned 0). ## Note The `RegisterDefragFunc` API remains available. --------- Co-authored-by: ShooterIT <wangyuancode@163.com> Co-authored-by: oranagra <oran@redislabs.com>
This commit is contained in:
59
src/defrag.c
59
src/defrag.c
@@ -124,6 +124,12 @@ typedef struct {
|
||||
} defragPubSubCtx;
|
||||
static_assert(offsetof(defragPubSubCtx, kvstate) == 0, "defragStageKvstoreHelper requires this");
|
||||
|
||||
typedef struct {
|
||||
sds module_name;
|
||||
RedisModuleDefragCtx *module_ctx;
|
||||
unsigned long cursor;
|
||||
} defragModuleCtx;
|
||||
|
||||
/* this method was added to jemalloc in order to help us understand which
|
||||
* pointers are worthwhile moving and which aren't */
|
||||
int je_get_defrag_hint(void* ptr);
|
||||
@@ -1211,11 +1217,33 @@ static doneStatus defragLuaScripts(void *ctx, monotime endtime) {
|
||||
return DEFRAG_DONE;
|
||||
}
|
||||
|
||||
/* Handles defragmentation of module global data. This is a stage function
|
||||
* that gets called periodically during the active defragmentation process. */
|
||||
static doneStatus defragModuleGlobals(void *ctx, monotime endtime) {
|
||||
UNUSED(endtime);
|
||||
UNUSED(ctx);
|
||||
moduleDefragGlobals();
|
||||
return DEFRAG_DONE;
|
||||
defragModuleCtx *defrag_module_ctx = ctx;
|
||||
|
||||
RedisModule *module = moduleGetHandleByName(defrag_module_ctx->module_name);
|
||||
if (!module) {
|
||||
/* Module has been unloaded, nothing to defrag. */
|
||||
return DEFRAG_DONE;
|
||||
}
|
||||
|
||||
/* Set up context for the module's defrag callback. */
|
||||
defrag_module_ctx->module_ctx->endtime = endtime;
|
||||
defrag_module_ctx->module_ctx->cursor = &defrag_module_ctx->cursor;
|
||||
|
||||
/* Call appropriate version of module's defrag callback:
|
||||
* 1. Version 2 (defrag_cb_2): Supports incremental defrag and returns whether more work is needed
|
||||
* 2. Version 1 (defrag_cb): Legacy version, performs all work in one call.
|
||||
* Note: V1 doesn't support incremental defragmentation, may block for longer periods. */
|
||||
if (module->defrag_cb_2) {
|
||||
return module->defrag_cb_2(defrag_module_ctx->module_ctx) ? DEFRAG_NOT_DONE : DEFRAG_DONE;
|
||||
} else if (module->defrag_cb) {
|
||||
module->defrag_cb(defrag_module_ctx->module_ctx);
|
||||
return DEFRAG_DONE;
|
||||
} else {
|
||||
redis_unreachable();
|
||||
}
|
||||
}
|
||||
|
||||
static void freeDefragKeysContext(void *ctx) {
|
||||
@@ -1226,6 +1254,13 @@ static void freeDefragKeysContext(void *ctx) {
|
||||
zfree(defrag_keys_ctx);
|
||||
}
|
||||
|
||||
static void freeDefragModelContext(void *ctx) {
|
||||
defragModuleCtx *defrag_model_ctx = ctx;
|
||||
sdsfree(defrag_model_ctx->module_name);
|
||||
zfree(defrag_model_ctx->module_ctx);
|
||||
zfree(defrag_model_ctx);
|
||||
}
|
||||
|
||||
static void freeDefragContext(void *ptr) {
|
||||
StageDescriptor *stage = ptr;
|
||||
if (stage->ctx_free_fn)
|
||||
@@ -1508,7 +1543,21 @@ static void beginDefragCycle(void) {
|
||||
addDefragStage(defragStagePubsubKvstore, zfree, defrag_pubsubshard_ctx);
|
||||
|
||||
addDefragStage(defragLuaScripts, NULL, NULL);
|
||||
addDefragStage(defragModuleGlobals, NULL, NULL);
|
||||
|
||||
/* Add stages for modules. */
|
||||
dictIterator *di = dictGetIterator(modules);
|
||||
dictEntry *de;
|
||||
while ((de = dictNext(di)) != NULL) {
|
||||
struct RedisModule *module = dictGetVal(de);
|
||||
if (module->defrag_cb || module->defrag_cb_2) {
|
||||
defragModuleCtx *ctx = zmalloc(sizeof(defragModuleCtx));
|
||||
ctx->cursor = 0;
|
||||
ctx->module_name = sdsnew(module->name);
|
||||
ctx->module_ctx = zcalloc(sizeof(RedisModuleDefragCtx));
|
||||
addDefragStage(defragModuleGlobals, freeDefragModelContext, ctx);
|
||||
}
|
||||
}
|
||||
dictReleaseIterator(di);
|
||||
|
||||
defrag.current_stage = NULL;
|
||||
defrag.start_cycle = getMonotonicUs();
|
||||
|
||||
33
src/module.c
33
src/module.c
@@ -2309,6 +2309,7 @@ void RM_SetModuleAttribs(RedisModuleCtx *ctx, const char *name, int ver, int api
|
||||
module->options = 0;
|
||||
module->info_cb = 0;
|
||||
module->defrag_cb = 0;
|
||||
module->defrag_cb_2 = 0;
|
||||
module->defrag_start_cb = 0;
|
||||
module->defrag_end_cb = 0;
|
||||
module->loadmod = NULL;
|
||||
@@ -13783,16 +13784,6 @@ const char *RM_GetCurrentCommandName(RedisModuleCtx *ctx) {
|
||||
* ## Defrag API
|
||||
* -------------------------------------------------------------------------- */
|
||||
|
||||
/* The defrag context, used to manage state during calls to the data type
|
||||
* defrag callback.
|
||||
*/
|
||||
struct RedisModuleDefragCtx {
|
||||
monotime endtime;
|
||||
unsigned long *cursor;
|
||||
struct redisObject *key; /* Optional name of key processed, NULL when unknown. */
|
||||
int dbid; /* The dbid of the key being processed, -1 when unknown. */
|
||||
};
|
||||
|
||||
/* Register a defrag callback for global data, i.e. anything that the module
|
||||
* may allocate that is not tied to a specific data type.
|
||||
*/
|
||||
@@ -13801,6 +13792,17 @@ int RM_RegisterDefragFunc(RedisModuleCtx *ctx, RedisModuleDefragFunc cb) {
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
||||
/* Register a defrag callback for global data, i.e. anything that the module
|
||||
* may allocate that is not tied to a specific data type.
|
||||
* This is a more advanced version of RM_RegisterDefragFunc, in that it takes
|
||||
* a callbacks that has a return value, and can use RM_DefragShouldStop
|
||||
* in and indicate that it should be called again later, or is it done (returned 0).
|
||||
*/
|
||||
int RM_RegisterDefragFunc2(RedisModuleCtx *ctx, RedisModuleDefragFunc2 cb) {
|
||||
ctx->module->defrag_cb_2 = cb;
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
||||
/* Register a defrag callbacks that will be called when defrag operation starts and ends.
|
||||
*
|
||||
* The callbacks are the same as `RM_RegisterDefragFunc` but the user
|
||||
@@ -13992,16 +13994,6 @@ int moduleDefragValue(robj *key, robj *value, int dbid) {
|
||||
return 1;
|
||||
}
|
||||
|
||||
/* Call registered module API defrag functions */
|
||||
void moduleDefragGlobals(void) {
|
||||
dictForEach(modules, struct RedisModule, module,
|
||||
if (module->defrag_cb) {
|
||||
RedisModuleDefragCtx defrag_ctx = { 0, NULL, NULL, -1};
|
||||
module->defrag_cb(&defrag_ctx);
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
/* Call registered module API defrag start functions */
|
||||
void moduleDefragStart(void) {
|
||||
dictForEach(modules, struct RedisModule, module,
|
||||
@@ -14381,6 +14373,7 @@ void moduleRegisterCoreAPI(void) {
|
||||
REGISTER_API(GetCurrentCommandName);
|
||||
REGISTER_API(GetTypeMethodVersion);
|
||||
REGISTER_API(RegisterDefragFunc);
|
||||
REGISTER_API(RegisterDefragFunc2);
|
||||
REGISTER_API(RegisterDefragCallbacks);
|
||||
REGISTER_API(DefragAlloc);
|
||||
REGISTER_API(DefragAllocRaw);
|
||||
|
||||
@@ -840,6 +840,7 @@ typedef struct RedisModuleDefragCtx RedisModuleDefragCtx;
|
||||
* exposed since you can't cast a function pointer to (void *). */
|
||||
typedef void (*RedisModuleInfoFunc)(RedisModuleInfoCtx *ctx, int for_crash_report);
|
||||
typedef void (*RedisModuleDefragFunc)(RedisModuleDefragCtx *ctx);
|
||||
typedef int (*RedisModuleDefragFunc2)(RedisModuleDefragCtx *ctx);
|
||||
typedef void (*RedisModuleUserChangedFunc) (uint64_t client_id, void *privdata);
|
||||
|
||||
/* ------------------------- End of common defines ------------------------ */
|
||||
@@ -1305,6 +1306,7 @@ REDISMODULE_API int *(*RedisModule_GetCommandKeys)(RedisModuleCtx *ctx, RedisMod
|
||||
REDISMODULE_API int *(*RedisModule_GetCommandKeysWithFlags)(RedisModuleCtx *ctx, RedisModuleString **argv, int argc, int *num_keys, int **out_flags) REDISMODULE_ATTR;
|
||||
REDISMODULE_API const char *(*RedisModule_GetCurrentCommandName)(RedisModuleCtx *ctx) REDISMODULE_ATTR;
|
||||
REDISMODULE_API int (*RedisModule_RegisterDefragFunc)(RedisModuleCtx *ctx, RedisModuleDefragFunc func) REDISMODULE_ATTR;
|
||||
REDISMODULE_API int (*RedisModule_RegisterDefragFunc2)(RedisModuleCtx *ctx, RedisModuleDefragFunc2 func) REDISMODULE_ATTR;
|
||||
REDISMODULE_API int (*RedisModule_RegisterDefragCallbacks)(RedisModuleCtx *ctx, RedisModuleDefragFunc start, RedisModuleDefragFunc end) REDISMODULE_ATTR;
|
||||
REDISMODULE_API void *(*RedisModule_DefragAlloc)(RedisModuleDefragCtx *ctx, void *ptr) REDISMODULE_ATTR;
|
||||
REDISMODULE_API void *(*RedisModule_DefragAllocRaw)(RedisModuleDefragCtx *ctx, size_t size) REDISMODULE_ATTR;
|
||||
@@ -1678,6 +1680,7 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int
|
||||
REDISMODULE_GET_API(GetCommandKeysWithFlags);
|
||||
REDISMODULE_GET_API(GetCurrentCommandName);
|
||||
REDISMODULE_GET_API(RegisterDefragFunc);
|
||||
REDISMODULE_GET_API(RegisterDefragFunc2);
|
||||
REDISMODULE_GET_API(RegisterDefragCallbacks);
|
||||
REDISMODULE_GET_API(DefragAlloc);
|
||||
REDISMODULE_GET_API(DefragAllocRaw);
|
||||
|
||||
12
src/server.h
12
src/server.h
@@ -891,6 +891,7 @@ struct RedisModule {
|
||||
int blocked_clients; /* Count of RedisModuleBlockedClient in this module. */
|
||||
RedisModuleInfoFunc info_cb; /* Callback for module to add INFO fields. */
|
||||
RedisModuleDefragFunc defrag_cb; /* Callback for global data defrag. */
|
||||
RedisModuleDefragFunc2 defrag_cb_2; /* Version 2 callback for global data defrag. */
|
||||
RedisModuleDefragFunc defrag_start_cb; /* Callback indicating defrag started. */
|
||||
RedisModuleDefragFunc defrag_end_cb; /* Callback indicating defrag ended. */
|
||||
struct moduleLoadQueueEntry *loadmod; /* Module load arguments for config rewrite. */
|
||||
@@ -900,6 +901,16 @@ struct RedisModule {
|
||||
};
|
||||
typedef struct RedisModule RedisModule;
|
||||
|
||||
/* The defrag context, used to manage state during calls to the data type
|
||||
* defrag callback.
|
||||
*/
|
||||
struct RedisModuleDefragCtx {
|
||||
monotime endtime;
|
||||
unsigned long *cursor;
|
||||
struct redisObject *key; /* Optional name of key processed, NULL when unknown. */
|
||||
int dbid; /* The dbid of the key being processed, -1 when unknown. */
|
||||
};
|
||||
|
||||
/* This is a wrapper for the 'rio' streams used inside rdb.c in Redis, so that
|
||||
* the user does not have to take the total count of the written bytes nor
|
||||
* to care about error conditions. */
|
||||
@@ -2675,7 +2686,6 @@ size_t moduleGetMemUsage(robj *key, robj *val, size_t sample_size, int dbid);
|
||||
robj *moduleTypeDupOrReply(client *c, robj *fromkey, robj *tokey, int todb, robj *value);
|
||||
int moduleDefragValue(robj *key, robj *obj, int dbid);
|
||||
int moduleLateDefrag(robj *key, robj *value, unsigned long *cursor, monotime endtime, int dbid);
|
||||
void moduleDefragGlobals(void);
|
||||
void moduleDefragStart(void);
|
||||
void moduleDefragEnd(void);
|
||||
void *moduleGetHandleByName(char *modulename);
|
||||
|
||||
@@ -21,34 +21,45 @@ unsigned long int datatype_defragged = 0;
|
||||
unsigned long int datatype_raw_defragged = 0;
|
||||
unsigned long int datatype_resumes = 0;
|
||||
unsigned long int datatype_wrong_cursor = 0;
|
||||
unsigned long int global_attempts = 0;
|
||||
unsigned long int defrag_started = 0;
|
||||
unsigned long int defrag_ended = 0;
|
||||
unsigned long int global_defragged = 0;
|
||||
unsigned long int global_strings_attempts = 0;
|
||||
unsigned long int global_strings_defragged = 0;
|
||||
unsigned long int global_strings_pauses = 0;
|
||||
|
||||
int global_strings_len = 0;
|
||||
unsigned long global_strings_len = 0;
|
||||
RedisModuleString **global_strings = NULL;
|
||||
|
||||
static void createGlobalStrings(RedisModuleCtx *ctx, int count)
|
||||
static void createGlobalStrings(RedisModuleCtx *ctx, unsigned long count)
|
||||
{
|
||||
global_strings_len = count;
|
||||
global_strings = RedisModule_Alloc(sizeof(RedisModuleString *) * count);
|
||||
|
||||
for (int i = 0; i < count; i++) {
|
||||
for (unsigned long i = 0; i < count; i++) {
|
||||
global_strings[i] = RedisModule_CreateStringFromLongLong(ctx, i);
|
||||
}
|
||||
}
|
||||
|
||||
static void defragGlobalStrings(RedisModuleDefragCtx *ctx)
|
||||
static int defragGlobalStrings(RedisModuleDefragCtx *ctx)
|
||||
{
|
||||
for (int i = 0; i < global_strings_len; i++) {
|
||||
RedisModuleString *new = RedisModule_DefragRedisModuleString(ctx, global_strings[i]);
|
||||
global_attempts++;
|
||||
unsigned long cursor = 0;
|
||||
RedisModule_DefragCursorGet(ctx, &cursor);
|
||||
RedisModule_Assert(cursor < global_strings_len);
|
||||
for (; cursor < global_strings_len; cursor++) {
|
||||
RedisModuleString *new = RedisModule_DefragRedisModuleString(ctx, global_strings[cursor]);
|
||||
global_strings_attempts++;
|
||||
if (new != NULL) {
|
||||
global_strings[i] = new;
|
||||
global_defragged++;
|
||||
global_strings[cursor] = new;
|
||||
global_strings_defragged++;
|
||||
}
|
||||
|
||||
if (RedisModule_DefragShouldStop(ctx)) {
|
||||
global_strings_pauses++;
|
||||
RedisModule_DefragCursorSet(ctx, cursor);
|
||||
return 1;
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void defragStart(RedisModuleDefragCtx *ctx) {
|
||||
@@ -70,8 +81,9 @@ static void FragInfo(RedisModuleInfoCtx *ctx, int for_crash_report) {
|
||||
RedisModule_InfoAddFieldLongLong(ctx, "datatype_raw_defragged", datatype_raw_defragged);
|
||||
RedisModule_InfoAddFieldLongLong(ctx, "datatype_resumes", datatype_resumes);
|
||||
RedisModule_InfoAddFieldLongLong(ctx, "datatype_wrong_cursor", datatype_wrong_cursor);
|
||||
RedisModule_InfoAddFieldLongLong(ctx, "global_attempts", global_attempts);
|
||||
RedisModule_InfoAddFieldLongLong(ctx, "global_defragged", global_defragged);
|
||||
RedisModule_InfoAddFieldLongLong(ctx, "global_strings_attempts", global_strings_attempts);
|
||||
RedisModule_InfoAddFieldLongLong(ctx, "global_strings_defragged", global_strings_defragged);
|
||||
RedisModule_InfoAddFieldLongLong(ctx, "global_strings_pauses", global_strings_pauses);
|
||||
RedisModule_InfoAddFieldLongLong(ctx, "defrag_started", defrag_started);
|
||||
RedisModule_InfoAddFieldLongLong(ctx, "defrag_ended", defrag_ended);
|
||||
}
|
||||
@@ -99,8 +111,9 @@ static int fragResetStatsCommand(RedisModuleCtx *ctx, RedisModuleString **argv,
|
||||
datatype_raw_defragged = 0;
|
||||
datatype_resumes = 0;
|
||||
datatype_wrong_cursor = 0;
|
||||
global_attempts = 0;
|
||||
global_defragged = 0;
|
||||
global_strings_attempts = 0;
|
||||
global_strings_defragged = 0;
|
||||
global_strings_pauses = 0;
|
||||
defrag_started = 0;
|
||||
defrag_ended = 0;
|
||||
|
||||
@@ -258,7 +271,7 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
|
||||
return REDISMODULE_ERR;
|
||||
|
||||
RedisModule_RegisterInfoFunc(ctx, FragInfo);
|
||||
RedisModule_RegisterDefragFunc(ctx, defragGlobalStrings);
|
||||
RedisModule_RegisterDefragFunc2(ctx, defragGlobalStrings);
|
||||
RedisModule_RegisterDefragCallbacks(ctx, defragStart, defragEnd);
|
||||
|
||||
return REDISMODULE_OK;
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
set testmodule [file normalize tests/modules/defragtest.so]
|
||||
|
||||
start_server {tags {"modules"} overrides {{save ""}}} {
|
||||
r module load $testmodule 10000
|
||||
r module load $testmodule 50000
|
||||
r config set hz 100
|
||||
r config set active-defrag-ignore-bytes 1
|
||||
r config set active-defrag-threshold-lower 0
|
||||
@@ -46,7 +46,8 @@ start_server {tags {"modules"} overrides {{save ""}}} {
|
||||
|
||||
after 2000
|
||||
set info [r info defragtest_stats]
|
||||
assert {[getInfoProperty $info defragtest_global_attempts] > 0}
|
||||
assert {[getInfoProperty $info defragtest_global_strings_attempts] > 0}
|
||||
assert {[getInfoProperty $info defragtest_global_strings_pauses] > 0}
|
||||
assert_morethan [getInfoProperty $info defragtest_defrag_started] 0
|
||||
assert_morethan [getInfoProperty $info defragtest_defrag_ended] 0
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user