Skip to content

Commit

Permalink
Completely remove vestigial RX code
Browse files Browse the repository at this point in the history
  • Loading branch information
Joseph Henry committed Sep 6, 2024
1 parent 059d05f commit 8f5cc4e
Showing 1 changed file with 8 additions and 89 deletions.
97 changes: 8 additions & 89 deletions service/OneService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -996,60 +996,6 @@ class OneServiceImpl : public OneService
#endif
_node->initMultithreading(_concurrency, _cpuPinningEnabled);
bool pinning = _cpuPinningEnabled;

/*
fprintf(stderr, "Starting %d RX threads\n", _concurrency);
for (unsigned int i = 0; i < _concurrency; ++i) {
_rxPacketThreads.push_back(std::thread([this, i, pinning]() {
if (pinning) {
#if defined(__LINUX__) || defined(__FreeBSD__) // || defined(__APPLE__)
int pinCore = i % _concurrency;
fprintf(stderr, "CPU Pinning enabled. Pinning thread %d to core %d\n", i, pinCore);
pthread_t self = pthread_self();
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
CPU_SET(pinCore, &cpuset);
#endif
#ifdef __LINUX__
int rc = pthread_setaffinity_np(self, sizeof(cpu_set_t), &cpuset);
#elif __FreeBSD__
int rc = pthread_setaffinity_np(self, sizeof(cpu_set_t), &cpuset);
#endif
#if defined(__LINUX__) || defined(__FreeBSD__) // || defined(__APPLE__)
if (rc != 0)
{
fprintf(stderr, "failed to pin rx thread %d to core %d: %s\n", i, pinCore, strerror(errno));
exit(1);
}
#endif
}
PacketRecord* packet = nullptr;
for (;;) {
if (! _rxPacketQueue.get(packet)) {
break;
}
if (! packet) {
break;
}
const ZT_ResultCode err = _node->processWirePacket(nullptr, packet->now, packet->sock, &(packet->from), packet->data, packet->size, &_nextBackgroundTaskDeadline);
{
Mutex::Lock l(_rxPacketVector_m);
_rxPacketVector.push_back(packet);
}
if (ZT_ResultCode_isFatal(err)) {
char tmp[256];
OSUtils::ztsnprintf(tmp, sizeof(tmp), "error processing packet: %d", (int)err);
Mutex::Lock _l(_termReason_m);
_termReason = ONE_UNRECOVERABLE_ERROR;
_fatalErrorMessage = tmp;
this->terminate();
break;
}
}
}));
}
*/
}

virtual ReasonForTermination run()
Expand Down Expand Up @@ -2999,42 +2945,15 @@ class OneServiceImpl : public OneService
if ((len >= 16) && (reinterpret_cast<const InetAddress*>(from)->ipScope() == InetAddress::IP_SCOPE_GLOBAL)) {
_lastDirectReceiveFromGlobal = now;
}
/*
#if defined(__LINUX__) || defined(__FreeBSD__)
if (_multicoreEnabled) {
PacketRecord* packet;
_rxPacketVector_m.lock();
if (_rxPacketVector.empty()) {
packet = new PacketRecord;
}
else {
packet = _rxPacketVector.back();
_rxPacketVector.pop_back();
}
_rxPacketVector_m.unlock();
packet->sock = reinterpret_cast<int64_t>(sock);
packet->now = now;
memcpy(&(packet->from), from, sizeof(struct sockaddr_storage));
packet->size = (unsigned int)len;
memcpy(packet->data, data, len);
_rxPacketQueue.postLimit(packet, 2048 * _concurrency);
const ZT_ResultCode rc = _node->processWirePacket(nullptr,now,reinterpret_cast<int64_t>(sock),reinterpret_cast<const struct sockaddr_storage *>(from),data,len,&_nextBackgroundTaskDeadline);
if (ZT_ResultCode_isFatal(rc)) {
char tmp[256];
OSUtils::ztsnprintf(tmp,sizeof(tmp),"fatal error code from processWirePacket: %d",(int)rc);
Mutex::Lock _l(_termReason_m);
_termReason = ONE_UNRECOVERABLE_ERROR;
_fatalErrorMessage = tmp;
this->terminate();
}
else {
#endif
*/
const ZT_ResultCode rc = _node->processWirePacket(nullptr,now,reinterpret_cast<int64_t>(sock),reinterpret_cast<const struct sockaddr_storage *>(from),data,len,&_nextBackgroundTaskDeadline);
if (ZT_ResultCode_isFatal(rc)) {
char tmp[256];
OSUtils::ztsnprintf(tmp,sizeof(tmp),"fatal error code from processWirePacket: %d",(int)rc);
Mutex::Lock _l(_termReason_m);
_termReason = ONE_UNRECOVERABLE_ERROR;
_fatalErrorMessage = tmp;
this->terminate();
}
//#if defined(__LINUX__) || defined(__FreeBSD__)
// }
//#endif
}


Expand Down

0 comments on commit 8f5cc4e

Please sign in to comment.