Skip to content

Commit

Permalink
Rewrite audio buffer to remove third party dependancies (#659)
Browse files Browse the repository at this point in the history
  • Loading branch information
jstahlbaum-fibernetics committed Jul 4, 2024
1 parent 1425bc4 commit ec462e5
Show file tree
Hide file tree
Showing 8 changed files with 70 additions and 26 deletions.
1 change: 1 addition & 0 deletions .github/workflows/integration_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -150,3 +150,4 @@ jobs:
- name: Run ${{ inputs.context }} Tests
run: |
docker compose exec testing ./tests.sh ./tests/${{ inputs.context }}
1 change: 1 addition & 0 deletions .github/workflows/public_gateway.yml
Original file line number Diff line number Diff line change
Expand Up @@ -149,3 +149,4 @@ jobs:
service: ${{ matrix.ecs_service }}
cluster: ${{ matrix.ecs_cluster }}
wait-for-service-stability: true

3 changes: 1 addition & 2 deletions components/freeswitch/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@ RUN apt-get update && apt-get install --no-install-recommends -yq gnupg2 wget ca
apt-get -y install --no-install-recommends libfreeswitch-dev

COPY src/mod/mod_twilio_stream /usr/src/mod_twilio_stream
WORKDIR /usr/src/mod_twilio_stream/libs
RUN git clone https://github.com/DNedic/lockfree.git

RUN mkdir -p /usr/src/mod_twilio_stream/build
WORKDIR /usr/src/mod_twilio_stream/build
RUN cmake ..
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ target_include_directories(
PRIVATE
include
${FREESWITCH_INCLUDE_DIR}
libs/lockfree
)

target_include_directories(mod_twilio_stream PRIVATE )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,6 @@

#include <libwebsockets.h>

#include "lockfree/lockfree.hpp"

// 5s of L16 8Khz audio
#define MAX_AUDIO_BUFFER 5 * 2 * 8000

class AudioPipe
{
Expand Down Expand Up @@ -51,7 +47,7 @@ class AudioPipe

// constructor
AudioPipe(const char *uuid, const char *host, unsigned int port, const char *path, int sslFlags,
size_t bufLen, size_t minFreespace, const char *username, const char *password, const char *bugname, notifyHandler_t callback);
size_t bufLen, size_t bufInLen, size_t minFreespace, const char *username, const char *password, const char *bugname, notifyHandler_t callback);
~AudioPipe();

LwsState_t getLwsState(void) { return m_state; }
Expand Down Expand Up @@ -81,11 +77,11 @@ class AudioPipe

size_t binaryReadSpaceAvailable(void)
{
return m_audio_buffer_in.GetFree();
return m_audio_buffer_in_max_len - m_audio_buffer_in_len;
}
size_t binaryReadPtrCount(void)
{
return m_audio_buffer_in.GetAvailable();
return m_audio_buffer_in_len;
}

void binaryReadPush(uint8_t *data, size_t len);
Expand Down Expand Up @@ -177,7 +173,12 @@ class AudioPipe
size_t m_audio_buffer_out_min_freespace;

// Stores data coming from the external socket server
lockfree::spsc::RingBuf<uint8_t, MAX_AUDIO_BUFFER> m_audio_buffer_in;
uint8_t *m_audio_buffer_in;
size_t m_audio_buffer_in_max_len;
size_t m_audio_buffer_in_read_offset;
size_t m_audio_buffer_in_write_offset;
size_t m_audio_buffer_in_len;

std::vector<audio_mark_t> m_marks;

uint8_t *m_recv_buf;
Expand Down
58 changes: 49 additions & 9 deletions components/freeswitch/src/mod/mod_twilio_stream/src/audio_pipe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,7 @@ AudioPipe::AudioPipe(const char *uuid,
const char *path,
int sslFlags,
size_t bufLen,
size_t bufInLen,
size_t minFreespace,
const char *username,
const char *password,
Expand All @@ -540,6 +541,10 @@ AudioPipe::AudioPipe(const char *uuid,
m_sslFlags(sslFlags),
m_audio_buffer_out_min_freespace(minFreespace),
m_audio_buffer_out_max_len(bufLen),
m_audio_buffer_in_max_len(bufInLen),
m_audio_buffer_in_read_offset(0),
m_audio_buffer_in_write_offset(0),
m_audio_buffer_in_len(0),
m_gracefulShutdown(false),
m_audio_buffer_out_write_offset(LWS_PRE),
m_recv_buf(nullptr), m_recv_buf_ptr(nullptr), m_bugname(bugname),
Expand All @@ -553,11 +558,14 @@ AudioPipe::AudioPipe(const char *uuid,
}

m_audio_buffer_out = new uint8_t[m_audio_buffer_out_max_len];
m_audio_buffer_in = new uint8_t[m_audio_buffer_in_max_len];
}
AudioPipe::~AudioPipe()
{
if (m_audio_buffer_out)
delete[] m_audio_buffer_out;
if (m_audio_buffer_in)
delete[] m_audio_buffer_in;
if (m_recv_buf)
delete[] m_recv_buf;
}
Expand Down Expand Up @@ -626,28 +634,60 @@ void AudioPipe::do_graceful_shutdown()
}
void AudioPipe::binaryReadPush(uint8_t *data, size_t len)
{
m_audio_buffer_in.Write(data, len);
uint32_t avail = m_audio_buffer_in_max_len - m_audio_buffer_in_write_offset;
uint8_t *ptr = m_audio_buffer_in + m_audio_buffer_in_write_offset;

if (len <= avail)
{
memcpy(ptr, data, len);
m_audio_buffer_in_write_offset = (m_audio_buffer_in_write_offset + len) % m_audio_buffer_in_max_len;
}
else
{
// Wrapping case
memcpy(ptr, data, avail);
memcpy(m_audio_buffer_in, &data[avail], len - avail);
m_audio_buffer_in_write_offset = (len - avail) % m_audio_buffer_in_max_len;
}
m_audio_buffer_in_len += len;
}
size_t AudioPipe::binaryReadPop(uint8_t *data, size_t len)

size_t AudioPipe::binaryReadPop(uint8_t *data, size_t data_len)
{
size_t olen = std::min(len, m_audio_buffer_in.GetAvailable());
if (olen > 0)
size_t len = std::min(data_len, m_audio_buffer_in_len);
if (len > 0)
{
m_audio_buffer_in.Read(data, olen);
uint32_t avail = m_audio_buffer_in_max_len - m_audio_buffer_in_read_offset;
uint8_t *ptr = m_audio_buffer_in + m_audio_buffer_in_read_offset;
if (len <= avail)
{
memcpy(data, ptr, len);
m_audio_buffer_in_read_offset = (m_audio_buffer_in_read_offset + len) % m_audio_buffer_in_max_len;
}
else
{
// Wrapping case
memcpy(data, ptr, avail);
memcpy(&data[avail], m_audio_buffer_in, len - avail);
m_audio_buffer_in_read_offset = (len - avail) % m_audio_buffer_in_max_len;
}
m_audio_buffer_in_len -= len;

for (int i = 0; i < m_marks.size(); i++)
{
m_marks[i].buffer_index -= olen;
m_marks[i].buffer_index -= len;
if (m_marks[i].buffer_index < 0)
m_marks[i].buffer_index = 0;
}
}
return olen;
return len;
}

void AudioPipe::binaryReadClear()
{
m_audio_buffer_in.Skip(m_audio_buffer_in.GetAvailable());
m_audio_buffer_in_read_offset = 0;
m_audio_buffer_in_write_offset = 0;
m_audio_buffer_in_len = 0;
for (int i = 0; i < m_marks.size(); i++)
{
m_marks[i].buffer_index = 0;
Expand All @@ -658,7 +698,7 @@ void AudioPipe::binaryReadMark(std::string name)
{
audio_mark_t mark;
mark.name = name;
mark.buffer_index = m_audio_buffer_in.GetAvailable();
mark.buffer_index = m_audio_buffer_in_len;
m_marks.push_back(mark);
}

Expand Down
14 changes: 9 additions & 5 deletions components/freeswitch/src/mod/mod_twilio_stream/src/lws_glue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ namespace

static const char *requestedBufferSecs = std::getenv("MOD_TWILIO_STREAM_BUFFER_SECS");
static int nAudioBufferSecs = std::max(1, std::min(requestedBufferSecs ? ::atoi(requestedBufferSecs) : 2, 5));
static const char *requestedInBufferSecs = std::getenv("MOD_TWILIO_STREAM_IN_BUFFER_SECS");
static int nAudioInBufferSecs = std::max(1, std::min(requestedBufferSecs ? ::atoi(requestedBufferSecs) : 60, 120));
static const char *requestedBufferStartMSecs = std::getenv("MOD_TWILIO_STREAM_MIN_BUFFER_MILISECS");
static int nAudioBufferStartMSecs = std::max(0, std::min(requestedBufferStartMSecs ? ::atoi(requestedBufferStartMSecs) : 0, 0));
static const char *requestedNumServiceThreads = std::getenv("MOD_TWILIO_STREAM_SERVICE_THREADS");
Expand Down Expand Up @@ -184,6 +186,7 @@ namespace
}
else if (0 == type.compare("clear"))
{
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "(%u) clearing audio \n", tech_pvt->id);
AudioPipe *pAudioPipe = static_cast<AudioPipe *>(tech_pvt->pAudioPipe);
TwilioHelper *pTwilioHelper = static_cast<TwilioHelper *>(tech_pvt->pTwilioHelper);
if (pAudioPipe != nullptr && pTwilioHelper != nullptr)
Expand Down Expand Up @@ -301,9 +304,10 @@ namespace
tech_pvt->graceful_shutdown = 0;

size_t buflen = LWS_PRE + (FRAME_SIZE_8000 * desiredSampling / 8000 * channels * 1000 / RTP_PACKETIZATION_PERIOD * nAudioBufferSecs);
size_t bufInlen = (desiredSampling * channels * nAudioInBufferSecs);

AudioPipe *ap = new AudioPipe(tech_pvt->sessionId, host, port, path, sslFlags,
buflen, read_impl.decoded_bytes_per_packet, username, password, bugname, eventCallback);
buflen, bufInlen, read_impl.decoded_bytes_per_packet, username, password, bugname, eventCallback);
if (!ap)
{
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "Error allocating AudioPipe\n");
Expand Down Expand Up @@ -488,7 +492,8 @@ extern "C"
switch_status_t fork_init()
{
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "mod_twilio_stream: fork_init\n");
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "mod_twilio_stream: audio buffer (in secs): %d secs\n", nAudioBufferSecs);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "mod_twilio_stream: audio output buffer (in secs): %d secs\n", nAudioBufferSecs);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "mod_twilio_stream: audio input buffer (in secs): %d secs\n", nAudioInBufferSecs);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "mod_twilio_stream: sub-protocol: %s\n", mySubProtocolName);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "mod_twilio_stream: lws service threads: %d\n", nServiceThreads);

Expand Down Expand Up @@ -740,9 +745,7 @@ extern "C"
tech_pvt->id,
available, minBuffer);
return SWITCH_TRUE;
} switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "(%u) buffer full (%d, %d) \n",
tech_pvt->id,
available, minBuffer);
}

if (switch_mutex_trylock(tech_pvt->mutex) == SWITCH_STATUS_SUCCESS)
{
Expand Down Expand Up @@ -775,6 +778,7 @@ extern "C"
TwilioHelper *pTwilioHelper = static_cast<TwilioHelper *>(tech_pvt->pTwilioHelper);
if (pTwilioHelper)
{

auto marks = pAudioPipe->clearExpiredMarks();
for (int i = 0; i < marks.size(); i++)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
#include "g711.h"
#include "base64.hpp"

#include "lockfree/lockfree.hpp"

using namespace std::chrono;

Expand Down

0 comments on commit ec462e5

Please sign in to comment.