Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rewrite audio buffer to remove third party dependancies #659

Merged
1 change: 1 addition & 0 deletions .github/workflows/integration_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -150,3 +150,4 @@ jobs:
- name: Run ${{ inputs.context }} Tests
run: |
docker compose exec testing ./tests.sh ./tests/${{ inputs.context }}

1 change: 1 addition & 0 deletions .github/workflows/public_gateway.yml
Original file line number Diff line number Diff line change
Expand Up @@ -149,3 +149,4 @@ jobs:
service: ${{ matrix.ecs_service }}
cluster: ${{ matrix.ecs_cluster }}
wait-for-service-stability: true

3 changes: 1 addition & 2 deletions components/freeswitch/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@ RUN apt-get update && apt-get install --no-install-recommends -yq gnupg2 wget ca
apt-get -y install --no-install-recommends libfreeswitch-dev

COPY src/mod/mod_twilio_stream /usr/src/mod_twilio_stream
WORKDIR /usr/src/mod_twilio_stream/libs
RUN git clone https://github.com/DNedic/lockfree.git

RUN mkdir -p /usr/src/mod_twilio_stream/build
WORKDIR /usr/src/mod_twilio_stream/build
RUN cmake ..
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ target_include_directories(
PRIVATE
include
${FREESWITCH_INCLUDE_DIR}
libs/lockfree
)

target_include_directories(mod_twilio_stream PRIVATE )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,6 @@

#include <libwebsockets.h>

#include "lockfree/lockfree.hpp"

// 5s of L16 8Khz audio
#define MAX_AUDIO_BUFFER 5 * 2 * 8000

class AudioPipe
{
Expand Down Expand Up @@ -51,7 +47,7 @@ class AudioPipe

// constructor
AudioPipe(const char *uuid, const char *host, unsigned int port, const char *path, int sslFlags,
size_t bufLen, size_t minFreespace, const char *username, const char *password, const char *bugname, notifyHandler_t callback);
size_t bufLen, size_t bufInLen, size_t minFreespace, const char *username, const char *password, const char *bugname, notifyHandler_t callback);
~AudioPipe();

LwsState_t getLwsState(void) { return m_state; }
Expand Down Expand Up @@ -81,11 +77,11 @@ class AudioPipe

size_t binaryReadSpaceAvailable(void)
{
return m_audio_buffer_in.GetFree();
return m_audio_buffer_in_max_len - m_audio_buffer_in_len;
}
size_t binaryReadPtrCount(void)
{
return m_audio_buffer_in.GetAvailable();
return m_audio_buffer_in_len;
}

void binaryReadPush(uint8_t *data, size_t len);
Expand Down Expand Up @@ -177,7 +173,12 @@ class AudioPipe
size_t m_audio_buffer_out_min_freespace;

// Stores data coming from the external socket server
lockfree::spsc::RingBuf<uint8_t, MAX_AUDIO_BUFFER> m_audio_buffer_in;
uint8_t *m_audio_buffer_in;
size_t m_audio_buffer_in_max_len;
size_t m_audio_buffer_in_read_offset;
size_t m_audio_buffer_in_write_offset;
size_t m_audio_buffer_in_len;

std::vector<audio_mark_t> m_marks;

uint8_t *m_recv_buf;
Expand Down
58 changes: 49 additions & 9 deletions components/freeswitch/src/mod/mod_twilio_stream/src/audio_pipe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,7 @@ AudioPipe::AudioPipe(const char *uuid,
const char *path,
int sslFlags,
size_t bufLen,
size_t bufInLen,
size_t minFreespace,
const char *username,
const char *password,
Expand All @@ -540,6 +541,10 @@ AudioPipe::AudioPipe(const char *uuid,
m_sslFlags(sslFlags),
m_audio_buffer_out_min_freespace(minFreespace),
m_audio_buffer_out_max_len(bufLen),
m_audio_buffer_in_max_len(bufInLen),
m_audio_buffer_in_read_offset(0),
m_audio_buffer_in_write_offset(0),
m_audio_buffer_in_len(0),
m_gracefulShutdown(false),
m_audio_buffer_out_write_offset(LWS_PRE),
m_recv_buf(nullptr), m_recv_buf_ptr(nullptr), m_bugname(bugname),
Expand All @@ -553,11 +558,14 @@ AudioPipe::AudioPipe(const char *uuid,
}

m_audio_buffer_out = new uint8_t[m_audio_buffer_out_max_len];
m_audio_buffer_in = new uint8_t[m_audio_buffer_in_max_len];
}
AudioPipe::~AudioPipe()
{
if (m_audio_buffer_out)
delete[] m_audio_buffer_out;
if (m_audio_buffer_in)
delete[] m_audio_buffer_in;
if (m_recv_buf)
delete[] m_recv_buf;
}
Expand Down Expand Up @@ -626,28 +634,60 @@ void AudioPipe::do_graceful_shutdown()
}
void AudioPipe::binaryReadPush(uint8_t *data, size_t len)
{
m_audio_buffer_in.Write(data, len);
uint32_t avail = m_audio_buffer_in_max_len - m_audio_buffer_in_write_offset;
uint8_t *ptr = m_audio_buffer_in + m_audio_buffer_in_write_offset;

if (len <= avail)
{
memcpy(ptr, data, len);
m_audio_buffer_in_write_offset = (m_audio_buffer_in_write_offset + len) % m_audio_buffer_in_max_len;
}
else
{
// Wrapping case
memcpy(ptr, data, avail);
memcpy(m_audio_buffer_in, &data[avail], len - avail);
m_audio_buffer_in_write_offset = (len - avail) % m_audio_buffer_in_max_len;
}
m_audio_buffer_in_len += len;
}
size_t AudioPipe::binaryReadPop(uint8_t *data, size_t len)

size_t AudioPipe::binaryReadPop(uint8_t *data, size_t data_len)
{
size_t olen = std::min(len, m_audio_buffer_in.GetAvailable());
if (olen > 0)
size_t len = std::min(data_len, m_audio_buffer_in_len);
if (len > 0)
{
m_audio_buffer_in.Read(data, olen);
uint32_t avail = m_audio_buffer_in_max_len - m_audio_buffer_in_read_offset;
uint8_t *ptr = m_audio_buffer_in + m_audio_buffer_in_read_offset;
if (len <= avail)
{
memcpy(data, ptr, len);
m_audio_buffer_in_read_offset = (m_audio_buffer_in_read_offset + len) % m_audio_buffer_in_max_len;
}
else
{
// Wrapping case
memcpy(data, ptr, avail);
memcpy(&data[avail], m_audio_buffer_in, len - avail);
m_audio_buffer_in_read_offset = (len - avail) % m_audio_buffer_in_max_len;
}
m_audio_buffer_in_len -= len;

for (int i = 0; i < m_marks.size(); i++)
{
m_marks[i].buffer_index -= olen;
m_marks[i].buffer_index -= len;
if (m_marks[i].buffer_index < 0)
m_marks[i].buffer_index = 0;
}
}
return olen;
return len;
}

void AudioPipe::binaryReadClear()
{
m_audio_buffer_in.Skip(m_audio_buffer_in.GetAvailable());
m_audio_buffer_in_read_offset = 0;
m_audio_buffer_in_write_offset = 0;
m_audio_buffer_in_len = 0;
for (int i = 0; i < m_marks.size(); i++)
{
m_marks[i].buffer_index = 0;
Expand All @@ -658,7 +698,7 @@ void AudioPipe::binaryReadMark(std::string name)
{
audio_mark_t mark;
mark.name = name;
mark.buffer_index = m_audio_buffer_in.GetAvailable();
mark.buffer_index = m_audio_buffer_in_len;
m_marks.push_back(mark);
}

Expand Down
14 changes: 9 additions & 5 deletions components/freeswitch/src/mod/mod_twilio_stream/src/lws_glue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ namespace

static const char *requestedBufferSecs = std::getenv("MOD_TWILIO_STREAM_BUFFER_SECS");
static int nAudioBufferSecs = std::max(1, std::min(requestedBufferSecs ? ::atoi(requestedBufferSecs) : 2, 5));
static const char *requestedInBufferSecs = std::getenv("MOD_TWILIO_STREAM_IN_BUFFER_SECS");
static int nAudioInBufferSecs = std::max(1, std::min(requestedBufferSecs ? ::atoi(requestedBufferSecs) : 60, 120));
static const char *requestedBufferStartMSecs = std::getenv("MOD_TWILIO_STREAM_MIN_BUFFER_MILISECS");
static int nAudioBufferStartMSecs = std::max(0, std::min(requestedBufferStartMSecs ? ::atoi(requestedBufferStartMSecs) : 0, 0));
static const char *requestedNumServiceThreads = std::getenv("MOD_TWILIO_STREAM_SERVICE_THREADS");
Expand Down Expand Up @@ -184,6 +186,7 @@ namespace
}
else if (0 == type.compare("clear"))
{
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "(%u) clearing audio \n", tech_pvt->id);
AudioPipe *pAudioPipe = static_cast<AudioPipe *>(tech_pvt->pAudioPipe);
TwilioHelper *pTwilioHelper = static_cast<TwilioHelper *>(tech_pvt->pTwilioHelper);
if (pAudioPipe != nullptr && pTwilioHelper != nullptr)
Expand Down Expand Up @@ -301,9 +304,10 @@ namespace
tech_pvt->graceful_shutdown = 0;

size_t buflen = LWS_PRE + (FRAME_SIZE_8000 * desiredSampling / 8000 * channels * 1000 / RTP_PACKETIZATION_PERIOD * nAudioBufferSecs);
size_t bufInlen = (desiredSampling * channels * nAudioInBufferSecs);

AudioPipe *ap = new AudioPipe(tech_pvt->sessionId, host, port, path, sslFlags,
buflen, read_impl.decoded_bytes_per_packet, username, password, bugname, eventCallback);
buflen, bufInlen, read_impl.decoded_bytes_per_packet, username, password, bugname, eventCallback);
if (!ap)
{
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "Error allocating AudioPipe\n");
Expand Down Expand Up @@ -488,7 +492,8 @@ extern "C"
switch_status_t fork_init()
{
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "mod_twilio_stream: fork_init\n");
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "mod_twilio_stream: audio buffer (in secs): %d secs\n", nAudioBufferSecs);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "mod_twilio_stream: audio output buffer (in secs): %d secs\n", nAudioBufferSecs);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "mod_twilio_stream: audio input buffer (in secs): %d secs\n", nAudioInBufferSecs);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "mod_twilio_stream: sub-protocol: %s\n", mySubProtocolName);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "mod_twilio_stream: lws service threads: %d\n", nServiceThreads);

Expand Down Expand Up @@ -740,9 +745,7 @@ extern "C"
tech_pvt->id,
available, minBuffer);
return SWITCH_TRUE;
} switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "(%u) buffer full (%d, %d) \n",
tech_pvt->id,
available, minBuffer);
}

if (switch_mutex_trylock(tech_pvt->mutex) == SWITCH_STATUS_SUCCESS)
{
Expand Down Expand Up @@ -775,6 +778,7 @@ extern "C"
TwilioHelper *pTwilioHelper = static_cast<TwilioHelper *>(tech_pvt->pTwilioHelper);
if (pTwilioHelper)
{

auto marks = pAudioPipe->clearExpiredMarks();
for (int i = 0; i < marks.size(); i++)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
#include "g711.h"
#include "base64.hpp"

#include "lockfree/lockfree.hpp"

using namespace std::chrono;

Expand Down