active defrag v2

- big keys are not defragged in one go from within the dict scan;
  instead they are scanned in parts after the main dict hash bucket is done.
- add a latency monitor sample for defrag
- change the default active-defrag-cycle-min to induce lower latency
  (see the time-budget sketch after this list)
- make active defrag start a new scan right away if needed, so it's easier
  (for the test suite) to detect when it's done
- make active defrag quit the current cycle after each db / big key
- defrag some non-key, long-term global allocations
- some refactoring for smaller functions and more reusable code
- during dict rehashing, one scan iteration of the dict can end up scanning
  one bucket in the smaller table and many, many buckets in the larger one,
  so waiting for 16 scan iterations before checking the time may be much too long.
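As a rough illustration of the cycle-min change, here is a minimal sketch of the per-call time budget it implies, mirroring the formula used in activeDefragCycle below (the helper name is illustrative, not part of the patch):

    /* cpu_pct is the adaptive defrag effort, between active-defrag-cycle-min
     * and active-defrag-cycle-max; hz is the serverCron frequency. With the
     * new default cycle-min of 5 and the default hz of 10, the minimum budget
     * is 1000000*5/10/100 = 5000 microseconds per serverCron call. */
    static long long defrag_timelimit_us(int cpu_pct, int hz) {
        long long timelimit = 1000000LL * cpu_pct / hz / 100;
        return timelimit <= 0 ? 1 : timelimit;
    }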
Oran Agra 2018-02-18 17:15:22 +02:00
parent aa57481d8c
commit be1b4aa9aa
5 changed files with 551 additions and 133 deletions

redis.conf

@@ -1302,8 +1302,12 @@ aof-rewrite-incremental-fsync yes
# active-defrag-threshold-upper 100
# Minimal effort for defrag in CPU percentage
# active-defrag-cycle-min 25
# active-defrag-cycle-min 5
# Maximal effort for defrag in CPU percentage
# active-defrag-cycle-max 75
# Maximum number of set/hash/zset/list fields that will be processed from
# the main dictionary scan
# active-defrag-max-scan-fields 1000

src/config.c

@@ -537,6 +537,12 @@ void loadServerConfigFromString(char *config) {
err = "active-defrag-cycle-max must be between 1 and 99";
goto loaderr;
}
} else if (!strcasecmp(argv[0],"active-defrag-max-scan-fields") && argc == 2) {
server.active_defrag_max_scan_fields = strtoll(argv[1],NULL,10);
if (server.active_defrag_max_scan_fields < 1) {
err = "active-defrag-max-scan-fields must be positive";
goto loaderr;
}
} else if (!strcasecmp(argv[0],"hash-max-ziplist-entries") && argc == 2) {
server.hash_max_ziplist_entries = memtoll(argv[1], NULL);
} else if (!strcasecmp(argv[0],"hash-max-ziplist-value") && argc == 2) {
@@ -1058,6 +1064,8 @@ void configSetCommand(client *c) {
"active-defrag-cycle-min",server.active_defrag_cycle_min,1,99) {
} config_set_numerical_field(
"active-defrag-cycle-max",server.active_defrag_cycle_max,1,99) {
} config_set_numerical_field(
"active-defrag-max-scan-fields",server.active_defrag_max_scan_fields,1,LLONG_MAX) {
} config_set_numerical_field(
"auto-aof-rewrite-percentage",server.aof_rewrite_perc,0,LLONG_MAX){
} config_set_numerical_field(
@@ -1239,6 +1247,7 @@ void configGetCommand(client *c) {
config_get_numerical_field("active-defrag-ignore-bytes",server.active_defrag_ignore_bytes);
config_get_numerical_field("active-defrag-cycle-min",server.active_defrag_cycle_min);
config_get_numerical_field("active-defrag-cycle-max",server.active_defrag_cycle_max);
config_get_numerical_field("active-defrag-max-scan-fields",server.active_defrag_max_scan_fields);
config_get_numerical_field("auto-aof-rewrite-percentage",
server.aof_rewrite_perc);
config_get_numerical_field("auto-aof-rewrite-min-size",
@@ -2013,6 +2022,7 @@ int rewriteConfig(char *path) {
rewriteConfigBytesOption(state,"active-defrag-ignore-bytes",server.active_defrag_ignore_bytes,CONFIG_DEFAULT_DEFRAG_IGNORE_BYTES);
rewriteConfigNumericalOption(state,"active-defrag-cycle-min",server.active_defrag_cycle_min,CONFIG_DEFAULT_DEFRAG_CYCLE_MIN);
rewriteConfigNumericalOption(state,"active-defrag-cycle-max",server.active_defrag_cycle_max,CONFIG_DEFAULT_DEFRAG_CYCLE_MAX);
rewriteConfigNumericalOption(state,"active-defrag-max-scan-fields",server.active_defrag_max_scan_fields,CONFIG_DEFAULT_DEFRAG_MAX_SCAN_FIELDS);
rewriteConfigYesNoOption(state,"appendonly",server.aof_state != AOF_OFF,0);
rewriteConfigStringOption(state,"appendfilename",server.aof_filename,CONFIG_DEFAULT_AOF_FILENAME);
rewriteConfigEnumOption(state,"appendfsync",server.aof_fsync,aof_fsync_enum,CONFIG_DEFAULT_AOF_FSYNC);

src/defrag.c

@@ -45,6 +45,10 @@
* pointers are worth moving and which aren't */
int je_get_defrag_hint(void* ptr, int *bin_util, int *run_util);
/* forward declarations */
void defragDictBucketCallback(void *privdata, dictEntry **bucketref);
dictEntry* replaceSateliteDictKeyPtrAndOrDefragDictEntry(dict *d, sds oldkey, sds newkey, unsigned int hash, long *defragged);
/* Defrag helper for generic allocations.
*
* returns NULL in case the allocation wasn't moved.
@@ -96,7 +100,7 @@ sds activeDefragSds(sds sdsptr) {
* returns NULL in case the allocation wasn't moved.
* when it returns a non-null value, the old pointer was already released
* and should NOT be accessed. */
robj *activeDefragStringOb(robj* ob, int *defragged) {
robj *activeDefragStringOb(robj* ob, long *defragged) {
robj *ret = NULL;
if (ob->refcount!=1)
return NULL;
@@ -134,11 +138,11 @@ robj *activeDefragStringOb(robj* ob, int *defragged) {
/* Defrag helper for dictEntries to be used during dict iteration (called on
* each step). Returns a stat of how many pointers were moved. */
int dictIterDefragEntry(dictIterator *iter) {
long dictIterDefragEntry(dictIterator *iter) {
/* This function is a little bit dirty since it messes with the internals
* of the dict and its iterator, but the benefit is that it is very easy
* to use, and requires no other changes in the dict. */
int defragged = 0;
long defragged = 0;
dictht *ht;
/* Handle the next entry (if there is one), and update the pointer in the
* current entry. */
@@ -166,14 +170,9 @@ int dictIterDefragEntry(dictIterator *iter) {
/* Defrag helper for dict main allocations (dict struct, and hash tables).
* Receives the dict pointer and defrags the hash tables; the dict struct
* itself is handled by the caller. Returns a stat of how many pointers were moved. */
int dictDefragTables(dict** dictRef) {
dict *d = *dictRef;
long dictDefragTables(dict* d) {
dictEntry **newtable;
int defragged = 0;
/* handle the dict struct */
dict *newd = activeDefragAlloc(d);
if (newd)
defragged++, *dictRef = d = newd;
long defragged = 0;
/* handle the first hash table */
newtable = activeDefragAlloc(d->ht[0].table);
if (newtable)
@@ -246,6 +245,146 @@ double *zslDefrag(zskiplist *zsl, double score, sds oldele, sds newele) {
return NULL;
}
/* Defrag helper for sorted set.
* Defrag a single dict entry key name, and corresponding skiplist struct */
long activeDefragZsetEntry(zset *zs, dictEntry *de) {
sds newsds;
double* newscore;
long defragged = 0;
sds sdsele = dictGetKey(de);
if ((newsds = activeDefragSds(sdsele)))
defragged++, de->key = newsds;
newscore = zslDefrag(zs->zsl, *(double*)dictGetVal(de), sdsele, newsds);
if (newscore) {
dictSetVal(zs->dict, de, newscore);
defragged++;
}
return defragged;
}
#define DEFRAG_SDS_DICT_NO_VAL 0
#define DEFRAG_SDS_DICT_VAL_IS_SDS 1
#define DEFRAG_SDS_DICT_VAL_IS_STROB 2
#define DEFRAG_SDS_DICT_VAL_VOID_PTR 3
/* Defrag a dict with sds key and optional value (either ptr, sds or robj string) */
long activeDefragSdsDict(dict* d, int val_type) {
dictIterator *di;
dictEntry *de;
long defragged = 0;
di = dictGetIterator(d);
while((de = dictNext(di)) != NULL) {
sds sdsele = dictGetKey(de), newsds;
if ((newsds = activeDefragSds(sdsele)))
de->key = newsds, defragged++;
/* defrag the value */
if (val_type == DEFRAG_SDS_DICT_VAL_IS_SDS) {
sdsele = dictGetVal(de);
if ((newsds = activeDefragSds(sdsele)))
de->v.val = newsds, defragged++;
} else if (val_type == DEFRAG_SDS_DICT_VAL_IS_STROB) {
robj *newele, *ele = dictGetVal(de);
if ((newele = activeDefragStringOb(ele, &defragged)))
de->v.val = newele;
} else if (val_type == DEFRAG_SDS_DICT_VAL_VOID_PTR) {
void *newptr, *ptr = dictGetVal(de);
if ((newptr = activeDefragAlloc(ptr)))
de->v.val = newptr, defragged++;
}
defragged += dictIterDefragEntry(di);
}
dictReleaseIterator(di);
return defragged;
}
/* Defrag a list of ptr, sds or robj string values */
long activeDefragList(list *l, int val_type) {
long defragged = 0;
listNode *ln, *newln;
for (ln = l->head; ln; ln = ln->next) {
if ((newln = activeDefragAlloc(ln))) {
if (newln->prev)
newln->prev->next = newln;
else
l->head = newln;
if (newln->next)
newln->next->prev = newln;
else
l->tail = newln;
ln = newln;
defragged++;
}
if (val_type == DEFRAG_SDS_DICT_VAL_IS_SDS) {
sds newsds, sdsele = ln->value;
if ((newsds = activeDefragSds(sdsele)))
ln->value = newsds, defragged++;
} else if (val_type == DEFRAG_SDS_DICT_VAL_IS_STROB) {
robj *newele, *ele = ln->value;
if ((newele = activeDefragStringOb(ele, &defragged)))
ln->value = newele;
} else if (val_type == DEFRAG_SDS_DICT_VAL_VOID_PTR) {
void *newptr, *ptr = ln->value;
if ((newptr = activeDefragAlloc(ptr)))
ln->value = newptr, defragged++;
}
}
return defragged;
}
/* Defrag a list of sds values and a dict with the same sds keys */
long activeDefragSdsListAndDict(list *l, dict *d, int dict_val_type) {
long defragged = 0;
sds newsds, sdsele;
listNode *ln, *newln;
dictIterator *di;
dictEntry *de;
/* Defrag the list and its sds values */
for (ln = l->head; ln; ln = ln->next) {
if ((newln = activeDefragAlloc(ln))) {
if (newln->prev)
newln->prev->next = newln;
else
l->head = newln;
if (newln->next)
newln->next->prev = newln;
else
l->tail = newln;
ln = newln;
defragged++;
}
sdsele = ln->value;
if ((newsds = activeDefragSds(sdsele))) {
/* When defragging an sds value, we need to update the dict key */
unsigned int hash = dictGetHash(d, sdsele);
replaceSateliteDictKeyPtrAndOrDefragDictEntry(d, sdsele, newsds, hash, &defragged);
ln->value = newsds;
defragged++;
}
}
/* Defrag the dict values (keys were already handled) */
di = dictGetIterator(d);
while((de = dictNext(di)) != NULL) {
if (dict_val_type == DEFRAG_SDS_DICT_VAL_IS_SDS) {
sds newsds, sdsele = dictGetVal(de);
if ((newsds = activeDefragSds(sdsele)))
de->v.val = newsds, defragged++;
} else if (dict_val_type == DEFRAG_SDS_DICT_VAL_IS_STROB) {
robj *newele, *ele = dictGetVal(de);
if ((newele = activeDefragStringOb(ele, &defragged)))
de->v.val = newele; /* defragged is already updated by activeDefragStringOb */
} else if (dict_val_type == DEFRAG_SDS_DICT_VAL_VOID_PTR) {
void *newptr, *ptr = dictGetVal(de);
if ((newptr = activeDefragAlloc(ptr)))
de->v.val = newptr, defragged++;
}
defragged += dictIterDefragEntry(di);
}
dictReleaseIterator(di);
return defragged;
}
/* Utility function that replaces an old key pointer in the dictionary with a
* new pointer. Additionally, we try to defrag the dictEntry in that dict.
* Oldkey may be a dead pointer and should not be accessed (we get a
@@ -253,7 +392,7 @@ double *zslDefrag(zskiplist *zsl, double score, sds oldele, sds newele) {
* moved. Return value is the dictEntry if found, or NULL if not found.
* NOTE: this is very ugly code, but it lets us avoid the complication of
* doing a scan on another dict. */
dictEntry* replaceSateliteDictKeyPtrAndOrDefragDictEntry(dict *d, sds oldkey, sds newkey, unsigned int hash, int *defragged) {
dictEntry* replaceSateliteDictKeyPtrAndOrDefragDictEntry(dict *d, sds oldkey, sds newkey, unsigned int hash, long *defragged) {
dictEntry **deref = dictFindEntryRefByPtrAndHash(d, oldkey, hash);
if (deref) {
dictEntry *de = *deref;
@@ -269,16 +408,198 @@ dictEntry* replaceSateliteDictKeyPtrAndOrDefragDictEntry(dict *d, sds oldkey, sd
return NULL;
}
long activeDefragQuickListNodes(quicklist *ql) {
quicklistNode *node = ql->head, *newnode;
long defragged = 0;
unsigned char *newzl;
while (node) {
if ((newnode = activeDefragAlloc(node))) {
if (newnode->prev)
newnode->prev->next = newnode;
else
ql->head = newnode;
if (newnode->next)
newnode->next->prev = newnode;
else
ql->tail = newnode;
node = newnode;
defragged++;
}
if ((newzl = activeDefragAlloc(node->zl)))
defragged++, node->zl = newzl;
node = node->next;
}
return defragged;
}
/* When the value has lots of elements, we want to handle it later and not as
* part of the main dictionary scan. This is needed in order to prevent latency
* spikes when handling large items */
void defragLater(redisDb *db, dictEntry *kde) {
sds key = sdsdup(dictGetKey(kde));
listAddNodeTail(db->defrag_later, key);
}
long scanLaterList(robj *ob) {
quicklist *ql = ob->ptr;
if (ob->type != OBJ_LIST || ob->encoding != OBJ_ENCODING_QUICKLIST)
return 0;
server.stat_active_defrag_scanned+=ql->len;
return activeDefragQuickListNodes(ql);
}
typedef struct {
zset *zs;
long defragged;
} scanLaterZsetData;
void scanLaterZsetCallback(void *privdata, const dictEntry *_de) {
dictEntry *de = (dictEntry*)_de;
scanLaterZsetData *data = privdata;
data->defragged += activeDefragZsetEntry(data->zs, de);
server.stat_active_defrag_scanned++;
}
long scanLaterZset(robj *ob, unsigned long *cursor) {
if (ob->type != OBJ_ZSET || ob->encoding != OBJ_ENCODING_SKIPLIST)
return 0;
zset *zs = (zset*)ob->ptr;
dict *d = zs->dict;
scanLaterZsetData data = {zs, 0};
*cursor = dictScan(d, *cursor, scanLaterZsetCallback, defragDictBucketCallback, &data);
return data.defragged;
}
void scanLaterSetCallback(void *privdata, const dictEntry *_de) {
dictEntry *de = (dictEntry*)_de;
long *defragged = privdata;
sds sdsele = dictGetKey(de), newsds;
if ((newsds = activeDefragSds(sdsele)))
(*defragged)++, de->key = newsds;
server.stat_active_defrag_scanned++;
}
long scanLaterSet(robj *ob, unsigned long *cursor) {
long defragged = 0;
if (ob->type != OBJ_SET || ob->encoding != OBJ_ENCODING_HT)
return 0;
dict *d = ob->ptr;
*cursor = dictScan(d, *cursor, scanLaterSetCallback, defragDictBucketCallback, &defragged);
return defragged;
}
void scanLaterHashCallback(void *privdata, const dictEntry *_de) {
dictEntry *de = (dictEntry*)_de;
long *defragged = privdata;
sds sdsele = dictGetKey(de), newsds;
if ((newsds = activeDefragSds(sdsele)))
(*defragged)++, de->key = newsds;
sdsele = dictGetVal(de);
if ((newsds = activeDefragSds(sdsele)))
(*defragged)++, de->v.val = newsds;
server.stat_active_defrag_scanned++;
}
long scanLaterHash(robj *ob, unsigned long *cursor) {
long defragged = 0;
if (ob->type != OBJ_HASH || ob->encoding != OBJ_ENCODING_HT)
return 0;
dict *d = ob->ptr;
*cursor = dictScan(d, *cursor, scanLaterHashCallback, defragDictBucketCallback, &defragged);
return defragged;
}
long defragQuicklist(redisDb *db, dictEntry *kde) {
robj *ob = dictGetVal(kde);
long defragged = 0;
quicklist *ql = ob->ptr, *newql;
serverAssert(ob->type == OBJ_LIST && ob->encoding == OBJ_ENCODING_QUICKLIST);
if ((newql = activeDefragAlloc(ql)))
defragged++, ob->ptr = ql = newql;
if (ql->len > server.active_defrag_max_scan_fields)
defragLater(db, kde);
else
defragged += activeDefragQuickListNodes(ql);
return defragged;
}
long defragZsetSkiplist(redisDb *db, dictEntry *kde) {
robj *ob = dictGetVal(kde);
long defragged = 0;
zset *zs = (zset*)ob->ptr;
zset *newzs;
zskiplist *newzsl;
dict *newdict;
dictEntry *de;
struct zskiplistNode *newheader;
serverAssert(ob->type == OBJ_ZSET && ob->encoding == OBJ_ENCODING_SKIPLIST);
if ((newzs = activeDefragAlloc(zs)))
defragged++, ob->ptr = zs = newzs;
if ((newzsl = activeDefragAlloc(zs->zsl)))
defragged++, zs->zsl = newzsl;
if ((newheader = activeDefragAlloc(zs->zsl->header)))
defragged++, zs->zsl->header = newheader;
if (dictSize(zs->dict) > server.active_defrag_max_scan_fields)
defragLater(db, kde);
else {
dictIterator *di = dictGetIterator(zs->dict);
while((de = dictNext(di)) != NULL) {
defragged += activeDefragZsetEntry(zs, de);
}
dictReleaseIterator(di);
}
/* handle the dict struct */
if ((newdict = activeDefragAlloc(zs->dict)))
defragged++, zs->dict = newdict;
/* defrag the dict tables */
defragged += dictDefragTables(zs->dict);
return defragged;
}
long defragHash(redisDb *db, dictEntry *kde) {
long defragged = 0;
robj *ob = dictGetVal(kde);
dict *d, *newd;
serverAssert(ob->type == OBJ_HASH && ob->encoding == OBJ_ENCODING_HT);
d = ob->ptr;
if (dictSize(d) > server.active_defrag_max_scan_fields)
defragLater(db, kde);
else
defragged += activeDefragSdsDict(d, DEFRAG_SDS_DICT_VAL_IS_SDS);
/* handle the dict struct */
if ((newd = activeDefragAlloc(ob->ptr)))
defragged++, ob->ptr = newd;
/* defrag the dict tables */
defragged += dictDefragTables(ob->ptr);
return defragged;
}
long defragSet(redisDb *db, dictEntry *kde) {
long defragged = 0;
robj *ob = dictGetVal(kde);
dict *d, *newd;
serverAssert(ob->type == OBJ_SET && ob->encoding == OBJ_ENCODING_HT);
d = ob->ptr;
if (dictSize(d) > server.active_defrag_max_scan_fields)
defragLater(db, kde);
else
defragged += activeDefragSdsDict(d, DEFRAG_SDS_DICT_NO_VAL);
/* handle the dict struct */
if ((newd = activeDefragAlloc(ob->ptr)))
defragged++, ob->ptr = newd;
/* defrag the dict tables */
defragged += dictDefragTables(ob->ptr);
return defragged;
}
/* for each key we scan in the main dict, this function will attempt to defrag
* all the various pointers it has. Returns a stat of how many pointers were
* moved. */
int defragKey(redisDb *db, dictEntry *de) {
long defragKey(redisDb *db, dictEntry *de) {
sds keysds = dictGetKey(de);
robj *newob, *ob;
unsigned char *newzl;
dict *d;
dictIterator *di;
int defragged = 0;
long defragged = 0;
sds newsds;
/* Try to defrag the key name. */
@@ -304,27 +625,7 @@ int defragKey(redisDb *db, dictEntry *de) {
/* Already handled in activeDefragStringOb. */
} else if (ob->type == OBJ_LIST) {
if (ob->encoding == OBJ_ENCODING_QUICKLIST) {
quicklist *ql = ob->ptr, *newql;
quicklistNode *node = ql->head, *newnode;
if ((newql = activeDefragAlloc(ql)))
defragged++, ob->ptr = ql = newql;
while (node) {
if ((newnode = activeDefragAlloc(node))) {
if (newnode->prev)
newnode->prev->next = newnode;
else
ql->head = newnode;
if (newnode->next)
newnode->next->prev = newnode;
else
ql->tail = newnode;
node = newnode;
defragged++;
}
if ((newzl = activeDefragAlloc(node->zl)))
defragged++, node->zl = newzl;
node = node->next;
}
defragged += defragQuicklist(db, de);
} else if (ob->encoding == OBJ_ENCODING_ZIPLIST) {
if ((newzl = activeDefragAlloc(ob->ptr)))
defragged++, ob->ptr = newzl;
@@ -333,20 +634,10 @@ int defragKey(redisDb *db, dictEntry *de) {
}
} else if (ob->type == OBJ_SET) {
if (ob->encoding == OBJ_ENCODING_HT) {
d = ob->ptr;
di = dictGetIterator(d);
while((de = dictNext(di)) != NULL) {
sds sdsele = dictGetKey(de);
if ((newsds = activeDefragSds(sdsele)))
defragged++, de->key = newsds;
defragged += dictIterDefragEntry(di);
}
dictReleaseIterator(di);
dictDefragTables((dict**)&ob->ptr);
defragged += defragSet(db, de);
} else if (ob->encoding == OBJ_ENCODING_INTSET) {
intset *is = ob->ptr;
intset *newis = activeDefragAlloc(is);
if (newis)
intset *newis, *is = ob->ptr;
if ((newis = activeDefragAlloc(is)))
defragged++, ob->ptr = newis;
} else {
serverPanic("Unknown set encoding");
@@ -356,32 +647,7 @@ int defragKey(redisDb *db, dictEntry *de) {
if ((newzl = activeDefragAlloc(ob->ptr)))
defragged++, ob->ptr = newzl;
} else if (ob->encoding == OBJ_ENCODING_SKIPLIST) {
zset *zs = (zset*)ob->ptr;
zset *newzs;
zskiplist *newzsl;
struct zskiplistNode *newheader;
if ((newzs = activeDefragAlloc(zs)))
defragged++, ob->ptr = zs = newzs;
if ((newzsl = activeDefragAlloc(zs->zsl)))
defragged++, zs->zsl = newzsl;
if ((newheader = activeDefragAlloc(zs->zsl->header)))
defragged++, zs->zsl->header = newheader;
d = zs->dict;
di = dictGetIterator(d);
while((de = dictNext(di)) != NULL) {
double* newscore;
sds sdsele = dictGetKey(de);
if ((newsds = activeDefragSds(sdsele)))
defragged++, de->key = newsds;
newscore = zslDefrag(zs->zsl, *(double*)dictGetVal(de), sdsele, newsds);
if (newscore) {
dictSetVal(d, de, newscore);
defragged++;
}
defragged += dictIterDefragEntry(di);
}
dictReleaseIterator(di);
dictDefragTables(&zs->dict);
defragged += defragZsetSkiplist(db, de);
} else {
serverPanic("Unknown sorted set encoding");
}
@@ -390,19 +656,7 @@ int defragKey(redisDb *db, dictEntry *de) {
if ((newzl = activeDefragAlloc(ob->ptr)))
defragged++, ob->ptr = newzl;
} else if (ob->encoding == OBJ_ENCODING_HT) {
d = ob->ptr;
di = dictGetIterator(d);
while((de = dictNext(di)) != NULL) {
sds sdsele = dictGetKey(de);
if ((newsds = activeDefragSds(sdsele)))
defragged++, de->key = newsds;
sdsele = dictGetVal(de);
if ((newsds = activeDefragSds(sdsele)))
defragged++, de->v.val = newsds;
defragged += dictIterDefragEntry(di);
}
dictReleaseIterator(di);
dictDefragTables((dict**)&ob->ptr);
defragged += defragHash(db, de);
} else {
serverPanic("Unknown hash encoding");
}
@@ -417,18 +671,19 @@ int defragKey(redisDb *db, dictEntry *de) {
/* Defrag scan callback for the main db dictionary. */
void defragScanCallback(void *privdata, const dictEntry *de) {
int defragged = defragKey((redisDb*)privdata, (dictEntry*)de);
long defragged = defragKey((redisDb*)privdata, (dictEntry*)de);
server.stat_active_defrag_hits += defragged;
if(defragged)
server.stat_active_defrag_key_hits++;
else
server.stat_active_defrag_key_misses++;
server.stat_active_defrag_scanned++;
}
/* Defrag scan callback for each hash table bucket,
* used in order to defrag the dictEntry allocations. */
void defragDictBucketCallback(void *privdata, dictEntry **bucketref) {
UNUSED(privdata);
UNUSED(privdata); /* NOTE: this function is used by both activeDefragCycle and the scanLater* functions, so it must not rely on privdata */
while(*bucketref) {
dictEntry *de = *bucketref, *newde;
if ((newde = activeDefragAlloc(de))) {
@@ -439,7 +694,7 @@ void defragDictBucketCallback(void *privdata, dictEntry **bucketref) {
}
/* Utility function to get the fragmentation ratio from jemalloc.
* It is critical to do that by comparing only heap maps that belown to
* It is critical to do that by comparing only heap maps that belong to
* jemalloc, and skip ones that jemalloc keeps as spare. Since we use this
* fragmentation ratio in order to decide if a defrag action should be taken
* or not, a false detection can cause the defragmenter to waste a lot of CPU
@@ -464,14 +719,147 @@ float getAllocatorFragmentation(size_t *out_frag_bytes) {
if(out_frag_bytes)
*out_frag_bytes = frag_bytes;
serverLog(LL_DEBUG,
"allocated=%zu, active=%zu, resident=%zu, frag=%.0f%% (%.0f%% rss), frag_bytes=%zu (%zu%% rss)",
"allocated=%zu, active=%zu, resident=%zu, frag=%.0f%% (%.0f%% rss), frag_bytes=%zu (%zu rss)",
allocated, active, resident, frag_pct, rss_pct, frag_bytes, rss_bytes);
return frag_pct;
}
/* We may need to defrag other globals: one small allocation can hold a full allocator run,
* so although small, it is still important to defrag these */
long defragOtherGlobals() {
long defragged = 0;
/* there are many more pointers to defrag (e.g. client argv, output / aof buffers, etc.),
* but we assume most of these are short lived; we only need to defrag allocations
* that remain static for a long time */
defragged += activeDefragSdsDict(server.lua_scripts, DEFRAG_SDS_DICT_VAL_IS_STROB);
defragged += activeDefragSdsListAndDict(server.repl_scriptcache_fifo, server.repl_scriptcache_dict, DEFRAG_SDS_DICT_NO_VAL);
return defragged;
}
unsigned long defragLaterItem(dictEntry *de, unsigned long cursor) {
long defragged = 0;
if (de) {
robj *ob = dictGetVal(de);
if (ob->type == OBJ_LIST) {
defragged += scanLaterList(ob);
cursor = 0; /* list has no scan, we must finish it in one go */
} else if (ob->type == OBJ_SET) {
defragged += scanLaterSet(ob, &cursor);
} else if (ob->type == OBJ_ZSET) {
defragged += scanLaterZset(ob, &cursor);
} else if (ob->type == OBJ_HASH) {
defragged += scanLaterHash(ob, &cursor);
} else {
cursor = 0; /* object type may have changed since we scheduled it for later */
}
} else {
cursor = 0; /* object may have been deleted already */
}
server.stat_active_defrag_hits += defragged;
return cursor;
}
/* returns 0 if no more work needs to be done, and 1 if time is up and more work is needed. */
int defragLaterStep(redisDb *db, long long endtime) {
static sds current_key = NULL;
static unsigned long cursor = 0;
unsigned int iterations = 0;
unsigned long long prev_defragged = server.stat_active_defrag_hits;
unsigned long long prev_scanned = server.stat_active_defrag_scanned;
long long key_defragged;
do {
/* if we're not continuing a scan from the last call or loop, start a new one */
if (!cursor) {
listNode *head = listFirst(db->defrag_later);
/* Move on to next key */
if (current_key) {
serverAssert(current_key == head->value);
sdsfree(head->value);
listDelNode(db->defrag_later, head);
cursor = 0;
current_key = NULL;
}
/* stop if we reached the last one. */
head = listFirst(db->defrag_later);
if (!head)
return 0;
/* start a new key */
current_key = head->value;
cursor = 0;
}
/* each time we enter this function we need to fetch the key from the dict again (if it still exists) */
dictEntry *de = dictFind(db->dict, current_key);
key_defragged = server.stat_active_defrag_hits;
do {
cursor = defragLaterItem(de, cursor);
/* Once in 16 scan iterations, 512 pointer reallocations, or 64 fields
* (if we have a lot of pointers in one hash bucket, or rehashing),
* check if we reached the time limit.
* But regardless, don't start a new BIG key in this loop; this is because the
* next key can be a list, and scanLaterList must be done in one cycle */
if (!cursor || (++iterations > 16 ||
server.stat_active_defrag_hits - prev_defragged > 512 ||
server.stat_active_defrag_scanned - prev_scanned > 64)) {
if (!cursor || ustime() > endtime) {
if(key_defragged != server.stat_active_defrag_hits)
server.stat_active_defrag_key_hits++;
else
server.stat_active_defrag_key_misses++;
return 1;
}
iterations = 0;
prev_defragged = server.stat_active_defrag_hits;
prev_scanned = server.stat_active_defrag_scanned;
}
} while(cursor);
if(key_defragged != server.stat_active_defrag_hits)
server.stat_active_defrag_key_hits++;
else
server.stat_active_defrag_key_misses++;
} while(1);
}
#define INTERPOLATE(x, x1, x2, y1, y2) ( (y1) + ((x)-(x1)) * ((y2)-(y1)) / ((x2)-(x1)) )
#define LIMIT(y, min, max) ((y)<(min)? min: ((y)>(max)? max: (y)))
/* decide if defrag is needed, and with how much CPU effort to invest in it */
void computeDefragCycles() {
size_t frag_bytes;
float frag_pct = getAllocatorFragmentation(&frag_bytes);
/* If we're not already running, and below the threshold, exit. */
if (!server.active_defrag_running) {
if(frag_pct < server.active_defrag_threshold_lower || frag_bytes < server.active_defrag_ignore_bytes)
return;
}
/* Calculate the adaptive aggressiveness of the defrag */
int cpu_pct = INTERPOLATE(frag_pct,
server.active_defrag_threshold_lower,
server.active_defrag_threshold_upper,
server.active_defrag_cycle_min,
server.active_defrag_cycle_max);
cpu_pct = LIMIT(cpu_pct,
server.active_defrag_cycle_min,
server.active_defrag_cycle_max);
/* We allow increasing the aggressiveness during a scan, but don't
* reduce it. */
if (!server.active_defrag_running ||
cpu_pct > server.active_defrag_running)
{
server.active_defrag_running = cpu_pct;
serverLog(LL_VERBOSE,
"Starting active defrag, frag=%.0f%%, frag_bytes=%zu, cpu=%d%%",
frag_pct, frag_bytes, cpu_pct);
}
}
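/* For example, with the defaults from this commit (threshold_lower=10,
 * threshold_upper=100, cycle_min=5, cycle_max=75), a measured frag_pct of 30%
 * interpolates to 5 + (30-10)*(75-5)/(100-10), roughly 20% CPU, and LIMIT
 * then clamps the result into the [cycle_min, cycle_max] range. */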
/* Perform incremental defragmentation work from the serverCron.
* This works in a similar way to activeExpireCycle, in the sense that
* we do incremental work across calls. */
@@ -481,8 +869,11 @@ void activeDefragCycle(void) {
static redisDb *db = NULL;
static long long start_scan, start_stat;
unsigned int iterations = 0;
unsigned long long defragged = server.stat_active_defrag_hits;
long long start, timelimit;
unsigned long long prev_defragged = server.stat_active_defrag_hits;
unsigned long long prev_scanned = server.stat_active_defrag_scanned;
long long start, timelimit, endtime;
mstime_t latency;
int quit = 0;
if (server.aof_child_pid!=-1 || server.rdb_child_pid!=-1)
return; /* Defragging memory while there's a fork will just do damage. */
@@ -490,33 +881,7 @@ void activeDefragCycle(void) {
/* Once a second, check if the fragmentation justifies starting a scan
* or making it more aggressive. */
run_with_period(1000) {
size_t frag_bytes;
float frag_pct = getAllocatorFragmentation(&frag_bytes);
/* If we're not already running, and below the threshold, exit. */
if (!server.active_defrag_running) {
if(frag_pct < server.active_defrag_threshold_lower || frag_bytes < server.active_defrag_ignore_bytes)
return;
}
/* Calculate the adaptive aggressiveness of the defrag */
int cpu_pct = INTERPOLATE(frag_pct,
server.active_defrag_threshold_lower,
server.active_defrag_threshold_upper,
server.active_defrag_cycle_min,
server.active_defrag_cycle_max);
cpu_pct = LIMIT(cpu_pct,
server.active_defrag_cycle_min,
server.active_defrag_cycle_max);
/* We allow increasing the aggressiveness during a scan, but don't
* reduce it. */
if (!server.active_defrag_running ||
cpu_pct > server.active_defrag_running)
{
server.active_defrag_running = cpu_pct;
serverLog(LL_VERBOSE,
"Starting active defrag, frag=%.0f%%, frag_bytes=%zu, cpu=%d%%",
frag_pct, frag_bytes, cpu_pct);
}
computeDefragCycles();
}
if (!server.active_defrag_running)
return;
@@ -525,11 +890,23 @@ void activeDefragCycle(void) {
start = ustime();
timelimit = 1000000*server.active_defrag_running/server.hz/100;
if (timelimit <= 0) timelimit = 1;
endtime = start + timelimit;
latencyStartMonitor(latency);
do {
/* if we're not continuing a scan from the last call or loop, start a new one */
if (!cursor) {
/* finish any leftovers from previous db before moving to the next one */
if (db && defragLaterStep(db, endtime)) {
quit = 1; /* time is up, we didn't finish all the work */
break; /* this will exit the function and we'll continue on the next cycle */
}
/* Move on to next database, and stop if we reached the last one. */
if (++current_db >= server.dbnum) {
/* defrag other items not part of the db / keys */
defragOtherGlobals();
long long now = ustime();
size_t frag_bytes;
float frag_pct = getAllocatorFragmentation(&frag_bytes);
@@ -542,7 +919,11 @@ void activeDefragCycle(void) {
cursor = 0;
db = NULL;
server.active_defrag_running = 0;
return;
computeDefragCycles(); /* if another scan is needed, start it right away */
if (server.active_defrag_running != 0 && ustime() < endtime)
continue;
break;
}
else if (current_db==0) {
/* Start a scan from the first database. */
@@ -555,19 +936,35 @@ void activeDefragCycle(void) {
}
do {
/* before scanning the next bucket, see if we have big keys left from the previous bucket to scan */
if (defragLaterStep(db, endtime)) {
quit = 1; /* time is up, we didn't finish all the work */
break; /* this will exit the function and we'll continue on the next cycle */
}
cursor = dictScan(db->dict, cursor, defragScanCallback, defragDictBucketCallback, db);
/* Once in 16 scan iterations, or 1000 pointer reallocations
* (if we have a lot of pointers in one hash bucket), check if we
* reached the time limit. */
if (cursor && (++iterations > 16 || server.stat_active_defrag_hits - defragged > 1000)) {
if ((ustime() - start) > timelimit) {
return;
/* Once in 16 scan iterations, 512 pointer reallocations, or 64 keys
* (if we have a lot of pointers in one hash bucket or rehashing),
* check if we reached the time limit.
* But regardless, don't start a new db in this loop; this is because after
* the last db we call defragOtherGlobals, which must be done in one cycle */
if (!cursor || (++iterations > 16 ||
server.stat_active_defrag_hits - prev_defragged > 512 ||
server.stat_active_defrag_scanned - prev_scanned > 64)) {
if (!cursor || ustime() > endtime) {
quit = 1;
break;
}
iterations = 0;
defragged = server.stat_active_defrag_hits;
prev_defragged = server.stat_active_defrag_hits;
prev_scanned = server.stat_active_defrag_scanned;
}
} while(cursor);
} while(1);
} while(cursor && !quit);
} while(!quit);
latencyEndMonitor(latency);
latencyAddSampleIfNeeded("active-defrag-cycle",latency);
}
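/* The sample recorded above can be inspected at runtime with LATENCY HISTORY
 * active-defrag-cycle (or LATENCY LATEST); this is how the commit message's
 * "add latency monitor sample for defrag" item becomes observable. */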
#else /* HAVE_DEFRAG */
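Taken together, defrag.c now uses a cursor-resume pattern for big keys: a key whose element count exceeds active-defrag-max-scan-fields is queued on db->defrag_later, and defragLaterStep later walks it with dictScan, keeping the cursor between calls so work can stop whenever the time budget expires. A minimal self-contained sketch of that pattern, with illustrative names rather than the patch's APIs:

    #include <stdint.h>

    /* scan_step stands in for one dictScan call plus a scanLater* callback;
     * it returns the next cursor, or 0 once the item is fully scanned. */
    typedef uint64_t (*scan_step_fn)(void *item, uint64_t cursor);

    /* Returns the cursor to resume from on the next call (0 = item finished). */
    static uint64_t scan_with_budget(void *item, uint64_t cursor,
                                     scan_step_fn scan_step,
                                     int64_t (*now_us)(void), int64_t end_us) {
        int iterations = 0;
        do {
            cursor = scan_step(item, cursor); /* one bucket, or many if rehashing */
            /* Check the clock only every 16 steps; during rehashing one step can
             * cover many buckets of the larger table, which is why the patch also
             * counts defrag hits and scanned fields before checking the time. */
            if (++iterations >= 16) {
                if (now_us() > end_us)
                    return cursor; /* time is up; resume from here next call */
                iterations = 0;
            }
        } while (cursor);
        return 0;
    }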

src/server.c

@@ -1389,6 +1389,7 @@ void initServerConfig(void) {
server.active_defrag_threshold_upper = CONFIG_DEFAULT_DEFRAG_THRESHOLD_UPPER;
server.active_defrag_cycle_min = CONFIG_DEFAULT_DEFRAG_CYCLE_MIN;
server.active_defrag_cycle_max = CONFIG_DEFAULT_DEFRAG_CYCLE_MAX;
server.active_defrag_max_scan_fields = CONFIG_DEFAULT_DEFRAG_MAX_SCAN_FIELDS;
server.proto_max_bulk_len = CONFIG_DEFAULT_PROTO_MAX_BULK_LEN;
server.client_max_querybuf_len = PROTO_MAX_QUERYBUF_LEN;
server.saveparams = NULL;
@@ -1806,6 +1807,7 @@ void resetServerStats(void) {
server.stat_active_defrag_misses = 0;
server.stat_active_defrag_key_hits = 0;
server.stat_active_defrag_key_misses = 0;
server.stat_active_defrag_scanned = 0;
server.stat_fork_time = 0;
server.stat_fork_rate = 0;
server.stat_rejected_conn = 0;
@@ -1894,6 +1896,7 @@ void initServer(void) {
server.db[j].watched_keys = dictCreate(&keylistDictType,NULL);
server.db[j].id = j;
server.db[j].avg_ttl = 0;
server.db[j].defrag_later = listCreate();
}
evictionPoolAlloc(); /* Initialize the LRU keys pool. */
server.pubsub_channels = dictCreate(&keylistDictType,NULL);

src/server.h

@@ -159,8 +159,9 @@ typedef long long mstime_t; /* millisecond time type. */
#define CONFIG_DEFAULT_DEFRAG_THRESHOLD_LOWER 10 /* don't defrag when fragmentation is below 10% */
#define CONFIG_DEFAULT_DEFRAG_THRESHOLD_UPPER 100 /* maximum defrag force at 100% fragmentation */
#define CONFIG_DEFAULT_DEFRAG_IGNORE_BYTES (100<<20) /* don't defrag if frag overhead is below 100mb */
#define CONFIG_DEFAULT_DEFRAG_CYCLE_MIN 25 /* 25% CPU min (at lower threshold) */
#define CONFIG_DEFAULT_DEFRAG_CYCLE_MIN 5 /* 5% CPU min (at lower threshold) */
#define CONFIG_DEFAULT_DEFRAG_CYCLE_MAX 75 /* 75% CPU max (at upper threshold) */
#define CONFIG_DEFAULT_DEFRAG_MAX_SCAN_FIELDS 1000 /* keys with more than 1000 fields will be processed separately */
#define CONFIG_DEFAULT_PROTO_MAX_BULK_LEN (512ll*1024*1024) /* Bulk request max size */
#define ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP 20 /* Lookups per loop. */
@@ -622,6 +623,7 @@ typedef struct redisDb {
dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */
int id; /* Database ID */
long long avg_ttl; /* Average TTL, just for stats */
list *defrag_later; /* List of key names to attempt to defrag one by one, gradually. */
} redisDb;
/* Client MULTI/EXEC state */
@@ -957,6 +959,7 @@ struct redisServer {
long long stat_active_defrag_misses; /* number of allocations scanned but not moved */
long long stat_active_defrag_key_hits; /* number of keys with moved allocations */
long long stat_active_defrag_key_misses;/* number of keys scanned and not moved */
long long stat_active_defrag_scanned; /* number of dictEntries scanned */
size_t stat_peak_memory; /* Max used memory record */
long long stat_fork_time; /* Time needed to perform latest fork() */
double stat_fork_rate; /* Fork rate in GB/sec. */
@@ -992,6 +995,7 @@ struct redisServer {
int active_defrag_threshold_upper; /* maximum percentage of fragmentation at which we use maximum effort */
int active_defrag_cycle_min; /* minimal effort for defrag in CPU percentage */
int active_defrag_cycle_max; /* maximal effort for defrag in CPU percentage */
unsigned long active_defrag_max_scan_fields; /* maximum number of fields of set/hash/zset/list to process from within the main dict scan */
size_t client_max_querybuf_len; /* Limit for client query buffer length */
int dbnum; /* Total number of configured DBs */
int supervised; /* 1 if supervised, 0 otherwise. */