diff --git a/src/networking.c b/src/networking.c index 876c788bbd..c301009003 100644 --- a/src/networking.c +++ b/src/networking.c @@ -201,7 +201,7 @@ client *createClient(connection *conn) { c->pending_read_list_node = NULL; c->client_tracking_redirection = 0; c->client_tracking_prefixes = NULL; - c->last_memory_usage = c->last_memory_usage_on_bucket_update = 0; + c->last_memory_usage = 0; c->last_memory_type = CLIENT_TYPE_NORMAL; c->auth_callback = NULL; c->auth_callback_privdata = NULL; @@ -1954,7 +1954,11 @@ int writeToClient(client *c, int handler_installed) { return C_ERR; } } - updateClientMemUsage(c); + /* Update client's memory usage after writing. + * Since this isn't thread safe we do this conditionally. In case of threaded writes this is done in + * handleClientsWithPendingWritesUsingThreads(). */ + if (io_threads_op == IO_THREADS_OP_IDLE) + updateClientMemUsage(c); return C_OK; } @@ -2519,7 +2523,8 @@ int processInputBuffer(client *c) { /* Update client memory usage after processing the query buffer, this is * important in case the query buffer is big and wasn't drained during * the above loop (because of partially sent big commands). */ - updateClientMemUsage(c); + if (io_threads_op == IO_THREADS_OP_IDLE) + updateClientMemUsage(c); return C_OK; } @@ -4167,8 +4172,8 @@ int handleClientsWithPendingWritesUsingThreads(void) { while((ln = listNext(&li))) { client *c = listNodeValue(ln); - /* Update the client in the mem usage buckets after we're done processing it in the io-threads */ - updateClientMemUsageBucket(c); + /* Update the client in the mem usage after we're done processing it in the io-threads */ + updateClientMemUsage(c); /* Install the write handler if there are pending writes in some * of the clients. */ @@ -4276,8 +4281,8 @@ int handleClientsWithPendingReadsUsingThreads(void) { continue; } - /* Once io-threads are idle we can update the client in the mem usage buckets */ - updateClientMemUsageBucket(c); + /* Once io-threads are idle we can update the client in the mem usage */ + updateClientMemUsage(c); if (processPendingCommandsAndResetClient(c) == C_ERR) { /* If the client is no longer valid, we avoid diff --git a/src/server.c b/src/server.c index d064433a03..3bf04a7706 100644 --- a/src/server.c +++ b/src/server.c @@ -814,7 +814,7 @@ int clientsCronTrackExpansiveClients(client *c, int time_idx) { * client's memory usage doubles it's moved up to the next bucket, if it's * halved we move it down a bucket. * For more details see CLIENT_MEM_USAGE_BUCKETS documentation in server.h. */ -clientMemUsageBucket *getMemUsageBucket(size_t mem) { +static inline clientMemUsageBucket *getMemUsageBucket(size_t mem) { int size_in_bits = 8*(int)sizeof(mem); int clz = mem > 0 ? __builtin_clzl(mem) : size_in_bits; int bucket_idx = size_in_bits - clz; @@ -831,46 +831,34 @@ clientMemUsageBucket *getMemUsageBucket(size_t mem) { * and also from the clientsCron. We call it from the cron so we have updated * stats for non CLIENT_TYPE_NORMAL/PUBSUB clients and in case a configuration * change requires us to evict a non-active client. + * + * This also adds the client to the correct memory usage bucket. Each bucket contains + * all clients with roughly the same amount of memory. This way we group + * together clients consuming about the same amount of memory and can quickly + * free them in case we reach maxmemory-clients (client eviction). */ int updateClientMemUsage(client *c) { + serverAssert(io_threads_op == IO_THREADS_OP_IDLE); size_t mem = getClientMemoryUsage(c, NULL); int type = getClientType(c); /* Remove the old value of the memory used by the client from the old * category, and add it back. */ - atomicDecr(server.stat_clients_type_memory[c->last_memory_type], c->last_memory_usage); - atomicIncr(server.stat_clients_type_memory[type], mem); + if (type != c->last_memory_type) { + server.stat_clients_type_memory[c->last_memory_type] -= c->last_memory_usage; + server.stat_clients_type_memory[type] += mem; + c->last_memory_type = type; + } else { + server.stat_clients_type_memory[type] += mem - c->last_memory_usage; + } - /* Remember what we added and where, to remove it next time. */ - c->last_memory_usage = mem; - c->last_memory_type = type; - - /* Update client mem usage bucket only when we're not in the context of an - * IO thread. See updateClientMemUsageBucket() for details. */ - if (io_threads_op == IO_THREADS_OP_IDLE) - updateClientMemUsageBucket(c); - - return 0; -} - -/* Adds the client to the correct memory usage bucket. Each bucket contains - * all clients with roughly the same amount of memory. This way we group - * together clients consuming about the same amount of memory and can quickly - * free them in case we reach maxmemory-clients (client eviction). - * Note that in case of io-threads enabled we have to call this function only - * after the fan-in phase (when no io-threads are working) because the bucket - * lists are global. The io-threads themselves track per-client memory usage in - * updateClientMemUsage(). Here we update the clients to each bucket when all - * io-threads are done (both for read and write io-threading). */ -void updateClientMemUsageBucket(client *c) { - serverAssert(io_threads_op == IO_THREADS_OP_IDLE); int allow_eviction = - (c->last_memory_type == CLIENT_TYPE_NORMAL || c->last_memory_type == CLIENT_TYPE_PUBSUB) && + (type == CLIENT_TYPE_NORMAL || type == CLIENT_TYPE_PUBSUB) && !(c->flags & CLIENT_NO_EVICT); /* Update the client in the mem usage buckets */ if (c->mem_usage_bucket) { - c->mem_usage_bucket->mem_usage_sum -= c->last_memory_usage_on_bucket_update; + c->mem_usage_bucket->mem_usage_sum -= c->last_memory_usage; /* If this client can't be evicted then remove it from the mem usage * buckets */ if (!allow_eviction) { @@ -880,8 +868,8 @@ void updateClientMemUsageBucket(client *c) { } } if (allow_eviction) { - clientMemUsageBucket *bucket = getMemUsageBucket(c->last_memory_usage); - bucket->mem_usage_sum += c->last_memory_usage; + clientMemUsageBucket *bucket = getMemUsageBucket(mem); + bucket->mem_usage_sum += mem; if (bucket != c->mem_usage_bucket) { if (c->mem_usage_bucket) listDelNode(c->mem_usage_bucket->clients, @@ -892,7 +880,10 @@ void updateClientMemUsageBucket(client *c) { } } - c->last_memory_usage_on_bucket_update = c->last_memory_usage; + /* Remember what we added, to remove it next time. */ + c->last_memory_usage = mem; + + return 0; } /* Return the max samples in the memory usage of clients tracked by diff --git a/src/server.h b/src/server.h index 26c10170a8..78afe6ccde 100644 --- a/src/server.h +++ b/src/server.h @@ -1172,7 +1172,6 @@ typedef struct client { size_t last_memory_usage; int last_memory_type; - size_t last_memory_usage_on_bucket_update; listNode *mem_usage_bucket_node; clientMemUsageBucket *mem_usage_bucket; @@ -1583,8 +1582,8 @@ struct redisServer { size_t stat_aof_cow_bytes; /* Copy on write bytes during AOF rewrite. */ size_t stat_module_cow_bytes; /* Copy on write bytes during module fork. */ double stat_module_progress; /* Module save progress. */ - redisAtomic size_t stat_clients_type_memory[CLIENT_TYPE_COUNT];/* Mem usage by type */ - size_t stat_cluster_links_memory;/* Mem usage by cluster links */ + size_t stat_clients_type_memory[CLIENT_TYPE_COUNT];/* Mem usage by type */ + size_t stat_cluster_links_memory; /* Mem usage by cluster links */ long long stat_unexpected_error_replies; /* Number of unexpected (aof-loading, replica to master, etc.) error replies */ long long stat_total_error_replies; /* Total number of issued error replies ( command + rejected errors ) */ long long stat_dump_payload_sanitizations; /* Number deep dump payloads integrity validations. */