Fix wrong offset when replica pause (#9448)
When a replica paused, it would not apply any commands event the command comes from master, if we feed the non-applied command to replication stream, the replication offset would be wrong, and data would be lost after failover(since replica's `master_repl_offset` grows but command is not applied). To fix it, here are the changes: * Don't update replica's replication offset or propagate commands to sub-replicas when it's paused in `commandProcessed`. * Show `slave_read_repl_offset` in info reply. * Add an assert to make sure master client should never be blocked unless pause or module (some modules may use block way to do background (parallel) processing and forward original block module command to the replica, it's not a good way but it can work, so the assert excludes module now, but someday in future all modules should rewrite block command to propagate like what `BLPOP` does). (cherry picked from commit 1b83353dc382959e218191f64d94edb9703552e3)
This commit is contained in:
parent
49f8f43890
commit
9b25484a13
@ -87,6 +87,11 @@ typedef struct bkinfo {
|
||||
* flag is set client query buffer is not longer processed, but accumulated,
|
||||
* and will be processed when the client is unblocked. */
|
||||
void blockClient(client *c, int btype) {
|
||||
/* Master client should never be blocked unless pause or module */
|
||||
serverAssert(!(c->flags & CLIENT_MASTER &&
|
||||
btype != BLOCKED_MODULE &&
|
||||
btype != BLOCKED_PAUSE));
|
||||
|
||||
c->flags |= CLIENT_BLOCKED;
|
||||
c->btype = btype;
|
||||
server.blocked_clients++;
|
||||
|
@ -1991,19 +1991,23 @@ int processMultibulkBuffer(client *c) {
|
||||
* 2. In the case of master clients, the replication offset is updated.
|
||||
* 3. Propagate commands we got from our master to replicas down the line. */
|
||||
void commandProcessed(client *c) {
|
||||
/* If client is blocked(including paused), just return avoid reset and replicate.
|
||||
*
|
||||
* 1. Don't reset the client structure for blocked clients, so that the reply
|
||||
* callback will still be able to access the client argv and argc fields.
|
||||
* The client will be reset in unblockClient().
|
||||
* 2. Don't update replication offset or propagate commands to replicas,
|
||||
* since we have not applied the command. */
|
||||
if (c->flags & CLIENT_BLOCKED) return;
|
||||
|
||||
resetClient(c);
|
||||
|
||||
long long prev_offset = c->reploff;
|
||||
if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) {
|
||||
/* Update the applied replication offset of our master. */
|
||||
c->reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos;
|
||||
}
|
||||
|
||||
/* Don't reset the client structure for blocked clients, so that the reply
|
||||
* callback will still be able to access the client argv and argc fields.
|
||||
* The client will be reset in unblockClient(). */
|
||||
if (!(c->flags & CLIENT_BLOCKED)) {
|
||||
resetClient(c);
|
||||
}
|
||||
|
||||
/* If the client is a master we need to compute the difference
|
||||
* between the applied offset before and after processing the buffer,
|
||||
* to understand how much of the replication stream was actually
|
||||
|
10
src/server.c
10
src/server.c
@ -5083,11 +5083,15 @@ sds genRedisInfoString(const char *section) {
|
||||
server.masterhost == NULL ? "master" : "slave");
|
||||
if (server.masterhost) {
|
||||
long long slave_repl_offset = 1;
|
||||
long long slave_read_repl_offset = 1;
|
||||
|
||||
if (server.master)
|
||||
if (server.master) {
|
||||
slave_repl_offset = server.master->reploff;
|
||||
else if (server.cached_master)
|
||||
slave_read_repl_offset = server.master->read_reploff;
|
||||
} else if (server.cached_master) {
|
||||
slave_repl_offset = server.cached_master->reploff;
|
||||
slave_read_repl_offset = server.cached_master->read_reploff;
|
||||
}
|
||||
|
||||
info = sdscatprintf(info,
|
||||
"master_host:%s\r\n"
|
||||
@ -5095,6 +5099,7 @@ sds genRedisInfoString(const char *section) {
|
||||
"master_link_status:%s\r\n"
|
||||
"master_last_io_seconds_ago:%d\r\n"
|
||||
"master_sync_in_progress:%d\r\n"
|
||||
"slave_read_repl_offset:%lld\r\n"
|
||||
"slave_repl_offset:%lld\r\n"
|
||||
,server.masterhost,
|
||||
server.masterport,
|
||||
@ -5103,6 +5108,7 @@ sds genRedisInfoString(const char *section) {
|
||||
server.master ?
|
||||
((int)(server.unixtime-server.master->lastinteraction)) : -1,
|
||||
server.repl_state == REPL_STATE_TRANSFER,
|
||||
slave_read_repl_offset,
|
||||
slave_repl_offset
|
||||
);
|
||||
|
||||
|
@ -195,6 +195,57 @@ start_server {tags {"pause network"}} {
|
||||
$rd close
|
||||
}
|
||||
|
||||
start_server {tags {needs:repl external:skip}} {
|
||||
set master [srv -1 client]
|
||||
set master_host [srv -1 host]
|
||||
set master_port [srv -1 port]
|
||||
|
||||
# Avoid PINGs
|
||||
$master config set repl-ping-replica-period 3600
|
||||
r replicaof $master_host $master_port
|
||||
|
||||
wait_for_condition 50 100 {
|
||||
[s master_link_status] eq {up}
|
||||
} else {
|
||||
fail "Replication not started."
|
||||
}
|
||||
|
||||
test "Test when replica paused, offset would not grow" {
|
||||
$master set foo bar
|
||||
set old_master_offset [status $master master_repl_offset]
|
||||
|
||||
wait_for_condition 50 100 {
|
||||
[s slave_repl_offset] == [status $master master_repl_offset]
|
||||
} else {
|
||||
fail "Replication offset not matched."
|
||||
}
|
||||
|
||||
r client pause 100000 write
|
||||
$master set foo2 bar2
|
||||
|
||||
# Make sure replica received data from master
|
||||
wait_for_condition 50 100 {
|
||||
[s slave_read_repl_offset] == [status $master master_repl_offset]
|
||||
} else {
|
||||
fail "Replication not work."
|
||||
}
|
||||
|
||||
# Replica would not apply the write command
|
||||
assert {[s slave_repl_offset] == $old_master_offset}
|
||||
r get foo2
|
||||
} {}
|
||||
|
||||
test "Test replica offset would grow after unpause" {
|
||||
r client unpause
|
||||
wait_for_condition 50 100 {
|
||||
[s slave_repl_offset] == [status $master master_repl_offset]
|
||||
} else {
|
||||
fail "Replication not continue."
|
||||
}
|
||||
r get foo2
|
||||
} {bar2}
|
||||
}
|
||||
|
||||
# Make sure we unpause at the end
|
||||
r client unpause
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user