Skip to content

Commit

Permalink
[core] Fixed: attempt to send to a group in connection-pending state …
Browse files Browse the repository at this point in the history
…(nonblocking) reported wrong 'connection lost' error
  • Loading branch information
Mikolaj Malecki authored and maxsharabayko committed Dec 12, 2024
1 parent cdbe5f5 commit f109fb1
Showing 1 changed file with 63 additions and 8 deletions.
71 changes: 63 additions & 8 deletions srtcore/group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -600,6 +600,7 @@ void CUDTGroup::deriveSettings(CUDT* u)
#undef IMF
}

// XXX This function is likely of no use now.
bool CUDTGroup::applyFlags(uint32_t flags, HandshakeSide)
{
const bool synconmsg = IsSet(flags, SRT_GFLAG_SYNCONMSG);
Expand Down Expand Up @@ -1418,9 +1419,28 @@ int CUDTGroup::sendBroadcast(const char* buf, int len, SRT_MSGCTRL& w_mc)

// { send_CheckBrokenSockets()

if (!pendingSockets.empty())
// Make an extra loop check to see if we could be
// in a condition of "all sockets either blocked or pending"

int nsuccessful = 0; // number of successfully connected sockets
int nblocked = 0; // number of sockets blocked in connection
bool is_pending_blocked = false;
for (vector<Sendstate>::iterator is = sendstates.begin(); is != sendstates.end(); ++is)
{
if (is->stat != -1)
{
nsuccessful++;
}
// is->stat == -1
else if (is->code == SRT_EASYNCSND)
{
++nblocked;
}
}

if (!pendingSockets.empty() || nblocked)
{
HLOGC(gslog.Debug, log << "grp/sendBroadcast: found pending sockets, polling them.");
HLOGC(gslog.Debug, log << "grp/sendBroadcast: found pending sockets (blocked: " << nblocked << "), polling them.");

// These sockets if they are in pending state, they should be added to m_SndEID
// at the connecting stage.
Expand All @@ -1435,12 +1455,24 @@ int CUDTGroup::sendBroadcast(const char* buf, int len, SRT_MSGCTRL& w_mc)
}
else
{
int swait_timeout = 0;

// There's also a hidden condition here that is the upper if condition.
is_pending_blocked = (nsuccessful == 0);

// If this is the case when
if (m_bSynSending && is_pending_blocked)
{
HLOGC(gslog.Debug, log << "grp/sendBroadcast: will block for " << m_iSndTimeOut << " - waiting for any writable in blocking mode");
swait_timeout = m_iSndTimeOut;
}

{
InvertedLock ug(m_GroupLock);

THREAD_PAUSED();
m_Global.m_EPoll.swait(
*m_SndEpolld, sready, 0, false /*report by retval*/); // Just check if anything happened
*m_SndEpolld, (sready), swait_timeout, false /*report by retval*/); // Just check if anything happened
THREAD_RESUMED();
}

Expand All @@ -1453,6 +1485,10 @@ int CUDTGroup::sendBroadcast(const char* buf, int len, SRT_MSGCTRL& w_mc)
HLOGC(gslog.Debug, log << "grp/sendBroadcast: RDY: " << DisplayEpollResults(sready));

// sockets in EX: should be moved to wipeme.
// IMPORTANT: we check only PENDING sockets (not blocked) because only
// pending sockets might report ERR epoll without being explicitly broken.
// Sockets that did connect and just have buffer full will be always broken,
// if they're going to report ERR in epoll.
for (vector<SRTSOCKET>::iterator i = pendingSockets.begin(); i != pendingSockets.end(); ++i)
{
if (CEPoll::isready(sready, *i, SRT_EPOLL_ERR))
Expand All @@ -1464,6 +1500,9 @@ int CUDTGroup::sendBroadcast(const char* buf, int len, SRT_MSGCTRL& w_mc)
int no_events = 0;
m_Global.m_EPoll.update_usock(m_SndEID, *i, &no_events);
}

if (CEPoll::isready(sready, *i, SRT_EPOLL_OUT))
is_pending_blocked = false;
}

// After that, all sockets that have been reported
Expand All @@ -1480,7 +1519,10 @@ int CUDTGroup::sendBroadcast(const char* buf, int len, SRT_MSGCTRL& w_mc)
if (m_bClosing)
throw CUDTException(MJ_CONNECTION, MN_CONNLOST, 0);

send_CloseBrokenSockets(wipeme);
// Just for a case, when a socket that was blocked or pending
// had switched to write-enabled,

send_CloseBrokenSockets((wipeme)); // wipeme will be cleared by this function

// Re-check after the waiting lock has been reacquired
if (m_bClosing)
Expand Down Expand Up @@ -1742,9 +1784,18 @@ int CUDTGroup::sendBroadcast(const char* buf, int len, SRT_MSGCTRL& w_mc)

if (none_succeeded)
{
HLOGC(gslog.Debug, log << "grp/sendBroadcast: all links broken (none succeeded to send a payload)");
m_Global.m_EPoll.update_events(id(), m_sPollID, SRT_EPOLL_OUT, false);
m_Global.m_EPoll.update_events(id(), m_sPollID, SRT_EPOLL_ERR, true);
if (!m_bSynSending && (is_pending_blocked || was_blocked))
{
HLOGC(gslog.Debug, log << "grp/sendBroadcast: no links are ready for sending");
ercode = SRT_EASYNCSND;
}
else
{
HLOGC(gslog.Debug, log << "grp/sendBroadcast: all links broken (none succeeded to send a payload)");
m_Global.m_EPoll.update_events(id(), m_sPollID, SRT_EPOLL_ERR, true);
}

// Reparse error code, if set.
// It might be set, if the last operation was failed.
// If any operation succeeded, this will not be executed anyway.
Expand Down Expand Up @@ -3398,11 +3449,13 @@ void CUDTGroup::sendBackup_RetryWaitBlocked(SendBackupCtx& w_sendBackupCtx
// Note: A link is added in unstableLinks if sending has failed with SRT_ESYNCSND.
const unsigned num_unstable = w_sendBackupCtx.countMembersByState(BKUPST_ACTIVE_UNSTABLE);
const unsigned num_wary = w_sendBackupCtx.countMembersByState(BKUPST_ACTIVE_UNSTABLE_WARY);
if ((num_unstable + num_wary == 0) || !w_none_succeeded)
const unsigned num_pending = w_sendBackupCtx.countMembersByState(BKUPST_PENDING);
if ((num_unstable + num_wary + num_pending == 0) || !w_none_succeeded)
return;

HLOGC(gslog.Debug, log << "grp/sendBackup: no successfull sending: "
<< (num_unstable + num_wary) << " unstable links - waiting to retry sending...");
<< (num_unstable + num_wary) << " unstable links, "
<< num_pending << " pending - waiting to retry sending...");

// Note: GroupLock is set already, skip locks and checks
getGroupData_LOCKED((w_mc.grpdata), (&w_mc.grpdata_size));
Expand Down Expand Up @@ -3658,6 +3711,7 @@ int CUDTGroup::sendBackup(const char* buf, int len, SRT_MSGCTRL& w_mc)
{
if (len <= 0)
{
LOGC(gslog.Error, log << "grp/send(backup): negative length: " << len);
throw CUDTException(MJ_NOTSUP, MN_INVAL, 0);
}

Expand All @@ -3677,6 +3731,7 @@ int CUDTGroup::sendBackup(const char* buf, int len, SRT_MSGCTRL& w_mc)
if (m_bClosing)
{
leaveCS(m_Global.m_GlobControlLock);
LOGC(gslog.Error, log << "grp/send(backup): Cannot send, connection lost!");
throw CUDTException(MJ_CONNECTION, MN_CONNLOST, 0);
}

Expand Down

0 comments on commit f109fb1

Please sign in to comment.