From be9d52d235a31789a7d2adf728231a7bf7e29952 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Luis=20Mill=C3=A1n?= Date: Wed, 7 Sep 2022 12:49:46 +0200 Subject: [PATCH 01/16] ObjectPoolAllocator --- worker/include/Utils.hpp | 104 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 104 insertions(+) diff --git a/worker/include/Utils.hpp b/worker/include/Utils.hpp index b19415c346..2722cb7142 100644 --- a/worker/include/Utils.hpp +++ b/worker/include/Utils.hpp @@ -1,7 +1,15 @@ #ifndef MS_UTILS_HPP #define MS_UTILS_HPP +#ifndef MS_CLASS +#define MS_CLASS "Utils" +#endif + +// #define MS_MEM_POOL_FREE_ON_RETURN 1 + #include "common.hpp" +#include "Logger.hpp" +#include "handles/Timer.hpp" #include #include #include // std::memcmp(), std::memcpy() @@ -353,6 +361,102 @@ namespace Utils return false; } }; + + // Simple implementation of object pool only for single objects + // Arrays are allocated as usual + template + class ObjectPoolAllocator : public Timer::Listener + { + std::shared_ptr> pool_data; + + public: + typedef T value_type; + thread_local static Utils::ObjectPoolAllocator Pool; + + ObjectPoolAllocator() + { + pool_data = std::shared_ptr>( + new std::vector(), + [](std::vector* pool) + { + for (auto* ptr : *pool) + { + std::free(ptr); + } + delete pool; + }); + + this->timer = new Timer(this); + this->timer->Start(3000); + } + + template + ObjectPoolAllocator(const ObjectPoolAllocator& other) + : pool_data(ObjectPoolAllocator::Pool.pool_data) + { + } + + ~ObjectPoolAllocator() + { + if (this->timer) + { + this->timer->Stop(); + } + } + + T* allocate(size_t n) + { + MS_ASSERT(n == 1, "only single object can be allocated"); + + if (this->pool_data->empty()) + { + this->allocated++; + + return static_cast(std::malloc(sizeof(T))); + } + + T* ptr = this->pool_data->back(); + this->pool_data->pop_back(); + + return ptr; + } + + void deallocate(T* ptr, size_t n) + { + if (!ptr) + { + return; + } + + if (n > 1) + { + std::free(ptr); + return; + } + +#ifdef MS_MEM_POOL_FREE_ON_RETURN + std::free(ptr); +#else + this->pool_data->push_back(ptr); +#endif + } + + /* Pure virtual methods inherited from Timer::Listener. */ + public: + void OnTimer(Timer* timer) override + { + MS_ERROR("allocated: %zu, available: %zu", this->allocated, this->pool_data->size()); + + this->timer->Start(3000); + } + + private: + Timer* timer{ nullptr }; + size_t allocated{ 0 }; + }; + + template + thread_local Utils::ObjectPoolAllocator Utils::ObjectPoolAllocator::Pool; } // namespace Utils #endif From 04a3728270b4a526a4b519f3e56d4e16c720940e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Luis=20Mill=C3=A1n?= Date: Wed, 7 Sep 2022 12:50:44 +0200 Subject: [PATCH 02/16] Use ObjectPoolAllocator in RtpStreamSend --- worker/include/RTC/RtpStreamSend.hpp | 3 +++ worker/src/RTC/RtpStreamSend.cpp | 16 +++++++++++----- 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/worker/include/RTC/RtpStreamSend.hpp b/worker/include/RTC/RtpStreamSend.hpp index e4b7e43f28..bb858802bf 100644 --- a/worker/include/RTC/RtpStreamSend.hpp +++ b/worker/include/RTC/RtpStreamSend.hpp @@ -26,6 +26,9 @@ namespace RTC public: struct StorageItem { + using Allocator = Utils::ObjectPoolAllocator; + using AllocatorTraits = std::allocator_traits; + void Reset(); // Original packet. diff --git a/worker/src/RTC/RtpStreamSend.cpp b/worker/src/RTC/RtpStreamSend.cpp index f0cf95ac59..6ff363c248 100644 --- a/worker/src/RTC/RtpStreamSend.cpp +++ b/worker/src/RTC/RtpStreamSend.cpp @@ -123,7 +123,10 @@ namespace RTC auto* storageItem = this->buffer[0]; - delete storageItem; + // Reset (free RTP packet) the old storage item. + storageItem->Reset(); + // Return into the pool. + StorageItem::Allocator::Pool.deallocate(storageItem, 1); this->buffer[0] = nullptr; @@ -145,8 +148,8 @@ namespace RTC // Reset the storage item (decrease RTP packet shared pointer counter). storageItem->Reset(); - - delete storageItem; + // Return into the pool. + StorageItem::Allocator::Pool.deallocate(storageItem, 1); } this->buffer.clear(); @@ -492,7 +495,10 @@ namespace RTC else { // Allocate a new storage item. - storageItem = new StorageItem(); + storageItem = StorageItem::Allocator::Pool.allocate(1); + // Memory is not initialized in any way, reset it. + // Create a new StorageItem instance in this memory. + StorageItem::AllocatorTraits::construct(StorageItem::Allocator::Pool, storageItem); this->storageItemBuffer.Insert(seq, storageItem); } @@ -500,7 +506,7 @@ namespace RTC // Only clone once and only if necessary. if (!sharedPacket.get()) { - sharedPacket.reset(packet->Clone()); + sharedPacket = packet->Clone(); } // Store original packet and some extra info into the storage item. From 215ef3e8a82c5c52b739f801b52a2cc7845d3f49 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Luis=20Mill=C3=A1n?= Date: Wed, 7 Sep 2022 13:00:40 +0200 Subject: [PATCH 03/16] Use ObjectPoolAllocator for send callback function --- worker/include/RTC/DirectTransport.hpp | 3 +- worker/include/RTC/PipeTransport.hpp | 3 +- worker/include/RTC/PlainTransport.hpp | 3 +- worker/include/RTC/TcpConnection.hpp | 6 +- worker/include/RTC/Transport.hpp | 29 ++++- worker/include/RTC/TransportTuple.hpp | 14 ++- worker/include/RTC/WebRtcTransport.hpp | 3 +- .../include/handles/TcpConnectionHandler.hpp | 10 +- worker/include/handles/UdpSocketHandler.hpp | 16 +-- worker/src/RTC/DirectTransport.cpp | 13 +- worker/src/RTC/PipeTransport.cpp | 13 +- worker/src/RTC/PlainTransport.cpp | 13 +- worker/src/RTC/TcpConnection.cpp | 8 +- worker/src/RTC/Transport.cpp | 119 ++++++++---------- worker/src/RTC/WebRtcTransport.cpp | 16 +-- worker/src/handles/TcpConnectionHandler.cpp | 35 +++--- worker/src/handles/UdpSocketHandler.cpp | 41 +++--- 17 files changed, 192 insertions(+), 153 deletions(-) diff --git a/worker/include/RTC/DirectTransport.hpp b/worker/include/RTC/DirectTransport.hpp index f8325ef07c..2a820e8354 100644 --- a/worker/include/RTC/DirectTransport.hpp +++ b/worker/include/RTC/DirectTransport.hpp @@ -20,7 +20,8 @@ namespace RTC void SendRtpPacket( RTC::Consumer* consumer, RTC::RtpPacket* packet, - RTC::Transport::onSendCallback* cb = nullptr) override; + RTC::Transport::onSendCallback* cb = nullptr, + RTC::Transport::OnSendCallbackCtx* ctx = nullptr) override; void SendRtcpPacket(RTC::RTCP::Packet* packet) override; void SendRtcpCompoundPacket(RTC::RTCP::CompoundPacket* packet) override; void SendMessage( diff --git a/worker/include/RTC/PipeTransport.hpp b/worker/include/RTC/PipeTransport.hpp index be6bee3cf8..256f2739c0 100644 --- a/worker/include/RTC/PipeTransport.hpp +++ b/worker/include/RTC/PipeTransport.hpp @@ -44,7 +44,8 @@ namespace RTC void SendRtpPacket( RTC::Consumer* consumer, RTC::RtpPacket* packet, - RTC::Transport::onSendCallback* cb = nullptr) override; + RTC::Transport::onSendCallback* cb = nullptr, + RTC::Transport::OnSendCallbackCtx* ctx = nullptr) override; void SendRtcpPacket(RTC::RTCP::Packet* packet) override; void SendRtcpCompoundPacket(RTC::RTCP::CompoundPacket* packet) override; void SendMessage( diff --git a/worker/include/RTC/PlainTransport.hpp b/worker/include/RTC/PlainTransport.hpp index b9609874cf..b464c296f8 100644 --- a/worker/include/RTC/PlainTransport.hpp +++ b/worker/include/RTC/PlainTransport.hpp @@ -45,7 +45,8 @@ namespace RTC void SendRtpPacket( RTC::Consumer* consumer, RTC::RtpPacket* packet, - RTC::Transport::onSendCallback* cb = nullptr) override; + RTC::Transport::onSendCallback* cb = nullptr, + RTC::Transport::OnSendCallbackCtx* ctx = nullptr) override; void SendRtcpPacket(RTC::RTCP::Packet* packet) override; void SendRtcpCompoundPacket(RTC::RTCP::CompoundPacket* packet) override; void SendMessage( diff --git a/worker/include/RTC/TcpConnection.hpp b/worker/include/RTC/TcpConnection.hpp index 0aa17ebb62..420389297e 100644 --- a/worker/include/RTC/TcpConnection.hpp +++ b/worker/include/RTC/TcpConnection.hpp @@ -24,7 +24,11 @@ namespace RTC ~TcpConnection() override; public: - void Send(const uint8_t* data, size_t len, ::TcpConnectionHandler::onSendCallback* cb); + void Send( + const uint8_t* data, + size_t len, + RTC::Transport::onSendCallback* cb, + RTC::Transport::OnSendCallbackCtx* ctx); /* Pure virtual methods inherited from ::TcpConnectionHandler. */ public: diff --git a/worker/include/RTC/Transport.hpp b/worker/include/RTC/Transport.hpp index e8e97247e5..dd5734c197 100644 --- a/worker/include/RTC/Transport.hpp +++ b/worker/include/RTC/Transport.hpp @@ -51,9 +51,31 @@ namespace RTC public Timer::Listener { protected: - using onSendCallback = const std::function; using onQueuedCallback = const std::function; + public: +#ifdef ENABLE_RTC_SENDER_BANDWIDTH_ESTIMATOR + struct OnSendCallbackCtx + { + RTC::TransportCongestionControlClient* tccClient; + webrtc::RtpPacketSendInfo packetInfo; + RTC::SenderBandwidthEstimator* senderBwe; + RTC::SenderBandwidthEstimator::SentInfo sentInfo; + }; +#else + struct OnSendCallbackCtx + { + using Allocator = Utils::ObjectPoolAllocator; + using AllocatorTraits = std::allocator_traits; + + RTC::TransportCongestionControlClient* tccClient; + webrtc::RtpPacketSendInfo packetInfo; + }; +#endif + // This function MUST NOT be de-allocated manually and MUST be called EXACTLY once. + static void OnSendCallback(bool sent, OnSendCallbackCtx* ctx); + using onSendCallback = void(bool sent, OnSendCallbackCtx* ctx); + public: class Listener { @@ -170,7 +192,10 @@ namespace RTC private: virtual bool IsConnected() const = 0; virtual void SendRtpPacket( - RTC::Consumer* consumer, RTC::RtpPacket* packet, onSendCallback* cb = nullptr) = 0; + RTC::Consumer* consumer, + RTC::RtpPacket* packet, + onSendCallback* cb = nullptr, + OnSendCallbackCtx* ctx = nullptr) = 0; void HandleRtcpPacket(RTC::RTCP::Packet* packet); void SendRtcp(uint64_t nowMs); virtual void SendRtcpPacket(RTC::RTCP::Packet* packet) = 0; diff --git a/worker/include/RTC/TransportTuple.hpp b/worker/include/RTC/TransportTuple.hpp index 535d221a68..07d77fd471 100644 --- a/worker/include/RTC/TransportTuple.hpp +++ b/worker/include/RTC/TransportTuple.hpp @@ -4,6 +4,7 @@ #include "common.hpp" #include "Utils.hpp" #include "RTC/TcpConnection.hpp" +#include "RTC/Transport.hpp" #include "RTC/UdpSocket.hpp" #include #include @@ -14,9 +15,6 @@ namespace RTC { class TransportTuple { - protected: - using onSendCallback = const std::function; - public: enum class Protocol { @@ -85,12 +83,16 @@ namespace RTC this->localAnnouncedIp = localAnnouncedIp; } - void Send(const uint8_t* data, size_t len, RTC::TransportTuple::onSendCallback* cb = nullptr) + void Send( + const uint8_t* data, + size_t len, + Transport::onSendCallback* cb = nullptr, + Transport::OnSendCallbackCtx* ctx = nullptr) { if (this->protocol == Protocol::UDP) - this->udpSocket->Send(data, len, this->udpRemoteAddr, cb); + this->udpSocket->Send(data, len, this->udpRemoteAddr, cb, ctx); else - this->tcpConnection->Send(data, len, cb); + this->tcpConnection->Send(data, len, cb, ctx); } Protocol GetProtocol() const diff --git a/worker/include/RTC/WebRtcTransport.hpp b/worker/include/RTC/WebRtcTransport.hpp index 1fd308d1dd..efe2e4a176 100644 --- a/worker/include/RTC/WebRtcTransport.hpp +++ b/worker/include/RTC/WebRtcTransport.hpp @@ -80,7 +80,8 @@ namespace RTC void SendRtpPacket( RTC::Consumer* consumer, RTC::RtpPacket* packet, - RTC::Transport::onSendCallback* cb = nullptr) override; + RTC::Transport::onSendCallback* cb = nullptr, + RTC::Transport::OnSendCallbackCtx* ctx = nullptr) override; void SendRtcpPacket(RTC::RTCP::Packet* packet) override; void SendRtcpCompoundPacket(RTC::RTCP::CompoundPacket* packet) override; void SendMessage( diff --git a/worker/include/handles/TcpConnectionHandler.hpp b/worker/include/handles/TcpConnectionHandler.hpp index 0b536225ba..7ef163c05c 100644 --- a/worker/include/handles/TcpConnectionHandler.hpp +++ b/worker/include/handles/TcpConnectionHandler.hpp @@ -2,6 +2,7 @@ #define MS_TCP_CONNECTION_HPP #include "common.hpp" +#include "RTC/Transport.hpp" #include #include @@ -35,12 +36,12 @@ class TcpConnectionHandler ~UvWriteData() { delete[] this->store; - delete this->cb; } uv_write_t req; uint8_t* store{ nullptr }; - TcpConnectionHandler::onSendCallback* cb{ nullptr }; + RTC::Transport::onSendCallback* cb{ nullptr }; + RTC::Transport::OnSendCallbackCtx* ctx{ nullptr }; }; public: @@ -71,7 +72,8 @@ class TcpConnectionHandler size_t len1, const uint8_t* data2, size_t len2, - TcpConnectionHandler::onSendCallback* cb); + RTC::Transport::onSendCallback* cb, + RTC::Transport::OnSendCallbackCtx* ctx); void ErrorReceiving(); const struct sockaddr* GetLocalAddress() const { @@ -117,7 +119,7 @@ class TcpConnectionHandler public: void OnUvReadAlloc(size_t suggestedSize, uv_buf_t* buf); void OnUvRead(ssize_t nread, const uv_buf_t* buf); - void OnUvWrite(int status, onSendCallback* cb); + void OnUvWrite(int status, RTC::Transport::onSendCallback* cb, RTC::Transport::OnSendCallbackCtx* ctx); /* Pure virtual methods that must be implemented by the subclass. */ protected: diff --git a/worker/include/handles/UdpSocketHandler.hpp b/worker/include/handles/UdpSocketHandler.hpp index 188f23b4fa..7b4cbd1b8c 100644 --- a/worker/include/handles/UdpSocketHandler.hpp +++ b/worker/include/handles/UdpSocketHandler.hpp @@ -2,14 +2,12 @@ #define MS_UDP_SOCKET_HPP #include "common.hpp" +#include "RTC/Transport.hpp" #include #include class UdpSocketHandler { -protected: - using onSendCallback = const std::function; - public: /* Struct for the data field of uv_req_t when sending a datagram. */ struct UvSendData @@ -25,12 +23,12 @@ class UdpSocketHandler ~UvSendData() { delete[] this->store; - delete this->cb; } uv_udp_send_t req; uint8_t* store{ nullptr }; - UdpSocketHandler::onSendCallback* cb{ nullptr }; + RTC::Transport::onSendCallback* cb{ nullptr }; + RTC::Transport::OnSendCallbackCtx* ctx{ nullptr }; }; public: @@ -50,7 +48,11 @@ class UdpSocketHandler } virtual void Dump() const; void Send( - const uint8_t* data, size_t len, const struct sockaddr* addr, UdpSocketHandler::onSendCallback* cb); + const uint8_t* data, + size_t len, + const struct sockaddr* addr, + RTC::Transport::onSendCallback* cb, + RTC::Transport::OnSendCallbackCtx* ctx); const struct sockaddr* GetLocalAddress() const { return reinterpret_cast(&this->localAddr); @@ -83,7 +85,7 @@ class UdpSocketHandler public: void OnUvRecvAlloc(size_t suggestedSize, uv_buf_t* buf); void OnUvRecv(ssize_t nread, const uv_buf_t* buf, const struct sockaddr* addr, unsigned int flags); - void OnUvSend(int status, UdpSocketHandler::onSendCallback* cb); + void OnUvSend(int status, RTC::Transport::onSendCallback* cb, RTC::Transport::OnSendCallbackCtx* ctx); /* Pure virtual methods that must be implemented by the subclass. */ protected: diff --git a/worker/src/RTC/DirectTransport.cpp b/worker/src/RTC/DirectTransport.cpp index 7a8a027bc8..3e27463d76 100644 --- a/worker/src/RTC/DirectTransport.cpp +++ b/worker/src/RTC/DirectTransport.cpp @@ -111,7 +111,10 @@ namespace RTC } void DirectTransport::SendRtpPacket( - RTC::Consumer* consumer, RTC::RtpPacket* packet, RTC::Transport::onSendCallback* cb) + RTC::Consumer* consumer, + RTC::RtpPacket* packet, + RTC::Transport::onSendCallback* cb, + RTC::Transport::OnSendCallbackCtx* ctx) { MS_TRACE(); @@ -119,6 +122,11 @@ namespace RTC { MS_WARN_TAG(rtp, "cannot send RTP packet not associated to a Consumer"); + if (cb) + { + (*cb)(false, ctx); + } + return; } @@ -130,8 +138,7 @@ namespace RTC if (cb) { - (*cb)(true); - delete cb; + (*cb)(true, ctx); } // Increase send transmission. diff --git a/worker/src/RTC/PipeTransport.cpp b/worker/src/RTC/PipeTransport.cpp index f1db0e4e19..6e1ab2bbb5 100644 --- a/worker/src/RTC/PipeTransport.cpp +++ b/worker/src/RTC/PipeTransport.cpp @@ -444,7 +444,10 @@ namespace RTC } void PipeTransport::SendRtpPacket( - RTC::Consumer* /*consumer*/, RTC::RtpPacket* packet, RTC::Transport::onSendCallback* cb) + RTC::Consumer* /*consumer*/, + RTC::RtpPacket* packet, + RTC::Transport::onSendCallback* cb, + RTC::Transport::OnSendCallbackCtx* ctx) { MS_TRACE(); @@ -452,8 +455,7 @@ namespace RTC { if (cb) { - (*cb)(false); - delete cb; + (*cb)(false, ctx); } return; @@ -466,8 +468,7 @@ namespace RTC { if (cb) { - (*cb)(false); - delete cb; + (*cb)(false, ctx); } return; @@ -475,7 +476,7 @@ namespace RTC auto len = static_cast(intLen); - this->tuple->Send(data, len, cb); + this->tuple->Send(data, len, cb, ctx); // Increase send transmission. RTC::Transport::DataSent(len); diff --git a/worker/src/RTC/PlainTransport.cpp b/worker/src/RTC/PlainTransport.cpp index ffe427209c..73e806a84a 100644 --- a/worker/src/RTC/PlainTransport.cpp +++ b/worker/src/RTC/PlainTransport.cpp @@ -712,7 +712,10 @@ namespace RTC } void PlainTransport::SendRtpPacket( - RTC::Consumer* /*consumer*/, RTC::RtpPacket* packet, RTC::Transport::onSendCallback* cb) + RTC::Consumer* /*consumer*/, + RTC::RtpPacket* packet, + RTC::Transport::onSendCallback* cb, + RTC::Transport::OnSendCallbackCtx* ctx) { MS_TRACE(); @@ -720,8 +723,7 @@ namespace RTC { if (cb) { - (*cb)(false); - delete cb; + (*cb)(false, ctx); } return; @@ -734,8 +736,7 @@ namespace RTC { if (cb) { - (*cb)(false); - delete cb; + (*cb)(false, ctx); } return; @@ -743,7 +744,7 @@ namespace RTC auto len = static_cast(intLen); - this->tuple->Send(data, len, cb); + this->tuple->Send(data, len, cb, ctx); // Increase send transmission. RTC::Transport::DataSent(len); diff --git a/worker/src/RTC/TcpConnection.cpp b/worker/src/RTC/TcpConnection.cpp index 511ec56190..98988ad76c 100644 --- a/worker/src/RTC/TcpConnection.cpp +++ b/worker/src/RTC/TcpConnection.cpp @@ -154,7 +154,11 @@ namespace RTC } } - void TcpConnection::Send(const uint8_t* data, size_t len, ::TcpConnectionHandler::onSendCallback* cb) + void TcpConnection::Send( + const uint8_t* data, + size_t len, + RTC::Transport::onSendCallback* cb, + RTC::Transport::OnSendCallbackCtx* ctx) { MS_TRACE(); @@ -163,6 +167,6 @@ namespace RTC uint8_t frameLen[2]; Utils::Byte::Set2Bytes(frameLen, 0, len); - ::TcpConnectionHandler::Write(frameLen, 2, data, len, cb); + ::TcpConnectionHandler::Write(frameLen, 2, data, len, cb, ctx); } } // namespace RTC diff --git a/worker/src/RTC/Transport.cpp b/worker/src/RTC/Transport.cpp index 60a5e5dcf9..6b40edb841 100644 --- a/worker/src/RTC/Transport.cpp +++ b/worker/src/RTC/Transport.cpp @@ -30,6 +30,30 @@ namespace RTC static size_t DefaultSctpSendBufferSize{ 262144 }; // 2^18. static size_t MaxSctpSendBufferSize{ 268435456 }; // 2^28. +#ifdef ENABLE_RTC_SENDER_BANDWIDTH_ESTIMATOR + void Transport::OnSendCallback(bool sent, OnSendCallbackCtx* ctx) + { + if (sent) + { + ctx->tccClient->PacketSent(ctx->packetInfo, DepLibUV::GetTimeMsInt64()); + + ctx->sentInfo.sentAtMs = DepLibUV::GetTimeMs(); + + ctx->senderBwe->RtpPacketSent(ctx->sentInfo); + } + + OnSendCallbackCtx::Allocator::Pool.deallocate(ctx, 1); + } +#else + void Transport::OnSendCallback(bool sent, OnSendCallbackCtx* ctx) + { + if (sent) + ctx->tccClient->PacketSent(ctx->packetInfo, DepLibUV::GetTimeMsInt64()); + + OnSendCallbackCtx::Allocator::Pool.deallocate(ctx, 1); + } +#endif + /* Instance methods. */ Transport::Transport(const std::string& id, Listener* listener, json& data) @@ -2522,7 +2546,6 @@ namespace RTC { this->transportWideCcSeq++; - auto* tccClient = this->tccClient; webrtc::RtpPacketSendInfo packetInfo; packetInfo.ssrc = packet->GetSsrc(); @@ -2535,6 +2558,8 @@ namespace RTC // Indicate the pacer (and prober) that a packet is to be sent. this->tccClient->InsertPacket(packetInfo); + auto* ctx = OnSendCallbackCtx::Allocator::Pool.allocate(1); + OnSendCallbackCtx::AllocatorTraits::construct(OnSendCallbackCtx::Allocator::Pool, ctx); #ifdef ENABLE_RTC_SENDER_BANDWIDTH_ESTIMATOR auto* senderBwe = this->senderBwe; RTC::SenderBandwidthEstimator::SentInfo sentInfo; @@ -2543,30 +2568,15 @@ namespace RTC sentInfo.size = packet->GetSize(); sentInfo.sendingAtMs = DepLibUV::GetTimeMs(); - auto* cb = new onSendCallback( - [tccClient, &packetInfo, senderBwe, &sentInfo](bool sent) - { - if (sent) - { - tccClient->PacketSent(packetInfo, DepLibUV::GetTimeMsInt64()); - - sentInfo.sentAtMs = DepLibUV::GetTimeMs(); - - senderBwe->RtpPacketSent(sentInfo); - } - }); - - SendRtpPacket(consumer, packet, cb); + ctx->tccClient = this->tccClient; + ctx->packetInfo = packetInfo; + ctx->senderBwe = senderBwe; + ctx->sentInfo = sentInfo; #else - const auto* cb = new onSendCallback( - [tccClient, &packetInfo](bool sent) - { - if (sent) - tccClient->PacketSent(packetInfo, DepLibUV::GetTimeMsInt64()); - }); - - SendRtpPacket(consumer, packet, cb); + ctx->tccClient = this->tccClient; + ctx->packetInfo = packetInfo; #endif + SendRtpPacket(consumer, packet, OnSendCallback, ctx); } else { @@ -2594,7 +2604,6 @@ namespace RTC { this->transportWideCcSeq++; - auto* tccClient = this->tccClient; webrtc::RtpPacketSendInfo packetInfo; packetInfo.ssrc = packet->GetSsrc(); @@ -2607,6 +2616,8 @@ namespace RTC // Indicate the pacer (and prober) that a packet is to be sent. this->tccClient->InsertPacket(packetInfo); + auto* ctx = OnSendCallbackCtx::Allocator::Pool.allocate(1); + OnSendCallbackCtx::AllocatorTraits::construct(OnSendCallbackCtx::Allocator::Pool, ctx); #ifdef ENABLE_RTC_SENDER_BANDWIDTH_ESTIMATOR auto* senderBwe = this->senderBwe; RTC::SenderBandwidthEstimator::SentInfo sentInfo; @@ -2615,30 +2626,15 @@ namespace RTC sentInfo.size = packet->GetSize(); sentInfo.sendingAtMs = DepLibUV::GetTimeMs(); - auto* cb = new onSendCallback( - [tccClient, &packetInfo, senderBwe, &sentInfo](bool sent) - { - if (sent) - { - tccClient->PacketSent(packetInfo, DepLibUV::GetTimeMsInt64()); - - sentInfo.sentAtMs = DepLibUV::GetTimeMs(); - - senderBwe->RtpPacketSent(sentInfo); - } - }); - - SendRtpPacket(consumer, packet, cb); + ctx->tccClient = this->tccClient; + ctx->packetInfo = packetInfo; + ctx->senderBwe = senderBwe; + ctx->sentInfo = sentInfo; #else - const auto* cb = new onSendCallback( - [tccClient, &packetInfo](bool sent) - { - if (sent) - tccClient->PacketSent(packetInfo, DepLibUV::GetTimeMsInt64()); - }); - - SendRtpPacket(consumer, packet, cb); + ctx->tccClient = this->tccClient; + ctx->packetInfo = packetInfo; #endif + SendRtpPacket(consumer, packet, OnSendCallback, ctx); } else { @@ -2945,6 +2941,8 @@ namespace RTC // Indicate the pacer (and prober) that a packet is to be sent. this->tccClient->InsertPacket(packetInfo); + auto* ctx = OnSendCallbackCtx::Allocator::Pool.allocate(1); + OnSendCallbackCtx::AllocatorTraits::construct(OnSendCallbackCtx::Allocator::Pool, ctx); #ifdef ENABLE_RTC_SENDER_BANDWIDTH_ESTIMATOR auto* senderBwe = this->senderBwe; RTC::SenderBandwidthEstimator::SentInfo sentInfo; @@ -2954,30 +2952,15 @@ namespace RTC sentInfo.isProbation = true; sentInfo.sendingAtMs = DepLibUV::GetTimeMs(); - auto* cb = new onSendCallback( - [tccClient, &packetInfo, senderBwe, &sentInfo](bool sent) - { - if (sent) - { - tccClient->PacketSent(packetInfo, DepLibUV::GetTimeMsInt64()); - - sentInfo.sentAtMs = DepLibUV::GetTimeMs(); - - senderBwe->RtpPacketSent(sentInfo); - } - }); - - SendRtpPacket(nullptr, packet, cb); + ctx->tccClient = this->tccClient; + ctx->packetInfo = packetInfo; + ctx->senderBwe = senderBwe; + ctx->sentInfo = sentInfo; #else - const auto* cb = new onSendCallback( - [tccClient, &packetInfo](bool sent) - { - if (sent) - tccClient->PacketSent(packetInfo, DepLibUV::GetTimeMsInt64()); - }); - - SendRtpPacket(nullptr, packet, cb); + ctx->tccClient = this->tccClient; + ctx->packetInfo = packetInfo; #endif + SendRtpPacket(nullptr, packet, OnSendCallback, ctx); } else { diff --git a/worker/src/RTC/WebRtcTransport.cpp b/worker/src/RTC/WebRtcTransport.cpp index e697c94075..5443180c75 100644 --- a/worker/src/RTC/WebRtcTransport.cpp +++ b/worker/src/RTC/WebRtcTransport.cpp @@ -831,7 +831,10 @@ namespace RTC } void WebRtcTransport::SendRtpPacket( - RTC::Consumer* /*consumer*/, RTC::RtpPacket* packet, RTC::Transport::onSendCallback* cb) + RTC::Consumer* /*consumer*/, + RTC::RtpPacket* packet, + RTC::Transport::onSendCallback* cb, + RTC::Transport::OnSendCallbackCtx* ctx) { MS_TRACE(); @@ -839,8 +842,7 @@ namespace RTC { if (cb) { - (*cb)(false); - delete cb; + (*cb)(false, ctx); } return; @@ -853,8 +855,7 @@ namespace RTC if (cb) { - (*cb)(false); - delete cb; + (*cb)(false, ctx); } return; @@ -867,8 +868,7 @@ namespace RTC { if (cb) { - (*cb)(false); - delete cb; + (*cb)(false, ctx); } return; @@ -876,7 +876,7 @@ namespace RTC auto len = static_cast(intLen); - this->iceServer->GetSelectedTuple()->Send(data, len, cb); + this->iceServer->GetSelectedTuple()->Send(data, len, cb, ctx); // Increase send transmission. RTC::Transport::DataSent(len); diff --git a/worker/src/handles/TcpConnectionHandler.cpp b/worker/src/handles/TcpConnectionHandler.cpp index 5fd6a7f045..7dfa24e272 100644 --- a/worker/src/handles/TcpConnectionHandler.cpp +++ b/worker/src/handles/TcpConnectionHandler.cpp @@ -32,11 +32,14 @@ inline static void onWrite(uv_write_t* req, int status) auto* handle = req->handle; auto* connection = static_cast(handle->data); auto* cb = writeData->cb; + auto* ctx = writeData->ctx; if (connection) - connection->OnUvWrite(status, cb); + connection->OnUvWrite(status, cb, ctx); + else if (cb) + (*cb)(false, ctx); - // Delete the UvWriteData struct and the cb. + // Delete the UvWriteData struct. delete writeData; } @@ -179,7 +182,8 @@ void TcpConnectionHandler::Write( size_t len1, const uint8_t* data2, size_t len2, - TcpConnectionHandler::onSendCallback* cb) + RTC::Transport::onSendCallback* cb, + RTC::Transport::OnSendCallbackCtx* ctx) { MS_TRACE(); @@ -187,8 +191,7 @@ void TcpConnectionHandler::Write( { if (cb) { - (*cb)(false); - delete cb; + (*cb)(false, ctx); } return; @@ -198,8 +201,7 @@ void TcpConnectionHandler::Write( { if (cb) { - (*cb)(false); - delete cb; + (*cb)(false, ctx); } return; @@ -225,8 +227,7 @@ void TcpConnectionHandler::Write( if (cb) { - (*cb)(true); - delete cb; + (*cb)(true, ctx); } return; @@ -267,7 +268,8 @@ void TcpConnectionHandler::Write( len2 - (static_cast(written) - len1)); } - writeData->cb = cb; + writeData->cb = cb; + writeData->ctx = ctx; uv_buf_t buffer = uv_buf_init(reinterpret_cast(writeData->store), pendingLen); @@ -283,9 +285,9 @@ void TcpConnectionHandler::Write( MS_WARN_DEV("uv_write() failed: %s", uv_strerror(err)); if (cb) - (*cb)(false); + (*cb)(false, ctx); - // Delete the UvWriteData struct (it will delete the store and cb too). + // Delete the UvWriteData struct (it will delete the store too). delete writeData; } else @@ -399,16 +401,15 @@ inline void TcpConnectionHandler::OnUvRead(ssize_t nread, const uv_buf_t* /*buf* } } -inline void TcpConnectionHandler::OnUvWrite(int status, TcpConnectionHandler::onSendCallback* cb) +inline void TcpConnectionHandler::OnUvWrite( + int status, RTC::Transport::onSendCallback* cb, RTC::Transport::OnSendCallbackCtx* ctx) { MS_TRACE(); - // NOTE: Do not delete cb here since it will be delete in onWrite() above. - if (status == 0) { if (cb) - (*cb)(true); + (*cb)(true, ctx); } else { @@ -418,7 +419,7 @@ inline void TcpConnectionHandler::OnUvWrite(int status, TcpConnectionHandler::on MS_WARN_DEV("write error, closing the connection: %s", uv_strerror(status)); if (cb) - (*cb)(false); + (*cb)(false, ctx); Close(); diff --git a/worker/src/handles/UdpSocketHandler.cpp b/worker/src/handles/UdpSocketHandler.cpp index 6cd1fa8e3e..39e74eea37 100644 --- a/worker/src/handles/UdpSocketHandler.cpp +++ b/worker/src/handles/UdpSocketHandler.cpp @@ -37,11 +37,14 @@ inline static void onSend(uv_udp_send_t* req, int status) auto* handle = req->handle; auto* socket = static_cast(handle->data); auto* cb = sendData->cb; + auto* ctx = sendData->ctx; if (socket) - socket->OnUvSend(status, cb); + socket->OnUvSend(status, cb, ctx); + else if (cb) + (*cb)(false, ctx); - // Delete the UvSendData struct (it will delete the store and cb too). + // Delete the UvSendData struct (it will delete the store too). delete sendData; } @@ -119,7 +122,11 @@ void UdpSocketHandler::Dump() const } void UdpSocketHandler::Send( - const uint8_t* data, size_t len, const struct sockaddr* addr, UdpSocketHandler::onSendCallback* cb) + const uint8_t* data, + size_t len, + const struct sockaddr* addr, + RTC::Transport::onSendCallback* cb, + RTC::Transport::OnSendCallbackCtx* ctx) { MS_TRACE(); @@ -127,8 +134,7 @@ void UdpSocketHandler::Send( { if (cb) { - (*cb)(false); - delete cb; + (*cb)(false, ctx); } return; @@ -138,8 +144,7 @@ void UdpSocketHandler::Send( { if (cb) { - (*cb)(false); - delete cb; + (*cb)(false, ctx); } return; @@ -159,8 +164,7 @@ void UdpSocketHandler::Send( if (cb) { - (*cb)(true); - delete cb; + (*cb)(true, ctx); } return; @@ -174,8 +178,7 @@ void UdpSocketHandler::Send( if (cb) { - (*cb)(false); - delete cb; + (*cb)(false, ctx); } return; @@ -190,7 +193,8 @@ void UdpSocketHandler::Send( sendData->req.data = static_cast(sendData); std::memcpy(sendData->store, data, len); - sendData->cb = cb; + sendData->cb = cb; + sendData->ctx = ctx; buffer = uv_buf_init(reinterpret_cast(sendData->store), len); @@ -204,9 +208,9 @@ void UdpSocketHandler::Send( MS_WARN_DEV("uv_udp_send() failed: %s", uv_strerror(err)); if (cb) - (*cb)(false); + (*cb)(false, ctx); - // Delete the UvSendData struct (it will delete the store and cb too). + // Delete the UvSendData struct. delete sendData; } else @@ -284,16 +288,15 @@ inline void UdpSocketHandler::OnUvRecv( } } -inline void UdpSocketHandler::OnUvSend(int status, UdpSocketHandler::onSendCallback* cb) +inline void UdpSocketHandler::OnUvSend( + int status, RTC::Transport::onSendCallback* cb, RTC::Transport::OnSendCallbackCtx* ctx) { MS_TRACE(); - // NOTE: Do not delete cb here since it will be delete in onSend() above. - if (status == 0) { if (cb) - (*cb)(true); + (*cb)(true, ctx); } else { @@ -302,6 +305,6 @@ inline void UdpSocketHandler::OnUvSend(int status, UdpSocketHandler::onSendCallb #endif if (cb) - (*cb)(false); + (*cb)(false, ctx); } } From cd7478a9f9d9fbe22db86389ad2df966b7a96202 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Luis=20Mill=C3=A1n?= Date: Wed, 7 Sep 2022 13:02:00 +0200 Subject: [PATCH 04/16] Use ObjectPoolAllocator in RtpPacket --- worker/include/RTC/RtpPacket.hpp | 17 +++++-- worker/src/RTC/PipeTransport.cpp | 4 +- worker/src/RTC/PlainTransport.cpp | 6 +-- worker/src/RTC/RtpPacket.cpp | 77 +++++++++++++++++++++--------- worker/src/RTC/Transport.cpp | 4 +- worker/src/RTC/WebRtcTransport.cpp | 2 +- 6 files changed, 75 insertions(+), 35 deletions(-) diff --git a/worker/include/RTC/RtpPacket.hpp b/worker/include/RTC/RtpPacket.hpp index 41cac4d0c1..79c72b4116 100644 --- a/worker/include/RTC/RtpPacket.hpp +++ b/worker/include/RTC/RtpPacket.hpp @@ -23,6 +23,15 @@ namespace RTC class RtpPacket { public: + using Allocator = Utils::ObjectPoolAllocator; + using AllocatorTraits = std::allocator_traits; + // Memory to hold the cloned packet (with extra space for RTX encoding). + using RtpPacketBuffer = std::array; + using BufferAllocator = Utils::ObjectPoolAllocator; + using BufferAllocatorTraits = std::allocator_traits; + + static void Deallocate(RtpPacket* packet); + /* Struct for RTP header. */ struct Header { @@ -133,7 +142,7 @@ namespace RTC static RtpPacket* Parse(const uint8_t* data, size_t len); - private: + public: RtpPacket( Header* header, HeaderExtension* headerExtension, @@ -142,7 +151,6 @@ namespace RTC uint8_t payloadPadding, size_t size); - public: ~RtpPacket(); void Dump() const; @@ -589,7 +597,7 @@ namespace RTC return this->payloadDescriptorHandler->IsKeyFrame(); } - RtpPacket* Clone() const; + std::shared_ptr Clone() const; void RtxEncode(uint8_t payloadType, uint32_t ssrc, uint16_t seq); @@ -607,6 +615,7 @@ namespace RTC void ShiftPayload(size_t payloadOffset, size_t shift, bool expand = true); private: + // friend AllocatorTraits; void ParseExtensions(); private: @@ -635,7 +644,7 @@ namespace RTC std::shared_ptr payloadDescriptorHandler; // Buffer where this packet is allocated, can be `nullptr` if packet was // parsed from externally provided buffer. - uint8_t* buffer{ nullptr }; + RtpPacketBuffer* buffer{ nullptr }; }; } // namespace RTC diff --git a/worker/src/RTC/PipeTransport.cpp b/worker/src/RTC/PipeTransport.cpp index 6e1ab2bbb5..9114ec9d41 100644 --- a/worker/src/RTC/PipeTransport.cpp +++ b/worker/src/RTC/PipeTransport.cpp @@ -620,7 +620,7 @@ namespace RTC packet->GetPayloadType(), packet->GetSequenceNumber()); - delete packet; + RtpPacket::Deallocate(packet); } return; @@ -643,7 +643,7 @@ namespace RTC // Remove this SSRC. RecvStreamClosed(packet->GetSsrc()); - delete packet; + RtpPacket::Deallocate(packet); return; } diff --git a/worker/src/RTC/PlainTransport.cpp b/worker/src/RTC/PlainTransport.cpp index 73e806a84a..b07b0cce52 100644 --- a/worker/src/RTC/PlainTransport.cpp +++ b/worker/src/RTC/PlainTransport.cpp @@ -895,7 +895,7 @@ namespace RTC packet->GetPayloadType(), packet->GetSequenceNumber()); - delete packet; + RtpPacket::Deallocate(packet); } return; @@ -920,7 +920,7 @@ namespace RTC // Remove this SSRC. RecvStreamClosed(packet->GetSsrc()); - delete packet; + RtpPacket::Deallocate(packet); return; } @@ -956,7 +956,7 @@ namespace RTC // Remove this SSRC. RecvStreamClosed(packet->GetSsrc()); - delete packet; + RtpPacket::Deallocate(packet); return; } diff --git a/worker/src/RTC/RtpPacket.cpp b/worker/src/RTC/RtpPacket.cpp index 324932ff4b..8ffe24d6a1 100644 --- a/worker/src/RTC/RtpPacket.cpp +++ b/worker/src/RTC/RtpPacket.cpp @@ -9,6 +9,15 @@ namespace RTC { + /* Static Class methods. */ + + void RtpPacket::Deallocate(RtpPacket* packet) + { + // Destroy and deallocate the RtpPacket. + packet->~RtpPacket(); + RtpPacket::Allocator::Pool.deallocate(packet, 1); + } + /* Class methods. */ RtpPacket* RtpPacket::Parse(const uint8_t* data, size_t len) @@ -117,7 +126,18 @@ namespace RTC payloadLength + size_t{ payloadPadding }, "packet's computed size does not match received size"); - return new RtpPacket(header, headerExtension, payload, payloadLength, payloadPadding, len); + auto* rtpPacket = RtpPacket::Allocator::Pool.allocate(1); + RtpPacket::AllocatorTraits::construct( + RtpPacket::Allocator::Pool, + rtpPacket, + header, + headerExtension, + payload, + payloadLength, + payloadPadding, + len); + + return rtpPacket; } /* Instance methods. */ @@ -145,9 +165,12 @@ namespace RTC { MS_TRACE(); + // This is a cloned RtpPacket. if (this->buffer) { - delete[] this->buffer; + this->buffer->~array(); + RtpPacket::BufferAllocator::Pool.deallocate(this->buffer, 1); + this->buffer = nullptr; } } @@ -633,12 +656,14 @@ namespace RTC SetPayloadPaddingFlag(false); } - RtpPacket* RtpPacket::Clone() const + std::shared_ptr RtpPacket::Clone() const { MS_TRACE(); - auto* buffer = new uint8_t[MtuSize + 100]; - auto* ptr = const_cast(buffer); + auto* buffer = RtpPacket::BufferAllocator::Pool.allocate(1); + RtpPacket::BufferAllocatorTraits::construct(RtpPacket::BufferAllocator::Pool, buffer); + + auto* ptr = const_cast(buffer->data()); size_t numBytes{ 0 }; @@ -692,28 +717,34 @@ namespace RTC ptr += size_t{ this->payloadPadding }; } - MS_ASSERT(static_cast(ptr - buffer) == this->size, "ptr - buffer == this->size"); + MS_ASSERT( + static_cast(ptr - buffer->data()) == this->size, "ptr - buffer->data() == this->size"); // Create the new RtpPacket instance and return it. - auto* packet = new RtpPacket( - newHeader, newHeaderExtension, newPayload, this->payloadLength, this->payloadPadding, this->size); - - // Keep already set extension ids. - packet->midExtensionId = this->midExtensionId; - packet->ridExtensionId = this->ridExtensionId; - packet->rridExtensionId = this->rridExtensionId; - packet->absSendTimeExtensionId = this->absSendTimeExtensionId; - packet->transportWideCc01ExtensionId = this->transportWideCc01ExtensionId; - packet->frameMarking07ExtensionId = this->frameMarking07ExtensionId; // Remove once RFC. - packet->frameMarkingExtensionId = this->frameMarkingExtensionId; - packet->ssrcAudioLevelExtensionId = this->ssrcAudioLevelExtensionId; - packet->videoOrientationExtensionId = this->videoOrientationExtensionId; - // Assign the payload descriptor handler. - packet->payloadDescriptorHandler = this->payloadDescriptorHandler; + std::shared_ptr shared = std::allocate_shared( + RtpPacket::Allocator::Pool, + newHeader, + newHeaderExtension, + newPayload, + this->payloadLength, + this->payloadPadding, + this->size); + + shared->midExtensionId = this->midExtensionId; + shared->ridExtensionId = this->ridExtensionId; + shared->rridExtensionId = this->rridExtensionId; + shared->absSendTimeExtensionId = this->absSendTimeExtensionId; + shared->transportWideCc01ExtensionId = this->transportWideCc01ExtensionId; + shared->frameMarking07ExtensionId = this->frameMarking07ExtensionId; // Remove once RFC. + shared->frameMarkingExtensionId = this->frameMarkingExtensionId; + shared->ssrcAudioLevelExtensionId = this->ssrcAudioLevelExtensionId; + shared->videoOrientationExtensionId = this->videoOrientationExtensionId; + // Clone payload descriptor handler. + shared->payloadDescriptorHandler = this->payloadDescriptorHandler; // Store allocated buffer. - packet->buffer = buffer; + shared->buffer = buffer; - return packet; + return shared; } // NOTE: The caller must ensure that the buffer/memmory of the packet has diff --git a/worker/src/RTC/Transport.cpp b/worker/src/RTC/Transport.cpp index 6b40edb841..7be7b48ee7 100644 --- a/worker/src/RTC/Transport.cpp +++ b/worker/src/RTC/Transport.cpp @@ -1611,7 +1611,7 @@ namespace RTC // Tell the child class to remove this SSRC. RecvStreamClosed(packet->GetSsrc()); - delete packet; + RtpPacket::Deallocate(packet); return; } @@ -1640,7 +1640,7 @@ namespace RTC default:; } - delete packet; + RtpPacket::Deallocate(packet); } void Transport::ReceiveRtcpPacket(RTC::RTCP::Packet* packet) diff --git a/worker/src/RTC/WebRtcTransport.cpp b/worker/src/RTC/WebRtcTransport.cpp index 5443180c75..f4ae6a6663 100644 --- a/worker/src/RTC/WebRtcTransport.cpp +++ b/worker/src/RTC/WebRtcTransport.cpp @@ -1119,7 +1119,7 @@ namespace RTC packet->GetPayloadType(), packet->GetSequenceNumber()); - delete packet; + RtpPacket::Deallocate(packet); } return; From 527a4538217e9af04428a9d4d94b47a597778b7f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Luis=20Mill=C3=A1n?= Date: Wed, 7 Sep 2022 17:20:14 +0200 Subject: [PATCH 05/16] Remove dev logs --- worker/include/Utils.hpp | 25 +------------------------ 1 file changed, 1 insertion(+), 24 deletions(-) diff --git a/worker/include/Utils.hpp b/worker/include/Utils.hpp index 2722cb7142..d486d05d4e 100644 --- a/worker/include/Utils.hpp +++ b/worker/include/Utils.hpp @@ -9,7 +9,6 @@ #include "common.hpp" #include "Logger.hpp" -#include "handles/Timer.hpp" #include #include #include // std::memcmp(), std::memcpy() @@ -365,7 +364,7 @@ namespace Utils // Simple implementation of object pool only for single objects // Arrays are allocated as usual template - class ObjectPoolAllocator : public Timer::Listener + class ObjectPoolAllocator { std::shared_ptr> pool_data; @@ -385,9 +384,6 @@ namespace Utils } delete pool; }); - - this->timer = new Timer(this); - this->timer->Start(3000); } template @@ -398,10 +394,6 @@ namespace Utils ~ObjectPoolAllocator() { - if (this->timer) - { - this->timer->Stop(); - } } T* allocate(size_t n) @@ -410,8 +402,6 @@ namespace Utils if (this->pool_data->empty()) { - this->allocated++; - return static_cast(std::malloc(sizeof(T))); } @@ -440,19 +430,6 @@ namespace Utils this->pool_data->push_back(ptr); #endif } - - /* Pure virtual methods inherited from Timer::Listener. */ - public: - void OnTimer(Timer* timer) override - { - MS_ERROR("allocated: %zu, available: %zu", this->allocated, this->pool_data->size()); - - this->timer->Start(3000); - } - - private: - Timer* timer{ nullptr }; - size_t allocated{ 0 }; }; template From fff516c63dbdc72aab85cb41c94b280592e2ec95 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Luis=20Mill=C3=A1n?= Date: Wed, 7 Sep 2022 17:20:30 +0200 Subject: [PATCH 06/16] Adapt Tests and Fuzzer files --- worker/fuzzer/src/RTC/FuzzerRtpPacket.cpp | 6 ++---- worker/fuzzer/src/RTC/FuzzerRtpStreamSend.cpp | 3 ++- worker/test/src/RTC/TestRtpPacket.cpp | 5 +---- 3 files changed, 5 insertions(+), 9 deletions(-) diff --git a/worker/fuzzer/src/RTC/FuzzerRtpPacket.cpp b/worker/fuzzer/src/RTC/FuzzerRtpPacket.cpp index 4167e371d5..0536298b33 100644 --- a/worker/fuzzer/src/RTC/FuzzerRtpPacket.cpp +++ b/worker/fuzzer/src/RTC/FuzzerRtpPacket.cpp @@ -179,9 +179,7 @@ void Fuzzer::RTC::RtpPacket::Fuzz(const uint8_t* data, size_t len) packet->GetPayloadPadding(); packet->IsKeyFrame(); - auto* clonedPacket = packet->Clone(); - - delete clonedPacket; + auto clonedPacket = packet->Clone(); // TODO: packet->RtxEncode(); // This cannot be tested this way. // TODO: packet->RtxDecode(); // This cannot be tested this way. @@ -190,5 +188,5 @@ void Fuzzer::RTC::RtpPacket::Fuzz(const uint8_t* data, size_t len) // TODO: packet->ProcessPayload(); // TODO: packet->ShiftPayload(); - delete packet; + ::RTC::RtpPacket::Deallocate(packet); } diff --git a/worker/fuzzer/src/RTC/FuzzerRtpStreamSend.cpp b/worker/fuzzer/src/RTC/FuzzerRtpStreamSend.cpp index d85ad2700c..7d3b7b5370 100644 --- a/worker/fuzzer/src/RTC/FuzzerRtpStreamSend.cpp +++ b/worker/fuzzer/src/RTC/FuzzerRtpStreamSend.cpp @@ -60,5 +60,6 @@ void Fuzzer::RTC::RtpStreamSend::Fuzz(const uint8_t* data, size_t len) } delete stream; - delete packet; + + ::RTC::RtpPacket::Deallocate(packet); } diff --git a/worker/test/src/RTC/TestRtpPacket.cpp b/worker/test/src/RTC/TestRtpPacket.cpp index 56f9f16f01..4b9fb70e62 100644 --- a/worker/test/src/RTC/TestRtpPacket.cpp +++ b/worker/test/src/RTC/TestRtpPacket.cpp @@ -127,7 +127,7 @@ SCENARIO("parse RTP packets", "[parser][rtp]") REQUIRE(packet->ReadAbsSendTime(absSendTime) == true); REQUIRE(absSendTime == 0x65341e); - auto* clonedPacket = packet->Clone(); + auto clonedPacket = packet->Clone(); std::memset(buffer, '0', sizeof(buffer)); @@ -164,7 +164,6 @@ SCENARIO("parse RTP packets", "[parser][rtp]") REQUIRE(absSendTime == 0x65341e); delete packet; - delete clonedPacket; } SECTION("create RtpPacket without header extension") @@ -369,8 +368,6 @@ SCENARIO("parse RTP packets", "[parser][rtp]") REQUIRE(rtxPacket->GetHeaderExtensionLength() == 12); REQUIRE(rtxPacket->HasOneByteExtensions() == false); REQUIRE(rtxPacket->HasTwoBytesExtensions()); - - delete rtxPacket; } SECTION("create RtpPacket and apply payload shift to it") From 564876d5e8c72a124024eae0a0fe778e13b6548d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Luis=20Mill=C3=A1n?= Date: Wed, 7 Sep 2022 17:47:38 +0200 Subject: [PATCH 07/16] RtpPacket: remove comment --- worker/include/RTC/RtpPacket.hpp | 1 - 1 file changed, 1 deletion(-) diff --git a/worker/include/RTC/RtpPacket.hpp b/worker/include/RTC/RtpPacket.hpp index 79c72b4116..d622f82e29 100644 --- a/worker/include/RTC/RtpPacket.hpp +++ b/worker/include/RTC/RtpPacket.hpp @@ -615,7 +615,6 @@ namespace RTC void ShiftPayload(size_t payloadOffset, size_t shift, bool expand = true); private: - // friend AllocatorTraits; void ParseExtensions(); private: From 7ae537f2d152772c39292bb6df24a26b10966c14 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Luis=20Mill=C3=A1n?= Date: Wed, 7 Sep 2022 18:04:23 +0200 Subject: [PATCH 08/16] Add dot to end of comments --- worker/include/Utils.hpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/worker/include/Utils.hpp b/worker/include/Utils.hpp index d486d05d4e..7b09e5524c 100644 --- a/worker/include/Utils.hpp +++ b/worker/include/Utils.hpp @@ -361,8 +361,8 @@ namespace Utils } }; - // Simple implementation of object pool only for single objects - // Arrays are allocated as usual + // Simple implementation of object pool only for single objects. + // Arrays are allocated as usual. template class ObjectPoolAllocator { From d1f78a6f2c0c412791c29e7e3b0a9546b9c2aaba Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Luis=20Mill=C3=A1n?= Date: Fri, 9 Sep 2022 12:39:08 +0200 Subject: [PATCH 09/16] Test: RtpPacket, add performance tests --- worker/test/src/RTC/TestRtpPacket.cpp | 146 ++++++++++++++++++++++++-- 1 file changed, 135 insertions(+), 11 deletions(-) diff --git a/worker/test/src/RTC/TestRtpPacket.cpp b/worker/test/src/RTC/TestRtpPacket.cpp index 4b9fb70e62..0da69b89ca 100644 --- a/worker/test/src/RTC/TestRtpPacket.cpp +++ b/worker/test/src/RTC/TestRtpPacket.cpp @@ -6,6 +6,8 @@ #include #include +// #define PERFORMANCE_TEST 1 + using namespace RTC; static uint8_t buffer[65536]; @@ -47,7 +49,7 @@ SCENARIO("parse RTP packets", "[parser][rtp]") REQUIRE(packet->ReadRid(rid) == false); REQUIRE(rid == ""); - delete packet; + RTC::RtpPacket::Deallocate(packet); } SECTION("parse packet2.raw") @@ -73,7 +75,7 @@ SCENARIO("parse RTP packets", "[parser][rtp]") REQUIRE(packet->HasOneByteExtensions() == false); REQUIRE(packet->HasTwoBytesExtensions() == false); - delete packet; + RTC::RtpPacket::Deallocate(packet); } SECTION("parse packet3.raw") @@ -163,7 +165,7 @@ SCENARIO("parse RTP packets", "[parser][rtp]") REQUIRE(clonedPacket->ReadAbsSendTime(absSendTime) == true); REQUIRE(absSendTime == 0x65341e); - delete packet; + RTC::RtpPacket::Deallocate(packet); } SECTION("create RtpPacket without header extension") @@ -191,7 +193,7 @@ SCENARIO("parse RTP packets", "[parser][rtp]") REQUIRE(packet->HasTwoBytesExtensions() == false); REQUIRE(packet->GetSsrc() == 5); - delete packet; + RTC::RtpPacket::Deallocate(packet); } SECTION("create RtpPacket with One-Byte header extension") @@ -232,7 +234,7 @@ SCENARIO("parse RTP packets", "[parser][rtp]") REQUIRE(packet->GetPayloadLength() == 1000); REQUIRE(packet->GetSize() == 1028); - delete packet; + RTC::RtpPacket::Deallocate(packet); } SECTION("create RtpPacket with Two-Bytes header extension") @@ -298,7 +300,7 @@ SCENARIO("parse RTP packets", "[parser][rtp]") REQUIRE(extenValue == nullptr); REQUIRE(extenLen == 0); - delete packet; + RTC::RtpPacket::Deallocate(packet); } SECTION("rtx encryption-decryption") @@ -339,7 +341,7 @@ SCENARIO("parse RTP packets", "[parser][rtp]") auto rtxPacket = packet->Clone(); - delete packet; + RTC::RtpPacket::Deallocate(packet); std::memset(buffer, '0', sizeof(buffer)); @@ -478,7 +480,7 @@ SCENARIO("parse RTP packets", "[parser][rtp]") REQUIRE(packet->GetPayloadPadding() == 0); REQUIRE(packet->GetSize() == 1028); - delete packet; + RTC::RtpPacket::Deallocate(packet); } SECTION("set One-Byte header extensions") @@ -642,7 +644,7 @@ SCENARIO("parse RTP packets", "[parser][rtp]") REQUIRE(extenValue[2] == 0x03); REQUIRE(extenValue[3] == 0x00); - delete packet; + RTC::RtpPacket::Deallocate(packet); } SECTION("set Two-Bytes header extensions") @@ -787,7 +789,7 @@ SCENARIO("parse RTP packets", "[parser][rtp]") REQUIRE(packet->HasExtension(24) == true); REQUIRE(extenLen == 4); - delete packet; + RTC::RtpPacket::Deallocate(packet); } SECTION("read frame-marking extension") @@ -837,6 +839,128 @@ SCENARIO("parse RTP packets", "[parser][rtp]") REQUIRE(frameMarking->lid == 1); REQUIRE(frameMarking->tl0picidx == 5); - delete packet; + RTC::RtpPacket::Deallocate(packet); + } + +#ifdef PERFORMANCE_TEST + SECTION("Parse()") + { + // clang-format off + uint8_t buffer[] = + { + 0b10010000, 0b00000001, 0, 8, + 0, 0, 0, 4, + 0, 0, 0, 5, + 0xBE, 0xDE, 0, 1, // Header Extension + 0b00110010, 0b10101011, 1, 5, + 1, 2, 3, 4 + }; + // clang-format on + + const uint8_t probes = 5; + size_t iterations = 1000000; + std::array durations; + + for (auto idx = 0; idx < probes; ++idx) + { + auto start = std::chrono::system_clock::now(); + + for (auto i = 0; i < iterations; ++i) + { + RtpPacket* packet = RtpPacket::Parse(buffer, sizeof(buffer)); + + if (!packet) + FAIL("not a RTP packet"); + + REQUIRE(packet->HasMarker() == false); + REQUIRE(packet->HasHeaderExtension() == true); + REQUIRE(packet->GetPayloadType() == 1); + REQUIRE(packet->GetSequenceNumber() == 8); + REQUIRE(packet->GetTimestamp() == 4); + REQUIRE(packet->GetSsrc() == 5); + REQUIRE(packet->GetHeaderExtensionId() == 0xBEDE); + REQUIRE(packet->GetHeaderExtensionLength() == 4); + REQUIRE(packet->HasOneByteExtensions()); + REQUIRE(packet->HasTwoBytesExtensions() == false); + REQUIRE(packet->GetPayloadLength() == 4); + + RTC::RtpPacket::Deallocate(packet); + } + + std::chrono::duration dur = std::chrono::system_clock::now() - start; + + durations[idx] = dur.count(); + } + + double sum{ 0 }; + + for (auto idx = 0; idx < probes; ++idx) + { + sum += durations[idx]; + } + + std::cout << +probes << " probes of " << iterations << " RtpPackets parsed in an AVG: \t" + << sum / probes << " seconds" << std::endl; + } + + SECTION("Clone()") + { + // clang-format off + uint8_t buffer[] = + { + 0b10010000, 0b00000001, 0, 8, + 0, 0, 0, 4, + 0, 0, 0, 5, + 0xBE, 0xDE, 0, 1, // Header Extension + 0b00110010, 0b10101011, 1, 5, + 1, 2, 3, 4 + }; + // clang-format on + + RtpPacket* packet = RtpPacket::Parse(buffer, sizeof(buffer)); + + if (!packet) + FAIL("not a RTP packet"); + + const uint8_t probes = 5; + size_t iterations = 1000000; + std::array durations; + + for (auto idx = 0; idx < probes; ++idx) + { + auto start = std::chrono::system_clock::now(); + + for (auto i = 0; i < iterations; ++i) + { + auto clone = packet->Clone(); + + REQUIRE(clone->HasMarker() == false); + REQUIRE(clone->HasHeaderExtension() == true); + REQUIRE(clone->GetPayloadType() == 1); + REQUIRE(clone->GetSequenceNumber() == 8); + REQUIRE(clone->GetTimestamp() == 4); + REQUIRE(clone->GetSsrc() == 5); + REQUIRE(clone->GetHeaderExtensionId() == 0xBEDE); + REQUIRE(clone->GetHeaderExtensionLength() == 4); + REQUIRE(clone->HasOneByteExtensions()); + REQUIRE(clone->HasTwoBytesExtensions() == false); + REQUIRE(clone->GetPayloadLength() == 4); + } + + std::chrono::duration dur = std::chrono::system_clock::now() - start; + + durations[idx] = dur.count(); + } + + double sum{ 0 }; + + for (auto idx = 0; idx < probes; ++idx) + { + sum += durations[idx]; + } + + std::cout << +probes << " probes of " << iterations << " RtpPackets cloned in an AVG: \t" + << sum / probes << " seconds" << std::endl; } +#endif } From 65e0805359ae4ac6eee64dd9e227ed63f115c582 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Luis=20Mill=C3=A1n?= Date: Fri, 9 Sep 2022 13:35:07 +0200 Subject: [PATCH 10/16] Test: adapt tests accordingly --- worker/test/src/RTC/TestRtpPacketH264Svc.cpp | 22 ++++++----------- worker/test/src/RTC/TestRtpStreamSend.cpp | 26 +++++++++++--------- 2 files changed, 22 insertions(+), 26 deletions(-) diff --git a/worker/test/src/RTC/TestRtpPacketH264Svc.cpp b/worker/test/src/RTC/TestRtpPacketH264Svc.cpp index 022bee4a13..195355fe15 100644 --- a/worker/test/src/RTC/TestRtpPacketH264Svc.cpp +++ b/worker/test/src/RTC/TestRtpPacketH264Svc.cpp @@ -12,7 +12,7 @@ using namespace RTC; static uint8_t buffer[65536]; static uint8_t buffer2[65536]; -SCENARIO("parse RTP packets with H264 SVC", "[parser][rtp]") +SCENARIO("parse RTP packets with H264 SVC", "[parser][rtp][h264]") { SECTION("parse I0-7.bin") { @@ -70,8 +70,7 @@ SCENARIO("parse RTP packets with H264 SVC", "[parser][rtp]") REQUIRE(payloadDescriptor->isKeyFrame == true); delete payloadDescriptor; - - delete packet; + RTC::RtpPacket::Deallocate(packet); } SECTION("parse I0-8.bin") @@ -130,8 +129,7 @@ SCENARIO("parse RTP packets with H264 SVC", "[parser][rtp]") REQUIRE(payloadDescriptor->isKeyFrame == false); delete payloadDescriptor; - - delete packet; + RTC::RtpPacket::Deallocate(packet); } SECTION("parse I0-5.bin") @@ -189,8 +187,7 @@ SCENARIO("parse RTP packets with H264 SVC", "[parser][rtp]") REQUIRE(payloadDescriptor->hasTlIndex == false); delete payloadDescriptor; - - delete packet; + RTC::RtpPacket::Deallocate(packet); } SECTION("parse I1-15.bin") @@ -249,8 +246,7 @@ SCENARIO("parse RTP packets with H264 SVC", "[parser][rtp]") REQUIRE(payloadDescriptor->isKeyFrame == false); delete payloadDescriptor; - - delete packet; + RTC::RtpPacket::Deallocate(packet); } SECTION("parse I0-14.bin") @@ -309,8 +305,7 @@ SCENARIO("parse RTP packets with H264 SVC", "[parser][rtp]") REQUIRE(payloadDescriptor->isKeyFrame == true); delete payloadDescriptor; - - delete packet; + RTC::RtpPacket::Deallocate(packet); } SECTION("parse 2SL-I14.bin") @@ -370,8 +365,7 @@ SCENARIO("parse RTP packets with H264 SVC", "[parser][rtp]") REQUIRE(payloadDescriptor->isKeyFrame == true); delete payloadDescriptor; - - delete packet; + RTC::RtpPacket::Deallocate(packet); } SECTION("create and test RTP files") @@ -446,7 +440,7 @@ SCENARIO("parse RTP packets with H264 SVC", "[parser][rtp]") rows++; delete payloadDescriptor; - delete packet; + RTC::RtpPacket::Deallocate(packet); } nf.close(); diff --git a/worker/test/src/RTC/TestRtpStreamSend.cpp b/worker/test/src/RTC/TestRtpStreamSend.cpp index 89f4e0aeed..6c28f6193f 100644 --- a/worker/test/src/RTC/TestRtpStreamSend.cpp +++ b/worker/test/src/RTC/TestRtpStreamSend.cpp @@ -441,23 +441,26 @@ SCENARIO("NACK and RTP packets retransmission", "[rtp][rtcp][nack]") auto start = std::chrono::system_clock::now(); + // Create packet. + auto* packet = RtpPacket::Parse(rtpBuffer1, 1500); + packet->SetSsrc(1111); + for (size_t i = 0; i < iterations; i++) { - // Create packet. - auto* packet = RtpPacket::Parse(rtpBuffer1, 1500); - packet->SetSsrc(1111); - - std::shared_ptr sharedPacket(packet); + std::shared_ptr sharedPacket; stream->ReceivePacket(packet, sharedPacket); + stream->Pause(); } std::chrono::duration dur = std::chrono::system_clock::now() - start; - std::cout << "nullptr && initialized shared_ptr: \t" << dur.count() << " seconds" << std::endl; + std::cout << iterations << " video RtpPackets processed in \t" << dur.count() << " seconds for a NACK enabled stream" << std::endl; delete stream; - params.mimeType.type = RTC::RtpCodecMimeType::Type::AUDIO; + // Perform the same test with NACK disabled. + + params.useNack = false; stream = new RtpStreamSend(&testRtpStreamListener, params, mid); start = std::chrono::system_clock::now(); @@ -466,17 +469,16 @@ SCENARIO("NACK and RTP packets retransmission", "[rtp][rtcp][nack]") { std::shared_ptr sharedPacket; - // Create packet. - auto* packet = RtpPacket::Parse(rtpBuffer1, 1500); - packet->SetSsrc(1111); - stream->ReceivePacket(packet, sharedPacket); + stream->Pause(); } dur = std::chrono::system_clock::now() - start; - std::cout << "raw && empty shared_ptr duration: \t" << dur.count() << " seconds" << std::endl; + std::cout << iterations << " video RtpPackets processed in \t" << dur.count() << " seconds for a NACK disabled stream" << std::endl; delete stream; + + RTC::RtpPacket::Deallocate(packet); } #endif } From 9bf740b51c4f10d50a0d7a6bee335e492afdc6f1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Luis=20Mill=C3=A1n?= Date: Fri, 9 Sep 2022 13:40:55 +0200 Subject: [PATCH 11/16] lint --- worker/test/src/RTC/TestRtpStreamSend.cpp | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/worker/test/src/RTC/TestRtpStreamSend.cpp b/worker/test/src/RTC/TestRtpStreamSend.cpp index 6c28f6193f..d465abd98d 100644 --- a/worker/test/src/RTC/TestRtpStreamSend.cpp +++ b/worker/test/src/RTC/TestRtpStreamSend.cpp @@ -6,7 +6,7 @@ #include #include -// #define PERFORMANCE_TEST 1 +#define PERFORMANCE_TEST 1 using namespace RTC; @@ -454,14 +454,15 @@ SCENARIO("NACK and RTP packets retransmission", "[rtp][rtcp][nack]") } std::chrono::duration dur = std::chrono::system_clock::now() - start; - std::cout << iterations << " video RtpPackets processed in \t" << dur.count() << " seconds for a NACK enabled stream" << std::endl; + std::cout << iterations << " video RtpPackets processed in \t" << dur.count() + << " seconds for a NACK enabled stream" << std::endl; delete stream; // Perform the same test with NACK disabled. params.useNack = false; - stream = new RtpStreamSend(&testRtpStreamListener, params, mid); + stream = new RtpStreamSend(&testRtpStreamListener, params, mid); start = std::chrono::system_clock::now(); @@ -474,7 +475,8 @@ SCENARIO("NACK and RTP packets retransmission", "[rtp][rtcp][nack]") } dur = std::chrono::system_clock::now() - start; - std::cout << iterations << " video RtpPackets processed in \t" << dur.count() << " seconds for a NACK disabled stream" << std::endl; + std::cout << iterations << " video RtpPackets processed in \t" << dur.count() + << " seconds for a NACK disabled stream" << std::endl; delete stream; From caf8189dcdd7effb1d924b1ba3471e4985e7b3f1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Luis=20Mill=C3=A1n?= Date: Fri, 9 Sep 2022 16:35:49 +0200 Subject: [PATCH 12/16] Define ObjectPoolAllocator in its own file --- worker/include/ObjectPoolAllocator.hpp | 88 ++++++++++++++++++++++++++ worker/include/RTC/RtpPacket.hpp | 1 + worker/include/RTC/RtpStreamSend.hpp | 1 + worker/include/RTC/Transport.hpp | 1 + worker/include/Utils.hpp | 81 ------------------------ 5 files changed, 91 insertions(+), 81 deletions(-) create mode 100644 worker/include/ObjectPoolAllocator.hpp diff --git a/worker/include/ObjectPoolAllocator.hpp b/worker/include/ObjectPoolAllocator.hpp new file mode 100644 index 0000000000..66c1e6804e --- /dev/null +++ b/worker/include/ObjectPoolAllocator.hpp @@ -0,0 +1,88 @@ +#ifndef MS_OBJECT_POOL_ALLOCATOR_HPP +#define MS_OBJECT_POOL_ALLOCATOR_HPP + +// #define MS_ALLOCATOR_FREE_ON_RETURN 1 + +#include "common.hpp" + +namespace Utils +{ + // Simple implementation of object pool only for single objects. + // Arrays are allocated as usual. + template + class ObjectPoolAllocator + { + std::shared_ptr> pool_data; + + public: + typedef T value_type; + thread_local static Utils::ObjectPoolAllocator Pool; + + ObjectPoolAllocator() + { + pool_data = std::shared_ptr>( + new std::vector(), + [](std::vector* pool) + { + for (auto* ptr : *pool) + { + std::free(ptr); + } + delete pool; + }); + } + + template + ObjectPoolAllocator(const ObjectPoolAllocator& other) + : pool_data(ObjectPoolAllocator::Pool.pool_data) + { + } + + ~ObjectPoolAllocator() + { + } + + T* allocate(size_t n) + { + if (n > 1) + { + return static_cast(std::malloc(sizeof(T) * n)); + } + + if (this->pool_data->empty()) + { + return static_cast(std::malloc(sizeof(T))); + } + + T* ptr = this->pool_data->back(); + this->pool_data->pop_back(); + + return ptr; + } + + void deallocate(T* ptr, size_t n) + { + if (!ptr) + { + return; + } + + if (n > 1) + { + std::free(ptr); + return; + } + +#ifdef MS_ALLOCATOR_FREE_ON_RETURN + std::free(ptr); +#else + this->pool_data->push_back(ptr); +#endif + } + }; + + template + thread_local Utils::ObjectPoolAllocator Utils::ObjectPoolAllocator::Pool; +} // namespace Utils + +#endif diff --git a/worker/include/RTC/RtpPacket.hpp b/worker/include/RTC/RtpPacket.hpp index d622f82e29..8a1140d602 100644 --- a/worker/include/RTC/RtpPacket.hpp +++ b/worker/include/RTC/RtpPacket.hpp @@ -2,6 +2,7 @@ #define MS_RTC_RTP_PACKET_HPP #include "common.hpp" +#include "ObjectPoolAllocator.hpp" #include "Utils.hpp" #include "RTC/Codecs/PayloadDescriptorHandler.hpp" #include diff --git a/worker/include/RTC/RtpStreamSend.hpp b/worker/include/RTC/RtpStreamSend.hpp index bb858802bf..2bfb0fdcec 100644 --- a/worker/include/RTC/RtpStreamSend.hpp +++ b/worker/include/RTC/RtpStreamSend.hpp @@ -1,6 +1,7 @@ #ifndef MS_RTC_RTP_STREAM_SEND_HPP #define MS_RTC_RTP_STREAM_SEND_HPP +#include "ObjectPoolAllocator.hpp" #include "RTC/RateCalculator.hpp" #include "RTC/RtpStream.hpp" #include diff --git a/worker/include/RTC/Transport.hpp b/worker/include/RTC/Transport.hpp index dd5734c197..198feb27b6 100644 --- a/worker/include/RTC/Transport.hpp +++ b/worker/include/RTC/Transport.hpp @@ -4,6 +4,7 @@ #include "common.hpp" #include "DepLibUV.hpp" +#include "ObjectPoolAllocator.hpp" #include "Channel/ChannelRequest.hpp" #include "Channel/ChannelSocket.hpp" #include "PayloadChannel/PayloadChannelNotification.hpp" diff --git a/worker/include/Utils.hpp b/worker/include/Utils.hpp index 7b09e5524c..b19415c346 100644 --- a/worker/include/Utils.hpp +++ b/worker/include/Utils.hpp @@ -1,14 +1,7 @@ #ifndef MS_UTILS_HPP #define MS_UTILS_HPP -#ifndef MS_CLASS -#define MS_CLASS "Utils" -#endif - -// #define MS_MEM_POOL_FREE_ON_RETURN 1 - #include "common.hpp" -#include "Logger.hpp" #include #include #include // std::memcmp(), std::memcpy() @@ -360,80 +353,6 @@ namespace Utils return false; } }; - - // Simple implementation of object pool only for single objects. - // Arrays are allocated as usual. - template - class ObjectPoolAllocator - { - std::shared_ptr> pool_data; - - public: - typedef T value_type; - thread_local static Utils::ObjectPoolAllocator Pool; - - ObjectPoolAllocator() - { - pool_data = std::shared_ptr>( - new std::vector(), - [](std::vector* pool) - { - for (auto* ptr : *pool) - { - std::free(ptr); - } - delete pool; - }); - } - - template - ObjectPoolAllocator(const ObjectPoolAllocator& other) - : pool_data(ObjectPoolAllocator::Pool.pool_data) - { - } - - ~ObjectPoolAllocator() - { - } - - T* allocate(size_t n) - { - MS_ASSERT(n == 1, "only single object can be allocated"); - - if (this->pool_data->empty()) - { - return static_cast(std::malloc(sizeof(T))); - } - - T* ptr = this->pool_data->back(); - this->pool_data->pop_back(); - - return ptr; - } - - void deallocate(T* ptr, size_t n) - { - if (!ptr) - { - return; - } - - if (n > 1) - { - std::free(ptr); - return; - } - -#ifdef MS_MEM_POOL_FREE_ON_RETURN - std::free(ptr); -#else - this->pool_data->push_back(ptr); -#endif - } - }; - - template - thread_local Utils::ObjectPoolAllocator Utils::ObjectPoolAllocator::Pool; } // namespace Utils #endif From 69dda848c3802415e3d4b85b5528ea206157eca7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Luis=20Mill=C3=A1n?= Date: Fri, 9 Sep 2022 16:48:42 +0200 Subject: [PATCH 13/16] add missing include --- worker/include/ObjectPoolAllocator.hpp | 1 + 1 file changed, 1 insertion(+) diff --git a/worker/include/ObjectPoolAllocator.hpp b/worker/include/ObjectPoolAllocator.hpp index 66c1e6804e..72f045dfb4 100644 --- a/worker/include/ObjectPoolAllocator.hpp +++ b/worker/include/ObjectPoolAllocator.hpp @@ -4,6 +4,7 @@ // #define MS_ALLOCATOR_FREE_ON_RETURN 1 #include "common.hpp" +#include namespace Utils { From 39475b31efd6aebcbdc16da95284bab502212f22 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Luis=20Mill=C3=A1n?= Date: Wed, 28 Sep 2022 16:37:56 +0200 Subject: [PATCH 14/16] RtpPacket: use AllocatorTraits::destroy instead destructor --- worker/src/RTC/RtpPacket.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/worker/src/RTC/RtpPacket.cpp b/worker/src/RTC/RtpPacket.cpp index 8ffe24d6a1..bef4613e5f 100644 --- a/worker/src/RTC/RtpPacket.cpp +++ b/worker/src/RTC/RtpPacket.cpp @@ -14,7 +14,7 @@ namespace RTC void RtpPacket::Deallocate(RtpPacket* packet) { // Destroy and deallocate the RtpPacket. - packet->~RtpPacket(); + RtpPacket::AllocatorTraits::destroy(RtpPacket::Allocator::Pool, packet); RtpPacket::Allocator::Pool.deallocate(packet, 1); } From 2a76d0e5aa27fac56fa7ee50d1daa5d1dd0e4f39 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Luis=20Mill=C3=A1n?= Date: Tue, 4 Oct 2022 12:35:06 +0200 Subject: [PATCH 15/16] Use AllocatorTraints::destroy --- worker/src/RTC/RtpPacket.cpp | 9 +++++---- worker/src/RTC/RtpStreamSend.cpp | 10 ++++------ worker/src/RTC/Transport.cpp | 2 ++ 3 files changed, 11 insertions(+), 10 deletions(-) diff --git a/worker/src/RTC/RtpPacket.cpp b/worker/src/RTC/RtpPacket.cpp index bef4613e5f..b787f3d066 100644 --- a/worker/src/RTC/RtpPacket.cpp +++ b/worker/src/RTC/RtpPacket.cpp @@ -14,8 +14,8 @@ namespace RTC void RtpPacket::Deallocate(RtpPacket* packet) { // Destroy and deallocate the RtpPacket. - RtpPacket::AllocatorTraits::destroy(RtpPacket::Allocator::Pool, packet); - RtpPacket::Allocator::Pool.deallocate(packet, 1); + AllocatorTraits::destroy(Allocator::Pool, packet); + Allocator::Pool.deallocate(packet, 1); } /* Class methods. */ @@ -168,8 +168,9 @@ namespace RTC // This is a cloned RtpPacket. if (this->buffer) { - this->buffer->~array(); - RtpPacket::BufferAllocator::Pool.deallocate(this->buffer, 1); + // Destroy and deallocate the RtpPacket buffer. + BufferAllocatorTraits::destroy(BufferAllocator::Pool, this->buffer); + BufferAllocator::Pool.deallocate(this->buffer, 1); this->buffer = nullptr; } } diff --git a/worker/src/RTC/RtpStreamSend.cpp b/worker/src/RTC/RtpStreamSend.cpp index 6ff363c248..68d81c2604 100644 --- a/worker/src/RTC/RtpStreamSend.cpp +++ b/worker/src/RTC/RtpStreamSend.cpp @@ -123,9 +123,8 @@ namespace RTC auto* storageItem = this->buffer[0]; - // Reset (free RTP packet) the old storage item. - storageItem->Reset(); - // Return into the pool. + // Destroy and deallocate the StorageItem. + StorageItem::AllocatorTraits::destroy(StorageItem::Allocator::Pool, storageItem); StorageItem::Allocator::Pool.deallocate(storageItem, 1); this->buffer[0] = nullptr; @@ -146,9 +145,8 @@ namespace RTC if (!storageItem) continue; - // Reset the storage item (decrease RTP packet shared pointer counter). - storageItem->Reset(); - // Return into the pool. + // Destroy and deallocate the StorageItem. + StorageItem::AllocatorTraits::destroy(StorageItem::Allocator::Pool, storageItem); StorageItem::Allocator::Pool.deallocate(storageItem, 1); } diff --git a/worker/src/RTC/Transport.cpp b/worker/src/RTC/Transport.cpp index 7be7b48ee7..54e540a58b 100644 --- a/worker/src/RTC/Transport.cpp +++ b/worker/src/RTC/Transport.cpp @@ -42,6 +42,7 @@ namespace RTC ctx->senderBwe->RtpPacketSent(ctx->sentInfo); } + OnSendCallbackCtx::AllocatorTraits::destroy(OnSendCallbackCtx::Allocator::Pool, ctx); OnSendCallbackCtx::Allocator::Pool.deallocate(ctx, 1); } #else @@ -50,6 +51,7 @@ namespace RTC if (sent) ctx->tccClient->PacketSent(ctx->packetInfo, DepLibUV::GetTimeMsInt64()); + OnSendCallbackCtx::AllocatorTraits::destroy(OnSendCallbackCtx::Allocator::Pool, ctx); OnSendCallbackCtx::Allocator::Pool.deallocate(ctx, 1); } #endif From dace86750ee05f97a99442cd4a532b463aa9a166 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Luis=20Mill=C3=A1n?= Date: Fri, 4 Nov 2022 10:26:09 +0100 Subject: [PATCH 16/16] Handle feedback, cosmetic --- worker/src/RTC/RtpPacket.cpp | 2 ++ worker/src/RTC/Transport.cpp | 3 +++ 2 files changed, 5 insertions(+) diff --git a/worker/src/RTC/RtpPacket.cpp b/worker/src/RTC/RtpPacket.cpp index b787f3d066..5492a55163 100644 --- a/worker/src/RTC/RtpPacket.cpp +++ b/worker/src/RTC/RtpPacket.cpp @@ -127,6 +127,7 @@ namespace RTC "packet's computed size does not match received size"); auto* rtpPacket = RtpPacket::Allocator::Pool.allocate(1); + RtpPacket::AllocatorTraits::construct( RtpPacket::Allocator::Pool, rtpPacket, @@ -662,6 +663,7 @@ namespace RTC MS_TRACE(); auto* buffer = RtpPacket::BufferAllocator::Pool.allocate(1); + RtpPacket::BufferAllocatorTraits::construct(RtpPacket::BufferAllocator::Pool, buffer); auto* ptr = const_cast(buffer->data()); diff --git a/worker/src/RTC/Transport.cpp b/worker/src/RTC/Transport.cpp index 54e540a58b..4a665ad80c 100644 --- a/worker/src/RTC/Transport.cpp +++ b/worker/src/RTC/Transport.cpp @@ -2561,6 +2561,7 @@ namespace RTC this->tccClient->InsertPacket(packetInfo); auto* ctx = OnSendCallbackCtx::Allocator::Pool.allocate(1); + OnSendCallbackCtx::AllocatorTraits::construct(OnSendCallbackCtx::Allocator::Pool, ctx); #ifdef ENABLE_RTC_SENDER_BANDWIDTH_ESTIMATOR auto* senderBwe = this->senderBwe; @@ -2619,6 +2620,7 @@ namespace RTC this->tccClient->InsertPacket(packetInfo); auto* ctx = OnSendCallbackCtx::Allocator::Pool.allocate(1); + OnSendCallbackCtx::AllocatorTraits::construct(OnSendCallbackCtx::Allocator::Pool, ctx); #ifdef ENABLE_RTC_SENDER_BANDWIDTH_ESTIMATOR auto* senderBwe = this->senderBwe; @@ -2944,6 +2946,7 @@ namespace RTC this->tccClient->InsertPacket(packetInfo); auto* ctx = OnSendCallbackCtx::Allocator::Pool.allocate(1); + OnSendCallbackCtx::AllocatorTraits::construct(OnSendCallbackCtx::Allocator::Pool, ctx); #ifdef ENABLE_RTC_SENDER_BANDWIDTH_ESTIMATOR auto* senderBwe = this->senderBwe;