diff --git a/.github/workflows/redis_docs_sync.yaml b/.github/workflows/redis_docs_sync.yaml new file mode 100644 index 00000000000..2d6ef96cfad --- /dev/null +++ b/.github/workflows/redis_docs_sync.yaml @@ -0,0 +1,24 @@ +name: redis_docs_sync + +on: + release: + types: [published] + +jobs: + redis_docs_sync: + if: github.repository == 'redis/redis' + runs-on: ubuntu-latest + steps: + - name: Generate a token + id: generate-token + uses: actions/create-github-app-token@v1 + with: + app-id: ${{ secrets.DOCS_APP_ID }} + private-key: ${{ secrets.DOCS_APP_PRIVATE_KEY }} + + - name: Invoke workflow on redis/docs + env: + GH_TOKEN: ${{ steps.generate-token.outputs.token }} + RELEASE_NAME: ${{ github.event.release.tag_name }} + run: | + gh workflow run -R redis/docs redis_docs_sync.yaml -f release="${RELEASE_NAME}" \ No newline at end of file diff --git a/Makefile b/Makefile index 50ffec347c4..13a115b22ad 100644 --- a/Makefile +++ b/Makefile @@ -1,11 +1,16 @@ -# Top level makefile, the real stuff is at src/Makefile +# Top level makefile, the real stuff is at ./src/Makefile and in ./modules/Makefile + +SUBDIRS = src +ifeq ($(BUILD_WITH_MODULES), yes) + SUBDIRS += modules +endif default: all .DEFAULT: - cd src && $(MAKE) $@ + for dir in $(SUBDIRS); do $(MAKE) -C $$dir $@; done install: - cd src && $(MAKE) $@ + for dir in $(SUBDIRS); do $(MAKE) -C $$dir $@; done .PHONY: install diff --git a/README.md b/README.md index 52afab6fa54..4d6d7adb106 100644 --- a/README.md +++ b/README.md @@ -16,7 +16,7 @@ Another good example is to think of Redis as a more complex version of memcached If you want to know more, this is a list of selected starting points: * Introduction to Redis data types. https://redis.io/docs/latest/develop/data-types/ -* Try Redis directly inside your browser. https://try.redis.io + * The full list of Redis commands. https://redis.io/commands * There is much more inside the official Redis documentation. https://redis.io/documentation diff --git a/modules/Makefile b/modules/Makefile new file mode 100644 index 00000000000..af4dfa4ab37 --- /dev/null +++ b/modules/Makefile @@ -0,0 +1,72 @@ + +SUBDIRS = redisjson redistimeseries redisbloom redisearch + +define submake + for dir in $(SUBDIRS); do $(MAKE) -C $$dir $(1); done +endef + +all: prepare_source + $(call submake,$@) + +get_source: + $(call submake,$@) + +prepare_source: get_source handle-werrors setup_environment + +clean: + $(call submake,$@) + +distclean: clean_environment + $(call submake,$@) + +pristine: + $(call submake,$@) + +install: + $(call submake,$@) + +setup_environment: install-rust handle-werrors + +clean_environment: uninstall-rust + +# Keep all of the Rust stuff in one place +install-rust: +ifeq ($(INSTALL_RUST_TOOLCHAIN),yes) + @RUST_VERSION=1.80.1; \ + case "$$(uname -m)" in \ + 'x86_64') RUST_INSTALLER="rust-$${RUST_VERSION}-x86_64-unknown-linux-gnu"; RUST_SHA256="85e936d5d36970afb80756fa122edcc99bd72a88155f6bdd514f5d27e778e00a" ;; \ + 'aarch64') RUST_INSTALLER="rust-$${RUST_VERSION}-aarch64-unknown-linux-gnu"; RUST_SHA256="2e89bad7857711a1c11d017ea28fbfeec54076317763901194f8f5decbac1850" ;; \ + *) echo >&2 "Unsupported architecture: '$$(uname -m)'"; exit 1 ;; \ + esac; \ + wget --quiet -O $${RUST_INSTALLER}.tar.xz https://static.rust-lang.org/dist/$${RUST_INSTALLER}.tar.xz; \ + echo "$${RUST_SHA256} $${RUST_INSTALLER}.tar.xz" | sha256sum -c --quiet || { echo "Rust standalone installer checksum failed!"; exit 1; }; \ + tar -xf $${RUST_INSTALLER}.tar.xz; \ + (cd $${RUST_INSTALLER} && ./install.sh); \ + rm -rf $${RUST_INSTALLER} +endif + +uninstall-rust: +ifeq ($(INSTALL_RUST_TOOLCHAIN),yes) + @if [ -x "/usr/local/lib/rustlib/uninstall.sh" ]; then \ + echo "Uninstalling Rust using uninstall.sh script"; \ + rm -rf ~/.cargo; \ + /usr/local/lib/rustlib/uninstall.sh; \ + else \ + echo "WARNING: Rust toolchain not found or uninstall script is missing."; \ + fi +endif + +handle-werrors: get_source +ifeq ($(DISABLE_WERRORS),yes) + @echo "Disabling -Werror for all modules" + @for dir in $(SUBDIRS); do \ + echo "Processing $$dir"; \ + find $$dir/src -type f \ + \( -name "Makefile" \ + -o -name "*.mk" \ + -o -name "CMakeLists.txt" \) \ + -exec sed -i 's/-Werror//g' {} +; \ + done +endif + +.PHONY: all clean distclean install $(SUBDIRS) setup_environment clean_environment install-rust uninstall-rust handle-werrors diff --git a/modules/common.mk b/modules/common.mk new file mode 100644 index 00000000000..bf705df8d62 --- /dev/null +++ b/modules/common.mk @@ -0,0 +1,49 @@ +PREFIX ?= /usr/local +INSTALL_DIR ?= $(DESTDIR)$(PREFIX)/lib/redis/modules +INSTALL ?= install + +# This logic *partially* follows the current module build system. It is a bit awkward and +# should be changed if/when the modules' build process is refactored. + +ARCH_MAP_x86_64 := x64 +ARCH_MAP_i386 := x86 +ARCH_MAP_i686 := x86 +ARCH_MAP_aarch64 := arm64v8 +ARCH_MAP_arm64 := arm64v8 + +OS := $(shell uname -s | tr '[:upper:]' '[:lower:]') +ARCH := $(ARCH_MAP_$(shell uname -m)) +ifeq ($(ARCH),) + $(error Unrecognized CPU architecture $(shell uname -m)) +endif + +FULL_VARIANT := $(OS)-$(ARCH)-release + +# Common rules for all modules, based on per-module configuration + +all: $(TARGET_MODULE) + +$(TARGET_MODULE): get_source + $(MAKE) -C $(SRC_DIR) + +get_source: $(SRC_DIR)/.prepared + +$(SRC_DIR)/.prepared: + mkdir -p $(SRC_DIR) + git clone --recursive --depth 1 --branch $(MODULE_VERSION) $(MODULE_REPO) $(SRC_DIR) + touch $@ + +clean: + -$(MAKE) -C $(SRC_DIR) clean + +distclean: + -$(MAKE) -C $(SRC_DIR) distclean + +pristine: + -rm -rf $(SRC_DIR) + +install: $(TARGET_MODULE) + mkdir -p $(INSTALL_DIR) + $(INSTALL) -m 0755 -D $(TARGET_MODULE) $(INSTALL_DIR) + +.PHONY: all clean distclean pristine install diff --git a/modules/redisbloom/Makefile b/modules/redisbloom/Makefile new file mode 100644 index 00000000000..d8e01be1c6d --- /dev/null +++ b/modules/redisbloom/Makefile @@ -0,0 +1,6 @@ +SRC_DIR = src +MODULE_VERSION = v9.99.99 +MODULE_REPO = https://github.com/redisbloom/redisbloom +TARGET_MODULE = $(SRC_DIR)/bin/$(FULL_VARIANT)/redisbloom.so + +include ../common.mk diff --git a/modules/redisearch/Makefile b/modules/redisearch/Makefile new file mode 100644 index 00000000000..fcc43dfebe1 --- /dev/null +++ b/modules/redisearch/Makefile @@ -0,0 +1,7 @@ +SRC_DIR = src +MODULE_VERSION = v9.99.99 +MODULE_REPO = https://github.com/redisearch/redisearch +TARGET_MODULE = $(SRC_DIR)/bin/$(FULL_VARIANT)/coord-oss/redisearch.so + +include ../common.mk + diff --git a/modules/redisjson/Makefile b/modules/redisjson/Makefile new file mode 100644 index 00000000000..9d98c3a6b18 --- /dev/null +++ b/modules/redisjson/Makefile @@ -0,0 +1,11 @@ +SRC_DIR = src +MODULE_VERSION = v9.99.99 +MODULE_REPO = https://github.com/redisjson/redisjson +TARGET_MODULE = $(SRC_DIR)/bin/$(FULL_VARIANT)/rejson.so + +include ../common.mk + +$(SRC_DIR)/.cargo_fetched: + cd $(SRC_DIR) && cargo fetch + +get_source: $(SRC_DIR)/.cargo_fetched diff --git a/modules/redistimeseries/Makefile b/modules/redistimeseries/Makefile new file mode 100644 index 00000000000..0fcfc3284f7 --- /dev/null +++ b/modules/redistimeseries/Makefile @@ -0,0 +1,6 @@ +SRC_DIR = src +MODULE_VERSION = v9.99.99 +MODULE_REPO = https://github.com/redistimeseries/redistimeseries +TARGET_MODULE = $(SRC_DIR)/bin/$(FULL_VARIANT)/redistimeseries.so + +include ../common.mk diff --git a/src/acl.c b/src/acl.c index c2cca0f3fdd..6d698427fab 100644 --- a/src/acl.c +++ b/src/acl.c @@ -2762,7 +2762,6 @@ void aclCatWithFlags(client *c, dict *commands, uint64_t cflag, int *arraylen) { while ((de = dictNext(di)) != NULL) { struct redisCommand *cmd = dictGetVal(de); - if (cmd->flags & CMD_MODULE) continue; if (cmd->acl_categories & cflag) { addReplyBulkCBuffer(c, cmd->fullname, sdslen(cmd->fullname)); (*arraylen)++; diff --git a/src/aof.c b/src/aof.c index ec631c0e214..8ccd8d8f887 100644 --- a/src/aof.c +++ b/src/aof.c @@ -997,6 +997,29 @@ int startAppendOnly(void) { return C_OK; } +void startAppendOnlyWithRetry(void) { + unsigned int tries, max_tries = 10; + for (tries = 0; tries < max_tries; ++tries) { + if (startAppendOnly() == C_OK) + break; + serverLog(LL_WARNING, "Failed to enable AOF! Trying it again in one second."); + sleep(1); + } + if (tries == max_tries) { + serverLog(LL_WARNING, "FATAL: AOF can't be turned on. Exiting now."); + exit(1); + } +} + +/* Called after "appendonly" config is changed. */ +void applyAppendOnlyConfig(void) { + if (!server.aof_enabled && server.aof_state != AOF_OFF) { + stopAppendOnly(); + } else if (server.aof_enabled && server.aof_state == AOF_OFF) { + startAppendOnlyWithRetry(); + } +} + /* This is a wrapper to the write syscall in order to retry on short writes * or if the syscall gets interrupted. It could look strange that we retry * on short writes given that we are writing to a block device: normally if diff --git a/src/commands.def b/src/commands.def index f8ccdf07135..ef42fb8da70 100644 --- a/src/commands.def +++ b/src/commands.def @@ -11145,7 +11145,7 @@ struct COMMAND_STRUCT redisCommandTable[] = { {MAKE_CMD("sintercard","Returns the number of members of the intersect of multiple sets.","O(N*M) worst case where N is the cardinality of the smallest set and M is the number of sets.","7.0.0",CMD_DOC_NONE,NULL,NULL,"set",COMMAND_GROUP_SET,SINTERCARD_History,0,SINTERCARD_Tips,0,sinterCardCommand,-3,CMD_READONLY,ACL_CATEGORY_SET,SINTERCARD_Keyspecs,1,sintercardGetKeys,3),.args=SINTERCARD_Args}, {MAKE_CMD("sinterstore","Stores the intersect of multiple sets in a key.","O(N*M) worst case where N is the cardinality of the smallest set and M is the number of sets.","1.0.0",CMD_DOC_NONE,NULL,NULL,"set",COMMAND_GROUP_SET,SINTERSTORE_History,0,SINTERSTORE_Tips,0,sinterstoreCommand,-3,CMD_WRITE|CMD_DENYOOM,ACL_CATEGORY_SET,SINTERSTORE_Keyspecs,2,NULL,2),.args=SINTERSTORE_Args}, {MAKE_CMD("sismember","Determines whether a member belongs to a set.","O(1)","1.0.0",CMD_DOC_NONE,NULL,NULL,"set",COMMAND_GROUP_SET,SISMEMBER_History,0,SISMEMBER_Tips,0,sismemberCommand,3,CMD_READONLY|CMD_FAST,ACL_CATEGORY_SET,SISMEMBER_Keyspecs,1,NULL,2),.args=SISMEMBER_Args}, -{MAKE_CMD("smembers","Returns all members of a set.","O(N) where N is the set cardinality.","1.0.0",CMD_DOC_NONE,NULL,NULL,"set",COMMAND_GROUP_SET,SMEMBERS_History,0,SMEMBERS_Tips,1,sinterCommand,2,CMD_READONLY,ACL_CATEGORY_SET,SMEMBERS_Keyspecs,1,NULL,1),.args=SMEMBERS_Args}, +{MAKE_CMD("smembers","Returns all members of a set.","O(N) where N is the set cardinality.","1.0.0",CMD_DOC_NONE,NULL,NULL,"set",COMMAND_GROUP_SET,SMEMBERS_History,0,SMEMBERS_Tips,1,smembersCommand,2,CMD_READONLY,ACL_CATEGORY_SET,SMEMBERS_Keyspecs,1,NULL,1),.args=SMEMBERS_Args}, {MAKE_CMD("smismember","Determines whether multiple members belong to a set.","O(N) where N is the number of elements being checked for membership","6.2.0",CMD_DOC_NONE,NULL,NULL,"set",COMMAND_GROUP_SET,SMISMEMBER_History,0,SMISMEMBER_Tips,0,smismemberCommand,-3,CMD_READONLY|CMD_FAST,ACL_CATEGORY_SET,SMISMEMBER_Keyspecs,1,NULL,2),.args=SMISMEMBER_Args}, {MAKE_CMD("smove","Moves a member from one set to another.","O(1)","1.0.0",CMD_DOC_NONE,NULL,NULL,"set",COMMAND_GROUP_SET,SMOVE_History,0,SMOVE_Tips,0,smoveCommand,4,CMD_WRITE|CMD_FAST,ACL_CATEGORY_SET,SMOVE_Keyspecs,2,NULL,3),.args=SMOVE_Args}, {MAKE_CMD("spop","Returns one or more random members from a set after removing them. Deletes the set if the last member was popped.","Without the count argument O(1), otherwise O(N) where N is the value of the passed count.","1.0.0",CMD_DOC_NONE,NULL,NULL,"set",COMMAND_GROUP_SET,SPOP_History,1,SPOP_Tips,1,spopCommand,-2,CMD_WRITE|CMD_FAST,ACL_CATEGORY_SET,SPOP_Keyspecs,1,NULL,2),.args=SPOP_Args}, diff --git a/src/commands/smembers.json b/src/commands/smembers.json index c5114089b91..8878c18c12c 100644 --- a/src/commands/smembers.json +++ b/src/commands/smembers.json @@ -5,7 +5,7 @@ "group": "set", "since": "1.0.0", "arity": 2, - "function": "sinterCommand", + "function": "smembersCommand", "command_flags": [ "READONLY" ], diff --git a/src/config.c b/src/config.c index bfbc8fd335f..720311630ba 100644 --- a/src/config.c +++ b/src/config.c @@ -2502,6 +2502,13 @@ static int updateWatchdogPeriod(const char **err) { } static int updateAppendonly(const char **err) { + /* If loading flag is set, AOF might have been stopped temporarily, and it + * will be restarted depending on server.aof_enabled flag after loading is + * completed. So, we just need to update 'server.aof_enabled' which has been + * updated already before calling this function. */ + if (server.loading) + return 1; + if (!server.aof_enabled && server.aof_state != AOF_OFF) { stopAppendOnly(); } else if (server.aof_enabled && server.aof_state == AOF_OFF) { @@ -3080,7 +3087,7 @@ standardConfig static_configs[] = { createBoolConfig("activedefrag", NULL, DEBUG_CONFIG | MODIFIABLE_CONFIG, server.active_defrag_enabled, 0, isValidActiveDefrag, NULL), createBoolConfig("syslog-enabled", NULL, IMMUTABLE_CONFIG, server.syslog_enabled, 0, NULL, NULL), createBoolConfig("cluster-enabled", NULL, IMMUTABLE_CONFIG, server.cluster_enabled, 0, NULL, NULL), - createBoolConfig("appendonly", NULL, MODIFIABLE_CONFIG | DENY_LOADING_CONFIG, server.aof_enabled, 0, NULL, updateAppendonly), + createBoolConfig("appendonly", NULL, MODIFIABLE_CONFIG, server.aof_enabled, 0, NULL, updateAppendonly), createBoolConfig("cluster-allow-reads-when-down", NULL, MODIFIABLE_CONFIG, server.cluster_allow_reads_when_down, 0, NULL, NULL), createBoolConfig("cluster-allow-pubsubshard-when-down", NULL, MODIFIABLE_CONFIG, server.cluster_allow_pubsubshard_when_down, 1, NULL, NULL), createBoolConfig("crash-log-enabled", NULL, MODIFIABLE_CONFIG, server.crashlog_enabled, 1, NULL, updateSighandlerEnabled), diff --git a/src/db.c b/src/db.c index e9d00543fd7..a09fce790b1 100644 --- a/src/db.c +++ b/src/db.c @@ -49,6 +49,9 @@ void updateLFU(robj *val) { * found in the specified DB. This function implements the functionality of * lookupKeyRead(), lookupKeyWrite() and their ...WithFlags() variants. * + * 'deref' is an optional output dictEntry reference argument, to get the + * associated dictEntry* of the key in case the key is found. + * * Side-effects of calling this function: * * 1. A key gets expired if it reached it's TTL. @@ -72,7 +75,7 @@ void updateLFU(robj *val) { * Even if the key expiry is master-driven, we can correctly report a key is * expired on replicas even if the master is lagging expiring our key via DELs * in the replication link. */ -robj *lookupKey(redisDb *db, robj *key, int flags) { +robj *lookupKey(redisDb *db, robj *key, int flags, dictEntry **deref) { dictEntry *de = dbFind(db, key->ptr); robj *val = NULL; if (de) { @@ -123,6 +126,7 @@ robj *lookupKey(redisDb *db, robj *key, int flags) { /* TODO: Use separate misses stats and notify event for WRITE */ } + if (val && deref) *deref = de; return val; } @@ -137,7 +141,7 @@ robj *lookupKey(redisDb *db, robj *key, int flags) { * the key. */ robj *lookupKeyReadWithFlags(redisDb *db, robj *key, int flags) { serverAssert(!(flags & LOOKUP_WRITE)); - return lookupKey(db, key, flags); + return lookupKey(db, key, flags, NULL); } /* Like lookupKeyReadWithFlags(), but does not use any flag, which is the @@ -153,13 +157,20 @@ robj *lookupKeyRead(redisDb *db, robj *key) { * Returns the linked value object if the key exists or NULL if the key * does not exist in the specified DB. */ robj *lookupKeyWriteWithFlags(redisDb *db, robj *key, int flags) { - return lookupKey(db, key, flags | LOOKUP_WRITE); + return lookupKey(db, key, flags | LOOKUP_WRITE, NULL); } robj *lookupKeyWrite(redisDb *db, robj *key) { return lookupKeyWriteWithFlags(db, key, LOOKUP_NONE); } +/* Like lookupKeyWrite(), but accepts an optional dictEntry input, + * which can be used if we already have one, thus saving the dbFind call. + */ +robj *lookupKeyWriteWithDictEntry(redisDb *db, robj *key, dictEntry **deref) { + return lookupKey(db, key, LOOKUP_NONE | LOOKUP_WRITE, deref); +} + robj *lookupKeyReadOrReply(client *c, robj *key, robj *reply) { robj *o = lookupKeyRead(c->db, key); if (!o) addReplyOrErrorObject(c, reply); @@ -294,6 +305,14 @@ void dbReplaceValue(redisDb *db, robj *key, robj *val) { dbSetValue(db, key, val, 0, NULL); } +/* Replace an existing key with a new value, we just replace value and don't + * emit any events. + * The dictEntry input is optional, can be used if we already have one. + */ +void dbReplaceValueWithDictEntry(redisDb *db, robj *key, robj *val, dictEntry *de) { + dbSetValue(db, key, val, 0, de); +} + /* High level Set operation. This function can be used in order to set * a key, whatever it was existing or not, to a new object. * @@ -308,6 +327,12 @@ void dbReplaceValue(redisDb *db, robj *key, robj *val) { * The client 'c' argument may be set to NULL if the operation is performed * in a context where there is no clear client performing the operation. */ void setKey(client *c, redisDb *db, robj *key, robj *val, int flags) { + setKeyWithDictEntry(c,db,key,val,flags,NULL); +} + +/* Like setKey(), but accepts an optional dictEntry input, + * which can be used if we already have one, thus saving the dictFind call. */ +void setKeyWithDictEntry(client *c, redisDb *db, robj *key, robj *val, int flags, dictEntry *de) { int keyfound = 0; if (flags & SETKEY_ALREADY_EXIST) @@ -322,7 +347,7 @@ void setKey(client *c, redisDb *db, robj *key, robj *val, int flags) { } else if (keyfound<0) { dbAddInternal(db,key,val,1); } else { - dbSetValue(db,key,val,1,NULL); + dbSetValue(db,key,val,1,de); } incrRefCount(val); if (!(flags & SETKEY_KEEPTTL)) removeExpire(db,key); @@ -452,12 +477,18 @@ int dbDelete(redisDb *db, robj *key) { * using an sdscat() call to append some data, or anything else. */ robj *dbUnshareStringValue(redisDb *db, robj *key, robj *o) { + return dbUnshareStringValueWithDictEntry(db,key,o,NULL); +} + +/* Like dbUnshareStringValue(), but accepts a optional dictEntry, + * which can be used if we already have one, thus saving the dbFind call. */ +robj *dbUnshareStringValueWithDictEntry(redisDb *db, robj *key, robj *o, dictEntry *de) { serverAssert(o->type == OBJ_STRING); if (o->refcount != 1 || o->encoding != OBJ_ENCODING_RAW) { robj *decoded = getDecodedObject(o); o = createRawStringObject(decoded->ptr, sdslen(decoded->ptr)); decrRefCount(decoded); - dbReplaceValue(db,key,o); + dbReplaceValueWithDictEntry(db,key,o,de); } return o; } @@ -575,11 +606,11 @@ redisDb *initTempDb(void) { } /* Discard tempDb, this can be slow (similar to FLUSHALL), but it's always async. */ -void discardTempDb(redisDb *tempDb, void(callback)(dict*)) { +void discardTempDb(redisDb *tempDb) { int async = 1; /* Release temp DBs. */ - emptyDbStructure(tempDb, -1, async, callback); + emptyDbStructure(tempDb, -1, async, NULL); for (int i=0; iexpires, getKeySlot(key->ptr), key->ptr) == DICT_OK; } + /* Set an expire to the specified key. If the expire is set in the context * of an user calling a command 'c' is the client, otherwise 'c' is set * to NULL. The 'when' parameter is the absolute unix time in milliseconds * after which the key will no longer be considered valid. */ void setExpire(client *c, redisDb *db, robj *key, long long when) { - dictEntry *kde, *de, *existing; + setExpireWithDictEntry(c,db,key,when,NULL); +} + +/* Like setExpire(), but accepts an optional dictEntry input, + * which can be used if we already have one, thus saving the kvstoreDictFind call. */ +void setExpireWithDictEntry(client *c, redisDb *db, robj *key, long long when, dictEntry *kde) { + dictEntry *de, *existing; /* Reuse the sds from the main dict in the expire dict */ int slot = getKeySlot(key->ptr); - kde = kvstoreDictFind(db->keys, slot, key->ptr); + if (!kde) kde = kvstoreDictFind(db->keys, slot, key->ptr); serverAssertWithInfo(NULL,key,kde != NULL); de = kvstoreDictAddRaw(db->expires, slot, dictGetKey(kde), &existing); if (existing) { diff --git a/src/debug.c b/src/debug.c index a026a94cd6c..df74f053481 100644 --- a/src/debug.c +++ b/src/debug.c @@ -567,6 +567,7 @@ NULL addReplyError(c,"Error trying to load the RDB dump, check server logs."); return; } + applyAppendOnlyConfig(); /* Check if AOF config was changed while loading */ serverLog(LL_NOTICE,"DB reloaded by DEBUG RELOAD"); addReply(c,shared.ok); } else if (!strcasecmp(c->argv[1]->ptr,"loadaof")) { @@ -582,6 +583,7 @@ NULL addReplyError(c, "Error trying to load the AOF files, check server logs."); return; } + applyAppendOnlyConfig(); /* Check if AOF config was changed while loading */ server.dirty = 0; /* Prevent AOF / replication */ serverLog(LL_NOTICE,"Append Only File loaded by DEBUG LOADAOF"); addReply(c,shared.ok); diff --git a/src/defrag.c b/src/defrag.c index 78de7224867..a819eb8ac6c 100644 --- a/src/defrag.c +++ b/src/defrag.c @@ -54,6 +54,17 @@ void* activeDefragAlloc(void *ptr) { return newptr; } +/* Raw memory allocation for defrag, avoid using tcache. */ +void *activeDefragAllocRaw(size_t size) { + return zmalloc_no_tcache(size); +} + +/* Raw memory free for defrag, avoid using tcache. */ +void activeDefragFreeRaw(void *ptr) { + zfree_no_tcache(ptr); + server.stat_active_defrag_hits++; +} + /*Defrag helper for sds strings * * returns NULL in case the allocation wasn't moved. @@ -1078,6 +1089,7 @@ void activeDefragCycle(void) { slot = -1; defrag_later_item_in_progress = 0; db = NULL; + moduleDefragEnd(); goto update_metrics; } return; @@ -1119,6 +1131,10 @@ void activeDefragCycle(void) { break; /* this will exit the function and we'll continue on the next cycle */ } + if (current_db == -1) { + moduleDefragStart(); + } + /* Move on to next database, and stop if we reached the last one. */ if (++current_db >= server.dbnum) { /* defrag other items not part of the db / keys */ @@ -1140,6 +1156,8 @@ void activeDefragCycle(void) { db = NULL; server.active_defrag_running = 0; + moduleDefragEnd(); + computeDefragCycles(); /* if another scan is needed, start it right away */ if (server.active_defrag_running != 0 && ustime() < endtime) continue; @@ -1259,6 +1277,16 @@ void *activeDefragAlloc(void *ptr) { return NULL; } +void *activeDefragAllocRaw(size_t size) { + /* fallback to regular allocation */ + return zmalloc(size); +} + +void activeDefragFreeRaw(void *ptr) { + /* fallback to regular free */ + zfree(ptr); +} + robj *activeDefragStringOb(robj *ob) { UNUSED(ob); return NULL; diff --git a/src/dict.h b/src/dict.h index 1c0e6accd32..e7883306631 100644 --- a/src/dict.h +++ b/src/dict.h @@ -257,6 +257,18 @@ dictStats* dictGetStatsHt(dict *d, int htidx, int full); void dictCombineStats(dictStats *from, dictStats *into); void dictFreeStats(dictStats *stats); +#define dictForEach(d, ty, m, ...) do { \ + dictIterator *di = dictGetIterator(d); \ + dictEntry *de; \ + while ((de = dictNext(di)) != NULL) { \ + ty *m = dictGetVal(de); \ + do { \ + __VA_ARGS__ \ + } while(0); \ + } \ + dictReleaseIterator(di); \ +} while(0); + #ifdef REDIS_TEST int dictTest(int argc, char *argv[], int flags); #endif diff --git a/src/expire.c b/src/expire.c index d4289d7588f..1a49f5384ad 100644 --- a/src/expire.c +++ b/src/expire.c @@ -465,12 +465,10 @@ void expireSlaveKeys(void) { if ((dbids & 1) != 0) { redisDb *db = server.db+dbid; dictEntry *expire = dbFindExpires(db, keyname); - int expired = 0; if (expire && activeExpireCycleTryExpire(server.db+dbid,expire,start)) { - expired = 1; /* Propagate the DEL (writable replicas do not propagate anything to other replicas, * but they might propagate to AOF) and trigger module hooks. */ postExecutionUnitOperations(); @@ -480,7 +478,7 @@ void expireSlaveKeys(void) { * corresponding bit in the new bitmap we set as value. * At the end of the loop if the bitmap is zero, it means we * no longer need to keep track of this key. */ - if (expire && !expired) { + else if (expire) { noexpire++; new_dbids |= (uint64_t)1 << dbid; } diff --git a/src/geo.c b/src/geo.c index 90817998a19..cbb96cc2595 100644 --- a/src/geo.c +++ b/src/geo.c @@ -796,8 +796,8 @@ void georadiusGeneric(client *c, int srcKeyIndex, int flags) { if (withcoords) { addReplyArrayLen(c, 2); - addReplyHumanLongDouble(c, gp->longitude); - addReplyHumanLongDouble(c, gp->latitude); + addReplyDouble(c,gp->longitude); + addReplyDouble(c,gp->latitude); } } } else { @@ -959,8 +959,8 @@ void geoposCommand(client *c) { continue; } addReplyArrayLen(c,2); - addReplyHumanLongDouble(c,xy[0]); - addReplyHumanLongDouble(c,xy[1]); + addReplyDouble(c,xy[0]); + addReplyDouble(c,xy[1]); } } } diff --git a/src/listpack.c b/src/listpack.c index 5d9028e13d0..8b9ae196e83 100644 --- a/src/listpack.c +++ b/src/listpack.c @@ -687,8 +687,8 @@ int lpGetIntegerValue(unsigned char *p, long long *lval) { * will be returned. 'user' is passed to this callback. * Skip 'skip' entries between every comparison. * Returns NULL when the field could not be found. */ -unsigned char *lpFindCb(unsigned char *lp, unsigned char *p, - void *user, lpCmp cmp, unsigned int skip) +static inline unsigned char *lpFindCbInternal(unsigned char *lp, unsigned char *p, + void *user, lpCmp cmp, unsigned int skip) { int skipcnt = 0; unsigned char *value; @@ -707,7 +707,7 @@ unsigned char *lpFindCb(unsigned char *lp, unsigned char *p, assert(p >= lp + LP_HDR_SIZE && p + entry_size < lp + lp_bytes); } - if (cmp(lp, p, user, value, ll) == 0) + if (unlikely(cmp(lp, p, user, value, ll) == 0)) return p; /* Reset skip count */ @@ -734,6 +734,12 @@ unsigned char *lpFindCb(unsigned char *lp, unsigned char *p, return NULL; } +unsigned char *lpFindCb(unsigned char *lp, unsigned char *p, + void *user, lpCmp cmp, unsigned int skip) +{ + return lpFindCbInternal(lp, p, user, cmp, skip); +} + struct lpFindArg { unsigned char *s; /* Item to search */ uint32_t slen; /* Item len */ @@ -787,7 +793,7 @@ unsigned char *lpFind(unsigned char *lp, unsigned char *p, unsigned char *s, .s = s, .slen = slen }; - return lpFindCb(lp, p, &arg, lpFindCmp, skip); + return lpFindCbInternal(lp, p, &arg, lpFindCmp, skip); } /* Insert, delete or replace the specified string element 'elestr' of length diff --git a/src/module.c b/src/module.c index 157dc15095e..27974bc3221 100644 --- a/src/module.c +++ b/src/module.c @@ -2287,6 +2287,8 @@ void RM_SetModuleAttribs(RedisModuleCtx *ctx, const char *name, int ver, int api module->options = 0; module->info_cb = 0; module->defrag_cb = 0; + module->defrag_start_cb = 0; + module->defrag_end_cb = 0; module->loadmod = NULL; module->num_commands_with_acl_categories = 0; module->onload = 1; @@ -4274,7 +4276,7 @@ int RM_SetAbsExpire(RedisModuleKey *key, mstime_t expire) { void RM_ResetDataset(int restart_aof, int async) { if (restart_aof && server.aof_state != AOF_OFF) stopAppendOnly(); flushAllDataAndResetRDB((async? EMPTYDB_ASYNC: EMPTYDB_NO_FLAGS) | EMPTYDB_NOFUNCTIONS); - if (server.aof_enabled && restart_aof) restartAOFAfterSYNC(); + if (server.aof_enabled && restart_aof) startAppendOnlyWithRetry(); } /* Returns the number of keys in the current db. */ @@ -12463,10 +12465,15 @@ void modulePipeReadable(aeEventLoop *el, int fd, void *privdata, int mask) { /* Helper function for the MODULE and HELLO command: send the list of the * loaded modules to the client. */ void addReplyLoadedModules(client *c) { + const long ln = dictSize(modules); + /* In case no module is load we avoid iterator creation */ + addReplyArrayLen(c,ln); + if (ln == 0) { + return; + } dictIterator *di = dictGetIterator(modules); dictEntry *de; - addReplyArrayLen(c,dictSize(modules)); while ((de = dictNext(di)) != NULL) { sds name = dictGetKey(de); struct RedisModule *module = dictGetVal(de); @@ -13070,7 +13077,7 @@ int RM_RdbLoad(RedisModuleCtx *ctx, RedisModuleRdbStream *stream, int flags) { int ret = rdbLoad(stream->data.filename,NULL,RDBFLAGS_NONE); if (server.current_client) unprotectClient(server.current_client); - if (server.aof_state != AOF_OFF) startAppendOnly(); + if (server.aof_enabled) startAppendOnlyWithRetry(); if (ret != RDB_OK) { errno = (ret == RDB_NOT_EXIST) ? ENOENT : EIO; @@ -13451,6 +13458,16 @@ int RM_RegisterDefragFunc(RedisModuleCtx *ctx, RedisModuleDefragFunc cb) { return REDISMODULE_OK; } +/* Register a defrag callbacks that will be called when defrag operation starts and ends. + * + * The callbacks are the same as `RM_RegisterDefragFunc` but the user + * can also assume the callbacks are called when the defrag operation starts and ends. */ +int RM_RegisterDefragCallbacks(RedisModuleCtx *ctx, RedisModuleDefragFunc start, RedisModuleDefragFunc end) { + ctx->module->defrag_start_cb = start; + ctx->module->defrag_end_cb = end; + return REDISMODULE_OK; +} + /* When the data type defrag callback iterates complex structures, this * function should be called periodically. A zero (false) return * indicates the callback may continue its work. A non-zero value (true) @@ -13529,6 +13546,30 @@ void *RM_DefragAlloc(RedisModuleDefragCtx *ctx, void *ptr) { return activeDefragAlloc(ptr); } +/* Allocate memory for defrag purposes + * + * On the common cases user simply want to reallocate a pointer with a single + * owner. For such usecase RM_DefragAlloc is enough. But on some usecases the user + * might want to replace a pointer with multiple owners in different keys. + * In such case, an in place replacement can not work because the other key still + * keep a pointer to the old value. + * + * RM_DefragAllocRaw and RM_DefragFreeRaw allows to control when the memory + * for defrag purposes will be allocated and when it will be freed, + * allow to support more complex defrag usecases. */ +void *RM_DefragAllocRaw(RedisModuleDefragCtx *ctx, size_t size) { + UNUSED(ctx); + return activeDefragAllocRaw(size); +} + +/* Free memory for defrag purposes + * + * See RM_DefragAllocRaw for more information. */ +void RM_DefragFreeRaw(RedisModuleDefragCtx *ctx, void *ptr) { + UNUSED(ctx); + activeDefragFreeRaw(ptr); +} + /* Defrag a RedisModuleString previously allocated by RM_Alloc, RM_Calloc, etc. * See RM_DefragAlloc() for more information on how the defragmentation process * works. @@ -13610,17 +13651,32 @@ int moduleDefragValue(robj *key, robj *value, int dbid) { /* Call registered module API defrag functions */ void moduleDefragGlobals(void) { - dictIterator *di = dictGetIterator(modules); - dictEntry *de; + dictForEach(modules, struct RedisModule, module, + if (module->defrag_cb) { + RedisModuleDefragCtx defrag_ctx = { 0, NULL, NULL, -1}; + module->defrag_cb(&defrag_ctx); + } + ); +} - while ((de = dictNext(di)) != NULL) { - struct RedisModule *module = dictGetVal(de); - if (!module->defrag_cb) - continue; - RedisModuleDefragCtx defrag_ctx = { 0, NULL, NULL, -1}; - module->defrag_cb(&defrag_ctx); - } - dictReleaseIterator(di); +/* Call registered module API defrag start functions */ +void moduleDefragStart(void) { + dictForEach(modules, struct RedisModule, module, + if (module->defrag_start_cb) { + RedisModuleDefragCtx defrag_ctx = { 0, NULL, NULL, -1}; + module->defrag_start_cb(&defrag_ctx); + } + ); +} + +/* Call registered module API defrag end functions */ +void moduleDefragEnd(void) { + dictForEach(modules, struct RedisModule, module, + if (module->defrag_end_cb) { + RedisModuleDefragCtx defrag_ctx = { 0, NULL, NULL, -1}; + module->defrag_end_cb(&defrag_ctx); + } + ); } /* Returns the name of the key currently being processed. @@ -13980,7 +14036,10 @@ void moduleRegisterCoreAPI(void) { REGISTER_API(GetCurrentCommandName); REGISTER_API(GetTypeMethodVersion); REGISTER_API(RegisterDefragFunc); + REGISTER_API(RegisterDefragCallbacks); REGISTER_API(DefragAlloc); + REGISTER_API(DefragAllocRaw); + REGISTER_API(DefragFreeRaw); REGISTER_API(DefragRedisModuleString); REGISTER_API(DefragShouldStop); REGISTER_API(DefragCursorSet); diff --git a/src/networking.c b/src/networking.c index d3d9256642b..33a8f7b27bd 100644 --- a/src/networking.c +++ b/src/networking.c @@ -26,7 +26,11 @@ static void setProtocolError(const char *errstr, client *c); static void pauseClientsByClient(mstime_t end, int isPauseClientAll); int postponeClientRead(client *c); char *getClientSockname(client *c); +static inline int clientTypeIsSlave(client *c); int ProcessingEventsWhileBlocked = 0; /* See processEventsWhileBlocked(). */ +__thread sds thread_reusable_qb = NULL; +__thread int thread_reusable_qb_used = 0; /* Avoid multiple clients using reusable query + * buffer due to nested command execution. */ /* Return the size consumed from the allocator, for the specified SDS string, * including internal fragmentation. This function is used in order to compute @@ -144,7 +148,7 @@ client *createClient(connection *conn) { c->ref_repl_buf_node = NULL; c->ref_block_pos = 0; c->qb_pos = 0; - c->querybuf = sdsempty(); + c->querybuf = NULL; c->querybuf_peak = 0; c->reqtype = 0; c->argc = 0; @@ -391,7 +395,7 @@ void _addReplyToBufferOrList(client *c, const char *s, size_t len) { * replication link that caused a reply to be generated we'll simply disconnect it. * Note this is the simplest way to check a command added a response. Replication links are used to write data but * not for responses, so we should normally never get here on a replica client. */ - if (getClientType(c) == CLIENT_TYPE_SLAVE) { + if (unlikely(clientTypeIsSlave(c))) { sds cmdname = c->lastcmd ? c->lastcmd->fullname : NULL; logInvalidUseAndFreeClientAsync(c, "Replica generated a reply to command '%s'", cmdname ? cmdname : ""); @@ -733,7 +737,7 @@ void *addReplyDeferredLen(client *c) { * replication link that caused a reply to be generated we'll simply disconnect it. * Note this is the simplest way to check a command added a response. Replication links are used to write data but * not for responses, so we should normally never get here on a replica client. */ - if (getClientType(c) == CLIENT_TYPE_SLAVE) { + if (unlikely(clientTypeIsSlave(c))) { sds cmdname = c->lastcmd ? c->lastcmd->fullname : NULL; logInvalidUseAndFreeClientAsync(c, "Replica generated a reply to command '%s'", cmdname ? cmdname : ""); @@ -970,6 +974,12 @@ void addReplyLongLong(client *c, long long ll) { } } +void addReplyLongLongFromStr(client *c, robj *str) { + addReplyProto(c,":",1); + addReply(c,str); + addReplyProto(c,"\r\n",2); +} + void addReplyAggregateLen(client *c, long length, int prefix) { serverAssert(length >= 0); if (prepareClientToWrite(c) != C_OK) return; @@ -1242,7 +1252,7 @@ void copyReplicaOutputBuffer(client *dst, client *src) { /* Return true if the specified client has pending reply buffers to write to * the socket. */ int clientHasPendingReplies(client *c) { - if (getClientType(c) == CLIENT_TYPE_SLAVE) { + if (unlikely(clientTypeIsSlave(c))) { /* Replicas use global shared replication buffer instead of * private output buffer. */ serverAssert(c->bufpos == 0 && listLength(c->reply) == 0); @@ -1575,6 +1585,28 @@ void deauthenticateAndCloseClient(client *c) { } } +/* Resets the reusable query buffer used by the given client. + * If any data remained in the buffer, the client will take ownership of the buffer + * and a new empty buffer will be allocated for the reusable buffer. */ +static void resetReusableQueryBuf(client *c) { + serverAssert(c->flags & CLIENT_REUSABLE_QUERYBUFFER); + if (c->querybuf != thread_reusable_qb || sdslen(c->querybuf) > c->qb_pos) { + /* If querybuf has been reallocated or there is still data left, + * let the client take ownership of the reusable buffer. */ + thread_reusable_qb = NULL; + } else { + /* It is safe to dereference and reuse the reusable query buffer. */ + c->querybuf = NULL; + c->qb_pos = 0; + sdsclear(thread_reusable_qb); + } + + /* Mark that the client is no longer using the reusable query buffer + * and indicate that it is no longer used by any client. */ + c->flags &= ~CLIENT_REUSABLE_QUERYBUFFER; + thread_reusable_qb_used = 0; +} + void freeClient(client *c) { listNode *ln; @@ -1623,12 +1655,14 @@ void freeClient(client *c) { } /* Log link disconnection with slave */ - if (getClientType(c) == CLIENT_TYPE_SLAVE) { + if (clientTypeIsSlave(c)) { serverLog(LL_NOTICE,"Connection with replica %s lost.", replicationGetSlaveName(c)); } /* Free the query buffer */ + if (c->flags & CLIENT_REUSABLE_QUERYBUFFER) + resetReusableQueryBuf(c); sdsfree(c->querybuf); c->querybuf = NULL; @@ -1703,7 +1737,7 @@ void freeClient(client *c) { /* We need to remember the time when we started to have zero * attached slaves, as after some time we'll free the replication * backlog. */ - if (getClientType(c) == CLIENT_TYPE_SLAVE && listLength(server.slaves) == 0) + if (clientTypeIsSlave(c) && listLength(server.slaves) == 0) server.repl_no_slaves_since = server.unixtime; refreshGoodSlavesCount(); /* Fire the replica change modules event. */ @@ -1916,7 +1950,7 @@ static int _writevToClient(client *c, ssize_t *nwritten) { * to client. */ int _writeToClient(client *c, ssize_t *nwritten) { *nwritten = 0; - if (getClientType(c) == CLIENT_TYPE_SLAVE) { + if (unlikely(clientTypeIsSlave(c))) { serverAssert(c->bufpos == 0 && listLength(c->reply) == 0); replBufBlock *o = listNodeValue(c->ref_repl_buf_node); @@ -2003,7 +2037,7 @@ int writeToClient(client *c, int handler_installed) { !(c->flags & CLIENT_SLAVE)) break; } - if (getClientType(c) == CLIENT_TYPE_SLAVE) { + if (unlikely(clientTypeIsSlave(c))) { atomicIncr(server.stat_net_repl_output_bytes, totwritten); } else { atomicIncr(server.stat_net_output_bytes, totwritten); @@ -2207,7 +2241,7 @@ int processInlineBuffer(client *c) { /* Newline from slaves can be used to refresh the last ACK time. * This is useful for a slave to ping back while loading a big * RDB file. */ - if (querylen == 0 && getClientType(c) == CLIENT_TYPE_SLAVE) + if (querylen == 0 && clientTypeIsSlave(c)) c->repl_ack_time = server.unixtime; /* Masters should never send us inline protocol to run actual @@ -2674,6 +2708,11 @@ void readQueryFromClient(connection *conn) { if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1 && c->bulklen >= PROTO_MBULK_BIG_ARG) { + /* For big argv, the client always uses its private query buffer. + * Using the reusable query buffer would eventually expand it beyond 32k, + * causing the client to take ownership of the reusable query buffer. */ + if (!c->querybuf) c->querybuf = sdsempty(); + ssize_t remaining = (size_t)(c->bulklen+2)-(sdslen(c->querybuf)-c->qb_pos); big_arg = 1; @@ -2685,6 +2724,26 @@ void readQueryFromClient(connection *conn) { * but doesn't need align to the next arg, we can read more data. */ if (c->flags & CLIENT_MASTER && readlen < PROTO_IOBUF_LEN) readlen = PROTO_IOBUF_LEN; + } else if (c->querybuf == NULL) { + if (unlikely(thread_reusable_qb_used)) { + /* The reusable query buffer is already used by another client, + * switch to using the client's private query buffer. This only + * occurs when commands are executed nested via processEventsWhileBlocked(). */ + c->querybuf = sdsnewlen(NULL, PROTO_IOBUF_LEN); + sdsclear(c->querybuf); + } else { + /* Create the reusable query buffer if it doesn't exist. */ + if (!thread_reusable_qb) { + thread_reusable_qb = sdsnewlen(NULL, PROTO_IOBUF_LEN); + sdsclear(thread_reusable_qb); + } + + /* Assign the reusable query buffer to the client and mark it as in use. */ + serverAssert(sdslen(thread_reusable_qb) == 0); + c->querybuf = thread_reusable_qb; + c->flags |= CLIENT_REUSABLE_QUERYBUFFER; + thread_reusable_qb_used = 1; + } } qblen = sdslen(c->querybuf); @@ -2708,7 +2767,7 @@ void readQueryFromClient(connection *conn) { nread = connRead(c->conn, c->querybuf+qblen, readlen); if (nread == -1) { if (connGetState(conn) == CONN_STATE_CONNECTED) { - return; + goto done; } else { serverLog(LL_VERBOSE, "Reading from client: %s",connGetLastError(c->conn)); freeClientAsync(c); @@ -2760,6 +2819,10 @@ void readQueryFromClient(connection *conn) { c = NULL; done: + if (c && (c->flags & CLIENT_REUSABLE_QUERYBUFFER)) { + serverAssert(c->qb_pos == 0); /* Ensure the client's query buffer is trimmed in processInputBuffer */ + resetReusableQueryBuf(c); + } beforeNextClient(c); } @@ -2875,8 +2938,8 @@ sds catClientInfoString(sds s, client *client) { " ssub=%i", (int) dictSize(client->pubsubshard_channels), " multi=%i", (client->flags & CLIENT_MULTI) ? client->mstate.count : -1, " watch=%i", (int) listLength(client->watched_keys), - " qbuf=%U", (unsigned long long) sdslen(client->querybuf), - " qbuf-free=%U", (unsigned long long) sdsavail(client->querybuf), + " qbuf=%U", client->querybuf ? (unsigned long long) sdslen(client->querybuf) : 0, + " qbuf-free=%U", client->querybuf ? (unsigned long long) sdsavail(client->querybuf) : 0, " argv-mem=%U", (unsigned long long) client->argv_len_sum, " multi-mem=%U", (unsigned long long) client->mstate.argv_len_sums, " rbs=%U", (unsigned long long) client->buf_usable_size, @@ -3662,29 +3725,30 @@ void helloCommand(client *c) { if (ver) c->resp = ver; addReplyMapLen(c,6 + !server.sentinel_mode); - addReplyBulkCString(c,"server"); - addReplyBulkCString(c,"redis"); + ADD_REPLY_BULK_CBUFFER_STRING_CONSTANT(c,"server"); + ADD_REPLY_BULK_CBUFFER_STRING_CONSTANT(c,"redis"); - addReplyBulkCString(c,"version"); - addReplyBulkCString(c,REDIS_VERSION); + ADD_REPLY_BULK_CBUFFER_STRING_CONSTANT(c,"version"); + ADD_REPLY_BULK_CBUFFER_STRING_CONSTANT(c,REDIS_VERSION); + + ADD_REPLY_BULK_CBUFFER_STRING_CONSTANT(c,"proto"); - addReplyBulkCString(c,"proto"); addReplyLongLong(c,c->resp); - addReplyBulkCString(c,"id"); + ADD_REPLY_BULK_CBUFFER_STRING_CONSTANT(c,"id"); addReplyLongLong(c,c->id); - addReplyBulkCString(c,"mode"); + ADD_REPLY_BULK_CBUFFER_STRING_CONSTANT(c,"mode"); if (server.sentinel_mode) addReplyBulkCString(c,"sentinel"); else if (server.cluster_enabled) addReplyBulkCString(c,"cluster"); else addReplyBulkCString(c,"standalone"); if (!server.sentinel_mode) { - addReplyBulkCString(c,"role"); + ADD_REPLY_BULK_CBUFFER_STRING_CONSTANT(c,"role"); addReplyBulkCString(c,server.masterhost ? "replica" : "master"); } - addReplyBulkCString(c,"modules"); + ADD_REPLY_BULK_CBUFFER_STRING_CONSTANT(c,"modules"); addReplyLoadedModules(c); } @@ -3834,7 +3898,7 @@ void rewriteClientCommandArgument(client *c, int i, robj *newval) { * the caller wishes. The main usage of this function currently is * enforcing the client output length limits. */ size_t getClientOutputBufferMemoryUsage(client *c) { - if (getClientType(c) == CLIENT_TYPE_SLAVE) { + if (unlikely(clientTypeIsSlave(c))) { size_t repl_buf_size = 0; size_t repl_node_num = 0; size_t repl_node_size = sizeof(listNode) + sizeof(replBufBlock); @@ -3856,9 +3920,10 @@ size_t getClientOutputBufferMemoryUsage(client *c) { * the client output buffer memory usage portion of the total. */ size_t getClientMemoryUsage(client *c, size_t *output_buffer_mem_usage) { size_t mem = getClientOutputBufferMemoryUsage(c); + if (output_buffer_mem_usage != NULL) *output_buffer_mem_usage = mem; - mem += sdsZmallocSize(c->querybuf); + mem += c->querybuf ? sdsZmallocSize(c->querybuf) : 0; mem += zmalloc_size(c); mem += c->buf_usable_size; /* For efficiency (less work keeping track of the argv memory), it doesn't include the used memory @@ -3897,6 +3962,12 @@ int getClientType(client *c) { return CLIENT_TYPE_NORMAL; } +static inline int clientTypeIsSlave(client *c) { + if (unlikely((c->flags & CLIENT_SLAVE) && !(c->flags & CLIENT_MONITOR))) + return 1; + return 0; +} + int getClientTypeByName(char *name) { if (!strcasecmp(name,"normal")) return CLIENT_TYPE_NORMAL; else if (!strcasecmp(name,"slave")) return CLIENT_TYPE_SLAVE; @@ -3986,7 +4057,7 @@ int closeClientOnOutputBufferLimitReached(client *c, int async) { serverAssert(c->reply_bytes < SIZE_MAX-(1024*64)); /* Note that c->reply_bytes is irrelevant for replica clients * (they use the global repl buffers). */ - if ((c->reply_bytes == 0 && getClientType(c) != CLIENT_TYPE_SLAVE) || + if ((c->reply_bytes == 0 && !clientTypeIsSlave(c)) || c->flags & CLIENT_CLOSE_ASAP) return 0; if (checkClientOutputBufferLimits(c)) { sds client = catClientInfoString(sdsempty(),c); @@ -4416,7 +4487,7 @@ int handleClientsWithPendingWritesUsingThreads(void) { * buffer, to guarantee data accessing thread safe, we must put all * replicas client into io_threads_list[0] i.e. main thread handles * sending the output buffer of all replicas. */ - if (getClientType(c) == CLIENT_TYPE_SLAVE) { + if (unlikely(clientTypeIsSlave(c))) { listAddNodeTail(io_threads_list[0],c); continue; } diff --git a/src/rdb.c b/src/rdb.c index 2a39c8af538..0bd4ee1babf 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -3161,7 +3161,13 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, int dbid, int *error) /* Mark that we are loading in the global state and setup the fields * needed to provide loading stats. */ void startLoading(size_t size, int rdbflags, int async) { - /* Load the DB */ + loadingSetFlags(NULL, size, async); + loadingFireEvent(rdbflags); +} + +/* Initialize stats, set loading flags and filename if provided. */ +void loadingSetFlags(char *filename, size_t size, int async) { + rdbFileBeingLoaded = filename; server.loading = 1; if (async == 1) server.async_loading = 1; server.loading_start_time = time(NULL); @@ -3171,7 +3177,9 @@ void startLoading(size_t size, int rdbflags, int async) { server.rdb_last_load_keys_expired = 0; server.rdb_last_load_keys_loaded = 0; blockingOperationStarts(); +} +void loadingFireEvent(int rdbflags) { /* Fire the loading modules start event. */ int subevent; if (rdbflags & RDBFLAGS_AOF_PREAMBLE) @@ -3187,8 +3195,8 @@ void startLoading(size_t size, int rdbflags, int async) { * needed to provide loading stats. * 'filename' is optional and used for rdb-check on error */ void startLoadingFile(size_t size, char* filename, int rdbflags) { - rdbFileBeingLoaded = filename; - startLoading(size, rdbflags, 0); + loadingSetFlags(filename, size, 0); + loadingFireEvent(rdbflags); } /* Refresh the absolute loading progress info */ @@ -3702,14 +3710,21 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin return C_ERR; } +int rdbLoad(char *filename, rdbSaveInfo *rsi, int rdbflags) { + return rdbLoadWithEmptyFunc(filename, rsi, rdbflags, NULL); +} + /* Like rdbLoadRio() but takes a filename instead of a rio stream. The * filename is open for reading and a rio stream object created in order * to do the actual loading. Moreover the ETA displayed in the INFO * output is initialized and finalized. * * If you pass an 'rsi' structure initialized with RDB_SAVE_INFO_INIT, the - * loading code will fill the information fields in the structure. */ -int rdbLoad(char *filename, rdbSaveInfo *rsi, int rdbflags) { + * loading code will fill the information fields in the structure. + * + * If emptyDbFunc is not NULL, it will be called to flush old db or to + * discard partial db on error. */ +int rdbLoadWithEmptyFunc(char *filename, rdbSaveInfo *rsi, int rdbflags, void (*emptyDbFunc)(void)) { FILE *fp; rio rdb; int retval; @@ -3727,12 +3742,20 @@ int rdbLoad(char *filename, rdbSaveInfo *rsi, int rdbflags) { if (fstat(fileno(fp), &sb) == -1) sb.st_size = 0; - startLoadingFile(sb.st_size, filename, rdbflags); + loadingSetFlags(filename, sb.st_size, 0); + /* Note that inside loadingSetFlags(), server.loading is set. + * emptyDbCallback() may yield back to event-loop to reply -LOADING. */ + if (emptyDbFunc) + emptyDbFunc(); /* Flush existing db. */ + loadingFireEvent(rdbflags); rioInitWithFile(&rdb,fp); retval = rdbLoadRio(&rdb,rdbflags,rsi); fclose(fp); + if (retval != C_OK && emptyDbFunc) + emptyDbFunc(); /* Clean up partial db. */ + stopLoading(retval==C_OK); /* Reclaim the cache backed by rdb */ if (retval == C_OK && !(rdbflags & RDBFLAGS_KEEP_CACHE)) { diff --git a/src/rdb.h b/src/rdb.h index 953780bb6bc..85045f1527f 100644 --- a/src/rdb.h +++ b/src/rdb.h @@ -136,6 +136,7 @@ uint64_t rdbLoadLen(rio *rdb, int *isencoded); int rdbLoadLenByRef(rio *rdb, int *isencoded, uint64_t *lenptr); int rdbSaveObjectType(rio *rdb, robj *o); int rdbLoadObjectType(rio *rdb); +int rdbLoadWithEmptyFunc(char *filename, rdbSaveInfo *rsi, int rdbflags, void (*emptyDbFunc)(void)); int rdbLoad(char *filename, rdbSaveInfo *rsi, int rdbflags); int rdbSaveBackground(int req, char *filename, rdbSaveInfo *rsi, int rdbflags); int rdbSaveToSlavesSockets(int req, rdbSaveInfo *rsi); diff --git a/src/redisassert.c b/src/redisassert.c index fb16bd6a2fe..e4918f2894a 100644 --- a/src/redisassert.c +++ b/src/redisassert.c @@ -35,6 +35,7 @@ */ +#include #include #include #include @@ -46,8 +47,15 @@ void _serverAssert(const char *estr, const char *file, int line) { } void _serverPanic(const char *file, int line, const char *msg, ...) { + va_list ap; + char fmtmsg[256]; + + va_start(ap,msg); + vsnprintf(fmtmsg,sizeof(fmtmsg),msg,ap); + va_end(ap); + fprintf(stderr, "------------------------------------------------"); fprintf(stderr, "!!! Software Failure. Press left mouse button to continue"); - fprintf(stderr, "Guru Meditation: %s #%s:%d",msg,file,line); + fprintf(stderr, "Guru Meditation: %s #%s:%d",fmtmsg,file,line); abort(); } diff --git a/src/redismodule.h b/src/redismodule.h index 8b5d2beb65d..9e0b5c5ee0b 100644 --- a/src/redismodule.h +++ b/src/redismodule.h @@ -1296,7 +1296,10 @@ REDISMODULE_API int *(*RedisModule_GetCommandKeys)(RedisModuleCtx *ctx, RedisMod REDISMODULE_API int *(*RedisModule_GetCommandKeysWithFlags)(RedisModuleCtx *ctx, RedisModuleString **argv, int argc, int *num_keys, int **out_flags) REDISMODULE_ATTR; REDISMODULE_API const char *(*RedisModule_GetCurrentCommandName)(RedisModuleCtx *ctx) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_RegisterDefragFunc)(RedisModuleCtx *ctx, RedisModuleDefragFunc func) REDISMODULE_ATTR; +REDISMODULE_API int (*RedisModule_RegisterDefragCallbacks)(RedisModuleCtx *ctx, RedisModuleDefragFunc start, RedisModuleDefragFunc end) REDISMODULE_ATTR; REDISMODULE_API void *(*RedisModule_DefragAlloc)(RedisModuleDefragCtx *ctx, void *ptr) REDISMODULE_ATTR; +REDISMODULE_API void *(*RedisModule_DefragAllocRaw)(RedisModuleDefragCtx *ctx, size_t size) REDISMODULE_ATTR; +REDISMODULE_API void (*RedisModule_DefragFreeRaw)(RedisModuleDefragCtx *ctx, void *ptr) REDISMODULE_ATTR; REDISMODULE_API RedisModuleString *(*RedisModule_DefragRedisModuleString)(RedisModuleDefragCtx *ctx, RedisModuleString *str) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_DefragShouldStop)(RedisModuleDefragCtx *ctx) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_DefragCursorSet)(RedisModuleDefragCtx *ctx, unsigned long cursor) REDISMODULE_ATTR; @@ -1662,7 +1665,10 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int REDISMODULE_GET_API(GetCommandKeysWithFlags); REDISMODULE_GET_API(GetCurrentCommandName); REDISMODULE_GET_API(RegisterDefragFunc); + REDISMODULE_GET_API(RegisterDefragCallbacks); REDISMODULE_GET_API(DefragAlloc); + REDISMODULE_GET_API(DefragAllocRaw); + REDISMODULE_GET_API(DefragFreeRaw); REDISMODULE_GET_API(DefragRedisModuleString); REDISMODULE_GET_API(DefragShouldStop); REDISMODULE_GET_API(DefragCursorSet); diff --git a/src/replication.c b/src/replication.c index 52e901e88ff..abf930e61b6 100644 --- a/src/replication.c +++ b/src/replication.c @@ -3,13 +3,15 @@ * Copyright (c) 2009-Present, Redis Ltd. * All rights reserved. * + * Copyright (c) 2024-present, Valkey contributors. + * All rights reserved. + * * Licensed under your choice of the Redis Source Available License 2.0 * (RSALv2) or the Server Side Public License v1 (SSPLv1). * * Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information. */ - #include "server.h" #include "cluster.h" #include "bio.h" @@ -1736,12 +1738,24 @@ void replicationSendNewlineToMaster(void) { } /* Callback used by emptyData() while flushing away old data to load - * the new dataset received by the master and by discardTempDb() - * after loading succeeded or failed. */ + * the new dataset received by the master or to clear partial db if loading + * fails. */ void replicationEmptyDbCallback(dict *d) { UNUSED(d); if (server.repl_state == REPL_STATE_TRANSFER) replicationSendNewlineToMaster(); + + processEventsWhileBlocked(); +} + +/* Function to flush old db or the partial db on error. */ +static void rdbLoadEmptyDbFunc(void) { + serverAssert(server.loading); + + serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Flushing old data"); + int empty_db_flags = server.repl_slave_lazy_flush ? EMPTYDB_ASYNC : + EMPTYDB_NO_FLAGS; + emptyData(-1, empty_db_flags, replicationEmptyDbCallback); } /* Once we have a link with the master and the synchronization was @@ -1765,6 +1779,9 @@ void replicationCreateMasterClient(connection *conn, int dbid) { * connection. */ server.master->flags |= CLIENT_MASTER; + /* Allocate a private query buffer for the master client instead of using the reusable query buffer. + * This is done because the master's query buffer data needs to be preserved for my sub-replicas to use. */ + server.master->querybuf = sdsempty(); server.master->authenticated = 1; server.master->reploff = server.master_initial_offset; server.master->read_reploff = server.master->reploff; @@ -1778,27 +1795,6 @@ void replicationCreateMasterClient(connection *conn, int dbid) { if (dbid != -1) selectDb(server.master,dbid); } -/* This function will try to re-enable the AOF file after the - * master-replica synchronization: if it fails after multiple attempts - * the replica cannot be considered reliable and exists with an - * error. */ -void restartAOFAfterSYNC(void) { - unsigned int tries, max_tries = 10; - for (tries = 0; tries < max_tries; ++tries) { - if (startAppendOnly() == C_OK) break; - serverLog(LL_WARNING, - "Failed enabling the AOF after successful master synchronization! " - "Trying it again in one second."); - sleep(1); - } - if (tries == max_tries) { - serverLog(LL_WARNING, - "FATAL: this replica instance finished the synchronization with " - "its master, but the AOF can't be turned on. Exiting now."); - exit(1); - } -} - static int useDisklessLoad(void) { /* compute boolean decision to use diskless load */ int enabled = server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB || @@ -1831,7 +1827,7 @@ redisDb *disklessLoadInitTempDb(void) { /* Helper function for readSyncBulkPayload() to discard our tempDb * when the loading succeeded or failed. */ void disklessLoadDiscardTempDb(redisDb *tempDb) { - discardTempDb(tempDb, replicationEmptyDbCallback); + discardTempDb(tempDb); } /* If we know we got an entirely different data set from our master @@ -2057,9 +2053,6 @@ void readSyncBulkPayload(connection *conn) { NULL); } else { replicationAttachToNewMaster(); - - serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Flushing old data"); - emptyData(-1,empty_db_flags,replicationEmptyDbCallback); } /* Before loading the DB into memory we need to delete the readable @@ -2093,13 +2086,22 @@ void readSyncBulkPayload(connection *conn) { functionsLibCtxClear(functions_lib_ctx); } + loadingSetFlags(NULL, server.repl_transfer_size, asyncLoading); + if (server.repl_diskless_load != REPL_DISKLESS_LOAD_SWAPDB) { + serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Flushing old data"); + /* Note that inside loadingSetFlags(), server.loading is set. + * replicationEmptyDbCallback() may yield back to event-loop to + * reply -LOADING. */ + emptyData(-1, empty_db_flags, replicationEmptyDbCallback); + } + loadingFireEvent(RDBFLAGS_REPLICATION); + rioInitWithConn(&rdb,conn,server.repl_transfer_size); /* Put the socket in blocking mode to simplify RDB transfer. * We'll restore it when the RDB is received. */ connBlock(conn); connRecvTimeout(conn, server.repl_timeout*1000); - startLoading(server.repl_transfer_size, RDBFLAGS_REPLICATION, asyncLoading); int loadingFailed = 0; rdbLoadingCtx loadingCtx = { .dbarray = dbarray, .functions_lib_ctx = functions_lib_ctx }; @@ -2120,7 +2122,6 @@ void readSyncBulkPayload(connection *conn) { } if (loadingFailed) { - stopLoading(0); cancelReplicationHandshake(1); rioFreeConn(&rdb, NULL); @@ -2138,6 +2139,11 @@ void readSyncBulkPayload(connection *conn) { emptyData(-1,empty_db_flags,replicationEmptyDbCallback); } + /* Note that replicationEmptyDbCallback() may yield back to event + * loop to reply -LOADING if flushing the db takes a long time. So, + * stopLoading() must be called after emptyData() above. */ + stopLoading(0); + /* Note that there's no point in restarting the AOF on SYNC * failure, it'll be restarted when sync succeeds or the replica * gets promoted. */ @@ -2213,7 +2219,7 @@ void readSyncBulkPayload(connection *conn) { return; } - if (rdbLoad(server.rdb_filename,&rsi,RDBFLAGS_REPLICATION) != RDB_OK) { + if (rdbLoadWithEmptyFunc(server.rdb_filename,&rsi,RDBFLAGS_REPLICATION,rdbLoadEmptyDbFunc) != RDB_OK) { serverLog(LL_WARNING, "Failed trying to load the MASTER synchronization " "DB from disk, check server logs."); @@ -2225,9 +2231,6 @@ void readSyncBulkPayload(connection *conn) { bg_unlink(server.rdb_filename); } - /* If disk-based RDB loading fails, remove the half-loaded dataset. */ - emptyData(-1, empty_db_flags, replicationEmptyDbCallback); - /* Note that there's no point in restarting the AOF on sync failure, it'll be restarted when sync succeeds or replica promoted. */ return; @@ -2281,7 +2284,10 @@ void readSyncBulkPayload(connection *conn) { /* Restart the AOF subsystem now that we finished the sync. This * will trigger an AOF rewrite, and when done will start appending * to the new file. */ - if (server.aof_enabled) restartAOFAfterSYNC(); + if (server.aof_enabled) { + serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Starting AOF after a successful sync"); + startAppendOnlyWithRetry(); + } return; error: @@ -3098,7 +3104,10 @@ void replicationUnsetMaster(void) { /* Restart the AOF subsystem in case we shut it down during a sync when * we were still a slave. */ - if (server.aof_enabled && server.aof_state == AOF_OFF) restartAOFAfterSYNC(); + if (server.aof_enabled && server.aof_state == AOF_OFF) { + serverLog(LL_NOTICE, "Restarting AOF after becoming master"); + startAppendOnlyWithRetry(); + } } /* This function is called when the slave lose the connection with the diff --git a/src/server.c b/src/server.c index 1a7bf27001e..f4248740f55 100644 --- a/src/server.c +++ b/src/server.c @@ -2,8 +2,13 @@ * Copyright (c) 2009-Present, Redis Ltd. * All rights reserved. * + * Copyright (c) 2024-present, Valkey contributors. + * All rights reserved. + * * Licensed under your choice of the Redis Source Available License 2.0 * (RSALv2) or the Server Side Public License v1 (SSPLv1). + * + * Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information. */ #include "server.h" @@ -734,6 +739,8 @@ long long getInstantaneousMetric(int metric) { * * The function always returns 0 as it never terminates the client. */ int clientsCronResizeQueryBuffer(client *c) { + /* If the client query buffer is NULL, it is using the reusable query buffer and there is nothing to do. */ + if (c->querybuf == NULL) return 0; size_t querybuf_size = sdsalloc(c->querybuf); time_t idletime = server.unixtime - c->lastinteraction; @@ -743,7 +750,18 @@ int clientsCronResizeQueryBuffer(client *c) { /* There are two conditions to resize the query buffer: */ if (idletime > 2) { /* 1) Query is idle for a long time. */ - c->querybuf = sdsRemoveFreeSpace(c->querybuf, 1); + size_t remaining = sdslen(c->querybuf) - c->qb_pos; + if (!(c->flags & CLIENT_MASTER) && !remaining) { + /* If the client is not a master and no data is pending, + * The client can safely use the reusable query buffer in the next read - free the client's querybuf. */ + sdsfree(c->querybuf); + /* By setting the querybuf to NULL, the client will use the reusable query buffer in the next read. + * We don't move the client to the reusable query buffer immediately, because if we allocated a private + * query buffer for the client, it's likely that the client will use it again soon. */ + c->querybuf = NULL; + } else { + c->querybuf = sdsRemoveFreeSpace(c->querybuf, 1); + } } else if (querybuf_size > PROTO_RESIZE_THRESHOLD && querybuf_size/2 > c->querybuf_peak) { /* 2) Query buffer is too big for latest peak and is larger than * resize threshold. Trim excess space but only up to a limit, @@ -759,7 +777,7 @@ int clientsCronResizeQueryBuffer(client *c) { /* Reset the peak again to capture the peak memory usage in the next * cycle. */ - c->querybuf_peak = sdslen(c->querybuf); + c->querybuf_peak = c->querybuf ? sdslen(c->querybuf) : 0; /* We reset to either the current used, or currently processed bulk size, * which ever is bigger. */ if (c->bulklen != -1 && (size_t)c->bulklen + 2 > c->querybuf_peak) c->querybuf_peak = c->bulklen + 2; @@ -834,8 +852,9 @@ size_t ClientsPeakMemInput[CLIENTS_PEAK_MEM_USAGE_SLOTS] = {0}; size_t ClientsPeakMemOutput[CLIENTS_PEAK_MEM_USAGE_SLOTS] = {0}; int clientsCronTrackExpansiveClients(client *c, int time_idx) { - size_t in_usage = sdsZmallocSize(c->querybuf) + c->argv_len_sum + - (c->argv ? zmalloc_size(c->argv) : 0); + size_t qb_size = c->querybuf ? sdsZmallocSize(c->querybuf) : 0; + size_t argv_size = c->argv ? zmalloc_size(c->argv) : 0; + size_t in_usage = qb_size + c->argv_len_sum + argv_size; size_t out_usage = getClientOutputBufferMemoryUsage(c); /* Track the biggest values observed so far in this slot. */ @@ -6567,7 +6586,7 @@ void dismissMemory(void* ptr, size_t size_hint) { void dismissClientMemory(client *c) { /* Dismiss client query buffer and static reply buffer. */ dismissMemory(c->buf, c->buf_usable_size); - dismissSds(c->querybuf); + if (c->querybuf) dismissSds(c->querybuf); /* Dismiss argv array only if we estimate it contains a big buffer. */ if (c->argc && c->argv_len_sum/c->argc >= server.page_size) { for (int i = 0; i < c->argc; i++) { @@ -7212,6 +7231,11 @@ int main(int argc, char **argv) { loadDataFromDisk(); aofOpenIfNeededOnServerStart(); aofDelHistoryFiles(); + /* While loading data, we delay applying "appendonly" config change. + * If there was a config change while we were inside loadDataFromDisk() + * above, we'll apply it here. */ + applyAppendOnlyConfig(); + if (server.cluster_enabled) { serverAssert(verifyClusterConfigWithData() == C_OK); } diff --git a/src/server.h b/src/server.h index 459d5b9744e..d6b16b9b412 100644 --- a/src/server.h +++ b/src/server.h @@ -388,6 +388,7 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT]; #define CLIENT_MODULE_PREVENT_AOF_PROP (1ULL<<48) /* Module client do not want to propagate to AOF */ #define CLIENT_MODULE_PREVENT_REPL_PROP (1ULL<<49) /* Module client do not want to propagate to replica */ #define CLIENT_REPROCESSING_COMMAND (1ULL<<50) /* The client is re-processing the command. */ +#define CLIENT_REUSABLE_QUERYBUFFER (1ULL<<51) /* The client is using the reusable query buffer. */ /* Any flag that does not let optimize FLUSH SYNC to run it in bg as blocking client ASYNC */ #define CLIENT_AVOID_BLOCKING_ASYNC_FLUSH (CLIENT_DENY_BLOCKING|CLIENT_MULTI|CLIENT_LUA_DEBUG|CLIENT_LUA_DEBUG_SYNC|CLIENT_MODULE) @@ -820,6 +821,8 @@ struct RedisModule { int blocked_clients; /* Count of RedisModuleBlockedClient in this module. */ RedisModuleInfoFunc info_cb; /* Callback for module to add INFO fields. */ RedisModuleDefragFunc defrag_cb; /* Callback for global data defrag. */ + RedisModuleDefragFunc defrag_start_cb; /* Callback indicating defrag started. */ + RedisModuleDefragFunc defrag_end_cb; /* Callback indicating defrag ended. */ struct moduleLoadQueueEntry *loadmod; /* Module load arguments for config rewrite. */ int num_commands_with_acl_categories; /* Number of commands in this module included in acl categories */ int onload; /* Flag to identify if the call is being made from Onload (0 or 1) */ @@ -2555,6 +2558,8 @@ robj *moduleTypeDupOrReply(client *c, robj *fromkey, robj *tokey, int todb, robj int moduleDefragValue(robj *key, robj *obj, int dbid); int moduleLateDefrag(robj *key, robj *value, unsigned long *cursor, long long endtime, int dbid); void moduleDefragGlobals(void); +void moduleDefragStart(void); +void moduleDefragEnd(void); void *moduleGetHandleByName(char *modulename); int moduleIsModuleCommand(void *module_handle, struct redisCommand *cmd); @@ -2627,6 +2632,7 @@ void addReplyDouble(client *c, double d); void addReplyBigNum(client *c, const char *num, size_t len); void addReplyHumanLongDouble(client *c, long double d); void addReplyLongLong(client *c, long long ll); +void addReplyLongLongFromStr(client *c, robj* str); void addReplyArrayLen(client *c, long length); void addReplyMapLen(client *c, long length); void addReplySetLen(client *c, long length); @@ -2690,6 +2696,8 @@ void initThreadedIO(void); client *lookupClientByID(uint64_t id); int authRequired(client *c); void putClientInPendingWriteQueue(client *c); +/* reply macros */ +#define ADD_REPLY_BULK_CBUFFER_STRING_CONSTANT(c, str) addReplyBulkCBuffer(c, str, strlen(str)) /* logreqres.c - logging of requests and responses */ void reqresReset(client *c, int free_buf); @@ -2740,7 +2748,7 @@ robj *listTypeGet(listTypeEntry *entry); unsigned char *listTypeGetValue(listTypeEntry *entry, size_t *vlen, long long *lval); void listTypeInsert(listTypeEntry *entry, robj *value, int where); void listTypeReplace(listTypeEntry *entry, robj *value); -int listTypeEqual(listTypeEntry *entry, robj *o); +int listTypeEqual(listTypeEntry *entry, robj *o, size_t object_len); void listTypeDelete(listTypeIterator *iter, listTypeEntry *entry); robj *listTypeDup(robj *o); void listTypeDelRange(robj *o, long start, long stop); @@ -2877,6 +2885,8 @@ const char *getFailoverStateString(void); /* Generic persistence functions */ void startLoadingFile(size_t size, char* filename, int rdbflags); void startLoading(size_t size, int rdbflags, int async); +void loadingSetFlags(char *filename, size_t size, int async); +void loadingFireEvent(int rdbflags); void loadingAbsProgress(off_t pos); void loadingIncrProgress(off_t size); void stopLoading(int success); @@ -2904,9 +2914,10 @@ int rewriteAppendOnlyFileBackground(void); int loadAppendOnlyFiles(aofManifest *am); void stopAppendOnly(void); int startAppendOnly(void); +void startAppendOnlyWithRetry(void); +void applyAppendOnlyConfig(void); void backgroundRewriteDoneHandler(int exitcode, int bysignal); void killAppendOnlyChild(void); -void restartAOFAfterSYNC(void); void aofLoadManifestFromDisk(void); void aofOpenIfNeededOnServerStart(void); void aofManifestFree(aofManifest *am); @@ -3127,6 +3138,8 @@ void checkChildrenDone(void); int setOOMScoreAdj(int process_class); void rejectCommandFormat(client *c, const char *fmt, ...); void *activeDefragAlloc(void *ptr); +void *activeDefragAllocRaw(size_t size); +void activeDefragFreeRaw(void *ptr); robj *activeDefragStringOb(robj* ob); void dismissSds(sds s); void dismissMemory(void* ptr, size_t size_hint); @@ -3355,10 +3368,12 @@ void propagateDeletion(redisDb *db, robj *key, int lazy); int keyIsExpired(redisDb *db, robj *key); long long getExpire(redisDb *db, robj *key); void setExpire(client *c, redisDb *db, robj *key, long long when); +void setExpireWithDictEntry(client *c, redisDb *db, robj *key, long long when, dictEntry *kde); int checkAlreadyExpired(long long when); int parseExtendedExpireArgumentsOrReply(client *c, int *flags); robj *lookupKeyRead(redisDb *db, robj *key); robj *lookupKeyWrite(redisDb *db, robj *key); +robj *lookupKeyWriteWithDictEntry(redisDb *db, robj *key, dictEntry **deref); robj *lookupKeyReadOrReply(client *c, robj *key, robj *reply); robj *lookupKeyWriteOrReply(client *c, robj *key, robj *reply); robj *lookupKeyReadWithFlags(redisDb *db, robj *key, int flags); @@ -3378,6 +3393,7 @@ int objectSetLRUOrLFU(robj *val, long long lfu_freq, long long lru_idle, dictEntry *dbAdd(redisDb *db, robj *key, robj *val); int dbAddRDBLoad(redisDb *db, sds key, robj *val); void dbReplaceValue(redisDb *db, robj *key, robj *val); +void dbReplaceValueWithDictEntry(redisDb *db, robj *key, robj *val, dictEntry *de); #define SETKEY_KEEPTTL 1 #define SETKEY_NO_SIGNAL 2 @@ -3385,11 +3401,14 @@ void dbReplaceValue(redisDb *db, robj *key, robj *val); #define SETKEY_DOESNT_EXIST 8 #define SETKEY_ADD_OR_UPDATE 16 /* Key most likely doesn't exists */ void setKey(client *c, redisDb *db, robj *key, robj *val, int flags); +void setKeyWithDictEntry(client *c, redisDb *db, robj *key, robj *val, int flags, dictEntry *de); robj *dbRandomKey(redisDb *db); int dbGenericDelete(redisDb *db, robj *key, int async, int flags); int dbSyncDelete(redisDb *db, robj *key); int dbDelete(redisDb *db, robj *key); robj *dbUnshareStringValue(redisDb *db, robj *key, robj *o); +robj *dbUnshareStringValueWithDictEntry(redisDb *db, robj *key, robj *o, dictEntry *de); + #define EMPTYDB_NO_FLAGS 0 /* No flags. */ #define EMPTYDB_ASYNC (1<<0) /* Reclaim memory in another thread. */ @@ -3399,7 +3418,7 @@ long long emptyDbStructure(redisDb *dbarray, int dbnum, int async, void(callback void flushAllDataAndResetRDB(int flags); long long dbTotalServerKeyCount(void); redisDb *initTempDb(void); -void discardTempDb(redisDb *tempDb, void(callback)(dict*)); +void discardTempDb(redisDb *tempDb); int selectDb(client *c, int id); @@ -3634,6 +3653,7 @@ void scardCommand(client *c); void spopCommand(client *c); void srandmemberCommand(client *c); void sinterCommand(client *c); +void smembersCommand(client *c); void sinterCardCommand(client *c); void sinterstoreCommand(client *c); void sunionCommand(client *c); diff --git a/src/t_hash.c b/src/t_hash.c index 66a3083c629..f06040e3349 100644 --- a/src/t_hash.c +++ b/src/t_hash.c @@ -2428,7 +2428,11 @@ void genericHgetallCommand(client *c, int flags) { /* We return a map if the user requested keys and values, like in the * HGETALL case. Otherwise to use a flat array makes more sense. */ - length = hashTypeLength(o, 1 /*subtractExpiredFields*/); + if ((length = hashTypeLength(o, 1 /*subtractExpiredFields*/)) == 0) { + addReply(c, emptyResp); + return; + } + if (flags & OBJ_HASH_KEY && flags & OBJ_HASH_VALUE) { addReplyMapLen(c, length); } else { @@ -2437,11 +2441,7 @@ void genericHgetallCommand(client *c, int flags) { hi = hashTypeInitIterator(o); - /* Skip expired fields if the hash has an expire time set at global HFE DS. We could - * set it to constant 1, but then it will make another lookup for each field expiration */ - int skipExpiredFields = (EB_EXPIRE_TIME_INVALID == hashTypeGetMinExpire(o, 0)) ? 0 : 1; - - while (hashTypeNext(hi, skipExpiredFields) != C_ERR) { + while (hashTypeNext(hi, 1 /*skipExpiredFields*/) != C_ERR) { if (flags & OBJ_HASH_KEY) { addHashIteratorCursorToReply(c, hi, OBJ_HASH_KEY); count++; diff --git a/src/t_list.c b/src/t_list.c index ba8ece9f57d..98b180aa16a 100644 --- a/src/t_list.c +++ b/src/t_list.c @@ -383,12 +383,12 @@ int listTypeReplaceAtIndex(robj *o, int index, robj *value) { } /* Compare the given object with the entry at the current position. */ -int listTypeEqual(listTypeEntry *entry, robj *o) { +int listTypeEqual(listTypeEntry *entry, robj *o, size_t object_len) { serverAssertWithInfo(NULL,o,sdsEncodedObject(o)); if (entry->li->encoding == OBJ_ENCODING_QUICKLIST) { - return quicklistCompare(&entry->entry,o->ptr,sdslen(o->ptr)); + return quicklistCompare(&entry->entry,o->ptr,object_len); } else if (entry->li->encoding == OBJ_ENCODING_LISTPACK) { - return lpCompare(entry->lpe,o->ptr,sdslen(o->ptr)); + return lpCompare(entry->lpe,o->ptr,object_len); } else { serverPanic("Unknown list encoding"); } @@ -538,8 +538,9 @@ void linsertCommand(client *c) { /* Seek pivot from head to tail */ iter = listTypeInitIterator(subject,0,LIST_TAIL); + const size_t object_len = sdslen(c->argv[3]->ptr); while (listTypeNext(iter,&entry)) { - if (listTypeEqual(&entry,c->argv[3])) { + if (listTypeEqual(&entry,c->argv[3],object_len)) { listTypeInsert(&entry,c->argv[4],where); inserted = 1; break; @@ -999,8 +1000,9 @@ void lposCommand(client *c) { listTypeEntry entry; long llen = listTypeLength(o); long index = 0, matches = 0, matchindex = -1, arraylen = 0; + const size_t ele_len = sdslen(ele->ptr); while (listTypeNext(li,&entry) && (maxlen == 0 || index < maxlen)) { - if (listTypeEqual(&entry,ele)) { + if (listTypeEqual(&entry,ele,ele_len)) { matches++; matchindex = (direction == LIST_TAIL) ? index : llen - index - 1; if (matches >= rank) { @@ -1052,8 +1054,9 @@ void lremCommand(client *c) { } listTypeEntry entry; + const size_t object_len = sdslen(c->argv[3]->ptr); while (listTypeNext(li,&entry)) { - if (listTypeEqual(&entry,obj)) { + if (listTypeEqual(&entry,obj,object_len)) { listTypeDelete(li, &entry); server.dirty++; removed++; diff --git a/src/t_set.c b/src/t_set.c index f16cde81830..c2bb120a80a 100644 --- a/src/t_set.c +++ b/src/t_set.c @@ -1244,7 +1244,7 @@ int qsortCompareSetsByRevCardinality(const void *s1, const void *s2) { return 0; } -/* SINTER / SMEMBERS / SINTERSTORE / SINTERCARD +/* SINTER / SINTERSTORE / SINTERCARD * * 'cardinality_only' work for SINTERCARD, only return the cardinality * with minimum processing and memory overheads. @@ -1420,6 +1420,36 @@ void sinterCommand(client *c) { sinterGenericCommand(c, c->argv+1, c->argc-1, NULL, 0, 0); } +/* SMEMBERS key */ +void smembersCommand(client *c) { + setTypeIterator *si; + char *str; + size_t len; + int64_t intobj; + robj *setobj = lookupKeyRead(c->db, c->argv[1]); + if (checkType(c,setobj,OBJ_SET)) return; + if (!setobj) { + addReply(c, shared.emptyset[c->resp]); + return; + } + + /* Prepare the response. */ + unsigned long length = setTypeSize(setobj); + addReplySetLen(c,length); + /* Iterate through the elements of the set. */ + si = setTypeInitIterator(setobj); + + while (setTypeNext(si, &str, &len, &intobj) != -1) { + if (str != NULL) + addReplyBulkCBuffer(c, str, len); + else + addReplyBulkLongLong(c, intobj); + length--; + } + setTypeReleaseIterator(si); + serverAssert(length == 0); /* fail on corrupt data */ +} + /* SINTERCARD numkeys key [key ...] [LIMIT limit] */ void sinterCardCommand(client *c) { long j; diff --git a/src/t_string.c b/src/t_string.c index 067617a92d0..d1d6dce3900 100644 --- a/src/t_string.c +++ b/src/t_string.c @@ -73,7 +73,8 @@ void setGenericCommand(client *c, int flags, robj *key, robj *val, robj *expire, if (getGenericCommand(c) == C_ERR) return; } - found = (lookupKeyWrite(c->db,key) != NULL); + dictEntry *de = NULL; + found = (lookupKeyWriteWithDictEntry(c->db,key,&de) != NULL); if ((flags & OBJ_SET_NX && found) || (flags & OBJ_SET_XX && !found)) @@ -88,12 +89,12 @@ void setGenericCommand(client *c, int flags, robj *key, robj *val, robj *expire, setkey_flags |= ((flags & OBJ_KEEPTTL) || expire) ? SETKEY_KEEPTTL : 0; setkey_flags |= found ? SETKEY_ALREADY_EXIST : SETKEY_DOESNT_EXIST; - setKey(c,c->db,key,val,setkey_flags); + setKeyWithDictEntry(c,c->db,key,val,setkey_flags,de); server.dirty++; notifyKeyspaceEvent(NOTIFY_STRING,"set",key,c->db->id); if (expire) { - setExpire(c,c->db,key,milliseconds); + setExpireWithDictEntry(c,c->db,key,milliseconds,de); /* Propagate as SET Key Value PXAT millisecond-timestamp if there is * EX/PX/EXAT flag. */ if (!(flags & OBJ_PXAT)) { @@ -422,6 +423,7 @@ void setrangeCommand(client *c) { robj *o; long offset; sds value = c->argv[3]->ptr; + const size_t value_len = sdslen(value); if (getLongFromObjectOrReply(c,c->argv[2],&offset,NULL) != C_OK) return; @@ -431,19 +433,20 @@ void setrangeCommand(client *c) { return; } - o = lookupKeyWrite(c->db,c->argv[1]); + dictEntry *de; + o = lookupKeyWriteWithDictEntry(c->db,c->argv[1],&de); if (o == NULL) { /* Return 0 when setting nothing on a non-existing string */ - if (sdslen(value) == 0) { + if (value_len == 0) { addReply(c,shared.czero); return; } /* Return when the resulting string exceeds allowed size */ - if (checkStringLength(c,offset,sdslen(value)) != C_OK) + if (checkStringLength(c,offset,value_len) != C_OK) return; - o = createObject(OBJ_STRING,sdsnewlen(NULL, offset+sdslen(value))); + o = createObject(OBJ_STRING,sdsnewlen(NULL, offset+value_len)); dbAdd(c->db,c->argv[1],o); } else { size_t olen; @@ -454,22 +457,22 @@ void setrangeCommand(client *c) { /* Return existing string length when setting nothing */ olen = stringObjectLen(o); - if (sdslen(value) == 0) { + if (value_len == 0) { addReplyLongLong(c,olen); return; } /* Return when the resulting string exceeds allowed size */ - if (checkStringLength(c,offset,sdslen(value)) != C_OK) + if (checkStringLength(c,offset,value_len) != C_OK) return; /* Create a copy when the object is shared or encoded. */ - o = dbUnshareStringValue(c->db,c->argv[1],o); + o = dbUnshareStringValueWithDictEntry(c->db,c->argv[1],o,de); } - if (sdslen(value) > 0) { - o->ptr = sdsgrowzero(o->ptr,offset+sdslen(value)); - memcpy((char*)o->ptr+offset,value,sdslen(value)); + if (value_len > 0) { + o->ptr = sdsgrowzero(o->ptr,offset+value_len); + memcpy((char*)o->ptr+offset,value,value_len); signalModifiedKey(c,c->db,c->argv[1]); notifyKeyspaceEvent(NOTIFY_STRING, "setrange",c->argv[1],c->db->id); @@ -571,8 +574,8 @@ void msetnxCommand(client *c) { void incrDecrCommand(client *c, long long incr) { long long value, oldvalue; robj *o, *new; - - o = lookupKeyWrite(c->db,c->argv[1]); + dictEntry *de; + o = lookupKeyWriteWithDictEntry(c->db,c->argv[1],&de); if (checkType(c,o,OBJ_STRING)) return; if (getLongLongFromObjectOrReply(c,o,&value,NULL) != C_OK) return; @@ -593,7 +596,7 @@ void incrDecrCommand(client *c, long long incr) { } else { new = createStringObjectFromLongLongForValue(value); if (o) { - dbReplaceValue(c->db,c->argv[1],new); + dbReplaceValueWithDictEntry(c->db,c->argv[1],new,de); } else { dbAdd(c->db,c->argv[1],new); } @@ -601,7 +604,7 @@ void incrDecrCommand(client *c, long long incr) { signalModifiedKey(c,c->db,c->argv[1]); notifyKeyspaceEvent(NOTIFY_STRING,"incrby",c->argv[1],c->db->id); server.dirty++; - addReplyLongLong(c, value); + addReplyLongLongFromStr(c,new); } void incrCommand(client *c) { @@ -635,7 +638,8 @@ void incrbyfloatCommand(client *c) { long double incr, value; robj *o, *new; - o = lookupKeyWrite(c->db,c->argv[1]); + dictEntry *de; + o = lookupKeyWriteWithDictEntry(c->db,c->argv[1],&de); if (checkType(c,o,OBJ_STRING)) return; if (getLongDoubleFromObjectOrReply(c,o,&value,NULL) != C_OK || getLongDoubleFromObjectOrReply(c,c->argv[2],&incr,NULL) != C_OK) @@ -648,7 +652,7 @@ void incrbyfloatCommand(client *c) { } new = createStringObjectFromLongDouble(value,1); if (o) - dbReplaceValue(c->db,c->argv[1],new); + dbReplaceValueWithDictEntry(c->db,c->argv[1],new,de); else dbAdd(c->db,c->argv[1],new); signalModifiedKey(c,c->db,c->argv[1]); @@ -668,7 +672,8 @@ void appendCommand(client *c) { size_t totlen; robj *o, *append; - o = lookupKeyWrite(c->db,c->argv[1]); + dictEntry *de; + o = lookupKeyWriteWithDictEntry(c->db,c->argv[1],&de); if (o == NULL) { /* Create the key */ c->argv[2] = tryObjectEncoding(c->argv[2]); @@ -682,12 +687,13 @@ void appendCommand(client *c) { /* "append" is an argument, so always an sds */ append = c->argv[2]; - if (checkStringLength(c,stringObjectLen(o),sdslen(append->ptr)) != C_OK) + const size_t append_len = sdslen(append->ptr); + if (checkStringLength(c,stringObjectLen(o),append_len) != C_OK) return; /* Append the value */ - o = dbUnshareStringValue(c->db,c->argv[1],o); - o->ptr = sdscatlen(o->ptr,append->ptr,sdslen(append->ptr)); + o = dbUnshareStringValueWithDictEntry(c->db,c->argv[1],o,de); + o->ptr = sdscatlen(o->ptr,append->ptr,append_len); totlen = sdslen(o->ptr); } signalModifiedKey(c,c->db,c->argv[1]); diff --git a/tests/cluster/tests/14-consistency-check.tcl b/tests/cluster/tests/14-consistency-check.tcl index e3b9a1918eb..e6f7723fc3c 100644 --- a/tests/cluster/tests/14-consistency-check.tcl +++ b/tests/cluster/tests/14-consistency-check.tcl @@ -32,6 +32,13 @@ proc get_one_of_my_replica {id} { } set replica_port [lindex [lindex [lindex [R $id role] 2] 0] 1] set replica_id_num [get_instance_id_by_port redis $replica_port] + + # To avoid -LOADING reply, wait until replica syncs with master. + wait_for_condition 1000 50 { + [RI $replica_id_num master_link_status] eq {up} + } else { + fail "Replica did not sync in time." + } return $replica_id_num } @@ -70,6 +77,7 @@ proc test_slave_load_expired_keys {aof} { # wait for replica to be in sync with master wait_for_condition 500 10 { + [RI $replica_id master_link_status] eq {up} && [R $replica_id dbsize] eq [R $master_id dbsize] } else { fail "replica didn't sync" @@ -104,8 +112,15 @@ proc test_slave_load_expired_keys {aof} { # start the replica again (loading an RDB or AOF file) restart_instance redis $replica_id + # Replica may start a full sync after restart, trying in a loop to avoid + # -LOADING reply in that case. + wait_for_condition 1000 50 { + [catch {set replica_dbsize_3 [R $replica_id dbsize]} e] == 0 + } else { + fail "Replica is not up." + } + # make sure the keys are still there - set replica_dbsize_3 [R $replica_id dbsize] assert {$replica_dbsize_3 > $replica_dbsize_0} # restore settings @@ -113,6 +128,7 @@ proc test_slave_load_expired_keys {aof} { # wait for the master to expire all keys and replica to get the DELs wait_for_condition 500 10 { + [RI $replica_id master_link_status] eq {up} && [R $replica_id dbsize] eq $master_dbsize_0 } else { fail "keys didn't expire" diff --git a/tests/cluster/tests/28-cluster-shards.tcl b/tests/cluster/tests/28-cluster-shards.tcl index cbdae001fb9..2271ff6f81e 100644 --- a/tests/cluster/tests/28-cluster-shards.tcl +++ b/tests/cluster/tests/28-cluster-shards.tcl @@ -137,7 +137,8 @@ test "Restarting primary node" { test "Instance #0 gets converted into a replica" { wait_for_condition 1000 50 { - [RI $replica_id role] eq {slave} + [RI $replica_id role] eq {slave} && + [RI $replica_id master_link_status] eq {up} } else { fail "Old primary was not converted into replica" } diff --git a/tests/cluster/tests/includes/init-tests.tcl b/tests/cluster/tests/includes/init-tests.tcl index 4875a010694..c0e8982413b 100644 --- a/tests/cluster/tests/includes/init-tests.tcl +++ b/tests/cluster/tests/includes/init-tests.tcl @@ -32,6 +32,17 @@ test "Cluster nodes hard reset" { } else { set node_timeout 3000 } + + # Wait until slave is synced. Otherwise, it may reply -LOADING + # for any commands below. + if {[RI $id role] eq {slave}} { + wait_for_condition 50 1000 { + [RI $id master_link_status] eq {up} + } else { + fail "Slave were not able to sync." + } + } + catch {R $id flushall} ; # May fail for readonly slaves. R $id MULTI R $id cluster reset hard diff --git a/tests/integration/aof.tcl b/tests/integration/aof.tcl index 137b2d86339..7fbf520da86 100644 --- a/tests/integration/aof.tcl +++ b/tests/integration/aof.tcl @@ -655,4 +655,50 @@ tags {"aof external:skip"} { } } } + + start_server {overrides {loading-process-events-interval-bytes 1024}} { + test "Allow changing appendonly config while loading from AOF on startup" { + # Set AOF enabled, populate db and restart. + r config set appendonly yes + r config set key-load-delay 100 + r config rewrite + populate 10000 + restart_server 0 false false + + # Disable AOF while loading from the disk. + assert_equal 1 [s loading] + r config set appendonly no + assert_equal 1 [s loading] + + # Speed up loading, verify AOF disabled. + r config set key-load-delay 0 + wait_done_loading r + assert_equal {10000} [r dbsize] + assert_equal 0 [s aof_enabled] + } + + test "Allow changing appendonly config while loading from RDB on startup" { + # Set AOF disabled, populate db and restart. + r flushall + r config set appendonly no + r config set key-load-delay 100 + r config rewrite + populate 10000 + r save + restart_server 0 false false + + # Enable AOF while loading from the disk. + assert_equal 1 [s loading] + r config set appendonly yes + assert_equal 1 [s loading] + + # Speed up loading, verify AOF enabled, do a quick sanity. + r config set key-load-delay 0 + wait_done_loading r + assert_equal {10000} [r dbsize] + assert_equal 1 [s aof_enabled] + r set t 1 + assert_equal {1} [r get t] + } + } } diff --git a/tests/integration/corrupt-dump.tcl b/tests/integration/corrupt-dump.tcl index 3e644cbb968..273ab4fb946 100644 --- a/tests/integration/corrupt-dump.tcl +++ b/tests/integration/corrupt-dump.tcl @@ -885,5 +885,17 @@ test {corrupt payload: fuzzer findings - set with duplicate elements causes sdif } } {} {logreqres:skip} ;# This test violates {"uniqueItems": true} +test {corrupt payload: fuzzer findings - set with invalid length causes smembers to hang} { + start_server [list overrides [list loglevel verbose use-exit-on-panic yes crash-memcheck-enabled no] ] { + # In the past, it generated a broken protocol and left the client hung in smembers + r config set sanitize-dump-payload no + assert_equal {OK} [r restore _set 0 "\x14\x16\x16\x00\x00\x00\x0c\x00\x81\x61\x02\x81\x62\x02\x81\x63\x02\x01\x01\x02\x01\x03\x01\xff\x0c\x00\x91\x00\x56\x73\xc1\x82\xd5\xbd" replace] + assert_encoding listpack _set + catch { r SMEMBERS _set } err + assert_equal [count_log_message 0 "crashed by signal"] 0 + assert_equal [count_log_message 0 "ASSERTION FAILED"] 1 + } +} + } ;# tags diff --git a/tests/integration/replication.tcl b/tests/integration/replication.tcl index 43bc071ed2b..6f4c3d43d8f 100644 --- a/tests/integration/replication.tcl +++ b/tests/integration/replication.tcl @@ -652,7 +652,6 @@ foreach testType {Successful Aborted} { } test {Blocked commands and configs during async-loading} { - assert_error {LOADING*} {$replica config set appendonly no} assert_error {LOADING*} {$replica REPLICAOF no one} } @@ -1457,3 +1456,143 @@ start_server {tags {"repl external:skip"}} { } } } + +foreach disklessload {disabled on-empty-db} { + test "Replica should reply LOADING while flushing a large db (disklessload: $disklessload)" { + start_server {} { + set replica [srv 0 client] + start_server {} { + set master [srv 0 client] + set master_host [srv 0 host] + set master_port [srv 0 port] + + $replica config set repl-diskless-load $disklessload + + # Populate replica with many keys, master with a few keys. + $replica debug populate 2000000 + populate 3 master 10 + + # Start the replication process... + $replica replicaof $master_host $master_port + + wait_for_condition 100 100 { + [s -1 loading] eq 1 + } else { + fail "Replica didn't get into loading mode" + } + + # If replica has a large db, it may take some time to discard it + # after receiving new db from the master. In this case, replica + # should reply -LOADING. Replica may reply -LOADING while + # loading the new db as well. To test the first case, populated + # replica with large amount of keys and master with a few keys. + # Discarding old db will take a long time and loading new one + # will be quick. So, if we receive -LOADING, most probably it is + # when flushing the db. + wait_for_condition 1 10000 { + [catch {$replica ping} err] && + [string match *LOADING* $err] + } else { + # There is a chance that we may not catch LOADING response + # if flushing db happens too fast compared to test execution + # Then, we may consider increasing key count or introducing + # artificial delay to db flush. + fail "Replica did not reply LOADING." + } + + catch {$replica shutdown nosave} + } + } + } {} {repl external:skip} +} + +start_server {tags {"repl external:skip"} overrides {save {}}} { + set master [srv 0 client] + set master_host [srv 0 host] + set master_port [srv 0 port] + populate 10000 master 10 + + start_server {overrides {save {} rdb-del-sync-files yes loading-process-events-interval-bytes 1024}} { + test "Allow appendonly config change while loading rdb on slave" { + set replica [srv 0 client] + + # While loading rdb on slave, verify appendonly config changes are allowed + # 1- Change appendonly config from no to yes + $replica config set appendonly no + $replica config set key-load-delay 100 + $replica debug populate 1000 + + # Start the replication process... + $replica replicaof $master_host $master_port + + wait_for_condition 10 1000 { + [s loading] eq 1 + } else { + fail "Replica didn't get into loading mode" + } + + # Change config while replica is loading data + $replica config set appendonly yes + assert_equal 1 [s loading] + + # Speed up loading and verify aof is enabled + $replica config set key-load-delay 0 + wait_done_loading $replica + assert_equal 1 [s aof_enabled] + + # Quick sanity for AOF + $replica replicaof no one + set prev [s aof_current_size] + $replica set x 100 + assert_morethan [s aof_current_size] $prev + + # 2- While loading rdb, change appendonly from yes to no + $replica config set appendonly yes + $replica config set key-load-delay 100 + $replica flushall + + # Start the replication process... + $replica replicaof $master_host $master_port + + wait_for_condition 10 1000 { + [s loading] eq 1 + } else { + fail "Replica didn't get into loading mode" + } + + # Change config while replica is loading data + $replica config set appendonly no + assert_equal 1 [s loading] + + # Speed up loading and verify aof is disabled + $replica config set key-load-delay 0 + wait_done_loading $replica + assert_equal 0 [s 0 aof_enabled] + } + } +} + +start_server {tags {"repl external:skip"}} { + set replica [srv 0 client] + start_server {} { + set master [srv 0 client] + set master_host [srv 0 host] + set master_port [srv 0 port] + + test "Replica flushes db lazily when replica-lazy-flush enabled" { + $replica config set replica-lazy-flush yes + $replica debug populate 1000 + populate 1 master 10 + + # Start the replication process... + $replica replicaof $master_host $master_port + + wait_for_condition 100 100 { + [s -1 lazyfreed_objects] >= 1000 && + [s -1 master_link_status] eq {up} + } else { + fail "Replica did not free db lazily" + } + } + } +} diff --git a/tests/modules/defragtest.c b/tests/modules/defragtest.c index 6a02a059f2d..a27b57e13b1 100644 --- a/tests/modules/defragtest.c +++ b/tests/modules/defragtest.c @@ -3,6 +3,7 @@ #include "redismodule.h" #include +#include static RedisModuleType *FragType; @@ -17,9 +18,12 @@ unsigned long int last_set_cursor = 0; unsigned long int datatype_attempts = 0; unsigned long int datatype_defragged = 0; +unsigned long int datatype_raw_defragged = 0; unsigned long int datatype_resumes = 0; unsigned long int datatype_wrong_cursor = 0; unsigned long int global_attempts = 0; +unsigned long int defrag_started = 0; +unsigned long int defrag_ended = 0; unsigned long int global_defragged = 0; int global_strings_len = 0; @@ -47,16 +51,29 @@ static void defragGlobalStrings(RedisModuleDefragCtx *ctx) } } +static void defragStart(RedisModuleDefragCtx *ctx) { + REDISMODULE_NOT_USED(ctx); + defrag_started++; +} + +static void defragEnd(RedisModuleDefragCtx *ctx) { + REDISMODULE_NOT_USED(ctx); + defrag_ended++; +} + static void FragInfo(RedisModuleInfoCtx *ctx, int for_crash_report) { REDISMODULE_NOT_USED(for_crash_report); RedisModule_InfoAddSection(ctx, "stats"); RedisModule_InfoAddFieldLongLong(ctx, "datatype_attempts", datatype_attempts); RedisModule_InfoAddFieldLongLong(ctx, "datatype_defragged", datatype_defragged); + RedisModule_InfoAddFieldLongLong(ctx, "datatype_raw_defragged", datatype_raw_defragged); RedisModule_InfoAddFieldLongLong(ctx, "datatype_resumes", datatype_resumes); RedisModule_InfoAddFieldLongLong(ctx, "datatype_wrong_cursor", datatype_wrong_cursor); RedisModule_InfoAddFieldLongLong(ctx, "global_attempts", global_attempts); RedisModule_InfoAddFieldLongLong(ctx, "global_defragged", global_defragged); + RedisModule_InfoAddFieldLongLong(ctx, "defrag_started", defrag_started); + RedisModule_InfoAddFieldLongLong(ctx, "defrag_ended", defrag_ended); } struct FragObject *createFragObject(unsigned long len, unsigned long size, int maxstep) { @@ -79,10 +96,13 @@ static int fragResetStatsCommand(RedisModuleCtx *ctx, RedisModuleString **argv, datatype_attempts = 0; datatype_defragged = 0; + datatype_raw_defragged = 0; datatype_resumes = 0; datatype_wrong_cursor = 0; global_attempts = 0; global_defragged = 0; + defrag_started = 0; + defrag_ended = 0; RedisModule_ReplyWithSimpleString(ctx, "OK"); return REDISMODULE_OK; @@ -188,6 +208,14 @@ int FragDefrag(RedisModuleDefragCtx *ctx, RedisModuleString *key, void **value) } } + /* Defrag the values array itself using RedisModule_DefragAllocRaw + * and RedisModule_DefragFreeRaw for testing purposes. */ + void *new_values = RedisModule_DefragAllocRaw(ctx, o->len * sizeof(void*)); + memcpy(new_values, o->values, o->len * sizeof(void*)); + RedisModule_DefragFreeRaw(ctx, o->values); + o->values = new_values; + datatype_raw_defragged++; + last_set_cursor = 0; return 0; } @@ -230,6 +258,7 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) RedisModule_RegisterInfoFunc(ctx, FragInfo); RedisModule_RegisterDefragFunc(ctx, defragGlobalStrings); + RedisModule_RegisterDefragCallbacks(ctx, defragStart, defragEnd); return REDISMODULE_OK; } diff --git a/tests/support/test.tcl b/tests/support/test.tcl index b7cd38b3823..d85f31e0b18 100644 --- a/tests/support/test.tcl +++ b/tests/support/test.tcl @@ -217,6 +217,7 @@ proc test {name code {okpattern undefined} {tags {}}} { send_data_packet $::test_server_fd testing $name + set failed false set test_start_time [clock milliseconds] if {[catch {set retval [uplevel 1 $code]} error]} { set assertion [string match "assertion:*" $error] @@ -231,6 +232,7 @@ proc test {name code {okpattern undefined} {tags {}}} { lappend ::tests_failed $details incr ::num_failed + set failed true send_data_packet $::test_server_fd err [join $details "\n"] if {$::stop_on_failure} { @@ -253,10 +255,17 @@ proc test {name code {okpattern undefined} {tags {}}} { lappend ::tests_failed $details incr ::num_failed + set failed true send_data_packet $::test_server_fd err [join $details "\n"] } } + if {$::dump_logs && $failed} { + foreach srv $::servers { + dump_server_log $srv + } + } + if {$::traceleaks} { set output [exec leaks redis-server] if {![string match {*0 leaks*} $output]} { diff --git a/tests/unit/moduleapi/defrag.tcl b/tests/unit/moduleapi/defrag.tcl index b2e23967ec6..3f36ee1912b 100644 --- a/tests/unit/moduleapi/defrag.tcl +++ b/tests/unit/moduleapi/defrag.tcl @@ -18,6 +18,9 @@ start_server {tags {"modules"} overrides {{save ""}}} { set info [r info defragtest_stats] assert {[getInfoProperty $info defragtest_datatype_attempts] > 0} assert_equal 0 [getInfoProperty $info defragtest_datatype_resumes] + assert_morethan [getInfoProperty $info defragtest_datatype_raw_defragged] 0 + assert_morethan [getInfoProperty $info defragtest_defrag_started] 0 + assert_morethan [getInfoProperty $info defragtest_defrag_ended] 0 } test {Module defrag: late defrag with cursor works} { @@ -32,6 +35,9 @@ start_server {tags {"modules"} overrides {{save ""}}} { set info [r info defragtest_stats] assert {[getInfoProperty $info defragtest_datatype_resumes] > 10} assert_equal 0 [getInfoProperty $info defragtest_datatype_wrong_cursor] + assert_morethan [getInfoProperty $info defragtest_datatype_raw_defragged] 0 + assert_morethan [getInfoProperty $info defragtest_defrag_started] 0 + assert_morethan [getInfoProperty $info defragtest_defrag_ended] 0 } test {Module defrag: global defrag works} { @@ -41,6 +47,8 @@ start_server {tags {"modules"} overrides {{save ""}}} { after 2000 set info [r info defragtest_stats] assert {[getInfoProperty $info defragtest_global_attempts] > 0} + assert_morethan [getInfoProperty $info defragtest_defrag_started] 0 + assert_morethan [getInfoProperty $info defragtest_defrag_ended] 0 } } } diff --git a/tests/unit/moduleapi/rdbloadsave.tcl b/tests/unit/moduleapi/rdbloadsave.tcl index 9319c938549..f10bf131392 100644 --- a/tests/unit/moduleapi/rdbloadsave.tcl +++ b/tests/unit/moduleapi/rdbloadsave.tcl @@ -83,6 +83,9 @@ start_server {tags {"modules"}} { # Verify the value in the loaded rdb assert_equal v1 [r get k] + # Verify aof is still enabled after RM_RdbLoad() call + assert_equal 1 [s aof_enabled] + r flushdb r config set rdb-key-save-delay 0 r config set appendonly no diff --git a/tests/unit/querybuf.tcl b/tests/unit/querybuf.tcl index f4859dd2782..d0591115645 100644 --- a/tests/unit/querybuf.tcl +++ b/tests/unit/querybuf.tcl @@ -1,3 +1,15 @@ +# +# Copyright (c) 2009-Present, Redis Ltd. +# All rights reserved. +# +# Copyright (c) 2024-present, Valkey contributors. +# All rights reserved. +# +# Licensed under your choice of the Redis Source Available License 2.0 +# (RSALv2) or the Server Side Public License v1 (SSPLv1). +# +# Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information. +# proc client_idle_sec {name} { set clients [split [r client list] "\r\n"] set c [lsearch -inline $clients *name=$name*] @@ -24,16 +36,45 @@ start_server {tags {"querybuf slow"}} { # The test will run at least 2s to check if client query # buffer will be resized when client idle 2s. test "query buffer resized correctly" { - set rd [redis_client] + + set rd [redis_deferring_client] + $rd client setname test_client + $rd read + + # Make sure query buff has size of 0 bytes at start as the client uses the reusable qb. + assert {[client_query_buffer test_client] == 0} + + # Pause cron to prevent premature shrinking (timing issue). + r debug pause-cron 1 + + # Send partial command to client to make sure it doesn't use the reusable qb. + $rd write "*3\r\n\$3\r\nset\r\n\$2\r\na" + $rd flush + # Wait for the client to start using a private query buffer. + wait_for_condition 1000 10 { + [client_query_buffer test_client] > 0 + } else { + fail "client should start using a private query buffer" + } + + # send the rest of the command + $rd write "a\r\n\$1\r\nb\r\n" + $rd flush + assert_equal {OK} [$rd read] + set orig_test_client_qbuf [client_query_buffer test_client] # Make sure query buff has less than the peak resize threshold (PROTO_RESIZE_THRESHOLD) 32k # but at least the basic IO reading buffer size (PROTO_IOBUF_LEN) 16k - assert {$orig_test_client_qbuf >= 16384 && $orig_test_client_qbuf < 32768} + set MAX_QUERY_BUFFER_SIZE [expr 32768 + 2] ; # 32k + 2, allowing for potential greedy allocation of (16k + 1) * 2 bytes for the query buffer. + assert {$orig_test_client_qbuf >= 16384 && $orig_test_client_qbuf <= $MAX_QUERY_BUFFER_SIZE} + + # Allow shrinking to occur + r debug pause-cron 0 # Check that the initial query buffer is resized after 2 sec wait_for_condition 1000 10 { - [client_idle_sec test_client] >= 3 && [client_query_buffer test_client] == 0 + [client_idle_sec test_client] >= 3 && [client_query_buffer test_client] < $orig_test_client_qbuf } else { fail "query buffer was not resized" } @@ -75,10 +116,27 @@ start_server {tags {"querybuf slow"}} { test "query buffer resized correctly with fat argv" { set rd [redis_client] $rd client setname test_client + + # Pause cron to prevent premature shrinking (timing issue). + r debug pause-cron 1 + $rd write "*3\r\n\$3\r\nset\r\n\$1\r\na\r\n\$1000000\r\n" $rd flush + + # Wait for the client to start using a private query buffer of > 1000000 size. + wait_for_condition 1000 10 { + [client_query_buffer test_client] > 1000000 + } else { + fail "client should start using a private query buffer" + } - after 20 + # Send the start of the arg and make sure the client is not using reusable qb for it rather a private buf of > 1000000 size. + $rd write "a" + $rd flush + + r debug pause-cron 0 + + after 120 if {[client_query_buffer test_client] < 1000000} { fail "query buffer should not be resized when client idle time smaller than 2s" } @@ -92,5 +150,24 @@ start_server {tags {"querybuf slow"}} { $rd close } +} + +start_server {tags {"querybuf"}} { + test "Client executes small argv commands using reusable query buffer" { + set rd [redis_deferring_client] + $rd client setname test_client + $rd read + set res [r client list] + + # Verify that the client does not create a private query buffer after + # executing a small parameter command. + assert_match {*name=test_client * qbuf=0 qbuf-free=0 * cmd=client|setname *} $res + # The client executing the command is currently using the reusable query buffer, + # so the size shown is that of the reusable query buffer. It will be returned + # to the reusable query buffer after command execution. + assert_match {*qbuf=26 qbuf-free=* cmd=client|list *} $res + + $rd close + } } diff --git a/tests/unit/scripting.tcl b/tests/unit/scripting.tcl index a452fb22b4d..edc03f4aa47 100644 --- a/tests/unit/scripting.tcl +++ b/tests/unit/scripting.tcl @@ -2069,6 +2069,14 @@ start_server {tags {"scripting"}} { } 1 x r replicaof [srv -1 host] [srv -1 port] + + # To avoid -LOADING reply, wait until replica syncs with master. + wait_for_condition 50 100 { + [s master_link_status] eq {up} + } else { + fail "Replica did not sync in time." + } + assert_error {EXECABORT Transaction discarded because of: READONLY *} {$rr exec} assert_error {READONLY You can't write against a read only replica. script: *} {$rr2 exec} $rr close diff --git a/tests/unit/type/hash-field-expire.tcl b/tests/unit/type/hash-field-expire.tcl index 2c0ff8abdba..d955384a966 100644 --- a/tests/unit/type/hash-field-expire.tcl +++ b/tests/unit/type/hash-field-expire.tcl @@ -560,10 +560,10 @@ start_server {tags {"external:skip needs:debug"}} { # Test with small hash r debug set-active-expire 0 r del myhash - r hset myhash1 f1 v1 f2 v2 f3 v3 f4 v4 f5 v5 f6 v6 - r hpexpire myhash1 1 NX FIELDS 3 f2 f4 f6 + r hset myhash f1 v1 f2 v2 f3 v3 f4 v4 f5 v5 f6 v6 + r hpexpire myhash 1 NX FIELDS 3 f2 f4 f6 after 10 - assert_equal [lsort [r hgetall myhash1]] "f1 f3 f5 v1 v3 v5" + assert_equal [lsort [r hgetall myhash]] "f1 f3 f5 v1 v3 v5" # Test with large hash r del myhash @@ -573,6 +573,13 @@ start_server {tags {"external:skip needs:debug"}} { } after 10 assert_equal [lsort [r hgetall myhash]] [lsort "f1 f2 f3 v1 v2 v3"] + + # hash that all fields are expired return empty result + r del myhash + r hset myhash f1 v1 f2 v2 f3 v3 f4 v4 f5 v5 f6 v6 + r hpexpire myhash 1 FIELDS 6 f1 f2 f3 f4 f5 f6 + after 10 + assert_equal [r hgetall myhash] "" r debug set-active-expire 1 }