Skip to content

Commit

Permalink
withFramedSink(): Don't use a thread to monitor the other side
Browse files Browse the repository at this point in the history
Since withFramedSink() is now used a lot more than in the past (for
every addToStore() variant), we were creating a lot of threads, e.g.

  nix flake show --no-eval-cache --all-systems github:NixOS/nix/afdd12be5e19c0001ff3297dea544301108d298

would create 46418 threads. While threads on Linux are cheap, this is
still substantial overhead.

So instead, just poll from FramedSink before every write whether there
are pending messages from the daemon. This could slightly increase the
latency on log messages from the daemon, but not on exceptions (which
were only synchronously checked from FramedSink anyway).

This speeds up the command above from 19.2s to 17.5s on my machine (a
9% speedup).
  • Loading branch information
edolstra committed Aug 19, 2024
1 parent b0a7edb commit 39daa4a
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 52 deletions.
2 changes: 1 addition & 1 deletion src/libstore/remote-store-connection.hh
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ struct RemoteStore::ConnectionHandle
RemoteStore::Connection & operator * () { return *handle; }
RemoteStore::Connection * operator -> () { return &*handle; }

void processStderr(Sink * sink = 0, Source * source = 0, bool flush = true);
void processStderr(Sink * sink = 0, Source * source = 0, bool flush = true, bool block = true);

void withFramedSink(std::function<void(Sink & sink)> fun);
};
Expand Down
42 changes: 8 additions & 34 deletions src/libstore/remote-store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -153,9 +153,9 @@ RemoteStore::ConnectionHandle::~ConnectionHandle()
}
}

void RemoteStore::ConnectionHandle::processStderr(Sink * sink, Source * source, bool flush)
void RemoteStore::ConnectionHandle::processStderr(Sink * sink, Source * source, bool flush, bool block)
{
handle->processStderr(&daemonException, sink, source, flush);
handle->processStderr(&daemonException, sink, source, flush, block);
}


Expand Down Expand Up @@ -926,43 +926,17 @@ void RemoteStore::ConnectionHandle::withFramedSink(std::function<void(Sink & sin
{
(*this)->to.flush();

std::exception_ptr ex;

/* Handle log messages / exceptions from the remote on a separate
thread. */
std::thread stderrThread([&]()
{
try {
ReceiveInterrupts receiveInterrupts;
processStderr(nullptr, nullptr, false);
} catch (...) {
ex = std::current_exception();
}
});

Finally joinStderrThread([&]()
{
if (stderrThread.joinable()) {
stderrThread.join();
if (ex) {
try {
std::rethrow_exception(ex);
} catch (...) {
ignoreException();
}
}
}
});

{
FramedSink sink((*this)->to, ex);
FramedSink sink((*this)->to, [&]() {
/* Periodically process stderr messages and exceptions
from the daemon. */
processStderr(nullptr, nullptr, false, false);
});
fun(sink);
sink.flush();
}

stderrThread.join();
if (ex)
std::rethrow_exception(ex);
processStderr(nullptr, nullptr, false);
}

}
15 changes: 11 additions & 4 deletions src/libstore/worker-protocol-connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ static Logger::Fields readFields(Source & from)
return fields;
}

std::exception_ptr WorkerProto::BasicClientConnection::processStderrReturn(Sink * sink, Source * source, bool flush)
std::exception_ptr
WorkerProto::BasicClientConnection::processStderrReturn(Sink * sink, Source * source, bool flush, bool block)
{
if (flush)
to.flush();
Expand All @@ -41,6 +42,9 @@ std::exception_ptr WorkerProto::BasicClientConnection::processStderrReturn(Sink

while (true) {

if (!block && !from.hasData())
break;

auto msg = readNum<uint64_t>(from);

if (msg == STDERR_WRITE) {
Expand Down Expand Up @@ -95,8 +99,10 @@ std::exception_ptr WorkerProto::BasicClientConnection::processStderrReturn(Sink
logger->result(act, type, fields);
}

else if (msg == STDERR_LAST)
else if (msg == STDERR_LAST) {
assert(block);
break;
}

else
throw Error("got unknown message type %x from Nix daemon", msg);
Expand Down Expand Up @@ -130,9 +136,10 @@ std::exception_ptr WorkerProto::BasicClientConnection::processStderrReturn(Sink
}
}

void WorkerProto::BasicClientConnection::processStderr(bool * daemonException, Sink * sink, Source * source, bool flush)
void WorkerProto::BasicClientConnection::processStderr(
bool * daemonException, Sink * sink, Source * source, bool flush, bool block)
{
auto ex = processStderrReturn(sink, source, flush);
auto ex = processStderrReturn(sink, source, flush, block);
if (ex) {
*daemonException = true;
std::rethrow_exception(ex);
Expand Down
5 changes: 3 additions & 2 deletions src/libstore/worker-protocol-connection.hh
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,10 @@ struct WorkerProto::BasicClientConnection : WorkerProto::BasicConnection

virtual void closeWrite() = 0;

std::exception_ptr processStderrReturn(Sink * sink = 0, Source * source = 0, bool flush = true);
std::exception_ptr processStderrReturn(Sink * sink = 0, Source * source = 0, bool flush = true, bool block = true);

void processStderr(bool * daemonException, Sink * sink = 0, Source * source = 0, bool flush = true);
void
processStderr(bool * daemonException, Sink * sink = 0, Source * source = 0, bool flush = true, bool block = true);

/**
* Establishes connection, negotiating version.
Expand Down
20 changes: 20 additions & 0 deletions src/libutil/serialise.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
#ifdef _WIN32
# include <fileapi.h>
# include "windows-error.hh"
#else
# include <poll.h>
#endif


Expand Down Expand Up @@ -158,6 +160,24 @@ bool FdSource::good()
}


bool FdSource::hasData()
{
if (BufferedSource::hasData()) return true;

while (true) {
struct pollfd fds[1];
fds[0].fd = fd;
fds[0].events = POLLIN;
auto n = poll(fds, 1, 0);
if (n < 0) {
if (errno == EINTR) continue;
throw SysError("polling file descriptor");
}
return n == 1 && (fds[0].events & POLLIN);
}
}


size_t StringSource::read(char * data, size_t len)
{
if (pos == s.size()) throw EndOfFile("end of string reached");
Expand Down
29 changes: 18 additions & 11 deletions src/libutil/serialise.hh
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@ struct BufferedSource : Source

size_t read(char * data, size_t len) override;

/**
* Return true if the buffer is not empty.
*/
bool hasData();

protected:
Expand Down Expand Up @@ -162,6 +165,13 @@ struct FdSource : BufferedSource
FdSource & operator=(FdSource && s) = default;

bool good() override;

/**
* Return true if the buffer is not empty after a non-blocking
* read.
*/
bool hasData();

protected:
size_t readUnbuffered(char * data, size_t len) override;
private:
Expand Down Expand Up @@ -522,15 +532,16 @@ struct FramedSource : Source
/**
* Write as chunks in the format expected by FramedSource.
*
* The exception_ptr reference can be used to terminate the stream when you
* detect that an error has occurred on the remote end.
* The `checkError` function can be used to terminate the stream when you
* detect that an error has occurred.
*/
struct FramedSink : nix::BufferedSink
{
BufferedSink & to;
std::exception_ptr & ex;
std::function<void()> checkError;

FramedSink(BufferedSink & to, std::exception_ptr & ex) : to(to), ex(ex)
FramedSink(BufferedSink & to, std::function<void()> && checkError)
: to(to), checkError(checkError)
{ }

~FramedSink()
Expand All @@ -545,13 +556,9 @@ struct FramedSink : nix::BufferedSink

void writeUnbuffered(std::string_view data) override
{
/* Don't send more data if the remote has
encountered an error. */
if (ex) {
auto ex2 = ex;
ex = nullptr;
std::rethrow_exception(ex2);
}
/* Don't send more data if an error has occured. */
checkError();

to << data.size();
to(data);
};
Expand Down

0 comments on commit 39daa4a

Please sign in to comment.