Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Blocking receive support #21

Open
wants to merge 33 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
d22952d
implement blocking recv
vjabrayilov Nov 1, 2023
8516624
WIP: blocking_recv
vjabrayilov Nov 11, 2023
66c3517
implement blocking recv
vjabrayilov Nov 11, 2023
36d4400
change machnet_recv API
vjabrayilov Nov 11, 2023
d4e8d1b
remove unnecessary code parts
vjabrayilov Nov 11, 2023
d1d7929
make operation atomic
vjabrayilov Nov 13, 2023
c139ea9
msg_gen blocking recv support
vjabrayilov Nov 13, 2023
ec29287
replace if by while
vjabrayilov Nov 13, 2023
e4da3cf
test after setting inactive
vjabrayilov Nov 13, 2023
581bec6
adding log message
vjabrayilov Nov 13, 2023
54452e0
init sem 1
vjabrayilov Nov 13, 2023
56e70c0
debug
vjabrayilov Nov 13, 2023
85c4c0b
add debug for sem value
vjabrayilov Nov 13, 2023
597963d
debug
vjabrayilov Nov 13, 2023
98556bd
make sem shared
vjabrayilov Nov 13, 2023
05b3e68
make sem shared
vjabrayilov Nov 13, 2023
10339c0
make msg_gen open_loop
vjabrayilov Nov 15, 2023
27d4dbe
add log when increasing window size
vjabrayilov Nov 15, 2023
cb47c5a
add sleep before increasing window_size
vjabrayilov Nov 15, 2023
27123da
fix window size check
vjabrayilov Nov 15, 2023
d30f620
increase sleep duration
vjabrayilov Nov 15, 2023
3aa3cbe
rm logging
vjabrayilov Nov 15, 2023
2aac9f6
set sleep
vjabrayilov Nov 15, 2023
5c3f6ef
replace sleep by busy poll
vjabrayilov Nov 15, 2023
13cc7cf
send next window in 70ms
vjabrayilov Nov 15, 2023
6e0edad
add load flag
vjabrayilov Nov 15, 2023
14b5aad
add logging
vjabrayilov Nov 15, 2023
e533675
improve rate limiting
vjabrayilov Nov 15, 2023
f10a967
change measure to microseconds
vjabrayilov Nov 15, 2023
bcaf095
handle correct exit
vjabrayilov Nov 15, 2023
ebe93f9
polish
vjabrayilov Nov 15, 2023
2a6c68e
minor changes to improve code quality
vjabrayilov Nov 15, 2023
5c1b94f
increase default ring size
vjabrayilov Nov 16, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 65 additions & 48 deletions src/apps/msg_gen/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <chrono>
#include <csignal>
#include <cstdint>
#include <deque>
#include <numeric>
#include <sstream>
#include <thread>
Expand All @@ -32,6 +33,8 @@ DEFINE_bool(active_generator, false,
"When 'true' this host is generating the traffic, otherwise it is "
"bouncing.");
DEFINE_bool(verify, false, "Verify payload of received messages.");
DEFINE_uint32(blocking, 0, "Blocking receive");
DEFINE_uint32(load, 100, "kRPS load");

static volatile int g_keep_running = 1;

Expand Down Expand Up @@ -102,6 +105,7 @@ class ThreadCtx {
hdr_histogram *latency_hist;
size_t num_request_latency_samples;
std::vector<msg_latency_info_t> msg_latency_info_vec;
std::chrono::microseconds time_limit;

struct {
stats_t current;
Expand Down Expand Up @@ -171,7 +175,7 @@ void ServerLoop(void *channel_ctx) {
MachnetFlow_t rx_flow;
const ssize_t rx_size =
machnet_recv(channel_ctx, thread_ctx.rx_message.data(),
thread_ctx.rx_message.size(), &rx_flow);
thread_ctx.rx_message.size(), &rx_flow, FLAGS_blocking);
if (rx_size <= 0) continue;
stats_cur.rx_count++;
stats_cur.rx_bytes += rx_size;
Expand Down Expand Up @@ -235,75 +239,88 @@ void ClientSendOne(ThreadCtx *thread_ctx, uint64_t window_slot) {
}

// Return the window slot for which a response was received
uint64_t ClientRecvOneBlocking(ThreadCtx *thread_ctx) {
int64_t ClientRecvOne(ThreadCtx *thread_ctx) {
const auto *channel_ctx = thread_ctx->channel_ctx;

while (true) {
if (g_keep_running == 0) {
LOG(INFO) << "ClientRecvOneBlocking: Exiting.";
return 0;
}
MachnetFlow_t rx_flow;
const ssize_t rx_size =
machnet_recv(channel_ctx, thread_ctx->rx_message.data(),
thread_ctx->rx_message.size(), &rx_flow, FLAGS_blocking);
if (rx_size <= 0) return -1;

MachnetFlow_t rx_flow;
const ssize_t rx_size =
machnet_recv(channel_ctx, thread_ctx->rx_message.data(),
thread_ctx->rx_message.size(), &rx_flow);
if (rx_size <= 0) continue;
thread_ctx->stats.current.rx_count++;
thread_ctx->stats.current.rx_bytes += rx_size;

thread_ctx->stats.current.rx_count++;
thread_ctx->stats.current.rx_bytes += rx_size;

const auto *msg_hdr =
reinterpret_cast<msg_hdr_t *>(thread_ctx->rx_message.data());
if (msg_hdr->window_slot >= FLAGS_msg_window) {
LOG(ERROR) << "Received invalid window slot: " << msg_hdr->window_slot;
continue;
}
const auto *msg_hdr =
reinterpret_cast<msg_hdr_t *>(thread_ctx->rx_message.data());
if (msg_hdr->window_slot > FLAGS_msg_window) {
LOG(ERROR) << "Received invalid window slot: " << msg_hdr->window_slot;
abort();
}

const size_t latency_us =
thread_ctx->RecordRequestEnd(msg_hdr->window_slot);
VLOG(1) << "Client: Received message for window slot "
<< msg_hdr->window_slot << " in " << latency_us << " us";

if (FLAGS_verify) {
for (uint32_t i = sizeof(msg_hdr_t); i < rx_size; i++) {
if (thread_ctx->rx_message[i] != thread_ctx->message_gold[i]) {
LOG(ERROR) << "Message data mismatch at index " << i << std::hex
<< " " << static_cast<uint32_t>(thread_ctx->rx_message[i])
<< " "
<< static_cast<uint32_t>(thread_ctx->message_gold[i]);
break;
}
const size_t latency_us = thread_ctx->RecordRequestEnd(msg_hdr->window_slot);
VLOG(1) << "Client: Received message for window slot " << msg_hdr->window_slot
<< " in " << latency_us << " us";

if (FLAGS_verify) {
for (uint32_t i = sizeof(msg_hdr_t); i < rx_size; i++) {
if (thread_ctx->rx_message[i] != thread_ctx->message_gold[i]) {
LOG(ERROR) << "Message data mismatch at index " << i << std::hex << " "
<< static_cast<uint32_t>(thread_ctx->rx_message[i]) << " "
<< static_cast<uint32_t>(thread_ctx->message_gold[i]);
break;
}
}

return msg_hdr->window_slot;
}

LOG(FATAL) << "Should not reach here";
return 0;
return msg_hdr->window_slot;
}

void ClientLoop(void *channel_ctx, MachnetFlow *flow) {
ThreadCtx thread_ctx(channel_ctx, flow);
thread_ctx.time_limit = std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::microseconds(1000) / (FLAGS_load));
LOG(INFO) << "Client Loop: Starting.";
LOG(INFO) << "Time limit is: " << thread_ctx.time_limit.count();

// Send a full window of messages
for (uint32_t i = 0; i < FLAGS_msg_window; i++) {
ClientSendOne(&thread_ctx, i /* window slot */);
}

while (true) {
if (g_keep_running == 0) {
LOG(INFO) << "MsgGenLoop: Exiting.";
break;
auto next = std::chrono::steady_clock::now() + thread_ctx.time_limit;
std::deque<uint32_t> backlog;

while (g_keep_running) {
auto rx_window_slot = ClientRecvOne(&thread_ctx);

if (rx_window_slot <= 0) {
// Inner loop to handle the case where no message is received
while (g_keep_running) {
rx_window_slot = ClientRecvOne(&thread_ctx);
if (rx_window_slot > 0) break;

if (std::chrono::steady_clock::now() > next) {
// Handle timeout scenario
next = std::chrono::steady_clock::now() + thread_ctx.time_limit;
auto next_window = ++FLAGS_msg_window;
thread_ctx.msg_latency_info_vec.resize(next_window);
backlog.push_back(next_window);
ClientSendOne(&thread_ctx, backlog.front());
backlog.pop_front();
}
}
}

const uint64_t rx_window_slot = ClientRecvOneBlocking(&thread_ctx);
ClientSendOne(&thread_ctx, rx_window_slot);

if (g_keep_running == 0) break;
// Check if the time limit has passed and a message is received
if (std::chrono::steady_clock::now() > next) {
ClientSendOne(&thread_ctx, backlog.front());
backlog.pop_front();
next = std::chrono::steady_clock::now() + thread_ctx.time_limit;
}
backlog.push_back(rx_window_slot);
ReportStats(&thread_ctx);
}
LOG(INFO) << "MsgGenLoop: Exiting.";

auto &stats_cur = thread_ctx.stats.current;
LOG(INFO) << "Application Statistics (TOTAL) - [TX] Sent: "
Expand Down
3 changes: 2 additions & 1 deletion src/core/drivers/shm/channel.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ ShmChannel::ShmChannel(const std::string channel_name,
channel_fd_(channel_fd),
cached_buf_indices(),
cached_bufs(),
cached_buf_count(0) {}
cached_buf_count(0),
posted(0) {}

ShmChannel::~ShmChannel() {
__machnet_channel_destroy(
Expand Down
27 changes: 17 additions & 10 deletions src/core/drivers/shm/channel_bench.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@
#include <glog/logging.h>
#include <machnet.h>
#include <machnet_common.h>
#include <signal.h>
#include <ttime.h>
#include <unistd.h>
#include <utils.h>

#include <csignal>
#include <numeric>
#include <thread>
#include <utility>

DEFINE_uint32(blocking, 0, "Block on receive");

static constexpr uint8_t kStackCpuCoreId = 3;
static constexpr uint8_t kAppCpuCoreId = 5;
Expand Down Expand Up @@ -39,7 +41,7 @@ struct thread_conf {
thread_conf(std::shared_ptr<ShmChannel> ch, uint8_t core_id,
uint64_t messages_to_send, uint64_t tx_message_size,
uint64_t messages_to_receive)
: channel(ch),
: channel(std::move(ch)),
cpu_core(core_id),
messages_to_send(messages_to_send),
tx_message_size(tx_message_size),
Expand Down Expand Up @@ -107,7 +109,6 @@ void stack_loop(thread_conf *conf) {
}
continue;
}

buf = channel->MsgBufAlloc();
if (buf == nullptr) {
continue;
Expand All @@ -126,7 +127,6 @@ void stack_loop(thread_conf *conf) {
// Send the message.
ret = channel->EnqueueMessages(&buf, 1);
if (ret != 1) {
LOG(ERROR) << "Couldn't enqueue message. ret: " << ret;
channel->MsgBufFree(buf);
}
conf->messages_sent += ret;
Expand Down Expand Up @@ -159,15 +159,14 @@ void application_loop(thread_conf *conf) {
while (!g_start.load()) {
__asm__ volatile("pause" ::: "memory");
}

// Now start receiving messages.
auto start = std::chrono::high_resolution_clock::now();
while (!g_should_stop.load()) {
// RX.
MachnetFlow_t flow;

auto nbytes =
machnet_recv(channel->ctx(), rx_buffer.data(), rx_buffer.size(), &flow);
auto nbytes = machnet_recv(channel->ctx(), rx_buffer.data(),
rx_buffer.size(), &flow, FLAGS_blocking);
if (nbytes > 0) {
conf->messages_received++;
CHECK_EQ(nbytes, conf->tx_message_size);
Expand Down Expand Up @@ -284,11 +283,18 @@ void print_results(const thread_conf &stack_conf, const thread_conf &app_conf) {
1e9))
: 0.0)
<< std::endl;
if (FLAGS_blocking) {
std::cout << juggler::utils::Format(
"Stack notified Application side %d times", channel->GetPosted());
channel->ResetPosted();
}
std::cout << std::endl;
}

int main() {
int main(int argc, char *argv[]) {
google::InitGoogleLogging("channel_bench");
gflags::ParseCommandLineFlags(&argc, &argv, true);

FLAGS_logtostderr = 1;
signal(SIGINT, [](int) { g_should_stop.store(true); });

Expand All @@ -306,7 +312,8 @@ int main() {
std::vector<std::pair<uint64_t, uint64_t>> exp_config_vec;

exp_config_vec.emplace_back(kMessagesToSend, 0); // Stack -> app only
exp_config_vec.emplace_back(0, kMessagesToSend); // App -> stack only
if (!FLAGS_blocking)
exp_config_vec.emplace_back(0, kMessagesToSend); // App -> stack only
exp_config_vec.emplace_back(kMessagesToSend, kMessagesToSend); // Bi-dir

LOG(INFO) << "Running channel_bench";
Expand Down
4 changes: 2 additions & 2 deletions src/core/drivers/shm/channel_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ TEST(BasicChannelTest, ChannelEnqueue) {
rx_msghdr.msg_iov = &rx_iov;
rx_msghdr.msg_iovlen = 1;

EXPECT_EQ(machnet_recvmsg(channel->ctx(), &rx_msghdr), 1);
EXPECT_EQ(machnet_recvmsg(channel->ctx(), &rx_msghdr, NON_BLOCKING), 1);
EXPECT_EQ(rx_msghdr.msg_size, kMessageSize);
EXPECT_EQ(rx_msg, tx_msg);
}
Expand Down Expand Up @@ -396,7 +396,7 @@ TEST(ChannelFullDuplex, SendRecvMsg) {
msghdr.flags = 0;
msghdr.flow_info = {
.src_ip = 0, .dst_ip = 0, .src_port = 0, .dst_port = 0};
auto ret = machnet_recvmsg(ctx, &msghdr);
auto ret = machnet_recvmsg(ctx, &msghdr, NON_BLOCKING);
if (ret == 1) msg_rx++;

// If already sent the amount of messages needed skip.
Expand Down
8 changes: 5 additions & 3 deletions src/core/flow_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,8 @@ class FlowTest : public ::testing::Test {
num_msgbufs -= msgbuf_nr;
batch.Clear();
}
CHECK_NOTNULL(head);
CHECK_NOTNULL(tail);
head->set_msg_length(data.size());
head->set_last(tail->index());
head->mark_first();
Expand Down Expand Up @@ -317,7 +319,7 @@ TEST_F(FlowTest, RXQueue_Push) {
rx_msghdr.flow_info = {0, 0, 0, 0};
rx_msghdr.msg_iov = &rx_iov;
rx_msghdr.msg_iovlen = 1;
auto ret = machnet_recvmsg(channel_->ctx(), &rx_msghdr);
auto ret = machnet_recvmsg(channel_->ctx(), &rx_msghdr, NON_BLOCKING);
EXPECT_EQ(ret, 1) << "Failed to deliver message to application";
EXPECT_EQ(tx_message, rx_message);
EXPECT_EQ(channel_->GetFreeBufCount(), channel_->GetTotalBufCount());
Expand Down Expand Up @@ -397,7 +399,7 @@ TEST_F(FlowTest, RXQueue_Push_OutOfOrder1) {
rx_msghdr.flow_info = {0, 0, 0, 0};
rx_msghdr.msg_iov = &rx_iov;
rx_msghdr.msg_iovlen = 1;
auto ret = machnet_recvmsg(channel_->ctx(), &rx_msghdr);
auto ret = machnet_recvmsg(channel_->ctx(), &rx_msghdr, NON_BLOCKING);
EXPECT_EQ(ret, 1) << "Failed to deliver message to application";
EXPECT_EQ(tx_message, rx_message);
EXPECT_EQ(channel_->GetFreeBufCount(), channel_->GetTotalBufCount());
Expand Down Expand Up @@ -496,7 +498,7 @@ TEST_F(FlowTest, RXQueue_Push_OutOfOrder2) {
rx_msghdr.flow_info = {0, 0, 0, 0};
rx_msghdr.msg_iov = &rx_iov;
rx_msghdr.msg_iovlen = 1;
auto ret = machnet_recvmsg(channel_->ctx(), &rx_msghdr);
auto ret = machnet_recvmsg(channel_->ctx(), &rx_msghdr, NON_BLOCKING);
EXPECT_EQ(ret, 1) << "Failed to deliver message to application";
EXPECT_EQ(tx_message, rx_message);
EXPECT_EQ(channel_->GetFreeBufCount(), channel_->GetTotalBufCount());
Expand Down
2 changes: 1 addition & 1 deletion src/ext/Makefile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
CC = gcc
CFLAGS = -Wall -fPIC
LDFLAGS = -shared
LDFLAGS = -shared -pthread
LIBS = -luuid
TARGET = libmachnet_shim.so
SRCS = machnet.c
Expand Down
21 changes: 16 additions & 5 deletions src/ext/machnet.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <arpa/inet.h>
#include <errno.h>
#include <netinet/in.h>
#include <semaphore.h>
#include <sys/mman.h>
#include <sys/socket.h>
#include <sys/types.h>
Expand Down Expand Up @@ -639,22 +640,23 @@ int machnet_sendmmsg(const void *channel_ctx,
}

ssize_t machnet_recv(const void *channel_ctx, void *buf, size_t len,
MachnetFlow_t *flow) {
MachnetFlow_t *flow, uint32_t blocking) {
MachnetMsgHdr_t msghdr;
MachnetIovec_t iov;
iov.base = buf;
iov.len = len;
msghdr.msg_iov = &iov;
msghdr.msg_iovlen = 1;

const int ret = machnet_recvmsg(channel_ctx, &msghdr);
const int ret = machnet_recvmsg(channel_ctx, &msghdr, blocking);
if (ret <= 0) return ret; // No message available, or error code

*flow = msghdr.flow_info;
return msghdr.msg_size;
}

int machnet_recvmsg(const void *channel_ctx, MachnetMsgHdr_t *msghdr) {
int machnet_recvmsg(const void *channel_ctx, MachnetMsgHdr_t *msghdr,
uint32_t blocking) {
assert(channel_ctx != NULL);
assert(msghdr != NULL);
MachnetChannelCtx_t *ctx = (MachnetChannelCtx_t *)channel_ctx;
Expand All @@ -664,8 +666,17 @@ int machnet_recvmsg(const void *channel_ctx, MachnetMsgHdr_t *msghdr) {
// Deque a message from the ring.
MachnetRingSlot_t buffer_index;
uint32_t n = __machnet_channel_machnet_ring_dequeue(ctx, 1, &buffer_index);
if (n != 1) return 0; // No message available.

while (n == 0) {
if (!blocking) return 0;
__atomic_store_n(&ctx->receiver_active, 0, __ATOMIC_SEQ_CST);
// n = __machnet_channel_machnet_ring_dequeue(ctx, 1, &buffer_index);
// if (n == 0) {
sem_wait(&ctx->sem);
n = __machnet_channel_machnet_ring_dequeue(ctx, 1, &buffer_index);
// assert(n == 1);
// }
}
assert(n == 1);
MachnetMsgBuf_t *buffer;
buffer = __machnet_channel_buf(ctx, buffer_index);
MachnetFlow_t flow_info = buffer->flow;
Expand Down
Loading