Add new commands ZDIFF and ZDIFFSTORE (#7961)

- Add ZDIFF and ZDIFFSTORE which work similarly to SDIFF and SDIFFSTORE
- Make sure the new WITHSCORES argument that was added for ZUNION isn't considered valid for ZUNIONSTORE

Co-authored-by: Oran Agra <oran@redislabs.com>
This commit is contained in:
Felipe Machado 2020-11-15 09:14:25 -03:00 committed by GitHub
parent 204a14b8cf
commit d8fd48c436
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 352 additions and 55 deletions

View File

@ -901,14 +901,12 @@ acllog-max-len 128
# Both LRU, LFU and volatile-ttl are implemented using approximated
# randomized algorithms.
#
# Note: with any of the above policies, Redis will return an error on write
# operations, when there are no suitable keys for eviction.
#
# At the date of writing these commands are: set setnx setex append
# incr decr rpush lpush rpushx lpushx linsert lset rpoplpush sadd
# sinter sinterstore sunion sunionstore sdiff sdiffstore zadd zincrby
# zunionstore zinterstore hset hsetnx hmset hincrby incrby decrby
# getset mset msetnx exec sort
# Note: with any of the above policies, when there are no suitable keys for
# eviction, Redis will return an error on write operations that require
# more memory. These are usually commands that create new keys, add data or
# modify existing keys. A few examples are: SET, INCR, HSET, LPUSH, SUNIONSTORE,
# SORT (due to the STORE argument), and EXEC (if the transaction includes any
# command that requires memory).
#
# The default is:
#

View File

@ -1441,12 +1441,12 @@ int genericGetKeys(int storeKeyOfs, int keyCountOfs, int firstKeyOfs, int keySte
return result->numkeys;
}
int zunionInterStoreGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
int zunionInterDiffStoreGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
UNUSED(cmd);
return genericGetKeys(1, 2, 3, 1, argv, argc, result);
}
int zunionInterGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
int zunionInterDiffGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
UNUSED(cmd);
return genericGetKeys(0, 1, 2, 1, argv, argc, result);
}

View File

@ -436,19 +436,27 @@ struct redisCommand redisCommandTable[] = {
{"zunionstore",zunionstoreCommand,-4,
"write use-memory @sortedset",
0,zunionInterStoreGetKeys,0,0,0,0,0,0},
0,zunionInterDiffStoreGetKeys,0,0,0,0,0,0},
{"zinterstore",zinterstoreCommand,-4,
"write use-memory @sortedset",
0,zunionInterStoreGetKeys,0,0,0,0,0,0},
0,zunionInterDiffStoreGetKeys,0,0,0,0,0,0},
{"zdiffstore",zdiffstoreCommand,-4,
"write use-memory @sortedset",
0,zunionInterDiffStoreGetKeys,0,0,0,0,0,0},
{"zunion",zunionCommand,-3,
"read-only @sortedset",
0,zunionInterGetKeys,0,0,0,0,0,0},
0,zunionInterDiffGetKeys,0,0,0,0,0,0},
{"zinter",zinterCommand,-3,
"read-only @sortedset",
0,zunionInterGetKeys,0,0,0,0,0,0},
0,zunionInterDiffGetKeys,0,0,0,0,0,0},
{"zdiff",zdiffCommand,-3,
"read-only @sortedset",
0,zunionInterDiffGetKeys,0,0,0,0,0,0},
{"zrange",zrangeCommand,-4,
"read-only @sortedset",

View File

@ -2207,8 +2207,8 @@ void freeObjAsync(robj *o);
int *getKeysPrepareResult(getKeysResult *result, int numkeys);
int getKeysFromCommand(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result);
void getKeysFreeResult(getKeysResult *result);
int zunionInterGetKeys(struct redisCommand *cmd,robj **argv, int argc, getKeysResult *result);
int zunionInterStoreGetKeys(struct redisCommand *cmd,robj **argv, int argc, getKeysResult *result);
int zunionInterDiffGetKeys(struct redisCommand *cmd,robj **argv, int argc, getKeysResult *result);
int zunionInterDiffStoreGetKeys(struct redisCommand *cmd,robj **argv, int argc, getKeysResult *result);
int evalGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result);
int sortGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result);
int migrateGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result);
@ -2427,8 +2427,10 @@ void hstrlenCommand(client *c);
void zremrangebyrankCommand(client *c);
void zunionstoreCommand(client *c);
void zinterstoreCommand(client *c);
void zdiffstoreCommand(client *c);
void zunionCommand(client *c);
void zinterCommand(client *c);
void zdiffCommand(client *c);
void zscanCommand(client *c);
void hkeysCommand(client *c);
void hvalsCommand(client *c);

View File

@ -1435,6 +1435,36 @@ int zsetAdd(robj *zobj, double score, sds ele, int *flags, double *newscore) {
return 0; /* Never reached. */
}
/* Deletes the element 'ele' from the sorted set encoded as a skiplist+dict,
* returning 1 if the element existed and was deleted, 0 otherwise (the
* element was not there). It does not resize the dict after deleting the
* element. */
static int zsetRemoveFromSkiplist(zset *zs, sds ele) {
dictEntry *de;
double score;
de = dictUnlink(zs->dict,ele);
if (de != NULL) {
/* Get the score in order to delete from the skiplist later. */
score = *(double*)dictGetVal(de);
/* Delete from the hash table and later from the skiplist.
* Note that the order is important: deleting from the skiplist
* actually releases the SDS string representing the element,
* which is shared between the skiplist and the hash table, so
* we need to delete from the skiplist as the final step. */
dictFreeUnlinkedEntry(zs->dict,de);
/* Delete from skiplist. */
int retval = zslDelete(zs->zsl,score,ele,NULL);
serverAssert(retval);
return 1;
}
return 0;
}
/* Delete the element 'ele' from the sorted set, returning 1 if the element
* existed and was deleted, 0 otherwise (the element was not there). */
int zsetDel(robj *zobj, sds ele) {
@ -1447,25 +1477,7 @@ int zsetDel(robj *zobj, sds ele) {
}
} else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
zset *zs = zobj->ptr;
dictEntry *de;
double score;
de = dictUnlink(zs->dict,ele);
if (de != NULL) {
/* Get the score in order to delete from the skiplist later. */
score = *(double*)dictGetVal(de);
/* Delete from the hash table and later from the skiplist.
* Note that the order is important: deleting from the skiplist
* actually releases the SDS string representing the element,
* which is shared between the skiplist and the hash table, so
* we need to delete from the skiplist as the final step. */
dictFreeUnlinkedEntry(zs->dict,de);
/* Delete from skiplist. */
int retval = zslDelete(zs->zsl,score,ele,NULL);
serverAssert(retval);
if (zsetRemoveFromSkiplist(zs, ele)) {
if (htNeedsResize(zs->dict)) dictResize(zs->dict);
return 1;
}
@ -2162,6 +2174,10 @@ int zuiCompareByCardinality(const void *s1, const void *s2) {
return 0;
}
static int zuiCompareByRevCardinality(const void *s1, const void *s2) {
return zuiCompareByCardinality(s1, s2) * -1;
}
#define REDIS_AGGR_SUM 1
#define REDIS_AGGR_MIN 2
#define REDIS_AGGR_MAX 3
@ -2184,6 +2200,180 @@ inline static void zunionInterAggregate(double *target, double val, int aggregat
}
}
static int zsetDictGetMaxElementLength(dict *d) {
dictIterator *di;
dictEntry *de;
size_t maxelelen = 0;
di = dictGetIterator(d);
while((de = dictNext(di)) != NULL) {
sds ele = dictGetKey(de);
if (sdslen(ele) > maxelelen) maxelelen = sdslen(ele);
}
dictReleaseIterator(di);
return maxelelen;
}
static void zdiffAlgorithm1(zsetopsrc *src, long setnum, zset *dstzset, size_t *maxelelen) {
/* DIFF Algorithm 1:
*
* We perform the diff by iterating all the elements of the first set,
* and only adding it to the target set if the element does not exist
* into all the other sets.
*
* This way we perform at max N*M operations, where N is the size of
* the first set, and M the number of sets.
*
* There is also a O(K*log(K)) cost for adding the resulting elements
* to the target set, where K is the final size of the target set.
*
* The final complexity of this algorithm is O(N*M + K*log(K)). */
int j;
zsetopval zval;
zskiplistNode *znode;
sds tmp;
/* With algorithm 1 it is better to order the sets to subtract
* by decreasing size, so that we are more likely to find
* duplicated elements ASAP. */
qsort(src+1,setnum-1,sizeof(zsetopsrc),zuiCompareByRevCardinality);
memset(&zval, 0, sizeof(zval));
zuiInitIterator(&src[0]);
while (zuiNext(&src[0],&zval)) {
double value;
int exists = 0;
for (j = 1; j < setnum; j++) {
/* It is not safe to access the zset we are
* iterating, so explicitly check for equal object.
* This check isn't really needed anymore since we already
* check for a duplicate set in the zsetChooseDiffAlgorithm
* function, but we're leaving it for future-proofing. */
if (src[j].subject == src[0].subject ||
zuiFind(&src[j],&zval,&value)) {
exists = 1;
break;
}
}
if (!exists) {
tmp = zuiNewSdsFromValue(&zval);
znode = zslInsert(dstzset->zsl,zval.score,tmp);
dictAdd(dstzset->dict,tmp,&znode->score);
if (sdslen(tmp) > *maxelelen) *maxelelen = sdslen(tmp);
}
}
}
static void zdiffAlgorithm2(zsetopsrc *src, long setnum, zset *dstzset, size_t *maxelelen) {
/* DIFF Algorithm 2:
*
* Add all the elements of the first set to the auxiliary set.
* Then remove all the elements of all the next sets from it.
*
* This is O(L + (N-K)log(N)) where L is the sum of all the elements in every
* set, N is the size of the first set, and K is the size of the result set.
*
* Note that from the (L-N) dict searches, (N-K) got to the zsetRemoveFromSkiplist
* which costs log(N)
*
* There is also a O(K) cost at the end for finding the largest element
* size, but this doesn't change the algorithm complexity since K < L, and
* O(2L) is the same as O(L). */
int j;
int cardinality = 0;
zsetopval zval;
zskiplistNode *znode;
sds tmp;
for (j = 0; j < setnum; j++) {
if (zuiLength(&src[j]) == 0) continue;
memset(&zval, 0, sizeof(zval));
zuiInitIterator(&src[j]);
while (zuiNext(&src[j],&zval)) {
if (j == 0) {
tmp = zuiNewSdsFromValue(&zval);
znode = zslInsert(dstzset->zsl,zval.score,tmp);
dictAdd(dstzset->dict,tmp,&znode->score);
cardinality++;
} else {
tmp = zuiNewSdsFromValue(&zval);
if (zsetRemoveFromSkiplist(dstzset, tmp)) {
cardinality--;
}
}
/* Exit if result set is empty as any additional removal
* of elements will have no effect. */
if (cardinality == 0) break;
}
if (cardinality == 0) break;
}
/* Redize dict if needed after removing multiple elements */
if (htNeedsResize(dstzset->dict)) dictResize(dstzset->dict);
/* Using this algorithm, we can't calculate the max element as we go,
* we have to iterate through all elements to find the max one after. */
*maxelelen = zsetDictGetMaxElementLength(dstzset->dict);
}
static int zsetChooseDiffAlgorithm(zsetopsrc *src, long setnum) {
int j;
/* Select what DIFF algorithm to use.
*
* Algorithm 1 is O(N*M + K*log(K)) where N is the size of the
* first set, M the total number of sets, and K is the size of the
* result set.
*
* Algorithm 2 is O(L + (N-K)log(N)) where L is the total number of elements
* in all the sets, N is the size of the first set, and K is the size of the
* result set.
*
* We compute what is the best bet with the current input here. */
long long algo_one_work = 0;
long long algo_two_work = 0;
for (j = 0; j < setnum; j++) {
/* If any other set is equal to the first set, there is nothing to be
* done, since we would remove all elements anyway. */
if (j > 0 && src[0].subject == src[j].subject) {
return 0;
}
algo_one_work += zuiLength(&src[0]);
algo_two_work += zuiLength(&src[j]);
}
/* Algorithm 1 has better constant times and performs less operations
* if there are elements in common. Give it some advantage. */
algo_one_work /= 2;
return (algo_one_work <= algo_two_work) ? 1 : 2;
}
static void zdiff(zsetopsrc *src, long setnum, zset *dstzset, size_t *maxelelen) {
/* Skip everything if the smallest input is empty. */
if (zuiLength(&src[0]) > 0) {
int diff_algo = zsetChooseDiffAlgorithm(src, setnum);
if (diff_algo == 1) {
zdiffAlgorithm1(src, setnum, dstzset, maxelelen);
} else if (diff_algo == 2) {
zdiffAlgorithm2(src, setnum, dstzset, maxelelen);
} else if (diff_algo != 0) {
serverPanic("Unknown algorithm");
}
}
}
uint64_t dictSdsHash(const void *key);
int dictSdsKeyCompare(void *privdata, const void *key1, const void *key2);
@ -2196,15 +2386,15 @@ dictType setAccumulatorDictType = {
NULL /* val destructor */
};
/* The zunionInterGenericCommand() function is called in order to implement the
* following commands: ZUNION, ZINTER, ZUNIONSTORE, ZINTERSTORE.
/* The zunionInterDiffGenericCommand() function is called in order to implement the
* following commands: ZUNION, ZINTER, ZDIFF, ZUNIONSTORE, ZINTERSTORE, ZDIFFSTORE.
*
* 'numkeysIndex' parameter position of key number. for ZUNION/ZINTER command, this
* value is 1, for ZUNIONSTORE/ZINTERSTORE command, this value is 2.
* 'numkeysIndex' parameter position of key number. for ZUNION/ZINTER/ZDIFF command,
* this value is 1, for ZUNIONSTORE/ZINTERSTORE/ZDIFFSTORE command, this value is 2.
*
* 'op' SET_OP_INTER or SET_OP_UNION.
* 'op' SET_OP_INTER, SET_OP_UNION or SET_OP_DIFF.
*/
void zunionInterGenericCommand(client *c, robj *dstkey, int numkeysIndex, int op) {
void zunionInterDiffGenericCommand(client *c, robj *dstkey, int numkeysIndex, int op) {
int i, j;
long setnum;
int aggregate = REDIS_AGGR_SUM;
@ -2223,7 +2413,7 @@ void zunionInterGenericCommand(client *c, robj *dstkey, int numkeysIndex, int op
if (setnum < 1) {
addReplyError(c,
"at least 1 input key is needed for ZUNIONSTORE/ZINTERSTORE");
"at least 1 input key is needed for ZUNIONSTORE/ZINTERSTORE/ZDIFFSTORE");
return;
}
@ -2260,7 +2450,8 @@ void zunionInterGenericCommand(client *c, robj *dstkey, int numkeysIndex, int op
int remaining = c->argc - j;
while (remaining) {
if (remaining >= (setnum + 1) &&
if (op != SET_OP_DIFF &&
remaining >= (setnum + 1) &&
!strcasecmp(c->argv[j]->ptr,"weights"))
{
j++; remaining--;
@ -2272,7 +2463,8 @@ void zunionInterGenericCommand(client *c, robj *dstkey, int numkeysIndex, int op
return;
}
}
} else if (remaining >= 2 &&
} else if (op != SET_OP_DIFF &&
remaining >= 2 &&
!strcasecmp(c->argv[j]->ptr,"aggregate"))
{
j++; remaining--;
@ -2289,6 +2481,7 @@ void zunionInterGenericCommand(client *c, robj *dstkey, int numkeysIndex, int op
}
j++; remaining--;
} else if (remaining >= 1 &&
!dstkey &&
!strcasecmp(c->argv[j]->ptr,"withscores"))
{
j++; remaining--;
@ -2301,9 +2494,11 @@ void zunionInterGenericCommand(client *c, robj *dstkey, int numkeysIndex, int op
}
}
/* sort sets from the smallest to largest, this will improve our
* algorithm's performance */
qsort(src,setnum,sizeof(zsetopsrc),zuiCompareByCardinality);
if (op != SET_OP_DIFF) {
/* sort sets from the smallest to largest, this will improve our
* algorithm's performance */
qsort(src,setnum,sizeof(zsetopsrc),zuiCompareByCardinality);
}
dstobj = createZsetObject();
dstzset = dstobj->ptr;
@ -2409,6 +2604,8 @@ void zunionInterGenericCommand(client *c, robj *dstkey, int numkeysIndex, int op
}
dictReleaseIterator(di);
dictRelease(accumulator);
} else if (op == SET_OP_DIFF) {
zdiff(src, setnum, dstzset, &maxelelen);
} else {
serverPanic("Unknown operator");
}
@ -2419,7 +2616,8 @@ void zunionInterGenericCommand(client *c, robj *dstkey, int numkeysIndex, int op
setKey(c, c->db, dstkey, dstobj);
addReplyLongLong(c, zsetLength(dstobj));
notifyKeyspaceEvent(NOTIFY_ZSET,
(op == SET_OP_UNION) ? "zunionstore" : "zinterstore",
(op == SET_OP_UNION) ? "zunionstore" :
(op == SET_OP_INTER ? "zinterstore" : "zdiffstore"),
dstkey, c->db->id);
server.dirty++;
} else {
@ -2451,19 +2649,27 @@ void zunionInterGenericCommand(client *c, robj *dstkey, int numkeysIndex, int op
}
void zunionstoreCommand(client *c) {
zunionInterGenericCommand(c, c->argv[1], 2, SET_OP_UNION);
zunionInterDiffGenericCommand(c, c->argv[1], 2, SET_OP_UNION);
}
void zinterstoreCommand(client *c) {
zunionInterGenericCommand(c, c->argv[1], 2, SET_OP_INTER);
zunionInterDiffGenericCommand(c, c->argv[1], 2, SET_OP_INTER);
}
void zdiffstoreCommand(client *c) {
zunionInterDiffGenericCommand(c, c->argv[1], 2, SET_OP_DIFF);
}
void zunionCommand(client *c) {
zunionInterGenericCommand(c, NULL, 1, SET_OP_UNION);
zunionInterDiffGenericCommand(c, NULL, 1, SET_OP_UNION);
}
void zinterCommand(client *c) {
zunionInterGenericCommand(c, NULL, 1, SET_OP_INTER);
zunionInterDiffGenericCommand(c, NULL, 1, SET_OP_INTER);
}
void zdiffCommand(client *c) {
zunionInterDiffGenericCommand(c, NULL, 1, SET_OP_DIFF);
}
void zrangeGenericCommand(client *c, int reverse) {

View File

@ -612,10 +612,11 @@ start_server {tags {"zset"}} {
assert_equal 0 [r exists dst_key]
}
test "ZUNION/ZINTER against non-existing key - $encoding" {
test "ZUNION/ZINTER/ZDIFF against non-existing key - $encoding" {
r del zseta
assert_equal {} [r zunion 1 zseta]
assert_equal {} [r zinter 1 zseta]
assert_equal {} [r zdiff 1 zseta]
}
test "ZUNIONSTORE with empty set - $encoding" {
@ -626,12 +627,13 @@ start_server {tags {"zset"}} {
r zrange zsetc 0 -1 withscores
} {a 1 b 2}
test "ZUNION/ZINTER with empty set - $encoding" {
test "ZUNION/ZINTER/ZDIFF with empty set - $encoding" {
r del zseta zsetb
r zadd zseta 1 a
r zadd zseta 2 b
assert_equal {a 1 b 2} [r zunion 2 zseta zsetb withscores]
assert_equal {} [r zinter 2 zseta zsetb withscores]
assert_equal {a 1 b 2} [r zdiff 2 zseta zsetb withscores]
}
test "ZUNIONSTORE basics - $encoding" {
@ -647,7 +649,7 @@ start_server {tags {"zset"}} {
assert_equal {a 1 b 3 d 3 c 5} [r zrange zsetc 0 -1 withscores]
}
test "ZUNION/ZINTER with integer members - $encoding" {
test "ZUNION/ZINTER/ZDIFF with integer members - $encoding" {
r del zsetd zsetf
r zadd zsetd 1 1
r zadd zsetd 2 2
@ -658,6 +660,7 @@ start_server {tags {"zset"}} {
assert_equal {1 2 2 2 4 4 3 6} [r zunion 2 zsetd zsetf withscores]
assert_equal {1 2 3 6} [r zinter 2 zsetd zsetf withscores]
assert_equal {2 2} [r zdiff 2 zsetd zsetf withscores]
}
test "ZUNIONSTORE with weights - $encoding" {
@ -773,6 +776,80 @@ start_server {tags {"zset"}} {
}
}
test "ZDIFFSTORE basics - $encoding" {
assert_equal 1 [r zdiffstore zsetc 2 zseta zsetb]
assert_equal {a 1} [r zrange zsetc 0 -1 withscores]
}
test "ZDIFF basics - $encoding" {
assert_equal {a 1} [r zdiff 2 zseta zsetb withscores]
}
test "ZDIFFSTORE with a regular set - $encoding" {
r del seta
r sadd seta a
r sadd seta b
r sadd seta c
assert_equal 1 [r zdiffstore zsetc 2 seta zsetb]
assert_equal {a 1} [r zrange zsetc 0 -1 withscores]
}
test "ZDIFF subtracting set from itself - $encoding" {
assert_equal 0 [r zdiffstore zsetc 2 zseta zseta]
assert_equal {} [r zrange zsetc 0 -1 withscores]
}
test "ZDIFF algorithm 1 - $encoding" {
r del zseta zsetb zsetc
r zadd zseta 1 a
r zadd zseta 2 b
r zadd zseta 3 c
r zadd zsetb 1 b
r zadd zsetb 2 c
r zadd zsetb 3 d
assert_equal 1 [r zdiffstore zsetc 2 zseta zsetb]
assert_equal {a 1} [r zrange zsetc 0 -1 withscores]
}
test "ZDIFF algorithm 2 - $encoding" {
r del zseta zsetb zsetc zsetd zsete
r zadd zseta 1 a
r zadd zseta 2 b
r zadd zseta 3 c
r zadd zseta 5 e
r zadd zsetb 1 b
r zadd zsetc 1 c
r zadd zsetd 1 d
assert_equal 2 [r zdiffstore zsete 4 zseta zsetb zsetc zsetd]
assert_equal {a 1 e 5} [r zrange zsete 0 -1 withscores]
}
test "ZDIFF fuzzing" {
for {set j 0} {$j < 100} {incr j} {
unset -nocomplain s
array set s {}
set args {}
set num_sets [expr {[randomInt 10]+1}]
for {set i 0} {$i < $num_sets} {incr i} {
set num_elements [randomInt 100]
r del zset_$i
lappend args zset_$i
while {$num_elements} {
set ele [randomValue]
r zadd zset_$i [randomInt 100] $ele
if {$i == 0} {
set s($ele) x
} else {
unset -nocomplain s($ele)
}
incr num_elements -1
}
}
set result [lsort [r zdiff [llength $args] {*}$args]]
assert_equal $result [lsort [array names s]]
}
}
test "Basic ZPOP with a single key - $encoding" {
r del zset
assert_equal {} [r zpopmin zset]
@ -889,6 +966,12 @@ start_server {tags {"zset"}} {
}
}
test "ZUNIONSTORE/ZINTERSTORE/ZDIFFSTORE error if using WITHSCORES " {
assert_error "*ERR*syntax*" {r zunionstore foo 2 zsetd zsetf withscores}
assert_error "*ERR*syntax*" {r zinterstore foo 2 zsetd zsetf withscores}
assert_error "*ERR*syntax*" {r zdiffstore foo 2 zsetd zsetf withscores}
}
test {ZMSCORE retrieve} {
r del zmscoretest
r zadd zmscoretest 10 x