Skip to content

Commit

Permalink
Rewrite the buffer to remove thirdparty dependancies
Browse files Browse the repository at this point in the history
  • Loading branch information
jstahlbaum-fibernetics committed Jul 3, 2024
1 parent c6d59ed commit 5ea8755
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,6 @@

#include <libwebsockets.h>

#include "lockfree/lockfree.hpp"

// 60s of L16 8Khz audio
#define MAX_AUDIO_BUFFER 30 * 2 * 8000

class AudioPipe
{
Expand Down Expand Up @@ -51,7 +47,7 @@ class AudioPipe

// constructor
AudioPipe(const char *uuid, const char *host, unsigned int port, const char *path, int sslFlags,
size_t bufLen, size_t minFreespace, const char *username, const char *password, const char *bugname, notifyHandler_t callback);
size_t bufLen, size_t bufInLen, size_t minFreespace, const char *username, const char *password, const char *bugname, notifyHandler_t callback);
~AudioPipe();

LwsState_t getLwsState(void) { return m_state; }
Expand Down Expand Up @@ -81,11 +77,11 @@ class AudioPipe

size_t binaryReadSpaceAvailable(void)
{
return m_audio_buffer_in.GetFree();
return m_audio_buffer_in_max_len - m_audio_buffer_in_len;
}
size_t binaryReadPtrCount(void)
{
return m_audio_buffer_in.GetAvailable();
return m_audio_buffer_in_len;
}

void binaryReadPush(uint8_t *data, size_t len);
Expand Down Expand Up @@ -177,7 +173,11 @@ class AudioPipe
size_t m_audio_buffer_out_min_freespace;

// Stores data coming from the external socket server
lockfree::spsc::RingBuf<uint8_t, MAX_AUDIO_BUFFER> m_audio_buffer_in;
uint8_t *m_audio_buffer_in;
size_t m_audio_buffer_in_max_len;
size_t m_audio_buffer_in_write_offset;
size_t m_audio_buffer_in_len;

std::vector<audio_mark_t> m_marks;

uint8_t *m_recv_buf;
Expand Down
54 changes: 45 additions & 9 deletions components/freeswitch/src/mod/mod_twilio_stream/src/audio_pipe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,7 @@ AudioPipe::AudioPipe(const char *uuid,
const char *path,
int sslFlags,
size_t bufLen,
size_t bufInLen,
size_t minFreespace,
const char *username,
const char *password,
Expand All @@ -540,6 +541,9 @@ AudioPipe::AudioPipe(const char *uuid,
m_sslFlags(sslFlags),
m_audio_buffer_out_min_freespace(minFreespace),
m_audio_buffer_out_max_len(bufLen),
m_audio_buffer_in_max_len(bufInLen),
m_audio_buffer_in_write_offset(0),
m_audio_buffer_in_len(0),
m_gracefulShutdown(false),
m_audio_buffer_out_write_offset(LWS_PRE),
m_recv_buf(nullptr), m_recv_buf_ptr(nullptr), m_bugname(bugname),
Expand All @@ -553,11 +557,14 @@ AudioPipe::AudioPipe(const char *uuid,
}

m_audio_buffer_out = new uint8_t[m_audio_buffer_out_max_len];
m_audio_buffer_in = new uint8_t[m_audio_buffer_in_max_len];
}
AudioPipe::~AudioPipe()
{
if (m_audio_buffer_out)
delete[] m_audio_buffer_out;
if (m_audio_buffer_in)
delete[] m_audio_buffer_in;
if (m_recv_buf)
delete[] m_recv_buf;
}
Expand Down Expand Up @@ -626,28 +633,57 @@ void AudioPipe::do_graceful_shutdown()
}
void AudioPipe::binaryReadPush(uint8_t *data, size_t len)
{
m_audio_buffer_in.Write(data, len);
auto avail = m_audio_buffer_in_max_len - m_audio_buffer_in_write_offset;
auto ptr = m_audio_buffer_in + m_audio_buffer_in_write_offset;
if (len <= avail)
{
memcpy(ptr, data, len);
m_audio_buffer_in_write_offset += len;
}
else
{
// Wrapping case
memcpy(ptr, data, avail);
memcpy(m_audio_buffer_in, &data[avail], len - avail);
m_audio_buffer_in_write_offset = len - avail;
}
m_audio_buffer_in_len += len;
}
size_t AudioPipe::binaryReadPop(uint8_t *data, size_t len)
size_t AudioPipe::binaryReadPop(uint8_t *data, size_t data_len)
{
size_t olen = std::min(len, m_audio_buffer_in.GetAvailable());
if (olen > 0)
size_t len = std::min(data_len, m_audio_buffer_in_len);
if (len > 0)
{
m_audio_buffer_in.Read(data, olen);
auto ptr = m_audio_buffer_in + m_audio_buffer_in_write_offset;

if (m_audio_buffer_in_write_offset >= len)
{
memcpy(data, ptr - len, len);
m_audio_buffer_in_write_offset -= len;
}
else
{
auto end_segment = len - m_audio_buffer_in_write_offset;
memcpy(&data[end_segment], m_audio_buffer_in, m_audio_buffer_in_write_offset);
memcpy(data, m_audio_buffer_in + m_audio_buffer_in_max_len - end_segment, end_segment);
m_audio_buffer_in_write_offset = m_audio_buffer_in_max_len - end_segment;
}
m_audio_buffer_in_len -= len;

for (int i = 0; i < m_marks.size(); i++)
{
m_marks[i].buffer_index -= olen;
m_marks[i].buffer_index -= len;
if (m_marks[i].buffer_index < 0)
m_marks[i].buffer_index = 0;
}
}
return olen;
return len;
}

void AudioPipe::binaryReadClear()
{
m_audio_buffer_in.Skip(m_audio_buffer_in.GetAvailable());
m_audio_buffer_in_write_offset = 0;
m_audio_buffer_in_len = 0;
for (int i = 0; i < m_marks.size(); i++)
{
m_marks[i].buffer_index = 0;
Expand All @@ -658,7 +694,7 @@ void AudioPipe::binaryReadMark(std::string name)
{
audio_mark_t mark;
mark.name = name;
mark.buffer_index = m_audio_buffer_in.GetAvailable();
mark.buffer_index = m_audio_buffer_in_len;
m_marks.push_back(mark);
}

Expand Down
10 changes: 7 additions & 3 deletions components/freeswitch/src/mod/mod_twilio_stream/src/lws_glue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ namespace

static const char *requestedBufferSecs = std::getenv("MOD_TWILIO_STREAM_BUFFER_SECS");
static int nAudioBufferSecs = std::max(1, std::min(requestedBufferSecs ? ::atoi(requestedBufferSecs) : 2, 5));
static const char *requestedInBufferSecs = std::getenv("MOD_TWILIO_STREAM_IN_BUFFER_SECS");
static int nAudioInBufferSecs = std::max(1, std::min(requestedBufferSecs ? ::atoi(requestedBufferSecs) : 60, 120));
static const char *requestedBufferStartMSecs = std::getenv("MOD_TWILIO_STREAM_MIN_BUFFER_MILISECS");
static int nAudioBufferStartMSecs = std::max(0, std::min(requestedBufferStartMSecs ? ::atoi(requestedBufferStartMSecs) : 0, 0));
static const char *requestedNumServiceThreads = std::getenv("MOD_TWILIO_STREAM_SERVICE_THREADS");
Expand Down Expand Up @@ -302,9 +304,10 @@ namespace
tech_pvt->graceful_shutdown = 0;

size_t buflen = LWS_PRE + (FRAME_SIZE_8000 * desiredSampling / 8000 * channels * 1000 / RTP_PACKETIZATION_PERIOD * nAudioBufferSecs);

size_t bufInlen = LWS_PRE + (FRAME_SIZE_8000 * channels * nAudioInBufferSecs);

AudioPipe *ap = new AudioPipe(tech_pvt->sessionId, host, port, path, sslFlags,
buflen, read_impl.decoded_bytes_per_packet, username, password, bugname, eventCallback);
buflen, bufInlen, read_impl.decoded_bytes_per_packet, username, password, bugname, eventCallback);
if (!ap)
{
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "Error allocating AudioPipe\n");
Expand Down Expand Up @@ -489,7 +492,8 @@ extern "C"
switch_status_t fork_init()
{
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "mod_twilio_stream: fork_init\n");
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "mod_twilio_stream: audio buffer (in secs): %d secs\n", nAudioBufferSecs);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "mod_twilio_stream: audio output buffer (in secs): %d secs\n", nAudioBufferSecs);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "mod_twilio_stream: audio input buffer (in secs): %d secs\n", nAudioInBufferSecs);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "mod_twilio_stream: sub-protocol: %s\n", mySubProtocolName);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "mod_twilio_stream: lws service threads: %d\n", nServiceThreads);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
#include "g711.h"
#include "base64.hpp"

#include "lockfree/lockfree.hpp"

using namespace std::chrono;

Expand Down

0 comments on commit 5ea8755

Please sign in to comment.