Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Offload replication writes to IO threads #1485

Open
wants to merge 3 commits into
base: unstable
Choose a base branch
from

Conversation

uriyage
Copy link
Contributor

@uriyage uriyage commented Dec 24, 2024

This PR offloads the write to replica clients to IO threads.

Main Changes

  • Replica writes will be offloaded but only after the replica is in online mode..
  • Replica reads will still be done in the main thread to reduce complexity and because read traffic from replicas is negligible.

Implementation Details

In order to offload the writes, writeToReplica has been split into 2 parts:

  1. The write itself made by the IO thread or by the main thread
  2. The post write where we update the replication buffers refcount will be done in the main-thread after the write-job is done in the IO thread (similar to what we do with a regular client)

Additional Changes

  • In writeToReplica we now use writev in case more than 1 buffer exists.
  • Changed client nwritten field to ssize_t since with a replica the nwritten can theoretically exceed int size (not subject to NET_MAX_WRITES_PER_EVENT limit).
  • Changed parsing code to use memchr instead of strchr:
    • During parsing command, ASAN got stuck for unknown reason when called to strchr to look for the next \r
    • Adding assert for null-terminated querybuf didn't resolve the issue.
    • Switched to memchr as it's more secure and resolves the issue

Testing

  • Added integration tests
  • Added unit tests

Related issue: #761

@uriyage uriyage force-pushed the replication-offload branch from 3aee49b to 846c816 Compare December 24, 2024 07:24
Copy link

codecov bot commented Dec 24, 2024

Codecov Report

Attention: Patch coverage is 76.00000% with 24 lines in your changes missing coverage. Please review.

Project coverage is 70.85%. Comparing base (980a801) to head (846c816).
Report is 19 commits behind head on unstable.

Files with missing lines Patch % Lines
src/io_threads.c 0.00% 14 Missing ⚠️
src/networking.c 88.23% 10 Missing ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##           unstable    #1485      +/-   ##
============================================
+ Coverage     70.78%   70.85%   +0.07%     
============================================
  Files           119      119              
  Lines         64691    64928     +237     
============================================
+ Hits          45790    46005     +215     
- Misses        18901    18923      +22     
Files with missing lines Coverage Δ
src/replication.c 87.54% <100.00%> (+0.03%) ⬆️
src/server.h 100.00% <ø> (ø)
src/networking.c 88.39% <88.23%> (+0.09%) ⬆️
src/io_threads.c 6.82% <0.00%> (-0.70%) ⬇️

... and 25 files with indirect coverage changes

Comment on lines +2044 to +2057
listNode *last_node;
size_t bufpos;

serverAssert(c->bufpos == 0 && listLength(c->reply) == 0);
while (clientHasPendingReplies(c)) {
replBufBlock *o = listNodeValue(c->ref_repl_buf_node);
serverAssert(o->used >= c->ref_block_pos);

/* Send current block if it is not fully sent. */
if (o->used > c->ref_block_pos) {
nwritten = connWrite(c->conn, o->buf + c->ref_block_pos, o->used - c->ref_block_pos);
if (nwritten <= 0) {
c->write_flags |= WRITE_FLAGS_WRITE_ERROR;
return;
}
c->nwritten += nwritten;
c->ref_block_pos += nwritten;
/* Determine the last block and buffer position based on thread context */
if (inMainThread()) {
last_node = listLast(server.repl_buffer_blocks);
if (!last_node) return;
bufpos = ((replBufBlock *)listNodeValue(last_node))->used;
} else {
last_node = c->io_last_reply_block;
serverAssert(last_node != NULL);
bufpos = c->io_last_bufpos;
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider simplifying this code to reduce duplication and improve clarity, eg:

    listNode *last_node = inMainThread() ? listLast(server.repl_buffer_blocks) : c->io_last_reply_block;
    if (!last_node) return;

    size_t bufpos = inMainThread() ? 
        ((replBufBlock *)listNodeValue(last_node))->used :  c->io_last_bufpos;

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants