Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implementation of write throttling #1672

Open
wants to merge 5 commits into
base: unstable
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -3193,6 +3193,7 @@ standardConfig static_configs[] = {
createBoolConfig("cluster-slot-stats-enabled", NULL, MODIFIABLE_CONFIG, server.cluster_slot_stats_enabled, 0, NULL, NULL),
createBoolConfig("hide-user-data-from-log", NULL, MODIFIABLE_CONFIG, server.hide_user_data_from_log, 1, NULL, NULL),
createBoolConfig("import-mode", NULL, DEBUG_CONFIG | MODIFIABLE_CONFIG, server.import_mode, 0, NULL, NULL),
createBoolConfig("write-throttling", NULL, DEBUG_CONFIG | MODIFIABLE_CONFIG, server.write_throttling, 0, NULL, NULL),

/* String Configs */
createStringConfig("aclfile", NULL, IMMUTABLE_CONFIG, ALLOW_EMPTY_STRING, server.acl_filename, "", NULL, NULL),
Expand Down
83 changes: 63 additions & 20 deletions src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -4517,34 +4517,77 @@ char *getClientTypeName(int class) {
}
}

/* The function checks if the client reached output buffer soft or hard
* limit, and also update the state needed to check the soft limit as
* a side effect.
*
* Return value: non-zero if the client reached the soft or the hard limit.
* Otherwise zero is returned. */
int checkClientOutputBufferLimits(client *c) {
int soft = 0, hard = 0, class;
unsigned long used_mem = getClientOutputBufferMemoryUsage(c);

class = getClientType(c);
/* For the purpose of output buffer limiting, primaries are handled
* like normal clients. */
/* Maps a client to the class used when looking up its output buffer limits.
 * A primary is accounted as a normal client for this purpose; every other
 * client keeps the type reported by getClientType(). */
int getClientTypeForOutputBuffering(client *c) {
    int type = getClientType(c);
    return (type == CLIENT_TYPE_PRIMARY) ? CLIENT_TYPE_NORMAL : type;
}

/* Looks up the configured soft output buffer limit, in bytes, for the
 * output-buffering class the client belongs to. */
unsigned long long getClientOutputBufferSoftLimit(client *c) {
    return server.client_obuf_limits[getClientTypeForOutputBuffering(c)].soft_limit_bytes;
}

/* Returns the client's output buffer hard limit in bytes.
 *
 * The value is read from server.client_obuf_limits for the client's
 * output-buffering class. For replica clients, a non-zero limit smaller than
 * server.repl_backlog_size is replaced by server.repl_backlog_size. A
 * configured limit of 0 is returned unchanged. */
unsigned long long getClientOutputBufferHardLimit(client *c) {
int class = getClientTypeForOutputBuffering(c);

/* Note that it doesn't make sense to set the replica clients output buffer
 * limit lower than the repl-backlog-size config (partial sync will succeed
 * and then replica will get disconnected).
 * Such a configuration is ignored (the size of repl-backlog-size will be used).
 * This doesn't have memory consumption implications since the replica client
 * will share the backlog buffers memory. */
/* NOTE(review): the next two lines look like the removed (pre-change) side of
 * the diff; the unsigned long long declaration that follows appears to be
 * their replacement - confirm against the raw patch. */
size_t hard_limit_bytes = server.client_obuf_limits[class].hard_limit_bytes;
if (class == CLIENT_TYPE_REPLICA && hard_limit_bytes && (long long)hard_limit_bytes < server.repl_backlog_size)
unsigned long long hard_limit_bytes = server.client_obuf_limits[class].hard_limit_bytes;
if (class == CLIENT_TYPE_REPLICA &&
hard_limit_bytes &&
hard_limit_bytes < (unsigned long long)server.repl_backlog_size)
hard_limit_bytes = server.repl_backlog_size;
/* NOTE(review): the four lines below reference used_mem, hard and soft, none
 * of which are declared in this function; they look like removed lines of
 * the old checkClientOutputBufferLimits() body - confirm against the raw
 * patch. */
if (server.client_obuf_limits[class].hard_limit_bytes && used_mem >= hard_limit_bytes) hard = 1;
if (server.client_obuf_limits[class].soft_limit_bytes &&
used_mem >= server.client_obuf_limits[class].soft_limit_bytes)
soft = 1;

return hard_limit_bytes;
}

/* Predicts whether adding `command_size` more bytes to the client's output
 * buffer would push it over its output buffer limits.
 *
 * c            - client whose output buffer usage is examined.
 * command_size - number of bytes about to be appended to the output buffer.
 *
 * A limit configured as 0 is disabled and never triggers. The hard limit
 * triggers as soon as the projected usage reaches it. The soft limit only
 * triggers when the client is already being tracked as over the soft limit
 * (obuf_soft_limit_reached_time != 0) and has been so for more than
 * soft_limit_seconds. This function does not update
 * obuf_soft_limit_reached_time.
 *
 * Return value: 1 if the client will exceed the limit; otherwise, 0. */
int willClientOutputBufferExceedLimits(client *c, unsigned long long command_size) {
    unsigned long long soft_limit_bytes = getClientOutputBufferSoftLimit(c);
    unsigned long long hard_limit_bytes = getClientOutputBufferHardLimit(c);
    unsigned long long used_mem = getClientOutputBufferMemoryUsage(c);
    unsigned long long required_mem = used_mem + command_size;

    /* Saturate instead of wrapping around, so a huge command size can never
     * look like a small projected usage. */
    if (required_mem < used_mem) required_mem = ~0ULL;

    /* A zero hard limit means "no hard limit": it must not match here. */
    if (hard_limit_bytes != 0 && required_mem >= hard_limit_bytes) {
        return 1;
    }

    /* A zero soft limit means "no soft limit"; below the limit is fine too. */
    if (soft_limit_bytes == 0 || required_mem < soft_limit_bytes) {
        return 0;
    }

    /* The soft limit only counts once it has been reached continuously for
     * longer than the configured number of seconds. A zero timestamp means
     * the client is not currently tracked as being over the soft limit. */
    if (c->obuf_soft_limit_reached_time == 0) {
        return 0;
    }
    time_t elapsed = server.unixtime - c->obuf_soft_limit_reached_time;
    if (elapsed <= server.client_obuf_limits[getClientTypeForOutputBuffering(c)].soft_limit_seconds) {
        return 0;
    }
    return 1;
}

/* The function checks if the client reached the output buffer soft or hard
* limit, and also updates the state needed to check the soft limit as
* a side effect.
*
* Return value: non-zero if the client reached the soft or the hard limit.
* Otherwise zero is returned. */
int checkClientOutputBufferLimits(client *c) {
int soft = 0, hard = 0;
unsigned long used_mem = getClientOutputBufferMemoryUsage(c);
unsigned long long soft_limit_bytes = getClientOutputBufferSoftLimit(c);
unsigned long long hard_limit_bytes = getClientOutputBufferHardLimit(c);

if (used_mem >= hard_limit_bytes) hard = 1;
if (used_mem >= soft_limit_bytes) soft = 1;

/* We need to check if the soft limit is reached continuously for the
* specified amount of seconds. */
Expand All @@ -4555,7 +4598,7 @@ int checkClientOutputBufferLimits(client *c) {
} else {
time_t elapsed = server.unixtime - c->obuf_soft_limit_reached_time;

if (elapsed <= server.client_obuf_limits[class].soft_limit_seconds) {
if (elapsed <= server.client_obuf_limits[getClientTypeForOutputBuffering(c)].soft_limit_seconds) {
soft = 0; /* The client still did not reached the max number of
seconds for the soft limit to be considered
reached. */
Expand Down
8 changes: 8 additions & 0 deletions src/networking.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
#ifndef __NETWORKING_H
#define __NETWORKING_H

#include "server.h"

/* Checks whether the client would exceed the soft or hard limit of its output
 * buffer if `command_size` more bytes were written to it.
 *
 * Return value: 1 if the client will exceed the limit; otherwise, 0. */
int willClientOutputBufferExceedLimits(client *c, unsigned long long command_size);

#endif /* __NETWORKING_H */
22 changes: 22 additions & 0 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
#include "sds.h"
#include "module.h"
#include "scripting_engine.h"
#include "networking.h"

#include <time.h>
#include <signal.h>
Expand Down Expand Up @@ -2059,6 +2060,7 @@ void createSharedObjects(void) {
createObject(OBJ_STRING, sdsnew("-EXECABORT Transaction discarded because of previous errors.\r\n"));
shared.noreplicaserr = createObject(OBJ_STRING, sdsnew("-NOREPLICAS Not enough good replicas to write.\r\n"));
shared.busykeyerr = createObject(OBJ_STRING, sdsnew("-BUSYKEY Target key name already exists.\r\n"));
shared.throttlederr = createObject(OBJ_STRING, sdsnew("-THROTTLED\r\n"));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sending a generic throttled error seems risky, since it leaves each client to build its own retry logic. We should either send an estimated replication duration as a retry-after time, or maintain a backoff period for each client and reset it once the replication backlog is cleared.

Copy link
Member

@xbasel xbasel Feb 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or block the client with a configurable timeout, then either resume writing if COB pressure subsides or return an error.


/* The shared NULL depends on the protocol version. */
shared.null[0] = NULL;
Expand Down Expand Up @@ -4324,6 +4326,26 @@ int processCommand(client *c) {
return C_OK;
}

/* Don't accept the write command if write throttling is enabled, we are the primary,
* and the output buffer (for any replica) cannot store the incoming command.
*
* Note that this code does not take into account the case where the command is rewritten
* before being put into the output buffer: if it is smaller than expected, then it may
* throttle erroneously; if it is larger, then it will fall back to the behavior of client
* disconnection.
*/
if (server.write_throttling && is_write_command && server.primary_host == NULL) {
listIter li;
listNode *ln;
listRewind(server.replicas, &li);
while ((ln = listNext(&li))) {
if (willClientOutputBufferExceedLimits((client *)listNodeValue(ln), c->net_input_bytes_curr_cmd)) {
rejectCommand(c, shared.throttlederr);
return C_OK;
}
}
}

/* Exec the command */
if (c->flag.multi && c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
c->cmd->proc != multiCommand && c->cmd->proc != watchCommand && c->cmd->proc != quitCommand &&
Expand Down
4 changes: 3 additions & 1 deletion src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -1308,7 +1308,7 @@ struct sharedObjectsStruct {
robj *ok, *err, *emptybulk, *czero, *cone, *pong, *space, *queued, *null[4], *nullarray[4], *emptymap[4],
*emptyset[4], *emptyarray, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr, *outofrangeerr, *noscripterr,
*loadingerr, *slowevalerr, *slowscripterr, *slowmoduleerr, *bgsaveerr, *primarydownerr, *roreplicaerr,
*execaborterr, *noautherr, *noreplicaserr, *busykeyerr, *oomerr, *plus, *messagebulk, *pmessagebulk,
*execaborterr, *noautherr, *noreplicaserr, *busykeyerr, *oomerr, *throttlederr, *plus, *messagebulk, *pmessagebulk,
*subscribebulk, *unsubscribebulk, *psubscribebulk, *punsubscribebulk, *del, *unlink, *rpop, *lpop, *lpush,
*rpoplpush, *lmove, *blmove, *zpopmin, *zpopmax, *emptyscan, *multi, *exec, *left, *right, *hset, *srem,
*xgroup, *xclaim, *script, *replconf, *eval, *persist, *set, *pexpireat, *pexpire, *time, *pxat, *absttl,
Expand Down Expand Up @@ -1800,6 +1800,8 @@ struct valkeyServer {
invocation of the event loop. */
unsigned int max_new_conns_per_cycle; /* The maximum number of tcp connections that will be accepted during each
invocation of the event loop. */
int write_throttling; /* 1 if write throttling is enabled: prevents output buffer overflow (and
client disconnection) by throttling write commands. */
/* AOF persistence */
int aof_enabled; /* AOF configuration */
int aof_state; /* AOF_(ON|OFF|WAIT_REWRITE) */
Expand Down
64 changes: 64 additions & 0 deletions tests/integration/write-throttling.tcl
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
# Integration test for the "write-throttling" config.
#
# Two servers are started: the outer one becomes the replica, the inner one
# the primary. The primary gets a small replica output buffer limit
# (hard 512, soft 256, soft seconds 0) and a repl-backlog-size of 25000, then
# SET commands of different sizes are sent to it and the replies are checked.
start_server {tags {"repl external:skip" "throttling"}} {
    # Outer server: acts as the replica.
    set replica [srv 0 client]
    set replica_host [srv 0 host]
    set replica_port [srv 0 port]
    set replica_log [srv 0 stdout]

    start_server {} {
        # Inner server: acts as the primary.
        set primary [srv 0 client]
        set primary_host [srv 0 host]
        set primary_port [srv 0 port]

        test {Establish replica and primary relationship} {
            $replica replicaof $primary_host $primary_port
            wait_for_condition 50 1000 {
                [status $replica master_link_status] eq "up"
            } else {
                fail "Replica not replicating from primary"
            }
        }

        # Tighten the limits on the primary and switch throttling on.
        $primary config set repl-backlog-size 25000
        $primary config set client-output-buffer-limit "replica 512 256 0"
        $primary config set write-throttling yes

        test {Do not throttle on command below soft limit} {
            # Make sure the link is still up before writing.
            wait_for_condition 50 1000 {
                [status $replica master_link_status] eq "up"
            } else {
                fail "Replica not replicating from primary"
            }

            # 50 bytes of payload: below the 256 byte soft limit.
            set small_payload [string repeat x 50]
            catch {$primary set foo $small_payload} reply

            assert_match "OK" $reply
        }

        test {Do not throttle on command size above hard limit but below repl-backlog-size} {
            wait_for_condition 50 1000 {
                [status $replica master_link_status] eq "up"
            } else {
                fail "Replica not replicating from primary"
            }

            # 600 bytes of payload: above the configured 512 byte hard limit,
            # but below the 25000 byte repl-backlog-size.
            set medium_payload [string repeat x 600]
            catch {$primary set foo $medium_payload} reply

            assert_match "OK" $reply
        }

        test {Throttle on command size above hard limit and repl-backlog-size} {
            wait_for_condition 50 1000 {
                [status $replica master_link_status] eq "up"
            } else {
                fail "Replica not replicating from primary"
            }

            # 26000 bytes of payload: above both the hard limit and the
            # repl-backlog-size, so the write is expected to be rejected.
            set oversized_payload [string repeat x 26000]
            catch {$primary set foo $oversized_payload} reply

            assert_match "THROTTLED*" $reply
        }
    }
}
Loading