From fd135f3e2d3adfdec31980ee69be8f783fc8ddce Mon Sep 17 00:00:00 2001 From: Huang Zhw Date: Thu, 7 Oct 2021 20:13:42 +0800 Subject: [PATCH] Make tracking invalidation messages always after command's reply (#9422) Tracking invalidation messages were sometimes sent in inconsistent order, before the command's reply rather than after. In addition to that, they were sometimes embedded inside other commands responses, like MULTI-EXEC and MGET. --- src/blocked.c | 16 +++++++ src/db.c | 2 +- src/server.c | 28 ++++++++++++ src/server.h | 8 +++- src/tracking.c | 45 ++++++++++++++----- tests/unit/tracking.tcl | 95 +++++++++++++++++++++++++++++++++++++++++ 6 files changed, 182 insertions(+), 12 deletions(-) diff --git a/src/blocked.c b/src/blocked.c index 67fd3fdca1..8a2e414635 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -295,6 +295,8 @@ void serveClientsBlockedOnListKey(robj *o, readyList *rl) { * call. */ if (dstkey) incrRefCount(dstkey); + client *old_client = server.current_client; + server.current_client = receiver; monotime replyTimer; elapsedStart(&replyTimer); serveClientBlockedOnList(receiver, o, @@ -303,6 +305,8 @@ void serveClientsBlockedOnListKey(robj *o, readyList *rl) { &deleted); updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer)); unblockClient(receiver); + afterCommand(receiver); + server.current_client = old_client; if (dstkey) decrRefCount(dstkey); @@ -343,11 +347,15 @@ void serveClientsBlockedOnSortedSetKey(robj *o, readyList *rl) { ? 1 : 0; int reply_nil_when_empty = use_nested_array; + client *old_client = server.current_client; + server.current_client = receiver; monotime replyTimer; elapsedStart(&replyTimer); genericZpopCommand(receiver, &rl->key, 1, where, 1, count, use_nested_array, reply_nil_when_empty, &deleted); updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer)); unblockClient(receiver); + afterCommand(receiver); + server.current_client = old_client; /* Replicate the command. 
*/ int argc = 2; @@ -442,6 +450,8 @@ void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) { } } + client *old_client = server.current_client; + server.current_client = receiver; monotime replyTimer; elapsedStart(&replyTimer); /* Emit the two elements sub-array consisting of @@ -470,6 +480,8 @@ void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) { * valid, so we must do the setup above before * this call. */ unblockClient(receiver); + afterCommand(receiver); + server.current_client = old_client; } } } @@ -514,12 +526,16 @@ void serveClientsBlockedOnKeyByModule(readyList *rl) { * different modules with different triggers to consider if a key * is ready or not. This means we can't exit the loop but need * to continue after the first failure. */ + client *old_client = server.current_client; + server.current_client = receiver; monotime replyTimer; elapsedStart(&replyTimer); if (!moduleTryServeClientBlockedOnKey(receiver, rl->key)) continue; updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer)); moduleUnblockClient(receiver); + afterCommand(receiver); + server.current_client = old_client; } } } diff --git a/src/db.c b/src/db.c index 31c2f914bc..23cf220dbd 100644 --- a/src/db.c +++ b/src/db.c @@ -570,7 +570,7 @@ long long dbTotalServerKeyCount() { * a context of a client. */ void signalModifiedKey(client *c, redisDb *db, robj *key) { touchWatchedKey(db,key); - trackingInvalidateKey(c,key); + trackingInvalidateKey(c,key,1); } void signalFlushedDb(int dbid, int async) { diff --git a/src/server.c b/src/server.c index e6b185a767..c183a0b972 100644 --- a/src/server.c +++ b/src/server.c @@ -2933,6 +2933,11 @@ void beforeSleep(struct aeEventLoop *eventLoop) { * our clients. */ updateFailoverStatus(); + /* Since we rely on current_client to send scheduled invalidation messages + * we have to flush them after each command, so when we get here, the list + * must be empty. 
*/ + serverAssert(listLength(server.tracking_pending_keys) == 0); + /* Send the invalidation messages to clients participating to the * client side caching protocol in broadcasting (BCAST) mode. */ trackingBroadcastInvalidationMessages(); @@ -3663,6 +3668,7 @@ void initServer(void) { server.current_client = NULL; server.errors = raxNew(); server.fixed_time_expire = 0; + server.in_nested_call = 0; server.clients = listCreate(); server.clients_index = raxNew(); server.clients_to_close = listCreate(); @@ -3675,6 +3681,7 @@ void initServer(void) { server.slaveseldb = -1; /* Force to emit the first SELECT command. */ server.unblocked_clients = listCreate(); server.ready_keys = listCreate(); + server.tracking_pending_keys = listCreate(); server.clients_waiting_acks = listCreate(); server.get_ack_from_slaves = 0; server.client_pause_type = CLIENT_PAUSE_OFF; @@ -4322,6 +4329,7 @@ void call(client *c, int flags) { if (server.fixed_time_expire++ == 0) { updateCachedTime(0); } + server.in_nested_call++; elapsedStart(&call_timer); c->cmd->proc(c); @@ -4330,6 +4338,8 @@ void call(client *c, int flags) { dirty = server.dirty-dirty; if (dirty < 0) dirty = 0; + server.in_nested_call--; + /* Update failed command calls if required. * We leverage a static variable (prev_err_count) to retain * the counter across nested function calls and avoid logging @@ -4504,6 +4514,9 @@ void call(client *c, int flags) { size_t zmalloc_used = zmalloc_used_memory(); if (zmalloc_used > server.stat_peak_memory) server.stat_peak_memory = zmalloc_used; + + /* Do some maintenance job and cleanup */ + afterCommand(c); } /* Used when a command that is ready for execution needs to be rejected, due to @@ -4541,6 +4554,14 @@ void rejectCommandFormat(client *c, const char *fmt, ...) { } } +/* This is called after a command in call, we can do some maintenance job in it. */ +void afterCommand(client *c) { + UNUSED(c); + /* Flush pending invalidation messages only when we are not in nested call. 
+ * So the messages are not interleaved with transaction response. */ + if (!server.in_nested_call) trackingHandlePendingKeyInvalidations(); +} + /* Returns 1 for commands that may have key names in their arguments, but the legacy range * spec doesn't cover all of them. */ void populateCommandMovableKeys(struct redisCommand *cmd) { @@ -4713,6 +4734,13 @@ int processCommand(client *c) { * propagation of DELs due to eviction. */ if (server.maxmemory && !server.lua_timedout) { int out_of_memory = (performEvictions() == EVICT_FAIL); + + /* performEvictions may evict keys, so we need to flush the pending + * tracking invalidation keys. If we don't do this, we may get an + * invalidation message after we operate on the key, when in fact this + * message belongs to the old value of the key before it got evicted. */ + trackingHandlePendingKeyInvalidations(); + /* performEvictions may flush slave output buffers. This may result * in a slave, that may be the active client, to be freed. */ if (server.current_client == NULL) return C_ERR; diff --git a/src/server.h b/src/server.h index d57e8e16f3..054ef03982 100644 --- a/src/server.h +++ b/src/server.h @@ -1314,6 +1314,7 @@ struct redisServer { rax *clients_timeout_table; /* Radix tree for blocked clients timeouts. */ long fixed_time_expire; /* If > 0, expire keys against server.mstime. */ + int in_nested_call; /* If > 0, in a nested call of a call */ rax *clients_index; /* Active clients dictionary by client ID. */ pause_type client_pause_type; /* True if clients are currently paused */ list *paused_clients; /* List of pause clients */ @@ -1598,6 +1599,7 @@ struct redisServer { /* Client side caching. */ unsigned int tracking_clients; /* # of clients with tracking enabled.*/ size_t tracking_table_max_keys; /* Max number of keys in tracking table. 
*/ + list *tracking_pending_keys; /* tracking invalidation keys pending to flush */ /* Sort parameters - qsort_r() is only available under BSD so we * have to take this state global, in order to pass it to sortCompare() */ int sort_desc; @@ -2108,7 +2110,9 @@ void addReplyStatusFormat(client *c, const char *fmt, ...); void enableTracking(client *c, uint64_t redirect_to, uint64_t options, robj **prefix, size_t numprefix); void disableTracking(client *c); void trackingRememberKeys(client *c); -void trackingInvalidateKey(client *c, robj *keyobj); +void trackingInvalidateKey(client *c, robj *keyobj, int bcast); +void trackingScheduleKeyInvalidation(uint64_t client_id, robj *keyobj); +void trackingHandlePendingKeyInvalidations(void); void trackingInvalidateKeysOnFlush(int async); void freeTrackingRadixTree(rax *rt); void freeTrackingRadixTreeAsync(rax *rt); @@ -2432,6 +2436,8 @@ void preventCommandAOF(client *c); void preventCommandReplication(client *c); void slowlogPushCurrentCommand(client *c, struct redisCommand *cmd, ustime_t duration); int prepareForShutdown(int flags); +void afterCommand(client *c); +int inNestedCall(void); #ifdef __GNUC__ void _serverLog(int level, const char *fmt, ...) __attribute__((format(printf, 2, 3))); diff --git a/src/tracking.c b/src/tracking.c index 1e84cc3c10..11e2587e24 100644 --- a/src/tracking.c +++ b/src/tracking.c @@ -348,13 +348,16 @@ void trackingRememberKeyToBroadcast(client *c, char *keyname, size_t keylen) { * of memory pressure: in that case the key didn't really change, so we want * just to notify the clients that are in the table for this key, that would * otherwise miss the fact we are no longer tracking the key for them. 
*/ -void trackingInvalidateKeyRaw(client *c, char *key, size_t keylen, int bcast) { +void trackingInvalidateKey(client *c, robj *keyobj, int bcast) { if (TrackingTable == NULL) return; - if (bcast && raxSize(PrefixTable) > 0) - trackingRememberKeyToBroadcast(c,key,keylen); + unsigned char *key = (unsigned char*)keyobj->ptr; + size_t keylen = sdslen(keyobj->ptr); - rax *ids = raxFind(TrackingTable,(unsigned char*)key,keylen); + if (bcast && raxSize(PrefixTable) > 0) + trackingRememberKeyToBroadcast(c,(char *)key,keylen); + + rax *ids = raxFind(TrackingTable,key,keylen); if (ids == raxNotFound) return; raxIterator ri; @@ -384,7 +387,15 @@ void trackingInvalidateKeyRaw(client *c, char *key, size_t keylen, int bcast) { continue; } - sendTrackingMessage(target,key,keylen,0); + /* If target is the current client, we need to schedule the key + * invalidation instead of sending it now: otherwise the message could be + * interleaved with the command's response, and it must come after it. */ + if (target == server.current_client){ + incrRefCount(keyobj); + listAddNodeTail(server.tracking_pending_keys, keyobj); + } else { + sendTrackingMessage(target,(char *)keyobj->ptr,sdslen(keyobj->ptr),0); + } } raxStop(&ri); @@ -395,10 +406,22 @@ void trackingInvalidateKeyRaw(client *c, char *key, size_t keylen, int bcast) { raxRemove(TrackingTable,(unsigned char*)key,keylen,NULL); } -/* Wrapper (the one actually called across the core) to pass the key - * as object. 
*/ -void trackingInvalidateKey(client *c, robj *keyobj) { - trackingInvalidateKeyRaw(c,keyobj->ptr,sdslen(keyobj->ptr),1); +void trackingHandlePendingKeyInvalidations() { + if (!listLength(server.tracking_pending_keys)) return; + + listNode *ln; + listIter li; + + listRewind(server.tracking_pending_keys,&li); + while ((ln = listNext(&li)) != NULL) { + robj *key = listNodeValue(ln); + /* current_client may have been freed, so we send the invalidation + * message only when current_client is still alive */ + if (server.current_client != NULL) + sendTrackingMessage(server.current_client,(char *)key->ptr,sdslen(key->ptr),0); + decrRefCount(key); + } + listEmpty(server.tracking_pending_keys); } /* This function is called when one or all the Redis databases are @@ -475,7 +498,9 @@ void trackingLimitUsedSlots(void) { raxSeek(&ri,"^",NULL,0); raxRandomWalk(&ri,0); if (raxEOF(&ri)) break; - trackingInvalidateKeyRaw(NULL,(char*)ri.key,ri.key_len,0); + robj *keyobj = createStringObject((char*)ri.key,ri.key_len); + trackingInvalidateKey(NULL,keyobj,0); + decrRefCount(keyobj); if (raxSize(TrackingTable) <= max_keys) { timeout_counter = 0; raxStop(&ri); diff --git a/tests/unit/tracking.tcl b/tests/unit/tracking.tcl index 8e51a3e57a..c0909cc1d8 100644 --- a/tests/unit/tracking.tcl +++ b/tests/unit/tracking.tcl @@ -369,6 +369,101 @@ start_server {tags {"tracking network"}} { $r CLIENT TRACKING OFF } + test {hdel deliver invlidate message after response in the same connection} { + r CLIENT TRACKING off + r HELLO 3 + r CLIENT TRACKING on + r HSET myhash f 1 + r HGET myhash f + set res [r HDEL myhash f] + assert_equal $res 1 + set res [r read] + assert_equal $res {invalidate myhash} + } + + test {Tracking invalidation message is not interleaved with multiple keys response} { + r CLIENT TRACKING off + r HELLO 3 + r CLIENT TRACKING on + # We need to disable active expire, so we can trigger lazy expire + r DEBUG SET-ACTIVE-EXPIRE 0 + r MULTI + r MSET x{t} 1 y{t} 2 + r PEXPIRE y{t} 100 + r GET 
y{t} + r EXEC + after 110 + # Read expired key y{t}, generate invalidate message about this key + set res [r MGET x{t} y{t}] + assert_equal $res {1 {}} + # Consume the invalidate message which is after command response + set res [r read] + assert_equal $res {invalidate y{t}} + r DEBUG SET-ACTIVE-EXPIRE 1 + } {OK} {needs:debug} + + test {Tracking invalidation message is not interleaved with transaction response} { + r CLIENT TRACKING off + r HELLO 3 + r CLIENT TRACKING on + r MSET a{t} 1 b{t} 2 + r GET a{t} + # Start a transaction, make a{t} generate an invalidate message + r MULTI + r INCR a{t} + r GET b{t} + set res [r EXEC] + assert_equal $res {2 2} + set res [r read] + # Consume the invalidate message which is after command response + assert_equal $res {invalidate a{t}} + } + + test {Tracking invalidation message of eviction keys should be before response} { + # Get the current memory limit and calculate a new limit. + r CLIENT TRACKING off + r HELLO 3 + r CLIENT TRACKING on + set used [s used_memory] + set limit [expr {$used+100*1024}] + set old_policy [lindex [r config get maxmemory-policy] 1] + r config set maxmemory $limit + # We set policy volatile-random, so only keys with ttl will be evicted + r config set maxmemory-policy volatile-random + # Add a volatile key and tracking it. + r setex volatile-key 10000 x + r get volatile-key + # We use SETBIT here, so we can set a big key and get the used_memory + # bigger than maxmemory. Next command will evict volatile keys. We + # can't use SET, as SET uses big input buffer, so it will fail. + r setbit big-key 1000000 0 + # volatile-key is evicted before response. 
+ set res [r getbit big-key 0] + assert_equal $res {invalidate volatile-key} + set res [r read] + assert_equal $res 0 + r config set maxmemory-policy $old_policy + r config set maxmemory 0 + } + + test {Unblocked BLMOVE gets notification after response} { + r RPUSH list2{t} a + $rd HELLO 3 + $rd read + $rd CLIENT TRACKING on + $rd read + # Tracking key list2{t} + $rd LRANGE list2{t} 0 -1 + $rd read + # We block on list1{t} + $rd BLMOVE list1{t} list2{t} left left 0 + wait_for_blocked_clients_count 1 + # unblock $rd, list2{t} gets element and generate invalidation message + r rpush list1{t} foo + assert_equal [$rd read] {foo} + assert_equal [$rd read] {invalidate list2{t}} + } + test {Tracking gets notification on tracking table key eviction} { r CLIENT TRACKING off r CLIENT TRACKING on REDIRECT $redir_id NOLOOP