mirror of
https://github.com/redis/redis.git
synced 2026-05-14 03:01:49 -04:00
Add support to defrag ebuckets incrementally (#13842)
In PR #13229, we introduced the ebucket for HFE. Before this PR, when updating eitems stored in ebuckets, the lack of incremental fragmentation support for non-kvstore data structures (until PR #13814) meant that we had to reverse lookup the position of the eitem in the ebucket and then perform the update. This approach was inefficient as it often required frequent traversals of the segment list to locate and update the item. To address this issue, in this PR, This PR implements incremental fragmentation for hash dict ebuckets and server.hexpires. By incrementally defrag the ebuckets, we also perform defragmentation for the associated items, eliminates the need for frequent traversals of the segment list for defragging the eitem. --------- Co-authored-by: Moti Cohen <moticless@gmail.com> Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
This commit is contained in:
183
src/defrag.c
183
src/defrag.c
@@ -117,6 +117,13 @@ typedef struct {
|
||||
} defragKeysCtx;
|
||||
static_assert(offsetof(defragKeysCtx, kvstate) == 0, "defragStageKvstoreHelper requires this");
|
||||
|
||||
/* Context for hexpires */
|
||||
typedef struct {
|
||||
int dbid;
|
||||
ebuckets hexpires;
|
||||
unsigned long cursor;
|
||||
} defragHExpiresCtx;
|
||||
|
||||
/* Context for pubsub kvstores */
|
||||
typedef dict *(*getClientChannelsFn)(client *);
|
||||
typedef struct {
|
||||
@@ -200,6 +207,28 @@ hfield activeDefragHfield(hfield hf) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/* Defrag helper for hfield strings and update the reference in the dict.
|
||||
*
|
||||
* returns NULL in case the allocation wasn't moved.
|
||||
* when it returns a non-null value, the old pointer was already released
|
||||
* and should NOT be accessed. */
|
||||
void *activeDefragHfieldAndUpdateRef(void *ptr, void *privdata) {
|
||||
dict *d = privdata;
|
||||
dictEntryLink link;
|
||||
|
||||
/* Before the key is released, obtain the link to
|
||||
* ensure we can safely access and update the key. */
|
||||
dictUseStoredKeyApi(d, 1);
|
||||
link = dictFindLink(d, ptr, NULL);
|
||||
serverAssert(link);
|
||||
dictUseStoredKeyApi(d, 0);
|
||||
|
||||
hfield newhf = activeDefragHfield(ptr);
|
||||
if (newhf)
|
||||
dictSetKeyAtLink(d, newhf, &link, 0);
|
||||
return newhf;
|
||||
}
|
||||
|
||||
/* Defrag helper for robj and/or string objects with expected refcount.
|
||||
*
|
||||
* Like activeDefragStringOb, but it requires the caller to pass in the expected
|
||||
@@ -384,18 +413,15 @@ void activeDefragSdsDictCallback(void *privdata, const dictEntry *de, dictEntryL
|
||||
void activeDefragHfieldDictCallback(void *privdata, const dictEntry *de, dictEntryLink plink) {
|
||||
UNUSED(plink);
|
||||
dict *d = privdata;
|
||||
hfield newhf, hf = dictGetKey(de);
|
||||
hfield newhf = NULL, hf = dictGetKey(de);
|
||||
|
||||
/* If the hfield does not have TTL, we directly defrag it.
|
||||
* Fields with TTL are skipped here and will be defragmented later
|
||||
* during the hash expiry ebuckets defragmentation phase. */
|
||||
if (hfieldGetExpireTime(hf) == EB_EXPIRE_TIME_INVALID) {
|
||||
/* If the hfield does not have TTL, we directly defrag it. */
|
||||
newhf = activeDefragHfield(hf);
|
||||
} else {
|
||||
/* Update its reference in the ebucket while defragging it. */
|
||||
ebuckets *eb = hashTypeGetDictMetaHFE(d);
|
||||
newhf = ebDefragItem(eb, &hashFieldExpireBucketsType, hf, (ebDefragFunction *)activeDefragHfield);
|
||||
if ((newhf = activeDefragHfield(hf)))
|
||||
dictSetKey(d, (dictEntry *)de, newhf);
|
||||
}
|
||||
|
||||
if (newhf) dictSetKey(d, (dictEntry *) de, newhf);
|
||||
}
|
||||
|
||||
/* Defrag a dict with sds key and optional value (either ptr, sds or robj string) */
|
||||
@@ -428,6 +454,19 @@ void activeDefragHfieldDict(dict *d) {
|
||||
cursor = dictScanDefrag(d, cursor, activeDefragHfieldDictCallback,
|
||||
&defragfns, d);
|
||||
} while (cursor != 0);
|
||||
|
||||
/* Continue with defragmentation of hash fields that have with TTL.
|
||||
* During the dictionary defragmentaion above, we skipped fields with TTL,
|
||||
* Now we continue to defrag those fields by using the expiry buckets. */
|
||||
if (d->type == &mstrHashDictTypeWithHFE) {
|
||||
cursor = 0;
|
||||
ebDefragFunctions eb_defragfns = {
|
||||
.defragAlloc = activeDefragAlloc,
|
||||
.defragItem = activeDefragHfieldAndUpdateRef
|
||||
};
|
||||
ebuckets *eb = hashTypeGetDictMetaHFE(d);
|
||||
while (ebScanDefrag(eb, &hashFieldExpireBucketsType, &cursor, &eb_defragfns, d)) {}
|
||||
}
|
||||
}
|
||||
|
||||
/* Defrag a list of ptr, sds or robj string values */
|
||||
@@ -555,12 +594,46 @@ void scanLaterSet(robj *ob, unsigned long *cursor) {
|
||||
void scanLaterHash(robj *ob, unsigned long *cursor) {
|
||||
serverAssert(ob->type == OBJ_HASH && ob->encoding == OBJ_ENCODING_HT);
|
||||
dict *d = ob->ptr;
|
||||
dictDefragFunctions defragfns = {
|
||||
.defragAlloc = activeDefragAlloc,
|
||||
.defragKey = NULL, /* Will be defragmented in activeDefragHfieldDictCallback. */
|
||||
.defragVal = (dictDefragAllocFunction *)activeDefragSds
|
||||
};
|
||||
*cursor = dictScanDefrag(d, *cursor, activeDefragHfieldDictCallback, &defragfns, d);
|
||||
|
||||
typedef enum {
|
||||
HASH_DEFRAG_NONE = 0,
|
||||
HASH_DEFRAG_DICT = 1,
|
||||
HASH_DEFRAG_EBUCKETS = 2
|
||||
} hashDefragPhase;
|
||||
static hashDefragPhase defrag_phase = HASH_DEFRAG_NONE;
|
||||
|
||||
/* Start a new hash defrag. */
|
||||
if (!*cursor || defrag_phase == HASH_DEFRAG_NONE)
|
||||
defrag_phase = HASH_DEFRAG_DICT;
|
||||
|
||||
/* Defrag hash dictionary but skip TTL fields. */
|
||||
if (defrag_phase == HASH_DEFRAG_DICT) {
|
||||
dictDefragFunctions defragfns = {
|
||||
.defragAlloc = activeDefragAlloc,
|
||||
.defragKey = NULL, /* Will be defragmented in activeDefragHfieldDictCallback. */
|
||||
.defragVal = (dictDefragAllocFunction *)activeDefragSds
|
||||
};
|
||||
*cursor = dictScanDefrag(d, *cursor, activeDefragHfieldDictCallback, &defragfns, d);
|
||||
|
||||
/* Move to next phase. */
|
||||
if (!*cursor) defrag_phase = HASH_DEFRAG_EBUCKETS;
|
||||
}
|
||||
|
||||
/* Defrag ebuckets and TTL fields. */
|
||||
if (defrag_phase == HASH_DEFRAG_EBUCKETS) {
|
||||
if (d->type == &mstrHashDictTypeWithHFE) {
|
||||
ebDefragFunctions eb_defragfns = {
|
||||
.defragAlloc = activeDefragAlloc,
|
||||
.defragItem = activeDefragHfieldAndUpdateRef
|
||||
};
|
||||
ebuckets *eb = hashTypeGetDictMetaHFE(d);
|
||||
ebScanDefrag(eb, &hashFieldExpireBucketsType, cursor, &eb_defragfns, d);
|
||||
} else {
|
||||
/* Finish defragmentation if this dict doesn't have expired fields. */
|
||||
*cursor = 0;
|
||||
}
|
||||
if (!*cursor) defrag_phase = HASH_DEFRAG_NONE;
|
||||
}
|
||||
}
|
||||
|
||||
void defragQuicklist(defragKeysCtx *ctx, kvobj *kv) {
|
||||
@@ -630,7 +703,8 @@ void defragSet(defragKeysCtx *ctx, kvobj *ob) {
|
||||
|
||||
/* Defrag callback for radix tree iterator, called for each node,
|
||||
* used in order to defrag the nodes allocations. */
|
||||
int defragRaxNode(raxNode **noderef) {
|
||||
int defragRaxNode(raxNode **noderef, void *privdata) {
|
||||
UNUSED(privdata);
|
||||
raxNode *newnode = activeDefragAlloc(*noderef);
|
||||
if (newnode) {
|
||||
*noderef = newnode;
|
||||
@@ -650,7 +724,7 @@ int scanLaterStreamListpacks(robj *ob, unsigned long *cursor, monotime endtime)
|
||||
raxStart(&ri,s->rax);
|
||||
if (*cursor == 0) {
|
||||
/* if cursor is 0, we start new iteration */
|
||||
defragRaxNode(&s->rax->head);
|
||||
defragRaxNode(&s->rax->head, NULL);
|
||||
/* assign the iterator node callback before the seek, so that the
|
||||
* initial nodes that are processed till the first item are covered */
|
||||
ri.node_cb = defragRaxNode;
|
||||
@@ -714,7 +788,7 @@ void defragRadixTree(rax **raxref, int defrag_data, raxDefragFunction *element_c
|
||||
rax = *raxref;
|
||||
raxStart(&ri,rax);
|
||||
ri.node_cb = defragRaxNode;
|
||||
defragRaxNode(&rax->head);
|
||||
defragRaxNode(&rax->head, NULL);
|
||||
raxSeek(&ri,"^",NULL,0);
|
||||
while (raxNext(&ri)) {
|
||||
void *newdata = NULL;
|
||||
@@ -808,8 +882,9 @@ void defragModule(defragKeysCtx *ctx, redisDb *db, kvobj *kv) {
|
||||
/* for each key we scan in the main dict, this function will attempt to defrag
|
||||
* all the various pointers it has. */
|
||||
void defragKey(defragKeysCtx *ctx, dictEntry *de, dictEntryLink link) {
|
||||
UNUSED(link);
|
||||
dictEntryLink exlink = NULL;
|
||||
kvobj *kvnew, *ob = dictGetKV(de);
|
||||
kvobj *kvnew = NULL, *ob = dictGetKV(de);
|
||||
redisDb *db = &server.db[ctx->dbid];
|
||||
int slot = ctx->kvstate.slot;
|
||||
unsigned char *newzl;
|
||||
@@ -823,12 +898,9 @@ void defragKey(defragKeysCtx *ctx, dictEntry *de, dictEntryLink link) {
|
||||
serverAssert(exlink != NULL);
|
||||
}
|
||||
|
||||
/* Try to defrag robj and / or string value. */
|
||||
if (unlikely(ob->type == OBJ_HASH && hashTypeGetMinExpire(ob, 0) != EB_EXPIRE_TIME_INVALID)) {
|
||||
/* Update its reference in the ebucket while defragging it. */
|
||||
kvnew = ebDefragItem(&db->hexpires, &hashExpireBucketsType, ob,
|
||||
(ebDefragFunction *)activeDefragStringOb);
|
||||
} else {
|
||||
/* Try to defrag robj and/or string value. For hash objects with HFEs,
|
||||
* defer defragmentation until processing db's hexpires. */
|
||||
if (!(ob->type == OBJ_HASH && hashTypeGetMinExpire(ob, 0) != EB_EXPIRE_TIME_INVALID)) {
|
||||
/* If the dict doesn't have metadata, we directly defrag it. */
|
||||
kvnew = activeDefragStringOb(ob);
|
||||
}
|
||||
@@ -1183,6 +1255,61 @@ static doneStatus defragStageExpiresKvstore(void *ctx, monotime endtime) {
|
||||
scanCallbackCountScanned, NULL, &defragfns);
|
||||
}
|
||||
|
||||
/* Defragment hash object with HFE and update its reference in the DB keys. */
|
||||
void *activeDefragHExpiresOB(void *ptr, void *privdata) {
|
||||
redisDb *db = privdata;
|
||||
dictEntryLink link, exlink = NULL;
|
||||
kvobj *kvobj = ptr;
|
||||
sds keystr = kvobjGetKey(kvobj);
|
||||
unsigned int slot = calculateKeySlot(keystr);
|
||||
serverAssert(kvobj->type == OBJ_HASH);
|
||||
|
||||
long long expire = kvobjGetExpire(kvobj);
|
||||
/* We can't search in db->expires for that KV after we've released
|
||||
* the pointer it holds, since it won't be able to do the string
|
||||
* compare. Search it before, if needed. */
|
||||
if (expire != -1) {
|
||||
exlink = kvstoreDictFindLink(db->expires, slot, kvobjGetKey(kvobj), NULL);
|
||||
serverAssert(exlink != NULL);
|
||||
}
|
||||
|
||||
if ((kvobj = activeDefragAlloc(kvobj))) {
|
||||
/* Update its reference in the DB keys. */
|
||||
link = kvstoreDictFindLink(db->keys, slot, keystr, NULL);
|
||||
serverAssert(link != NULL);
|
||||
kvstoreDictSetAtLink(db->keys, slot, kvobj, &link, 0);
|
||||
if (expire != -1)
|
||||
kvstoreDictSetAtLink(db->expires, slot, kvobj, &exlink, 0);
|
||||
}
|
||||
return kvobj;
|
||||
}
|
||||
|
||||
static doneStatus defragStageHExpires(void *ctx, monotime endtime) {
|
||||
unsigned int iterations = 0;
|
||||
defragHExpiresCtx *defrag_hexpires_ctx = ctx;
|
||||
redisDb *db = &server.db[defrag_hexpires_ctx->dbid];
|
||||
if (db->hexpires != defrag_hexpires_ctx->hexpires) {
|
||||
/* There has been a change of the kvs (flushdb, swapdb, etc.). Just complete the stage. */
|
||||
return DEFRAG_DONE;
|
||||
}
|
||||
|
||||
ebDefragFunctions eb_defragfns = {
|
||||
.defragAlloc = activeDefragAlloc,
|
||||
.defragItem = activeDefragHExpiresOB
|
||||
};
|
||||
while (1) {
|
||||
if (!ebScanDefrag(&db->hexpires, &hashExpireBucketsType, &defrag_hexpires_ctx->cursor, &eb_defragfns, db))
|
||||
return DEFRAG_DONE;
|
||||
|
||||
if (++iterations > 16) {
|
||||
if (getMonotonicUs() >= endtime) break;
|
||||
iterations = 0;
|
||||
}
|
||||
}
|
||||
|
||||
return DEFRAG_NOT_DONE;
|
||||
}
|
||||
|
||||
static doneStatus defragStagePubsubKvstore(void *ctx, monotime endtime) {
|
||||
static dictDefragFunctions defragfns = {
|
||||
.defragAlloc = activeDefragAlloc,
|
||||
@@ -1510,6 +1637,12 @@ static void beginDefragCycle(void) {
|
||||
defrag_expires_ctx->kvstate = INIT_KVSTORE_STATE(db->expires);
|
||||
defrag_expires_ctx->dbid = dbid;
|
||||
addDefragStage(defragStageExpiresKvstore, freeDefragKeysContext, defrag_expires_ctx);
|
||||
|
||||
/* Add stage for hexpires. */
|
||||
defragHExpiresCtx *defrag_hexpires_ctx = zcalloc(sizeof(defragHExpiresCtx));
|
||||
defrag_hexpires_ctx->hexpires = db->hexpires;
|
||||
defrag_hexpires_ctx->dbid = dbid;
|
||||
addDefragStage(defragStageHExpires, zfree, defrag_hexpires_ctx);
|
||||
}
|
||||
|
||||
/* Add stage for pubsub channels. */
|
||||
|
||||
273
src/ebuckets.c
273
src/ebuckets.c
@@ -10,6 +10,7 @@
|
||||
#include <stddef.h>
|
||||
#include <stdlib.h>
|
||||
#include <inttypes.h>
|
||||
#include <string.h>
|
||||
#include "zmalloc.h"
|
||||
#include "redisassert.h"
|
||||
#include "config.h"
|
||||
@@ -1807,66 +1808,190 @@ void ebValidate(ebuckets eb, EbucketsType *type) {
|
||||
ebValidateRax(ebGetRaxPtr(eb), type);
|
||||
}
|
||||
|
||||
/* Reallocates the memory used by the item using the provided allocation function.
|
||||
* This feature was added for the active defrag feature.
|
||||
*
|
||||
* The 'defragfn' callbacks are called with a pointer to memory that callback
|
||||
* can reallocate. The callbacks should return a new memory address or NULL,
|
||||
* where NULL means that no reallocation happened and the old memory is still valid.
|
||||
*
|
||||
* Note: It is the caller's responsibility to ensure that the item has a valid expire time. */
|
||||
eItem ebDefragItem(ebuckets *eb, EbucketsType *type, eItem item, ebDefragFunction *defragfn) {
|
||||
assert(!ebIsEmpty(*eb));
|
||||
if (ebIsList(*eb)) {
|
||||
ExpireMeta *prevem = NULL;
|
||||
eItem curitem = ebGetListPtr(type, *eb);
|
||||
while (curitem != NULL) {
|
||||
if (curitem == item) {
|
||||
if ((curitem = defragfn(curitem))) {
|
||||
if (prevem)
|
||||
prevem->next = curitem;
|
||||
else
|
||||
*eb = ebMarkAsList(curitem);
|
||||
}
|
||||
return curitem;
|
||||
}
|
||||
|
||||
/* Move to the next item in the list. */
|
||||
prevem = type->getExpireMeta(curitem);
|
||||
curitem = prevem->next;
|
||||
}
|
||||
} else {
|
||||
CommonSegHdr *currHdr;
|
||||
ExpireMeta *mIter = type->getExpireMeta(item);
|
||||
assert(mIter->trash != 1);
|
||||
while (mIter->lastInSegment == 0)
|
||||
mIter = type->getExpireMeta(mIter->next);
|
||||
|
||||
if (mIter->lastItemBucket)
|
||||
currHdr = (CommonSegHdr *) mIter->next;
|
||||
else
|
||||
currHdr = (CommonSegHdr *) ((NextSegHdr *) mIter->next)->prevSeg;
|
||||
/* If the item is the first in the segment, then update the segment header */
|
||||
if (currHdr->head == item) {
|
||||
if ((item = defragfn(item))) {
|
||||
currHdr->head = item;
|
||||
}
|
||||
return item;
|
||||
}
|
||||
|
||||
/* Iterate over all items in the segment until the next is 'item' */
|
||||
ExpireMeta *mHead = type->getExpireMeta(currHdr->head);
|
||||
mIter = mHead;
|
||||
while (mIter->next != item)
|
||||
mIter = type->getExpireMeta(mIter->next);
|
||||
assert(mIter->next == item);
|
||||
|
||||
if ((item = defragfn(item))) {
|
||||
mIter->next = item;
|
||||
}
|
||||
return item;
|
||||
/* Defrag callback for radix tree iterator, called for each node,
|
||||
* used in order to defrag the nodes allocations. */
|
||||
int ebDefragRaxNode(raxNode **noderef, void *privdata) {
|
||||
ebDefragFunctions *defragfns = privdata;
|
||||
raxNode *newnode = defragfns->defragAlloc(*noderef);
|
||||
if (newnode) {
|
||||
*noderef = newnode;
|
||||
return 1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
/* Defragments items in list-based bucket. */
|
||||
void ebDefragList(ebuckets *eb, EbucketsType *type, ebDefragFunctions *defragfns, void *privdata) {
|
||||
ExpireMeta *previtem = NULL;
|
||||
eItem newitem, curitem = ebGetListPtr(type, *eb);
|
||||
while (curitem != NULL) {
|
||||
if ((newitem = defragfns->defragItem(curitem, privdata))) {
|
||||
curitem = newitem;
|
||||
if (previtem) {
|
||||
previtem->next = curitem;
|
||||
} else {
|
||||
*eb = ebMarkAsList(curitem);
|
||||
}
|
||||
}
|
||||
/* Move to the next item in the list. */
|
||||
previtem = type->getExpireMeta(curitem);
|
||||
curitem = previtem->next;
|
||||
}
|
||||
}
|
||||
|
||||
/* Defragments a single bucket in rax, including its segments and items. */
|
||||
void ebDefragRaxBucket(EbucketsType *type, raxIterator *ri,
|
||||
ebDefragFunctions *defragfns, void *privdata)
|
||||
{
|
||||
CommonSegHdr *currentSegHdr = ri->data;
|
||||
eItem iter = ((FirstSegHdr*)currentSegHdr)->head;
|
||||
ExpireMeta *mHead = type->getExpireMeta(iter);
|
||||
ExpireMeta *prevSegLastItem = NULL; /* The last item of the previous segment */
|
||||
|
||||
while (1) {
|
||||
unsigned int numItems = mHead->numItems;
|
||||
assert(numItems); /* Avoid compiler warning with old build chain. */
|
||||
ExpireMeta *prevIter = NULL;
|
||||
ExpireMeta *mIter = NULL;
|
||||
|
||||
for (unsigned int i = 0; i < numItems; ++i) {
|
||||
eItem newiter = defragfns->defragItem(iter, privdata);
|
||||
if (newiter) {
|
||||
iter = newiter;
|
||||
|
||||
if (prevIter == NULL) {
|
||||
/* If this is the first item in the segment, update the segment
|
||||
* header to point to the new item location. */
|
||||
currentSegHdr->head = iter;
|
||||
} else {
|
||||
/* Update the previous item's next pointer to point to the newly defragmented item */
|
||||
prevIter->next = iter;
|
||||
}
|
||||
}
|
||||
mIter = type->getExpireMeta(iter);
|
||||
prevIter = mIter;
|
||||
iter = mIter->next;
|
||||
}
|
||||
|
||||
/* Try to defragment the current segment. */
|
||||
CommonSegHdr *newSegHdr = defragfns->defragAlloc(currentSegHdr);
|
||||
if (newSegHdr) {
|
||||
if (currentSegHdr == ri->data) {
|
||||
/* If it's the first segment, update the rax data pointer. */
|
||||
raxSetData(ri->node, ri->data=newSegHdr);
|
||||
} else {
|
||||
/* For non-first segments, update the previous segment's next
|
||||
* item to new pointer. */
|
||||
prevSegLastItem->next = newSegHdr;
|
||||
}
|
||||
currentSegHdr = newSegHdr;
|
||||
}
|
||||
|
||||
/* Remember last item in this segment for next iteration */
|
||||
prevSegLastItem = mIter;
|
||||
|
||||
if (mIter->lastItemBucket) {
|
||||
/* The last eitem needs to point back to the segment. */
|
||||
if (newSegHdr) mIter->next = currentSegHdr;
|
||||
break;
|
||||
}
|
||||
|
||||
NextSegHdr *nextSegHdr = mIter->next;
|
||||
if (newSegHdr) {
|
||||
/* Update next segment's prev to point to the defragmented segment. */
|
||||
nextSegHdr->prevSeg = newSegHdr;
|
||||
}
|
||||
|
||||
/* Update pointers for next segment iteration */
|
||||
iter = nextSegHdr->head;
|
||||
mHead = type->getExpireMeta(iter);
|
||||
currentSegHdr = (CommonSegHdr *)nextSegHdr;
|
||||
}
|
||||
}
|
||||
|
||||
/* Defragments items in rax-based bucket.
|
||||
* returns 0 if no more work needs to be been done, and 1 if more work is needed. */
|
||||
int ebDefragRax(ebuckets *eb, EbucketsType *type, unsigned long *cursor,
|
||||
ebDefragFunctions *defragfns, void *privdata)
|
||||
{
|
||||
rax *newrax, *rax = ebGetRaxPtr(*eb);
|
||||
raxIterator ri;
|
||||
static unsigned char next[EB_KEY_SIZE];
|
||||
|
||||
/* defrag the rax struct */
|
||||
if (!*cursor) {
|
||||
if ((newrax = defragfns->defragAlloc(rax))) {
|
||||
*eb = newrax;
|
||||
rax = newrax;
|
||||
}
|
||||
}
|
||||
|
||||
raxStart(&ri,rax);
|
||||
if (!*cursor) {
|
||||
ebDefragRaxNode(&rax->head, defragfns);
|
||||
/* assign the iterator node callback before the seek, so that the
|
||||
* initial nodes that are processed till the first item are covered */
|
||||
ri.node_cb = ebDefragRaxNode;
|
||||
ri.privdata = defragfns;
|
||||
raxSeek(&ri, "^", NULL, 0);
|
||||
} else {
|
||||
/* if cursor is non-zero, we seek to the static 'next'.
|
||||
* Since node_cb is set after seek operation, any node traversed during seek wouldn't
|
||||
* be defragmented. To prevent this, we advance to next node before exiting previous
|
||||
* run, ensuring it gets defragmented instead of being skipped during current seek. */
|
||||
if (!raxSeek(&ri, ">=", next, EB_KEY_SIZE)) {
|
||||
*cursor = 0;
|
||||
raxStop(&ri);
|
||||
return 0;
|
||||
}
|
||||
/* assign the iterator node callback after the seek, so that the
|
||||
* initial nodes that are processed till now aren't covered */
|
||||
ri.node_cb = ebDefragRaxNode;
|
||||
ri.privdata = defragfns;
|
||||
}
|
||||
|
||||
/* Defrag the bucket in the rax node. */
|
||||
assert(raxNext(&ri));
|
||||
ebDefragRaxBucket(type, &ri, defragfns, privdata);
|
||||
|
||||
/* Move to next node. */
|
||||
if (!raxNext(&ri)) {
|
||||
/* If we reached the end, we can stop. */
|
||||
*cursor = 0;
|
||||
raxStop(&ri);
|
||||
return 0;
|
||||
}
|
||||
|
||||
(*cursor)++;
|
||||
assert(ri.key_len == sizeof(next));
|
||||
memcpy(next, ri.key, ri.key_len);
|
||||
raxStop(&ri);
|
||||
return 1;
|
||||
}
|
||||
|
||||
/* Reallocates the memory used by ebucket components (segments and items)
|
||||
* using the provided allocation functions. This feature was added for
|
||||
* the active defrag feature.
|
||||
*
|
||||
* The 'defragfns' callbacks are called with a pointer to memory that callback
|
||||
* can reallocate. The callbacks should return a new memory address or NULL,
|
||||
* where NULL means that no reallocation happened and the old memory is still
|
||||
* valid. */
|
||||
int ebScanDefrag(ebuckets *eb, EbucketsType *type, unsigned long *cursor,
|
||||
ebDefragFunctions *defragfns, void *privdata)
|
||||
{
|
||||
if (ebIsEmpty(*eb)) {
|
||||
*cursor = 0;
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (ebIsList(*eb)) {
|
||||
ebDefragList(eb, type, defragfns, privdata);
|
||||
*cursor = 0;
|
||||
return 0;
|
||||
} else {
|
||||
return ebDefragRax(eb, type, cursor, defragfns, privdata);
|
||||
}
|
||||
redis_unreachable();
|
||||
}
|
||||
|
||||
/* Retrieves the expiration time associated with the given item. If associated
|
||||
@@ -2166,11 +2291,21 @@ void distributeTest(int lowestTime,
|
||||
#define UNUSED(x) (void)(x)
|
||||
#define ARRAY_SIZE(arr) (sizeof(arr) / sizeof((arr)[0]))
|
||||
|
||||
eItem defragCallback(const eItem item) {
|
||||
size_t size = zmalloc_usable_size(item);
|
||||
eItem newitem = zmalloc(size);
|
||||
memcpy(newitem, item, size);
|
||||
zfree(item);
|
||||
void *defragCallback(void *ptr) {
|
||||
size_t size = zmalloc_usable_size(ptr);
|
||||
void *newitem = zmalloc(size);
|
||||
memcpy(newitem, ptr, size);
|
||||
zfree(ptr);
|
||||
return newitem;
|
||||
}
|
||||
|
||||
void *defragItemCallback(void *ptr, void *privdata) {
|
||||
MyItem *item = ptr;
|
||||
MyItem **items = privdata;
|
||||
int index = item->index;
|
||||
void *newitem = defragCallback(ptr);
|
||||
if (newitem)
|
||||
items[index] = newitem;
|
||||
return newitem;
|
||||
}
|
||||
|
||||
@@ -2561,10 +2696,12 @@ int ebucketsTest(int argc, char **argv, int flags) {
|
||||
}
|
||||
assert((s <= EB_LIST_MAX_ITEMS) ? ebIsList(eb) : !ebIsList(eb));
|
||||
/* Defrag all the items. */
|
||||
for (int i = 0; i < s; i++) {
|
||||
MyItem *newitem = ebDefragItem(&eb, &myEbucketsType, items[i], defragCallback);
|
||||
if (newitem) items[i] = newitem;
|
||||
}
|
||||
unsigned long cursor = 0;
|
||||
ebDefragFunctions defragfns = {
|
||||
.defragAlloc = defragCallback,
|
||||
.defragItem = defragItemCallback,
|
||||
};
|
||||
while (ebScanDefrag(&eb, &myEbucketsType, &cursor, &defragfns, items)) {}
|
||||
/* Verify that the data is not corrupted. */
|
||||
ebValidate(eb, &myEbucketsType);
|
||||
for (int i = 0; i < s; i++)
|
||||
|
||||
@@ -270,6 +270,13 @@ typedef struct EbucketsIterator {
|
||||
uint64_t itemsCurrBucket; /* Number of items in current bucket. */
|
||||
} EbucketsIterator;
|
||||
|
||||
typedef void *(ebDefragAllocFunction)(void *ptr);
|
||||
typedef void *(ebDefragAllocItemFunction)(void *ptr, void *privdata);
|
||||
typedef struct {
|
||||
ebDefragAllocFunction *defragAlloc; /* Used for rax nodes, segment etc. */
|
||||
ebDefragAllocItemFunction *defragItem; /* Defrag-realloc eitem */
|
||||
} ebDefragFunctions;
|
||||
|
||||
/* ebuckets API */
|
||||
|
||||
static inline ebuckets ebCreate(void) { return NULL; } /* Empty ebuckets */
|
||||
@@ -304,8 +311,8 @@ int ebNext(EbucketsIterator *iter);
|
||||
|
||||
int ebNextBucket(EbucketsIterator *iter);
|
||||
|
||||
typedef eItem (ebDefragFunction)(const eItem item);
|
||||
eItem ebDefragItem(ebuckets *eb, EbucketsType *type, eItem item, ebDefragFunction *fn);
|
||||
int ebScanDefrag(ebuckets *eb, EbucketsType *type, unsigned long *cursor,
|
||||
ebDefragFunctions *defragfns, void *privdata);
|
||||
|
||||
static inline uint64_t ebGetMetaExpTime(ExpireMeta *expMeta) {
|
||||
return (((uint64_t)(expMeta)->expireTimeHi << 32) | (expMeta)->expireTimeLo);
|
||||
|
||||
@@ -13996,7 +13996,8 @@ RedisModuleString *RM_DefragRedisModuleString(RedisModuleDefragCtx *ctx, RedisMo
|
||||
|
||||
/* Defrag callback for radix tree iterator, called for each node,
|
||||
* used in order to defrag the nodes allocations. */
|
||||
int moduleDefragRaxNode(raxNode **noderef) {
|
||||
int moduleDefragRaxNode(raxNode **noderef, void *privdata) {
|
||||
UNUSED(privdata);
|
||||
raxNode *newnode = activeDefragAlloc(*noderef);
|
||||
if (newnode) {
|
||||
*noderef = newnode;
|
||||
@@ -14034,7 +14035,7 @@ RedisModuleDict *RM_DefragRedisModuleDict(RedisModuleDefragCtx *ctx, RedisModule
|
||||
raxStart(&ri,dict->rax);
|
||||
if (*seekTo == NULL) {
|
||||
/* if last seek is NULL, we start new iteration */
|
||||
moduleDefragRaxNode(&dict->rax->head);
|
||||
moduleDefragRaxNode(&dict->rax->head, NULL);
|
||||
/* assign the iterator node callback before the seek, so that the
|
||||
* initial nodes that are processed till the first item are covered */
|
||||
ri.node_cb = moduleDefragRaxNode;
|
||||
|
||||
@@ -1270,6 +1270,7 @@ void raxStart(raxIterator *it, rax *rt) {
|
||||
it->key_max = RAX_ITER_STATIC_LEN;
|
||||
it->data = NULL;
|
||||
it->node_cb = NULL;
|
||||
it->privdata = NULL;
|
||||
raxStackInit(&it->stack);
|
||||
}
|
||||
|
||||
@@ -1346,7 +1347,7 @@ int raxIteratorNextStep(raxIterator *it, int noup) {
|
||||
memcpy(&it->node,cp,sizeof(it->node));
|
||||
/* Call the node callback if any, and replace the node pointer
|
||||
* if the callback returns true. */
|
||||
if (it->node_cb && it->node_cb(&it->node))
|
||||
if (it->node_cb && it->node_cb(&it->node, it->privdata))
|
||||
memcpy(cp,&it->node,sizeof(it->node));
|
||||
/* For "next" step, stop every time we find a key along the
|
||||
* way, since the key is lexicographically smaller compared to
|
||||
@@ -1402,7 +1403,7 @@ int raxIteratorNextStep(raxIterator *it, int noup) {
|
||||
memcpy(&it->node,cp,sizeof(it->node));
|
||||
/* Call the node callback if any, and replace the node
|
||||
* pointer if the callback returns true. */
|
||||
if (it->node_cb && it->node_cb(&it->node))
|
||||
if (it->node_cb && it->node_cb(&it->node, it->privdata))
|
||||
memcpy(cp,&it->node,sizeof(it->node));
|
||||
if (it->node->iskey) {
|
||||
it->data = raxGetData(it->node);
|
||||
|
||||
@@ -143,7 +143,7 @@ typedef struct raxStack {
|
||||
* Redis application for this callback).
|
||||
*
|
||||
* This is currently only supported in forward iterations (raxNext) */
|
||||
typedef int (*raxNodeCallback)(raxNode **noderef);
|
||||
typedef int (*raxNodeCallback)(raxNode **noderef, void *privdata);
|
||||
|
||||
/* Radix tree iterator state is encapsulated into this data structure. */
|
||||
#define RAX_ITER_STATIC_LEN 128
|
||||
@@ -164,6 +164,7 @@ typedef struct raxIterator {
|
||||
raxNode *node; /* Current node. Only for unsafe iteration. */
|
||||
raxStack stack; /* Stack used for unsafe iteration. */
|
||||
raxNodeCallback node_cb; /* Optional node callback. Normally set to NULL. */
|
||||
void *privdata; /* Optional private data for node callback. */
|
||||
} raxIterator;
|
||||
|
||||
/* Exported API. */
|
||||
|
||||
@@ -297,7 +297,11 @@ run_solo {defrag} {
|
||||
r config set maxmemory 0
|
||||
r config set list-max-ziplist-size 5 ;# list of 10k items will have 2000 quicklist nodes
|
||||
r config set stream-node-max-entries 5
|
||||
r hmset hash h1 v1 h2 v2 h3 v3
|
||||
r config set hash-max-listpack-entries 10
|
||||
r hmset hash_lp h1 v1 h2 v2 h3 v3
|
||||
assert_encoding listpack hash_lp
|
||||
r hmset hash_ht h1 v1 h2 v2 h3 v3 h4 v4 h5 v5 h6 v6 h7 v7 h8 v8 h9 v9 h10 v10 h11 v11
|
||||
assert_encoding hashtable hash_ht
|
||||
r lpush list a b c d
|
||||
r zadd zset 0 a 1 b 2 c 3 d
|
||||
r sadd set a b c d
|
||||
@@ -347,7 +351,7 @@ run_solo {defrag} {
|
||||
for {set j 0} {$j < 500000} {incr j} {
|
||||
$rd read ; # Discard replies
|
||||
}
|
||||
assert_equal [r dbsize] 500015
|
||||
assert_equal [r dbsize] 500016
|
||||
|
||||
# create some fragmentation
|
||||
for {set j 0} {$j < 500000} {incr j 2} {
|
||||
@@ -356,7 +360,7 @@ run_solo {defrag} {
|
||||
for {set j 0} {$j < 500000} {incr j 2} {
|
||||
$rd read ; # Discard replies
|
||||
}
|
||||
assert_equal [r dbsize] 250015
|
||||
assert_equal [r dbsize] 250016
|
||||
|
||||
# start defrag
|
||||
after 120 ;# serverCron only updates the info once in 100ms
|
||||
@@ -512,15 +516,14 @@ run_solo {defrag} {
|
||||
$rd_pubsub close
|
||||
}
|
||||
|
||||
test "Active Defrag HFE: $type" {
|
||||
foreach {eb_container fields n} {eblist 16 3000 ebrax 30 1600 large_ebrax 1600 30} {
|
||||
test "Active Defrag HFE with $eb_container: $type" {
|
||||
r flushdb
|
||||
r config set hz 100
|
||||
r config set activedefrag no
|
||||
wait_for_defrag_stop 500 100
|
||||
r config resetstat
|
||||
# TODO: Lower the threshold after defraging the ebuckets.
|
||||
# Now just to ensure that the reference is updated correctly.
|
||||
r config set active-defrag-threshold-lower 12
|
||||
r config set active-defrag-threshold-lower 7
|
||||
r config set active-defrag-cycle-min 65
|
||||
r config set active-defrag-cycle-max 75
|
||||
r config set active-defrag-ignore-bytes 1500kb
|
||||
@@ -529,26 +532,29 @@ run_solo {defrag} {
|
||||
r config set hash-max-listpack-entries 10
|
||||
|
||||
# Populate memory with interleaving hash field of same size
|
||||
set n 3000
|
||||
set fields 16 ;# make all the fields in an eblist.
|
||||
set dummy_field "[string repeat x 400]"
|
||||
set rd [redis_deferring_client]
|
||||
for {set i 0} {$i < $n} {incr i} {
|
||||
for {set j 0} {$j < $fields} {incr j} {
|
||||
$rd hset h$i f$j $dummy_field
|
||||
$rd hexpire h$i 9999999 FIELDS 1 f$j
|
||||
$rd hset h$i $dummy_field$j v
|
||||
$rd hexpire h$i 9999999 FIELDS 1 $dummy_field$j
|
||||
$rd set "k$i$j" $dummy_field
|
||||
}
|
||||
$rd expire h$i 9999999 ;# Ensure expire is updated after kvobj reallocation
|
||||
}
|
||||
for {set j 0} {$j < [expr $n*$fields]} {incr j} {
|
||||
$rd read ; # Discard hset replies
|
||||
$rd read ; # Discard hexpire replies
|
||||
$rd read ; # Discard set replies
|
||||
|
||||
for {set i 0} {$i < $n} {incr i} {
|
||||
for {set j 0} {$j < $fields} {incr j} {
|
||||
$rd read ; # Discard hset replies
|
||||
$rd read ; # Discard hexpire replies
|
||||
$rd read ; # Discard set replies
|
||||
}
|
||||
$rd read ; # Discard expire replies
|
||||
}
|
||||
|
||||
# Coverage for listpackex.
|
||||
r hset h_lpex f0 $dummy_field
|
||||
r hexpire h_lpex 9999999 FIELDS 1 f0
|
||||
r hset h_lpex $dummy_field v
|
||||
r hexpire h_lpex 9999999 FIELDS 1 $dummy_field
|
||||
assert_encoding listpackex h_lpex
|
||||
|
||||
after 120 ;# serverCron only updates the info once in 100ms
|
||||
@@ -591,7 +597,7 @@ run_solo {defrag} {
|
||||
}
|
||||
|
||||
# wait for the active defrag to stop working
|
||||
wait_for_defrag_stop 500 100 1.5
|
||||
wait_for_defrag_stop 500 100 1.07
|
||||
|
||||
# test the fragmentation is lower
|
||||
after 120 ;# serverCron only updates the info once in 100ms
|
||||
@@ -603,6 +609,7 @@ run_solo {defrag} {
|
||||
}
|
||||
}
|
||||
}
|
||||
} ;# end of foreach
|
||||
|
||||
test "Active defrag for argv retained by the main thread from IO thread: $type" {
|
||||
r flushdb
|
||||
|
||||
Reference in New Issue
Block a user