diff --git a/src/commands/memory-stats.json b/src/commands/memory-stats.json index 23dac5eb2c..0e95e0f36d 100644 --- a/src/commands/memory-stats.json +++ b/src/commands/memory-stats.json @@ -29,6 +29,9 @@ "replication.backlog": { "type": "integer" }, + "replica.fullsync.buffer": { + "type": "integer" + }, "clients.slaves": { "type": "integer" }, diff --git a/src/object.c b/src/object.c index 5f43904d65..6a03f26a74 100644 --- a/src/object.c +++ b/src/object.c @@ -1209,6 +1209,9 @@ struct redisMemOverhead *getMemoryOverheadData(void) { server.repl_backlog->blocks_index->numnodes * sizeof(raxNode) + raxSize(server.repl_backlog->blocks_index) * sizeof(void*); } + + mh->replica_fullsync_buffer = server.repl_full_sync_buffer.mem_used; + mem_total += mh->replica_fullsync_buffer; mem_total += mh->repl_backlog; mem_total += mh->clients_slaves; @@ -1560,7 +1563,7 @@ NULL } else if (!strcasecmp(c->argv[1]->ptr,"stats") && c->argc == 2) { struct redisMemOverhead *mh = getMemoryOverheadData(); - addReplyMapLen(c,32+mh->num_dbs); + addReplyMapLen(c,33+mh->num_dbs); addReplyBulkCString(c,"peak.allocated"); addReplyLongLong(c,mh->peak_allocated); @@ -1574,6 +1577,9 @@ NULL addReplyBulkCString(c,"replication.backlog"); addReplyLongLong(c,mh->repl_backlog); + addReplyBulkCString(c,"replica.fullsync.buffer"); + addReplyLongLong(c,mh->replica_fullsync_buffer); + addReplyBulkCString(c,"clients.slaves"); addReplyLongLong(c,mh->clients_slaves); diff --git a/src/replication.c b/src/replication.c index 4e030bada3..ca0d2238b3 100644 --- a/src/replication.c +++ b/src/replication.c @@ -2444,6 +2444,14 @@ void readSyncBulkPayload(connection *conn) { /* Send the initial ACK immediately to put this replica in online state. */ if (usemark) replicationSendAck(); + /* Restart the AOF subsystem now that we finished the sync. This + * will trigger an AOF rewrite, and when done will start appending + * to the new file. */ + if (server.aof_enabled) { + serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Starting AOF after a successful sync"); + startAppendOnlyWithRetry(); + } + if (rdbchannel) { int close_asap; @@ -2465,13 +2473,6 @@ void readSyncBulkPayload(connection *conn) { freeClientAsync(server.master); } - /* Restart the AOF subsystem now that we finished the sync. This - * will trigger an AOF rewrite, and when done will start appending - * to the new file. */ - if (server.aof_enabled) { - serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Starting AOF after a successful sync"); - startAppendOnlyWithRetry(); - } return; error: @@ -3671,6 +3672,7 @@ static void rdbChannelReplDataBufInit(void) { serverAssert(server.repl_full_sync_buffer.blocks == NULL); server.repl_full_sync_buffer.size = 0; server.repl_full_sync_buffer.used = 0; + server.repl_full_sync_buffer.mem_used = 0; server.repl_full_sync_buffer.blocks = listCreate(); server.repl_full_sync_buffer.blocks->free = zfree; } @@ -3682,6 +3684,7 @@ static void rdbChannelReplDataBufFree(void) { server.repl_full_sync_buffer.blocks = NULL; server.repl_full_sync_buffer.size = 0; server.repl_full_sync_buffer.used = 0; + server.repl_full_sync_buffer.mem_used = 0; } /* Replication: Replica side. @@ -3752,6 +3755,7 @@ void rdbChannelBufferReplData(connection *conn) { listAddNodeTail(server.repl_full_sync_buffer.blocks, tail); server.repl_full_sync_buffer.size += tail->size; + server.repl_full_sync_buffer.mem_used += usable_size + sizeof(listNode); /* Update buffer's peak */ if (server.repl_full_sync_buffer.peak < server.repl_full_sync_buffer.size) @@ -3791,7 +3795,8 @@ int rdbChannelStreamReplDataToDb(client *c) { server.repl_full_sync_buffer.used -= used; server.repl_full_sync_buffer.size -= size; - + server.repl_full_sync_buffer.mem_used -= (size + sizeof(listNode) + + sizeof(replDataBufBlock)); if (server.repl_debug_pause & REPL_DEBUG_ON_STREAMING_REPL_BUF) debugPauseProcess(); diff --git a/src/server.c b/src/server.c index 266da7a692..932752ba82 100644 --- a/src/server.c +++ b/src/server.c @@ -5818,6 +5818,7 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) { "mem_not_counted_for_evict:%zu\r\n", freeMemoryGetNotCountedMemory(), "mem_replication_backlog:%zu\r\n", mh->repl_backlog, "mem_total_replication_buffers:%zu\r\n", server.repl_buffer_mem, + "mem_replica_full_sync_buffer:%zu\r\n", server.repl_full_sync_buffer.mem_used, "mem_clients_slaves:%zu\r\n", mh->clients_slaves, "mem_clients_normal:%zu\r\n", mh->clients_normal, "mem_cluster_links:%zu\r\n", mh->cluster_links, @@ -6793,6 +6794,15 @@ void dismissMemoryInChild(void) { dismissMemory(o, o->size); } + /* Dismiss accumulated repl buffer on replica. */ + if (server.repl_full_sync_buffer.blocks) { + listRewind(server.repl_full_sync_buffer.blocks, &li); + while((ln = listNext(&li))) { + replDataBufBlock *o = listNodeValue(ln); + dismissMemory(o, o->size); + } + } + /* Dismiss all clients memory. */ listRewind(server.clients, &li); while((ln = listNext(&li))) { diff --git a/src/server.h b/src/server.h index 5bc8b3f4d2..2673978699 100644 --- a/src/server.h +++ b/src/server.h @@ -1200,6 +1200,7 @@ typedef struct replDataBufBlock { * rdb channel replication on replica side. */ typedef struct replDataBuf { list *blocks; /* List of replDataBufBlock */ + size_t mem_used; /* Total allocated memory */ size_t size; /* Total number of bytes available in all blocks. */ size_t used; /* Total number of bytes actually used in all blocks. */ size_t peak; /* Peak number of bytes stored in all blocks. */ @@ -1498,6 +1499,7 @@ struct redisMemOverhead { size_t total_allocated; size_t startup_allocated; size_t repl_backlog; + size_t replica_fullsync_buffer; size_t clients_slaves; size_t clients_normal; size_t cluster_links; diff --git a/tests/integration/replication-rdbchannel.tcl b/tests/integration/replication-rdbchannel.tcl index 212bfbf050..844339ec03 100644 --- a/tests/integration/replication-rdbchannel.tcl +++ b/tests/integration/replication-rdbchannel.tcl @@ -79,6 +79,7 @@ start_server {tags {"repl external:skip"}} { $replica1 config set repl-rdb-channel yes $replica2 config set repl-rdb-channel no + set loglines [count_log_lines 0] set prev_forks [s 0 total_forks] $master set x 2 @@ -87,9 +88,10 @@ start_server {tags {"repl external:skip"}} { $replica1 replicaof $master_host $master_port $replica2 replicaof $master_host $master_port - set res [wait_for_log_messages 0 {"*Starting BGSAVE* replicas sockets (rdb-channel)*"} 0 2000 10] - set loglines [lindex $res 1] - wait_for_log_messages 0 {"*Starting BGSAVE* replicas sockets*"} $loglines 2000 10 + # There will be two forks subsequently, one for rdbchannel + # replica, another for the replica without rdbchannel config. + wait_for_log_messages 0 {"*Starting BGSAVE* replicas sockets (rdb-channel)*"} $loglines 300 100 + wait_for_log_messages 0 {"*Starting BGSAVE* replicas sockets"} $loglines 300 100 wait_replica_online $master 0 100 100 wait_replica_online $master 1 100 100 @@ -396,10 +398,10 @@ start_server {tags {"repl external:skip"}} { populate 20000 master 100 -1 $replica replicaof $master_host $master_port - wait_for_condition 50 200 { + wait_for_condition 100 200 { [s 0 loading] == 1 } else { - fail "[s 0 loading] sdsdad" + fail "Replica did not start loading" } # Generate some traffic for backlog ~2mb @@ -465,12 +467,11 @@ start_server {tags {"repl external:skip"}} { fail "Sync did not start" } - # Wait for both replicas main conns to establish psync + # Verify replicas are connected wait_for_condition 500 100 { [s -2 connected_slaves] == 2 } else { - fail "Replicas didn't establish psync: - sync_partial_ok: [s -2 sync_partial_ok]" + fail "Replicas didn't connect: [s -2 connected_slaves]" } # kill one of the replicas @@ -488,6 +489,14 @@ start_server {tags {"repl external:skip"}} { sync_full:[s -2 sync_full] connected_slaves: [s -2 connected_slaves]" } + + # Wait until replica catches up + wait_replica_online $master 0 200 100 + wait_for_condition 200 100 { + [s 0 mem_replica_full_sync_buffer] == 0 + } else { + fail "Replica did not consume buffer in time" + } } test "Test master aborts rdb delivery if all replicas are dropped" { @@ -773,7 +782,6 @@ start_server {tags {"repl external:skip"}} { # Speed up loading $replica config set key-load-delay 0 - stop_write_load $load_handle # Wait until replica recovers and becomes online wait_replica_online $master 0 100 100