diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 49be5a95..4d391359 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -5,6 +5,15 @@ on: push: tags: ['v*.*.*'] branches: ['develop'] + paths-ignore: + - '.devcontainer' + - 'samples' + - 'tests' + - 'CODE_OF_CONDUCT.md' + - 'README.md' + - 'PREPARE_RELEASE.md' + - 'docs/performance.md' + - 'docs/README.md' # to run this workflow manually from the Actions tab # the init job fails if branch is different from 'releases/*.*.*' or 'develop' diff --git a/samples/key_value_api/README.md b/samples/key_value_api/README.md index f2aa7057..fcdf9b6e 100644 --- a/samples/key_value_api/README.md +++ b/samples/key_value_api/README.md @@ -22,7 +22,7 @@ The key-value store is accessible via REST API. Use the script to read from the ```bash # retrieve the value of the key 'frame_counter' through HTTP API request -docker compose -f samples/key_value_api/docker-compose.x86.yml exec -it module python /scripts/get_frame_counter.py +docker compose -f samples/key_value_api/docker-compose.x86.yml exec -it first python /scripts/get_frame_counter.py ``` The documentation for the Key-Value API is available at the Savant documentation [website](https://docs.savant-ai.io/develop/advanced_topics/15_embedded_kvs.html). diff --git a/samples/key_value_api/counter.py b/samples/key_value_api/counter.py deleted file mode 100644 index 39109803..00000000 --- a/samples/key_value_api/counter.py +++ /dev/null @@ -1,26 +0,0 @@ -import savant_rs.webserver.kvs as kvs -from savant_rs.primitives import Attribute, AttributeValue - -from savant.deepstream.meta.frame import NvDsFrameMeta -from savant.deepstream.pyfunc import NvDsPyFuncPlugin -from savant.gstreamer import Gst - - -class Counter(NvDsPyFuncPlugin): - """Apply gaussian blur to the frame.""" - - def __init__(self, **kwargs): - super().__init__(**kwargs) - self._counter = 0 - - def process_frame(self, buffer: Gst.Buffer, frame_meta: NvDsFrameMeta): - self._counter += 1 - attr = Attribute( - namespace='counter', - name='frame_counter', - hint='This attribute is set on every frame change', - values=[ - AttributeValue.integer(self._counter), - ], - ) - kvs.set_attributes([attr], ttl=None) diff --git a/samples/key_value_api/docker-compose.x86.yml b/samples/key_value_api/docker-compose.x86.yml index 1fa45938..d00fb8c0 100644 --- a/samples/key_value_api/docker-compose.x86.yml +++ b/samples/key_value_api/docker-compose.x86.yml @@ -14,10 +14,10 @@ services: - SYNC_OUTPUT=True entrypoint: /opt/savant/adapters/gst/sources/video_loop.sh depends_on: - module: + first: condition: service_healthy - module: + first: image: ghcr.io/insight-platform/savant-deepstream:latest restart: unless-stopped ports: @@ -28,8 +28,36 @@ services: - ..:/opt/savant/samples command: samples/key_value_api/module.yml environment: + - LOGLEVEL=info,savant_core::pipeline::stats=error + - MODULE_STAGE=first - ZMQ_SRC_ENDPOINT=sub+bind:ipc:///tmp/zmq-sockets/input-video.ipc - - ZMQ_SINK_ENDPOINT=pub+bind:ipc:///tmp/zmq-sockets/output-video.ipc + - ZMQ_SINK_ENDPOINT=dealer+connect:ipc:///tmp/zmq-sockets/first-output.ipc + deploy: + resources: + reservations: + devices: + - driver: nvidia + count: 1 + capabilities: [ gpu ] + depends_on: + second: + condition: service_healthy + + + second: + image: ghcr.io/insight-platform/savant-deepstream:latest + restart: unless-stopped + volumes: + - zmq_sockets:/tmp/zmq-sockets + - ./scripts:/scripts + - ..:/opt/savant/samples + command: samples/key_value_api/module.yml + environment: + - LOGLEVEL=info,savant_core::pipeline::stats=error + - MODULE_STAGE=second + - ZMQ_SRC_ENDPOINT=router+bind:ipc:///tmp/zmq-sockets/first-output.ipc + - ZMQ_SINK_ENDPOINT=pub+bind:ipc:///tmp/zmq-sockets/second-output.ipc + deploy: resources: reservations: diff --git a/samples/key_value_api/functions.py b/samples/key_value_api/functions.py new file mode 100644 index 00000000..5c614640 --- /dev/null +++ b/samples/key_value_api/functions.py @@ -0,0 +1,79 @@ +import time + +import requests +import savant_rs.webserver.kvs as kvs +from savant_rs.primitives import Attribute, AttributeValue + +from savant.deepstream.meta.frame import NvDsFrameMeta +from savant.deepstream.pyfunc import NvDsPyFuncPlugin +from savant.gstreamer import Gst + + +class First(NvDsPyFuncPlugin): + """Apply gaussian blur to the frame.""" + + def __init__(self, **kwargs): + super().__init__(**kwargs) + self._counter = 0 + self._subscription = kvs.KvsSubscription('events', 100) + + def process_frame(self, buffer: Gst.Buffer, frame_meta: NvDsFrameMeta): + self._counter += 1 + now = time.time() + attr = Attribute( + namespace='counter', + name='frame_counter', + hint='This attribute is set on every frame change', + values=[ + AttributeValue.integer(self._counter), + ], + ) + kvs.set_attributes([attr], ttl=None) + + events = [self._subscription.recv()] + + maybe_second = self._subscription.try_recv() + if maybe_second is not None: + events.append(maybe_second) + + if len(events) == 2: # we have an event from the downstream pipeline + for e in events: + attributes = e.attributes + # filter only attributes with namespace 'second' + second_attributes = [a for a in attributes if a.namespace == 'second'] + if len(second_attributes) > 0: + elapsed_time = float(int((time.time() - now) * 100_000) / 100) + for a in second_attributes: + self.logger.info( + f'Downstream attribute value (second): {a.values[0].as_integer()}, Elapsed time: {elapsed_time} ms' + ) + + +class Second(NvDsPyFuncPlugin): + """Apply gaussian blur to the frame.""" + + def __init__(self, **kwargs): + super().__init__(**kwargs) + self._counter = 0 + + def process_frame(self, buffer: Gst.Buffer, frame_meta: NvDsFrameMeta): + self._counter += 1 + if self._counter % 100 == 0: + now = time.time() + attr = Attribute( + namespace='second', + name='frame_counter', + hint='This attribute is set on every 1000th frame processing', + values=[ + AttributeValue.integer(self._counter), + ], + ) + binary_attributes = kvs.serialize_attributes([attr]) + response = requests.post( + f'http://first:8080/kvs/set', data=binary_attributes + ) + assert response.status_code == 200 + elapsed = float(int((time.time() - now) * 100_000) / 100) + self.logger.info( + f'Sent event to the upstream (first) module. Elapsed time: {elapsed} ms' + ) diff --git a/samples/key_value_api/module.yml b/samples/key_value_api/module.yml index f5f50cf3..d6cdf5ae 100644 --- a/samples/key_value_api/module.yml +++ b/samples/key_value_api/module.yml @@ -1,14 +1,35 @@ # module name, required -name: ${oc.env:MODULE_NAME, 'telemetry-demo'} +name: ${oc.env:MODULE_NAME, 'key-value-api'} # base module parameters parameters: batch_size: 1 + output_frame: + # "copy" codec means the pipeline works in pass-through mode + codec: ${oc.env:CODEC, 'copy'} + # pipeline definition pipeline: elements: - # simple pyfunc blurring frames - - element: pyfunc - module: samples.key_value_api.counter - class_name: Counter + - group: + # enabled if env var MODULE_STAGE==detector + init_condition: + expr: ${oc.env:MODULE_STAGE} + value: first + elements: + # simple pyfunc blurring frames + - element: pyfunc + module: samples.key_value_api.functions + class_name: First + + - group: + # enabled if env var MODULE_STAGE==detector + init_condition: + expr: ${oc.env:MODULE_STAGE} + value: second + elements: + # simple pyfunc blurring frames + - element: pyfunc + module: samples.key_value_api.functions + class_name: Second \ No newline at end of file