PSYNC: work in progress, preview #2, rebased to unstable.

This commit is contained in:
antirez 2013-01-30 18:33:16 +01:00
parent e34a35a511
commit 078882025e
8 changed files with 756 additions and 71 deletions

View File

@ -227,6 +227,28 @@ slave-read-only yes
# be a good idea.
repl-disable-tcp-nodelay no
# Set the replication backlog size. The backlog is a buffer that accumulates
# slave data when slaves are disconnected for some time, so that when a slave
# wants to reconnect again, often a full resync is not needed, but a partial
# resync is enough, just passing the portion of data the slave missed while
# disconnected.
#
# The biggest the replication backlog, the longer the time the slave can be
# disconnected and later be able to perform a partial resynchronization.
#
# The backlog is only allocated once there is at least a slave connected.
#
# repl-backlog-size 1mb
# After a master has no longer connected slaves for some time, the backlog
# will be freed. The following option configures the amount of seconds that
# need to elapse, starting from the time the last slave disconnected, for
# the backlog buffer to be freed.
#
# A value of 0 means to never release the backlog.
#
# repl-backlog-ttl 3600
# The slave priority is an integer number published by Redis in the INFO output.
# It is used by Redis Sentinel in order to select a slave to promote into a
# master if the master is no longer working correctly.

View File

@ -238,6 +238,18 @@ void loadServerConfigFromString(char *config) {
} else if (!strcasecmp(argv[0],"repl-disable-tcp-nodelay") && argc==2) {
if ((server.repl_disable_tcp_nodelay = yesnotoi(argv[1])) == -1) {
err = "argument must be 'yes' or 'no'"; goto loaderr;
} else if (!strcasecmp(argv[0],"repl-backlog-size") && argc == 2) {
long long size = strtoll(argv[0],NULL,10);
if (size <= 0) {
err = "repl-backlog-size must be 1 or greater.";
goto loaderr;
}
resizeReplicationBacklog(size);
} else if (!strcasecmp(argv[0],"repl-backlog-ttl") && argc == 2) {
server.repl_backlog_time_limit = atoi(argv[1]);
if (server.repl_backlog_time_limit < 0) {
err = "repl-backlog-ttl can't be negative ";
goto loaderr;
}
} else if (!strcasecmp(argv[0],"masterauth") && argc == 2) {
server.masterauth = zstrdup(argv[1]);
@ -719,6 +731,12 @@ void configSetCommand(redisClient *c) {
} else if (!strcasecmp(c->argv[2]->ptr,"repl-timeout")) {
if (getLongLongFromObject(o,&ll) == REDIS_ERR || ll <= 0) goto badfmt;
server.repl_timeout = ll;
} else if (!strcasecmp(c->argv[2]->ptr,"repl-backlog-size")) {
if (getLongLongFromObject(o,&ll) == REDIS_ERR || ll <= 0) goto badfmt;
resizeReplicationBacklog(ll);
} else if (!strcasecmp(c->argv[2]->ptr,"repl-backlog-ttl")) {
if (getLongLongFromObject(o,&ll) == REDIS_ERR || ll < 0) goto badfmt;
server.repl_backlog_time_limit = ll;
} else if (!strcasecmp(c->argv[2]->ptr,"watchdog-period")) {
if (getLongLongFromObject(o,&ll) == REDIS_ERR || ll < 0) goto badfmt;
if (ll)
@ -832,6 +850,8 @@ void configGetCommand(redisClient *c) {
config_get_numerical_field("databases",server.dbnum);
config_get_numerical_field("repl-ping-slave-period",server.repl_ping_slave_period);
config_get_numerical_field("repl-timeout",server.repl_timeout);
config_get_numerical_field("repl-backlog-size",server.repl_backlog_size);
config_get_numerical_field("repl-backlog-ttl",server.repl_backlog_time_limit);
config_get_numerical_field("maxclients",server.maxclients);
config_get_numerical_field("watchdog-period",server.watchdog_period);
config_get_numerical_field("slave-priority",server.slave_priority);

View File

@ -511,8 +511,7 @@ void propagateExpire(redisDb *db, robj *key) {
if (server.aof_state != REDIS_AOF_OFF)
feedAppendOnlyFile(server.delCommand,db->id,argv,2);
if (listLength(server.slaves))
replicationFeedSlaves(server.slaves,db->id,argv,2);
replicationFeedSlaves(server.slaves,db->id,argv,2);
decrRefCount(argv[0]);
decrRefCount(argv[1]);

View File

@ -108,8 +108,7 @@ void execCommandReplicateMulti(redisClient *c) {
if (server.aof_state != REDIS_AOF_OFF)
feedAppendOnlyFile(server.multiCommand,c->db->id,&multistring,1);
if (listLength(server.slaves))
replicationFeedSlaves(server.slaves,c->db->id,&multistring,1);
replicationFeedSlaves(server.slaves,c->db->id,&multistring,1);
decrRefCount(multistring);
}

View File

@ -87,6 +87,7 @@ redisClient *createClient(int fd) {
c->ctime = c->lastinteraction = server.unixtime;
c->authenticated = 0;
c->replstate = REDIS_REPL_NONE;
c->reploff = 0;
c->slave_listening_port = 0;
c->reply = listCreate();
c->reply_bytes = 0;
@ -595,12 +596,42 @@ void disconnectSlaves(void) {
}
}
/* This function is called when the slave lose the connection with the
* master into an unexpected way. */
void replicationHandleMasterDisconnection(void) {
server.master = NULL;
server.repl_state = REDIS_REPL_CONNECT;
server.repl_down_since = server.unixtime;
/* We lost connection with our master, force our slaves to resync
* with us as well to load the new data set.
*
* If server.masterhost is NULL the user called SLAVEOF NO ONE so
* slave resync is not needed. */
if (server.masterhost != NULL) disconnectSlaves();
}
void freeClient(redisClient *c) {
listNode *ln;
/* If this is marked as current client unset it */
if (server.current_client == c) server.current_client = NULL;
/* If it is our master that's beging disconnected we should make sure
* to cache the state to try a partial resynchronization later.
*
* Note that before doing this we make sure that the client is not in
* some unexpected state, by checking its flags. */
if (server.master &&
(c->flags & REDIS_MASTER) &&
!(c->flags & (REDIS_CLOSE_AFTER_REPLY|
REDIS_CLOSE_ASAP|
REDIS_BLOCKED|
REDIS_UNBLOCKED)))
{
replicationCacheMaster(c);
return;
}
/* Note that if the client we are freeing is blocked into a blocking
* call, we have to set querybuf to NULL *before* to call
* unblockClientWaitingData() to avoid processInputBuffer() will get
@ -620,16 +651,21 @@ void freeClient(redisClient *c) {
pubsubUnsubscribeAllPatterns(c,0);
dictRelease(c->pubsub_channels);
listRelease(c->pubsub_patterns);
/* Obvious cleanup */
aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
/* Close socket, unregister events, and remove list of replies and
* accumulated arguments. */
if (c->fd != -1) {
aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
close(c->fd);
}
listRelease(c->reply);
freeClientArgv(c);
close(c->fd);
/* Remove from the list of clients */
ln = listSearchKey(server.clients,c);
redisAssert(ln != NULL);
listDelNode(server.clients,ln);
if (c->fd != -1) {
ln = listSearchKey(server.clients,c);
redisAssert(ln != NULL);
listDelNode(server.clients,ln);
}
/* When client was just unblocked because of a blocking operation,
* remove it from the list with unblocked clients. */
if (c->flags & REDIS_UNBLOCKED) {
@ -647,20 +683,15 @@ void freeClient(redisClient *c) {
ln = listSearchKey(l,c);
redisAssert(ln != NULL);
listDelNode(l,ln);
/* We need to remember the time when we started to have zero
* attached slaves, as after some time we'll free the replication
* backlog. */
if (c->flags & REDIS_SLAVE && listLength(server.slaves) == 0)
server.repl_no_slaves_since = server.unixtime;
}
/* Case 2: we lost the connection with the master. */
if (c->flags & REDIS_MASTER) {
server.master = NULL;
server.repl_state = REDIS_REPL_CONNECT;
server.repl_down_since = server.unixtime;
/* We lost connection with our master, force our slaves to resync
* with us as well to load the new data set.
*
* If server.masterhost is NULL the user called SLAVEOF NO ONE so
* slave resync is not needed. */
if (server.masterhost != NULL) disconnectSlaves();
}
if (c->flags & REDIS_MASTER) replicationHandleMasterDisconnection();
/* If this client was scheduled for async freeing we need to remove it
* from the queue. */
@ -1059,6 +1090,7 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
if (nread) {
sdsIncrLen(c->querybuf,nread);
c->lastinteraction = server.unixtime;
if (c->flags & REDIS_MASTER) c->reploff += nread;
} else {
server.current_client = NULL;
return;

View File

@ -220,6 +220,7 @@ struct redisCommand redisCommandTable[] = {
{"exec",execCommand,1,"sM",0,NULL,0,0,0,0,0},
{"discard",discardCommand,1,"rs",0,NULL,0,0,0,0,0},
{"sync",syncCommand,1,"ars",0,NULL,0,0,0,0,0},
{"psync",syncCommand,3,"ars",0,NULL,0,0,0,0,0},
{"replconf",replconfCommand,-1,"ars",0,NULL,0,0,0,0,0},
{"flushdb",flushdbCommand,1,"w",0,NULL,0,0,0,0,0},
{"flushall",flushallCommand,1,"w",0,NULL,0,0,0,0,0},
@ -1202,6 +1203,8 @@ void initServerConfig() {
server.masterhost = NULL;
server.masterport = 6379;
server.master = NULL;
server.cached_master = NULL;
server.repl_master_initial_offset = -1;
server.repl_state = REDIS_REPL_NONE;
server.repl_syncio_timeout = REDIS_REPL_SYNCIO_TIMEOUT;
server.repl_serve_stale_data = 1;
@ -1209,6 +1212,16 @@ void initServerConfig() {
server.repl_down_since = time(NULL);
server.repl_disable_tcp_nodelay = 0;
server.slave_priority = REDIS_DEFAULT_SLAVE_PRIORITY;
server.master_repl_offset = 0;
/* Replication partial resync backlog */
server.repl_backlog = NULL;
server.repl_backlog_size = REDIS_DEFAULT_REPL_BACKLOG_SIZE;
server.repl_backlog_histlen = 0;
server.repl_backlog_idx = 0;
server.repl_backlog_off = 0;
server.repl_backlog_time_limit = REDIS_DEFAULT_REPL_BACKLOG_TIME_LIMIT;
server.repl_no_slaves_since = time(NULL);
/* Client output buffer limits */
server.client_obuf_limits[REDIS_CLIENT_LIMIT_CLASS_NORMAL].hard_limit_bytes = 0;
@ -1522,7 +1535,7 @@ void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
{
if (server.aof_state != REDIS_AOF_OFF && flags & REDIS_PROPAGATE_AOF)
feedAppendOnlyFile(cmd,dbid,argv,argc);
if (flags & REDIS_PROPAGATE_REPL && listLength(server.slaves))
if (flags & REDIS_PROPAGATE_REPL)
replicationFeedSlaves(server.slaves,dbid,argv,argc);
}
@ -2151,13 +2164,15 @@ sds genRedisInfoString(char *section) {
"master_link_status:%s\r\n"
"master_last_io_seconds_ago:%d\r\n"
"master_sync_in_progress:%d\r\n"
"slave_repl_offset:%lld\r\n"
,server.masterhost,
server.masterport,
(server.repl_state == REDIS_REPL_CONNECTED) ?
"up" : "down",
server.master ?
((int)(server.unixtime-server.master->lastinteraction)) : -1,
server.repl_state == REDIS_REPL_TRANSFER
server.repl_state == REDIS_REPL_TRANSFER,
server.master ? server.master->reploff : -1
);
if (server.repl_state == REDIS_REPL_TRANSFER) {
@ -2215,6 +2230,17 @@ sds genRedisInfoString(char *section) {
slaveid++;
}
}
info = sdscatprintf(info,
"master_repl_offset:%lld\r\n"
"repl_backlog_active:%d\r\n"
"repl_backlog_size:%lld\r\n"
"repl_backlog_first_byte_offset:%lld\r\n"
"repl_backlog_histlen:%lld\r\n",
server.master_repl_offset,
server.repl_backlog != NULL,
server.repl_backlog_size,
server.repl_backlog_off,
server.repl_backlog_histlen);
}
/* CPU */

View File

@ -93,6 +93,9 @@
#define REDIS_REPL_PING_SLAVE_PERIOD 10
#define REDIS_RUN_ID_SIZE 40
#define REDIS_OPS_SEC_SAMPLES 16
#define REDIS_DEFAULT_REPL_BACKLOG_SIZE (1024*1024) /* 1mb */
#define REDIS_DEFAULT_REPL_BACKLOG_TIME_LIMIT (60*60) /* 1 hour */
#define REDIS_REPL_BACKLOG_MIN_SIZE (1024*16) /* 16k */
/* Protocol and I/O related defines */
#define REDIS_MAX_QUERYBUF_LEN (1024*1024*1024) /* 1GB max query buffer. */
@ -100,6 +103,7 @@
#define REDIS_REPLY_CHUNK_BYTES (16*1024) /* 16k output buffer */
#define REDIS_INLINE_MAX_SIZE (1024*64) /* Max size of inline reads */
#define REDIS_MBULK_BIG_ARG (1024*32)
#define REDIS_LONGSTR_SIZE 21 /* Bytes needed for long -> str */
/* Hash table parameters */
#define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */
@ -407,7 +411,8 @@ typedef struct redisClient {
long bulklen; /* length of bulk argument in multi bulk request */
list *reply;
unsigned long reply_bytes; /* Tot bytes of objects in reply list */
int sentlen;
int sentlen; /* Amount of bytes already sent in the current
buffer or object being sent. */
time_t ctime; /* Client creation time */
time_t lastinteraction; /* time of the last interaction, used for timeout */
time_t obuf_soft_limit_reached_time;
@ -417,6 +422,8 @@ typedef struct redisClient {
int repldbfd; /* replication DB file descriptor */
long repldboff; /* replication DB file offset */
off_t repldbsize; /* replication DB file size */
long long reploff; /* replication offset if this is our master */
char replrunid[REDIS_RUN_ID_SIZE+1]; /* master run id if this is a master */
int slave_listening_port; /* As configured with: SLAVECONF listening-port */
multiState mstate; /* MULTI/EXEC state */
blockingState bpop; /* blocking state */
@ -662,7 +669,6 @@ struct redisServer {
list *clients; /* List of active clients */
list *clients_to_close; /* Clients to close asynchronously */
list *slaves, *monitors; /* List of slaves and MONITORs */
int slaveseldb; /* Last SELECTed DB in replication output */
redisClient *current_client; /* Current client, only used on crash report */
char neterr[ANET_ERR_LEN]; /* Error buffer for anet.c */
dict *migrate_cached_sockets;/* MIGRATE cached sockets */
@ -745,13 +751,27 @@ struct redisServer {
int syslog_enabled; /* Is syslog enabled? */
char *syslog_ident; /* Syslog ident */
int syslog_facility; /* Syslog facility */
/* Slave specific fields */
/* Replication (master) */
int slaveseldb; /* Last SELECTed DB in replication output */
long long master_repl_offset; /* Global replication offset */
int repl_ping_slave_period; /* Master pings the slave every N seconds */
char *repl_backlog; /* Replication backlog for partial syncs */
long long repl_backlog_size; /* Backlog circular buffer size */
long long repl_backlog_histlen; /* Backlog actual data length */
long long repl_backlog_idx; /* Backlog circular buffer current offset */
long long repl_backlog_off; /* Replication offset of first byte in the
backlog buffer. */
time_t repl_backlog_time_limit; /* Time without slaves after the backlog
gets released. */
time_t repl_no_slaves_since; /* We have no slaves since that time.
Only valid if server.slaves len is 0. */
/* Replication (slave) */
char *masterauth; /* AUTH with this password with master */
char *masterhost; /* Hostname of master */
int masterport; /* Port of master */
int repl_ping_slave_period; /* Master pings the slave every N seconds */
int repl_timeout; /* Timeout after N seconds of master idle */
redisClient *master; /* Client that is master for this slave */
redisClient *cached_master; /* Cached master to be reused for PSYNC. */
int repl_syncio_timeout; /* Timeout for synchronous I/O calls */
int repl_state; /* Replication status if the instance is a slave */
off_t repl_transfer_size; /* Size of RDB to read from master during sync. */
@ -766,6 +786,8 @@ struct redisServer {
time_t repl_down_since; /* Unix time at which link with master went down */
int repl_disable_tcp_nodelay; /* Disable TCP_NODELAY after SYNC? */
int slave_priority; /* Reported in INFO and used by Sentinel. */
char repl_master_runid[REDIS_RUN_ID_SIZE+1]; /* Master run id for PSYNC. */
long long repl_master_initial_offset; /* Master PSYNC offset. */
/* Limits */
unsigned int maxclients; /* Max number of simultaneous clients */
unsigned long long maxmemory; /* Max number of memory bytes to use */
@ -930,6 +952,7 @@ void exitFromChild(int retcode);
redisClient *createClient(int fd);
void closeTimedoutClients(void);
void freeClient(redisClient *c);
void freeClientAsync(redisClient *c);
void resetClient(redisClient *c);
void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask);
void addReply(redisClient *c, robj *obj);
@ -1053,6 +1076,9 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc);
void replicationFeedMonitors(redisClient *c, list *monitors, int dictid, robj **argv, int argc);
void updateSlavesWaitingBgsave(int bgsaveerr);
void replicationCron(void);
void replicationHandleMasterDisconnection(void);
void replicationCacheMaster(redisClient *c);
void resizeReplicationBacklog(long long newsize);
/* Generic persistence functions */
void startLoading(FILE *fp);

View File

@ -37,13 +37,234 @@
#include <sys/socket.h>
#include <sys/stat.h>
void replicationDiscardCachedMaster(void);
void replicationResurrectCachedMaster(int newfd);
/* ---------------------------------- MASTER -------------------------------- */
void createReplicationBacklog(void) {
redisAssert(server.repl_backlog == NULL);
server.repl_backlog = zmalloc(server.repl_backlog_size);
server.repl_backlog_histlen = 0;
server.repl_backlog_idx = 0;
/* When a new backlog buffer is created, we increment the replication
* offset by one to make sure we'll not be able to PSYNC with any
* previous slave. This is needed because we avoid incrementing the
* master_repl_offset if no backlog exists nor slaves are attached. */
server.master_repl_offset++;
/* We don't have any data inside our buffer, but virtually the first
* byte we have is the next byte that will be generated for the
* replication stream. */
server.repl_backlog_off = server.master_repl_offset+1;
}
/* This function is called when the user modifies the replication backlog
* size at runtime. It is up to the function to both update the
* server.repl_backlog_size and to resize the buffer and setup it so that
* it contains the same data as the previous one (possibly less data, but
* the most recent bytes, or the same data and more free space in case the
* buffer is enlarged). */
void resizeReplicationBacklog(long long newsize) {
if (newsize < REDIS_REPL_BACKLOG_MIN_SIZE)
newsize = REDIS_REPL_BACKLOG_MIN_SIZE;
if (server.repl_backlog_size == newsize) return;
server.repl_backlog_size = newsize;
if (server.repl_backlog != NULL) {
/* What we actually do is to flush the old buffer and realloc a new
* empty one. It will refill with new data incrementally.
* The reason is that copying a few gigabytes adds latency and even
* worse often we need to alloc additional space before freeing the
* old buffer. */
zfree(server.repl_backlog);
server.repl_backlog = zmalloc(server.repl_backlog_size);
server.repl_backlog_histlen = 0;
server.repl_backlog_idx = 0;
/* Next byte we have is... the next since the buffer is emtpy. */
server.repl_backlog_off = server.master_repl_offset+1;
}
}
void freeReplicationBacklog(void) {
redisAssert(server.repl_backlog != NULL);
zfree(server.repl_backlog);
server.repl_backlog = NULL;
}
/* Add data to the replication backlog.
* This function also increments the global replication offset stored at
* server.master_repl_offset, because there is no case where we want to feed
* the backlog without incrementing the buffer. */
void feedReplicationBacklog(void *ptr, size_t len) {
unsigned char *p = ptr;
server.master_repl_offset += len;
/* This is a circular buffer, so write as much data we can at every
* iteration and rewind the "idx" index if we reach the limit. */
while(len) {
size_t thislen = server.repl_backlog_size - server.repl_backlog_idx;
if (thislen > len) thislen = len;
memcpy(server.repl_backlog+server.repl_backlog_idx,p,thislen);
server.repl_backlog_idx += thislen;
if (server.repl_backlog_idx == server.repl_backlog_size)
server.repl_backlog_idx = 0;
len -= thislen;
p += thislen;
server.repl_backlog_histlen += thislen;
}
if (server.repl_backlog_histlen > server.repl_backlog_size)
server.repl_backlog_histlen = server.repl_backlog_size;
/* Set the offset of the first byte we have in the backlog. */
server.repl_backlog_off = server.master_repl_offset -
server.repl_backlog_histlen + 1;
}
/* Wrapper for feedReplicationBacklog() that takes Redis string objects
* as input. */
void feedReplicationBacklogWithObject(robj *o) {
char llstr[REDIS_LONGSTR_SIZE];
void *p;
size_t len;
if (o->encoding == REDIS_ENCODING_INT) {
len = ll2string(llstr,sizeof(llstr),(long)o->ptr);
p = llstr;
} else {
len = sdslen(o->ptr);
p = o->ptr;
}
feedReplicationBacklog(p,len);
}
#define FEEDSLAVE_BUF_SIZE (1024*64)
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
listNode *ln;
listIter li;
int j;
int j, i, len;
char buf[FEEDSLAVE_BUF_SIZE], *b = buf;
char llstr[REDIS_LONGSTR_SIZE];
int buf_left = FEEDSLAVE_BUF_SIZE;
robj *o;
/* If there aren't slaves, and there is no backlog buffer to populate,
* we can return ASAP. */
if (server.repl_backlog == NULL && listLength(slaves) == 0) return;
/* We can't have slaves attached and no backlog. */
redisAssert(!(listLength(slaves) != 0 && server.repl_backlog == NULL));
/* What we do here is to try to write as much data as possible in a static
* buffer "buf" that is used to create an object that is later sent to all
* the slaves. This way we do the decoding only one time for most commands
* not containing big payloads. */
/* Create the SELECT command into the static buffer if needed. */
if (server.slaveseldb != dictid) {
char *selectcmd;
size_t sclen;
if (dictid >= 0 && dictid < REDIS_SHARED_SELECT_CMDS) {
selectcmd = shared.select[dictid]->ptr;
sclen = sdslen(selectcmd);
memcpy(b,selectcmd,sclen);
b += sclen;
buf_left -= sclen;
} else {
int dictid_len;
dictid_len = ll2string(llstr,sizeof(llstr),dictid);
sclen = snprintf(b,buf_left,"*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
dictid_len, llstr);
b += sclen;
buf_left -= sclen;
}
}
server.slaveseldb = dictid;
/* Add the multi bulk reply size to the static buffer, that is, the number
* of arguments of the command to send to every slave. */
b[0] = '*';
len = ll2string(b+1,REDIS_LONGSTR_SIZE,argc);
b += len+1;
buf_left -= len;
b[0] = '\r';
b[1] = '\n';
b += 2;
buf_left -= 2;
/* Try to use the static buffer for as much arguments is possible. */
for (j = 0; j < argc; j++) {
int objlen;
char *objptr;
if (argv[j]->encoding != REDIS_ENCODING_RAW &&
argv[j]->encoding != REDIS_ENCODING_INT) {
redisPanic("Unexpected encoding");
}
if (argv[j]->encoding == REDIS_ENCODING_RAW) {
objlen = sdslen(argv[j]->ptr);
objptr = argv[j]->ptr;
} else {
objlen = ll2string(llstr,REDIS_LONGSTR_SIZE,(long)argv[j]->ptr);
objptr = llstr;
}
/* We need enough space for bulk reply encoding, newlines, and
* the data itself. */
if (buf_left < objlen+REDIS_LONGSTR_SIZE+32) break;
/* Write $...CRLF */
b[0] = '$';
len = ll2string(b+1,REDIS_LONGSTR_SIZE,objlen);
b += len+1;
buf_left -= len;
b[0] = '\r';
b[1] = '\n';
b += 2;
buf_left -= 2;
/* And data plus CRLF */
memcpy(b,objptr,objlen);
b += objlen;
buf_left -= objlen;
b[0] = '\r';
b[1] = '\n';
b += 2;
buf_left -= 2;
}
/* Create an object with the static buffer content. */
redisAssert(buf_left < FEEDSLAVE_BUF_SIZE);
o = createStringObject(buf,b-buf);
/* If we have a backlog, populate it with data and increment
* the global replication offset. */
if (server.repl_backlog) {
feedReplicationBacklogWithObject(o);
for (i = j; i < argc; i++) {
char aux[REDIS_LONGSTR_SIZE+3];
long objlen = stringObjectLen(argv[i]);
/* We need to feed the buffer with the object as a bulk reply
* not just as a plain string, so create the $..CRLF payload len
* ad add the final CRLF */
aux[0] = '$';
len = ll2string(aux+1,objlen,sizeof(aux)-1);
aux[len+1] = '\r';
aux[len+2] = '\n';
feedReplicationBacklog(aux,len+3);
feedReplicationBacklogWithObject(argv[j]);
feedReplicationBacklogWithObject(shared.crlf);
}
}
/* Write data to slaves. Here we do two things:
* 1) We write the "o" object that was created using the accumulated
* static buffer.
* 2) We write any additional argument of the command to replicate that
* was not written inside the static buffer for lack of space.
*/
listRewind(slaves,&li);
while((ln = listNext(&li))) {
redisClient *slave = ln->value;
@ -54,29 +275,16 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
/* Feed slaves that are waiting for the initial SYNC (so these commands
* are queued in the output buffer until the initial SYNC completes),
* or are already in sync with the master. */
if (server.slaveseldb != dictid) {
robj *selectcmd;
if (dictid >= 0 && dictid < REDIS_SHARED_SELECT_CMDS) {
selectcmd = shared.select[dictid];
incrRefCount(selectcmd);
} else {
char dictid_str[64];
int dictid_len;
/* First, trasmit the object created from the static buffer. */
addReply(slave,o);
dictid_len = ll2string(dictid_str,sizeof(dictid_str),dictid);
selectcmd = createObject(REDIS_STRING,
sdscatprintf(sdsempty(),
"*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
dictid_len, dictid_str));
}
addReply(slave,selectcmd);
decrRefCount(selectcmd);
}
addReplyMultiBulkLen(slave,argc);
for (j = 0; j < argc; j++) addReplyBulk(slave,argv[j]);
/* Finally any additional argument that was not stored inside the
* static buffer if any (from j to argc). */
for (i = j; i < argc; i++)
addReplyBulk(slave,argv[i]);
}
server.slaveseldb = dictid;
decrRefCount(o);
}
void replicationFeedMonitors(redisClient *c, list *monitors, int dictid, robj **argv, int argc) {
@ -120,6 +328,120 @@ void replicationFeedMonitors(redisClient *c, list *monitors, int dictid, robj **
decrRefCount(cmdobj);
}
/* Feed the slave 'c' with the replication backlog starting from the
* specified 'offset' up to the end of the backlog. */
long long addReplyReplicationBacklog(redisClient *c, long long offset) {
long long j, skip, len;
// printf("SLAVE REQUEST %lld\n", offset);
if (server.repl_backlog_histlen == 0) {
// printf("NO HISTORY\n");
return 0;
}
// printf("FIRST BYTE WE HAVE %lld\n", server.repl_backlog_off);
// printf("HISTLEN %lld\n", server.repl_backlog_histlen);
// printf("IDX %lld\n", server.repl_backlog_idx);
/* Compute the amount of bytes we need to discard. */
skip = offset - server.repl_backlog_off;
// printf("SKIP %lld\n", skip);
/* Point j to the oldest byte, that is actaully our
* server.repl_backlog_off byte. */
j = (server.repl_backlog_idx +
(server.repl_backlog_size-server.repl_backlog_histlen)) %
server.repl_backlog_size;
// printf("J %lld\n", j);
/* Discard the amount of data to seek to the specified 'offset'. */
j = (j + skip) % server.repl_backlog_size;
/* Feed slave with data. Since it is a circular buffer we have to
* split the reply in two parts if we are cross-boundary. */
len = server.repl_backlog_histlen - skip;
// printf("LEN %lld\n", len);
while(len) {
long long thislen =
((server.repl_backlog_size - j) < len) ?
(server.repl_backlog_size - j) : len;
// printf("WRITE %lld\n", thislen);
addReplySds(c,sdsnewlen(server.repl_backlog + j, thislen));
len -= thislen;
j = 0;
}
return server.repl_backlog_histlen - skip;
}
/* This function handles the PSYNC command from the point of view of a
* master receiving a request for partial resynchronization.
*
* On success return REDIS_OK, otherwise REDIS_ERR is returned and we proceed
* with the usual full resync. */
int masterTryPartialResynchronization(redisClient *c) {
long long psync_offset, psync_len;
char *master_runid = c->argv[1]->ptr;
/* Is the runid of this master the same advertised by the wannabe slave
* via PSYNC? If runid changed this master is a different instance and
* there is no way to continue. */
if (strcasecmp(master_runid, server.runid)) {
/* Run id "?" is used by slaves that want to force a full resync. */
if (master_runid[0] != '?') {
redisLog(REDIS_NOTICE,"Partial resynchronization not accepted: "
"Runid mismatch (Client asked for '%s', I'm '%s')",
master_runid, server.runid);
} else {
redisLog(REDIS_NOTICE,"Full resync requested by slave.");
}
goto need_full_resync;
}
/* We still have the data our slave is asking for? */
if (getLongLongFromObjectOrReply(c,c->argv[2],&psync_offset,NULL) !=
REDIS_OK) goto need_full_resync;
if (!server.repl_backlog ||
psync_offset < server.repl_backlog_off ||
psync_offset >= (server.repl_backlog_off + server.repl_backlog_size))
{
redisLog(REDIS_NOTICE,
"Unable to partial resync with the slave for lack of backlog (Slave request was: %lld).", psync_offset);
goto need_full_resync;
}
/* If we reached this point, we are able to perform a partial resync:
* 1) Set client state to make it a slave.
* 2) Inform the client we can continue with +CONTINUE
* 3) Send the backlog data (from the offset to the end) to the slave. */
c->flags |= REDIS_SLAVE;
c->replstate = REDIS_REPL_ONLINE;
listAddNodeTail(server.slaves,c);
addReplySds(c,sdsnew("+CONTINUE\r\n"));
psync_len = addReplyReplicationBacklog(c,psync_offset);
redisLog(REDIS_NOTICE,
"Partial resynchronization request accepted. Sending %lld bytes of backlog starting from offset %lld.", psync_len, psync_offset);
/* Note that we don't need to set the selected DB at server.slaveseldb
* to -1 to force the master to emit SELECT, since the slave already
* has this state from the previous connection with the master. */
return REDIS_OK; /* The caller can return, no full resync needed. */
need_full_resync:
/* We need a full resync for some reason... notify the client. */
psync_offset = server.master_repl_offset;
/* Add 1 to psync_offset if it the replication backlog does not exists
* as when it will be created later we'll increment the offset by one. */
if (server.repl_backlog == NULL) psync_offset++;
addReplySds(c,
sdscatprintf(sdsempty(),"+FULLRESYNC %s %lld\r\n",
server.runid,
psync_offset));
return REDIS_ERR;
}
/* SYNC ad PSYNC command implemenation. */
void syncCommand(redisClient *c) {
/* ignore SYNC if already slave or in monitor mode */
if (c->flags & REDIS_SLAVE) return;
@ -136,11 +458,24 @@ void syncCommand(redisClient *c) {
* buffer registering the differences between the BGSAVE and the current
* dataset, so that we can copy to other slaves if needed. */
if (listLength(c->reply) != 0) {
addReplyError(c,"SYNC is invalid with pending input");
addReplyError(c,"SYNC and PSYNC are invalid with pending input");
return;
}
redisLog(REDIS_NOTICE,"Slave ask for synchronization");
redisLog(REDIS_NOTICE,"Slave asks for synchronization");
/* Try a partial resynchronization if this is a PSYNC command.
* If it fails, we continue with usual full resynchronization, however
* when this happens masterTryPartialResynchronization() already
* replied with:
*
* +FULLRESYNC <runid> <offset>
*
* So the slave knows the new runid and offset to try a PSYNC later
* if the connection with the master is lost. */
if (!strcasecmp(c->argv[0]->ptr,"psync") &&
masterTryPartialResynchronization(c) == REDIS_OK) return;
/* Here we need to check if there is a background saving operation
* in progress, or if it is required to start one */
if (server.rdb_child_pid != -1) {
@ -185,6 +520,8 @@ void syncCommand(redisClient *c) {
c->flags |= REDIS_SLAVE;
server.slaveseldb = -1; /* Force to re-emit the SELECT command. */
listAddNodeTail(server.slaves,c);
if (listLength(server.slaves) == 1 && server.repl_backlog == NULL)
createReplicationBacklog();
return;
}
@ -452,6 +789,9 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
server.master->flags |= REDIS_MASTER;
server.master->authenticated = 1;
server.repl_state = REDIS_REPL_CONNECTED;
server.master->reploff = server.repl_master_initial_offset;
memcpy(server.master->replrunid, server.repl_master_runid,
sizeof(server.repl_master_runid));
redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Finished with success");
/* Restart the AOF subsystem now that we finished the sync. This
* will trigger an AOF rewrite, and when done will start appending
@ -481,8 +821,8 @@ error:
/* Send a synchronous command to the master. Used to send AUTH and
* REPLCONF commands before starting the replication with SYNC.
*
* On success NULL is returned.
* On error an sds string describing the error is returned.
* The command returns an sds string representing the result of the
* operation. On error the first byte is a "-".
*/
char *sendSynchronousCommand(int fd, ...) {
va_list ap;
@ -504,7 +844,7 @@ char *sendSynchronousCommand(int fd, ...) {
/* Transfer command to the server. */
if (syncWrite(fd,cmd,sdslen(cmd),server.repl_syncio_timeout*1000) == -1) {
sdsfree(cmd);
return sdscatprintf(sdsempty(),"Writing to master: %s",
return sdscatprintf(sdsempty(),"-Writing to master: %s",
strerror(errno));
}
sdsfree(cmd);
@ -512,22 +852,123 @@ char *sendSynchronousCommand(int fd, ...) {
/* Read the reply from the server. */
if (syncReadLine(fd,buf,sizeof(buf),server.repl_syncio_timeout*1000) == -1)
{
return sdscatprintf(sdsempty(),"Reading from master: %s",
return sdscatprintf(sdsempty(),"-Reading from master: %s",
strerror(errno));
}
return sdsnew(buf);
}
/* Check for errors from the server. */
if (buf[0] != '+') {
return sdscatprintf(sdsempty(),"Error from master: %s", buf);
/* Try a partial resynchronization with the master if we are about to reconnect.
* If there is no cached master structure, at least try to issue a
* "PSYNC ? -1" command in order to trigger a full resync using the PSYNC
* command in order to obtain the master run id and the master replication
* global offset.
*
* This function is designed to be called from syncWithMaster(), so the
* following assumptions are made:
*
* 1) We pass the function an already connected socket "fd".
* 2) This function does not close the file descriptor "fd". However in case
* of successful partial resynchronization, the function will reuse
* 'fd' as file descriptor of the server.master client structure.
*
* The function returns:
*
* PSYNC_CONTINUE: If the PSYNC command succeded and we can continue.
* PSYNC_FULLRESYNC: If PSYNC is supported but a full resync is needed.
* In this case the master run_id and global replication
* offset is saved.
* PSYNC_NOT_SUPPORTED: If the server does not understand PSYNC at all and
* the caller should fall back to SYNC.
*/
#define PSYNC_CONTINUE 0
#define PSYNC_FULLRESYNC 1
#define PSYNC_NOT_SUPPORTED 2
int slaveTryPartialResynchronization(int fd) {
char *psync_runid;
char psync_offset[32];
sds reply;
/* Initially set repl_master_initial_offset to -1 to mark the current
* master run_id and offset as not valid. Later if we'll be able to do
* a FULL resync using the PSYNC command we'll set the offset at the
* right value, so that this information will be propagated to the
* client structure representing the master into server.master. */
server.repl_master_initial_offset = -1;
if (server.cached_master) {
psync_runid = server.cached_master->replrunid;
snprintf(psync_offset,sizeof(psync_offset),"%lld", server.cached_master->reploff+1);
redisLog(REDIS_NOTICE,"Trying a partial resynchronization (request %s:%s).", psync_runid, psync_offset);
} else {
redisLog(REDIS_NOTICE,"Partial resynchronization not possible (no cached master)");
psync_runid = "?";
memcpy(psync_offset,"-1",3);
}
return NULL; /* No errors. */
/* Issue the PSYNC command */
reply = sendSynchronousCommand(fd,"PSYNC",psync_runid,psync_offset,NULL);
if (!strncmp(reply,"+FULLRESYNC",11)) {
char *runid, *offset;
/* FULL RESYNC, parse the reply in order to extract the run id
* and the replication offset. */
runid = strchr(reply,' ');
if (runid) {
runid++;
offset = strchr(runid,' ');
if (offset) offset++;
}
if (!runid || !offset || (offset-runid-1) != REDIS_RUN_ID_SIZE) {
redisLog(REDIS_WARNING,
"Master replied with wrong +FULLRESYNC syntax.");
} else {
memcpy(server.repl_master_runid, runid, offset-runid-1);
server.repl_master_runid[REDIS_RUN_ID_SIZE] = '\0';
server.repl_master_initial_offset = strtoll(offset,NULL,10);
redisLog(REDIS_NOTICE,"Full resync from master: %s:%lld",
server.repl_master_runid,
server.repl_master_initial_offset);
}
/* We are going to full resync, discard the cached master structure. */
replicationDiscardCachedMaster();
sdsfree(reply);
return PSYNC_FULLRESYNC;
}
if (!strncmp(reply,"+CONTINUE",9)) {
/* Partial resync was accepted, set the replication state accordingly */
redisLog(REDIS_NOTICE,
"Successful partial resynchronization with master.");
sdsfree(reply);
replicationResurrectCachedMaster(fd);
return PSYNC_CONTINUE;
}
/* If we reach this point we receied either an error since the master does
* not understand PSYNC, or an unexpected reply from the master.
* Reply with PSYNC_NOT_SUPPORTED in both cases. */
if (strncmp(reply,"-ERR",4)) {
/* If it's not an error, log the unexpected event. */
redisLog(REDIS_WARNING,
"Unexpected reply to PSYNC from master: %s", reply);
} else {
redisLog(REDIS_NOTICE,
"Master does not support PSYNC or is in "
"error state (reply: %s)", reply);
}
sdsfree(reply);
replicationDiscardCachedMaster();
return PSYNC_NOT_SUPPORTED;
}
void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
char tmpfile[256], *err;
int dfd, maxtries = 5;
int sockerr = 0;
int sockerr = 0, psync_result;
socklen_t errlen = sizeof(sockerr);
REDIS_NOTUSED(el);
REDIS_NOTUSED(privdata);
@ -600,11 +1041,12 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
/* AUTH with the master if required. */
if(server.masterauth) {
err = sendSynchronousCommand(fd,"AUTH",server.masterauth,NULL);
if (err) {
if (err[0] == '-') {
redisLog(REDIS_WARNING,"Unable to AUTH to MASTER: %s",err);
sdsfree(err);
goto error;
}
sdsfree(err);
}
/* Set the slave port, so that Master's INFO command can list the
@ -616,17 +1058,33 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
sdsfree(port);
/* Ignore the error if any, not all the Redis versions support
* REPLCONF listening-port. */
if (err) {
redisLog(REDIS_NOTICE,"(non critical): Master does not understand REPLCONF listening-port: %s", err);
sdsfree(err);
if (err[0] == '-') {
redisLog(REDIS_NOTICE,"(Non critical) Master does not understand REPLCONF listening-port: %s", err);
}
sdsfree(err);
}
/* Issue the SYNC command */
if (syncWrite(fd,"SYNC\r\n",6,server.repl_syncio_timeout*1000) == -1) {
redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
strerror(errno));
goto error;
/* Try a partial resynchonization. If we don't have a cached master
* slaveTryPartialResynchronization() will at least try to use PSYNC
* to start a full resynchronization so that we get the master run id
* and the global offset, to try a partial resync at the next
* reconnection attempt. */
psync_result = slaveTryPartialResynchronization(fd);
if (psync_result == PSYNC_CONTINUE) {
redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Master accepted a Partial Resynchronization.");
return;
}
/* Fall back to SYNC if needed. Otherwise psync_result == PSYNC_FULLRESYNC
* and the server.repl_master_runid and repl_master_initial_offset are
* already populated. */
if (psync_result == PSYNC_NOT_SUPPORTED) {
redisLog(REDIS_NOTICE,"Retrying with SYNC...");
if (syncWrite(fd,"SYNC\r\n",6,server.repl_syncio_timeout*1000) == -1) {
redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
strerror(errno));
goto error;
}
}
/* Prepare a suitable temp file for bulk transfer */
@ -733,6 +1191,7 @@ void slaveofCommand(redisClient *c) {
sdsfree(server.masterhost);
server.masterhost = NULL;
if (server.master) freeClient(server.master);
replicationDiscardCachedMaster();
cancelReplicationHandshake();
server.repl_state = REDIS_REPL_NONE;
redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
@ -757,6 +1216,7 @@ void slaveofCommand(redisClient *c) {
server.masterport = port;
if (server.master) freeClient(server.master);
disconnectSlaves(); /* Force our slaves to resync with us as well. */
replicationDiscardCachedMaster(); /* Don't try a PSYNC. */
cancelReplicationHandshake();
server.repl_state = REDIS_REPL_CONNECT;
redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
@ -765,6 +1225,92 @@ void slaveofCommand(redisClient *c) {
addReply(c,shared.ok);
}
/* ---------------------- MASTER CACHING FOR PSYNC -------------------------- */
/* In order to implement partial synchronization we need to be able to cache
* our master's client structure after a transient disconnection.
* It is cached into server.cached_master and flushed away using the following
* functions. */
/* This function is called by freeClient() in order to cache the master
* client structure instead of destryoing it. freeClient() will return
* ASAP after this function returns, so every action needed to avoid problems
* with a client that is really "suspended" has to be done by this function.
*
* The other functions that will deal with the cached master are:
*
* replicationDiscardCachedMaster() that will make sure to kill the client
* as for some reason we don't want to use it in the future.
*
* replicationResurrectCachedMaster() that is used after a successful PSYNC
* handshake in order to reactivate the cached master.
*/
void replicationCacheMaster(redisClient *c) {
listNode *ln;
redisAssert(server.master != NULL && server.cached_master == NULL);
redisLog(REDIS_NOTICE,"Caching the disconnected master state.");
/* Remove from the list of clients, we don't want this client to be
* listed by CLIENT LIST or processed in any way by batch operations. */
ln = listSearchKey(server.clients,c);
redisAssert(ln != NULL);
listDelNode(server.clients,ln);
/* Save the master. Server.master will be set to null later by
* replicationHandleMasterDisconnection(). */
server.cached_master = server.master;
/* Remove the event handlers and close the socket. We'll later reuse
* the socket of the new connection with the master during PSYNC. */
aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
close(c->fd);
/* Set fd to -1 so that we can safely call freeClient(c) later. */
c->fd = -1;
/* Caching the master happens instead of the actual freeClient() call,
* so make sure to adjust the replication state. This function will
* also set server.master to NULL. */
replicationHandleMasterDisconnection();
}
/* Free a cached master, called when there are no longer the conditions for
* a partial resync on reconnection. */
void replicationDiscardCachedMaster(void) {
if (server.cached_master == NULL) return;
redisLog(REDIS_NOTICE,"Discarding previously cached master state.");
server.cached_master->flags &= ~REDIS_MASTER;
freeClient(server.cached_master);
server.cached_master = NULL;
}
/* Turn the cached master into the current master, using the file descriptor
* passed as argument as the socket for the new master.
*
* This funciton is called when successfully setup a partial resynchronization
* so the stream of data that we'll receive will start from were this
* master left. */
void replicationResurrectCachedMaster(int newfd) {
server.master = server.cached_master;
server.cached_master = NULL;
server.master->fd = newfd;
server.master->flags &= ~(REDIS_CLOSE_AFTER_REPLY|REDIS_CLOSE_ASAP);
server.master->authenticated = 1;
server.master->lastinteraction = server.unixtime;
server.repl_state = REDIS_REPL_CONNECTED;
/* Re-add to the list of clients. */
listAddNodeTail(server.clients,server.master);
if (aeCreateFileEvent(server.el, newfd, AE_READABLE,
readQueryFromClient, server.master)) {
redisLog(REDIS_WARNING,"Error resurrecting the cached master, impossible to add the readable handler: %s", strerror(errno));
freeClientAsync(server.master); /* Close ASAP. */
}
}
/* --------------------------- REPLICATION CRON ---------------------------- */
void replicationCron(void) {
@ -816,8 +1362,8 @@ void replicationCron(void) {
replicationFeedSlaves(server.slaves, server.slaveseldb, ping_argv, 1);
decrRefCount(ping_argv[0]);
/* Second, send a newline to all the slaves in pre-synchronization stage,
* that is, slaves waiting for the master to create the RDB file.
/* Second, send a newline to all the slaves in pre-synchronization
* stage, that is, slaves waiting for the master to create the RDB file.
* The newline will be ignored by the slave but will refresh the
* last-io timer preventing a timeout. */
listRewind(server.slaves,&li);
@ -832,4 +1378,19 @@ void replicationCron(void) {
}
}
}
/* If we have no attached slaves and there is a replication backlog
* using memory, free it after some (configured) time. */
if (listLength(server.slaves) == 0 && server.repl_backlog_time_limit &&
server.repl_backlog)
{
time_t idle = server.unixtime - server.repl_no_slaves_since;
if (idle > server.repl_backlog_time_limit) {
freeReplicationBacklog();
redisLog(REDIS_NOTICE,
"Replication backlog freed after %d seconds "
"without connected slaves.", server.repl_backlog_time_limit);
}
}
}