From 710389dcbd250baa73f8d7ed391d016f2b5ce38a Mon Sep 17 00:00:00 2001
From: Ivan Kud <bwsw@users.noreply.github.com>
Date: Tue, 28 Jan 2025 13:25:07 +0100
Subject: [PATCH] 931 Implemented a better kvs sample (#932)

* implemented a better kvs sample
* added Github Actions ignored paths
---
 .github/workflows/main.yml                   |  9 +++
 samples/key_value_api/README.md              |  2 +-
 samples/key_value_api/counter.py             | 26 -------
 samples/key_value_api/docker-compose.x86.yml | 34 ++++++++-
 samples/key_value_api/functions.py           | 79 ++++++++++++++++++++
 samples/key_value_api/module.yml             | 31 ++++++--
 6 files changed, 146 insertions(+), 35 deletions(-)
 delete mode 100644 samples/key_value_api/counter.py
 create mode 100644 samples/key_value_api/functions.py

diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml
index 49be5a95..4d391359 100644
--- a/.github/workflows/main.yml
+++ b/.github/workflows/main.yml
@@ -5,6 +5,15 @@ on:
   push:
     tags: ['v*.*.*']
     branches: ['develop']
+    paths-ignore:
+      - '.devcontainer'
+      - 'samples'
+      - 'tests'
+      - 'CODE_OF_CONDUCT.md'
+      - 'README.md'
+      - 'PREPARE_RELEASE.md'
+      - 'docs/performance.md'
+      - 'docs/README.md'
 
   # to run this workflow manually from the Actions tab
   # the init job fails if branch is different from 'releases/*.*.*' or 'develop'
diff --git a/samples/key_value_api/README.md b/samples/key_value_api/README.md
index f2aa7057..fcdf9b6e 100644
--- a/samples/key_value_api/README.md
+++ b/samples/key_value_api/README.md
@@ -22,7 +22,7 @@ The key-value store is accessible via REST API. Use the script to read from the
 
 ```bash
 # retrieve the value of the key 'frame_counter' through HTTP API request
-docker compose -f samples/key_value_api/docker-compose.x86.yml exec -it module python /scripts/get_frame_counter.py
+docker compose -f samples/key_value_api/docker-compose.x86.yml exec -it first python /scripts/get_frame_counter.py
 ```
 
 The documentation for the Key-Value API is available at the Savant documentation [website](https://docs.savant-ai.io/develop/advanced_topics/15_embedded_kvs.html).
diff --git a/samples/key_value_api/counter.py b/samples/key_value_api/counter.py
deleted file mode 100644
index 39109803..00000000
--- a/samples/key_value_api/counter.py
+++ /dev/null
@@ -1,26 +0,0 @@
-import savant_rs.webserver.kvs as kvs
-from savant_rs.primitives import Attribute, AttributeValue
-
-from savant.deepstream.meta.frame import NvDsFrameMeta
-from savant.deepstream.pyfunc import NvDsPyFuncPlugin
-from savant.gstreamer import Gst
-
-
-class Counter(NvDsPyFuncPlugin):
-    """Apply gaussian blur to the frame."""
-
-    def __init__(self, **kwargs):
-        super().__init__(**kwargs)
-        self._counter = 0
-
-    def process_frame(self, buffer: Gst.Buffer, frame_meta: NvDsFrameMeta):
-        self._counter += 1
-        attr = Attribute(
-            namespace='counter',
-            name='frame_counter',
-            hint='This attribute is set on every frame change',
-            values=[
-                AttributeValue.integer(self._counter),
-            ],
-        )
-        kvs.set_attributes([attr], ttl=None)
diff --git a/samples/key_value_api/docker-compose.x86.yml b/samples/key_value_api/docker-compose.x86.yml
index 1fa45938..d00fb8c0 100644
--- a/samples/key_value_api/docker-compose.x86.yml
+++ b/samples/key_value_api/docker-compose.x86.yml
@@ -14,10 +14,10 @@ services:
       - SYNC_OUTPUT=True
     entrypoint: /opt/savant/adapters/gst/sources/video_loop.sh
     depends_on:
-      module:
+      first:
         condition: service_healthy
 
-  module:
+  first:
     image: ghcr.io/insight-platform/savant-deepstream:latest
     restart: unless-stopped
     ports:
@@ -28,8 +28,36 @@ services:
       - ..:/opt/savant/samples
     command: samples/key_value_api/module.yml
     environment:
+      - LOGLEVEL=info,savant_core::pipeline::stats=error
+      - MODULE_STAGE=first
       - ZMQ_SRC_ENDPOINT=sub+bind:ipc:///tmp/zmq-sockets/input-video.ipc
-      - ZMQ_SINK_ENDPOINT=pub+bind:ipc:///tmp/zmq-sockets/output-video.ipc
+      - ZMQ_SINK_ENDPOINT=dealer+connect:ipc:///tmp/zmq-sockets/first-output.ipc
+    deploy:
+      resources:
+        reservations:
+          devices:
+            - driver: nvidia
+              count: 1
+              capabilities: [ gpu ]
+    depends_on:
+      second:
+        condition: service_healthy
+
+
+  second:
+    image: ghcr.io/insight-platform/savant-deepstream:latest
+    restart: unless-stopped
+    volumes:
+      - zmq_sockets:/tmp/zmq-sockets
+      - ./scripts:/scripts
+      - ..:/opt/savant/samples
+    command: samples/key_value_api/module.yml
+    environment:
+      - LOGLEVEL=info,savant_core::pipeline::stats=error
+      - MODULE_STAGE=second
+      - ZMQ_SRC_ENDPOINT=router+bind:ipc:///tmp/zmq-sockets/first-output.ipc
+      - ZMQ_SINK_ENDPOINT=pub+bind:ipc:///tmp/zmq-sockets/second-output.ipc
+
     deploy:
       resources:
         reservations:
diff --git a/samples/key_value_api/functions.py b/samples/key_value_api/functions.py
new file mode 100644
index 00000000..5c614640
--- /dev/null
+++ b/samples/key_value_api/functions.py
@@ -0,0 +1,79 @@
+import time
+
+import requests
+import savant_rs.webserver.kvs as kvs
+from savant_rs.primitives import Attribute, AttributeValue
+
+from savant.deepstream.meta.frame import NvDsFrameMeta
+from savant.deepstream.pyfunc import NvDsPyFuncPlugin
+from savant.gstreamer import Gst
+
+
+class First(NvDsPyFuncPlugin):
+    """Apply gaussian blur to the frame."""
+
+    def __init__(self, **kwargs):
+        super().__init__(**kwargs)
+        self._counter = 0
+        self._subscription = kvs.KvsSubscription('events', 100)
+
+    def process_frame(self, buffer: Gst.Buffer, frame_meta: NvDsFrameMeta):
+        self._counter += 1
+        now = time.time()
+        attr = Attribute(
+            namespace='counter',
+            name='frame_counter',
+            hint='This attribute is set on every frame change',
+            values=[
+                AttributeValue.integer(self._counter),
+            ],
+        )
+        kvs.set_attributes([attr], ttl=None)
+
+        events = [self._subscription.recv()]
+
+        maybe_second = self._subscription.try_recv()
+        if maybe_second is not None:
+            events.append(maybe_second)
+
+        if len(events) == 2:  # we have an event from the downstream pipeline
+            for e in events:
+                attributes = e.attributes
+                # filter only attributes with namespace 'second'
+                second_attributes = [a for a in attributes if a.namespace == 'second']
+                if len(second_attributes) > 0:
+                    elapsed_time = float(int((time.time() - now) * 100_000) / 100)
+                    for a in second_attributes:
+                        self.logger.info(
+                            f'Downstream attribute value (second): {a.values[0].as_integer()}, Elapsed time: {elapsed_time} ms'
+                        )
+
+
+class Second(NvDsPyFuncPlugin):
+    """Apply gaussian blur to the frame."""
+
+    def __init__(self, **kwargs):
+        super().__init__(**kwargs)
+        self._counter = 0
+
+    def process_frame(self, buffer: Gst.Buffer, frame_meta: NvDsFrameMeta):
+        self._counter += 1
+        if self._counter % 100 == 0:
+            now = time.time()
+            attr = Attribute(
+                namespace='second',
+                name='frame_counter',
+                hint='This attribute is set on every 1000th frame processing',
+                values=[
+                    AttributeValue.integer(self._counter),
+                ],
+            )
+            binary_attributes = kvs.serialize_attributes([attr])
+            response = requests.post(
+                f'http://first:8080/kvs/set', data=binary_attributes
+            )
+            assert response.status_code == 200
+            elapsed = float(int((time.time() - now) * 100_000) / 100)
+            self.logger.info(
+                f'Sent event to the upstream (first) module. Elapsed time: {elapsed} ms'
+            )
diff --git a/samples/key_value_api/module.yml b/samples/key_value_api/module.yml
index f5f50cf3..d6cdf5ae 100644
--- a/samples/key_value_api/module.yml
+++ b/samples/key_value_api/module.yml
@@ -1,14 +1,35 @@
 # module name, required
-name: ${oc.env:MODULE_NAME, 'telemetry-demo'}
+name: ${oc.env:MODULE_NAME, 'key-value-api'}
 
 # base module parameters
 parameters:
   batch_size: 1
+  output_frame:
+    # "copy" codec means the pipeline works in pass-through mode
+    codec: ${oc.env:CODEC, 'copy'}
+
 
 # pipeline definition
 pipeline:
   elements:
-    # simple pyfunc blurring frames
-    - element: pyfunc
-      module: samples.key_value_api.counter
-      class_name: Counter
+    - group:
+        # enabled if env var MODULE_STAGE==detector
+        init_condition:
+          expr: ${oc.env:MODULE_STAGE}
+          value: first
+        elements:
+          # simple pyfunc blurring frames
+          - element: pyfunc
+            module: samples.key_value_api.functions
+            class_name: First
+
+    - group:
+        # enabled if env var MODULE_STAGE==detector
+        init_condition:
+          expr: ${oc.env:MODULE_STAGE}
+          value: second
+        elements:
+          # simple pyfunc blurring frames
+          - element: pyfunc
+            module: samples.key_value_api.functions
+            class_name: Second
\ No newline at end of file