Start AOFRW before streaming repl buffer during fullsync (#13758)

During fullsync, before loading RDB on the replica, we stop aof child to
prevent copy-on-write disaster.
Once rdb is loaded, aof is started again and it will trigger aof
rewrite. With https://github.com/redis/redis/pull/13732 , for rdbchannel
replication, this behavior was changed. Currently, we start aof after
replication buffer is streamed to db. This PR changes it back to start
aof just after rdb is loaded (before the repl buffer is streamed).

Both approaches have pros and cons. If we start aof before streaming
repl buffers, we may still face copy-on-write issues, as repl
buffers potentially include a large amount of changes. If we wait until
the replication buffer is drained, it means we are delaying the start of
aof persistence.

Additional changes are introduced as part of this PR:

- Interface change:
Added `mem_replica_full_sync_buffer` field to the `INFO MEMORY` command
reply. During full sync, it shows total memory consumed by accumulated
replication stream buffer on replica. Added same metric to `MEMORY
STATS` command reply as `replica.fullsync.buffer` field.
  
  
- Fixes: 
- Count repl stream buffer size of replica as part of 'memory overhead'
calculation for fields in "INFO MEMORY" and "MEMORY STATS" outputs.
Before this PR, the repl buffer was not counted as part of memory overhead
calculation, causing misreports for fields like `used_memory_overhead`
and `used_memory_dataset` in "INFO MEMORY" and for the `overhead.total` field
in the "MEMORY STATS" command reply.
- Dismiss replication stream buffer memory of the replica in the child
process to reduce COW impact during a fork.
- Fixed a few time sensitive flaky tests, deleted a noop statement,
fixed some comments and fail messages in rdbchannel tests.
This commit is contained in:
Ozan Tezcan
2025-02-04 21:40:18 +03:00
committed by GitHub
parent 870b6bd487
commit 09f8a2f374
6 changed files with 52 additions and 18 deletions

View File

@@ -29,6 +29,9 @@
"replication.backlog": {
"type": "integer"
},
"replica.fullsync.buffer": {
"type": "integer"
},
"clients.slaves": {
"type": "integer"
},

View File

@@ -1209,6 +1209,9 @@ struct redisMemOverhead *getMemoryOverheadData(void) {
server.repl_backlog->blocks_index->numnodes * sizeof(raxNode) +
raxSize(server.repl_backlog->blocks_index) * sizeof(void*);
}
mh->replica_fullsync_buffer = server.repl_full_sync_buffer.mem_used;
mem_total += mh->replica_fullsync_buffer;
mem_total += mh->repl_backlog;
mem_total += mh->clients_slaves;
@@ -1560,7 +1563,7 @@ NULL
} else if (!strcasecmp(c->argv[1]->ptr,"stats") && c->argc == 2) {
struct redisMemOverhead *mh = getMemoryOverheadData();
addReplyMapLen(c,32+mh->num_dbs);
addReplyMapLen(c,33+mh->num_dbs);
addReplyBulkCString(c,"peak.allocated");
addReplyLongLong(c,mh->peak_allocated);
@@ -1574,6 +1577,9 @@ NULL
addReplyBulkCString(c,"replication.backlog");
addReplyLongLong(c,mh->repl_backlog);
addReplyBulkCString(c,"replica.fullsync.buffer");
addReplyLongLong(c,mh->replica_fullsync_buffer);
addReplyBulkCString(c,"clients.slaves");
addReplyLongLong(c,mh->clients_slaves);

View File

@@ -2444,6 +2444,14 @@ void readSyncBulkPayload(connection *conn) {
/* Send the initial ACK immediately to put this replica in online state. */
if (usemark) replicationSendAck();
/* Restart the AOF subsystem now that we finished the sync. This
* will trigger an AOF rewrite, and when done will start appending
* to the new file. */
if (server.aof_enabled) {
serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Starting AOF after a successful sync");
startAppendOnlyWithRetry();
}
if (rdbchannel) {
int close_asap;
@@ -2465,13 +2473,6 @@ void readSyncBulkPayload(connection *conn) {
freeClientAsync(server.master);
}
/* Restart the AOF subsystem now that we finished the sync. This
* will trigger an AOF rewrite, and when done will start appending
* to the new file. */
if (server.aof_enabled) {
serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Starting AOF after a successful sync");
startAppendOnlyWithRetry();
}
return;
error:
@@ -3671,6 +3672,7 @@ static void rdbChannelReplDataBufInit(void) {
serverAssert(server.repl_full_sync_buffer.blocks == NULL);
server.repl_full_sync_buffer.size = 0;
server.repl_full_sync_buffer.used = 0;
server.repl_full_sync_buffer.mem_used = 0;
server.repl_full_sync_buffer.blocks = listCreate();
server.repl_full_sync_buffer.blocks->free = zfree;
}
@@ -3682,6 +3684,7 @@ static void rdbChannelReplDataBufFree(void) {
server.repl_full_sync_buffer.blocks = NULL;
server.repl_full_sync_buffer.size = 0;
server.repl_full_sync_buffer.used = 0;
server.repl_full_sync_buffer.mem_used = 0;
}
/* Replication: Replica side.
@@ -3752,6 +3755,7 @@ void rdbChannelBufferReplData(connection *conn) {
listAddNodeTail(server.repl_full_sync_buffer.blocks, tail);
server.repl_full_sync_buffer.size += tail->size;
server.repl_full_sync_buffer.mem_used += usable_size + sizeof(listNode);
/* Update buffer's peak */
if (server.repl_full_sync_buffer.peak < server.repl_full_sync_buffer.size)
@@ -3791,7 +3795,8 @@ int rdbChannelStreamReplDataToDb(client *c) {
server.repl_full_sync_buffer.used -= used;
server.repl_full_sync_buffer.size -= size;
server.repl_full_sync_buffer.mem_used -= (size + sizeof(listNode) +
sizeof(replDataBufBlock));
if (server.repl_debug_pause & REPL_DEBUG_ON_STREAMING_REPL_BUF)
debugPauseProcess();

View File

@@ -5818,6 +5818,7 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) {
"mem_not_counted_for_evict:%zu\r\n", freeMemoryGetNotCountedMemory(),
"mem_replication_backlog:%zu\r\n", mh->repl_backlog,
"mem_total_replication_buffers:%zu\r\n", server.repl_buffer_mem,
"mem_replica_full_sync_buffer:%zu\r\n", server.repl_full_sync_buffer.mem_used,
"mem_clients_slaves:%zu\r\n", mh->clients_slaves,
"mem_clients_normal:%zu\r\n", mh->clients_normal,
"mem_cluster_links:%zu\r\n", mh->cluster_links,
@@ -6793,6 +6794,15 @@ void dismissMemoryInChild(void) {
dismissMemory(o, o->size);
}
/* Dismiss accumulated repl buffer on replica. */
if (server.repl_full_sync_buffer.blocks) {
listRewind(server.repl_full_sync_buffer.blocks, &li);
while((ln = listNext(&li))) {
replDataBufBlock *o = listNodeValue(ln);
dismissMemory(o, o->size);
}
}
/* Dismiss all clients memory. */
listRewind(server.clients, &li);
while((ln = listNext(&li))) {

View File

@@ -1200,6 +1200,7 @@ typedef struct replDataBufBlock {
* rdb channel replication on replica side. */
typedef struct replDataBuf {
list *blocks; /* List of replDataBufBlock */
size_t mem_used; /* Total allocated memory */
size_t size; /* Total number of bytes available in all blocks. */
size_t used; /* Total number of bytes actually used in all blocks. */
size_t peak; /* Peak number of bytes stored in all blocks. */
@@ -1498,6 +1499,7 @@ struct redisMemOverhead {
size_t total_allocated;
size_t startup_allocated;
size_t repl_backlog;
size_t replica_fullsync_buffer;
size_t clients_slaves;
size_t clients_normal;
size_t cluster_links;

View File

@@ -79,6 +79,7 @@ start_server {tags {"repl external:skip"}} {
$replica1 config set repl-rdb-channel yes
$replica2 config set repl-rdb-channel no
set loglines [count_log_lines 0]
set prev_forks [s 0 total_forks]
$master set x 2
@@ -87,9 +88,10 @@ start_server {tags {"repl external:skip"}} {
$replica1 replicaof $master_host $master_port
$replica2 replicaof $master_host $master_port
set res [wait_for_log_messages 0 {"*Starting BGSAVE* replicas sockets (rdb-channel)*"} 0 2000 10]
set loglines [lindex $res 1]
wait_for_log_messages 0 {"*Starting BGSAVE* replicas sockets*"} $loglines 2000 10
# There will be two forks subsequently, one for rdbchannel
# replica, another for the replica without rdbchannel config.
wait_for_log_messages 0 {"*Starting BGSAVE* replicas sockets (rdb-channel)*"} $loglines 300 100
wait_for_log_messages 0 {"*Starting BGSAVE* replicas sockets"} $loglines 300 100
wait_replica_online $master 0 100 100
wait_replica_online $master 1 100 100
@@ -396,10 +398,10 @@ start_server {tags {"repl external:skip"}} {
populate 20000 master 100 -1
$replica replicaof $master_host $master_port
wait_for_condition 50 200 {
wait_for_condition 100 200 {
[s 0 loading] == 1
} else {
fail "[s 0 loading] sdsdad"
fail "Replica did not start loading"
}
# Generate some traffic for backlog ~2mb
@@ -465,12 +467,11 @@ start_server {tags {"repl external:skip"}} {
fail "Sync did not start"
}
# Wait for both replicas main conns to establish psync
# Verify replicas are connected
wait_for_condition 500 100 {
[s -2 connected_slaves] == 2
} else {
fail "Replicas didn't establish psync:
sync_partial_ok: [s -2 sync_partial_ok]"
fail "Replicas didn't connect: [s -2 connected_slaves]"
}
# kill one of the replicas
@@ -488,6 +489,14 @@ start_server {tags {"repl external:skip"}} {
sync_full:[s -2 sync_full]
connected_slaves: [s -2 connected_slaves]"
}
# Wait until replica catches up
wait_replica_online $master 0 200 100
wait_for_condition 200 100 {
[s 0 mem_replica_full_sync_buffer] == 0
} else {
fail "Replica did not consume buffer in time"
}
}
test "Test master aborts rdb delivery if all replicas are dropped" {
@@ -773,7 +782,6 @@ start_server {tags {"repl external:skip"}} {
# Speed up loading
$replica config set key-load-delay 0
stop_write_load $load_handle
# Wait until replica recovers and becomes online
wait_replica_online $master 0 100 100