Merge pull request #7390 from oranagra/exec_fails_abort

EXEC always fails with EXECABORT and multi-state is cleared
This commit is contained in:
Salvatore Sanfilippo 2020-06-23 13:12:52 +02:00 committed by GitHub
commit 6bbbdd26f4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 204 additions and 91 deletions

View File

@ -36,6 +36,7 @@ void initClientMultiState(client *c) {
c->mstate.commands = NULL;
c->mstate.count = 0;
c->mstate.cmd_flags = 0;
c->mstate.cmd_inv_flags = 0;
}
/* Release all the resources associated with MULTI/EXEC state */
@ -76,6 +77,7 @@ void queueMultiCommand(client *c) {
incrRefCount(mc->argv[j]);
c->mstate.count++;
c->mstate.cmd_flags |= c->cmd->flags;
c->mstate.cmd_inv_flags |= ~c->cmd->flags;
}
void discardTransaction(client *c) {
@ -122,6 +124,23 @@ void execCommandPropagateExec(client *c) {
PROPAGATE_AOF|PROPAGATE_REPL);
}
/* Aborts a transaction, with a specific error message.
* The transaction is always aboarted with -EXECABORT so that the client knows
* the server exited the multi state, but the actual reason for the abort is
* included too. */
void execCommandAbort(client *c, sds error) {
discardTransaction(c);
if (error[0] == '-') error++;
addReplyErrorFormat(c, "-EXECABORT Transaction discarded because of: %s", error);
/* Send EXEC to clients waiting data from MONITOR. We did send a MULTI
* already, and didn't send any of the queued commands, now we'll just send
* EXEC so it is clear that the transaction is over. */
if (listLength(server.monitors) && !server.loading)
replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
}
void execCommand(client *c) {
int j;
robj **orig_argv;
@ -135,15 +154,6 @@ void execCommand(client *c) {
return;
}
/* If we are in -BUSY state, flag the transaction and return the
* -BUSY error, like Redis <= 5. This is a temporary fix, may be changed
* ASAP, see issue #7353 on Github. */
if (server.lua_timedout) {
flagTransaction(c);
addReply(c, shared.slowscripterr);
return;
}
/* Check if we need to abort the EXEC because:
* 1) Some WATCHed key was touched.
* 2) There was a previous error while queueing commands.
@ -157,21 +167,6 @@ void execCommand(client *c) {
goto handle_monitor;
}
/* If there are write commands inside the transaction, and this is a read
* only slave, we want to send an error. This happens when the transaction
* was initiated when the instance was a master or a writable replica and
* then the configuration changed (for example instance was turned into
* a replica). */
if (!server.loading && server.masterhost && server.repl_slave_ro &&
!(c->flags & CLIENT_MASTER) && c->mstate.cmd_flags & CMD_WRITE)
{
addReplyError(c,
"Transaction contains write commands but instance "
"is now a read-only replica. EXEC aborted.");
discardTransaction(c);
goto handle_monitor;
}
/* Exec all the queued commands */
unwatchAllKeys(c); /* Unwatch ASAP otherwise we'll waste CPU cycles */
orig_argv = c->argv;

View File

@ -407,19 +407,23 @@ void addReplyError(client *c, const char *err) {
addReplyErrorLength(c,err,strlen(err));
}
/* See addReplyErrorLength.
* Makes sure there are no newlines in the string, otherwise invalid protocol
* is emitted. */
void addReplyErrorSafe(client *c, char *s, size_t len) {
size_t j;
for (j = 0; j < len; j++) {
if (s[j] == '\r' || s[j] == '\n') s[j] = ' ';
}
addReplyErrorLength(c,s,sdslen(s));
}
void addReplyErrorFormat(client *c, const char *fmt, ...) {
size_t l, j;
va_list ap;
va_start(ap,fmt);
sds s = sdscatvprintf(sdsempty(),fmt,ap);
va_end(ap);
/* Make sure there are no newlines in the string, otherwise invalid protocol
* is emitted. */
l = sdslen(s);
for (j = 0; j < l; j++) {
if (s[j] == '\r' || s[j] == '\n') s[j] = ' ';
}
addReplyErrorLength(c,s,sdslen(s));
addReplyErrorSafe(c, s, sdslen(s));
sdsfree(s);
}

View File

@ -3402,6 +3402,34 @@ void call(client *c, int flags) {
server.stat_numcommands++;
}
/* Used when a command that is ready for execution needs to be rejected, due to
* varios pre-execution checks. it returns the appropriate error to the client.
* If there's a transaction is flags it as dirty, and if the command is EXEC,
* it aborts the transaction. */
void rejectCommand(client *c, robj *reply) {
flagTransaction(c);
if (c->cmd && c->cmd->proc == execCommand) {
execCommandAbort(c, reply->ptr);
} else {
/* using addReplyError* rather than addReply so that the error can be logged. */
addReplyErrorSafe(c, reply->ptr, sdslen(reply->ptr));
}
}
void rejectCommandFormat(client *c, const char *fmt, ...) {
flagTransaction(c);
va_list ap;
va_start(ap,fmt);
sds s = sdscatvprintf(sdsempty(),fmt,ap);
va_end(ap);
if (c->cmd && c->cmd->proc == execCommand) {
execCommandAbort(c, s);
} else {
addReplyErrorSafe(c, s, sdslen(s));
}
sdsfree(s);
}
/* If this function gets called we already read a whole
* command, arguments are in the client argv/argc fields.
* processCommand() execute the command or prepare the
@ -3427,23 +3455,30 @@ int processCommand(client *c) {
* such as wrong arity, bad command name and so forth. */
c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
if (!c->cmd) {
flagTransaction(c);
sds args = sdsempty();
int i;
for (i=1; i < c->argc && sdslen(args) < 128; i++)
args = sdscatprintf(args, "`%.*s`, ", 128-(int)sdslen(args), (char*)c->argv[i]->ptr);
addReplyErrorFormat(c,"unknown command `%s`, with args beginning with: %s",
rejectCommandFormat(c,"unknown command `%s`, with args beginning with: %s",
(char*)c->argv[0]->ptr, args);
sdsfree(args);
return C_OK;
} else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
(c->argc < -c->cmd->arity)) {
flagTransaction(c);
addReplyErrorFormat(c,"wrong number of arguments for '%s' command",
rejectCommandFormat(c,"wrong number of arguments for '%s' command",
c->cmd->name);
return C_OK;
}
int is_write_command = (c->cmd->flags & CMD_WRITE) ||
(c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_WRITE));
int is_denyoom_command = (c->cmd->flags & CMD_DENYOOM) ||
(c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_DENYOOM));
int is_denystale_command = !(c->cmd->flags & CMD_STALE) ||
(c->cmd->proc == execCommand && (c->mstate.cmd_inv_flags & CMD_STALE));
int is_denyloading_command = !(c->cmd->flags & CMD_LOADING) ||
(c->cmd->proc == execCommand && (c->mstate.cmd_inv_flags & CMD_LOADING));
/* Check if the user is authenticated. This check is skipped in case
* the default user is flagged as "nopass" and is active. */
int auth_required = (!(DefaultUser->flags & USER_FLAG_NOPASS) ||
@ -3453,8 +3488,7 @@ int processCommand(client *c) {
/* AUTH and HELLO and no auth modules are valid even in
* non-authenticated state. */
if (!(c->cmd->flags & CMD_NO_AUTH)) {
flagTransaction(c);
addReply(c,shared.noautherr);
rejectCommand(c,shared.noautherr);
return C_OK;
}
}
@ -3465,13 +3499,12 @@ int processCommand(client *c) {
int acl_retval = ACLCheckCommandPerm(c,&acl_keypos);
if (acl_retval != ACL_OK) {
addACLLogEntry(c,acl_retval,acl_keypos,NULL);
flagTransaction(c);
if (acl_retval == ACL_DENIED_CMD)
addReplyErrorFormat(c,
rejectCommandFormat(c,
"-NOPERM this user has no permissions to run "
"the '%s' command or its subcommand", c->cmd->name);
else
addReplyErrorFormat(c,
rejectCommandFormat(c,
"-NOPERM this user has no permissions to access "
"one of the keys used as arguments");
return C_OK;
@ -3519,13 +3552,11 @@ int processCommand(client *c) {
* is trying to execute is denied during OOM conditions or the client
* is in MULTI/EXEC context? Error. */
if (out_of_memory &&
(c->cmd->flags & CMD_DENYOOM ||
(is_denyoom_command ||
(c->flags & CLIENT_MULTI &&
c->cmd->proc != execCommand &&
c->cmd->proc != discardCommand)))
{
flagTransaction(c);
addReply(c, shared.oomerr);
rejectCommand(c, shared.oomerr);
return C_OK;
}
@ -3546,17 +3577,14 @@ int processCommand(client *c) {
int deny_write_type = writeCommandsDeniedByDiskError();
if (deny_write_type != DISK_ERROR_TYPE_NONE &&
server.masterhost == NULL &&
(c->cmd->flags & CMD_WRITE ||
c->cmd->proc == pingCommand))
(is_write_command ||c->cmd->proc == pingCommand))
{
flagTransaction(c);
if (deny_write_type == DISK_ERROR_TYPE_RDB)
addReply(c, shared.bgsaveerr);
rejectCommand(c, shared.bgsaveerr);
else
addReplySds(c,
sdscatprintf(sdsempty(),
rejectCommandFormat(c,
"-MISCONF Errors writing to the AOF file: %s\r\n",
strerror(server.aof_last_write_errno)));
strerror(server.aof_last_write_errno));
return C_OK;
}
@ -3565,11 +3593,10 @@ int processCommand(client *c) {
if (server.masterhost == NULL &&
server.repl_min_slaves_to_write &&
server.repl_min_slaves_max_lag &&
c->cmd->flags & CMD_WRITE &&
is_write_command &&
server.repl_good_slaves_count < server.repl_min_slaves_to_write)
{
flagTransaction(c);
addReply(c, shared.noreplicaserr);
rejectCommand(c, shared.noreplicaserr);
return C_OK;
}
@ -3577,10 +3604,9 @@ int processCommand(client *c) {
* accept write commands if this is our master. */
if (server.masterhost && server.repl_slave_ro &&
!(c->flags & CLIENT_MASTER) &&
c->cmd->flags & CMD_WRITE)
is_write_command)
{
flagTransaction(c);
addReply(c, shared.roslaveerr);
rejectCommand(c, shared.roslaveerr);
return C_OK;
}
@ -3592,7 +3618,7 @@ int processCommand(client *c) {
c->cmd->proc != unsubscribeCommand &&
c->cmd->proc != psubscribeCommand &&
c->cmd->proc != punsubscribeCommand) {
addReplyErrorFormat(c,
rejectCommandFormat(c,
"Can't execute '%s': only (P)SUBSCRIBE / "
"(P)UNSUBSCRIBE / PING / QUIT are allowed in this context",
c->cmd->name);
@ -3604,17 +3630,16 @@ int processCommand(client *c) {
* link with master. */
if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED &&
server.repl_serve_stale_data == 0 &&
!(c->cmd->flags & CMD_STALE))
is_denystale_command)
{
flagTransaction(c);
addReply(c, shared.masterdownerr);
rejectCommand(c, shared.masterdownerr);
return C_OK;
}
/* Loading DB? Return an error if the command has not the
* CMD_LOADING flag. */
if (server.loading && !(c->cmd->flags & CMD_LOADING)) {
addReply(c, shared.loadingerr);
if (server.loading && is_denyloading_command) {
rejectCommand(c, shared.loadingerr);
return C_OK;
}
@ -3629,7 +3654,6 @@ int processCommand(client *c) {
c->cmd->proc != helloCommand &&
c->cmd->proc != replconfCommand &&
c->cmd->proc != multiCommand &&
c->cmd->proc != execCommand &&
c->cmd->proc != discardCommand &&
c->cmd->proc != watchCommand &&
c->cmd->proc != unwatchCommand &&
@ -3640,8 +3664,7 @@ int processCommand(client *c) {
c->argc == 2 &&
tolower(((char*)c->argv[1]->ptr)[0]) == 'k'))
{
flagTransaction(c);
addReply(c, shared.slowscripterr);
rejectCommand(c, shared.slowscripterr);
return C_OK;
}

View File

@ -666,6 +666,9 @@ typedef struct multiState {
int cmd_flags; /* The accumulated command flags OR-ed together.
So if at least a command has a given flag, it
will be set in this field. */
int cmd_inv_flags; /* Same as cmd_flags, OR-ing the ~flags. so that it
is possible to know if all the commands have a
certain flag. */
int minreplicas; /* MINREPLICAS for synchronous replication */
time_t minreplicas_timeout; /* MINREPLICAS timeout as unixtime. */
} multiState;
@ -1626,6 +1629,7 @@ void addReplyBulkLongLong(client *c, long long ll);
void addReply(client *c, robj *obj);
void addReplySds(client *c, sds s);
void addReplyBulkSds(client *c, sds s);
void addReplyErrorSafe(client *c, char *s, size_t len);
void addReplyError(client *c, const char *err);
void addReplyStatus(client *c, const char *status);
void addReplyDouble(client *c, double d);
@ -1724,6 +1728,7 @@ void touchWatchedKey(redisDb *db, robj *key);
void touchWatchedKeysOnFlush(int dbid);
void discardTransaction(client *c);
void flagTransaction(client *c);
void execCommandAbort(client *c, sds error);
void execCommandPropagateMulti(client *c);
void execCommandPropagateExec(client *c);

View File

@ -196,6 +196,21 @@ proc redis_deferring_client {args} {
return $client
}
proc redis_client {args} {
set level 0
if {[llength $args] > 0 && [string is integer [lindex $args 0]]} {
set level [lindex $args 0]
set args [lrange $args 1 end]
}
# create client that defers reading reply
set client [redis [srv $level "host"] [srv $level "port"] 0 $::tls]
# select the right db and read the response (OK)
$client select 9
return $client
}
# Provide easy access to INFO properties. Same semantic as "proc r".
proc s {args} {
set level 0

View File

@ -325,74 +325,145 @@ start_server {tags {"multi"}} {
# check that if MULTI arrives during timeout, it is either refused, or
# allowed to pass, and we don't end up executing half of the transaction
set rd1 [redis_deferring_client]
set rd2 [redis_deferring_client]
set r2 [redis_client]
r config set lua-time-limit 10
r set xx 1
$rd1 eval {while true do end} 0
after 200
catch { $rd2 multi; $rd2 read } e
catch { $rd2 incr xx; $rd2 read } e
catch { $r2 multi; } e
catch { $r2 incr xx; } e
r script kill
after 200 ; # Give some time to Lua to call the hook again...
catch { $rd2 incr xx; $rd2 read } e
catch { $rd2 exec; $rd2 read } e
catch { $r2 incr xx; } e
catch { $r2 exec; } e
assert_match {EXECABORT*previous errors*} $e
set xx [r get xx]
# make sure that either the whole transcation passed or none of it (we actually expect none)
assert { $xx == 1 || $xx == 3}
# check that the connection is no longer in multi state
$rd2 ping asdf
set pong [$rd2 read]
set pong [$r2 ping asdf]
assert_equal $pong "asdf"
$rd1 close; $r2 close
}
test {EXEC and script timeout} {
# check that if EXEC arrives during timeout, we don't end up executing
# half of the transaction, and also that we exit the multi state
set rd1 [redis_deferring_client]
set rd2 [redis_deferring_client]
set r2 [redis_client]
r config set lua-time-limit 10
r set xx 1
catch { $rd2 multi; $rd2 read } e
catch { $rd2 incr xx; $rd2 read } e
catch { $r2 multi; } e
catch { $r2 incr xx; } e
$rd1 eval {while true do end} 0
after 200
catch { $rd2 incr xx; $rd2 read } e
catch { $rd2 exec; $rd2 read } e
catch { $r2 incr xx; } e
catch { $r2 exec; } e
assert_match {EXECABORT*BUSY*} $e
r script kill
after 200 ; # Give some time to Lua to call the hook again...
set xx [r get xx]
# make sure that either the whole transcation passed or none of it (we actually expect none)
assert { $xx == 1 || $xx == 3}
# Discard the transaction since EXEC likely got -BUSY error
# so the client is still in MULTI state.
catch { $rd2 discard ;$rd2 read } e
# check that the connection is no longer in multi state
$rd2 ping asdf
set pong [$rd2 read]
set pong [$r2 ping asdf]
assert_equal $pong "asdf"
$rd1 close; $r2 close
}
test {MULTI-EXEC body and script timeout} {
# check that we don't run an imcomplete transaction due to some commands
# arriving during busy script
set rd1 [redis_deferring_client]
set rd2 [redis_deferring_client]
set r2 [redis_client]
r config set lua-time-limit 10
r set xx 1
catch { $rd2 multi; $rd2 read } e
catch { $rd2 incr xx; $rd2 read } e
catch { $r2 multi; } e
catch { $r2 incr xx; } e
$rd1 eval {while true do end} 0
after 200
catch { $rd2 incr xx; $rd2 read } e
catch { $r2 incr xx; } e
r script kill
after 200 ; # Give some time to Lua to call the hook again...
catch { $rd2 exec; $rd2 read } e
catch { $r2 exec; } e
assert_match {EXECABORT*previous errors*} $e
set xx [r get xx]
# make sure that either the whole transcation passed or none of it (we actually expect none)
assert { $xx == 1 || $xx == 3}
# check that the connection is no longer in multi state
$rd2 ping asdf
set pong [$rd2 read]
set pong [$r2 ping asdf]
assert_equal $pong "asdf"
$rd1 close; $r2 close
}
test {just EXEC and script timeout} {
# check that if EXEC arrives during timeout, we don't end up executing
# actual commands during busy script, and also that we exit the multi state
set rd1 [redis_deferring_client]
set r2 [redis_client]
r config set lua-time-limit 10
r set xx 1
catch { $r2 multi; } e
catch { $r2 incr xx; } e
$rd1 eval {while true do end} 0
after 200
catch { $r2 exec; } e
assert_match {EXECABORT*BUSY*} $e
r script kill
after 200 ; # Give some time to Lua to call the hook again...
set xx [r get xx]
# make we didn't execute the transaction
assert { $xx == 1}
# check that the connection is no longer in multi state
set pong [$r2 ping asdf]
assert_equal $pong "asdf"
$rd1 close; $r2 close
}
test {exec with write commands and state change} {
# check that exec that contains write commands fails if server state changed since they were queued
set r1 [redis_client]
r set xx 1
r multi
r incr xx
$r1 config set min-replicas-to-write 2
catch {r exec} e
assert_match {*EXECABORT*NOREPLICAS*} $e
set xx [r get xx]
# make sure that the INCR wasn't executed
assert { $xx == 1}
$r1 config set min-replicas-to-write 0
$r1 close;
}
test {exec with read commands and stale replica state change} {
# check that exec that contains read commands fails if server state changed since they were queued
r config set replica-serve-stale-data no
set r1 [redis_client]
r set xx 1
# check that GET is disallowed on stale replica, even if the replica becomes stale only after queuing.
r multi
r get xx
$r1 replicaof localhsot 0
catch {r exec} e
assert_match {*EXECABORT*MASTERDOWN*} $e
# check that PING is allowed
r multi
r ping
$r1 replicaof localhsot 0
set pong [r exec]
assert {$pong == "PONG"}
# check that when replica is not stale, GET is allowed
# while we're at it, let's check that multi is allowed on stale replica too
r multi
$r1 replicaof no one
r get xx
set xx [r exec]
# make sure that the INCR was executed
assert { $xx == 1 }
$r1 close;
}
}