mirror of
https://github.com/redis/redis.git
synced 2026-04-21 03:01:35 -04:00
Fix H(P)EXPIREAT command to propagate HDEL as well (#13364)
H(P)EXPIREAT command might delete fields in case the absolute time is in the past. Those HDELs need to be propagated as well. In general, as we need to propagate H(P)EXPIRE(AT) command to the replica, each field that is mentioned in the command should be categorized into one of the four options: 1. Managed to update field’s expiration time - propagate it to replica as part of the HPEXPIREAT command. 2. Deleted the field because the time is in the past - propagate also HDEL command to delete the field and remove the field from the propagated HPEXPIREAT. 3. Condition not met for the field - Remove the field from the propagated HPEXPIREAT command. 4. Field does not exists - Remove the field from the propagated HPEXPIREAT command. If none of the provided fields match option number 1, then avoid also propagating the HPEXPIREAT command to the replica. This approach is aligned with the EXPIRE command: If a given key has already expired, then DEL will be propagated instead of EXPIRE command. If condition not met, then command will be rejected. Otherwise, EXPIRE command will be propagated for given key.
This commit is contained in:
85
src/t_hash.c
85
src/t_hash.c
@@ -540,8 +540,10 @@ SetExRes hashTypeSetExpiryListpack(HashTypeSetEx *ex, sds field,
|
||||
ex->minExpireFields = prevExpire;
|
||||
}
|
||||
|
||||
/* if expiration time is in the past */
|
||||
/* If expired, then delete the field and propagate the deletion.
|
||||
* If replica, continue like the field is valid */
|
||||
if (unlikely(checkAlreadyExpired(expireAt))) {
|
||||
propagateHashFieldDeletion(ex->db, ex->key->ptr, field, sdslen(field));
|
||||
hashTypeDelete(ex->hashObj, field, 1);
|
||||
ex->fieldDeleted++;
|
||||
return HSETEX_DELETED;
|
||||
@@ -1034,8 +1036,11 @@ SetExRes hashTypeSetExpiryHT(HashTypeSetEx *exInfo, sds field, uint64_t expireAt
|
||||
dictSetKey(ht, existingEntry, hfNew);
|
||||
|
||||
|
||||
/* if expiration time is in the past */
|
||||
/* If expired, then delete the field and propagate the deletion.
|
||||
* If replica, continue like the field is valid */
|
||||
if (unlikely(checkAlreadyExpired(expireAt))) {
|
||||
/* replicas should not initiate deletion of fields */
|
||||
propagateHashFieldDeletion(exInfo->db, exInfo->key->ptr, field, sdslen(field));
|
||||
hashTypeDelete(exInfo->hashObj, field, 1);
|
||||
exInfo->fieldDeleted++;
|
||||
return HSETEX_DELETED;
|
||||
@@ -1101,12 +1106,7 @@ void initDictExpireMetadata(sds key, robj *o) {
|
||||
m->expireMeta.trash = 1; /* mark as trash (as long it wasn't ebAdd()) */
|
||||
}
|
||||
|
||||
/*
|
||||
* Init HashTypeSetEx struct before calling hashTypeSetEx()
|
||||
*
|
||||
* Don't have to provide client and "cmd". If provided, then notification once
|
||||
* done by function hashTypeSetExDone().
|
||||
*/
|
||||
/* Init HashTypeSetEx struct before calling hashTypeSetEx() */
|
||||
int hashTypeSetExInit(robj *key, robj *o, client *c, redisDb *db, const char *cmd,
|
||||
ExpireSetCond expireSetCond, HashTypeSetEx *ex)
|
||||
{
|
||||
@@ -1123,20 +1123,20 @@ int hashTypeSetExInit(robj *key, robj *o, client *c, redisDb *db, const char *cm
|
||||
ex->minExpireFields = EB_EXPIRE_TIME_INVALID;
|
||||
|
||||
/* Take care that HASH support expiration */
|
||||
if (ex->hashObj->encoding == OBJ_ENCODING_LISTPACK) {
|
||||
hashTypeConvert(ex->hashObj, OBJ_ENCODING_LISTPACK_EX, &c->db->hexpires);
|
||||
if (o->encoding == OBJ_ENCODING_LISTPACK) {
|
||||
hashTypeConvert(o, OBJ_ENCODING_LISTPACK_EX, &c->db->hexpires);
|
||||
|
||||
listpackEx *lpt = ex->hashObj->ptr;
|
||||
listpackEx *lpt = o->ptr;
|
||||
dictEntry *de = dbFind(c->db, key->ptr);
|
||||
serverAssert(de != NULL);
|
||||
lpt->key = dictGetKey(de);
|
||||
} else if (ex->hashObj->encoding == OBJ_ENCODING_HT) {
|
||||
} else if (o->encoding == OBJ_ENCODING_HT) {
|
||||
/* Take care dict has HFE metadata */
|
||||
if (!isDictWithMetaHFE(ht)) {
|
||||
/* Realloc (only header of dict) with metadata for hash-field expiration */
|
||||
dictTypeAddMeta(&ht, &mstrHashDictTypeWithHFE);
|
||||
dictExpireMetadata *m = (dictExpireMetadata *) dictMetadata(ht);
|
||||
ex->hashObj->ptr = ht;
|
||||
o->ptr = ht;
|
||||
|
||||
/* Find the key in the keyspace. Need to keep reference to the key for
|
||||
* notifications or even removal of the hash */
|
||||
@@ -1151,7 +1151,7 @@ int hashTypeSetExInit(robj *key, robj *o, client *c, redisDb *db, const char *cm
|
||||
}
|
||||
|
||||
/* Read minExpire from attached ExpireMeta to the hash */
|
||||
ex->minExpire = hashTypeGetMinExpire(ex->hashObj, 0);
|
||||
ex->minExpire = hashTypeGetMinExpire(o, 0);
|
||||
return C_OK;
|
||||
}
|
||||
|
||||
@@ -3040,11 +3040,34 @@ static void httlGenericCommand(client *c, const char *cmd, long long basetime, i
|
||||
* unit is either UNIT_SECONDS or UNIT_MILLISECONDS, and is only used for
|
||||
* the argv[2] parameter. The basetime is always specified in milliseconds.
|
||||
*
|
||||
* Additional flags are supported and parsed via parseExtendedExpireArguments */
|
||||
* PROPAGATE TO REPLICA:
|
||||
* The command will be translated into HPEXPIREAT and the expiration time will be
|
||||
* converted to absolute time in milliseconds.
|
||||
*
|
||||
* As we need to propagate H(P)EXPIRE(AT) command to the replica, each field that
|
||||
* is mentioned in the command should be categorized into one of the four options:
|
||||
* 1. Field’s expiration time updated successfully - Propagate it to replica as
|
||||
* part of the HPEXPIREAT command.
|
||||
* 2. The field got deleted since the time is in the past - propagate also HDEL
|
||||
* command to delete the field. Also remove the field from the propagated
|
||||
* HPEXPIREAT command.
|
||||
* 3. Condition not met for the field - Remove the field from the propagated
|
||||
* HPEXPIREAT command.
|
||||
* 4. Field doesn't exists - Remove the field from propagated HPEXPIREAT command.
|
||||
*
|
||||
* If none of the provided fields match option #1, that is provided time of the
|
||||
* command is in the past, then avoid propagating the HPEXPIREAT command to the
|
||||
* replica.
|
||||
*
|
||||
* This approach is aligned with existing EXPIRE command. If a given key has already
|
||||
* expired, then DEL will be propagated instead of EXPIRE command. If condition
|
||||
* not met, then command will be rejected. Otherwise, EXPIRE command will be
|
||||
* propagated for given key.
|
||||
*/
|
||||
static void hexpireGenericCommand(client *c, const char *cmd, long long basetime, int unit) {
|
||||
long numFields = 0, numFieldsAt = 4;
|
||||
long long expire; /* unix time in msec */
|
||||
int expireSetCond = 0;
|
||||
int fieldAt, fieldsNotSet = 0, expireSetCond = 0;
|
||||
robj *hashObj, *keyArg = c->argv[1], *expireArg = c->argv[2];
|
||||
|
||||
/* Read the hash object */
|
||||
@@ -3117,14 +3140,38 @@ static void hexpireGenericCommand(client *c, const char *cmd, long long basetime
|
||||
hashTypeSetExInit(keyArg, hashObj, c, c->db, cmd, expireSetCond, &exCtx);
|
||||
addReplyArrayLen(c, numFields);
|
||||
|
||||
for (int i = 0 ; i < numFields ; i++) {
|
||||
sds field = c->argv[numFieldsAt+i+1]->ptr;
|
||||
fieldAt = numFieldsAt + 1;
|
||||
while (fieldAt < c->argc) {
|
||||
sds field = c->argv[fieldAt]->ptr;
|
||||
SetExRes res = hashTypeSetEx(hashObj, field, expire, &exCtx);
|
||||
|
||||
if (unlikely(res != HSETEX_OK)) {
|
||||
/* If the field was not set, prevent field propagation */
|
||||
rewriteClientCommandArgument(c, fieldAt, NULL);
|
||||
fieldsNotSet = 1;
|
||||
} else {
|
||||
++fieldAt;
|
||||
}
|
||||
|
||||
addReplyLongLong(c,res);
|
||||
}
|
||||
|
||||
hashTypeSetExDone(&exCtx);
|
||||
|
||||
/* rewrite command for the replica sake */
|
||||
/* Avoid propagating command if not even one field was updated (Either because
|
||||
* the time is in the past, and corresponding HDELs were sent, or conditions
|
||||
* not met) then it is useless and invalid to propagate command with no fields */
|
||||
if (exCtx.fieldUpdated == 0) {
|
||||
preventCommandPropagation(c);
|
||||
return;
|
||||
}
|
||||
|
||||
/* If some fields were dropped, rewrite the number of fields */
|
||||
if (fieldsNotSet) {
|
||||
robj *numFieldsObj = createStringObjectFromLongLong(exCtx.fieldUpdated);
|
||||
rewriteClientCommandArgument(c, numFieldsAt, numFieldsObj);
|
||||
decrRefCount(numFieldsObj);
|
||||
}
|
||||
|
||||
/* Propagate as HPEXPIREAT millisecond-timestamp. Rewrite only if not already */
|
||||
if (c->cmd->proc != hpexpireatCommand) {
|
||||
|
||||
Reference in New Issue
Block a user