Skip to content

Commit

Permalink
Events are working in Ruby
Browse files Browse the repository at this point in the history
  • Loading branch information
dwilkie committed Mar 23, 2024
1 parent 4e7f4dd commit a3ad366
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 53 deletions.
20 changes: 13 additions & 7 deletions components/app/app/workflows/execute_connect.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,25 @@

class ExecuteConnect < ExecuteTwiMLVerb
class ExecuteCommand < ExecuteTwiMLVerb
Event = Struct.new(:type, keyword_init: true) do
CHANNEL_PREFIX = "mod_twilio_stream".freeze

Event = Struct.new(:type, :disconnect?, keyword_init: true) do
DISCONNECT_EVENTS = [ "connect_failed", "disconnect" ].freeze

def self.parse(payload)
message = JSON.parse(payload)

new(
type: ActiveSupport::StringInquirer.new(message.fetch("event"))
type: ActiveSupport::StringInquirer.new(message.fetch("event")),
disconnect?: message.fetch("event").in?(DISCONNECT_EVENTS)
)
end
end

def execute(stream_sid:, **)
context.write_and_await_response(build_command(stream_sid:, **))

subscribe_to_stream_events("mod_twilio_stream:#{stream_sid}")
subscribe_to_stream_events("#{CHANNEL_PREFIX}:#{stream_sid}") do
context.write_and_await_response(build_command(stream_sid:, **))
end
end

private
Expand All @@ -33,12 +38,13 @@ def build_command(url:, stream_sid:, custom_parameters:)
)
end

def subscribe_to_stream_events(channel_name)
def subscribe_to_stream_events(channel_name, &)
AppSettings.redis.with do |redis|
redis.subscribe(channel_name) do |on|
on.subscribe(&)
on.message do |_channel, message|
event = Event.parse(message)
redis.unsubscribe(channel_name) if event.type.closed?
redis.unsubscribe(channel_name) if event.disconnect?
end
end
end
Expand Down
63 changes: 32 additions & 31 deletions components/app/spec/call_controllers/connect_spec.rb
Original file line number Diff line number Diff line change
@@ -1,29 +1,9 @@
require "spec_helper"

RSpec.describe CallController, type: :call_controller do
# stream_sid from VCR Cassette
def stub_twilio_stream(controller, with_events: [], stream_sid: "393a227f-0602-4024-b38a-dcbbeed4d5a0")
allow(controller).to receive(:write_and_await_response) do
AppSettings.redis.with do |redis|
build_twilio_stream_events(Array(with_events)).each do |event|
redis.publish_later("mod_twilio_stream:#{stream_sid}", event.to_json)
end
end
end
end

def assert_twilio_stream(controller, &)
expect(controller).to have_received(:write_and_await_response, &)
end

def build_twilio_stream_events(events)
result = [
{ event: "connected" },
{ event: "start" }
]
result.concat(Array(events))
result.push({ event: "closed" })
end
# from cassette
VCR_CALL_SID = "6f362591-ab86-4d1a-b39b-40c87e7929fc".freeze
VCR_STREAM_SID = "393a227f-0602-4024-b38a-dcbbeed4d5a0".freeze

describe "<Connect>", :vcr, cassette: :audio_stream do
# From: https://www.twilio.com/docs/voice/twiml/connect
Expand Down Expand Up @@ -56,9 +36,8 @@ def build_twilio_stream_events(events)

it "connects to a websockets stream" do
controller = build_controller(
call_properties: {
call_sid: "6f362591-ab86-4d1a-b39b-40c87e7929fc"
}
stub_voice_commands: :play_audio,
call_properties: { call_sid: VCR_CALL_SID }
)
stub_twilio_stream(controller)
stub_twiml_request(controller, response: <<~TWIML)
Expand All @@ -67,6 +46,7 @@ def build_twilio_stream_events(events)
<Connect>
<Stream url="wss://mystream.ngrok.io/audiostream" />
</Connect>
<Play>http://api.twilio.com/cowbell.mp3</Play>
</Response>
TWIML

Expand All @@ -79,17 +59,15 @@ def build_twilio_stream_events(events)
expect(metadata).to include(
"call_sid" => controller.call_properties.call_sid,
"account_sid" => controller.call_properties.account_sid,
"stream_sid" => be_present
"stream_sid" => VCR_STREAM_SID
)
end
expect(controller).to have_received(:play_audio).with("http://api.twilio.com/cowbell.mp3")
end

it "handles custom parameters" do
controller = build_controller(
stub_voice_commands: :play_audio,
call_properties: {
call_sid: "6f362591-ab86-4d1a-b39b-40c87e7929fc"
}
call_properties: { call_sid: VCR_CALL_SID }
)
stub_twilio_stream(controller)
stub_twiml_request(controller, response: <<~TWIML)
Expand Down Expand Up @@ -118,4 +96,27 @@ def build_twilio_stream_events(events)
end
end
end

def stub_twilio_stream(controller, with_events: [], stream_sid: VCR_STREAM_SID)
allow(controller).to receive(:write_and_await_response)

AppSettings.redis.with do |redis|
build_twilio_stream_events(Array(with_events)).each do |event|
redis.publish_later("mod_twilio_stream:#{stream_sid}", event.to_json)
end
end
end

def build_twilio_stream_events(events)
result = [
{ event: "connected" },
{ event: "start" }
]
result.concat(Array(events))
result.push({ event: "disconnect" })
end

def assert_twilio_stream(controller, &)
expect(controller).to have_received(:write_and_await_response, &)
end
end
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
class FakeRedis < MockRedis
class NullSubscription
class DefaultSubscription
def message(&)
sleep
end

def unsubscribe; end
def subscribe(&)
yield
end

def unsubscribe(&)
yield
end
end

class Subscription
class Subscription < DefaultSubscription
attr_reader :channel, :messages

def initialize(channel)
Expand All @@ -21,14 +27,13 @@ def message(&)
end

poll_for_messages
foo = [ "bar", "baz" ]
end

def publish(message)
messages << message
end

def unsubscribe
def unsubscribe!
@unsubscribed = true
end

Expand All @@ -52,7 +57,7 @@ def subscribe(channel, &)
end

def unsubscribe(channel)
find_subscription(channel).unsubscribe
find_subscription(channel).unsubscribe!
subscriptions.delete(channel)
end

Expand All @@ -68,7 +73,7 @@ def subscriptions
end

def find_subscription(channel)
subscriptions.fetch(channel) { NullSubscription.new }
subscriptions.fetch(channel) { DefaultSubscription.new }
end
end

Expand Down
1 change: 0 additions & 1 deletion components/app/spec/workflows/execute_gather_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
TwiML::PlayVerb.new(name: "Play", content: "http://api.twilio.com/cowbell.mp3", attributes: { "loop" => "3" }),
TwiML::SayVerb.new(name: "Say", content: say_content, attributes: { "voice" => "woman" })
]

verb = build_verb(nested_verbs:)
context = build_controller(stub_voice_commands: { ask: build_input_result })
tts_event_notifier = instance_spy(TTSEventNotifier)
Expand Down
12 changes: 5 additions & 7 deletions components/freeswitch_event_logger/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ import (
"encoding/json"
"fmt"
"os"
"time"
"strings"
"time"

"github.com/cgrates/fsock"
"github.com/redis/go-redis/v9"
)
Expand All @@ -23,8 +24,6 @@ func (nopLogger) Info(string) error { return nil }
func (nopLogger) Notice(string) error { return nil }
func (nopLogger) Warning(string) error { return nil }



// Formats the event as map and prints it out
func logHeartbeat(eventStr string, connIdx int) {
// Format the event from string into Go's map type
Expand Down Expand Up @@ -52,7 +51,6 @@ func customEventHandler(ctx context.Context, rdb *redis.Client, eventStr string,
err := json.Unmarshal([]byte(payload), &result)
if err != nil {
panic(err)
return
}

stream_sid, stream_sid_ok := result["streamSid"].(string)
Expand All @@ -64,7 +62,7 @@ func customEventHandler(ctx context.Context, rdb *redis.Client, eventStr string,

if stream_sid_ok {
fmt.Println(stream_sid)
err := rdb.Publish(ctx, prefix + "::" + stream_sid, payload).Err()
err := rdb.Publish(ctx, prefix+":"+stream_sid, payload).Err()
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -100,8 +98,8 @@ func main() {

rdb := redis.NewClient(opt)

customEventHandlerWrapper := func(eventStr string, connIdx int) {
customEventHandler(ctx, rdb, eventStr, connIdx)
customEventHandlerWrapper := func(eventStr string, connIdx int) {
customEventHandler(ctx, rdb, eventStr, connIdx)
}

evFilters := map[string][]string{
Expand Down

0 comments on commit a3ad366

Please sign in to comment.