Skip to content

Commit

Permalink
Merge pull request #2238 from zerotier/jh-multipath-improvements
Browse files Browse the repository at this point in the history
Port multipath improvements to newer version
  • Loading branch information
joseph-henry authored Mar 5, 2024
2 parents 8b15fa2 + d658c11 commit 08d85d4
Show file tree
Hide file tree
Showing 9 changed files with 149 additions and 55 deletions.
8 changes: 8 additions & 0 deletions include/ZeroTierOne.h
Original file line number Diff line number Diff line change
Expand Up @@ -1347,8 +1347,16 @@ typedef struct
*/
char ifname[ZT_MAX_PHYSIFNAME];

/**
* Pointer to PhySocket object for this path
*/
uint64_t localSocket;

/**
* Local port corresponding to this path's localSocket
*/
uint16_t localPort;

/**
* Is path expired?
*/
Expand Down
78 changes: 51 additions & 27 deletions node/Bond.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@

#include "Switch.hpp"

#include <cinttypes> // for PRId64, etc. macros
#include <cmath>
#include <cstdio>
#include <string>
#include <cinttypes> // for PRId64, etc. macros

// FIXME: remove this suppression and actually fix warnings
#ifdef __GNUC__
Expand Down Expand Up @@ -108,7 +108,7 @@ bool Bond::setAllMtuByTuple(uint16_t mtu, const std::string& ifStr, const std::s
std::map<int64_t, SharedPtr<Bond> >::iterator bondItr = _bonds.begin();
bool found = false;
while (bondItr != _bonds.end()) {
if (bondItr->second->setMtuByTuple(mtu,ifStr,ipStr)) {
if (bondItr->second->setMtuByTuple(mtu, ifStr, ipStr)) {
found = true;
}
++bondItr;
Expand Down Expand Up @@ -154,11 +154,13 @@ SharedPtr<Bond> Bond::createBond(const RuntimeEnvironment* renv, const SharedPtr
bond = new Bond(renv, _bondPolicyTemplates[_defaultPolicyStr].ptr(), peer);
bond->debug("new default custom bond (based on %s)", bond->getPolicyStrByCode(bond->policy()).c_str());
}
} else {
}
else {
if (! _bondPolicyTemplates[_policyTemplateAssignments[identity]]) {
bond = new Bond(renv, _defaultPolicy, peer);
bond->debug("peer-specific bond, was specified as %s but the bond definition was not found, using default %s", _policyTemplateAssignments[identity].c_str(), getPolicyStrByCode(_defaultPolicy).c_str());
} else {
}
else {
bond = new Bond(renv, _bondPolicyTemplates[_policyTemplateAssignments[identity]].ptr(), peer);
bond->debug("new default bond");
}
Expand Down Expand Up @@ -227,10 +229,12 @@ SharedPtr<Link> Bond::getLinkBySocket(const std::string& policyAlias, uint64_t l
SharedPtr<Link> s = new Link(ifnameStr, 0, 0, 0, true, ZT_BOND_SLAVE_MODE_PRIMARY, "");
_interfaceToLinkMap[policyAlias].insert(std::pair<std::string, SharedPtr<Link> >(ifnameStr, s));
return s;
} else {
}
else {
return SharedPtr<Link>();
}
} else {
}
else {
return search->second;
}
}
Expand Down Expand Up @@ -340,6 +344,7 @@ void Bond::nominatePathToBond(const SharedPtr<Path>& path, int64_t now)
_paths[i].ipvPref = sl->ipvPref();
_paths[i].mode = sl->mode();
_paths[i].enabled = sl->enabled();
_paths[i].localPort = _phy->getLocalPort((PhySocket*)((uintptr_t)path->localSocket()));
_paths[i].onlyPathOnLink = ! bFoundCommonLink;
}
}
Expand Down Expand Up @@ -397,7 +402,8 @@ SharedPtr<Path> Bond::getAppropriatePath(int64_t now, int32_t flowId)
_rrPacketsSentOnCurrLink = 0;
if (_numBondedPaths == 1 || _rrIdx >= (ZT_MAX_PEER_NETWORK_PATHS - 1)) {
_rrIdx = 0;
} else {
}
else {
int _tempIdx = _rrIdx;
for (int searchCount = 0; searchCount < (_numBondedPaths - 1); searchCount++) {
_tempIdx = (_tempIdx == (_numBondedPaths - 1)) ? 0 : _tempIdx + 1;
Expand Down Expand Up @@ -427,7 +433,8 @@ SharedPtr<Path> Bond::getAppropriatePath(int64_t now, int32_t flowId)
if (likely(it != _flows.end())) {
it->second->lastActivity = now;
return _paths[it->second->assignedPath].p;
} else {
}
else {
unsigned char entropy;
Utils::getSecureRandom(&entropy, 1);
SharedPtr<Flow> flow = createFlow(ZT_MAX_PEER_NETWORK_PATHS, flowId, entropy, now);
Expand Down Expand Up @@ -505,7 +512,8 @@ void Bond::recordIncomingPacket(const SharedPtr<Path>& path, uint64_t packetId,
_paths[pathIdx].qosStatsIn[packetId] = now;
++(_paths[pathIdx].packetsReceivedSinceLastQoS);
//_paths[pathIdx].packetValiditySamples.push(true);
} else {
}
else {
// debug("QoS buffer full, will not record information");
}
/*
Expand All @@ -532,7 +540,8 @@ void Bond::recordIncomingPacket(const SharedPtr<Path>& path, uint64_t packetId,
SharedPtr<Flow> flow;
if (! _flows.count(flowId)) {
flow = createFlow(pathIdx, flowId, 0, now);
} else {
}
else {
flow = _flows[flowId];
}
if (flow) {
Expand Down Expand Up @@ -618,7 +627,8 @@ bool Bond::assignFlowToBondedPath(SharedPtr<Flow>& flow, int64_t now, bool reass

if (reassign) {
log("attempting to re-assign out-flow %04x previously on idx %d (%u / %zu flows)", flow->id, flow->assignedPath, _paths[_realIdxMap[flow->assignedPath]].assignedFlowCount, _flows.size());
} else {
}
else {
debug("attempting to assign flow for the first time");
}

Expand All @@ -632,7 +642,8 @@ bool Bond::assignFlowToBondedPath(SharedPtr<Flow>& flow, int64_t now, bool reass

if (reassign) {
bondedIdx = (flow->assignedPath + offset) % (_numBondedPaths);
} else {
}
else {
bondedIdx = abs((int)((entropy + offset) % (_numBondedPaths)));
}
// debug("idx=%d, offset=%d, randomCap=%f, actualCap=%f", bondedIdx, offset, randomLinkCapacity, _paths[_realIdxMap[bondedIdx]].relativeLinkCapacity);
Expand All @@ -655,7 +666,8 @@ bool Bond::assignFlowToBondedPath(SharedPtr<Flow>& flow, int64_t now, bool reass
flow->assignPath(_realIdxMap[bondedIdx], now);
++(_paths[_realIdxMap[bondedIdx]].assignedFlowCount);
// debug(" ABLE to find optimal link %f idx %d", _paths[_realIdxMap[bondedIdx]].relativeQuality, bondedIdx);
} else {
}
else {
// We were (unable) to find a path that didn't violate at least one quality requirement, will choose next best option
flow->assignPath(_realIdxMap[nextBestQualIdx], now);
++(_paths[_realIdxMap[nextBestQualIdx]].assignedFlowCount);
Expand Down Expand Up @@ -715,11 +727,13 @@ void Bond::forgetFlowsWhenNecessary(uint64_t age, bool oldest, int64_t now)
debug("forget flow %04x (age %" PRId64 ") (%u / %zu)", it->first, it->second->age(now), _paths[it->second->assignedPath].assignedFlowCount, (_flows.size() - 1));
_paths[it->second->assignedPath].assignedFlowCount--;
it = _flows.erase(it);
} else {
}
else {
++it;
}
}
} else if (oldest) { // Remove single oldest by natural expiration
}
else if (oldest) { // Remove single oldest by natural expiration
uint64_t maxAge = 0;
while (it != _flows.end()) {
if (it->second->age(now) > maxAge) {
Expand Down Expand Up @@ -766,7 +780,8 @@ void Bond::processIncomingPathNegotiationRequest(uint64_t now, SharedPtr<Path>&
if (_peer->_id.address().toInt() > RR->node->identity().address().toInt()) {
debug("agree with peer to use alternate link %s/%s\n", link->ifname().c_str(), pathStr);
_negotiatedPathIdx = pathIdx;
} else {
}
else {
debug("ignore petition from peer to use alternate link %s/%s\n", link->ifname().c_str(), pathStr);
}
}
Expand Down Expand Up @@ -881,7 +896,8 @@ void Bond::sendQOS_MEASUREMENT(void* tPtr, int pathIdx, int64_t localSocket, con
if (atAddress) {
outp.armor(_peer->key(), false, _peer->aesKeysIfSupported());
RR->node->putPacket(tPtr, localSocket, atAddress, outp.data(), outp.size());
} else {
}
else {
RR->sw->send(tPtr, outp, false);
}
Metrics::pkt_qos_out++;
Expand Down Expand Up @@ -1222,7 +1238,8 @@ void Bond::estimatePathQuality(int64_t now)
if ((now - it->second) >= qosRecordTimeout) {
it = _paths[i].qosStatsOut.erase(it);
++numDroppedQosOutRecords;
} else {
}
else {
++it;
}
}
Expand Down Expand Up @@ -1250,7 +1267,8 @@ void Bond::estimatePathQuality(int64_t now)
if ((now - it->second) >= qosRecordTimeout) {
it = _paths[i].qosStatsIn.erase(it);
++numDroppedQosInRecords;
} else {
}
else {
++it;
}
}
Expand Down Expand Up @@ -1327,10 +1345,10 @@ void Bond::estimatePathQuality(int64_t now)
continue;
}
// Compute/Smooth average of real-world observations
if (_paths[i].latencySamples.count() == ZT_QOS_SHORTTERM_SAMPLE_WIN_SIZE) {
if (_paths[i].latencySamples.count() >= ZT_QOS_SHORTTERM_SAMPLE_WIN_MIN_REQ_SIZE) {
_paths[i].latency = _paths[i].latencySamples.mean();
}
if (_paths[i].latencySamples.count() == ZT_QOS_SHORTTERM_SAMPLE_WIN_SIZE) {
if (_paths[i].latencySamples.count() >= ZT_QOS_SHORTTERM_SAMPLE_WIN_MIN_REQ_SIZE) {
_paths[i].latencyVariance = _paths[i].latencySamples.stddev();
}

Expand All @@ -1344,6 +1362,7 @@ void Bond::estimatePathQuality(int64_t now)
//_paths[i].packetErrorRatio = 1.0 - (_paths[i].packetValiditySamples.count() ? _paths[i].packetValiditySamples.mean() : 1.0);
// _valid is written elsewhere
_paths[i].p->_relativeQuality = _paths[i].relativeQuality;
_paths[i].p->_localPort = _paths[i].localPort;
}

// Flag links for avoidance
Expand All @@ -1370,7 +1389,8 @@ void Bond::estimatePathQuality(int64_t now)
shouldAvoid = true;
}
_paths[i].shouldAvoid = shouldAvoid;
} else {
}
else {
if (! shouldAvoid) {
log("no longer avoiding link %s", pathToStr(_paths[i].p).c_str());
_paths[i].shouldAvoid = false;
Expand Down Expand Up @@ -1482,7 +1502,8 @@ void Bond::processActiveBackupTasks(void* tPtr, int64_t now)
_lastBondStatusLog = now;
if (_abPathIdx == ZT_MAX_PEER_NETWORK_PATHS) {
log("no active link");
} else if (_paths[_abPathIdx].p) {
}
else if (_paths[_abPathIdx].p) {
log("active link is %s, failover queue size is %zu", pathToStr(_paths[_abPathIdx].p).c_str(), _abFailoverQueue.size());
}
if (_abFailoverQueue.empty()) {
Expand Down Expand Up @@ -1590,7 +1611,8 @@ void Bond::processActiveBackupTasks(void* tPtr, int64_t now)
log("link %s is ineligible, removing from failover queue (%zu links remain in queue)", pathToStr(_paths[_abPathIdx].p).c_str(), _abFailoverQueue.size());
}
continue;
} else {
}
else {
++it;
}
}
Expand Down Expand Up @@ -1739,7 +1761,8 @@ void Bond::processActiveBackupTasks(void* tPtr, int64_t now)
if (! _abFailoverQueue.empty()) {
dequeueNextActiveBackupPath(now);
log("active link switched to %s", pathToStr(_paths[_abPathIdx].p).c_str());
} else {
}
else {
log("failover queue is empty, no links to choose from");
}
}
Expand Down Expand Up @@ -1785,7 +1808,8 @@ void Bond::processActiveBackupTasks(void* tPtr, int64_t now)
dequeueNextActiveBackupPath(now);
_lastPathNegotiationCheck = now;
log("switch negotiated link %s (select mode: optimize)", pathToStr(_paths[_abPathIdx].p).c_str());
} else {
}
else {
// Try to find a better path and automatically switch to it -- not too often, though.
if ((now - _lastActiveBackupPathChange) > ZT_BOND_OPTIMIZE_INTERVAL) {
if (! _abFailoverQueue.empty()) {
Expand Down Expand Up @@ -1901,7 +1925,7 @@ void Bond::setBondParameters(int policy, SharedPtr<Bond> templateBond, bool useT
}

if (! _isLeaf) {
_policy = ZT_BOND_POLICY_ACTIVE_BACKUP;
_policy = ZT_BOND_POLICY_NONE;
}

// Timer geometry
Expand Down
23 changes: 15 additions & 8 deletions node/Bond.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,6 @@ class Peer;

class Bond {
public:

/**
* Stop bond's internal functions (can be resumed)
*/
Expand Down Expand Up @@ -909,7 +908,8 @@ class Bond {
_lastAckRateCheck = now;
if (_ackCutoffCount > numToDrain) {
_ackCutoffCount -= numToDrain;
} else {
}
else {
_ackCutoffCount = 0;
}
return (_ackCutoffCount < ZT_ACK_CUTOFF_LIMIT);
Expand All @@ -928,7 +928,8 @@ class Bond {
uint64_t diff = now - _lastQoSRateCheck;
if ((diff) <= (_qosSendInterval / ZT_MAX_PEER_NETWORK_PATHS)) {
++_qosCutoffCount;
} else {
}
else {
_qosCutoffCount = 0;
}
_lastQoSRateCheck = now;
Expand All @@ -948,7 +949,8 @@ class Bond {
int diff = now - _lastPathNegotiationReceived;
if ((diff) <= (ZT_PATH_NEGOTIATION_CUTOFF_TIME / ZT_MAX_PEER_NETWORK_PATHS)) {
++_pathNegotiationCutoffCount;
} else {
}
else {
_pathNegotiationCutoffCount = 0;
}
_lastPathNegotiationReceived = now;
Expand Down Expand Up @@ -1230,6 +1232,7 @@ class Bond {
, packetsReceivedSinceLastQoS(0)
, packetsIn(0)
, packetsOut(0)
, localPort(0)
{
}

Expand All @@ -1245,17 +1248,20 @@ class Bond {
unsigned int suggestedRefractoryPeriod = refractoryPeriod ? punishment + (refractoryPeriod * 2) : punishment;
refractoryPeriod = std::min(suggestedRefractoryPeriod, (unsigned int)ZT_BOND_MAX_REFRACTORY_PERIOD);
lastRefractoryUpdate = 0;
} else {
}
else {
uint32_t drainRefractory = 0;
if (lastRefractoryUpdate) {
drainRefractory = (now - lastRefractoryUpdate);
} else {
}
else {
drainRefractory = (now - lastAliveToggle);
}
lastRefractoryUpdate = now;
if (refractoryPeriod > drainRefractory) {
refractoryPeriod -= drainRefractory;
} else {
}
else {
refractoryPeriod = 0;
lastRefractoryUpdate = 0;
}
Expand Down Expand Up @@ -1292,7 +1298,6 @@ class Bond {
*/
inline bool needsToSendQoS(int64_t now, uint64_t qosSendInterval)
{
// fprintf(stderr, "QOS table (%d / %d)\n", packetsReceivedSinceLastQoS, ZT_QOS_TABLE_SIZE);
return ((packetsReceivedSinceLastQoS >= ZT_QOS_TABLE_SIZE) || ((now - lastQoSMeasurement) > qosSendInterval)) && packetsReceivedSinceLastQoS;
}

Expand Down Expand Up @@ -1364,6 +1369,8 @@ class Bond {
int packetsIn;
int packetsOut;

uint16_t localPort;

// AtomicCounter __refCount;

SharedPtr<Path> p;
Expand Down
5 changes: 5 additions & 0 deletions node/Constants.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,11 @@
*/
#define ZT_QOS_SHORTTERM_SAMPLE_WIN_SIZE 64

/**
* Number of samples required before statistics summaries are computed
*/
#define ZT_QOS_SHORTTERM_SAMPLE_WIN_MIN_REQ_SIZE 4

/**
* Max allowable time spent in any queue (in ms)
*/
Expand Down
1 change: 1 addition & 0 deletions node/Node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -578,6 +578,7 @@ ZT_PeerList *Node::peers() const
if((*path)->valid()) {
memcpy(&(p->paths[p->pathCount].address),&((*path)->address()),sizeof(struct sockaddr_storage));
p->paths[p->pathCount].localSocket = (*path)->localSocket();
p->paths[p->pathCount].localPort = (*path)->localPort();
p->paths[p->pathCount].lastSend = (*path)->lastOut();
p->paths[p->pathCount].lastReceive = (*path)->lastIn();
p->paths[p->pathCount].trustedPathId = RR->topology->getOutboundPathTrust((*path)->address());
Expand Down
Loading

0 comments on commit 08d85d4

Please sign in to comment.