Slaves heartbeats during sync improved.

The previous fix for false positive timeout detected by master was not
complete. There is another blocking stage while loading data for the
first synchronization with the master, that is, flushing away the
current data from the DB memory.

This commit uses the newly introduced dict.c callback in order to make
some incremental work (to send "\n" heartbeats to the master) while
flushing the old data from memory.

It is hard to write a regression test for this issue unfortunately. More
support for debugging in the Redis core would be needed in terms of
functionalities to simulate a slow DB loading / deletion.
This commit is contained in:
antirez 2013-12-10 18:38:26 +01:00
parent 2eb781b35b
commit 11120689c4
3 changed files with 29 additions and 15 deletions

View File

@ -1065,20 +1065,8 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) {
if (server.loading_process_events_interval_bytes &&
(r->processed_bytes + len)/server.loading_process_events_interval_bytes > r->processed_bytes/server.loading_process_events_interval_bytes)
{
if (server.masterhost && server.repl_state == REDIS_REPL_TRANSFER) {
static time_t newline_sent;
/* Avoid the master to detect the slave is timing out while
* loading the RDB file in initial synchronization. We send
* a single newline character that is valid protocol but is
* guaranteed to either be sent entierly or not, since the byte
* is indivisible. */
if (time(NULL) != newline_sent) {
newline_sent = time(NULL);
if (write(server.repl_transfer_s,"\n",1) == -1) {
/* Pinging back in this stage is best-effort. */
}
}
}
if (server.masterhost && server.repl_state == REDIS_REPL_TRANSFER)
replicationSendNewlineToMaster();
loadingProgress(r->processed_bytes);
aeProcessEvents(server.el, AE_FILE_EVENTS|AE_DONT_WAIT);
}

View File

@ -1070,6 +1070,7 @@ int replicationScriptCacheExists(sds sha1);
void processClientsWaitingReplicas(void);
void unblockClientWaitingReplicas(redisClient *c);
int replicationCountAcksByOffset(long long offset);
void replicationSendNewlineToMaster(void);
/* Generic persistence functions */
void startLoading(FILE *fp);

View File

@ -715,6 +715,31 @@ void replicationAbortSyncTransfer(void) {
server.repl_state = REDIS_REPL_CONNECT;
}
/* Avoid the master to detect the slave is timing out while loading the
* RDB file in initial synchronization. We send a single newline character
* that is valid protocol but is guaranteed to either be sent entierly or
* not, since the byte is indivisible.
*
* The function is called in two contexts: while we flush the current
* data with emptyDb(), and while we load the new data received as an
* RDB file from the master. */
void replicationSendNewlineToMaster(void) {
static time_t newline_sent;
if (time(NULL) != newline_sent) {
newline_sent = time(NULL);
if (write(server.repl_transfer_s,"\n",1) == -1) {
/* Pinging back in this stage is best-effort. */
}
}
}
/* Callback used by emptyDb() while flushing away old data to load
* the new dataset received by the master. */
void replicationEmptyDbCallback(void *privdata) {
REDIS_NOTUSED(privdata);
replicationSendNewlineToMaster();
}
/* Asynchronously read the SYNC payload we receive from a master */
#define REPL_MAX_WRITTEN_BEFORE_FSYNC (1024*1024*8) /* 8 MB */
void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
@ -796,7 +821,7 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
}
redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Flushing old data");
signalFlushedDb(-1);
emptyDb(NULL);
emptyDb(replicationEmptyDbCallback);
/* Before loading the DB into memory we need to delete the readable
* handler, otherwise it will get called recursively since
* rdbLoad() will call the event loop to process events from time to