Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DOW-113] deprecate output queue and manually attach workers to each other #593

28 changes: 10 additions & 18 deletions tests/streaming/test_streaming_conversation.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@
from vocode.streaming.models.transcript import ActionStart, Message, Transcript
from vocode.streaming.streaming_conversation import StreamingConversation
from vocode.streaming.synthesizer.base_synthesizer import SynthesisResult
from vocode.streaming.telephony.constants import DEFAULT_SAMPLING_RATE
from vocode.streaming.utils.worker import AsyncWorker, QueueConsumer
from vocode.streaming.utils.worker import QueueConsumer


class ShouldIgnoreUtteranceTestCase(BaseModel):
Expand Down Expand Up @@ -235,9 +234,8 @@ async def test_transcriptions_worker_ignores_utterances_before_initial_message(
mocker,
)

streaming_conversation.transcriptions_worker._input_queue = asyncio.Queue()
streaming_conversation.transcriptions_worker.output_queue = asyncio.Queue()
streaming_conversation.transcriptions_worker.start()
transcriptions_worker_consumer = QueueConsumer()
streaming_conversation.transcriptions_worker.consumer = transcriptions_worker_consumer
streaming_conversation.transcriptions_worker.consume_nonblocking(
Transcription(
message="sup",
Expand All @@ -259,8 +257,7 @@ async def test_transcriptions_worker_ignores_utterances_before_initial_message(
is_final=True,
),
)
transcriptions_worker_consumer = QueueConsumer()
streaming_conversation.transcriptions_worker.consumer = transcriptions_worker_consumer
streaming_conversation.transcriptions_worker.start()
adnaans marked this conversation as resolved.
Show resolved Hide resolved
assert await _get_from_consumer_queue_if_exists(transcriptions_worker_consumer) is None
assert not streaming_conversation.broadcast_interrupt.called

Expand Down Expand Up @@ -301,25 +298,24 @@ async def test_transcriptions_worker_ignores_associated_ignored_utterance(
mocker,
)

streaming_conversation.transcriptions_worker._input_queue = asyncio.Queue()
streaming_conversation.transcriptions_worker.output_queue = asyncio.Queue()
streaming_conversation.transcriptions_worker.start()
streaming_conversation.initial_message_tracker.set()
streaming_conversation.transcript.add_bot_message(
text="Hi, I was wondering",
is_final=False,
conversation_id="test",
)

transcriptions_worker_consumer = QueueConsumer()
streaming_conversation.transcriptions_worker.consumer = transcriptions_worker_consumer
streaming_conversation.transcriptions_worker.start()

streaming_conversation.transcriptions_worker.consume_nonblocking(
Transcription(
message="i'm listening",
confidence=1.0,
is_final=False,
),
)
transcriptions_worker_consumer = QueueConsumer()
streaming_conversation.transcriptions_worker.consumer = transcriptions_worker_consumer

assert await _get_from_consumer_queue_if_exists(transcriptions_worker_consumer) is None
assert not streaming_conversation.broadcast_interrupt.called # ignored for length of response
Expand Down Expand Up @@ -370,9 +366,6 @@ async def test_transcriptions_worker_interrupts_on_interim_transcripts(
mocker,
)

streaming_conversation.transcriptions_worker._input_queue = asyncio.Queue()
streaming_conversation.transcriptions_worker.output_queue = asyncio.Queue()
streaming_conversation.transcriptions_worker.start()
streaming_conversation.initial_message_tracker.set()
streaming_conversation.transcript.add_bot_message(
text="Hi, I was wondering",
Expand All @@ -390,6 +383,7 @@ async def test_transcriptions_worker_interrupts_on_interim_transcripts(

transcriptions_worker_consumer = QueueConsumer()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

consumer should be added before like 376 again

streaming_conversation.transcriptions_worker.consumer = transcriptions_worker_consumer
streaming_conversation.transcriptions_worker.start()

assert await _get_from_consumer_queue_if_exists(transcriptions_worker_consumer) is None
assert streaming_conversation.broadcast_interrupt.called
Expand Down Expand Up @@ -423,9 +417,6 @@ async def test_transcriptions_worker_interrupts_immediately_before_bot_has_begun
mocker,
)

streaming_conversation.transcriptions_worker._input_queue = asyncio.Queue()
streaming_conversation.transcriptions_worker.output_queue = asyncio.Queue()
streaming_conversation.transcriptions_worker.start()
streaming_conversation.initial_message_tracker.set()

streaming_conversation.transcriptions_worker.consume_nonblocking(
Expand All @@ -437,6 +428,7 @@ async def test_transcriptions_worker_interrupts_immediately_before_bot_has_begun
)
transcriptions_worker_consumer = QueueConsumer()
streaming_conversation.transcriptions_worker.consumer = transcriptions_worker_consumer
streaming_conversation.transcriptions_worker.start()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

consumer should be added before 422


assert await _get_from_consumer_queue_if_exists(transcriptions_worker_consumer) is None
assert streaming_conversation.broadcast_interrupt.called
Expand Down