mirror of
https://github.com/redis/redis.git
synced 2026-01-09 03:58:05 -05:00
Refactor HFE: Introduce Per-Slot Expiration Store (estore) (#14294)
Hash field expiration is managed with two levels of data structures. 1. At the DB level, an ebuckets structure maintains the set of all hashes that contain fields with expiration. 2. At the per-hash level, an ebuckets structure tracks fields with expiration. This pull request refactors the 1st level to operate per slot instead, and introduces a new API called estore (expiration store). Its design aligns closely with the existing kvstore API, ensuring consistency and simplifying usage. The terminology at that level has been updated from “HFE” or “hexpire” to “subexpiry”, reflecting a broader scope that can later support other data types.
This commit is contained in:
@@ -382,7 +382,7 @@ endif
|
||||
|
||||
REDIS_SERVER_NAME=redis-server$(PROG_SUFFIX)
|
||||
REDIS_SENTINEL_NAME=redis-sentinel$(PROG_SUFFIX)
|
||||
REDIS_SERVER_OBJ=threads_mngr.o memory_prefetch.o adlist.o quicklist.o ae.o anet.o dict.o ebuckets.o eventnotifier.o iothread.o mstr.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o lolwut8.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o
|
||||
REDIS_SERVER_OBJ=threads_mngr.o memory_prefetch.o adlist.o quicklist.o ae.o anet.o dict.o ebuckets.o eventnotifier.o iothread.o mstr.o kvstore.o fwtree.o estore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o lolwut8.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o
|
||||
REDIS_CLI_NAME=redis-cli$(PROG_SUFFIX)
|
||||
REDIS_CLI_OBJ=anet.o adlist.o dict.o redis-cli.o zmalloc.o release.o ae.o redisassert.o crcspeed.o crccombine.o crc64.o siphash.o crc16.o monotonic.o cli_common.o mt19937-64.o strl.o cli_commands.o
|
||||
REDIS_BENCHMARK_NAME=redis-benchmark$(PROG_SUFFIX)
|
||||
|
||||
@@ -264,7 +264,7 @@ void restoreCommand(client *c) {
|
||||
if (kv->type == OBJ_HASH) {
|
||||
uint64_t minExpiredField = hashTypeGetMinExpire(kv, 1);
|
||||
if (minExpiredField != EB_EXPIRE_TIME_INVALID)
|
||||
hashTypeAddToExpires(c->db, kv, minExpiredField);
|
||||
estoreAdd(c->db->subexpires, getKeySlot(key->ptr), kv, minExpiredField);
|
||||
}
|
||||
|
||||
if (ttl) {
|
||||
|
||||
62
src/db.c
62
src/db.c
@@ -481,7 +481,7 @@ static void dbSetValue(redisDb *db, robj *key, robj **valref, dictEntryLink link
|
||||
/* if hash with HFEs, take care to remove from global HFE DS before attempting
|
||||
* to manipulate and maybe free kvOld object */
|
||||
if (old->type == OBJ_HASH)
|
||||
hashTypeRemoveFromExpires(&db->hexpires, old);
|
||||
estoreRemove(db->subexpires, slot, old);
|
||||
|
||||
if (overwrite) {
|
||||
/* RM_StringDMA may call dbUnshareStringValue which may free val, so we
|
||||
@@ -513,9 +513,9 @@ static void dbSetValue(redisDb *db, robj *key, robj **valref, dictEntryLink link
|
||||
kvNew = old;
|
||||
old = val;
|
||||
|
||||
/* Handle TTL in the optimization path */
|
||||
if ((!keepTTL) && (getExpire(db, key->ptr, kvNew) >= 0))
|
||||
removeExpire(db, key);
|
||||
/* Handle TTL in the optimization path */
|
||||
if ((!keepTTL) && (getExpire(db, key->ptr, kvNew) >= 0))
|
||||
removeExpire(db, key);
|
||||
} else {
|
||||
/* Replace the old value at its location in the key space. */
|
||||
val->lru = old->lru;
|
||||
@@ -698,7 +698,7 @@ int dbGenericDelete(redisDb *db, robj *key, int async, int flags) {
|
||||
|
||||
/* If hash object with expiry on fields, remove it from HFE DS of DB */
|
||||
if (type == OBJ_HASH)
|
||||
hashTypeRemoveFromExpires(&db->hexpires, kv);
|
||||
estoreRemove(db->subexpires, slot, kv);
|
||||
|
||||
/* RM_StringDMA may call dbUnshareStringValue which may free kv, so we
|
||||
* need to incr to retain kv */
|
||||
@@ -830,9 +830,9 @@ long long emptyDbStructure(redisDb *dbarray, int dbnum, int async,
|
||||
if (async) {
|
||||
emptyDbAsync(&dbarray[j]);
|
||||
} else {
|
||||
/* Destroy global HFE DS before deleting the hashes since ebuckets
|
||||
* DS is embedded in the stored objects. */
|
||||
ebDestroy(&dbarray[j].hexpires, &hashExpireBucketsType, NULL);
|
||||
/* Destroy sub-expires before deleting the kv-objects since ebuckets
|
||||
* data structure is embedded in the stored kv-objects. */
|
||||
estoreEmpty(dbarray[j].subexpires);
|
||||
kvstoreEmpty(dbarray[j].keys, callback);
|
||||
kvstoreEmpty(dbarray[j].expires, callback);
|
||||
}
|
||||
@@ -913,7 +913,7 @@ redisDb *initTempDb(void) {
|
||||
tempDb[i].keys = kvstoreCreate(&dbDictType, slot_count_bits,
|
||||
flags | KVSTORE_ALLOC_META_KEYS_HIST);
|
||||
tempDb[i].expires = kvstoreCreate(&dbExpiresDictType, slot_count_bits, flags);
|
||||
tempDb[i].hexpires = ebCreate();
|
||||
tempDb[i].subexpires = estoreCreate(&subexpiresBucketsType, slot_count_bits);
|
||||
}
|
||||
|
||||
return tempDb;
|
||||
@@ -926,9 +926,9 @@ void discardTempDb(redisDb *tempDb) {
|
||||
/* Release temp DBs. */
|
||||
emptyDbStructure(tempDb, -1, async, NULL);
|
||||
for (int i=0; i<server.dbnum; i++) {
|
||||
/* Destroy global HFE DS before deleting the hashes since ebuckets DS is
|
||||
* embedded in the stored objects. */
|
||||
ebDestroy(&tempDb[i].hexpires, &hashExpireBucketsType, NULL);
|
||||
/* Destroy sub-expires before deleting the kv-objects since ebuckets
|
||||
* data structure is embedded in the stored kv-objects. */
|
||||
estoreRelease(tempDb[i].subexpires);
|
||||
kvstoreRelease(tempDb[i].keys);
|
||||
kvstoreRelease(tempDb[i].expires);
|
||||
}
|
||||
@@ -1882,14 +1882,14 @@ void renameGenericCommand(client *c, int nx) {
|
||||
* global HFE DS and we will lose the expiration time. */
|
||||
int srctype = o->type;
|
||||
if (srctype == OBJ_HASH)
|
||||
minHashExpireTime = hashTypeRemoveFromExpires(&c->db->hexpires, o);
|
||||
minHashExpireTime = estoreRemove(c->db->subexpires, getKeySlot(c->argv[1]->ptr), o);
|
||||
|
||||
dbDelete(c->db,c->argv[1]);
|
||||
dbAddInternal(c->db, c->argv[2], &o, NULL, expire);
|
||||
|
||||
/* If hash with HFEs, register in db->hexpires */
|
||||
/* If hash with HFEs, register in DB subexpires */
|
||||
if (minHashExpireTime != EB_EXPIRE_TIME_INVALID)
|
||||
hashTypeAddToExpires(c->db, o, minHashExpireTime);
|
||||
estoreAdd(c->db->subexpires, getKeySlot(c->argv[2]->ptr), o, minHashExpireTime);
|
||||
|
||||
signalModifiedKey(c,c->db,c->argv[1]);
|
||||
signalModifiedKey(c,c->db,c->argv[2]);
|
||||
@@ -1964,11 +1964,13 @@ void moveCommand(client *c) {
|
||||
return;
|
||||
}
|
||||
|
||||
/* If hash with expiration on fields, remove it from global HFE DS and keep
|
||||
* aside registered expiration time. Must be before addition/deletion of the
|
||||
* object. hexpires (ebuckets) embed in stored items its structure. */
|
||||
int slot = getKeySlot(c->argv[1]->ptr);
|
||||
|
||||
/* If hash with expiration on fields, remove it from DB subexpires and keep
|
||||
* aside registered expiration time. Must be before removal of the
|
||||
* object since it embeds ExpireMeta that is used by subexpires */
|
||||
if (kv->type == OBJ_HASH)
|
||||
hashExpireTime = hashTypeRemoveFromExpires(&src->hexpires, kv);
|
||||
hashExpireTime = estoreRemove(src->subexpires, slot, kv);
|
||||
|
||||
incrRefCount(kv); /* ref counter = 1->2 */
|
||||
dbDelete(src,c->argv[1]); /* ref counter = 2->1 */
|
||||
@@ -1978,9 +1980,9 @@ void moveCommand(client *c) {
|
||||
kv = setExpireByLink(c, dst, c->argv[1]->ptr, expire, dstBucket);
|
||||
|
||||
/* If object of type hash with expiration on fields. Taken care to add the
|
||||
* hash to hexpires of `dst` only after dbDelete(). */
|
||||
* hash to subexpires of `dst` only after dbDelete(). */
|
||||
if (hashExpireTime != EB_EXPIRE_TIME_INVALID)
|
||||
hashTypeAddToExpires(dst, kv, hashExpireTime);
|
||||
estoreAdd(dst->subexpires, slot, kv, hashExpireTime);
|
||||
|
||||
signalModifiedKey(c,src,c->argv[1]);
|
||||
signalModifiedKey(c,dst,c->argv[1]);
|
||||
@@ -2097,7 +2099,7 @@ void copyCommand(client *c) {
|
||||
/* If minExpiredField was set, then the object is hash with expiration
|
||||
* on fields and need to register it in global HFE DS */
|
||||
if (minHashExpire != EB_EXPIRE_TIME_INVALID)
|
||||
hashTypeAddToExpires(dst, kvCopy, minHashExpire);
|
||||
estoreAdd(dst->subexpires, getKeySlot(newkey->ptr), kvCopy, minHashExpire);
|
||||
|
||||
/* OK! key copied */
|
||||
signalModifiedKey(c,dst,c->argv[2]);
|
||||
@@ -2193,13 +2195,13 @@ int dbSwapDatabases(int id1, int id2) {
|
||||
* remain in the same DB they were. */
|
||||
db1->keys = db2->keys;
|
||||
db1->expires = db2->expires;
|
||||
db1->hexpires = db2->hexpires;
|
||||
db1->subexpires = db2->subexpires;
|
||||
db1->avg_ttl = db2->avg_ttl;
|
||||
db1->expires_cursor = db2->expires_cursor;
|
||||
|
||||
db2->keys = aux.keys;
|
||||
db2->expires = aux.expires;
|
||||
db2->hexpires = aux.hexpires;
|
||||
db2->subexpires = aux.subexpires;
|
||||
db2->avg_ttl = aux.avg_ttl;
|
||||
db2->expires_cursor = aux.expires_cursor;
|
||||
|
||||
@@ -2237,13 +2239,13 @@ void swapMainDbWithTempDb(redisDb *tempDb) {
|
||||
* remain in the same DB they were. */
|
||||
activedb->keys = newdb->keys;
|
||||
activedb->expires = newdb->expires;
|
||||
activedb->hexpires = newdb->hexpires;
|
||||
activedb->subexpires = newdb->subexpires;
|
||||
activedb->avg_ttl = newdb->avg_ttl;
|
||||
activedb->expires_cursor = newdb->expires_cursor;
|
||||
|
||||
newdb->keys = aux.keys;
|
||||
newdb->expires = aux.expires;
|
||||
newdb->hexpires = aux.hexpires;
|
||||
newdb->subexpires = aux.subexpires;
|
||||
newdb->avg_ttl = aux.avg_ttl;
|
||||
newdb->expires_cursor = aux.expires_cursor;
|
||||
|
||||
@@ -2344,11 +2346,11 @@ kvobj *setExpireByLink(client *c, redisDb *db, sds key, long long when, dictEntr
|
||||
/* Val already had an expire field, so it was not reallocated. */
|
||||
serverAssert(kv == kvnew);
|
||||
} else { /* No old expire */
|
||||
uint64_t hexpire = EB_EXPIRE_TIME_INVALID;
|
||||
uint64_t subexpiry = EB_EXPIRE_TIME_INVALID;
|
||||
/* If hash with HFEs, take care to remove from global HFE DS before attempting
|
||||
* to manipulate and maybe free kv object */
|
||||
if (kv->type == OBJ_HASH)
|
||||
hexpire = hashTypeRemoveFromExpires(&db->hexpires, kv);
|
||||
subexpiry = estoreRemove(db->subexpires, slot, kv);
|
||||
|
||||
kvobj *kvnew = kvobjSetExpire(kv, when); /* release kv if reallocated */
|
||||
/* if kvobj was reallocated, update dict */
|
||||
@@ -2360,8 +2362,8 @@ kvobj *setExpireByLink(client *c, redisDb *db, sds key, long long when, dictEntr
|
||||
dictEntry *de = kvstoreDictAddRaw(db->expires, slot, kv, NULL);
|
||||
serverAssert(de != NULL);
|
||||
|
||||
if (hexpire != EB_EXPIRE_TIME_INVALID)
|
||||
hashTypeAddToExpires(db, kv, hexpire);
|
||||
if (subexpiry != EB_EXPIRE_TIME_INVALID)
|
||||
estoreAdd(db->subexpires, slot, kv, subexpiry);
|
||||
}
|
||||
|
||||
int writable_slave = server.masterhost && server.repl_slave_ro == 0;
|
||||
|
||||
100
src/defrag.c
100
src/defrag.c
@@ -73,20 +73,21 @@ struct DefragContext {
|
||||
};
|
||||
static struct DefragContext defrag = {0, 0, 0, 0, 1.0f};
|
||||
|
||||
#define ITER_SLOT_DEFRAG_LUT (-2)
|
||||
#define ITER_SLOT_UNASSIGNED (-1)
|
||||
|
||||
/* There are a number of stages which process a kvstore. To simplify this, a stage helper function
|
||||
* `defragStageKvstoreHelper()` is defined. This function aids in iterating over the kvstore. It
|
||||
* uses these definitions.
|
||||
*/
|
||||
/* State of the kvstore helper. The context passed to the kvstore helper MUST BEGIN
|
||||
* with a kvstoreIterState (or be passed as NULL). */
|
||||
#define KVS_SLOT_DEFRAG_LUT -2
|
||||
#define KVS_SLOT_UNASSIGNED -1
|
||||
typedef struct {
|
||||
kvstore *kvs;
|
||||
int slot;
|
||||
int slot; /* Consider defines ITER_SLOT_XXX for special values. */
|
||||
unsigned long cursor;
|
||||
} kvstoreIterState;
|
||||
#define INIT_KVSTORE_STATE(kvs) ((kvstoreIterState){(kvs), KVS_SLOT_DEFRAG_LUT, 0})
|
||||
#define INIT_KVSTORE_STATE(kvs) ((kvstoreIterState){(kvs), ITER_SLOT_DEFRAG_LUT, 0})
|
||||
|
||||
/* The kvstore helper uses this function to perform tasks before continuing the iteration. For the
|
||||
* main dictionary, large items are set aside and processed by this function before continuing with
|
||||
@@ -117,12 +118,13 @@ typedef struct {
|
||||
} defragKeysCtx;
|
||||
static_assert(offsetof(defragKeysCtx, kvstate) == 0, "defragStageKvstoreHelper requires this");
|
||||
|
||||
/* Context for hexpires */
|
||||
/* Context for subexpires */
|
||||
typedef struct {
|
||||
estore *subexpires;
|
||||
int slot; /* Consider defines ITER_SLOT_XXX for special values. */
|
||||
int dbid;
|
||||
ebuckets hexpires;
|
||||
unsigned long cursor;
|
||||
} defragHExpiresCtx;
|
||||
} defragSubexpiresCtx;
|
||||
|
||||
/* Context for pubsub kvstores */
|
||||
typedef dict *(*getClientChannelsFn)(client *);
|
||||
@@ -962,7 +964,7 @@ void defragKey(defragKeysCtx *ctx, dictEntry *de, dictEntryLink link) {
|
||||
}
|
||||
|
||||
/* Try to defrag robj and/or string value. For hash objects with HFEs,
|
||||
* defer defragmentation until processing db's hexpires. */
|
||||
* defer defragmentation until processing db's subexpires. */
|
||||
if (!(ob->type == OBJ_HASH && hashTypeGetMinExpire(ob, 0) != EB_EXPIRE_TIME_INVALID)) {
|
||||
/* If the dict doesn't have metadata, we directly defrag it. */
|
||||
kvnew = activeDefragStringOb(ob);
|
||||
@@ -1251,13 +1253,13 @@ static doneStatus defragStageKvstoreHelper(monotime endtime,
|
||||
unsigned long long prev_scanned = server.stat_active_defrag_scanned;
|
||||
kvstoreIterState *state = (kvstoreIterState*)ctx;
|
||||
|
||||
if (state->slot == KVS_SLOT_DEFRAG_LUT) {
|
||||
if (state->slot == ITER_SLOT_DEFRAG_LUT) {
|
||||
/* Before we start scanning the kvstore, handle the main structures */
|
||||
do {
|
||||
state->cursor = kvstoreDictLUTDefrag(state->kvs, state->cursor, dictDefragTables);
|
||||
if (getMonotonicUs() >= endtime) return DEFRAG_NOT_DONE;
|
||||
} while (state->cursor != 0);
|
||||
state->slot = KVS_SLOT_UNASSIGNED;
|
||||
state->slot = ITER_SLOT_UNASSIGNED;
|
||||
}
|
||||
|
||||
while (1) {
|
||||
@@ -1274,13 +1276,13 @@ static doneStatus defragStageKvstoreHelper(monotime endtime,
|
||||
|
||||
if (!state->cursor) {
|
||||
/* If there's no cursor, we're ready to begin a new kvstore slot. */
|
||||
if (state->slot == KVS_SLOT_UNASSIGNED) {
|
||||
if (state->slot == ITER_SLOT_UNASSIGNED) {
|
||||
state->slot = kvstoreGetFirstNonEmptyDictIndex(state->kvs);
|
||||
} else {
|
||||
state->slot = kvstoreGetNextNonEmptyDictIndex(state->kvs, state->slot);
|
||||
}
|
||||
|
||||
if (state->slot == KVS_SLOT_UNASSIGNED) return DEFRAG_DONE;
|
||||
if (state->slot == ITER_SLOT_UNASSIGNED) return DEFRAG_DONE;
|
||||
}
|
||||
|
||||
/* Whatever privdata's actual type, this function requires that it begins with kvstoreIterState. */
|
||||
@@ -1328,57 +1330,81 @@ static doneStatus defragStageExpiresKvstore(void *ctx, monotime endtime) {
|
||||
scanCallbackCountScanned, NULL, &defragfns);
|
||||
}
|
||||
|
||||
/* Defragment hash object with HFE and update its reference in the DB keys. */
|
||||
void *activeDefragHExpiresOB(void *ptr, void *privdata) {
|
||||
/* Defrag (hash) object with subexpiry and update its reference in the DB keys. */
|
||||
void *activeDefragSubexpiresOB(void *ptr, void *privdata) {
|
||||
redisDb *db = privdata;
|
||||
dictEntryLink link, exlink = NULL;
|
||||
kvobj *newkv, *kvobj = ptr;
|
||||
sds keystr = kvobjGetKey(kvobj);
|
||||
kvobj *newkv, *kv = ptr;
|
||||
sds keystr = kvobjGetKey(kv);
|
||||
unsigned int slot = calculateKeySlot(keystr);
|
||||
serverAssert(kvobj->type == OBJ_HASH);
|
||||
|
||||
long long expire = kvobjGetExpire(kvobj);
|
||||
serverAssert(kv->type == OBJ_HASH); /* Currently relevant only for hashes */
|
||||
|
||||
long long expire = kvobjGetExpire(kv);
|
||||
/* We can't search in db->expires for that KV after we've released
|
||||
* the pointer it holds, since it won't be able to do the string
|
||||
* compare. Search it before, if needed. */
|
||||
if (expire != -1) {
|
||||
exlink = kvstoreDictFindLink(db->expires, slot, kvobjGetKey(kvobj), NULL);
|
||||
exlink = kvstoreDictFindLink(db->expires, slot, keystr, NULL);
|
||||
serverAssert(exlink != NULL);
|
||||
}
|
||||
|
||||
if ((newkv = activeDefragAllocWithoutFree(kvobj))) {
|
||||
if ((newkv = activeDefragAllocWithoutFree(kv))) {
|
||||
/* Update its reference in the DB keys. */
|
||||
link = kvstoreDictFindLink(db->keys, slot, keystr, NULL);
|
||||
serverAssert(link != NULL);
|
||||
kvstoreDictSetAtLink(db->keys, slot, newkv, &link, 0);
|
||||
if (expire != -1)
|
||||
kvstoreDictSetAtLink(db->expires, slot, newkv, &exlink, 0);
|
||||
activeDefragFree(kvobj);
|
||||
activeDefragFree(kv);
|
||||
}
|
||||
return newkv;
|
||||
}
|
||||
|
||||
static doneStatus defragStageHExpires(void *ctx, monotime endtime) {
|
||||
static doneStatus defragStageSubexpires(void *ctx, monotime endtime) {
|
||||
unsigned int iterations = 0;
|
||||
defragHExpiresCtx *defrag_hexpires_ctx = ctx;
|
||||
redisDb *db = &server.db[defrag_hexpires_ctx->dbid];
|
||||
if (db->hexpires != defrag_hexpires_ctx->hexpires) {
|
||||
/* There has been a change of the kvs (flushdb, swapdb, etc.). Just complete the stage. */
|
||||
unsigned long long prev_defragged = server.stat_active_defrag_hits;
|
||||
unsigned long long prev_scanned = server.stat_active_defrag_scanned;
|
||||
defragSubexpiresCtx *subctx = ctx;
|
||||
redisDb *db = &server.db[subctx->dbid];
|
||||
estore *subexpires = db->subexpires;
|
||||
|
||||
/* If estore changed (flushdb, swapdb, etc.), Just complete the stage. */
|
||||
if (db->subexpires != subctx->subexpires) {
|
||||
return DEFRAG_DONE;
|
||||
}
|
||||
|
||||
ebDefragFunctions eb_defragfns = {
|
||||
.defragAlloc = activeDefragAlloc,
|
||||
.defragItem = activeDefragHExpiresOB
|
||||
.defragItem = activeDefragSubexpiresOB
|
||||
};
|
||||
while (1) {
|
||||
if (!ebScanDefrag(&db->hexpires, &hashExpireBucketsType, &defrag_hexpires_ctx->cursor, &eb_defragfns, db))
|
||||
return DEFRAG_DONE;
|
||||
|
||||
if (++iterations > 16) {
|
||||
while (1) {
|
||||
if (++iterations > 16 ||
|
||||
server.stat_active_defrag_hits - prev_defragged > 512 ||
|
||||
server.stat_active_defrag_scanned - prev_scanned > 64)
|
||||
{
|
||||
if (getMonotonicUs() >= endtime) break;
|
||||
iterations = 0;
|
||||
prev_defragged = server.stat_active_defrag_hits;
|
||||
prev_scanned = server.stat_active_defrag_scanned;
|
||||
}
|
||||
|
||||
/* If there's no cursor, we're ready to begin a new estore slot. */
|
||||
if (!subctx->cursor) {
|
||||
if (subctx->slot == ITER_SLOT_UNASSIGNED) {
|
||||
subctx->slot = estoreGetFirstNonEmptyBucket(subexpires);
|
||||
} else {
|
||||
subctx->slot = estoreGetNextNonEmptyBucket(subexpires, subctx->slot);
|
||||
}
|
||||
|
||||
if (subctx->slot == ITER_SLOT_UNASSIGNED) return DEFRAG_DONE;
|
||||
}
|
||||
|
||||
/* Get the ebuckets for the current slot and scan it */
|
||||
ebuckets *bucket = estoreGetBuckets(subexpires, subctx->slot);
|
||||
if (!ebScanDefrag(bucket, &subexpiresBucketsType, &subctx->cursor, &eb_defragfns, db))
|
||||
subctx->cursor = 0; /* Reset cursor to move to next slot */
|
||||
}
|
||||
|
||||
return DEFRAG_NOT_DONE;
|
||||
@@ -1709,11 +1735,13 @@ static void beginDefragCycle(void) {
|
||||
defrag_expires_ctx->dbid = dbid;
|
||||
addDefragStage(defragStageExpiresKvstore, freeDefragKeysContext, defrag_expires_ctx);
|
||||
|
||||
/* Add stage for hexpires. */
|
||||
defragHExpiresCtx *defrag_hexpires_ctx = zcalloc(sizeof(defragHExpiresCtx));
|
||||
defrag_hexpires_ctx->hexpires = db->hexpires;
|
||||
defrag_hexpires_ctx->dbid = dbid;
|
||||
addDefragStage(defragStageHExpires, zfree, defrag_hexpires_ctx);
|
||||
/* Add stage for subexpires. */
|
||||
defragSubexpiresCtx *defrag_subexpires_ctx = zcalloc(sizeof(defragSubexpiresCtx));
|
||||
defrag_subexpires_ctx->subexpires = db->subexpires;
|
||||
defrag_subexpires_ctx->slot = ITER_SLOT_UNASSIGNED;
|
||||
defrag_subexpires_ctx->cursor = 0;
|
||||
defrag_subexpires_ctx->dbid = dbid;
|
||||
addDefragStage(defragStageSubexpires, zfree, defrag_subexpires_ctx);
|
||||
}
|
||||
|
||||
/* Add stage for pubsub channels. */
|
||||
|
||||
@@ -737,6 +737,7 @@ static void ebValidateList(eItem head, EbucketsType *type) {
|
||||
|
||||
for (int i = 0; i < mHead->numItems ; ++i) {
|
||||
mIter = type->getExpireMeta(iter);
|
||||
assert(mIter->trash == 0);
|
||||
if (i == 0) {
|
||||
/* first item */
|
||||
assert(mIter->numItems > 0 && mIter->numItems <= EB_LIST_MAX_ITEMS);
|
||||
@@ -1211,7 +1212,7 @@ static void ebValidateRax(rax *rax, EbucketsType *type) {
|
||||
assert(iter != NULL);
|
||||
mIter = type->getExpireMeta(iter);
|
||||
curBktKey = EB_BUCKET_KEY(ebGetMetaExpTime(mIter));
|
||||
|
||||
assert(mIter->trash == 0);
|
||||
if (i == 0) {
|
||||
assert(mIter->numItems > 0 && mIter->numItems <= EB_SEG_MAX_ITEMS);
|
||||
assert(mIter->firstItemBucket == expectFirstItemBucket);
|
||||
@@ -1978,8 +1979,10 @@ int ebDefragRax(ebuckets *eb, EbucketsType *type, unsigned long *cursor,
|
||||
*
|
||||
* The 'defragfns' callbacks are called with a pointer to memory that callback
|
||||
* can reallocate. The callbacks should return a new memory address or NULL,
|
||||
* where NULL means that no reallocation happened and the old memory is still
|
||||
* valid. */
|
||||
* where NULL means that no reallocation happened and the old memory is still valid.
|
||||
*
|
||||
* Returns 0 if no more work needs to be been done. Otherwise 1.
|
||||
*/
|
||||
int ebScanDefrag(ebuckets *eb, EbucketsType *type, unsigned long *cursor,
|
||||
ebDefragFunctions *defragfns, void *privdata)
|
||||
{
|
||||
|
||||
475
src/estore.c
Normal file
475
src/estore.c
Normal file
@@ -0,0 +1,475 @@
|
||||
/*
|
||||
* estore.c -- Expiration Store implementation
|
||||
*
|
||||
* Copyright (c) 2011-Present, Redis Ltd.
|
||||
* All rights reserved.
|
||||
*
|
||||
* Licensed under your choice of (a) the Redis Source Available License 2.0
|
||||
* (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the
|
||||
* GNU Affero General Public License v3 (AGPLv3).
|
||||
*/
|
||||
|
||||
#include "fmacros.h"
|
||||
#include "estore.h"
|
||||
#include "zmalloc.h"
|
||||
#include "redisassert.h"
|
||||
#include "server.h"
|
||||
#include <string.h>
|
||||
|
||||
/* Definition of the expiration store (estore) structure.
 *
 * An estore partitions sub-expiration tracking across buckets — one ebuckets
 * per cluster slot in cluster mode, or a single bucket otherwise — mirroring
 * the kvstore layout so per-slot operations stay cheap. */
struct _estore {
    int flags;                  /* Flags for configuration options */
    EbucketsType *bucket_type;  /* Type of buckets used in this store */
    ebuckets *ebArray;          /* Array of ebuckets (one per slot in cluster mode, or just one) */
    int num_buckets_bits;       /* Log2 of the number of buckets */
    int num_buckets;            /* Number of buckets (1 << num_buckets_bits) */
    unsigned long long count;   /* Total number of items in this estore */
    fenwickTree *buckets_sizes; /* Binary indexed tree (BIT) of per-bucket item counts,
                                 * used to find non-empty buckets quickly.
                                 * NOTE(review): may be NULL for small bucket counts —
                                 * see estoreCreate(). */
};
|
||||
|
||||
/* Get the appropriate bucket for a given eidx */
|
||||
ebuckets *estoreGetBuckets(estore *es, int eidx) {
|
||||
debugAssert(eidx < es->num_buckets);
|
||||
return &(es->ebArray[eidx]);
|
||||
}
|
||||
|
||||
/* Create a new expiration store
|
||||
* type - Pointer to a static EbucketsType defining the bucket behavior.
|
||||
* num_buckets_bits - The log2 of the number of buckets (0 for 1 bucket,
|
||||
* CLUSTER_SLOT_MASK_BITS for CLUSTER_SLOTS buckets)
|
||||
* flags - Configuration flags
|
||||
*/
|
||||
estore *estoreCreate(EbucketsType *type, int num_buckets_bits) {
|
||||
/* We can't support more than 2^16 buckets to be consistent with kvstore */
|
||||
assert(num_buckets_bits <= 16);
|
||||
|
||||
estore *es = zmalloc(sizeof(estore));
|
||||
/* Store the bucket type */
|
||||
es->bucket_type = type;
|
||||
|
||||
/* Calculate number of buckets based on num_buckets_bits */
|
||||
es->num_buckets_bits = num_buckets_bits;
|
||||
es->num_buckets = 1 << num_buckets_bits;
|
||||
es->buckets_sizes = num_buckets_bits > 1 ? fwTreeCreate(num_buckets_bits) : NULL;
|
||||
|
||||
/* Allocate the buckets array */
|
||||
es->ebArray = zcalloc(sizeof(ebuckets) * es->num_buckets);
|
||||
|
||||
/* Initialize all buckets */
|
||||
for (int i = 0; i < es->num_buckets; i++) {
|
||||
es->ebArray[i] = ebCreate();
|
||||
}
|
||||
|
||||
es->count = 0;
|
||||
return es;
|
||||
}
|
||||
|
||||
/* Empty an expiration store (clear all entries but keep the structure) */
|
||||
void estoreEmpty(estore *es) {
|
||||
if (es == NULL) return;
|
||||
|
||||
for (int i = 0; i < es->num_buckets; i++) {
|
||||
ebDestroy(&es->ebArray[i], es->bucket_type, NULL);
|
||||
es->ebArray[i] = ebCreate();
|
||||
}
|
||||
|
||||
es->count = 0;
|
||||
}
|
||||
|
||||
/* Check if the expiration store is empty */
|
||||
int estoreIsEmpty(estore *es) {
|
||||
return es->count == 0;
|
||||
}
|
||||
|
||||
/* Get the first non-empty bucket index in the estore */
|
||||
int estoreGetFirstNonEmptyBucket(estore *es) {
|
||||
if (es->num_buckets == 1 || estoreSize(es) == 0)
|
||||
return 0;
|
||||
return fwTreeFindFirstNonEmpty(es->buckets_sizes);
|
||||
}
|
||||
|
||||
/* Get the next non-empty bucket index after the given index */
|
||||
int estoreGetNextNonEmptyBucket(estore *es, int eidx) {
|
||||
if (es->num_buckets == 1) {
|
||||
assert(eidx == 0);
|
||||
return -1;
|
||||
}
|
||||
return fwTreeFindNextNonEmpty(es->buckets_sizes, eidx);
|
||||
}
|
||||
|
||||
/* Release an expiration store (free all memory) */
|
||||
void estoreRelease(estore *es) {
|
||||
if (es == NULL) return;
|
||||
|
||||
for (int i = 0; i < es->num_buckets; i++) {
|
||||
if (es->ebArray[i])
|
||||
ebDestroy(&es->ebArray[i], es->bucket_type, NULL);
|
||||
}
|
||||
fwTreeDestroy(es->buckets_sizes);
|
||||
zfree(es->ebArray);
|
||||
zfree(es);
|
||||
}
|
||||
|
||||
/* Perform active expiration on a specific bucket */
|
||||
void estoreActiveExpire(estore *es, int eidx, ExpireInfo *info) {
|
||||
ebuckets *eb = estoreGetBuckets(es, eidx);
|
||||
uint64_t before = ebGetTotalItems(*eb, es->bucket_type);
|
||||
ebExpire(eb, es->bucket_type, info);
|
||||
/* If items expired (or updated), update the BIT and estore count */
|
||||
if (info->itemsExpired) {
|
||||
uint64_t diff = before - ebGetTotalItems(*eb, es->bucket_type);
|
||||
fwTreeUpdate(es->buckets_sizes, eidx, (long long) diff);
|
||||
es->count -= diff;
|
||||
}
|
||||
}
|
||||
|
||||
/* Add an item to the estore with the given expiration time.
 *
 * The item must already have its ExpireMeta allocated; 'eidx' selects the
 * bucket (slot) the item belongs to. On successful insertion both the total
 * count and the per-bucket Fenwick tree are incremented by one. */
void estoreAdd(estore *es, int eidx, eItem item, uint64_t when) {
    debugAssert(es != NULL && item != NULL);

    /* Currently only used by hash field expiration. Verify the hash encoding
     * is one that embeds ExpireMeta (listpack-ex, or HT with the HFE dict type). */
    debugAssert((((robj *)item)->encoding == OBJ_ENCODING_LISTPACK_EX) ||
                ((((robj *)item)->encoding == OBJ_ENCODING_HT) &&
                 ((dict *) ((robj *)item)->ptr)->type == &mstrHashDictTypeWithHFE));

    ebuckets *bucket = estoreGetBuckets(es, eidx);
    /* ebAdd() returns 0 on success; only then account for the new item */
    if (ebAdd(bucket, es->bucket_type, item, when) == 0) {
        es->count++;
        fwTreeUpdate(es->buckets_sizes, eidx, 1);
    }
}
|
||||
|
||||
/* Remove an item from the expiration store.
 *
 * Returns the expiration time the item was registered with, or
 * EB_EXPIRE_TIME_INVALID when the item is not tracked — either because its
 * type/encoding cannot carry field expiration, or because its ExpireMeta is
 * already marked as trash (i.e. it was removed earlier). */
uint64_t estoreRemove(estore *es, int eidx, eItem item) {
    uint64_t expireTime;
    debugAssert(es != NULL && item != NULL);

    /* Currently only used by hash field expiration. Gracefully ignore items
     * that cannot hold field TTLs: non-hashes, plain-listpack hashes, and HT
     * hashes whose dict type lacks HFE metadata. */
    kvobj *kv = (kvobj *) item;
    if ( (kv->type != OBJ_HASH) ||
         (kv->encoding == OBJ_ENCODING_LISTPACK) ||
         ((kv->encoding == OBJ_ENCODING_HT) && (((dict *)kv->ptr)->type != &mstrHashDictTypeWithHFE)))
        return EB_EXPIRE_TIME_INVALID;

    /* If (ExpireMeta of kv) marked as trash, then it is already removed */
    if ((expireTime = ebGetExpireTime(es->bucket_type, item)) == EB_EXPIRE_TIME_INVALID)
        return EB_EXPIRE_TIME_INVALID;

    ebuckets *bucket = estoreGetBuckets(es, eidx);
    serverAssert(ebRemove(bucket, es->bucket_type, item)==1);
    /* Keep total count and the per-bucket Fenwick tree in sync */
    es->count--;
    fwTreeUpdate(es->buckets_sizes, eidx, -1);

    return expireTime;
}
|
||||
|
||||
/* Update an item's expiration time in the store, implemented as remove + re-add.
 *
 * The item must already be registered (its ExpireMeta must be valid).
 * NOTE(review): ebRemove()/ebAdd() are invoked inside serverAssert(), so this
 * function relies on serverAssert always evaluating its argument — confirm it
 * is never compiled out in any build configuration. */
void estoreUpdate(estore *es, int eidx, eItem item, uint64_t when) {
    debugAssert(es != NULL && item != NULL);

    /* currently only used by hash field expiration. Verify it has expireMeta
     * (listpack-ex encoding, or HT with the HFE-aware dict type). */
    debugAssert((((robj *)item)->encoding == OBJ_ENCODING_LISTPACK_EX) ||
                ((((robj *)item)->encoding == OBJ_ENCODING_HT) &&
                 ((dict *) ((robj *)item)->ptr)->type == &mstrHashDictTypeWithHFE));

    /* The item must currently be registered, otherwise remove below would fail */
    debugAssert(ebGetExpireTime(es->bucket_type, item) != EB_EXPIRE_TIME_INVALID);

    ebuckets *bucket = estoreGetBuckets(es, eidx);

    /* Remove the item from its current position */
    serverAssert(ebRemove(bucket, es->bucket_type, item) != 0);

    /* Add the item back with the new expiration time */
    serverAssert(ebAdd(bucket, es->bucket_type, item, when) == 0);

    /* Note that estore count remains unchanged (one out, one in), so neither
     * es->count nor the Fenwick tree needs adjusting. */
}
|
||||
|
||||
/* Get the total number of items in the expiration store */
|
||||
uint64_t estoreSize(estore *es) {
|
||||
return es->count;
|
||||
}
|
||||
|
||||
#ifdef REDIS_TEST
|
||||
#include <stdio.h>
|
||||
#include "testhelp.h"
|
||||
|
||||
#define TEST(name) printf("test — %s\n", name);
|
||||
|
||||
/* Test item structure for estore testing.
 * Layout starts with an embedded kvobj so the estore type/encoding checks
 * (which cast eItem to kvobj/robj) see a plausible hash object. */
typedef struct TestItem {
    kvobj kv;           /* mimic kvobj of type HASH to pass checks in estore */
    ExpireMeta mexpire; /* expiration metadata consumed by the ebuckets layer */
    int index;          /* arbitrary payload used to tell test items apart */
} TestItem;
|
||||
|
||||
/* Test EbucketsType for estore testing */
|
||||
ExpireMeta *getTestItemExpireMeta(const eItem item) {
|
||||
return &((TestItem *)item)->mexpire;
|
||||
}
|
||||
|
||||
/* EbucketsType callback for tests: free a test item when its bucket drops it. */
void deleteTestItemCb(eItem item, void *ctx) {
    zfree(item);
    UNUSED(ctx);
}
|
||||
|
||||
/* EbucketsType wired to the test callbacks above, used by the estore tests. */
EbucketsType testEbucketsType = {
    .getExpireMeta = getTestItemExpireMeta,
    .onDeleteItem = deleteTestItemCb,
    .itemsAddrAreOdd = 0, /* test items are ordinary (even-aligned) heap pointers */
};
|
||||
|
||||
/* Helper function to create a test item.
 *
 * The returned item mimics a hash kvobj (OBJ_HASH, LISTPACK_EX encoding) so
 * it passes the debug checks performed by estore. Ownership: the caller frees
 * it with zfree(), either directly or via the onDeleteItem callback. */
static TestItem *createTestItem(int index) {
    TestItem *item = zmalloc(sizeof(TestItem));
    item->index = index;
    item->mexpire.trash = 1; /* mark expire-meta as not yet registered */
    /* mimic kvobj of type HASH to pass checks in estore */
    item->kv.type = OBJ_HASH;
    item->kv.encoding = OBJ_ENCODING_LISTPACK_EX;
    return item;
}
|
||||
|
||||
/* Active-expire callback used by the tests: free the expired item.
 * NOTE(review): returns 0, implicitly assuming 0 is the "remove item"
 * value of the ExpireAction enum — confirm against ebuckets.h. */
static ExpireAction activeExpireTestCb(eItem item, void *ctx) {
    UNUSED(ctx);
    zfree(item);
    return 0;
}
|
||||
|
||||
/* Unit tests for the estore API (create/add/remove/update/iterate/expire).
 * Invoked by the REDIS_TEST harness; argc/argv/flags are unused. */
int estoreTest(int argc, char **argv, int flags) {
    UNUSED(argc);
    UNUSED(argv);
    UNUSED(flags);

    /* Initialize minimal server state needed for testing */
    server.hz = 10;
    server.unixtime = time(NULL);

    TEST("Create and destroy estore") {
        estore *es = estoreCreate(&testEbucketsType, 0);
        assert(es != NULL);
        assert(estoreIsEmpty(es));
        assert(estoreSize(es) == 0);
        estoreRelease(es);
    }

    TEST("Create estore with multiple buckets") {
        estore *es = estoreCreate(&testEbucketsType, 2); /* 4 buckets */
        assert(es != NULL);
        assert(estoreIsEmpty(es));
        assert(estoreSize(es) == 0);

        /* Test bucket access */
        ebuckets *bucket0 = estoreGetBuckets(es, 0);
        ebuckets *bucket1 = estoreGetBuckets(es, 1);
        ebuckets *bucket2 = estoreGetBuckets(es, 2);
        ebuckets *bucket3 = estoreGetBuckets(es, 3);

        assert(bucket0 != NULL);
        assert(bucket1 != NULL);
        assert(bucket2 != NULL);
        assert(bucket3 != NULL);

        /* All buckets should be different */
        assert(bucket0 != bucket1);
        assert(bucket0 != bucket2);
        assert(bucket1 != bucket3);

        estoreRelease(es);
    }

    TEST("Add and remove items") {
        estore *es = estoreCreate(&testEbucketsType, 1); /* 2 buckets */

        /* Test initial state */
        assert(estoreSize(es) == 0);
        assert(estoreIsEmpty(es));

        /* Create test items */
        TestItem *item1 = createTestItem(1);
        TestItem *item2 = createTestItem(2);
        TestItem *item3 = createTestItem(3);

        /* Add items to different buckets */
        estoreAdd(es, 0, item1, 1000);
        assert(estoreSize(es) == 1);
        assert(!estoreIsEmpty(es));

        estoreAdd(es, 1, item2, 2000);
        assert(estoreSize(es) == 2);

        estoreAdd(es, 0, item3, 3000); /* Add another item to bucket 0 */
        assert(estoreSize(es) == 3);

        /* Verify expiration times are set correctly */
        assert(ebGetMetaExpTime(&item1->mexpire) == 1000);
        assert(ebGetMetaExpTime(&item2->mexpire) == 2000);
        assert(ebGetMetaExpTime(&item3->mexpire) == 3000);

        /* Remove items. estoreRemove() returns the item's expiration time
         * and hands ownership back to us, so each item is zfree'd here. */
        uint64_t expireTime1 = estoreRemove(es, 0, item1);
        assert(expireTime1 == 1000);
        assert(estoreSize(es) == 2);
        zfree(item1);

        uint64_t expireTime2 = estoreRemove(es, 1, item2);
        assert(expireTime2 == 2000);
        assert(estoreSize(es) == 1);
        zfree(item2);

        uint64_t expireTime3 = estoreRemove(es, 0, item3);
        assert(expireTime3 == 3000);
        assert(estoreSize(es) == 0);
        assert(estoreIsEmpty(es));
        zfree(item3);

        /* Clean up - items are freed by the onDeleteItem callback */
        estoreRelease(es);
    }

    TEST("Update item expiration") {
        estore *es = estoreCreate(&testEbucketsType, 0); /* 1 bucket */

        /* Create and add a test item */
        TestItem *item = createTestItem(1);
        estoreAdd(es, 0, item, 1000);
        assert(estoreSize(es) == 1);

        /* Verify initial expiration time */
        assert(ebGetMetaExpTime(&item->mexpire) == 1000);

        /* Update expiration time */
        estoreUpdate(es, 0, item, 2000);
        assert(estoreSize(es) == 1); /* Size should remain the same */
        assert(ebGetMetaExpTime(&item->mexpire) == 2000);

        /* Update again to a different time */
        estoreUpdate(es, 0, item, 500);
        assert(estoreSize(es) == 1);
        assert(ebGetMetaExpTime(&item->mexpire) == 500);

        /* Clean up */
        estoreRemove(es, 0, item);
        assert(estoreSize(es) == 0);
        zfree(item);

        estoreRelease(es);
    }

    TEST("Non-empty bucket iteration") {
        estore *es = estoreCreate(&testEbucketsType, 2); /* 4 buckets */

        /* Test bucket iteration on empty store */
        assert(estoreGetFirstNonEmptyBucket(es) == 0); /* Returns 0 when empty */
        assert(estoreGetNextNonEmptyBucket(es, 0) == -1); /* No next bucket when empty */

        /* Create test items and add to specific buckets */
        TestItem *item1 = createTestItem(1);
        TestItem *item2 = createTestItem(2);
        TestItem *item3 = createTestItem(3);

        /* Add to bucket 1 and 3 (skip 0 and 2) */
        estoreAdd(es, 1, item1, 1000);
        estoreAdd(es, 3, item2, 2000);
        estoreAdd(es, 3, item3, 3000); /* Add another item to bucket 3 */

        assert(estoreSize(es) == 3);

        /* Test iteration through non-empty buckets */
        int firstBucket = estoreGetFirstNonEmptyBucket(es);
        assert(firstBucket == 1);

        int nextBucket = estoreGetNextNonEmptyBucket(es, firstBucket);
        assert(nextBucket == 3);

        int lastBucket = estoreGetNextNonEmptyBucket(es, nextBucket);
        assert(lastBucket == -1); /* No more non-empty buckets */

        /* Test iteration from different starting points */
        assert(estoreGetNextNonEmptyBucket(es, 0) == 1);
        assert(estoreGetNextNonEmptyBucket(es, 2) == 3);

        /* Clean up */
        estoreRemove(es, 1, item1);
        zfree(item1);
        estoreRemove(es, 3, item2);
        zfree(item2);
        estoreRemove(es, 3, item3);
        zfree(item3);
        assert(estoreSize(es) == 0);

        estoreRelease(es);
    }

    TEST("Empty estore") {
        estore *es = estoreCreate(&testEbucketsType, 1); /* 2 buckets */

        /* Add some items */
        TestItem *item1 = createTestItem(1);
        TestItem *item2 = createTestItem(2);
        TestItem *item3 = createTestItem(3);

        estoreAdd(es, 0, item1, 1000);
        estoreAdd(es, 1, item2, 2000);
        estoreAdd(es, 0, item3, 3000);
        assert(estoreSize(es) == 3);
        assert(!estoreIsEmpty(es));

        /* Empty the store - this should call onDeleteItem for all items */
        estoreEmpty(es);
        assert(estoreSize(es) == 0);
        assert(estoreIsEmpty(es));

        /* Verify buckets are empty */
        assert(estoreGetFirstNonEmptyBucket(es) == 0); /* Returns 0 when empty */
        assert(estoreGetNextNonEmptyBucket(es, 0) == -1);

        estoreRelease(es);
    }

    TEST("Active expiration") {
        estore *es = estoreCreate(&testEbucketsType, 14); /* 2^14 buckets */

        /* Create items with different expiration times */
        TestItem *expiredItem1 = createTestItem(1);
        TestItem *expiredItem2 = createTestItem(2);
        TestItem *expiredItem3 = createTestItem(3);
        TestItem *futureItem = createTestItem(4);

        estoreAdd(es, 0, expiredItem1, 1023);
        estoreAdd(es, 0, expiredItem2, 2047);
        estoreAdd(es, 1, expiredItem3, 127);
        estoreAdd(es, 0, futureItem, 4095);
        assert(estoreSize(es) == 4);

        /* Perform active expiration. activeExpireTestCb frees each expired
         * item, so no explicit zfree is needed for them below. */
        ExpireInfo expireInfo = {
            .maxToExpire = UINT64_MAX,
            .onExpireItem = activeExpireTestCb,
            .ctx = NULL,
            .now = 2048, /* Current time in milliseconds */
            .itemsExpired = 0
        };

        estoreActiveExpire(es, 0, &expireInfo);

        /* The expired items should be removed, future item should remain */
        assert(expireInfo.itemsExpired == 2);
        assert(estoreSize(es) == 2);

        estoreActiveExpire(es, 1, &expireInfo);
        assert(expireInfo.itemsExpired == 1);
        assert(estoreSize(es) == 1);

        /* Clean up remaining item */
        estoreRemove(es, 0, futureItem);
        zfree(futureItem);
        assert(estoreSize(es) == 0);

        estoreRelease(es);
    }

    return 0;
}
|
||||
#endif
|
||||
89
src/estore.h
Normal file
89
src/estore.h
Normal file
@@ -0,0 +1,89 @@
|
||||
/*
|
||||
* Copyright (c) 2011-Present, Redis Ltd.
|
||||
* All rights reserved.
|
||||
*
|
||||
* Licensed under your choice of (a) the Redis Source Available License 2.0
|
||||
* (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the
|
||||
* GNU Affero General Public License v3 (AGPLv3).
|
||||
*
|
||||
* estore.h -- Expiration Store implementation
|
||||
*
|
||||
* ESTORE (Expiration Store)
|
||||
* =========================
|
||||
*
|
||||
* Index-based expiration store implementation. Similar to kvstore, but built
|
||||
* on top of ebuckets instead of dict. Items stored in estore must embed an
|
||||
* ExpireMeta, enabling efficient active-expiration.
|
||||
*
|
||||
* Estore is currently used to manage "subexpiry" only for hash objects with
|
||||
* field-level expiration (HFE). Each hash with HFE is registered in estore
|
||||
* with the earliest expiration time among its fields.
|
||||
*
|
||||
* USAGE IN REDIS
|
||||
* ==============
|
||||
* This implementation is used to efficiently track hash objects that have
|
||||
* field-level expiration (HFE):
|
||||
* - Each hash with HFE is registered with its earliest field expiration time
|
||||
* - Enables efficient active expiration of hash fields
|
||||
* - Uses Fenwick tree for efficient iteration through non-empty buckets
|
||||
* - Supports cluster mode with per-slot buckets
|
||||
*
|
||||
* IMPLEMENTATION NOTES
|
||||
* ====================
|
||||
* - Built on top of ebuckets data structure for expiration management
|
||||
* - Uses Fenwick tree to track cumulative item counts across buckets
|
||||
* - Supports both single bucket (standalone) and multiple buckets (cluster) modes
|
||||
* - All operations have O(log n) time complexity for bucket selection
|
||||
*
|
||||
* STRUCTURE
|
||||
* =========
|
||||
* - ebArray: Array of ebuckets (one per slot in cluster mode, or just one)
|
||||
* - buckets_sizes: Fenwick tree tracking cumulative counts for efficient iteration
|
||||
* - bucket_type: EbucketsType defining callbacks for the stored items
|
||||
*/
|
||||
|
||||
#ifndef __ESTORE_H
|
||||
#define __ESTORE_H
|
||||
|
||||
#include "ebuckets.h"
|
||||
#include "fwtree.h"
|
||||
|
||||
/* Forward declaration of the estore structure */
|
||||
typedef struct _estore estore;
|
||||
|
||||
/* Estore API */
|
||||
|
||||
estore *estoreCreate(EbucketsType *type, int num_buckets_bits);
|
||||
|
||||
void estoreEmpty(estore *es);
|
||||
|
||||
int estoreIsEmpty(estore *es);
|
||||
|
||||
void estoreRelease(estore *es);
|
||||
|
||||
void estoreActiveExpire(estore *es, int eidx, ExpireInfo *info);
|
||||
|
||||
uint64_t estoreRemove(estore *es, int eidx, eItem item);
|
||||
|
||||
void estoreAdd(estore *es, int eidx, eItem item, uint64_t when);
|
||||
|
||||
void estoreUpdate(estore *es, int eidx, eItem item, uint64_t when);
|
||||
|
||||
uint64_t estoreSize(estore *es);
|
||||
|
||||
ebuckets *estoreGetBuckets(estore *es, int eidx);
|
||||
|
||||
int estoreGetFirstNonEmptyBucket(estore *es);
|
||||
|
||||
int estoreGetNextNonEmptyBucket(estore *es, int eidx);
|
||||
|
||||
/* Hash-specific function to get ExpireMeta from a hash kvobj.
|
||||
* Once we shall have another data-type with subexpiry, we should refactor
|
||||
* ExpireMeta to optionally reside as part of kvobj struct */
|
||||
ExpireMeta *hashGetExpireMeta(const eItem kvobjHash);
|
||||
|
||||
#ifdef REDIS_TEST
|
||||
int estoreTest(int argc, char *argv[], int flags);
|
||||
#endif
|
||||
|
||||
#endif /* __ESTORE_H */
|
||||
99
src/expire.c
99
src/expire.c
@@ -11,6 +11,7 @@
|
||||
*/
|
||||
|
||||
#include "server.h"
|
||||
#include "redisassert.h"
|
||||
|
||||
/*-----------------------------------------------------------------------------
|
||||
* Incremental collection of expired keys.
|
||||
@@ -136,14 +137,92 @@ static inline int isExpiryDictValidForSamplingCb(dict *d) {
|
||||
return C_OK;
|
||||
}
|
||||
|
||||
/* SubexpireCtx passed to activeSubexpiresCb() */
typedef struct SubexpireCtx {
    uint32_t fieldsToExpireQuota; /* remaining number of fields allowed to expire in this run */
    redisDb *db;                  /* database being processed */
    int slot;                     /* slot (subexpires bucket index) being processed */
} SubexpireCtx;
|
||||
|
||||
/*
 * Active sub-expiration callback
 *
 * Called by activeSubexpires() for each key registered in the subexpires DB
 * whose "elements" have an expiration time less than or equal to the current
 * time.
 *
 * This callback performs the following actions for each hash:
 * - Delete expired fields by calling ebExpire(hash)
 * - If afterward there are future fields to expire, it will update the hash in
 *   HFE DB with the next hash-field minimum expiration time by returning
 *   ACT_UPDATE_EXP_ITEM.
 * - If the hash has no more fields to expire, it is removed from the HFE DB
 *   by returning ACT_REMOVE_EXP_ITEM.
 * - If hash has no more fields afterward, it will remove the hash from keyspace.
 */
static ExpireAction activeSubexpiresCb(eItem item, void *ctx) {
    SubexpireCtx *subexCtx = ctx;

    /* If no more quota left for this callback, stop */
    if (subexCtx->fieldsToExpireQuota == 0)
        return ACT_STOP_ACTIVE_EXP;

    kvobj *kv = (kvobj *) item;

    /* currently we only support hash type sub-expire */
    assert(kv->type == OBJ_HASH);
    uint64_t nextExpTime = hashTypeActiveExpire(subexCtx->db,kv,
                                                &subexCtx->fieldsToExpireQuota, 0);

    /* If hash has no more fields to expire or got deleted, indicate
     * to remove it from HFE DB to the caller ebExpire() */
    if (nextExpTime == EB_EXPIRE_TIME_INVALID || nextExpTime == 0) {
        return ACT_REMOVE_EXP_ITEM;
    } else {
        /* Hash has more fields to expire. Update next expiration time of the hash
         * and indicate to add it back to global HFE DS */
        ebSetMetaExpTime(hashGetExpireMeta(item), nextExpTime);
        return ACT_UPDATE_EXP_ITEM;
    }
}
|
||||
|
||||
/* DB active expire and update hashes with time-expiration on fields.
 *
 * The callback function activeSubexpiresCb() is invoked for each hash registered
 * in the subexpires DB with an expiration-time less than or equal to the
 * current time. This callback performs the following actions for each hash:
 * - If the hash has one or more fields to expire, it will delete those fields.
 * - If there are more fields to expire, it will update the hash with the next
 *   expiration time in subexpires DB.
 * - If the hash has no more fields to expire, it is removed from the subexpires DB.
 * - If the hash has no more fields, it is removed from the main DB.
 *
 * Returns number of fields active-expired.
 */
uint64_t activeSubexpires(redisDb *db, int slot, uint32_t maxFieldsToExpire) {
    SubexpireCtx ctx = { .db = db, .fieldsToExpireQuota = maxFieldsToExpire, .slot = slot };
    ExpireInfo info = {
        .maxToExpire = UINT64_MAX, /* Only maxFieldsToExpire play a role */
        .onExpireItem = activeSubexpiresCb,
        .ctx = &ctx,
        .now = commandTimeSnapshot(),
        .itemsExpired = 0};

    estoreActiveExpire(db->subexpires, slot, &info);

    /* Return number of fields active-expired */
    return maxFieldsToExpire - ctx.fieldsToExpireQuota;
}
|
||||
|
||||
/* Active expiration Cycle for hash-fields.
|
||||
*
|
||||
* Note that releasing fields is expected to be more predictable and rewarding
|
||||
* than releasing keys because it is stored in `ebuckets` DS which optimized for
|
||||
* active expiration and in addition the deletion of fields is simple to handle. */
|
||||
static inline void activeExpireHashFieldCycle(int type) {
|
||||
static inline void activeSubexpiresCycle(int type) {
|
||||
/* Remember current db across calls */
|
||||
static unsigned int currentDb = 0;
|
||||
static int currentSlot = -1;
|
||||
|
||||
/* Tracks the count of fields actively expired for the current database.
|
||||
* This count continues as long as it fails to actively expire all expired
|
||||
@@ -156,11 +235,13 @@ static inline void activeExpireHashFieldCycle(int type) {
|
||||
redisDb *db = server.db + currentDb;
|
||||
|
||||
/* If db is empty, move to next db and return */
|
||||
if (ebIsEmpty(db->hexpires)) {
|
||||
if (estoreIsEmpty(db->subexpires)) {
|
||||
activeExpirySequence = 0;
|
||||
currentDb = (currentDb + 1) % server.dbnum;
|
||||
return;
|
||||
}
|
||||
if (currentSlot == -1)
|
||||
currentSlot = estoreGetFirstNonEmptyBucket(db->subexpires);
|
||||
|
||||
/* Maximum number of fields to actively expire on a single call */
|
||||
uint32_t maxToExpire = HFE_DB_BASE_ACTIVE_EXPIRE_FIELDS_PER_SEC / server.hz;
|
||||
@@ -174,13 +255,16 @@ static inline void activeExpireHashFieldCycle(int type) {
|
||||
maxToExpire *= (factor<32) ? factor : 32;
|
||||
}
|
||||
|
||||
if (hashTypeDbActiveExpire(db, maxToExpire) == maxToExpire) {
|
||||
if (activeSubexpires(db, currentSlot, maxToExpire) == maxToExpire) {
|
||||
/* active-expire reached maxToExpire limit */
|
||||
activeExpirySequence += maxToExpire;
|
||||
} else {
|
||||
/* Managed to active-expire all expired fields of currentDb */
|
||||
activeExpirySequence = 0;
|
||||
currentDb = (currentDb + 1) % server.dbnum;
|
||||
/* Move to next non-empty subexpires slot */
|
||||
currentSlot = estoreGetNextNonEmptyBucket(db->subexpires, currentSlot);
|
||||
if (currentSlot == -1)
|
||||
currentDb = (currentDb + 1) % server.dbnum;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -282,10 +366,9 @@ void activeExpireCycle(int type) {
|
||||
* distribute the time evenly across DBs. */
|
||||
current_db++;
|
||||
|
||||
/* Interleaving hash-field expiration with key expiration. Better
|
||||
* call it before handling expired keys because HFE DS is optimized for
|
||||
* active expiration */
|
||||
activeExpireHashFieldCycle(type);
|
||||
/* Interleaving sub-expiration with key expiration. Better call it before
|
||||
* handling expired keys because ebuckets is optimized for active expiration */
|
||||
activeSubexpiresCycle(type);
|
||||
|
||||
if (kvstoreSize(db->expires))
|
||||
dbs_performed++;
|
||||
|
||||
238
src/fwtree.c
Normal file
238
src/fwtree.c
Normal file
@@ -0,0 +1,238 @@
|
||||
/*
|
||||
* fwtree.c -- FENWICK TREE (Binary Indexed Tree)
|
||||
*
|
||||
* Copyright (c) 2011-Present, Redis Ltd.
|
||||
* All rights reserved.
|
||||
*
|
||||
* Licensed under your choice of (a) the Redis Source Available License 2.0
|
||||
* (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the
|
||||
* GNU Affero General Public License v3 (AGPLv3).
|
||||
*/
|
||||
|
||||
#include "server.h"
|
||||
#include "fwtree.h"
|
||||
#include "zmalloc.h"
|
||||
#include "redisassert.h"
|
||||
#include <string.h>
|
||||
|
||||
/* Fenwick (binary indexed) tree over 2^size_bits counters. */
struct _fenwickTree {
    unsigned long long *tree; /* 1-based BIT array; allocated with size + 1 slots */
    int size_bits;            /* log2 of the number of elements */
    int size;                 /* number of elements: 1 << size_bits */
    uint64_t total;           /* sum of all elements, kept in sync by fwTreeUpdate() */
};
|
||||
|
||||
/* Create a new Fenwick tree with 2^sizeBits elements (all initialized to 0) */
|
||||
fenwickTree *fwTreeCreate(int sizeBits) {
|
||||
|
||||
fenwickTree *ft = zmalloc(sizeof(fenwickTree));
|
||||
ft->size_bits = sizeBits;
|
||||
ft->size = 1 << sizeBits;
|
||||
/* Fenwick tree is 1-based, so we need size + 1 elements */
|
||||
ft->tree = zcalloc(sizeof(unsigned long long) * (ft->size + 1));
|
||||
ft->total = 0;
|
||||
return ft;
|
||||
}
|
||||
|
||||
/* Release a Fenwick tree and its backing array. NULL-safe. */
void fwTreeDestroy(fenwickTree *ft) {
    if (ft == NULL) return;
    zfree(ft->tree);
    zfree(ft);
}
|
||||
|
||||
/* Query cumulative sum from index 0 to idx (inclusive, 0-based) */
|
||||
unsigned long long fwTreePrefixSum(fenwickTree *ft, int idx) {
|
||||
if (!ft || idx < 0) return 0;
|
||||
if (idx >= ft->size) idx = ft->size - 1;
|
||||
|
||||
/* Convert to 1-based indexing */
|
||||
idx++;
|
||||
|
||||
unsigned long long sum = 0;
|
||||
while (idx > 0) {
|
||||
sum += ft->tree[idx];
|
||||
idx -= (idx & -idx);
|
||||
}
|
||||
return sum;
|
||||
}
|
||||
|
||||
/* Update the tree by adding delta to the element at idx (0-based) */
|
||||
void fwTreeUpdate(fenwickTree *ft, int idx, long long delta) {
|
||||
if (!ft || idx < 0 || idx >= ft->size) return;
|
||||
|
||||
/* Convert to 1-based indexing */
|
||||
idx++;
|
||||
ft->total += delta;
|
||||
|
||||
while (idx <= ft->size) {
|
||||
if (delta < 0) {
|
||||
assert(ft->tree[idx] >= (unsigned long long)(-delta));
|
||||
}
|
||||
ft->tree[idx] += delta;
|
||||
idx += (idx & -idx);
|
||||
}
|
||||
debugAssert(ft->total == fwTreePrefixSum(ft, ft->size - 1));
|
||||
}
|
||||
|
||||
/* Find the 0-based index where the cumulative sum first reaches or exceeds target.
|
||||
* target should be in range [1..total].
|
||||
* Returns the 0-based index, or 0 if target <= 0 or tree is empty.
|
||||
*/
|
||||
int fwTreeFindIndex(fenwickTree *ft, unsigned long long target) {
|
||||
debugAssert(ft);
|
||||
|
||||
if (target <= 0) return 0;
|
||||
|
||||
int result = 0, bit_mask = 1 << ft->size_bits;
|
||||
for (int i = bit_mask; i != 0; i >>= 1) {
|
||||
int current = result + i;
|
||||
/* When the target index is greater than 'current' node value the we will update
|
||||
* the target and search in the 'current' node tree. */
|
||||
if (target > ft->tree[current]) {
|
||||
target -= ft->tree[current];
|
||||
result = current;
|
||||
}
|
||||
}
|
||||
/* Adjust the result to get the correct index:
|
||||
* 1. result += 1;
|
||||
* After the calculations, the index of target in the tree should be the next one,
|
||||
* so we should add 1.
|
||||
* 2. result -= 1;
|
||||
* Unlike BIT (tree is 1-based), the API uses 0-based indexing, so we need to subtract 1.
|
||||
* As the addition and subtraction cancel each other out, we can simply return the result. */
|
||||
return result;
|
||||
}
|
||||
|
||||
/* Find the first non-empty index (equivalent to fwTreeFindIndex(ft, 1)) */
|
||||
int fwTreeFindFirstNonEmpty(fenwickTree *ft) {
|
||||
debugAssert(ft);
|
||||
return fwTreeFindIndex(ft, 1);
|
||||
}
|
||||
|
||||
/* Find the next non-empty index after idx (0-based).
|
||||
* Returns the 0-based index of the next non-empty element, or -1 if no such element exists.
|
||||
* If idx is -1, finds the first non-empty index.
|
||||
* Time complexity: O(log n)
|
||||
*/
|
||||
int fwTreeFindNextNonEmpty(fenwickTree *ft, int idx) {
|
||||
if (!ft || idx < 0 || idx >= ft->size) return -1;
|
||||
/* Get cumulative sum up to current index */
|
||||
unsigned long long next_sum = fwTreePrefixSum(ft, idx) + 1;
|
||||
/* Find the index that contains the next key (curr_sum + 1) */
|
||||
return (next_sum <= ft->total) ? fwTreeFindIndex(ft, next_sum) : -1;
|
||||
}
|
||||
|
||||
/* Clear all values in the tree */
|
||||
void fwTreeClear(fenwickTree *ft) {
|
||||
debugAssert(ft);
|
||||
memset(ft->tree, 0, sizeof(unsigned long long) * (ft->size + 1));
|
||||
ft->total = 0;
|
||||
}
|
||||
|
||||
#ifdef REDIS_TEST
|
||||
#include <stdio.h>
|
||||
|
||||
#define TEST(name) printf("%s\n", name);
|
||||
|
||||
int fwtreeTest(int argc, char *argv[], int flags) {
|
||||
UNUSED(argc);
|
||||
UNUSED(argv);
|
||||
UNUSED(flags);
|
||||
/* Test basic operations */
|
||||
int sizeBits = 3; /*size = 8*/
|
||||
fenwickTree *ft = fwTreeCreate(sizeBits);
|
||||
assert(ft != NULL);
|
||||
|
||||
TEST("estore - Test updates and queries") {
|
||||
fwTreeUpdate(ft, 0, 5); /* index 0 += 5 */
|
||||
fwTreeUpdate(ft, 2, 3); /* index 2 += 3 */
|
||||
fwTreeUpdate(ft, 4, 7); /* index 4 += 7 */
|
||||
fwTreeUpdate(ft, 6, 2); /* index 6 += 2 */
|
||||
}
|
||||
|
||||
TEST("estore - Test cumulative queries") {
|
||||
assert(fwTreePrefixSum(ft, 0) == 5); /* sum[0..0] = 5 */
|
||||
assert(fwTreePrefixSum(ft, 1) == 5); /* sum[0..1] = 5 */
|
||||
assert(fwTreePrefixSum(ft, 2) == 8); /* sum[0..2] = 5+3 = 8 */
|
||||
assert(fwTreePrefixSum(ft, 3) == 8); /* sum[0..3] = 8 */
|
||||
assert(fwTreePrefixSum(ft, 4) == 15); /* sum[0..4] = 5+3+7 = 15 */
|
||||
assert(fwTreePrefixSum(ft, 5) == 15); /* sum[0..5] = 15 */
|
||||
assert(fwTreePrefixSum(ft, 6) == 17); /* sum[0..6] = 5+3+7+2 = 17 */
|
||||
assert(fwTreePrefixSum(ft, 7) == 17); /* sum[0..7] = 17 */
|
||||
}
|
||||
|
||||
|
||||
|
||||
TEST("estore - Test find_index functionality") {
|
||||
assert(fwTreeFindIndex(ft, 1) == 0); /* target 1 -> index 0 */
|
||||
assert(fwTreeFindIndex(ft, 5) == 0); /* target 5 -> index 0 */
|
||||
assert(fwTreeFindIndex(ft, 6) == 2); /* target 6 -> index 2 */
|
||||
assert(fwTreeFindIndex(ft, 8) == 2); /* target 8 -> index 2 */
|
||||
assert(fwTreeFindIndex(ft, 9) == 4); /* target 9 -> index 4 */
|
||||
assert(fwTreeFindIndex(ft, 15) == 4); /* target 15 -> index 4 */
|
||||
assert(fwTreeFindIndex(ft, 16) == 6); /* target 16 -> index 6 */
|
||||
assert(fwTreeFindIndex(ft, 17) == 6); /* target 17 -> index 6 */
|
||||
}
|
||||
|
||||
TEST("estore - Test fwTreeFindNextNonEmpty functionality") {
|
||||
/* Current state: indices 0, 2, 4, 6 have values 5, 3, 7, 2 respectively */
|
||||
assert(fwTreeFindNextNonEmpty(ft, -1) == -1); /* Invalid index */
|
||||
assert(fwTreeFindNextNonEmpty(ft, 0) == 2); /* Next after 0 is index 2 */
|
||||
assert(fwTreeFindNextNonEmpty(ft, 1) == 2); /* Next after 1 is index 2 */
|
||||
assert(fwTreeFindNextNonEmpty(ft, 2) == 4); /* Next after 2 is index 4 */
|
||||
assert(fwTreeFindNextNonEmpty(ft, 3) == 4); /* Next after 3 is index 4 */
|
||||
assert(fwTreeFindNextNonEmpty(ft, 4) == 6); /* Next after 4 is index 6 */
|
||||
assert(fwTreeFindNextNonEmpty(ft, 5) == 6); /* Next after 5 is index 6 */
|
||||
assert(fwTreeFindNextNonEmpty(ft, 6) == -1); /* No next after 6 */
|
||||
assert(fwTreeFindNextNonEmpty(ft, 7) == -1); /* No next after 7 */
|
||||
}
|
||||
|
||||
TEST("estore - Test negative updates") {
|
||||
fwTreeUpdate(ft, 2, -1); /* index 2 -= 1 */
|
||||
assert(fwTreePrefixSum(ft, 2) == 7); /* sum[0..2] = 5+2 = 7 */
|
||||
assert(fwTreePrefixSum(ft, 7) == 16); /* total = 16 */
|
||||
}
|
||||
|
||||
TEST("estore - Test making an index empty") {
|
||||
fwTreeUpdate(ft, 2, -2); /* index 2 -= 2, should become empty */
|
||||
assert(fwTreePrefixSum(ft, 2) == 5); /* sum[0..2] = 5+0 = 5 */
|
||||
}
|
||||
|
||||
TEST("estore - Test fwTreeFindNextNonEmpty after making index 2 empty") {
|
||||
/* Current state: indices 0, 4, 6 have values 5, 7, 2 respectively (index 2 is now empty) */
|
||||
assert(fwTreeFindNextNonEmpty(ft, 0) == 4); /* Next after 0 is now index 4 (skipping empty 2) */
|
||||
assert(fwTreeFindNextNonEmpty(ft, 1) == 4); /* Next after 1 is index 4 */
|
||||
assert(fwTreeFindNextNonEmpty(ft, 2) == 4); /* Next after 2 is index 4 */
|
||||
assert(fwTreeFindNextNonEmpty(ft, 3) == 4); /* Next after 3 is index 4 */
|
||||
}
|
||||
|
||||
TEST("estore - Operations on empty tree") {
|
||||
fwTreeClear(ft);
|
||||
assert(fwTreePrefixSum(ft, 7) == 0);
|
||||
|
||||
/* Test fwTreeFindNextNonEmpty on empty tree */
|
||||
assert(fwTreeFindNextNonEmpty(ft, -1) == -1); /* Empty tree */
|
||||
assert(fwTreeFindNextNonEmpty(ft, 0) == -1); /* Empty tree */
|
||||
}
|
||||
|
||||
fwTreeDestroy(ft);
|
||||
|
||||
TEST("estore - misc") {
|
||||
ft = fwTreeCreate(0);
|
||||
|
||||
fwTreeUpdate(ft, 0, 10); /* add 10 to index 0 */
|
||||
assert(fwTreePrefixSum(ft, 0) == 10);
|
||||
|
||||
assert(fwTreeFindIndex(ft, 5) == 0);
|
||||
|
||||
/* Test fwTreeFindNextNonEmpty on single element tree */
|
||||
assert(fwTreeFindNextNonEmpty(ft, -1) == -1); /* Invalid index */
|
||||
assert(fwTreeFindNextNonEmpty(ft, 0) == -1); /* No next after 0 in single element tree */
|
||||
|
||||
fwTreeDestroy(ft);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
#endif
|
||||
71
src/fwtree.h
Normal file
71
src/fwtree.h
Normal file
@@ -0,0 +1,71 @@
|
||||
/*
|
||||
* Copyright (c) 2011-Present, Redis Ltd.
|
||||
* All rights reserved.
|
||||
*
|
||||
* Licensed under your choice of (a) the Redis Source Available License 2.0
|
||||
* (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the
|
||||
* GNU Affero General Public License v3 (AGPLv3).
|
||||
*
|
||||
*
|
||||
* FENWICK TREE (Binary Indexed Tree)
|
||||
* ----------------------------------
|
||||
* A Fenwick tree is a data structure that efficiently supports:
|
||||
* - Point updates: Add/subtract values at specific indices in O(log n) time
|
||||
* - Prefix sum queries: Calculate cumulative sums from index 0 to any index in O(log n) time
|
||||
* - Range queries: Calculate sums over any range [i,j] in O(log n) time
|
||||
* - Space complexity: O(n)
|
||||
*
|
||||
* USAGE IN REDIS
|
||||
* --------------
|
||||
* This implementation is used by KVSTORE and ESTORE to efficiently track:
|
||||
* - Cumulative key counts across dictionary slots (KVSTORE)
|
||||
* - Cumulative item counts across expiration buckets (ESTORE)
|
||||
*
|
||||
* This enables efficient operations like:
|
||||
* - Finding which dictionary/bucket contains the Nth key/item
|
||||
* - Iterating through non-empty dictionaries/buckets
|
||||
* - Load balancing and random key selection
|
||||
*
|
||||
* IMPLEMENTATION NOTES
|
||||
* -------------------
|
||||
* - The tree uses 1-based indexing internally for mathematical convenience
|
||||
* - The public API uses 0-based indexing for consistency with Redis codebase
|
||||
* - Tree size must be a power of 2 (specified as sizeBits where size = 2^sizeBits)
|
||||
* - All operations have O(log n) time complexity where n is the tree size
|
||||
*
|
||||
* REFERENCES
|
||||
* ----------
|
||||
* For more details on Fenwick trees: https://en.wikipedia.org/wiki/Fenwick_tree
|
||||
*/
|
||||
|
||||
#ifndef __FWTREE_H
|
||||
#define __FWTREE_H
|
||||
|
||||
#include <stdint.h>
|
||||
|
||||
/* Forward declaration of the fenwick tree structure */
|
||||
typedef struct _fenwickTree fenwickTree;
|
||||
|
||||
/* Fenwick Tree API */
|
||||
|
||||
fenwickTree *fwTreeCreate(int sizeBits);
|
||||
|
||||
void fwTreeDestroy(fenwickTree *ft);
|
||||
|
||||
unsigned long long fwTreePrefixSum(fenwickTree *ft, int idx);
|
||||
|
||||
void fwTreeUpdate(fenwickTree *ft, int idx, long long delta);
|
||||
|
||||
int fwTreeFindIndex(fenwickTree *ft, unsigned long long target);
|
||||
|
||||
int fwTreeFindFirstNonEmpty(fenwickTree *ft);
|
||||
|
||||
int fwTreeFindNextNonEmpty(fenwickTree *ft, int idx);
|
||||
|
||||
void fwTreeClear(fenwickTree *ft);
|
||||
|
||||
#ifdef REDIS_TEST
|
||||
int fwtreeTest(int argc, char *argv[], int flags);
|
||||
#endif
|
||||
|
||||
#endif /* __FWTREE_H */
|
||||
@@ -1,14 +1,4 @@
|
||||
/*
|
||||
* Index-based KV store implementation
|
||||
* This file implements a KV store comprised of an array of dicts (see dict.c)
|
||||
* The purpose of this KV store is to have easy access to all keys that belong
|
||||
* in the same dict (i.e. are in the same dict-index)
|
||||
*
|
||||
* For example, when Redis is running in cluster mode, we use kvstore to save
|
||||
* all keys that map to the same hash-slot in a separate dict within the kvstore
|
||||
* struct.
|
||||
* This enables us to easily access all keys that map to a specific hash-slot.
|
||||
*
|
||||
* Copyright (c) 2011-Present, Redis Ltd. and contributors.
|
||||
* All rights reserved.
|
||||
*
|
||||
@@ -29,8 +19,11 @@
|
||||
|
||||
#include "zmalloc.h"
|
||||
#include "kvstore.h"
|
||||
#include "fwtree.h"
|
||||
#include "redisassert.h"
|
||||
#include "monotonic.h"
|
||||
#include "server.h"
|
||||
|
||||
|
||||
#define UNUSED(V) ((void) V)
|
||||
|
||||
@@ -46,7 +39,7 @@ struct _kvstore {
|
||||
int non_empty_dicts; /* The number of non-empty dicts. */
|
||||
unsigned long long key_count; /* Total number of keys in this kvstore. */
|
||||
unsigned long long bucket_count; /* Total number of buckets in this kvstore across dictionaries. */
|
||||
unsigned long long *dict_size_index; /* Binary indexed tree (BIT) that describes cumulative key frequencies up until given dict-index. */
|
||||
fenwickTree *dict_sizes; /* Binary indexed tree (BIT) that describes cumulative key frequencies up until given dict-index. */
|
||||
size_t overhead_hashtable_rehashing; /* The overhead of dictionaries rehashing. */
|
||||
void *metadata[]; /* conditionally allocated based on "flags" */
|
||||
};
|
||||
@@ -97,22 +90,6 @@ static int kvstoreDictIsRehashingPaused(kvstore *kvs, int didx)
|
||||
return d ? dictIsRehashingPaused(d) : 0;
|
||||
}
|
||||
|
||||
/* Returns total (cumulative) number of keys up until given dict-index (inclusive).
|
||||
* Time complexity is O(log(kvs->num_dicts)). */
|
||||
static unsigned long long cumulativeKeyCountRead(kvstore *kvs, int didx) {
|
||||
if (kvs->num_dicts == 1) {
|
||||
assert(didx == 0);
|
||||
return kvstoreSize(kvs);
|
||||
}
|
||||
int idx = didx + 1;
|
||||
unsigned long long sum = 0;
|
||||
while (idx > 0) {
|
||||
sum += kvs->dict_size_index[idx];
|
||||
idx -= (idx & -idx);
|
||||
}
|
||||
return sum;
|
||||
}
|
||||
|
||||
static void addDictIndexToCursor(kvstore *kvs, int didx, unsigned long long *cursor) {
|
||||
if (kvs->num_dicts == 1)
|
||||
return;
|
||||
@@ -130,11 +107,7 @@ static int getAndClearDictIndexFromCursor(kvstore *kvs, unsigned long long *curs
|
||||
return didx;
|
||||
}
|
||||
|
||||
/* Updates binary index tree (also known as Fenwick tree), increasing key count for a given dict.
|
||||
* You can read more about this data structure here https://en.wikipedia.org/wiki/Fenwick_tree
|
||||
* Time complexity is O(log(kvs->num_dicts)). Take care to call it only after
|
||||
* adding or removing keys from the kvstore.
|
||||
*/
|
||||
/* Updates binary index tree (Fenwick tree), updates key count for a given dict */
|
||||
static void cumulativeKeyCountAdd(kvstore *kvs, int didx, long delta) {
|
||||
kvs->key_count += delta;
|
||||
|
||||
@@ -150,14 +123,7 @@ static void cumulativeKeyCountAdd(kvstore *kvs, int didx, long delta) {
|
||||
return;
|
||||
|
||||
/* Update the BIT */
|
||||
int idx = didx + 1; /* Unlike dict indices, BIT is 1-based, so we need to add 1. */
|
||||
while (idx <= kvs->num_dicts) {
|
||||
if (delta < 0) {
|
||||
assert(kvs->dict_size_index[idx] >= (unsigned long long)labs(delta));
|
||||
}
|
||||
kvs->dict_size_index[idx] += delta;
|
||||
idx += (idx & -idx);
|
||||
}
|
||||
fwTreeUpdate(kvs->dict_sizes, didx, delta);
|
||||
}
|
||||
|
||||
/* Create the dict if it does not exist and return it. */
|
||||
@@ -294,7 +260,7 @@ kvstore *kvstoreCreate(dictType *type, int num_dicts_bits, int flags) {
|
||||
kvs->key_count = 0;
|
||||
kvs->non_empty_dicts = 0;
|
||||
kvs->resize_cursor = 0;
|
||||
kvs->dict_size_index = kvs->num_dicts > 1? zcalloc(sizeof(unsigned long long) * (kvs->num_dicts + 1)) : NULL;
|
||||
kvs->dict_sizes = kvs->num_dicts > 1 ? fwTreeCreate(kvs->num_dicts_bits) : NULL;
|
||||
kvs->bucket_count = 0;
|
||||
kvs->overhead_hashtable_rehashing = 0;
|
||||
return kvs;
|
||||
@@ -325,8 +291,8 @@ void kvstoreEmpty(kvstore *kvs, void(callback)(dict*)) {
|
||||
kvs->non_empty_dicts = 0;
|
||||
kvs->resize_cursor = 0;
|
||||
kvs->bucket_count = 0;
|
||||
if (kvs->dict_size_index)
|
||||
memset(kvs->dict_size_index, 0, sizeof(unsigned long long) * (kvs->num_dicts + 1));
|
||||
if (kvs->dict_sizes)
|
||||
fwTreeClear(kvs->dict_sizes);
|
||||
kvs->overhead_hashtable_rehashing = 0;
|
||||
}
|
||||
|
||||
@@ -343,18 +309,14 @@ void kvstoreRelease(kvstore *kvs) {
|
||||
zfree(kvs->dicts);
|
||||
|
||||
listRelease(kvs->rehashing);
|
||||
if (kvs->dict_size_index)
|
||||
zfree(kvs->dict_size_index);
|
||||
if (kvs->dict_sizes)
|
||||
fwTreeDestroy(kvs->dict_sizes);
|
||||
|
||||
zfree(kvs);
|
||||
}
|
||||
|
||||
unsigned long long int kvstoreSize(kvstore *kvs) {
|
||||
if (kvs->num_dicts != 1) {
|
||||
return kvs->key_count;
|
||||
} else {
|
||||
return kvs->dicts[0]? dictSize(kvs->dicts[0]) : 0;
|
||||
}
|
||||
return kvs->key_count;
|
||||
}
|
||||
|
||||
/* This method provides the cumulative sum of all the dictionary buckets
|
||||
@@ -542,29 +504,14 @@ int kvstoreFindDictIndexByKeyIndex(kvstore *kvs, unsigned long target) {
|
||||
return 0;
|
||||
assert(target <= kvstoreSize(kvs));
|
||||
|
||||
int result = 0, bit_mask = 1 << kvs->num_dicts_bits;
|
||||
for (int i = bit_mask; i != 0; i >>= 1) {
|
||||
int current = result + i;
|
||||
/* When the target index is greater than 'current' node value the we will update
|
||||
* the target and search in the 'current' node tree. */
|
||||
if (target > kvs->dict_size_index[current]) {
|
||||
target -= kvs->dict_size_index[current];
|
||||
result = current;
|
||||
}
|
||||
}
|
||||
/* Adjust the result to get the correct dict:
|
||||
* 1. result += 1;
|
||||
* After the calculations, the index of target in dict_size_index should be the next one,
|
||||
* so we should add 1.
|
||||
* 2. result -= 1;
|
||||
* Unlike BIT(dict_size_index is 1-based), dict indices are 0-based, so we need to subtract 1.
|
||||
* As the addition and subtraction cancel each other out, we can simply return the result. */
|
||||
return result;
|
||||
return fwTreeFindIndex(kvs->dict_sizes, target);
|
||||
}
|
||||
|
||||
/* Wrapper for kvstoreFindDictIndexByKeyIndex to get the first non-empty dict index in the kvstore. */
|
||||
int kvstoreGetFirstNonEmptyDictIndex(kvstore *kvs) {
|
||||
return kvstoreFindDictIndexByKeyIndex(kvs, 1);
|
||||
if (kvs->num_dicts == 1 || kvstoreSize(kvs) == 0)
|
||||
return 0;
|
||||
return fwTreeFindFirstNonEmpty(kvs->dict_sizes);
|
||||
}
|
||||
|
||||
/* Returns next non-empty dict index strictly after given one, or -1 if provided didx is the last one. */
|
||||
@@ -573,8 +520,7 @@ int kvstoreGetNextNonEmptyDictIndex(kvstore *kvs, int didx) {
|
||||
assert(didx == 0);
|
||||
return -1;
|
||||
}
|
||||
unsigned long long next_key = cumulativeKeyCountRead(kvs, didx) + 1;
|
||||
return next_key <= kvstoreSize(kvs) ? kvstoreFindDictIndexByKeyIndex(kvs, next_key) : -1;
|
||||
return fwTreeFindNextNonEmpty(kvs->dict_sizes, didx);
|
||||
}
|
||||
|
||||
int kvstoreNumNonEmptyDicts(kvstore *kvs) {
|
||||
|
||||
@@ -9,6 +9,17 @@
|
||||
* (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the
|
||||
* GNU Affero General Public License v3 (AGPLv3).
|
||||
*
|
||||
* KVSTORE
|
||||
* -------
|
||||
* Index-based KV store implementation. This file implements a KV store comprised
|
||||
* of an array of dicts (see dict.c) The purpose of this KV store is to have easy
|
||||
* access to all keys that belong in the same dict (i.e. are in the same dict-index)
|
||||
*
|
||||
* For example, when Redis is running in cluster mode, we use kvstore to save
|
||||
* all keys that map to the same hash-slot in a separate dict within the kvstore
|
||||
* struct.
|
||||
* This enables us to easily access all keys that map to a specific hash-slot.
|
||||
*
|
||||
* Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information.
|
||||
*/
|
||||
|
||||
|
||||
@@ -23,8 +23,8 @@ void lazyfreeFreeObject(void *args[]) {
|
||||
void lazyfreeFreeDatabase(void *args[]) {
|
||||
kvstore *da1 = args[0];
|
||||
kvstore *da2 = args[1];
|
||||
ebuckets oldHfe = args[2];
|
||||
ebDestroy(&oldHfe, &hashExpireBucketsType, NULL);
|
||||
estore *subexpires = args[2];
|
||||
estoreRelease(subexpires);
|
||||
size_t numkeys = kvstoreSize(da1);
|
||||
kvstoreRelease(da1);
|
||||
kvstoreRelease(da2);
|
||||
@@ -206,12 +206,12 @@ void emptyDbAsync(redisDb *db) {
|
||||
flags |= KVSTORE_FREE_EMPTY_DICTS;
|
||||
}
|
||||
kvstore *oldkeys = db->keys, *oldexpires = db->expires;
|
||||
ebuckets oldHfe = db->hexpires;
|
||||
estore *oldsubexpires = db->subexpires;
|
||||
db->keys = kvstoreCreate(&dbDictType, slot_count_bits, flags | KVSTORE_ALLOC_META_KEYS_HIST);
|
||||
db->expires = kvstoreCreate(&dbExpiresDictType, slot_count_bits, flags);
|
||||
db->hexpires = ebCreate();
|
||||
db->subexpires = estoreCreate(&subexpiresBucketsType, slot_count_bits);
|
||||
atomicIncr(lazyfree_objects, kvstoreSize(oldkeys));
|
||||
bioCreateLazyFreeJob(lazyfreeFreeDatabase, 3, oldkeys, oldexpires, oldHfe);
|
||||
bioCreateLazyFreeJob(lazyfreeFreeDatabase, 3, oldkeys, oldexpires, oldsubexpires);
|
||||
}
|
||||
|
||||
/* Free the key tracking table.
|
||||
|
||||
18
src/rdb.c
18
src/rdb.c
@@ -2157,7 +2157,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, int dbid, int *error)
|
||||
|
||||
/* Too many entries? Use a hash table right from the start. */
|
||||
if (len > server.hash_max_listpack_entries)
|
||||
hashTypeConvert(o, OBJ_ENCODING_HT, NULL);
|
||||
hashTypeConvert(NULL, o, OBJ_ENCODING_HT);
|
||||
else if (deep_integrity_validation) {
|
||||
/* In this mode, we need to guarantee that the server won't crash
|
||||
* later when the ziplist is converted to a dict.
|
||||
@@ -2201,7 +2201,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, int dbid, int *error)
|
||||
sdslen(value) > server.hash_max_listpack_value ||
|
||||
!lpSafeToAdd(o->ptr, hfieldlen(field) + sdslen(value)))
|
||||
{
|
||||
hashTypeConvert(o, OBJ_ENCODING_HT, NULL);
|
||||
hashTypeConvert(NULL, o, OBJ_ENCODING_HT);
|
||||
dictUseStoredKeyApi((dict *)o->ptr, 1);
|
||||
ret = dictAdd((dict*)o->ptr, field, value);
|
||||
dictUseStoredKeyApi((dict *)o->ptr, 0);
|
||||
@@ -2299,11 +2299,11 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, int dbid, int *error)
|
||||
o = createHashObject();
|
||||
/* Too many entries? Use a hash table right from the start. */
|
||||
if (len > server.hash_max_listpack_entries) {
|
||||
hashTypeConvert(o, OBJ_ENCODING_HT, NULL);
|
||||
hashTypeConvert(NULL, o, OBJ_ENCODING_HT);
|
||||
dictTypeAddMeta((dict**)&o->ptr, &mstrHashDictTypeWithHFE);
|
||||
initDictExpireMetadata(o);
|
||||
} else {
|
||||
hashTypeConvert(o, OBJ_ENCODING_LISTPACK_EX, NULL);
|
||||
hashTypeConvert(NULL, o, OBJ_ENCODING_LISTPACK_EX);
|
||||
if (deep_integrity_validation) {
|
||||
/* In this mode, we need to guarantee that the server won't crash
|
||||
* later when the listpack is converted to a dict.
|
||||
@@ -2388,7 +2388,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, int dbid, int *error)
|
||||
!lpSafeToAdd(((listpackEx*)o->ptr)->lp, hfieldlen(field) + sdslen(value) + lpEntrySizeInteger(expireAt)))
|
||||
{
|
||||
/* convert to hash */
|
||||
hashTypeConvert(o, OBJ_ENCODING_HT, NULL);
|
||||
hashTypeConvert(NULL, o, OBJ_ENCODING_HT);
|
||||
|
||||
if (len > DICT_HT_INITIAL_SIZE) { /* TODO: this is NOT the original len, but this is also the case for simple hash, is this a bug? */
|
||||
if (dictTryExpand(o->ptr, len) != DICT_OK) {
|
||||
@@ -2598,7 +2598,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, int dbid, int *error)
|
||||
if (hashTypeLength(o, 0) > server.hash_max_listpack_entries ||
|
||||
maxlen > server.hash_max_listpack_value)
|
||||
{
|
||||
hashTypeConvert(o, OBJ_ENCODING_HT, NULL);
|
||||
hashTypeConvert(NULL, o, OBJ_ENCODING_HT);
|
||||
}
|
||||
}
|
||||
break;
|
||||
@@ -2735,7 +2735,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, int dbid, int *error)
|
||||
}
|
||||
|
||||
if (hashTypeLength(o, 0) > server.hash_max_listpack_entries)
|
||||
hashTypeConvert(o, OBJ_ENCODING_HT, NULL);
|
||||
hashTypeConvert(NULL, o, OBJ_ENCODING_HT);
|
||||
else
|
||||
o->ptr = lpShrinkToFit(o->ptr);
|
||||
break;
|
||||
@@ -2776,7 +2776,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, int dbid, int *error)
|
||||
/* Convert listpack to hash table without registering in global HFE DS,
|
||||
* if has HFEs, since the listpack is not connected yet to the DB */
|
||||
if (hashTypeLength(o, 0) > server.hash_max_listpack_entries)
|
||||
hashTypeConvert(o, OBJ_ENCODING_HT, NULL /*db->hexpires*/);
|
||||
hashTypeConvert(NULL /*db*/, o, OBJ_ENCODING_HT);
|
||||
|
||||
break;
|
||||
default:
|
||||
@@ -3649,7 +3649,7 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin
|
||||
if (kv->type == OBJ_HASH) {
|
||||
uint64_t minExpiredField = hashTypeGetMinExpire(kv, 1);
|
||||
if (minExpiredField != EB_EXPIRE_TIME_INVALID)
|
||||
hashTypeAddToExpires(db, kv, minExpiredField);
|
||||
estoreAdd(db->subexpires, getKeySlot(key), kv, minExpiredField);
|
||||
}
|
||||
|
||||
/* Set usage information (for eviction). */
|
||||
|
||||
14
src/server.c
14
src/server.c
@@ -28,6 +28,8 @@
|
||||
#include "fmtargs.h"
|
||||
#include "mstr.h"
|
||||
#include "ebuckets.h"
|
||||
#include "fwtree.h"
|
||||
#include "estore.h"
|
||||
|
||||
#include <time.h>
|
||||
#include <signal.h>
|
||||
@@ -2851,7 +2853,7 @@ void initServer(void) {
|
||||
for (j = 0; j < server.dbnum; j++) {
|
||||
server.db[j].keys = kvstoreCreate(&dbDictType, slot_count_bits, flags | KVSTORE_ALLOC_META_KEYS_HIST);
|
||||
server.db[j].expires = kvstoreCreate(&dbExpiresDictType, slot_count_bits, flags);
|
||||
server.db[j].hexpires = ebCreate();
|
||||
server.db[j].subexpires = estoreCreate(&subexpiresBucketsType, slot_count_bits);
|
||||
server.db[j].expires_cursor = 0;
|
||||
server.db[j].blocking_keys = dictCreate(&keylistDictType);
|
||||
server.db[j].blocking_keys_unblock_on_nokey = dictCreate(&objectKeyPointerValueDictType);
|
||||
@@ -6453,16 +6455,16 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) {
|
||||
if (sections++) info = sdscat(info,"\r\n");
|
||||
info = sdscatprintf(info, "# Keyspace\r\n");
|
||||
for (j = 0; j < server.dbnum; j++) {
|
||||
long long keys, vkeys, hexpires;
|
||||
long long keys, vkeys, subexpiry;
|
||||
|
||||
keys = kvstoreSize(server.db[j].keys);
|
||||
vkeys = kvstoreSize(server.db[j].expires);
|
||||
hexpires = ebGetTotalItems(server.db[j].hexpires, &hashExpireBucketsType);
|
||||
subexpiry = estoreSize(server.db[j].subexpires);
|
||||
|
||||
if (keys || vkeys) {
|
||||
info = sdscatprintf(info,
|
||||
"db%d:keys=%lld,expires=%lld,avg_ttl=%lld,subexpiry=%lld\r\n",
|
||||
j, keys, vkeys, server.db[j].avg_ttl, hexpires);
|
||||
"db%d:keys=%lld,expires=%lld,avg_ttl=%lld,subexpiry=%lld\r\n",
|
||||
j, keys, vkeys, server.db[j].avg_ttl, subexpiry);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -7305,6 +7307,8 @@ struct redisTest {
|
||||
{"dict", dictTest},
|
||||
{"listpack", listpackTest},
|
||||
{"kvstore", kvstoreTest},
|
||||
{"fwtree", fwtreeTest},
|
||||
{"estore", estoreTest},
|
||||
{"ebuckets", ebucketsTest},
|
||||
};
|
||||
redisTestProc *getTestProcByName(const char *name) {
|
||||
|
||||
30
src/server.h
30
src/server.h
@@ -51,6 +51,7 @@ typedef long long ustime_t; /* microsecond time type. */
|
||||
#include "ebuckets.h" /* expiry data structure */
|
||||
#include "dict.h" /* Hash tables */
|
||||
#include "kvstore.h" /* Slot-based hash table */
|
||||
#include "estore.h" /* Expiration store */
|
||||
#include "adlist.h" /* Linked lists */
|
||||
#include "zmalloc.h" /* total memory usage aware version of malloc/free */
|
||||
#include "anet.h" /* Networking the easy way */
|
||||
@@ -1111,7 +1112,7 @@ typedef struct replBufBlock {
|
||||
typedef struct redisDb {
|
||||
kvstore *keys; /* The keyspace for this DB. As metadata, holds keysizes histogram */
|
||||
kvstore *expires; /* Timeout of keys with a timeout set */
|
||||
ebuckets hexpires; /* Hash expiration DS. Single TTL per hash (of next min field to expire) */
|
||||
estore *subexpires; /* Timeout of sub-keys with a timeout set. (Currently only used for hashes) */
|
||||
dict *blocking_keys; /* Keys with clients waiting for data (BLPOP)*/
|
||||
dict *blocking_keys_unblock_on_nokey; /* Keys with clients waiting for
|
||||
* data, and should be unblocked if key is deleted (XREADEDGROUP).
|
||||
@@ -2730,7 +2731,7 @@ extern dictType sdsReplyDictType;
|
||||
extern dictType keylistDictType;
|
||||
extern dict *modules;
|
||||
|
||||
extern EbucketsType hashExpireBucketsType; /* global expires */
|
||||
extern EbucketsType subexpiresBucketsType; /* global expires */
|
||||
extern EbucketsType hashFieldExpireBucketsType; /* local per hash */
|
||||
|
||||
/*-----------------------------------------------------------------------------
|
||||
@@ -3449,10 +3450,9 @@ robj *setTypeDup(robj *o);
|
||||
* and metadata fields for hash field expiration.*/
|
||||
typedef struct listpackEx {
|
||||
ExpireMeta meta; /* To be used in order to register the hash in the
|
||||
global ebuckets (i.e. db->hexpires) with next,
|
||||
minimum, hash-field to expire. TTL value might be
|
||||
inaccurate up-to few seconds due to optimization
|
||||
consideration. */
|
||||
global ebuckets subexpires with next, minimum,
|
||||
hash-field to expire. TTL value might be inaccurate
|
||||
up-to few seconds due to optimization consideration. */
|
||||
void *lp; /* listpack that contains 'key-value-ttl' tuples which
|
||||
are ordered by ttl. */
|
||||
} listpackEx;
|
||||
@@ -3462,10 +3462,9 @@ typedef struct listpackEx {
|
||||
typedef struct dictExpireMetadata {
|
||||
ExpireMeta expireMeta; /* embedded ExpireMeta in dict.
|
||||
To be used in order to register the hash in the
|
||||
global ebuckets (i.e db->hexpires) with next,
|
||||
minimum, hash-field to expire. TTL value might be
|
||||
inaccurate up-to few seconds due to optimization
|
||||
consideration. */
|
||||
subexpires DB with next minimum hash-field to expire.
|
||||
TTL value might be inaccurate up-to few seconds due
|
||||
to optimization consideration. */
|
||||
ebuckets hfe; /* DS of Hash Fields Expiration, associated to each hash */
|
||||
} dictExpireMetadata;
|
||||
|
||||
@@ -3485,8 +3484,8 @@ typedef struct dictExpireMetadata {
|
||||
#define HFE_LAZY_ACCESS_EXPIRED (1<<4) /* Avoid lazy expire and allow access to expired fields */
|
||||
#define HFE_LAZY_NO_UPDATE_KEYSIZES (1<<5) /* If field lazy deleted, avoid updating keysizes histogram */
|
||||
|
||||
void hashTypeConvert(robj *o, int enc, ebuckets *hexpires);
|
||||
void hashTypeTryConversion(redisDb *db, robj *subject, robj **argv, int start, int end);
|
||||
void hashTypeConvert(redisDb *db, robj *o, int enc);
|
||||
void hashTypeTryConversion(redisDb *db, kvobj *kv, robj **argv, int start, int end);
|
||||
int hashTypeExists(redisDb *db, kvobj *kv, sds field, int hfeFlags, int *isHashDeleted);
|
||||
int hashTypeDelete(robj *o, void *key, int isSdsField);
|
||||
unsigned long hashTypeLength(const robj *o, int subtractExpiredFields);
|
||||
@@ -3506,10 +3505,9 @@ sds hashTypeCurrentObjectNewSds(hashTypeIterator *hi, int what);
|
||||
hfield hashTypeCurrentObjectNewHfield(hashTypeIterator *hi);
|
||||
int hashTypeGetValueObject(redisDb *db, kvobj *kv, sds field, int hfeFlags,
|
||||
robj **val, uint64_t *expireTime, int *isHashDeleted);
|
||||
int hashTypeSet(redisDb *db, robj *o, sds field, sds value, int flags);
|
||||
int hashTypeSet(redisDb *db, kvobj *kv, sds field, sds value, int flags);
|
||||
robj *hashTypeDup(kvobj *kv, uint64_t *minHashExpire);
|
||||
uint64_t hashTypeRemoveFromExpires(ebuckets *hexpires, robj *o);
|
||||
void hashTypeAddToExpires(redisDb *db, kvobj *hashObj, uint64_t expireTime);
|
||||
uint64_t hashTypeActiveExpire(redisDb *db, kvobj *o, uint32_t *quota, int updateSubexpires);
|
||||
void hashTypeFree(robj *o);
|
||||
int hashTypeIsExpired(const robj *o, uint64_t expireAt);
|
||||
unsigned char *hashTypeListpackGetLp(robj *o);
|
||||
@@ -3848,7 +3846,7 @@ void expireSlaveKeys(void);
|
||||
void rememberSlaveKeyWithExpire(redisDb *db, sds key);
|
||||
void flushSlaveKeysWithExpireList(void);
|
||||
size_t getSlaveKeyWithExpireCount(void);
|
||||
uint64_t hashTypeDbActiveExpire(redisDb *db, uint32_t maxFieldsToExpire);
|
||||
uint64_t activeSubexpires(redisDb *db, int slot, uint32_t maxFieldsToExpire);
|
||||
|
||||
/* evict.c -- maxmemory handling and LRU eviction. */
|
||||
void evictionPoolAlloc(void);
|
||||
|
||||
226
src/t_hash.c
226
src/t_hash.c
@@ -8,6 +8,7 @@
|
||||
*/
|
||||
|
||||
#include "server.h"
|
||||
#include "redisassert.h"
|
||||
#include "ebuckets.h"
|
||||
#include <math.h>
|
||||
|
||||
@@ -37,21 +38,12 @@ typedef enum GetFieldRes {
|
||||
* it was the last field in the hash. */
|
||||
} GetFieldRes;
|
||||
|
||||
/* ActiveExpireCtx passed to hashTypeActiveExpire() */
|
||||
typedef struct ExpireCtx {
|
||||
uint32_t fieldsToExpireQuota;
|
||||
redisDb *db;
|
||||
} ExpireCtx;
|
||||
|
||||
typedef listpackEntry CommonEntry; /* extend usage beyond lp */
|
||||
|
||||
/* hash field expiration (HFE) funcs */
|
||||
static ExpireAction onFieldExpire(eItem item, void *ctx);
|
||||
static ExpireMeta* hfieldGetExpireMeta(const eItem field);
|
||||
static ExpireMeta *hashGetExpireMeta(const eItem hash);
|
||||
static void hexpireGenericCommand(client *c, long long basetime, int unit);
|
||||
static ExpireAction hashTypeActiveExpire(eItem hashObj, void *ctx);
|
||||
static uint64_t hashTypeExpire(kvobj *kv, ExpireCtx *expireCtx, int updateGlobalHFE);
|
||||
static void hfieldPersist(robj *hashObj, hfield field);
|
||||
static void propagateHashFieldDeletion(redisDb *db, sds key, char *field, size_t fieldLen);
|
||||
|
||||
@@ -104,16 +96,11 @@ dictType mstrHashDictTypeWithHFE = {
|
||||
* private ebuckets DS. In order to support HFE active expire cycle across hash
|
||||
* instances, hashes with associated HFE will be also registered in a global
|
||||
* ebuckets DS with expiration time value that reflects their next minimum
|
||||
* time to expire. The global HFE Active expiration will be triggered from
|
||||
* activeExpireCycle() function and will invoke "local" HFE Active expiration
|
||||
* for each hash instance that has expired fields.
|
||||
*
|
||||
* hashExpireBucketsType - ebuckets-type to be used at the global space
|
||||
* (db->hexpires) to register hashes that have one or more fields with time-Expiration.
|
||||
* The hashes will be registered in with the expiration time of the earliest field
|
||||
* in the hash.
|
||||
* time to expire (db->subexpires). The global HFE Active expiration will be
|
||||
* triggered from activeExpireCycle() function and in turn will invoke "local"
|
||||
* HFE Active sub-expiration for each hash instance that has expired fields.
|
||||
*----------------------------------------------------------------------------*/
|
||||
EbucketsType hashExpireBucketsType = {
|
||||
EbucketsType subexpiresBucketsType = {
|
||||
.onDeleteItem = NULL,
|
||||
.getExpireMeta = hashGetExpireMeta, /* get ExpireMeta attached to each hash */
|
||||
.itemsAddrAreOdd = 0, /* Addresses of dict are even */
|
||||
@@ -603,7 +590,7 @@ unsigned char *hashTypeListpackGetLp(robj *o) {
|
||||
/* Check the length of a number of objects to see if we need to convert a
|
||||
* listpack to a real hash. Note that we only check string encoded objects
|
||||
* as their string length can be queried in constant time. */
|
||||
void hashTypeTryConversion(redisDb *db, robj *o, robj **argv, int start, int end) {
|
||||
void hashTypeTryConversion(redisDb *db, kvobj *o, robj **argv, int start, int end) {
|
||||
int i;
|
||||
size_t sum = 0;
|
||||
|
||||
@@ -615,7 +602,7 @@ void hashTypeTryConversion(redisDb *db, robj *o, robj **argv, int start, int end
|
||||
* might over allocate memory if there are duplicates. */
|
||||
size_t new_fields = (end - start + 1) / 2;
|
||||
if (new_fields > server.hash_max_listpack_entries) {
|
||||
hashTypeConvert(o, OBJ_ENCODING_HT, &db->hexpires);
|
||||
hashTypeConvert(db, o, OBJ_ENCODING_HT);
|
||||
dictExpand(o->ptr, new_fields);
|
||||
return;
|
||||
}
|
||||
@@ -625,13 +612,14 @@ void hashTypeTryConversion(redisDb *db, robj *o, robj **argv, int start, int end
|
||||
continue;
|
||||
size_t len = sdslen(argv[i]->ptr);
|
||||
if (len > server.hash_max_listpack_value) {
|
||||
hashTypeConvert(o, OBJ_ENCODING_HT, &db->hexpires);
|
||||
hashTypeConvert(db, o, OBJ_ENCODING_HT);
|
||||
return;
|
||||
}
|
||||
sum += len;
|
||||
}
|
||||
if (!lpSafeToAdd(hashTypeListpackGetLp(o), sum))
|
||||
hashTypeConvert(o, OBJ_ENCODING_HT, &db->hexpires);
|
||||
if (!lpSafeToAdd(hashTypeListpackGetLp(o), sum)) {
|
||||
hashTypeConvert(db, o, OBJ_ENCODING_HT);
|
||||
}
|
||||
}
|
||||
|
||||
/* Get the value from a listpack encoded hash, identified by field. */
|
||||
@@ -889,7 +877,7 @@ int hashTypeExists(redisDb *db, kvobj *o, sds field, int hfeFlags, int *isHashDe
|
||||
#define HASH_SET_TAKE_FIELD (1<<0)
|
||||
#define HASH_SET_TAKE_VALUE (1<<1)
|
||||
#define HASH_SET_KEEP_TTL (1<<2)
|
||||
int hashTypeSet(redisDb *db, robj *o, sds field, sds value, int flags) {
|
||||
int hashTypeSet(redisDb *db, kvobj *o, sds field, sds value, int flags) {
|
||||
int update = 0;
|
||||
|
||||
/* Check if the field is too long for listpack, and convert before adding the item.
|
||||
@@ -898,7 +886,7 @@ int hashTypeSet(redisDb *db, robj *o, sds field, sds value, int flags) {
|
||||
if (o->encoding == OBJ_ENCODING_LISTPACK ||
|
||||
o->encoding == OBJ_ENCODING_LISTPACK_EX) {
|
||||
if (sdslen(field) > server.hash_max_listpack_value || sdslen(value) > server.hash_max_listpack_value)
|
||||
hashTypeConvert(o, OBJ_ENCODING_HT, &db->hexpires);
|
||||
hashTypeConvert(db, o, OBJ_ENCODING_HT);
|
||||
}
|
||||
|
||||
if (o->encoding == OBJ_ENCODING_LISTPACK) {
|
||||
@@ -932,7 +920,7 @@ int hashTypeSet(redisDb *db, robj *o, sds field, sds value, int flags) {
|
||||
|
||||
/* Check if the listpack needs to be converted to a hash table */
|
||||
if (hashTypeLength(o, 0) > server.hash_max_listpack_entries)
|
||||
hashTypeConvert(o, OBJ_ENCODING_HT, &db->hexpires);
|
||||
hashTypeConvert(db, o, OBJ_ENCODING_HT);
|
||||
} else if (o->encoding == OBJ_ENCODING_LISTPACK_EX) {
|
||||
unsigned char *fptr = NULL, *vptr = NULL, *tptr = NULL;
|
||||
listpackEx *lpt = o->ptr;
|
||||
@@ -971,7 +959,7 @@ int hashTypeSet(redisDb *db, robj *o, sds field, sds value, int flags) {
|
||||
|
||||
/* Check if the listpack needs to be converted to a hash table */
|
||||
if (hashTypeLength(o, 0) > server.hash_max_listpack_entries)
|
||||
hashTypeConvert(o, OBJ_ENCODING_HT, &db->hexpires);
|
||||
hashTypeConvert(db, o, OBJ_ENCODING_HT);
|
||||
|
||||
} else if (o->encoding == OBJ_ENCODING_HT) {
|
||||
dict *ht = o->ptr;
|
||||
@@ -1156,7 +1144,7 @@ int hashTypeSetExInit(robj *key, kvobj *o, client *c, redisDb *db,
|
||||
|
||||
/* Take care that HASH support expiration */
|
||||
if (o->encoding == OBJ_ENCODING_LISTPACK) {
|
||||
hashTypeConvert(o, OBJ_ENCODING_LISTPACK_EX, &c->db->hexpires);
|
||||
hashTypeConvert(c->db, o, OBJ_ENCODING_LISTPACK_EX);
|
||||
} else if (o->encoding == OBJ_ENCODING_HT) {
|
||||
/* Take care dict has HFE metadata */
|
||||
if (!isDictWithMetaHFE(ht)) {
|
||||
@@ -1209,10 +1197,16 @@ void hashTypeSetExDone(HashTypeSetEx *ex) {
|
||||
(ex->minExpire - newMinExpire) : (newMinExpire - ex->minExpire);
|
||||
if (diff < HASH_NEW_EXPIRE_DIFF_THRESHOLD) return;
|
||||
|
||||
if (ex->minExpire != EB_EXPIRE_TIME_INVALID)
|
||||
ebRemove(&ex->db->hexpires, &hashExpireBucketsType, ex->hashObj);
|
||||
if (newMinExpire != EB_EXPIRE_TIME_INVALID)
|
||||
ebAdd(&ex->db->hexpires, &hashExpireBucketsType, ex->hashObj, newMinExpire);
|
||||
int slot = getKeySlot(ex->key->ptr);
|
||||
if (ex->minExpire != EB_EXPIRE_TIME_INVALID) {
|
||||
if (newMinExpire != EB_EXPIRE_TIME_INVALID)
|
||||
estoreUpdate(ex->db->subexpires, slot, ex->hashObj, newMinExpire);
|
||||
else
|
||||
estoreRemove(ex->db->subexpires, slot, ex->hashObj);
|
||||
} else {
|
||||
if (newMinExpire != EB_EXPIRE_TIME_INVALID)
|
||||
estoreAdd(ex->db->subexpires, slot, ex->hashObj, newMinExpire);
|
||||
}
|
||||
}
|
||||
|
||||
/* Delete an element from a hash.
|
||||
@@ -1591,21 +1585,25 @@ void hashTypeConvertListpack(robj *o, int enc) {
|
||||
}
|
||||
}
|
||||
|
||||
void hashTypeConvertListpackEx(robj *o, int enc, ebuckets *hexpires) {
|
||||
/* db can be NULL to avoid registration in subexpires */
|
||||
void hashTypeConvertListpackEx(redisDb *db, robj *o, int enc) {
|
||||
serverAssert(o->encoding == OBJ_ENCODING_LISTPACK_EX);
|
||||
|
||||
if (enc == OBJ_ENCODING_LISTPACK_EX) {
|
||||
return;
|
||||
} else if (enc == OBJ_ENCODING_HT) {
|
||||
int ret;
|
||||
uint64_t minExpire = EB_EXPIRE_TIME_INVALID;
|
||||
int ret, slot = -1;
|
||||
hashTypeIterator *hi;
|
||||
dict *dict;
|
||||
dictExpireMetadata *dictExpireMeta;
|
||||
listpackEx *lpt = o->ptr;
|
||||
uint64_t minExpire = hashTypeGetMinExpire(o, 0);
|
||||
|
||||
if (hexpires && lpt->meta.trash != 1)
|
||||
ebRemove(hexpires, &hashExpireBucketsType, o);
|
||||
if (db && lpt->meta.trash != 1) {
|
||||
minExpire = hashTypeGetMinExpire(o, 0);
|
||||
slot = getKeySlot(kvobjGetKey(o));
|
||||
estoreRemove(db->subexpires, slot, o);
|
||||
}
|
||||
|
||||
dict = dictCreate(&mstrHashDictTypeWithHFE);
|
||||
dictExpand(dict,hashTypeLength(o, 0));
|
||||
@@ -1640,19 +1638,19 @@ void hashTypeConvertListpackEx(robj *o, int enc, ebuckets *hexpires) {
|
||||
o->encoding = OBJ_ENCODING_HT;
|
||||
o->ptr = dict;
|
||||
|
||||
if (hexpires && minExpire != EB_EXPIRE_TIME_INVALID)
|
||||
ebAdd(hexpires, &hashExpireBucketsType, o, minExpire);
|
||||
if (minExpire != EB_EXPIRE_TIME_INVALID)
|
||||
estoreAdd(db->subexpires, slot, o, minExpire);
|
||||
} else {
|
||||
serverPanic("Unknown hash encoding: %d", enc);
|
||||
}
|
||||
}
|
||||
|
||||
/* NOTE: hexpires can be NULL (Won't register in global HFE DS) */
|
||||
void hashTypeConvert(robj *o, int enc, ebuckets *hexpires) {
|
||||
/* NOTE: db can be NULL (Won't register in global HFE DS) */
|
||||
void hashTypeConvert(redisDb *db, robj *o, int enc) {
|
||||
if (o->encoding == OBJ_ENCODING_LISTPACK) {
|
||||
hashTypeConvertListpack(o, enc);
|
||||
} else if (o->encoding == OBJ_ENCODING_LISTPACK_EX) {
|
||||
hashTypeConvertListpackEx(o, enc, hexpires);
|
||||
hashTypeConvertListpackEx(db, o, enc);
|
||||
} else if (o->encoding == OBJ_ENCODING_HT) {
|
||||
serverPanic("Not implemented");
|
||||
} else {
|
||||
@@ -1708,7 +1706,7 @@ robj *hashTypeDup(kvobj *o, uint64_t *minHashExpire) {
|
||||
dictExpireMetaDst->expireMeta.trash = 1; /* mark as trash (as long it wasn't ebAdd()) */
|
||||
|
||||
/* Extract the minimum expire time of the source hash (Will be used by caller
|
||||
* to register the new hash in the global ebuckets, i.e db->hexpires) */
|
||||
* to register the new hash in the global subexpires DB) */
|
||||
if (dictExpireMetaSrc->expireMeta.trash == 0)
|
||||
*minHashExpire = ebGetMetaExpTime(&dictExpireMetaSrc->expireMeta);
|
||||
}
|
||||
@@ -1785,59 +1783,22 @@ void hashTypeRandomElement(robj *hashobj, unsigned long hashsize, CommonEntry *k
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Active expiration of fields in hash
|
||||
*
|
||||
* Called by hashTypeDbActiveExpire() for each hash registered in the HFE DB
|
||||
* (db->hexpires) with an expiration-time less than or equal current time.
|
||||
*
|
||||
* This callback performs the following actions for each hash:
|
||||
* - Delete expired fields as by calling ebExpire(hash)
|
||||
* - If afterward there are future fields to expire, it will update the hash in
|
||||
* HFE DB with the next hash-field minimum expiration time by returning
|
||||
* ACT_UPDATE_EXP_ITEM.
|
||||
* - If the hash has no more fields to expire, it is removed from the HFE DB
|
||||
* by returning ACT_REMOVE_EXP_ITEM.
|
||||
* - If hash has no more fields afterward, it will remove the hash from keyspace.
|
||||
*/
|
||||
static ExpireAction hashTypeActiveExpire(eItem item, void *ctx) {
|
||||
ExpireCtx *expireCtx = ctx;
|
||||
|
||||
/* If no more quota left for this callback, stop */
|
||||
if (expireCtx->fieldsToExpireQuota == 0)
|
||||
return ACT_STOP_ACTIVE_EXP;
|
||||
|
||||
uint64_t nextExpTime = hashTypeExpire((kvobj *) item, expireCtx, 0);
|
||||
|
||||
/* If hash has no more fields to expire or got deleted, indicate
|
||||
* to remove it from HFE DB to the caller ebExpire() */
|
||||
if (nextExpTime == EB_EXPIRE_TIME_INVALID || nextExpTime == 0) {
|
||||
return ACT_REMOVE_EXP_ITEM;
|
||||
} else {
|
||||
/* Hash has more fields to expire. Update next expiration time of the hash
|
||||
* and indicate to add it back to global HFE DS */
|
||||
ebSetMetaExpTime(hashGetExpireMeta(item), nextExpTime);
|
||||
return ACT_UPDATE_EXP_ITEM;
|
||||
}
|
||||
}
|
||||
|
||||
/* Delete all expired fields from the hash and delete the hash if left empty.
|
||||
*
|
||||
* updateGlobalHFE - If the hash should be updated in the global HFE DS with new
|
||||
* updateSubexpires - If the hash should be updated in the subexpires DB with new
|
||||
* expiration time in case expired fields were deleted.
|
||||
*
|
||||
* Return next Expire time of the hash
|
||||
* - 0 if hash got deleted
|
||||
* - EB_EXPIRE_TIME_INVALID if no more fields to expire
|
||||
*/
|
||||
static uint64_t hashTypeExpire(kvobj *o, ExpireCtx *expireCtx, int updateGlobalHFE) {
|
||||
uint64_t hashTypeActiveExpire(redisDb *db, kvobj *o, uint32_t *quota, int updateSubexpires) {
|
||||
uint64_t noExpireLeftRes = EB_EXPIRE_TIME_INVALID;
|
||||
redisDb *db = expireCtx->db;
|
||||
ExpireInfo info = {0};
|
||||
|
||||
if (o->encoding == OBJ_ENCODING_LISTPACK_EX) {
|
||||
info = (ExpireInfo) {
|
||||
.maxToExpire = expireCtx->fieldsToExpireQuota,
|
||||
.maxToExpire = *quota,
|
||||
.now = commandTimeSnapshot(),
|
||||
.itemsExpired = 0};
|
||||
|
||||
@@ -1851,7 +1812,7 @@ static uint64_t hashTypeExpire(kvobj *o, ExpireCtx *expireCtx, int updateGlobalH
|
||||
OnFieldExpireCtx onFieldExpireCtx = { .hashObj = o, .db = db };
|
||||
|
||||
info = (ExpireInfo){
|
||||
.maxToExpire = expireCtx->fieldsToExpireQuota,
|
||||
.maxToExpire = *quota,
|
||||
.onExpireItem = onFieldExpire,
|
||||
.ctx = &onFieldExpireCtx,
|
||||
.now = commandTimeSnapshot()
|
||||
@@ -1861,7 +1822,7 @@ static uint64_t hashTypeExpire(kvobj *o, ExpireCtx *expireCtx, int updateGlobalH
|
||||
}
|
||||
|
||||
/* Update quota left */
|
||||
expireCtx->fieldsToExpireQuota -= info.itemsExpired;
|
||||
*quota -= info.itemsExpired;
|
||||
|
||||
/* In some cases, a field might have been deleted without updating the global DS.
|
||||
* As a result, active-expire might not expire any fields, in such cases,
|
||||
@@ -1870,17 +1831,20 @@ static uint64_t hashTypeExpire(kvobj *o, ExpireCtx *expireCtx, int updateGlobalH
|
||||
sds keystr = kvobjGetKey(o);
|
||||
robj *key = createStringObject(keystr, sdslen(keystr));
|
||||
notifyKeyspaceEvent(NOTIFY_HASH, "hexpired", key, db->id);
|
||||
int slot;
|
||||
|
||||
if (updateGlobalHFE)
|
||||
ebRemove(&db->hexpires, &hashExpireBucketsType, o);
|
||||
if (updateSubexpires) {
|
||||
slot = getKeySlot(keystr);
|
||||
estoreRemove(db->subexpires, slot, o);
|
||||
}
|
||||
|
||||
if (hashTypeLength(o, 0) == 0) {
|
||||
notifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, db->id);
|
||||
dbDelete(db, key);
|
||||
noExpireLeftRes = 0;
|
||||
} else {
|
||||
if ((updateGlobalHFE) && (info.nextExpireTime != EB_EXPIRE_TIME_INVALID))
|
||||
ebAdd(&db->hexpires, &hashExpireBucketsType, o, info.nextExpireTime);
|
||||
if ((updateSubexpires) && (info.nextExpireTime != EB_EXPIRE_TIME_INVALID))
|
||||
estoreAdd(db->subexpires, slot, o, info.nextExpireTime);
|
||||
}
|
||||
|
||||
signalModifiedKey(NULL, db, key);
|
||||
@@ -1913,8 +1877,8 @@ static int hashTypeExpireIfNeeded(redisDb *db, kvobj *o) {
|
||||
return 0;
|
||||
|
||||
/* Take care to expire all the fields */
|
||||
ExpireCtx expireCtx = { .db = db, .fieldsToExpireQuota = UINT32_MAX };
|
||||
nextExpireTime = hashTypeExpire(o, &expireCtx, 1);
|
||||
uint32_t quota = UINT32_MAX;
|
||||
nextExpireTime = hashTypeActiveExpire(db, o, "a, 1);
|
||||
/* return 1 if the entire hash was deleted */
|
||||
return nextExpireTime == 0;
|
||||
}
|
||||
@@ -1971,24 +1935,6 @@ uint64_t hashTypeGetMinExpire(robj *o, int accurate) {
|
||||
}
|
||||
}
|
||||
|
||||
uint64_t hashTypeRemoveFromExpires(ebuckets *hexpires, robj *o) {
|
||||
if (o->encoding == OBJ_ENCODING_LISTPACK) {
|
||||
return EB_EXPIRE_TIME_INVALID;
|
||||
} else if (o->encoding == OBJ_ENCODING_HT) {
|
||||
/* If dict doesn't holds HFE metadata */
|
||||
if (!isDictWithMetaHFE(o->ptr))
|
||||
return EB_EXPIRE_TIME_INVALID;
|
||||
}
|
||||
|
||||
uint64_t expireTime = ebGetExpireTime(&hashExpireBucketsType, o);
|
||||
|
||||
/* If registered in global HFE DS then remove it (not trash) */
|
||||
if (expireTime != EB_EXPIRE_TIME_INVALID)
|
||||
ebRemove(hexpires, &hashExpireBucketsType, o);
|
||||
|
||||
return expireTime;
|
||||
}
|
||||
|
||||
int hashTypeIsFieldsWithExpire(robj *o) {
|
||||
if (o->encoding == OBJ_ENCODING_LISTPACK) {
|
||||
return 0;
|
||||
@@ -2004,62 +1950,6 @@ int hashTypeIsFieldsWithExpire(robj *o) {
|
||||
}
|
||||
}
|
||||
|
||||
/* Add hash to global HFE DS and update key for notifications.
|
||||
*
|
||||
* expireTime - expiration in msec.
|
||||
* If eq. 0 then the hash will be added to the global HFE DS with
|
||||
* the minimum expiration time that is already written in advance
|
||||
* to attached metadata (which considered as trash as long as it is
|
||||
* not attached to global HFE DS).
|
||||
*
|
||||
* Precondition: It is a hash of type listpackex or HT with HFE metadata.
|
||||
*/
|
||||
void hashTypeAddToExpires(redisDb *db, kvobj *hashObj, uint64_t expireTime) {
|
||||
if (expireTime > EB_EXPIRE_TIME_MAX)
|
||||
return;
|
||||
|
||||
if (hashObj->encoding == OBJ_ENCODING_LISTPACK_EX) {
|
||||
listpackEx *lpt = hashObj->ptr;
|
||||
expireTime = (expireTime) ? expireTime : ebGetMetaExpTime(&lpt->meta);
|
||||
ebAdd(&db->hexpires, &hashExpireBucketsType, hashObj, expireTime);
|
||||
} else if (hashObj->encoding == OBJ_ENCODING_HT) {
|
||||
dict *d = hashObj->ptr;
|
||||
if (isDictWithMetaHFE(d)) {
|
||||
dictExpireMetadata *meta = (dictExpireMetadata *) dictMetadata(d);
|
||||
expireTime = (expireTime) ? expireTime : ebGetMetaExpTime(&meta->expireMeta);
|
||||
ebAdd(&db->hexpires, &hashExpireBucketsType, hashObj, expireTime);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* DB active expire and update hashes with time-expiration on fields.
|
||||
*
|
||||
* The callback function hashTypeActiveExpire() is invoked for each hash registered
|
||||
* in the HFE DB (db->hexpires) with an expiration-time less than or equal to the
|
||||
* current time. This callback performs the following actions for each hash:
|
||||
* - If the hash has one or more fields to expire, it will delete those fields.
|
||||
* - If there are more fields to expire, it will update the hash with the next
|
||||
* expiration time in HFE DB.
|
||||
* - If the hash has no more fields to expire, it is removed from the HFE DB.
|
||||
* - If the hash has no more fields, it is removed from the main DB.
|
||||
*
|
||||
* Returns number of fields active-expired.
|
||||
*/
|
||||
uint64_t hashTypeDbActiveExpire(redisDb *db, uint32_t maxFieldsToExpire) {
|
||||
ExpireCtx ctx = { .db = db, .fieldsToExpireQuota = maxFieldsToExpire };
|
||||
ExpireInfo info = {
|
||||
.maxToExpire = UINT64_MAX, /* Only maxFieldsToExpire play a role */
|
||||
.onExpireItem = hashTypeActiveExpire,
|
||||
.ctx = &ctx,
|
||||
.now = commandTimeSnapshot(),
|
||||
.itemsExpired = 0};
|
||||
|
||||
ebExpire(&db->hexpires, &hashExpireBucketsType, &info);
|
||||
|
||||
/* Return number of fields active-expired */
|
||||
return maxFieldsToExpire - ctx.fieldsToExpireQuota;
|
||||
}
|
||||
|
||||
void hashTypeFree(robj *o) {
|
||||
switch (o->encoding) {
|
||||
case OBJ_ENCODING_HT:
|
||||
@@ -2719,7 +2609,7 @@ void hgetdelCommand(client *c) {
|
||||
dbDeleteSkipKeysizesUpdate(c->db, c->argv[1]);
|
||||
notifyKeyspaceEvent(NOTIFY_GENERIC, "del", c->argv[1], c->db->id);
|
||||
} else if (hfe && (hashTypeIsFieldsWithExpire(o) == 0)) { /*is it last HFE*/
|
||||
ebRemove(&c->db->hexpires, &hashExpireBucketsType, o);
|
||||
estoreRemove(c->db->subexpires, getKeySlot(kvobjGetKey(o)), o);
|
||||
}
|
||||
|
||||
if (oldlen != newlen)
|
||||
@@ -2920,7 +2810,7 @@ void hdelCommand(client *c) {
|
||||
notifyKeyspaceEvent(NOTIFY_GENERIC, "del", c->argv[1], c->db->id);
|
||||
} else {
|
||||
if (isHFE && (hashTypeIsFieldsWithExpire(o) == 0)) /* is it last HFE */
|
||||
ebRemove(&c->db->hexpires, &hashExpireBucketsType, o);
|
||||
estoreRemove(c->db->subexpires, getKeySlot(c->argv[1]->ptr), o);
|
||||
newLen = oldLen - deleted;
|
||||
}
|
||||
updateKeysizesHist(c->db, getKeySlot(c->argv[1]->ptr), OBJ_HASH, oldLen, newLen);
|
||||
@@ -3495,7 +3385,7 @@ static ExpireAction onFieldExpire(eItem item, void *ctx) {
|
||||
|
||||
/* Retrieve the ExpireMeta associated with the hash.
|
||||
* The caller is responsible for ensuring that it is indeed attached. */
|
||||
static ExpireMeta *hashGetExpireMeta(const eItem hash) {
|
||||
ExpireMeta *hashGetExpireMeta(const eItem hash) {
|
||||
robj *hashObj = (robj *)hash;
|
||||
if (hashObj->encoding == OBJ_ENCODING_LISTPACK_EX) {
|
||||
listpackEx *lpt = hashObj->ptr;
|
||||
|
||||
@@ -1287,8 +1287,11 @@ start_server {tags {"external:skip needs:debug"}} {
|
||||
r flushall
|
||||
|
||||
# hash1: 5 fields, 3 with TTL. subexpiry incr +1
|
||||
r hset myhash f1 v1 f2 v2 f3 v3 f4 v4 f5 v5
|
||||
r hpexpire myhash 150 FIELDS 3 f1 f2 f3
|
||||
r hset myhash1 f1 v1 f2 v2 f3 v3 f4 v4 f5 v5
|
||||
r hpexpire myhash1 150 FIELDS 3 f1 f2 f3
|
||||
assert_match [get_stat_subexpiry r] 1
|
||||
# Update hash1, f3 field with earlier TTL. subexpiry no change.
|
||||
r hpexpire myhash1 100 FIELDS 1 f3
|
||||
assert_match [get_stat_subexpiry r] 1
|
||||
|
||||
# hash2: 5 fields, 3 with TTL. subexpiry incr +1
|
||||
@@ -1296,6 +1299,9 @@ start_server {tags {"external:skip needs:debug"}} {
|
||||
assert_match [get_stat_subexpiry r] 1
|
||||
r hpexpire myhash2 100 FIELDS 3 f1 f2 f3
|
||||
assert_match [get_stat_subexpiry r] 2
|
||||
# Update hash2, f3 field with later TTL. subexpiry no change.
|
||||
r hpexpire myhash2 150 FIELDS 1 f3
|
||||
assert_match [get_stat_subexpiry r] 2
|
||||
|
||||
# hash3: 2 fields, 1 with TTL. HDEL field with TTL. subexpiry decr -1
|
||||
r hset myhash3 f1 v1 f2 v2
|
||||
|
||||
Reference in New Issue
Block a user