Add API to allow Redis modules to unsubscribe from keyspace notifications

This API complements module subscribe by enabling modules to unsubscribe
from specific keyspace event notifications when they are no longer
needed.
This helps reduce performance overhead and unnecessary callback
invocations.

The function matches subscriptions based on event mask, callback
pointer,
and module identity. If a matching subscription is found, it is removed.

Returns REDISMODULE_OK if a subscription was successfully removed,
otherwise REDISMODULE_ERR.
This commit is contained in:
Stav-Levi
2025-07-28 10:17:48 +03:00
committed by GitHub
parent fe3f0aa252
commit 82396716d0
4 changed files with 115 additions and 3 deletions

View File

@@ -8866,6 +8866,38 @@ int RM_SubscribeToKeyspaceEvents(RedisModuleCtx *ctx, int types, RedisModuleNoti
return REDISMODULE_OK;
}
/*
* RM_UnsubscribeFromKeyspaceEvents - Unregister a module's callback from keyspace notifications for specific event types.
*
* This function removes a previously registered subscription identified by both the event mask and the callback function.
* It is useful to reduce performance overhead when the module no longer requires notifications for certain events.
*
* Parameters:
* - ctx: The RedisModuleCtx associated with the calling module.
* - types: The event mask representing the keyspace notification types to unsubscribe from.
* - callback: The callback function pointer that was originally registered for these events.
*
* Returns:
* - REDISMODULE_OK on successful removal of the subscription.
* - REDISMODULE_ERR if no matching subscription was found or if invalid parameters were provided.
*/
int RM_UnsubscribeFromKeyspaceEvents(RedisModuleCtx *ctx, int types, RedisModuleNotificationFunc callback) {
if (!ctx || !callback) return REDISMODULE_ERR;
int removed = 0;
listIter li;
listNode *ln;
listRewind(moduleKeyspaceSubscribers,&li);
while ((ln = listNext(&li))) {
RedisModuleKeyspaceSubscriber *sub = ln->value;
if (sub->event_mask == types && sub->notify_callback == callback && sub->module == ctx->module) {
zfree(sub);
listDelNode(moduleKeyspaceSubscribers, ln);
removed++;
}
}
return removed > 0 ? REDISMODULE_OK : REDISMODULE_ERR;
}
void firePostExecutionUnitJobs(void) {
/* Avoid propagation of commands.
* In that way, postExecutionUnitOperations will prevent
@@ -14730,6 +14762,7 @@ void moduleRegisterCoreAPI(void) {
REGISTER_API(NotifyKeyspaceEvent);
REGISTER_API(GetNotifyKeyspaceEvents);
REGISTER_API(SubscribeToKeyspaceEvents);
REGISTER_API(UnsubscribeFromKeyspaceEvents);
REGISTER_API(AddPostNotificationJob);
REGISTER_API(RegisterClusterMessageReceiver);
REGISTER_API(SendClusterMessage);

View File

@@ -1255,6 +1255,7 @@ REDISMODULE_API void (*RedisModule_ThreadSafeContextLock)(RedisModuleCtx *ctx) R
REDISMODULE_API int (*RedisModule_ThreadSafeContextTryLock)(RedisModuleCtx *ctx) REDISMODULE_ATTR;
REDISMODULE_API void (*RedisModule_ThreadSafeContextUnlock)(RedisModuleCtx *ctx) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_SubscribeToKeyspaceEvents)(RedisModuleCtx *ctx, int types, RedisModuleNotificationFunc cb) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_UnsubscribeFromKeyspaceEvents)(RedisModuleCtx *ctx, int types, RedisModuleNotificationFunc cb) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_AddPostNotificationJob)(RedisModuleCtx *ctx, RedisModulePostNotificationJobFunc callback, void *pd, void (*free_pd)(void*)) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_NotifyKeyspaceEvent)(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_GetNotifyKeyspaceEvents)(void) REDISMODULE_ATTR;
@@ -1643,6 +1644,7 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int
REDISMODULE_GET_API(BlockedClientMeasureTimeEnd);
REDISMODULE_GET_API(SetDisconnectCallback);
REDISMODULE_GET_API(SubscribeToKeyspaceEvents);
REDISMODULE_GET_API(UnsubscribeFromKeyspaceEvents);
REDISMODULE_GET_API(AddPostNotificationJob);
REDISMODULE_GET_API(NotifyKeyspaceEvent);
REDISMODULE_GET_API(GetNotifyKeyspaceEvents);

View File

@@ -45,8 +45,10 @@ static int KeySpace_NotificationLoaded(RedisModuleCtx *ctx, int type, const char
return REDISMODULE_OK;
}
static long long callback_call_count = 0;
static int KeySpace_NotificationGeneric(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key) {
REDISMODULE_NOT_USED(type);
callback_call_count++;
const char *key_str = RedisModule_StringPtrLen(key, NULL);
if (strncmp(key_str, "count_dels_", 11) == 0 && strcmp(event, "del") == 0) {
if (RedisModule_GetContextFlags(ctx) & REDISMODULE_CTX_FLAGS_MASTER) {
@@ -296,6 +298,55 @@ static int cmdGetDels(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
return RedisModule_ReplyWithLongLong(ctx, dels);
}
static RedisModuleNotificationFunc get_callback_for_event(int event_mask) {
switch(event_mask) {
case REDISMODULE_NOTIFY_LOADED:
return KeySpace_NotificationLoaded;
case REDISMODULE_NOTIFY_GENERIC:
return KeySpace_NotificationGeneric;
case REDISMODULE_NOTIFY_EXPIRED:
return KeySpace_NotificationExpired;
case REDISMODULE_NOTIFY_MODULE:
return KeySpace_NotificationModule;
case REDISMODULE_NOTIFY_KEY_MISS:
return KeySpace_NotificationModuleKeyMiss;
case REDISMODULE_NOTIFY_STRING:
// We have two callbacks for STRING events in your OnLoad,
// For simplicity, pick the first:
return KeySpace_NotificationModuleString;
default:
return NULL;
}
}
int GetCallbackCountCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
REDISMODULE_NOT_USED(argv);
REDISMODULE_NOT_USED(argc);
RedisModule_ReplyWithLongLong(ctx, callback_call_count);
return REDISMODULE_OK;
}
static int CmdUnsub(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
if (argc != 2) {
return RedisModule_WrongArity(ctx);
}
long long event_mask;
if (RedisModule_StringToLongLong(argv[1], &event_mask) != REDISMODULE_OK) {
return RedisModule_ReplyWithError(ctx, "ERR invalid event mask");
}
RedisModuleNotificationFunc cb = get_callback_for_event((int)event_mask);
if (cb == NULL) {
return RedisModule_ReplyWithError(ctx, "ERR unknown event mask");
}
if (RedisModule_UnsubscribeFromKeyspaceEvents(ctx, (int)event_mask, cb) != REDISMODULE_OK) {
return RedisModule_ReplyWithError(ctx, "ERR unsubscribe failed");
}
return RedisModule_ReplyWithSimpleString(ctx, "OK");
}
/* This function must be present on each Redis module. It is used in order to
* register the commands into the Redis server. */
int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
@@ -357,17 +408,17 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
"write", 0, 0, 0) == REDISMODULE_ERR){
return REDISMODULE_ERR;
}
if (RedisModule_CreateCommand(ctx, "keyspace.incr_case1", cmdIncrCase1,
"write", 0, 0, 0) == REDISMODULE_ERR){
return REDISMODULE_ERR;
}
if (RedisModule_CreateCommand(ctx, "keyspace.incr_case2", cmdIncrCase2,
"write", 0, 0, 0) == REDISMODULE_ERR){
return REDISMODULE_ERR;
}
if (RedisModule_CreateCommand(ctx, "keyspace.incr_case3", cmdIncrCase3,
"write", 0, 0, 0) == REDISMODULE_ERR){
return REDISMODULE_ERR;
@@ -383,6 +434,14 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
return REDISMODULE_ERR;
}
if (RedisModule_CreateCommand(ctx, "keyspace.unsubscribe", CmdUnsub, "write", 0, 0, 0) == REDISMODULE_ERR){
return REDISMODULE_ERR;
}
if (RedisModule_CreateCommand(ctx, "keyspace.callback_count", GetCallbackCountCommand, "", 0, 0, 0)== REDISMODULE_ERR){
return REDISMODULE_ERR;
}
if (argc == 1) {
const char *ptr = RedisModule_StringPtrLen(argv[0], NULL);
if (!strcasecmp(ptr, "noload")) {

View File

@@ -87,6 +87,24 @@ tags "modules" {
$rd1 close
}
test "Keyspace notifications: unsubscribe removes handler" {
r config set notify-keyspace-events KEA
set before [r keyspace.callback_count]
r set a 1
r del a
wait_for_condition 100 10 {
[r keyspace.callback_count] > $before
} else {
fail "callback did not trigger"
}
set before_unsub [r keyspace.callback_count]
r keyspace.unsubscribe 4 ;# REDISMODULE_NOTIFY_GENERIC
r set a 1
r del a
set after_unsub [r keyspace.callback_count]
assert_equal $before_unsub $after_unsub
}
test {Test expired key space event} {
set prev_expired [s expired_keys]
r set exp 1 PX 10