diff --git a/src/config.c b/src/config.c index 12c999e747..65d286cd55 100644 --- a/src/config.c +++ b/src/config.c @@ -3193,6 +3193,7 @@ standardConfig static_configs[] = { createBoolConfig("cluster-slot-stats-enabled", NULL, MODIFIABLE_CONFIG, server.cluster_slot_stats_enabled, 0, NULL, NULL), createBoolConfig("hide-user-data-from-log", NULL, MODIFIABLE_CONFIG, server.hide_user_data_from_log, 1, NULL, NULL), createBoolConfig("import-mode", NULL, DEBUG_CONFIG | MODIFIABLE_CONFIG, server.import_mode, 0, NULL, NULL), + createBoolConfig("write-throttling", NULL, DEBUG_CONFIG | MODIFIABLE_CONFIG, server.write_throttling, 0, NULL, NULL), /* String Configs */ createStringConfig("aclfile", NULL, IMMUTABLE_CONFIG, ALLOW_EMPTY_STRING, server.acl_filename, "", NULL, NULL), diff --git a/src/networking.c b/src/networking.c index 302aee368c..e33516bfcf 100644 --- a/src/networking.c +++ b/src/networking.c @@ -4611,20 +4611,23 @@ char *getClientTypeName(int class) { } } -/* The function checks if the client reached output buffer soft or hard - * limit, and also update the state needed to check the soft limit as - * a side effect. - * - * Return value: non-zero if the client reached the soft or the hard limit. - * Otherwise zero is returned. */ -int checkClientOutputBufferLimits(client *c) { - int soft = 0, hard = 0, class; - unsigned long used_mem = getClientOutputBufferMemoryUsage(c); - - class = getClientType(c); - /* For the purpose of output buffer limiting, primaries are handled - * like normal clients. */ +/* Returns the client type used to look up the output buffer limits: + * CLIENT_TYPE_PRIMARY is reported as CLIENT_TYPE_NORMAL, other types are returned unchanged. */ +int getClientTypeForOutputBuffering(client *c) { + int class = getClientType(c); if (class == CLIENT_TYPE_PRIMARY) class = CLIENT_TYPE_NORMAL; + return class; +} + +/* Returns the client's output buffer soft limit in bytes. 
*/ +unsigned long long getClientOutputBufferSoftLimit(client *c) { + int class = getClientTypeForOutputBuffering(c); + return server.client_obuf_limits[class].soft_limit_bytes; +} + +/* Returns the client's output buffer hard limit in bytes, after the replica backlog adjustment below. */ +unsigned long long getClientOutputBufferHardLimit(client *c) { + int class = getClientTypeForOutputBuffering(c); /* Note that it doesn't make sense to set the replica clients output buffer * limit lower than the repl-backlog-size config (partial sync will succeed @@ -4632,13 +4635,53 @@ int checkClientOutputBufferLimits(client *c) { * Such a configuration is ignored (the size of repl-backlog-size will be used). * This doesn't have memory consumption implications since the replica client * will share the backlog buffers memory. */ - size_t hard_limit_bytes = server.client_obuf_limits[class].hard_limit_bytes; - if (class == CLIENT_TYPE_REPLICA && hard_limit_bytes && (long long)hard_limit_bytes < server.repl_backlog_size) + unsigned long long hard_limit_bytes = server.client_obuf_limits[class].hard_limit_bytes; + if (class == CLIENT_TYPE_REPLICA && + hard_limit_bytes && + hard_limit_bytes < (unsigned long long)server.repl_backlog_size) hard_limit_bytes = server.repl_backlog_size; - if (server.client_obuf_limits[class].hard_limit_bytes && used_mem >= hard_limit_bytes) hard = 1; - if (server.client_obuf_limits[class].soft_limit_bytes && - used_mem >= server.client_obuf_limits[class].soft_limit_bytes) - soft = 1; + + return hard_limit_bytes; +} + +/* This function checks if the client will exceed the soft or hard limit of the output + * buffer if it writes the incoming command. + * + * Return value: 1 if the client will exceed the limit; otherwise, 0. 
*/ +int willClientOutputBufferExceedLimits(client *c, unsigned long long command_size) { + unsigned long long soft_limit_bytes = getClientOutputBufferSoftLimit(c); + unsigned long long hard_limit_bytes = getClientOutputBufferHardLimit(c); + unsigned long long used_mem = getClientOutputBufferMemoryUsage(c); + unsigned long long required_mem = used_mem + command_size; + /* A limit of zero means the limit is disabled, so it can never be exceeded. */ + if (hard_limit_bytes && required_mem >= hard_limit_bytes) { + return 1; + } + if (soft_limit_bytes && required_mem >= soft_limit_bytes) { + time_t elapsed = server.unixtime - c->obuf_soft_limit_reached_time; + if (c->obuf_soft_limit_reached_time == 0 || + elapsed <= server.client_obuf_limits[getClientTypeForOutputBuffering(c)].soft_limit_seconds) { + return 0; + } + return 1; + } + return 0; +} + +/* The function checks if the client reached output buffer soft or hard + * limit, and also update the state needed to check the soft limit as + * a side effect. + * + * Return value: non-zero if the client reached the soft or the hard limit. + * Otherwise zero is returned. A limit set to zero is disabled. */ +int checkClientOutputBufferLimits(client *c) { + int soft = 0, hard = 0; + unsigned long used_mem = getClientOutputBufferMemoryUsage(c); + unsigned long long soft_limit_bytes = getClientOutputBufferSoftLimit(c); + unsigned long long hard_limit_bytes = getClientOutputBufferHardLimit(c); + /* A limit of zero means the limit is disabled: such a limit is never reached. */ + if (hard_limit_bytes && used_mem >= hard_limit_bytes) hard = 1; + if (soft_limit_bytes && used_mem >= soft_limit_bytes) soft = 1; /* We need to check if the soft limit is reached continuously for the * specified amount of seconds. */ @@ -4649,7 +4692,7 @@ int checkClientOutputBufferLimits(client *c) { } else { time_t elapsed = server.unixtime - c->obuf_soft_limit_reached_time; - if (elapsed <= server.client_obuf_limits[class].soft_limit_seconds) { + if (elapsed <= server.client_obuf_limits[getClientTypeForOutputBuffering(c)].soft_limit_seconds) { soft = 0; /* The client still did not reached the max number of seconds for the soft limit to be considered reached. 
*/ diff --git a/src/networking.h b/src/networking.h new file mode 100644 index 0000000000..671ed5f978 --- /dev/null +++ b/src/networking.h @@ -0,0 +1,8 @@ +#ifndef __NETWORKING_H +#define __NETWORKING_H + +#include "server.h" + +int willClientOutputBufferExceedLimits(client *c, unsigned long long command_size); + +#endif diff --git a/src/server.c b/src/server.c index 72690acf59..b7ea39b5f8 100644 --- a/src/server.c +++ b/src/server.c @@ -44,6 +44,7 @@ #include "sds.h" #include "module.h" #include "scripting_engine.h" +#include "networking.h" #include "lua/engine_lua.h" #include "lua/debug_lua.h" #include "eval.h" @@ -2069,6 +2070,7 @@ void createSharedObjects(void) { createObject(OBJ_STRING, sdsnew("-EXECABORT Transaction discarded because of previous errors.\r\n")); shared.noreplicaserr = createObject(OBJ_STRING, sdsnew("-NOREPLICAS Not enough good replicas to write.\r\n")); shared.busykeyerr = createObject(OBJ_STRING, sdsnew("-BUSYKEY Target key name already exists.\r\n")); + shared.throttlederr = createObject(OBJ_STRING, sdsnew("-THROTTLED\r\n")); /* The shared NULL depends on the protocol version. */ shared.null[0] = NULL; @@ -4346,6 +4348,26 @@ int processCommand(client *c) { return C_OK; } + /* Don't accept the write command if write throttling is enabled, we are the primary, + * and buffering it (c->net_input_bytes_curr_cmd bytes) would make any replica's output buffer exceed its limits. + * + * Note that this code does not take into account the case where the command is rewritten + * before being put into the output buffer: if it is smaller than expected, then it may + * throttle erroneously; if it is larger, then it will fall back to the behavior of client + * disconnection. 
+ */ + if (server.write_throttling && is_write_command && server.primary_host == NULL) { + listIter li; + listNode *ln; + listRewind(server.replicas, &li); + while ((ln = listNext(&li))) { + if (willClientOutputBufferExceedLimits((client *)listNodeValue(ln), c->net_input_bytes_curr_cmd)) { + rejectCommand(c, shared.throttlederr); + return C_OK; + } + } + } + /* Exec the command */ if (c->flag.multi && c->cmd->proc != execCommand && c->cmd->proc != discardCommand && c->cmd->proc != multiCommand && c->cmd->proc != watchCommand && c->cmd->proc != quitCommand && diff --git a/src/server.h b/src/server.h index 35c6540f1c..401066afe1 100644 --- a/src/server.h +++ b/src/server.h @@ -1307,7 +1307,7 @@ struct sharedObjectsStruct { robj *ok, *err, *emptybulk, *czero, *cone, *pong, *space, *queued, *null[4], *nullarray[4], *emptymap[4], *emptyset[4], *emptyarray, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr, *outofrangeerr, *noscripterr, *loadingerr, *slowevalerr, *slowscripterr, *slowmoduleerr, *bgsaveerr, *primarydownerr, *roreplicaerr, - *execaborterr, *noautherr, *noreplicaserr, *busykeyerr, *oomerr, *plus, *messagebulk, *pmessagebulk, + *execaborterr, *noautherr, *noreplicaserr, *busykeyerr, *oomerr, *throttlederr, *plus, *messagebulk, *pmessagebulk, *subscribebulk, *unsubscribebulk, *psubscribebulk, *punsubscribebulk, *del, *unlink, *rpop, *lpop, *lpush, *rpoplpush, *lmove, *blmove, *zpopmin, *zpopmax, *emptyscan, *multi, *exec, *left, *right, *hset, *srem, *xgroup, *xclaim, *script, *replconf, *eval, *persist, *set, *pexpireat, *pexpire, *time, *pxat, *absttl, @@ -1799,6 +1799,8 @@ struct valkeyServer { invocation of the event loop. */ unsigned int max_new_conns_per_cycle; /* The maximum number of tcp connections that will be accepted during each invocation of the event loop. */ + int write_throttling; /* 1 if write throttling is enabled: prevents output buffer overflow (and + client disconnection) by throttling write commands. 
*/ /* AOF persistence */ int aof_enabled; /* AOF configuration */ int aof_state; /* AOF_(ON|OFF|WAIT_REWRITE) */ diff --git a/tests/integration/write-throttling.tcl b/tests/integration/write-throttling.tcl new file mode 100644 index 0000000000..77fddb4f80 --- /dev/null +++ b/tests/integration/write-throttling.tcl @@ -0,0 +1,64 @@ +start_server {tags {"repl external:skip" "throttling"}} { + set replica [srv 0 client] + set replica_host [srv 0 host] + set replica_port [srv 0 port] + set replica_log [srv 0 stdout] + + start_server {} { + set primary [srv 0 client] + set primary_host [srv 0 host] + set primary_port [srv 0 port] + + test {Establish replica and primary relationship} { + $replica replicaof $primary_host $primary_port + wait_for_condition 50 1000 { + [status $replica master_link_status] == "up" + } else { + fail "Replica not replicating from primary" + } + } + # The replica hard limit (512 bytes) is lower than repl-backlog-size, so repl-backlog-size (25000 bytes) is the limit actually enforced. + $primary config set repl-backlog-size 25000 + $primary config set client-output-buffer-limit "replica 512 256 0" + $primary config set write-throttling yes + + test {Do not throttle on command below soft limit} { + wait_for_condition 50 1000 { + [status $replica master_link_status] == "up" + } else { + fail "Replica not replicating from primary" + } + + set smallval [string repeat x 50] + catch {$primary set foo $smallval} result + + assert_match {OK} $result + } + + test {Do not throttle on command size above hard limit but below repl-backlog-size} { + wait_for_condition 50 1000 { + [status $replica master_link_status] == "up" + } else { + fail "Replica not replicating from primary" + } + + set bigval [string repeat x 600] + catch {$primary set foo $bigval} result + + assert_match {OK} $result + } + + test {Throttle on command size above hard limit and repl-backlog-size} { + wait_for_condition 50 1000 { + [status $replica master_link_status] == "up" + } else { + fail "Replica not replicating from primary" + } + + set bigval [string repeat x 26000] + catch {$primary set foo $bigval} err + + 
assert_match {THROTTLED*} $err + } + } +}