diff --git a/redis.conf b/redis.conf index 1e1f5313..dbe5b588 100644 --- a/redis.conf +++ b/redis.conf @@ -1302,8 +1302,12 @@ aof-rewrite-incremental-fsync yes # active-defrag-threshold-upper 100 # Minimal effort for defrag in CPU percentage -# active-defrag-cycle-min 25 +# active-defrag-cycle-min 5 # Maximal effort for defrag in CPU percentage # active-defrag-cycle-max 75 +# Maximum number of set/hash/zset/list fields that will be processed from +# the main dictionary scan +# active-defrag-max-scan-fields 1000 + diff --git a/src/config.c b/src/config.c index eddfe1f1..7bba2cde 100644 --- a/src/config.c +++ b/src/config.c @@ -537,6 +537,12 @@ void loadServerConfigFromString(char *config) { err = "active-defrag-cycle-max must be between 1 and 99"; goto loaderr; } + } else if (!strcasecmp(argv[0],"active-defrag-max-scan-fields") && argc == 2) { + server.active_defrag_max_scan_fields = strtoll(argv[1],NULL,10); + if (server.active_defrag_max_scan_fields < 1) { + err = "active-defrag-max-scan-fields must be positive"; + goto loaderr; + } } else if (!strcasecmp(argv[0],"hash-max-ziplist-entries") && argc == 2) { server.hash_max_ziplist_entries = memtoll(argv[1], NULL); } else if (!strcasecmp(argv[0],"hash-max-ziplist-value") && argc == 2) { @@ -1058,6 +1064,8 @@ void configSetCommand(client *c) { "active-defrag-cycle-min",server.active_defrag_cycle_min,1,99) { } config_set_numerical_field( "active-defrag-cycle-max",server.active_defrag_cycle_max,1,99) { + } config_set_numerical_field( + "active-defrag-max-scan-fields",server.active_defrag_max_scan_fields,1,LLONG_MAX) { } config_set_numerical_field( "auto-aof-rewrite-percentage",server.aof_rewrite_perc,0,LLONG_MAX){ } config_set_numerical_field( @@ -1239,6 +1247,7 @@ void configGetCommand(client *c) { config_get_numerical_field("active-defrag-ignore-bytes",server.active_defrag_ignore_bytes); config_get_numerical_field("active-defrag-cycle-min",server.active_defrag_cycle_min); 
config_get_numerical_field("active-defrag-cycle-max",server.active_defrag_cycle_max); + config_get_numerical_field("active-defrag-max-scan-fields",server.active_defrag_max_scan_fields); config_get_numerical_field("auto-aof-rewrite-percentage", server.aof_rewrite_perc); config_get_numerical_field("auto-aof-rewrite-min-size", @@ -2013,6 +2022,7 @@ int rewriteConfig(char *path) { rewriteConfigBytesOption(state,"active-defrag-ignore-bytes",server.active_defrag_ignore_bytes,CONFIG_DEFAULT_DEFRAG_IGNORE_BYTES); rewriteConfigNumericalOption(state,"active-defrag-cycle-min",server.active_defrag_cycle_min,CONFIG_DEFAULT_DEFRAG_CYCLE_MIN); rewriteConfigNumericalOption(state,"active-defrag-cycle-max",server.active_defrag_cycle_max,CONFIG_DEFAULT_DEFRAG_CYCLE_MAX); + rewriteConfigNumericalOption(state,"active-defrag-max-scan-fields",server.active_defrag_max_scan_fields,CONFIG_DEFAULT_DEFRAG_MAX_SCAN_FIELDS); rewriteConfigYesNoOption(state,"appendonly",server.aof_state != AOF_OFF,0); rewriteConfigStringOption(state,"appendfilename",server.aof_filename,CONFIG_DEFAULT_AOF_FILENAME); rewriteConfigEnumOption(state,"appendfsync",server.aof_fsync,aof_fsync_enum,CONFIG_DEFAULT_AOF_FSYNC); diff --git a/src/defrag.c b/src/defrag.c index 3f0e6627..6d15a4c1 100644 --- a/src/defrag.c +++ b/src/defrag.c @@ -45,6 +45,10 @@ * pointers are worthwhile moving and which aren't */ int je_get_defrag_hint(void* ptr, int *bin_util, int *run_util); +/* forward declarations*/ +void defragDictBucketCallback(void *privdata, dictEntry **bucketref); +dictEntry* replaceSateliteDictKeyPtrAndOrDefragDictEntry(dict *d, sds oldkey, sds newkey, unsigned int hash, long *defragged); + /* Defrag helper for generic allocations. * * returns NULL in case the allocatoin wasn't moved. @@ -96,7 +100,7 @@ sds activeDefragSds(sds sdsptr) { * returns NULL in case the allocatoin wasn't moved. * when it returns a non-null value, the old pointer was already released * and should NOT be accessed. 
*/ -robj *activeDefragStringOb(robj* ob, int *defragged) { +robj *activeDefragStringOb(robj* ob, long *defragged) { robj *ret = NULL; if (ob->refcount!=1) return NULL; @@ -134,11 +138,11 @@ robj *activeDefragStringOb(robj* ob, int *defragged) { /* Defrag helper for dictEntries to be used during dict iteration (called on * each step). Teturns a stat of how many pointers were moved. */ -int dictIterDefragEntry(dictIterator *iter) { +long dictIterDefragEntry(dictIterator *iter) { /* This function is a little bit dirty since it messes with the internals * of the dict and it's iterator, but the benefit is that it is very easy * to use, and require no other chagnes in the dict. */ - int defragged = 0; + long defragged = 0; dictht *ht; /* Handle the next entry (if there is one), and update the pointer in the * current entry. */ @@ -166,14 +170,9 @@ int dictIterDefragEntry(dictIterator *iter) { /* Defrag helper for dict main allocations (dict struct, and hash tables). * receives a pointer to the dict* and implicitly updates it when the dict * struct itself was moved. Returns a stat of how many pointers were moved. */ -int dictDefragTables(dict** dictRef) { - dict *d = *dictRef; +long dictDefragTables(dict* d) { dictEntry **newtable; - int defragged = 0; - /* handle the dict struct */ - dict *newd = activeDefragAlloc(d); - if (newd) - defragged++, *dictRef = d = newd; + long defragged = 0; /* handle the first hash table */ newtable = activeDefragAlloc(d->ht[0].table); if (newtable) @@ -246,6 +245,146 @@ double *zslDefrag(zskiplist *zsl, double score, sds oldele, sds newele) { return NULL; } +/* Defrag helpler for sorted set. 
+ * Defrag a single dict entry key name, and corresponding skiplist struct */ +long activeDefragZsetEntry(zset *zs, dictEntry *de) { + sds newsds; + double* newscore; + long defragged = 0; + sds sdsele = dictGetKey(de); + if ((newsds = activeDefragSds(sdsele))) + defragged++, de->key = newsds; + newscore = zslDefrag(zs->zsl, *(double*)dictGetVal(de), sdsele, newsds); + if (newscore) { + dictSetVal(zs->dict, de, newscore); + defragged++; + } + return defragged; +} + +#define DEFRAG_SDS_DICT_NO_VAL 0 +#define DEFRAG_SDS_DICT_VAL_IS_SDS 1 +#define DEFRAG_SDS_DICT_VAL_IS_STROB 2 +#define DEFRAG_SDS_DICT_VAL_VOID_PTR 3 + +/* Defrag a dict with sds key and optional value (either ptr, sds or robj string) */ +long activeDefragSdsDict(dict* d, int val_type) { + dictIterator *di; + dictEntry *de; + long defragged = 0; + di = dictGetIterator(d); + while((de = dictNext(di)) != NULL) { + sds sdsele = dictGetKey(de), newsds; + if ((newsds = activeDefragSds(sdsele))) + de->key = newsds, defragged++; + /* defrag the value */ + if (val_type == DEFRAG_SDS_DICT_VAL_IS_SDS) { + sdsele = dictGetVal(de); + if ((newsds = activeDefragSds(sdsele))) + de->v.val = newsds, defragged++; + } else if (val_type == DEFRAG_SDS_DICT_VAL_IS_STROB) { + robj *newele, *ele = dictGetVal(de); + if ((newele = activeDefragStringOb(ele, &defragged))) + de->v.val = newele; + } else if (val_type == DEFRAG_SDS_DICT_VAL_VOID_PTR) { + void *newptr, *ptr = dictGetVal(de); + if ((newptr = activeDefragAlloc(ptr))) + de->v.val = newptr, defragged++; + } + defragged += dictIterDefragEntry(di); + } + dictReleaseIterator(di); + return defragged; +} + +/* Defrag a list of ptr, sds or robj string values */ +long activeDefragList(list *l, int val_type) { + long defragged = 0; + listNode *ln, *newln; + for (ln = l->head; ln; ln = ln->next) { + if ((newln = activeDefragAlloc(ln))) { + if (newln->prev) + newln->prev->next = newln; + else + l->head = newln; + if (newln->next) + newln->next->prev = newln; + else + l->tail = 
newln; + ln = newln; + defragged++; + } + if (val_type == DEFRAG_SDS_DICT_VAL_IS_SDS) { + sds newsds, sdsele = ln->value; + if ((newsds = activeDefragSds(sdsele))) + ln->value = newsds, defragged++; + } else if (val_type == DEFRAG_SDS_DICT_VAL_IS_STROB) { + robj *newele, *ele = ln->value; + if ((newele = activeDefragStringOb(ele, &defragged))) + ln->value = newele; + } else if (val_type == DEFRAG_SDS_DICT_VAL_VOID_PTR) { + void *newptr, *ptr = ln->value; + if ((newptr = activeDefragAlloc(ptr))) + ln->value = newptr, defragged++; + } + } + return defragged; +} + +/* Defrag a list of sds values and a dict with the same sds keys */ +long activeDefragSdsListAndDict(list *l, dict *d, int dict_val_type) { + long defragged = 0; + sds newsds, sdsele; + listNode *ln, *newln; + dictIterator *di; + dictEntry *de; + /* Defrag the list and it's sds values */ + for (ln = l->head; ln; ln = ln->next) { + if ((newln = activeDefragAlloc(ln))) { + if (newln->prev) + newln->prev->next = newln; + else + l->head = newln; + if (newln->next) + newln->next->prev = newln; + else + l->tail = newln; + ln = newln; + defragged++; + } + sdsele = ln->value; + if ((newsds = activeDefragSds(sdsele))) { + /* When defragging an sds value, we need to update the dict key */ + unsigned int hash = dictGetHash(d, sdsele); + replaceSateliteDictKeyPtrAndOrDefragDictEntry(d, sdsele, newsds, hash, &defragged); + ln->value = newsds; + defragged++; + } + } + + /* Defrag the dict values (keys were already handled) */ + di = dictGetIterator(d); + while((de = dictNext(di)) != NULL) { + if (dict_val_type == DEFRAG_SDS_DICT_VAL_IS_SDS) { + sds newsds, sdsele = dictGetVal(de); + if ((newsds = activeDefragSds(sdsele))) + de->v.val = newsds, defragged++; + } else if (dict_val_type == DEFRAG_SDS_DICT_VAL_IS_STROB) { + robj *newele, *ele = dictGetVal(de); + if ((newele = activeDefragStringOb(ele, &defragged))) + de->v.val = newele, defragged++; + } else if (dict_val_type == DEFRAG_SDS_DICT_VAL_VOID_PTR) { + void *newptr, 
*ptr = ln->value; + if ((newptr = activeDefragAlloc(ptr))) + ln->value = newptr, defragged++; + } + defragged += dictIterDefragEntry(di); + } + dictReleaseIterator(di); + + return defragged; +} + /* Utility function that replaces an old key pointer in the dictionary with a * new pointer. Additionally, we try to defrag the dictEntry in that dict. * Oldkey mey be a dead pointer and should not be accessed (we get a @@ -253,7 +392,7 @@ double *zslDefrag(zskiplist *zsl, double score, sds oldele, sds newele) { * moved. Return value is the the dictEntry if found, or NULL if not found. * NOTE: this is very ugly code, but it let's us avoid the complication of * doing a scan on another dict. */ -dictEntry* replaceSateliteDictKeyPtrAndOrDefragDictEntry(dict *d, sds oldkey, sds newkey, unsigned int hash, int *defragged) { +dictEntry* replaceSateliteDictKeyPtrAndOrDefragDictEntry(dict *d, sds oldkey, sds newkey, unsigned int hash, long *defragged) { dictEntry **deref = dictFindEntryRefByPtrAndHash(d, oldkey, hash); if (deref) { dictEntry *de = *deref; @@ -269,16 +408,198 @@ dictEntry* replaceSateliteDictKeyPtrAndOrDefragDictEntry(dict *d, sds oldkey, sd return NULL; } +long activeDefragQuickListNodes(quicklist *ql) { + quicklistNode *node = ql->head, *newnode; + long defragged = 0; + unsigned char *newzl; + while (node) { + if ((newnode = activeDefragAlloc(node))) { + if (newnode->prev) + newnode->prev->next = newnode; + else + ql->head = newnode; + if (newnode->next) + newnode->next->prev = newnode; + else + ql->tail = newnode; + node = newnode; + defragged++; + } + if ((newzl = activeDefragAlloc(node->zl))) + defragged++, node->zl = newzl; + node = node->next; + } + return defragged; +} + +/* when the value has lots of elements, we want to handle it later and not as + * oart of the main dictionary scan. 
this is needed in order to prevent latency + * spikes when handling large items */ +void defragLater(redisDb *db, dictEntry *kde) { + sds key = sdsdup(dictGetKey(kde)); + listAddNodeTail(db->defrag_later, key); +} + +long scanLaterList(robj *ob) { + quicklist *ql = ob->ptr; + if (ob->type != OBJ_LIST || ob->encoding != OBJ_ENCODING_QUICKLIST) + return 0; + server.stat_active_defrag_scanned+=ql->len; + return activeDefragQuickListNodes(ql); +} + +typedef struct { + zset *zs; + long defragged; +} scanLaterZsetData; + +void scanLaterZsetCallback(void *privdata, const dictEntry *_de) { + dictEntry *de = (dictEntry*)_de; + scanLaterZsetData *data = privdata; + data->defragged += activeDefragZsetEntry(data->zs, de); + server.stat_active_defrag_scanned++; +} + +long scanLaterZset(robj *ob, unsigned long *cursor) { + if (ob->type != OBJ_ZSET || ob->encoding != OBJ_ENCODING_SKIPLIST) + return 0; + zset *zs = (zset*)ob->ptr; + dict *d = zs->dict; + scanLaterZsetData data = {zs, 0}; + *cursor = dictScan(d, *cursor, scanLaterZsetCallback, defragDictBucketCallback, &data); + return data.defragged; +} + +void scanLaterSetCallback(void *privdata, const dictEntry *_de) { + dictEntry *de = (dictEntry*)_de; + long *defragged = privdata; + sds sdsele = dictGetKey(de), newsds; + if ((newsds = activeDefragSds(sdsele))) + (*defragged)++, de->key = newsds; + server.stat_active_defrag_scanned++; +} + +long scanLaterSet(robj *ob, unsigned long *cursor) { + long defragged = 0; + if (ob->type != OBJ_SET || ob->encoding != OBJ_ENCODING_HT) + return 0; + dict *d = ob->ptr; + *cursor = dictScan(d, *cursor, scanLaterSetCallback, defragDictBucketCallback, &defragged); + return defragged; +} + +void scanLaterHashCallback(void *privdata, const dictEntry *_de) { + dictEntry *de = (dictEntry*)_de; + long *defragged = privdata; + sds sdsele = dictGetKey(de), newsds; + if ((newsds = activeDefragSds(sdsele))) + (*defragged)++, de->key = newsds; + sdsele = dictGetVal(de); + if ((newsds = 
activeDefragSds(sdsele))) + (*defragged)++, de->v.val = newsds; + server.stat_active_defrag_scanned++; +} + +long scanLaterHash(robj *ob, unsigned long *cursor) { + long defragged = 0; + if (ob->type != OBJ_HASH || ob->encoding != OBJ_ENCODING_HT) + return 0; + dict *d = ob->ptr; + *cursor = dictScan(d, *cursor, scanLaterHashCallback, defragDictBucketCallback, &defragged); + return defragged; +} + +long defragQuicklist(redisDb *db, dictEntry *kde) { + robj *ob = dictGetVal(kde); + long defragged = 0; + quicklist *ql = ob->ptr, *newql; + serverAssert(ob->type == OBJ_LIST && ob->encoding == OBJ_ENCODING_QUICKLIST); + if ((newql = activeDefragAlloc(ql))) + defragged++, ob->ptr = ql = newql; + if (ql->len > server.active_defrag_max_scan_fields) + defragLater(db, kde); + else + defragged += activeDefragQuickListNodes(ql); + return defragged; +} + +long defragZsetSkiplist(redisDb *db, dictEntry *kde) { + robj *ob = dictGetVal(kde); + long defragged = 0; + zset *zs = (zset*)ob->ptr; + zset *newzs; + zskiplist *newzsl; + dict *newdict; + dictEntry *de; + struct zskiplistNode *newheader; + serverAssert(ob->type == OBJ_ZSET && ob->encoding == OBJ_ENCODING_SKIPLIST); + if ((newzs = activeDefragAlloc(zs))) + defragged++, ob->ptr = zs = newzs; + if ((newzsl = activeDefragAlloc(zs->zsl))) + defragged++, zs->zsl = newzsl; + if ((newheader = activeDefragAlloc(zs->zsl->header))) + defragged++, zs->zsl->header = newheader; + if (dictSize(zs->dict) > server.active_defrag_max_scan_fields) + defragLater(db, kde); + else { + dictIterator *di = dictGetIterator(zs->dict); + while((de = dictNext(di)) != NULL) { + defragged += activeDefragZsetEntry(zs, de); + } + dictReleaseIterator(di); + } + /* handle the dict struct */ + if ((newdict = activeDefragAlloc(zs->dict))) + defragged++, zs->dict = newdict; + /* defrag the dict tables */ + defragged += dictDefragTables(zs->dict); + return defragged; +} + +long defragHash(redisDb *db, dictEntry *kde) { + long defragged = 0; + robj *ob = 
dictGetVal(kde); + dict *d, *newd; + serverAssert(ob->type == OBJ_HASH && ob->encoding == OBJ_ENCODING_HT); + d = ob->ptr; + if (dictSize(d) > server.active_defrag_max_scan_fields) + defragLater(db, kde); + else + defragged += activeDefragSdsDict(d, DEFRAG_SDS_DICT_VAL_IS_SDS); + /* handle the dict struct */ + if ((newd = activeDefragAlloc(ob->ptr))) + defragged++, ob->ptr = newd; + /* defrag the dict tables */ + defragged += dictDefragTables(ob->ptr); + return defragged; +} + +long defragSet(redisDb *db, dictEntry *kde) { + long defragged = 0; + robj *ob = dictGetVal(kde); + dict *d, *newd; + serverAssert(ob->type == OBJ_SET && ob->encoding == OBJ_ENCODING_HT); + d = ob->ptr; + if (dictSize(d) > server.active_defrag_max_scan_fields) + defragLater(db, kde); + else + defragged += activeDefragSdsDict(d, DEFRAG_SDS_DICT_NO_VAL); + /* handle the dict struct */ + if ((newd = activeDefragAlloc(ob->ptr))) + defragged++, ob->ptr = newd; + /* defrag the dict tables */ + defragged += dictDefragTables(ob->ptr); + return defragged; +} + /* for each key we scan in the main dict, this function will attempt to defrag * all the various pointers it has. Returns a stat of how many pointers were * moved. */ -int defragKey(redisDb *db, dictEntry *de) { +long defragKey(redisDb *db, dictEntry *de) { sds keysds = dictGetKey(de); robj *newob, *ob; unsigned char *newzl; - dict *d; - dictIterator *di; - int defragged = 0; + long defragged = 0; sds newsds; /* Try to defrag the key name. */ @@ -304,27 +625,7 @@ int defragKey(redisDb *db, dictEntry *de) { /* Already handled in activeDefragStringOb. 
*/ } else if (ob->type == OBJ_LIST) { if (ob->encoding == OBJ_ENCODING_QUICKLIST) { - quicklist *ql = ob->ptr, *newql; - quicklistNode *node = ql->head, *newnode; - if ((newql = activeDefragAlloc(ql))) - defragged++, ob->ptr = ql = newql; - while (node) { - if ((newnode = activeDefragAlloc(node))) { - if (newnode->prev) - newnode->prev->next = newnode; - else - ql->head = newnode; - if (newnode->next) - newnode->next->prev = newnode; - else - ql->tail = newnode; - node = newnode; - defragged++; - } - if ((newzl = activeDefragAlloc(node->zl))) - defragged++, node->zl = newzl; - node = node->next; - } + defragged += defragQuicklist(db, de); } else if (ob->encoding == OBJ_ENCODING_ZIPLIST) { if ((newzl = activeDefragAlloc(ob->ptr))) defragged++, ob->ptr = newzl; @@ -333,20 +634,10 @@ int defragKey(redisDb *db, dictEntry *de) { } } else if (ob->type == OBJ_SET) { if (ob->encoding == OBJ_ENCODING_HT) { - d = ob->ptr; - di = dictGetIterator(d); - while((de = dictNext(di)) != NULL) { - sds sdsele = dictGetKey(de); - if ((newsds = activeDefragSds(sdsele))) - defragged++, de->key = newsds; - defragged += dictIterDefragEntry(di); - } - dictReleaseIterator(di); - dictDefragTables((dict**)&ob->ptr); + defragged += defragSet(db, de); } else if (ob->encoding == OBJ_ENCODING_INTSET) { - intset *is = ob->ptr; - intset *newis = activeDefragAlloc(is); - if (newis) + intset *newis, *is = ob->ptr; + if ((newis = activeDefragAlloc(is))) defragged++, ob->ptr = newis; } else { serverPanic("Unknown set encoding"); @@ -356,32 +647,7 @@ int defragKey(redisDb *db, dictEntry *de) { if ((newzl = activeDefragAlloc(ob->ptr))) defragged++, ob->ptr = newzl; } else if (ob->encoding == OBJ_ENCODING_SKIPLIST) { - zset *zs = (zset*)ob->ptr; - zset *newzs; - zskiplist *newzsl; - struct zskiplistNode *newheader; - if ((newzs = activeDefragAlloc(zs))) - defragged++, ob->ptr = zs = newzs; - if ((newzsl = activeDefragAlloc(zs->zsl))) - defragged++, zs->zsl = newzsl; - if ((newheader = 
activeDefragAlloc(zs->zsl->header))) - defragged++, zs->zsl->header = newheader; - d = zs->dict; - di = dictGetIterator(d); - while((de = dictNext(di)) != NULL) { - double* newscore; - sds sdsele = dictGetKey(de); - if ((newsds = activeDefragSds(sdsele))) - defragged++, de->key = newsds; - newscore = zslDefrag(zs->zsl, *(double*)dictGetVal(de), sdsele, newsds); - if (newscore) { - dictSetVal(d, de, newscore); - defragged++; - } - defragged += dictIterDefragEntry(di); - } - dictReleaseIterator(di); - dictDefragTables(&zs->dict); + defragged += defragZsetSkiplist(db, de); } else { serverPanic("Unknown sorted set encoding"); } @@ -390,19 +656,7 @@ int defragKey(redisDb *db, dictEntry *de) { if ((newzl = activeDefragAlloc(ob->ptr))) defragged++, ob->ptr = newzl; } else if (ob->encoding == OBJ_ENCODING_HT) { - d = ob->ptr; - di = dictGetIterator(d); - while((de = dictNext(di)) != NULL) { - sds sdsele = dictGetKey(de); - if ((newsds = activeDefragSds(sdsele))) - defragged++, de->key = newsds; - sdsele = dictGetVal(de); - if ((newsds = activeDefragSds(sdsele))) - defragged++, de->v.val = newsds; - defragged += dictIterDefragEntry(di); - } - dictReleaseIterator(di); - dictDefragTables((dict**)&ob->ptr); + defragged += defragHash(db, de); } else { serverPanic("Unknown hash encoding"); } @@ -417,18 +671,19 @@ int defragKey(redisDb *db, dictEntry *de) { /* Defrag scan callback for the main db dictionary. */ void defragScanCallback(void *privdata, const dictEntry *de) { - int defragged = defragKey((redisDb*)privdata, (dictEntry*)de); + long defragged = defragKey((redisDb*)privdata, (dictEntry*)de); server.stat_active_defrag_hits += defragged; if(defragged) server.stat_active_defrag_key_hits++; else server.stat_active_defrag_key_misses++; + server.stat_active_defrag_scanned++; } /* Defrag scan callback for for each hash table bicket, * used in order to defrag the dictEntry allocations. 
*/ void defragDictBucketCallback(void *privdata, dictEntry **bucketref) { - UNUSED(privdata); + UNUSED(privdata); /* NOTE: this function is also used by both activeDefragCycle and scanLaterHash, etc. don't use privdata */ while(*bucketref) { dictEntry *de = *bucketref, *newde; if ((newde = activeDefragAlloc(de))) { @@ -439,7 +694,7 @@ void defragDictBucketCallback(void *privdata, dictEntry **bucketref) { } /* Utility function to get the fragmentation ratio from jemalloc. - * It is critical to do that by comparing only heap maps that belown to + * It is critical to do that by comparing only heap maps that belong to * jemalloc, and skip ones the jemalloc keeps as spare. Since we use this * fragmentation ratio in order to decide if a defrag action should be taken * or not, a false detection can cause the defragmenter to waste a lot of CPU @@ -464,14 +719,147 @@ float getAllocatorFragmentation(size_t *out_frag_bytes) { if(out_frag_bytes) *out_frag_bytes = frag_bytes; serverLog(LL_DEBUG, - "allocated=%zu, active=%zu, resident=%zu, frag=%.0f%% (%.0f%% rss), frag_bytes=%zu (%zu%% rss)", + "allocated=%zu, active=%zu, resident=%zu, frag=%.0f%% (%.0f%% rss), frag_bytes=%zu (%zu rss)", allocated, active, resident, frag_pct, rss_pct, frag_bytes, rss_bytes); return frag_pct; } +/* We may need to defrag other globals, one small allocation can hold a full allocator run. + * so although small, it is still important to defrag these */ +long defragOtherGlobals() { + long defragged = 0; + + /* there are many more pointers to defrag (e.g. client argv, output / aof buffers, etc. 
+ * but we assume most of these are short lived, we only need to defrag allocations + * that remain static for a long time */ + defragged += activeDefragSdsDict(server.lua_scripts, DEFRAG_SDS_DICT_VAL_IS_STROB); + defragged += activeDefragSdsListAndDict(server.repl_scriptcache_fifo, server.repl_scriptcache_dict, DEFRAG_SDS_DICT_NO_VAL); + return defragged; +} + +unsigned long defragLaterItem(dictEntry *de, unsigned long cursor) { + long defragged = 0; + if (de) { + robj *ob = dictGetVal(de); + if (ob->type == OBJ_LIST) { + defragged += scanLaterList(ob); + cursor = 0; /* list has no scan, we must finish it in one go */ + } else if (ob->type == OBJ_SET) { + defragged += scanLaterSet(ob, &cursor); + } else if (ob->type == OBJ_ZSET) { + defragged += scanLaterZset(ob, &cursor); + } else if (ob->type == OBJ_HASH) { + defragged += scanLaterHash(ob, &cursor); + } else { + cursor = 0; /* object type may have changed since we schedule it for later */ + } + } else { + cursor = 0; /* object may have been deleted already */ + } + server.stat_active_defrag_hits += defragged; + return cursor; +} + +/* returns 0 if no more work needs to be been done, and 1 if time is up and more work is needed. */ +int defragLaterStep(redisDb *db, long long endtime) { + static sds current_key = NULL; + static unsigned long cursor = 0; + unsigned int iterations = 0; + unsigned long long prev_defragged = server.stat_active_defrag_hits; + unsigned long long prev_scanned = server.stat_active_defrag_scanned; + long long key_defragged; + + do { + /* if we're not continuing a scan from the last call or loop, start a new one */ + if (!cursor) { + listNode *head = listFirst(db->defrag_later); + + /* Move on to next key */ + if (current_key) { + serverAssert(current_key == head->value); + sdsfree(head->value); + listDelNode(db->defrag_later, head); + cursor = 0; + current_key = NULL; + } + + /* stop if we reached the last one. 
*/ + head = listFirst(db->defrag_later); + if (!head) + return 0; + + /* start a new key */ + current_key = head->value; + cursor = 0; + } + + /* each time we enter this function we need to fetch the key from the dict again (if it still exists) */ + dictEntry *de = dictFind(db->dict, current_key); + key_defragged = server.stat_active_defrag_hits; + do { + cursor = defragLaterItem(de, cursor); + + /* Once in 16 scan iterations, 512 pointer reallocations, or 64 fields + * (if we have a lot of pointers in one hash bucket, or rehashing), + * check if we reached the time limit. + * But regardless, don't start a new BIG key in this loop, this is because the + * next key can be a list, and scanLaterList must be done in once cycle */ + if (!cursor || (++iterations > 16 || + server.stat_active_defrag_hits - prev_defragged > 512 || + server.stat_active_defrag_scanned - prev_scanned > 64)) { + if (!cursor || ustime() > endtime) { + if(key_defragged != server.stat_active_defrag_hits) + server.stat_active_defrag_key_hits++; + else + server.stat_active_defrag_key_misses++; + return 1; + } + iterations = 0; + prev_defragged = server.stat_active_defrag_hits; + prev_scanned = server.stat_active_defrag_scanned; + } + } while(cursor); + if(key_defragged != server.stat_active_defrag_hits) + server.stat_active_defrag_key_hits++; + else + server.stat_active_defrag_key_misses++; + } while(1); +} + #define INTERPOLATE(x, x1, x2, y1, y2) ( (y1) + ((x)-(x1)) * ((y2)-(y1)) / ((x2)-(x1)) ) #define LIMIT(y, min, max) ((y)<(min)? min: ((y)>(max)? max: (y))) +/* decide if defrag is needed, and at what CPU effort to invest in it */ +void computeDefragCycles() { + size_t frag_bytes; + float frag_pct = getAllocatorFragmentation(&frag_bytes); + /* If we're not already running, and below the threshold, exit. 
*/ + if (!server.active_defrag_running) { + if(frag_pct < server.active_defrag_threshold_lower || frag_bytes < server.active_defrag_ignore_bytes) + return; + } + + /* Calculate the adaptive aggressiveness of the defrag */ + int cpu_pct = INTERPOLATE(frag_pct, + server.active_defrag_threshold_lower, + server.active_defrag_threshold_upper, + server.active_defrag_cycle_min, + server.active_defrag_cycle_max); + cpu_pct = LIMIT(cpu_pct, + server.active_defrag_cycle_min, + server.active_defrag_cycle_max); + /* We allow increasing the aggressiveness during a scan, but don't + * reduce it. */ + if (!server.active_defrag_running || + cpu_pct > server.active_defrag_running) + { + server.active_defrag_running = cpu_pct; + serverLog(LL_VERBOSE, + "Starting active defrag, frag=%.0f%%, frag_bytes=%zu, cpu=%d%%", + frag_pct, frag_bytes, cpu_pct); + } +} + /* Perform incremental defragmentation work from the serverCron. * This works in a similar way to activeExpireCycle, in the sense that * we do incremental work across calls. */ @@ -481,8 +869,11 @@ void activeDefragCycle(void) { static redisDb *db = NULL; static long long start_scan, start_stat; unsigned int iterations = 0; - unsigned long long defragged = server.stat_active_defrag_hits; - long long start, timelimit; + unsigned long long prev_defragged = server.stat_active_defrag_hits; + unsigned long long prev_scanned = server.stat_active_defrag_scanned; + long long start, timelimit, endtime; + mstime_t latency; + int quit = 0; if (server.aof_child_pid!=-1 || server.rdb_child_pid!=-1) return; /* Defragging memory while there's a fork will just do damage. */ @@ -490,33 +881,7 @@ void activeDefragCycle(void) { /* Once a second, check if we the fragmentation justfies starting a scan * or making it more aggressive. */ run_with_period(1000) { - size_t frag_bytes; - float frag_pct = getAllocatorFragmentation(&frag_bytes); - /* If we're not already running, and below the threshold, exit. 
*/ - if (!server.active_defrag_running) { - if(frag_pct < server.active_defrag_threshold_lower || frag_bytes < server.active_defrag_ignore_bytes) - return; - } - - /* Calculate the adaptive aggressiveness of the defrag */ - int cpu_pct = INTERPOLATE(frag_pct, - server.active_defrag_threshold_lower, - server.active_defrag_threshold_upper, - server.active_defrag_cycle_min, - server.active_defrag_cycle_max); - cpu_pct = LIMIT(cpu_pct, - server.active_defrag_cycle_min, - server.active_defrag_cycle_max); - /* We allow increasing the aggressiveness during a scan, but don't - * reduce it. */ - if (!server.active_defrag_running || - cpu_pct > server.active_defrag_running) - { - server.active_defrag_running = cpu_pct; - serverLog(LL_VERBOSE, - "Starting active defrag, frag=%.0f%%, frag_bytes=%zu, cpu=%d%%", - frag_pct, frag_bytes, cpu_pct); - } + computeDefragCycles(); } if (!server.active_defrag_running) return; @@ -525,11 +890,23 @@ void activeDefragCycle(void) { start = ustime(); timelimit = 1000000*server.active_defrag_running/server.hz/100; if (timelimit <= 0) timelimit = 1; + endtime = start + timelimit; + latencyStartMonitor(latency); do { + /* if we're not continuing a scan from the last call or loop, start a new one */ if (!cursor) { + /* finish any leftovers from previous db before moving to the next one */ + if (db && defragLaterStep(db, endtime)) { + quit = 1; /* time is up, we didn't finish all the work */ + break; /* this will exit the function and we'll continue on the next cycle */ + } + /* Move on to next database, and stop if we reached the last one. 
*/ if (++current_db >= server.dbnum) { + /* defrag other items not part of the db / keys */ + defragOtherGlobals(); + long long now = ustime(); size_t frag_bytes; float frag_pct = getAllocatorFragmentation(&frag_bytes); @@ -542,7 +919,11 @@ void activeDefragCycle(void) { cursor = 0; db = NULL; server.active_defrag_running = 0; - return; + + computeDefragCycles(); /* if another scan is needed, start it right away */ + if (server.active_defrag_running != 0 && ustime() < endtime) + continue; + break; } else if (current_db==0) { /* Start a scan from the first database. */ @@ -555,19 +936,35 @@ void activeDefragCycle(void) { } do { + /* before scanning the next bucket, see if we have big keys left from the previous bucket to scan */ + if (defragLaterStep(db, endtime)) { + quit = 1; /* time is up, we didn't finish all the work */ + break; /* this will exit the function and we'll continue on the next cycle */ + } + cursor = dictScan(db->dict, cursor, defragScanCallback, defragDictBucketCallback, db); - /* Once in 16 scan iterations, or 1000 pointer reallocations - * (if we have a lot of pointers in one hash bucket), check if we - * reached the tiem limit. */ - if (cursor && (++iterations > 16 || server.stat_active_defrag_hits - defragged > 1000)) { - if ((ustime() - start) > timelimit) { - return; + + /* Once in 16 scan iterations, 512 pointer reallocations. or 64 keys + * (if we have a lot of pointers in one hash bucket or rehasing), + * check if we reached the time limit. 
+ * But regardless, don't start a new db in this loop, this is because after + * the last db we call defragOtherGlobals, which must be done in once cycle */ + if (!cursor || (++iterations > 16 || + server.stat_active_defrag_hits - prev_defragged > 512 || + server.stat_active_defrag_scanned - prev_scanned > 64)) { + if (!cursor || ustime() > endtime) { + quit = 1; + break; } iterations = 0; - defragged = server.stat_active_defrag_hits; + prev_defragged = server.stat_active_defrag_hits; + prev_scanned = server.stat_active_defrag_scanned; } - } while(cursor); - } while(1); + } while(cursor && !quit); + } while(!quit); + + latencyEndMonitor(latency); + latencyAddSampleIfNeeded("active-defrag-cycle",latency); } #else /* HAVE_DEFRAG */ diff --git a/src/server.c b/src/server.c index c14255db..c6d29229 100644 --- a/src/server.c +++ b/src/server.c @@ -1389,6 +1389,7 @@ void initServerConfig(void) { server.active_defrag_threshold_upper = CONFIG_DEFAULT_DEFRAG_THRESHOLD_UPPER; server.active_defrag_cycle_min = CONFIG_DEFAULT_DEFRAG_CYCLE_MIN; server.active_defrag_cycle_max = CONFIG_DEFAULT_DEFRAG_CYCLE_MAX; + server.active_defrag_max_scan_fields = CONFIG_DEFAULT_DEFRAG_MAX_SCAN_FIELDS; server.proto_max_bulk_len = CONFIG_DEFAULT_PROTO_MAX_BULK_LEN; server.client_max_querybuf_len = PROTO_MAX_QUERYBUF_LEN; server.saveparams = NULL; @@ -1806,6 +1807,7 @@ void resetServerStats(void) { server.stat_active_defrag_misses = 0; server.stat_active_defrag_key_hits = 0; server.stat_active_defrag_key_misses = 0; + server.stat_active_defrag_scanned = 0; server.stat_fork_time = 0; server.stat_fork_rate = 0; server.stat_rejected_conn = 0; @@ -1894,6 +1896,7 @@ void initServer(void) { server.db[j].watched_keys = dictCreate(&keylistDictType,NULL); server.db[j].id = j; server.db[j].avg_ttl = 0; + server.db[j].defrag_later = listCreate(); } evictionPoolAlloc(); /* Initialize the LRU keys pool. 
*/ server.pubsub_channels = dictCreate(&keylistDictType,NULL); diff --git a/src/server.h b/src/server.h index 5b6074a8..3256278e 100644 --- a/src/server.h +++ b/src/server.h @@ -159,8 +159,9 @@ typedef long long mstime_t; /* millisecond time type. */ #define CONFIG_DEFAULT_DEFRAG_THRESHOLD_LOWER 10 /* don't defrag when fragmentation is below 10% */ #define CONFIG_DEFAULT_DEFRAG_THRESHOLD_UPPER 100 /* maximum defrag force at 100% fragmentation */ #define CONFIG_DEFAULT_DEFRAG_IGNORE_BYTES (100<<20) /* don't defrag if frag overhead is below 100mb */ -#define CONFIG_DEFAULT_DEFRAG_CYCLE_MIN 25 /* 25% CPU min (at lower threshold) */ +#define CONFIG_DEFAULT_DEFRAG_CYCLE_MIN 5 /* 5% CPU min (at lower threshold) */ #define CONFIG_DEFAULT_DEFRAG_CYCLE_MAX 75 /* 75% CPU max (at upper threshold) */ +#define CONFIG_DEFAULT_DEFRAG_MAX_SCAN_FIELDS 1000 /* keys with more than 1000 fields will be processed separately */ #define CONFIG_DEFAULT_PROTO_MAX_BULK_LEN (512ll*1024*1024) /* Bulk request max size */ #define ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP 20 /* Loopkups per loop. */ @@ -622,6 +623,7 @@ typedef struct redisDb { dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */ int id; /* Database ID */ long long avg_ttl; /* Average TTL, just for stats */ + list *defrag_later; /* List of key names to attempt to defrag one by one, gradually. */ } redisDb; /* Client MULTI/EXEC state */ @@ -957,6 +959,7 @@ struct redisServer { long long stat_active_defrag_misses; /* number of allocations scanned but not moved */ long long stat_active_defrag_key_hits; /* number of keys with moved allocations */ long long stat_active_defrag_key_misses;/* number of keys scanned and not moved */ + long long stat_active_defrag_scanned; /* number of dictEntries scanned */ size_t stat_peak_memory; /* Max used memory record */ long long stat_fork_time; /* Time needed to perform latest fork() */ double stat_fork_rate; /* Fork rate in GB/sec. 
*/ @@ -992,6 +995,7 @@ struct redisServer { int active_defrag_threshold_upper; /* maximum percentage of fragmentation at which we use maximum effort */ int active_defrag_cycle_min; /* minimal effort for defrag in CPU percentage */ int active_defrag_cycle_max; /* maximal effort for defrag in CPU percentage */ + unsigned long active_defrag_max_scan_fields; /* maximum number of fields of set/hash/zset/list to process from within the main dict scan */ size_t client_max_querybuf_len; /* Limit for client query buffer length */ int dbnum; /* Total number of configured DBs */ int supervised; /* 1 if supervised, 0 otherwise. */