diskstore removed
parent c1c9d551da
commit c9d0c3623a
redis.conf: 26 lines changed
@@ -312,32 +312,6 @@ no-appendfsync-on-rewrite no
auto-aof-rewrite-percentage 100
auto-aof-rewrite-min-size 64mb

#################################### DISK STORE ###############################

# When disk store is active Redis works as an on-disk database, where memory
# is only used as a object cache.
#
# This mode is good for datasets that are bigger than memory, and in general
# when you want to trade speed for:
#
# - less memory used
# - immediate server restart
# - per key durability, without need for backgrond savig
#
# On the other hand, with disk store enabled MULTI/EXEC are no longer
# transactional from the point of view of the persistence on disk, that is,
# Redis transactions will still guarantee that commands are either processed
# all or nothing, but there is no guarantee that all the keys are flushed
# on disk in an atomic way.
#
# Of course with disk store enabled Redis is not as fast as it is when
# working with just the memory back end.

diskstore-enabled no
diskstore-path redis.ds
cache-max-memory 0
cache-flush-delay 0

############################### ADVANCED CONFIG ###############################

# Hashes are encoded in a special way (much more memory efficient) when they
src/Makefile
@@ -61,7 +61,7 @@ QUIET_CC = @printf ' %b %b\n' $(CCCOLOR)CC$(ENDCOLOR) $(SRCCOLOR)$@$(ENDCOLOR
QUIET_LINK = @printf ' %b %b\n' $(LINKCOLOR)LINK$(ENDCOLOR) $(BINCOLOR)$@$(ENDCOLOR);
endif

OBJ = adlist.o ae.o anet.o dict.o redis.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o dscache.o pubsub.o multi.o debug.o sort.o intset.o syncio.o diskstore.o cluster.o crc16.o endian.o
OBJ = adlist.o ae.o anet.o dict.o redis.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endian.o
BENCHOBJ = ae.o anet.o redis-benchmark.o sds.o adlist.o zmalloc.o
CLIOBJ = anet.o sds.o adlist.o redis-cli.o zmalloc.o release.o
CHECKDUMPOBJ = redis-check-dump.o lzf_c.o lzf_d.o
src/aof.c
@@ -574,10 +574,6 @@ int rewriteAppendOnlyFileBackground(void) {
    long long start;

    if (server.bgrewritechildpid != -1) return REDIS_ERR;
    if (server.ds_enabled != 0) {
        redisLog(REDIS_WARNING,"BGREWRITEAOF called with diskstore enabled: AOF is not supported when diskstore is enabled. Operation not performed.");
        return REDIS_ERR;
    }
    start = ustime();
    if ((childpid = fork()) == 0) {
        char tmpfile[256];
src/config.c: 12 lines changed
@@ -251,18 +251,6 @@ void loadServerConfig(char *filename) {
        } else if (!strcasecmp(argv[0],"dbfilename") && argc == 2) {
            zfree(server.dbfilename);
            server.dbfilename = zstrdup(argv[1]);
        } else if (!strcasecmp(argv[0],"diskstore-enabled") && argc == 2) {
            if ((server.ds_enabled = yesnotoi(argv[1])) == -1) {
                err = "argument must be 'yes' or 'no'"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"diskstore-path") && argc == 2) {
            sdsfree(server.ds_path);
            server.ds_path = sdsnew(argv[1]);
        } else if (!strcasecmp(argv[0],"cache-max-memory") && argc == 2) {
            server.cache_max_memory = memtoll(argv[1],NULL);
        } else if (!strcasecmp(argv[0],"cache-flush-delay") && argc == 2) {
            server.cache_flush_delay = atoi(argv[1]);
            if (server.cache_flush_delay < 0) server.cache_flush_delay = 0;
        } else if (!strcasecmp(argv[0],"hash-max-zipmap-entries") && argc == 2) {
            server.hash_max_zipmap_entries = memtoll(argv[1], NULL);
        } else if (!strcasecmp(argv[0],"hash-max-zipmap-value") && argc == 2) {
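The deleted branches above rely on two helpers, yesnotoi() and memtoll(), to turn "yes"/"no" flags and human-readable sizes into numbers. A minimal standalone sketch of equivalent parsing follows; the helper names and suffix handling here are illustrative stand-ins, not the actual Redis implementations.

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>

/* Hypothetical stand-ins for Redis' yesnotoi()/memtoll() helpers. */
static int yes_no_to_int(const char *s) {
    if (!strcasecmp(s, "yes")) return 1;
    if (!strcasecmp(s, "no")) return 0;
    return -1; /* invalid: the caller reports a configuration error */
}

static long long size_to_ll(const char *s) {
    char *end;
    long long v = strtoll(s, &end, 10);
    if (!strcasecmp(end, "kb")) v *= 1024LL;
    else if (!strcasecmp(end, "mb")) v *= 1024LL * 1024;
    else if (!strcasecmp(end, "gb")) v *= 1024LL * 1024 * 1024;
    return v;
}

int main(void) {
    printf("diskstore-enabled yes -> %d\n", yes_no_to_int("yes"));
    printf("cache-max-memory 64mb -> %lld bytes\n", size_to_ll("64mb"));
    return 0;
}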
src/db.c: 82 lines changed
@@ -31,13 +31,6 @@ void SlotToKeyDel(robj *key);
 * the disk object. If it is in this state, we wait.
 */

void lookupWaitBusyKey(redisDb *db, robj *key) {
    /* FIXME: wait just for this key, not everything */
    waitEmptyIOJobsQueue();
    processAllPendingIOJobs();
    redisAssert((cacheScheduleIOGetFlags(db,key) & REDIS_IO_SAVEINPROG) == 0);
}

robj *lookupKey(redisDb *db, robj *key) {
    dictEntry *de = dictFind(db->dict,key->ptr);
    if (de) {
@@ -48,52 +41,9 @@ robj *lookupKey(redisDb *db, robj *key) {
         * a copy on write madness. */
        if (server.bgsavechildpid == -1 && server.bgrewritechildpid == -1)
            val->lru = server.lruclock;

        if (server.ds_enabled &&
            cacheScheduleIOGetFlags(db,key) & REDIS_IO_SAVEINPROG)
        {
            /* Need to wait for the key to get unbusy */
            redisLog(REDIS_DEBUG,"Lookup found a key in SAVEINPROG state. Waiting. (Key was in the cache)");
            lookupWaitBusyKey(db,key);
        }
        server.stat_keyspace_hits++;
        return val;
    } else {
        time_t expire;
        robj *val;

        /* Key not found in the in memory hash table, but if disk store is
         * enabled we may have this key on disk. If so load it in memory
         * in a blocking way. */
        if (server.ds_enabled && cacheKeyMayExist(db,key)) {
            long flags = cacheScheduleIOGetFlags(db,key);

            /* They key is not in cache, but it has a SAVE op in queue?
             * The only possibility is that the key was deleted, since
             * dirty keys are not evicted. */
            if (flags & REDIS_IO_SAVE) {
                server.stat_keyspace_misses++;
                return NULL;
            }

            /* At this point we need to blocking load the key in memory.
             * The first thing we do is waiting here if the key is busy. */
            if (flags & REDIS_IO_SAVEINPROG) {
                redisLog(REDIS_DEBUG,"Lookup found a key in SAVEINPROG state. Waiting (while force loading).");
                lookupWaitBusyKey(db,key);
            }

            redisLog(REDIS_DEBUG,"Force loading key %s via lookup", key->ptr);
            val = dsGet(db,key,&expire);
            if (val) {
                dbAdd(db,key,val);
                if (expire != -1) setExpire(db,key,expire);
                server.stat_keyspace_hits++;
                return val;
            } else {
                cacheSetKeyDoesNotExist(db,key);
            }
        }
        server.stat_keyspace_misses++;
        return NULL;
    }
@@ -130,7 +80,6 @@ void dbAdd(redisDb *db, robj *key, robj *val) {
    int retval = dictAdd(db->dict, copy, val);

    redisAssert(retval == REDIS_OK);
    if (server.ds_enabled) cacheSetKeyMayExist(db,key);
    if (server.cluster_enabled) SlotToKeyAdd(key);
}

@@ -144,7 +93,6 @@ void dbOverwrite(redisDb *db, robj *key, robj *val) {

    redisAssert(de != NULL);
    dictReplace(db->dict, key->ptr, val);
    if (server.ds_enabled) cacheSetKeyMayExist(db,key);
}

/* High level Set operation. This function can be used in order to set
@@ -196,14 +144,6 @@ robj *dbRandomKey(redisDb *db) {

/* Delete a key, value, and associated expiration entry if any, from the DB */
int dbDelete(redisDb *db, robj *key) {
    /* If diskstore is enabled make sure to awake waiting clients for this key
     * as it is not really useful to wait for a key already deleted to be
     * loaded from disk. */
    if (server.ds_enabled) {
        handleClientsBlockedOnSwappedKey(db,key);
        cacheSetKeyDoesNotExist(db,key);
    }

    /* Deleting an entry from the expires dict will not free the sds of
     * the key, because it is shared with the main dictionary. */
    if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);
@@ -225,7 +165,6 @@ long long emptyDb() {
        removed += dictSize(server.db[j].dict);
        dictEmpty(server.db[j].dict);
        dictEmpty(server.db[j].expires);
        if (server.ds_enabled) dictEmpty(server.db[j].io_negcache);
    }
    return removed;
}
@@ -248,8 +187,6 @@ int selectDb(redisClient *c, int id) {

void signalModifiedKey(redisDb *db, robj *key) {
    touchWatchedKey(db,key);
    if (server.ds_enabled)
        cacheScheduleIO(db,key,REDIS_IO_SAVE);
}

void signalFlushedDb(int dbid) {
@@ -265,7 +202,6 @@ void flushdbCommand(redisClient *c) {
    signalFlushedDb(c->db->id);
    dictEmpty(c->db->dict);
    dictEmpty(c->db->expires);
    if (server.ds_enabled) dsFlushDb(c->db->id);
    addReply(c,shared.ok);
}

@@ -277,10 +213,7 @@ void flushallCommand(redisClient *c) {
        kill(server.bgsavechildpid,SIGKILL);
        rdbRemoveTempFile(server.bgsavechildpid);
    }
    if (server.ds_enabled)
        dsFlushDb(-1);
    else
        rdbSave(server.dbfilename);
    rdbSave(server.dbfilename);
    server.dirty++;
}

@@ -288,22 +221,10 @@ void delCommand(redisClient *c) {
    int deleted = 0, j;

    for (j = 1; j < c->argc; j++) {
        if (server.ds_enabled) {
            lookupKeyRead(c->db,c->argv[j]);
            /* FIXME: this can be optimized a lot, no real need to load
             * a possibly huge value. */
        }
        if (dbDelete(c->db,c->argv[j])) {
            signalModifiedKey(c->db,c->argv[j]);
            server.dirty++;
            deleted++;
        } else if (server.ds_enabled) {
            if (cacheKeyMayExist(c->db,c->argv[j]) &&
                dsExists(c->db,c->argv[j]))
            {
                cacheScheduleIO(c->db,c->argv[j],REDIS_IO_SAVE);
                deleted = 1;
            }
        }
    }
    addReplyLongLong(c,deleted);
@@ -618,7 +539,6 @@ void expireatCommand(redisClient *c) {
void ttlCommand(redisClient *c) {
    time_t expire, ttl = -1;

    if (server.ds_enabled) lookupKeyRead(c->db,c->argv[1]);
    expire = getExpire(c->db,c->argv[1]);
    if (expire != -1) {
        ttl = (expire-time(NULL));
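The removed lookupKey() path above goes to memory first, then the negative cache, and only then performs a blocking load from disk via dsGet(). The toy model below illustrates that decision flow only; the waiting on SAVEINPROG jobs is left out, and every name in it is invented for illustration.

#include <stdio.h>
#include <string.h>

#define MISSING  0
#define IN_RAM   1
#define ON_DISK  2

struct toykey { const char *name; int state; };

static struct toykey keyspace[] = {
    { "foo", IN_RAM  },
    { "bar", ON_DISK },
    { "baz", MISSING },
};

static const char *toy_lookup(const char *name) {
    for (size_t j = 0; j < sizeof(keyspace)/sizeof(keyspace[0]); j++) {
        struct toykey *k = &keyspace[j];
        if (strcmp(k->name, name)) continue;
        if (k->state == IN_RAM) return "hit (memory)";
        if (k->state == ON_DISK) {
            k->state = IN_RAM;          /* the blocking dsGet() + dbAdd() step */
            return "hit (loaded from disk)";
        }
        return "miss (negative cache)"; /* what cacheSetKeyDoesNotExist() fed */
    }
    return "miss";
}

int main(void) {
    const char *names[] = { "foo", "bar", "bar", "baz" };
    for (int i = 0; i < 4; i++)
        printf("%s -> %s\n", names[i], toy_lookup(names[i]));
    return 0;
}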
src/debug.c: 20 lines changed
@@ -212,26 +212,7 @@ void computeDatasetDigest(unsigned char *final) {
void debugCommand(redisClient *c) {
    if (!strcasecmp(c->argv[1]->ptr,"segfault")) {
        *((char*)-1) = 'x';
    } else if (!strcasecmp(c->argv[1]->ptr,"flushcache")) {
        if (!server.ds_enabled) {
            addReplyError(c, "DEBUG FLUSHCACHE called with diskstore off.");
            return;
        } else if (server.bgsavethread != (pthread_t) -1) {
            addReplyError(c, "Can't flush cache while BGSAVE is in progress.");
            return;
        } else {
            /* To flush the whole cache we need to wait for everything to
             * be flushed on disk... */
            cacheForcePointInTime();
            emptyDb();
            addReply(c,shared.ok);
            return;
        }
    } else if (!strcasecmp(c->argv[1]->ptr,"reload")) {
        if (server.ds_enabled) {
            addReply(c,shared.ok);
            return;
        }
        if (rdbSave(server.dbfilename) != REDIS_OK) {
            addReply(c,shared.err);
            return;
@@ -256,7 +237,6 @@ void debugCommand(redisClient *c) {
        robj *val;
        char *strenc;

        if (server.ds_enabled) lookupKeyRead(c->db,c->argv[2]);
        if ((de = dictFind(c->db->dict,c->argv[2]->ptr)) == NULL) {
            addReply(c,shared.nokeyerr);
            return;
src/diskstore.c: 509 lines changed (file deleted)
@@ -1,509 +0,0 @@
/* diskstore.c implements a very simple disk backed key-value store used
 * by Redis for the "disk" backend. This implementation uses the filesystem
 * to store key/value pairs. Every file represents a given key.
 *
 * The key path is calculated using the SHA1 of the key name. For instance
 * the key "foo" is stored as a file name called:
 *
 *  /0b/ee/0beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33
 *
 * The couples of characters from the hex output of SHA1 are also used
 * to locate two two levels of directories to store the file (as most
 * filesystems are not able to handle too many files in a single dir).
 *
 * In the end there are 65536 final directories (256 directories inside
 * every 256 top level directories), so that with 1 billion of files every
 * directory will contain in the average 15258 entires, that is ok with
 * most filesystems implementation.
 *
 * Note that since Redis supports multiple databases, the actual key name
 * is:
 *
 *  /0b/ee/<dbid>_0beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33
 *
 * so for instance if the key is inside DB 0:
 *
 *  /0b/ee/0_0beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33
 *
 * The actaul implementation of this disk store is highly dependant to the
 * filesystem implementation itself. This implementation may be replaced by
 * a B+TREE implementation in future implementations.
 *
 * Data ok every key is serialized using the same format used for .rdb
 * serialization. Everything is serialized on every entry: key name,
 * ttl information in case of keys with an associated expire time, and the
 * serialized value itself.
 *
 * Because the format is the same of the .rdb files it is trivial to create
 * an .rdb file starting from this format just by mean of scanning the
 * directories and concatenating entries, with the sole addition of an
 * .rdb header at the start and the end-of-db opcode at the end.
 *
 * -------------------------------------------------------------------------
 *
 * Copyright (c) 2010-2011, Salvatore Sanfilippo <antirez at gmail dot com>
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 *   * Redistributions of source code must retain the above copyright notice,
 *     this list of conditions and the following disclaimer.
 *   * Redistributions in binary form must reproduce the above copyright
 *     notice, this list of conditions and the following disclaimer in the
 *     documentation and/or other materials provided with the distribution.
 *   * Neither the name of Redis nor the names of its contributors may be used
 *     to endorse or promote products derived from this software without
 *     specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#include "redis.h"
#include "sha1.h"

#include <fcntl.h>
#include <sys/stat.h>
#include <dirent.h>

int create256dir(char *prefix) {
    char buf[1024];
    int j;

    for (j = 0; j < 256; j++) {
        snprintf(buf,sizeof(buf),"%s%02x",prefix,j);
        if (mkdir(buf,0755) == -1) {
            redisLog(REDIS_WARNING,"Error creating dir %s for diskstore: %s",
                buf,strerror(errno));
            return REDIS_ERR;
        }
    }
    return REDIS_OK;
}

int dsOpen(void) {
    struct stat sb;
    int retval, j;
    char *path = server.ds_path;
    char buf[1024];

    if ((retval = stat(path,&sb) == -1) && errno != ENOENT) {
        redisLog(REDIS_WARNING, "Error opening disk store at %s: %s",
            path, strerror(errno));
        return REDIS_ERR;
    }

    /* Directory already in place. Assume everything is ok. */
    if (retval == 0 && S_ISDIR(sb.st_mode)) {
        redisLog(REDIS_NOTICE,"Disk store %s exists", path);
        return REDIS_OK;
    }

    /* File exists but it's not a directory */
    if (retval == 0 && !S_ISDIR(sb.st_mode)) {
        redisLog(REDIS_WARNING,"Disk store at %s is not a directory", path);
        return REDIS_ERR;
    }

    /* New disk store, create the directory structure now, as creating
     * them in a lazy way is not a good idea, after very few insertions
     * we'll need most of the 65536 directories anyway. */
    redisLog(REDIS_NOTICE,"Disk store %s does not exist: creating", path);
    if (mkdir(path,0755) == -1) {
        redisLog(REDIS_WARNING,"Disk store init failed creating dir %s: %s",
            path, strerror(errno));
        return REDIS_ERR;
    }
    /* Create the top level 256 directories */
    snprintf(buf,sizeof(buf),"%s/",path);
    if (create256dir(buf) == REDIS_ERR) return REDIS_ERR;

    /* For every 256 top level dir, create 256 nested dirs */
    for (j = 0; j < 256; j++) {
        snprintf(buf,sizeof(buf),"%s/%02x/",path,j);
        if (create256dir(buf) == REDIS_ERR) return REDIS_ERR;
    }
    return REDIS_OK;
}

int dsClose(void) {
    return REDIS_OK;
}

/* Convert key into full path for this object. Dirty but hopefully
 * is fast enough. Returns the length of the returned path. */
int dsKeyToPath(redisDb *db, char *buf, robj *key) {
    SHA1_CTX ctx;
    unsigned char hash[20];
    char hex[40], digits[] = "0123456789abcdef";
    int j, l;
    char *origbuf = buf;

    SHA1Init(&ctx);
    SHA1Update(&ctx,key->ptr,sdslen(key->ptr));
    SHA1Final(hash,&ctx);

    /* Convert the hash into hex format */
    for (j = 0; j < 20; j++) {
        hex[j*2] = digits[(hash[j]&0xF0)>>4];
        hex[(j*2)+1] = digits[hash[j]&0x0F];
    }

    /* Create the object path. Start with server.ds_path that's the root dir */
    l = sdslen(server.ds_path);
    memcpy(buf,server.ds_path,l);
    buf += l;
    *buf++ = '/';

    /* Then add xx/yy/ that is the two level directories */
    buf[0] = hex[0];
    buf[1] = hex[1];
    buf[2] = '/';
    buf[3] = hex[2];
    buf[4] = hex[3];
    buf[5] = '/';
    buf += 6;

    /* Add the database number followed by _ and finall the SHA1 hex */
    l = ll2string(buf,64,db->id);
    buf += l;
    buf[0] = '_';
    memcpy(buf+1,hex,40);
    buf[41] = '\0';
    return (buf-origbuf)+41;
}

int dsSet(redisDb *db, robj *key, robj *val, time_t expire) {
    char buf[1024], buf2[1024];
    FILE *fp;
    int retval, len;

    len = dsKeyToPath(db,buf,key);
    memcpy(buf2,buf,len);
    snprintf(buf2+len,sizeof(buf2)-len,"-%ld-%ld",(long)time(NULL),(long)val);
    while ((fp = fopen(buf2,"w")) == NULL) {
        if (errno == ENOSPC) {
            redisLog(REDIS_WARNING,"Diskstore: No space left on device. Please make room and wait 30 seconds for Redis to continue.");
            sleep(30);
        } else {
            redisLog(REDIS_WARNING,"diskstore error opening %s: %s",
                buf2, strerror(errno));
            redisPanic("Unrecoverable diskstore error. Exiting.");
        }
    }
    if ((retval = rdbSaveKeyValuePair(fp,key,val,expire,time(NULL))) == -1)
        return REDIS_ERR;
    fclose(fp);
    if (retval == 0) {
        /* Expired key. Unlink failing not critical */
        unlink(buf);
        unlink(buf2);
    } else {
        /* Use rename for atomic updadte of value */
        if (rename(buf2,buf) == -1) {
            redisLog(REDIS_WARNING,"rename(2) returned an error: %s",
                strerror(errno));
            redisPanic("Unrecoverable diskstore error. Exiting.");
        }
    }
    return REDIS_OK;
}

robj *dsGet(redisDb *db, robj *key, time_t *expire) {
    char buf[1024];
    int type;
    time_t expiretime = -1; /* -1 means: no expire */
    robj *dskey; /* Key as loaded from disk. */
    robj *val;
    FILE *fp;

    dsKeyToPath(db,buf,key);
    fp = fopen(buf,"r");
    if (fp == NULL && errno == ENOENT) return NULL; /* No such key */
    if (fp == NULL) {
        redisLog(REDIS_WARNING,"Disk store failed opening %s: %s",
            buf, strerror(errno));
        goto readerr;
    }

    if ((type = rdbLoadType(fp)) == -1) goto readerr;
    if (type == REDIS_EXPIRETIME) {
        if ((expiretime = rdbLoadTime(fp)) == -1) goto readerr;
        /* We read the time so we need to read the object type again */
        if ((type = rdbLoadType(fp)) == -1) goto readerr;
    }
    /* Read key */
    if ((dskey = rdbLoadStringObject(fp)) == NULL) goto readerr;
    /* Read value */
    if ((val = rdbLoadObject(type,fp)) == NULL) goto readerr;
    fclose(fp);

    /* The key we asked, and the key returned, must be the same */
    redisAssert(equalStringObjects(key,dskey));

    /* Check if the key already expired */
    decrRefCount(dskey);
    if (expiretime != -1 && expiretime < time(NULL)) {
        decrRefCount(val);
        unlink(buf); /* This failing is non critical here */
        return NULL;
    }

    /* Everything ok... */
    *expire = expiretime;
    return val;

readerr:
    redisLog(REDIS_WARNING,"Read error reading reading %s. Corrupted key?",
        buf);
    redisPanic("Unrecoverable error reading from disk store");
    return NULL; /* unreached */
}

int dsDel(redisDb *db, robj *key) {
    char buf[1024];

    dsKeyToPath(db,buf,key);
    if (unlink(buf) == -1) {
        if (errno == ENOENT) {
            return REDIS_ERR;
        } else {
            redisLog(REDIS_WARNING,"Disk store can't remove %s: %s",
                buf, strerror(errno));
            redisPanic("Unrecoverable Disk store errore. Existing.");
            return REDIS_ERR; /* unreached */
        }
    } else {
        return REDIS_OK;
    }
}

int dsExists(redisDb *db, robj *key) {
    char buf[1024];

    dsKeyToPath(db,buf,key);
    return access(buf,R_OK) == 0;
}

int dsGetDbidFromFilename(char *path) {
    char id[64];
    char *p = strchr(path,'_');
    int len = (p - path);

    redisAssert(p != NULL && len < 64);
    memcpy(id,path,len);
    id[len] = '\0';
    return atoi(id);
}

void dsFlushOneDir(char *path, int dbid) {
    DIR *dir;
    struct dirent *dp, de;

    dir = opendir(path);
    if (dir == NULL) {
        redisLog(REDIS_WARNING,"Disk store can't open dir %s: %s",
            path, strerror(errno));
        redisPanic("Unrecoverable Disk store errore. Existing.");
    }
    while(1) {
        char buf[1024];

        readdir_r(dir,&de,&dp);
        if (dp == NULL) break;
        if (dp->d_name[0] == '.') continue;

        /* Check if we need to remove this entry accordingly to the
         * DB number. */
        if (dbid != -1 && dsGetDbidFromFilename(dp->d_name)) continue;

        /* Finally unlink the file */
        snprintf(buf,1024,"%s/%s",path,dp->d_name);
        if (unlink(buf) == -1) {
            redisLog(REDIS_WARNING,
                "Can't unlink %s: %s", buf, strerror(errno));
            redisPanic("Unrecoverable Disk store errore. Existing.");
        }
    }
    closedir(dir);
}

void dsFlushDb(int dbid) {
    char buf[1024];
    int j, i;

    redisLog(REDIS_NOTICE,"Flushing diskstore DB (%d)",dbid);
    for (j = 0; j < 256; j++) {
        for (i = 0; i < 256; i++) {
            snprintf(buf,1024,"%s/%02x/%02x",server.ds_path,j,i);
            dsFlushOneDir(buf,dbid);
        }
    }
}

void dsRdbSaveSetState(int state) {
    pthread_mutex_lock(&server.bgsavethread_mutex);
    server.bgsavethread_state = state;
    pthread_mutex_unlock(&server.bgsavethread_mutex);
}

void *dsRdbSave_thread(void *arg) {
    char tmpfile[256], *filename = (char*)arg;
    struct dirent *dp, de;
    int j, i, last_dbid = -1;
    FILE *fp;

    /* Change state to ACTIVE, to signal there is a saving thead working. */
    redisLog(REDIS_NOTICE,"Diskstore BGSAVE thread started");
    dsRdbSaveSetState(REDIS_BGSAVE_THREAD_ACTIVE);

    snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
    fp = fopen(tmpfile,"w");
    if (!fp) {
        redisLog(REDIS_WARNING, "Failed opening .rdb for saving: %s",
            strerror(errno));
        dsRdbSaveSetState(REDIS_BGSAVE_THREAD_DONE_ERR);
        return NULL;
    }
    if (fwrite("REDIS0001",9,1,fp) == 0) goto werr;

    sleep(5);

    /* Scan all diskstore dirs looking for keys */
    for (j = 0; j < 256; j++) {
        for (i = 0; i < 256; i++) {
            DIR *dir;
            char buf[1024];

            /* For every directory, collect all the keys */
            snprintf(buf,sizeof(buf),"%s/%02x/%02x",server.ds_path,j,i);
            if ((dir = opendir(buf)) == NULL) {
                redisLog(REDIS_WARNING,"Disk store can't open dir %s: %s",
                    buf, strerror(errno));
                goto werr;
            }

            while(1) {
                char buf[1024];
                int dbid;
                FILE *entryfp;

                readdir_r(dir,&de,&dp);
                if (dp == NULL) break;
                if (dp->d_name[0] == '.') continue;
                /* If there is a '-' char in the file name, it's a temp file */
                if (strchr(dp->d_name,'-') != NULL) continue;

                /* Emit the SELECT DB opcode if needed. */
                dbid = dsGetDbidFromFilename(dp->d_name);
                if (dbid != last_dbid) {
                    last_dbid = dbid;
                    if (rdbSaveType(fp,REDIS_SELECTDB) == -1) goto werr;
                    if (rdbSaveLen(fp,dbid) == -1) goto werr;
                }

                /* Let's copy this file into the target .rdb */
                snprintf(buf,sizeof(buf),"%s/%02x/%02x/%s",
                    server.ds_path,j,i,dp->d_name);
                if ((entryfp = fopen(buf,"r")) == NULL) {
                    redisLog(REDIS_WARNING,"Can't open %s: %s",
                        buf,strerror(errno));
                    closedir(dir);
                    goto werr;
                }
                while(1) {
                    int nread = fread(buf,1,sizeof(buf),entryfp);

                    if (nread == 0) {
                        if (ferror(entryfp)) {
                            redisLog(REDIS_WARNING,"Error reading from file entry while performing BGSAVE for diskstore: %s", strerror(errno));
                            closedir(dir);
                            goto werr;
                        } else {
                            break;
                        }
                    }
                    if (fwrite(buf,1,nread,fp) != (unsigned)nread) {
                        closedir(dir);
                        goto werr;
                    }
                }
                fclose(entryfp);
            }
            closedir(dir);
        }
    }

    /* Output the end of file opcode */
    if (rdbSaveType(fp,REDIS_EOF) == -1) goto werr;

    /* Make sure data will not remain on the OS's output buffers */
    fflush(fp);
    fsync(fileno(fp));
    fclose(fp);
    zfree(filename);

    /* Use RENAME to make sure the DB file is changed atomically only
     * if the generate DB file is ok. */
    if (rename(tmpfile,filename) == -1) {
        redisLog(REDIS_WARNING,"Error moving temp DB file on the final destination: %s (diskstore)", strerror(errno));
        unlink(tmpfile);
        dsRdbSaveSetState(REDIS_BGSAVE_THREAD_DONE_ERR);
        return NULL;
    }
    redisLog(REDIS_NOTICE,"DB saved on disk by diskstore thread");
    dsRdbSaveSetState(REDIS_BGSAVE_THREAD_DONE_OK);
    return NULL;

werr:
    zfree(filename);
    fclose(fp);
    unlink(tmpfile);
    dsRdbSaveSetState(REDIS_BGSAVE_THREAD_DONE_ERR);
    redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
    return NULL;
}

int dsRdbSaveBackground(char *filename) {
    pthread_t thread;

    if (pthread_create(&thread,NULL,dsRdbSave_thread,zstrdup(filename)) != 0) {
        redisLog(REDIS_WARNING,"Can't create diskstore BGSAVE thread: %s",
            strerror(errno));
        return REDIS_ERR;
    } else {
        server.bgsavethread = thread;
        return REDIS_OK;
    }
}

int dsRdbSave(char *filename) {
    /* A blocking save is actually a non blocking save... just we wait
     * for it to terminate in a non-busy loop. */

    redisLog(REDIS_NOTICE,"Starting a blocking SAVE (BGSAVE + blocking wait)");
    server.dirty_before_bgsave = server.dirty;
    if (dsRdbSaveBackground(filename) == REDIS_ERR) return REDIS_ERR;
    while(1) {
        usleep(1000);
        int state;

        pthread_mutex_lock(&server.bgsavethread_mutex);
        state = server.bgsavethread_state;
        pthread_mutex_unlock(&server.bgsavethread_mutex);

        if (state == REDIS_BGSAVE_THREAD_DONE_OK ||
            state == REDIS_BGSAVE_THREAD_DONE_ERR) break;
    }
    return REDIS_OK;
}
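The deleted header comment describes how a key maps to a path of the form <root>/xx/yy/<dbid>_<sha1hex>. The standalone sketch below reproduces that mapping for illustration; it assumes OpenSSL's SHA1() is available (link with -lcrypto), whereas the deleted code used Redis' bundled sha1.c and dsKeyToPath().

#include <stdio.h>
#include <string.h>
#include <openssl/sha.h>   /* assumption: OpenSSL at hand purely for the hash */

/* Build a diskstore-style path <root>/<xx>/<yy>/<dbid>_<sha1hex> for a key. */
static void key_to_path(const char *root, int dbid, const char *key,
                        char *out, size_t outlen) {
    unsigned char hash[SHA_DIGEST_LENGTH];
    char hex[2*SHA_DIGEST_LENGTH+1];

    SHA1((const unsigned char*)key, strlen(key), hash);
    for (int j = 0; j < SHA_DIGEST_LENGTH; j++)
        sprintf(hex+j*2, "%02x", hash[j]);
    snprintf(out, outlen, "%s/%c%c/%c%c/%d_%s",
             root, hex[0], hex[1], hex[2], hex[3], dbid, hex);
}

int main(void) {
    char path[1024];
    key_to_path("/tmp/redis.ds", 0, "foo", path, sizeof(path));
    /* "foo" hashes to 0beec7b5..., so this prints
     * /tmp/redis.ds/0b/ee/0_0beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33
     * matching the example in the deleted header comment. */
    printf("%s\n", path);
    return 0;
}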
src/dscache.c: 1028 lines changed
File diff suppressed because it is too large
src/networking.c
@@ -487,25 +487,6 @@ void freeClient(redisClient *c) {
        redisAssert(ln != NULL);
        listDelNode(server.unblocked_clients,ln);
    }
    /* Remove from the list of clients waiting for swapped keys, or ready
     * to be restarted, but not yet woken up again. */
    if (c->flags & REDIS_IO_WAIT) {
        redisAssert(server.ds_enabled);
        if (listLength(c->io_keys) == 0) {
            ln = listSearchKey(server.io_ready_clients,c);

            /* When this client is waiting to be woken up (REDIS_IO_WAIT),
             * it should be present in the list io_ready_clients */
            redisAssert(ln != NULL);
            listDelNode(server.io_ready_clients,ln);
        } else {
            while (listLength(c->io_keys)) {
                ln = listFirst(c->io_keys);
                dontWaitForSwappedKey(c,ln->value);
            }
        }
        server.cache_blocked_clients--;
    }
    listRelease(c->io_keys);
    /* Master/slave cleanup.
     * Case 1: we lost the connection with a slave. */
@@ -796,9 +777,6 @@ int processMultibulkBuffer(redisClient *c) {
void processInputBuffer(redisClient *c) {
    /* Keep processing while there is something in the input buffer */
    while(sdslen(c->querybuf)) {
        /* Immediately abort if the client is in the middle of something. */
        if (c->flags & REDIS_BLOCKED || c->flags & REDIS_IO_WAIT) return;

        /* REDIS_CLOSE_AFTER_REPLY closes the connection once the reply is
         * written to the client. Make sure to not let the reply grow after
         * this flag has been set (i.e. don't process more commands). */
@@ -907,7 +885,6 @@ void clientCommand(redisClient *c) {
            if (p == flags) *p++ = 'N';
            if (client->flags & REDIS_MULTI) *p++ = 'x';
            if (client->flags & REDIS_BLOCKED) *p++ = 'b';
            if (client->flags & REDIS_IO_WAIT) *p++ = 'i';
            if (client->flags & REDIS_DIRTY_CAS) *p++ = 'd';
            if (client->flags & REDIS_CLOSE_AFTER_REPLY) *p++ = 'c';
            if (client->flags & REDIS_UNBLOCKED) *p++ = 'u';
src/object.c: 10 lines changed
@@ -1,5 +1,4 @@
#include "redis.h"
#include <pthread.h>
#include <math.h>

robj *createObject(int type, void *ptr) {
@@ -30,9 +29,7 @@ robj *createStringObject(char *ptr, size_t len) {

robj *createStringObjectFromLongLong(long long value) {
    robj *o;
    if (value >= 0 && value < REDIS_SHARED_INTEGERS &&
        !server.ds_enabled &&
        pthread_equal(pthread_self(),server.mainthread)) {
    if (value >= 0 && value < REDIS_SHARED_INTEGERS) {
        incrRefCount(shared.integers[value]);
        o = shared.integers[value];
    } else {
@@ -241,10 +238,7 @@ robj *tryObjectEncoding(robj *o) {
     * Note that we also avoid using shared integers when maxmemory is used
     * because every object needs to have a private LRU field for the LRU
     * algorithm to work well. */
    if (!server.ds_enabled &&
        server.maxmemory == 0 && value >= 0 && value < REDIS_SHARED_INTEGERS &&
        pthread_equal(pthread_self(),server.mainthread))
    {
    if (server.maxmemory == 0 && value >= 0 && value < REDIS_SHARED_INTEGERS) {
        decrRefCount(o);
        incrRefCount(shared.integers[value]);
        return shared.integers[value];
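The object.c hunks above are about the shared integer objects: with diskstore and its I/O threads gone, the main-thread and ds_enabled checks can be dropped, and small integers again point at the preallocated shared pool rather than private allocations (except under maxmemory, where each object needs its own LRU field). A miniature, invented illustration of that refcounted pool:

#include <stdio.h>

#define SHARED_INTEGERS 10000   /* Redis uses REDIS_SHARED_INTEGERS (10000) */

/* Illustrative miniature of the shared-integer pool: one preallocated,
 * refcounted object per small integer instead of a fresh allocation. */
struct tinyobj { long long value; int refcount; };

static struct tinyobj shared[SHARED_INTEGERS];

static struct tinyobj *get_integer(long long v) {
    if (v >= 0 && v < SHARED_INTEGERS) {
        shared[v].value = v;
        shared[v].refcount++;   /* the incrRefCount(shared.integers[v]) step */
        return &shared[v];
    }
    /* Out of range: a real implementation would allocate a private object
     * here, which is also what the maxmemory case needs so the object can
     * carry its own LRU clock. */
    return NULL;
}

int main(void) {
    struct tinyobj *a = get_integer(42), *b = get_integer(42);
    printf("same object: %s, refcount=%d\n", a == b ? "yes" : "no", a->refcount);
    return 0;
}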
src/rdb.c: 19 lines changed
@@ -413,11 +413,6 @@ int rdbSave(char *filename) {
    int j;
    time_t now = time(NULL);

    if (server.ds_enabled) {
        cacheForcePointInTime();
        return dsRdbSave(filename);
    }

    snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
    fp = fopen(tmpfile,"w");
    if (!fp) {
@@ -484,16 +479,10 @@ int rdbSaveBackground(char *filename) {
    pid_t childpid;
    long long start;

    if (server.bgsavechildpid != -1 ||
        server.bgsavethread != (pthread_t) -1) return REDIS_ERR;
    if (server.bgsavechildpid != -1) return REDIS_ERR;

    server.dirty_before_bgsave = server.dirty;

    if (server.ds_enabled) {
        cacheForcePointInTime();
        return dsRdbSaveBackground(filename);
    }

    start = ustime();
    if ((childpid = fork()) == 0) {
        int retval;
@@ -1013,15 +1002,13 @@ void backgroundSaveDoneHandler(int exitcode, int bysignal) {
        rdbRemoveTempFile(server.bgsavechildpid);
    }
    server.bgsavechildpid = -1;
    server.bgsavethread = (pthread_t) -1;
    server.bgsavethread_state = REDIS_BGSAVE_THREAD_UNACTIVE;
    /* Possibly there are slaves waiting for a BGSAVE in order to be served
     * (the first stage of SYNC is a bulk transfer of dump.rdb) */
    updateSlavesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
}

void saveCommand(redisClient *c) {
    if (server.bgsavechildpid != -1 || server.bgsavethread != (pthread_t)-1) {
    if (server.bgsavechildpid != -1) {
        addReplyError(c,"Background save already in progress");
        return;
    }
@@ -1033,7 +1020,7 @@ void saveCommand(redisClient *c) {
}

void bgsaveCommand(redisClient *c) {
    if (server.bgsavechildpid != -1 || server.bgsavethread != (pthread_t)-1) {
    if (server.bgsavechildpid != -1) {
        addReplyError(c,"Background save already in progress");
    } else if (server.bgrewritechildpid != -1) {
        addReplyError(c,"Can't BGSAVE while AOF log rewriting is in progress");
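With the diskstore save thread gone, rdbSaveBackground() is back to the plain fork() model: the child writes a temp file and exits, while the parent keeps serving requests and reaps the child without blocking, much as serverCron's WNOHANG check does. A self-contained sketch of that pattern (the file name and handler below are made up):

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>

static pid_t bgsave_child = -1;

static int start_bgsave(void) {
    if (bgsave_child != -1) return -1;       /* already in progress */
    pid_t pid = fork();
    if (pid == 0) {                          /* child: pretend to dump the DB */
        FILE *fp = fopen("/tmp/temp-sketch.rdb", "w");
        if (fp) { fputs("REDIS", fp); fclose(fp); }
        _exit(0);
    }
    bgsave_child = pid;
    return 0;
}

static void poll_bgsave(void) {
    int status;
    if (bgsave_child == -1) return;
    if (waitpid(bgsave_child, &status, WNOHANG) == bgsave_child) {
        printf("background save finished, exit code %d\n", WEXITSTATUS(status));
        bgsave_child = -1;                   /* done-handler would run here */
    }
}

int main(void) {
    start_bgsave();
    while (bgsave_child != -1) { poll_bgsave(); usleep(1000); }
    return 0;
}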
src/redis.c: 104 lines changed
@@ -50,7 +50,6 @@
#include <limits.h>
#include <float.h>
#include <math.h>
#include <pthread.h>
#include <sys/resource.h>

/* Our shared "common" objects */
@@ -659,22 +658,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
            }
            updateDictResizePolicy();
        }
    } else if (server.bgsavethread != (pthread_t) -1) {
    if (server.bgsavethread != (pthread_t) -1) {
        int state;

        pthread_mutex_lock(&server.bgsavethread_mutex);
        state = server.bgsavethread_state;
        pthread_mutex_unlock(&server.bgsavethread_mutex);

        if (state == REDIS_BGSAVE_THREAD_DONE_OK ||
            state == REDIS_BGSAVE_THREAD_DONE_ERR)
        {
            backgroundSaveDoneHandler(
                (state == REDIS_BGSAVE_THREAD_DONE_OK) ? 0 : 1, 0);
        }
    }
    } else if (!server.ds_enabled) {
    } else {
        time_t now = time(NULL);

        /* If there is not a background saving/rewrite in progress check if
@@ -712,10 +696,6 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
     * in order to guarantee a strict consistency. */
    if (server.masterhost == NULL) activeExpireCycle();

    /* Remove a few cached objects from memory if we are over the
     * configured memory limit */
    if (server.ds_enabled) cacheCron();

    /* Replication cron function -- used to reconnect to master and
     * to detect transfer failures. */
    if (!(loops % 10)) replicationCron();
@@ -735,31 +715,6 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
    listNode *ln;
    redisClient *c;

    /* Awake clients that got all the on disk keys they requested */
    if (server.ds_enabled && listLength(server.io_ready_clients)) {
        listIter li;

        listRewind(server.io_ready_clients,&li);
        while((ln = listNext(&li))) {
            c = ln->value;
            struct redisCommand *cmd;

            /* Resume the client. */
            listDelNode(server.io_ready_clients,ln);
            c->flags &= (~REDIS_IO_WAIT);
            server.cache_blocked_clients--;
            aeCreateFileEvent(server.el, c->fd, AE_READABLE,
                readQueryFromClient, c);
            cmd = lookupCommand(c->argv[0]->ptr);
            redisAssert(cmd != NULL);
            call(c,cmd);
            resetClient(c);
            /* There may be more data to process in the input buffer. */
            if (c->querybuf && sdslen(c->querybuf) > 0)
                processInputBuffer(c);
        }
    }

    /* Try to process pending commands for clients that were just unblocked. */
    while (listLength(server.unblocked_clients)) {
        ln = listFirst(server.unblocked_clients);
@@ -870,10 +825,6 @@ void initServerConfig() {
    server.maxmemory = 0;
    server.maxmemory_policy = REDIS_MAXMEMORY_VOLATILE_LRU;
    server.maxmemory_samples = 3;
    server.ds_enabled = 0;
    server.ds_path = sdsnew("/tmp/redis.ds");
    server.cache_max_memory = 64LL*1024*1024; /* 64 MB of RAM */
    server.cache_blocked_clients = 0;
    server.hash_max_zipmap_entries = REDIS_HASH_MAX_ZIPMAP_ENTRIES;
    server.hash_max_zipmap_value = REDIS_HASH_MAX_ZIPMAP_VALUE;
    server.list_max_ziplist_entries = REDIS_LIST_MAX_ZIPLIST_ENTRIES;
@@ -882,7 +833,6 @@ void initServerConfig() {
    server.zset_max_ziplist_entries = REDIS_ZSET_MAX_ZIPLIST_ENTRIES;
    server.zset_max_ziplist_value = REDIS_ZSET_MAX_ZIPLIST_VALUE;
    server.shutdown_asap = 0;
    server.cache_flush_delay = 0;
    server.cluster_enabled = 0;
    server.cluster.configfile = zstrdup("nodes.conf");

@@ -930,12 +880,10 @@ void initServer() {
            server.syslog_facility);
    }

    server.mainthread = pthread_self();
    server.clients = listCreate();
    server.slaves = listCreate();
    server.monitors = listCreate();
    server.unblocked_clients = listCreate();
    server.cache_io_queue = listCreate();

    createSharedObjects();
    server.el = aeCreateEventLoop();
@@ -965,11 +913,6 @@ void initServer() {
        server.db[j].expires = dictCreate(&keyptrDictType,NULL);
        server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL);
        server.db[j].watched_keys = dictCreate(&keylistDictType,NULL);
        if (server.ds_enabled) {
            server.db[j].io_keys = dictCreate(&keylistDictType,NULL);
            server.db[j].io_negcache = dictCreate(&setDictType,NULL);
            server.db[j].io_queued = dictCreate(&setDictType,NULL);
        }
        server.db[j].id = j;
    }
    server.pubsub_channels = dictCreate(&keylistDictType,NULL);
@@ -979,8 +922,6 @@ void initServer() {
    server.cronloops = 0;
    server.bgsavechildpid = -1;
    server.bgrewritechildpid = -1;
    server.bgsavethread_state = REDIS_BGSAVE_THREAD_UNACTIVE;
    server.bgsavethread = (pthread_t) -1;
    server.bgrewritebuf = sdsempty();
    server.aofbuf = sdsempty();
    server.lastsave = time(NULL);
@@ -1010,7 +951,6 @@ void initServer() {
        }
    }

    if (server.ds_enabled) dsInit();
    if (server.cluster_enabled) clusterInit();
    srand(time(NULL)^getpid());
}
@@ -1188,8 +1128,6 @@ int processCommand(redisClient *c) {
        queueMultiCommand(c,cmd);
        addReply(c,shared.queued);
    } else {
        if (server.ds_enabled && blockClientOnSwappedKeys(c,cmd))
            return REDIS_ERR;
        call(c,cmd);
    }
    return REDIS_OK;
@@ -1207,9 +1145,7 @@ int prepareForShutdown() {
        kill(server.bgsavechildpid,SIGKILL);
        rdbRemoveTempFile(server.bgsavechildpid);
    }
    if (server.ds_enabled) {
        /* FIXME: flush all objects on disk */
    } else if (server.appendonly) {
    if (server.appendonly) {
        /* Append only file: fsync() the AOF and exit */
        aof_fsync(server.appendfd);
    } else if (server.saveparamslen > 0) {
@@ -1391,8 +1327,7 @@ sds genRedisInfoString(char *section) {
        server.loading,
        server.appendonly,
        server.dirty,
        server.bgsavechildpid != -1 ||
            server.bgsavethread != (pthread_t) -1,
        server.bgsavechildpid != -1,
        server.lastsave,
        server.bgrewritechildpid != -1);

@@ -1438,35 +1373,6 @@ sds genRedisInfoString(char *section) {
        }
    }

    /* Diskstore */
    if (allsections || defsections || !strcasecmp(section,"diskstore")) {
        if (sections++) info = sdscat(info,"\r\n");
        info = sdscatprintf(info,
            "# Diskstore\r\n"
            "ds_enabled:%d\r\n",
            server.ds_enabled != 0);
        if (server.ds_enabled) {
            lockThreadedIO();
            info = sdscatprintf(info,
                "cache_max_memory:%llu\r\n"
                "cache_blocked_clients:%lu\r\n"
                "cache_io_queue_len:%lu\r\n"
                "cache_io_jobs_new:%lu\r\n"
                "cache_io_jobs_processing:%lu\r\n"
                "cache_io_jobs_processed:%lu\r\n"
                "cache_io_ready_clients:%lu\r\n"
                ,(unsigned long long) server.cache_max_memory,
                (unsigned long) server.cache_blocked_clients,
                (unsigned long) listLength(server.cache_io_queue),
                (unsigned long) listLength(server.io_newjobs),
                (unsigned long) listLength(server.io_processing),
                (unsigned long) listLength(server.io_processed),
                (unsigned long) listLength(server.io_ready_clients)
            );
            unlockThreadedIO();
        }
    }

    /* Stats */
    if (allsections || defsections || !strcasecmp(section,"stats")) {
        if (sections++) info = sdscat(info,"\r\n");
@@ -1824,9 +1730,7 @@ int main(int argc, char **argv) {
    linuxOvercommitMemoryWarning();
#endif
    start = ustime();
    if (server.ds_enabled) {
        redisLog(REDIS_NOTICE,"DB not loaded (running with disk back end)");
    } else if (server.appendonly) {
    if (server.appendonly) {
        if (loadAppendOnlyFile(server.appendfilename) == REDIS_OK)
            redisLog(REDIS_NOTICE,"DB loaded from append only file: %.3f seconds",(float)(ustime()-start)/1000000);
    } else {
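The serverCron and initServer hunks above drop the bgsavethread machinery: a saving thread that published its progress through bgsavethread_state under bgsavethread_mutex while the event loop polled it once per cron iteration. A small standalone sketch of that handshake, with all names below invented (compile with -lpthread):

#include <stdio.h>
#include <unistd.h>
#include <pthread.h>

enum { THREAD_UNACTIVE, THREAD_ACTIVE, THREAD_DONE_OK, THREAD_DONE_ERR };

static pthread_mutex_t state_mutex = PTHREAD_MUTEX_INITIALIZER;
static int thread_state = THREAD_UNACTIVE;

static void set_state(int s) {
    pthread_mutex_lock(&state_mutex);
    thread_state = s;
    pthread_mutex_unlock(&state_mutex);
}

static int get_state(void) {
    pthread_mutex_lock(&state_mutex);
    int s = thread_state;
    pthread_mutex_unlock(&state_mutex);
    return s;
}

static void *save_thread(void *arg) {
    (void)arg;
    set_state(THREAD_ACTIVE);
    usleep(10000);              /* pretend to write the dump */
    set_state(THREAD_DONE_OK);
    return NULL;
}

int main(void) {
    pthread_t t;
    pthread_create(&t, NULL, save_thread, NULL);
    while (get_state() != THREAD_DONE_OK && get_state() != THREAD_DONE_ERR)
        usleep(1000);           /* the poll the cron loop performed */
    pthread_join(t, NULL);
    puts("save thread finished");
    return 0;
}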
src/redis.h: 91 lines changed
@@ -124,26 +124,12 @@
#define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
#define REDIS_RDB_ENC_LZF 3 /* string compressed with FASTLZ */

/* Scheduled IO opeations flags. */
#define REDIS_IO_LOAD 1
#define REDIS_IO_SAVE 2
#define REDIS_IO_LOADINPROG 4
#define REDIS_IO_SAVEINPROG 8

/* Generic IO flags */
#define REDIS_IO_ONLYLOADS 1
#define REDIS_IO_ASAP 2

#define REDIS_MAX_COMPLETED_JOBS_PROCESSED 1
#define REDIS_THREAD_STACK_SIZE (1024*1024*4)

/* Client flags */
#define REDIS_SLAVE 1 /* This client is a slave server */
#define REDIS_MASTER 2 /* This client is a master server */
#define REDIS_MONITOR 4 /* This client is a slave monitor, see MONITOR */
#define REDIS_MULTI 8 /* This client is in a MULTI context */
#define REDIS_BLOCKED 16 /* The client is waiting in a blocking operation */
#define REDIS_IO_WAIT 32 /* The client is waiting for Virtual Memory I/O */
#define REDIS_DIRTY_CAS 64 /* Watched keys modified. EXEC will fail. */
#define REDIS_CLOSE_AFTER_REPLY 128 /* Close after writing entire reply. */
#define REDIS_UNBLOCKED 256 /* This client was unblocked and is stored in
@@ -222,12 +208,6 @@
#define REDIS_MAXMEMORY_ALLKEYS_RANDOM 4
#define REDIS_MAXMEMORY_NO_EVICTION 5

/* Diskstore background saving thread states */
#define REDIS_BGSAVE_THREAD_UNACTIVE 0
#define REDIS_BGSAVE_THREAD_ACTIVE 1
#define REDIS_BGSAVE_THREAD_DONE_OK 2
#define REDIS_BGSAVE_THREAD_DONE_ERR 3

/* We can print the stacktrace, so our assert is defined this way: */
#define redisAssert(_e) ((_e)?(void)0 : (_redisAssert(#_e,__FILE__,__LINE__),_exit(1)))
#define redisPanic(_e) _redisPanic(#_e,__FILE__,__LINE__),_exit(1)
@@ -292,9 +272,6 @@ typedef struct redisDb {
    dict *dict; /* The keyspace for this DB */
    dict *expires; /* Timeout of keys with a timeout set */
    dict *blocking_keys; /* Keys with clients waiting for data (BLPOP) */
    dict *io_keys; /* Keys with clients waiting for DS I/O */
    dict *io_negcache; /* Negative caching for disk store */
    dict *io_queued; /* Queued IO operations hash table */
    dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */
    int id;
} redisDb;
@@ -516,7 +493,6 @@ typedef struct {

struct redisServer {
    /* General */
    pthread_t mainthread;
    redisDb *db;
    dict *commands; /* Command table hahs table */
    aeEventLoop *el;
@@ -574,9 +550,6 @@ struct redisServer {
    char *pidfile;
    pid_t bgsavechildpid;
    pid_t bgrewritechildpid;
    int bgsavethread_state;
    pthread_mutex_t bgsavethread_mutex;
    pthread_t bgsavethread;
    sds bgrewritebuf; /* buffer taken by parent during oppend only rewrite */
    sds aofbuf; /* AOF buffer, written before entering the event loop */
    struct saveparam *saveparams;
@@ -612,19 +585,12 @@ struct redisServer {
    int maxmemory_samples;
    /* Blocked clients */
    unsigned int bpop_blocked_clients;
    unsigned int cache_blocked_clients;
    list *unblocked_clients; /* list of clients to unblock before next loop */
    list *cache_io_queue; /* IO operations queue */
    int cache_flush_delay; /* seconds to wait before flushing keys */
    /* Sort parameters - qsort_r() is only available under BSD so we
     * have to take this state global, in order to pass it to sortCompare() */
    int sort_desc;
    int sort_alpha;
    int sort_bypattern;
    /* Virtual memory configuration */
    int ds_enabled; /* backend disk in redis.conf */
    char *ds_path; /* location of the disk store on disk */
    unsigned long long cache_max_memory;
    /* Zip structure config */
    size_t hash_max_zipmap_entries;
    size_t hash_max_zipmap_value;
@@ -682,7 +648,7 @@ struct redisCommand {
    int arity;
    int flags;
    /* Use a function to determine keys arguments in a command line.
     * Used both for diskstore preloading and Redis Cluster. */
     * Used for Redis Cluster redirect. */
    redisGetKeysProc *getkeys_proc;
    /* What keys should be loaded in background when calling this command? */
    int firstkey; /* The first argument that's a key (0 = no keys) */
@@ -709,27 +675,6 @@ typedef struct _redisSortOperation {
    robj *pattern;
} redisSortOperation;

/* DIsk store threaded I/O request message */
#define REDIS_IOJOB_LOAD 0
#define REDIS_IOJOB_SAVE 1

typedef struct iojob {
    int type; /* Request type, REDIS_IOJOB_* */
    redisDb *db;/* Redis database */
    robj *key; /* This I/O request is about this key */
    robj *val; /* the value to swap for REDIS_IOJOB_SAVE, otherwise this
                * field is populated by the I/O thread for REDIS_IOJOB_LOAD. */
    time_t expire; /* Expire time for this key on REDIS_IOJOB_LOAD */
} iojob;

/* IO operations scheduled -- check dscache.c for more info */
typedef struct ioop {
    int type;
    redisDb *db;
    robj *key;
    time_t ctime; /* This is the creation time of the entry. */
} ioop;

/* Structure to hold list iteration abstraction. */
typedef struct {
    robj *subject;
@@ -973,40 +918,6 @@ void oom(const char *msg);
void populateCommandTable(void);
void resetCommandTableStats(void);

/* Disk store */
int dsOpen(void);
int dsClose(void);
int dsSet(redisDb *db, robj *key, robj *val, time_t expire);
robj *dsGet(redisDb *db, robj *key, time_t *expire);
int dsDel(redisDb *db, robj *key);
int dsExists(redisDb *db, robj *key);
void dsFlushDb(int dbid);
int dsRdbSaveBackground(char *filename);
int dsRdbSave(char *filename);

/* Disk Store Cache */
void dsInit(void);
void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata, int mask);
void lockThreadedIO(void);
void unlockThreadedIO(void);
void freeIOJob(iojob *j);
void queueIOJob(iojob *j);
void waitEmptyIOJobsQueue(void);
void processAllPendingIOJobs(void);
int blockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd);
int dontWaitForSwappedKey(redisClient *c, robj *key);
void handleClientsBlockedOnSwappedKey(redisDb *db, robj *key);
int cacheFreeOneEntry(void);
void cacheScheduleIOAddFlag(redisDb *db, robj *key, long flag);
void cacheScheduleIODelFlag(redisDb *db, robj *key, long flag);
int cacheScheduleIOGetFlags(redisDb *db, robj *key);
void cacheScheduleIO(redisDb *db, robj *key, int type);
void cacheCron(void);
int cacheKeyMayExist(redisDb *db, robj *key);
void cacheSetKeyMayExist(redisDb *db, robj *key);
void cacheSetKeyDoesNotExist(redisDb *db, robj *key);
void cacheForcePointInTime(void);

/* Set data type */
robj *setTypeCreate(robj *value);
int setTypeAdd(robj *subject, robj *value);
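The removed REDIS_IO_* defines above encode per-key scheduler state as a small bitmask, which is how dscache.c and db.c tested for pending or in-progress jobs (for example the REDIS_IO_SAVEINPROG wait in lookupKey). A tiny illustration of the same flag arithmetic, with renamed constants:

#include <stdio.h>

#define IO_LOAD        1
#define IO_SAVE        2
#define IO_LOADINPROG  4
#define IO_SAVEINPROG  8

int main(void) {
    int flags = 0;

    flags |= IO_SAVE;                  /* a SAVE op is scheduled for the key */
    flags |= IO_SAVEINPROG;            /* an I/O thread picked it up */

    if (flags & IO_SAVEINPROG)
        puts("key is busy: a lookup would have to wait (lookupWaitBusyKey)");

    flags &= ~(IO_SAVE|IO_SAVEINPROG); /* job completed: clear both bits */
    printf("flags now: %d\n", flags);
    return 0;
}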
redis.conf
@@ -291,32 +291,6 @@ appendfsync everysec
# "no" that is the safest pick from the point of view of durability.
no-appendfsync-on-rewrite no

#################################### DISK STORE ###############################

# When disk store is active Redis works as an on-disk database, where memory
# is only used as a object cache.
#
# This mode is good for datasets that are bigger than memory, and in general
# when you want to trade speed for:
#
# - less memory used
# - immediate server restart
# - per key durability, without need for backgrond savig
#
# On the other hand, with disk store enabled MULTI/EXEC are no longer
# transactional from the point of view of the persistence on disk, that is,
# Redis transactions will still guarantee that commands are either processed
# all or nothing, but there is no guarantee that all the keys are flushed
# on disk in an atomic way.
#
# Of course with disk store enabled Redis is not as fast as it is when
# working with just the memory back end.

diskstore-enabled no
diskstore-path redis.ds
cache-max-memory 0
cache-flush-delay 0

############################### ADVANCED CONFIG ###############################

# Hashes are encoded in a special way (much more memory efficient) when they