Skip to content

Commit

Permalink
Several optimizations & code clean-up written for Homa artifact evalu…
Browse files Browse the repository at this point in the history
…ation

Driver changes:
1) No more ad-hoc sync mechanisms when returning packet buf to drivers.
A new method Driver::returnPacket(char*) is introduced to handle sync for
all driver subclasses. It acquires the dispatch lock when called in a worker
thread and elides locking when called in the dispatch thread (which is the
common case).

2) Driver::release now becomes a batch release interface that should be
called once in every iteration of the transport's poll method (as opposed
to once for each received packet).

3) RX zero-copy is removed due to its complexity. It was a dirty hack to
allow Homa to run W3 under 80% load. It's no longer necessary.

4) DpdkDriver's RX path is simplified quite a bit. For instance, an incorrect
prefetch is removed (it was prefetching a cache-line that we immediately
accessed a few lines later).

Basic & HomaTransport changes:
 * Remove RX zero-copy for multi-packet messages (payloads are copied out
to reduce # chunks in buffer)
 * Code reorganization/refactoring to improve performance
 * Faster ScheduledMessage::senderHash computation
  • Loading branch information
yilongli committed Feb 22, 2019
1 parent 5d5c932 commit 4319853
Show file tree
Hide file tree
Showing 25 changed files with 553 additions and 646 deletions.
273 changes: 131 additions & 142 deletions src/BasicTransport.cc

Large diffs are not rendered by default.

29 changes: 10 additions & 19 deletions src/BasicTransport.h
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,8 @@ class BasicTransport : public Transport {
*/
class MessageAccumulator {
public:
MessageAccumulator(BasicTransport* t, Buffer* buffer);
MessageAccumulator(BasicTransport* t, Buffer* buffer,
uint32_t totalLength);
~MessageAccumulator();
bool addPacket(DataHeader *header, uint32_t length);
uint32_t requestRetransmission(BasicTransport *t,
Expand Down Expand Up @@ -260,9 +261,6 @@ class BasicTransport : public Transport {
ClientRpc* clientRpc;
ServerRpc* serverRpc;

/// Where to send the message.
const Driver::Address* recipient;

/// Offset within the message of the next byte we should transmit to
/// the recipient; all preceding bytes have already been sent.
uint32_t transmitOffset;
Expand All @@ -288,12 +286,10 @@ class BasicTransport : public Transport {
uint32_t unscheduledBytes;

OutgoingMessage(ClientRpc* clientRpc, ServerRpc* serverRpc,
BasicTransport* t, Buffer* buffer,
const Driver::Address* recipient)
BasicTransport* t, Buffer* buffer)
: buffer(buffer)
, clientRpc(clientRpc)
, serverRpc(serverRpc)
, recipient(recipient)
, transmitOffset(0)
, transmitLimit()
, topChoice(false)
Expand Down Expand Up @@ -351,7 +347,7 @@ class BasicTransport : public Transport {
ClientRpc(Session* session, uint64_t sequence, Buffer* request,
Buffer* response, RpcNotifier* notifier)
: session(session)
, request(this, NULL, session->t, request, session->serverAddress)
, request(this, NULL, session->t, request)
, response(response)
, notifier(notifier)
, rpcId(session->t->clientId, sequence)
Expand Down Expand Up @@ -389,6 +385,10 @@ class BasicTransport : public Transport {
/// this RPC should be removed at the server's earliest convenience.
bool cancelled;

/// Address of the client to which the RPC response will be sent.
/// Not owned by this class.
const Driver::Address* clientAddress;

/// Unique identifier for this RPC.
RpcId rpcId;

Expand Down Expand Up @@ -426,12 +426,13 @@ class BasicTransport : public Transport {
: t(transport)
, sequence(sequence)
, cancelled(false)
, clientAddress(clientAddress)
, rpcId(rpcId)
, silentIntervals(0)
, requestComplete(false)
, sendingResponse(false)
, accumulator()
, response(NULL, this, transport, &replyPayload, clientAddress)
, response(NULL, this, transport, &replyPayload)
, scheduledMessage()
, timerLinks()
, outgoingResponseLinks()
Expand Down Expand Up @@ -666,16 +667,6 @@ class BasicTransport : public Transport {
/// Maximum # bytes of message data that can fit in one packet.
CONST uint32_t maxDataPerPacket;

/// Messages smaller than or equal to this many bytes are received in
/// a zero-copy fashion in their entireties, if the underlying driver
/// permits. The larger this number is set, the more hardware packet
/// buffers we will likely retain at any given time, and the more we
/// deviate from the SRPT policy. If this number is set too large, we
/// may run out of hardware packet buffers and have to stop receiving
/// packets (even worse, we can't complete any message to free up the
/// hardware packet buffers; thus, a deadlock!).
const uint32_t messageZeroCopyThreshold;

/// Maximum # bytes of a message that we consider as small. For small
/// messages, the tryToTransmitData mechanism takes more time then just
/// transmitting the packet. In order to be efficient on workloads with
Expand Down
15 changes: 8 additions & 7 deletions src/BasicTransportTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class BasicTransportTest : public ::testing::Test {

BasicTransportTest()
: context(false)
, driver(new MockDriver(BasicTransport::headerToString))
, driver(new MockDriver(&context, BasicTransport::headerToString))
, transport(&context, NULL, driver, true, 666)
, address1("mock:node=1")
, address2("mock:node=2")
Expand Down Expand Up @@ -660,7 +660,7 @@ TEST_F(BasicTransportTest, handlePacket_dataFromServer_basics) {
EXPECT_EQ("abcde12345", TestUtil::toString(&wrapper.response));
EXPECT_EQ("", driver->outputLog);
EXPECT_EQ(0lu, transport.outgoingRpcs.size());
EXPECT_EQ(2u, Driver::Received::stealCount);
EXPECT_EQ(1u, Driver::Received::stealCount);
}
TEST_F(BasicTransportTest, handlePacket_dataFromServer_extraData) {
MockWrapper wrapper("message1");
Expand Down Expand Up @@ -987,6 +987,7 @@ TEST_F(BasicTransportTest, handlePacket_dataFromClient_extraBytes) {
TestUtil::toString(&serverRpc->requestPayload));
}
TEST_F(BasicTransportTest, handlePacket_dataFromClient_dontIssueGrant) {
transport.maxDataPerPacket = 5;
transport.roundTripBytes = 1000;
transport.grantIncrement = 500;
uint32_t unscheduledBytes = transport.roundTripBytes;
Expand Down Expand Up @@ -1300,7 +1301,7 @@ TEST_F(BasicTransportTest, addPacket_basics) {
EXPECT_EQ(0u, serverRpc->accumulator->fragments.size());
EXPECT_EQ("P0000P1111P2222P3333P4444",
TestUtil::toString(&serverRpc->requestPayload));
EXPECT_EQ(5u, Driver::Received::stealCount);
EXPECT_EQ(3u, Driver::Received::stealCount);
}
TEST_F(BasicTransportTest, addPacket_skipRedundantPacket) {
// Receive two duplicate packets that contain bytes 10-14.
Expand Down Expand Up @@ -1409,7 +1410,7 @@ TEST_F(BasicTransportTest, poll_incomingPackets) {
BasicTransport::ServerRpc* serverRpc = it->second;
EXPECT_EQ("0123456789ABCDE",
TestUtil::toString(&serverRpc->requestPayload));
EXPECT_EQ(3u, Driver::Received::stealCount);
EXPECT_EQ(1u, Driver::Received::stealCount);
EXPECT_EQ(1, result);
}
TEST_F(BasicTransportTest, poll_callCheckTimeouts) {
Expand Down Expand Up @@ -1478,8 +1479,8 @@ TEST_F(BasicTransportTest, poll_outgoingGrant) {
BasicTransport::DataHeader(BasicTransport::RpcId(100, 102), 100, 0,
unscheduledBytes, BasicTransport::FROM_CLIENT), "ABCDE");
int result = transport.poller.poll();
EXPECT_EQ("GRANT FROM_SERVER, rpcId 100.101, offset 40 | "
"GRANT FROM_SERVER, rpcId 100.102, offset 30",
EXPECT_EQ("GRANT FROM_SERVER, rpcId 100.102, offset 30 | "
"GRANT FROM_SERVER, rpcId 100.101, offset 40",
driver->outputLog);
EXPECT_EQ(1, result);
EXPECT_EQ(0u, transport.messagesToGrant.size());
Expand All @@ -1493,7 +1494,7 @@ TEST_F(BasicTransportTest, poll_outgoingPacket) {
int result = transport.poller.poll();
EXPECT_EQ("ALL_DATA FROM_CLIENT, rpcId 666.1 0123456789 (+5 more)",
driver->outputLog);
EXPECT_EQ(1, result);
EXPECT_GT(result, 1);
}
TEST_F(BasicTransportTest, checkTimeouts_clientTransmissionNotStartedYet) {
driver->transmitQueueSpace = 0;
Expand Down
20 changes: 20 additions & 0 deletions src/Buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,26 @@ class Buffer {

uint32_t peek(uint32_t offset, void** returnPtr);
void prependChunk(Chunk* chunk);

/**
* Reserve a contiguous region of storage for future use by Buffer::alloc;
* the storage is managed internally by the buffer.
*
* Note: This method must only be called when the buffer is in its pristine
* state (i.e., after calling constructor or reset()).
*
* \param numBytes
* # bytes to reserve.
*/
void
reserve(uint32_t numBytes)
{
assert((totalLength == 0) && (firstChunk == NULL));
uint32_t bytesAllocated;
firstAvailable = getNewAllocation(numBytes, &bytesAllocated);
availableLength = bytesAllocated;
}

virtual void reset();

/**
Expand Down
15 changes: 15 additions & 0 deletions src/BufferTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -975,6 +975,21 @@ TEST_F(BufferTest, getOffset) {
EXPECT_EQ('x', t2->c);
}

TEST_F(BufferTest, reserve) {
Buffer buffer;
uint32_t numBytes = 1500;
string largeStr(numBytes, 'x');
buffer.appendCopy(largeStr.c_str(), numBytes);
buffer.appendCopy(largeStr.c_str(), numBytes);
EXPECT_EQ(2u, buffer.getNumberChunks());

buffer.reset();
buffer.reserve(10000);
buffer.appendCopy(largeStr.c_str(), numBytes);
buffer.appendCopy(largeStr.c_str(), numBytes);
EXPECT_EQ(1u, buffer.getNumberChunks());
}

TEST_F(BufferTest, resetInternal_partial) {
// Construct a Buffer in a character array; this is needed so that
// the Buffer constructor doesn't get called after we do a partial
Expand Down
115 changes: 19 additions & 96 deletions src/DpdkDriver.cc
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,6 @@ namespace {
}
}

// Short-hand to obtain a reference to the metadata storage space that we
// used to store the PacketBufType.
#define packet_buf_type(payload) *(payload - PACKETBUF_TYPE_SIZE)

// Short-hand to obtain the starting address of a DPDK rte_mbuf based on its
// payload address.
#define payload_to_mbuf(payload) reinterpret_cast<struct rte_mbuf*>( \
Expand All @@ -84,10 +80,8 @@ constexpr uint16_t DpdkDriver::PRIORITY_TO_PCP[8];
* Construct a mock DpdkDriver, used for testing only.
*/
DpdkDriver::DpdkDriver()
: context(NULL)
, packetBufPool()
: Driver(NULL)
, packetBufsUtilized(0)
, payloadsToRelease()
, locatorString()
, localMac()
, portId(0)
Expand Down Expand Up @@ -120,10 +114,8 @@ DpdkDriver::DpdkDriver()
*/

DpdkDriver::DpdkDriver(Context* context, int port)
: context(context)
, packetBufPool()
: Driver(context)
, packetBufsUtilized(0)
, payloadsToRelease()
, locatorString()
, localMac()
, portId(0)
Expand Down Expand Up @@ -289,7 +281,6 @@ DpdkDriver::DpdkDriver(Context* context, int port)
*/
DpdkDriver::~DpdkDriver()
{
releaseHint(-1);
if (packetBufsUtilized != 0)
LOG(ERROR, "DpdkDriver deleted with %d packets still in use",
packetBufsUtilized);
Expand Down Expand Up @@ -359,36 +350,12 @@ DpdkDriver::receivePackets(uint32_t maxPackets,
// Process received packets by constructing appropriate Received objects.
for (uint32_t i = 0; i < totalPkts; i++) {
struct rte_mbuf* m = mPkts[i];
rte_prefetch0(rte_pktmbuf_mtod(m, void *));
if (unlikely(m->nb_segs > 1)) {
RAMCLOUD_CLOG(WARNING,
"Can't handle packet with %u segments; discarding",
m->nb_segs);
rte_pktmbuf_free(m);
continue;
}

char* data = rte_pktmbuf_mtod(m, char*);
char* payload = data + ETHER_HDR_LEN;
uint32_t length = rte_pktmbuf_pkt_len(m) - ETHER_HDR_LEN;
struct ether_hdr* ethHdr = rte_pktmbuf_mtod(m, struct ether_hdr*);
uint16_t ether_type = ethHdr->ether_type;
uint32_t headerLength = ETHER_HDR_LEN;
char* payload = reinterpret_cast<char *>(ethHdr + 1);
if (ether_type == rte_cpu_to_be_16(ETHER_TYPE_VLAN)) {
struct vlan_hdr* vlanHdr =
reinterpret_cast<struct vlan_hdr*>(payload);
ether_type = vlanHdr->eth_proto;
headerLength += VLAN_TAG_LEN;
payload += VLAN_TAG_LEN;
}
if (!hasHardwareFilter) {
// Perform packet filtering by software to skip irrelevant
// packets such as ipmi or kernel TCP/IP traffics.
if (ether_type !=
rte_cpu_to_be_16(NetUtil::EthPayloadType::RAMCLOUD)) {
rte_pktmbuf_free(m);
continue;
}
}
PerfStats::threadStats.networkInputBytes += rte_pktmbuf_pkt_len(m);
assert(ethHdr->ether_type == rte_cpu_to_be_16(
NetUtil::EthPayloadType::RAMCLOUD));

// By default, we would like to construct the Received object using
// the payload directly (as opposed to copying out the payload to a
Expand All @@ -397,71 +364,28 @@ DpdkDriver::receivePackets(uint32_t maxPackets,
// and the packet buf type (so we know where this payload comes from).
// See http://dpdk.org/doc/guides/prog_guide/mbuf_lib.html for the
// diagram of rte_mbuf's internal structure.
assert(m->data_off <= sizeof(MacAddress));
MacAddress* sender = reinterpret_cast<MacAddress*>(m->buf_addr);
if (unlikely(reinterpret_cast<char*>(sender + 1) >
rte_pktmbuf_mtod(m, char*))) {
LOG(ERROR, "Not enough headroom in the packet mbuf; "
"dropping packet");
rte_pktmbuf_free(m);
continue;
}
new(sender) MacAddress(ethHdr->s_addr.addr_bytes);
packet_buf_type(payload) = DPDK_MBUF;
packetBufsUtilized++;
uint32_t length = rte_pktmbuf_pkt_len(m) - headerLength;
assert(length <= MAX_PAYLOAD_SIZE);
receivedPackets->emplace_back(sender, this, length, payload);
PerfStats::threadStats.networkInputBytes += rte_pktmbuf_pkt_len(m);
packetBufsUtilized++;
timeTrace("received packet processed, payload size %u", length);
}
}

// See docs in Driver class.
void
DpdkDriver::release(char *payload)
DpdkDriver::release()
{
// Must sync with the dispatch thread, since this method could potentially
// be invoked in a worker.
Dispatch::Lock _(context->dispatch);
payloadsToRelease.push_back(payload);
}

// See docs in Driver class.
void
DpdkDriver::releaseHint(int maxCount)
{
while ((maxCount != 0) && !payloadsToRelease.empty()) {
maxCount--;
char* payload = payloadsToRelease.back();
payloadsToRelease.pop_back();

packetBufsUtilized--;
assert(packetBufsUtilized >= 0);
if (packet_buf_type(payload) == DPDK_MBUF) {
rte_pktmbuf_free(payload_to_mbuf(payload));
} else {
packetBufPool.destroy(reinterpret_cast<PacketBuf*>(
payload - OFFSET_OF(PacketBuf, payload)));
}
packetBufsUtilized -= static_cast<int>(packetsToRelease.size());
assert(packetBufsUtilized >= 0);
while (!packetsToRelease.empty()) {
rte_pktmbuf_free(payload_to_mbuf(packetsToRelease.back()));
packetsToRelease.pop_back();
}
}

// See docs in Driver class.
void
DpdkDriver::releaseHwPacketBuf(Driver::Received* received)
{
struct rte_mbuf* mbuf = payload_to_mbuf(received->payload);
MacAddress* sender = reinterpret_cast<MacAddress*>(mbuf->buf_addr);
// Copy the sender address and payload to our PacketBuf.
PacketBuf* packetBuf = packetBufPool.construct();
packetBuf->sender.construct(*sender);
packet_buf_type(packetBuf->payload) = RAMCLOUD_PACKET_BUF;
rte_memcpy(packetBuf->payload, received->payload, received->len);
// Replace rte_mbuf with our PacketBuf and release it.
received->sender = packetBuf->sender.get();
received->payload = packetBuf->payload;
rte_pktmbuf_free(mbuf);
}

// See docs in Driver class.
void
DpdkDriver::sendPacket(const Address* addr,
Expand All @@ -487,12 +411,11 @@ DpdkDriver::sendPacket(const Address* addr,
struct rte_mbuf* mbuf = rte_pktmbuf_alloc(mbufPool);
#endif
if (unlikely(NULL == mbuf)) {
uint32_t numMbufsAvail = rte_mempool_avail_count(mbufPool);
uint32_t numMbufsInUse = rte_mempool_in_use_count(mbufPool);
RAMCLOUD_CLOG(WARNING,
"Failed to allocate a packet buffer; dropping packet; "
"%u mbufs available, %u mbufs in use, %lu payloads to release",
numMbufsAvail, numMbufsInUse, payloadsToRelease.size());
"%u mbufs available, %u mbufs in use",
rte_mempool_avail_count(mbufPool),
rte_mempool_in_use_count(mbufPool));
return;
}

Expand Down
Loading

0 comments on commit 4319853

Please sign in to comment.