Run active defrag while blocked / loading (#7726)
During long running scripts or loading RDB/AOF, we may need to do some defragging. Since processEventsWhileBlocked is called periodically at unknown intervals, and many cron jobs either depend on run_with_period (including active defrag), or rely on being called at server.hz rate (i.e. active defrag knows ho much time to run by looking at server.hz), the whileBlockedCron may have to run a loop triggering the cron jobs in it (currently only active defrag) several times. Other changes: - Adding a test for defrag during aof loading. - Changing key-load-delay config to take negative values for fractions of a microsecond sleep
This commit is contained in:
parent
d52ce4ea1a
commit
9ef8d2f671
@ -773,7 +773,6 @@ int loadAppendOnlyFile(char *filename) {
|
||||
if (!(loops++ % 1000)) {
|
||||
loadingProgress(ftello(fp));
|
||||
processEventsWhileBlocked();
|
||||
loadingCron();
|
||||
processModuleLoadingProgressEvent(1);
|
||||
}
|
||||
|
||||
@ -859,7 +858,7 @@ int loadAppendOnlyFile(char *filename) {
|
||||
fakeClient->cmd = NULL;
|
||||
if (server.aof_load_truncated) valid_up_to = ftello(fp);
|
||||
if (server.key_load_delay)
|
||||
usleep(server.key_load_delay);
|
||||
debugDelay(server.key_load_delay);
|
||||
}
|
||||
|
||||
/* This point can only be reached when EOF is reached without errors.
|
||||
|
@ -2319,8 +2319,8 @@ standardConfig configs[] = {
|
||||
createIntConfig("repl-timeout", NULL, MODIFIABLE_CONFIG, 1, INT_MAX, server.repl_timeout, 60, INTEGER_CONFIG, NULL, NULL),
|
||||
createIntConfig("repl-ping-replica-period", "repl-ping-slave-period", MODIFIABLE_CONFIG, 1, INT_MAX, server.repl_ping_slave_period, 10, INTEGER_CONFIG, NULL, NULL),
|
||||
createIntConfig("list-compress-depth", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.list_compress_depth, 0, INTEGER_CONFIG, NULL, NULL),
|
||||
createIntConfig("rdb-key-save-delay", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.rdb_key_save_delay, 0, INTEGER_CONFIG, NULL, NULL),
|
||||
createIntConfig("key-load-delay", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.key_load_delay, 0, INTEGER_CONFIG, NULL, NULL),
|
||||
createIntConfig("rdb-key-save-delay", NULL, MODIFIABLE_CONFIG, INT_MIN, INT_MAX, server.rdb_key_save_delay, 0, INTEGER_CONFIG, NULL, NULL),
|
||||
createIntConfig("key-load-delay", NULL, MODIFIABLE_CONFIG, INT_MIN, INT_MAX, server.key_load_delay, 0, INTEGER_CONFIG, NULL, NULL),
|
||||
createIntConfig("active-expire-effort", NULL, MODIFIABLE_CONFIG, 1, 10, server.active_expire_effort, 1, INTEGER_CONFIG, NULL, NULL), /* From 1 to 10. */
|
||||
createIntConfig("hz", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.config_hz, CONFIG_DEFAULT_HZ, INTEGER_CONFIG, NULL, updateHZ),
|
||||
createIntConfig("min-replicas-to-write", "min-slaves-to-write", MODIFIABLE_CONFIG, 0, INT_MAX, server.repl_min_slaves_to_write, 0, INTEGER_CONFIG, NULL, updateGoodSlaves),
|
||||
|
@ -1804,3 +1804,12 @@ void disableWatchdog(void) {
|
||||
sigaction(SIGALRM, &act, NULL);
|
||||
server.watchdog_period = 0;
|
||||
}
|
||||
|
||||
/* Positive input is sleep time in microseconds. Negative input is fractions
|
||||
* of microseconds, i.e. -10 means 100 nanoseconds. */
|
||||
void debugDelay(int usec) {
|
||||
/* Since even the shortest sleep results in context switch and system call,
|
||||
* the way we achive short sleeps is by statistically sleeping less often. */
|
||||
if (usec < 0) usec = (rand() % -usec) == 0 ? 1: 0;
|
||||
if (usec) usleep(usec);
|
||||
}
|
||||
|
@ -2920,6 +2920,9 @@ void processEventsWhileBlocked(void) {
|
||||
long long events = server.events_processed_while_blocked - startval;
|
||||
if (!events) break;
|
||||
}
|
||||
|
||||
whileBlockedCron();
|
||||
|
||||
ProcessingEventsWhileBlocked = 0;
|
||||
}
|
||||
|
||||
|
@ -1084,7 +1084,7 @@ int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime) {
|
||||
|
||||
/* Delay return if required (for testing) */
|
||||
if (server.rdb_key_save_delay)
|
||||
usleep(server.rdb_key_save_delay);
|
||||
debugDelay(server.rdb_key_save_delay);
|
||||
|
||||
return 1;
|
||||
}
|
||||
@ -1997,6 +1997,7 @@ void startLoading(size_t size, int rdbflags) {
|
||||
server.loading_start_time = time(NULL);
|
||||
server.loading_loaded_bytes = 0;
|
||||
server.loading_total_bytes = size;
|
||||
blockingOperationStarts();
|
||||
|
||||
/* Fire the loading modules start event. */
|
||||
int subevent;
|
||||
@ -2030,6 +2031,7 @@ void loadingProgress(off_t pos) {
|
||||
/* Loading finished */
|
||||
void stopLoading(int success) {
|
||||
server.loading = 0;
|
||||
blockingOperationEnds();
|
||||
rdbFileBeingLoaded = NULL;
|
||||
|
||||
/* Fire the loading modules end event. */
|
||||
@ -2073,7 +2075,6 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) {
|
||||
replicationSendNewlineToMaster();
|
||||
loadingProgress(r->processed_bytes);
|
||||
processEventsWhileBlocked();
|
||||
loadingCron();
|
||||
processModuleLoadingProgressEvent(0);
|
||||
}
|
||||
}
|
||||
@ -2336,7 +2337,8 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
|
||||
|
||||
/* Loading the database more slowly is useful in order to test
|
||||
* certain edge cases. */
|
||||
if (server.key_load_delay) usleep(server.key_load_delay);
|
||||
if (server.key_load_delay)
|
||||
debugDelay(server.key_load_delay);
|
||||
|
||||
/* Reset the state that is key-specified and is populated by
|
||||
* opcodes before the key, so that we start from scratch again. */
|
||||
|
@ -1424,6 +1424,7 @@ void luaMaskCountHook(lua_State *lua, lua_Debug *ar) {
|
||||
"Script SHA1 is: %s",
|
||||
elapsed, server.lua_cur_script);
|
||||
server.lua_timedout = 1;
|
||||
blockingOperationStarts();
|
||||
/* Once the script timeouts we reenter the event loop to permit others
|
||||
* to call SCRIPT KILL or SHUTDOWN NOSAVE if needed. For this reason
|
||||
* we need to mask the client executing the script from the event loop.
|
||||
@ -1575,6 +1576,7 @@ void evalGenericCommand(client *c, int evalsha) {
|
||||
if (delhook) lua_sethook(lua,NULL,0,0); /* Disable hook */
|
||||
if (server.lua_timedout) {
|
||||
server.lua_timedout = 0;
|
||||
blockingOperationEnds();
|
||||
/* Restore the client that was protected when the script timeout
|
||||
* was detected. */
|
||||
unprotectClient(c);
|
||||
|
61
src/server.c
61
src/server.c
@ -2109,22 +2109,62 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
|
||||
return 1000/server.hz;
|
||||
}
|
||||
|
||||
/* This function fill in the role of serverCron during RDB or AOF loading.
|
||||
|
||||
void blockingOperationStarts() {
|
||||
updateCachedTime(0);
|
||||
server.blocked_last_cron = server.mstime;
|
||||
}
|
||||
|
||||
void blockingOperationEnds() {
|
||||
server.blocked_last_cron = 0;
|
||||
}
|
||||
|
||||
/* This function fill in the role of serverCron during RDB or AOF loading, and
|
||||
* also during blocked scripts.
|
||||
* It attempts to do its duties at a similar rate as the configured server.hz,
|
||||
* and updates cronloops variable so that similarly to serverCron, the
|
||||
* run_with_period can be used. */
|
||||
void loadingCron() {
|
||||
long long now = server.ustime;
|
||||
static long long next_event = 0;
|
||||
if (now >= next_event) {
|
||||
cronUpdateMemoryStats();
|
||||
|
||||
void whileBlockedCron() {
|
||||
/* Here we may want to perform some cron jobs (normally done server.hz times
|
||||
* per second). */
|
||||
|
||||
/* Since this function depends on a call to blockingOperationStarts, let's
|
||||
* make sure it was done. */
|
||||
serverAssert(server.blocked_last_cron);
|
||||
|
||||
/* In case we where called too soon, leave right away. This way one time
|
||||
* jobs after the loop below don't need an if. and we don't bother to start
|
||||
* latency monitor if this function is called too often. */
|
||||
if (server.blocked_last_cron >= server.mstime)
|
||||
return;
|
||||
|
||||
mstime_t latency;
|
||||
latencyStartMonitor(latency);
|
||||
|
||||
/* In some cases we may be called with big intervals, so we may need to do
|
||||
* extra work here. This is because some of the functions in serverCron rely
|
||||
* on the fact that it is performed every 10 ms or so. For instance, if
|
||||
* activeDefragCycle needs to utilize 25% cpu, it will utilize 2.5ms, so we
|
||||
* need to call it multiple times. */
|
||||
long hz_ms = 1000/server.hz;
|
||||
while (server.blocked_last_cron < server.mstime) {
|
||||
|
||||
/* Defrag keys gradually. */
|
||||
activeDefragCycle();
|
||||
|
||||
server.blocked_last_cron += hz_ms;
|
||||
/* Increment cronloop so that run_with_period works. */
|
||||
server.cronloops++;
|
||||
|
||||
/* Decide when the next event should fire. */
|
||||
next_event = now + 1000000 / server.hz;
|
||||
}
|
||||
|
||||
/* Other cron jobs do not need to be done in a loop. No need to check
|
||||
* server.blocked_last_cron since we have an early exit at the top. */
|
||||
|
||||
/* Update memory stats during loading (excluding blocked scripts) */
|
||||
if (server.loading) cronUpdateMemoryStats();
|
||||
|
||||
latencyEndMonitor(latency);
|
||||
latencyAddSampleIfNeeded("while-blocked-cron",latency);
|
||||
}
|
||||
|
||||
extern int ProcessingEventsWhileBlocked;
|
||||
@ -2830,6 +2870,7 @@ void resetServerStats(void) {
|
||||
server.stat_net_output_bytes = 0;
|
||||
server.stat_unexpected_error_replies = 0;
|
||||
server.aof_delayed_fsync = 0;
|
||||
server.blocked_last_cron = 0;
|
||||
}
|
||||
|
||||
void initServer(void) {
|
||||
|
11
src/server.h
11
src/server.h
@ -1265,9 +1265,11 @@ struct redisServer {
|
||||
char *rdb_pipe_buff; /* In diskless replication, this buffer holds data */
|
||||
int rdb_pipe_bufflen; /* that was read from the the rdb pipe. */
|
||||
int rdb_key_save_delay; /* Delay in microseconds between keys while
|
||||
* writing the RDB. (for testings) */
|
||||
* writing the RDB. (for testings). negative
|
||||
* value means fractions of microsecons (on average). */
|
||||
int key_load_delay; /* Delay in microseconds between keys while
|
||||
* loading aof or rdb. (for testings) */
|
||||
* loading aof or rdb. (for testings). negative
|
||||
* value means fractions of microsecons (on average). */
|
||||
/* Pipe and data structures for child -> parent info sharing. */
|
||||
int child_info_pipe[2]; /* Pipe used to write the child_info_data. */
|
||||
struct {
|
||||
@ -1393,6 +1395,7 @@ struct redisServer {
|
||||
int daylight_active; /* Currently in daylight saving time. */
|
||||
mstime_t mstime; /* 'unixtime' in milliseconds. */
|
||||
ustime_t ustime; /* 'unixtime' in microseconds. */
|
||||
long long blocked_last_cron; /* Indicate the mstime of the last time we did cron jobs from a blocking operation */
|
||||
/* Pubsub */
|
||||
dict *pubsub_channels; /* Map channels to list of subscribed clients */
|
||||
list *pubsub_patterns; /* A list of pubsub_patterns */
|
||||
@ -1698,6 +1701,9 @@ void pauseClients(mstime_t duration);
|
||||
int clientsArePaused(void);
|
||||
void processEventsWhileBlocked(void);
|
||||
void loadingCron(void);
|
||||
void whileBlockedCron();
|
||||
void blockingOperationStarts();
|
||||
void blockingOperationEnds();
|
||||
int handleClientsWithPendingWrites(void);
|
||||
int handleClientsWithPendingWritesUsingThreads(void);
|
||||
int handleClientsWithPendingReadsUsingThreads(void);
|
||||
@ -2464,6 +2470,7 @@ int memtest_preserving_test(unsigned long *m, size_t bytes, int passes);
|
||||
void mixDigest(unsigned char *digest, void *ptr, size_t len);
|
||||
void xorDigest(unsigned char *digest, void *ptr, size_t len);
|
||||
int populateCommandTableParseFlags(struct redisCommand *c, char *strflags);
|
||||
void debugDelay(int usec);
|
||||
|
||||
/* TLS stuff */
|
||||
void tlsInit(void);
|
||||
|
@ -455,3 +455,14 @@ proc start_bg_complex_data {host port db ops} {
|
||||
proc stop_bg_complex_data {handle} {
|
||||
catch {exec /bin/kill -9 $handle}
|
||||
}
|
||||
|
||||
proc populate {num prefix size} {
|
||||
set rd [redis_deferring_client]
|
||||
for {set j 0} {$j < $num} {incr j} {
|
||||
$rd set $prefix$j [string repeat A $size]
|
||||
}
|
||||
for {set j 0} {$j < $num} {incr j} {
|
||||
$rd read
|
||||
}
|
||||
$rd close
|
||||
}
|
||||
|
@ -37,10 +37,9 @@ start_server {tags {"memefficiency"}} {
|
||||
}
|
||||
|
||||
run_solo {defrag} {
|
||||
start_server {tags {"defrag"}} {
|
||||
start_server {tags {"defrag"} overrides {appendonly yes auto-aof-rewrite-percentage 0 save ""}} {
|
||||
if {[string match {*jemalloc*} [s mem_allocator]]} {
|
||||
test "Active defrag" {
|
||||
r config set save "" ;# prevent bgsave from interfereing with save below
|
||||
r config set hz 100
|
||||
r config set activedefrag no
|
||||
r config set active-defrag-threshold-lower 5
|
||||
@ -49,9 +48,9 @@ start_server {tags {"defrag"}} {
|
||||
r config set active-defrag-ignore-bytes 2mb
|
||||
r config set maxmemory 100mb
|
||||
r config set maxmemory-policy allkeys-lru
|
||||
r debug populate 700000 asdf1 150
|
||||
r debug populate 170000 asdf2 300
|
||||
r ping ;# trigger eviction following the previous population
|
||||
|
||||
populate 700000 asdf1 150
|
||||
populate 170000 asdf2 300
|
||||
after 120 ;# serverCron only updates the info once in 100ms
|
||||
set frag [s allocator_frag_ratio]
|
||||
if {$::verbose} {
|
||||
@ -107,19 +106,57 @@ start_server {tags {"defrag"}} {
|
||||
# due to high fragmentation, 100hz, and active-defrag-cycle-max set to 75,
|
||||
# we expect max latency to be not much higher than 7.5ms but due to rare slowness threshold is set higher
|
||||
assert {$max_latency <= 30}
|
||||
} else {
|
||||
set _ ""
|
||||
}
|
||||
# verify the data isn't corrupted or changed
|
||||
set newdigest [r debug digest]
|
||||
assert {$digest eq $newdigest}
|
||||
r save ;# saving an rdb iterates over all the data / pointers
|
||||
} {OK}
|
||||
|
||||
# if defrag is supported, test AOF loading too
|
||||
if {[r config get activedefrag] eq "activedefrag yes"} {
|
||||
# reset stats and load the AOF file
|
||||
r config resetstat
|
||||
r config set key-load-delay -50 ;# sleep on average 1/50 usec
|
||||
r debug loadaof
|
||||
r config set activedefrag no
|
||||
# measure hits and misses right after aof loading
|
||||
set misses [s active_defrag_misses]
|
||||
set hits [s active_defrag_hits]
|
||||
|
||||
after 120 ;# serverCron only updates the info once in 100ms
|
||||
set frag [s allocator_frag_ratio]
|
||||
set max_latency 0
|
||||
foreach event [r latency latest] {
|
||||
lassign $event eventname time latency max
|
||||
if {$eventname == "loading-cron"} {
|
||||
set max_latency $max
|
||||
}
|
||||
}
|
||||
if {$::verbose} {
|
||||
puts "AOF loading:"
|
||||
puts "frag $frag"
|
||||
puts "hits: $hits"
|
||||
puts "misses: $misses"
|
||||
puts "max latency $max_latency"
|
||||
puts [r latency latest]
|
||||
puts [r latency history loading-cron]
|
||||
}
|
||||
# make sure we had defrag hits during AOF loading
|
||||
assert {$hits > 100000}
|
||||
# make sure the defragger did enough work to keep the fragmentation low during loading.
|
||||
# we cannot check that it went all the way down, since we don't wait for full defrag cycle to complete.
|
||||
assert {$frag < 1.4}
|
||||
# since the AOF contains simple (fast) SET commands (and the cron during loading runs every 1000 commands),
|
||||
# it'll still not block the loading for long periods of time.
|
||||
assert {$max_latency <= 30}
|
||||
}
|
||||
}
|
||||
r config set appendonly no
|
||||
r config set key-load-delay 0
|
||||
|
||||
test "Active defrag big keys" {
|
||||
r flushdb
|
||||
r config resetstat
|
||||
r config set save "" ;# prevent bgsave from interfereing with save below
|
||||
r config set hz 100
|
||||
r config set activedefrag no
|
||||
r config set active-defrag-max-scan-fields 1000
|
||||
@ -247,7 +284,6 @@ start_server {tags {"defrag"}} {
|
||||
test "Active defrag big list" {
|
||||
r flushdb
|
||||
r config resetstat
|
||||
r config set save "" ;# prevent bgsave from interfereing with save below
|
||||
r config set hz 100
|
||||
r config set activedefrag no
|
||||
r config set active-defrag-max-scan-fields 1000
|
||||
@ -354,7 +390,6 @@ start_server {tags {"defrag"}} {
|
||||
start_server {tags {"defrag"}} {
|
||||
r flushdb
|
||||
r config resetstat
|
||||
r config set save "" ;# prevent bgsave from interfereing with save below
|
||||
r config set hz 100
|
||||
r config set activedefrag no
|
||||
r config set active-defrag-max-scan-fields 1000
|
||||
|
Loading…
Reference in New Issue
Block a user