Skip to content

Commit

Permalink
addressed PR comments
Browse files Browse the repository at this point in the history
Signed-off-by: Alexander Shabanov <[email protected]>
  • Loading branch information
alexander-shabanov committed Dec 19, 2024
1 parent ae57ba2 commit ac7e1f5
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 70 deletions.
105 changes: 45 additions & 60 deletions src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -40,19 +40,18 @@
#include <sys/uio.h>
#include <math.h>
#include <ctype.h>
#include <stdbool.h>
#include <stdatomic.h>

typedef enum {
CLIENT_REPLY_PAYLOAD_DATA = 0,
CLIENT_REPLY_PAYLOAD_BULK_OFFLOAD,
} clientReplyPayloadType;
CLIENT_REPLY_PLAIN = 0,
CLIENT_REPLY_BULK_OFFLOAD,
} clientReplyType;

/* Reply payload header */
typedef struct __attribute__((__packed__)) payloadHeader {
typedef struct payloadHeader {
size_t len; /* payload length in a reply buffer */
size_t actual_len; /* actual reply length after offload expanding */
uint8_t type; /* one of clientReplyPayloadType */
uint8_t type; /* one of clientReplyType */
int16_t slot; /* to report network-bytes-out for offloads */

} payloadHeader;
Expand Down Expand Up @@ -141,17 +140,17 @@ static inline int isReplicaReadyForReplData(client *replica) {
* Reply offload can be allowed only for regular Valkey clients
* that use _writeToClient handler to write replies to client connection
*/
static bool isReplyOffloadAllowable(client *c) {
static int isReplyOffloadAllowable(client *c) {
if (c->flag.fake) {
return false;
return 0;
}

switch (getClientType(c)) {
case CLIENT_TYPE_NORMAL:
case CLIENT_TYPE_PUBSUB:
return true;
return 1;
default:
return false;
return 0;
}
}

Expand Down Expand Up @@ -448,7 +447,7 @@ static inline int updatePayloadHeader(payloadHeader *last_header, uint8_t type,

static size_t upsertPayloadHeader(char *buf, size_t *bufpos, payloadHeader **last_header, uint8_t type, size_t len, int slot, size_t available) {
/* Enforce min len for offloads as whole pointers must be written to the buffer */
size_t min_len = (type == CLIENT_REPLY_PAYLOAD_BULK_OFFLOAD ? len : 1);
size_t min_len = (type == CLIENT_REPLY_BULK_OFFLOAD ? len : 1);
if (min_len > available) return 0;
size_t reply_len = min(available, len);

Expand All @@ -460,7 +459,7 @@ static size_t upsertPayloadHeader(char *buf, size_t *bufpos, payloadHeader **las
if (updatePayloadHeader(*last_header, type, reply_len, slot) == C_OK) return reply_len;
}

/* Recheck min len condition and recalcuate allowed len with a new header to be added */
/* Recheck min len condition and recalculate allowed len with a new header to be added */
if (sizeof(payloadHeader) + min_len > available) return 0;
available -= sizeof(payloadHeader);
if (len > available) reply_len = available;
Expand Down Expand Up @@ -497,11 +496,11 @@ static size_t _addReplyPayloadToBuffer(client *c, const void *payload, size_t le
}

size_t _addReplyToBuffer(client *c, const char *s, size_t len) {
return _addReplyPayloadToBuffer(c, s, len, CLIENT_REPLY_PAYLOAD_DATA);
return _addReplyPayloadToBuffer(c, s, len, CLIENT_REPLY_PLAIN);
}

size_t _addBulkOffloadToBuffer(client *c, robj *obj) {
return _addReplyPayloadToBuffer(c, &obj, sizeof(void*), CLIENT_REPLY_PAYLOAD_BULK_OFFLOAD);
return _addReplyPayloadToBuffer(c, &obj, sizeof(void*), CLIENT_REPLY_BULK_OFFLOAD);
}

/* Adds the reply to the reply linked list.
Expand Down Expand Up @@ -554,11 +553,11 @@ static void _addReplyPayloadToList(client *c, list *reply_list, const char *payl
}

void _addReplyProtoToList(client *c, list *reply_list, const char *s, size_t len) {
_addReplyPayloadToList(c, reply_list, s, len, CLIENT_REPLY_PAYLOAD_DATA);
_addReplyPayloadToList(c, reply_list, s, len, CLIENT_REPLY_PLAIN);
}

void _addBulkOffloadToList(client *c, robj *obj) {
_addReplyPayloadToList(c, c->reply, (char*) &obj, sizeof(void*), CLIENT_REPLY_PAYLOAD_BULK_OFFLOAD);
_addReplyPayloadToList(c, c->reply, (char*) &obj, sizeof(void*), CLIENT_REPLY_BULK_OFFLOAD);
}

/* The subscribe / unsubscribe command family has a push as a reply,
Expand Down Expand Up @@ -1000,7 +999,7 @@ void setDeferredReply(client *c, void *node, const char *s, size_t length) {
buf->used = 0;
buf->last_header = 0;
if (c->flag.reply_offload) {
upsertPayloadHeader(buf->buf, &buf->used, &buf->last_header, CLIENT_REPLY_PAYLOAD_DATA, length, c->slot, buf->size);
upsertPayloadHeader(buf->buf, &buf->used, &buf->last_header, CLIENT_REPLY_PLAIN, length, c->slot, buf->size);
}
memcpy(buf->buf + buf->used, s, length);
buf->used += length;
Expand Down Expand Up @@ -2251,10 +2250,11 @@ static void addPlainBufferToReplyIOV(char *buf, size_t buf_len, replyIOV *reply,
return;
}

/* Aggregate data len from the beginning of the buffer even though
* part of the data should be skipped in this round due to last_written_len */
/* Aggregate data length from the beginning of the buffer even though
* part of the data can be skipped in this writevToClient invocation due to last_written_len */
metadata->data_len += buf_len;

/* Skip data written in the previous writevToClient invocation(s) */
if (reply->last_written_len >= buf_len) {
reply->last_written_len -= buf_len;
return;
Expand All @@ -2268,10 +2268,10 @@ static void addPlainBufferToReplyIOV(char *buf, size_t buf_len, replyIOV *reply,
}

static void addOffloadedBulkToReplyIOV(char *buf, size_t buf_len, replyIOV *reply, bufWriteMetadata *metadata) {
robj **objv = (robj **)buf;
while (buf_len > 0 && !reply->limit_reached) {
robj **obj = (robj **)buf;
char *str = (*obj)->ptr;
size_t str_len = stringObjectLen(*obj);
char *str = (*objv)->ptr;
size_t str_len = stringObjectLen(*objv);

char* prefix = reply->prefixes[reply->prfxcnt];
prefix[0] = '$';
Expand All @@ -2281,12 +2281,12 @@ static void addOffloadedBulkToReplyIOV(char *buf, size_t buf_len, replyIOV *repl

int cnt = reply->cnt;
addPlainBufferToReplyIOV(reply->prefixes[reply->prfxcnt], num_len + 3, reply, metadata);
/* Increment prfxcnt only if prefix was added to reply in this round */
/* Increment prfxcnt only if prefix was added to reply in this writevToClient invocation */
if (reply->cnt > cnt) reply->prfxcnt++;
addPlainBufferToReplyIOV(str, str_len, reply, metadata);
addPlainBufferToReplyIOV(reply->crlf, 2, reply, metadata);

buf += sizeof(void*);
objv++;
buf_len -= sizeof(void*);
}
}
Expand All @@ -2296,10 +2296,10 @@ static void addCompoundBufferToReplyIOV(char *buf, size_t bufpos, replyIOV *repl
while (ptr < buf + bufpos && !reply->limit_reached) {
payloadHeader *header = (payloadHeader*)ptr;
ptr += sizeof(payloadHeader);
if (header->type == CLIENT_REPLY_PAYLOAD_DATA) {
if (header->type == CLIENT_REPLY_PLAIN) {
addPlainBufferToReplyIOV(ptr, header->len, reply, metadata);
} else {
serverAssert(header->type == CLIENT_REPLY_PAYLOAD_BULK_OFFLOAD);
serverAssert(header->type == CLIENT_REPLY_BULK_OFFLOAD);
uint64_t data_len = metadata->data_len;
addOffloadedBulkToReplyIOV(ptr, header->len, reply, metadata);
/* Store actual reply len for cluster slot stats */
Expand All @@ -2325,6 +2325,19 @@ static void addBufferToReplyIOV(char *buf, size_t bufpos, replyIOV *reply, bufWr
metadata->bufpos = bufpos;
}

/*
* This function calculates and stores the following values on the client:
* io_last_written_buf - Last buffer that has been written to the client connection
* io_last_written_bufpos - The buffer has been written until this position
* io_last_written_data_len - The actual length of the data written from this buffer
* This length differs from written bufpos in case of reply offload
*
* The io_last_written_buf and io_last_written_bufpos are used by _postWriteToClient
* to detect the last client reply buffer that can be released
*
* The io_last_written_data_len is used by writevToClient to resume writing from the point
* where the previous writevToClient invocation stopped
**/
static void saveLastWrittenBuf(client *c, bufWriteMetadata *metadata, int bufcnt, size_t totlen, size_t totwritten) {
int last = bufcnt - 1;
if (totwritten == totlen) {
Expand Down Expand Up @@ -2363,35 +2376,7 @@ void proceedToUnwritten(replyIOV *reply, int nwritten) {
* it gathers the scattered buffers from reply list and sends them away with connWritev.
* If we write successfully, it returns C_OK, otherwise, C_ERR is returned.
* Sets the c->nwritten to the number of bytes the server wrote to the client.
* Can be called from the main thread or an I/O thread
*
* INTERNALS
* The writevToClient strives to write all client reply buffers to the client connection.
* However, it may encounter NET_MAX_WRITES_PER_EVENT or IOV_MAX or socket limit. In such case,
* some client reply buffers will be written completely and some partially.
* In next invocation writevToClient should resume from the exact position where it stopped.
* Also writevToClient should communicate to _postWriteToClient which buffers written completely
* and can be released. It is intricate in case of reply offloading as length of reply buffer does not match
* to network bytes out.
*
* For this purpose, writevToClient uses 3 data members on the client struct as input/output paramaters:
* io_last_written_buf - Last buffer that has been written to the client connection
* io_last_written_bufpos - The buffer has been written until this position
* io_last_written_data_len - The actual length of the data written from this buffer
* This length differs from written bufpos in case of reply offload
*
* The writevToClient uses addBufferToReplyIOV, addCompoundBufferToReplyIOV, addOffloadedBulkToReplyIOV, addPlainBufferToReplyIOV
* to build reply iovec array. These functions know to skip io_last_written_data_len, specifically addPlainBufferToReplyIOV
*
* In the end of execution writevToClient calls saveLastWrittenBuf for calculating "last written" buf/pos/data_len
* and storing on the client. While building reply iov, writevToClient gathers auxiliary bufWriteMetadata that
* helps in this calculation. In some cases, It may take several (> 2) invocations for writevToClient to write reply
* from a single buffer but saveLastWrittenBuf knows to calculate "last written" buf/pos/data_len properly
*
* The _postWriteToClient uses io_last_written_buf and io_last_written_bufpos in order to detect completely written buffers
* and release them
*
* */
* Can be called from the main thread or an I/O thread */
static int writevToClient(client *c) {
int iovmax = min(IOV_MAX, c->conn->iovcnt);
struct iovec iov_arr[iovmax];
Expand Down Expand Up @@ -2539,15 +2524,15 @@ static void releaseBufOffloads(char *buf, size_t bufpos) {
payloadHeader *header = (payloadHeader *)ptr;
ptr += sizeof(payloadHeader);

if (header->type == CLIENT_REPLY_PAYLOAD_BULK_OFFLOAD) {
if (header->type == CLIENT_REPLY_BULK_OFFLOAD) {
clusterSlotStatsAddNetworkBytesOutForSlot(header->slot, header->actual_len);

robj** obj_ptr = (robj**)ptr;
robj **objv = (robj **)ptr;
size_t len = header->len;
while (len > 0) {
decrRefCount(*obj_ptr);
obj_ptr++;
len -= sizeof(obj_ptr);
decrRefCount(*objv);
objv++;
len -= sizeof(void*);
}
}

Expand Down
23 changes: 13 additions & 10 deletions src/unit/test_networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ int test_addRepliesWithOffloadsToBuffer(int argc, char **argv, int flags) {
TEST_ASSERT(c->bufpos == sizeof(payloadHeader) + sizeof(void*));

payloadHeader *header1 = c->last_header;
TEST_ASSERT(header1->type == CLIENT_REPLY_PAYLOAD_BULK_OFFLOAD);
TEST_ASSERT(header1->type == CLIENT_REPLY_BULK_OFFLOAD);
TEST_ASSERT(header1->len == sizeof(void*));

robj **ptr = (robj **)(c->buf + sizeof(payloadHeader));
Expand All @@ -174,7 +174,7 @@ int test_addRepliesWithOffloadsToBuffer(int argc, char **argv, int flags) {
_addBulkOffloadToBufferOrList(c, obj2);

TEST_ASSERT(c->bufpos == sizeof(payloadHeader) + 2 * sizeof(void*));
TEST_ASSERT(header1->type == CLIENT_REPLY_PAYLOAD_BULK_OFFLOAD);
TEST_ASSERT(header1->type == CLIENT_REPLY_BULK_OFFLOAD);
TEST_ASSERT(header1->len == 2 * sizeof(void*));

ptr = (robj **)(c->buf + sizeof(payloadHeader) + sizeof(void*));
Expand All @@ -186,23 +186,23 @@ int test_addRepliesWithOffloadsToBuffer(int argc, char **argv, int flags) {
_addReplyToBufferOrList(c, plain, plain_len);

TEST_ASSERT(c->bufpos == 2 * sizeof(payloadHeader) + 2 * sizeof(void*) + plain_len);
TEST_ASSERT(header1->type == CLIENT_REPLY_PAYLOAD_BULK_OFFLOAD);
TEST_ASSERT(header1->type == CLIENT_REPLY_BULK_OFFLOAD);
TEST_ASSERT(header1->len == 2 * sizeof(void*));
payloadHeader *header2 = c->last_header;
TEST_ASSERT(header2->type == CLIENT_REPLY_PAYLOAD_DATA);
TEST_ASSERT(header2->type == CLIENT_REPLY_PLAIN);
TEST_ASSERT(header2->len == plain_len);

for (int i = 0; i < 9; ++i) _addReplyToBufferOrList(c, plain, plain_len);
TEST_ASSERT(c->bufpos == 2 * sizeof(payloadHeader) + 2 * sizeof(void*) + 10 * plain_len);
TEST_ASSERT(header2->type == CLIENT_REPLY_PAYLOAD_DATA);
TEST_ASSERT(header2->type == CLIENT_REPLY_PLAIN);
TEST_ASSERT(header2->len == plain_len * 10);

/* Test 3: Add one more bulk offload to the buffer */
_addBulkOffloadToBufferOrList(c, obj);
TEST_ASSERT(obj->refcount == 3);
TEST_ASSERT(c->bufpos == 3 * sizeof(payloadHeader) + 3 * sizeof(void*) + 10 * plain_len);
payloadHeader *header3 = c->last_header;
TEST_ASSERT(header3->type == CLIENT_REPLY_PAYLOAD_BULK_OFFLOAD);
TEST_ASSERT(header3->type == CLIENT_REPLY_BULK_OFFLOAD);
ptr = (robj **)((char*)c->last_header + sizeof(payloadHeader));
TEST_ASSERT(obj == *ptr);

Expand Down Expand Up @@ -250,7 +250,7 @@ int test_addRepliesWithOffloadsToList(int argc, char **argv, int flags) {

TEST_ASSERT(blk->used == sizeof(payloadHeader) + sizeof(void*));
payloadHeader *header1 = blk->last_header;
TEST_ASSERT(header1->type == CLIENT_REPLY_PAYLOAD_BULK_OFFLOAD);
TEST_ASSERT(header1->type == CLIENT_REPLY_BULK_OFFLOAD);
TEST_ASSERT(header1->len == sizeof(void*));

robj **ptr = (robj **)(blk->buf + sizeof(payloadHeader));
Expand All @@ -261,7 +261,7 @@ int test_addRepliesWithOffloadsToList(int argc, char **argv, int flags) {
TEST_ASSERT(obj->refcount == 3);
TEST_ASSERT(listLength(c->reply) == 1);
TEST_ASSERT(blk->used == sizeof(payloadHeader) + 2 * sizeof(void*));
TEST_ASSERT(header1->type == CLIENT_REPLY_PAYLOAD_BULK_OFFLOAD);
TEST_ASSERT(header1->type == CLIENT_REPLY_BULK_OFFLOAD);
TEST_ASSERT(header1->len == 2 * sizeof(void*));

/* Test 3: Add plain replies to cause reply list grow */
Expand All @@ -277,13 +277,15 @@ int test_addRepliesWithOffloadsToList(int argc, char **argv, int flags) {
clientReplyBlock *blk2 = listNodeValue(next);
/* last header in 2nd block */
payloadHeader *header3 = blk2->last_header;
TEST_ASSERT(header2->type == CLIENT_REPLY_PAYLOAD_DATA && header3->type == CLIENT_REPLY_PAYLOAD_DATA);
TEST_ASSERT(header2->type == CLIENT_REPLY_PLAIN && header3->type == CLIENT_REPLY_PLAIN);
TEST_ASSERT((header2->len + header3->len) % reply_len == 0);

decrRefCount(obj);
decrRefCount(obj);
decrRefCount(obj);

zfree(reply);

freeReplyOffloadClient(c);

return 0;
Expand Down Expand Up @@ -366,4 +368,5 @@ int test_addBufferToReplyIOV(int argc, char **argv, int flags) {
freeReplyOffloadClient(c);

return 0;
}
}

0 comments on commit ac7e1f5

Please sign in to comment.