mirror of
https://github.com/redis/redis.git
synced 2026-04-21 03:01:35 -04:00
Rebuild function engines for function flush command (#13383)
### Issue The current implementation of `FUNCTION FLUSH` command uses `lua_unref()` to unreference script closures in Lua vm. However, invoking `lua_unref()` during lazy free (`ASYNC` argument) is risky since it is not thread-safe. Another issue is that using `lua_unref()` to unreference references does not trigger GC, This can result in the Lua VM leaves a significant amount of garbage, which may never be cleaned up if not properly GC. ### Solution The proposed solution is to completely rebuild the engines, resulting in a brand new Lua VM. --------- Co-authored-by: meir <meir@redis.com>
This commit is contained in:
@@ -183,6 +183,12 @@ static void luaEngineFreeFunction(void *engine_ctx, void *compiled_function) {
|
||||
zfree(f_ctx);
|
||||
}
|
||||
|
||||
static void luaEngineFreeCtx(void *engine_ctx) {
|
||||
luaEngineCtx *lua_engine_ctx = engine_ctx;
|
||||
lua_close(lua_engine_ctx->lua);
|
||||
zfree(lua_engine_ctx);
|
||||
}
|
||||
|
||||
static void luaRegisterFunctionArgsInitialize(registerFunctionArgs *register_f_args,
|
||||
sds name,
|
||||
sds desc,
|
||||
@@ -480,6 +486,7 @@ int luaEngineInitEngine(void) {
|
||||
.get_function_memory_overhead = luaEngineFunctionMemoryOverhead,
|
||||
.get_engine_memory_overhead = luaEngineMemoryOverhead,
|
||||
.free_function = luaEngineFreeFunction,
|
||||
.free_ctx = luaEngineFreeCtx,
|
||||
};
|
||||
return functionsRegisterEngine(LUA_ENGINE_NAME, lua_engine);
|
||||
}
|
||||
|
||||
@@ -24,6 +24,7 @@ static size_t engine_cache_memory = 0;
|
||||
static void engineFunctionDispose(dict *d, void *obj);
|
||||
static void engineStatsDispose(dict *d, void *obj);
|
||||
static void engineLibraryDispose(dict *d, void *obj);
|
||||
static void engineDispose(dict *d, void *obj);
|
||||
static int functionsVerifyName(sds name);
|
||||
|
||||
typedef struct functionsLibEngineStats {
|
||||
@@ -50,7 +51,7 @@ dictType engineDictType = {
|
||||
NULL, /* val dup */
|
||||
dictSdsKeyCaseCompare, /* key compare */
|
||||
dictSdsDestructor, /* key destructor */
|
||||
NULL, /* val destructor */
|
||||
engineDispose, /* val destructor */
|
||||
NULL /* allow to expand */
|
||||
};
|
||||
|
||||
@@ -148,6 +149,16 @@ static void engineLibraryDispose(dict *d, void *obj) {
|
||||
engineLibraryFree(obj);
|
||||
}
|
||||
|
||||
static void engineDispose(dict *d, void *obj) {
|
||||
UNUSED(d);
|
||||
engineInfo *ei = obj;
|
||||
freeClient(ei->c);
|
||||
sdsfree(ei->name);
|
||||
ei->engine->free_ctx(ei->engine->engine_ctx);
|
||||
zfree(ei->engine);
|
||||
zfree(ei);
|
||||
}
|
||||
|
||||
/* Clear all the functions from the given library ctx */
|
||||
void functionsLibCtxClear(functionsLibCtx *lib_ctx) {
|
||||
dictEmpty(lib_ctx->functions, NULL);
|
||||
@@ -166,11 +177,13 @@ void functionsLibCtxClear(functionsLibCtx *lib_ctx) {
|
||||
void functionsLibCtxClearCurrent(int async) {
|
||||
if (async) {
|
||||
functionsLibCtx *old_l_ctx = curr_functions_lib_ctx;
|
||||
curr_functions_lib_ctx = functionsLibCtxCreate();
|
||||
freeFunctionsAsync(old_l_ctx);
|
||||
dict *old_engines = engines;
|
||||
freeFunctionsAsync(old_l_ctx, old_engines);
|
||||
} else {
|
||||
functionsLibCtxClear(curr_functions_lib_ctx);
|
||||
functionsLibCtxFree(curr_functions_lib_ctx);
|
||||
dictRelease(engines);
|
||||
}
|
||||
functionsInit();
|
||||
}
|
||||
|
||||
/* Free the given functions ctx */
|
||||
|
||||
@@ -67,6 +67,9 @@ typedef struct engine {
|
||||
|
||||
/* free the given function */
|
||||
void (*free_function)(void *engine_ctx, void *compiled_function);
|
||||
|
||||
/* Free the engine context. */
|
||||
void (*free_ctx)(void *engine_ctx);
|
||||
} engine;
|
||||
|
||||
/* Hold information about an engine.
|
||||
@@ -116,5 +119,6 @@ int functionLibCreateFunction(sds name, void *function, functionLibInfo *li, sds
|
||||
|
||||
int luaEngineInitEngine(void);
|
||||
int functionsInit(void);
|
||||
void functionsFree(functionsLibCtx *lib_ctx, dict *engs);
|
||||
|
||||
#endif /* __FUNCTIONS_H_ */
|
||||
|
||||
@@ -72,8 +72,11 @@ void lazyFreeLuaScripts(void *args[]) {
|
||||
/* Release the functions ctx. */
|
||||
void lazyFreeFunctionsCtx(void *args[]) {
|
||||
functionsLibCtx *functions_lib_ctx = args[0];
|
||||
dict *engs = args[1];
|
||||
size_t len = functionsLibCtxFunctionsLen(functions_lib_ctx);
|
||||
functionsLibCtxFree(functions_lib_ctx);
|
||||
len += dictSize(engs);
|
||||
dictRelease(engs);
|
||||
atomicDecr(lazyfree_objects,len);
|
||||
atomicIncr(lazyfreed_objects,len);
|
||||
}
|
||||
@@ -247,12 +250,13 @@ void freeLuaScriptsAsync(dict *lua_scripts, list *lua_scripts_lru_list, lua_Stat
|
||||
}
|
||||
|
||||
/* Free functions ctx, if the functions ctx contains enough functions, free it in async way. */
|
||||
void freeFunctionsAsync(functionsLibCtx *functions_lib_ctx) {
|
||||
void freeFunctionsAsync(functionsLibCtx *functions_lib_ctx, dict *engs) {
|
||||
if (functionsLibCtxFunctionsLen(functions_lib_ctx) > LAZYFREE_THRESHOLD) {
|
||||
atomicIncr(lazyfree_objects,functionsLibCtxFunctionsLen(functions_lib_ctx));
|
||||
bioCreateLazyFreeJob(lazyFreeFunctionsCtx,1,functions_lib_ctx);
|
||||
atomicIncr(lazyfree_objects,functionsLibCtxFunctionsLen(functions_lib_ctx)+dictSize(engs));
|
||||
bioCreateLazyFreeJob(lazyFreeFunctionsCtx,2,functions_lib_ctx,engs);
|
||||
} else {
|
||||
functionsLibCtxFree(functions_lib_ctx);
|
||||
dictRelease(engs);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -3470,7 +3470,7 @@ int ldbPendingChildren(void);
|
||||
void luaLdbLineHook(lua_State *lua, lua_Debug *ar);
|
||||
void freeLuaScriptsSync(dict *lua_scripts, list *lua_scripts_lru_list, lua_State *lua);
|
||||
void freeLuaScriptsAsync(dict *lua_scripts, list *lua_scripts_lru_list, lua_State *lua);
|
||||
void freeFunctionsAsync(functionsLibCtx *lib_ctx);
|
||||
void freeFunctionsAsync(functionsLibCtx *functions_lib_ctx, dict *engines);
|
||||
int ldbIsEnabled(void);
|
||||
void ldbLog(sds entry);
|
||||
void ldbLogRedisReply(char *reply);
|
||||
|
||||
@@ -294,6 +294,31 @@ start_server {tags {"scripting"}} {
|
||||
assert_match {} [r function list]
|
||||
}
|
||||
|
||||
test {FUNCTION - async function flush rebuilds Lua VM without causing race condition between main and lazyfree thread} {
|
||||
# LAZYFREE_THRESHOLD is 64
|
||||
for {set i 0} {$i < 100} {incr i} {
|
||||
r function load REPLACE [get_function_code lua test$i test$i {local a = 1 while true do a = a + 1 end}]
|
||||
}
|
||||
assert_morethan [s used_memory_vm_functions] 70000
|
||||
r config resetstat
|
||||
r function flush async
|
||||
assert_lessthan [s used_memory_vm_functions] 40000
|
||||
|
||||
# Wait for the completion of lazy free for both functions and engines.
|
||||
set start_time [clock seconds]
|
||||
while {1} {
|
||||
# Tests for race conditions between async function flushes and main thread Lua VM operations.
|
||||
r function load REPLACE [get_function_code lua test test {local a = 1 while true do a = a + 1 end}]
|
||||
if {[s lazyfreed_objects] == 101 || [expr {[clock seconds] - $start_time}] > 5} {
|
||||
break
|
||||
}
|
||||
}
|
||||
if {[s lazyfreed_objects] != 101} {
|
||||
error "Timeout or unexpected number of lazyfreed_objects: [s lazyfreed_objects]"
|
||||
}
|
||||
assert_match {{library_name test engine LUA functions {{name test description {} flags {}}}}} [r function list]
|
||||
}
|
||||
|
||||
test {FUNCTION - test function wrong argument} {
|
||||
catch {r function flush bad_arg} e
|
||||
assert_match {*only supports SYNC|ASYNC*} $e
|
||||
|
||||
Reference in New Issue
Block a user