redis/tests/modules/cmdintrospection.c
debing.sun fa040a72c0 Add XDELEX and XACKDEL commands for stream (#14130)
## Summary and detailed design for new stream commands

## XDELEX

### Syntax
```
XDELEX key [KEEPREF | DELREF | ACKED] IDS numids id [id ...]
```

### Description
The `XDELEX` command extends the Redis Streams `XDEL` command, offering
enhanced control over message entry deletion with respect to consumer
groups. It accepts an optional `KEEPREF`, `DELREF`, or `ACKED` parameter
to modify its behavior:

- **KEEPREF:** Deletes the specified entries from the stream, but
preserves existing references to these entries in all consumer groups'
pending entries lists (PELs). This behavior is similar to `XDEL`.
- **DELREF:** Deletes the specified entries from the stream and also
removes all references to these entries from all consumer groups'
PELs, effectively cleaning up all traces of the messages.
- **ACKED:** Only deletes entries that were read and acknowledged by all
consumer groups.

**Note:** The `IDS` block can appear at any position in the command,
consistent with other commands.

### Reply
Array reply, for each `id`:
- `-1`: No such `id` exists in the provided stream `key`.
- `1`: Entry was deleted from the stream.
- `2`: Entry was not deleted because there are still dangling
references (`ACKED` option only).

## XACKDEL

### Syntax
```
XACKDEL key group [KEEPREF | DELREF | ACKED] IDS numids id [id ...]
```

### Description
The `XACKDEL` command combines `XACK` and `XDEL` functionalities in
Redis Streams. It acknowledges the specified message IDs in the given
consumer group and attempts to delete the corresponding stream entries.
It accepts an optional `KEEPREF`, `DELREF`, or `ACKED` parameter:

- **KEEPREF:** Acknowledges the messages in the specified consumer group
and deletes the entries from the stream, but preserves existing
references to these entries in all consumer groups' PEL.
- **DELREF:** Acknowledges the messages in the specified consumer group,
deletes the entries from the stream, and also removes all references to
these entries from all consumer groups' pending entry lists, effectively
cleaning up all traces of the messages.
- **ACKED:** Acknowledges the messages in the specified consumer group
and only deletes entries that were read and acknowledged by all consumer
groups.


### Reply
Array reply, for each `id`:
- `-1`: No such `id` exists in the provided stream `key`.
- `1`: Entry was acknowledged and deleted from the stream.
- `2`: Entry was acknowledged but not deleted because there are still
dangling references (`ACKED` option only).
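
To make the `IDS numids id [id ...]` layout concrete, here is a small client-side sketch that assembles an `XACKDEL` argument vector. The helper `build_xackdel` and its parameters are hypothetical and belong to no client library; the caller supplies the buffer that holds the decimal `numids`.

```c
#include <assert.h>
#include <stddef.h>
#include <stdio.h>

/* Hypothetical helper: fills `out` with
 *   XACKDEL key group [option] IDS numids id [id ...]
 * `option` may be NULL to omit it. `numbuf` receives the decimal numids.
 * Returns the number of arguments written, or -1 if a buffer is too small. */
int build_xackdel(const char **out, size_t cap, char *numbuf, size_t numcap,
                  const char *key, const char *group, const char *option,
                  const char *const *ids, size_t numids) {
    assert(out && numbuf && key && group && ids && numids > 0);
    /* command name, key, group, IDS, numids, plus the optional token */
    size_t need = 5 + (option ? 1 : 0) + numids;
    if (need > cap) return -1;
    int len = snprintf(numbuf, numcap, "%zu", numids);
    if (len < 0 || (size_t)len >= numcap) return -1;

    size_t i = 0;
    out[i++] = "XACKDEL";
    out[i++] = key;
    out[i++] = group;
    if (option) out[i++] = option; /* "KEEPREF", "DELREF" or "ACKED" */
    out[i++] = "IDS";
    out[i++] = numbuf;             /* must match the number of IDs */
    for (size_t j = 0; j < numids; j++) out[i++] = ids[j];
    return (int)i;
}
```

For key `mystream`, group `mygroup`, option `ACKED`, and the two IDs `1-0` and `2-0`, the helper produces eight arguments, with `2` directly after `IDS`.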

# Redis Streams Commands Extension

## XTRIM

### Syntax
```
XTRIM key <MAXLEN | MINID> [= | ~] threshold [LIMIT count] [KEEPREF | DELREF | ACKED]
```

### Description
The `XTRIM` command trims a stream by removing entries based on
specified criteria, extended to include an optional `KEEPREF`, `DELREF`,
or `ACKED` parameter for consumer group handling:

- **KEEPREF:** Trims the stream according to the specified strategy
(MAXLEN or MINID) regardless of whether entries are referenced by any
consumer groups, but preserves existing references to these entries in
all consumer groups' PEL.
- **DELREF:** Trims the stream according to the specified strategy and
also removes all references to the trimmed entries from all consumer
groups' PEL.
- **ACKED:** Only trims entries that were read and acknowledged by all
consumer groups.
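
The effect of `ACKED` on a `MAXLEN` trim can be sketched on a toy array. `TrimEntry` and `toy_trim_acked` are hypothetical names, and the sketch assumes that entries still referenced are skipped while scanning oldest-first; the text above does not say whether the real implementation skips such entries or stops at them.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical toy entry: `referenced` is true while some consumer group
 * has not yet read and acknowledged it. */
typedef struct {
    bool deleted;
    bool referenced;
} TrimEntry;

/* Scans entries oldest-first and deletes unreferenced ones until at most
 * `maxlen` live entries remain. ASSUMPTION: referenced entries are skipped.
 * Returns the number of live entries left, which may exceed `maxlen`. */
size_t toy_trim_acked(TrimEntry *entries, size_t n, size_t maxlen) {
    assert(entries != NULL || n == 0);
    size_t live = 0;
    for (size_t i = 0; i < n; i++)
        if (!entries[i].deleted) live++;
    for (size_t i = 0; i < n && live > maxlen; i++) {
        if (entries[i].deleted || entries[i].referenced) continue;
        entries[i].deleted = true;
        live--;
    }
    return live;
}
```

When three of five entries are still referenced, trimming to `maxlen = 1` leaves three live entries: the stream stays longer than the requested threshold.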

### Reply
No change.

## XADD

### Syntax
```
XADD key [NOMKSTREAM] [<MAXLEN | MINID> [= | ~] threshold [LIMIT count]] [KEEPREF | DELREF | ACKED] <* | id> field value [field value ...]
```

### Description
The `XADD` command appends a new entry to a stream and optionally trims
it in the same operation, extended to include an optional `KEEPREF`,
`DELREF`, or `ACKED` parameter for trimming behavior:

- **KEEPREF:** When trimming, removes entries from the stream according
to the specified strategy (MAXLEN or MINID), regardless of whether they
are referenced by any consumer groups, but preserves existing references
to these entries in all consumer groups' PEL.
- **DELREF:** When trimming, removes entries from the stream according
to the specified strategy and also removes all references to these
entries from all consumer groups' PEL.
- **ACKED:** When trimming, only removes entries that were read and
acknowledged by all consumer groups. Note that if the number of
referenced entries exceeds MAXLEN, trimming still stops, so the stream
can remain longer than MAXLEN.

### Reply
No change.

## Key implementation

Since we currently have no simple way to track the association between
an entry and consumer groups without iterating over all groups, we
introduce two mechanisms to establish this link. This allows us to
determine whether an entry has been seen by all consumer groups, and to
identify which groups are referencing it. With these links, we can break
the association when the entry is either acknowledged or deleted.

1) Added reference tracking between stream messages and consumer groups
using `cgroups_ref`.
The `cgroups_ref` is implemented as a rax that maps stream message IDs to
lists of consumer groups that reference those messages. Each `streamNACK`
stores its corresponding node in this list, so that the group can be
removed from the list during `ACK`.
In this way, we can determine whether an entry has been seen but not yet
acknowledged.
2) Store a cached minimum `last_id` in the stream structure.
This is needed because an entry may never have been seen by a consumer
group. In that case, we treat the entry as not yet consumed, so with the
`ACKED` option we cannot delete it either.
When a consumer group updates its `last_id`, we don’t immediately update
the cached minimum `last_id`. Instead, we check whether the group’s
previous `last_id` was equal to the current minimum, or whether the new
`last_id` is smaller than the current minimum (when using `XGROUP SETID`).
If either is true, we mark the cached minimum `last_id` as invalid, and
defer the actual update until the next time it’s needed.
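
The lazy invalidation of the cached minimum described in item 2 can be sketched as follows. This is a simplified model, not the Redis code: `ToyStream`, `toy_set_last_id`, and `toy_min_last_id` are hypothetical names, groups live in a fixed array, and IDs are plain integers rather than `<ms>-<seq>` pairs.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

#define TOY_MAX_GROUPS 8

/* Hypothetical toy stream with integer IDs. */
typedef struct {
    uint64_t last_id[TOY_MAX_GROUPS]; /* per-group last delivered ID */
    size_t   ngroups;
    uint64_t min_last_id;             /* cached minimum over last_id[] */
    bool     min_valid;               /* false => recompute on next read */
} ToyStream;

/* Updates a group's last_id. The cache is invalidated only if this group
 * held the minimum, or if the new ID is below the cached minimum. */
void toy_set_last_id(ToyStream *s, size_t g, uint64_t new_id) {
    assert(s != NULL && g < s->ngroups);
    uint64_t old = s->last_id[g];
    s->last_id[g] = new_id;
    if (s->min_valid && (old == s->min_last_id || new_id < s->min_last_id))
        s->min_valid = false;
}

/* Returns the minimum last_id, recomputing it only when marked invalid. */
uint64_t toy_min_last_id(ToyStream *s) {
    assert(s != NULL && s->ngroups > 0);
    if (!s->min_valid) {
        uint64_t m = s->last_id[0];
        for (size_t i = 1; i < s->ngroups; i++)
            if (s->last_id[i] < m) m = s->last_id[i];
        s->min_last_id = m;
        s->min_valid = true;
    }
    return s->min_last_id;
}
```

Deferring the recomputation keeps the common case cheap: a group that advances but does not hold the minimum costs two comparisons, and the scan over all groups runs only when the minimum is actually requested after an invalidating update.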

---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: moticless <moticless@github.com>
Co-authored-by: Ozan Tezcan <ozantezcan@gmail.com>
Co-authored-by: Slavomir Kaslev <slavomir.kaslev@gmail.com>
Co-authored-by: Yuan Wang <yuan.wang@redis.com>
2025-07-01 21:00:42 +08:00


#include "redismodule.h"

#define UNUSED(V) ((void) V)

int cmd_xadd(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
    UNUSED(argv);
    UNUSED(argc);
    RedisModule_ReplyWithSimpleString(ctx, "OK");
    return REDISMODULE_OK;
}

int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
    REDISMODULE_NOT_USED(argv);
    REDISMODULE_NOT_USED(argc);

    if (RedisModule_Init(ctx, "cmdintrospection", 1, REDISMODULE_APIVER_1) == REDISMODULE_ERR)
        return REDISMODULE_ERR;

    if (RedisModule_CreateCommand(ctx,"cmdintrospection.xadd",cmd_xadd,"write deny-oom random fast",0,0,0) == REDISMODULE_ERR)
        return REDISMODULE_ERR;

    RedisModuleCommand *xadd = RedisModule_GetCommand(ctx,"cmdintrospection.xadd");
    RedisModuleCommandInfo info = {
        .version = REDISMODULE_COMMAND_INFO_VERSION,
        .arity = -5,
        .summary = "Appends a new message to a stream. Creates the key if it doesn't exist.",
        .since = "5.0.0",
        .complexity = "O(1) when adding a new entry, O(N) when trimming where N being the number of entries evicted.",
        .tips = "nondeterministic_output",
        .history = (RedisModuleCommandHistoryEntry[]){
            /* NOTE: All versions specified should be the module's versions, not
             * Redis'! We use Redis versions in this example for the purpose of
             * testing (comparing the output with the output of the vanilla
             * XADD). */
            {"6.2.0", "Added the `NOMKSTREAM` option, `MINID` trimming strategy and the `LIMIT` option."},
            {"7.0.0", "Added support for the `<ms>-*` explicit ID form."},
            {"8.2.0", "Added the `KEEPREF`, `DELREF` and `ACKED` options."},
            {0}
        },
        .key_specs = (RedisModuleCommandKeySpec[]){
            {
                .notes = "UPDATE instead of INSERT because of the optional trimming feature",
                .flags = REDISMODULE_CMD_KEY_RW | REDISMODULE_CMD_KEY_UPDATE,
                .begin_search_type = REDISMODULE_KSPEC_BS_INDEX,
                .bs.index.pos = 1,
                .find_keys_type = REDISMODULE_KSPEC_FK_RANGE,
                .fk.range = {0,1,0}
            },
            {0}
        },
        .args = (RedisModuleCommandArg[]){
            {
                .name = "key",
                .type = REDISMODULE_ARG_TYPE_KEY,
                .key_spec_index = 0
            },
            {
                .name = "nomkstream",
                .type = REDISMODULE_ARG_TYPE_PURE_TOKEN,
                .token = "NOMKSTREAM",
                .since = "6.2.0",
                .flags = REDISMODULE_CMD_ARG_OPTIONAL
            },
            {
                .name = "condition",
                .type = REDISMODULE_ARG_TYPE_ONEOF,
                .flags = REDISMODULE_CMD_ARG_OPTIONAL,
                .subargs = (RedisModuleCommandArg[]){
                    {
                        .name = "keepref",
                        .type = REDISMODULE_ARG_TYPE_PURE_TOKEN,
                        .token = "KEEPREF"
                    },
                    {
                        .name = "delref",
                        .type = REDISMODULE_ARG_TYPE_PURE_TOKEN,
                        .token = "DELREF"
                    },
                    {
                        .name = "acked",
                        .type = REDISMODULE_ARG_TYPE_PURE_TOKEN,
                        .token = "ACKED"
                    },
                    {0}
                }
            },
            {
                .name = "trim",
                .type = REDISMODULE_ARG_TYPE_BLOCK,
                .flags = REDISMODULE_CMD_ARG_OPTIONAL,
                .subargs = (RedisModuleCommandArg[]){
                    {
                        .name = "strategy",
                        .type = REDISMODULE_ARG_TYPE_ONEOF,
                        .subargs = (RedisModuleCommandArg[]){
                            {
                                .name = "maxlen",
                                .type = REDISMODULE_ARG_TYPE_PURE_TOKEN,
                                .token = "MAXLEN",
                            },
                            {
                                .name = "minid",
                                .type = REDISMODULE_ARG_TYPE_PURE_TOKEN,
                                .token = "MINID",
                                .since = "6.2.0",
                            },
                            {0}
                        }
                    },
                    {
                        .name = "operator",
                        .type = REDISMODULE_ARG_TYPE_ONEOF,
                        .flags = REDISMODULE_CMD_ARG_OPTIONAL,
                        .subargs = (RedisModuleCommandArg[]){
                            {
                                .name = "equal",
                                .type = REDISMODULE_ARG_TYPE_PURE_TOKEN,
                                .token = "="
                            },
                            {
                                .name = "approximately",
                                .type = REDISMODULE_ARG_TYPE_PURE_TOKEN,
                                .token = "~"
                            },
                            {0}
                        }
                    },
                    {
                        .name = "threshold",
                        .type = REDISMODULE_ARG_TYPE_STRING,
                        .display_text = "threshold" /* Just for coverage, doesn't have a visible effect */
                    },
                    {
                        .name = "count",
                        .type = REDISMODULE_ARG_TYPE_INTEGER,
                        .token = "LIMIT",
                        .since = "6.2.0",
                        .flags = REDISMODULE_CMD_ARG_OPTIONAL
                    },
                    {0}
                }
            },
            {
                .name = "id-selector",
                .type = REDISMODULE_ARG_TYPE_ONEOF,
                .subargs = (RedisModuleCommandArg[]){
                    {
                        .name = "auto-id",
                        .type = REDISMODULE_ARG_TYPE_PURE_TOKEN,
                        .token = "*"
                    },
                    {
                        .name = "id",
                        .type = REDISMODULE_ARG_TYPE_STRING,
                    },
                    {0}
                }
            },
            {
                .name = "data",
                .type = REDISMODULE_ARG_TYPE_BLOCK,
                .flags = REDISMODULE_CMD_ARG_MULTIPLE,
                .subargs = (RedisModuleCommandArg[]){
                    {
                        .name = "field",
                        .type = REDISMODULE_ARG_TYPE_STRING,
                    },
                    {
                        .name = "value",
                        .type = REDISMODULE_ARG_TYPE_STRING,
                    },
                    {0}
                }
            },
            {0}
        }
    };

    if (RedisModule_SetCommandInfo(xadd, &info) == REDISMODULE_ERR)
        return REDISMODULE_ERR;

    return REDISMODULE_OK;
}