From 725cd268e652cd3c9f0047c3cfe23a911e565d8a Mon Sep 17 00:00:00 2001 From: "debing.sun" Date: Thu, 20 Feb 2025 00:05:24 +0800 Subject: [PATCH] Refactor of ActiveDefrag to reduce latencies (#13814) This PR is based on: https://github.com/valkey-io/valkey/pull/1462 ## Issue/Problems Duty Cycle: Active Defrag has configuration values which determine the intended percentage of CPU to be used based on a gradient of the fragmentation percentage. However, Active Defrag performs its work on the 100ms serverCron timer. It then computes a duty cycle and performs a single long cycle. For example, if the intended CPU is computed to be 10%, Active Defrag will perform 10ms of work on this 100ms timer cron. * This type of cycle introduces large latencies on the client (up to 25ms with default configurations) * This mechanism is subject to starvation when slow commands delay the serverCron Maintainability: The current Active Defrag code is difficult to read & maintain. Refactoring of the high level control mechanisms and functions will allow us to more seamlessly adapt to new defragmentation needs. Specific examples include: * A single function (activeDefragCycle) includes the logic to start/stop/modify the defragmentation as well as performing one "step" of the defragmentation. This should be separated out, so that the actual defrag activity can be performed on an independent timer (see duty cycle above). * The code is focused on kvstores, with other actions just thrown in at the end (defragOtherGlobals). There's no mechanism to break this up to reduce latencies. * For the main dictionary (only), there is a mechanism to set aside large keys to be processed in a later step. However this code creates a separate list in each kvstore (main dict or not), bleeding/exposing internal defrag logic. We only need 1 list - inside defrag. This logic should be more contained for the main key store. * The structure is not well suited towards other non-main-dictionary items. For example, pub-sub and pub-sub-shard was added, but it's added in such a way that in CMD mode, with multiple DBs, we will defrag pub-sub repeatedly after each DB. ## Description of the feature Primarily, this feature will split activeDefragCycle into 2 functions. 1. One function will be called from serverCron to determine if a defrag cycle (a complete scan) needs to be started. It will also determine if the CPU expenditure needs to be adjusted. 2. The 2nd function will be a timer proc dedicated to performing defrag. This will be invoked independently from serverCron. Once the functions are split, there is more control over the latency created by the defrag process. A new configuration will be used to determine the running time for the defrag timer proc. The default for this will be 500us (one-half of the current minimum time). Then the timer will be adjusted to achieve the desired CPU. As an example, 5% of CPU will run the defrag process for 500us every 10ms. This is much better than running for 5ms every 100ms. The timer function will also adjust to compensate for starvation. If a slow command delays the timer, the process will run proportionately longer to ensure that the configured CPU is achieved. Given the presence of slow commands, the proportional extra time is insignificant to latency. This also addresses the overload case. At 100% CPU, if the event loop slows, defrag will run proportionately longer to achieve the configured CPU utilization. Optionally, in low CPU situations, there would be little impact in utilizing more than the configured CPU. We could optionally allow the timer to pop more often (even with a 0ms delay) and the (tail) latency impact would not change. And we add a time limit for the defrag duty cycle to prevent excessive latency. When latency is already high (indicated by a long time between calls), we don't want to make it worse by running defrag for too long. Addressing maintainability: * The basic code structure can more clearly be organized around a "cycle". * Have clear begin/end functions and a set of "stages" to be executed. * Rather than stages being limited to "kvstore" type data, a cycle should be more flexible, incorporating the ability to incrementally perform arbitrary work. This will likely be necessary in the future for certain module types. It can be used today to address oddballs like defragOtherGlobals. * We reduced some of the globals, and reduce some of the coupling. defrag_later should be removed from serverDb. * Each stage should begin on a fresh cycle. So if there are non-time-bounded operations like kvstoreDictLUTDefrag, these would be less likely to introduce additional latency. Signed-off-by: Jim Brunner [brunnerj@amazon.com](mailto:brunnerj@amazon.com) Signed-off-by: Madelyn Olson [madelyneolson@gmail.com](mailto:madelyneolson@gmail.com) Co-authored-by: Madelyn Olson [madelyneolson@gmail.com](mailto:madelyneolson@gmail.com) --------- Signed-off-by: Jim Brunner brunnerj@amazon.com Signed-off-by: Madelyn Olson madelyneolson@gmail.com Co-authored-by: Madelyn Olson madelyneolson@gmail.com Co-authored-by: ShooterIT --- src/defrag.c | 963 ++++++++++++++++++++++------------- src/kvstore.c | 23 +- src/kvstore.h | 15 +- src/module.c | 11 +- src/server.c | 22 +- src/server.h | 4 +- tests/unit/memefficiency.tcl | 65 ++- 7 files changed, 686 insertions(+), 417 deletions(-) diff --git a/src/defrag.c b/src/defrag.c index f25e102d51..4766e16370 100644 --- a/src/defrag.c +++ b/src/defrag.c @@ -8,8 +8,13 @@ * Copyright (c) 2020-Present, Redis Ltd. * All rights reserved. * + * Copyright (c) 2024-present, Valkey contributors. + * All rights reserved. + * * Licensed under your choice of the Redis Source Available License 2.0 * (RSALv2) or the Server Side Public License v1 (SSPLv1). + * + * Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information. */ #include "server.h" @@ -18,15 +23,106 @@ #ifdef HAVE_DEFRAG -typedef struct defragCtx { - void *privdata; - int slot; -} defragCtx; +#define DEFRAG_CYCLE_US 500 /* Standard duration of defrag cycle (in microseconds) */ -typedef struct defragPubSubCtx { - kvstore *pubsub_channels; - dict *(*clientPubSubChannels)(client*); +typedef enum { DEFRAG_NOT_DONE = 0, + DEFRAG_DONE = 1 } doneStatus; + +/* + * Defragmentation is performed in stages. Each stage is serviced by a stage function + * (defragStageFn). The stage function is passed a context (void*) to defrag. The contents of that + * context are unique to the particular stage - and may even be NULL for some stage functions. The + * same stage function can be used multiple times (for different stages) each having a different + * context. + * + * Parameters: + * endtime - This is the monotonic time that the function should end and return. This ensures + * a bounded latency due to defrag. + * ctx - A pointer to context which is unique to the stage function. + * + * Returns: + * - DEFRAG_DONE if the stage is complete + * - DEFRAG_NOT_DONE if there is more work to do + */ +typedef doneStatus (*defragStageFn)(void *ctx, monotime endtime); + +/* Function pointer type for freeing context in defragmentation stages. */ +typedef void (*defragStageContextFreeFn)(void *ctx); +typedef struct { + defragStageFn stage_fn; /* The function to be invoked for the stage */ + defragStageContextFreeFn ctx_free_fn; /* Function to free the context */ + void *ctx; /* Context, unique to the stage function */ +} StageDescriptor; + +/* Globals needed for the main defrag processing logic. + * Doesn't include variables specific to a stage or type of data. */ +struct DefragContext { + monotime start_cycle; /* Time of beginning of defrag cycle */ + long long start_defrag_hits; /* server.stat_active_defrag_hits captured at beginning of cycle */ + long long start_defrag_misses; /* server.stat_active_defrag_misses captured at beginning of cycle */ + float start_frag_pct; /* Fragmention percent of beginning of defrag cycle */ + float decay_rate; /* Defrag speed decay rate */ + + list *remaining_stages; /* List of stages which remain to be processed */ + listNode *current_stage; /* The list node of stage that's currently being processed */ + + long long timeproc_id; /* Eventloop ID of the timerproc (or AE_DELETED_EVENT_ID) */ + monotime timeproc_end_time; /* Ending time of previous timerproc execution */ + long timeproc_overage_us; /* A correction value if over target CPU percent */ +}; +static struct DefragContext defrag = {0, 0, 0, 0, 1.0f}; + +/* There are a number of stages which process a kvstore. To simplify this, a stage helper function + * `defragStageKvstoreHelper()` is defined. This function aids in iterating over the kvstore. It + * uses these definitions. + */ +/* State of the kvstore helper. The context passed to the kvstore helper MUST BEGIN + * with a kvstoreIterState (or be passed as NULL). */ +#define KVS_SLOT_DEFRAG_LUT -2 +#define KVS_SLOT_UNASSIGNED -1 +typedef struct { + kvstore *kvs; + int slot; + unsigned long cursor; +} kvstoreIterState; +#define INIT_KVSTORE_STATE(kvs) ((kvstoreIterState){(kvs), KVS_SLOT_DEFRAG_LUT, 0}) + +/* The kvstore helper uses this function to perform tasks before continuing the iteration. For the + * main dictionary, large items are set aside and processed by this function before continuing with + * iteration over the kvstore. + * endtime - This is the monotonic time that the function should end and return. + * ctx - Context for functions invoked by the helper. If provided in the call to + * `defragStageKvstoreHelper()`, the `kvstoreIterState` portion (at the beginning) + * will be updated with the current kvstore iteration status. + * + * Returns: + * - DEFRAG_DONE if the pre-continue work is complete + * - DEFRAG_NOT_DONE if there is more work to do + */ +typedef doneStatus (*kvstoreHelperPreContinueFn)(void *ctx, monotime endtime); + +typedef struct { + kvstoreIterState kvstate; + int dbid; + + /* When scanning a main kvstore, large elements are queued for later handling rather than + * causing a large latency spike while processing a hash table bucket. This list is only used + * for stage: "defragStageDbKeys". It will only contain values for the current kvstore being + * defragged. + * Note that this is a list of key names. It's possible that the key may be deleted or modified + * before "later" and we will search by key name to find the entry when we defrag the item later. */ + list *defrag_later; + unsigned long defrag_later_cursor; +} defragKeysCtx; +static_assert(offsetof(defragKeysCtx, kvstate) == 0, "defragStageKvstoreHelper requires this"); + +/* Context for pubsub kvstores */ +typedef dict *(*getClientChannelsFn)(client *); +typedef struct { + kvstoreIterState kvstate; + getClientChannelsFn getPubSubChannels; } defragPubSubCtx; +static_assert(offsetof(defragPubSubCtx, kvstate) == 0, "defragStageKvstoreHelper requires this"); /* this method was added to jemalloc in order to help us understand which * pointers are worthwhile moving and which aren't */ @@ -336,36 +432,6 @@ void activeDefragHfieldDict(dict *d) { } /* Defrag a list of ptr, sds or robj string values */ -void activeDefragList(list *l, int val_type) { - listNode *ln, *newln; - for (ln = l->head; ln; ln = ln->next) { - if ((newln = activeDefragAlloc(ln))) { - if (newln->prev) - newln->prev->next = newln; - else - l->head = newln; - if (newln->next) - newln->next->prev = newln; - else - l->tail = newln; - ln = newln; - } - if (val_type == DEFRAG_SDS_DICT_VAL_IS_SDS) { - sds newsds, sdsele = ln->value; - if ((newsds = activeDefragSds(sdsele))) - ln->value = newsds; - } else if (val_type == DEFRAG_SDS_DICT_VAL_IS_STROB) { - robj *newele, *ele = ln->value; - if ((newele = activeDefragStringOb(ele))) - ln->value = newele; - } else if (val_type == DEFRAG_SDS_DICT_VAL_VOID_PTR) { - void *newptr, *ptr = ln->value; - if ((newptr = activeDefragAlloc(ptr))) - ln->value = newptr; - } - } -} - void activeDefragQuickListNode(quicklist *ql, quicklistNode **node_ref) { quicklistNode *newnode, *node = *node_ref; unsigned char *newzl; @@ -395,13 +461,18 @@ void activeDefragQuickListNodes(quicklist *ql) { /* when the value has lots of elements, we want to handle it later and not as * part of the main dictionary scan. this is needed in order to prevent latency * spikes when handling large items */ -void defragLater(redisDb *db, dictEntry *kde) { +void defragLater(defragKeysCtx *ctx, dictEntry *kde) { + if (!ctx->defrag_later) { + ctx->defrag_later = listCreate(); + listSetFreeMethod(ctx->defrag_later, sdsfreegeneric); + ctx->defrag_later_cursor = 0; + } sds key = sdsdup(dictGetKey(kde)); - listAddNodeTail(db->defrag_later, key); + listAddNodeTail(ctx->defrag_later, key); } /* returns 0 if no more work needs to be been done, and 1 if time is up and more work is needed. */ -long scanLaterList(robj *ob, unsigned long *cursor, long long endtime) { +long scanLaterList(robj *ob, unsigned long *cursor, monotime endtime) { quicklist *ql = ob->ptr; quicklistNode *node; long iterations = 0; @@ -427,7 +498,7 @@ long scanLaterList(robj *ob, unsigned long *cursor, long long endtime) { activeDefragQuickListNode(ql, &node); server.stat_active_defrag_scanned++; if (++iterations > 128 && !bookmark_failed) { - if (ustime() > endtime) { + if (getMonotonicUs() > endtime) { if (!quicklistBookmarkCreate(&ql, "_AD", node)) { bookmark_failed = 1; } else { @@ -495,19 +566,19 @@ void scanLaterHash(robj *ob, unsigned long *cursor) { *cursor = dictScanDefrag(d, *cursor, activeDefragHfieldDictCallback, &defragfns, d); } -void defragQuicklist(redisDb *db, dictEntry *kde) { +void defragQuicklist(defragKeysCtx *ctx, dictEntry *kde) { robj *ob = dictGetVal(kde); quicklist *ql = ob->ptr, *newql; serverAssert(ob->type == OBJ_LIST && ob->encoding == OBJ_ENCODING_QUICKLIST); if ((newql = activeDefragAlloc(ql))) ob->ptr = ql = newql; if (ql->len > server.active_defrag_max_scan_fields) - defragLater(db, kde); + defragLater(ctx, kde); else activeDefragQuickListNodes(ql); } -void defragZsetSkiplist(redisDb *db, dictEntry *kde) { +void defragZsetSkiplist(defragKeysCtx *ctx, dictEntry *kde) { robj *ob = dictGetVal(kde); zset *zs = (zset*)ob->ptr; zset *newzs; @@ -523,7 +594,7 @@ void defragZsetSkiplist(redisDb *db, dictEntry *kde) { if ((newheader = activeDefragAlloc(zs->zsl->header))) zs->zsl->header = newheader; if (dictSize(zs->dict) > server.active_defrag_max_scan_fields) - defragLater(db, kde); + defragLater(ctx, kde); else { dictIterator *di = dictGetIterator(zs->dict); while((de = dictNext(di)) != NULL) { @@ -536,13 +607,13 @@ void defragZsetSkiplist(redisDb *db, dictEntry *kde) { zs->dict = newdict; } -void defragHash(redisDb *db, dictEntry *kde) { +void defragHash(defragKeysCtx *ctx, dictEntry *kde) { robj *ob = dictGetVal(kde); dict *d, *newd; serverAssert(ob->type == OBJ_HASH && ob->encoding == OBJ_ENCODING_HT); d = ob->ptr; if (dictSize(d) > server.active_defrag_max_scan_fields) - defragLater(db, kde); + defragLater(ctx, kde); else activeDefragHfieldDict(d); /* defrag the dict struct and tables */ @@ -550,13 +621,13 @@ void defragHash(redisDb *db, dictEntry *kde) { ob->ptr = newd; } -void defragSet(redisDb *db, dictEntry *kde) { +void defragSet(defragKeysCtx *ctx, dictEntry *kde) { robj *ob = dictGetVal(kde); dict *d, *newd; serverAssert(ob->type == OBJ_SET && ob->encoding == OBJ_ENCODING_HT); d = ob->ptr; if (dictSize(d) > server.active_defrag_max_scan_fields) - defragLater(db, kde); + defragLater(ctx, kde); else activeDefragSdsDict(d, DEFRAG_SDS_DICT_NO_VAL); /* defrag the dict struct and tables */ @@ -576,7 +647,7 @@ int defragRaxNode(raxNode **noderef) { } /* returns 0 if no more work needs to be been done, and 1 if time is up and more work is needed. */ -int scanLaterStreamListpacks(robj *ob, unsigned long *cursor, long long endtime) { +int scanLaterStreamListpacks(robj *ob, unsigned long *cursor, monotime endtime) { static unsigned char last[sizeof(streamID)]; raxIterator ri; long iterations = 0; @@ -613,7 +684,7 @@ int scanLaterStreamListpacks(robj *ob, unsigned long *cursor, long long endtime) raxSetData(ri.node, ri.data=newdata); server.stat_active_defrag_scanned++; if (++iterations > 128) { - if (ustime() > endtime) { + if (getMonotonicUs() > endtime) { serverAssert(ri.key_len==sizeof(last)); memcpy(last,ri.key,ri.key_len); raxStop(&ri); @@ -703,7 +774,7 @@ void* defragStreamConsumerGroup(raxIterator *ri, void *privdata) { return NULL; } -void defragStream(redisDb *db, dictEntry *kde) { +void defragStream(defragKeysCtx *ctx, dictEntry *kde) { robj *ob = dictGetVal(kde); serverAssert(ob->type == OBJ_STREAM && ob->encoding == OBJ_ENCODING_STREAM); stream *s = ob->ptr, *news; @@ -716,7 +787,7 @@ void defragStream(redisDb *db, dictEntry *kde) { rax *newrax = activeDefragAlloc(s->rax); if (newrax) s->rax = newrax; - defragLater(db, kde); + defragLater(ctx, kde); } else defragRadixTree(&s->rax, 1, NULL, NULL); @@ -727,24 +798,25 @@ void defragStream(redisDb *db, dictEntry *kde) { /* Defrag a module key. This is either done immediately or scheduled * for later. Returns then number of pointers defragged. */ -void defragModule(redisDb *db, dictEntry *kde) { +void defragModule(defragKeysCtx *ctx, redisDb *db, dictEntry *kde) { robj *obj = dictGetVal(kde); serverAssert(obj->type == OBJ_MODULE); robj keyobj; initStaticStringObject(keyobj, dictGetKey(kde)); if (!moduleDefragValue(&keyobj, obj, db->id)) - defragLater(db, kde); + defragLater(ctx, kde); } /* for each key we scan in the main dict, this function will attempt to defrag * all the various pointers it has. */ -void defragKey(defragCtx *ctx, dictEntry *de) { +void defragKey(defragKeysCtx *ctx, dictEntry *de) { + redisDb *db = &server.db[ctx->dbid]; + int slot = ctx->kvstate.slot; sds keysds = dictGetKey(de); robj *newob, *ob = dictGetVal(de); unsigned char *newzl; sds newsds; - redisDb *db = ctx->privdata; - int slot = ctx->slot; + /* Try to defrag the key name. */ newsds = activeDefragSds(keysds); if (newsds) { @@ -781,7 +853,7 @@ void defragKey(defragCtx *ctx, dictEntry *de) { /* Already handled in activeDefragStringOb. */ } else if (ob->type == OBJ_LIST) { if (ob->encoding == OBJ_ENCODING_QUICKLIST) { - defragQuicklist(db, de); + defragQuicklist(ctx, de); } else if (ob->encoding == OBJ_ENCODING_LISTPACK) { if ((newzl = activeDefragAlloc(ob->ptr))) ob->ptr = newzl; @@ -790,7 +862,7 @@ void defragKey(defragCtx *ctx, dictEntry *de) { } } else if (ob->type == OBJ_SET) { if (ob->encoding == OBJ_ENCODING_HT) { - defragSet(db, de); + defragSet(ctx, de); } else if (ob->encoding == OBJ_ENCODING_INTSET || ob->encoding == OBJ_ENCODING_LISTPACK) { @@ -805,7 +877,7 @@ void defragKey(defragCtx *ctx, dictEntry *de) { if ((newzl = activeDefragAlloc(ob->ptr))) ob->ptr = newzl; } else if (ob->encoding == OBJ_ENCODING_SKIPLIST) { - defragZsetSkiplist(db, de); + defragZsetSkiplist(ctx, de); } else { serverPanic("Unknown sorted set encoding"); } @@ -820,23 +892,23 @@ void defragKey(defragCtx *ctx, dictEntry *de) { if ((newzl = activeDefragAlloc(lpt->lp))) lpt->lp = newzl; } else if (ob->encoding == OBJ_ENCODING_HT) { - defragHash(db, de); + defragHash(ctx, de); } else { serverPanic("Unknown hash encoding"); } } else if (ob->type == OBJ_STREAM) { - defragStream(db, de); + defragStream(ctx, de); } else if (ob->type == OBJ_MODULE) { - defragModule(db, de); + defragModule(ctx,db, de); } else { serverPanic("Unknown object type"); } } /* Defrag scan callback for the main db dictionary. */ -void defragScanCallback(void *privdata, const dictEntry *de) { +static void dbKeysScanCallback(void *privdata, const dictEntry *de) { long long hits_before = server.stat_active_defrag_hits; - defragKey((defragCtx*)privdata, (dictEntry*)de); + defragKey((defragKeysCtx *)privdata, (dictEntry *)de); if (server.stat_active_defrag_hits != hits_before) server.stat_active_defrag_key_hits++; else @@ -880,9 +952,8 @@ float getAllocatorFragmentation(size_t *out_frag_bytes) { /* Defrag scan callback for the pubsub dictionary. */ void defragPubsubScanCallback(void *privdata, const dictEntry *de) { - defragCtx *ctx = privdata; - defragPubSubCtx *pubsub_ctx = ctx->privdata; - kvstore *pubsub_channels = pubsub_ctx->pubsub_channels; + defragPubSubCtx *ctx = privdata; + kvstore *pubsub_channels = ctx->kvstate.kvs; robj *newchannel, *channel = dictGetKey(de); dict *newclients, *clients = dictGetVal(de); @@ -890,7 +961,7 @@ void defragPubsubScanCallback(void *privdata, const dictEntry *de) { serverAssert(channel->refcount == (int)dictSize(clients) + 1); newchannel = activeDefragStringObEx(channel, dictSize(clients) + 1); if (newchannel) { - kvstoreDictSetKey(pubsub_channels, ctx->slot, (dictEntry*)de, newchannel); + kvstoreDictSetKey(pubsub_channels, ctx->kvstate.slot, (dictEntry*)de, newchannel); /* The channel name is shared by the client's pubsub(shard) and server's * pubsub(shard), after defraging the channel name, we need to update @@ -899,36 +970,24 @@ void defragPubsubScanCallback(void *privdata, const dictEntry *de) { dictEntry *clientde; while((clientde = dictNext(di)) != NULL) { client *c = dictGetKey(clientde); - dictEntry *pubsub_channel = dictFind(pubsub_ctx->clientPubSubChannels(c), newchannel); + dict *client_channels = ctx->getPubSubChannels(c); + dictEntry *pubsub_channel = dictFind(client_channels, newchannel); serverAssert(pubsub_channel); - dictSetKey(pubsub_ctx->clientPubSubChannels(c), pubsub_channel, newchannel); + dictSetKey(ctx->getPubSubChannels(c), pubsub_channel, newchannel); } dictReleaseIterator(di); } /* Try to defrag the dictionary of clients that is stored as the value part. */ if ((newclients = dictDefragTables(clients))) - kvstoreDictSetVal(pubsub_channels, ctx->slot, (dictEntry*)de, newclients); + kvstoreDictSetVal(pubsub_channels, ctx->kvstate.slot, (dictEntry *)de, newclients); server.stat_active_defrag_scanned++; } -/* We may need to defrag other globals, one small allocation can hold a full allocator run. - * so although small, it is still important to defrag these */ -void defragOtherGlobals(void) { - - /* there are many more pointers to defrag (e.g. client argv, output / aof buffers, etc. - * but we assume most of these are short lived, we only need to defrag allocations - * that remain static for a long time */ - activeDefragSdsDict(evalScriptsDict(), DEFRAG_SDS_DICT_VAL_LUA_SCRIPT); - moduleDefragGlobals(); - kvstoreDictLUTDefrag(server.pubsub_channels, dictDefragTables); - kvstoreDictLUTDefrag(server.pubsubshard_channels, dictDefragTables); -} - /* returns 0 more work may or may not be needed (see non-zero cursor), * and 1 if time is up and more work is needed. */ -int defragLaterItem(dictEntry *de, unsigned long *cursor, long long endtime, int dbid) { +int defragLaterItem(dictEntry *de, unsigned long *cursor, monotime endtime, int dbid) { if (de) { robj *ob = dictGetVal(de); if (ob->type == OBJ_LIST) { @@ -942,9 +1001,10 @@ int defragLaterItem(dictEntry *de, unsigned long *cursor, long long endtime, int } else if (ob->type == OBJ_STREAM) { return scanLaterStreamListpacks(ob, cursor, endtime); } else if (ob->type == OBJ_MODULE) { + long long endtimeWallClock = ustime() + (endtime - getMonotonicUs()); robj keyobj; initStaticStringObject(keyobj, dictGetKey(de)); - return moduleLateDefrag(&keyobj, ob, cursor, endtime, dbid); + return moduleLateDefrag(&keyobj, ob, cursor, endtimeWallClock, dbid); } else { *cursor = 0; /* object type may have changed since we schedule it for later */ } @@ -954,78 +1014,55 @@ int defragLaterItem(dictEntry *de, unsigned long *cursor, long long endtime, int return 0; } -/* static variables serving defragLaterStep to continue scanning a key from were we stopped last time. */ -static sds defrag_later_current_key = NULL; -static unsigned long defrag_later_cursor = 0; +static int defragIsRunning(void) { + return (defrag.timeproc_id > 0); +} + +/* A kvstoreHelperPreContinueFn */ +static doneStatus defragLaterStep(void *ctx, monotime endtime) { + defragKeysCtx *defrag_keys_ctx = ctx; -/* returns 0 if no more work needs to be been done, and 1 if time is up and more work is needed. */ -int defragLaterStep(redisDb *db, int slot, long long endtime) { unsigned int iterations = 0; unsigned long long prev_defragged = server.stat_active_defrag_hits; unsigned long long prev_scanned = server.stat_active_defrag_scanned; - long long key_defragged; - do { - /* if we're not continuing a scan from the last call or loop, start a new one */ - if (!defrag_later_cursor) { - listNode *head = listFirst(db->defrag_later); + while (defrag_keys_ctx->defrag_later && listLength(defrag_keys_ctx->defrag_later) > 0) { + listNode *head = listFirst(defrag_keys_ctx->defrag_later); + sds key = head->value; + dictEntry *de = kvstoreDictFind(defrag_keys_ctx->kvstate.kvs, defrag_keys_ctx->kvstate.slot, key); - /* Move on to next key */ - if (defrag_later_current_key) { - serverAssert(defrag_later_current_key == head->value); - listDelNode(db->defrag_later, head); - defrag_later_cursor = 0; - defrag_later_current_key = NULL; - } - - /* stop if we reached the last one. */ - head = listFirst(db->defrag_later); - if (!head) - return 0; - - /* start a new key */ - defrag_later_current_key = head->value; - defrag_later_cursor = 0; + long long key_defragged = server.stat_active_defrag_hits; + int timeout = (defragLaterItem(de, &defrag_keys_ctx->defrag_later_cursor, endtime, defrag_keys_ctx->dbid) == 1); + if (key_defragged != server.stat_active_defrag_hits) { + server.stat_active_defrag_key_hits++; + } else { + server.stat_active_defrag_key_misses++; } - /* each time we enter this function we need to fetch the key from the dict again (if it still exists) */ - dictEntry *de = kvstoreDictFind(db->keys, slot, defrag_later_current_key); - key_defragged = server.stat_active_defrag_hits; - do { - int quit = 0; - if (defragLaterItem(de, &defrag_later_cursor, endtime,db->id)) - quit = 1; /* time is up, we didn't finish all the work */ + if (timeout) break; - /* Once in 16 scan iterations, 512 pointer reallocations, or 64 fields - * (if we have a lot of pointers in one hash bucket, or rehashing), - * check if we reached the time limit. */ - if (quit || (++iterations > 16 || - server.stat_active_defrag_hits - prev_defragged > 512 || - server.stat_active_defrag_scanned - prev_scanned > 64)) { - if (quit || ustime() > endtime) { - if(key_defragged != server.stat_active_defrag_hits) - server.stat_active_defrag_key_hits++; - else - server.stat_active_defrag_key_misses++; - return 1; - } - iterations = 0; - prev_defragged = server.stat_active_defrag_hits; - prev_scanned = server.stat_active_defrag_scanned; - } - } while(defrag_later_cursor); - if(key_defragged != server.stat_active_defrag_hits) - server.stat_active_defrag_key_hits++; - else - server.stat_active_defrag_key_misses++; - } while(1); + if (defrag_keys_ctx->defrag_later_cursor == 0) { + /* the item is finished, move on */ + listDelNode(defrag_keys_ctx->defrag_later, head); + } + + if (++iterations > 16 || server.stat_active_defrag_hits - prev_defragged > 512 || + server.stat_active_defrag_scanned - prev_scanned > 64) { + if (getMonotonicUs() > endtime) break; + iterations = 0; + prev_defragged = server.stat_active_defrag_hits; + prev_scanned = server.stat_active_defrag_scanned; + } + } + + return (!defrag_keys_ctx->defrag_later || listLength(defrag_keys_ctx->defrag_later) == 0) ? DEFRAG_DONE : DEFRAG_NOT_DONE; } #define INTERPOLATE(x, x1, x2, y1, y2) ( (y1) + ((x)-(x1)) * ((y2)-(y1)) / ((x2)-(x1)) ) #define LIMIT(y, min, max) ((y)<(min)? min: ((y)>(max)? max: (y))) /* decide if defrag is needed, and at what CPU effort to invest in it */ -void computeDefragCycles(float decay_rate) { +void computeDefragCycles(void) { size_t frag_bytes; float frag_pct = getAllocatorFragmentation(&frag_bytes); /* If we're not already running, and below the threshold, exit. */ @@ -1041,7 +1078,7 @@ void computeDefragCycles(float decay_rate) { server.active_defrag_threshold_upper, server.active_defrag_cycle_min, server.active_defrag_cycle_max); - cpu_pct *= decay_rate; + cpu_pct *= defrag.decay_rate; cpu_pct = LIMIT(cpu_pct, server.active_defrag_cycle_min, server.active_defrag_cycle_max); @@ -1052,246 +1089,451 @@ void computeDefragCycles(float decay_rate) { if (cpu_pct > server.active_defrag_running || server.active_defrag_configuration_changed) { - server.active_defrag_running = cpu_pct; server.active_defrag_configuration_changed = 0; - serverLog(LL_VERBOSE, - "Starting active defrag, frag=%.0f%%, frag_bytes=%zu, cpu=%d%%", - frag_pct, frag_bytes, cpu_pct); + if (defragIsRunning()) { + serverLog(LL_VERBOSE, "Changing active defrag CPU, frag=%.0f%%, frag_bytes=%zu, cpu=%d%%", + frag_pct, frag_bytes, cpu_pct); + } else { + serverLog(LL_VERBOSE, + "Starting active defrag, frag=%.0f%%, frag_bytes=%zu, cpu=%d%%", + frag_pct, frag_bytes, cpu_pct); + } + server.active_defrag_running = cpu_pct; } } -/* Perform incremental defragmentation work from the serverCron. - * This works in a similar way to activeExpireCycle, in the sense that - * we do incremental work across calls. */ -void activeDefragCycle(void) { - static int slot = -1; - static int current_db = -1; - static int defrag_later_item_in_progress = 0; - static int defrag_stage = 0; - static unsigned long defrag_cursor = 0; - static redisDb *db = NULL; - static long long start_scan, start_hits, start_misses; - static float start_frag_pct; - static float decay_rate = 1.0f; +/* This helper function handles most of the work for iterating over a kvstore. 'privdata', if + * provided, MUST begin with 'kvstoreIterState' and this part is automatically updated by this + * function during the iteration. */ +static doneStatus defragStageKvstoreHelper(monotime endtime, + void *ctx, + dictScanFunction scan_fn, + kvstoreHelperPreContinueFn precontinue_fn, + dictDefragFunctions *defragfns) +{ unsigned int iterations = 0; unsigned long long prev_defragged = server.stat_active_defrag_hits; unsigned long long prev_scanned = server.stat_active_defrag_scanned; - long long start, timelimit, endtime; - mstime_t latency; - int all_stages_finished = 0; - int quit = 0; + kvstoreIterState *state = (kvstoreIterState*)ctx; + + if (state->slot == KVS_SLOT_DEFRAG_LUT) { + /* Before we start scanning the kvstore, handle the main structures */ + do { + state->cursor = kvstoreDictLUTDefrag(state->kvs, state->cursor, dictDefragTables); + if (getMonotonicUs() >= endtime) return DEFRAG_NOT_DONE; + } while (state->cursor != 0); + state->slot = KVS_SLOT_UNASSIGNED; + } + + while (1) { + if (++iterations > 16 || server.stat_active_defrag_hits - prev_defragged > 512 || server.stat_active_defrag_scanned - prev_scanned > 64) { + if (getMonotonicUs() >= endtime) break; + iterations = 0; + prev_defragged = server.stat_active_defrag_hits; + prev_scanned = server.stat_active_defrag_scanned; + } + + if (precontinue_fn) { + if (precontinue_fn(ctx, endtime) == DEFRAG_NOT_DONE) return DEFRAG_NOT_DONE; + } + + if (!state->cursor) { + /* If there's no cursor, we're ready to begin a new kvstore slot. */ + if (state->slot == KVS_SLOT_UNASSIGNED) { + state->slot = kvstoreGetFirstNonEmptyDictIndex(state->kvs); + } else { + state->slot = kvstoreGetNextNonEmptyDictIndex(state->kvs, state->slot); + } + + if (state->slot == KVS_SLOT_UNASSIGNED) return DEFRAG_DONE; + } + + /* Whatever privdata's actual type, this function requires that it begins with kvstoreIterState. */ + state->cursor = kvstoreDictScanDefrag(state->kvs, state->slot, state->cursor, + scan_fn, defragfns, ctx); + } + + return DEFRAG_NOT_DONE; +} + +static doneStatus defragStageDbKeys(void *ctx, monotime endtime) { + defragKeysCtx *defrag_keys_ctx = ctx; + redisDb *db = &server.db[defrag_keys_ctx->dbid]; + if (db->keys != defrag_keys_ctx->kvstate.kvs) { + /* There has been a change of the kvs (flushdb, swapdb, etc.). Just complete the stage. */ + return DEFRAG_DONE; + } + + /* Note: for DB keys, we use the start/finish callback to fix an expires table entry if + * the main DB entry has been moved. */ + static dictDefragFunctions defragfns = { + .defragAlloc = activeDefragAlloc, + .defragKey = NULL, /* Handled by dbKeysScanCallback */ + .defragVal = NULL, /* Handled by dbKeysScanCallback */ + }; + + return defragStageKvstoreHelper(endtime, ctx, + dbKeysScanCallback, defragLaterStep, &defragfns); +} + +static doneStatus defragStageExpiresKvstore(void *ctx, monotime endtime) { + defragKeysCtx *defrag_keys_ctx = ctx; + redisDb *db = &server.db[defrag_keys_ctx->dbid]; + if (db->keys != defrag_keys_ctx->kvstate.kvs) { + /* There has been a change of the kvs (flushdb, swapdb, etc.). Just complete the stage. */ + return DEFRAG_DONE; + } + + static dictDefragFunctions defragfns = { + .defragAlloc = activeDefragAlloc, + .defragKey = NULL, /* Not needed for expires (just a ref) */ + .defragVal = NULL, /* Not needed for expires (no value) */ + }; + return defragStageKvstoreHelper(endtime, ctx, + scanCallbackCountScanned, NULL, &defragfns); +} + +static doneStatus defragStagePubsubKvstore(void *ctx, monotime endtime) { + static dictDefragFunctions defragfns = { + .defragAlloc = activeDefragAlloc, + .defragKey = NULL, /* Handled by defragPubsubScanCallback */ + .defragVal = NULL, /* Not needed for expires (no value) */ + }; + + return defragStageKvstoreHelper(endtime, ctx, + defragPubsubScanCallback, NULL, &defragfns); +} + +static doneStatus defragLuaScripts(void *ctx, monotime endtime) { + UNUSED(endtime); + UNUSED(ctx); + activeDefragSdsDict(evalScriptsDict(), DEFRAG_SDS_DICT_VAL_LUA_SCRIPT); + return DEFRAG_DONE; +} + +static doneStatus defragModuleGlobals(void *ctx, monotime endtime) { + UNUSED(endtime); + UNUSED(ctx); + moduleDefragGlobals(); + return DEFRAG_DONE; +} + +static void freeDefragKeysContext(void *ctx) { + defragKeysCtx *defrag_keys_ctx = ctx; + if (defrag_keys_ctx->defrag_later) { + listRelease(defrag_keys_ctx->defrag_later); + } + zfree(defrag_keys_ctx); +} + +static void freeDefragContext(void *ptr) { + StageDescriptor *stage = ptr; + if (stage->ctx_free_fn) + stage->ctx_free_fn(stage->ctx); + zfree(stage); +} + +static void addDefragStage(defragStageFn stage_fn, defragStageContextFreeFn ctx_free_fn, void *ctx) { + StageDescriptor *stage = zmalloc(sizeof(StageDescriptor)); + stage->stage_fn = stage_fn; + stage->ctx_free_fn = ctx_free_fn; + stage->ctx = ctx; + listAddNodeTail(defrag.remaining_stages, stage); +} + +/* Updates the defrag decay rate based on the observed effectiveness of the defrag process. + * The decay rate is used to gradually slow down defrag when it's not being effective. */ +static void updateDefragDecayRate(float frag_pct) { + long long last_hits = server.stat_active_defrag_hits - defrag.start_defrag_hits; + long long last_misses = server.stat_active_defrag_misses - defrag.start_defrag_misses; + float last_frag_pct_change = defrag.start_frag_pct - frag_pct; + /* When defragmentation efficiency is low, we gradually reduce the + * speed for the next cycle to avoid CPU waste. However, in the + * following two cases, we keep the normal speed: + * 1) If the fragmentation percentage has increased or decreased by more than 2%. + * 2) If the fragmentation percentage decrease is small, but hits are above 1%, + * we still keep the normal speed. */ + if (fabs(last_frag_pct_change) > 2 || + (last_frag_pct_change < 0 && last_hits >= (last_hits + last_misses) * 0.01)) + { + defrag.decay_rate = 1.0f; + } else { + defrag.decay_rate *= 0.9; + } +} + +/* Called at the end of a complete defrag cycle, or when defrag is terminated */ +static void endDefragCycle(int normal_termination) { + if (normal_termination) { + /* For normal termination, we expect... */ + serverAssert(!defrag.current_stage); + serverAssert(listLength(defrag.remaining_stages) == 0); + } else { + /* Defrag is being terminated abnormally */ + aeDeleteTimeEvent(server.el, defrag.timeproc_id); + + if (defrag.current_stage) { + listDelNode(defrag.remaining_stages, defrag.current_stage); + defrag.current_stage = NULL; + } + } + defrag.timeproc_id = AE_DELETED_EVENT_ID; + + listRelease(defrag.remaining_stages); + defrag.remaining_stages = NULL; + + size_t frag_bytes; + float frag_pct = getAllocatorFragmentation(&frag_bytes); + serverLog(LL_VERBOSE, "Active defrag done in %dms, reallocated=%d, frag=%.0f%%, frag_bytes=%zu", + (int)elapsedMs(defrag.start_cycle), (int)(server.stat_active_defrag_hits - defrag.start_defrag_hits), + frag_pct, frag_bytes); + + server.stat_total_active_defrag_time += elapsedUs(server.stat_last_active_defrag_time); + server.stat_last_active_defrag_time = 0; + server.active_defrag_running = 0; + + updateDefragDecayRate(frag_pct); + moduleDefragEnd(); + + /* Immediately check to see if we should start another defrag cycle. */ + activeDefragCycle(); +} + +/* Must be called at the start of the timeProc as it measures the delay from the end of the previous + * timeProc invocation when performing the computation. */ +static int computeDefragCycleUs(void) { + long dutyCycleUs; + + int targetCpuPercent = server.active_defrag_running; + serverAssert(targetCpuPercent > 0 && targetCpuPercent < 100); + + static int prevCpuPercent = 0; /* STATIC - this persists */ + if (targetCpuPercent != prevCpuPercent) { + /* If the targetCpuPercent changes, the value might be different from when the last wait + * time was computed. In this case, don't consider wait time. (This is really only an + * issue in crazy tests that dramatically increase CPU while defrag is running.) */ + defrag.timeproc_end_time = 0; + prevCpuPercent = targetCpuPercent; + } + + /* Given when the last duty cycle ended, compute time needed to achieve the desired percentage. */ + if (defrag.timeproc_end_time == 0) { + /* Either the first call to the timeProc, or we were paused for some reason. */ + defrag.timeproc_overage_us = 0; + dutyCycleUs = DEFRAG_CYCLE_US; + } else { + long waitedUs = getMonotonicUs() - defrag.timeproc_end_time; + /* Given the elapsed wait time between calls, compute the necessary duty time needed to + * achieve the desired CPU percentage. + * With: D = duty time, W = wait time, P = percent + * Solve: D P + * ----- = ----- + * D + W 100 + * Solving for D: + * D = P * W / (100 - P) + * + * Note that dutyCycleUs addresses starvation. If the wait time was long, we will compensate + * with a proportionately long duty-cycle. This won't significantly affect perceived + * latency, because clients are already being impacted by the long cycle time which caused + * the starvation of the timer. */ + dutyCycleUs = targetCpuPercent * waitedUs / (100 - targetCpuPercent); + + /* Also adjust for any accumulated overage. */ + dutyCycleUs -= defrag.timeproc_overage_us; + defrag.timeproc_overage_us = 0; + + if (dutyCycleUs < DEFRAG_CYCLE_US) { + /* We never reduce our cycle time, that would increase overhead. Instead, we track this + * as part of the overage, and increase wait time between cycles. */ + defrag.timeproc_overage_us = DEFRAG_CYCLE_US - dutyCycleUs; + dutyCycleUs = DEFRAG_CYCLE_US; + } else if (dutyCycleUs > DEFRAG_CYCLE_US * 10) { + /* Add a time limit for the defrag duty cycle to prevent excessive latency. + * When latency is already high (indicated by a long time between calls), + * we don't want to make it worse by running defrag for too long. */ + dutyCycleUs = DEFRAG_CYCLE_US * 10; + } + } + return dutyCycleUs; +} + +/* Must be called at the end of the timeProc as it records the timeproc_end_time for use in the next + * computeDefragCycleUs computation. */ +static int computeDelayMs(monotime intendedEndtime) { + defrag.timeproc_end_time = getMonotonicUs(); + long overage = defrag.timeproc_end_time - intendedEndtime; + defrag.timeproc_overage_us += overage; /* track over/under desired CPU */ + /* Allow negative overage (underage) to count against existing overage, but don't allow + * underage (from short stages) to be accumulated. */ + if (defrag.timeproc_overage_us < 0) defrag.timeproc_overage_us = 0; + + int targetCpuPercent = server.active_defrag_running; + serverAssert(targetCpuPercent > 0 && targetCpuPercent < 100); + + /* Given the desired duty cycle, what inter-cycle delay do we need to achieve that? */ + /* We want to achieve a specific CPU percent. To do that, we can't use a skewed computation. */ + /* Example, if we run for 1ms and delay 10ms, that's NOT 10%, because the total cycle time is 11ms. */ + /* Instead, if we rum for 1ms, our total time should be 10ms. So the delay is only 9ms. */ + long totalCycleTimeUs = DEFRAG_CYCLE_US * 100 / targetCpuPercent; + long delayUs = totalCycleTimeUs - DEFRAG_CYCLE_US; + /* Only increase delay by the fraction of the overage that would be non-duty-cycle */ + delayUs += defrag.timeproc_overage_us * (100 - targetCpuPercent) / 100; + if (delayUs < 0) delayUs = 0; + long delayMs = delayUs / 1000; /* round down */ + return delayMs; +} + +/* An independent time proc for defrag. While defrag is running, this is called much more often + * than the server cron. Frequent short calls provides low latency impact. */ +static int activeDefragTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData) { + UNUSED(eventLoop); + UNUSED(id); + UNUSED(clientData); + + /* This timer shouldn't be registered unless there's work to do. */ + serverAssert(defrag.current_stage || listLength(defrag.remaining_stages) > 0); if (!server.active_defrag_enabled) { - if (server.active_defrag_running) { - /* if active defrag was disabled mid-run, start from fresh next time. */ - server.active_defrag_running = 0; - server.active_defrag_configuration_changed = 0; - if (db) - listEmpty(db->defrag_later); - defrag_later_current_key = NULL; - defrag_later_cursor = 0; - current_db = -1; - defrag_stage = 0; - defrag_cursor = 0; - slot = -1; - defrag_later_item_in_progress = 0; - db = NULL; - moduleDefragEnd(); - goto update_metrics; - } - return; + /* Defrag has been disabled while running */ + endDefragCycle(0); + return AE_NOMORE; } - if (hasActiveChildProcess()) - return; /* Defragging memory while there's a fork will just do damage. */ - - /* Once a second, check if the fragmentation justfies starting a scan - * or making it more aggressive. */ - run_with_period(1000) { - computeDefragCycles(decay_rate); + if (hasActiveChildProcess()) { + /* If there's a child process, pause the defrag, polling until the child completes. */ + defrag.timeproc_end_time = 0; /* prevent starvation recovery */ + return 100; } - /* Normally it is checked once a second, but when there is a configuration - * change, we want to check it as soon as possible. */ - if (server.active_defrag_configuration_changed) { - computeDefragCycles(decay_rate); - server.active_defrag_configuration_changed = 0; - } + monotime starttime = getMonotonicUs(); + int dutyCycleUs = computeDefragCycleUs(); + monotime endtime = starttime + dutyCycleUs; + int haveMoreWork = 1; - if (!server.active_defrag_running) - return; + /* Increment server.cronloops so that run_with_period works. */ + long hz_ms = 1000 / server.hz; + int cronloops = (server.mstime - server.blocked_last_cron + (hz_ms - 1)) / hz_ms; /* rounding up */ + server.blocked_last_cron += cronloops * hz_ms; + server.cronloops += cronloops; - /* See activeExpireCycle for how timelimit is handled. */ - start = ustime(); - timelimit = 1000000*server.active_defrag_running/server.hz/100; - if (timelimit <= 0) timelimit = 1; - endtime = start + timelimit; + mstime_t latency; latencyStartMonitor(latency); - dictDefragFunctions defragfns = {.defragAlloc = activeDefragAlloc}; do { - /* if we're not continuing a scan from the last call or loop, start a new one */ - if (!defrag_stage && !defrag_cursor && (slot < 0)) { - /* finish any leftovers from previous db before moving to the next one */ - if (db && defragLaterStep(db, slot, endtime)) { - quit = 1; /* time is up, we didn't finish all the work */ - break; /* this will exit the function and we'll continue on the next cycle */ - } - - if (current_db == -1) { - moduleDefragStart(); - } - - /* Move on to next database, and stop if we reached the last one. */ - if (++current_db >= server.dbnum) { - /* defrag other items not part of the db / keys */ - defragOtherGlobals(); - - long long now = ustime(); - size_t frag_bytes; - float frag_pct = getAllocatorFragmentation(&frag_bytes); - serverLog(LL_VERBOSE, - "Active defrag done in %dms, reallocated=%d, frag=%.0f%%, frag_bytes=%zu", - (int)((now - start_scan)/1000), (int)(server.stat_active_defrag_hits - start_hits), frag_pct, frag_bytes); - - start_scan = now; - current_db = -1; - defrag_stage = 0; - defrag_cursor = 0; - slot = -1; - defrag_later_item_in_progress = 0; - db = NULL; - server.active_defrag_running = 0; - - long long last_hits = server.stat_active_defrag_hits - start_hits; - long long last_misses = server.stat_active_defrag_misses - start_misses; - float last_frag_pct_change = start_frag_pct - frag_pct; - /* When defragmentation efficiency is low, we gradually reduce the - * speed for the next cycle to avoid CPU waste. However, in the - * following two cases, we keep the normal speed: - * 1) If the fragmentation percentage has increased or decreased by more than 2%. - * 2) If the fragmentation percentage decrease is small, but hits are above 1%, - * we still keep the normal speed. */ - if (fabs(last_frag_pct_change) > 2 || - (last_frag_pct_change < 0 && last_hits >= (last_hits + last_misses) * 0.01)) - { - decay_rate = 1.0f; - } else { - decay_rate *= 0.9; - } - - moduleDefragEnd(); - - computeDefragCycles(decay_rate); /* if another scan is needed, start it right away */ - if (server.active_defrag_running != 0 && ustime() < endtime) - continue; - break; - } - else if (current_db==0) { - /* Start a scan from the first database. */ - start_scan = ustime(); - start_hits = server.stat_active_defrag_hits; - start_misses = server.stat_active_defrag_misses; - start_frag_pct = getAllocatorFragmentation(NULL); - } - - db = &server.db[current_db]; - kvstoreDictLUTDefrag(db->keys, dictDefragTables); - kvstoreDictLUTDefrag(db->expires, dictDefragTables); - defrag_stage = 0; - defrag_cursor = 0; - slot = -1; - defrag_later_item_in_progress = 0; + if (!defrag.current_stage) { + defrag.current_stage = listFirst(defrag.remaining_stages); } - /* This array of structures holds the parameters for all defragmentation stages. */ - typedef struct defragStage { - kvstore *kvs; - dictScanFunction *scanfn; - void *privdata; - } defragStage; - defragStage defrag_stages[] = { - {db->keys, defragScanCallback, db}, - {db->expires, scanCallbackCountScanned, NULL}, - {server.pubsub_channels, defragPubsubScanCallback, - &(defragPubSubCtx){server.pubsub_channels, getClientPubSubChannels}}, - {server.pubsubshard_channels, defragPubsubScanCallback, - &(defragPubSubCtx){server.pubsubshard_channels, getClientPubSubShardChannels}}, - }; - do { - int num_stages = sizeof(defrag_stages) / sizeof(defrag_stages[0]); - serverAssert(defrag_stage < num_stages); - defragStage *current_stage = &defrag_stages[defrag_stage]; + StageDescriptor *stage = listNodeValue(defrag.current_stage); + doneStatus status = stage->stage_fn(stage->ctx, endtime); + if (status == DEFRAG_DONE) { + listDelNode(defrag.remaining_stages, defrag.current_stage); + defrag.current_stage = NULL; + } - /* before scanning the next bucket, see if we have big keys left from the previous bucket to scan */ - if (defragLaterStep(db, slot, endtime)) { - quit = 1; /* time is up, we didn't finish all the work */ - break; /* this will exit the function and we'll continue on the next cycle */ - } - - if (!defrag_later_item_in_progress) { - /* Continue defragmentation from the previous stage. - * If slot is -1, it means this stage starts from the first non-empty slot. */ - if (slot == -1) slot = kvstoreGetFirstNonEmptyDictIndex(current_stage->kvs); - defrag_cursor = kvstoreDictScanDefrag(current_stage->kvs, slot, defrag_cursor, - current_stage->scanfn, &defragfns, &(defragCtx){current_stage->privdata, slot}); - } - - if (!defrag_cursor) { - /* Move to the next slot only if regular and large item scanning has been completed. */ - if (listLength(db->defrag_later) > 0) { - defrag_later_item_in_progress = 1; - continue; - } - - /* Move to the next slot in the current stage. If we've reached the end, move to the next stage. */ - if ((slot = kvstoreGetNextNonEmptyDictIndex(current_stage->kvs, slot)) == -1) - defrag_stage++; - defrag_later_item_in_progress = 0; - } - - /* Check if all defragmentation stages have been processed. - * If so, mark as finished and reset the stage counter to move on to next database. */ - if (defrag_stage == num_stages) { - all_stages_finished = 1; - defrag_stage = 0; - } - - /* Once in 16 scan iterations, 512 pointer reallocations. or 64 keys - * (if we have a lot of pointers in one hash bucket or rehashing), - * check if we reached the time limit. - * But regardless, don't start a new db in this loop, this is because after - * the last db we call defragOtherGlobals, which must be done in one cycle */ - if (all_stages_finished || - ++iterations > 16 || - server.stat_active_defrag_hits - prev_defragged > 512 || - server.stat_active_defrag_scanned - prev_scanned > 64) - { - /* Quit if all stages were finished or timeout. */ - if (all_stages_finished || ustime() > endtime) { - quit = 1; - break; - } - iterations = 0; - prev_defragged = server.stat_active_defrag_hits; - prev_scanned = server.stat_active_defrag_scanned; - } - } while(!all_stages_finished && !quit); - } while(!quit); + haveMoreWork = (defrag.current_stage || listLength(defrag.remaining_stages) > 0); + /* If we've completed a stage early, and still have a standard time allotment remaining, + * we'll start another stage. This can happen when defrag is running infrequently, and + * starvation protection has increased the duty-cycle. */ + } while (haveMoreWork && getMonotonicUs() <= endtime - DEFRAG_CYCLE_US); latencyEndMonitor(latency); - latencyAddSampleIfNeeded("active-defrag-cycle",latency); + latencyAddSampleIfNeeded("active-defrag-cycle", latency); -update_metrics: - if (server.active_defrag_running > 0) { - if (server.stat_last_active_defrag_time == 0) - elapsedStart(&server.stat_last_active_defrag_time); - } else if (server.stat_last_active_defrag_time != 0) { - server.stat_total_active_defrag_time += elapsedUs(server.stat_last_active_defrag_time); - server.stat_last_active_defrag_time = 0; + if (haveMoreWork) { + return computeDelayMs(endtime); + } else { + endDefragCycle(1); + return AE_NOMORE; /* Ends the timer proc */ } } +/* During long running scripts, or while loading, there is a periodic function for handling other + * actions. This interface allows defrag to continue running, avoiding a single long defrag step + * after the long operation completes. */ +void defragWhileBlocked(void) { + /* This is called infrequently, while timers are not active. We might need to start defrag. */ + if (!defragIsRunning()) activeDefragCycle(); + + if (!defragIsRunning()) return; + + /* Save off the timeproc_id. If we have a normal termination, it will be cleared. */ + long long timeproc_id = defrag.timeproc_id; + + /* Simulate a single call of the timer proc */ + long long reschedule_delay = activeDefragTimeProc(NULL, 0, NULL); + if (reschedule_delay == AE_NOMORE) { + /* If it's done, deregister the timer */ + aeDeleteTimeEvent(server.el, timeproc_id); + } + /* Otherwise, just ignore the reschedule_delay, the timer will pop the next time that the + * event loop can process timers again. */ +} + +static void beginDefragCycle(void) { + serverAssert(!defragIsRunning()); + + moduleDefragStart(); + + serverAssert(defrag.remaining_stages == NULL); + defrag.remaining_stages = listCreate(); + listSetFreeMethod(defrag.remaining_stages, freeDefragContext); + + for (int dbid = 0; dbid < server.dbnum; dbid++) { + redisDb *db = &server.db[dbid]; + + /* Add stage for keys. */ + defragKeysCtx *defrag_keys_ctx = zcalloc(sizeof(defragKeysCtx)); + defrag_keys_ctx->kvstate = INIT_KVSTORE_STATE(db->keys); + defrag_keys_ctx->dbid = dbid; + addDefragStage(defragStageDbKeys, freeDefragKeysContext, defrag_keys_ctx); + + /* Add stage for expires. */ + defragKeysCtx *defrag_expires_ctx = zcalloc(sizeof(defragKeysCtx)); + defrag_expires_ctx->kvstate = INIT_KVSTORE_STATE(db->expires); + defrag_expires_ctx->dbid = dbid; + addDefragStage(defragStageExpiresKvstore, freeDefragKeysContext, defrag_expires_ctx); + } + + /* Add stage for pubsub channels. */ + defragPubSubCtx *defrag_pubsub_ctx = zmalloc(sizeof(defragPubSubCtx)); + defrag_pubsub_ctx->kvstate = INIT_KVSTORE_STATE(server.pubsub_channels); + defrag_pubsub_ctx->getPubSubChannels = getClientPubSubChannels; + addDefragStage(defragStagePubsubKvstore, zfree, defrag_pubsub_ctx); + + /* Add stage for pubsubshard channels. */ + defragPubSubCtx *defrag_pubsubshard_ctx = zmalloc(sizeof(defragPubSubCtx)); + defrag_pubsubshard_ctx->kvstate = INIT_KVSTORE_STATE(server.pubsubshard_channels); + defrag_pubsubshard_ctx->getPubSubChannels = getClientPubSubShardChannels; + addDefragStage(defragStagePubsubKvstore, zfree, defrag_pubsubshard_ctx); + + addDefragStage(defragLuaScripts, NULL, NULL); + addDefragStage(defragModuleGlobals, NULL, NULL); + + defrag.current_stage = NULL; + defrag.start_cycle = getMonotonicUs(); + defrag.start_defrag_hits = server.stat_active_defrag_hits; + defrag.start_defrag_misses = server.stat_active_defrag_misses; + defrag.start_frag_pct = getAllocatorFragmentation(NULL); + defrag.timeproc_end_time = 0; + defrag.timeproc_overage_us = 0; + defrag.timeproc_id = aeCreateTimeEvent(server.el, 0, activeDefragTimeProc, NULL, NULL); + + elapsedStart(&server.stat_last_active_defrag_time); +} + +void activeDefragCycle(void) { + if (!server.active_defrag_enabled) return; + + /* Defrag gets paused while a child process is active. So there's no point in starting a new + * cycle or adjusting the CPU percentage for an existing cycle. */ + if (hasActiveChildProcess()) return; + + computeDefragCycles(); + + if (server.active_defrag_running > 0 && !defragIsRunning()) beginDefragCycle(); +} + #else /* HAVE_DEFRAG */ void activeDefragCycle(void) { @@ -1318,4 +1560,7 @@ robj *activeDefragStringOb(robj *ob) { return NULL; } +void defragWhileBlocked(void) { +} + #endif diff --git a/src/kvstore.c b/src/kvstore.c index 6a4d123ad1..fdb9b61a68 100644 --- a/src/kvstore.c +++ b/src/kvstore.c @@ -12,9 +12,15 @@ * Copyright (c) 2011-Present, Redis Ltd. and contributors. * All rights reserved. * + * Copyright (c) 2024-present, Valkey contributors. + * All rights reserved. + * * Licensed under your choice of the Redis Source Available License 2.0 * (RSALv2) or the Server Side Public License v1 (SSPLv1). + * + * Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information. */ + #include "fmacros.h" #include @@ -802,10 +808,14 @@ unsigned long kvstoreDictScanDefrag(kvstore *kvs, int didx, unsigned long v, dic * within dict, it only reallocates the memory used by the dict structure itself using * the provided allocation function. This feature was added for the active defrag feature. * - * The 'defragfn' callback is called with a reference to the dict - * that callback can reallocate. */ -void kvstoreDictLUTDefrag(kvstore *kvs, kvstoreDictLUTDefragFunction *defragfn) { - for (int didx = 0; didx < kvs->num_dicts; didx++) { + * With 16k dictionaries for cluster mode with 1 shard, this operation may require substantial time + * to execute. A "cursor" is used to perform the operation iteratively. When first called, a + * cursor value of 0 should be provided. The return value is an updated cursor which should be + * provided on the next iteration. The operation is complete when 0 is returned. + * + * The 'defragfn' callback is called with a reference to the dict that callback can reallocate. */ +unsigned long kvstoreDictLUTDefrag(kvstore *kvs, unsigned long cursor, kvstoreDictLUTDefragFunction *defragfn) { + for (int didx = cursor; didx < kvs->num_dicts; didx++) { dict **d = kvstoreGetDictRef(kvs, didx), *newd; if (!*d) continue; @@ -818,7 +828,9 @@ void kvstoreDictLUTDefrag(kvstore *kvs, kvstoreDictLUTDefragFunction *defragfn) if (metadata->rehashing_node) metadata->rehashing_node->value = *d; } + return (didx + 1); } + return 0; } uint64_t kvstoreGetHash(kvstore *kvs, const void *key) @@ -1059,13 +1071,14 @@ int kvstoreTest(int argc, char **argv, int flags) { } TEST("Verify that a rehashing dict's node in the rehashing list is correctly updated after defragmentation") { + unsigned long cursor = 0; kvstore *kvs = kvstoreCreate(&KvstoreDictTestType, 0, KVSTORE_ALLOCATE_DICTS_ON_DEMAND); for (i = 0; i < 256; i++) { de = kvstoreDictAddRaw(kvs, 0, stringFromInt(i), NULL); if (listLength(kvs->rehashing)) break; } assert(listLength(kvs->rehashing)); - kvstoreDictLUTDefrag(kvs, defragLUTTestCallback); + while ((cursor = kvstoreDictLUTDefrag(kvs, cursor, defragLUTTestCallback)) != 0) {} while (kvstoreIncrementallyRehash(kvs, 1000)) {} kvstoreRelease(kvs); } diff --git a/src/kvstore.h b/src/kvstore.h index 9e2e4afe0d..8b9fd7348f 100644 --- a/src/kvstore.h +++ b/src/kvstore.h @@ -1,3 +1,16 @@ +/* + * Copyright (c) 2009-Present, Redis Ltd. + * All rights reserved. + * + * Copyright (c) 2024-present, Valkey contributors. + * All rights reserved. + * + * Licensed under your choice of the Redis Source Available License 2.0 + * (RSALv2) or the Server Side Public License v1 (SSPLv1). + * + * Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information. + */ + #ifndef DICTARRAY_H_ #define DICTARRAY_H_ @@ -78,7 +91,7 @@ unsigned int kvstoreDictGetSomeKeys(kvstore *kvs, int didx, dictEntry **des, uns int kvstoreDictExpand(kvstore *kvs, int didx, unsigned long size); unsigned long kvstoreDictScanDefrag(kvstore *kvs, int didx, unsigned long v, dictScanFunction *fn, dictDefragFunctions *defragfns, void *privdata); typedef dict *(kvstoreDictLUTDefragFunction)(dict *d); -void kvstoreDictLUTDefrag(kvstore *kvs, kvstoreDictLUTDefragFunction *defragfn); +unsigned long kvstoreDictLUTDefrag(kvstore *kvs, unsigned long cursor, kvstoreDictLUTDefragFunction *defragfn); void *kvstoreDictFetchValue(kvstore *kvs, int didx, const void *key); dictEntry *kvstoreDictFind(kvstore *kvs, int didx, void *key); dictEntry *kvstoreDictAddRaw(kvstore *kvs, int didx, void *key, dictEntry **existing); diff --git a/src/module.c b/src/module.c index 59ae99e70e..8ecd23862b 100644 --- a/src/module.c +++ b/src/module.c @@ -2,8 +2,13 @@ * Copyright (c) 2016-Present, Redis Ltd. * All rights reserved. * + * Copyright (c) 2024-present, Valkey contributors. + * All rights reserved. + * * Licensed under your choice of the Redis Source Available License 2.0 * (RSALv2) or the Server Side Public License v1 (SSPLv1). + * + * Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information. */ /* -------------------------------------------------------------------------- @@ -13782,7 +13787,7 @@ const char *RM_GetCurrentCommandName(RedisModuleCtx *ctx) { * defrag callback. */ struct RedisModuleDefragCtx { - long long int endtime; + monotime endtime; unsigned long *cursor; struct redisObject *key; /* Optional name of key processed, NULL when unknown. */ int dbid; /* The dbid of the key being processed, -1 when unknown. */ @@ -13821,7 +13826,7 @@ int RM_RegisterDefragCallbacks(RedisModuleCtx *ctx, RedisModuleDefragFunc start, * so it generally makes sense to do small batches of work in between calls. */ int RM_DefragShouldStop(RedisModuleDefragCtx *ctx) { - return (ctx->endtime != 0 && ctx->endtime < ustime()); + return (ctx->endtime != 0 && ctx->endtime <= getMonotonicUs()); } /* Store an arbitrary cursor value for future re-use. @@ -13929,7 +13934,7 @@ RedisModuleString *RM_DefragRedisModuleString(RedisModuleDefragCtx *ctx, RedisMo * Returns a zero value (and initializes the cursor) if no more needs to be done, * or a non-zero value otherwise. */ -int moduleLateDefrag(robj *key, robj *value, unsigned long *cursor, long long endtime, int dbid) { +int moduleLateDefrag(robj *key, robj *value, unsigned long *cursor, monotime endtime, int dbid) { moduleValue *mv = value->ptr; moduleType *mt = mv->type; diff --git a/src/server.c b/src/server.c index 48936ac53e..9e1250ca04 100644 --- a/src/server.c +++ b/src/server.c @@ -1637,25 +1637,7 @@ void whileBlockedCron(void) { mstime_t latency; latencyStartMonitor(latency); - /* In some cases we may be called with big intervals, so we may need to do - * extra work here. This is because some of the functions in serverCron rely - * on the fact that it is performed every 10 ms or so. For instance, if - * activeDefragCycle needs to utilize 25% cpu, it will utilize 2.5ms, so we - * need to call it multiple times. */ - long hz_ms = 1000/server.hz; - while (server.blocked_last_cron < server.mstime) { - - /* Defrag keys gradually. */ - activeDefragCycle(); - - server.blocked_last_cron += hz_ms; - - /* Increment cronloop so that run_with_period works. */ - server.cronloops++; - } - - /* Other cron jobs do not need to be done in a loop. No need to check - * server.blocked_last_cron since we have an early exit at the top. */ + defragWhileBlocked(); /* Update memory stats during loading (excluding blocked scripts) */ if (server.loading) cronUpdateMemoryStats(); @@ -2758,8 +2740,6 @@ void initServer(void) { server.db[j].watched_keys = dictCreate(&keylistDictType); server.db[j].id = j; server.db[j].avg_ttl = 0; - server.db[j].defrag_later = listCreate(); - listSetFreeMethod(server.db[j].defrag_later, sdsfreegeneric); } evictionPoolAlloc(); /* Initialize the LRU keys pool. */ /* Note that server.pubsub_channels was chosen to be a kvstore (with only one dict, which diff --git a/src/server.h b/src/server.h index d65392b8cc..a98f1aa8a2 100644 --- a/src/server.h +++ b/src/server.h @@ -1051,7 +1051,6 @@ typedef struct redisDb { int id; /* Database ID */ long long avg_ttl; /* Average TTL, just for stats */ unsigned long expires_cursor; /* Cursor of the active expire cycle. */ - list *defrag_later; /* List of key names to attempt to defrag one by one, gradually. */ } redisDb; /* forward declaration for functions ctx */ @@ -2675,7 +2674,7 @@ size_t moduleGetFreeEffort(robj *key, robj *val, int dbid); size_t moduleGetMemUsage(robj *key, robj *val, size_t sample_size, int dbid); robj *moduleTypeDupOrReply(client *c, robj *fromkey, robj *tokey, int todb, robj *value); int moduleDefragValue(robj *key, robj *obj, int dbid); -int moduleLateDefrag(robj *key, robj *value, unsigned long *cursor, long long endtime, int dbid); +int moduleLateDefrag(robj *key, robj *value, unsigned long *cursor, monotime endtime, int dbid); void moduleDefragGlobals(void); void moduleDefragStart(void); void moduleDefragEnd(void); @@ -3269,6 +3268,7 @@ void enterExecutionUnit(int update_cached_time, long long us); void exitExecutionUnit(void); void resetServerStats(void); void activeDefragCycle(void); +void defragWhileBlocked(void); unsigned int getLRUClock(void); unsigned int LRU_CLOCK(void); const char *evictPolicyToString(void); diff --git a/tests/unit/memefficiency.tcl b/tests/unit/memefficiency.tcl index 92c1f572cf..15b00e767e 100644 --- a/tests/unit/memefficiency.tcl +++ b/tests/unit/memefficiency.tcl @@ -1,3 +1,16 @@ +# +# Copyright (c) 2009-Present, Redis Ltd. +# All rights reserved. +# +# Copyright (c) 2024-present, Valkey contributors. +# All rights reserved. +# +# Licensed under your choice of the Redis Source Available License 2.0 +# (RSALv2) or the Server Side Public License v1 (SSPLv1). +# +# Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information. +# + proc test_memory_efficiency {range} { r flushall set rd [redis_deferring_client] @@ -37,15 +50,19 @@ start_server {tags {"memefficiency external:skip"}} { } run_solo {defrag} { - proc wait_for_defrag_stop {maxtries delay} { + proc wait_for_defrag_stop {maxtries delay {expect_frag 0}} { wait_for_condition $maxtries $delay { - [s active_defrag_running] eq 0 + [s active_defrag_running] eq 0 && ($expect_frag == 0 || [s allocator_frag_ratio] <= $expect_frag) } else { after 120 ;# serverCron only updates the info once in 100ms puts [r info memory] puts [r info stats] puts [r memory malloc-stats] - fail "defrag didn't stop." + if {$expect_frag != 0} { + fail "defrag didn't stop or failed to achieve expected frag ratio ([s allocator_frag_ratio] > $expect_frag)" + } else { + fail "defrag didn't stop." + } } } @@ -102,7 +119,7 @@ run_solo {defrag} { r config set active-defrag-cycle-max 75 # Wait for the active defrag to stop working. - wait_for_defrag_stop 2000 100 + wait_for_defrag_stop 2000 100 1.1 # Test the fragmentation is lower. after 120 ;# serverCron only updates the info once in 100ms @@ -124,7 +141,6 @@ run_solo {defrag} { puts [r latency latest] puts [r latency history active-defrag-cycle] } - assert {$frag < 1.1} # due to high fragmentation, 100hz, and active-defrag-cycle-max set to 75, # we expect max latency to be not much higher than 7.5ms but due to rare slowness threshold is set higher if {!$::no_latency} { @@ -142,6 +158,11 @@ run_solo {defrag} { # reset stats and load the AOF file r config resetstat r config set key-load-delay -25 ;# sleep on average 1/25 usec + # Note: This test is checking if defrag is working DURING AOF loading (while + # timers are not active). So we don't give any extra time, and we deactivate + # defrag immediately after the AOF loading is complete. During loading, + # defrag will get invoked less often, causing starvation prevention. We + # should expect longer latency measurements. r debug loadaof r config set activedefrag no # measure hits and misses right after aof loading @@ -246,7 +267,7 @@ run_solo {defrag} { } # wait for the active defrag to stop working - wait_for_defrag_stop 500 100 + wait_for_defrag_stop 500 100 1.05 # test the fragmentation is lower after 120 ;# serverCron only updates the info once in 100ms @@ -256,7 +277,6 @@ run_solo {defrag} { puts "frag [s allocator_frag_ratio]" puts "frag_bytes [s allocator_frag_bytes]" } - assert_lessthan_equal [s allocator_frag_ratio] 1.05 } # Flush all script to make sure we don't crash after defragging them r script flush sync @@ -362,7 +382,7 @@ run_solo {defrag} { } # wait for the active defrag to stop working - wait_for_defrag_stop 500 100 + wait_for_defrag_stop 500 100 1.1 # test the fragmentation is lower after 120 ;# serverCron only updates the info once in 100ms @@ -384,7 +404,6 @@ run_solo {defrag} { puts [r latency latest] puts [r latency history active-defrag-cycle] } - assert {$frag < 1.1} # due to high fragmentation, 100hz, and active-defrag-cycle-max set to 75, # we expect max latency to be not much higher than 7.5ms but due to rare slowness threshold is set higher if {!$::no_latency} { @@ -464,7 +483,7 @@ run_solo {defrag} { } # wait for the active defrag to stop working - wait_for_defrag_stop 500 100 + wait_for_defrag_stop 500 100 1.05 # test the fragmentation is lower after 120 ;# serverCron only updates the info once in 100ms @@ -474,7 +493,6 @@ run_solo {defrag} { puts "frag [s allocator_frag_ratio]" puts "frag_bytes [s allocator_frag_bytes]" } - assert_lessthan_equal [s allocator_frag_ratio] 1.05 } # Publishes some message to all the pubsub clients to make sure that @@ -572,7 +590,7 @@ run_solo {defrag} { } # wait for the active defrag to stop working - wait_for_defrag_stop 500 100 + wait_for_defrag_stop 500 100 1.5 # test the fragmentation is lower after 120 ;# serverCron only updates the info once in 100ms @@ -582,7 +600,6 @@ run_solo {defrag} { puts "frag [s allocator_frag_ratio]" puts "frag_bytes [s allocator_frag_bytes]" } - assert_lessthan_equal [s allocator_frag_ratio] 1.5 } } @@ -682,7 +699,13 @@ run_solo {defrag} { } # wait for the active defrag to stop working - wait_for_defrag_stop 500 100 + if {$io_threads == 1} { + wait_for_defrag_stop 500 100 1.05 + } else { + # TODO: When multithreading is enabled, argv may be created in the io thread + # and kept in the main thread, which can cause fragmentation to become worse. + wait_for_defrag_stop 500 100 1.1 + } # test the fragmentation is lower after 120 ;# serverCron only updates the info once in 100ms @@ -692,14 +715,6 @@ run_solo {defrag} { puts "frag [s allocator_frag_ratio]" puts "frag_bytes [s allocator_frag_bytes]" } - - if {$io_threads == 1} { - assert_lessthan_equal [s allocator_frag_ratio] 1.05 - } else { - # TODO: When multithreading is enabled, argv may be created in the io thread - # and kept in the main thread, which can cause fragmentation to become worse. - assert_lessthan_equal [s allocator_frag_ratio] 1.1 - } } } @@ -763,7 +778,7 @@ run_solo {defrag} { } # wait for the active defrag to stop working - wait_for_defrag_stop 500 100 + wait_for_defrag_stop 500 100 1.1 # test the fragmentation is lower after 120 ;# serverCron only updates the info once in 100ms @@ -789,7 +804,6 @@ run_solo {defrag} { puts [r latency history active-defrag-cycle] puts [r memory malloc-stats] } - assert {$frag < 1.1} # due to high fragmentation, 100hz, and active-defrag-cycle-max set to 75, # we expect max latency to be not much higher than 7.5ms but due to rare slowness threshold is set higher if {!$::no_latency} { @@ -884,7 +898,7 @@ run_solo {defrag} { } # wait for the active defrag to stop working - wait_for_defrag_stop 500 100 + wait_for_defrag_stop 500 100 1.1 # test the fragmentation is lower after 120 ;# serverCron only updates the info once in 100ms @@ -896,7 +910,6 @@ run_solo {defrag} { puts "hits: $hits" puts "misses: $misses" } - assert {$frag < 1.1} assert {$misses < 10000000} ;# when defrag doesn't stop, we have some 30m misses, when it does, we have 2m misses }