-
Notifications
You must be signed in to change notification settings - Fork 475
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[DOW-113] deprecate output queue and manually attach workers to each other #593
[DOW-113] deprecate output queue and manually attach workers to each other #593
Conversation
c86f666
to
9e28363
Compare
c787ea0
to
84afa9c
Compare
@@ -253,7 +259,9 @@ async def test_transcriptions_worker_ignores_utterances_before_initial_message( | |||
is_final=True, | |||
), | |||
) | |||
assert await _consume_worker_output(streaming_conversation.transcriptions_worker) is None | |||
transcriptions_worker_consumer = QueueConsumer() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You should remove line 239 right? And attach the consumer earlier? The consumer otherwise doesn't get the consume_nonblocking()
results above
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yup, absolutely right!
@@ -310,7 +318,10 @@ async def test_transcriptions_worker_ignores_associated_ignored_utterance( | |||
is_final=False, | |||
), | |||
) | |||
assert await _consume_worker_output(streaming_conversation.transcriptions_worker) is None | |||
transcriptions_worker_consumer = QueueConsumer() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If I'm correct above, likewise here and in all other relevant parts below
…ally-attach-workers-to-each
@@ -390,6 +383,7 @@ async def test_transcriptions_worker_interrupts_on_interim_transcripts( | |||
|
|||
transcriptions_worker_consumer = QueueConsumer() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
consumer should be added before like 376 again
@@ -437,6 +428,7 @@ async def test_transcriptions_worker_interrupts_immediately_before_bot_has_begun | |||
) | |||
transcriptions_worker_consumer = QueueConsumer() | |||
streaming_conversation.transcriptions_worker.consumer = transcriptions_worker_consumer | |||
streaming_conversation.transcriptions_worker.start() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
consumer should be added before 422
…other (#593) * deprecate output queues * fix quickstarts * fix mypy * fix tests * fix mypy * adds comment * adds back streamingconversation test * make input queue private on AsyncWorker * update tests * resolve PR comments
* remove phantom parameter (#612) * support files in make transcribe (#610) * support files in make transcribe * switch comment * uses .value instead of passing enum for sampling rate (#613) * [hotfix] fix ref in make transcribe (#614) * [DOW-113] deprecate output queue and manually attach workers to each other (#593) * deprecate output queues * fix quickstarts * fix mypy * fix tests * fix mypy * adds comment * adds back streamingconversation test * make input queue private on AsyncWorker * update tests * resolve PR comments * create scripts for vocodehq-public (#615) * add script used to make PR * adds test target for vocodehq-public
overall idea here - pushes the notion of queues under the abstraction barrier of workers. Workers do not need to know about the inner workings of how other workers process their events. In order to wire up workers, manually connect them to each other using instance variables and only use
consume_nonblocking
to communicate - this also improves the typing of events coming out of workers.Mechanically:
AbstractWorker
which just knows how to consume inputs, but places no restrictions on how it createsasyncio.Queue
s to do the processingAsyncWorker
to only create aninput_queue
, and not have it be overrideableStreamingConversation
so that the workers are connected to each other as described aboveAlso adds a nice test that asserts that a message can make it through the whole pipeline (
test_streaming_conversation_pipeline
)