From 683d332abcc7a31ced0586871888fcabcec95256 Mon Sep 17 00:00:00 2001 From: Joseph Henry Date: Fri, 23 Feb 2024 09:57:39 -0800 Subject: [PATCH 01/13] Add multi-core concurrent packet processing --- osdep/BSDEthernetTap.cpp | 112 +++++++++++------ osdep/BSDEthernetTap.hpp | 4 + osdep/EthernetTap.cpp | 15 +-- osdep/EthernetTap.hpp | 1 + osdep/LinuxEthernetTap.cpp | 233 ++++++++++++++++++++--------------- osdep/LinuxEthernetTap.hpp | 8 +- osdep/MacEthernetTap.cpp | 22 +++- osdep/MacEthernetTap.hpp | 4 + osdep/MacEthernetTapAgent.c | 2 +- osdep/MacKextEthernetTap.cpp | 6 +- osdep/MacKextEthernetTap.hpp | 4 + service/OneService.cpp | 163 ++++++++++++++++++++---- 12 files changed, 392 insertions(+), 182 deletions(-) diff --git a/osdep/BSDEthernetTap.cpp b/osdep/BSDEthernetTap.cpp index b2e1a8760d..9d378b7701 100644 --- a/osdep/BSDEthernetTap.cpp +++ b/osdep/BSDEthernetTap.cpp @@ -39,7 +39,9 @@ #include #include #include +#include +#include #include #include #include @@ -53,6 +55,7 @@ #include "BSDEthernetTap.hpp" #define ZT_BASE32_CHARS "0123456789abcdefghijklmnopqrstuv" +#define ZT_TAP_BUF_SIZE (1024 * 16) // ff:ff:ff:ff:ff:ff with no ADI static const ZeroTier::MulticastGroup _blindWildcardMulticastGroup(ZeroTier::MAC(0xff),0); @@ -61,6 +64,7 @@ namespace ZeroTier { BSDEthernetTap::BSDEthernetTap( const char *homePath, + unsigned int concurrency, const MAC &mac, unsigned int mtu, unsigned int metric, @@ -69,6 +73,7 @@ BSDEthernetTap::BSDEthernetTap( void (*handler)(void *,void *,uint64_t,const MAC &,const MAC &,unsigned int,unsigned int,const void *,unsigned int), void *arg) : _handler(handler), + _concurrency(concurrency), _arg(arg), _nwid(nwid), _mtu(mtu), @@ -195,11 +200,9 @@ BSDEthernetTap::BSDEthernetTap( BSDEthernetTap::~BSDEthernetTap() { ::write(_shutdownSignalPipe[1],"\0",1); // causes thread to exit - Thread::join(_thread); ::close(_fd); ::close(_shutdownSignalPipe[0]); ::close(_shutdownSignalPipe[1]); - long cpid = (long)vfork(); if (cpid == 0) { #ifdef ZT_TRACE @@ -211,6 +214,10 @@ BSDEthernetTap::~BSDEthernetTap() int exitcode = -1; ::waitpid(cpid,&exitcode,0); } + Thread::join(_thread); + for (std::thread &t : _rxThreads) { + t.join(); + } } void BSDEthernetTap::setEnabled(bool en) @@ -418,53 +425,84 @@ void BSDEthernetTap::setMtu(unsigned int mtu) void BSDEthernetTap::threadMain() throw() { - fd_set readfds,nullfds; - MAC to,from; - int n,nfds,r; - char getBuf[ZT_MAX_MTU + 64]; + bool _enablePinning = false; + char* envvar = std::getenv("ZT_CPU_PINNING"); + if (envvar) { + int tmp = atoi(envvar); + if (tmp > 0) { + _enablePinning = true; + } + } // Wait for a moment after startup -- wait for Network to finish // constructing itself. Thread::sleep(500); - FD_ZERO(&readfds); - FD_ZERO(&nullfds); - nfds = (int)std::max(_shutdownSignalPipe[0],_fd) + 1; + for (unsigned int i = 0; i < _concurrency; ++i) { + _rxThreads.push_back(std::thread([this, i, _enablePinning] { + + if (_enablePinning) { + int pinCore = i % _concurrency; + fprintf(stderr, "pinning thread %d to core %d\n", i, pinCore); + pthread_t self = pthread_self(); + cpu_set_t cpuset; + CPU_ZERO(&cpuset); + CPU_SET(pinCore, &cpuset); + //int rc = sched_setaffinity(self, sizeof(cpu_set_t), &cpuset); + int rc = pthread_setaffinity_np(self, sizeof(cpu_set_t), &cpuset); + if (rc != 0) + { + fprintf(stderr, "failed to pin thread %d to core %d: %s\n", i, pinCore, strerror(errno)); + exit(1); + } + } - r = 0; - for(;;) { - FD_SET(_shutdownSignalPipe[0],&readfds); - FD_SET(_fd,&readfds); - select(nfds,&readfds,&nullfds,&nullfds,(struct timeval *)0); + uint8_t b[ZT_TAP_BUF_SIZE]; + MAC to, from; + fd_set readfds, nullfds; + int n, nfds, r; - if (FD_ISSET(_shutdownSignalPipe[0],&readfds)) // writes to shutdown pipe terminate thread - break; + FD_ZERO(&readfds); + FD_ZERO(&nullfds); + nfds = (int)std::max(_shutdownSignalPipe[0],_fd) + 1; + + r = 0; - if (FD_ISSET(_fd,&readfds)) { - n = (int)::read(_fd,getBuf + r,sizeof(getBuf) - r); - if (n < 0) { - if ((errno != EINTR)&&(errno != ETIMEDOUT)) + for(;;) { + FD_SET(_shutdownSignalPipe[0],&readfds); + FD_SET(_fd,&readfds); + select(nfds,&readfds,&nullfds,&nullfds,(struct timeval *)0); + + if (FD_ISSET(_shutdownSignalPipe[0],&readfds)) // writes to shutdown pipe terminate thread break; - } else { - // Some tap drivers like to send the ethernet frame and the - // payload in two chunks, so handle that by accumulating - // data until we have at least a frame. - r += n; - if (r > 14) { - if (r > ((int)_mtu + 14)) // sanity check for weird TAP behavior on some platforms - r = _mtu + 14; - - if (_enabled) { - to.setTo(getBuf,6); - from.setTo(getBuf + 6,6); - unsigned int etherType = ntohs(((const uint16_t *)getBuf)[6]); - _handler(_arg,(void *)0,_nwid,from,to,etherType,0,(const void *)(getBuf + 14),r - 14); - } - r = 0; + if (FD_ISSET(_fd,&readfds)) { + n = (int)::read(_fd,b + r,sizeof(b) - r); + if (n < 0) { + if ((errno != EINTR)&&(errno != ETIMEDOUT)) + break; + } else { + // Some tap drivers like to send the ethernet frame and the + // payload in two chunks, so handle that by accumulating + // data until we have at least a frame. + r += n; + if (r > 14) { + if (r > ((int)_mtu + 14)) // sanity check for weird TAP behavior on some platforms + r = _mtu + 14; + + if (_enabled) { + to.setTo(b,6); + from.setTo(b + 6,6); + unsigned int etherType = ntohs(((const uint16_t *)b)[6]); + _handler(_arg,(void *)0,_nwid,from,to,etherType,0,(const void *)(b + 14),r - 14); + } + + r = 0; + } + } } } - } + })); } } diff --git a/osdep/BSDEthernetTap.hpp b/osdep/BSDEthernetTap.hpp index fc4e4908e9..ecf6caf9bb 100644 --- a/osdep/BSDEthernetTap.hpp +++ b/osdep/BSDEthernetTap.hpp @@ -20,6 +20,7 @@ #include #include #include +#include #include "../node/Constants.hpp" #include "../node/MulticastGroup.hpp" @@ -34,6 +35,7 @@ class BSDEthernetTap : public EthernetTap public: BSDEthernetTap( const char *homePath, + unsigned int concurrency, const MAC &mac, unsigned int mtu, unsigned int metric, @@ -62,6 +64,7 @@ class BSDEthernetTap : public EthernetTap private: void (*_handler)(void *,void *,uint64_t,const MAC &,const MAC &,unsigned int,unsigned int,const void *,unsigned int); void *_arg; + unsigned int _concurrency; uint64_t _nwid; Thread _thread; std::string _dev; @@ -73,6 +76,7 @@ class BSDEthernetTap : public EthernetTap volatile bool _enabled; mutable std::vector _ifaddrs; mutable uint64_t _lastIfAddrsUpdate; + std::vector _rxThreads; }; } // namespace ZeroTier diff --git a/osdep/EthernetTap.cpp b/osdep/EthernetTap.cpp index 445a5fe438..d6c7bd7b22 100644 --- a/osdep/EthernetTap.cpp +++ b/osdep/EthernetTap.cpp @@ -58,6 +58,7 @@ namespace ZeroTier { std::shared_ptr EthernetTap::newInstance( const char *tapDeviceType, // OS-specific, NULL for default const char *homePath, + unsigned int concurrency, const MAC &mac, unsigned int mtu, unsigned int metric, @@ -83,16 +84,16 @@ std::shared_ptr EthernetTap::newInstance( // The "feth" virtual Ethernet device type appeared in Darwin 17.x.x. Older versions // (Sierra and earlier) must use the a kernel extension. if (strtol(osrelease,(char **)0,10) < 17) { - return std::shared_ptr(new MacKextEthernetTap(homePath,mac,mtu,metric,nwid,friendlyName,handler,arg)); + return std::shared_ptr(new MacKextEthernetTap(homePath,concurrency,mac,mtu,metric,nwid,friendlyName,handler,arg)); } else { - return std::shared_ptr(new MacEthernetTap(homePath,mac,mtu,metric,nwid,friendlyName,handler,arg)); + return std::shared_ptr(new MacEthernetTap(homePath,concurrency,mac,mtu,metric,nwid,friendlyName,handler,arg)); } } } #endif // __APPLE__ #ifdef __LINUX__ - return std::shared_ptr(new LinuxEthernetTap(homePath,mac,mtu,metric,nwid,friendlyName,handler,arg)); + return std::shared_ptr(new LinuxEthernetTap(homePath,concurrency,mac,mtu,metric,nwid,friendlyName,handler,arg)); #endif // __LINUX__ #ifdef __WINDOWS__ @@ -126,19 +127,19 @@ std::shared_ptr EthernetTap::newInstance( _comInit = true; } } - return std::shared_ptr(new WindowsEthernetTap(homePath,mac,mtu,metric,nwid,friendlyName,handler,arg)); + return std::shared_ptr(new WindowsEthernetTap(homePath,concurrency,mac,mtu,metric,nwid,friendlyName,handler,arg)); #endif // __WINDOWS__ #ifdef __FreeBSD__ - return std::shared_ptr(new BSDEthernetTap(homePath,mac,mtu,metric,nwid,friendlyName,handler,arg)); + return std::shared_ptr(new BSDEthernetTap(homePath,concurrency,mac,mtu,metric,nwid,friendlyName,handler,arg)); #endif // __FreeBSD__ #ifdef __NetBSD__ - return std::shared_ptr(new NetBSDEthernetTap(homePath,mac,mtu,metric,nwid,friendlyName,handler,arg)); + return std::shared_ptr(new NetBSDEthernetTap(homePath,concurrency,mac,mtu,metric,nwid,friendlyName,handler,arg)); #endif // __NetBSD__ #ifdef __OpenBSD__ - return std::shared_ptr(new BSDEthernetTap(homePath,mac,mtu,metric,nwid,friendlyName,handler,arg)); + return std::shared_ptr(new BSDEthernetTap(homePath,concurrency,mac,mtu,metric,nwid,friendlyName,handler,arg)); #endif // __OpenBSD__ #endif // ZT_SDK? diff --git a/osdep/EthernetTap.hpp b/osdep/EthernetTap.hpp index 893e70c340..c5e82470c7 100644 --- a/osdep/EthernetTap.hpp +++ b/osdep/EthernetTap.hpp @@ -33,6 +33,7 @@ class EthernetTap static std::shared_ptr newInstance( const char *tapDeviceType, // OS-specific, NULL for default const char *homePath, + unsigned int concurrency, const MAC &mac, unsigned int mtu, unsigned int metric, diff --git a/osdep/LinuxEthernetTap.cpp b/osdep/LinuxEthernetTap.cpp index a888db9d90..4919dbd69b 100644 --- a/osdep/LinuxEthernetTap.cpp +++ b/osdep/LinuxEthernetTap.cpp @@ -60,7 +60,7 @@ #define IFNAMSIZ 16 #endif -#define ZT_TAP_BUF_SIZE 16384 +#define ZT_TAP_BUF_SIZE (1024 * 16) // ff:ff:ff:ff:ff:ff with no ADI static const ZeroTier::MulticastGroup _blindWildcardMulticastGroup(ZeroTier::MAC(0xff),0); @@ -68,7 +68,7 @@ static const ZeroTier::MulticastGroup _blindWildcardMulticastGroup(ZeroTier::MAC namespace ZeroTier { // determine if we're running a really old linux kernel. -// Kernels in the 2.6.x series don't behave the same when bringing up +// Kernels in the 2.6.x series don't behave the same when bringing up // the tap devices. // // Returns true if the kernel major version is < 3 @@ -111,6 +111,7 @@ static void _base32_5_to_8(const uint8_t *in,char *out) LinuxEthernetTap::LinuxEthernetTap( const char *homePath, + unsigned int concurrency, const MAC &mac, unsigned int mtu, unsigned int metric, @@ -127,6 +128,7 @@ LinuxEthernetTap::LinuxEthernetTap( _fd(0), _enabled(true), _run(true), + _concurrency(concurrency), _lastIfAddrsUpdate(0) { static std::mutex s_tapCreateLock; @@ -220,135 +222,164 @@ LinuxEthernetTap::LinuxEthernetTap( (void)::pipe(_shutdownSignalPipe); - _tapReaderThread = std::thread([this]{ - uint8_t b[ZT_TAP_BUF_SIZE]; - fd_set readfds,nullfds; - int n,nfds,r; - std::vector buffers; - struct ifreq ifr; + bool _enablePinning = false; + char* envvar = std::getenv("ZT_CPU_PINNING"); + if (envvar) { + int tmp = atoi(envvar); + if (tmp > 0) { + _enablePinning = true; + } + } - memset(&ifr,0,sizeof(ifr)); - strcpy(ifr.ifr_name,_dev.c_str()); + for (unsigned int i = 0; i < _concurrency; ++i) { + _rxThreads.push_back(std::thread([this, i, _enablePinning] { + + if (_enablePinning) { + int pinCore = i % _concurrency; + fprintf(stderr, "pinning thread %d to core %d\n", i, pinCore); + pthread_t self = pthread_self(); + cpu_set_t cpuset; + CPU_ZERO(&cpuset); + CPU_SET(pinCore, &cpuset); + int rc = pthread_setaffinity_np(self, sizeof(cpu_set_t), &cpuset); + if (rc != 0) + { + fprintf(stderr, "failed to pin thread %d to core %d: %s\n", i, pinCore, strerror(errno)); + exit(1); + } + } - const int sock = socket(AF_INET,SOCK_DGRAM,0); - if (sock <= 0) - return; + uint8_t b[ZT_TAP_BUF_SIZE]; + fd_set readfds, nullfds; + int n, nfds, r; + if (i == 0) { + struct ifreq ifr; + memset(&ifr, 0, sizeof(ifr)); + strcpy(ifr.ifr_name, _dev.c_str()); + + const int sock = socket(AF_INET, SOCK_DGRAM, 0); + if (sock <= 0) + return; + + if (ioctl(sock, SIOCGIFFLAGS, (void*)&ifr) < 0) { + ::close(sock); + printf("WARNING: ioctl() failed setting up Linux tap device (bring interface up)\n"); + return; + } - if (ioctl(sock,SIOCGIFFLAGS,(void *)&ifr) < 0) { - ::close(sock); - printf("WARNING: ioctl() failed setting up Linux tap device (bring interface up)\n"); - return; - } + ifr.ifr_ifru.ifru_hwaddr.sa_family = ARPHRD_ETHER; + _mac.copyTo(ifr.ifr_ifru.ifru_hwaddr.sa_data, 6); + if (ioctl(sock, SIOCSIFHWADDR, (void*)&ifr) < 0) { + ::close(sock); + printf("WARNING: ioctl() failed setting up Linux tap device (set MAC)\n"); + return; + } - ifr.ifr_ifru.ifru_hwaddr.sa_family = ARPHRD_ETHER; - _mac.copyTo(ifr.ifr_ifru.ifru_hwaddr.sa_data,6); - if (ioctl(sock,SIOCSIFHWADDR,(void *)&ifr) < 0) { - ::close(sock); - printf("WARNING: ioctl() failed setting up Linux tap device (set MAC)\n"); - return; - } + usleep(100000); - usleep(100000); + if (isOldLinuxKernel()) { + ifr.ifr_ifru.ifru_mtu = (int)_mtu; + if (ioctl(sock, SIOCSIFMTU, (void*)&ifr) < 0) { + ::close(sock); + printf("WARNING: ioctl() failed setting up Linux tap device (set MTU)\n"); + return; + } - if (isOldLinuxKernel()) { - ifr.ifr_ifru.ifru_mtu = (int)_mtu; - if (ioctl(sock,SIOCSIFMTU,(void *)&ifr) < 0) { - ::close(sock); - printf("WARNING: ioctl() failed setting up Linux tap device (set MTU)\n"); - return; - } + usleep(100000); + } - usleep(100000); - } - - - ifr.ifr_flags |= IFF_MULTICAST; - ifr.ifr_flags |= IFF_UP; - if (ioctl(sock,SIOCSIFFLAGS,(void *)&ifr) < 0) { - ::close(sock); - printf("WARNING: ioctl() failed setting up Linux tap device (bring interface up)\n"); - return; - } + ifr.ifr_flags |= IFF_MULTICAST; + ifr.ifr_flags |= IFF_UP; + if (ioctl(sock, SIOCSIFFLAGS, (void*)&ifr) < 0) { + ::close(sock); + printf("WARNING: ioctl() failed setting up Linux tap device (bring interface up)\n"); + return; + } - usleep(100000); + usleep(100000); + + if (! isOldLinuxKernel()) { + ifr.ifr_ifru.ifru_hwaddr.sa_family = ARPHRD_ETHER; + _mac.copyTo(ifr.ifr_ifru.ifru_hwaddr.sa_data, 6); + if (ioctl(sock, SIOCSIFHWADDR, (void*)&ifr) < 0) { + ::close(sock); + printf("WARNING: ioctl() failed setting up Linux tap device (set MAC)\n"); + return; + } + + ifr.ifr_ifru.ifru_mtu = (int)_mtu; + if (ioctl(sock, SIOCSIFMTU, (void*)&ifr) < 0) { + ::close(sock); + printf("WARNING: ioctl() failed setting up Linux tap device (set MTU)\n"); + return; + } + } + + fcntl(_fd, F_SETFL, O_NONBLOCK); - if (!isOldLinuxKernel()) { - ifr.ifr_ifru.ifru_hwaddr.sa_family = ARPHRD_ETHER; - _mac.copyTo(ifr.ifr_ifru.ifru_hwaddr.sa_data,6); - if (ioctl(sock,SIOCSIFHWADDR,(void *)&ifr) < 0) { ::close(sock); - printf("WARNING: ioctl() failed setting up Linux tap device (set MAC)\n"); - return; } - ifr.ifr_ifru.ifru_mtu = (int)_mtu; - if (ioctl(sock,SIOCSIFMTU,(void *)&ifr) < 0) { - ::close(sock); - printf("WARNING: ioctl() failed setting up Linux tap device (set MTU)\n"); + if (! _run) { return; } - } - fcntl(_fd,F_SETFL,O_NONBLOCK); - - ::close(sock); - - if (!_run) - return; - - FD_ZERO(&readfds); - FD_ZERO(&nullfds); - nfds = (int)std::max(_shutdownSignalPipe[0],_fd) + 1; - - r = 0; - for(;;) { - FD_SET(_shutdownSignalPipe[0],&readfds); - FD_SET(_fd,&readfds); - select(nfds,&readfds,&nullfds,&nullfds,(struct timeval *)0); - - if (FD_ISSET(_shutdownSignalPipe[0],&readfds)) - break; - - if (FD_ISSET(_fd,&readfds)) { - for(;;) { // read until there are no more packets, then return to outer select() loop - n = (int)::read(_fd,b + r,ZT_TAP_BUF_SIZE - r); - if (n > 0) { - // Some tap drivers like to send the ethernet frame and the - // payload in two chunks, so handle that by accumulating - // data until we have at least a frame. - r += n; - if (r > 14) { - if (r > ((int)_mtu + 14)) // sanity check for weird TAP behavior on some platforms - r = _mtu + 14; - - if (_enabled) { - //_tapq.post(std::pair(buf,r)); - //buf = nullptr; - MAC to(b, 6),from(b + 6, 6); - unsigned int etherType = Utils::ntoh(((const uint16_t *)b)[6]); - _handler(_arg, nullptr, _nwid, from, to, etherType, 0, (const void *)(b + 14),(unsigned int)(r - 14)); - } + FD_ZERO(&readfds); + FD_ZERO(&nullfds); + nfds = (int)std::max(_shutdownSignalPipe[0], _fd) + 1; + + r = 0; + for (;;) { + FD_SET(_shutdownSignalPipe[0], &readfds); + FD_SET(_fd, &readfds); + select(nfds, &readfds, &nullfds, &nullfds, (struct timeval*)0); + if (FD_ISSET(_shutdownSignalPipe[0], &readfds)) { + break; + } + if (FD_ISSET(_fd, &readfds)) { + for (;;) { + // read until there are no more packets, then return to outer select() loop + n = (int)::read(_fd, b + r, ZT_TAP_BUF_SIZE - r); + if (n > 0) { + // Some tap drivers like to send the ethernet frame and the + // payload in two chunks, so handle that by accumulating + // data until we have at least a frame. + r += n; + if (r > 14) { + if (r > ((int)_mtu + 14)) // sanity check for weird TAP behavior on some platforms + r = _mtu + 14; + + if (_enabled) { + MAC to(b, 6), from(b + 6, 6); + unsigned int etherType = Utils::ntoh(((const uint16_t*)b)[6]); + _handler(_arg, nullptr, _nwid, from, to, etherType, 0, (const void*)(b + 14), (unsigned int)(r - 14)); + } + + r = 0; + } + } + else { r = 0; + break; } - } else { - r = 0; - break; } } } - } - }); + })); + } } LinuxEthernetTap::~LinuxEthernetTap() { _run = false; (void)::write(_shutdownSignalPipe[1],"\0",1); - _tapReaderThread.join(); ::close(_fd); ::close(_shutdownSignalPipe[0]); ::close(_shutdownSignalPipe[1]); + for (std::thread &t : _rxThreads) { + t.join(); + } } void LinuxEthernetTap::setEnabled(bool en) diff --git a/osdep/LinuxEthernetTap.hpp b/osdep/LinuxEthernetTap.hpp index 424a6d37e1..b694b277c7 100644 --- a/osdep/LinuxEthernetTap.hpp +++ b/osdep/LinuxEthernetTap.hpp @@ -26,6 +26,7 @@ #include #include "../node/MulticastGroup.hpp" #include "EthernetTap.hpp" +#include "BlockingQueue.hpp" namespace ZeroTier { @@ -34,6 +35,7 @@ class LinuxEthernetTap : public EthernetTap public: LinuxEthernetTap( const char *homePath, + unsigned int _concurrency, const MAC &mac, unsigned int mtu, unsigned int metric, @@ -57,9 +59,6 @@ class LinuxEthernetTap : public EthernetTap virtual void setMtu(unsigned int mtu); virtual void setDns(const char *domain, const std::vector &servers) {} - - - private: void (*_handler)(void *,void *,uint64_t,const MAC &,const MAC &,unsigned int,unsigned int,const void *,unsigned int); void *_arg; @@ -69,13 +68,14 @@ class LinuxEthernetTap : public EthernetTap std::string _dev; std::vector _multicastGroups; unsigned int _mtu; + unsigned int _concurrency; int _fd; int _shutdownSignalPipe[2]; std::atomic_bool _enabled; std::atomic_bool _run; - std::thread _tapReaderThread; mutable std::vector _ifaddrs; mutable uint64_t _lastIfAddrsUpdate; + std::vector _rxThreads; }; } // namespace ZeroTier diff --git a/osdep/MacEthernetTap.cpp b/osdep/MacEthernetTap.cpp index 37f27f87a2..8d15b3b208 100644 --- a/osdep/MacEthernetTap.cpp +++ b/osdep/MacEthernetTap.cpp @@ -69,6 +69,7 @@ static bool fethMaxMtuAdjusted = false; MacEthernetTap::MacEthernetTap( const char *homePath, + unsigned int concurrency, const MAC &mac, unsigned int mtu, unsigned int metric, @@ -77,6 +78,7 @@ MacEthernetTap::MacEthernetTap( void (*handler)(void *,void *,uint64_t,const MAC &,const MAC &,unsigned int,unsigned int,const void *data,unsigned int len), void *arg) : _handler(handler), + _concurrency(concurrency), _arg(arg), _nwid(nwid), _homePath(homePath), @@ -286,6 +288,9 @@ MacEthernetTap::~MacEthernetTap() } Thread::join(_thread); + for (std::thread &t : _rxThreads) { + t.join(); + } } void MacEthernetTap::setEnabled(bool en) { _enabled = en; } @@ -474,17 +479,25 @@ void MacEthernetTap::setMtu(unsigned int mtu) void MacEthernetTap::threadMain() throw() { + Thread::sleep(250); + + for (unsigned int i = 0; i < _concurrency; ++i) { + _rxThreads.push_back(std::thread([this, i] { + + fprintf(stderr, "starting thread %d\n", i); + char agentReadBuf[ZT_MACETHERNETTAP_AGENT_READ_BUF_SIZE]; char agentStderrBuf[256]; fd_set readfds,nullfds; MAC to,from; - Thread::sleep(250); - const int nfds = std::max(std::max(_shutdownSignalPipe[0],_agentStdout),_agentStderr) + 1; long agentReadPtr = 0; - fcntl(_agentStdout,F_SETFL,fcntl(_agentStdout,F_GETFL)|O_NONBLOCK); - fcntl(_agentStderr,F_SETFL,fcntl(_agentStderr,F_GETFL)|O_NONBLOCK); + + if (i == 0) { + fcntl(_agentStdout,F_SETFL,fcntl(_agentStdout,F_GETFL)|O_NONBLOCK); + fcntl(_agentStderr,F_SETFL,fcntl(_agentStderr,F_GETFL)|O_NONBLOCK); + } FD_ZERO(&readfds); FD_ZERO(&nullfds); @@ -533,6 +546,7 @@ void MacEthernetTap::threadMain() */ } } + }));} ::close(_agentStdin); ::close(_agentStdout); diff --git a/osdep/MacEthernetTap.hpp b/osdep/MacEthernetTap.hpp index 8ba378022b..0bb78a79f1 100644 --- a/osdep/MacEthernetTap.hpp +++ b/osdep/MacEthernetTap.hpp @@ -28,6 +28,7 @@ #include #include #include +#include namespace ZeroTier { @@ -36,6 +37,7 @@ class MacEthernetTap : public EthernetTap public: MacEthernetTap( const char *homePath, + unsigned int concurrency, const MAC &mac, unsigned int mtu, unsigned int metric, @@ -67,6 +69,7 @@ class MacEthernetTap : public EthernetTap uint64_t _nwid; Thread _thread; std::string _homePath; + unsigned int _concurrency; std::string _dev; std::vector _multicastGroups; Mutex _putLock; @@ -79,6 +82,7 @@ class MacEthernetTap : public EthernetTap volatile bool _enabled; mutable std::vector _ifaddrs; mutable uint64_t _lastIfAddrsUpdate; + std::vector _rxThreads; }; diff --git a/osdep/MacEthernetTapAgent.c b/osdep/MacEthernetTapAgent.c index ca75ed0542..6a0dbaf851 100644 --- a/osdep/MacEthernetTapAgent.c +++ b/osdep/MacEthernetTapAgent.c @@ -32,7 +32,7 @@ * All this stuff is basically undocumented. A lot of tracing through * the Darwin/XNU kernel source was required to figure out how to make * this actually work. - * + * * We hope to develop a DriverKit-based driver in the near-mid future to * replace this weird hack, but it works for now through Big Sur in our * testing. diff --git a/osdep/MacKextEthernetTap.cpp b/osdep/MacKextEthernetTap.cpp index fce0c121de..7ac0dac25b 100644 --- a/osdep/MacKextEthernetTap.cpp +++ b/osdep/MacKextEthernetTap.cpp @@ -306,6 +306,7 @@ static Mutex globalTapCreateLock; MacKextEthernetTap::MacKextEthernetTap( const char *homePath, + unsigned int concurrency, const MAC &mac, unsigned int mtu, unsigned int metric, @@ -317,6 +318,7 @@ MacKextEthernetTap::MacKextEthernetTap( _arg(arg), _nwid(nwid), _homePath(homePath), + _concurrency(concurrency), _mtu(mtu), _metric(metric), _fd(0), @@ -447,7 +449,9 @@ MacKextEthernetTap::~MacKextEthernetTap() ::write(_shutdownSignalPipe[1],"\0",1); // causes thread to exit Thread::join(_thread); - + for (std::thread &t : _rxThreads) { + t.join(); + } ::close(_fd); ::close(_shutdownSignalPipe[0]); ::close(_shutdownSignalPipe[1]); diff --git a/osdep/MacKextEthernetTap.hpp b/osdep/MacKextEthernetTap.hpp index 4c61c28435..6d85398744 100644 --- a/osdep/MacKextEthernetTap.hpp +++ b/osdep/MacKextEthernetTap.hpp @@ -20,6 +20,7 @@ #include #include #include +#include #include "../node/Constants.hpp" #include "../node/MAC.hpp" @@ -36,6 +37,7 @@ class MacKextEthernetTap : public EthernetTap public: MacKextEthernetTap( const char *homePath, + unsigned int concurrency, const MAC &mac, unsigned int mtu, unsigned int metric, @@ -70,11 +72,13 @@ class MacKextEthernetTap : public EthernetTap std::string _homePath; std::string _dev; std::vector _multicastGroups; + unsigned int _concurrency; unsigned int _mtu; unsigned int _metric; int _fd; int _shutdownSignalPipe[2]; volatile bool _enabled; + std::vector _rxThreads; }; } // namespace ZeroTier diff --git a/service/OneService.cpp b/service/OneService.cpp index fcebe46777..9d737a4135 100644 --- a/service/OneService.cpp +++ b/service/OneService.cpp @@ -16,7 +16,6 @@ #include #include #include - #include #include #include @@ -26,6 +25,11 @@ #include #include +#ifdef __FreeBSD__ +#include +#include +#endif + #include "../version.h" #include "../include/ZeroTierOne.h" @@ -758,7 +762,7 @@ struct TcpConnection Mutex writeq_m; }; -struct OneServiceIncomingPacket +struct PacketRecord { uint64_t now; int64_t sock; @@ -785,14 +789,20 @@ class OneServiceImpl : public OneService SoftwareUpdater *_updater; bool _updateAutoApply; - httplib::Server _controlPlane; + httplib::Server _controlPlane; httplib::Server _controlPlaneV6; - std::thread _serverThread; + std::thread _serverThread; std::thread _serverThreadV6; bool _serverThreadRunning; bool _serverThreadRunningV6; - bool _allowTcpFallbackRelay; + unsigned int _rxThreadCount; + BlockingQueue _rxPacketQueue; + std::vector _rxPacketVector; + std::vector _rxPacketThreads; + Mutex _rxPacketVector_m,_rxPacketThreads_m; + + bool _allowTcpFallbackRelay; bool _forceTcpRelay; bool _allowSecondaryPort; @@ -842,8 +852,6 @@ class OneServiceImpl : public OneService // Deadline for the next background task service function volatile int64_t _nextBackgroundTaskDeadline; - - std::map _nets; Mutex _nets_m; @@ -890,9 +898,9 @@ class OneServiceImpl : public OneService ,_node((Node *)0) ,_updater((SoftwareUpdater *)0) ,_updateAutoApply(false) - ,_controlPlane() + ,_controlPlane() ,_controlPlaneV6() - ,_serverThread() + ,_serverThread() ,_serverThreadV6() ,_serverThreadRunning(false) ,_serverThreadRunningV6(false) @@ -926,9 +934,79 @@ class OneServiceImpl : public OneService _ports[1] = 0; _ports[2] = 0; - prometheus::simpleapi::saver.set_registry(prometheus::simpleapi::registry_ptr); - prometheus::simpleapi::saver.set_delay(std::chrono::seconds(5)); - prometheus::simpleapi::saver.set_out_file(_homePath + ZT_PATH_SEPARATOR + "metrics.prom"); + bool _enablePinning = false; + char* pinningVar = std::getenv("ZT_CPU_PINNING"); + if (pinningVar) { + int tmp = atoi(pinningVar); + if (tmp > 0) { + _enablePinning = true; + } + } + char* concurrencyVar = std::getenv("ZT_PACKET_PROCESSING_CONCURRENCY"); + if (concurrencyVar) { + int tmp = atoi(concurrencyVar); + if (tmp > 0) { + _rxThreadCount = tmp; + } + else { + _rxThreadCount = std::thread::hardware_concurrency(); + } + } + else { + _rxThreadCount = std::thread::hardware_concurrency(); + } + for (unsigned int i = 0; i < _rxThreadCount; ++i) { + _rxPacketThreads.push_back(std::thread([this, i]() { + +#if defined(__LINUX__) || defined(__FreeBSD__) /* || defined(__APPLE__) */ + int pinCore = i % _rxThreadCount; + fprintf(stderr, "pinning thread %d to core %d\n", i, pinCore); + pthread_t self = pthread_self(); + cpu_set_t cpuset; + CPU_ZERO(&cpuset); + CPU_SET(pinCore, &cpuset); +#endif +#ifdef __LINUX__ + int rc = pthread_setaffinity_np(self, sizeof(cpu_set_t), &cpuset); +#elif __FreeBSD__ + int rc = pthread_setaffinity_np(self, sizeof(cpu_set_t), &cpuset); +#endif +#if defined(__LINUX__) || defined(__FreeBSD__) /* || defined(__APPLE__) */ + if (rc != 0) + { + fprintf(stderr, "failed to pin thread %d to core %d: %s\n", i, pinCore, strerror(errno)); + exit(1); + } +#endif + PacketRecord* packet = nullptr; + for (;;) { + if (! _rxPacketQueue.get(packet)) { + break; + } + if (! packet) { + break; + } + const ZT_ResultCode err = _node->processWirePacket(nullptr, packet->now, packet->sock, &(packet->from), packet->data, packet->size, &_nextBackgroundTaskDeadline); + { + Mutex::Lock l(_rxPacketVector_m); + _rxPacketVector.push_back(packet); + } + if (ZT_ResultCode_isFatal(err)) { + char tmp[256]; + OSUtils::ztsnprintf(tmp, sizeof(tmp), "error processing packet: %d", (int)err); + Mutex::Lock _l(_termReason_m); + _termReason = ONE_UNRECOVERABLE_ERROR; + _fatalErrorMessage = tmp; + this->terminate(); + break; + } + } + })); + } + + prometheus::simpleapi::saver.set_registry(prometheus::simpleapi::registry_ptr); + prometheus::simpleapi::saver.set_delay(std::chrono::seconds(5)); + prometheus::simpleapi::saver.set_out_file(_homePath + ZT_PATH_SEPARATOR + "metrics.prom"); #if ZT_VAULT_SUPPORT curl_global_init(CURL_GLOBAL_DEFAULT); @@ -940,20 +1018,34 @@ class OneServiceImpl : public OneService #ifdef __WINDOWS__ WinFWHelper::removeICMPRules(); #endif + + _rxPacketQueue.stop(); + _rxPacketThreads_m.lock(); + for(auto t=_rxPacketThreads.begin();t!=_rxPacketThreads.end();++t) { + t->join(); + } + _rxPacketThreads_m.unlock(); _binder.closeAll(_phy); #if ZT_VAULT_SUPPORT curl_global_cleanup(); #endif - _controlPlane.stop(); + _controlPlane.stop(); if (_serverThreadRunning) { - _serverThread.join(); + _serverThread.join(); } _controlPlaneV6.stop(); if (_serverThreadRunningV6) { _serverThreadV6.join(); } + _rxPacketVector_m.lock(); + while (!_rxPacketVector.empty()) { + delete _rxPacketVector.back(); + _rxPacketVector.pop_back(); + } + _rxPacketVector_m.unlock(); + #ifdef ZT_USE_MINIUPNPC delete _portMapper; @@ -1270,6 +1362,9 @@ class OneServiceImpl : public OneService const unsigned long delay = (dl > now) ? (unsigned long)(dl - now) : 500; clockShouldBe = now + (int64_t)delay; _phy.poll(delay); + + + } } catch (std::exception &e) { Mutex::Lock _l(_termReason_m); @@ -2756,25 +2851,37 @@ class OneServiceImpl : public OneService // Handlers for Node and Phy<> callbacks // ========================================================================= - inline void phyOnDatagram(PhySocket *sock,void **uptr,const struct sockaddr *localAddr,const struct sockaddr *from,void *data,unsigned long len) + + + + inline void phyOnDatagram(PhySocket* sock, void** uptr, const struct sockaddr* localAddr, const struct sockaddr* from, void* data, unsigned long len) { if (_forceTcpRelay) { return; } - Metrics::udp_recv += len; + Metrics::udp_recv += len; const uint64_t now = OSUtils::now(); - if ((len >= 16)&&(reinterpret_cast(from)->ipScope() == InetAddress::IP_SCOPE_GLOBAL)) { + if ((len >= 16) && (reinterpret_cast(from)->ipScope() == InetAddress::IP_SCOPE_GLOBAL)) { _lastDirectReceiveFromGlobal = now; - } - const ZT_ResultCode rc = _node->processWirePacket(nullptr,now,reinterpret_cast(sock),reinterpret_cast(from),data,len,&_nextBackgroundTaskDeadline); - if (ZT_ResultCode_isFatal(rc)) { - char tmp[256]; - OSUtils::ztsnprintf(tmp,sizeof(tmp),"fatal error code from processWirePacket: %d",(int)rc); - Mutex::Lock _l(_termReason_m); - _termReason = ONE_UNRECOVERABLE_ERROR; - _fatalErrorMessage = tmp; - this->terminate(); } + + PacketRecord* packet; + _rxPacketVector_m.lock(); + if (_rxPacketVector.empty()) { + packet = new PacketRecord; + } + else { + packet = _rxPacketVector.back(); + _rxPacketVector.pop_back(); + } + _rxPacketVector_m.unlock(); + + packet->sock = reinterpret_cast(sock); + packet->now = now; + memcpy(&(packet->from), from, sizeof(struct sockaddr_storage)); + packet->size = (unsigned int)len; + memcpy(packet->data, data, len); + _rxPacketQueue.postLimit(packet, 256 * _rxThreadCount); } inline void phyOnTcpConnect(PhySocket *sock,void **uptr,bool success) @@ -2996,6 +3103,7 @@ class OneServiceImpl : public OneService n.setTap(EthernetTap::newInstance( nullptr, _homePath.c_str(), + _rxThreadCount, MAC(nwc->mac), nwc->mtu, (unsigned int)ZT_IF_METRIC, @@ -3509,8 +3617,9 @@ class OneServiceImpl : public OneService inline void nodeVirtualNetworkFrameFunction(uint64_t nwid,void **nuptr,uint64_t sourceMac,uint64_t destMac,unsigned int etherType,unsigned int vlanId,const void *data,unsigned int len) { NetworkState *n = reinterpret_cast(*nuptr); - if ((!n)||(!n->tap())) + if ((!n)||(!n->tap())) { return; + } n->tap()->put(MAC(sourceMac),MAC(destMac),etherType,data,len); } From 64634c916c0c8494929b0f0f3a6eec5414199c88 Mon Sep 17 00:00:00 2001 From: Joseph Henry Date: Wed, 3 Jul 2024 08:49:07 -0700 Subject: [PATCH 02/13] Fix build for macOS, tune to prevent packet re-ordering --- osdep/EthernetTap.cpp | 4 +- osdep/LinuxEthernetTap.cpp | 2 +- osdep/MacEthernetTap.cpp | 22 +---- osdep/MacEthernetTap.hpp | 4 - osdep/MacKextEthernetTap.cpp | 2 - osdep/MacKextEthernetTap.hpp | 2 - service/OneService.cpp | 162 ++++++++++++++++++++--------------- 7 files changed, 102 insertions(+), 96 deletions(-) diff --git a/osdep/EthernetTap.cpp b/osdep/EthernetTap.cpp index d6c7bd7b22..95ce54b0a4 100644 --- a/osdep/EthernetTap.cpp +++ b/osdep/EthernetTap.cpp @@ -84,9 +84,9 @@ std::shared_ptr EthernetTap::newInstance( // The "feth" virtual Ethernet device type appeared in Darwin 17.x.x. Older versions // (Sierra and earlier) must use the a kernel extension. if (strtol(osrelease,(char **)0,10) < 17) { - return std::shared_ptr(new MacKextEthernetTap(homePath,concurrency,mac,mtu,metric,nwid,friendlyName,handler,arg)); + return std::shared_ptr(new MacKextEthernetTap(homePath,mac,mtu,metric,nwid,friendlyName,handler,arg)); } else { - return std::shared_ptr(new MacEthernetTap(homePath,concurrency,mac,mtu,metric,nwid,friendlyName,handler,arg)); + return std::shared_ptr(new MacEthernetTap(homePath,mac,mtu,metric,nwid,friendlyName,handler,arg)); } } } diff --git a/osdep/LinuxEthernetTap.cpp b/osdep/LinuxEthernetTap.cpp index 4919dbd69b..81ebedc11e 100644 --- a/osdep/LinuxEthernetTap.cpp +++ b/osdep/LinuxEthernetTap.cpp @@ -223,7 +223,7 @@ LinuxEthernetTap::LinuxEthernetTap( (void)::pipe(_shutdownSignalPipe); bool _enablePinning = false; - char* envvar = std::getenv("ZT_CPU_PINNING"); + char* envvar = std::getenv("ZT_CORE_PINNING"); if (envvar) { int tmp = atoi(envvar); if (tmp > 0) { diff --git a/osdep/MacEthernetTap.cpp b/osdep/MacEthernetTap.cpp index 8d15b3b208..37f27f87a2 100644 --- a/osdep/MacEthernetTap.cpp +++ b/osdep/MacEthernetTap.cpp @@ -69,7 +69,6 @@ static bool fethMaxMtuAdjusted = false; MacEthernetTap::MacEthernetTap( const char *homePath, - unsigned int concurrency, const MAC &mac, unsigned int mtu, unsigned int metric, @@ -78,7 +77,6 @@ MacEthernetTap::MacEthernetTap( void (*handler)(void *,void *,uint64_t,const MAC &,const MAC &,unsigned int,unsigned int,const void *data,unsigned int len), void *arg) : _handler(handler), - _concurrency(concurrency), _arg(arg), _nwid(nwid), _homePath(homePath), @@ -288,9 +286,6 @@ MacEthernetTap::~MacEthernetTap() } Thread::join(_thread); - for (std::thread &t : _rxThreads) { - t.join(); - } } void MacEthernetTap::setEnabled(bool en) { _enabled = en; } @@ -479,25 +474,17 @@ void MacEthernetTap::setMtu(unsigned int mtu) void MacEthernetTap::threadMain() throw() { - Thread::sleep(250); - - for (unsigned int i = 0; i < _concurrency; ++i) { - _rxThreads.push_back(std::thread([this, i] { - - fprintf(stderr, "starting thread %d\n", i); - char agentReadBuf[ZT_MACETHERNETTAP_AGENT_READ_BUF_SIZE]; char agentStderrBuf[256]; fd_set readfds,nullfds; MAC to,from; + Thread::sleep(250); + const int nfds = std::max(std::max(_shutdownSignalPipe[0],_agentStdout),_agentStderr) + 1; long agentReadPtr = 0; - - if (i == 0) { - fcntl(_agentStdout,F_SETFL,fcntl(_agentStdout,F_GETFL)|O_NONBLOCK); - fcntl(_agentStderr,F_SETFL,fcntl(_agentStderr,F_GETFL)|O_NONBLOCK); - } + fcntl(_agentStdout,F_SETFL,fcntl(_agentStdout,F_GETFL)|O_NONBLOCK); + fcntl(_agentStderr,F_SETFL,fcntl(_agentStderr,F_GETFL)|O_NONBLOCK); FD_ZERO(&readfds); FD_ZERO(&nullfds); @@ -546,7 +533,6 @@ void MacEthernetTap::threadMain() */ } } - }));} ::close(_agentStdin); ::close(_agentStdout); diff --git a/osdep/MacEthernetTap.hpp b/osdep/MacEthernetTap.hpp index 0bb78a79f1..8ba378022b 100644 --- a/osdep/MacEthernetTap.hpp +++ b/osdep/MacEthernetTap.hpp @@ -28,7 +28,6 @@ #include #include #include -#include namespace ZeroTier { @@ -37,7 +36,6 @@ class MacEthernetTap : public EthernetTap public: MacEthernetTap( const char *homePath, - unsigned int concurrency, const MAC &mac, unsigned int mtu, unsigned int metric, @@ -69,7 +67,6 @@ class MacEthernetTap : public EthernetTap uint64_t _nwid; Thread _thread; std::string _homePath; - unsigned int _concurrency; std::string _dev; std::vector _multicastGroups; Mutex _putLock; @@ -82,7 +79,6 @@ class MacEthernetTap : public EthernetTap volatile bool _enabled; mutable std::vector _ifaddrs; mutable uint64_t _lastIfAddrsUpdate; - std::vector _rxThreads; }; diff --git a/osdep/MacKextEthernetTap.cpp b/osdep/MacKextEthernetTap.cpp index 7ac0dac25b..69e97050dc 100644 --- a/osdep/MacKextEthernetTap.cpp +++ b/osdep/MacKextEthernetTap.cpp @@ -306,7 +306,6 @@ static Mutex globalTapCreateLock; MacKextEthernetTap::MacKextEthernetTap( const char *homePath, - unsigned int concurrency, const MAC &mac, unsigned int mtu, unsigned int metric, @@ -318,7 +317,6 @@ MacKextEthernetTap::MacKextEthernetTap( _arg(arg), _nwid(nwid), _homePath(homePath), - _concurrency(concurrency), _mtu(mtu), _metric(metric), _fd(0), diff --git a/osdep/MacKextEthernetTap.hpp b/osdep/MacKextEthernetTap.hpp index 6d85398744..a0cc1b81ac 100644 --- a/osdep/MacKextEthernetTap.hpp +++ b/osdep/MacKextEthernetTap.hpp @@ -37,7 +37,6 @@ class MacKextEthernetTap : public EthernetTap public: MacKextEthernetTap( const char *homePath, - unsigned int concurrency, const MAC &mac, unsigned int mtu, unsigned int metric, @@ -72,7 +71,6 @@ class MacKextEthernetTap : public EthernetTap std::string _homePath; std::string _dev; std::vector _multicastGroups; - unsigned int _concurrency; unsigned int _mtu; unsigned int _metric; int _fd; diff --git a/service/OneService.cpp b/service/OneService.cpp index 9d737a4135..0287276bc1 100644 --- a/service/OneService.cpp +++ b/service/OneService.cpp @@ -801,6 +801,7 @@ class OneServiceImpl : public OneService std::vector _rxPacketVector; std::vector _rxPacketThreads; Mutex _rxPacketVector_m,_rxPacketThreads_m; + bool _enableMulticore; bool _allowTcpFallbackRelay; bool _forceTcpRelay; @@ -934,74 +935,87 @@ class OneServiceImpl : public OneService _ports[1] = 0; _ports[2] = 0; - bool _enablePinning = false; - char* pinningVar = std::getenv("ZT_CPU_PINNING"); - if (pinningVar) { - int tmp = atoi(pinningVar); + _enableMulticore = false; + char* multicoreVar = std::getenv("ZT_ENABLE_MULTICORE"); + if (multicoreVar) { + int tmp = atoi(multicoreVar); if (tmp > 0) { - _enablePinning = true; + _enableMulticore = true; } } - char* concurrencyVar = std::getenv("ZT_PACKET_PROCESSING_CONCURRENCY"); - if (concurrencyVar) { - int tmp = atoi(concurrencyVar); - if (tmp > 0) { - _rxThreadCount = tmp; + if (_enableMulticore) { + bool _enablePinning = false; + char* pinningVar = std::getenv("ZT_CORE_PINNING"); + if (pinningVar) { + int tmp = atoi(pinningVar); + if (tmp > 0) { + _enablePinning = true; + } + } + char* concurrencyVar = std::getenv("ZT_CONCURRENCY"); + if (concurrencyVar) { + int tmp = atoi(concurrencyVar); + if (tmp > 0) { + _rxThreadCount = tmp; + } + else { + _rxThreadCount = std::thread::hardware_concurrency() >= 4 ? 2 : 1; + } } else { - _rxThreadCount = std::thread::hardware_concurrency(); + _rxThreadCount = std::thread::hardware_concurrency() >= 4 ? 2 : 1; } - } - else { - _rxThreadCount = std::thread::hardware_concurrency(); - } - for (unsigned int i = 0; i < _rxThreadCount; ++i) { - _rxPacketThreads.push_back(std::thread([this, i]() { + fprintf(stderr, "using %d rx threads\n", _rxThreadCount); + for (unsigned int i = 0; i < _rxThreadCount; ++i) { + _rxPacketThreads.push_back(std::thread([this, i, _enablePinning]() { + if (_enablePinning) { #if defined(__LINUX__) || defined(__FreeBSD__) /* || defined(__APPLE__) */ - int pinCore = i % _rxThreadCount; - fprintf(stderr, "pinning thread %d to core %d\n", i, pinCore); - pthread_t self = pthread_self(); - cpu_set_t cpuset; - CPU_ZERO(&cpuset); - CPU_SET(pinCore, &cpuset); + int pinCore = i % _rxThreadCount; + fprintf(stderr, "pinning thread %d to core %d\n", i, pinCore); + pthread_t self = pthread_self(); + cpu_set_t cpuset; + CPU_ZERO(&cpuset); + CPU_SET(pinCore, &cpuset); #endif #ifdef __LINUX__ - int rc = pthread_setaffinity_np(self, sizeof(cpu_set_t), &cpuset); + int rc = pthread_setaffinity_np(self, sizeof(cpu_set_t), &cpuset); #elif __FreeBSD__ - int rc = pthread_setaffinity_np(self, sizeof(cpu_set_t), &cpuset); + int rc = pthread_setaffinity_np(self, sizeof(cpu_set_t), &cpuset); #endif #if defined(__LINUX__) || defined(__FreeBSD__) /* || defined(__APPLE__) */ - if (rc != 0) - { - fprintf(stderr, "failed to pin thread %d to core %d: %s\n", i, pinCore, strerror(errno)); - exit(1); - } -#endif - PacketRecord* packet = nullptr; - for (;;) { - if (! _rxPacketQueue.get(packet)) { - break; - } - if (! packet) { - break; - } - const ZT_ResultCode err = _node->processWirePacket(nullptr, packet->now, packet->sock, &(packet->from), packet->data, packet->size, &_nextBackgroundTaskDeadline); + if (rc != 0) { - Mutex::Lock l(_rxPacketVector_m); - _rxPacketVector.push_back(packet); - } - if (ZT_ResultCode_isFatal(err)) { - char tmp[256]; - OSUtils::ztsnprintf(tmp, sizeof(tmp), "error processing packet: %d", (int)err); - Mutex::Lock _l(_termReason_m); - _termReason = ONE_UNRECOVERABLE_ERROR; - _fatalErrorMessage = tmp; - this->terminate(); - break; + fprintf(stderr, "failed to pin thread %d to core %d: %s\n", i, pinCore, strerror(errno)); + exit(1); } +#endif } - })); + PacketRecord* packet = nullptr; + for (;;) { + if (! _rxPacketQueue.get(packet)) { + break; + } + if (! packet) { + break; + } + const ZT_ResultCode err = _node->processWirePacket(nullptr, packet->now, packet->sock, &(packet->from), packet->data, packet->size, &_nextBackgroundTaskDeadline); + { + Mutex::Lock l(_rxPacketVector_m); + _rxPacketVector.push_back(packet); + } + if (ZT_ResultCode_isFatal(err)) { + char tmp[256]; + OSUtils::ztsnprintf(tmp, sizeof(tmp), "error processing packet: %d", (int)err); + Mutex::Lock _l(_termReason_m); + _termReason = ONE_UNRECOVERABLE_ERROR; + _fatalErrorMessage = tmp; + this->terminate(); + break; + } + } + })); + } } prometheus::simpleapi::saver.set_registry(prometheus::simpleapi::registry_ptr); @@ -2865,25 +2879,39 @@ class OneServiceImpl : public OneService _lastDirectReceiveFromGlobal = now; } - PacketRecord* packet; - _rxPacketVector_m.lock(); - if (_rxPacketVector.empty()) { - packet = new PacketRecord; + if (_enableMulticore) { + PacketRecord* packet; + _rxPacketVector_m.lock(); + if (_rxPacketVector.empty()) { + packet = new PacketRecord; + } + else { + packet = _rxPacketVector.back(); + _rxPacketVector.pop_back(); + } + _rxPacketVector_m.unlock(); + + packet->sock = reinterpret_cast(sock); + packet->now = now; + memcpy(&(packet->from), from, sizeof(struct sockaddr_storage)); + packet->size = (unsigned int)len; + memcpy(packet->data, data, len); + _rxPacketQueue.postLimit(packet, 256 * _rxThreadCount); } else { - packet = _rxPacketVector.back(); - _rxPacketVector.pop_back(); + const ZT_ResultCode rc = _node->processWirePacket(nullptr,now,reinterpret_cast(sock),reinterpret_cast(from),data,len,&_nextBackgroundTaskDeadline); + if (ZT_ResultCode_isFatal(rc)) { + char tmp[256]; + OSUtils::ztsnprintf(tmp,sizeof(tmp),"fatal error code from processWirePacket: %d",(int)rc); + Mutex::Lock _l(_termReason_m); + _termReason = ONE_UNRECOVERABLE_ERROR; + _fatalErrorMessage = tmp; + this->terminate(); + } } - _rxPacketVector_m.unlock(); - - packet->sock = reinterpret_cast(sock); - packet->now = now; - memcpy(&(packet->from), from, sizeof(struct sockaddr_storage)); - packet->size = (unsigned int)len; - memcpy(packet->data, data, len); - _rxPacketQueue.postLimit(packet, 256 * _rxThreadCount); } + inline void phyOnTcpConnect(PhySocket *sock,void **uptr,bool success) { if (!success) { @@ -3103,7 +3131,7 @@ class OneServiceImpl : public OneService n.setTap(EthernetTap::newInstance( nullptr, _homePath.c_str(), - _rxThreadCount, + _enableMulticore ? _rxThreadCount : 1, MAC(nwc->mac), nwc->mtu, (unsigned int)ZT_IF_METRIC, From 36adae3d8211f165b77cfc644bb60fede3f9034d Mon Sep 17 00:00:00 2001 From: Joseph Henry Date: Sun, 18 Aug 2024 15:07:18 -0700 Subject: [PATCH 03/13] Add packet multiplexer --- node/IncomingPacket.cpp | 13 ++-- node/Node.cpp | 14 +++- node/Node.hpp | 2 +- node/PacketMultiplexer.cpp | 125 ++++++++++++++++++++++++++++++++++++ node/PacketMultiplexer.hpp | 62 ++++++++++++++++++ node/RuntimeEnvironment.hpp | 2 + node/Switch.cpp | 2 +- objects.mk | 3 +- osdep/EthernetTap.cpp | 5 +- osdep/EthernetTap.hpp | 1 - osdep/LinuxEthernetTap.cpp | 23 +++++-- osdep/LinuxEthernetTap.hpp | 2 - service/OneService.cpp | 4 +- 13 files changed, 236 insertions(+), 22 deletions(-) create mode 100644 node/PacketMultiplexer.cpp create mode 100644 node/PacketMultiplexer.hpp diff --git a/node/IncomingPacket.cpp b/node/IncomingPacket.cpp index a5dd770173..e34d4b048b 100644 --- a/node/IncomingPacket.cpp +++ b/node/IncomingPacket.cpp @@ -38,6 +38,7 @@ #include "Path.hpp" #include "Bond.hpp" #include "Metrics.hpp" +#include "PacketMultiplexer.hpp" namespace ZeroTier { @@ -793,7 +794,7 @@ bool IncomingPacket::_doFRAME(const RuntimeEnvironment *RR,void *tPtr,const Shar { Metrics::pkt_frame_in++; int32_t _flowId = ZT_QOS_NO_FLOW; - if (peer->flowHashingSupported()) { + //if (peer->flowHashingSupported()) { if (size() > ZT_PROTO_VERB_EXT_FRAME_IDX_PAYLOAD) { const unsigned int etherType = at(ZT_PROTO_VERB_FRAME_IDX_ETHERTYPE); const unsigned int frameLen = size() - ZT_PROTO_VERB_FRAME_IDX_PAYLOAD; @@ -855,7 +856,9 @@ bool IncomingPacket::_doFRAME(const RuntimeEnvironment *RR,void *tPtr,const Shar } } } - } + //} + + //fprintf(stderr, "IncomingPacket::_doFRAME: flowId=%d\n", _flowId); const uint64_t nwid = at(ZT_PROTO_VERB_FRAME_IDX_NETWORK_ID); const SharedPtr network(RR->node->network(nwid)); @@ -869,7 +872,8 @@ bool IncomingPacket::_doFRAME(const RuntimeEnvironment *RR,void *tPtr,const Shar const unsigned int frameLen = size() - ZT_PROTO_VERB_FRAME_IDX_PAYLOAD; const uint8_t *const frameData = reinterpret_cast(data()) + ZT_PROTO_VERB_FRAME_IDX_PAYLOAD; if (network->filterIncomingPacket(tPtr,peer,RR->identity.address(),sourceMac,network->mac(),frameData,frameLen,etherType,0) > 0) { - RR->node->putFrame(tPtr,nwid,network->userPtr(),sourceMac,network->mac(),etherType,0,(const void *)frameData,frameLen); + //RR->node->putFrame(tPtr,nwid,network->userPtr(),sourceMac,network->mac(),etherType,0,(const void *)frameData,frameLen); + RR->pm->putFrame(tPtr,nwid,network->userPtr(),sourceMac,network->mac(),etherType,0,(const void *)frameData,frameLen, _flowId); } } } else { @@ -942,7 +946,8 @@ bool IncomingPacket::_doEXT_FRAME(const RuntimeEnvironment *RR,void *tPtr,const } // fall through -- 2 means accept regardless of bridging checks or other restrictions case 2: - RR->node->putFrame(tPtr,nwid,network->userPtr(),from,to,etherType,0,(const void *)frameData,frameLen); + //RR->node->putFrame(tPtr,nwid,network->userPtr(),from,to,etherType,0,(const void *)frameData,frameLen); + RR->pm->putFrame(tPtr,nwid,network->userPtr(),from,to,etherType,0,(const void *)frameData,frameLen, flowId); break; } } diff --git a/node/Node.cpp b/node/Node.cpp index 4913d1a4c8..f2ad40fe0e 100644 --- a/node/Node.cpp +++ b/node/Node.cpp @@ -35,6 +35,7 @@ #include "Network.hpp" #include "Trace.hpp" #include "Metrics.hpp" +#include "PacketMultiplexer.hpp" // FIXME: remove this suppression and actually fix warnings #ifdef __GNUC__ @@ -119,9 +120,10 @@ Node::Node(void *uptr,void *tptr,const struct ZT_Node_Callbacks *callbacks,int64 const unsigned long mcs = sizeof(Multicaster) + (((sizeof(Multicaster) & 0xf) != 0) ? (16 - (sizeof(Multicaster) & 0xf)) : 0); const unsigned long topologys = sizeof(Topology) + (((sizeof(Topology) & 0xf) != 0) ? (16 - (sizeof(Topology) & 0xf)) : 0); const unsigned long sas = sizeof(SelfAwareness) + (((sizeof(SelfAwareness) & 0xf) != 0) ? (16 - (sizeof(SelfAwareness) & 0xf)) : 0); - const unsigned long bc = sizeof(Bond) + (((sizeof(Bond) & 0xf) != 0) ? (16 - (sizeof(Bond) & 0xf)) : 0); + const unsigned long bcs = sizeof(Bond) + (((sizeof(Bond) & 0xf) != 0) ? (16 - (sizeof(Bond) & 0xf)) : 0); + const unsigned long pms = sizeof(PacketMultiplexer) + (((sizeof(PacketMultiplexer) & 0xf) != 0) ? (16 - (sizeof(PacketMultiplexer) & 0xf)) : 0); - m = reinterpret_cast(::malloc(16 + ts + sws + mcs + topologys + sas + bc)); + m = reinterpret_cast(::malloc(16 + ts + sws + mcs + topologys + sas + bcs + pms)); if (!m) { throw std::bad_alloc(); } @@ -141,6 +143,8 @@ Node::Node(void *uptr,void *tptr,const struct ZT_Node_Callbacks *callbacks,int64 RR->sa = new (m) SelfAwareness(RR); m += sas; RR->bc = new (m) Bond(RR); + m += bcs; + RR->pm = new (m) PacketMultiplexer(RR); } catch ( ... ) { if (RR->sa) { RR->sa->~SelfAwareness(); @@ -160,6 +164,9 @@ Node::Node(void *uptr,void *tptr,const struct ZT_Node_Callbacks *callbacks,int64 if (RR->bc) { RR->bc->~Bond(); } + if (RR->pm) { + RR->pm->~PacketMultiplexer(); + } ::free(m); throw; } @@ -191,6 +198,9 @@ Node::~Node() if (RR->bc) { RR->bc->~Bond(); } + if (RR->pm) { + RR->pm->~PacketMultiplexer(); + } ::free(RR->rtmem); } diff --git a/node/Node.hpp b/node/Node.hpp index 1f74b83405..1c789a0b04 100644 --- a/node/Node.hpp +++ b/node/Node.hpp @@ -283,7 +283,7 @@ class Node : public NetworkController::Sender return _lowBandwidthMode; } -private: +public: RuntimeEnvironment _RR; RuntimeEnvironment *RR; void *_uPtr; // _uptr (lower case) is reserved in Visual Studio :P diff --git a/node/PacketMultiplexer.cpp b/node/PacketMultiplexer.cpp new file mode 100644 index 0000000000..6f559c4b86 --- /dev/null +++ b/node/PacketMultiplexer.cpp @@ -0,0 +1,125 @@ +/* + * Copyright (c)2013-2021 ZeroTier, Inc. + * + * Use of this software is governed by the Business Source License included + * in the LICENSE.TXT file in the project's root directory. + * + * Change Date: 2026-01-01 + * + * On the date above, in accordance with the Business Source License, use + * of this software will be governed by version 2.0 of the Apache License. + */ +/****/ + +#include "PacketMultiplexer.hpp" + +#include "Node.hpp" +#include "RuntimeEnvironment.hpp" + +#include +#include + +namespace ZeroTier { + +void PacketMultiplexer::putFrame(void* tPtr, uint64_t nwid, void** nuptr, const MAC& source, const MAC& dest, unsigned int etherType, unsigned int vlanId, const void* data, unsigned int len, unsigned int flowId) +{ + PacketRecord* packet; + _rxPacketVector_m.lock(); + if (_rxPacketVector.empty()) { + packet = new PacketRecord; + } + else { + packet = _rxPacketVector.back(); + _rxPacketVector.pop_back(); + } + _rxPacketVector_m.unlock(); + + packet->tPtr = tPtr; + packet->nwid = nwid; + packet->nuptr = nuptr; + packet->source = source.toInt(); + packet->dest = dest.toInt(); + packet->etherType = etherType; + packet->vlanId = vlanId; + packet->len = len; + packet->flowId = flowId; + memcpy(packet->data, data, len); + + int bucket = flowId % _concurrency; + //fprintf(stderr, "bucket=%d\n", bucket); + _rxPacketQueues[bucket]->postLimit(packet, 2048); +} + +PacketMultiplexer::PacketMultiplexer(const RuntimeEnvironment* renv) +{ + RR = renv; + bool _enablePinning = false; + char* pinningVar = std::getenv("ZT_CPU_PINNING"); + if (pinningVar) { + int tmp = atoi(pinningVar); + if (tmp > 0) { + _enablePinning = true; + } + } + + _concurrency = 1; + char* concurrencyVar = std::getenv("ZT_PACKET_PROCESSING_CONCURRENCY"); + if (concurrencyVar) { + int tmp = atoi(concurrencyVar); + if (tmp > 0) { + _concurrency = tmp; + } + else { + _concurrency = std::max((unsigned int)1, std::thread::hardware_concurrency() / 2); + } + } + else { + _concurrency = std::max((unsigned int)1, std::thread::hardware_concurrency() / 2); + } + + for (unsigned int i = 0; i < _concurrency; ++i) { + fprintf(stderr, "reserved queue for thread %d\n", i); + _rxPacketQueues.push_back(new BlockingQueue()); + } + + // Each thread picks from its own queue to feed into the core + for (unsigned int i = 0; i < _concurrency; ++i) { + _rxThreads.push_back(std::thread([this, i, _enablePinning]() { + fprintf(stderr, "created post-decode packet ingestion thread %d\n", i); + + PacketRecord* packet = nullptr; + for (;;) { + if (! _rxPacketQueues[i]->get(packet)) { + break; + } + if (! packet) { + break; + } + + //fprintf(stderr, "popped packet from queue %d\n", i); + + MAC sourceMac = MAC(packet->source); + MAC destMac = MAC(packet->dest); + + RR->node->putFrame(packet->tPtr, packet->nwid, packet->nuptr, sourceMac, destMac, packet->etherType, 0, (const void*)packet->data, packet->len); + { + Mutex::Lock l(_rxPacketVector_m); + _rxPacketVector.push_back(packet); + } + /* + if (ZT_ResultCode_isFatal(err)) { + char tmp[256]; + OSUtils::ztsnprintf(tmp, sizeof(tmp), "error processing packet: %d", (int)err); + Mutex::Lock _l(_termReason_m); + _termReason = ONE_UNRECOVERABLE_ERROR; + _fatalErrorMessage = tmp; + this->terminate(); + break; + } + */ + } + })); + } +}; + +} // namespace ZeroTier \ No newline at end of file diff --git a/node/PacketMultiplexer.hpp b/node/PacketMultiplexer.hpp new file mode 100644 index 0000000000..152bda320a --- /dev/null +++ b/node/PacketMultiplexer.hpp @@ -0,0 +1,62 @@ +/* + * Copyright (c)2013-2021 ZeroTier, Inc. + * + * Use of this software is governed by the Business Source License included + * in the LICENSE.TXT file in the project's root directory. + * + * Change Date: 2026-01-01 + * + * On the date above, in accordance with the Business Source License, use + * of this software will be governed by version 2.0 of the Apache License. + */ +/****/ + +#ifndef ZT_PACKET_MULTIPLEXER_HPP +#define ZT_PACKET_MULTIPLEXER_HPP + +#include "../osdep/BlockingQueue.hpp" +#include "MAC.hpp" +#include "Mutex.hpp" +#include "RuntimeEnvironment.hpp" + +#include +#include + +namespace ZeroTier { + +struct PacketRecord { + void* tPtr; + uint64_t nwid; + void** nuptr; + uint64_t source; + uint64_t dest; + unsigned int etherType; + unsigned int vlanId; + uint8_t data[ZT_MAX_MTU]; + unsigned int len; + unsigned int flowId; +}; + +class PacketMultiplexer { + public: + const RuntimeEnvironment* RR; + + PacketMultiplexer(const RuntimeEnvironment* renv); + + void putFrame(void* tPtr, uint64_t nwid, void** nuptr, const MAC& source, const MAC& dest, unsigned int etherType, unsigned int vlanId, const void* data, unsigned int len, unsigned int flowId); + + std::vector*> _rxPacketQueues; + + unsigned int _concurrency; + // pool + std::vector _rxPacketVector; + std::vector _rxPacketThreads; + Mutex _rxPacketVector_m, _rxPacketThreads_m; + + std::vector _rxThreads; + unsigned int _rxThreadCount; +}; + +} // namespace ZeroTier + +#endif // ZT_PACKET_MULTIPLEXER_HPP \ No newline at end of file diff --git a/node/RuntimeEnvironment.hpp b/node/RuntimeEnvironment.hpp index 0196455136..274a9f265d 100644 --- a/node/RuntimeEnvironment.hpp +++ b/node/RuntimeEnvironment.hpp @@ -31,6 +31,7 @@ class NetworkController; class SelfAwareness; class Trace; class Bond; +class PacketMultiplexer; /** * Holds global state for an instance of ZeroTier::Node @@ -77,6 +78,7 @@ class RuntimeEnvironment Topology *topology; SelfAwareness *sa; Bond *bc; + PacketMultiplexer *pm; // This node's identity and string representations thereof Identity identity; diff --git a/node/Switch.cpp b/node/Switch.cpp index 5ea1653c2a..871a55b244 100644 --- a/node/Switch.cpp +++ b/node/Switch.cpp @@ -519,7 +519,7 @@ void Switch::onLocalEthernet(void *tPtr,const SharedPtr &network,const RR->node->putFrame(tPtr, network->id(), network->userPtr(), peerMac, from, ZT_ETHERTYPE_IPV6, 0, adv, 72); }).detach(); - + return; // NDP emulation done. We have forged a "fake" reply, so no need to send actual NDP query. } // else no NDP emulation } // else no NDP emulation diff --git a/objects.mk b/objects.mk index d07578fb36..1d8a6c0a5b 100644 --- a/objects.mk +++ b/objects.mk @@ -29,7 +29,8 @@ CORE_OBJS=\ node/Topology.o \ node/Trace.o \ node/Utils.o \ - node/Bond.o + node/Bond.o \ + node/PacketMultiplexer.o ONE_OBJS=\ controller/EmbeddedNetworkController.o \ diff --git a/osdep/EthernetTap.cpp b/osdep/EthernetTap.cpp index 95ce54b0a4..cdc01f7b8b 100644 --- a/osdep/EthernetTap.cpp +++ b/osdep/EthernetTap.cpp @@ -58,7 +58,6 @@ namespace ZeroTier { std::shared_ptr EthernetTap::newInstance( const char *tapDeviceType, // OS-specific, NULL for default const char *homePath, - unsigned int concurrency, const MAC &mac, unsigned int mtu, unsigned int metric, @@ -89,11 +88,11 @@ std::shared_ptr EthernetTap::newInstance( return std::shared_ptr(new MacEthernetTap(homePath,mac,mtu,metric,nwid,friendlyName,handler,arg)); } } - } + }/ #endif // __APPLE__ #ifdef __LINUX__ - return std::shared_ptr(new LinuxEthernetTap(homePath,concurrency,mac,mtu,metric,nwid,friendlyName,handler,arg)); + return std::shared_ptr(new LinuxEthernetTap(homePath,mac,mtu,metric,nwid,friendlyName,handler,arg)); #endif // __LINUX__ #ifdef __WINDOWS__ diff --git a/osdep/EthernetTap.hpp b/osdep/EthernetTap.hpp index c5e82470c7..893e70c340 100644 --- a/osdep/EthernetTap.hpp +++ b/osdep/EthernetTap.hpp @@ -33,7 +33,6 @@ class EthernetTap static std::shared_ptr newInstance( const char *tapDeviceType, // OS-specific, NULL for default const char *homePath, - unsigned int concurrency, const MAC &mac, unsigned int mtu, unsigned int metric, diff --git a/osdep/LinuxEthernetTap.cpp b/osdep/LinuxEthernetTap.cpp index 81ebedc11e..160f4d76bb 100644 --- a/osdep/LinuxEthernetTap.cpp +++ b/osdep/LinuxEthernetTap.cpp @@ -111,7 +111,6 @@ static void _base32_5_to_8(const uint8_t *in,char *out) LinuxEthernetTap::LinuxEthernetTap( const char *homePath, - unsigned int concurrency, const MAC &mac, unsigned int mtu, unsigned int metric, @@ -128,7 +127,6 @@ LinuxEthernetTap::LinuxEthernetTap( _fd(0), _enabled(true), _run(true), - _concurrency(concurrency), _lastIfAddrsUpdate(0) { static std::mutex s_tapCreateLock; @@ -231,12 +229,27 @@ LinuxEthernetTap::LinuxEthernetTap( } } + int _concurrency = 1; + char* concurrencyVar = std::getenv("ZT_PACKET_PROCESSING_CONCURRENCY"); + if (concurrencyVar) { + int tmp = atoi(concurrencyVar); + if (tmp > 0) { + _concurrency = tmp; + } + else { + _concurrency = std::max((unsigned int)1,std::thread::hardware_concurrency() / 2); + } + } + else { + _concurrency = std::max((unsigned int)1,std::thread::hardware_concurrency() / 2); + } + for (unsigned int i = 0; i < _concurrency; ++i) { - _rxThreads.push_back(std::thread([this, i, _enablePinning] { + _rxThreads.push_back(std::thread([this, i, _concurrency, _enablePinning] { if (_enablePinning) { int pinCore = i % _concurrency; - fprintf(stderr, "pinning thread %d to core %d\n", i, pinCore); + fprintf(stderr, "pinning tap thread %d to core %d\n", i, pinCore); pthread_t self = pthread_self(); cpu_set_t cpuset; CPU_ZERO(&cpuset); @@ -244,7 +257,7 @@ LinuxEthernetTap::LinuxEthernetTap( int rc = pthread_setaffinity_np(self, sizeof(cpu_set_t), &cpuset); if (rc != 0) { - fprintf(stderr, "failed to pin thread %d to core %d: %s\n", i, pinCore, strerror(errno)); + fprintf(stderr, "failed to pin tap thread %d to core %d: %s\n", i, pinCore, strerror(errno)); exit(1); } } diff --git a/osdep/LinuxEthernetTap.hpp b/osdep/LinuxEthernetTap.hpp index b694b277c7..383c67b7f4 100644 --- a/osdep/LinuxEthernetTap.hpp +++ b/osdep/LinuxEthernetTap.hpp @@ -35,7 +35,6 @@ class LinuxEthernetTap : public EthernetTap public: LinuxEthernetTap( const char *homePath, - unsigned int _concurrency, const MAC &mac, unsigned int mtu, unsigned int metric, @@ -68,7 +67,6 @@ class LinuxEthernetTap : public EthernetTap std::string _dev; std::vector _multicastGroups; unsigned int _mtu; - unsigned int _concurrency; int _fd; int _shutdownSignalPipe[2]; std::atomic_bool _enabled; diff --git a/service/OneService.cpp b/service/OneService.cpp index 0287276bc1..169fd6f91a 100644 --- a/service/OneService.cpp +++ b/service/OneService.cpp @@ -46,6 +46,7 @@ #include "../node/SHA512.hpp" #include "../node/Bond.hpp" #include "../node/Peer.hpp" +#include "../node/PacketMultiplexer.hpp" #include "../osdep/Phy.hpp" #include "../osdep/OSUtils.hpp" @@ -986,7 +987,7 @@ class OneServiceImpl : public OneService #if defined(__LINUX__) || defined(__FreeBSD__) /* || defined(__APPLE__) */ if (rc != 0) { - fprintf(stderr, "failed to pin thread %d to core %d: %s\n", i, pinCore, strerror(errno)); + fprintf(stderr, "failed to pin rx thread %d to core %d: %s\n", i, pinCore, strerror(errno)); exit(1); } #endif @@ -3131,7 +3132,6 @@ class OneServiceImpl : public OneService n.setTap(EthernetTap::newInstance( nullptr, _homePath.c_str(), - _enableMulticore ? _rxThreadCount : 1, MAC(nwc->mac), nwc->mtu, (unsigned int)ZT_IF_METRIC, From b1a30ae4ff8cfe3516a7b56d5291afa82cd7403c Mon Sep 17 00:00:00 2001 From: Joseph Henry Date: Tue, 20 Aug 2024 13:39:15 -0700 Subject: [PATCH 04/13] Switch to local.conf-based config of multithreading --- node/IncomingPacket.cpp | 135 +++++++++++++++--------------- node/Node.cpp | 6 ++ node/Node.hpp | 9 ++ node/PacketMultiplexer.cpp | 45 ++++------ node/PacketMultiplexer.hpp | 4 +- service/OneService.cpp | 163 +++++++++++++++++-------------------- 6 files changed, 178 insertions(+), 184 deletions(-) diff --git a/node/IncomingPacket.cpp b/node/IncomingPacket.cpp index b9e5d8b26d..4d6d5ce8e5 100644 --- a/node/IncomingPacket.cpp +++ b/node/IncomingPacket.cpp @@ -794,71 +794,68 @@ bool IncomingPacket::_doFRAME(const RuntimeEnvironment *RR,void *tPtr,const Shar { Metrics::pkt_frame_in++; int32_t _flowId = ZT_QOS_NO_FLOW; - //if (peer->flowHashingSupported()) { - if (size() > ZT_PROTO_VERB_EXT_FRAME_IDX_PAYLOAD) { - const unsigned int etherType = at(ZT_PROTO_VERB_FRAME_IDX_ETHERTYPE); - const unsigned int frameLen = size() - ZT_PROTO_VERB_FRAME_IDX_PAYLOAD; - const uint8_t *const frameData = reinterpret_cast(data()) + ZT_PROTO_VERB_FRAME_IDX_PAYLOAD; - - if (etherType == ZT_ETHERTYPE_IPV4 && (frameLen >= 20)) { - uint16_t srcPort = 0; - uint16_t dstPort = 0; - uint8_t proto = (reinterpret_cast(frameData)[9]); - const unsigned int headerLen = 4 * (reinterpret_cast(frameData)[0] & 0xf); - switch(proto) { - case 0x01: // ICMP - //flowId = 0x01; - break; - // All these start with 16-bit source and destination port in that order - case 0x06: // TCP - case 0x11: // UDP - case 0x84: // SCTP - case 0x88: // UDPLite - if (frameLen > (headerLen + 4)) { - unsigned int pos = headerLen + 0; - srcPort = (reinterpret_cast(frameData)[pos++]) << 8; - srcPort |= (reinterpret_cast(frameData)[pos]); - pos++; - dstPort = (reinterpret_cast(frameData)[pos++]) << 8; - dstPort |= (reinterpret_cast(frameData)[pos]); - _flowId = dstPort ^ srcPort ^ proto; - } - break; - } - } - if (etherType == ZT_ETHERTYPE_IPV6 && (frameLen >= 40)) { - uint16_t srcPort = 0; - uint16_t dstPort = 0; - unsigned int pos; - unsigned int proto; - _ipv6GetPayload((const uint8_t *)frameData, frameLen, pos, proto); - switch(proto) { - case 0x3A: // ICMPv6 - //flowId = 0x3A; - break; - // All these start with 16-bit source and destination port in that order - case 0x06: // TCP - case 0x11: // UDP - case 0x84: // SCTP - case 0x88: // UDPLite - if (frameLen > (pos + 4)) { - srcPort = (reinterpret_cast(frameData)[pos++]) << 8; - srcPort |= (reinterpret_cast(frameData)[pos]); - pos++; - dstPort = (reinterpret_cast(frameData)[pos++]) << 8; - dstPort |= (reinterpret_cast(frameData)[pos]); - _flowId = dstPort ^ srcPort ^ proto; - } - break; - default: - break; - } + if (size() > ZT_PROTO_VERB_EXT_FRAME_IDX_PAYLOAD) { + const unsigned int etherType = at(ZT_PROTO_VERB_FRAME_IDX_ETHERTYPE); + const unsigned int frameLen = size() - ZT_PROTO_VERB_FRAME_IDX_PAYLOAD; + const uint8_t *const frameData = reinterpret_cast(data()) + ZT_PROTO_VERB_FRAME_IDX_PAYLOAD; + + if (etherType == ZT_ETHERTYPE_IPV4 && (frameLen >= 20)) { + uint16_t srcPort = 0; + uint16_t dstPort = 0; + uint8_t proto = (reinterpret_cast(frameData)[9]); + const unsigned int headerLen = 4 * (reinterpret_cast(frameData)[0] & 0xf); + switch(proto) { + case 0x01: // ICMP + //flowId = 0x01; + break; + // All these start with 16-bit source and destination port in that order + case 0x06: // TCP + case 0x11: // UDP + case 0x84: // SCTP + case 0x88: // UDPLite + if (frameLen > (headerLen + 4)) { + unsigned int pos = headerLen + 0; + srcPort = (reinterpret_cast(frameData)[pos++]) << 8; + srcPort |= (reinterpret_cast(frameData)[pos]); + pos++; + dstPort = (reinterpret_cast(frameData)[pos++]) << 8; + dstPort |= (reinterpret_cast(frameData)[pos]); + _flowId = dstPort ^ srcPort ^ proto; + } + break; } } - //} - //fprintf(stderr, "IncomingPacket::_doFRAME: flowId=%d\n", _flowId); + if (etherType == ZT_ETHERTYPE_IPV6 && (frameLen >= 40)) { + uint16_t srcPort = 0; + uint16_t dstPort = 0; + unsigned int pos; + unsigned int proto; + _ipv6GetPayload((const uint8_t *)frameData, frameLen, pos, proto); + switch(proto) { + case 0x3A: // ICMPv6 + //flowId = 0x3A; + break; + // All these start with 16-bit source and destination port in that order + case 0x06: // TCP + case 0x11: // UDP + case 0x84: // SCTP + case 0x88: // UDPLite + if (frameLen > (pos + 4)) { + srcPort = (reinterpret_cast(frameData)[pos++]) << 8; + srcPort |= (reinterpret_cast(frameData)[pos]); + pos++; + dstPort = (reinterpret_cast(frameData)[pos++]) << 8; + dstPort |= (reinterpret_cast(frameData)[pos]); + _flowId = dstPort ^ srcPort ^ proto; + } + break; + default: + break; + } + } + } const uint64_t nwid = at(ZT_PROTO_VERB_FRAME_IDX_NETWORK_ID); const SharedPtr network(RR->node->network(nwid)); @@ -872,8 +869,12 @@ bool IncomingPacket::_doFRAME(const RuntimeEnvironment *RR,void *tPtr,const Shar const unsigned int frameLen = size() - ZT_PROTO_VERB_FRAME_IDX_PAYLOAD; const uint8_t *const frameData = reinterpret_cast(data()) + ZT_PROTO_VERB_FRAME_IDX_PAYLOAD; if (network->filterIncomingPacket(tPtr,peer,RR->identity.address(),sourceMac,network->mac(),frameData,frameLen,etherType,0) > 0) { - //RR->node->putFrame(tPtr,nwid,network->userPtr(),sourceMac,network->mac(),etherType,0,(const void *)frameData,frameLen); - RR->pm->putFrame(tPtr,nwid,network->userPtr(),sourceMac,network->mac(),etherType,0,(const void *)frameData,frameLen, _flowId); + if (RR->node->getMultithreadingEnabled()) { + RR->pm->putFrame(tPtr,nwid,network->userPtr(),sourceMac,network->mac(),etherType,0,(const void *)frameData,frameLen, _flowId); + } + else { + RR->node->putFrame(tPtr,nwid,network->userPtr(),sourceMac,network->mac(),etherType,0,(const void *)frameData,frameLen); + } } } } else { @@ -946,8 +947,12 @@ bool IncomingPacket::_doEXT_FRAME(const RuntimeEnvironment *RR,void *tPtr,const } // fall through -- 2 means accept regardless of bridging checks or other restrictions case 2: - //RR->node->putFrame(tPtr,nwid,network->userPtr(),from,to,etherType,0,(const void *)frameData,frameLen); - RR->pm->putFrame(tPtr,nwid,network->userPtr(),from,to,etherType,0,(const void *)frameData,frameLen, flowId); + if (RR->node->getMultithreadingEnabled()) { + RR->pm->putFrame(tPtr,nwid,network->userPtr(),from,to,etherType,0,(const void *)frameData,frameLen, flowId); + } + else { + RR->node->putFrame(tPtr,nwid,network->userPtr(),from,to,etherType,0,(const void *)frameData,frameLen); + } break; } } diff --git a/node/Node.cpp b/node/Node.cpp index 50a3c3ff57..c7cb37df57 100644 --- a/node/Node.cpp +++ b/node/Node.cpp @@ -240,6 +240,12 @@ ZT_ResultCode Node::processVirtualNetworkFrame( } } +void Node::initMultithreading(bool isEnabled, unsigned int concurrency, bool cpuPinningEnabled) +{ + _multithreadingEnabled = isEnabled; + RR->pm->setUpPostDecodeReceiveThreads(concurrency, cpuPinningEnabled); +} + // Closure used to ping upstream and active/online peers class _PingPeersThatNeedPing { diff --git a/node/Node.hpp b/node/Node.hpp index 86d1aee575..da2d5427f0 100644 --- a/node/Node.hpp +++ b/node/Node.hpp @@ -283,6 +283,14 @@ class Node : public NetworkController::Sender return _lowBandwidthMode; } + inline bool getMultithreadingEnabled() + { + return _multithreadingEnabled; + } + + void initMultithreading(bool isEnabled, unsigned int concurrency, bool cpuPinningEnabled); + + public: RuntimeEnvironment _RR; RuntimeEnvironment *RR; @@ -331,6 +339,7 @@ class Node : public NetworkController::Sender volatile int64_t _prngState[2]; bool _online; bool _lowBandwidthMode; + bool _multithreadingEnabled; }; } // namespace ZeroTier diff --git a/node/PacketMultiplexer.cpp b/node/PacketMultiplexer.cpp index 6f559c4b86..401e1db9f9 100644 --- a/node/PacketMultiplexer.cpp +++ b/node/PacketMultiplexer.cpp @@ -21,6 +21,11 @@ namespace ZeroTier { +PacketMultiplexer::PacketMultiplexer(const RuntimeEnvironment* renv) +{ + RR = renv; +}; + void PacketMultiplexer::putFrame(void* tPtr, uint64_t nwid, void** nuptr, const MAC& source, const MAC& dest, unsigned int etherType, unsigned int vlanId, const void* data, unsigned int len, unsigned int flowId) { PacketRecord* packet; @@ -46,46 +51,26 @@ void PacketMultiplexer::putFrame(void* tPtr, uint64_t nwid, void** nuptr, const memcpy(packet->data, data, len); int bucket = flowId % _concurrency; - //fprintf(stderr, "bucket=%d\n", bucket); - _rxPacketQueues[bucket]->postLimit(packet, 2048); + _rxPacketQueues[bucket]->postLimit(packet, 256); } -PacketMultiplexer::PacketMultiplexer(const RuntimeEnvironment* renv) +void PacketMultiplexer::setUpPostDecodeReceiveThreads(unsigned int concurrency, bool cpuPinningEnabled) { - RR = renv; - bool _enablePinning = false; - char* pinningVar = std::getenv("ZT_CPU_PINNING"); - if (pinningVar) { - int tmp = atoi(pinningVar); - if (tmp > 0) { - _enablePinning = true; - } - } - - _concurrency = 1; - char* concurrencyVar = std::getenv("ZT_PACKET_PROCESSING_CONCURRENCY"); - if (concurrencyVar) { - int tmp = atoi(concurrencyVar); - if (tmp > 0) { - _concurrency = tmp; - } - else { - _concurrency = std::max((unsigned int)1, std::thread::hardware_concurrency() / 2); - } - } - else { - _concurrency = std::max((unsigned int)1, std::thread::hardware_concurrency() / 2); + if (! RR->node->getMultithreadingEnabled()) { + return; } + _concurrency = concurrency; + bool _enablePinning = cpuPinningEnabled; for (unsigned int i = 0; i < _concurrency; ++i) { - fprintf(stderr, "reserved queue for thread %d\n", i); + fprintf(stderr, "Reserved queue for thread %d\n", i); _rxPacketQueues.push_back(new BlockingQueue()); } // Each thread picks from its own queue to feed into the core for (unsigned int i = 0; i < _concurrency; ++i) { _rxThreads.push_back(std::thread([this, i, _enablePinning]() { - fprintf(stderr, "created post-decode packet ingestion thread %d\n", i); + fprintf(stderr, "Created post-decode packet ingestion thread %d\n", i); PacketRecord* packet = nullptr; for (;;) { @@ -96,7 +81,7 @@ PacketMultiplexer::PacketMultiplexer(const RuntimeEnvironment* renv) break; } - //fprintf(stderr, "popped packet from queue %d\n", i); + // fprintf(stderr, "popped packet from queue %d\n", i); MAC sourceMac = MAC(packet->source); MAC destMac = MAC(packet->dest); @@ -120,6 +105,6 @@ PacketMultiplexer::PacketMultiplexer(const RuntimeEnvironment* renv) } })); } -}; +} } // namespace ZeroTier \ No newline at end of file diff --git a/node/PacketMultiplexer.hpp b/node/PacketMultiplexer.hpp index 152bda320a..8cd592a10c 100644 --- a/node/PacketMultiplexer.hpp +++ b/node/PacketMultiplexer.hpp @@ -43,11 +43,13 @@ class PacketMultiplexer { PacketMultiplexer(const RuntimeEnvironment* renv); + void setUpPostDecodeReceiveThreads(unsigned int concurrency, bool cpuPinningEnabled); + void putFrame(void* tPtr, uint64_t nwid, void** nuptr, const MAC& source, const MAC& dest, unsigned int etherType, unsigned int vlanId, const void* data, unsigned int len, unsigned int flowId); std::vector*> _rxPacketQueues; - unsigned int _concurrency; + unsigned int _concurrency; // pool std::vector _rxPacketVector; std::vector _rxPacketThreads; diff --git a/service/OneService.cpp b/service/OneService.cpp index c783f01cde..799f314122 100644 --- a/service/OneService.cpp +++ b/service/OneService.cpp @@ -798,12 +798,13 @@ class OneServiceImpl : public OneService bool _serverThreadRunning; bool _serverThreadRunningV6; - unsigned int _rxThreadCount; BlockingQueue _rxPacketQueue; std::vector _rxPacketVector; std::vector _rxPacketThreads; Mutex _rxPacketVector_m,_rxPacketThreads_m; - bool _enableMulticore; + bool _multicoreEnabled; + bool _cpuPinningEnabled; + unsigned int _concurrency; bool _allowTcpFallbackRelay; bool _forceTcpRelay; @@ -938,89 +939,6 @@ class OneServiceImpl : public OneService _ports[1] = 0; _ports[2] = 0; - _enableMulticore = false; - char* multicoreVar = std::getenv("ZT_ENABLE_MULTICORE"); - if (multicoreVar) { - int tmp = atoi(multicoreVar); - if (tmp > 0) { - _enableMulticore = true; - } - } - if (_enableMulticore) { - bool _enablePinning = false; - char* pinningVar = std::getenv("ZT_CORE_PINNING"); - if (pinningVar) { - int tmp = atoi(pinningVar); - if (tmp > 0) { - _enablePinning = true; - } - } - char* concurrencyVar = std::getenv("ZT_CONCURRENCY"); - if (concurrencyVar) { - int tmp = atoi(concurrencyVar); - if (tmp > 0) { - _rxThreadCount = tmp; - } - else { - _rxThreadCount = std::thread::hardware_concurrency() >= 4 ? 2 : 1; - } - } - else { - _rxThreadCount = std::thread::hardware_concurrency() >= 4 ? 2 : 1; - } - fprintf(stderr, "using %d rx threads\n", _rxThreadCount); - for (unsigned int i = 0; i < _rxThreadCount; ++i) { - _rxPacketThreads.push_back(std::thread([this, i, _enablePinning]() { - - if (_enablePinning) { -#if defined(__LINUX__) || defined(__FreeBSD__) /* || defined(__APPLE__) */ - int pinCore = i % _rxThreadCount; - fprintf(stderr, "pinning thread %d to core %d\n", i, pinCore); - pthread_t self = pthread_self(); - cpu_set_t cpuset; - CPU_ZERO(&cpuset); - CPU_SET(pinCore, &cpuset); -#endif -#ifdef __LINUX__ - int rc = pthread_setaffinity_np(self, sizeof(cpu_set_t), &cpuset); -#elif __FreeBSD__ - int rc = pthread_setaffinity_np(self, sizeof(cpu_set_t), &cpuset); -#endif -#if defined(__LINUX__) || defined(__FreeBSD__) /* || defined(__APPLE__) */ - if (rc != 0) - { - fprintf(stderr, "failed to pin rx thread %d to core %d: %s\n", i, pinCore, strerror(errno)); - exit(1); - } -#endif - } - PacketRecord* packet = nullptr; - for (;;) { - if (! _rxPacketQueue.get(packet)) { - break; - } - if (! packet) { - break; - } - const ZT_ResultCode err = _node->processWirePacket(nullptr, packet->now, packet->sock, &(packet->from), packet->data, packet->size, &_nextBackgroundTaskDeadline); - { - Mutex::Lock l(_rxPacketVector_m); - _rxPacketVector.push_back(packet); - } - if (ZT_ResultCode_isFatal(err)) { - char tmp[256]; - OSUtils::ztsnprintf(tmp, sizeof(tmp), "error processing packet: %d", (int)err); - Mutex::Lock _l(_termReason_m); - _termReason = ONE_UNRECOVERABLE_ERROR; - _fatalErrorMessage = tmp; - this->terminate(); - break; - } - } - })); - } - } - prometheus::simpleapi::saver.set_registry(prometheus::simpleapi::registry_ptr); prometheus::simpleapi::saver.set_delay(std::chrono::seconds(5)); prometheus::simpleapi::saver.set_out_file(_homePath + ZT_PATH_SEPARATOR + "metrics.prom"); @@ -1071,6 +989,64 @@ class OneServiceImpl : public OneService delete _rc; } + void setUpMultithreading() + { + _node->initMultithreading(true, _concurrency, _cpuPinningEnabled); + bool pinning = _cpuPinningEnabled; + + fprintf(stderr, "Starting %d RX threads\n", _concurrency); + for (unsigned int i = 0; i < _concurrency; ++i) { + _rxPacketThreads.push_back(std::thread([this, i, pinning]() { + + if (pinning) { +#if defined(__LINUX__) || defined(__FreeBSD__) /* || defined(__APPLE__) */ + int pinCore = i % _concurrency; + fprintf(stderr, "CPU Pinning enabled. Pinning thread %d to core %d\n", i, pinCore); + pthread_t self = pthread_self(); + cpu_set_t cpuset; + CPU_ZERO(&cpuset); + CPU_SET(pinCore, &cpuset); +#endif +#ifdef __LINUX__ + int rc = pthread_setaffinity_np(self, sizeof(cpu_set_t), &cpuset); +#elif __FreeBSD__ + int rc = pthread_setaffinity_np(self, sizeof(cpu_set_t), &cpuset); +#endif +#if defined(__LINUX__) || defined(__FreeBSD__) /* || defined(__APPLE__) */ + if (rc != 0) + { + fprintf(stderr, "failed to pin rx thread %d to core %d: %s\n", i, pinCore, strerror(errno)); + exit(1); + } +#endif + } + PacketRecord* packet = nullptr; + for (;;) { + if (! _rxPacketQueue.get(packet)) { + break; + } + if (! packet) { + break; + } + const ZT_ResultCode err = _node->processWirePacket(nullptr, packet->now, packet->sock, &(packet->from), packet->data, packet->size, &_nextBackgroundTaskDeadline); + { + Mutex::Lock l(_rxPacketVector_m); + _rxPacketVector.push_back(packet); + } + if (ZT_ResultCode_isFatal(err)) { + char tmp[256]; + OSUtils::ztsnprintf(tmp, sizeof(tmp), "error processing packet: %d", (int)err); + Mutex::Lock _l(_termReason_m); + _termReason = ONE_UNRECOVERABLE_ERROR; + _fatalErrorMessage = tmp; + this->terminate(); + break; + } + } + })); + } + } + virtual ReasonForTermination run() { try { @@ -2672,7 +2648,18 @@ class OneServiceImpl : public OneService fprintf(stderr,"WARNING: using manually-specified secondary and/or tertiary ports. This can cause NAT issues." ZT_EOL_S); } _portMappingEnabled = OSUtils::jsonBool(settings["portMappingEnabled"],true); - _node->setLowBandwidthMode(OSUtils::jsonBool(settings["lowBandwidthMode"],false)); + _multicoreEnabled = OSUtils::jsonBool(settings["multicoreEnabled"],false); + _concurrency = OSUtils::jsonInt(settings["concurrency"],0); + _cpuPinningEnabled = OSUtils::jsonBool(settings["cpuPinningEnabled"],false); + if (_multicoreEnabled) { + unsigned int maxConcurrency = std::thread::hardware_concurrency(); + if (_concurrency <= 1 || _concurrency >= maxConcurrency) { + unsigned int conservativeDefault = (std::thread::hardware_concurrency() >= 4 ? 2 : 1); + fprintf(stderr, "Concurrency level provided (%d) is invalid, assigning conservative default value of (%d)\n", _concurrency, conservativeDefault); + _concurrency = conservativeDefault; + } + setUpMultithreading(); + } #ifndef ZT_SDK const std::string up(OSUtils::jsonString(settings["softwareUpdate"],ZT_SOFTWARE_UPDATE_DEFAULT)); @@ -3001,7 +2988,7 @@ class OneServiceImpl : public OneService _lastDirectReceiveFromGlobal = now; } - if (_enableMulticore) { + if (_multicoreEnabled) { PacketRecord* packet; _rxPacketVector_m.lock(); if (_rxPacketVector.empty()) { @@ -3018,7 +3005,7 @@ class OneServiceImpl : public OneService memcpy(&(packet->from), from, sizeof(struct sockaddr_storage)); packet->size = (unsigned int)len; memcpy(packet->data, data, len); - _rxPacketQueue.postLimit(packet, 256 * _rxThreadCount); + _rxPacketQueue.postLimit(packet, 256 * _concurrency); } else { const ZT_ResultCode rc = _node->processWirePacket(nullptr,now,reinterpret_cast(sock),reinterpret_cast(from),data,len,&_nextBackgroundTaskDeadline); From b7d9290de983562ee5f2f8c8922ea2d26a6aae31 Mon Sep 17 00:00:00 2001 From: Joseph Henry Date: Wed, 21 Aug 2024 11:22:07 -0700 Subject: [PATCH 05/13] Fix build issue on macOS --- osdep/EthernetTap.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/osdep/EthernetTap.cpp b/osdep/EthernetTap.cpp index 4b695988fc..02fab3d07c 100644 --- a/osdep/EthernetTap.cpp +++ b/osdep/EthernetTap.cpp @@ -88,7 +88,7 @@ std::shared_ptr EthernetTap::newInstance( return std::shared_ptr(new MacEthernetTap(homePath,mac,mtu,metric,nwid,friendlyName,handler,arg)); } } - }/ + } #endif // __APPLE__ #ifdef __LINUX__ From e734019216d531263702266d83b7fd87040f3f0b Mon Sep 17 00:00:00 2001 From: Joseph Henry Date: Wed, 21 Aug 2024 14:06:25 -0700 Subject: [PATCH 06/13] More platform-related build fixes --- osdep/EthernetTap.cpp | 10 ++++++---- osdep/EthernetTap.hpp | 2 ++ osdep/LinuxEthernetTap.cpp | 38 ++++++++------------------------------ osdep/LinuxEthernetTap.hpp | 2 ++ service/OneService.cpp | 2 ++ 5 files changed, 20 insertions(+), 34 deletions(-) diff --git a/osdep/EthernetTap.cpp b/osdep/EthernetTap.cpp index 02fab3d07c..4395bc4042 100644 --- a/osdep/EthernetTap.cpp +++ b/osdep/EthernetTap.cpp @@ -57,6 +57,8 @@ namespace ZeroTier { std::shared_ptr EthernetTap::newInstance( const char *tapDeviceType, // OS-specific, NULL for default + unsigned int concurrency, + bool pinning, const char *homePath, const MAC &mac, unsigned int mtu, @@ -92,7 +94,7 @@ std::shared_ptr EthernetTap::newInstance( #endif // __APPLE__ #ifdef __LINUX__ - return std::shared_ptr(new LinuxEthernetTap(homePath,mac,mtu,metric,nwid,friendlyName,handler,arg)); + return std::shared_ptr(new LinuxEthernetTap(homePath,concurrency,pinning,mac,mtu,metric,nwid,friendlyName,handler,arg)); #endif // __LINUX__ #ifdef __WINDOWS__ @@ -126,15 +128,15 @@ std::shared_ptr EthernetTap::newInstance( _comInit = true; } } - return std::shared_ptr(new WindowsEthernetTap(homePath,concurrency,mac,mtu,metric,nwid,friendlyName,handler,arg)); + return std::shared_ptr(new WindowsEthernetTap(homePath,mac,mtu,metric,nwid,friendlyName,handler,arg)); #endif // __WINDOWS__ #ifdef __FreeBSD__ - return std::shared_ptr(new BSDEthernetTap(homePath,concurrency,mac,mtu,metric,nwid,friendlyName,handler,arg)); + return std::shared_ptr(new BSDEthernetTap(homePath,concurrency,pinning,mac,mtu,metric,nwid,friendlyName,handler,arg)); #endif // __FreeBSD__ #ifdef __NetBSD__ - return std::shared_ptr(new NetBSDEthernetTap(homePath,concurrency,mac,mtu,metric,nwid,friendlyName,handler,arg)); + return std::shared_ptr(new NetBSDEthernetTap(homePath,mac,mtu,metric,nwid,friendlyName,handler,arg)); #endif // __NetBSD__ #ifdef __OpenBSD__ diff --git a/osdep/EthernetTap.hpp b/osdep/EthernetTap.hpp index e6833c33d2..1d97f12563 100644 --- a/osdep/EthernetTap.hpp +++ b/osdep/EthernetTap.hpp @@ -32,6 +32,8 @@ class EthernetTap public: static std::shared_ptr newInstance( const char *tapDeviceType, // OS-specific, NULL for default + unsigned int concurrency, + bool pinning, const char *homePath, const MAC &mac, unsigned int mtu, diff --git a/osdep/LinuxEthernetTap.cpp b/osdep/LinuxEthernetTap.cpp index 7479ca7422..14929d1768 100644 --- a/osdep/LinuxEthernetTap.cpp +++ b/osdep/LinuxEthernetTap.cpp @@ -111,6 +111,8 @@ static void _base32_5_to_8(const uint8_t *in,char *out) LinuxEthernetTap::LinuxEthernetTap( const char *homePath, + unsigned int concurrency, + bool pinning, const MAC &mac, unsigned int mtu, unsigned int metric, @@ -220,36 +222,12 @@ LinuxEthernetTap::LinuxEthernetTap( (void)::pipe(_shutdownSignalPipe); - bool _enablePinning = false; - char* envvar = std::getenv("ZT_CORE_PINNING"); - if (envvar) { - int tmp = atoi(envvar); - if (tmp > 0) { - _enablePinning = true; - } - } - - int _concurrency = 1; - char* concurrencyVar = std::getenv("ZT_PACKET_PROCESSING_CONCURRENCY"); - if (concurrencyVar) { - int tmp = atoi(concurrencyVar); - if (tmp > 0) { - _concurrency = tmp; - } - else { - _concurrency = std::max((unsigned int)1,std::thread::hardware_concurrency() / 2); - } - } - else { - _concurrency = std::max((unsigned int)1,std::thread::hardware_concurrency() / 2); - } - - for (unsigned int i = 0; i < _concurrency; ++i) { - _rxThreads.push_back(std::thread([this, i, _concurrency, _enablePinning] { + for (unsigned int i = 0; i < concurrency; ++i) { + _rxThreads.push_back(std::thread([this, i, concurrency, pinning] { - if (_enablePinning) { - int pinCore = i % _concurrency; - fprintf(stderr, "pinning tap thread %d to core %d\n", i, pinCore); + if (pinning) { + int pinCore = i % concurrency; + fprintf(stderr, "Pinning tap thread %d to core %d\n", i, pinCore); pthread_t self = pthread_self(); cpu_set_t cpuset; CPU_ZERO(&cpuset); @@ -257,7 +235,7 @@ LinuxEthernetTap::LinuxEthernetTap( int rc = pthread_setaffinity_np(self, sizeof(cpu_set_t), &cpuset); if (rc != 0) { - fprintf(stderr, "failed to pin tap thread %d to core %d: %s\n", i, pinCore, strerror(errno)); + fprintf(stderr, "Failed to pin tap thread %d to core %d: %s\n", i, pinCore, strerror(errno)); exit(1); } } diff --git a/osdep/LinuxEthernetTap.hpp b/osdep/LinuxEthernetTap.hpp index 6406d88d8f..41e2998238 100644 --- a/osdep/LinuxEthernetTap.hpp +++ b/osdep/LinuxEthernetTap.hpp @@ -35,6 +35,8 @@ class LinuxEthernetTap : public EthernetTap public: LinuxEthernetTap( const char *homePath, + unsigned int concurrency, + bool pinning, const MAC &mac, unsigned int mtu, unsigned int metric, diff --git a/service/OneService.cpp b/service/OneService.cpp index 799f314122..acce0ec9f0 100644 --- a/service/OneService.cpp +++ b/service/OneService.cpp @@ -3239,6 +3239,8 @@ class OneServiceImpl : public OneService n.setTap(EthernetTap::newInstance( nullptr, + _concurrency, + _cpuPinningEnabled, _homePath.c_str(), MAC(nwc->mac), nwc->mtu, From b813ea70a5f9e7c2e91ec68461fdd132e8fc543a Mon Sep 17 00:00:00 2001 From: Joseph Henry Date: Thu, 22 Aug 2024 12:59:06 -0700 Subject: [PATCH 07/13] Simplify packet critical path. Plus more platform fixes --- node/IncomingPacket.cpp | 14 ++------------ node/Node.cpp | 3 +-- node/Node.hpp | 8 +------- node/PacketMultiplexer.cpp | 18 +++++++++++++++--- node/PacketMultiplexer.hpp | 1 + osdep/EthernetTap.cpp | 2 +- service/OneService.cpp | 12 ++++++++++-- 7 files changed, 31 insertions(+), 27 deletions(-) diff --git a/node/IncomingPacket.cpp b/node/IncomingPacket.cpp index 4d6d5ce8e5..318b057303 100644 --- a/node/IncomingPacket.cpp +++ b/node/IncomingPacket.cpp @@ -869,12 +869,7 @@ bool IncomingPacket::_doFRAME(const RuntimeEnvironment *RR,void *tPtr,const Shar const unsigned int frameLen = size() - ZT_PROTO_VERB_FRAME_IDX_PAYLOAD; const uint8_t *const frameData = reinterpret_cast(data()) + ZT_PROTO_VERB_FRAME_IDX_PAYLOAD; if (network->filterIncomingPacket(tPtr,peer,RR->identity.address(),sourceMac,network->mac(),frameData,frameLen,etherType,0) > 0) { - if (RR->node->getMultithreadingEnabled()) { - RR->pm->putFrame(tPtr,nwid,network->userPtr(),sourceMac,network->mac(),etherType,0,(const void *)frameData,frameLen, _flowId); - } - else { - RR->node->putFrame(tPtr,nwid,network->userPtr(),sourceMac,network->mac(),etherType,0,(const void *)frameData,frameLen); - } + RR->pm->putFrame(tPtr,nwid,network->userPtr(),sourceMac,network->mac(),etherType,0,(const void *)frameData,frameLen, _flowId); } } } else { @@ -947,12 +942,7 @@ bool IncomingPacket::_doEXT_FRAME(const RuntimeEnvironment *RR,void *tPtr,const } // fall through -- 2 means accept regardless of bridging checks or other restrictions case 2: - if (RR->node->getMultithreadingEnabled()) { - RR->pm->putFrame(tPtr,nwid,network->userPtr(),from,to,etherType,0,(const void *)frameData,frameLen, flowId); - } - else { - RR->node->putFrame(tPtr,nwid,network->userPtr(),from,to,etherType,0,(const void *)frameData,frameLen); - } + RR->pm->putFrame(tPtr,nwid,network->userPtr(),from,to,etherType,0,(const void *)frameData,frameLen, flowId); break; } } diff --git a/node/Node.cpp b/node/Node.cpp index c7cb37df57..1f377c5458 100644 --- a/node/Node.cpp +++ b/node/Node.cpp @@ -240,9 +240,8 @@ ZT_ResultCode Node::processVirtualNetworkFrame( } } -void Node::initMultithreading(bool isEnabled, unsigned int concurrency, bool cpuPinningEnabled) +void Node::initMultithreading(unsigned int concurrency, bool cpuPinningEnabled) { - _multithreadingEnabled = isEnabled; RR->pm->setUpPostDecodeReceiveThreads(concurrency, cpuPinningEnabled); } diff --git a/node/Node.hpp b/node/Node.hpp index da2d5427f0..f9d05483a9 100644 --- a/node/Node.hpp +++ b/node/Node.hpp @@ -283,12 +283,7 @@ class Node : public NetworkController::Sender return _lowBandwidthMode; } - inline bool getMultithreadingEnabled() - { - return _multithreadingEnabled; - } - - void initMultithreading(bool isEnabled, unsigned int concurrency, bool cpuPinningEnabled); + void initMultithreading(unsigned int concurrency, bool cpuPinningEnabled); public: @@ -339,7 +334,6 @@ class Node : public NetworkController::Sender volatile int64_t _prngState[2]; bool _online; bool _lowBandwidthMode; - bool _multithreadingEnabled; }; } // namespace ZeroTier diff --git a/node/PacketMultiplexer.cpp b/node/PacketMultiplexer.cpp index 401e1db9f9..9035ccc0c3 100644 --- a/node/PacketMultiplexer.cpp +++ b/node/PacketMultiplexer.cpp @@ -15,6 +15,7 @@ #include "Node.hpp" #include "RuntimeEnvironment.hpp" +#include "Constants.hpp" #include #include @@ -28,6 +29,16 @@ PacketMultiplexer::PacketMultiplexer(const RuntimeEnvironment* renv) void PacketMultiplexer::putFrame(void* tPtr, uint64_t nwid, void** nuptr, const MAC& source, const MAC& dest, unsigned int etherType, unsigned int vlanId, const void* data, unsigned int len, unsigned int flowId) { +#if defined(__APPLE__) || defined(__OpenBSD__) || defined(__NetBSD__) || defined(__WINDOWS__) + RR->node->putFrame(tPtr,nwid,nuptr,source,dest,etherType,vlanId,(const void *)data,len); + return; +#endif + + if (!_enabled) { + RR->node->putFrame(tPtr,nwid,nuptr,source,dest,etherType,vlanId,(const void *)data,len); + return; + } + PacketRecord* packet; _rxPacketVector_m.lock(); if (_rxPacketVector.empty()) { @@ -56,9 +67,10 @@ void PacketMultiplexer::putFrame(void* tPtr, uint64_t nwid, void** nuptr, const void PacketMultiplexer::setUpPostDecodeReceiveThreads(unsigned int concurrency, bool cpuPinningEnabled) { - if (! RR->node->getMultithreadingEnabled()) { - return; - } +#if defined(__APPLE__) || defined(__OpenBSD__) || defined(__NetBSD__) || defined(__WINDOWS__) + return; +#endif + _enabled = true; _concurrency = concurrency; bool _enablePinning = cpuPinningEnabled; diff --git a/node/PacketMultiplexer.hpp b/node/PacketMultiplexer.hpp index 8cd592a10c..4753180ed7 100644 --- a/node/PacketMultiplexer.hpp +++ b/node/PacketMultiplexer.hpp @@ -57,6 +57,7 @@ class PacketMultiplexer { std::vector _rxThreads; unsigned int _rxThreadCount; + bool _enabled; }; } // namespace ZeroTier diff --git a/osdep/EthernetTap.cpp b/osdep/EthernetTap.cpp index 4395bc4042..0be209ecd4 100644 --- a/osdep/EthernetTap.cpp +++ b/osdep/EthernetTap.cpp @@ -140,7 +140,7 @@ std::shared_ptr EthernetTap::newInstance( #endif // __NetBSD__ #ifdef __OpenBSD__ - return std::shared_ptr(new BSDEthernetTap(homePath,concurrency,mac,mtu,metric,nwid,friendlyName,handler,arg)); + return std::shared_ptr(new BSDEthernetTap(homePath,mac,mtu,metric,nwid,friendlyName,handler,arg)); #endif // __OpenBSD__ #endif // ZT_SDK? diff --git a/service/OneService.cpp b/service/OneService.cpp index acce0ec9f0..8d4b5bdfd3 100644 --- a/service/OneService.cpp +++ b/service/OneService.cpp @@ -991,7 +991,10 @@ class OneServiceImpl : public OneService void setUpMultithreading() { - _node->initMultithreading(true, _concurrency, _cpuPinningEnabled); +#if defined(__APPLE__) || defined(__OpenBSD__) || defined(__NetBSD__) || defined(__WINDOWS__) + return; +#endif + _node->initMultithreading(_concurrency, _cpuPinningEnabled); bool pinning = _cpuPinningEnabled; fprintf(stderr, "Starting %d RX threads\n", _concurrency); @@ -2648,6 +2651,7 @@ class OneServiceImpl : public OneService fprintf(stderr,"WARNING: using manually-specified secondary and/or tertiary ports. This can cause NAT issues." ZT_EOL_S); } _portMappingEnabled = OSUtils::jsonBool(settings["portMappingEnabled"],true); +#if defined(__LINUX__) || defined(__FreeBSD__) _multicoreEnabled = OSUtils::jsonBool(settings["multicoreEnabled"],false); _concurrency = OSUtils::jsonInt(settings["concurrency"],0); _cpuPinningEnabled = OSUtils::jsonBool(settings["cpuPinningEnabled"],false); @@ -2660,6 +2664,7 @@ class OneServiceImpl : public OneService } setUpMultithreading(); } +#endif #ifndef ZT_SDK const std::string up(OSUtils::jsonString(settings["softwareUpdate"],ZT_SOFTWARE_UPDATE_DEFAULT)); @@ -2987,7 +2992,7 @@ class OneServiceImpl : public OneService if ((len >= 16) && (reinterpret_cast(from)->ipScope() == InetAddress::IP_SCOPE_GLOBAL)) { _lastDirectReceiveFromGlobal = now; } - +#if defined(__LINUX__) || defined(__FreeBSD__) if (_multicoreEnabled) { PacketRecord* packet; _rxPacketVector_m.lock(); @@ -3008,6 +3013,7 @@ class OneServiceImpl : public OneService _rxPacketQueue.postLimit(packet, 256 * _concurrency); } else { +#endif const ZT_ResultCode rc = _node->processWirePacket(nullptr,now,reinterpret_cast(sock),reinterpret_cast(from),data,len,&_nextBackgroundTaskDeadline); if (ZT_ResultCode_isFatal(rc)) { char tmp[256]; @@ -3017,7 +3023,9 @@ class OneServiceImpl : public OneService _fatalErrorMessage = tmp; this->terminate(); } +#if defined(__LINUX__) || defined(__FreeBSD__) } +#endif } From c97943d69df74ca48feb5c44f570ac91817dd3c2 Mon Sep 17 00:00:00 2001 From: Joseph Henry Date: Thu, 22 Aug 2024 15:28:25 -0700 Subject: [PATCH 08/13] Add packet mux source file to Windows project --- windows/ZeroTierOne/ZeroTierOne.vcxproj | 1 + 1 file changed, 1 insertion(+) diff --git a/windows/ZeroTierOne/ZeroTierOne.vcxproj b/windows/ZeroTierOne/ZeroTierOne.vcxproj index fcd8b56e2b..2dd05aa577 100644 --- a/windows/ZeroTierOne/ZeroTierOne.vcxproj +++ b/windows/ZeroTierOne/ZeroTierOne.vcxproj @@ -88,6 +88,7 @@ + From 95983ba168963fb9b67346022cf2aef42bd35808 Mon Sep 17 00:00:00 2001 From: Joseph Henry Date: Mon, 26 Aug 2024 12:54:36 -0700 Subject: [PATCH 09/13] Build fix for FreeBSD --- osdep/BSDEthernetTap.cpp | 19 ++++++------------- osdep/BSDEthernetTap.hpp | 2 ++ 2 files changed, 8 insertions(+), 13 deletions(-) diff --git a/osdep/BSDEthernetTap.cpp b/osdep/BSDEthernetTap.cpp index 2a8aac0fbc..1a240c1a13 100644 --- a/osdep/BSDEthernetTap.cpp +++ b/osdep/BSDEthernetTap.cpp @@ -65,6 +65,7 @@ namespace ZeroTier { BSDEthernetTap::BSDEthernetTap( const char *homePath, unsigned int concurrency, + bool pinning, const MAC &mac, unsigned int mtu, unsigned int metric, @@ -74,6 +75,7 @@ BSDEthernetTap::BSDEthernetTap( void *arg) : _handler(handler), _concurrency(concurrency), + _pinning(pinning), _arg(arg), _nwid(nwid), _mtu(mtu), @@ -425,25 +427,16 @@ void BSDEthernetTap::setMtu(unsigned int mtu) void BSDEthernetTap::threadMain() throw() { - bool _enablePinning = false; - char* envvar = std::getenv("ZT_CPU_PINNING"); - if (envvar) { - int tmp = atoi(envvar); - if (tmp > 0) { - _enablePinning = true; - } - } - // Wait for a moment after startup -- wait for Network to finish // constructing itself. Thread::sleep(500); for (unsigned int i = 0; i < _concurrency; ++i) { - _rxThreads.push_back(std::thread([this, i, _enablePinning] { + _rxThreads.push_back(std::thread([this, i, _pinning] { - if (_enablePinning) { + if (_pinning) { int pinCore = i % _concurrency; - fprintf(stderr, "pinning thread %d to core %d\n", i, pinCore); + fprintf(stderr, "Pinning thread %d to core %d\n", i, pinCore); pthread_t self = pthread_self(); cpu_set_t cpuset; CPU_ZERO(&cpuset); @@ -452,7 +445,7 @@ void BSDEthernetTap::threadMain() int rc = pthread_setaffinity_np(self, sizeof(cpu_set_t), &cpuset); if (rc != 0) { - fprintf(stderr, "failed to pin thread %d to core %d: %s\n", i, pinCore, strerror(errno)); + fprintf(stderr, "Failed to pin thread %d to core %d: %s\n", i, pinCore, strerror(errno)); exit(1); } } diff --git a/osdep/BSDEthernetTap.hpp b/osdep/BSDEthernetTap.hpp index 4d1a42429c..50e2e6e8bc 100644 --- a/osdep/BSDEthernetTap.hpp +++ b/osdep/BSDEthernetTap.hpp @@ -36,6 +36,7 @@ class BSDEthernetTap : public EthernetTap BSDEthernetTap( const char *homePath, unsigned int concurrency, + bool pinning, const MAC &mac, unsigned int mtu, unsigned int metric, @@ -65,6 +66,7 @@ class BSDEthernetTap : public EthernetTap void (*_handler)(void *,void *,uint64_t,const MAC &,const MAC &,unsigned int,unsigned int,const void *,unsigned int); void *_arg; unsigned int _concurrency; + bool _pinning; uint64_t _nwid; Thread _thread; std::string _dev; From e42848d37dbb54c5bf8ac8474c0faed6d464bd00 Mon Sep 17 00:00:00 2001 From: Joseph Henry Date: Wed, 4 Sep 2024 11:00:23 -0700 Subject: [PATCH 10/13] Fix bug for when no multithreaded config is given --- service/OneService.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/service/OneService.cpp b/service/OneService.cpp index 8d4b5bdfd3..5a05c72cec 100644 --- a/service/OneService.cpp +++ b/service/OneService.cpp @@ -2653,7 +2653,7 @@ class OneServiceImpl : public OneService _portMappingEnabled = OSUtils::jsonBool(settings["portMappingEnabled"],true); #if defined(__LINUX__) || defined(__FreeBSD__) _multicoreEnabled = OSUtils::jsonBool(settings["multicoreEnabled"],false); - _concurrency = OSUtils::jsonInt(settings["concurrency"],0); + _concurrency = OSUtils::jsonInt(settings["concurrency"],1); _cpuPinningEnabled = OSUtils::jsonBool(settings["cpuPinningEnabled"],false); if (_multicoreEnabled) { unsigned int maxConcurrency = std::thread::hardware_concurrency(); From c86b91c5f00811c2a9f080e9e2e74ac2cb7af8c5 Mon Sep 17 00:00:00 2001 From: Joseph Henry Date: Wed, 4 Sep 2024 12:14:20 -0700 Subject: [PATCH 11/13] Fix condition where settings may be applied with multithreading is disabled --- service/OneService.cpp | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/service/OneService.cpp b/service/OneService.cpp index 5a05c72cec..ff79c6f855 100644 --- a/service/OneService.cpp +++ b/service/OneService.cpp @@ -2664,6 +2664,11 @@ class OneServiceImpl : public OneService } setUpMultithreading(); } + else { + // Force values in case the user accidentally defined them with multicore disabled + _concurrency = 1; + _cpuPinningEnabled = false; + } #endif #ifndef ZT_SDK From 059d05f41f8561aa5f92759b1cdef51723865291 Mon Sep 17 00:00:00 2001 From: Joseph Henry Date: Fri, 6 Sep 2024 09:52:39 -0700 Subject: [PATCH 12/13] Comment out vestigial test code (improves mt performance) --- node/PacketMultiplexer.cpp | 2 +- service/OneService.cpp | 16 ++++++++++------ 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/node/PacketMultiplexer.cpp b/node/PacketMultiplexer.cpp index 9035ccc0c3..a1dc835a10 100644 --- a/node/PacketMultiplexer.cpp +++ b/node/PacketMultiplexer.cpp @@ -62,7 +62,7 @@ void PacketMultiplexer::putFrame(void* tPtr, uint64_t nwid, void** nuptr, const memcpy(packet->data, data, len); int bucket = flowId % _concurrency; - _rxPacketQueues[bucket]->postLimit(packet, 256); + _rxPacketQueues[bucket]->postLimit(packet, 2048); } void PacketMultiplexer::setUpPostDecodeReceiveThreads(unsigned int concurrency, bool cpuPinningEnabled) diff --git a/service/OneService.cpp b/service/OneService.cpp index ff79c6f855..1544b81507 100644 --- a/service/OneService.cpp +++ b/service/OneService.cpp @@ -997,12 +997,13 @@ class OneServiceImpl : public OneService _node->initMultithreading(_concurrency, _cpuPinningEnabled); bool pinning = _cpuPinningEnabled; +/* fprintf(stderr, "Starting %d RX threads\n", _concurrency); for (unsigned int i = 0; i < _concurrency; ++i) { _rxPacketThreads.push_back(std::thread([this, i, pinning]() { if (pinning) { -#if defined(__LINUX__) || defined(__FreeBSD__) /* || defined(__APPLE__) */ +#if defined(__LINUX__) || defined(__FreeBSD__) // || defined(__APPLE__) int pinCore = i % _concurrency; fprintf(stderr, "CPU Pinning enabled. Pinning thread %d to core %d\n", i, pinCore); pthread_t self = pthread_self(); @@ -1015,7 +1016,7 @@ class OneServiceImpl : public OneService #elif __FreeBSD__ int rc = pthread_setaffinity_np(self, sizeof(cpu_set_t), &cpuset); #endif -#if defined(__LINUX__) || defined(__FreeBSD__) /* || defined(__APPLE__) */ +#if defined(__LINUX__) || defined(__FreeBSD__) // || defined(__APPLE__) if (rc != 0) { fprintf(stderr, "failed to pin rx thread %d to core %d: %s\n", i, pinCore, strerror(errno)); @@ -1048,6 +1049,7 @@ class OneServiceImpl : public OneService } })); } + */ } virtual ReasonForTermination run() @@ -2997,6 +2999,7 @@ class OneServiceImpl : public OneService if ((len >= 16) && (reinterpret_cast(from)->ipScope() == InetAddress::IP_SCOPE_GLOBAL)) { _lastDirectReceiveFromGlobal = now; } + /* #if defined(__LINUX__) || defined(__FreeBSD__) if (_multicoreEnabled) { PacketRecord* packet; @@ -3015,10 +3018,11 @@ class OneServiceImpl : public OneService memcpy(&(packet->from), from, sizeof(struct sockaddr_storage)); packet->size = (unsigned int)len; memcpy(packet->data, data, len); - _rxPacketQueue.postLimit(packet, 256 * _concurrency); + _rxPacketQueue.postLimit(packet, 2048 * _concurrency); } else { #endif +*/ const ZT_ResultCode rc = _node->processWirePacket(nullptr,now,reinterpret_cast(sock),reinterpret_cast(from),data,len,&_nextBackgroundTaskDeadline); if (ZT_ResultCode_isFatal(rc)) { char tmp[256]; @@ -3028,9 +3032,9 @@ class OneServiceImpl : public OneService _fatalErrorMessage = tmp; this->terminate(); } -#if defined(__LINUX__) || defined(__FreeBSD__) - } -#endif +//#if defined(__LINUX__) || defined(__FreeBSD__) +// } +//#endif } From 8f5cc4ed333905fd15b62418a64815fe8a3f4068 Mon Sep 17 00:00:00 2001 From: Joseph Henry Date: Fri, 6 Sep 2024 15:19:06 -0700 Subject: [PATCH 13/13] Completely remove vestigial RX code --- service/OneService.cpp | 97 ++++-------------------------------------- 1 file changed, 8 insertions(+), 89 deletions(-) diff --git a/service/OneService.cpp b/service/OneService.cpp index 1544b81507..b607f014ac 100644 --- a/service/OneService.cpp +++ b/service/OneService.cpp @@ -996,60 +996,6 @@ class OneServiceImpl : public OneService #endif _node->initMultithreading(_concurrency, _cpuPinningEnabled); bool pinning = _cpuPinningEnabled; - -/* - fprintf(stderr, "Starting %d RX threads\n", _concurrency); - for (unsigned int i = 0; i < _concurrency; ++i) { - _rxPacketThreads.push_back(std::thread([this, i, pinning]() { - - if (pinning) { -#if defined(__LINUX__) || defined(__FreeBSD__) // || defined(__APPLE__) - int pinCore = i % _concurrency; - fprintf(stderr, "CPU Pinning enabled. Pinning thread %d to core %d\n", i, pinCore); - pthread_t self = pthread_self(); - cpu_set_t cpuset; - CPU_ZERO(&cpuset); - CPU_SET(pinCore, &cpuset); -#endif -#ifdef __LINUX__ - int rc = pthread_setaffinity_np(self, sizeof(cpu_set_t), &cpuset); -#elif __FreeBSD__ - int rc = pthread_setaffinity_np(self, sizeof(cpu_set_t), &cpuset); -#endif -#if defined(__LINUX__) || defined(__FreeBSD__) // || defined(__APPLE__) - if (rc != 0) - { - fprintf(stderr, "failed to pin rx thread %d to core %d: %s\n", i, pinCore, strerror(errno)); - exit(1); - } -#endif - } - PacketRecord* packet = nullptr; - for (;;) { - if (! _rxPacketQueue.get(packet)) { - break; - } - if (! packet) { - break; - } - const ZT_ResultCode err = _node->processWirePacket(nullptr, packet->now, packet->sock, &(packet->from), packet->data, packet->size, &_nextBackgroundTaskDeadline); - { - Mutex::Lock l(_rxPacketVector_m); - _rxPacketVector.push_back(packet); - } - if (ZT_ResultCode_isFatal(err)) { - char tmp[256]; - OSUtils::ztsnprintf(tmp, sizeof(tmp), "error processing packet: %d", (int)err); - Mutex::Lock _l(_termReason_m); - _termReason = ONE_UNRECOVERABLE_ERROR; - _fatalErrorMessage = tmp; - this->terminate(); - break; - } - } - })); - } - */ } virtual ReasonForTermination run() @@ -2999,42 +2945,15 @@ class OneServiceImpl : public OneService if ((len >= 16) && (reinterpret_cast(from)->ipScope() == InetAddress::IP_SCOPE_GLOBAL)) { _lastDirectReceiveFromGlobal = now; } - /* -#if defined(__LINUX__) || defined(__FreeBSD__) - if (_multicoreEnabled) { - PacketRecord* packet; - _rxPacketVector_m.lock(); - if (_rxPacketVector.empty()) { - packet = new PacketRecord; - } - else { - packet = _rxPacketVector.back(); - _rxPacketVector.pop_back(); - } - _rxPacketVector_m.unlock(); - - packet->sock = reinterpret_cast(sock); - packet->now = now; - memcpy(&(packet->from), from, sizeof(struct sockaddr_storage)); - packet->size = (unsigned int)len; - memcpy(packet->data, data, len); - _rxPacketQueue.postLimit(packet, 2048 * _concurrency); + const ZT_ResultCode rc = _node->processWirePacket(nullptr,now,reinterpret_cast(sock),reinterpret_cast(from),data,len,&_nextBackgroundTaskDeadline); + if (ZT_ResultCode_isFatal(rc)) { + char tmp[256]; + OSUtils::ztsnprintf(tmp,sizeof(tmp),"fatal error code from processWirePacket: %d",(int)rc); + Mutex::Lock _l(_termReason_m); + _termReason = ONE_UNRECOVERABLE_ERROR; + _fatalErrorMessage = tmp; + this->terminate(); } - else { -#endif -*/ - const ZT_ResultCode rc = _node->processWirePacket(nullptr,now,reinterpret_cast(sock),reinterpret_cast(from),data,len,&_nextBackgroundTaskDeadline); - if (ZT_ResultCode_isFatal(rc)) { - char tmp[256]; - OSUtils::ztsnprintf(tmp,sizeof(tmp),"fatal error code from processWirePacket: %d",(int)rc); - Mutex::Lock _l(_termReason_m); - _termReason = ONE_UNRECOVERABLE_ERROR; - _fatalErrorMessage = tmp; - this->terminate(); - } -//#if defined(__LINUX__) || defined(__FreeBSD__) -// } -//#endif }