Skip to content

Commit

Permalink
Merge pull request paullouisageneau#7 from paullouisageneau/back-pres…
Browse files Browse the repository at this point in the history
…sure

Back-pressure
  • Loading branch information
paullouisageneau authored Dec 6, 2019
2 parents 84219d3 + 08931de commit 04df12b
Show file tree
Hide file tree
Showing 12 changed files with 315 additions and 140 deletions.
21 changes: 15 additions & 6 deletions include/rtc/channel.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "include.hpp"

#include <functional>
#include <mutex>
#include <variant>

namespace rtc {
Expand All @@ -30,30 +31,38 @@ class Channel {
public:
virtual void close(void) = 0;
virtual void send(const std::variant<binary, string> &data) = 0;

virtual std::optional<std::variant<binary, string>> receive() = 0;
virtual bool isOpen(void) const = 0;
virtual bool isClosed(void) const = 0;

void onOpen(std::function<void()> callback);
void onClosed(std::function<void()> callback);
void onError(std::function<void(const string &error)> callback);

void onMessage(std::function<void(const std::variant<binary, string> &data)> callback);
void onMessage(std::function<void(const binary &data)> binaryCallback,
std::function<void(const string &data)> stringCallback);

void onAvailable(std::function<void()> callback);
void onSent(std::function<void()> callback);

protected:
virtual void triggerOpen(void);
virtual void triggerClosed(void);
virtual void triggerError(const string &error);
virtual void triggerMessage(const std::variant<binary, string> &data);
virtual void triggerAvailable(size_t available);
virtual void triggerSent();

private:
std::function<void()> mOpenCallback;
std::function<void()> mClosedCallback;
std::function<void(const string &)> mErrorCallback;
std::function<void(const std::variant<binary, string> &)> mMessageCallback;
synchronized_callback<> mOpenCallback;
synchronized_callback<> mClosedCallback;
synchronized_callback<const string &> mErrorCallback;
synchronized_callback<const std::variant<binary, string> &> mMessageCallback;
synchronized_callback<> mAvailableCallback;
synchronized_callback<> mSentCallback;
};

} // namespace rtc

#endif // RTC_CHANNEL_H

23 changes: 18 additions & 5 deletions include/rtc/datachannel.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
#include "channel.hpp"
#include "include.hpp"
#include "message.hpp"
#include "queue.hpp"
#include "reliability.hpp"

#include <atomic>
#include <chrono>
#include <functional>
#include <variant>
Expand All @@ -35,13 +37,19 @@ class PeerConnection;

class DataChannel : public Channel {
public:
DataChannel(unsigned int stream_, string label_, string protocol_, Reliability reliability_);
DataChannel(unsigned int stream, std::shared_ptr<SctpTransport> sctpTransport);
DataChannel(std::shared_ptr<PeerConnection> pc, unsigned int stream, string label,
string protocol, Reliability reliability);
DataChannel(std::shared_ptr<PeerConnection> pc, std::shared_ptr<SctpTransport> transport,
unsigned int stream);
~DataChannel();

void close(void);
void send(const std::variant<binary, string> &data);
void send(const byte *data, size_t size);
std::optional<std::variant<binary, string>> receive();

size_t available() const;
size_t availableSize() const;

unsigned int stream() const;
string label() const;
Expand All @@ -56,14 +64,19 @@ class DataChannel : public Channel {
void incoming(message_ptr message);
void processOpenMessage(message_ptr message);

unsigned int mStream;
const std::shared_ptr<PeerConnection> mPeerConnection; // keeps the PeerConnection alive
std::shared_ptr<SctpTransport> mSctpTransport;

unsigned int mStream;
string mLabel;
string mProtocol;
std::shared_ptr<Reliability> mReliability;

bool mIsOpen = false;
bool mIsClosed = false;
std::atomic<bool> mIsOpen = false;
std::atomic<bool> mIsClosed = false;

Queue<message_ptr> mRecvQueue;
std::atomic<size_t> mRecvSize = 0;

friend class PeerConnection;
};
Expand Down
30 changes: 30 additions & 0 deletions include/rtc/include.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
#define RTC_INCLUDE_H

#include <cstddef>
#include <functional>
#include <memory>
#include <mutex>
#include <optional>
#include <string>
#include <vector>
Expand All @@ -33,6 +35,7 @@ using binary = std::vector<byte>;

using std::nullopt;

using std::size_t;
using std::uint16_t;
using std::uint32_t;
using std::uint64_t;
Expand All @@ -41,10 +44,37 @@ using std::uint8_t;
const size_t MAX_NUMERICNODE_LEN = 48; // Max IPv6 string representation length
const size_t MAX_NUMERICSERV_LEN = 6; // Max port string representation length

const size_t RECV_QUEUE_SIZE = 256; // DataChannel receive queue size in messages
// (0 means unlimited)

const uint16_t DEFAULT_SCTP_PORT = 5000; // SCTP port to use by default

template <class... Ts> struct overloaded : Ts... { using Ts::operator()...; };
template <class... Ts> overloaded(Ts...)->overloaded<Ts...>;

template <typename... P> class synchronized_callback {
public:
synchronized_callback() = default;
~synchronized_callback() { *this = nullptr; }

synchronized_callback &operator=(std::function<void(P...)> func) {
std::lock_guard<std::recursive_mutex> lock(mutex);
callback = func;
return *this;
}

void operator()(P... args) const {
std::lock_guard<std::recursive_mutex> lock(mutex);
if (callback)
callback(args...);
}

operator bool() const { return callback ? true : false; }

private:
std::function<void(P...)> callback;
mutable std::recursive_mutex mutex;
};
}

#endif
13 changes: 6 additions & 7 deletions include/rtc/peerconnection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ class PeerConnection : public std::enable_shared_from_this<PeerConnection> {

bool checkFingerprint(std::weak_ptr<PeerConnection> weak_this, const std::string &fingerprint) const;
void forwardMessage(std::weak_ptr<PeerConnection> weak_this, message_ptr message);
void forwardSent(std::weak_ptr<PeerConnection> weak_this, uint16_t stream);
void iterateDataChannels(std::function<void(std::shared_ptr<DataChannel> channel)> func);
void openDataChannels();
void closeDataChannels();
Expand All @@ -114,13 +115,11 @@ class PeerConnection : public std::enable_shared_from_this<PeerConnection> {
std::atomic<State> mState;
std::atomic<GatheringState> mGatheringState;

std::list<std::thread> mResolveThreads;

std::function<void(std::shared_ptr<DataChannel> dataChannel)> mDataChannelCallback;
std::function<void(const Description &description)> mLocalDescriptionCallback;
std::function<void(const Candidate &candidate)> mLocalCandidateCallback;
std::function<void(State state)> mStateChangeCallback;
std::function<void(GatheringState state)> mGatheringStateChangeCallback;
synchronized_callback<std::shared_ptr<DataChannel>> mDataChannelCallback;
synchronized_callback<const Description &> mLocalDescriptionCallback;
synchronized_callback<const Candidate &> mLocalCandidateCallback;
synchronized_callback<State> mStateChangeCallback;
synchronized_callback<GatheringState> mGatheringStateChangeCallback;
};

} // namespace rtc
Expand Down
60 changes: 39 additions & 21 deletions src/queue.hpp → include/rtc/queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,70 +32,88 @@ namespace rtc {

template <typename T> class Queue {
public:
Queue();
Queue(std::size_t limit = 0);
~Queue();

void stop();
bool empty() const;
size_t size() const;
void push(const T &element);
std::optional<T> pop();
std::optional<T> tryPop();
void wait();
void wait(const std::chrono::milliseconds &duration);

private:
const size_t mLimit;
std::queue<T> mQueue;
std::condition_variable mCondition;
std::atomic<bool> mStopping;
std::condition_variable mPopCondition, mPushCondition;
bool mStopping = false;

mutable std::mutex mMutex;
};

template <typename T> Queue<T>::Queue() : mStopping(false) {}
template <typename T> Queue<T>::Queue(size_t limit) : mLimit(limit) {}

template <typename T> Queue<T>::~Queue() { stop(); }

template <typename T> void Queue<T>::stop() {
std::lock_guard<std::mutex> lock(mMutex);
mStopping = true;
mCondition.notify_all();
mPopCondition.notify_all();
mPushCondition.notify_all();
}

template <typename T> bool Queue<T>::empty() const {
std::lock_guard<std::mutex> lock(mMutex);
return mQueue.empty();
}

template <typename T> void Queue<T>::push(const T &element) {
template <typename T> size_t Queue<T>::size() const {
std::lock_guard<std::mutex> lock(mMutex);
if (mStopping)
return;
mQueue.push(element);
mCondition.notify_one();
return mQueue.size();
}

template <typename T> void Queue<T>::push(const T &element) {
std::unique_lock<std::mutex> lock(mMutex);
mPushCondition.wait(lock, [this]() { return !mLimit || mQueue.size() < mLimit || mStopping; });
if (!mStopping) {
mQueue.push(element);
mPopCondition.notify_one();
}
}

template <typename T> std::optional<T> Queue<T>::pop() {
std::unique_lock<std::mutex> lock(mMutex);
while (mQueue.empty()) {
if (mStopping)
return nullopt;
mCondition.wait(lock);
mPopCondition.wait(lock, [this]() { return !mQueue.empty() || mStopping; });
if (!mQueue.empty()) {
std::optional<T> element(std::move(mQueue.front()));
mQueue.pop();
return element;
} else {
return nullopt;
}
}

std::optional<T> element = mQueue.front();
mQueue.pop();
return element;
template <typename T> std::optional<T> Queue<T>::tryPop() {
std::unique_lock<std::mutex> lock(mMutex);
if (!mQueue.empty()) {
std::optional<T> element(std::move(mQueue.front()));
mQueue.pop();
return element;
} else {
return nullopt;
}
}

template <typename T> void Queue<T>::wait() {
std::unique_lock<std::mutex> lock(mMutex);
if (mQueue.empty() && !mStopping)
mCondition.wait(lock);
mPopCondition.wait(lock, [this]() { return !mQueue.empty() || mStopping; });
}

template <typename T> void Queue<T>::wait(const std::chrono::milliseconds &duration) {
std::unique_lock<std::mutex> lock(mMutex);
if (mQueue.empty() && !mStopping)
mCondition.wait_for(lock, duration);
mPopCondition.wait_for(lock, duration, [this]() { return !mQueue.empty() || mStopping; });
}

} // namespace rtc
Expand Down
48 changes: 33 additions & 15 deletions src/channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,28 @@

#include "channel.hpp"

namespace {}

namespace rtc {

void Channel::onOpen(std::function<void()> callback) { mOpenCallback = callback; }
void Channel::onOpen(std::function<void()> callback) {
mOpenCallback = callback;
}

void Channel::onClosed(std::function<void()> callback) { mClosedCallback = callback; }
void Channel::onClosed(std::function<void()> callback) {
mClosedCallback = callback;
}

void Channel::onError(std::function<void(const string &error)> callback) {
mErrorCallback = callback;
}

void Channel::onMessage(std::function<void(const std::variant<binary, string> &data)> callback) {
mMessageCallback = callback;

// Pass pending messages
while (auto message = receive())
mMessageCallback(*message);
}

void Channel::onMessage(std::function<void(const binary &data)> binaryCallback,
Expand All @@ -39,25 +49,33 @@ void Channel::onMessage(std::function<void(const binary &data)> binaryCallback,
});
}

void Channel::triggerOpen(void) {
if (mOpenCallback)
mOpenCallback();
void Channel::onAvailable(std::function<void()> callback) {
mAvailableCallback = callback;
}

void Channel::triggerClosed(void) {
if (mClosedCallback)
mClosedCallback();
void Channel::onSent(std::function<void()> callback) {
mSentCallback = callback;
}

void Channel::triggerError(const string &error) {
if (mErrorCallback)
mErrorCallback(error);
}
void Channel::triggerOpen() { mOpenCallback(); }

void Channel::triggerMessage(const std::variant<binary, string> &data) {
if (mMessageCallback)
mMessageCallback(data);
void Channel::triggerClosed() { mClosedCallback(); }

void Channel::triggerError(const string &error) { mErrorCallback(error); }

void Channel::triggerAvailable(size_t available) {
if (available == 1)
mAvailableCallback();

while (mMessageCallback && available--) {
auto message = receive();
if (!message)
break;
mMessageCallback(*message);
}
}

void Channel::triggerSent() { mSentCallback(); }

} // namespace rtc

Loading

0 comments on commit 04df12b

Please sign in to comment.