Implement WITHATTRIBS for VSIM. (#14065)

Hi, as described, this implements WITHATTRIBS, a feature requested by a
few users, and indeed needed.
This was requested the first time by @rowantrollope but I was not sure
how to make it work with RESP2 and RESP3 in a clean way, hopefully
that's it.

The patch includes tests and documentation updates.
This commit is contained in:
Salvatore Sanfilippo
2025-05-27 16:12:48 +02:00
committed by YaacovHazan
parent 5362410de7
commit 795ec9118b
5 changed files with 279 additions and 21 deletions

View File

@@ -66,7 +66,7 @@ performed in the background, while the command is executed in the main thread.
**VSIM: return elements by vector similarity**
VSIM key [ELE|FP32|VALUES] <vector or element> [WITHSCORES] [COUNT num] [EF search-exploration-factor] [FILTER expression] [FILTER-EF max-filtering-effort] [TRUTH] [NOTHREAD]
VSIM key [ELE|FP32|VALUES] <vector or element> [WITHSCORES] [WITHATTRIBS] [COUNT num] [EF search-exploration-factor] [FILTER expression] [FILTER-EF max-filtering-effort] [TRUTH] [NOTHREAD]
The command returns similar vectors, for simplicity (and verbosity) in the following example, instead of providing a vector using FP32 or VALUES (like in `VADD`), we will ask for elements having a vector similar to a given element already in the sorted set:
@@ -98,8 +98,14 @@ The `TRUTH` option forces the command to perform a linear scan of all the entrie
The `NOTHREAD` option forces the command to execute the search on the data structure in the main thread. Normally `VSIM` spawns a thread instead. This may be useful for benchmarking purposes, or when we work with extremely small vector sets and don't want to pay the cost of spawning a thread. It is possible that in the future this option will be automatically used by Redis when we detect small vector sets. Note that this option blocks the server for all the time needed to complete the command, so it is a source of potential latency issues: if you are in doubt, never use it.
The `WITHSCORES` option returns, for each returned element, a floating point number representing how near the element is to the query, as a similarity between 0 and 1, where 0 means the vectors are opposite, and 1 means they are pointing exactly in the same direction (maximum similarity).
The `WITHATTRIBS` option returns, for each element, the JSON attribute associated with the element, or NULL for the elements missing an attribute.
For `FILTER` and `FILTER-EF` options, please check the filtered search section of this documentation.
Note that when `WITHSCORES` and `WITHATTRIBS` are provided at the same time, the RESP2 reply guarantees that the returned elements are always in the sequence *ele*,*score*,*attribs*. RESP3 replies are instead maps, in the form *ele -> score|attrib* when just one option is provided, or *ele -> [score,attrib]* when both are provided: that is, when both options are used with RESP3, the score and attribute will be a two-item array associated with the element key.
**VDIM: return the dimension of the vectors inside the vector set**
VDIM keyname

View File

@@ -291,6 +291,12 @@
"name": "withscores",
"type": "pure-token",
"optional": true
},
{
"token": "WITHATTRIBS",
"name": "withattribs",
"type": "pure-token",
"optional": true
}
]
},

View File

@@ -94,6 +94,7 @@ class TestCase:
self.test_key = f"test:{self.__class__.__name__.lower()}"
# Primary Redis instance
self.redis = redis.Redis(port=primary_port)
self.redis3 = redis.Redis(port=primary_port,protocol=3)
# Replica Redis instance
self.replica = redis.Redis(port=replica_port)
# Replication status
@@ -184,6 +185,7 @@ def find_test_classes(primary_port, replica_port):
# Create test instance with specified ports
test_instance = obj()
test_instance.redis = redis.Redis(port=primary_port)
test_instance.redis3 = redis.Redis(port=primary_port,protocol=3)
test_instance.replica = redis.Redis(port=replica_port)
test_instance.primary_port = primary_port
test_instance.replica_port = replica_port

View File

@@ -0,0 +1,214 @@
from test import TestCase, generate_random_vector
import struct
import json
import random
class VSIMWithAttribs(TestCase):
    """Check the reply shape and content of VSIM WITHSCORES / WITHATTRIBS.

    Every query is issued over both a RESP2 connection (self.redis) and a
    RESP3 connection (self.redis3):

    * RESP2 replies are expected to be flat arrays: item[, score][, attrib].
    * RESP3 replies are expected to be maps: item -> score|attrib when a
      single option is given, item -> [score, attrib] when both are given.

    Besides the reply shape, returned attributes are compared with what
    setup() stored, so an attribute attached to the wrong element (or a
    missing NULL for an attribute-less element) is detected.
    """

    def getname(self):
        return "VSIM WITHATTRIBS/WITHSCORES functionality testing"

    def setup(self):
        """Populate the test key with self.count random vectors.

        Every 5th element is left without attributes so that NULL replies
        can be observed. self.expected_attribs maps each item name (bytes,
        as returned by the client) to the attribute dict that was stored,
        or to None when no attribute was set.
        """
        super().setup()
        self.dim = 8
        self.count = 20
        self.expected_attribs = {}

        # Create vectors with attributes
        for i in range(self.count):
            vec = generate_random_vector(self.dim)
            vec_bytes = struct.pack(f'{self.dim}f', *vec)

            # Item name
            name = f"{self.test_key}:item:{i}"

            # Add to Redis
            self.redis.execute_command('VADD', self.test_key, 'FP32', vec_bytes, name)

            # Create and add attribute
            if i % 5 == 0:
                # Every 5th item has no attribute (for testing NULL responses)
                self.expected_attribs[name.encode()] = None
                continue

            category = random.choice(["electronics", "furniture", "clothing"])
            price = random.randint(50, 1000)
            attrs = {"category": category, "price": price, "id": i}
            self.redis.execute_command('VSETATTR', self.test_key, name, json.dumps(attrs))
            self.expected_attribs[name.encode()] = attrs

    def is_numeric(self, value):
        """Check if a value can be converted to float"""
        try:
            if isinstance(value, (int, float)):
                return True
            if isinstance(value, bytes):
                float(value.decode('utf-8'))
                return True
            if isinstance(value, str):
                float(value)
                return True
            return False
        except (ValueError, TypeError):
            return False

    def _vsim_cmd(self, query_vec, count, *options):
        """Build the argument list for a VSIM ... VALUES query.

        `options` are appended verbatim after COUNT <count>, e.g.
        'WITHSCORES', 'WITHATTRIBS'.
        """
        cmd_args = ['VSIM', self.test_key, 'VALUES', self.dim]
        cmd_args.extend([str(x) for x in query_vec])
        cmd_args.extend(['COUNT', count])
        cmd_args.extend(options)
        return cmd_args

    def _check_attrib(self, item, attr, proto):
        """Assert that `attr` is exactly what setup() stored for `item`.

        `proto` ("RESP2" / "RESP3") is only used to label failures.
        """
        assert item in self.expected_attribs, f"{proto}: Unexpected item {item}"
        expected = self.expected_attribs[item]
        if expected is None:
            assert attr is None, f"{proto}: Item {item} has no attribute, expected NULL, got {attr}"
        else:
            assert attr is not None, f"{proto}: Item {item} should have an attribute, got NULL"
            # json.loads also verifies that the attribute is valid JSON.
            assert json.loads(attr) == expected, \
                f"{proto}: Attribute for {item} doesn't match: expected {expected}, got {attr}"

    def test(self):
        # Create query vector
        query_vec = generate_random_vector(self.dim)

        # Test 1: VSIM with no additional options (should be same for RESP2 and RESP3)
        cmd_args = self._vsim_cmd(query_vec, 5)
        results_resp2 = self.redis.execute_command(*cmd_args)
        results_resp3 = self.redis3.execute_command(*cmd_args)

        # Both should return simple arrays of item names
        assert len(results_resp2) == 5, f"RESP2: Expected 5 results, got {len(results_resp2)}"
        assert len(results_resp3) == 5, f"RESP3: Expected 5 results, got {len(results_resp3)}"
        assert all(isinstance(item, bytes) for item in results_resp2), "RESP2: Results should be byte strings"
        assert all(isinstance(item, bytes) for item in results_resp3), "RESP3: Results should be byte strings"

        # Test 2: VSIM with WITHSCORES only
        cmd_args = self._vsim_cmd(query_vec, 5, 'WITHSCORES')
        results_resp2 = self.redis.execute_command(*cmd_args)
        results_resp3 = self.redis3.execute_command(*cmd_args)

        # RESP2: Should be a flat array alternating item, score
        assert len(results_resp2) == 10, f"RESP2: Expected 10 elements (5 items × 2), got {len(results_resp2)}"
        for i in range(0, len(results_resp2), 2):
            assert isinstance(results_resp2[i], bytes), f"RESP2: Item at {i} should be bytes"
            assert self.is_numeric(results_resp2[i+1]), f"RESP2: Score at {i+1} should be numeric"
            score = float(results_resp2[i+1]) if isinstance(results_resp2[i+1], bytes) else results_resp2[i+1]
            assert 0 <= score <= 1, f"RESP2: Score {score} should be between 0 and 1"

        # RESP3: Should be a dict/map with items as keys and scores as DIRECT values (not arrays)
        assert isinstance(results_resp3, dict), f"RESP3: Expected dict, got {type(results_resp3)}"
        assert len(results_resp3) == 5, f"RESP3: Expected 5 entries, got {len(results_resp3)}"
        for item, score in results_resp3.items():
            assert isinstance(item, bytes), f"RESP3: Key should be bytes"
            # Score should be a direct value, NOT an array
            assert not isinstance(score, list), f"RESP3: With single WITH option, value should not be array"
            assert self.is_numeric(score), f"RESP3: Score should be numeric, got {type(score)}"
            score_val = float(score) if isinstance(score, bytes) else score
            assert 0 <= score_val <= 1, f"RESP3: Score {score_val} should be between 0 and 1"

        # Test 3: VSIM with WITHATTRIBS only
        cmd_args = self._vsim_cmd(query_vec, 5, 'WITHATTRIBS')
        results_resp2 = self.redis.execute_command(*cmd_args)
        results_resp3 = self.redis3.execute_command(*cmd_args)

        # RESP2: Should be a flat array alternating item, attribute
        assert len(results_resp2) == 10, f"RESP2: Expected 10 elements (5 items × 2), got {len(results_resp2)}"
        for i in range(0, len(results_resp2), 2):
            assert isinstance(results_resp2[i], bytes), f"RESP2: Item at {i} should be bytes"
            attr = results_resp2[i+1]
            assert attr is None or isinstance(attr, bytes), f"RESP2: Attribute at {i+1} should be None or bytes"
            # Verify it's valid JSON and belongs to this item
            self._check_attrib(results_resp2[i], attr, "RESP2")

        # RESP3: Should be a dict/map with items as keys and attributes as DIRECT values (not arrays)
        assert isinstance(results_resp3, dict), f"RESP3: Expected dict, got {type(results_resp3)}"
        assert len(results_resp3) == 5, f"RESP3: Expected 5 entries, got {len(results_resp3)}"
        for item, attr in results_resp3.items():
            assert isinstance(item, bytes), f"RESP3: Key should be bytes"
            # Attribute should be a direct value, NOT an array
            assert not isinstance(attr, list), f"RESP3: With single WITH option, value should not be array"
            assert attr is None or isinstance(attr, bytes), f"RESP3: Attribute should be None or bytes"
            # Verify it's valid JSON and belongs to this item
            self._check_attrib(item, attr, "RESP3")

        # Test 4: VSIM with both WITHSCORES and WITHATTRIBS
        cmd_args = self._vsim_cmd(query_vec, 5, 'WITHSCORES', 'WITHATTRIBS')
        results_resp2 = self.redis.execute_command(*cmd_args)
        results_resp3 = self.redis3.execute_command(*cmd_args)

        # RESP2: Should be a flat array with pattern: item, score, attribute
        assert len(results_resp2) == 15, f"RESP2: Expected 15 elements (5 items × 3), got {len(results_resp2)}"
        for i in range(0, len(results_resp2), 3):
            assert isinstance(results_resp2[i], bytes), f"RESP2: Item at {i} should be bytes"
            assert self.is_numeric(results_resp2[i+1]), f"RESP2: Score at {i+1} should be numeric"
            score = float(results_resp2[i+1]) if isinstance(results_resp2[i+1], bytes) else results_resp2[i+1]
            assert 0 <= score <= 1, f"RESP2: Score {score} should be between 0 and 1"
            attr = results_resp2[i+2]
            assert attr is None or isinstance(attr, bytes), f"RESP2: Attribute at {i+2} should be None or bytes"
            self._check_attrib(results_resp2[i], attr, "RESP2")

        # RESP3: Should be a dict where each value is a 2-element array [score, attribute]
        assert isinstance(results_resp3, dict), f"RESP3: Expected dict, got {type(results_resp3)}"
        assert len(results_resp3) == 5, f"RESP3: Expected 5 entries, got {len(results_resp3)}"
        for item, value in results_resp3.items():
            assert isinstance(item, bytes), f"RESP3: Key should be bytes"
            # With BOTH options, value MUST be an array
            assert isinstance(value, list), f"RESP3: With both WITH options, value should be a list, got {type(value)}"
            assert len(value) == 2, f"RESP3: Value should have 2 elements [score, attr], got {len(value)}"
            score, attr = value
            assert self.is_numeric(score), f"RESP3: Score should be numeric"
            score_val = float(score) if isinstance(score, bytes) else score
            assert 0 <= score_val <= 1, f"RESP3: Score {score_val} should be between 0 and 1"
            assert attr is None or isinstance(attr, bytes), f"RESP3: Attribute should be None or bytes"
            self._check_attrib(item, attr, "RESP3")

        # Test 5: Verify consistency - same items, scores and attributes under
        # both protocols (items are compared as sets: order is not checked)
        cmd_args = self._vsim_cmd(query_vec, 5, 'WITHSCORES', 'WITHATTRIBS')
        results_resp2 = self.redis.execute_command(*cmd_args)
        results_resp3 = self.redis3.execute_command(*cmd_args)

        # Extract items from RESP2 (every 3rd element starting from 0)
        items_resp2 = [results_resp2[i] for i in range(0, len(results_resp2), 3)]

        # Extract items from RESP3 (keys of the dict)
        items_resp3 = list(results_resp3.keys())

        # Verify same items returned
        assert set(items_resp2) == set(items_resp3), "RESP2 and RESP3 should return the same items"

        # Build a mapping from items to scores and attributes for comparison
        data_resp2 = {}
        for i in range(0, len(results_resp2), 3):
            item = results_resp2[i]
            score = float(results_resp2[i+1]) if isinstance(results_resp2[i+1], bytes) else results_resp2[i+1]
            attr = results_resp2[i+2]
            data_resp2[item] = (score, attr)

        data_resp3 = {}
        for item, value in results_resp3.items():
            score = float(value[0]) if isinstance(value[0], bytes) else value[0]
            attr = value[1]
            data_resp3[item] = (score, attr)

        # Verify scores and attributes match for each item
        for item in data_resp2:
            score_resp2, attr_resp2 = data_resp2[item]
            score_resp3, attr_resp3 = data_resp3[item]
            assert abs(score_resp2 - score_resp3) < 0.0001, \
                f"Scores for {item} don't match: RESP2={score_resp2}, RESP3={score_resp3}"
            assert attr_resp2 == attr_resp3, \
                f"Attributes for {item} don't match: RESP2={attr_resp2}, RESP3={attr_resp3}"

        # Test 6: Test ordering of WITHSCORES and WITHATTRIBS doesn't matter
        cmd_args1 = self._vsim_cmd(query_vec, 3, 'WITHSCORES', 'WITHATTRIBS')
        cmd_args2 = self._vsim_cmd(query_vec, 3, 'WITHATTRIBS', 'WITHSCORES')  # Reversed order

        results1_resp3 = self.redis3.execute_command(*cmd_args1)
        results2_resp3 = self.redis3.execute_command(*cmd_args2)

        # Both should return the same structure
        assert results1_resp3 == results2_resp3, "Order of WITH options shouldn't matter"

View File

@@ -801,8 +801,8 @@ int vectorSetFilterCallback(void *value, void *privdata) {
* handles the HNSW locking explicitly. */
void VSIM_execute(RedisModuleCtx *ctx, struct vsetObject *vset,
float *vec, unsigned long count, float epsilon, unsigned long withscores,
unsigned long ef, exprstate *filter_expr, unsigned long filter_ef,
int ground_truth)
unsigned long withattribs, unsigned long ef, exprstate *filter_expr,
unsigned long filter_ef, int ground_truth)
{
/* In our scan, we can't just collect 'count' elements as
* if count is small we would explore the graph in an insufficient
@@ -837,28 +837,52 @@ void VSIM_execute(RedisModuleCtx *ctx, struct vsetObject *vset,
}
/* Return results */
if (withscores)
int resp3 = RedisModule_GetContextFlags(ctx) & REDISMODULE_CTX_FLAGS_RESP3;
int reply_with_map = resp3 && (withscores || withattribs);
if (reply_with_map)
RedisModule_ReplyWithMap(ctx, REDISMODULE_POSTPONED_LEN);
else
RedisModule_ReplyWithArray(ctx, REDISMODULE_POSTPONED_LEN);
long long arraylen = 0;
long long arraylen = 0;
for (unsigned int i = 0; i < found && i < count; i++) {
if (distances[i] > epsilon) break;
struct vsetNodeVal *nv = neighbors[i]->value;
RedisModule_ReplyWithString(ctx, nv->item);
arraylen++;
/* If the user asked for multiple properties at the same time using
* the RESP3 protocol, we wrap the value of the map into an N-items
* array. Two for now, since we have just two properties that can be
* requested.
*
* So in the case of RESP2 we will just have the flat reply:
* item, score, attribute. For RESP3 instead item -> [score, attribute]
*/
if (resp3 && withscores && withattribs)
RedisModule_ReplyWithArray(ctx,2);
if (withscores) {
/* The similarity score is provided in a 0-1 range. */
RedisModule_ReplyWithDouble(ctx, 1.0 - distances[i]/2.0);
}
if (withattribs) {
/* Return the attributes as well, if any. */
if (nv->attrib)
RedisModule_ReplyWithString(ctx, nv->attrib);
else
RedisModule_ReplyWithNull(ctx);
}
}
hnsw_release_read_slot(vset->hnsw,slot);
if (withscores)
if (reply_with_map) {
RedisModule_ReplySetMapLength(ctx, arraylen);
else
RedisModule_ReplySetArrayLength(ctx, arraylen);
} else {
int items_per_ele = 1+withattribs+withscores;
RedisModule_ReplySetArrayLength(ctx, arraylen * items_per_ele);
}
RedisModule_Free(vec);
RedisModule_Free(neighbors);
@@ -878,10 +902,11 @@ void *VSIM_thread(void *arg) {
unsigned long count = (unsigned long)targ[3];
float epsilon = *((float*)targ[4]);
unsigned long withscores = (unsigned long)targ[5];
unsigned long ef = (unsigned long)targ[6];
exprstate *filter_expr = targ[7];
unsigned long filter_ef = (unsigned long)targ[8];
unsigned long ground_truth = (unsigned long)targ[9];
unsigned long withattribs = (unsigned long)targ[6];
unsigned long ef = (unsigned long)targ[7];
exprstate *filter_expr = targ[8];
unsigned long filter_ef = (unsigned long)targ[9];
unsigned long ground_truth = (unsigned long)targ[10];
RedisModule_Free(targ[4]);
RedisModule_Free(targ);
@@ -894,7 +919,7 @@ void *VSIM_thread(void *arg) {
RedisModuleCtx *ctx = RedisModule_GetThreadSafeContext(bc);
// Run the query.
VSIM_execute(ctx, vset, vec, count, epsilon, withscores, ef, filter_expr, filter_ef, ground_truth);
VSIM_execute(ctx, vset, vec, count, epsilon, withscores, withattribs, ef, filter_expr, filter_ef, ground_truth);
pthread_rwlock_unlock(&vset->in_use_lock);
// Cleanup.
@@ -904,7 +929,7 @@ void *VSIM_thread(void *arg) {
return NULL;
}
/* VSIM key [ELE|FP32|VALUES] <vector or ele> [WITHSCORES] [COUNT num] [EPSILON eps] [EF exploration-factor] [FILTER expression] [FILTER-EF exploration-factor] */
/* VSIM key [ELE|FP32|VALUES] <vector or ele> [WITHSCORES] [WITHATTRIBS] [COUNT num] [EPSILON eps] [EF exploration-factor] [FILTER expression] [FILTER-EF max-filtering-effort] [TRUTH] [NOTHREAD] */
int VSIM_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
RedisModule_AutoMemory(ctx);
@@ -914,6 +939,7 @@ int VSIM_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
/* Defaults */
int withscores = 0;
int withattribs = 0;
long long count = VSET_DEFAULT_COUNT; /* New default value */
long long ef = 0; /* Exploration factor (see HNSW paper) */
double epsilon = 2.0; /* Max cosine distance */
@@ -1017,6 +1043,9 @@ int VSIM_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
if (!strcasecmp(opt, "WITHSCORES")) {
withscores = 1;
j++;
} else if (!strcasecmp(opt, "WITHATTRIBS")) {
withattribs = 1;
j++;
} else if (!strcasecmp(opt, "TRUTH")) {
ground_truth = 1;
j++;
@@ -1097,7 +1126,7 @@ int VSIM_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
* free slot if all the HNSW_MAX_THREADS slots are used. */
RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx,NULL,NULL,NULL,0);
pthread_t tid;
void **targ = RedisModule_Alloc(sizeof(void*)*10);
void **targ = RedisModule_Alloc(sizeof(void*)*11);
targ[0] = bc;
targ[1] = vset;
targ[2] = vec;
@@ -1105,10 +1134,11 @@ int VSIM_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
targ[4] = RedisModule_Alloc(sizeof(float));
*((float*)targ[4]) = epsilon;
targ[5] = (void*)(unsigned long)withscores;
targ[6] = (void*)(unsigned long)ef;
targ[7] = (void*)filter_expr;
targ[8] = (void*)(unsigned long)filter_ef;
targ[9] = (void*)(unsigned long)ground_truth;
targ[6] = (void*)(unsigned long)withattribs;
targ[7] = (void*)(unsigned long)ef;
targ[8] = (void*)filter_expr;
targ[9] = (void*)(unsigned long)filter_ef;
targ[10] = (void*)(unsigned long)ground_truth;
RedisModule_BlockedClientMeasureTimeStart(bc);
vset->thread_creation_pending++;
if (pthread_create(&tid,NULL,VSIM_thread,targ) != 0) {
@@ -1116,10 +1146,10 @@ int VSIM_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
RedisModule_AbortBlock(bc);
RedisModule_Free(targ[4]);
RedisModule_Free(targ);
VSIM_execute(ctx, vset, vec, count, epsilon, withscores, ef, filter_expr, filter_ef, ground_truth);
VSIM_execute(ctx, vset, vec, count, epsilon, withscores, withattribs, ef, filter_expr, filter_ef, ground_truth);
}
} else {
VSIM_execute(ctx, vset, vec, count, epsilon, withscores, ef, filter_expr, filter_ef, ground_truth);
VSIM_execute(ctx, vset, vec, count, epsilon, withscores, withattribs, ef, filter_expr, filter_ef, ground_truth);
}
return REDISMODULE_OK;