mirror of
https://github.com/redis/redis.git
synced 2026-04-21 03:01:35 -04:00
Don't keep global replication buffer reference for replicas marked CLIENT_CLOSE_ASAP (#13363)
In certain situations, we might generate a large number of propagates (e.g., multi/exec, Lua script, or a single command generating tons of propagations) within an event loop. During the process of propagating to a replica, if the replica is disconnected(marked as CLIENT_CLOSE_ASAP) due to exceeding the output buffer limit, we should remove its reference to the global replication buffer to avoid the global replication buffer being unable to be properly trimmed due to being referenced. --------- Co-authored-by: oranagra <oran@redislabs.com>
This commit is contained in:
@@ -1712,6 +1712,9 @@ void freeClientAsync(client *c) {
|
||||
* idle. */
|
||||
if (c->flags & CLIENT_CLOSE_ASAP || c->flags & CLIENT_SCRIPT) return;
|
||||
c->flags |= CLIENT_CLOSE_ASAP;
|
||||
/* Replicas that was marked as CLIENT_CLOSE_ASAP should not keep the
|
||||
* replication backlog from been trimmed. */
|
||||
if (c->flags & CLIENT_SLAVE) freeReplicaReferencedReplBuffer(c);
|
||||
if (server.io_threads_num == 1) {
|
||||
/* no need to bother with locking if there's just one thread (the main thread) */
|
||||
listAddNodeTail(server.clients_to_close,c);
|
||||
|
||||
@@ -210,6 +210,9 @@ int canFeedReplicaReplBuffer(client *replica) {
|
||||
/* Don't feed replicas that are still waiting for BGSAVE to start. */
|
||||
if (replica->replstate == SLAVE_STATE_WAIT_BGSAVE_START) return 0;
|
||||
|
||||
/* Don't feed replicas that are going to be closed ASAP. */
|
||||
if (replica->flags & CLIENT_CLOSE_ASAP) return 0;
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
|
||||
@@ -323,6 +323,21 @@ int propagateTestIncr(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
||||
int propagateTestVerbatim(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||
if (argc < 2){
|
||||
RedisModule_WrongArity(ctx);
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
||||
long long replicate_num;
|
||||
RedisModule_StringToLongLong(argv[1], &replicate_num);
|
||||
/* Replicate the command verbatim for the specified number of times. */
|
||||
for (long long i = 0; i < replicate_num; i++)
|
||||
RedisModule_ReplicateVerbatim(ctx);
|
||||
RedisModule_ReplyWithSimpleString(ctx,"OK");
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
||||
int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||
REDISMODULE_NOT_USED(argv);
|
||||
REDISMODULE_NOT_USED(argc);
|
||||
@@ -389,6 +404,11 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
|
||||
propagateTestIncr,
|
||||
"write",1,1,1) == REDISMODULE_ERR)
|
||||
return REDISMODULE_ERR;
|
||||
|
||||
if (RedisModule_CreateCommand(ctx,"propagate-test.verbatim",
|
||||
propagateTestVerbatim,
|
||||
"write",1,1,1) == REDISMODULE_ERR)
|
||||
return REDISMODULE_ERR;
|
||||
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
||||
@@ -761,3 +761,41 @@ tags "modules aof" {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
# This test does not really test module functionality, but rather uses a module
|
||||
# command to test Redis replication mechanisms.
|
||||
test {Replicas that was marked as CLIENT_CLOSE_ASAP should not keep the replication backlog from been trimmed} {
|
||||
start_server [list overrides [list loadmodule "$testmodule"]] {
|
||||
set replica [srv 0 client]
|
||||
start_server [list overrides [list loadmodule "$testmodule"]] {
|
||||
set master [srv 0 client]
|
||||
set master_host [srv 0 host]
|
||||
set master_port [srv 0 port]
|
||||
$master config set client-output-buffer-limit "replica 10mb 5mb 0"
|
||||
|
||||
# Start the replication process...
|
||||
$replica replicaof $master_host $master_port
|
||||
wait_for_sync $replica
|
||||
|
||||
test {module propagates from timer} {
|
||||
# Replicate large commands to make the replica disconnected.
|
||||
$master write [format_command propagate-test.verbatim 100000 [string repeat "a" 1000]] ;# almost 100mb
|
||||
# Execute this command together with module commands within the same
|
||||
# event loop to prevent periodic cleanup of replication backlog.
|
||||
$master write [format_command info memory]
|
||||
$master flush
|
||||
$master read ;# propagate-test.verbatim
|
||||
set res [$master read] ;# info memory
|
||||
|
||||
# Wait for the replica to be disconnected.
|
||||
wait_for_log_messages 0 {"*flags=S*scheduled to be closed ASAP for overcoming of output buffer limits*"} 0 1500 10
|
||||
# Due to the replica reaching the soft limit (5MB), memory peaks should not significantly
|
||||
# exceed the replica soft limit. Furthermore, as the replica release its reference to
|
||||
# replication backlog, it should be properly trimmed, the memory usage of replication
|
||||
# backlog should not significantly exceed repl-backlog-size (default 1MB). */
|
||||
assert_lessthan [getInfoProperty $res used_memory_peak] 10000000;# less than 10mb
|
||||
assert_lessthan [getInfoProperty $res mem_replication_backlog] 2000000;# less than 2mb
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user