From 43198532f7b34a7e41291bfd3c061a8e5de4556f Mon Sep 17 00:00:00 2001 From: Yilong Li Date: Wed, 20 Feb 2019 21:41:32 -0800 Subject: [PATCH] Several optimizations & code clean-up written for Homa artifact evaluation Driver changes: 1) No more ad-hoc sync mechanisms when returning packet buf to drivers. A new method Driver::returnPacket(char*) is introduced to handle sync for all driver subclasses. It acquires the dispatch lock when called in a worker thread and elides locking when called in the dispatch thread (which is the common case). 2) Driver::release now becomes a batch release interface that should be called once in every iteration of the transport's poll method (as opposed to once for each received packet). 3) RX zero-copy is removed due to its complexity. It was a dirty hack to allow Homa to run W3 under 80% load. It's no longer necessary. 4) DpdkDriver's RX path is simplified quite a bit. For instance, an incorrect prefetch is removed (it was prefetching a cache-line that we immediately accessed a few lines later). Basic & HomaTransport changes: * Remove RX zero-copy for multi-packet messages (payloads are copied out to reduce # chunks in buffer) * Code reorganization/refactoring to improve performance * Faster ScheduledMessage::senderHash computation --- src/BasicTransport.cc | 273 +++++++++++++++++------------------ src/BasicTransport.h | 29 ++-- src/BasicTransportTest.cc | 15 +- src/Buffer.h | 20 +++ src/BufferTest.cc | 15 ++ src/DpdkDriver.cc | 115 +++------------ src/DpdkDriver.h | 33 +---- src/Driver.cc | 12 +- src/Driver.h | 110 +++++++------- src/HomaTransport.cc | 295 +++++++++++++++++++------------------- src/HomaTransport.h | 26 ++-- src/HomaTransportTest.cc | 28 ++-- src/InfUdDriver.cc | 37 +++-- src/InfUdDriver.h | 11 +- src/Infiniband.cc | 8 ++ src/Infiniband.h | 1 + src/IpAddress.cc | 10 ++ src/IpAddress.h | 1 + src/MacAddress.cc | 42 ++---- src/MacAddress.h | 23 ++- src/MacAddressTest.cc | 7 - src/MockDriver.cc | 34 +---- src/MockDriver.h | 24 +++- src/UdpDriver.cc | 25 ++-- src/UdpDriver.h | 5 +- 25 files changed, 553 insertions(+), 646 deletions(-) diff --git a/src/BasicTransport.cc b/src/BasicTransport.cc index b4eef8592..c95ed6e06 100644 --- a/src/BasicTransport.cc +++ b/src/BasicTransport.cc @@ -70,12 +70,6 @@ BasicTransport::BasicTransport(Context* context, const ServiceLocator* locator, , poller(context, this) , maxDataPerPacket(driver->getMaxPacketSize() - sizeof32(DataHeader)) - // As of 09/2017, with this value set to 100*maxDataPerPacket, we haven't - // observed any message drop due to driver packet buffer exhaustion when - // running workloads W3, W4 and W5 that are used in the HomaTransport paper - // evaluation. - , messageZeroCopyThreshold(100*maxDataPerPacket) - // As of 09/2017, we consider messages less than 300 bytes as small (which // takes at most 240 ns to transmit on a 10Gbps network). This value is // chosen experimentally so that we can run W3 in Homa paper at 80% load on @@ -153,6 +147,7 @@ BasicTransport::~BasicTransport() deleteClientRpc(clientRpc); } + driver->release(); if (driverOwner) { delete driver; } @@ -271,15 +266,17 @@ BasicTransport::getRoundTripBytes(const ServiceLocator* locator) } } } - if (gBitsPerSec == 0) { - gBitsPerSec = driver->getBandwidth() / 1000; - if (gBitsPerSec == 0) { - gBitsPerSec = 10; + + uint32_t mBitsPerSec = gBitsPerSec * 1000; + if (mBitsPerSec == 0) { + mBitsPerSec = driver->getBandwidth(); + if (mBitsPerSec == 0) { + mBitsPerSec = 10 * 1000; } } // Compute round-trip time in terms of full packets (round up). - uint32_t roundTripBytes = (roundTripMicros*gBitsPerSec*1000)/8; + uint32_t roundTripBytes = (roundTripMicros*mBitsPerSec)/8; roundTripBytes = ((roundTripBytes+maxDataPerPacket-1)/maxDataPerPacket) * maxDataPerPacket; return roundTripBytes; @@ -320,7 +317,7 @@ BasicTransport::opcodeSymbol(uint8_t opcode) { * both for requests and for responses. When a method returns, the given * range of data will have been queued for the NIC but may not actually * have been transmitted yet. - * + * * \param address * Identifies the destination for the message. * \param rpcId @@ -352,27 +349,25 @@ BasicTransport::sendBytes(const Driver::Address* address, RpcId rpcId, Buffer* message, uint32_t offset, uint32_t maxBytes, uint32_t unscheduledBytes, uint8_t flags, bool partialOK) { - uint32_t messageSize = message->size(); - uint32_t curOffset = offset; + uint32_t transmitLimit = std::min(message->size(), curOffset + maxBytes); uint32_t bytesSent = 0; - while ((curOffset < messageSize) && (bytesSent < maxBytes)) { + while (curOffset < transmitLimit) { // Don't send less-than-full-size packets except for the last packet // of the message (unless the caller explicitly requested it). - uint32_t bytesThisPacket = - std::min(maxDataPerPacket, messageSize - curOffset); - if ((bytesSent + bytesThisPacket) > maxBytes) { - if (!partialOK) { - break; - } - bytesThisPacket = maxBytes - bytesSent; + uint32_t bytesThisPacket = transmitLimit - curOffset; + if (bytesThisPacket >= maxDataPerPacket) { + bytesThisPacket = maxDataPerPacket; + } else if ((transmitLimit < message->size()) && !partialOK) { + break; } + QueueEstimator::TransmitQueueState txQueueState; - if (bytesThisPacket == messageSize) { + if (bytesThisPacket == message->size()) { // Entire message fits in a single packet. AllDataHeader header(rpcId, flags, - downCast(messageSize)); - Buffer::Iterator iter(message, 0, messageSize); + downCast(message->size())); + Buffer::Iterator iter(message); const char* fmt = (flags & FROM_CLIENT) ? "client sending ALL_DATA, clientId %u, sequence %u" : "server sending ALL_DATA, clientId %u, sequence %u"; @@ -394,8 +389,7 @@ BasicTransport::sendBytes(const Driver::Address* address, RpcId rpcId, timeTrace("sent data, %u bytes queued ahead", txQueueState.outstandingBytes); } else { - timeTrace("sent data, tx queue idle time %u cyc", - txQueueState.idleTime); + timeTrace("sent data, tx queue idle %u cyc", txQueueState.idleTime); } bytesSent += bytesThisPacket; curOffset += bytesThisPacket; @@ -427,7 +421,7 @@ BasicTransport::sendControlPacket(const Driver::Address* recipient, /** * Given a pointer to a BasicTransport packet, return a human-readable * string describing the information in its header. - * + * * \param packet * Address of the first byte of the packet header, which must be * contiguous in memory. @@ -547,10 +541,14 @@ BasicTransport::tryToTransmitData() // driver immediately. int transmitQueueSpace = driver->getTransmitQueueSpace(context->dispatch->currentTime); + if ((transmitQueueSpace <= 0) || + (outgoingRequests.empty() && outgoingResponses.empty())) { + return 0; + } // Each iteration of the following loop transmits data packets for // a single request or response. - while (transmitQueueSpace > 0) { + do { // Find an outgoing request or response that is ready to transmit. // The policy here is "shortest remaining processing time" (SRPT). // That is, choosing the message with the fewest bytes remaining @@ -660,12 +658,15 @@ BasicTransport::tryToTransmitData() RpcId rpcId = clientRpc ? clientRpc->rpcId : serverRpc->rpcId; uint8_t whoFrom = clientRpc ? FROM_CLIENT : FROM_SERVER; - uint32_t bytesSent = sendBytes(message->recipient, rpcId, - message->buffer, message->transmitOffset, maxBytes, + const Driver::Address* address = clientRpc ? + clientRpc->session->serverAddress : + serverRpc->clientAddress; + uint32_t bytesSent = sendBytes(address, rpcId, message->buffer, + message->transmitOffset, maxBytes, message->unscheduledBytes, whoFrom); if (bytesSent == 0) { - // We can't transmit any more data because the remaining queue - // space is too small. + // If this message can't be transmitted due to the queue space + // limit, neither can the next message (which is even larger). break; } @@ -700,7 +701,13 @@ BasicTransport::tryToTransmitData() // There are no messages with data that can be transmitted. break; } - } + // Exit the loop if the remaining queue space is *likely* too small for + // us to send more data. For example, if the queue space is now smaller + // than one full packet, the remaining size of the next message we pick + // must be larger than one full packet unless the message we pick in + // the current iteration had less-than-one-full-packet data left before + // we finished it. + } while (transmitQueueSpace >= downCast(maxDataPerPacket)); return totalBytesSent; } @@ -967,7 +974,7 @@ BasicTransport::handlePacket(Driver::Received* received) "short (got %u bytes, expected %u)", received->sender->toString().c_str(), length, requiredLength); - driver->release(payload); + driver->returnPacket(payload); return; } timeTrace("client received ALL_DATA, clientId %u, sequence %u, " @@ -991,22 +998,15 @@ BasicTransport::handlePacket(Driver::Received* received) header->common.rpcId.clientId, header->common.rpcId.sequence, header->offset, received->len); - if (header->totalLength > messageZeroCopyThreshold) { - // For relatively long messages, it's possible we need to - // retain their packets for quite some time; give the - // driver a chance to copy out the contents of the - // underlying NIC packet buffer and then release it. - driver->releaseHwPacketBuf(received); - header = received->getOffset(0); - } if (!clientRpc->accumulator) { - clientRpc->accumulator.construct(this, clientRpc->response); - if (header->totalLength > header->unscheduledBytes) { + uint32_t totalLength = header->totalLength; + clientRpc->accumulator.construct(this, clientRpc->response, + totalLength); + if (totalLength > header->unscheduledBytes) { clientRpc->scheduledMessage.construct( clientRpc->rpcId, clientRpc->accumulator.get(), uint32_t(header->unscheduledBytes), - clientRpc->session->serverAddress, - uint32_t(header->totalLength), + clientRpc->session->serverAddress, totalLength, uint8_t(FROM_SERVER)); } } @@ -1206,7 +1206,7 @@ BasicTransport::handlePacket(Driver::Received* received) "short (got %u bytes, expected %u)", received->sender->toString().c_str(), length, requiredLength); - driver->release(payload); + driver->returnPacket(payload); return; } timeTrace("server received ALL_DATA, clientId %u, sequence %u, " @@ -1236,28 +1236,20 @@ BasicTransport::handlePacket(Driver::Received* received) header->common.rpcId.clientId, header->common.rpcId.sequence, header->offset, received->len); - if (header->totalLength > messageZeroCopyThreshold) { - // For relatively long messages, it's possible we need to - // retain their packets for quite some time; give the - // driver a chance to copy out the contents of the - // underlying NIC packet buffer and then release it. - driver->releaseHwPacketBuf(received); - header = received->getOffset(0); - } if (serverRpc == NULL) { serverRpc = serverRpcPool.construct(this, nextServerSequenceNumber, received->sender, header->common.rpcId); nextServerSequenceNumber++; incomingRpcs[header->common.rpcId] = serverRpc; + uint32_t totalLength = header->totalLength; serverRpc->accumulator.construct(this, - &serverRpc->requestPayload); - if (header->totalLength > header->unscheduledBytes) { + &serverRpc->requestPayload, totalLength); + if (totalLength > header->unscheduledBytes) { serverRpc->scheduledMessage.construct( serverRpc->rpcId, serverRpc->accumulator.get(), uint32_t(header->unscheduledBytes), - serverRpc->response.recipient, - uint32_t(header->totalLength), + serverRpc->clientAddress, totalLength, uint8_t(FROM_CLIENT)); } serverTimerList.push_back(*serverRpc); @@ -1407,8 +1399,11 @@ BasicTransport::handlePacket(Driver::Received* received) // it's unlikely that bytes have been lost, so don't // retransmit; just return an BUSY so the client knows // we're still alive. + timeTrace("server about to send BUSY, clientId %u, " + "sequence %u", serverRpc->rpcId.clientId, + serverRpc->rpcId.sequence); BusyHeader busy(serverRpc->rpcId, FROM_SERVER); - sendControlPacket(response->recipient, &busy); + sendControlPacket(serverRpc->clientAddress, &busy); return; } double elapsedMicros = Cycles::toSeconds(Cycles::rdtsc() @@ -1419,7 +1414,7 @@ BasicTransport::handlePacket(Driver::Received* received) received->sender->toString().c_str(), header->common.rpcId.sequence, header->offset, header->length, elapsedMicros); - sendBytes(response->recipient, + sendBytes(serverRpc->clientAddress, serverRpc->rpcId, &serverRpc->replyPayload, header->offset, header->length, response->unscheduledBytes, @@ -1478,7 +1473,7 @@ BasicTransport::handlePacket(Driver::Received* received) string BasicTransport::ServerRpc::getClientServiceLocator() { - return response.recipient->toString(); + return clientAddress->toString(); } /** @@ -1511,7 +1506,7 @@ BasicTransport::ServerRpc::sendReply() Buffer::Iterator iter(&replyPayload, 0, length); timeTrace("server sending ALL_DATA, clientId %u, sequence %u, " "priority %u", rpcId.clientId, rpcId.sequence, 0); - t->driver->sendPacket(response.recipient, &header, &iter, 0); + t->driver->sendPacket(clientAddress, &header, &iter, 0); t->deleteServerRpc(this); bytesSent = length; } else { @@ -1535,14 +1530,17 @@ BasicTransport::ServerRpc::sendReply() * The complete message will be assembled here; caller should ensure * that this is initially empty. The caller owns the storage for this * and must ensure that it persists as long as this object persists. + * \param totalLength + * Length of the message, in bytes. */ BasicTransport::MessageAccumulator::MessageAccumulator(BasicTransport* t, - Buffer* buffer) + Buffer* buffer, uint32_t totalLength) : t(t) , buffer(buffer) , fragments() { assert(buffer->size() == 0); + buffer->reserve(totalLength); } /** @@ -1555,7 +1553,7 @@ BasicTransport::MessageAccumulator::~MessageAccumulator() for (FragmentMap::iterator it = fragments.begin(); it != fragments.end(); it++) { MessageFragment fragment = it->second; - t->driver->release(fragment.header); + t->driver->returnPacket(fragment.header); } fragments.clear(); } @@ -1583,16 +1581,10 @@ BasicTransport::MessageAccumulator::addPacket(DataHeader *header, { length -= sizeof32(DataHeader); - // These should not happen normally. - if (expect_false(header->offset % t->maxDataPerPacket != 0)) { - LOG(WARNING, "Unexpected packet offset %u", header->offset); - return false; - } else if (expect_false((length != t->maxDataPerPacket) && - (header->offset + length < header->totalLength))) { - LOG(WARNING, "Unexpected packet size %u, offset %u", length, - header->offset); - return false; - } + // We only allow a partial packet to appear at the end of a message. + assert(header->offset % t->maxDataPerPacket == 0); + assert((length == t->maxDataPerPacket) || + (header->offset + length == header->totalLength)); bool retainPacket; if (header->offset > buffer->size()) { @@ -1612,12 +1604,24 @@ BasicTransport::MessageAccumulator::addPacket(DataHeader *header, MessageFragment fragment(header, length); do { char* payload = reinterpret_cast(fragment.header); - Driver::PayloadChunk::appendToBuffer(buffer, - payload + sizeof32(DataHeader), fragment.length, - t->driver, payload); + // Currently, the first packet of a multi-packet message must + // be retained in the buffer because ServerRpc::clientAddress + // is pointing to some memory owned by the packet buffer (see + // docs of Driver::Received::sender). In contrast, other packets + // are copied out to reduce the number of chunks in the buffer + // and, thus, jitters caused by destroying the buffer. + if (expect_false(header->offset == 0)) { + Driver::PayloadChunk::appendToBuffer(buffer, + payload + sizeof32(DataHeader), fragment.length, + t->driver, payload); + } else { + buffer->appendCopy(payload + sizeof32(DataHeader), + fragment.length); + } FragmentMap::iterator it = fragments.find(buffer->size()); if (it == fragments.end()) { - return true; + // Only the first packet will be retained. + return (header->offset == 0); } else { fragment = it->second; fragments.erase(it); @@ -1681,7 +1685,7 @@ BasicTransport::MessageAccumulator::requestRetransmission(BasicTransport *t, endOffset = t->roundTripBytes; } if (endOffset <= buffer->size()) { - LOG(ERROR, "Bad endOffset %u, offset %u", endOffset, buffer->size()); + DIE("Bad endOffset %u, offset %u", endOffset, buffer->size()); } const char* fmt = (whoFrom == FROM_SERVER) ? "server requesting retransmission of bytes %u-%u, clientId %u, " @@ -1746,49 +1750,50 @@ BasicTransport::Poller::poll() uint64_t startTime = Cycles::rdtsc(); #endif - // Process available incoming packets. Try to receive MAX_PACKETS packets - // at a time (an optimized driver implementation may prefetch the payloads - // for us). As of 07/2016, MAX_PACKETS is set to 8 because our CPU can - // take at most 8 cache misses at a time (although it's not clear 8 is the - // best value). + // Try to receive MAX_PACKETS packets at a time (an optimized driver + // implementation may prefetch the payloads for us). As of 07/2016, + // MAX_PACKETS is set to 8 because our CPU can take at most 8 cache + // misses at a time (although it's not clear 8 is the best value). #define MAX_PACKETS 8 - uint32_t numPackets; t->driver->receivePackets(MAX_PACKETS, &t->receivedPackets); - numPackets = downCast(t->receivedPackets.size()); -#if TIME_TRACE - // Log the beginning of poll() here so that timetrace entries do not - // go back in time. + uint32_t numPackets = downCast(t->receivedPackets.size()); + + // Process any incoming packet. if (numPackets > 0) { - uint32_t ns = downCast( + result = 1; +#if TIME_TRACE + // Log the beginning of poll() here so that timetrace entries do not + // go back in time. + uint32_t ns = static_cast( Cycles::toNanoseconds(startTime - lastPollTime)); TimeTrace::record(startTime, "start of polling iteration %u, " "last poll was %u ns ago", uint32_t(owner->iteration), ns); - } - lastPollTime = Cycles::rdtsc(); #endif - for (uint i = 0; i < numPackets; i++) { - t->handlePacket(&t->receivedPackets[i]); - } - t->receivedPackets.clear(); - result = numPackets > 0 ? 1 : result; - - // See if we should send out new GRANT packets. Grants are sent here as - // opposed to inside #handlePacket because we would like to coalesse - // GRANT packets to the same message whenever possible. Besides, - // structuring code this way seems to improve the overall performance, - // potentially by being more cache-friendly. - for (ScheduledMessage* recipient : t->messagesToGrant) { - uint8_t whoFrom = (recipient->whoFrom == FROM_CLIENT) ? - FROM_SERVER : FROM_CLIENT; - GrantHeader grant(recipient->rpcId, recipient->grantOffset, whoFrom); - const char* fmt = (whoFrom == FROM_CLIENT) ? - "client sending GRANT, clientId %u, sequence %u, offset %u" : - "server sending GRANT, clientId %u, sequence %u, offset %u"; - timeTrace(fmt, recipient->rpcId.clientId, recipient->rpcId.sequence, - grant.offset); - t->sendControlPacket(recipient->senderAddress, &grant); + for (uint32_t i = 0; i < numPackets; i++) { + t->handlePacket(&t->receivedPackets[i]); + } + t->receivedPackets.clear(); + + // See if we should send out new GRANT packets. Grants are sent here as + // opposed to inside #handlePacket because we would like to coalesse + // GRANT packets to the same message whenever possible. Besides, + // structuring code this way seems to improve the overall performance, + // potentially by being more cache-friendly. + while (!t->messagesToGrant.empty()) { + ScheduledMessage* recipient = t->messagesToGrant.back(); + t->messagesToGrant.pop_back(); + uint8_t whoFrom = (recipient->whoFrom == FROM_CLIENT) ? + FROM_SERVER : FROM_CLIENT; + GrantHeader grant(recipient->rpcId, recipient->grantOffset, + whoFrom); + const char* fmt = (whoFrom == FROM_CLIENT) ? + "client sending GRANT, clientId %u, sequence %u, offset %u": + "server sending GRANT, clientId %u, sequence %u, offset %u"; + timeTrace(fmt, recipient->rpcId.clientId, recipient->rpcId.sequence, + grant.offset); + t->sendControlPacket(recipient->senderAddress, &grant); + } } - t->messagesToGrant.clear(); // See if we should check for timeouts. Ideally, we'd like to do this // every timerInterval. However, it's better not to call checkTimeouts @@ -1804,7 +1809,7 @@ BasicTransport::Poller::poll() // make it harder to notice when a *real* problem happens. Thus, it's // best to eliminate spurious retransmissions as much as possible. uint64_t now = owner->currentTime; - if (now >= t->nextTimeoutCheck) { + if (expect_false(now >= t->nextTimeoutCheck)) { if (t->timeoutCheckDeadline == 0) { t->timeoutCheckDeadline = now + t->timerInterval; } @@ -1823,29 +1828,14 @@ BasicTransport::Poller::poll() // Transmit data packets if possible. uint32_t totalBytesSent = t->tryToTransmitData(); - result = totalBytesSent > 0 ? 1 : result; - - - // Provide a hint to the driver on how many packet buffers to release. - // We try to release only a few packet buffers from large messages at - // a time to avoid jitters. As of 02/2017, releasing one packet buffer - // in the DPDK library takes ~65ns. - int maxRelease; - if (!result) { - // We haven't found anything useful to do in this method up till now. - // Try to release more packet buffers. - maxRelease = 16; - } else if (numPackets == MAX_PACKETS) { - // We received MAX_PACKETS packets, so there may be more packets - // outstanding in the NIC's rx queue. Let's skip the release to get - // to the next poll ASAP. - maxRelease = 0; - } else { - // Common case: release as many packets as received. - maxRelease = numPackets; - } - t->driver->releaseHint(maxRelease); + result += totalBytesSent; + // Release packet buffers that have been returned to the driver. + t->driver->release(); + +#if TIME_TRACE + lastPollTime = startTime; +#endif if (result) { timeTrace("end of polling iteration %u, received %u packets, " "transmitted %u bytes", owner->iteration, numPackets, @@ -2007,9 +1997,8 @@ BasicTransport::checkTimeouts() LOG(ERROR, "Bad grant offset %u", grantOffset); } serverRpc->accumulator->requestRetransmission(this, - serverRpc->response.recipient, serverRpc->rpcId, - grantOffset, FROM_SERVER); - + serverRpc->clientAddress, serverRpc->rpcId, grantOffset, + FROM_SERVER); } } } diff --git a/src/BasicTransport.h b/src/BasicTransport.h index a64d41a14..e48ea332b 100644 --- a/src/BasicTransport.h +++ b/src/BasicTransport.h @@ -160,7 +160,8 @@ class BasicTransport : public Transport { */ class MessageAccumulator { public: - MessageAccumulator(BasicTransport* t, Buffer* buffer); + MessageAccumulator(BasicTransport* t, Buffer* buffer, + uint32_t totalLength); ~MessageAccumulator(); bool addPacket(DataHeader *header, uint32_t length); uint32_t requestRetransmission(BasicTransport *t, @@ -260,9 +261,6 @@ class BasicTransport : public Transport { ClientRpc* clientRpc; ServerRpc* serverRpc; - /// Where to send the message. - const Driver::Address* recipient; - /// Offset within the message of the next byte we should transmit to /// the recipient; all preceding bytes have already been sent. uint32_t transmitOffset; @@ -288,12 +286,10 @@ class BasicTransport : public Transport { uint32_t unscheduledBytes; OutgoingMessage(ClientRpc* clientRpc, ServerRpc* serverRpc, - BasicTransport* t, Buffer* buffer, - const Driver::Address* recipient) + BasicTransport* t, Buffer* buffer) : buffer(buffer) , clientRpc(clientRpc) , serverRpc(serverRpc) - , recipient(recipient) , transmitOffset(0) , transmitLimit() , topChoice(false) @@ -351,7 +347,7 @@ class BasicTransport : public Transport { ClientRpc(Session* session, uint64_t sequence, Buffer* request, Buffer* response, RpcNotifier* notifier) : session(session) - , request(this, NULL, session->t, request, session->serverAddress) + , request(this, NULL, session->t, request) , response(response) , notifier(notifier) , rpcId(session->t->clientId, sequence) @@ -389,6 +385,10 @@ class BasicTransport : public Transport { /// this RPC should be removed at the server's earliest convenience. bool cancelled; + /// Address of the client to which the RPC response will be sent. + /// Not owned by this class. + const Driver::Address* clientAddress; + /// Unique identifier for this RPC. RpcId rpcId; @@ -426,12 +426,13 @@ class BasicTransport : public Transport { : t(transport) , sequence(sequence) , cancelled(false) + , clientAddress(clientAddress) , rpcId(rpcId) , silentIntervals(0) , requestComplete(false) , sendingResponse(false) , accumulator() - , response(NULL, this, transport, &replyPayload, clientAddress) + , response(NULL, this, transport, &replyPayload) , scheduledMessage() , timerLinks() , outgoingResponseLinks() @@ -666,16 +667,6 @@ class BasicTransport : public Transport { /// Maximum # bytes of message data that can fit in one packet. CONST uint32_t maxDataPerPacket; - /// Messages smaller than or equal to this many bytes are received in - /// a zero-copy fashion in their entireties, if the underlying driver - /// permits. The larger this number is set, the more hardware packet - /// buffers we will likely retain at any given time, and the more we - /// deviate from the SRPT policy. If this number is set too large, we - /// may run out of hardware packet buffers and have to stop receiving - /// packets (even worse, we can't complete any message to free up the - /// hardware packet buffers; thus, a deadlock!). - const uint32_t messageZeroCopyThreshold; - /// Maximum # bytes of a message that we consider as small. For small /// messages, the tryToTransmitData mechanism takes more time then just /// transmitting the packet. In order to be efficient on workloads with diff --git a/src/BasicTransportTest.cc b/src/BasicTransportTest.cc index 2303d55a5..ea892854f 100644 --- a/src/BasicTransportTest.cc +++ b/src/BasicTransportTest.cc @@ -36,7 +36,7 @@ class BasicTransportTest : public ::testing::Test { BasicTransportTest() : context(false) - , driver(new MockDriver(BasicTransport::headerToString)) + , driver(new MockDriver(&context, BasicTransport::headerToString)) , transport(&context, NULL, driver, true, 666) , address1("mock:node=1") , address2("mock:node=2") @@ -660,7 +660,7 @@ TEST_F(BasicTransportTest, handlePacket_dataFromServer_basics) { EXPECT_EQ("abcde12345", TestUtil::toString(&wrapper.response)); EXPECT_EQ("", driver->outputLog); EXPECT_EQ(0lu, transport.outgoingRpcs.size()); - EXPECT_EQ(2u, Driver::Received::stealCount); + EXPECT_EQ(1u, Driver::Received::stealCount); } TEST_F(BasicTransportTest, handlePacket_dataFromServer_extraData) { MockWrapper wrapper("message1"); @@ -987,6 +987,7 @@ TEST_F(BasicTransportTest, handlePacket_dataFromClient_extraBytes) { TestUtil::toString(&serverRpc->requestPayload)); } TEST_F(BasicTransportTest, handlePacket_dataFromClient_dontIssueGrant) { + transport.maxDataPerPacket = 5; transport.roundTripBytes = 1000; transport.grantIncrement = 500; uint32_t unscheduledBytes = transport.roundTripBytes; @@ -1300,7 +1301,7 @@ TEST_F(BasicTransportTest, addPacket_basics) { EXPECT_EQ(0u, serverRpc->accumulator->fragments.size()); EXPECT_EQ("P0000P1111P2222P3333P4444", TestUtil::toString(&serverRpc->requestPayload)); - EXPECT_EQ(5u, Driver::Received::stealCount); + EXPECT_EQ(3u, Driver::Received::stealCount); } TEST_F(BasicTransportTest, addPacket_skipRedundantPacket) { // Receive two duplicate packets that contain bytes 10-14. @@ -1409,7 +1410,7 @@ TEST_F(BasicTransportTest, poll_incomingPackets) { BasicTransport::ServerRpc* serverRpc = it->second; EXPECT_EQ("0123456789ABCDE", TestUtil::toString(&serverRpc->requestPayload)); - EXPECT_EQ(3u, Driver::Received::stealCount); + EXPECT_EQ(1u, Driver::Received::stealCount); EXPECT_EQ(1, result); } TEST_F(BasicTransportTest, poll_callCheckTimeouts) { @@ -1478,8 +1479,8 @@ TEST_F(BasicTransportTest, poll_outgoingGrant) { BasicTransport::DataHeader(BasicTransport::RpcId(100, 102), 100, 0, unscheduledBytes, BasicTransport::FROM_CLIENT), "ABCDE"); int result = transport.poller.poll(); - EXPECT_EQ("GRANT FROM_SERVER, rpcId 100.101, offset 40 | " - "GRANT FROM_SERVER, rpcId 100.102, offset 30", + EXPECT_EQ("GRANT FROM_SERVER, rpcId 100.102, offset 30 | " + "GRANT FROM_SERVER, rpcId 100.101, offset 40", driver->outputLog); EXPECT_EQ(1, result); EXPECT_EQ(0u, transport.messagesToGrant.size()); @@ -1493,7 +1494,7 @@ TEST_F(BasicTransportTest, poll_outgoingPacket) { int result = transport.poller.poll(); EXPECT_EQ("ALL_DATA FROM_CLIENT, rpcId 666.1 0123456789 (+5 more)", driver->outputLog); - EXPECT_EQ(1, result); + EXPECT_GT(result, 1); } TEST_F(BasicTransportTest, checkTimeouts_clientTransmissionNotStartedYet) { driver->transmitQueueSpace = 0; diff --git a/src/Buffer.h b/src/Buffer.h index a392a7c99..bf0d7e27f 100644 --- a/src/Buffer.h +++ b/src/Buffer.h @@ -271,6 +271,26 @@ class Buffer { uint32_t peek(uint32_t offset, void** returnPtr); void prependChunk(Chunk* chunk); + + /** + * Reserve a contiguous region of storage for future use by Buffer::alloc; + * the storage is managed internally by the buffer. + * + * Note: This method must only be called when the buffer is in its pristine + * state (i.e., after calling constructor or reset()). + * + * \param numBytes + * # bytes to reserve. + */ + void + reserve(uint32_t numBytes) + { + assert((totalLength == 0) && (firstChunk == NULL)); + uint32_t bytesAllocated; + firstAvailable = getNewAllocation(numBytes, &bytesAllocated); + availableLength = bytesAllocated; + } + virtual void reset(); /** diff --git a/src/BufferTest.cc b/src/BufferTest.cc index 6a4ee8ce4..77b6b535c 100644 --- a/src/BufferTest.cc +++ b/src/BufferTest.cc @@ -975,6 +975,21 @@ TEST_F(BufferTest, getOffset) { EXPECT_EQ('x', t2->c); } +TEST_F(BufferTest, reserve) { + Buffer buffer; + uint32_t numBytes = 1500; + string largeStr(numBytes, 'x'); + buffer.appendCopy(largeStr.c_str(), numBytes); + buffer.appendCopy(largeStr.c_str(), numBytes); + EXPECT_EQ(2u, buffer.getNumberChunks()); + + buffer.reset(); + buffer.reserve(10000); + buffer.appendCopy(largeStr.c_str(), numBytes); + buffer.appendCopy(largeStr.c_str(), numBytes); + EXPECT_EQ(1u, buffer.getNumberChunks()); +} + TEST_F(BufferTest, resetInternal_partial) { // Construct a Buffer in a character array; this is needed so that // the Buffer constructor doesn't get called after we do a partial diff --git a/src/DpdkDriver.cc b/src/DpdkDriver.cc index 32c881834..cbee97bc7 100644 --- a/src/DpdkDriver.cc +++ b/src/DpdkDriver.cc @@ -68,10 +68,6 @@ namespace { } } -// Short-hand to obtain a reference to the metadata storage space that we -// used to store the PacketBufType. -#define packet_buf_type(payload) *(payload - PACKETBUF_TYPE_SIZE) - // Short-hand to obtain the starting address of a DPDK rte_mbuf based on its // payload address. #define payload_to_mbuf(payload) reinterpret_cast( \ @@ -84,10 +80,8 @@ constexpr uint16_t DpdkDriver::PRIORITY_TO_PCP[8]; * Construct a mock DpdkDriver, used for testing only. */ DpdkDriver::DpdkDriver() - : context(NULL) - , packetBufPool() + : Driver(NULL) , packetBufsUtilized(0) - , payloadsToRelease() , locatorString() , localMac() , portId(0) @@ -120,10 +114,8 @@ DpdkDriver::DpdkDriver() */ DpdkDriver::DpdkDriver(Context* context, int port) - : context(context) - , packetBufPool() + : Driver(context) , packetBufsUtilized(0) - , payloadsToRelease() , locatorString() , localMac() , portId(0) @@ -289,7 +281,6 @@ DpdkDriver::DpdkDriver(Context* context, int port) */ DpdkDriver::~DpdkDriver() { - releaseHint(-1); if (packetBufsUtilized != 0) LOG(ERROR, "DpdkDriver deleted with %d packets still in use", packetBufsUtilized); @@ -359,36 +350,12 @@ DpdkDriver::receivePackets(uint32_t maxPackets, // Process received packets by constructing appropriate Received objects. for (uint32_t i = 0; i < totalPkts; i++) { struct rte_mbuf* m = mPkts[i]; - rte_prefetch0(rte_pktmbuf_mtod(m, void *)); - if (unlikely(m->nb_segs > 1)) { - RAMCLOUD_CLOG(WARNING, - "Can't handle packet with %u segments; discarding", - m->nb_segs); - rte_pktmbuf_free(m); - continue; - } - + char* data = rte_pktmbuf_mtod(m, char*); + char* payload = data + ETHER_HDR_LEN; + uint32_t length = rte_pktmbuf_pkt_len(m) - ETHER_HDR_LEN; struct ether_hdr* ethHdr = rte_pktmbuf_mtod(m, struct ether_hdr*); - uint16_t ether_type = ethHdr->ether_type; - uint32_t headerLength = ETHER_HDR_LEN; - char* payload = reinterpret_cast(ethHdr + 1); - if (ether_type == rte_cpu_to_be_16(ETHER_TYPE_VLAN)) { - struct vlan_hdr* vlanHdr = - reinterpret_cast(payload); - ether_type = vlanHdr->eth_proto; - headerLength += VLAN_TAG_LEN; - payload += VLAN_TAG_LEN; - } - if (!hasHardwareFilter) { - // Perform packet filtering by software to skip irrelevant - // packets such as ipmi or kernel TCP/IP traffics. - if (ether_type != - rte_cpu_to_be_16(NetUtil::EthPayloadType::RAMCLOUD)) { - rte_pktmbuf_free(m); - continue; - } - } - PerfStats::threadStats.networkInputBytes += rte_pktmbuf_pkt_len(m); + assert(ethHdr->ether_type == rte_cpu_to_be_16( + NetUtil::EthPayloadType::RAMCLOUD)); // By default, we would like to construct the Received object using // the payload directly (as opposed to copying out the payload to a @@ -397,71 +364,28 @@ DpdkDriver::receivePackets(uint32_t maxPackets, // and the packet buf type (so we know where this payload comes from). // See http://dpdk.org/doc/guides/prog_guide/mbuf_lib.html for the // diagram of rte_mbuf's internal structure. + assert(m->data_off <= sizeof(MacAddress)); MacAddress* sender = reinterpret_cast(m->buf_addr); - if (unlikely(reinterpret_cast(sender + 1) > - rte_pktmbuf_mtod(m, char*))) { - LOG(ERROR, "Not enough headroom in the packet mbuf; " - "dropping packet"); - rte_pktmbuf_free(m); - continue; - } new(sender) MacAddress(ethHdr->s_addr.addr_bytes); - packet_buf_type(payload) = DPDK_MBUF; - packetBufsUtilized++; - uint32_t length = rte_pktmbuf_pkt_len(m) - headerLength; - assert(length <= MAX_PAYLOAD_SIZE); receivedPackets->emplace_back(sender, this, length, payload); + PerfStats::threadStats.networkInputBytes += rte_pktmbuf_pkt_len(m); + packetBufsUtilized++; timeTrace("received packet processed, payload size %u", length); } } // See docs in Driver class. void -DpdkDriver::release(char *payload) +DpdkDriver::release() { - // Must sync with the dispatch thread, since this method could potentially - // be invoked in a worker. - Dispatch::Lock _(context->dispatch); - payloadsToRelease.push_back(payload); -} - -// See docs in Driver class. -void -DpdkDriver::releaseHint(int maxCount) -{ - while ((maxCount != 0) && !payloadsToRelease.empty()) { - maxCount--; - char* payload = payloadsToRelease.back(); - payloadsToRelease.pop_back(); - - packetBufsUtilized--; - assert(packetBufsUtilized >= 0); - if (packet_buf_type(payload) == DPDK_MBUF) { - rte_pktmbuf_free(payload_to_mbuf(payload)); - } else { - packetBufPool.destroy(reinterpret_cast( - payload - OFFSET_OF(PacketBuf, payload))); - } + packetBufsUtilized -= static_cast(packetsToRelease.size()); + assert(packetBufsUtilized >= 0); + while (!packetsToRelease.empty()) { + rte_pktmbuf_free(payload_to_mbuf(packetsToRelease.back())); + packetsToRelease.pop_back(); } } -// See docs in Driver class. -void -DpdkDriver::releaseHwPacketBuf(Driver::Received* received) -{ - struct rte_mbuf* mbuf = payload_to_mbuf(received->payload); - MacAddress* sender = reinterpret_cast(mbuf->buf_addr); - // Copy the sender address and payload to our PacketBuf. - PacketBuf* packetBuf = packetBufPool.construct(); - packetBuf->sender.construct(*sender); - packet_buf_type(packetBuf->payload) = RAMCLOUD_PACKET_BUF; - rte_memcpy(packetBuf->payload, received->payload, received->len); - // Replace rte_mbuf with our PacketBuf and release it. - received->sender = packetBuf->sender.get(); - received->payload = packetBuf->payload; - rte_pktmbuf_free(mbuf); -} - // See docs in Driver class. void DpdkDriver::sendPacket(const Address* addr, @@ -487,12 +411,11 @@ DpdkDriver::sendPacket(const Address* addr, struct rte_mbuf* mbuf = rte_pktmbuf_alloc(mbufPool); #endif if (unlikely(NULL == mbuf)) { - uint32_t numMbufsAvail = rte_mempool_avail_count(mbufPool); - uint32_t numMbufsInUse = rte_mempool_in_use_count(mbufPool); RAMCLOUD_CLOG(WARNING, "Failed to allocate a packet buffer; dropping packet; " - "%u mbufs available, %u mbufs in use, %lu payloads to release", - numMbufsAvail, numMbufsInUse, payloadsToRelease.size()); + "%u mbufs available, %u mbufs in use", + rte_mempool_avail_count(mbufPool), + rte_mempool_in_use_count(mbufPool)); return; } diff --git a/src/DpdkDriver.h b/src/DpdkDriver.h index efafb4076..2163723b4 100644 --- a/src/DpdkDriver.h +++ b/src/DpdkDriver.h @@ -65,9 +65,7 @@ class DpdkDriver : public Driver virtual uint32_t getPacketOverhead(); virtual void receivePackets(uint32_t maxPackets, std::vector* receivedPackets); - virtual void release(char *payload); - virtual void releaseHint(int maxCount); - virtual void releaseHwPacketBuf(Driver::Received* received); + virtual void release(); virtual void sendPacket(const Address* addr, const void* header, uint32_t headerLen, @@ -110,38 +108,9 @@ class DpdkDriver : public Driver {1 << 13, 0 << 13, 2 << 13, 3 << 13, 4 << 13, 5 << 13, 6 << 13, 7 << 13}; - /// See docs in Driver class. The additional headroom space is used to - /// store the packet buf type. - typedef Driver::PacketBuf PacketBuf; - - /** - * This enum defines two types of DpdkDriver::PacketBuf that differ on - * their backing memory. Used to implement the zero-copy RX mechanism. - */ - enum PacketBufType { - /// The memory is allocated from #mbufPool and the packet buf is - /// constructed in a zero-copy fashion. - DPDK_MBUF, - /// The memory is allocated from #packetBufPool and the packet buf is - /// constructed in a copy-out fashion. - RAMCLOUD_PACKET_BUF - }; - - Context* context; - - /// Holds packet buffers that are no longer in use, for use in future - /// requests; saves the overhead of calling malloc/free for each request. - ObjectPool packetBufPool; - /// Tracks number of outstanding allocated payloads. For detecting leaks. int packetBufsUtilized; - /// Holds packet buffers that the transport has done processing and - /// returned. These packet buffers are recycled incrementally to avoid - /// jitters. - std::vector payloadsToRelease; - /// The original ServiceLocator string. May be empty if the constructor /// argument was NULL. May also differ if dynamic ports are used. string locatorString; diff --git a/src/Driver.cc b/src/Driver.cc index cdd5c377e..492961f91 100644 --- a/src/Driver.cc +++ b/src/Driver.cc @@ -25,8 +25,8 @@ uint32_t Driver::Received::stealCount = 0; */ Driver::Received::~Received() { - if (payload && driver) - driver->release(payload); + if (payload) + driver->returnPacket(payload); } /** @@ -52,7 +52,7 @@ Driver::Received::getRange(uint32_t offset, uint32_t length) /** * Return a pointer to the raw data received from the Driver, obligating - * the caller to return the resources to the Driver using Driver::release() + * the caller to return the resources to the Driver using Driver::returnPacket * when the resources are no longer in use. * * This is generally used with PayloadChunk to allow Driver allocated memory @@ -95,9 +95,9 @@ Driver::Received::steal(uint32_t *len) * The length in bytes of the region starting at data that is a * subregion of the payload. * \param driver - * The Driver to release() this payload to on Buffer destruction. + * The Driver to returnPacket() this payload to on Buffer destruction. * \param payload - * The address to release() to the Driver on destruction. + * The address to returnPacket() to the Driver on destruction. */ Driver::PayloadChunk* Driver::PayloadChunk::appendToBuffer(Buffer* buffer, @@ -116,7 +116,7 @@ Driver::PayloadChunk::appendToBuffer(Buffer* buffer, Driver::PayloadChunk::~PayloadChunk() { if (driver) { - driver->release(payload); + driver->returnPacket(payload, false); } } diff --git a/src/Driver.h b/src/Driver.h index 6a367c34a..3a980cb1e 100644 --- a/src/Driver.h +++ b/src/Driver.h @@ -21,6 +21,7 @@ #include "Common.h" #include "Buffer.h" +#include "Dispatch.h" #include "QueueEstimator.h" #undef CURRENT_LOG_MODULE @@ -48,6 +49,12 @@ class Driver { public: virtual ~Address() {} + /** + * Return a 64-bit unsigned integer hash of this Address (for debugging, + * logging, and fast equality test). + */ + virtual uint64_t getHash() const = 0; + /** * Return a string describing the contents of this Address (for * debugging, logging, etc). @@ -214,9 +221,17 @@ class Driver { /// Create a protected short alias to be used in Driver subclasses. using TransmitQueueState = QueueEstimator::TransmitQueueState; - explicit Driver() - : lastTransmitTime(0) + /** + * Driver constructor. + * + * \param context + * Overall information about the RAMCloud server or client. + */ + explicit Driver(Context* context) + : context(context) + , lastTransmitTime(0) , maxTransmitQueueSize(0) + , packetsToRelease() , queueEstimator(0) { // Default: no throttling of transmissions (probably not a good idea). @@ -307,65 +322,47 @@ class Driver { } /** - * Return ownership of a packet buffer back to the driver. - * - * Invoked by a transport when it has finished processing the data - * in an incoming packet; used by drivers to recycle packet buffers - * at a safe time. + * Release the packet buffers previously enqueued by Driver::returnPacket + * so their associated resources can be recycled. * - * \param payload - * The first byte of a packet that was previously "stolen" - * (i.e., the payload field from the Received object used to - * pass the packet to the transport when it was received). + * Invoked by a transport in its poll method. */ - virtual void release(char *payload) = 0; - - template - void release(T* payload) { - release(reinterpret_cast(payload)); - } + virtual void release() = 0; /** - * A hint from the transport that it's a good time for the driver to - * actually recycle the resources of a few packet buffers. + * Return ownership of a packet buffer back to the driver so the packet + * buffer can be safely released later. * - * Note: there is no guarantee that the driver will follow this hint. - * For instance, a driver can simply ignore this hint and perform the - * release whenever the ownership of a packet buffer is returned via - * #release (which is the default behavior). + * This method is usually invoked by a transport when it has finished + * processing the data in an incoming packet. This method can also be + * invoked by an RPC client when it has finished processing the RPC reply. + * Note that this method does *NOT* actually release the packet buffer; + * the transport must invoke Driver::release in its poll method for that + * to happen. * - * \param maxCount - * The maximum number of packet buffers to release. - * -1 means unlimited. - */ - virtual void releaseHint(int maxCount) {} - - /** - * This method is invoked by transports to tell a driver that it should - * release a packet buffer back to the NIC. It is needed because some - * drivers may use zero-copy techniques when packets arrive, passing the - * input packet buffer to the transport instead of copying the data into - * a separate buffer. The zero-copy approach can potentially cause the NIC - * to run out of packet buffers (e.g. if several very long messages are - * being received but none is yet complete). Transports must detect - * excessive use of packet buffers and invoke this method on some - * of the incoming packets. If received is currently occupying one of - * the NIC's packet buffers (i.e. it hasn't already been copied), the - * driver must copy the packet into a separate copy in memory (e.g. - * Driver::PacketBuf) so it can release the packet buffer back to the - * NIC. If this packet has already been copied out of the NIC's buffer, - * then the method does nothing. This method must not be invoked if the - * transport has retained a pointer to data in the original packet (e.g. - * by calling getRange or getOffset, or by accessing payload). + * Defining this method here as inline allows the compiler to remove + * unnecessary synchronization when it's called from the dispatch thread, + * which is the common case. * - * \param received - * An incoming packet which contains the packet buffer to copy. + * \param payload + * The first byte of a packet that was previously "stolen" + * (i.e., the payload field from the Received object used to + * pass the packet to the transport when it was received). + * \param isDispatchThread + * True if this method is invoked from the dispatch thread. */ - virtual void releaseHwPacketBuf(Driver::Received* received) + VIRTUAL_FOR_TESTING void + returnPacket(void* payload, bool isDispatchThread = true) { - // The default implementation does nothing. It can be used by drivers - // that don't support zero-copy RX or have sufficient hardware-managed - // packet buffers. + if (isDispatchThread) { + assert(context->dispatch->isDispatchThread()); + packetsToRelease.push_back(static_cast(payload)); + } else { + // Must sync with the dispatch thread, since this method could + // be invoked from a worker thread (e.g. in ~PayloadChunk). + Dispatch::Lock _(context->dispatch); + packetsToRelease.push_back(static_cast(payload)); + } } /** @@ -507,6 +504,9 @@ class Driver { static const uint32_t MAX_DRAIN_TIME = 2000; PROTECTED: + /// Shared RAMCloud information. + Context* context; + /// The most recent time that the driver handed a packet to the NIC, /// in rdtsc ticks. uint64_t lastTransmitTime; @@ -515,8 +515,14 @@ class Driver { /// at any given time. uint32_t maxTransmitQueueSize; + /// Holds incoming packets that have been processed by the transport and + /// can be safely released by the driver. + std::vector packetsToRelease; + /// Used to estimate # bytes outstanding in the NIC's transmit queue. QueueEstimator queueEstimator; + + DISALLOW_COPY_AND_ASSIGN(Driver) }; /** diff --git a/src/HomaTransport.cc b/src/HomaTransport.cc index 9472e6e82..6e2d7bae5 100644 --- a/src/HomaTransport.cc +++ b/src/HomaTransport.cc @@ -72,12 +72,6 @@ HomaTransport::HomaTransport(Context* context, const ServiceLocator* locator, , poller(context, this) , maxDataPerPacket(driver->getMaxPacketSize() - sizeof32(DataHeader)) - // As of 09/2017, with this value set to 100*maxDataPerPacket, we haven't - // observed any message drop due to driver packet buffer exhaustion when - // running workloads W3, W4 and W5 that are used in the HomaTransport paper - // evaluation. - , messageZeroCopyThreshold(100*maxDataPerPacket) - // As of 09/2017, we consider messages less than 300 bytes as small (which // takes at most 240 ns to transmit on a 10Gbps network). This value is // chosen experimentally so that we can run W3 in Homa paper at 80% load on @@ -202,6 +196,7 @@ HomaTransport::~HomaTransport() deleteClientRpc(clientRpc); } + driver->release(); if (driverOwner) { delete driver; } @@ -244,7 +239,7 @@ HomaTransport::deleteClientRpc(ClientRpc* clientRpc) * When we are finished processing an incoming RPC, this method is * invoked to delete the RPC object and remove it from all existing * data structures. - * + * * \param serverRpc * An RPC that has either completed normally or should be * aborted. @@ -321,20 +316,21 @@ HomaTransport::getRoundTripBytes(const ServiceLocator* locator) } } } - if (gBitsPerSec == 0) { - gBitsPerSec = driver->getBandwidth() / 1000; - if (gBitsPerSec == 0) { - gBitsPerSec = 10; + + uint32_t mBitsPerSec = gBitsPerSec * 1000; + if (mBitsPerSec == 0) { + mBitsPerSec = driver->getBandwidth(); + if (mBitsPerSec == 0) { + mBitsPerSec = 10 * 1000; } } // Compute round-trip time in terms of full packets (round up). - uint32_t roundTripBytes = (roundTripMicros*gBitsPerSec*1000)/8; + uint32_t roundTripBytes = (roundTripMicros*mBitsPerSec)/8; roundTripBytes = ((roundTripBytes+maxDataPerPacket-1)/maxDataPerPacket) * maxDataPerPacket; - - LOG(NOTICE, "roundTripMicros %u, gBitsPerSec %u, roundTripBytes %u", - roundTripMicros, gBitsPerSec, roundTripBytes); + LOG(NOTICE, "roundTripMicros %u, mBitsPerSec %u, roundTripBytes %u", + roundTripMicros, mBitsPerSec, roundTripBytes); return roundTripBytes; } @@ -514,7 +510,7 @@ HomaTransport::opcodeSymbol(uint8_t opcode) { * both for requests and for responses. When a method returns, the given * range of data will have been queued for the NIC but may not actually * have been transmitted yet. - * + * * \param address * Identifies the destination for the message. * \param rpcId @@ -549,27 +545,25 @@ HomaTransport::sendBytes(const Driver::Address* address, RpcId rpcId, uint32_t unscheduledBytes, uint8_t priority, uint8_t flags, bool partialOK) { - uint32_t messageSize = message->size(); - uint32_t curOffset = offset; + uint32_t transmitLimit = std::min(message->size(), curOffset + maxBytes); uint32_t bytesSent = 0; - while ((curOffset < messageSize) && (bytesSent < maxBytes)) { + while (curOffset < transmitLimit) { // Don't send less-than-full-size packets except for the last packet // of the message (unless the caller explicitly requested it). - uint32_t bytesThisPacket = - std::min(maxDataPerPacket, messageSize - curOffset); - if ((bytesSent + bytesThisPacket) > maxBytes) { - if (!partialOK) { - break; - } - bytesThisPacket = maxBytes - bytesSent; + uint32_t bytesThisPacket = transmitLimit - curOffset; + if (bytesThisPacket >= maxDataPerPacket) { + bytesThisPacket = maxDataPerPacket; + } else if ((transmitLimit < message->size()) && !partialOK) { + break; } + QueueEstimator::TransmitQueueState txQueueState; - if (bytesThisPacket == messageSize) { + if (bytesThisPacket == message->size()) { // Entire message fits in a single packet. AllDataHeader header(rpcId, flags, - downCast(messageSize)); - Buffer::Iterator iter(message, 0, messageSize); + downCast(message->size())); + Buffer::Iterator iter(message); const char* fmt = (flags & FROM_CLIENT) ? "client sending ALL_DATA, clientId %u, sequence %u, " "priority %u" : @@ -596,8 +590,7 @@ HomaTransport::sendBytes(const Driver::Address* address, RpcId rpcId, timeTrace("sent data, %u bytes queued ahead", txQueueState.outstandingBytes); } else { - timeTrace("sent data, tx queue idle time %u cyc", - txQueueState.idleTime); + timeTrace("sent data, tx queue idle %u cyc", txQueueState.idleTime); } bytesSent += bytesThisPacket; curOffset += bytesThisPacket; @@ -630,7 +623,7 @@ HomaTransport::sendControlPacket(const Driver::Address* recipient, /** * Given a pointer to a HomaTransport packet, return a human-readable * string describing the information in its header. - * + * * \param packet * Address of the first byte of the packet header, which must be * contiguous in memory. @@ -751,10 +744,14 @@ HomaTransport::tryToTransmitData() // driver immediately. int transmitQueueSpace = driver->getTransmitQueueSpace(context->dispatch->currentTime); + if ((transmitQueueSpace <= 0) || + (outgoingRequests.empty() && outgoingResponses.empty())) { + return 0; + } // Each iteration of the following loop transmits data packets for // a single request or response. - while (transmitQueueSpace > 0) { + do { // Find an outgoing request or response that is ready to transmit. // The policy here is "shortest remaining processing time" (SRPT). // That is, choosing the message with the fewest bytes remaining @@ -864,13 +861,16 @@ HomaTransport::tryToTransmitData() RpcId rpcId = clientRpc ? clientRpc->rpcId : serverRpc->rpcId; uint8_t whoFrom = clientRpc ? FROM_CLIENT : FROM_SERVER; - uint32_t bytesSent = sendBytes(message->recipient, rpcId, - message->buffer, message->transmitOffset, maxBytes, + const Driver::Address* address = clientRpc ? + clientRpc->session->serverAddress : + serverRpc->clientAddress; + uint32_t bytesSent = sendBytes(address, rpcId, message->buffer, + message->transmitOffset, maxBytes, message->unscheduledBytes, message->transmitPriority, whoFrom); if (bytesSent == 0) { - // We can't transmit any more data because the remaining queue - // space is too small. + // If this message can't be transmitted due to the queue space + // limit, neither can the next message (which is even larger). break; } @@ -905,7 +905,13 @@ HomaTransport::tryToTransmitData() // There are no messages with data that can be transmitted. break; } - } + // Exit the loop if the remaining queue space is *likely* too small for + // us to send more data. For example, if the queue space is now smaller + // than one full packet, the remaining size of the next message we pick + // must be larger than one full packet unless the message we pick in + // the current iteration had less-than-one-full-packet data left before + // we finished it. + } while (transmitQueueSpace >= downCast(maxDataPerPacket)); return totalBytesSent; } @@ -1115,7 +1121,7 @@ HomaTransport::Session::sendRequest(Buffer* request, Buffer* response, * This method is invoked whenever a packet arrives. It is the top-level * dispatching method for dealing with incoming packets, both for requests * and responses. - * + * * \param received * Information about the new packet. */ @@ -1175,7 +1181,7 @@ HomaTransport::handlePacket(Driver::Received* received) "short (got %u bytes, expected %u)", received->sender->toString().c_str(), length, requiredLength); - driver->release(payload); + driver->returnPacket(payload); return; } timeTrace("client received ALL_DATA, clientId %u, sequence %u, " @@ -1199,14 +1205,6 @@ HomaTransport::handlePacket(Driver::Received* received) header->common.rpcId.clientId, header->common.rpcId.sequence, header->offset, received->len); - if (header->totalLength > messageZeroCopyThreshold) { - // For relatively long messages, it's possible we need to - // retain their packets for quite some time; give the - // driver a chance to copy out the contents of the - // underlying NIC packet buffer and then release it. - driver->releaseHwPacketBuf(received); - header = received->getOffset(0); - } if (!clientRpc->accumulator) { uint32_t totalLength = header->totalLength; clientRpc->accumulator.construct(this, clientRpc->response, @@ -1342,12 +1340,17 @@ HomaTransport::handlePacket(Driver::Received* received) // happen when some bytes of a large message sent with the // lowest priorities gets stuck at the TOR switch queue for // too long. - LOG(NOTICE, "Retransmitting to server %s: " + RAMCLOUD_CLOG(WARNING, "Retransmitting to server %s: " "sequence %lu, offset %u, length %u, priority %u, " "elapsed time %.1f us", received->sender->toString().c_str(), header->common.rpcId.sequence, header->offset, header->length, header->priority, elapsedMicros); + timeTrace("Retransmitting to server clientId %u" + "sequence %u, offset %u, length %u", + header->common.rpcId.clientId, + header->common.rpcId.sequence, header->offset, + header->length); // Resent bytes are passed directly to the NIC for simplicity; // we expect retransmission to be rare enough so that this // won't affect even the tail latency of other messages. @@ -1410,7 +1413,7 @@ HomaTransport::handlePacket(Driver::Received* received) "short (got %u bytes, expected %u)", received->sender->toString().c_str(), length, requiredLength); - driver->release(payload); + driver->returnPacket(payload); return; } timeTrace("server received ALL_DATA, clientId %u, sequence %u, " @@ -1440,14 +1443,6 @@ HomaTransport::handlePacket(Driver::Received* received) header->common.rpcId.clientId, header->common.rpcId.sequence, header->offset, received->len); - if (header->totalLength > messageZeroCopyThreshold) { - // For relatively long messages, it's possible we need to - // retain their packets for quite some time; give the - // driver a chance to copy out the contents of the - // underlying NIC packet buffer and then release it. - driver->releaseHwPacketBuf(received); - header = received->getOffset(0); - } if (serverRpc == NULL) { serverRpc = serverRpcPool.construct(this, nextServerSequenceNumber, received->sender, @@ -1461,7 +1456,7 @@ HomaTransport::handlePacket(Driver::Received* received) serverRpc->scheduledMessage.construct( serverRpc->rpcId, serverRpc->accumulator.get(), uint32_t(header->unscheduledBytes), - serverRpc->response.recipient, totalLength, + serverRpc->clientAddress, totalLength, uint8_t(FROM_CLIENT)); } serverTimerList.push_back(*serverRpc); @@ -1596,19 +1591,27 @@ HomaTransport::handlePacket(Driver::Received* received) // it's unlikely that bytes have been lost, so don't // retransmit; just return an BUSY so the client knows // we're still alive. + timeTrace("server about to send BUSY, clientId %u, " + "sequence %u", serverRpc->rpcId.clientId, + serverRpc->rpcId.sequence); BusyHeader busy(serverRpc->rpcId, FROM_SERVER); - sendControlPacket(response->recipient, &busy); + sendControlPacket(serverRpc->clientAddress, &busy); return; } double elapsedMicros = Cycles::toSeconds(Cycles::rdtsc() - response->lastTransmitTime)*1e06; - LOG(NOTICE, "Retransmitting to client %s: " + RAMCLOUD_CLOG(WARNING, "Retransmitting to client %s: " "sequence %lu, offset %u, length %u, priority %u, " "elapsed time %.1f us", received->sender->toString().c_str(), header->common.rpcId.sequence, header->offset, header->length, header->priority, elapsedMicros); - sendBytes(response->recipient, + timeTrace("Retransmitting to clientId %u, " + "sequence %u, offset %u, length %u", + header->common.rpcId.clientId, + header->common.rpcId.sequence, header->offset, + header->length); + sendBytes(serverRpc->clientAddress, serverRpc->rpcId, &serverRpc->replyPayload, header->offset, header->length, response->unscheduledBytes, header->priority, @@ -1667,7 +1670,7 @@ HomaTransport::handlePacket(Driver::Received* received) string HomaTransport::ServerRpc::getClientServiceLocator() { - return response.recipient->toString(); + return clientAddress->toString(); } /** @@ -1702,7 +1705,7 @@ HomaTransport::ServerRpc::sendReply() timeTrace("server sending ALL_DATA, clientId %u, sequence %u, " "priority %u", rpcId.clientId, rpcId.sequence, response.transmitPriority); - t->driver->sendPacket(response.recipient, &header, &iter, + t->driver->sendPacket(clientAddress, &header, &iter, response.transmitPriority); t->deleteServerRpc(this); bytesSent = length; @@ -1738,6 +1741,7 @@ HomaTransport::MessageAccumulator::MessageAccumulator(HomaTransport* t, , totalLength(totalLength) { assert(buffer->size() == 0); + buffer->reserve(totalLength); } /** @@ -1750,7 +1754,7 @@ HomaTransport::MessageAccumulator::~MessageAccumulator() for (FragmentMap::iterator it = fragments.begin(); it != fragments.end(); it++) { MessageFragment fragment = it->second; - t->driver->release(fragment.header); + t->driver->returnPacket(fragment.header); } fragments.clear(); } @@ -1778,16 +1782,10 @@ HomaTransport::MessageAccumulator::addPacket(DataHeader *header, { length -= sizeof32(DataHeader); - // These should not happen normally. - if (expect_false(header->offset % t->maxDataPerPacket != 0)) { - LOG(WARNING, "Unexpected packet offset %u", header->offset); - return false; - } else if (expect_false((length != t->maxDataPerPacket) && - (header->offset + length < header->totalLength))) { - LOG(WARNING, "Unexpected packet size %u, offset %u", length, - header->offset); - return false; - } + // We only allow a partial packet to appear at the end of a message. + assert(header->offset % t->maxDataPerPacket == 0); + assert((length == t->maxDataPerPacket) || + (header->offset + length == header->totalLength)); bool retainPacket; if (header->offset > buffer->size()) { @@ -1807,12 +1805,24 @@ HomaTransport::MessageAccumulator::addPacket(DataHeader *header, MessageFragment fragment(header, length); do { char* payload = reinterpret_cast(fragment.header); - Driver::PayloadChunk::appendToBuffer(buffer, - payload + sizeof32(DataHeader), fragment.length, - t->driver, payload); + // Currently, the first packet of a multi-packet message must + // be retained in the buffer because ServerRpc::clientAddress + // is pointing to some memory owned by the packet buffer (see + // docs of Driver::Received::sender). In contrast, other packets + // are copied out to reduce the number of chunks in the buffer + // and, thus, jitters caused by destroying the buffer. + if (expect_false(header->offset == 0)) { + Driver::PayloadChunk::appendToBuffer(buffer, + payload + sizeof32(DataHeader), fragment.length, + t->driver, payload); + } else { + buffer->appendCopy(payload + sizeof32(DataHeader), + fragment.length); + } FragmentMap::iterator it = fragments.find(buffer->size()); if (it == fragments.end()) { - return true; + // Only the first packet will be retained. + return (header->offset == 0); } else { fragment = it->second; fragments.erase(it); @@ -1879,7 +1889,7 @@ HomaTransport::MessageAccumulator::requestRetransmission(HomaTransport *t, endOffset = t->roundTripBytes; } if (endOffset <= buffer->size()) { - LOG(ERROR, "Bad endOffset %u, offset %u", endOffset, buffer->size()); + DIE("Bad endOffset %u, offset %u", endOffset, buffer->size()); } const char* fmt = (whoFrom == FROM_SERVER) ? "server requesting retransmission of bytes %u-%u, clientId %u, " @@ -1921,7 +1931,7 @@ HomaTransport::ScheduledMessage::ScheduledMessage(RpcId rpcId, , grantPriority() , rpcId(rpcId) , senderAddress(senderAddress) - , senderHash(std::hash{}(senderAddress->toString())) + , senderHash(senderAddress->getHash()) , state(NEW) , totalLength(totalLength) , whoFrom(whoFrom) @@ -1980,53 +1990,52 @@ HomaTransport::Poller::poll() uint64_t startTime = Cycles::rdtsc(); #endif - // Process available incoming packets. Try to receive MAX_PACKETS packets - // at a time (an optimized driver implementation may prefetch the payloads - // for us). As of 07/2016, MAX_PACKETS is set to 8 because our CPU can - // take at most 8 cache misses at a time (although it's not clear 8 is the - // best value). + // Try to receive MAX_PACKETS packets at a time (an optimized driver + // implementation may prefetch the payloads for us). As of 07/2016, + // MAX_PACKETS is set to 8 because our CPU can take at most 8 cache + // misses at a time (although it's not clear 8 is the best value). #define MAX_PACKETS 8 - uint32_t numPackets; t->driver->receivePackets(MAX_PACKETS, &t->receivedPackets); - numPackets = downCast(t->receivedPackets.size()); -#if TIME_TRACE - // Log the beginning of poll() here so that timetrace entries do not - // go back in time. + uint32_t numPackets = downCast(t->receivedPackets.size()); + + // Process any incoming packet. if (numPackets > 0) { - uint32_t ns = downCast( + result = 1; +#if TIME_TRACE + // Log the beginning of poll() here so that timetrace entries do not + // go back in time. + uint32_t ns = static_cast( Cycles::toNanoseconds(startTime - lastPollTime)); TimeTrace::record(startTime, "start of polling iteration %u, " "last poll was %u ns ago", uint32_t(owner->iteration), ns); - } - lastPollTime = Cycles::rdtsc(); #endif - for (uint i = 0; i < numPackets; i++) { - t->handlePacket(&t->receivedPackets[i]); - } - t->receivedPackets.clear(); - result = numPackets > 0 ? 1 : result; - - // See if we should send out new GRANT packets. Grants are sent here as - // opposed to inside #handlePacket because we would like to coalesse - // GRANT packets to the same message whenever possible. Besides, - // structuring code this way seems to improve the overall performance, - // potentially by being more cache-friendly. - for (ScheduledMessage* recipient : t->messagesToGrant) { - uint8_t whoFrom = (recipient->whoFrom == FROM_CLIENT) ? - FROM_SERVER : FROM_CLIENT; - GrantHeader grant(recipient->rpcId, recipient->grantOffset, - downCast(recipient->grantPriority), whoFrom); - const char* fmt = (whoFrom == FROM_CLIENT) ? - "client sending GRANT, clientId %u, sequence %u, offset %u, " - "priority %u" : - "server sending GRANT, clientId %u, sequence %u, offset %u, " - "priority %u"; - timeTrace(fmt, recipient->rpcId.clientId, recipient->rpcId.sequence, - grant.offset, grant.priority); - - t->sendControlPacket(recipient->senderAddress, &grant); - } - t->messagesToGrant.clear(); + for (uint32_t i = 0; i < numPackets; i++) { + t->handlePacket(&t->receivedPackets[i]); + } + t->receivedPackets.clear(); + + // See if we should send out new GRANT packets. Grants are sent here as + // opposed to inside #handlePacket because we would like to coalesse + // GRANT packets to the same message whenever possible. Besides, + // structuring code this way seems to improve the overall performance, + // potentially by being more cache-friendly. + while (!t->messagesToGrant.empty()) { + ScheduledMessage* recipient = t->messagesToGrant.back(); + t->messagesToGrant.pop_back(); + uint8_t whoFrom = (recipient->whoFrom == FROM_CLIENT) ? + FROM_SERVER : FROM_CLIENT; + GrantHeader grant(recipient->rpcId, recipient->grantOffset, + downCast(recipient->grantPriority), whoFrom); + const char* fmt = (whoFrom == FROM_CLIENT) ? + "client sending GRANT, clientId %u, sequence %u, offset %u," + "priority %u" : + "server sending GRANT, clientId %u, sequence %u, offset %u," + " priority %u"; + timeTrace(fmt, recipient->rpcId.clientId, recipient->rpcId.sequence, + grant.offset, grant.priority); + t->sendControlPacket(recipient->senderAddress, &grant); + } + } // See if we should check for timeouts. Ideally, we'd like to do this // every timerInterval. However, it's better not to call checkTimeouts @@ -2042,7 +2051,7 @@ HomaTransport::Poller::poll() // make it harder to notice when a *real* problem happens. Thus, it's // best to eliminate spurious retransmissions as much as possible. uint64_t now = owner->currentTime; - if (now >= t->nextTimeoutCheck) { + if (expect_false(now >= t->nextTimeoutCheck)) { if (t->timeoutCheckDeadline == 0) { t->timeoutCheckDeadline = now + t->timerInterval; } @@ -2061,29 +2070,14 @@ HomaTransport::Poller::poll() // Transmit data packets if possible. uint32_t totalBytesSent = t->tryToTransmitData(); - result = totalBytesSent > 0 ? 1 : result; - - - // Provide a hint to the driver on how many packet buffers to release. - // We try to release only a few packet buffers from large messages at - // a time to avoid jitters. As of 02/2017, releasing one packet buffer - // in the DPDK library takes ~65ns. - int maxRelease; - if (!result) { - // We haven't found anything useful to do in this method up till now. - // Try to release more packet buffers. - maxRelease = 16; - } else if (numPackets == MAX_PACKETS) { - // We received MAX_PACKETS packets, so there may be more packets - // outstanding in the NIC's rx queue. Let's skip the release to get - // to the next poll ASAP. - maxRelease = 0; - } else { - // Common case: release as many packets as received. - maxRelease = numPackets; - } - t->driver->releaseHint(maxRelease); + result += totalBytesSent; + + // Release packet buffers that have been returned to the driver. + t->driver->release(); +#if TIME_TRACE + lastPollTime = startTime; +#endif if (result) { timeTrace("end of polling iteration %u, received %u packets, " "transmitted %u bytes", owner->iteration, numPackets, @@ -2132,6 +2126,8 @@ HomaTransport::checkTimeouts() WireFormat::opcodeSymbol(clientRpc->request.buffer), clientRpc->session->serverAddress->toString().c_str(), sequence); + timeTrace("aborting client RPC, clientId %u, sequence %u", + clientId, sequence); clientRpc->notifier->failed(); deleteClientRpc(clientRpc); continue; @@ -2233,7 +2229,12 @@ HomaTransport::checkTimeouts() // could process the RPC before the retransmitted data arrived. assert(serverRpc->sendingResponse || !serverRpc->requestComplete); if (serverRpc->silentIntervals >= timeoutIntervals) { - timeTrace("aborting %s RPC from client, clientId %u, sequence %u", + RAMCLOUD_LOG(WARNING, "aborting %s RPC from client %s, " + "sequence %lu: timeout", + WireFormat::opcodeSymbol(&serverRpc->requestPayload), + serverRpc->clientAddress->toString().c_str(), + serverRpc->rpcId.sequence); + timeTrace("aborting server RPC, clientId %u, sequence %u", serverRpc->rpcId.clientId, serverRpc->rpcId.sequence); deleteServerRpc(serverRpc); continue; @@ -2264,8 +2265,8 @@ HomaTransport::checkTimeouts() scheduledMessage->grantPriority : getUnschedTrafficPrio(accumulator->totalLength); accumulator->requestRetransmission(this, - serverRpc->response.recipient, serverRpc->rpcId, - grantOffset, priority, FROM_SERVER); + serverRpc->clientAddress, serverRpc->rpcId, grantOffset, + priority, FROM_SERVER); } } } diff --git a/src/HomaTransport.h b/src/HomaTransport.h index 1b47d2553..6dff871bb 100644 --- a/src/HomaTransport.h +++ b/src/HomaTransport.h @@ -298,9 +298,6 @@ class HomaTransport : public Transport { ClientRpc* clientRpc; ServerRpc* serverRpc; - /// Where to send the message. - const Driver::Address* recipient; - /// Offset within the message of the next byte we should transmit to /// the recipient; all preceding bytes have already been sent. uint32_t transmitOffset; @@ -331,12 +328,10 @@ class HomaTransport : public Transport { uint32_t unscheduledBytes; OutgoingMessage(ClientRpc* clientRpc, ServerRpc* serverRpc, - HomaTransport* t, Buffer* buffer, - const Driver::Address* recipient) + HomaTransport* t, Buffer* buffer) : buffer(buffer) , clientRpc(clientRpc) , serverRpc(serverRpc) - , recipient(recipient) , transmitOffset(0) , transmitPriority(0) , transmitLimit() @@ -395,7 +390,7 @@ class HomaTransport : public Transport { ClientRpc(Session* session, uint64_t sequence, Buffer* request, Buffer* response, RpcNotifier* notifier) : session(session) - , request(this, NULL, session->t, request, session->serverAddress) + , request(this, NULL, session->t, request) , response(response) , notifier(notifier) , rpcId(session->t->clientId, sequence) @@ -433,6 +428,10 @@ class HomaTransport : public Transport { /// this RPC should be removed at the server's earliest convenience. bool cancelled; + /// Address of the client to which the RPC response will be sent. + /// Not owned by this class. + const Driver::Address* clientAddress; + /// Unique identifier for this RPC. RpcId rpcId; @@ -470,12 +469,13 @@ class HomaTransport : public Transport { : t(transport) , sequence(sequence) , cancelled(false) + , clientAddress(clientAddress) , rpcId(rpcId) , silentIntervals(0) , requestComplete(false) , sendingResponse(false) , accumulator() - , response(NULL, this, transport, &replyPayload, clientAddress) + , response(NULL, this, transport, &replyPayload) , scheduledMessage() , timerLinks() , outgoingResponseLinks() @@ -739,16 +739,6 @@ class HomaTransport : public Transport { /// Maximum # bytes of message data that can fit in one packet. CONST uint32_t maxDataPerPacket; - /// Messages smaller than or equal to this many bytes are received in - /// a zero-copy fashion in their entireties, if the underlying driver - /// permits. The larger this number is set, the more hardware packet - /// buffers we will likely retain at any given time, and the more we - /// deviate from the SRPT policy. If this number is set too large, we - /// may run out of hardware packet buffers and have to stop receiving - /// packets (even worse, we can't complete any message to free up the - /// hardware packet buffers; thus, a deadlock!). - const uint32_t messageZeroCopyThreshold; - /// Maximum # bytes of a message that we consider as small. For small /// messages, the tryToTransmitData mechanism takes more time then just /// transmitting the packet. In order to be efficient on workloads with diff --git a/src/HomaTransportTest.cc b/src/HomaTransportTest.cc index 194b5e214..d00124388 100644 --- a/src/HomaTransportTest.cc +++ b/src/HomaTransportTest.cc @@ -36,7 +36,7 @@ class HomaTransportTest : public ::testing::Test { HomaTransportTest() : context(false) - , driver(new MockDriver(HomaTransport::headerToString)) + , driver(new MockDriver(&context, HomaTransport::headerToString)) , transport(&context, NULL, driver, true, 666) , address1("mock:node=1") , address2("mock:node=2") @@ -213,7 +213,7 @@ TEST_F(HomaTransportTest, getRoundTripBytes_bogusGbsOption) { EXPECT_EQ(5000u, transport.getRoundTripBytes(&locator)); EXPECT_EQ("getRoundTripBytes: Bad HomaTransport gbs option value 'xyz' " "(expected positive integer); ignoring option | getRoundTripBytes: " - "roundTripMicros 4, gBitsPerSec 10, roundTripBytes 5000", + "roundTripMicros 4, mBitsPerSec 10000, roundTripBytes 5000", TestLog::get()); ServiceLocator locator2("mock:gbs=99foo,rttMicros=4"); @@ -221,7 +221,7 @@ TEST_F(HomaTransportTest, getRoundTripBytes_bogusGbsOption) { EXPECT_EQ(5000u, transport.getRoundTripBytes(&locator2)); EXPECT_EQ("getRoundTripBytes: Bad HomaTransport gbs option value '99foo' " "(expected positive integer); ignoring option | getRoundTripBytes: " - "roundTripMicros 4, gBitsPerSec 10, roundTripBytes 5000", + "roundTripMicros 4, mBitsPerSec 10000, roundTripBytes 5000", TestLog::get()); } TEST_F(HomaTransportTest, getRoundTripBytes_noRttOption) { @@ -236,7 +236,7 @@ TEST_F(HomaTransportTest, getRoundTripBytes_bogusRttOption) { EXPECT_EQ(8000u, transport.getRoundTripBytes(&locator)); EXPECT_EQ("getRoundTripBytes: Bad HomaTransport rttMicros option value " "'xyz' (expected positive integer); ignoring option | " - "getRoundTripBytes: roundTripMicros 8, gBitsPerSec 8, " + "getRoundTripBytes: roundTripMicros 8, mBitsPerSec 8000, " "roundTripBytes 8000", TestLog::get()); ServiceLocator locator2("mock:gbs=8,rttMicros=5zzz"); @@ -244,7 +244,7 @@ TEST_F(HomaTransportTest, getRoundTripBytes_bogusRttOption) { EXPECT_EQ(8000u, transport.getRoundTripBytes(&locator2)); EXPECT_EQ("getRoundTripBytes: Bad HomaTransport rttMicros option value " "'5zzz' (expected positive integer); ignoring option | " - "getRoundTripBytes: roundTripMicros 8, gBitsPerSec 8, " + "getRoundTripBytes: roundTripMicros 8, mBitsPerSec 8000, " "roundTripBytes 8000", TestLog::get()); } TEST_F(HomaTransportTest, getRoundTripBytes_roundUpToEvenPackets) { @@ -732,7 +732,7 @@ TEST_F(HomaTransportTest, handlePacket_dataFromServer_basics) { EXPECT_EQ("abcde12345", TestUtil::toString(&wrapper.response)); EXPECT_EQ("", driver->outputLog); EXPECT_EQ(0lu, transport.outgoingRpcs.size()); - EXPECT_EQ(2u, Driver::Received::stealCount); + EXPECT_EQ(1u, Driver::Received::stealCount); } TEST_F(HomaTransportTest, handlePacket_dataFromServer_extraData) { MockWrapper wrapper("message1"); @@ -1065,6 +1065,7 @@ TEST_F(HomaTransportTest, handlePacket_dataFromClient_extraBytes) { TestUtil::toString(&serverRpc->requestPayload)); } TEST_F(HomaTransportTest, handlePacket_dataFromClient_dontIssueGrant) { + transport.maxDataPerPacket = 5; transport.roundTripBytes = 1000; transport.grantIncrement = 500; uint32_t unscheduledBytes = transport.roundTripBytes; @@ -1381,7 +1382,7 @@ TEST_F(HomaTransportTest, addPacket_basics) { EXPECT_EQ(0u, serverRpc->accumulator->fragments.size()); EXPECT_EQ("P0000P1111P2222P3333P4444", TestUtil::toString(&serverRpc->requestPayload)); - EXPECT_EQ(5u, Driver::Received::stealCount); + EXPECT_EQ(3u, Driver::Received::stealCount); } TEST_F(HomaTransportTest, addPacket_skipRedundantPacket) { // Receive two duplicate packets that contain bytes 10-14. @@ -1490,7 +1491,7 @@ TEST_F(HomaTransportTest, poll_incomingPackets) { HomaTransport::ServerRpc* serverRpc = it->second; EXPECT_EQ("0123456789ABCDE", TestUtil::toString(&serverRpc->requestPayload)); - EXPECT_EQ(3u, Driver::Received::stealCount); + EXPECT_EQ(1u, Driver::Received::stealCount); EXPECT_EQ(1, result); } TEST_F(HomaTransportTest, poll_callCheckTimeouts) { @@ -1559,8 +1560,8 @@ TEST_F(HomaTransportTest, poll_outgoingGrant) { HomaTransport::DataHeader(HomaTransport::RpcId(101, 102), 50, 0, unscheduledBytes, HomaTransport::FROM_CLIENT), "ABCDE"); int result = transport.poller.poll(); - EXPECT_EQ("GRANT FROM_SERVER, rpcId 100.101, offset 35, priority 0 | " - "GRANT FROM_SERVER, rpcId 101.102, offset 25, priority 1", + EXPECT_EQ("GRANT FROM_SERVER, rpcId 101.102, offset 25, priority 1 | " + "GRANT FROM_SERVER, rpcId 100.101, offset 35, priority 0", driver->outputLog); EXPECT_EQ(1, result); EXPECT_EQ(0u, transport.messagesToGrant.size()); @@ -1574,7 +1575,7 @@ TEST_F(HomaTransportTest, poll_outgoingPacket) { int result = transport.poller.poll(); EXPECT_EQ("ALL_DATA FROM_CLIENT, rpcId 666.1 0123456789 (+5 more)", driver->outputLog); - EXPECT_EQ(1, result); + EXPECT_GT(result, 1); } TEST_F(HomaTransportTest, checkTimeouts_clientTransmissionNotStartedYet) { driver->transmitQueueSpace = 0; @@ -1689,8 +1690,9 @@ TEST_F(HomaTransportTest, checkTimeouts_serverAbortsRequest) { EXPECT_EQ("", TestLog::get()); transport.checkTimeouts(); - EXPECT_EQ("deleteServerRpc: RpcId (100, 101)", - TestLog::get()); + EXPECT_EQ("checkTimeouts: aborting unknown(25185) RPC from client " + "mock:client=1, sequence 101: timeout | " + "deleteServerRpc: RpcId (100, 101)", TestLog::get()); } TEST_F(HomaTransportTest, checkTimeouts_sendResendFromServer) { transport.roundTripBytes = 100; diff --git a/src/InfUdDriver.cc b/src/InfUdDriver.cc index fbf3e32a0..2a998b604 100644 --- a/src/InfUdDriver.cc +++ b/src/InfUdDriver.cc @@ -80,11 +80,10 @@ namespace { */ InfUdDriver::InfUdDriver(Context* context, const ServiceLocator *sl, bool ethernet) - : context(context) + : Driver(context) , realInfiniband() , infiniband() , rxPool() - , mutex("InfUdDriver") , rxBuffersInHca(0) , rxBufferLogThreshold(0) , txPool() @@ -357,23 +356,25 @@ InfUdDriver::registerMemory(void* base, size_t bytes) * See docs in the ``Driver'' class. */ void -InfUdDriver::release(char *payload) +InfUdDriver::release() { - SpinLock::Guard guard(mutex); - - // Payload points to the first byte of the packet buffer after the - // Ethernet header or GRH header; from that, compute the address of its - // corresponding buffer descriptor. - if (localMac) { - payload -= sizeof(EthernetHeader); - } else { - payload -= GRH_SIZE; + while (!packetsToRelease.empty()) { + // Payload points to the first byte of the packet buffer after the + // Ethernet header or GRH header; from that, compute the address of its + // corresponding buffer descriptor. + char* payload = packetsToRelease.back(); + packetsToRelease.pop_back(); + if (localMac) { + payload -= sizeof(EthernetHeader); + } else { + payload -= GRH_SIZE; + } + int index = downCast((payload - rxPool->bufferMemory) + /rxPool->descriptors[0].length); + BufferDescriptor* bd = &rxPool->descriptors[index]; + assert(payload == bd->buffer); + rxPool->freeBuffers.push_back(bd); } - int index = downCast((payload - rxPool->bufferMemory) - /rxPool->descriptors[0].length); - BufferDescriptor* descriptor = &rxPool->descriptors[index]; - assert(payload == descriptor->buffer); - rxPool->freeBuffers.push_back(descriptor); } /* @@ -563,7 +564,6 @@ InfUdDriver::receivePackets(uint32_t maxPackets, continue; error: - SpinLock::Guard guard(mutex); rxPool->freeBuffers.push_back(bd); } timeTrace("InfUdDriver::receivePackets done"); @@ -592,7 +592,6 @@ InfUdDriver::getBandwidth() void InfUdDriver::refillReceiver() { - SpinLock::Guard guard(mutex); while ((rxBuffersInHca < MAX_RX_QUEUE_DEPTH) && !rxPool->freeBuffers.empty()) { BufferDescriptor* bd = rxPool->freeBuffers.back(); diff --git a/src/InfUdDriver.h b/src/InfUdDriver.h index 4974f4721..eab6c9010 100644 --- a/src/InfUdDriver.h +++ b/src/InfUdDriver.h @@ -53,7 +53,7 @@ class InfUdDriver : public Driver { virtual void receivePackets(uint32_t maxPackets, std::vector* receivedPackets); virtual void registerMemory(void* base, size_t bytes); - virtual void release(char *payload); + virtual void release(); virtual void sendPacket(const Driver::Address* addr, const void* header, uint32_t headerLen, Buffer::Iterator* payload, int priority = 0, @@ -185,9 +185,6 @@ class InfUdDriver : public Driver { // packets } __attribute__((packed)); - /// Shared RAMCloud information. - Context* context; - /// See #infiniband. Tub realInfiniband; @@ -199,12 +196,6 @@ class InfUdDriver : public Driver { /// Packet buffers used for receiving incoming packets. Tub rxPool; - /// Must be held whenever accessing rxPool.freeBuffers: it is shared - /// between worker threads (returning packet buffers when they are - /// finished) and the dispatch thread (moving buffers from there to the - /// HCA). - SpinLock mutex; - /// Number of receive buffers currently in the possession of the /// HCA. uint32_t rxBuffersInHca; diff --git a/src/Infiniband.cc b/src/Infiniband.cc index 9ca0737ae..b390eb5e7 100644 --- a/src/Infiniband.cc +++ b/src/Infiniband.cc @@ -939,6 +939,14 @@ Infiniband::Address::~Address() { // forever in the ahMap cache. } +uint64_t +Infiniband::Address::getHash() const +{ + uint64_t hash = lid; + hash = (hash << 32) + qpn; + return hash; +} + /** * Return a string describing the contents of this Address (host * address & port). diff --git a/src/Infiniband.h b/src/Infiniband.h index b265e5a60..969c1c8b7 100644 --- a/src/Infiniband.h +++ b/src/Infiniband.h @@ -252,6 +252,7 @@ class Infiniband { "' couldn't be converted to Infiniband address: " + msg) {} }; + uint64_t getHash() const; string toString() const; Address(Infiniband& infiniband, int physicalPort, diff --git a/src/IpAddress.cc b/src/IpAddress.cc index ed8efeaae..1eae67490 100644 --- a/src/IpAddress.cc +++ b/src/IpAddress.cc @@ -100,6 +100,16 @@ IpAddress::IpAddress(const uint32_t ip, const uint16_t port) addr->sin_addr.s_addr = htonl(ip); addr->sin_port = NTOHS(port); } + +uint64_t +IpAddress::getHash() const +{ + const sockaddr_in* addr = reinterpret_cast(&address); + uint64_t hash = addr->sin_addr.s_addr; + hash = (hash << 32) + addr->sin_port; + return hash; +} + /** * Return a string describing the contents of this IpAddress (host * address & port). diff --git a/src/IpAddress.h b/src/IpAddress.h index 182877ba6..45e01cfea 100644 --- a/src/IpAddress.h +++ b/src/IpAddress.h @@ -61,6 +61,7 @@ class IpAddress : public Driver::Address { explicit IpAddress(const uint32_t ip, const uint16_t port); IpAddress(const IpAddress& other) : Address(other), address(other.address) {} + uint64_t getHash() const; string toString() const; sockaddr address; private: diff --git a/src/MacAddress.cc b/src/MacAddress.cc index d4d031621..f22c1c68b 100644 --- a/src/MacAddress.cc +++ b/src/MacAddress.cc @@ -17,16 +17,6 @@ namespace RAMCloud { -/** - * Create a new address from 6 bytes. - * \param raw - * The raw bytes. - */ -MacAddress::MacAddress(const uint8_t raw[6]) -{ - memcpy(address, raw, 6); -} - /** * Create a new address from a string representation. * \param macStr @@ -47,12 +37,6 @@ MacAddress::MacAddress(const char* macStr) address[i] = downCast(bytes[i]); } -MacAddress::MacAddress(const MacAddress& other) - : Address() -{ - memcpy(address, other.address, 6); -} - /** * Generate a random MAC address. * Guaranteed to not be a multicast address and in the locally administered mac @@ -70,6 +54,18 @@ MacAddress::MacAddress(Random _) address[0] |= 0x02; } +uint64_t +MacAddress::getHash() const +{ + // The following code implements the djb2 hash function found at: + // http://www.cse.yorku.ca/~oz/hash.html. + uint64_t hash = 5381; + for (uint8_t c : address) { + hash = ((hash << 5) + hash) + c; /* hash * 33 + c */ + } + return hash; +} + inline string MacAddress::toString() const { @@ -80,18 +76,4 @@ MacAddress::toString() const return buf; } -/** - * \return - * True if the MacAddress consists of all zero bytes, false if not. - */ -bool -MacAddress::isNull() const -{ - if (address[0] == 0 && address[1] == 0 && address[2] == 0 && - address[3] == 0 && address[4] == 0 && address[5] == 0) - return true; - else - return false; -} - } // namespace RAMCloud diff --git a/src/MacAddress.h b/src/MacAddress.h index ec70439bc..1bf486bac 100644 --- a/src/MacAddress.h +++ b/src/MacAddress.h @@ -26,12 +26,29 @@ namespace RAMCloud { */ struct MacAddress : public Driver::Address { enum Random { RANDOM }; - explicit MacAddress(const uint8_t raw[6]); + + /** + * Create a new address from 6 bytes. + * \param raw + * The raw bytes. + */ + explicit MacAddress(const uint8_t raw[6]) + { + // Hand-optimized version of memcpy(address, raw, 6). + char* dst = (char*)address + 6; // NOLINT + const char* src = (const char*)raw + 6; + *((uint32_t*)(dst - 6)) = *((const uint32_t*)(src - 6)); // NOLINT + *((uint16_t*)(dst - 2)) = *((const uint16_t*)(src - 2)); // NOLINT + } + + MacAddress(const MacAddress& other) + : MacAddress(other.address) {} + explicit MacAddress(const char* macStr); explicit MacAddress(Random _); - MacAddress(const MacAddress& other); + + uint64_t getHash() const; string toString() const; - bool isNull() const; /// The raw bytes of the MAC address. uint8_t address[6]; diff --git a/src/MacAddressTest.cc b/src/MacAddressTest.cc index d54aa4deb..5dc675862 100644 --- a/src/MacAddressTest.cc +++ b/src/MacAddressTest.cc @@ -44,11 +44,4 @@ TEST(MacAddressTest, toString) { // tested sufficiently in constructor tests } -TEST(MacAddressTest, isNull) { - uint8_t raw[] = {0x0, 0x0, 0x0, 0x0, 0x0, 0x0}; - MacAddress r(MacAddress::RANDOM); - EXPECT_TRUE(MacAddress(raw).isNull()); - EXPECT_FALSE(r.isNull()); -} - } // namespace RAMCloud diff --git a/src/MockDriver.cc b/src/MockDriver.cc index 9232a67f6..eda68865d 100644 --- a/src/MockDriver.cc +++ b/src/MockDriver.cc @@ -19,29 +19,19 @@ namespace RAMCloud { -/** - * Construct a MockDriver which does not include the header in the outputLog. - */ -MockDriver::MockDriver() - : headerToString(0) - , outputLog() - , sendPacketCount(0) - , releaseCount(0) - , incomingPackets() - , transmitQueueSpace(10000) -{ -} - /** * Construct a MockDriver with a custom serializer for the opaque header in * the outputLog. * + * \param context + * RAMCloud context * \param headerToString * A pointer to a function which serializes a Header into a format * for prefixing packets in the outputLog. */ -MockDriver::MockDriver(HeaderToString headerToString) - : headerToString(headerToString) +MockDriver::MockDriver(Context* context, HeaderToString headerToString) + : Driver(context) + , headerToString(headerToString) , outputLog() , sendPacketCount(0) , releaseCount(0) @@ -57,20 +47,6 @@ MockDriver::~MockDriver() } } -/** - * Counts number of times release is called to allow unit tests to check - * that Driver resources are properly reclaimed. - * - * See Driver::release(). - */ -void -MockDriver::release(char *payload) -{ - delete reinterpret_cast(payload - OFFSET_OF( - PacketBuf, payload)); - releaseCount++; -} - /** * Counts number of times sendPacket for unit tests and logs the sent * packet to outputLog. diff --git a/src/MockDriver.h b/src/MockDriver.h index 6e1847994..d0e1a8dd8 100644 --- a/src/MockDriver.h +++ b/src/MockDriver.h @@ -41,6 +41,9 @@ class MockDriver : public Driver { bool operator==(const MockAddress& other) const { return (this->address == other.address); } + uint64_t getHash() const { + return std::hash{}(address); + } string toString() const { return address; } @@ -72,8 +75,7 @@ class MockDriver : public Driver { /// The type of a customer header serializer. See headerToString. typedef string (*HeaderToString)(const void*, uint32_t); - MockDriver(); - explicit MockDriver(HeaderToString headerToString); + explicit MockDriver(Context* context, HeaderToString headerToString); virtual ~MockDriver(); virtual int getHighestPacketPriority() { return 7; } virtual uint32_t getMaxPacketSize() { return MAX_PAYLOAD_SIZE; } @@ -84,7 +86,23 @@ class MockDriver : public Driver { #endif virtual void receivePackets(uint32_t maxPackets, std::vector* receivedPackets); - virtual void release(char *payload); + virtual void release() + { + // Nothing to do; packet bufs are directly released in returnPacket + // when testing. + assert(packetsToRelease.empty()); + } +#if TESTING + /** + * Counts number of times returnPacket is called to allow unit tests + * to check that Driver resources are properly reclaimed. + */ + virtual void returnPacket(void* payload, bool isDispatchThread) { + delete reinterpret_cast(static_cast(payload) - + OFFSET_OF(PacketBuf, payload)); + releaseCount++; + } +#endif virtual void sendPacket(const Address* addr, const void* header, uint32_t headerLen, diff --git a/src/UdpDriver.cc b/src/UdpDriver.cc index 6c1328285..b2161be6c 100644 --- a/src/UdpDriver.cc +++ b/src/UdpDriver.cc @@ -56,12 +56,12 @@ Syscall* UdpDriver::sys = &defaultSyscall; */ UdpDriver::UdpDriver(Context* context, const ServiceLocator* localServiceLocator) - : context(context) + : Driver(context) , socketFd(-1) , packetBatches() , currentBatch(0) , packetBufPool() - , mutex("UdpDriver") + , mutex("UdpDriver::packetBufPool") , locatorString("udp:") , bandwidthGbps(10) // Default bandwidth = 10 gbs , readerThread() @@ -133,8 +133,11 @@ UdpDriver::~UdpDriver() for (int batch = 0; batch < 2; batch++) { for (int i = 0; i < PacketBatch::MAX_PACKETS; i++) { if (packetBatches[batch].buffers[i] != NULL) { - release(packetBatches[batch].buffers[i]->payload); - packetBatches[batch].buffers[i] = NULL; + // No need to sync before acessing packetBufPool since we have + // joined readerThread in close(). + char* payload = packetBatches[batch].buffers[i]->payload; + packetBufPool.destroy(reinterpret_cast( + payload - OFFSET_OF(PacketBuf, payload))); } } } @@ -201,14 +204,18 @@ UdpDriver::receivePackets(uint32_t maxPackets, // See docs in Driver class. void -UdpDriver::release(char *payload) +UdpDriver::release() { SpinLock::Guard guard(mutex); - // Note: the payload is actually contained in a PacketBuf structure, - // which we return to a pool for reuse later. - packetBufPool.destroy( - reinterpret_cast(payload - OFFSET_OF(PacketBuf, payload))); + while (!packetsToRelease.empty()) { + // Note: the payload is actually contained in a PacketBuf structure, + // which we return to a pool for reuse later. + char* payload = packetsToRelease.back(); + packetsToRelease.pop_back(); + packetBufPool.destroy(reinterpret_cast( + payload - OFFSET_OF(PacketBuf, payload))); + } } // See docs in Driver class. diff --git a/src/UdpDriver.h b/src/UdpDriver.h index 684146cc4..8fd732452 100644 --- a/src/UdpDriver.h +++ b/src/UdpDriver.h @@ -46,7 +46,7 @@ class UdpDriver : public Driver { virtual uint32_t getMaxPacketSize(); virtual void receivePackets(uint32_t maxPackets, std::vector* receivedPackets); - virtual void release(char *payload); + virtual void release(); virtual void sendPacket(const Address* addr, const void* header, uint32_t headerLen, @@ -121,9 +121,6 @@ class UdpDriver : public Driver { } }; - /// Shared RAMCloud information. - Context* context; - /// File descriptor of the UDP socket this driver uses for communication. /// -1 means socket was closed because of error. int socketFd;