Skip to content

Commit

Permalink
Control AO-RTSP stream with RESTful API (#677)
Browse files Browse the repository at this point in the history
* #655 move ao-sink app config to a separate moudule

* #655 implement stream manger

* #655 add API server

* #655 validate stream requests

* #655 add YAML response

* #655 configure API port

* #655 allow to set per-stream codec

* #655 update API

* #655 check codec is available before stream creation

* #655 fix AO-sink adapter

* #655 update docs for samples

* #655 add docstrings to API

* #655 update documentation for AO-sink

* #655 add stream control API usage examples to multiple_gige sample

* #655 fix AO-sink

* #655 add API documentation
  • Loading branch information
tomskikh authored Mar 4, 2024
1 parent dfc8df5 commit 33d20f0
Show file tree
Hide file tree
Showing 37 changed files with 1,131 additions and 256 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ git lfs pull
# if Jetson
../../utils/check-environment-compatible && docker compose -f docker-compose.l4t.yml up

# open 'rtsp://127.0.0.1:554/stream' in your player
# or visit 'http://127.0.0.1:888/stream/' (LL-HLS)
# open 'rtsp://127.0.0.1:554/stream/city-traffic' in your player
# or visit 'http://127.0.0.1:888/stream/city-traffic/' (LL-HLS)

# Ctrl+C to stop running the compose bundle

Expand Down
221 changes: 65 additions & 156 deletions adapters/ds/sinks/always_on_rtsp/__main__.py
Original file line number Diff line number Diff line change
@@ -1,117 +1,48 @@
"""Entrypoint for Always-On-RTSP sink."""

import os
import signal
import time
from pathlib import Path
from subprocess import Popen, TimeoutExpired
from subprocess import Popen
from threading import Thread
from typing import Dict, List, Optional

from adapters.ds.sinks.always_on_rtsp.utils import nvidia_runtime_is_available
from typing import Optional

from adapters.ds.sinks.always_on_rtsp.api import Api
from adapters.ds.sinks.always_on_rtsp.app_config import AppConfig
from adapters.ds.sinks.always_on_rtsp.stream_manager import Stream, StreamManager
from adapters.ds.sinks.always_on_rtsp.utils import (
check_codec_is_available,
process_is_alive,
)
from adapters.ds.sinks.always_on_rtsp.zeromq_proxy import ZeroMqProxy
from savant.gstreamer import Gst
from savant.gstreamer.codecs import CODEC_BY_NAME, Codec
from savant.utils.config import opt_config, strtobool
from savant.utils.logging import get_logger, init_logging
from savant.utils.zeromq import ReceiverSocketTypes

LOGGER_NAME = 'adapters.ao_sink.entrypoint'
logger = get_logger(LOGGER_NAME)

SUPPORTED_CODECS = {x.value.name for x in [Codec.H264, Codec.HEVC]}


class Config:
def __init__(self):
self.dev_mode = opt_config('DEV_MODE', False, strtobool)
self.source_id: Optional[str] = opt_config('SOURCE_ID')
self.source_ids: Optional[List[str]] = opt_config('SOURCE_IDS', '').split(',')
assert (
self.source_id or self.source_ids
), 'Either "SOURCE_ID" or "SOURCE_IDS" must be set.'

self.zmq_endpoint = os.environ['ZMQ_ENDPOINT']
self.zmq_socket_type = opt_config(
'ZMQ_TYPE',
ReceiverSocketTypes.SUB,
ReceiverSocketTypes.__getitem__,
)
self.zmq_socket_bind = opt_config('ZMQ_BIND', False, strtobool)

self.rtsp_uri = opt_config('RTSP_URI')
if self.dev_mode:
assert (
self.rtsp_uri is None
), '"RTSP_URI" cannot be set when "DEV_MODE=True"'
self.rtsp_uri = 'rtsp://localhost:554/stream'
else:
assert (
self.rtsp_uri is not None
), '"RTSP_URI" must be set when "DEV_MODE=False"'

codec_name = opt_config('CODEC', 'h264')
assert codec_name in SUPPORTED_CODECS, f'Unsupported codec {codec_name}.'
self.codec = CODEC_BY_NAME[codec_name]


def main():
# To gracefully shutdown the adapter on SIGTERM (raise KeyboardInterrupt)
signal.signal(signal.SIGTERM, signal.getsignal(signal.SIGINT))

init_logging()
config = Config()
config = AppConfig()

Gst.init(None)
if nvidia_runtime_is_available():
logger.info(
'NVIDIA runtime is available. Using hardware-based decoding/encoding.'
)
if not config.codec.value.nv_encoder:
logger.error(
'Hardware-based encoding is not available for codec %s.',
config.codec.value.name,
)
return

from savant.utils.check_display import check_display_env

check_display_env(logger)

from savant.deepstream.encoding import check_encoder_is_available

if not check_encoder_is_available(
{'output_frame': {'codec': config.codec.value.name}}
):
return
else:
logger.warning(
'You are using the AO-RTSP adapter with a software-based decoder/encoder '
'(without NVDEC/NVENC). This mode must be used only when hardware-based '
'encoding is not available (Jetson Orin Nano, A100, H100). '
'If the hardware-based encoding is available, run the adapter with Nvidia '
'runtime enabled to activate hardware-based decoding/encoding.'
)
if not config.codec.value.sw_encoder:
logger.error(
'Software-based encoding is not available for codec %s.',
config.codec.value.name,
)
return

if not config.source_id:
internal_socket = 'ipc:///tmp/ao-sink-internal-socket.ipc'
zmq_reader_endpoint = f'sub+connect:{internal_socket}'
zmq_proxy = ZeroMqProxy(
input_socket=config.zmq_endpoint,
input_socket_type=config.zmq_socket_type,
input_bind=config.zmq_socket_bind,
output_socket=f'pub+bind:{internal_socket}',
)
zmq_proxy.start()
zmq_proxy_thread = Thread(target=zmq_proxy.run, daemon=True)
zmq_proxy_thread.start()
else:
zmq_reader_endpoint = config.zmq_endpoint
if not check_codec_is_available(config.codec):
return

internal_socket = 'ipc:///tmp/ao-sink-internal-socket.ipc'
zmq_reader_endpoint = f'sub+connect:{internal_socket}'
zmq_proxy = ZeroMqProxy(
input_socket=config.zmq_endpoint,
input_socket_type=config.zmq_socket_type,
input_bind=config.zmq_socket_bind,
output_socket=f'pub+bind:{internal_socket}',
)
zmq_proxy.start()
zmq_proxy_thread = Thread(target=zmq_proxy.run, daemon=True)
zmq_proxy_thread.start()

if config.dev_mode:
mediamtx_process = Popen(
Expand All @@ -127,88 +58,66 @@ def main():
else:
mediamtx_process = None

if config.source_id:
ao_sink_processes = {
config.source_id: run_ao_sink_process(
config.source_id,
config.rtsp_uri,
zmq_reader_endpoint,
)
}
else:
ao_sink_processes = {
source_id: run_ao_sink_process(
source_id,
f'{config.rtsp_uri.rstrip("/")}/{source_id}',
zmq_reader_endpoint,
)
for source_id in config.source_ids
}
stream_manager = StreamManager(config, zmq_reader_endpoint)
stream_manager.start()

for source_id in config.source_ids:
stream_manager.add_stream(source_id, Stream())

time.sleep(config.status_poll_interval)
if not check_all_streams_are_running(stream_manager):
return

api = Api(config, stream_manager)
api.start()

try:
main_loop(ao_sink_processes, mediamtx_process)
main_loop(config, stream_manager, api, mediamtx_process)
except KeyboardInterrupt:
pass

for source_id, ao_sink_process in ao_sink_processes.items():
if ao_sink_process.returncode is None:
logger.info('Terminating Always-On-RTSP for source %s', source_id)
ao_sink_process.terminate()
logger.info(
'Always-On-RTSP for source %s terminated. Exit code: %s.',
source_id,
ao_sink_process.returncode,
)
stream_manager.stop()
for source_id in stream_manager.get_all_streams().keys():
stream_manager.delete_stream(source_id)

if mediamtx_process is not None:
if mediamtx_process.returncode is None:
logger.info('Terminating MediaMTX')
mediamtx_process.terminate()
mediamtx_process.wait()
logger.info('MediaMTX terminated. Exit code: %s.', mediamtx_process.returncode)


def run_ao_sink_process(source_id: str, rtsp_uri: str, zmq_endpoint: str):
ao_sink_process = Popen(
['python', '-m', 'adapters.ds.sinks.always_on_rtsp.ao_sink'],
env={
**os.environ,
'SOURCE_ID': source_id,
'RTSP_URI': rtsp_uri,
'ZMQ_ENDPOINT': zmq_endpoint,
},
)
logger.info('Started Always-On-RTSP, PID: %s', ao_sink_process.pid)
if ao_sink_process.returncode is not None:
raise RuntimeError(
f'Failed to start Always-On-RTSP. Exit code: {ao_sink_process.returncode}.'
)
return ao_sink_process


def main_loop(
ao_sink_processes: Dict[str, Popen],
config: AppConfig,
stream_manager: StreamManager,
api: Api,
mediamtx_process: Optional[Popen],
):
while True:
for source_id, ao_sink_process in ao_sink_processes.items():
try:
returncode = ao_sink_process.wait(1)
logger.error(
'Always-On-RTSP for source %s exited. Exit code: %s.',
source_id,
returncode,
)
if config.fail_on_stream_error:
if not check_all_streams_are_running(stream_manager):
return
except TimeoutExpired:
pass

if mediamtx_process is not None:
try:
returncode = mediamtx_process.wait(1)
logger.error('MediaMTX exited. Exit code: %s.', returncode)
exit_code = process_is_alive(mediamtx_process)
if exit_code is not None:
logger.error('MediaMTX exited. Exit code: %s.', exit_code)
return
except TimeoutExpired:
pass

if not api.is_alive():
logger.error('API server is not running.')
return

time.sleep(config.status_poll_interval)


def check_all_streams_are_running(stream_manager: StreamManager):
for source_id, stream in stream_manager.get_all_streams().items():
if stream.exit_code is not None:
return False

return True


if __name__ == '__main__':
Expand Down
Loading

0 comments on commit 33d20f0

Please sign in to comment.