unstable merge conflicts resolved

This commit is contained in:
antirez
2011-06-25 12:29:24 +02:00
110 changed files with 40790 additions and 2138 deletions

View File

@@ -5,35 +5,45 @@
release_hdr := $(shell sh -c './mkreleasehdr.sh')
uname_S := $(shell sh -c 'uname -s 2>/dev/null || echo not')
OPTIMIZATION?=-O2
ifeq ($(uname_S),Linux)
ifneq ($(FORCE_LIBC_MALLOC),yes)
USE_JEMALLOC=yes
endif
endif
ifeq ($(uname_S),SunOS)
CFLAGS?= -std=c99 -pedantic $(OPTIMIZATION) -Wall -W -D__EXTENSIONS__ -D_XPG6
CCLINK?= -ldl -lnsl -lsocket -lm -lpthread
DEBUG?= -g -ggdb
CFLAGS?=-std=c99 -pedantic $(OPTIMIZATION) -Wall -W -D__EXTENSIONS__ -D_XPG6
CCLINK?=-ldl -lnsl -lsocket -lm -lpthread
DEBUG?=-g -ggdb
else
CFLAGS?= -std=c99 -pedantic $(OPTIMIZATION) -Wall -W $(ARCH) $(PROF)
CCLINK?= -lm -pthread
DEBUG?= -g -rdynamic -ggdb
CFLAGS?=-std=c99 -pedantic $(OPTIMIZATION) -Wall -W $(ARCH) $(PROF)
CCLINK?=-lm -pthread
DEBUG?=-g -rdynamic -ggdb
endif
ifeq ($(USE_TCMALLOC),yes)
ALLOD_DEPS=
ALLOC_LINK=-ltcmalloc
ALLOC_FLAGS=-DUSE_TCMALLOC
endif
ifeq ($(USE_TCMALLOC_MINIMAL),yes)
ALLOD_DEPS=
ALLOC_LINK=-ltcmalloc_minimal
ALLOC_FLAGS=-DUSE_TCMALLOC
endif
ifeq ($(USE_JEMALLOC),yes)
ALLOC_LINK=-ljemalloc
ALLOC_FLAGS=-DUSE_JEMALLOC
ALLOC_DEP=../deps/jemalloc/lib/libjemalloc.a
ALLOC_LINK=$(ALLOC_DEP) -ldl
ALLOC_FLAGS=-DUSE_JEMALLOC -I../deps/jemalloc/include
endif
CCLINK+= $(ALLOC_LINK)
CFLAGS+= $(ALLOC_FLAGS)
CCOPT= $(CFLAGS) $(CCLINK) $(ARCH) $(PROF)
CCOPT= $(CFLAGS) $(ARCH) $(PROF)
PREFIX= /usr/local
INSTALL_BIN= $(PREFIX)/bin
@@ -51,7 +61,7 @@ QUIET_CC = @printf ' %b %b\n' $(CCCOLOR)CC$(ENDCOLOR) $(SRCCOLOR)$@$(ENDCOLOR
QUIET_LINK = @printf ' %b %b\n' $(LINKCOLOR)LINK$(ENDCOLOR) $(BINCOLOR)$@$(ENDCOLOR);
endif
OBJ = adlist.o ae.o anet.o dict.o redis.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o dscache.o pubsub.o multi.o debug.o sort.o intset.o syncio.o diskstore.o cluster.o crc16.o endian.o scripting.o
OBJ = adlist.o ae.o anet.o dict.o redis.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endian.o scripting.o
BENCHOBJ = ae.o anet.o redis-benchmark.o sds.o adlist.o zmalloc.o
CLIOBJ = anet.o sds.o adlist.o redis-cli.o zmalloc.o release.o
CHECKDUMPOBJ = redis-check-dump.o lzf_c.o lzf_d.o
@@ -76,32 +86,38 @@ ae_kqueue.o: ae_kqueue.c
ae_select.o: ae_select.c
anet.o: anet.c fmacros.h anet.h
aof.o: aof.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \
zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h
zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h
cluster.o: cluster.c redis.h fmacros.h config.h ae.h sds.h dict.h \
adlist.h zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h
config.o: config.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \
zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h
zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h
crc16.o: crc16.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \
zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h
db.o: db.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \
zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h
zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h
debug.o: debug.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \
zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h sha1.h
zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h sha1.h
dict.o: dict.c fmacros.h dict.h zmalloc.h
diskstore.o: diskstore.c redis.h fmacros.h config.h ae.h sds.h dict.h \
adlist.h zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h
adlist.h zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h \
sha1.h
dscache.o: dscache.c redis.h fmacros.h config.h ae.h sds.h dict.h \
adlist.h zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h
intset.o: intset.c intset.h zmalloc.h
adlist.h zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h
endian.o: endian.c
intset.o: intset.c intset.h zmalloc.h endian.h
lzf_c.o: lzf_c.c lzfP.h
lzf_d.o: lzf_d.c lzfP.h
multi.o: multi.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \
zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h
zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h
networking.o: networking.c redis.h fmacros.h config.h ae.h sds.h dict.h \
adlist.h zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h
adlist.h zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h
object.o: object.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \
zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h
zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h
pqsort.o: pqsort.c
pubsub.o: pubsub.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \
zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h
zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h
rdb.o: rdb.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \
zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h lzf.h
zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h lzf.h
redis-benchmark.o: redis-benchmark.c fmacros.h ae.h \
../deps/hiredis/hiredis.h sds.h adlist.h zmalloc.h
redis-check-aof.o: redis-check-aof.c fmacros.h config.h
@@ -109,32 +125,33 @@ redis-check-dump.o: redis-check-dump.c lzf.h
redis-cli.o: redis-cli.c fmacros.h version.h ../deps/hiredis/hiredis.h \
sds.h zmalloc.h ../deps/linenoise/linenoise.h help.h
redis.o: redis.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \
zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h asciilogo.h
zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h \
asciilogo.h
release.o: release.c release.h
replication.o: replication.c redis.h fmacros.h config.h ae.h sds.h dict.h \
adlist.h zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h
adlist.h zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h
sds.o: sds.c sds.h zmalloc.h
sha1.o: sha1.c sha1.h
sha1.o: sha1.c sha1.h config.h
sort.o: sort.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \
zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h pqsort.h
zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h pqsort.h
syncio.o: syncio.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \
zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h
zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h
t_hash.o: t_hash.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \
zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h
zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h
t_list.o: t_list.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \
zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h
zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h
t_set.o: t_set.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \
zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h
zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h
t_string.o: t_string.c redis.h fmacros.h config.h ae.h sds.h dict.h \
adlist.h zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h
adlist.h zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h
t_zset.o: t_zset.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \
zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h
util.o: util.c util.h
cluster.o: redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \
zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h
ziplist.o: ziplist.c zmalloc.h ziplist.h
zipmap.o: zipmap.c zmalloc.h
zmalloc.o: zmalloc.c config.h
zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h
util.o: util.c fmacros.h util.h
ziplist.o: ziplist.c zmalloc.h util.h ziplist.h endian.h
zipmap.o: zipmap.c zmalloc.h endian.h
zmalloc.o: zmalloc.c config.h zmalloc.h
.PHONY: dependencies
dependencies:
@printf '%b %b\n' $(MAKECOLOR)MAKE$(ENDCOLOR) $(BINCOLOR)hiredis$(ENDCOLOR)
@@ -144,29 +161,34 @@ dependencies:
@echo $(MAKECOLOR)MAKE$(ENDCOLOR) $(BINCOLOR)Lua ansi$(ENDCOLOR)
@cd ../deps/lua && $(MAKE) ARCH="$(ARCH)" ansi
../deps/jemalloc/lib/libjemalloc.a:
cd ../deps/jemalloc && ./configure $(JEMALLOC_CFLAGS) --with-jemalloc-prefix=je_ --enable-cc-silence && $(MAKE) lib/libjemalloc.a
redis-server: $(OBJ)
$(QUIET_CC)$(CC) -o $(PRGNAME) $(CCOPT) $(DEBUG) $(OBJ) ../deps/lua/src/liblua.a
$(QUIET_CC)$(CC) -o $(PRGNAME) $(CCOPT) $(DEBUG) $(OBJ) $(CCLINK) $(ALLOC_LINK) ../deps/lua/src/liblua.a
redis-benchmark: dependencies $(BENCHOBJ)
@cd ../deps/hiredis && $(MAKE) static
$(QUIET_LINK)$(CC) -o $(BENCHPRGNAME) $(CCOPT) $(DEBUG) $(BENCHOBJ) ../deps/hiredis/libhiredis.a
$(QUIET_LINK)$(CC) -o $(BENCHPRGNAME) $(CCOPT) $(DEBUG) $(BENCHOBJ) ../deps/hiredis/libhiredis.a $(CCLINK) $(ALLOC_LINK)
redis-benchmark.o:
$(QUIET_CC)$(CC) -c $(CFLAGS) -I../deps/hiredis $(DEBUG) $(COMPILE_TIME) $<
redis-cli: dependencies $(CLIOBJ)
$(QUIET_LINK)$(CC) -o $(CLIPRGNAME) $(CCOPT) $(DEBUG) $(CLIOBJ) ../deps/hiredis/libhiredis.a ../deps/linenoise/linenoise.o
$(QUIET_LINK)$(CC) -o $(CLIPRGNAME) $(CCOPT) $(DEBUG) $(CLIOBJ) ../deps/hiredis/libhiredis.a ../deps/linenoise/linenoise.o $(CCLINK) $(ALLOC_LINK)
redis-cli.o:
$(QUIET_CC)$(CC) -c $(CFLAGS) -I../deps/hiredis -I../deps/linenoise $(DEBUG) $(COMPILE_TIME) $<
redis-check-dump: $(CHECKDUMPOBJ)
$(QUIET_LINK)$(CC) -o $(CHECKDUMPPRGNAME) $(CCOPT) $(DEBUG) $(CHECKDUMPOBJ)
$(QUIET_LINK)$(CC) -o $(CHECKDUMPPRGNAME) $(CCOPT) $(DEBUG) $(CHECKDUMPOBJ) $(CCLINK) $(ALLOC_LINK)
redis-check-aof: $(CHECKAOFOBJ)
$(QUIET_LINK)$(CC) -o $(CHECKAOFPRGNAME) $(CCOPT) $(DEBUG) $(CHECKAOFOBJ)
$(QUIET_LINK)$(CC) -o $(CHECKAOFPRGNAME) $(CCOPT) $(DEBUG) $(CHECKAOFOBJ) $(CCLINK) $(ALLOC_LINK)
.c.o:
# Because the jemalloc.h header is generated as a part of the jemalloc build
# process, building it should complete before building any other object.
%.o: %.c $(ALLOC_DEP)
$(QUIET_CC)$(CC) -c $(CFLAGS) $(DEBUG) $(COMPILE_TIME) -I../deps/lua/src $<
clean:
@@ -176,7 +198,7 @@ dep:
$(CC) -MM *.c -I ../deps/hiredis -I ../deps/linenoise
test: redis-server
(cd ..; tclsh8.5 tests/test_helper.tcl --tags "${TAGS}" --file "${FILE}")
@(cd ..; (which tclsh >/dev/null && tclsh tests/test_helper.tcl --tags "${TAGS}" --file "${FILE}") || echo "You need to install Tcl in order to run tests.")
bench:
./redis-benchmark
@@ -188,7 +210,7 @@ log:
@echo ""
@echo "WARNING: if it fails under Linux you probably need to install libc6-dev-i386"
@echo ""
$(MAKE) ARCH="-m32"
$(MAKE) ARCH="-m32" JEMALLOC_CFLAGS='CFLAGS="-std=gnu99 -Wall -pipe -g3 -fvisibility=hidden -O3 -funroll-loops -m32"'
gprof:
$(MAKE) PROF="-pg"

View File

@@ -342,7 +342,7 @@ int rewriteAppendOnlyFile(char *filename) {
redisDb *db = server.db+j;
dict *d = db->dict;
if (dictSize(d) == 0) continue;
di = dictGetIterator(d);
di = dictGetSafeIterator(d);
if (!di) {
fclose(fp);
return REDIS_ERR;
@@ -574,10 +574,6 @@ int rewriteAppendOnlyFileBackground(void) {
long long start;
if (server.bgrewritechildpid != -1) return REDIS_ERR;
if (server.ds_enabled != 0) {
redisLog(REDIS_WARNING,"BGREWRITEAOF called with diskstore enabled: AOF is not supported when diskstore is enabled. Operation not performed.");
return REDIS_ERR;
}
start = ustime();
if ((childpid = fork()) == 0) {
char tmpfile[256];

View File

@@ -1390,7 +1390,7 @@ void restoreCommand(redisClient *c) {
long ttl;
/* Make sure this key does not already exist here... */
if (dbExists(c->db,c->argv[1])) {
if (lookupKeyWrite(c->db,c->argv[1]) != NULL) {
addReplyError(c,"Target key name is busy.");
return;
}

View File

@@ -251,18 +251,6 @@ void loadServerConfig(char *filename) {
} else if (!strcasecmp(argv[0],"dbfilename") && argc == 2) {
zfree(server.dbfilename);
server.dbfilename = zstrdup(argv[1]);
} else if (!strcasecmp(argv[0],"diskstore-enabled") && argc == 2) {
if ((server.ds_enabled = yesnotoi(argv[1])) == -1) {
err = "argument must be 'yes' or 'no'"; goto loaderr;
}
} else if (!strcasecmp(argv[0],"diskstore-path") && argc == 2) {
sdsfree(server.ds_path);
server.ds_path = sdsnew(argv[1]);
} else if (!strcasecmp(argv[0],"cache-max-memory") && argc == 2) {
server.cache_max_memory = memtoll(argv[1],NULL);
} else if (!strcasecmp(argv[0],"cache-flush-delay") && argc == 2) {
server.cache_flush_delay = atoi(argv[1]);
if (server.cache_flush_delay < 0) server.cache_flush_delay = 0;
} else if (!strcasecmp(argv[0],"hash-max-zipmap-entries") && argc == 2) {
server.hash_max_zipmap_entries = memtoll(argv[1], NULL);
} else if (!strcasecmp(argv[0],"hash-max-zipmap-value") && argc == 2) {

View File

@@ -5,35 +5,6 @@
#include <AvailabilityMacros.h>
#endif
/* Use tcmalloc's malloc_size() when available.
* When tcmalloc is used, native OSX malloc_size() may never be used because
* this expects a different allocation scheme. Therefore, *exclusively* use
* either tcmalloc or OSX's malloc_size()! */
#if defined(USE_TCMALLOC)
#define REDIS_MALLOC "tcmalloc"
#include <google/tcmalloc.h>
#if TC_VERSION_MAJOR >= 1 && TC_VERSION_MINOR >= 6
#define HAVE_MALLOC_SIZE 1
#define redis_malloc_size(p) tc_malloc_size(p)
#endif
#elif defined(USE_JEMALLOC)
#define REDIS_MALLOC "jemalloc"
#define JEMALLOC_MANGLE
#include <jemalloc/jemalloc.h>
#if JEMALLOC_VERSION_MAJOR >= 2 && JEMALLOC_VERSION_MINOR >= 1
#define HAVE_MALLOC_SIZE 1
#define redis_malloc_size(p) JEMALLOC_P(malloc_usable_size)(p)
#endif
#elif defined(__APPLE__)
#include <malloc/malloc.h>
#define HAVE_MALLOC_SIZE 1
#define redis_malloc_size(p) malloc_size(p)
#endif
#ifndef REDIS_MALLOC
#define REDIS_MALLOC "libc"
#endif
/* Define redis_fstat to fstat or fstat64() */
#if defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_6)
#define redis_fstat fstat64

157
src/db.c
View File

@@ -31,13 +31,6 @@ void SlotToKeyDel(robj *key);
* the disk object. If it is in this state, we wait.
*/
void lookupWaitBusyKey(redisDb *db, robj *key) {
/* FIXME: wait just for this key, not everything */
waitEmptyIOJobsQueue();
processAllPendingIOJobs();
redisAssert((cacheScheduleIOGetFlags(db,key) & REDIS_IO_SAVEINPROG) == 0);
}
robj *lookupKey(redisDb *db, robj *key) {
dictEntry *de = dictFind(db->dict,key->ptr);
if (de) {
@@ -48,53 +41,9 @@ robj *lookupKey(redisDb *db, robj *key) {
* a copy on write madness. */
if (server.bgsavechildpid == -1 && server.bgrewritechildpid == -1)
val->lru = server.lruclock;
if (server.ds_enabled &&
cacheScheduleIOGetFlags(db,key) & REDIS_IO_SAVEINPROG)
{
/* Need to wait for the key to get unbusy */
redisLog(REDIS_DEBUG,"Lookup found a key in SAVEINPROG state. Waiting. (Key was in the cache)");
lookupWaitBusyKey(db,key);
}
server.stat_keyspace_hits++;
return val;
} else {
time_t expire;
robj *val;
/* Key not found in the in memory hash table, but if disk store is
* enabled we may have this key on disk. If so load it in memory
* in a blocking way. */
if (server.ds_enabled && cacheKeyMayExist(db,key)) {
long flags = cacheScheduleIOGetFlags(db,key);
/* They key is not in cache, but it has a SAVE op in queue?
* The only possibility is that the key was deleted, since
* dirty keys are not evicted. */
if (flags & REDIS_IO_SAVE) {
server.stat_keyspace_misses++;
return NULL;
}
/* At this point we need to blocking load the key in memory.
* The first thing we do is waiting here if the key is busy. */
if (flags & REDIS_IO_SAVEINPROG) {
redisLog(REDIS_DEBUG,"Lookup found a key in SAVEINPROG state. Waiting (while force loading).");
lookupWaitBusyKey(db,key);
}
redisLog(REDIS_DEBUG,"Force loading key %s via lookup", key->ptr);
val = dsGet(db,key,&expire);
if (val) {
int retval = dbAdd(db,key,val);
redisAssert(retval == REDIS_OK);
if (expire != -1) setExpire(db,key,expire);
server.stat_keyspace_hits++;
return val;
} else {
cacheSetKeyDoesNotExist(db,key);
}
}
server.stat_keyspace_misses++;
return NULL;
}
@@ -122,42 +71,45 @@ robj *lookupKeyWriteOrReply(redisClient *c, robj *key, robj *reply) {
return o;
}
/* Add the key to the DB. If the key already exists REDIS_ERR is returned,
* otherwise REDIS_OK is returned, and the caller should increment the
* refcount of 'val'. */
int dbAdd(redisDb *db, robj *key, robj *val) {
/* Perform a lookup before adding the key, as we need to copy the
* key value. */
if (dictFind(db->dict, key->ptr) != NULL) {
return REDIS_ERR;
} else {
sds copy = sdsdup(key->ptr);
dictAdd(db->dict, copy, val);
if (server.ds_enabled) cacheSetKeyMayExist(db,key);
if (server.cluster_enabled) SlotToKeyAdd(key);
return REDIS_OK;
}
/* Add the key to the DB. It's up to the caller to increment the reference
* counte of the value if needed.
*
* The program is aborted if the key already exists. */
void dbAdd(redisDb *db, robj *key, robj *val) {
sds copy = sdsdup(key->ptr);
int retval = dictAdd(db->dict, copy, val);
redisAssert(retval == REDIS_OK);
if (server.cluster_enabled) SlotToKeyAdd(key);
}
/* Overwrite an existing key with a new value. Incrementing the reference
* count of the new value is up to the caller.
* This function does not modify the expire time of the existing key.
*
* The program is aborted if the key was not already present. */
void dbOverwrite(redisDb *db, robj *key, robj *val) {
struct dictEntry *de = dictFind(db->dict,key->ptr);
redisAssert(de != NULL);
dictReplace(db->dict, key->ptr, val);
}
/* If the key does not exist, this is just like dbAdd(). Otherwise
* the value associated to the key is replaced with the new one.
/* High level Set operation. This function can be used in order to set
* a key, whatever it was existing or not, to a new object.
*
* On update (key already existed) 0 is returned. Otherwise 1. */
int dbReplace(redisDb *db, robj *key, robj *val) {
robj *oldval;
int retval;
if ((oldval = dictFetchValue(db->dict,key->ptr)) == NULL) {
sds copy = sdsdup(key->ptr);
dictAdd(db->dict, copy, val);
if (server.cluster_enabled) SlotToKeyAdd(key);
retval = 1;
* 1) The ref count of the value object is incremented.
* 2) clients WATCHing for the destination key notified.
* 3) The expire time of the key is reset (the key is made persistent). */
void setKey(redisDb *db, robj *key, robj *val) {
if (lookupKeyWrite(db,key) == NULL) {
dbAdd(db,key,val);
} else {
dictReplace(db->dict, key->ptr, val);
retval = 0;
dbOverwrite(db,key,val);
}
if (server.ds_enabled) cacheSetKeyMayExist(db,key);
return retval;
incrRefCount(val);
removeExpire(db,key);
touchWatchedKey(db,key);
}
int dbExists(redisDb *db, robj *key) {
@@ -192,14 +144,6 @@ robj *dbRandomKey(redisDb *db) {
/* Delete a key, value, and associated expiration entry if any, from the DB */
int dbDelete(redisDb *db, robj *key) {
/* If diskstore is enabled make sure to awake waiting clients for this key
* as it is not really useful to wait for a key already deleted to be
* loaded from disk. */
if (server.ds_enabled) {
handleClientsBlockedOnSwappedKey(db,key);
cacheSetKeyDoesNotExist(db,key);
}
/* Deleting an entry from the expires dict will not free the sds of
* the key, because it is shared with the main dictionary. */
if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);
@@ -221,7 +165,6 @@ long long emptyDb() {
removed += dictSize(server.db[j].dict);
dictEmpty(server.db[j].dict);
dictEmpty(server.db[j].expires);
if (server.ds_enabled) dictEmpty(server.db[j].io_negcache);
}
return removed;
}
@@ -244,8 +187,6 @@ int selectDb(redisClient *c, int id) {
void signalModifiedKey(redisDb *db, robj *key) {
touchWatchedKey(db,key);
if (server.ds_enabled)
cacheScheduleIO(db,key,REDIS_IO_SAVE);
}
void signalFlushedDb(int dbid) {
@@ -261,7 +202,6 @@ void flushdbCommand(redisClient *c) {
signalFlushedDb(c->db->id);
dictEmpty(c->db->dict);
dictEmpty(c->db->expires);
if (server.ds_enabled) dsFlushDb(c->db->id);
addReply(c,shared.ok);
}
@@ -273,10 +213,7 @@ void flushallCommand(redisClient *c) {
kill(server.bgsavechildpid,SIGKILL);
rdbRemoveTempFile(server.bgsavechildpid);
}
if (server.ds_enabled)
dsFlushDb(-1);
else
rdbSave(server.dbfilename);
rdbSave(server.dbfilename);
server.dirty++;
}
@@ -284,22 +221,10 @@ void delCommand(redisClient *c) {
int deleted = 0, j;
for (j = 1; j < c->argc; j++) {
if (server.ds_enabled) {
lookupKeyRead(c->db,c->argv[j]);
/* FIXME: this can be optimized a lot, no real need to load
* a possibly huge value. */
}
if (dbDelete(c->db,c->argv[j])) {
signalModifiedKey(c->db,c->argv[j]);
server.dirty++;
deleted++;
} else if (server.ds_enabled) {
if (cacheKeyMayExist(c->db,c->argv[j]) &&
dsExists(c->db,c->argv[j]))
{
cacheScheduleIO(c->db,c->argv[j],REDIS_IO_SAVE);
deleted = 1;
}
}
}
addReplyLongLong(c,deleted);
@@ -414,13 +339,15 @@ void renameGenericCommand(redisClient *c, int nx) {
return;
incrRefCount(o);
if (dbAdd(c->db,c->argv[2],o) == REDIS_ERR) {
if (lookupKeyWrite(c->db,c->argv[2]) != NULL) {
if (nx) {
decrRefCount(o);
addReply(c,shared.czero);
return;
}
dbReplace(c->db,c->argv[2],o);
dbOverwrite(c->db,c->argv[2],o);
} else {
dbAdd(c->db,c->argv[2],o);
}
dbDelete(c->db,c->argv[1]);
signalModifiedKey(c->db,c->argv[1]);
@@ -471,11 +398,12 @@ void moveCommand(redisClient *c) {
return;
}
/* Try to add the element to the target DB */
if (dbAdd(dst,c->argv[1],o) == REDIS_ERR) {
/* Return zero if the key already exists in the target DB */
if (lookupKeyWrite(dst,c->argv[1]) != NULL) {
addReply(c,shared.czero);
return;
}
dbAdd(dst,c->argv[1],o);
incrRefCount(o);
/* OK! key moved, free the entry in the source DB */
@@ -611,7 +539,6 @@ void expireatCommand(redisClient *c) {
void ttlCommand(redisClient *c) {
time_t expire, ttl = -1;
if (server.ds_enabled) lookupKeyRead(c->db,c->argv[1]);
expire = getExpire(c->db,c->argv[1]);
if (expire != -1) {
ttl = (expire-time(NULL));

View File

@@ -212,26 +212,7 @@ void computeDatasetDigest(unsigned char *final) {
void debugCommand(redisClient *c) {
if (!strcasecmp(c->argv[1]->ptr,"segfault")) {
*((char*)-1) = 'x';
} else if (!strcasecmp(c->argv[1]->ptr,"flushcache")) {
if (!server.ds_enabled) {
addReplyError(c, "DEBUG FLUSHCACHE called with diskstore off.");
return;
} else if (server.bgsavethread != (pthread_t) -1) {
addReplyError(c, "Can't flush cache while BGSAVE is in progress.");
return;
} else {
/* To flush the whole cache we need to wait for everything to
* be flushed on disk... */
cacheForcePointInTime();
emptyDb();
addReply(c,shared.ok);
return;
}
} else if (!strcasecmp(c->argv[1]->ptr,"reload")) {
if (server.ds_enabled) {
addReply(c,shared.ok);
return;
}
if (rdbSave(server.dbfilename) != REDIS_OK) {
addReply(c,shared.err);
return;
@@ -256,7 +237,6 @@ void debugCommand(redisClient *c) {
robj *val;
char *strenc;
if (server.ds_enabled) lookupKeyRead(c->db,c->argv[2]);
if ((de = dictFind(c->db->dict,c->argv[2]->ptr)) == NULL) {
addReply(c,shared.nokeyerr);
return;

View File

@@ -1,509 +0,0 @@
/* diskstore.c implements a very simple disk backed key-value store used
* by Redis for the "disk" backend. This implementation uses the filesystem
* to store key/value pairs. Every file represents a given key.
*
* The key path is calculated using the SHA1 of the key name. For instance
* the key "foo" is stored as a file name called:
*
* /0b/ee/0beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33
*
* The couples of characters from the hex output of SHA1 are also used
* to locate two two levels of directories to store the file (as most
* filesystems are not able to handle too many files in a single dir).
*
* In the end there are 65536 final directories (256 directories inside
* every 256 top level directories), so that with 1 billion of files every
* directory will contain in the average 15258 entires, that is ok with
* most filesystems implementation.
*
* Note that since Redis supports multiple databases, the actual key name
* is:
*
* /0b/ee/<dbid>_0beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33
*
* so for instance if the key is inside DB 0:
*
* /0b/ee/0_0beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33
*
* The actaul implementation of this disk store is highly dependant to the
* filesystem implementation itself. This implementation may be replaced by
* a B+TREE implementation in future implementations.
*
* Data ok every key is serialized using the same format used for .rdb
* serialization. Everything is serialized on every entry: key name,
* ttl information in case of keys with an associated expire time, and the
* serialized value itself.
*
* Because the format is the same of the .rdb files it is trivial to create
* an .rdb file starting from this format just by mean of scanning the
* directories and concatenating entries, with the sole addition of an
* .rdb header at the start and the end-of-db opcode at the end.
*
* -------------------------------------------------------------------------
*
* Copyright (c) 2010-2011, Salvatore Sanfilippo <antirez at gmail dot com>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of Redis nor the names of its contributors may be used
* to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#include "redis.h"
#include "sha1.h"
#include <fcntl.h>
#include <sys/stat.h>
#include <dirent.h>
int create256dir(char *prefix) {
char buf[1024];
int j;
for (j = 0; j < 256; j++) {
snprintf(buf,sizeof(buf),"%s%02x",prefix,j);
if (mkdir(buf,0755) == -1) {
redisLog(REDIS_WARNING,"Error creating dir %s for diskstore: %s",
buf,strerror(errno));
return REDIS_ERR;
}
}
return REDIS_OK;
}
int dsOpen(void) {
struct stat sb;
int retval, j;
char *path = server.ds_path;
char buf[1024];
if ((retval = stat(path,&sb) == -1) && errno != ENOENT) {
redisLog(REDIS_WARNING, "Error opening disk store at %s: %s",
path, strerror(errno));
return REDIS_ERR;
}
/* Directory already in place. Assume everything is ok. */
if (retval == 0 && S_ISDIR(sb.st_mode)) {
redisLog(REDIS_NOTICE,"Disk store %s exists", path);
return REDIS_OK;
}
/* File exists but it's not a directory */
if (retval == 0 && !S_ISDIR(sb.st_mode)) {
redisLog(REDIS_WARNING,"Disk store at %s is not a directory", path);
return REDIS_ERR;
}
/* New disk store, create the directory structure now, as creating
* them in a lazy way is not a good idea, after very few insertions
* we'll need most of the 65536 directories anyway. */
redisLog(REDIS_NOTICE,"Disk store %s does not exist: creating", path);
if (mkdir(path,0755) == -1) {
redisLog(REDIS_WARNING,"Disk store init failed creating dir %s: %s",
path, strerror(errno));
return REDIS_ERR;
}
/* Create the top level 256 directories */
snprintf(buf,sizeof(buf),"%s/",path);
if (create256dir(buf) == REDIS_ERR) return REDIS_ERR;
/* For every 256 top level dir, create 256 nested dirs */
for (j = 0; j < 256; j++) {
snprintf(buf,sizeof(buf),"%s/%02x/",path,j);
if (create256dir(buf) == REDIS_ERR) return REDIS_ERR;
}
return REDIS_OK;
}
int dsClose(void) {
return REDIS_OK;
}
/* Convert key into full path for this object. Dirty but hopefully
* is fast enough. Returns the length of the returned path. */
int dsKeyToPath(redisDb *db, char *buf, robj *key) {
SHA1_CTX ctx;
unsigned char hash[20];
char hex[40], digits[] = "0123456789abcdef";
int j, l;
char *origbuf = buf;
SHA1Init(&ctx);
SHA1Update(&ctx,key->ptr,sdslen(key->ptr));
SHA1Final(hash,&ctx);
/* Convert the hash into hex format */
for (j = 0; j < 20; j++) {
hex[j*2] = digits[(hash[j]&0xF0)>>4];
hex[(j*2)+1] = digits[hash[j]&0x0F];
}
/* Create the object path. Start with server.ds_path that's the root dir */
l = sdslen(server.ds_path);
memcpy(buf,server.ds_path,l);
buf += l;
*buf++ = '/';
/* Then add xx/yy/ that is the two level directories */
buf[0] = hex[0];
buf[1] = hex[1];
buf[2] = '/';
buf[3] = hex[2];
buf[4] = hex[3];
buf[5] = '/';
buf += 6;
/* Add the database number followed by _ and finall the SHA1 hex */
l = ll2string(buf,64,db->id);
buf += l;
buf[0] = '_';
memcpy(buf+1,hex,40);
buf[41] = '\0';
return (buf-origbuf)+41;
}
int dsSet(redisDb *db, robj *key, robj *val, time_t expire) {
char buf[1024], buf2[1024];
FILE *fp;
int retval, len;
len = dsKeyToPath(db,buf,key);
memcpy(buf2,buf,len);
snprintf(buf2+len,sizeof(buf2)-len,"-%ld-%ld",(long)time(NULL),(long)val);
while ((fp = fopen(buf2,"w")) == NULL) {
if (errno == ENOSPC) {
redisLog(REDIS_WARNING,"Diskstore: No space left on device. Please make room and wait 30 seconds for Redis to continue.");
sleep(30);
} else {
redisLog(REDIS_WARNING,"diskstore error opening %s: %s",
buf2, strerror(errno));
redisPanic("Unrecoverable diskstore error. Exiting.");
}
}
if ((retval = rdbSaveKeyValuePair(fp,key,val,expire,time(NULL))) == -1)
return REDIS_ERR;
fclose(fp);
if (retval == 0) {
/* Expired key. Unlink failing not critical */
unlink(buf);
unlink(buf2);
} else {
/* Use rename for atomic updadte of value */
if (rename(buf2,buf) == -1) {
redisLog(REDIS_WARNING,"rename(2) returned an error: %s",
strerror(errno));
redisPanic("Unrecoverable diskstore error. Exiting.");
}
}
return REDIS_OK;
}
robj *dsGet(redisDb *db, robj *key, time_t *expire) {
char buf[1024];
int type;
time_t expiretime = -1; /* -1 means: no expire */
robj *dskey; /* Key as loaded from disk. */
robj *val;
FILE *fp;
dsKeyToPath(db,buf,key);
fp = fopen(buf,"r");
if (fp == NULL && errno == ENOENT) return NULL; /* No such key */
if (fp == NULL) {
redisLog(REDIS_WARNING,"Disk store failed opening %s: %s",
buf, strerror(errno));
goto readerr;
}
if ((type = rdbLoadType(fp)) == -1) goto readerr;
if (type == REDIS_EXPIRETIME) {
if ((expiretime = rdbLoadTime(fp)) == -1) goto readerr;
/* We read the time so we need to read the object type again */
if ((type = rdbLoadType(fp)) == -1) goto readerr;
}
/* Read key */
if ((dskey = rdbLoadStringObject(fp)) == NULL) goto readerr;
/* Read value */
if ((val = rdbLoadObject(type,fp)) == NULL) goto readerr;
fclose(fp);
/* The key we asked, and the key returned, must be the same */
redisAssert(equalStringObjects(key,dskey));
/* Check if the key already expired */
decrRefCount(dskey);
if (expiretime != -1 && expiretime < time(NULL)) {
decrRefCount(val);
unlink(buf); /* This failing is non critical here */
return NULL;
}
/* Everything ok... */
*expire = expiretime;
return val;
readerr:
redisLog(REDIS_WARNING,"Read error reading reading %s. Corrupted key?",
buf);
redisPanic("Unrecoverable error reading from disk store");
return NULL; /* unreached */
}
int dsDel(redisDb *db, robj *key) {
char buf[1024];
dsKeyToPath(db,buf,key);
if (unlink(buf) == -1) {
if (errno == ENOENT) {
return REDIS_ERR;
} else {
redisLog(REDIS_WARNING,"Disk store can't remove %s: %s",
buf, strerror(errno));
redisPanic("Unrecoverable Disk store errore. Existing.");
return REDIS_ERR; /* unreached */
}
} else {
return REDIS_OK;
}
}
int dsExists(redisDb *db, robj *key) {
char buf[1024];
dsKeyToPath(db,buf,key);
return access(buf,R_OK) == 0;
}
int dsGetDbidFromFilename(char *path) {
char id[64];
char *p = strchr(path,'_');
int len = (p - path);
redisAssert(p != NULL && len < 64);
memcpy(id,path,len);
id[len] = '\0';
return atoi(id);
}
void dsFlushOneDir(char *path, int dbid) {
DIR *dir;
struct dirent *dp, de;
dir = opendir(path);
if (dir == NULL) {
redisLog(REDIS_WARNING,"Disk store can't open dir %s: %s",
path, strerror(errno));
redisPanic("Unrecoverable Disk store errore. Existing.");
}
while(1) {
char buf[1024];
readdir_r(dir,&de,&dp);
if (dp == NULL) break;
if (dp->d_name[0] == '.') continue;
/* Check if we need to remove this entry accordingly to the
* DB number. */
if (dbid != -1 && dsGetDbidFromFilename(dp->d_name)) continue;
/* Finally unlink the file */
snprintf(buf,1024,"%s/%s",path,dp->d_name);
if (unlink(buf) == -1) {
redisLog(REDIS_WARNING,
"Can't unlink %s: %s", buf, strerror(errno));
redisPanic("Unrecoverable Disk store errore. Existing.");
}
}
closedir(dir);
}
void dsFlushDb(int dbid) {
char buf[1024];
int j, i;
redisLog(REDIS_NOTICE,"Flushing diskstore DB (%d)",dbid);
for (j = 0; j < 256; j++) {
for (i = 0; i < 256; i++) {
snprintf(buf,1024,"%s/%02x/%02x",server.ds_path,j,i);
dsFlushOneDir(buf,dbid);
}
}
}
void dsRdbSaveSetState(int state) {
pthread_mutex_lock(&server.bgsavethread_mutex);
server.bgsavethread_state = state;
pthread_mutex_unlock(&server.bgsavethread_mutex);
}
void *dsRdbSave_thread(void *arg) {
char tmpfile[256], *filename = (char*)arg;
struct dirent *dp, de;
int j, i, last_dbid = -1;
FILE *fp;
/* Change state to ACTIVE, to signal there is a saving thead working. */
redisLog(REDIS_NOTICE,"Diskstore BGSAVE thread started");
dsRdbSaveSetState(REDIS_BGSAVE_THREAD_ACTIVE);
snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
fp = fopen(tmpfile,"w");
if (!fp) {
redisLog(REDIS_WARNING, "Failed opening .rdb for saving: %s",
strerror(errno));
dsRdbSaveSetState(REDIS_BGSAVE_THREAD_DONE_ERR);
return NULL;
}
if (fwrite("REDIS0001",9,1,fp) == 0) goto werr;
sleep(5);
/* Scan all diskstore dirs looking for keys */
for (j = 0; j < 256; j++) {
for (i = 0; i < 256; i++) {
DIR *dir;
char buf[1024];
/* For every directory, collect all the keys */
snprintf(buf,sizeof(buf),"%s/%02x/%02x",server.ds_path,j,i);
if ((dir = opendir(buf)) == NULL) {
redisLog(REDIS_WARNING,"Disk store can't open dir %s: %s",
buf, strerror(errno));
goto werr;
}
while(1) {
char buf[1024];
int dbid;
FILE *entryfp;
readdir_r(dir,&de,&dp);
if (dp == NULL) break;
if (dp->d_name[0] == '.') continue;
/* If there is a '-' char in the file name, it's a temp file */
if (strchr(dp->d_name,'-') != NULL) continue;
/* Emit the SELECT DB opcode if needed. */
dbid = dsGetDbidFromFilename(dp->d_name);
if (dbid != last_dbid) {
last_dbid = dbid;
if (rdbSaveType(fp,REDIS_SELECTDB) == -1) goto werr;
if (rdbSaveLen(fp,dbid) == -1) goto werr;
}
/* Let's copy this file into the target .rdb */
snprintf(buf,sizeof(buf),"%s/%02x/%02x/%s",
server.ds_path,j,i,dp->d_name);
if ((entryfp = fopen(buf,"r")) == NULL) {
redisLog(REDIS_WARNING,"Can't open %s: %s",
buf,strerror(errno));
closedir(dir);
goto werr;
}
while(1) {
int nread = fread(buf,1,sizeof(buf),entryfp);
if (nread == 0) {
if (ferror(entryfp)) {
redisLog(REDIS_WARNING,"Error reading from file entry while performing BGSAVE for diskstore: %s", strerror(errno));
closedir(dir);
goto werr;
} else {
break;
}
}
if (fwrite(buf,1,nread,fp) != (unsigned)nread) {
closedir(dir);
goto werr;
}
}
fclose(entryfp);
}
closedir(dir);
}
}
/* Output the end of file opcode */
if (rdbSaveType(fp,REDIS_EOF) == -1) goto werr;
/* Make sure data will not remain on the OS's output buffers */
fflush(fp);
fsync(fileno(fp));
fclose(fp);
zfree(filename);
/* Use RENAME to make sure the DB file is changed atomically only
* if the generate DB file is ok. */
if (rename(tmpfile,filename) == -1) {
redisLog(REDIS_WARNING,"Error moving temp DB file on the final destination: %s (diskstore)", strerror(errno));
unlink(tmpfile);
dsRdbSaveSetState(REDIS_BGSAVE_THREAD_DONE_ERR);
return NULL;
}
redisLog(REDIS_NOTICE,"DB saved on disk by diskstore thread");
dsRdbSaveSetState(REDIS_BGSAVE_THREAD_DONE_OK);
return NULL;
werr:
zfree(filename);
fclose(fp);
unlink(tmpfile);
dsRdbSaveSetState(REDIS_BGSAVE_THREAD_DONE_ERR);
redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
return NULL;
}
int dsRdbSaveBackground(char *filename) {
pthread_t thread;
if (pthread_create(&thread,NULL,dsRdbSave_thread,zstrdup(filename)) != 0) {
redisLog(REDIS_WARNING,"Can't create diskstore BGSAVE thread: %s",
strerror(errno));
return REDIS_ERR;
} else {
server.bgsavethread = thread;
return REDIS_OK;
}
}
int dsRdbSave(char *filename) {
/* A blocking save is actually a non blocking save... just we wait
* for it to terminate in a non-busy loop. */
redisLog(REDIS_NOTICE,"Starting a blocking SAVE (BGSAVE + blocking wait)");
server.dirty_before_bgsave = server.dirty;
if (dsRdbSaveBackground(filename) == REDIS_ERR) return REDIS_ERR;
while(1) {
usleep(1000);
int state;
pthread_mutex_lock(&server.bgsavethread_mutex);
state = server.bgsavethread_state;
pthread_mutex_unlock(&server.bgsavethread_mutex);
if (state == REDIS_BGSAVE_THREAD_DONE_OK ||
state == REDIS_BGSAVE_THREAD_DONE_ERR) break;
}
return REDIS_OK;
}

File diff suppressed because it is too large Load Diff

View File

@@ -501,25 +501,6 @@ void freeClient(redisClient *c) {
redisAssert(ln != NULL);
listDelNode(server.unblocked_clients,ln);
}
/* Remove from the list of clients waiting for swapped keys, or ready
* to be restarted, but not yet woken up again. */
if (c->flags & REDIS_IO_WAIT) {
redisAssert(server.ds_enabled);
if (listLength(c->io_keys) == 0) {
ln = listSearchKey(server.io_ready_clients,c);
/* When this client is waiting to be woken up (REDIS_IO_WAIT),
* it should be present in the list io_ready_clients */
redisAssert(ln != NULL);
listDelNode(server.io_ready_clients,ln);
} else {
while (listLength(c->io_keys)) {
ln = listFirst(c->io_keys);
dontWaitForSwappedKey(c,ln->value);
}
}
server.cache_blocked_clients--;
}
listRelease(c->io_keys);
/* Master/slave cleanup.
* Case 1: we lost the connection with a slave. */
@@ -536,6 +517,7 @@ void freeClient(redisClient *c) {
if (c->flags & REDIS_MASTER) {
server.master = NULL;
server.replstate = REDIS_REPL_CONNECT;
server.repl_down_since = time(NULL);
/* Since we lost the connection with the master, we should also
* close the connection with all our slaves if we have any, so
* when we'll resync with the master the other slaves will sync again
@@ -809,9 +791,6 @@ int processMultibulkBuffer(redisClient *c) {
void processInputBuffer(redisClient *c) {
/* Keep processing while there is something in the input buffer */
while(sdslen(c->querybuf)) {
/* Immediately abort if the client is in the middle of something. */
if (c->flags & REDIS_BLOCKED || c->flags & REDIS_IO_WAIT) return;
/* REDIS_CLOSE_AFTER_REPLY closes the connection once the reply is
* written to the client. Make sure to not let the reply grow after
* this flag has been set (i.e. don't process more commands). */
@@ -920,7 +899,6 @@ void clientCommand(redisClient *c) {
if (p == flags) *p++ = 'N';
if (client->flags & REDIS_MULTI) *p++ = 'x';
if (client->flags & REDIS_BLOCKED) *p++ = 'b';
if (client->flags & REDIS_IO_WAIT) *p++ = 'i';
if (client->flags & REDIS_DIRTY_CAS) *p++ = 'd';
if (client->flags & REDIS_CLOSE_AFTER_REPLY) *p++ = 'c';
if (client->flags & REDIS_UNBLOCKED) *p++ = 'u';
@@ -960,3 +938,28 @@ void clientCommand(redisClient *c) {
addReplyError(c, "Syntax error, try CLIENT (LIST | KILL ip:port)");
}
}
void rewriteClientCommandVector(redisClient *c, int argc, ...) {
va_list ap;
int j;
robj **argv; /* The new argument vector */
argv = zmalloc(sizeof(robj*)*argc);
va_start(ap,argc);
for (j = 0; j < argc; j++) {
robj *a;
a = va_arg(ap, robj*);
argv[j] = a;
incrRefCount(a);
}
/* We free the objects in the original vector at the end, so we are
* sure that if the same objects are reused in the new vector the
* refcount gets incremented before it gets decremented. */
for (j = 0; j < c->argc; j++) decrRefCount(c->argv[j]);
zfree(c->argv);
/* Replace argv and argc with our new versions. */
c->argv = argv;
c->argc = argc;
va_end(ap);
}

View File

@@ -1,5 +1,4 @@
#include "redis.h"
#include <pthread.h>
#include <math.h>
robj *createObject(int type, void *ptr) {
@@ -30,9 +29,7 @@ robj *createStringObject(char *ptr, size_t len) {
robj *createStringObjectFromLongLong(long long value) {
robj *o;
if (value >= 0 && value < REDIS_SHARED_INTEGERS &&
!server.ds_enabled &&
pthread_equal(pthread_self(),server.mainthread)) {
if (value >= 0 && value < REDIS_SHARED_INTEGERS) {
incrRefCount(shared.integers[value]);
o = shared.integers[value];
} else {
@@ -241,10 +238,7 @@ robj *tryObjectEncoding(robj *o) {
* Note that we also avoid using shared integers when maxmemory is used
* because every object needs to have a private LRU field for the LRU
* algorithm to work well. */
if (!server.ds_enabled &&
server.maxmemory == 0 && value >= 0 && value < REDIS_SHARED_INTEGERS &&
pthread_equal(pthread_self(),server.mainthread))
{
if (server.maxmemory == 0 && value >= 0 && value < REDIS_SHARED_INTEGERS) {
decrRefCount(o);
incrRefCount(shared.integers[value]);
return shared.integers[value];

View File

@@ -413,11 +413,6 @@ int rdbSave(char *filename) {
int j;
time_t now = time(NULL);
if (server.ds_enabled) {
cacheForcePointInTime();
return dsRdbSave(filename);
}
snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
fp = fopen(tmpfile,"w");
if (!fp) {
@@ -430,7 +425,7 @@ int rdbSave(char *filename) {
redisDb *db = server.db+j;
dict *d = db->dict;
if (dictSize(d) == 0) continue;
di = dictGetIterator(d);
di = dictGetSafeIterator(d);
if (!di) {
fclose(fp);
return REDIS_ERR;
@@ -484,16 +479,10 @@ int rdbSaveBackground(char *filename) {
pid_t childpid;
long long start;
if (server.bgsavechildpid != -1 ||
server.bgsavethread != (pthread_t) -1) return REDIS_ERR;
if (server.bgsavechildpid != -1) return REDIS_ERR;
server.dirty_before_bgsave = server.dirty;
if (server.ds_enabled) {
cacheForcePointInTime();
return dsRdbSaveBackground(filename);
}
start = ustime();
if ((childpid = fork()) == 0) {
int retval;
@@ -918,7 +907,7 @@ void stopLoading(void) {
int rdbLoad(char *filename) {
FILE *fp;
uint32_t dbid;
int type, retval, rdbver;
int type, rdbver;
redisDb *db = server.db+0;
char buf[1024];
time_t expiretime, now = time(NULL);
@@ -981,11 +970,8 @@ int rdbLoad(char *filename) {
continue;
}
/* Add the new object in the hash table */
retval = dbAdd(db,key,val);
if (retval == REDIS_ERR) {
redisLog(REDIS_WARNING,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", key->ptr);
exit(1);
}
dbAdd(db,key,val);
/* Set the expire time if needed */
if (expiretime != -1) setExpire(db,key,expiretime);
@@ -1016,15 +1002,13 @@ void backgroundSaveDoneHandler(int exitcode, int bysignal) {
rdbRemoveTempFile(server.bgsavechildpid);
}
server.bgsavechildpid = -1;
server.bgsavethread = (pthread_t) -1;
server.bgsavethread_state = REDIS_BGSAVE_THREAD_UNACTIVE;
/* Possibly there are slaves waiting for a BGSAVE in order to be served
* (the first stage of SYNC is a bulk transfer of dump.rdb) */
updateSlavesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
}
void saveCommand(redisClient *c) {
if (server.bgsavechildpid != -1 || server.bgsavethread != (pthread_t)-1) {
if (server.bgsavechildpid != -1) {
addReplyError(c,"Background save already in progress");
return;
}
@@ -1036,7 +1020,7 @@ void saveCommand(redisClient *c) {
}
void bgsaveCommand(redisClient *c) {
if (server.bgsavechildpid != -1 || server.bgsavethread != (pthread_t)-1) {
if (server.bgsavechildpid != -1) {
addReplyError(c,"Background save already in progress");
} else if (server.bgrewritechildpid != -1) {
addReplyError(c,"Can't BGSAVE while AOF log rewriting is in progress");

View File

@@ -50,7 +50,6 @@
#include <limits.h>
#include <float.h>
#include <math.h>
#include <pthread.h>
#include <sys/resource.h>
/* Our shared "common" objects */
@@ -661,22 +660,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
}
updateDictResizePolicy();
}
} else if (server.bgsavethread != (pthread_t) -1) {
if (server.bgsavethread != (pthread_t) -1) {
int state;
pthread_mutex_lock(&server.bgsavethread_mutex);
state = server.bgsavethread_state;
pthread_mutex_unlock(&server.bgsavethread_mutex);
if (state == REDIS_BGSAVE_THREAD_DONE_OK ||
state == REDIS_BGSAVE_THREAD_DONE_ERR)
{
backgroundSaveDoneHandler(
(state == REDIS_BGSAVE_THREAD_DONE_OK) ? 0 : 1, 0);
}
}
} else if (!server.ds_enabled) {
} else {
time_t now = time(NULL);
/* If there is not a background saving/rewrite in progress check if
@@ -714,10 +698,6 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
* in order to guarantee a strict consistency. */
if (server.masterhost == NULL) activeExpireCycle();
/* Remove a few cached objects from memory if we are over the
* configured memory limit */
if (server.ds_enabled) cacheCron();
/* Replication cron function -- used to reconnect to master and
* to detect transfer failures. */
if (!(loops % 10)) replicationCron();
@@ -737,31 +717,6 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
listNode *ln;
redisClient *c;
/* Awake clients that got all the on disk keys they requested */
if (server.ds_enabled && listLength(server.io_ready_clients)) {
listIter li;
listRewind(server.io_ready_clients,&li);
while((ln = listNext(&li))) {
c = ln->value;
struct redisCommand *cmd;
/* Resume the client. */
listDelNode(server.io_ready_clients,ln);
c->flags &= (~REDIS_IO_WAIT);
server.cache_blocked_clients--;
aeCreateFileEvent(server.el, c->fd, AE_READABLE,
readQueryFromClient, c);
cmd = lookupCommand(c->argv[0]->ptr);
redisAssert(cmd != NULL);
call(c,cmd);
resetClient(c);
/* There may be more data to process in the input buffer. */
if (c->querybuf && sdslen(c->querybuf) > 0)
processInputBuffer(c);
}
}
/* Try to process pending commands for clients that were just unblocked. */
while (listLength(server.unblocked_clients)) {
ln = listFirst(server.unblocked_clients);
@@ -874,10 +829,6 @@ void initServerConfig() {
server.maxmemory = 0;
server.maxmemory_policy = REDIS_MAXMEMORY_VOLATILE_LRU;
server.maxmemory_samples = 3;
server.ds_enabled = 0;
server.ds_path = sdsnew("/tmp/redis.ds");
server.cache_max_memory = 64LL*1024*1024; /* 64 MB of RAM */
server.cache_blocked_clients = 0;
server.hash_max_zipmap_entries = REDIS_HASH_MAX_ZIPMAP_ENTRIES;
server.hash_max_zipmap_value = REDIS_HASH_MAX_ZIPMAP_VALUE;
server.list_max_ziplist_entries = REDIS_LIST_MAX_ZIPLIST_ENTRIES;
@@ -886,7 +837,6 @@ void initServerConfig() {
server.zset_max_ziplist_entries = REDIS_ZSET_MAX_ZIPLIST_ENTRIES;
server.zset_max_ziplist_value = REDIS_ZSET_MAX_ZIPLIST_VALUE;
server.shutdown_asap = 0;
server.cache_flush_delay = 0;
server.cluster_enabled = 0;
server.cluster.configfile = zstrdup("nodes.conf");
server.lua_time_limit = REDIS_LUA_TIME_LIMIT;
@@ -906,6 +856,7 @@ void initServerConfig() {
server.replstate = REDIS_REPL_NONE;
server.repl_syncio_timeout = REDIS_REPL_SYNCIO_TIMEOUT;
server.repl_serve_stale_data = 1;
server.repl_down_since = -1;
/* Double constants initialization */
R_Zero = 0.0;
@@ -934,12 +885,10 @@ void initServer() {
server.syslog_facility);
}
server.mainthread = pthread_self();
server.clients = listCreate();
server.slaves = listCreate();
server.monitors = listCreate();
server.unblocked_clients = listCreate();
server.cache_io_queue = listCreate();
createSharedObjects();
server.el = aeCreateEventLoop();
@@ -969,11 +918,6 @@ void initServer() {
server.db[j].expires = dictCreate(&keyptrDictType,NULL);
server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL);
server.db[j].watched_keys = dictCreate(&keylistDictType,NULL);
if (server.ds_enabled) {
server.db[j].io_keys = dictCreate(&keylistDictType,NULL);
server.db[j].io_negcache = dictCreate(&setDictType,NULL);
server.db[j].io_queued = dictCreate(&setDictType,NULL);
}
server.db[j].id = j;
}
server.pubsub_channels = dictCreate(&keylistDictType,NULL);
@@ -983,8 +927,6 @@ void initServer() {
server.cronloops = 0;
server.bgsavechildpid = -1;
server.bgrewritechildpid = -1;
server.bgsavethread_state = REDIS_BGSAVE_THREAD_UNACTIVE;
server.bgsavethread = (pthread_t) -1;
server.bgrewritebuf = sdsempty();
server.aofbuf = sdsempty();
server.lastsave = time(NULL);
@@ -1014,7 +956,6 @@ void initServer() {
}
}
if (server.ds_enabled) dsInit();
if (server.cluster_enabled) clusterInit();
scriptingInit();
srand(time(NULL)^getpid());
@@ -1193,8 +1134,6 @@ int processCommand(redisClient *c) {
queueMultiCommand(c,cmd);
addReply(c,shared.queued);
} else {
if (server.ds_enabled && blockClientOnSwappedKeys(c,cmd))
return REDIS_ERR;
call(c,cmd);
}
return REDIS_OK;
@@ -1212,9 +1151,7 @@ int prepareForShutdown() {
kill(server.bgsavechildpid,SIGKILL);
rdbRemoveTempFile(server.bgsavechildpid);
}
if (server.ds_enabled) {
/* FIXME: flush all objects on disk */
} else if (server.appendonly) {
if (server.appendonly) {
/* Append only file: fsync() the AOF and exit */
aof_fsync(server.appendfd);
} else if (server.saveparamslen > 0) {
@@ -1364,7 +1301,7 @@ sds genRedisInfoString(char *section) {
peak_hmem,
((long long)lua_gc(server.lua,LUA_GCCOUNT,0))*1024LL,
zmalloc_get_fragmentation_ratio(),
REDIS_MALLOC
ZMALLOC_LIB
);
}
@@ -1398,8 +1335,7 @@ sds genRedisInfoString(char *section) {
server.loading,
server.appendonly,
server.dirty,
server.bgsavechildpid != -1 ||
server.bgsavethread != (pthread_t) -1,
server.bgsavechildpid != -1,
server.lastsave,
server.bgrewritechildpid != -1);
@@ -1445,35 +1381,6 @@ sds genRedisInfoString(char *section) {
}
}
/* Diskstore */
if (allsections || defsections || !strcasecmp(section,"diskstore")) {
if (sections++) info = sdscat(info,"\r\n");
info = sdscatprintf(info,
"# Diskstore\r\n"
"ds_enabled:%d\r\n",
server.ds_enabled != 0);
if (server.ds_enabled) {
lockThreadedIO();
info = sdscatprintf(info,
"cache_max_memory:%llu\r\n"
"cache_blocked_clients:%lu\r\n"
"cache_io_queue_len:%lu\r\n"
"cache_io_jobs_new:%lu\r\n"
"cache_io_jobs_processing:%lu\r\n"
"cache_io_jobs_processed:%lu\r\n"
"cache_io_ready_clients:%lu\r\n"
,(unsigned long long) server.cache_max_memory,
(unsigned long) server.cache_blocked_clients,
(unsigned long) listLength(server.cache_io_queue),
(unsigned long) listLength(server.io_newjobs),
(unsigned long) listLength(server.io_processing),
(unsigned long) listLength(server.io_processed),
(unsigned long) listLength(server.io_ready_clients)
);
unlockThreadedIO();
}
}
/* Stats */
if (allsections || defsections || !strcasecmp(section,"stats")) {
if (sections++) info = sdscat(info,"\r\n");
@@ -1530,6 +1437,12 @@ sds genRedisInfoString(char *section) {
(int)(time(NULL)-server.repl_transfer_lastio)
);
}
if (server.replstate != REDIS_REPL_CONNECTED) {
info = sdscatprintf(info,
"master_link_down_since_seconds:%ld\r\n",
(long)time(NULL)-server.repl_down_since);
}
}
info = sdscatprintf(info,
"connected_slaves:%d\r\n",
@@ -1825,9 +1738,7 @@ int main(int argc, char **argv) {
linuxOvercommitMemoryWarning();
#endif
start = ustime();
if (server.ds_enabled) {
redisLog(REDIS_NOTICE,"DB not loaded (running with disk back end)");
} else if (server.appendonly) {
if (server.appendonly) {
if (loadAppendOnlyFile(server.appendfilename) == REDIS_OK)
redisLog(REDIS_NOTICE,"DB loaded from append only file: %.3f seconds",(float)(ustime()-start)/1000000);
} else {

View File

@@ -125,26 +125,12 @@
#define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
#define REDIS_RDB_ENC_LZF 3 /* string compressed with FASTLZ */
/* Scheduled IO opeations flags. */
#define REDIS_IO_LOAD 1
#define REDIS_IO_SAVE 2
#define REDIS_IO_LOADINPROG 4
#define REDIS_IO_SAVEINPROG 8
/* Generic IO flags */
#define REDIS_IO_ONLYLOADS 1
#define REDIS_IO_ASAP 2
#define REDIS_MAX_COMPLETED_JOBS_PROCESSED 1
#define REDIS_THREAD_STACK_SIZE (1024*1024*4)
/* Client flags */
#define REDIS_SLAVE 1 /* This client is a slave server */
#define REDIS_MASTER 2 /* This client is a master server */
#define REDIS_MONITOR 4 /* This client is a slave monitor, see MONITOR */
#define REDIS_MULTI 8 /* This client is in a MULTI context */
#define REDIS_BLOCKED 16 /* The client is waiting in a blocking operation */
#define REDIS_IO_WAIT 32 /* The client is waiting for Virtual Memory I/O */
#define REDIS_DIRTY_CAS 64 /* Watched keys modified. EXEC will fail. */
#define REDIS_CLOSE_AFTER_REPLY 128 /* Close after writing entire reply. */
#define REDIS_UNBLOCKED 256 /* This client was unblocked and is stored in
@@ -224,12 +210,6 @@
#define REDIS_MAXMEMORY_ALLKEYS_RANDOM 4
#define REDIS_MAXMEMORY_NO_EVICTION 5
/* Diskstore background saving thread states */
#define REDIS_BGSAVE_THREAD_UNACTIVE 0
#define REDIS_BGSAVE_THREAD_ACTIVE 1
#define REDIS_BGSAVE_THREAD_DONE_OK 2
#define REDIS_BGSAVE_THREAD_DONE_ERR 3
/* Scripting */
#define REDIS_LUA_TIME_LIMIT 60000 /* milliseconds */
@@ -297,9 +277,6 @@ typedef struct redisDb {
dict *dict; /* The keyspace for this DB */
dict *expires; /* Timeout of keys with a timeout set */
dict *blocking_keys; /* Keys with clients waiting for data (BLPOP) */
dict *io_keys; /* Keys with clients waiting for DS I/O */
dict *io_negcache; /* Negative caching for disk store */
dict *io_queued; /* Queued IO operations hash table */
dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */
int id;
} redisDb;
@@ -521,7 +498,6 @@ typedef struct {
struct redisServer {
/* General */
pthread_t mainthread;
redisDb *db;
dict *commands; /* Command table hahs table */
aeEventLoop *el;
@@ -579,9 +555,6 @@ struct redisServer {
char *pidfile;
pid_t bgsavechildpid;
pid_t bgrewritechildpid;
int bgsavethread_state;
pthread_mutex_t bgsavethread_mutex;
pthread_t bgsavethread;
sds bgrewritebuf; /* buffer taken by parent during oppend only rewrite */
sds aofbuf; /* AOF buffer, written before entering the event loop */
struct saveparam *saveparams;
@@ -609,6 +582,7 @@ struct redisServer {
char *repl_transfer_tmpfile; /* slave-> master SYNC temp file name */
time_t repl_transfer_lastio; /* unix time of the latest read, for timeout */
int repl_serve_stale_data; /* Serve stale data when link is down? */
time_t repl_down_since; /* unix time at which link with master went down */
/* Limits */
unsigned int maxclients;
unsigned long long maxmemory;
@@ -616,19 +590,12 @@ struct redisServer {
int maxmemory_samples;
/* Blocked clients */
unsigned int bpop_blocked_clients;
unsigned int cache_blocked_clients;
list *unblocked_clients; /* list of clients to unblock before next loop */
list *cache_io_queue; /* IO operations queue */
int cache_flush_delay; /* seconds to wait before flushing keys */
/* Sort parameters - qsort_r() is only available under BSD so we
* have to take this state global, in order to pass it to sortCompare() */
int sort_desc;
int sort_alpha;
int sort_bypattern;
/* Virtual memory configuration */
int ds_enabled; /* backend disk in redis.conf */
char *ds_path; /* location of the disk store on disk */
unsigned long long cache_max_memory;
/* Zip structure config */
size_t hash_max_zipmap_entries;
size_t hash_max_zipmap_value;
@@ -691,7 +658,7 @@ struct redisCommand {
int arity;
int flags;
/* Use a function to determine keys arguments in a command line.
* Used both for diskstore preloading and Redis Cluster. */
* Used for Redis Cluster redirect. */
redisGetKeysProc *getkeys_proc;
/* What keys should be loaded in background when calling this command? */
int firstkey; /* The first argument that's a key (0 = no keys) */
@@ -718,27 +685,6 @@ typedef struct _redisSortOperation {
robj *pattern;
} redisSortOperation;
/* DIsk store threaded I/O request message */
#define REDIS_IOJOB_LOAD 0
#define REDIS_IOJOB_SAVE 1
typedef struct iojob {
int type; /* Request type, REDIS_IOJOB_* */
redisDb *db;/* Redis database */
robj *key; /* This I/O request is about this key */
robj *val; /* the value to swap for REDIS_IOJOB_SAVE, otherwise this
* field is populated by the I/O thread for REDIS_IOJOB_LOAD. */
time_t expire; /* Expire time for this key on REDIS_IOJOB_LOAD */
} iojob;
/* IO operations scheduled -- check dscache.c for more info */
typedef struct ioop {
int type;
redisDb *db;
robj *key;
time_t ctime; /* This is the creation time of the entry. */
} ioop;
/* Structure to hold list iteration abstraction. */
typedef struct {
robj *subject;
@@ -828,6 +774,7 @@ void addReplyMultiBulkLen(redisClient *c, long length);
void *dupClientReplyValue(void *o);
void getClientsMaxBuffers(unsigned long *longest_output_list,
unsigned long *biggest_input_buffer);
void rewriteClientCommandVector(redisClient *c, int argc, ...);
#ifdef __GNUC__
void addReplyErrorFormat(redisClient *c, const char *fmt, ...)
@@ -982,40 +929,6 @@ void oom(const char *msg);
void populateCommandTable(void);
void resetCommandTableStats(void);
/* Disk store */
int dsOpen(void);
int dsClose(void);
int dsSet(redisDb *db, robj *key, robj *val, time_t expire);
robj *dsGet(redisDb *db, robj *key, time_t *expire);
int dsDel(redisDb *db, robj *key);
int dsExists(redisDb *db, robj *key);
void dsFlushDb(int dbid);
int dsRdbSaveBackground(char *filename);
int dsRdbSave(char *filename);
/* Disk Store Cache */
void dsInit(void);
void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata, int mask);
void lockThreadedIO(void);
void unlockThreadedIO(void);
void freeIOJob(iojob *j);
void queueIOJob(iojob *j);
void waitEmptyIOJobsQueue(void);
void processAllPendingIOJobs(void);
int blockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd);
int dontWaitForSwappedKey(redisClient *c, robj *key);
void handleClientsBlockedOnSwappedKey(redisDb *db, robj *key);
int cacheFreeOneEntry(void);
void cacheScheduleIOAddFlag(redisDb *db, robj *key, long flag);
void cacheScheduleIODelFlag(redisDb *db, robj *key, long flag);
int cacheScheduleIOGetFlags(redisDb *db, robj *key);
void cacheScheduleIO(redisDb *db, robj *key, int type);
void cacheCron(void);
int cacheKeyMayExist(redisDb *db, robj *key);
void cacheSetKeyMayExist(redisDb *db, robj *key);
void cacheSetKeyDoesNotExist(redisDb *db, robj *key);
void cacheForcePointInTime(void);
/* Set data type */
robj *setTypeCreate(robj *value);
int setTypeAdd(robj *subject, robj *value);
@@ -1068,8 +981,9 @@ robj *lookupKeyRead(redisDb *db, robj *key);
robj *lookupKeyWrite(redisDb *db, robj *key);
robj *lookupKeyReadOrReply(redisClient *c, robj *key, robj *reply);
robj *lookupKeyWriteOrReply(redisClient *c, robj *key, robj *reply);
int dbAdd(redisDb *db, robj *key, robj *val);
int dbReplace(redisDb *db, robj *key, robj *val);
void dbAdd(redisDb *db, robj *key, robj *val);
void dbOverwrite(redisDb *db, robj *key, robj *val);
void setKey(redisDb *db, robj *key, robj *val);
int dbExists(redisDb *db, robj *key);
robj *dbRandomKey(redisDb *db);
int dbDelete(redisDb *db, robj *key);

View File

@@ -366,12 +366,12 @@ void sortCommand(redisClient *c) {
}
}
}
dbReplace(c->db,storekey,sobj);
setKey(c->db,storekey,sobj);
decrRefCount(sobj);
/* Note: we add 1 because the DB is dirty anyway since even if the
* SORT result is empty a new key is set and maybe the old content
* replaced. */
server.dirty += 1+outputlen;
signalModifiedKey(c->db,storekey);
addReplyLongLong(c,outputlen);
}

View File

@@ -640,7 +640,9 @@ void lremCommand(redisClient *c) {
* as well. This command was originally proposed by Ezra Zygmuntowicz.
*/
void rpoplpushHandlePush(redisClient *c, robj *dstkey, robj *dstobj, robj *value) {
void rpoplpushHandlePush(redisClient *origclient, redisClient *c, robj *dstkey, robj *dstobj, robj *value) {
robj *aux;
if (!handleClientsWaitingListPush(c,dstkey,value)) {
/* Create the list if the key does not exist */
if (!dstobj) {
@@ -648,9 +650,25 @@ void rpoplpushHandlePush(redisClient *c, robj *dstkey, robj *dstobj, robj *value
dbAdd(c->db,dstkey,dstobj);
} else {
signalModifiedKey(c->db,dstkey);
server.dirty++;
}
listTypePush(dstobj,value,REDIS_HEAD);
/* If we are pushing as a result of LPUSH against a key
* watched by BLPOPLPUSH, we need to rewrite the command vector.
* But if this is called directly by RPOPLPUSH (either directly
* or via a BRPOPLPUSH where the popped list exists)
* we should replicate the BRPOPLPUSH command itself. */
if (c != origclient) {
aux = createStringObject("LPUSH",5);
rewriteClientCommandVector(origclient,3,aux,dstkey,value);
decrRefCount(aux);
} else {
/* Make sure to always use RPOPLPUSH in the replication / AOF,
* even if the original command was BRPOPLPUSH. */
aux = createStringObject("RPOPLPUSH",9);
rewriteClientCommandVector(origclient,3,aux,c->argv[1],c->argv[2]);
decrRefCount(aux);
}
server.dirty++;
}
/* Always send the pushed value to the client. */
@@ -666,16 +684,22 @@ void rpoplpushCommand(redisClient *c) {
addReply(c,shared.nullbulk);
} else {
robj *dobj = lookupKeyWrite(c->db,c->argv[2]);
robj *touchedkey = c->argv[1];
if (dobj && checkType(c,dobj,REDIS_LIST)) return;
value = listTypePop(sobj,REDIS_TAIL);
rpoplpushHandlePush(c,c->argv[2],dobj,value);
/* We saved touched key, and protect it, since rpoplpushHandlePush
* may change the client command argument vector. */
incrRefCount(touchedkey);
rpoplpushHandlePush(c,c,c->argv[2],dobj,value);
/* listTypePop returns an object with its refcount incremented */
decrRefCount(value);
/* Delete the source list when it is empty */
if (listTypeLength(sobj) == 0) dbDelete(c->db,c->argv[1]);
signalModifiedKey(c->db,c->argv[1]);
if (listTypeLength(sobj) == 0) dbDelete(c->db,touchedkey);
signalModifiedKey(c->db,touchedkey);
decrRefCount(touchedkey);
server.dirty++;
}
}
@@ -777,6 +801,7 @@ void unblockClientWaitingData(redisClient *c) {
/* Cleanup the client structure */
zfree(c->bpop.keys);
c->bpop.keys = NULL;
if (c->bpop.target) decrRefCount(c->bpop.target);
c->bpop.target = NULL;
c->flags &= ~REDIS_BLOCKED;
c->flags |= REDIS_UNBLOCKED;
@@ -820,6 +845,10 @@ int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) {
receiver = ln->value;
dstkey = receiver->bpop.target;
/* Protect receiver->bpop.target, that will be freed by
* the next unblockClientWaitingData() call. */
if (dstkey) incrRefCount(dstkey);
/* This should remove the first element of the "clients" list. */
unblockClientWaitingData(receiver);
@@ -828,17 +857,16 @@ int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) {
addReplyMultiBulkLen(receiver,2);
addReplyBulk(receiver,key);
addReplyBulk(receiver,ele);
return 1;
return 1; /* Serve just the first client as in B[RL]POP semantics */
} else {
/* BRPOPLPUSH, note that receiver->db is always equal to c->db. */
dstobj = lookupKeyWrite(receiver->db,dstkey);
if (dstobj && checkType(receiver,dstobj,REDIS_LIST)) {
decrRefCount(dstkey);
} else {
rpoplpushHandlePush(receiver,dstkey,dstobj,ele);
if (!(dstobj && checkType(receiver,dstobj,REDIS_LIST))) {
rpoplpushHandlePush(c,receiver,dstkey,dstobj,ele);
decrRefCount(dstkey);
return 1;
}
decrRefCount(dstkey);
}
}

View File

@@ -332,7 +332,7 @@ void scardCommand(redisClient *c) {
}
void spopCommand(redisClient *c) {
robj *set, *ele;
robj *set, *ele, *aux;
int64_t llele;
int encoding;
@@ -348,16 +348,11 @@ void spopCommand(redisClient *c) {
setTypeRemove(set,ele);
}
/* Change argv to replicate as SREM */
c->argc = 3;
c->argv = zrealloc(c->argv,sizeof(robj*)*(c->argc));
/* Overwrite SREM with SPOP (same length) */
redisAssert(sdslen(c->argv[0]->ptr) == 4);
memcpy(c->argv[0]->ptr, "SREM", 4);
/* Popped element already has incremented refcount */
c->argv[2] = ele;
/* Replicate/AOF this command as an SREM operation */
aux = createStringObject("SREM",4);
rewriteClientCommandVector(c,3,aux,c->argv[1],ele);
decrRefCount(ele);
decrRefCount(aux);
addReplyBulk(c,ele);
if (setTypeSize(set) == 0) dbDelete(c->db,c->argv[1]);

View File

@@ -13,7 +13,6 @@ static int checkStringLength(redisClient *c, long long size) {
}
void setGenericCommand(redisClient *c, int nx, robj *key, robj *val, robj *expire) {
int retval;
long seconds = 0; /* initialized to avoid an harmness warning */
if (expire) {
@@ -25,21 +24,12 @@ void setGenericCommand(redisClient *c, int nx, robj *key, robj *val, robj *expir
}
}
retval = dbAdd(c->db,key,val);
if (retval == REDIS_ERR) {
if (!nx) {
dbReplace(c->db,key,val);
incrRefCount(val);
} else {
addReply(c,shared.czero);
return;
}
} else {
incrRefCount(val);
if (lookupKeyWrite(c->db,key) != NULL && nx) {
addReply(c,shared.czero);
return;
}
signalModifiedKey(c->db,key);
setKey(c->db,key,val);
server.dirty++;
removeExpire(c->db,key);
if (expire) setExpire(c->db,key,time(NULL)+seconds);
addReply(c, nx ? shared.cone : shared.ok);
}
@@ -81,11 +71,8 @@ void getCommand(redisClient *c) {
void getsetCommand(redisClient *c) {
if (getGenericCommand(c) == REDIS_ERR) return;
c->argv[2] = tryObjectEncoding(c->argv[2]);
dbReplace(c->db,c->argv[1],c->argv[2]);
incrRefCount(c->argv[2]);
signalModifiedKey(c->db,c->argv[1]);
setKey(c->db,c->argv[1],c->argv[2]);
server.dirty++;
removeExpire(c->db,c->argv[1]);
}
static int getBitOffsetFromArgument(redisClient *c, robj *o, size_t *offset) {
@@ -138,7 +125,7 @@ void setbitCommand(redisClient *c) {
robj *decoded = getDecodedObject(o);
o = createStringObject(decoded->ptr, sdslen(decoded->ptr));
decrRefCount(decoded);
dbReplace(c->db,c->argv[1],o);
dbOverwrite(c->db,c->argv[1],o);
}
}
@@ -236,7 +223,7 @@ void setrangeCommand(redisClient *c) {
robj *decoded = getDecodedObject(o);
o = createStringObject(decoded->ptr, sdslen(decoded->ptr));
decrRefCount(decoded);
dbReplace(c->db,c->argv[1],o);
dbOverwrite(c->db,c->argv[1],o);
}
}
@@ -319,18 +306,15 @@ void msetGenericCommand(redisClient *c, int nx) {
busykeys++;
}
}
}
if (busykeys) {
addReply(c, shared.czero);
return;
if (busykeys) {
addReply(c, shared.czero);
return;
}
}
for (j = 1; j < c->argc; j += 2) {
c->argv[j+1] = tryObjectEncoding(c->argv[j+1]);
dbReplace(c->db,c->argv[j],c->argv[j+1]);
incrRefCount(c->argv[j+1]);
removeExpire(c->db,c->argv[j]);
signalModifiedKey(c->db,c->argv[j]);
setKey(c->db,c->argv[j],c->argv[j+1]);
}
server.dirty += (c->argc-1)/2;
addReply(c, nx ? shared.cone : shared.ok);
@@ -346,7 +330,7 @@ void msetnxCommand(redisClient *c) {
void incrDecrCommand(redisClient *c, long long incr) {
long long value, oldvalue;
robj *o;
robj *o, *new;
o = lookupKeyWrite(c->db,c->argv[1]);
if (o != NULL && checkType(c,o,REDIS_STRING)) return;
@@ -358,12 +342,15 @@ void incrDecrCommand(redisClient *c, long long incr) {
addReplyError(c,"increment or decrement would overflow");
return;
}
o = createStringObjectFromLongLong(value);
dbReplace(c->db,c->argv[1],o);
new = createStringObjectFromLongLong(value);
if (o)
dbOverwrite(c->db,c->argv[1],new);
else
dbAdd(c->db,c->argv[1],new);
signalModifiedKey(c->db,c->argv[1]);
server.dirty++;
addReply(c,shared.colon);
addReply(c,o);
addReply(c,new);
addReply(c,shared.crlf);
}
@@ -416,7 +403,7 @@ void appendCommand(redisClient *c) {
robj *decoded = getDecodedObject(o);
o = createStringObject(decoded->ptr, sdslen(decoded->ptr));
decrRefCount(decoded);
dbReplace(c->db,c->argv[1],o);
dbOverwrite(c->db,c->argv[1],o);
}
/* Append the value */

View File

@@ -51,6 +51,11 @@
#define calloc(count,size) tc_calloc(count,size)
#define realloc(ptr,size) tc_realloc(ptr,size)
#define free(ptr) tc_free(ptr)
#elif defined(USE_JEMALLOC)
#define malloc(size) je_malloc(size)
#define calloc(count,size) je_calloc(count,size)
#define realloc(ptr,size) je_realloc(ptr,size)
#define free(ptr) je_free(ptr)
#endif
#define update_zmalloc_stat_alloc(__n,__size) do { \
@@ -98,7 +103,7 @@ void *zmalloc(size_t size) {
if (!ptr) zmalloc_oom(size);
#ifdef HAVE_MALLOC_SIZE
update_zmalloc_stat_alloc(redis_malloc_size(ptr),size);
update_zmalloc_stat_alloc(zmalloc_size(ptr),size);
return ptr;
#else
*((size_t*)ptr) = size;
@@ -112,7 +117,7 @@ void *zcalloc(size_t size) {
if (!ptr) zmalloc_oom(size);
#ifdef HAVE_MALLOC_SIZE
update_zmalloc_stat_alloc(redis_malloc_size(ptr),size);
update_zmalloc_stat_alloc(zmalloc_size(ptr),size);
return ptr;
#else
*((size_t*)ptr) = size;
@@ -130,12 +135,12 @@ void *zrealloc(void *ptr, size_t size) {
if (ptr == NULL) return zmalloc(size);
#ifdef HAVE_MALLOC_SIZE
oldsize = redis_malloc_size(ptr);
oldsize = zmalloc_size(ptr);
newptr = realloc(ptr,size);
if (!newptr) zmalloc_oom(size);
update_zmalloc_stat_free(oldsize);
update_zmalloc_stat_alloc(redis_malloc_size(newptr),size);
update_zmalloc_stat_alloc(zmalloc_size(newptr),size);
return newptr;
#else
realptr = (char*)ptr-PREFIX_SIZE;
@@ -158,7 +163,7 @@ void zfree(void *ptr) {
if (ptr == NULL) return;
#ifdef HAVE_MALLOC_SIZE
update_zmalloc_stat_free(redis_malloc_size(ptr));
update_zmalloc_stat_free(zmalloc_size(ptr));
free(ptr);
#else
realptr = (char*)ptr-PREFIX_SIZE;

View File

@@ -28,8 +28,43 @@
* POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef _ZMALLOC_H
#define _ZMALLOC_H
#ifndef __ZMALLOC_H
#define __ZMALLOC_H
/* Double expansion needed for stringification of macro values. */
#define __xstr(s) __str(s)
#define __str(s) #s
#if defined(USE_TCMALLOC)
#define ZMALLOC_LIB ("tcmalloc-" __xstr(TC_VERSION_MAJOR) "." __xstr(TC_VERSION_MINOR))
#include <google/tcmalloc.h>
#if TC_VERSION_MAJOR >= 1 && TC_VERSION_MINOR >= 6
#define HAVE_MALLOC_SIZE 1
#define zmalloc_size(p) tc_malloc_size(p)
#else
#error "Newer version of tcmalloc required"
#endif
#elif defined(USE_JEMALLOC)
#define ZMALLOC_LIB ("jemalloc-" __xstr(JEMALLOC_VERSION_MAJOR) "." __xstr(JEMALLOC_VERSION_MINOR) "." __xstr(JEMALLOC_VERSION_BUGFIX))
#define JEMALLOC_MANGLE
#include <jemalloc/jemalloc.h>
#if JEMALLOC_VERSION_MAJOR >= 2 && JEMALLOC_VERSION_MINOR >= 1
#define HAVE_MALLOC_SIZE 1
#define zmalloc_size(p) JEMALLOC_P(malloc_usable_size)(p)
#else
#error "Newer version of jemalloc required"
#endif
#elif defined(__APPLE__)
#include <malloc/malloc.h>
#define HAVE_MALLOC_SIZE 1
#define zmalloc_size(p) malloc_size(p)
#endif
#ifndef ZMALLOC_LIB
#define ZMALLOC_LIB "libc"
#endif
void *zmalloc(size_t size);
void *zcalloc(size_t size);
@@ -44,4 +79,4 @@ size_t zmalloc_allocations_for_size(size_t size);
#define ZMALLOC_MAX_ALLOC_STAT 256
#endif /* _ZMALLOC_H */
#endif /* __ZMALLOC_H */