Keyspace events: it is now possible to select subclasses of events.

When keyspace events are enabled, the overhead is not sever but
noticeable, so this commit introduces the ability to select subclasses
of events in order to avoid to generate events the user is not
interested in.

The events can be selected using redis.conf or CONFIG SET / GET.
This commit is contained in:
antirez 2013-01-25 13:19:08 +01:00
parent 1c0c551776
commit fce016d31b
14 changed files with 237 additions and 108 deletions

View File

@ -476,10 +476,39 @@ slowlog-max-len 128
# PUBLISH __keyspace@0__:foo del # PUBLISH __keyspace@0__:foo del
# PUBLISH __keyevent@0__:del foo # PUBLISH __keyevent@0__:del foo
# #
# While the overhead of this feature is relatively small most users don't # It is possible to select the events that Redis will notify among a set
# need it so it is disabled by default. You can enable it setting the # of classes. Every class is identified by a single character:
# following configuration option to yes. #
notify-keyspace-events no # K Keyspace events, published with __keyspace@<db>__ prefix.
# E Keyevent events, published with __keyevent@<db>__ prefix.
# g Generic commands (non-type specific) like DEL, EXPIRE, RENAME, ...
# $ String commands
# l List commands
# s Set commands
# h Hash commands
# z Sorted set commands
# x Expired events (events generated every time a key expires)
# e Evicted events (events generated when a key is evicted for maxmemory)
# A Alias for g$lshzxe, so that the "AKE" string means all the events.
#
# The "notify-keyspace-events" takes as argument a string that is composed
# by zero or multiple characters. The empty string means that notifications
# are disabled at all.
#
# Example: to enable list and generic events, from the point of view of the
# event name, use:
#
# notify-keyspace-events Elg
#
# Example 2: to get the stream of the expired keys subscribing to channel
# name __keyevent@0__:expired use:
#
# notify-keyspace-events Ex
#
# By default all notifications are disabled because most users don't need
# this feature and the feature has some overhead. Note that if you don't
# specify at least one of K or E, no events will be delivered.
notify-keyspace-events ""
############################### ADVANCED CONFIG ############################### ############################### ADVANCED CONFIG ###############################

View File

@ -153,7 +153,7 @@ void setbitCommand(redisClient *c) {
byteval |= ((on & 0x1) << bit); byteval |= ((on & 0x1) << bit);
((uint8_t*)o->ptr)[byte] = byteval; ((uint8_t*)o->ptr)[byte] = byteval;
signalModifiedKey(c->db,c->argv[1]); signalModifiedKey(c->db,c->argv[1]);
notifyKeyspaceEvent("setbit",c->argv[1],c->db->id); notifyKeyspaceEvent(REDIS_NOTIFY_STRING,"setbit",c->argv[1],c->db->id);
server.dirty++; server.dirty++;
addReply(c, bitval ? shared.cone : shared.czero); addReply(c, bitval ? shared.cone : shared.czero);
} }
@ -347,11 +347,11 @@ void bitopCommand(redisClient *c) {
if (maxlen) { if (maxlen) {
o = createObject(REDIS_STRING,res); o = createObject(REDIS_STRING,res);
setKey(c->db,targetkey,o); setKey(c->db,targetkey,o);
notifyKeyspaceEvent("set",targetkey,c->db->id); notifyKeyspaceEvent(REDIS_NOTIFY_STRING,"set",targetkey,c->db->id);
decrRefCount(o); decrRefCount(o);
} else if (dbDelete(c->db,targetkey)) { } else if (dbDelete(c->db,targetkey)) {
signalModifiedKey(c->db,targetkey); signalModifiedKey(c->db,targetkey);
notifyKeyspaceEvent("del",targetkey,c->db->id); notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,"del",targetkey,c->db->id);
} }
server.dirty++; server.dirty++;
addReplyLongLong(c,maxlen); /* Return the output string length in bytes. */ addReplyLongLong(c,maxlen); /* Return the output string length in bytes. */

View File

@ -392,9 +392,13 @@ void loadServerConfigFromString(char *config) {
} else if (!strcasecmp(argv[0],"slave-priority") && argc == 2) { } else if (!strcasecmp(argv[0],"slave-priority") && argc == 2) {
server.slave_priority = atoi(argv[1]); server.slave_priority = atoi(argv[1]);
} else if (!strcasecmp(argv[0],"notify-keyspace-events") && argc == 2) { } else if (!strcasecmp(argv[0],"notify-keyspace-events") && argc == 2) {
if ((server.notify_keyspace_events = yesnotoi(argv[1])) == -1) { int flags = keyspaceEventsStringToFlags(argv[1]);
err = "argument must be 'yes' or 'no'"; goto loaderr;
if (flags == -1) {
err = "Invalid event class character. Use 'g$lshzxeA'.";
goto loaderr;
} }
server.notify_keyspace_events = flags;
} else if (!strcasecmp(argv[0],"sentinel")) { } else if (!strcasecmp(argv[0],"sentinel")) {
/* argc == 1 is handled by main() as we need to enter the sentinel /* argc == 1 is handled by main() as we need to enter the sentinel
* mode ASAP. */ * mode ASAP. */
@ -714,10 +718,10 @@ void configSetCommand(redisClient *c) {
if (yn == -1) goto badfmt; if (yn == -1) goto badfmt;
server.rdb_compression = yn; server.rdb_compression = yn;
} else if (!strcasecmp(c->argv[2]->ptr,"notify-keyspace-events")) { } else if (!strcasecmp(c->argv[2]->ptr,"notify-keyspace-events")) {
int yn = yesnotoi(o->ptr); int flags = keyspaceEventsStringToFlags(o->ptr);
if (yn == -1) goto badfmt; if (flags == -1) goto badfmt;
server.notify_keyspace_events = yn; server.notify_keyspace_events = flags;
} else if (!strcasecmp(c->argv[2]->ptr,"slave-priority")) { } else if (!strcasecmp(c->argv[2]->ptr,"slave-priority")) {
if (getLongLongFromObject(o,&ll) == REDIS_ERR || if (getLongLongFromObject(o,&ll) == REDIS_ERR ||
ll <= 0) goto badfmt; ll <= 0) goto badfmt;
@ -827,7 +831,6 @@ void configGetCommand(redisClient *c) {
config_get_bool_field("rdbcompression", server.rdb_compression); config_get_bool_field("rdbcompression", server.rdb_compression);
config_get_bool_field("rdbchecksum", server.rdb_checksum); config_get_bool_field("rdbchecksum", server.rdb_checksum);
config_get_bool_field("activerehashing", server.activerehashing); config_get_bool_field("activerehashing", server.activerehashing);
config_get_bool_field("notify-keyspace-events", server.notify_keyspace_events);
/* Everything we can't handle with macros follows. */ /* Everything we can't handle with macros follows. */
@ -942,6 +945,15 @@ void configGetCommand(redisClient *c) {
addReplyBulkCString(c,buf); addReplyBulkCString(c,buf);
matches++; matches++;
} }
if (stringmatch(pattern,"notify-keyspace-events",0)) {
robj *flagsobj = createObject(REDIS_STRING,
keyspaceEventsFlagsToString(server.notify_keyspace_events));
addReplyBulkCString(c,"notify-keyspace-events");
addReplyBulk(c,flagsobj);
decrRefCount(flagsobj);
matches++;
}
setDeferredMultiBulkLength(c,replylen,matches*2); setDeferredMultiBulkLength(c,replylen,matches*2);
} }

View File

@ -240,7 +240,8 @@ void delCommand(redisClient *c) {
for (j = 1; j < c->argc; j++) { for (j = 1; j < c->argc; j++) {
if (dbDelete(c->db,c->argv[j])) { if (dbDelete(c->db,c->argv[j])) {
signalModifiedKey(c->db,c->argv[j]); signalModifiedKey(c->db,c->argv[j]);
notifyKeyspaceEvent("del",c->argv[j],c->db->id); notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,
"del",c->argv[j],c->db->id);
server.dirty++; server.dirty++;
deleted++; deleted++;
} }
@ -392,8 +393,10 @@ void renameGenericCommand(redisClient *c, int nx) {
dbDelete(c->db,c->argv[1]); dbDelete(c->db,c->argv[1]);
signalModifiedKey(c->db,c->argv[1]); signalModifiedKey(c->db,c->argv[1]);
signalModifiedKey(c->db,c->argv[2]); signalModifiedKey(c->db,c->argv[2]);
notifyKeyspaceEvent("rename_from",c->argv[1],c->db->id); notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,"rename_from",
notifyKeyspaceEvent("rename_to",c->argv[2],c->db->id); c->argv[1],c->db->id);
notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,"rename_to",
c->argv[2],c->db->id);
server.dirty++; server.dirty++;
addReply(c,nx ? shared.cone : shared.ok); addReply(c,nx ? shared.cone : shared.ok);
} }
@ -587,14 +590,14 @@ void expireGenericCommand(redisClient *c, long long basetime, int unit) {
rewriteClientCommandVector(c,2,aux,key); rewriteClientCommandVector(c,2,aux,key);
decrRefCount(aux); decrRefCount(aux);
signalModifiedKey(c->db,key); signalModifiedKey(c->db,key);
notifyKeyspaceEvent("del",key,c->db->id); notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,"del",key,c->db->id);
addReply(c, shared.cone); addReply(c, shared.cone);
return; return;
} else { } else {
setExpire(c->db,key,when); setExpire(c->db,key,when);
addReply(c,shared.cone); addReply(c,shared.cone);
signalModifiedKey(c->db,key); signalModifiedKey(c->db,key);
notifyKeyspaceEvent("expire",key,c->db->id); notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,"expire",key,c->db->id);
server.dirty++; server.dirty++;
return; return;
} }

View File

@ -30,50 +30,97 @@
#include "redis.h" #include "redis.h"
/* This file implements keyspace events notification via Pub/Sub ad /* This file implements keyspace events notification via Pub/Sub ad
* described at http://redis.io/topics/keyspace-events. * described at http://redis.io/topics/keyspace-events. */
/* Turn a string representing notification classes into an integer
* representing notification classes flags xored.
* *
* The API provided to the rest of the Redis core is a simple function: * The function returns -1 if the input contains characters not mapping to
* any class. */
int keyspaceEventsStringToFlags(char *classes) {
char *p = classes;
int c, flags = 0;
while((c = *p++) != '\0') {
switch(c) {
case 'A': flags |= REDIS_NOTIFY_ALL; break;
case 'g': flags |= REDIS_NOTIFY_GENERIC; break;
case '$': flags |= REDIS_NOTIFY_STRING; break;
case 'l': flags |= REDIS_NOTIFY_LIST; break;
case 's': flags |= REDIS_NOTIFY_SET; break;
case 'h': flags |= REDIS_NOTIFY_HASH; break;
case 'z': flags |= REDIS_NOTIFY_ZSET; break;
case 'x': flags |= REDIS_NOTIFY_EXPIRED; break;
case 'e': flags |= REDIS_NOTIFY_EVICTED; break;
case 'K': flags |= REDIS_NOTIFY_KEYSPACE; break;
case 'E': flags |= REDIS_NOTIFY_KEYEVENT; break;
default: return -1;
}
}
return flags;
}
/* This function does exactly the revese of the function above: it gets
* as input an integer with the xored flags and returns a string representing
* the selected classes. The string returned is an sds string that needs to
* be released with sdsfree(). */
sds keyspaceEventsFlagsToString(int flags) {
sds res;
if ((flags & REDIS_NOTIFY_ALL) == REDIS_NOTIFY_ALL)
return sdsnew("A");
res = sdsempty();
if (flags & REDIS_NOTIFY_GENERIC) res = sdscatlen(res,"g",1);
if (flags & REDIS_NOTIFY_STRING) res = sdscatlen(res,"$",1);
if (flags & REDIS_NOTIFY_LIST) res = sdscatlen(res,"l",1);
if (flags & REDIS_NOTIFY_SET) res = sdscatlen(res,"s",1);
if (flags & REDIS_NOTIFY_HASH) res = sdscatlen(res,"h",1);
if (flags & REDIS_NOTIFY_ZSET) res = sdscatlen(res,"z",1);
if (flags & REDIS_NOTIFY_EXPIRED) res = sdscatlen(res,"x",1);
if (flags & REDIS_NOTIFY_EVICTED) res = sdscatlen(res,"e",1);
if (flags & REDIS_NOTIFY_KEYSPACE) res = sdscatlen(res,"K",1);
if (flags & REDIS_NOTIFY_KEYEVENT) res = sdscatlen(res,"E",1);
return res;
}
/* The API provided to the rest of the Redis core is a simple function:
* *
* notifyKeyspaceEvent(char *event, robj *key, int dbid); * notifyKeyspaceEvent(char *event, robj *key, int dbid);
* *
* 'event' is a C string representing the event name. * 'event' is a C string representing the event name.
* 'key' is a Redis object representing the key name. * 'key' is a Redis object representing the key name.
* 'dbid' is the database ID where the key lives. * 'dbid' is the database ID where the key lives. */
*/ void notifyKeyspaceEvent(int type, char *event, robj *key, int dbid) {
sds chan;
void notifyKeyspaceEvent(char *event, robj *key, int dbid) { robj *chanobj;
sds keyspace_chan, keyevent_chan; int len = -1;
int len;
char buf[24]; char buf[24];
robj *chan1, *chan2, *eventobj;
if (!server.notify_keyspace_events) return; /* If notifications for this class of events are off, return ASAP. */
if (!(server.notify_keyspace_events & type)) return;
/* The prefix of the two channels is identical if not for /* __keyspace@<db>__:<key> <event> notifications. */
* 'keyspace' that is 'keyevent' in the event channel name, so if (server.notify_keyspace_events & REDIS_NOTIFY_KEYSPACE) {
* we build a single prefix and overwrite 'event' with 'space'. */ robj *eventobj;
keyspace_chan = sdsnewlen("__keyspace@",11);
len = ll2string(buf,sizeof(buf),dbid);
keyspace_chan = sdscatlen(keyspace_chan, buf, len);
keyspace_chan = sdscatlen(keyspace_chan, "__:", 3);
keyevent_chan = sdsdup(keyspace_chan); /* Dup the prefix. */
memcpy(keyevent_chan+5,"event",5); /* Fix it. */
eventobj = createStringObject(event,strlen(event)); chan = sdsnewlen("__keyspace@",11);
len = ll2string(buf,sizeof(buf),dbid);
chan = sdscatlen(chan, buf, len);
chan = sdscatlen(chan, "__:", 3);
eventobj = createStringObject(event,strlen(event));
chanobj = createObject(REDIS_STRING, chan);
pubsubPublishMessage(chanobj, eventobj);
decrRefCount(chanobj);
}
/* The keyspace channel name has a trailing key name, while /* __keyevente@<db>__:<event> <key> notifications. */
* the keyevent channel name has a trailing event name. */ if (server.notify_keyspace_events & REDIS_NOTIFY_KEYEVENT) {
keyspace_chan = sdscatsds(keyspace_chan, key->ptr); chan = sdsnewlen("__keyevent@",11);
keyevent_chan = sdscatsds(keyevent_chan, eventobj->ptr); if (len == -1) len = ll2string(buf,sizeof(buf),dbid);
chan1 = createObject(REDIS_STRING, keyspace_chan); chan = sdscatlen(chan, buf, len);
chan2 = createObject(REDIS_STRING, keyevent_chan); chan = sdscatlen(chan, "__:", 3);
chanobj = createObject(REDIS_STRING, chan);
/* Finally publish the two notifications. */ pubsubPublishMessage(chanobj, key);
pubsubPublishMessage(chan1, eventobj); decrRefCount(chanobj);
pubsubPublishMessage(chan2, key); }
/* Release objects. */
decrRefCount(eventobj);
decrRefCount(chan1);
decrRefCount(chan2);
} }

View File

@ -689,7 +689,8 @@ void activeExpireCycle(void) {
propagateExpire(db,keyobj); propagateExpire(db,keyobj);
dbDelete(db,keyobj); dbDelete(db,keyobj);
notifyKeyspaceEvent("expired",keyobj,db->id); notifyKeyspaceEvent(REDIS_NOTIFY_EXPIRED,
"expired",keyobj,db->id);
decrRefCount(keyobj); decrRefCount(keyobj);
expired++; expired++;
server.stat_expiredkeys++; server.stat_expiredkeys++;
@ -2432,7 +2433,8 @@ int freeMemoryIfNeeded(void) {
delta -= (long long) zmalloc_used_memory(); delta -= (long long) zmalloc_used_memory();
mem_freed += delta; mem_freed += delta;
server.stat_evictedkeys++; server.stat_evictedkeys++;
notifyKeyspaceEvent("evicted",keyobj,db->id); notifyKeyspaceEvent(REDIS_NOTIFY_EVICTED, "evicted",
keyobj, db->id);
decrRefCount(keyobj); decrRefCount(keyobj);
keys_freed++; keys_freed++;

View File

@ -291,6 +291,20 @@
#define REDIS_PROPAGATE_AOF 1 #define REDIS_PROPAGATE_AOF 1
#define REDIS_PROPAGATE_REPL 2 #define REDIS_PROPAGATE_REPL 2
/* Keyspace changes notification classes. Every class is associated with a
* character for configuration purposes. */
#define REDIS_NOTIFY_KEYSPACE (1<<0) /* K */
#define REDIS_NOTIFY_KEYEVENT (1<<1) /* E */
#define REDIS_NOTIFY_GENERIC (1<<2) /* g */
#define REDIS_NOTIFY_STRING (1<<3) /* $ */
#define REDIS_NOTIFY_LIST (1<<4) /* l */
#define REDIS_NOTIFY_SET (1<<5) /* s */
#define REDIS_NOTIFY_HASH (1<<6) /* h */
#define REDIS_NOTIFY_ZSET (1<<7) /* z */
#define REDIS_NOTIFY_EXPIRED (1<<8) /* x */
#define REDIS_NOTIFY_EVICTED (1<<9) /* e */
#define REDIS_NOTIFY_ALL (REDIS_NOTIFY_GENERIC | REDIS_NOTIFY_STRING | REDIS_NOTIFY_LIST | REDIS_NOTIFY_SET | REDIS_NOTIFY_HASH | REDIS_NOTIFY_ZSET | REDIS_NOTIFY_EXPIRED | REDIS_NOTIFY_EVICTED) /* A */
/* Using the following macro you can run code inside serverCron() with the /* Using the following macro you can run code inside serverCron() with the
* specified period, specified in milliseconds. * specified period, specified in milliseconds.
* The actual resolution depends on server.hz. */ * The actual resolution depends on server.hz. */
@ -776,7 +790,8 @@ struct redisServer {
/* Pubsub */ /* Pubsub */
dict *pubsub_channels; /* Map channels to list of subscribed clients */ dict *pubsub_channels; /* Map channels to list of subscribed clients */
list *pubsub_patterns; /* A list of pubsub_patterns */ list *pubsub_patterns; /* A list of pubsub_patterns */
int notify_keyspace_events; /* Propagate keyspace events via Pub/Sub. */ int notify_keyspace_events; /* Events to propagate via Pub/Sub. This is an
xor of REDIS_NOTIFY... flags. */
/* Cluster */ /* Cluster */
int cluster_enabled; /* Is cluster enabled? */ int cluster_enabled; /* Is cluster enabled? */
clusterState cluster; /* State of the cluster */ clusterState cluster; /* State of the cluster */
@ -1136,7 +1151,11 @@ int pubsubUnsubscribeAllPatterns(redisClient *c, int notify);
void freePubsubPattern(void *p); void freePubsubPattern(void *p);
int listMatchPubsubPattern(void *a, void *b); int listMatchPubsubPattern(void *a, void *b);
int pubsubPublishMessage(robj *channel, robj *message); int pubsubPublishMessage(robj *channel, robj *message);
void notifyKeyspaceEvent(char *event, robj *key, int dbid);
/* Keyspace events notification */
void notifyKeyspaceEvent(int type, char *event, robj *key, int dbid);
int keyspaceEventsStringToFlags(char *classes);
sds keyspaceEventsFlagsToString(int flags);
/* Configuration */ /* Configuration */
void loadServerConfig(char *filename, char *options); void loadServerConfig(char *filename, char *options);

View File

@ -504,11 +504,12 @@ void sortCommand(redisClient *c) {
} }
if (outputlen) { if (outputlen) {
setKey(c->db,storekey,sobj); setKey(c->db,storekey,sobj);
notifyKeyspaceEvent("sortstore",storekey,c->db->id); notifyKeyspaceEvent(REDIS_NOTIFY_LIST,"sortstore",storekey,
c->db->id);
server.dirty += outputlen; server.dirty += outputlen;
} else if (dbDelete(c->db,storekey)) { } else if (dbDelete(c->db,storekey)) {
signalModifiedKey(c->db,storekey); signalModifiedKey(c->db,storekey);
notifyKeyspaceEvent("del",storekey,c->db->id); notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,"del",storekey,c->db->id);
server.dirty++; server.dirty++;
} }
decrRefCount(sobj); decrRefCount(sobj);

View File

@ -474,7 +474,7 @@ void hsetCommand(redisClient *c) {
update = hashTypeSet(o,c->argv[2],c->argv[3]); update = hashTypeSet(o,c->argv[2],c->argv[3]);
addReply(c, update ? shared.czero : shared.cone); addReply(c, update ? shared.czero : shared.cone);
signalModifiedKey(c->db,c->argv[1]); signalModifiedKey(c->db,c->argv[1]);
notifyKeyspaceEvent("hset",c->argv[1],c->db->id); notifyKeyspaceEvent(REDIS_NOTIFY_HASH,"hset",c->argv[1],c->db->id);
server.dirty++; server.dirty++;
} }
@ -490,7 +490,7 @@ void hsetnxCommand(redisClient *c) {
hashTypeSet(o,c->argv[2],c->argv[3]); hashTypeSet(o,c->argv[2],c->argv[3]);
addReply(c, shared.cone); addReply(c, shared.cone);
signalModifiedKey(c->db,c->argv[1]); signalModifiedKey(c->db,c->argv[1]);
notifyKeyspaceEvent("hset",c->argv[1],c->db->id); notifyKeyspaceEvent(REDIS_NOTIFY_HASH,"hset",c->argv[1],c->db->id);
server.dirty++; server.dirty++;
} }
} }
@ -512,7 +512,7 @@ void hmsetCommand(redisClient *c) {
} }
addReply(c, shared.ok); addReply(c, shared.ok);
signalModifiedKey(c->db,c->argv[1]); signalModifiedKey(c->db,c->argv[1]);
notifyKeyspaceEvent("hset",c->argv[1],c->db->id); notifyKeyspaceEvent(REDIS_NOTIFY_HASH,"hset",c->argv[1],c->db->id);
server.dirty++; server.dirty++;
} }
@ -546,7 +546,7 @@ void hincrbyCommand(redisClient *c) {
decrRefCount(new); decrRefCount(new);
addReplyLongLong(c,value); addReplyLongLong(c,value);
signalModifiedKey(c->db,c->argv[1]); signalModifiedKey(c->db,c->argv[1]);
notifyKeyspaceEvent("hincrby",c->argv[1],c->db->id); notifyKeyspaceEvent(REDIS_NOTIFY_HASH,"hincrby",c->argv[1],c->db->id);
server.dirty++; server.dirty++;
} }
@ -573,7 +573,7 @@ void hincrbyfloatCommand(redisClient *c) {
hashTypeSet(o,c->argv[2],new); hashTypeSet(o,c->argv[2],new);
addReplyBulk(c,new); addReplyBulk(c,new);
signalModifiedKey(c->db,c->argv[1]); signalModifiedKey(c->db,c->argv[1]);
notifyKeyspaceEvent("hincrbyfloat",c->argv[1],c->db->id); notifyKeyspaceEvent(REDIS_NOTIFY_HASH,"hincrbyfloat",c->argv[1],c->db->id);
server.dirty++; server.dirty++;
/* Always replicate HINCRBYFLOAT as an HSET command with the final value /* Always replicate HINCRBYFLOAT as an HSET command with the final value
@ -671,8 +671,10 @@ void hdelCommand(redisClient *c) {
} }
if (deleted) { if (deleted) {
signalModifiedKey(c->db,c->argv[1]); signalModifiedKey(c->db,c->argv[1]);
notifyKeyspaceEvent("hdel",c->argv[1],c->db->id); notifyKeyspaceEvent(REDIS_NOTIFY_HASH,"hdel",c->argv[1],c->db->id);
if (keyremoved) notifyKeyspaceEvent("del",c->argv[1],c->db->id); if (keyremoved)
notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,"del",c->argv[1],
c->db->id);
server.dirty += deleted; server.dirty += deleted;
} }
addReplyLongLong(c,deleted); addReplyLongLong(c,deleted);

View File

@ -320,7 +320,7 @@ void pushGenericCommand(redisClient *c, int where) {
char *event = (where == REDIS_HEAD) ? "lpush" : "rpush"; char *event = (where == REDIS_HEAD) ? "lpush" : "rpush";
signalModifiedKey(c->db,c->argv[1]); signalModifiedKey(c->db,c->argv[1]);
notifyKeyspaceEvent(event,c->argv[1],c->db->id); notifyKeyspaceEvent(REDIS_NOTIFY_LIST,event,c->argv[1],c->db->id);
} }
server.dirty += pushed; server.dirty += pushed;
} }
@ -367,7 +367,8 @@ void pushxGenericCommand(redisClient *c, robj *refval, robj *val, int where) {
ziplistLen(subject->ptr) > server.list_max_ziplist_entries) ziplistLen(subject->ptr) > server.list_max_ziplist_entries)
listTypeConvert(subject,REDIS_ENCODING_LINKEDLIST); listTypeConvert(subject,REDIS_ENCODING_LINKEDLIST);
signalModifiedKey(c->db,c->argv[1]); signalModifiedKey(c->db,c->argv[1]);
notifyKeyspaceEvent("linsert",c->argv[1],c->db->id); notifyKeyspaceEvent(REDIS_NOTIFY_LIST,"linsert",
c->argv[1],c->db->id);
server.dirty++; server.dirty++;
} else { } else {
/* Notify client of a failed insert */ /* Notify client of a failed insert */
@ -379,7 +380,7 @@ void pushxGenericCommand(redisClient *c, robj *refval, robj *val, int where) {
listTypePush(subject,val,where); listTypePush(subject,val,where);
signalModifiedKey(c->db,c->argv[1]); signalModifiedKey(c->db,c->argv[1]);
notifyKeyspaceEvent(event,c->argv[1],c->db->id); notifyKeyspaceEvent(REDIS_NOTIFY_LIST,event,c->argv[1],c->db->id);
server.dirty++; server.dirty++;
} }
@ -474,7 +475,7 @@ void lsetCommand(redisClient *c) {
decrRefCount(value); decrRefCount(value);
addReply(c,shared.ok); addReply(c,shared.ok);
signalModifiedKey(c->db,c->argv[1]); signalModifiedKey(c->db,c->argv[1]);
notifyKeyspaceEvent("lset",c->argv[1],c->db->id); notifyKeyspaceEvent(REDIS_NOTIFY_LIST,"lset",c->argv[1],c->db->id);
server.dirty++; server.dirty++;
} }
} else if (o->encoding == REDIS_ENCODING_LINKEDLIST) { } else if (o->encoding == REDIS_ENCODING_LINKEDLIST) {
@ -487,7 +488,7 @@ void lsetCommand(redisClient *c) {
incrRefCount(value); incrRefCount(value);
addReply(c,shared.ok); addReply(c,shared.ok);
signalModifiedKey(c->db,c->argv[1]); signalModifiedKey(c->db,c->argv[1]);
notifyKeyspaceEvent("lset",c->argv[1],c->db->id); notifyKeyspaceEvent(REDIS_NOTIFY_LIST,"lset",c->argv[1],c->db->id);
server.dirty++; server.dirty++;
} }
} else { } else {
@ -507,9 +508,10 @@ void popGenericCommand(redisClient *c, int where) {
addReplyBulk(c,value); addReplyBulk(c,value);
decrRefCount(value); decrRefCount(value);
notifyKeyspaceEvent(event,c->argv[1],c->db->id); notifyKeyspaceEvent(REDIS_NOTIFY_LIST,event,c->argv[1],c->db->id);
if (listTypeLength(o) == 0) { if (listTypeLength(o) == 0) {
notifyKeyspaceEvent("del",c->argv[1],c->db->id); notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,"del",
c->argv[1],c->db->id);
dbDelete(c->db,c->argv[1]); dbDelete(c->db,c->argv[1]);
} }
signalModifiedKey(c->db,c->argv[1]); signalModifiedKey(c->db,c->argv[1]);
@ -632,10 +634,10 @@ void ltrimCommand(redisClient *c) {
redisPanic("Unknown list encoding"); redisPanic("Unknown list encoding");
} }
notifyKeyspaceEvent("ltrim",c->argv[1],c->db->id); notifyKeyspaceEvent(REDIS_NOTIFY_LIST,"ltrim",c->argv[1],c->db->id);
if (listTypeLength(o) == 0) { if (listTypeLength(o) == 0) {
dbDelete(c->db,c->argv[1]); dbDelete(c->db,c->argv[1]);
notifyKeyspaceEvent("del",c->argv[1],c->db->id); notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,"del",c->argv[1],c->db->id);
} }
signalModifiedKey(c->db,c->argv[1]); signalModifiedKey(c->db,c->argv[1]);
server.dirty++; server.dirty++;
@ -711,7 +713,7 @@ void rpoplpushHandlePush(redisClient *c, robj *dstkey, robj *dstobj, robj *value
} }
signalModifiedKey(c->db,dstkey); signalModifiedKey(c->db,dstkey);
listTypePush(dstobj,value,REDIS_HEAD); listTypePush(dstobj,value,REDIS_HEAD);
notifyKeyspaceEvent("lpush",dstkey,c->db->id); notifyKeyspaceEvent(REDIS_NOTIFY_LIST,"lpush",dstkey,c->db->id);
/* Always send the pushed value to the client. */ /* Always send the pushed value to the client. */
addReplyBulk(c,value); addReplyBulk(c,value);
} }
@ -741,10 +743,11 @@ void rpoplpushCommand(redisClient *c) {
decrRefCount(value); decrRefCount(value);
/* Delete the source list when it is empty */ /* Delete the source list when it is empty */
notifyKeyspaceEvent("rpop",touchedkey,c->db->id); notifyKeyspaceEvent(REDIS_NOTIFY_LIST,"rpop",touchedkey,c->db->id);
if (listTypeLength(sobj) == 0) { if (listTypeLength(sobj) == 0) {
dbDelete(c->db,touchedkey); dbDelete(c->db,touchedkey);
notifyKeyspaceEvent("del",touchedkey,c->db->id); notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,"del",
touchedkey,c->db->id);
} }
signalModifiedKey(c->db,touchedkey); signalModifiedKey(c->db,touchedkey);
decrRefCount(touchedkey); decrRefCount(touchedkey);
@ -1077,10 +1080,12 @@ void blockingPopGenericCommand(redisClient *c, int where) {
addReplyBulk(c,c->argv[j]); addReplyBulk(c,c->argv[j]);
addReplyBulk(c,value); addReplyBulk(c,value);
decrRefCount(value); decrRefCount(value);
notifyKeyspaceEvent(event,c->argv[j],c->db->id); notifyKeyspaceEvent(REDIS_NOTIFY_LIST,event,
c->argv[j],c->db->id);
if (listTypeLength(o) == 0) { if (listTypeLength(o) == 0) {
dbDelete(c->db,c->argv[j]); dbDelete(c->db,c->argv[j]);
notifyKeyspaceEvent("del",c->argv[j],c->db->id); notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,"del",
c->argv[j],c->db->id);
} }
signalModifiedKey(c->db,c->argv[j]); signalModifiedKey(c->db,c->argv[j]);
server.dirty++; server.dirty++;

View File

@ -268,7 +268,7 @@ void saddCommand(redisClient *c) {
} }
if (added) { if (added) {
signalModifiedKey(c->db,c->argv[1]); signalModifiedKey(c->db,c->argv[1]);
notifyKeyspaceEvent("sadd",c->argv[1],c->db->id); notifyKeyspaceEvent(REDIS_NOTIFY_SET,"sadd",c->argv[1],c->db->id);
} }
server.dirty += added; server.dirty += added;
addReplyLongLong(c,added); addReplyLongLong(c,added);
@ -293,8 +293,10 @@ void sremCommand(redisClient *c) {
} }
if (deleted) { if (deleted) {
signalModifiedKey(c->db,c->argv[1]); signalModifiedKey(c->db,c->argv[1]);
notifyKeyspaceEvent("srem",c->argv[1],c->db->id); notifyKeyspaceEvent(REDIS_NOTIFY_SET,"srem",c->argv[1],c->db->id);
if (keyremoved) notifyKeyspaceEvent("del",c->argv[1],c->db->id); if (keyremoved)
notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,"del",c->argv[1],
c->db->id);
server.dirty += deleted; server.dirty += deleted;
} }
addReplyLongLong(c,deleted); addReplyLongLong(c,deleted);
@ -328,12 +330,12 @@ void smoveCommand(redisClient *c) {
addReply(c,shared.czero); addReply(c,shared.czero);
return; return;
} }
notifyKeyspaceEvent("srem",c->argv[1],c->db->id); notifyKeyspaceEvent(REDIS_NOTIFY_SET,"srem",c->argv[1],c->db->id);
/* Remove the src set from the database when empty */ /* Remove the src set from the database when empty */
if (setTypeSize(srcset) == 0) { if (setTypeSize(srcset) == 0) {
dbDelete(c->db,c->argv[1]); dbDelete(c->db,c->argv[1]);
notifyKeyspaceEvent("del",c->argv[1],c->db->id); notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,"del",c->argv[1],c->db->id);
} }
signalModifiedKey(c->db,c->argv[1]); signalModifiedKey(c->db,c->argv[1]);
signalModifiedKey(c->db,c->argv[2]); signalModifiedKey(c->db,c->argv[2]);
@ -348,7 +350,7 @@ void smoveCommand(redisClient *c) {
/* An extra key has changed when ele was successfully added to dstset */ /* An extra key has changed when ele was successfully added to dstset */
if (setTypeAdd(dstset,ele)) { if (setTypeAdd(dstset,ele)) {
server.dirty++; server.dirty++;
notifyKeyspaceEvent("sadd",c->argv[2],c->db->id); notifyKeyspaceEvent(REDIS_NOTIFY_SET,"sadd",c->argv[2],c->db->id);
} }
addReply(c,shared.cone); addReply(c,shared.cone);
} }
@ -391,7 +393,7 @@ void spopCommand(redisClient *c) {
incrRefCount(ele); incrRefCount(ele);
setTypeRemove(set,ele); setTypeRemove(set,ele);
} }
notifyKeyspaceEvent("spop",c->argv[1],c->db->id); notifyKeyspaceEvent(REDIS_NOTIFY_SET,"spop",c->argv[1],c->db->id);
/* Replicate/AOF this command as an SREM operation */ /* Replicate/AOF this command as an SREM operation */
aux = createStringObject("SREM",4); aux = createStringObject("SREM",4);
@ -402,7 +404,7 @@ void spopCommand(redisClient *c) {
addReplyBulk(c,ele); addReplyBulk(c,ele);
if (setTypeSize(set) == 0) { if (setTypeSize(set) == 0) {
dbDelete(c->db,c->argv[1]); dbDelete(c->db,c->argv[1]);
notifyKeyspaceEvent("del",c->argv[1],c->db->id); notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,"del",c->argv[1],c->db->id);
} }
signalModifiedKey(c->db,c->argv[1]); signalModifiedKey(c->db,c->argv[1]);
server.dirty++; server.dirty++;
@ -708,7 +710,8 @@ void sinterGenericCommand(redisClient *c, robj **setkeys, unsigned long setnum,
if (setTypeSize(dstset) > 0) { if (setTypeSize(dstset) > 0) {
dbAdd(c->db,dstkey,dstset); dbAdd(c->db,dstkey,dstset);
addReplyLongLong(c,setTypeSize(dstset)); addReplyLongLong(c,setTypeSize(dstset));
notifyKeyspaceEvent("sinterstore",dstkey,c->db->id); notifyKeyspaceEvent(REDIS_NOTIFY_SET,"sinterstore",
dstkey,c->db->id);
} else { } else {
decrRefCount(dstset); decrRefCount(dstset);
addReply(c,shared.czero); addReply(c,shared.czero);
@ -874,7 +877,7 @@ void sunionDiffGenericCommand(redisClient *c, robj **setkeys, int setnum, robj *
if (setTypeSize(dstset) > 0) { if (setTypeSize(dstset) > 0) {
dbAdd(c->db,dstkey,dstset); dbAdd(c->db,dstkey,dstset);
addReplyLongLong(c,setTypeSize(dstset)); addReplyLongLong(c,setTypeSize(dstset));
notifyKeyspaceEvent( notifyKeyspaceEvent(REDIS_NOTIFY_SET,
op == REDIS_OP_UNION ? "sunionstore" : "sdiffstore", op == REDIS_OP_UNION ? "sunionstore" : "sdiffstore",
dstkey,c->db->id); dstkey,c->db->id);
} else { } else {

View File

@ -62,8 +62,9 @@ void setGenericCommand(redisClient *c, int nx, robj *key, robj *val, robj *expir
setKey(c->db,key,val); setKey(c->db,key,val);
server.dirty++; server.dirty++;
if (expire) setExpire(c->db,key,mstime()+milliseconds); if (expire) setExpire(c->db,key,mstime()+milliseconds);
notifyKeyspaceEvent("set",key,c->db->id); notifyKeyspaceEvent(REDIS_NOTIFY_STRING,"set",key,c->db->id);
if (expire) notifyKeyspaceEvent("expire",key,c->db->id); if (expire) notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,
"expire",key,c->db->id);
addReply(c, nx ? shared.cone : shared.ok); addReply(c, nx ? shared.cone : shared.ok);
} }
@ -110,7 +111,7 @@ void getsetCommand(redisClient *c) {
if (getGenericCommand(c) == REDIS_ERR) return; if (getGenericCommand(c) == REDIS_ERR) return;
c->argv[2] = tryObjectEncoding(c->argv[2]); c->argv[2] = tryObjectEncoding(c->argv[2]);
setKey(c->db,c->argv[1],c->argv[2]); setKey(c->db,c->argv[1],c->argv[2]);
notifyKeyspaceEvent("set",c->argv[1],c->db->id); notifyKeyspaceEvent(REDIS_NOTIFY_STRING,"set",c->argv[1],c->db->id);
server.dirty++; server.dirty++;
} }
@ -172,7 +173,8 @@ void setrangeCommand(redisClient *c) {
o->ptr = sdsgrowzero(o->ptr,offset+sdslen(value)); o->ptr = sdsgrowzero(o->ptr,offset+sdslen(value));
memcpy((char*)o->ptr+offset,value,sdslen(value)); memcpy((char*)o->ptr+offset,value,sdslen(value));
signalModifiedKey(c->db,c->argv[1]); signalModifiedKey(c->db,c->argv[1]);
notifyKeyspaceEvent("setrange",c->argv[1],c->db->id); notifyKeyspaceEvent(REDIS_NOTIFY_STRING,
"setrange",c->argv[1],c->db->id);
server.dirty++; server.dirty++;
} }
addReplyLongLong(c,sdslen(o->ptr)); addReplyLongLong(c,sdslen(o->ptr));
@ -257,7 +259,7 @@ void msetGenericCommand(redisClient *c, int nx) {
for (j = 1; j < c->argc; j += 2) { for (j = 1; j < c->argc; j += 2) {
c->argv[j+1] = tryObjectEncoding(c->argv[j+1]); c->argv[j+1] = tryObjectEncoding(c->argv[j+1]);
setKey(c->db,c->argv[j],c->argv[j+1]); setKey(c->db,c->argv[j],c->argv[j+1]);
notifyKeyspaceEvent("set",c->argv[j],c->db->id); notifyKeyspaceEvent(REDIS_NOTIFY_STRING,"set",c->argv[j],c->db->id);
} }
server.dirty += (c->argc-1)/2; server.dirty += (c->argc-1)/2;
addReply(c, nx ? shared.cone : shared.ok); addReply(c, nx ? shared.cone : shared.ok);
@ -292,7 +294,7 @@ void incrDecrCommand(redisClient *c, long long incr) {
else else
dbAdd(c->db,c->argv[1],new); dbAdd(c->db,c->argv[1],new);
signalModifiedKey(c->db,c->argv[1]); signalModifiedKey(c->db,c->argv[1]);
notifyKeyspaceEvent("incrby",c->argv[1],c->db->id); notifyKeyspaceEvent(REDIS_NOTIFY_STRING,"incrby",c->argv[1],c->db->id);
server.dirty++; server.dirty++;
addReply(c,shared.colon); addReply(c,shared.colon);
addReply(c,new); addReply(c,new);
@ -342,7 +344,7 @@ void incrbyfloatCommand(redisClient *c) {
else else
dbAdd(c->db,c->argv[1],new); dbAdd(c->db,c->argv[1],new);
signalModifiedKey(c->db,c->argv[1]); signalModifiedKey(c->db,c->argv[1]);
notifyKeyspaceEvent("incrbyfloat",c->argv[1],c->db->id); notifyKeyspaceEvent(REDIS_NOTIFY_STRING,"incrbyfloat",c->argv[1],c->db->id);
server.dirty++; server.dirty++;
addReplyBulk(c,new); addReplyBulk(c,new);
@ -390,7 +392,7 @@ void appendCommand(redisClient *c) {
totlen = sdslen(o->ptr); totlen = sdslen(o->ptr);
} }
signalModifiedKey(c->db,c->argv[1]); signalModifiedKey(c->db,c->argv[1]);
notifyKeyspaceEvent("append",c->argv[1],c->db->id); notifyKeyspaceEvent(REDIS_NOTIFY_STRING,"append",c->argv[1],c->db->id);
server.dirty++; server.dirty++;
addReplyLongLong(c,totlen); addReplyLongLong(c,totlen);
} }

View File

@ -968,7 +968,8 @@ cleanup:
zfree(scores); zfree(scores);
if (added || updated) { if (added || updated) {
signalModifiedKey(c->db,key); signalModifiedKey(c->db,key);
notifyKeyspaceEvent(incr ? "zincr" : "zadd", key, c->db->id); notifyKeyspaceEvent(REDIS_NOTIFY_ZSET,
incr ? "zincr" : "zadd", key, c->db->id);
} }
} }
@ -1029,8 +1030,9 @@ void zremCommand(redisClient *c) {
} }
if (deleted) { if (deleted) {
notifyKeyspaceEvent("zrem",key,c->db->id); notifyKeyspaceEvent(REDIS_NOTIFY_ZSET,"zrem",key,c->db->id);
if (keyremoved) notifyKeyspaceEvent("del",key,c->db->id); if (keyremoved)
notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,"del",key,c->db->id);
signalModifiedKey(c->db,key); signalModifiedKey(c->db,key);
server.dirty += deleted; server.dirty += deleted;
} }
@ -1073,8 +1075,9 @@ void zremrangebyscoreCommand(redisClient *c) {
if (deleted) { if (deleted) {
signalModifiedKey(c->db,key); signalModifiedKey(c->db,key);
notifyKeyspaceEvent("zrembyscore",key,c->db->id); notifyKeyspaceEvent(REDIS_NOTIFY_ZSET,"zrembyscore",key,c->db->id);
if (keyremoved) notifyKeyspaceEvent("del",key,c->db->id); if (keyremoved)
notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,"del",key,c->db->id);
} }
server.dirty += deleted; server.dirty += deleted;
addReplyLongLong(c,deleted); addReplyLongLong(c,deleted);
@ -1132,8 +1135,9 @@ void zremrangebyrankCommand(redisClient *c) {
if (deleted) { if (deleted) {
signalModifiedKey(c->db,key); signalModifiedKey(c->db,key);
notifyKeyspaceEvent("zrembyrank",key,c->db->id); notifyKeyspaceEvent(REDIS_NOTIFY_ZSET,"zrembyrank",key,c->db->id);
if (keyremoved) notifyKeyspaceEvent("del",key,c->db->id); if (keyremoved)
notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,"del",key,c->db->id);
} }
server.dirty += deleted; server.dirty += deleted;
addReplyLongLong(c,deleted); addReplyLongLong(c,deleted);
@ -1676,7 +1680,7 @@ void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) {
if (dbDelete(c->db,dstkey)) { if (dbDelete(c->db,dstkey)) {
signalModifiedKey(c->db,dstkey); signalModifiedKey(c->db,dstkey);
notifyKeyspaceEvent("del",dstkey,c->db->id); notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,"del",dstkey,c->db->id);
touched = 1; touched = 1;
server.dirty++; server.dirty++;
} }
@ -1689,7 +1693,7 @@ void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) {
dbAdd(c->db,dstkey,dstobj); dbAdd(c->db,dstkey,dstobj);
addReplyLongLong(c,zsetLength(dstobj)); addReplyLongLong(c,zsetLength(dstobj));
if (!touched) signalModifiedKey(c->db,dstkey); if (!touched) signalModifiedKey(c->db,dstkey);
notifyKeyspaceEvent( notifyKeyspaceEvent(REDIS_NOTIFY_ZSET,
(op == REDIS_OP_UNION) ? "zunionstore" : "zinterstore", (op == REDIS_OP_UNION) ? "zunionstore" : "zinterstore",
dstkey,c->db->id); dstkey,c->db->id);
server.dirty++; server.dirty++;

View File

@ -13,7 +13,7 @@
# units are case insensitive so 1GB 1Gb 1gB are all the same. # units are case insensitive so 1GB 1Gb 1gB are all the same.
# Enable keyspace events notification for testing so that we cover more code. # Enable keyspace events notification for testing so that we cover more code.
notify-keyspace-events yes notify-keyspace-events A
# By default Redis does not run as a daemon. Use 'yes' if you need it. # By default Redis does not run as a daemon. Use 'yes' if you need it.
# Note that Redis will write a pid file in /var/run/redis.pid when daemonized. # Note that Redis will write a pid file in /var/run/redis.pid when daemonized.