diff --git a/.github/workflows/integration_tests.yml b/.github/workflows/integration_tests.yml index 16d89f587..b55e8593d 100644 --- a/.github/workflows/integration_tests.yml +++ b/.github/workflows/integration_tests.yml @@ -150,3 +150,4 @@ jobs: - name: Run ${{ inputs.context }} Tests run: | docker compose exec testing ./tests.sh ./tests/${{ inputs.context }} + diff --git a/.github/workflows/public_gateway.yml b/.github/workflows/public_gateway.yml index f2f4884d1..0744fa0da 100644 --- a/.github/workflows/public_gateway.yml +++ b/.github/workflows/public_gateway.yml @@ -149,3 +149,4 @@ jobs: service: ${{ matrix.ecs_service }} cluster: ${{ matrix.ecs_cluster }} wait-for-service-stability: true + diff --git a/components/freeswitch/Dockerfile b/components/freeswitch/Dockerfile index 0dc3895f2..3c7c6d223 100644 --- a/components/freeswitch/Dockerfile +++ b/components/freeswitch/Dockerfile @@ -13,8 +13,7 @@ RUN apt-get update && apt-get install --no-install-recommends -yq gnupg2 wget ca apt-get -y install --no-install-recommends libfreeswitch-dev COPY src/mod/mod_twilio_stream /usr/src/mod_twilio_stream -WORKDIR /usr/src/mod_twilio_stream/libs -RUN git clone https://github.com/DNedic/lockfree.git + RUN mkdir -p /usr/src/mod_twilio_stream/build WORKDIR /usr/src/mod_twilio_stream/build RUN cmake .. diff --git a/components/freeswitch/src/mod/mod_twilio_stream/CMakeLists.txt b/components/freeswitch/src/mod/mod_twilio_stream/CMakeLists.txt index 8466fde37..612f21515 100644 --- a/components/freeswitch/src/mod/mod_twilio_stream/CMakeLists.txt +++ b/components/freeswitch/src/mod/mod_twilio_stream/CMakeLists.txt @@ -41,7 +41,6 @@ target_include_directories( PRIVATE include ${FREESWITCH_INCLUDE_DIR} - libs/lockfree ) target_include_directories(mod_twilio_stream PRIVATE ) diff --git a/components/freeswitch/src/mod/mod_twilio_stream/include/audio_pipe.hpp b/components/freeswitch/src/mod/mod_twilio_stream/include/audio_pipe.hpp index f609bf8ca..87cf6742f 100644 --- a/components/freeswitch/src/mod/mod_twilio_stream/include/audio_pipe.hpp +++ b/components/freeswitch/src/mod/mod_twilio_stream/include/audio_pipe.hpp @@ -10,10 +10,6 @@ #include -#include "lockfree/lockfree.hpp" - -// 5s of L16 8Khz audio -#define MAX_AUDIO_BUFFER 5 * 2 * 8000 class AudioPipe { @@ -51,7 +47,7 @@ class AudioPipe // constructor AudioPipe(const char *uuid, const char *host, unsigned int port, const char *path, int sslFlags, - size_t bufLen, size_t minFreespace, const char *username, const char *password, const char *bugname, notifyHandler_t callback); + size_t bufLen, size_t bufInLen, size_t minFreespace, const char *username, const char *password, const char *bugname, notifyHandler_t callback); ~AudioPipe(); LwsState_t getLwsState(void) { return m_state; } @@ -81,11 +77,11 @@ class AudioPipe size_t binaryReadSpaceAvailable(void) { - return m_audio_buffer_in.GetFree(); + return m_audio_buffer_in_max_len - m_audio_buffer_in_len; } size_t binaryReadPtrCount(void) { - return m_audio_buffer_in.GetAvailable(); + return m_audio_buffer_in_len; } void binaryReadPush(uint8_t *data, size_t len); @@ -177,7 +173,12 @@ class AudioPipe size_t m_audio_buffer_out_min_freespace; // Stores data coming from the external socket server - lockfree::spsc::RingBuf m_audio_buffer_in; + uint8_t *m_audio_buffer_in; + size_t m_audio_buffer_in_max_len; + size_t m_audio_buffer_in_read_offset; + size_t m_audio_buffer_in_write_offset; + size_t m_audio_buffer_in_len; + std::vector m_marks; uint8_t *m_recv_buf; diff --git a/components/freeswitch/src/mod/mod_twilio_stream/src/audio_pipe.cpp b/components/freeswitch/src/mod/mod_twilio_stream/src/audio_pipe.cpp index 023ef464e..10e47b9b8 100644 --- a/components/freeswitch/src/mod/mod_twilio_stream/src/audio_pipe.cpp +++ b/components/freeswitch/src/mod/mod_twilio_stream/src/audio_pipe.cpp @@ -529,6 +529,7 @@ AudioPipe::AudioPipe(const char *uuid, const char *path, int sslFlags, size_t bufLen, + size_t bufInLen, size_t minFreespace, const char *username, const char *password, @@ -540,6 +541,10 @@ AudioPipe::AudioPipe(const char *uuid, m_sslFlags(sslFlags), m_audio_buffer_out_min_freespace(minFreespace), m_audio_buffer_out_max_len(bufLen), + m_audio_buffer_in_max_len(bufInLen), + m_audio_buffer_in_read_offset(0), + m_audio_buffer_in_write_offset(0), + m_audio_buffer_in_len(0), m_gracefulShutdown(false), m_audio_buffer_out_write_offset(LWS_PRE), m_recv_buf(nullptr), m_recv_buf_ptr(nullptr), m_bugname(bugname), @@ -553,11 +558,14 @@ AudioPipe::AudioPipe(const char *uuid, } m_audio_buffer_out = new uint8_t[m_audio_buffer_out_max_len]; + m_audio_buffer_in = new uint8_t[m_audio_buffer_in_max_len]; } AudioPipe::~AudioPipe() { if (m_audio_buffer_out) delete[] m_audio_buffer_out; + if (m_audio_buffer_in) + delete[] m_audio_buffer_in; if (m_recv_buf) delete[] m_recv_buf; } @@ -626,28 +634,60 @@ void AudioPipe::do_graceful_shutdown() } void AudioPipe::binaryReadPush(uint8_t *data, size_t len) { - m_audio_buffer_in.Write(data, len); + uint32_t avail = m_audio_buffer_in_max_len - m_audio_buffer_in_write_offset; + uint8_t *ptr = m_audio_buffer_in + m_audio_buffer_in_write_offset; + + if (len <= avail) + { + memcpy(ptr, data, len); + m_audio_buffer_in_write_offset = (m_audio_buffer_in_write_offset + len) % m_audio_buffer_in_max_len; + } + else + { + // Wrapping case + memcpy(ptr, data, avail); + memcpy(m_audio_buffer_in, &data[avail], len - avail); + m_audio_buffer_in_write_offset = (len - avail) % m_audio_buffer_in_max_len; + } + m_audio_buffer_in_len += len; } -size_t AudioPipe::binaryReadPop(uint8_t *data, size_t len) + +size_t AudioPipe::binaryReadPop(uint8_t *data, size_t data_len) { - size_t olen = std::min(len, m_audio_buffer_in.GetAvailable()); - if (olen > 0) + size_t len = std::min(data_len, m_audio_buffer_in_len); + if (len > 0) { - m_audio_buffer_in.Read(data, olen); + uint32_t avail = m_audio_buffer_in_max_len - m_audio_buffer_in_read_offset; + uint8_t *ptr = m_audio_buffer_in + m_audio_buffer_in_read_offset; + if (len <= avail) + { + memcpy(data, ptr, len); + m_audio_buffer_in_read_offset = (m_audio_buffer_in_read_offset + len) % m_audio_buffer_in_max_len; + } + else + { + // Wrapping case + memcpy(data, ptr, avail); + memcpy(&data[avail], m_audio_buffer_in, len - avail); + m_audio_buffer_in_read_offset = (len - avail) % m_audio_buffer_in_max_len; + } + m_audio_buffer_in_len -= len; for (int i = 0; i < m_marks.size(); i++) { - m_marks[i].buffer_index -= olen; + m_marks[i].buffer_index -= len; if (m_marks[i].buffer_index < 0) m_marks[i].buffer_index = 0; } } - return olen; + return len; } void AudioPipe::binaryReadClear() { - m_audio_buffer_in.Skip(m_audio_buffer_in.GetAvailable()); + m_audio_buffer_in_read_offset = 0; + m_audio_buffer_in_write_offset = 0; + m_audio_buffer_in_len = 0; for (int i = 0; i < m_marks.size(); i++) { m_marks[i].buffer_index = 0; @@ -658,7 +698,7 @@ void AudioPipe::binaryReadMark(std::string name) { audio_mark_t mark; mark.name = name; - mark.buffer_index = m_audio_buffer_in.GetAvailable(); + mark.buffer_index = m_audio_buffer_in_len; m_marks.push_back(mark); } diff --git a/components/freeswitch/src/mod/mod_twilio_stream/src/lws_glue.cpp b/components/freeswitch/src/mod/mod_twilio_stream/src/lws_glue.cpp index 4738dbb6d..1fe603b93 100644 --- a/components/freeswitch/src/mod/mod_twilio_stream/src/lws_glue.cpp +++ b/components/freeswitch/src/mod/mod_twilio_stream/src/lws_glue.cpp @@ -31,6 +31,8 @@ namespace static const char *requestedBufferSecs = std::getenv("MOD_TWILIO_STREAM_BUFFER_SECS"); static int nAudioBufferSecs = std::max(1, std::min(requestedBufferSecs ? ::atoi(requestedBufferSecs) : 2, 5)); + static const char *requestedInBufferSecs = std::getenv("MOD_TWILIO_STREAM_IN_BUFFER_SECS"); + static int nAudioInBufferSecs = std::max(1, std::min(requestedBufferSecs ? ::atoi(requestedBufferSecs) : 60, 120)); static const char *requestedBufferStartMSecs = std::getenv("MOD_TWILIO_STREAM_MIN_BUFFER_MILISECS"); static int nAudioBufferStartMSecs = std::max(0, std::min(requestedBufferStartMSecs ? ::atoi(requestedBufferStartMSecs) : 0, 0)); static const char *requestedNumServiceThreads = std::getenv("MOD_TWILIO_STREAM_SERVICE_THREADS"); @@ -184,6 +186,7 @@ namespace } else if (0 == type.compare("clear")) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "(%u) clearing audio \n", tech_pvt->id); AudioPipe *pAudioPipe = static_cast(tech_pvt->pAudioPipe); TwilioHelper *pTwilioHelper = static_cast(tech_pvt->pTwilioHelper); if (pAudioPipe != nullptr && pTwilioHelper != nullptr) @@ -301,9 +304,10 @@ namespace tech_pvt->graceful_shutdown = 0; size_t buflen = LWS_PRE + (FRAME_SIZE_8000 * desiredSampling / 8000 * channels * 1000 / RTP_PACKETIZATION_PERIOD * nAudioBufferSecs); + size_t bufInlen = (desiredSampling * channels * nAudioInBufferSecs); AudioPipe *ap = new AudioPipe(tech_pvt->sessionId, host, port, path, sslFlags, - buflen, read_impl.decoded_bytes_per_packet, username, password, bugname, eventCallback); + buflen, bufInlen, read_impl.decoded_bytes_per_packet, username, password, bugname, eventCallback); if (!ap) { switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "Error allocating AudioPipe\n"); @@ -488,7 +492,8 @@ extern "C" switch_status_t fork_init() { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "mod_twilio_stream: fork_init\n"); - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "mod_twilio_stream: audio buffer (in secs): %d secs\n", nAudioBufferSecs); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "mod_twilio_stream: audio output buffer (in secs): %d secs\n", nAudioBufferSecs); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "mod_twilio_stream: audio input buffer (in secs): %d secs\n", nAudioInBufferSecs); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "mod_twilio_stream: sub-protocol: %s\n", mySubProtocolName); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "mod_twilio_stream: lws service threads: %d\n", nServiceThreads); @@ -740,9 +745,7 @@ extern "C" tech_pvt->id, available, minBuffer); return SWITCH_TRUE; - } switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "(%u) buffer full (%d, %d) \n", - tech_pvt->id, - available, minBuffer); + } if (switch_mutex_trylock(tech_pvt->mutex) == SWITCH_STATUS_SUCCESS) { @@ -775,6 +778,7 @@ extern "C" TwilioHelper *pTwilioHelper = static_cast(tech_pvt->pTwilioHelper); if (pTwilioHelper) { + auto marks = pAudioPipe->clearExpiredMarks(); for (int i = 0; i < marks.size(); i++) { diff --git a/components/freeswitch/src/mod/mod_twilio_stream/src/twilio_helper.cpp b/components/freeswitch/src/mod/mod_twilio_stream/src/twilio_helper.cpp index a3de9ee3f..4c64241b4 100644 --- a/components/freeswitch/src/mod/mod_twilio_stream/src/twilio_helper.cpp +++ b/components/freeswitch/src/mod/mod_twilio_stream/src/twilio_helper.cpp @@ -7,7 +7,6 @@ #include "g711.h" #include "base64.hpp" -#include "lockfree/lockfree.hpp" using namespace std::chrono;