Skip to content

Commit

Permalink
Use PyAV for encoding output
Browse files Browse the repository at this point in the history
  • Loading branch information
j0sh committed Jan 28, 2025
1 parent 7b49cfb commit 8a0f3fd
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 213 deletions.
8 changes: 5 additions & 3 deletions runner/app/live/streamer/protocol/trickle.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ def __init__(self, subscribe_url: str, publish_url: str, control_url: Optional[s
self.control_url = control_url
self.events_url = events_url
self.subscribe_queue = queue.Queue[bytearray]()
self.publish_queue = queue.Queue[bytearray]()
self.publish_queue = queue.Queue[dict]()
self.control_subscriber = None
self.events_publisher = None
self.subscribe_task = None
Expand Down Expand Up @@ -78,8 +78,10 @@ def dequeue_frame():

async def egress_loop(self, output_frames: AsyncGenerator[Image.Image, None]):
def enqueue_bytes(frame: Image.Image):
jpeg_bytes = to_jpeg_bytes(frame)
self.publish_queue.put(jpeg_bytes)
if frame:
self.publish_queue.put({'image': frame})
else:
self.publish_queue.put(None)

async for frame in output_frames:
await asyncio.to_thread(enqueue_bytes, frame)
Expand Down
73 changes: 38 additions & 35 deletions runner/app/live/trickle/media.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from .trickle_subscriber import TrickleSubscriber
from .trickle_publisher import TricklePublisher
from .decoder import decode_av
from .encoder import encode_av
from . import segmenter

# target framerate
Expand Down Expand Up @@ -76,15 +77,6 @@ def decode_runner():
loop = asyncio.get_running_loop()
await loop.run_in_executor(None, decode_runner)

def feed_ffmpeg(ffmpeg_fd, image_generator):
while True:
image = image_generator()
if image is None:
logging.info("Image generator empty, leaving feed_ffmpeg")
break
os.write(ffmpeg_fd, image)
os.close(ffmpeg_fd)

async def run_publish(publish_url: str, image_generator):
publisher = None
try:
Expand All @@ -106,36 +98,47 @@ async def callback(pipe_file, pipe_name):
await segment.write(data)
transport.close()

def sync_callback(pipe_fd, pipe_name):
# asyncio.connect_read_pipe expects explicit fd close
# so we have to manually read, detect eof, then close
r, w = os.pipe()
rf = os.fdopen(r, 'rb', buffering=0)
future = asyncio.run_coroutine_threadsafe(callback(rf, pipe_name), loop)
try:
while True:
data = pipe_fd.read(32 * 1024)
if not data:
break
os.write(w, data)
os.close(w) # streamreader is very sensitive about this
future.result() # This blocks in the thread until callback completes
rf.close() # also closes the read end of the pipe
# Ensure any exceptions in the coroutine are caught
except Exception as e:
logging.error(f"Error in sync_callback: {e}")
def sync_callback(pipe_file, pipe_name):
def do_schedule():
schedule_callback(callback(pipe_file, pipe_name), pipe_name)
loop.call_soon_threadsafe(do_schedule)

# hold tasks since `loop.create_task` is a weak reference that gets GC'd
# TODO use asyncio.TaskGroup once all pipelines are on Python 3.11+
live_tasks = set()
live_tasks_lock = threading.Lock()
def schedule_callback(coro: asyncio.coroutine, pipe_name):
task = loop.create_task(coro)
with live_tasks_lock:
live_tasks.add(task)
def task_done(t: asyncio.Task):
try:
t.result()
except Exception as e:
logging.error(f"Task {pipe_name} crashed: {e}")
with live_tasks_lock:
live_tasks.remove(t)
task.add_done_callback(task_done)

ffmpeg_read_fd, ffmpeg_write_fd = os.pipe()
segment_thread = threading.Thread(target=segmenter.segment_reading_process, args=(ffmpeg_read_fd, sync_callback))
ffmpeg_feeder = threading.Thread(target=feed_ffmpeg, args=(ffmpeg_write_fd, image_generator))
segment_thread.start()
ffmpeg_feeder.start()
logging.debug("run_publish: ffmpeg feeder and segmenter threads started")
encode_thread = threading.Thread(target=encode_av, args=(image_generator, sync_callback), kwargs={'audio_codec':None})
encode_thread.start()
logging.debug("run_publish: encoder thread started")

# Wait for encode thread to complete
def joins():
segment_thread.join()
ffmpeg_feeder.join()
encode_thread.join()
await asyncio.to_thread(joins)

# wait for IO tasks to complete
# TODO use asyncio.TaskGroup once all pipelines are on python 3.11+
while True:
with live_tasks_lock:
current_tasks = list(live_tasks)
if not current_tasks:
break # nothing left to wait on
await asyncio.wait(current_tasks, return_when=asyncio.ALL_COMPLETED)
# loop in case another task was added while awaiting

logging.info("run_publish complete")

except Exception as e:
Expand Down
175 changes: 0 additions & 175 deletions runner/app/live/trickle/segmenter.py

This file was deleted.

0 comments on commit 8a0f3fd

Please sign in to comment.