From cd943449b2d94a92e5c62ce2244a73db3955eea0 Mon Sep 17 00:00:00 2001 From: Aatman Vaidya Date: Thu, 7 Mar 2024 13:26:44 +0530 Subject: [PATCH 01/11] feat: workers to search audio and video files --- src/core/logger.py | 3 + src/tests/core/store/test_audio_es_vec.py | 2 + .../audiovec/audio_payload_search_writer.py | 22 ++++++ src/worker/audiovec/audio_payload_writer.py | 2 +- src/worker/audiovec/audio_worker.py | 2 - src/worker/audiovec/audio_worker_search.py | 73 ++++++++++++++++++ src/worker/audiovec/config.yml | 2 + src/worker/vidvec/config.yml | 2 + .../vidvec/video_payload_search_writer.py | 22 ++++++ src/worker/vidvec/video_worker.py | 1 - src/worker/vidvec/video_worker_search.py | 74 +++++++++++++++++++ 11 files changed, 201 insertions(+), 4 deletions(-) create mode 100644 src/worker/audiovec/audio_payload_search_writer.py create mode 100644 src/worker/audiovec/audio_worker_search.py create mode 100644 src/worker/vidvec/video_payload_search_writer.py create mode 100644 src/worker/vidvec/video_worker_search.py diff --git a/src/core/logger.py b/src/core/logger.py index 7d7fd18f..e2f9e3c2 100644 --- a/src/core/logger.py +++ b/src/core/logger.py @@ -23,3 +23,6 @@ def exception(self, msg): def prettyprint(self, msg): pp.pprint(msg) + + def error(self, msg): + self.log.error(msg) diff --git a/src/tests/core/store/test_audio_es_vec.py b/src/tests/core/store/test_audio_es_vec.py index 07965660..8dd7b1c5 100644 --- a/src/tests/core/store/test_audio_es_vec.py +++ b/src/tests/core/store/test_audio_es_vec.py @@ -19,6 +19,8 @@ curl -X POST "http://es:9200/test_audio/_delete_by_query" -H 'Content-Type: application/json' -d'{"query":{"match_all":{}}}' Delete the indice curl -X DELETE "http://es:9200/test_audio" +Refresh the indice +curl -X POST "http://es:9200/test_audio/_refresh" ''' class TestAudioES(unittest.TestCase): diff --git a/src/worker/audiovec/audio_payload_search_writer.py b/src/worker/audiovec/audio_payload_search_writer.py new file mode 100644 index 00000000..0da47a1d --- /dev/null +++ b/src/worker/audiovec/audio_payload_search_writer.py @@ -0,0 +1,22 @@ +from core.feluda import ComponentType, Feluda +from core.logger import Logger +log = Logger(__name__) +from time import sleep + +try: + feluda = Feluda("worker/audiovec/config.yml") + feluda.setup() + audio_index_queue = feluda.config.queue.parameters.queues[2]['name'] + feluda.start_component(ComponentType.STORE) + feluda.start_component(ComponentType.QUEUE) + + for _ in range(1): + dummy_payload = { + "id": str(12345), + "path": 'https://raw.githubusercontent.com/tattle-made/feluda/main/src/core/operators/sample_data/audio.wav' + } + feluda.queue.message(audio_index_queue, dummy_payload) + sleep(0.3) + +except Exception as e: + print("Error Initializing Indexer", e) \ No newline at end of file diff --git a/src/worker/audiovec/audio_payload_writer.py b/src/worker/audiovec/audio_payload_writer.py index aeb22221..e151cdbb 100644 --- a/src/worker/audiovec/audio_payload_writer.py +++ b/src/worker/audiovec/audio_payload_writer.py @@ -10,7 +10,7 @@ feluda.start_component(ComponentType.STORE) feluda.start_component(ComponentType.QUEUE) - for _ in range(25): + for _ in range(10): dummy_payload = { "id": str(12345), "path": 'https://raw.githubusercontent.com/tattle-made/feluda/main/src/core/operators/sample_data/audio.wav' diff --git a/src/worker/audiovec/audio_worker.py b/src/worker/audiovec/audio_worker.py index 5711d879..328e46ff 100644 --- a/src/worker/audiovec/audio_worker.py +++ b/src/worker/audiovec/audio_worker.py @@ -2,7 +2,6 @@ from core.logger import Logger from core.operators import audio_vec_embedding import json -from datetime import datetime from core.models.media import MediaType from core.models.media_factory import AudioFactory from time import sleep @@ -43,7 +42,6 @@ def worker(ch, method, properties, body): result = feluda.store.store(media_type, doc) print(result) report = make_report_indexed(file_content, "indexed") - print(report) feluda.queue.message(feluda.config.queue.parameters.queues[1]['name'], report) ch.basic_ack(delivery_tag=method.delivery_tag) except Exception as e: diff --git a/src/worker/audiovec/audio_worker_search.py b/src/worker/audiovec/audio_worker_search.py new file mode 100644 index 00000000..f258de3b --- /dev/null +++ b/src/worker/audiovec/audio_worker_search.py @@ -0,0 +1,73 @@ +from core.feluda import ComponentType, Feluda +from core.logger import Logger +from core.operators import audio_vec_embedding +import json +from core.models.media_factory import AudioFactory +from time import sleep +log = Logger(__name__) + +def make_report_indexed(data, status): + report = {} + report["indexer_id"] = 1 + report["post_id"] = data["id"] + report["status"] = status + report["status_code"] = 200 + return json.dumps(report) + +def make_report_failed(data, status): + report = {} + report["indexer_id"] = 1 + report["post_id"] = data["id"] + report["status"] = status + report["status_code"] = 400 + return json.dumps(report) + +def indexer(feluda): + def worker(ch, method, properties, body): + print("MESSAGE RECEIVED") + file_content = json.loads(body) + audio_path = AudioFactory.make_from_url(file_content['path']) + try: + audio_vec = audio_vec_embedding.run(audio_path) + search_result = feluda.store.find("audio", audio_vec) + print(search_result) + report = make_report_indexed(file_content, "searched") + feluda.queue.message(feluda.config.queue.parameters.queues[3]['name'], report) + ch.basic_ack(delivery_tag=method.delivery_tag) + except Exception as e: + print("Error indexing media", e) + # requeue the media file + report = make_report_failed(file_content, "failed") + feluda.queue.message(feluda.config.queue.parameters.queues[3]['name'], report) + ch.basic_nack(delivery_tag=method.delivery_tag) + return worker + +def handle_exception(feluda, queue_name, worker_func, retries, max_retries): + retry_interval = 60 + if retries < max_retries: + print("Inside Handle Exception") + try: + feluda.start_component(ComponentType.QUEUE) + feluda.queue.listen(queue_name, worker_func) + return + except Exception as e: + print("Error handling exception:", e) + retries = retries + 1 + sleep(retry_interval) + handle_exception(feluda, queue_name, worker_func, retries, max_retries) + else: + print("Failed to re-establish connection after maximum retries.") + +try: + feluda = Feluda("worker/audiovec/config.yml") + feluda.setup() + audio_search_queue = feluda.config.queue.parameters.queues[2]['name'] + feluda.start_component(ComponentType.STORE) + feluda.start_component(ComponentType.QUEUE) + audio_vec_embedding.initialize(param=None) + feluda.queue.listen(audio_search_queue, indexer(feluda)) +except Exception as e: + print("Error Initializing Indexer", e) + retries = 0 + max_retries = 10 + handle_exception(feluda, audio_search_queue, indexer(feluda), retries, max_retries) \ No newline at end of file diff --git a/src/worker/audiovec/config.yml b/src/worker/audiovec/config.yml index f4f222a8..a8cdf68e 100644 --- a/src/worker/audiovec/config.yml +++ b/src/worker/audiovec/config.yml @@ -16,6 +16,8 @@ queue : queues: - name : "audio-index-queue" - name : "report-queue" + - name : "audio-search-queue" + - name : "search-result-queue" operators : label : "Operators" diff --git a/src/worker/vidvec/config.yml b/src/worker/vidvec/config.yml index 1dc95db7..612d0ff3 100644 --- a/src/worker/vidvec/config.yml +++ b/src/worker/vidvec/config.yml @@ -16,6 +16,8 @@ queue : queues: - name : "video-index-queue" - name : "report-queue" + - name : "video-search-queue" + - name : "search-result-queue" operators : label : "Operators" diff --git a/src/worker/vidvec/video_payload_search_writer.py b/src/worker/vidvec/video_payload_search_writer.py new file mode 100644 index 00000000..044bec5b --- /dev/null +++ b/src/worker/vidvec/video_payload_search_writer.py @@ -0,0 +1,22 @@ +from core.feluda import ComponentType, Feluda +from core.logger import Logger +log = Logger(__name__) +from time import sleep + +try: + feluda = Feluda("worker/vidvec/config.yml") + feluda.setup() + video_search_queue = feluda.config.queue.parameters.queues[2]['name'] + feluda.start_component(ComponentType.STORE) + feluda.start_component(ComponentType.QUEUE) + + for _ in range(1): + dummy_payload = { + "id": str(123), + "path": 'https://raw.githubusercontent.com/tattle-made/feluda/main/src/core/operators/sample_data/sample-cat-video.mp4' + } + feluda.queue.message(video_search_queue, dummy_payload) + sleep(0.1) + +except Exception as e: + print("Error Initializing Indexer", e) \ No newline at end of file diff --git a/src/worker/vidvec/video_worker.py b/src/worker/vidvec/video_worker.py index 0497206c..e9f02a67 100644 --- a/src/worker/vidvec/video_worker.py +++ b/src/worker/vidvec/video_worker.py @@ -6,7 +6,6 @@ from core.models.media import MediaType from core.models.media_factory import VideoFactory from time import sleep -import subprocess log = Logger(__name__) def make_report_indexed(data, status): diff --git a/src/worker/vidvec/video_worker_search.py b/src/worker/vidvec/video_worker_search.py new file mode 100644 index 00000000..099bb2fc --- /dev/null +++ b/src/worker/vidvec/video_worker_search.py @@ -0,0 +1,74 @@ +from core.feluda import ComponentType, Feluda +from core.logger import Logger +from core.operators import vid_vec_rep_resnet +import json +from core.models.media_factory import VideoFactory +from time import sleep +log = Logger(__name__) + +def make_report_indexed(data, status): + report = {} + report["indexer_id"] = 1 + report["post_id"] = data["id"] + report["status"] = status + report["status_code"] = 200 + return json.dumps(report) + +def make_report_failed(data, status): + report = {} + report["indexer_id"] = 1 + report["post_id"] = data["id"] + report["status"] = status + report["status_code"] = 400 + return json.dumps(report) + +def indexer(feluda): + def worker(ch, method, properties, body): + print("MESSAGE RECEIVED") + file_content = json.loads(body) + video_path = VideoFactory.make_from_url(file_content['path']) + try: + print("Processing File:", video_path) + video_vec = vid_vec_rep_resnet.run(video_path) + average_vector = next(video_vec) + search_result = feluda.store.find("video", average_vector.get('vid_vec')) + print(search_result) + report = make_report_indexed(file_content, "searched") + feluda.queue.message(feluda.config.queue.parameters.queues[3]['name'], report) + ch.basic_ack(delivery_tag=method.delivery_tag) + except Exception as e: + print("Error indexing media", e) + report = make_report_failed(file_content, "failed") + feluda.queue.message(feluda.config.queue.parameters.queues[3]['name'], report) + ch.basic_nack(delivery_tag=method.delivery_tag) + return worker + +def handle_exception(feluda, queue_name, worker_func, retries, max_retries): + retry_interval = 60 + if retries < max_retries: + print("Inside Handle Exception") + try: + feluda.start_component(ComponentType.QUEUE) + feluda.queue.listen(queue_name, worker_func) + return + except Exception as e: + print("Error handling exception:", e) + retries = retries + 1 + sleep(retry_interval) + handle_exception(feluda, queue_name, worker_func, retries, max_retries) + else: + print("Failed to re-establish connection after maximum retries.") + +try: + feluda = Feluda("worker/vidvec/config.yml") + feluda.setup() + video_search_queue = feluda.config.queue.parameters.queues[2]['name'] + feluda.start_component(ComponentType.STORE) + feluda.start_component(ComponentType.QUEUE) + vid_vec_rep_resnet.initialize(param=None) + feluda.queue.listen(video_search_queue, indexer(feluda)) +except Exception as e: + print("Error Initializing Indexer", e) + retries = 0 + max_retries = 10 + handle_exception(feluda, video_search_queue, indexer(feluda), retries, max_retries) From 933f6db71e0a0eb800b98affb3702882f8e3e015 Mon Sep 17 00:00:00 2001 From: Aatman Vaidya Date: Thu, 7 Mar 2024 13:51:58 +0530 Subject: [PATCH 02/11] style: fixing logger --- src/core/logger.py | 8 ++++---- src/worker/audiovec/audio_worker.py | 3 ++- src/worker/audiovec/audio_worker_search.py | 3 ++- src/worker/vidvec/video_worker.py | 4 ++-- src/worker/vidvec/video_worker_search.py | 4 ++-- 5 files changed, 12 insertions(+), 10 deletions(-) diff --git a/src/core/logger.py b/src/core/logger.py index e2f9e3c2..ebf4e11b 100644 --- a/src/core/logger.py +++ b/src/core/logger.py @@ -11,8 +11,8 @@ def __init__(self, moduleName): self.environment = os.environ.get("ENVIRONMENT", "DEVELOPMENT") self.log = logging.getLogger(moduleName) - def info(self, msg): - self.log.info(msg) + def info(self, msg, *args, **kwargs): + self.log.info(msg, *args, **kwargs) def debug(self, msg): if self.environment == "DEVELOPMENT": @@ -24,5 +24,5 @@ def exception(self, msg): def prettyprint(self, msg): pp.pprint(msg) - def error(self, msg): - self.log.error(msg) + def error(self, msg, *args, **kwargs): + self.log.error(self, msg, *args, **kwargs) diff --git a/src/worker/audiovec/audio_worker.py b/src/worker/audiovec/audio_worker.py index 328e46ff..b415eaa4 100644 --- a/src/worker/audiovec/audio_worker.py +++ b/src/worker/audiovec/audio_worker.py @@ -30,6 +30,7 @@ def worker(ch, method, properties, body): file_content = json.loads(body) audio_path = AudioFactory.make_from_url(file_content['path']) try: + log.info("Processing File") media_type = MediaType.AUDIO audio_vec = audio_vec_embedding.run(audio_path) doc = { @@ -40,7 +41,7 @@ def worker(ch, method, properties, body): "date_added": datetime.utcnow(), } result = feluda.store.store(media_type, doc) - print(result) + log.info(result) report = make_report_indexed(file_content, "indexed") feluda.queue.message(feluda.config.queue.parameters.queues[1]['name'], report) ch.basic_ack(delivery_tag=method.delivery_tag) diff --git a/src/worker/audiovec/audio_worker_search.py b/src/worker/audiovec/audio_worker_search.py index f258de3b..3e345bd4 100644 --- a/src/worker/audiovec/audio_worker_search.py +++ b/src/worker/audiovec/audio_worker_search.py @@ -28,9 +28,10 @@ def worker(ch, method, properties, body): file_content = json.loads(body) audio_path = AudioFactory.make_from_url(file_content['path']) try: + log.info("Processsing File") audio_vec = audio_vec_embedding.run(audio_path) search_result = feluda.store.find("audio", audio_vec) - print(search_result) + log.info(search_result) report = make_report_indexed(file_content, "searched") feluda.queue.message(feluda.config.queue.parameters.queues[3]['name'], report) ch.basic_ack(delivery_tag=method.delivery_tag) diff --git a/src/worker/vidvec/video_worker.py b/src/worker/vidvec/video_worker.py index e9f02a67..63c2b921 100644 --- a/src/worker/vidvec/video_worker.py +++ b/src/worker/vidvec/video_worker.py @@ -49,12 +49,12 @@ def worker(ch, method, properties, body): file_content = json.loads(body) video_path = VideoFactory.make_from_url(file_content['path']) try: - print("Processing File:", video_path) + log.info("Processing file") video_vec = vid_vec_rep_resnet.run(video_path) doc = generate_document(video_path["path"], video_vec) media_type = MediaType.VIDEO result = feluda.store.store(media_type, doc) - print(result) + log.info(result) report = make_report_indexed(file_content, "indexed") feluda.queue.message(feluda.config.queue.parameters.queues[1]['name'], report) ch.basic_ack(delivery_tag=method.delivery_tag) diff --git a/src/worker/vidvec/video_worker_search.py b/src/worker/vidvec/video_worker_search.py index 099bb2fc..a57e73e0 100644 --- a/src/worker/vidvec/video_worker_search.py +++ b/src/worker/vidvec/video_worker_search.py @@ -28,11 +28,11 @@ def worker(ch, method, properties, body): file_content = json.loads(body) video_path = VideoFactory.make_from_url(file_content['path']) try: - print("Processing File:", video_path) + log.info("Processing File:") video_vec = vid_vec_rep_resnet.run(video_path) average_vector = next(video_vec) search_result = feluda.store.find("video", average_vector.get('vid_vec')) - print(search_result) + log.info(search_result) report = make_report_indexed(file_content, "searched") feluda.queue.message(feluda.config.queue.parameters.queues[3]['name'], report) ch.basic_ack(delivery_tag=method.delivery_tag) From 16add467ccfcc4520efd18f9104ffce5565056b2 Mon Sep 17 00:00:00 2001 From: Aurora <5505558+duggalsu@users.noreply.github.com> Date: Wed, 6 Mar 2024 10:32:55 +0530 Subject: [PATCH 03/11] ci (fix): version output --- .github/workflows/merge-main.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/merge-main.yml b/.github/workflows/merge-main.yml index 4af5aeae..67c269c0 100644 --- a/.github/workflows/merge-main.yml +++ b/.github/workflows/merge-main.yml @@ -28,10 +28,10 @@ jobs: - name: Dry Run to get next release version id: next_version + shell: bash run: | pip install python-semantic-release - export NEXT_TAG_VERSION=$(semantic-release --noop version --print) - echo "new_tag_version=${NEXT_TAG_VERSION}" >> $GITHUB_OUTPUT + echo "new_tag_version=$(semantic-release --noop version --print)" >> $GITHUB_OUTPUT - name: Python Semantic Release id: release From b4bdee409885eff3ff3ad9954f5eb306bf3b6730 Mon Sep 17 00:00:00 2001 From: Aurora <5505558+duggalsu@users.noreply.github.com> Date: Wed, 6 Mar 2024 11:15:24 +0530 Subject: [PATCH 04/11] ci (fix): Increased es java mem limit --- .ci/docker-compose.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.ci/docker-compose.yml b/.ci/docker-compose.yml index b6900da8..e88ef29d 100644 --- a/.ci/docker-compose.yml +++ b/.ci/docker-compose.yml @@ -35,7 +35,7 @@ services: - http.cors.allow-origin=http://localhost:1358,http://127.0.0.1:1358 - http.cors.allow-headers=X-Requested-With,X-Auth-Token,Content-Type,Content-Length,Authorization - http.cors.allow-credentials=true - - ES_JAVA_OPTS=-Xms750m -Xmx2g + - ES_JAVA_OPTS=-Xms750m -Xmx3g ulimits: memlock: soft: -1 From 828b1e492e4434300e9870ec6bfad11c54b6deb1 Mon Sep 17 00:00:00 2001 From: Aurora <5505558+duggalsu@users.noreply.github.com> Date: Wed, 6 Mar 2024 12:40:07 +0530 Subject: [PATCH 05/11] ci (fix): Modify pr testing - Removed integration tests and required components - Used unittest for unit tests - Disabled audio from disk test --- .ci/docker-compose.yml | 113 +++++++++--------- .github/workflows/pr-tests.yml | 2 +- .../operators/test_audio_vec_embedding.py | 4 +- 3 files changed, 61 insertions(+), 58 deletions(-) diff --git a/.ci/docker-compose.yml b/.ci/docker-compose.yml index e88ef29d..35dc0ba9 100644 --- a/.ci/docker-compose.yml +++ b/.ci/docker-compose.yml @@ -4,59 +4,62 @@ services: context: ./../src dockerfile: Dockerfile.test target: test - env_file: ./../src/test.env - command: bash -c "python -c 'import time'; - while ! curl store:9200 | grep -q 'You Know, for Search' - && ! curl queue:15672 | grep -q ''; - do python -c 'time.sleep(10)'; - done; - nose2 tests" - links: - - store - - queue - depends_on: - store: - condition: service_started - queue: - condition: service_started +# env_file: ./../src/test.env + command: | +# bash -c "python -c 'import time'; +# while ! curl store:9200 | grep -q 'You Know, for Search' +# && ! curl queue:15672 | grep -q ''; +# do python -c 'time.sleep(10)'; +# done; + bash -c "python -m unittest discover -s ./tests/core/models -p "test_*.py" | + bash -c "python -m unittest core.operators.test_vid_vec_rep_resnet" | + bash -c "python -m unittest core.operators.test_audio_vec_embedding" +# links: +# - store +# - queue +# depends_on: +# store: +# condition: service_started +# queue: +# condition: service_started - store: - container_name: es - image: docker.elastic.co/elasticsearch/elasticsearch:8.12.0 - volumes: - - ./.docker/es/data:/var/lib/elasticsearch/data -# ports: -# - "9300:9300" -# - "9200:9200" - environment: - - xpack.security.enabled=false - - discovery.type=single-node - - http.cors.enabled=true - - http.cors.allow-origin=http://localhost:1358,http://127.0.0.1:1358 - - http.cors.allow-headers=X-Requested-With,X-Auth-Token,Content-Type,Content-Length,Authorization - - http.cors.allow-credentials=true - - ES_JAVA_OPTS=-Xms750m -Xmx3g - ulimits: - memlock: - soft: -1 - hard: -1 - nofile: - soft: 65536 - hard: 65536 - cap_add: - - IPC_LOCK - - queue: - image: rabbitmq:3.12.12-management - container_name: rabbitmq - hostname: rabbit - volumes: - - ./.docker/rabbitmq/data:/var/lib/rabbitmq - - ./.docker/rabbitmq/logs:/var/log/rabbitmq - environment: - RABBITMQ_ERLANG_COOKIE: "secret-cookie" - RABBITMQ_DEFAULT_USER: "admin" - RABBITMQ_DEFAULT_PASS: "Admin123" -# ports: -# - "5672:5672" -# - "15672:15672" +# store: +# container_name: es +# image: docker.elastic.co/elasticsearch/elasticsearch:8.12.0 +# volumes: +# - ./.docker/es/data:/var/lib/elasticsearch/data +## ports: +## - "9300:9300" +## - "9200:9200" +# environment: +# - xpack.security.enabled=false +# - discovery.type=single-node +# - http.cors.enabled=true +# - http.cors.allow-origin=http://localhost:1358,http://127.0.0.1:1358 +# - http.cors.allow-headers=X-Requested-With,X-Auth-Token,Content-Type,Content-Length,Authorization +# - http.cors.allow-credentials=true +# - ES_JAVA_OPTS=-Xms750m -Xmx2g +# ulimits: +# memlock: +# soft: -1 +# hard: -1 +# nofile: +# soft: 65536 +# hard: 65536 +# cap_add: +# - IPC_LOCK +# +# queue: +# image: rabbitmq:3.12.12-management +# container_name: rabbitmq +# hostname: rabbit +# volumes: +# - ./.docker/rabbitmq/data:/var/lib/rabbitmq +# - ./.docker/rabbitmq/logs:/var/log/rabbitmq +# environment: +# RABBITMQ_ERLANG_COOKIE: "secret-cookie" +# RABBITMQ_DEFAULT_USER: "admin" +# RABBITMQ_DEFAULT_PASS: "Admin123" +## ports: +## - "5672:5672" +## - "15672:15672" diff --git a/.github/workflows/pr-tests.yml b/.github/workflows/pr-tests.yml index 8be3368f..4a7c036c 100644 --- a/.github/workflows/pr-tests.yml +++ b/.github/workflows/pr-tests.yml @@ -26,7 +26,7 @@ jobs: - name: Launch containers to run tests id: vars run: | - docker compose -f ./.ci/docker-compose.yml -p ci up -d store queue sut + docker compose -f ./.ci/docker-compose.yml -p ci up -d sut docker logs -f ci-sut-1 echo "sut_output=$(docker wait ci-sut-1)" >> $GITHUB_OUTPUT diff --git a/src/core/operators/test_audio_vec_embedding.py b/src/core/operators/test_audio_vec_embedding.py index d9b6e28f..c96a7d9e 100644 --- a/src/core/operators/test_audio_vec_embedding.py +++ b/src/core/operators/test_audio_vec_embedding.py @@ -14,7 +14,7 @@ def tearDownClass(cls): # delete config files pass - # @skip + @skip def test_sample_audio_from_disk(self): audio_file_path = AudioFactory.make_from_file_on_disk(r'core/operators/sample_data/audio.wav') audio_emb = audio_vec_embedding.run(audio_file_path) @@ -26,4 +26,4 @@ def test_sample_audio_from_url(self): "https://raw.githubusercontent.com/tattle-made/feluda/main/src/core/operators/sample_data/audio.wav" ) audio_emb = audio_vec_embedding.run(audio_path) - self.assertEqual(2048, len(audio_emb)) \ No newline at end of file + self.assertEqual(2048, len(audio_emb)) From 82bfeb4e545ed15b2f2795bdcac8fcb9dc7c2870 Mon Sep 17 00:00:00 2001 From: Aurora <5505558+duggalsu@users.noreply.github.com> Date: Wed, 6 Mar 2024 12:47:31 +0530 Subject: [PATCH 06/11] ci: Fix yaml issue --- .ci/docker-compose.yml | 6 ------ 1 file changed, 6 deletions(-) diff --git a/.ci/docker-compose.yml b/.ci/docker-compose.yml index 35dc0ba9..4930f251 100644 --- a/.ci/docker-compose.yml +++ b/.ci/docker-compose.yml @@ -4,13 +4,7 @@ services: context: ./../src dockerfile: Dockerfile.test target: test -# env_file: ./../src/test.env command: | -# bash -c "python -c 'import time'; -# while ! curl store:9200 | grep -q 'You Know, for Search' -# && ! curl queue:15672 | grep -q ''; -# do python -c 'time.sleep(10)'; -# done; bash -c "python -m unittest discover -s ./tests/core/models -p "test_*.py" | bash -c "python -m unittest core.operators.test_vid_vec_rep_resnet" | bash -c "python -m unittest core.operators.test_audio_vec_embedding" From 2f47f1985760328c8932c3b5b8c8cbfc31613358 Mon Sep 17 00:00:00 2001 From: Aurora <5505558+duggalsu@users.noreply.github.com> Date: Wed, 6 Mar 2024 12:50:36 +0530 Subject: [PATCH 07/11] ci: Fix yaml issue --- .ci/docker-compose.yml | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/.ci/docker-compose.yml b/.ci/docker-compose.yml index 4930f251..8ebbb856 100644 --- a/.ci/docker-compose.yml +++ b/.ci/docker-compose.yml @@ -4,10 +4,16 @@ services: context: ./../src dockerfile: Dockerfile.test target: test +# env_file: ./../src/test.env command: | - bash -c "python -m unittest discover -s ./tests/core/models -p "test_*.py" | - bash -c "python -m unittest core.operators.test_vid_vec_rep_resnet" | - bash -c "python -m unittest core.operators.test_audio_vec_embedding" +# bash -c "python -c 'import time'; +# while ! curl store:9200 | grep -q 'You Know, for Search' +# && ! curl queue:15672 | grep -q ''; +# do python -c 'time.sleep(10)'; +# done; + bash -c "python -m unittest discover -s ./tests/core/models -p "test_*.py"; + python -m unittest core.operators.test_vid_vec_rep_resnet; + python -m unittest core.operators.test_audio_vec_embedding" # links: # - store # - queue From d0a48f8424de21b928ca0e09c421b87c155cc725 Mon Sep 17 00:00:00 2001 From: Aurora <5505558+duggalsu@users.noreply.github.com> Date: Wed, 6 Mar 2024 13:05:09 +0530 Subject: [PATCH 08/11] ci: Fix docker yml --- .ci/docker-compose.yml | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/.ci/docker-compose.yml b/.ci/docker-compose.yml index 8ebbb856..5eac6ca6 100644 --- a/.ci/docker-compose.yml +++ b/.ci/docker-compose.yml @@ -5,15 +5,14 @@ services: dockerfile: Dockerfile.test target: test # env_file: ./../src/test.env - command: | + command: bash -c "python -m unittest discover -s ./tests/core/models -p "test_*.py"; + python -m unittest core.operators.test_vid_vec_rep_resnet; + python -m unittest core.operators.test_audio_vec_embedding" # bash -c "python -c 'import time'; # while ! curl store:9200 | grep -q 'You Know, for Search' # && ! curl queue:15672 | grep -q ''; # do python -c 'time.sleep(10)'; # done; - bash -c "python -m unittest discover -s ./tests/core/models -p "test_*.py"; - python -m unittest core.operators.test_vid_vec_rep_resnet; - python -m unittest core.operators.test_audio_vec_embedding" # links: # - store # - queue From 455b77c0b2692a6eb19b7499db0e18542b1c101f Mon Sep 17 00:00:00 2001 From: Aurora <5505558+duggalsu@users.noreply.github.com> Date: Thu, 7 Mar 2024 09:49:56 +0530 Subject: [PATCH 09/11] ci: Test fix versioning and modify docker images --- .github/workflows/merge-main.yml | 70 ++++++++++++++++++-------------- 1 file changed, 40 insertions(+), 30 deletions(-) diff --git a/.github/workflows/merge-main.yml b/.github/workflows/merge-main.yml index 67c269c0..a53cb275 100644 --- a/.github/workflows/merge-main.yml +++ b/.github/workflows/merge-main.yml @@ -26,64 +26,74 @@ jobs: with: python-version: '3.11' - - name: Dry Run to get next release version - id: next_version - shell: bash - run: | - pip install python-semantic-release - echo "new_tag_version=$(semantic-release --noop version --print)" >> $GITHUB_OUTPUT +# - name: Dry Run to get next release version +# id: next_version +# shell: bash +# run: | +# pip install python-semantic-release +# echo "new_tag_version=$(semantic-release --noop version --print)" >> $GITHUB_OUTPUT - name: Python Semantic Release id: release uses: python-semantic-release/python-semantic-release@master with: github_token: ${{ secrets.GITHUB_TOKEN }} - -# - name: Declare some variables -# id: vars # shell: bash # run: | -# echo "setting variables" -# echo "sha_short=$(git rev-parse --short HEAD)" >> $GITHUB_OUTPUT +# echo "the release status is: ${{ steps.release.outputs.released }}" +# echo "the new version is: ${{ steps.release.outputs.version }}" +# echo "the new tag version is: ${{ steps.release.outputs.tag }}" - name: Set up Docker Buildx uses: docker/setup-buildx-action@v3 - - name: Publish vidvec amd64 benchmark to dockerhub - #if: steps.release.outcome == 'success' + - name: Publish vidvec amd64 worker to dockerhub + if: steps.release.outputs.released == 'true' uses: elgohr/Publish-Docker-Github-Action@main with: username: tattletech password: ${{ secrets.DOCKER_PASSWORD }} name: tattletech/feluda-operator-vidvec workdir: src/ - dockerfile: benchmark/vidvec/Dockerfile.vid_vec_rep_resnet - tags: benchmark-amd64-${{ steps.next_version.outputs.new_tag_version }} + dockerfile: worker/vidvec/Dockerfile.video_worker + tags: worker-amd64-${{ steps.release.outputs.tag }} platforms: linux/amd64 - - name: Publish vidvec arm64 benchmark to dockerhub - #if: steps.release.outcome == 'success' + - name: Publish vidvec arm64 worker to dockerhub + if: steps.release.outputs.released == 'true' uses: elgohr/Publish-Docker-Github-Action@main with: username: tattletech password: ${{ secrets.DOCKER_PASSWORD }} name: tattletech/feluda-operator-vidvec workdir: src/ - dockerfile: benchmark/vidvec/Dockerfile.vid_vec_rep_resnet.graviton - tags: benchmark-arm64-${{ steps.next_version.outputs.new_tag_version }} + dockerfile: worker/vidvec/Dockerfile.video_worker.graviton + tags: worker-arm64-${{ steps.release.outputs.tag }} platforms: linux/arm64 -# - name: Publish vidvec amd64 worker to dockerhub -# #if: steps.release.outcome == 'success' -# uses: elgohr/Publish-Docker-Github-Action@main -# with: -# username: tattletech -# password: ${{ secrets.DOCKER_PASSWORD }} -# name: tattletech/feluda-operator-vidvec -# workdir: src/ -# dockerfile: worker/vidvec/Dockerfile.video_worker -# tags: worker-amd64-${{ steps.next_version.outputs.new_tag_version }} -# platforms: linux/amd64 + - name: Publish audiovec amd64 worker to dockerhub + if: steps.release.outputs.released == 'true' + uses: elgohr/Publish-Docker-Github-Action@main + with: + username: tattletech + password: ${{ secrets.DOCKER_PASSWORD }} + name: tattletech/feluda-operator-audiovec + workdir: src/ + dockerfile: worker/audiovec/Dockerfile.audio_worker + tags: worker-amd64-${{ steps.release.outputs.tag }} + platforms: linux/amd64 + + - name: Publish audiovec arm64 worker to dockerhub + if: steps.release.outputs.released == 'true' + uses: elgohr/Publish-Docker-Github-Action@main + with: + username: tattletech + password: ${{ secrets.DOCKER_PASSWORD }} + name: tattletech/feluda-operator-audiovec + workdir: src/ + dockerfile: worker/audiovec/Dockerfile.audio_worker.graviton + tags: worker-arm64-${{ steps.release.outputs.tag }} + platforms: linux/arm64 # - name: deploy to cluster # uses: steebchen/kubectl@v2.0.0 From 2301c984b06319adafdd9fda7ea9aa9592ff5991 Mon Sep 17 00:00:00 2001 From: Aatman Vaidya Date: Thu, 7 Mar 2024 13:26:44 +0530 Subject: [PATCH 10/11] feat: workers to search audio and video files --- src/core/logger.py | 3 + src/tests/core/store/test_audio_es_vec.py | 2 + .../audiovec/audio_payload_search_writer.py | 22 ++++++ src/worker/audiovec/audio_payload_writer.py | 2 +- src/worker/audiovec/audio_worker.py | 2 - src/worker/audiovec/audio_worker_search.py | 73 ++++++++++++++++++ src/worker/audiovec/config.yml | 2 + src/worker/vidvec/config.yml | 2 + .../vidvec/video_payload_search_writer.py | 22 ++++++ src/worker/vidvec/video_worker.py | 1 - src/worker/vidvec/video_worker_search.py | 74 +++++++++++++++++++ 11 files changed, 201 insertions(+), 4 deletions(-) create mode 100644 src/worker/audiovec/audio_payload_search_writer.py create mode 100644 src/worker/audiovec/audio_worker_search.py create mode 100644 src/worker/vidvec/video_payload_search_writer.py create mode 100644 src/worker/vidvec/video_worker_search.py diff --git a/src/core/logger.py b/src/core/logger.py index 7d7fd18f..e2f9e3c2 100644 --- a/src/core/logger.py +++ b/src/core/logger.py @@ -23,3 +23,6 @@ def exception(self, msg): def prettyprint(self, msg): pp.pprint(msg) + + def error(self, msg): + self.log.error(msg) diff --git a/src/tests/core/store/test_audio_es_vec.py b/src/tests/core/store/test_audio_es_vec.py index 07965660..8dd7b1c5 100644 --- a/src/tests/core/store/test_audio_es_vec.py +++ b/src/tests/core/store/test_audio_es_vec.py @@ -19,6 +19,8 @@ curl -X POST "http://es:9200/test_audio/_delete_by_query" -H 'Content-Type: application/json' -d'{"query":{"match_all":{}}}' Delete the indice curl -X DELETE "http://es:9200/test_audio" +Refresh the indice +curl -X POST "http://es:9200/test_audio/_refresh" ''' class TestAudioES(unittest.TestCase): diff --git a/src/worker/audiovec/audio_payload_search_writer.py b/src/worker/audiovec/audio_payload_search_writer.py new file mode 100644 index 00000000..0da47a1d --- /dev/null +++ b/src/worker/audiovec/audio_payload_search_writer.py @@ -0,0 +1,22 @@ +from core.feluda import ComponentType, Feluda +from core.logger import Logger +log = Logger(__name__) +from time import sleep + +try: + feluda = Feluda("worker/audiovec/config.yml") + feluda.setup() + audio_index_queue = feluda.config.queue.parameters.queues[2]['name'] + feluda.start_component(ComponentType.STORE) + feluda.start_component(ComponentType.QUEUE) + + for _ in range(1): + dummy_payload = { + "id": str(12345), + "path": 'https://raw.githubusercontent.com/tattle-made/feluda/main/src/core/operators/sample_data/audio.wav' + } + feluda.queue.message(audio_index_queue, dummy_payload) + sleep(0.3) + +except Exception as e: + print("Error Initializing Indexer", e) \ No newline at end of file diff --git a/src/worker/audiovec/audio_payload_writer.py b/src/worker/audiovec/audio_payload_writer.py index aeb22221..e151cdbb 100644 --- a/src/worker/audiovec/audio_payload_writer.py +++ b/src/worker/audiovec/audio_payload_writer.py @@ -10,7 +10,7 @@ feluda.start_component(ComponentType.STORE) feluda.start_component(ComponentType.QUEUE) - for _ in range(25): + for _ in range(10): dummy_payload = { "id": str(12345), "path": 'https://raw.githubusercontent.com/tattle-made/feluda/main/src/core/operators/sample_data/audio.wav' diff --git a/src/worker/audiovec/audio_worker.py b/src/worker/audiovec/audio_worker.py index 5711d879..328e46ff 100644 --- a/src/worker/audiovec/audio_worker.py +++ b/src/worker/audiovec/audio_worker.py @@ -2,7 +2,6 @@ from core.logger import Logger from core.operators import audio_vec_embedding import json -from datetime import datetime from core.models.media import MediaType from core.models.media_factory import AudioFactory from time import sleep @@ -43,7 +42,6 @@ def worker(ch, method, properties, body): result = feluda.store.store(media_type, doc) print(result) report = make_report_indexed(file_content, "indexed") - print(report) feluda.queue.message(feluda.config.queue.parameters.queues[1]['name'], report) ch.basic_ack(delivery_tag=method.delivery_tag) except Exception as e: diff --git a/src/worker/audiovec/audio_worker_search.py b/src/worker/audiovec/audio_worker_search.py new file mode 100644 index 00000000..f258de3b --- /dev/null +++ b/src/worker/audiovec/audio_worker_search.py @@ -0,0 +1,73 @@ +from core.feluda import ComponentType, Feluda +from core.logger import Logger +from core.operators import audio_vec_embedding +import json +from core.models.media_factory import AudioFactory +from time import sleep +log = Logger(__name__) + +def make_report_indexed(data, status): + report = {} + report["indexer_id"] = 1 + report["post_id"] = data["id"] + report["status"] = status + report["status_code"] = 200 + return json.dumps(report) + +def make_report_failed(data, status): + report = {} + report["indexer_id"] = 1 + report["post_id"] = data["id"] + report["status"] = status + report["status_code"] = 400 + return json.dumps(report) + +def indexer(feluda): + def worker(ch, method, properties, body): + print("MESSAGE RECEIVED") + file_content = json.loads(body) + audio_path = AudioFactory.make_from_url(file_content['path']) + try: + audio_vec = audio_vec_embedding.run(audio_path) + search_result = feluda.store.find("audio", audio_vec) + print(search_result) + report = make_report_indexed(file_content, "searched") + feluda.queue.message(feluda.config.queue.parameters.queues[3]['name'], report) + ch.basic_ack(delivery_tag=method.delivery_tag) + except Exception as e: + print("Error indexing media", e) + # requeue the media file + report = make_report_failed(file_content, "failed") + feluda.queue.message(feluda.config.queue.parameters.queues[3]['name'], report) + ch.basic_nack(delivery_tag=method.delivery_tag) + return worker + +def handle_exception(feluda, queue_name, worker_func, retries, max_retries): + retry_interval = 60 + if retries < max_retries: + print("Inside Handle Exception") + try: + feluda.start_component(ComponentType.QUEUE) + feluda.queue.listen(queue_name, worker_func) + return + except Exception as e: + print("Error handling exception:", e) + retries = retries + 1 + sleep(retry_interval) + handle_exception(feluda, queue_name, worker_func, retries, max_retries) + else: + print("Failed to re-establish connection after maximum retries.") + +try: + feluda = Feluda("worker/audiovec/config.yml") + feluda.setup() + audio_search_queue = feluda.config.queue.parameters.queues[2]['name'] + feluda.start_component(ComponentType.STORE) + feluda.start_component(ComponentType.QUEUE) + audio_vec_embedding.initialize(param=None) + feluda.queue.listen(audio_search_queue, indexer(feluda)) +except Exception as e: + print("Error Initializing Indexer", e) + retries = 0 + max_retries = 10 + handle_exception(feluda, audio_search_queue, indexer(feluda), retries, max_retries) \ No newline at end of file diff --git a/src/worker/audiovec/config.yml b/src/worker/audiovec/config.yml index f4f222a8..a8cdf68e 100644 --- a/src/worker/audiovec/config.yml +++ b/src/worker/audiovec/config.yml @@ -16,6 +16,8 @@ queue : queues: - name : "audio-index-queue" - name : "report-queue" + - name : "audio-search-queue" + - name : "search-result-queue" operators : label : "Operators" diff --git a/src/worker/vidvec/config.yml b/src/worker/vidvec/config.yml index 1dc95db7..612d0ff3 100644 --- a/src/worker/vidvec/config.yml +++ b/src/worker/vidvec/config.yml @@ -16,6 +16,8 @@ queue : queues: - name : "video-index-queue" - name : "report-queue" + - name : "video-search-queue" + - name : "search-result-queue" operators : label : "Operators" diff --git a/src/worker/vidvec/video_payload_search_writer.py b/src/worker/vidvec/video_payload_search_writer.py new file mode 100644 index 00000000..044bec5b --- /dev/null +++ b/src/worker/vidvec/video_payload_search_writer.py @@ -0,0 +1,22 @@ +from core.feluda import ComponentType, Feluda +from core.logger import Logger +log = Logger(__name__) +from time import sleep + +try: + feluda = Feluda("worker/vidvec/config.yml") + feluda.setup() + video_search_queue = feluda.config.queue.parameters.queues[2]['name'] + feluda.start_component(ComponentType.STORE) + feluda.start_component(ComponentType.QUEUE) + + for _ in range(1): + dummy_payload = { + "id": str(123), + "path": 'https://raw.githubusercontent.com/tattle-made/feluda/main/src/core/operators/sample_data/sample-cat-video.mp4' + } + feluda.queue.message(video_search_queue, dummy_payload) + sleep(0.1) + +except Exception as e: + print("Error Initializing Indexer", e) \ No newline at end of file diff --git a/src/worker/vidvec/video_worker.py b/src/worker/vidvec/video_worker.py index 0497206c..e9f02a67 100644 --- a/src/worker/vidvec/video_worker.py +++ b/src/worker/vidvec/video_worker.py @@ -6,7 +6,6 @@ from core.models.media import MediaType from core.models.media_factory import VideoFactory from time import sleep -import subprocess log = Logger(__name__) def make_report_indexed(data, status): diff --git a/src/worker/vidvec/video_worker_search.py b/src/worker/vidvec/video_worker_search.py new file mode 100644 index 00000000..099bb2fc --- /dev/null +++ b/src/worker/vidvec/video_worker_search.py @@ -0,0 +1,74 @@ +from core.feluda import ComponentType, Feluda +from core.logger import Logger +from core.operators import vid_vec_rep_resnet +import json +from core.models.media_factory import VideoFactory +from time import sleep +log = Logger(__name__) + +def make_report_indexed(data, status): + report = {} + report["indexer_id"] = 1 + report["post_id"] = data["id"] + report["status"] = status + report["status_code"] = 200 + return json.dumps(report) + +def make_report_failed(data, status): + report = {} + report["indexer_id"] = 1 + report["post_id"] = data["id"] + report["status"] = status + report["status_code"] = 400 + return json.dumps(report) + +def indexer(feluda): + def worker(ch, method, properties, body): + print("MESSAGE RECEIVED") + file_content = json.loads(body) + video_path = VideoFactory.make_from_url(file_content['path']) + try: + print("Processing File:", video_path) + video_vec = vid_vec_rep_resnet.run(video_path) + average_vector = next(video_vec) + search_result = feluda.store.find("video", average_vector.get('vid_vec')) + print(search_result) + report = make_report_indexed(file_content, "searched") + feluda.queue.message(feluda.config.queue.parameters.queues[3]['name'], report) + ch.basic_ack(delivery_tag=method.delivery_tag) + except Exception as e: + print("Error indexing media", e) + report = make_report_failed(file_content, "failed") + feluda.queue.message(feluda.config.queue.parameters.queues[3]['name'], report) + ch.basic_nack(delivery_tag=method.delivery_tag) + return worker + +def handle_exception(feluda, queue_name, worker_func, retries, max_retries): + retry_interval = 60 + if retries < max_retries: + print("Inside Handle Exception") + try: + feluda.start_component(ComponentType.QUEUE) + feluda.queue.listen(queue_name, worker_func) + return + except Exception as e: + print("Error handling exception:", e) + retries = retries + 1 + sleep(retry_interval) + handle_exception(feluda, queue_name, worker_func, retries, max_retries) + else: + print("Failed to re-establish connection after maximum retries.") + +try: + feluda = Feluda("worker/vidvec/config.yml") + feluda.setup() + video_search_queue = feluda.config.queue.parameters.queues[2]['name'] + feluda.start_component(ComponentType.STORE) + feluda.start_component(ComponentType.QUEUE) + vid_vec_rep_resnet.initialize(param=None) + feluda.queue.listen(video_search_queue, indexer(feluda)) +except Exception as e: + print("Error Initializing Indexer", e) + retries = 0 + max_retries = 10 + handle_exception(feluda, video_search_queue, indexer(feluda), retries, max_retries) From dc86b154fd0ffbf19f7f5522e835cea1e15d0566 Mon Sep 17 00:00:00 2001 From: Aatman Vaidya Date: Thu, 7 Mar 2024 13:51:58 +0530 Subject: [PATCH 11/11] style: fixing logger --- src/core/logger.py | 8 ++++---- src/worker/audiovec/audio_worker.py | 3 ++- src/worker/audiovec/audio_worker_search.py | 3 ++- src/worker/vidvec/video_worker.py | 4 ++-- src/worker/vidvec/video_worker_search.py | 4 ++-- 5 files changed, 12 insertions(+), 10 deletions(-) diff --git a/src/core/logger.py b/src/core/logger.py index e2f9e3c2..ebf4e11b 100644 --- a/src/core/logger.py +++ b/src/core/logger.py @@ -11,8 +11,8 @@ def __init__(self, moduleName): self.environment = os.environ.get("ENVIRONMENT", "DEVELOPMENT") self.log = logging.getLogger(moduleName) - def info(self, msg): - self.log.info(msg) + def info(self, msg, *args, **kwargs): + self.log.info(msg, *args, **kwargs) def debug(self, msg): if self.environment == "DEVELOPMENT": @@ -24,5 +24,5 @@ def exception(self, msg): def prettyprint(self, msg): pp.pprint(msg) - def error(self, msg): - self.log.error(msg) + def error(self, msg, *args, **kwargs): + self.log.error(self, msg, *args, **kwargs) diff --git a/src/worker/audiovec/audio_worker.py b/src/worker/audiovec/audio_worker.py index 328e46ff..b415eaa4 100644 --- a/src/worker/audiovec/audio_worker.py +++ b/src/worker/audiovec/audio_worker.py @@ -30,6 +30,7 @@ def worker(ch, method, properties, body): file_content = json.loads(body) audio_path = AudioFactory.make_from_url(file_content['path']) try: + log.info("Processing File") media_type = MediaType.AUDIO audio_vec = audio_vec_embedding.run(audio_path) doc = { @@ -40,7 +41,7 @@ def worker(ch, method, properties, body): "date_added": datetime.utcnow(), } result = feluda.store.store(media_type, doc) - print(result) + log.info(result) report = make_report_indexed(file_content, "indexed") feluda.queue.message(feluda.config.queue.parameters.queues[1]['name'], report) ch.basic_ack(delivery_tag=method.delivery_tag) diff --git a/src/worker/audiovec/audio_worker_search.py b/src/worker/audiovec/audio_worker_search.py index f258de3b..3e345bd4 100644 --- a/src/worker/audiovec/audio_worker_search.py +++ b/src/worker/audiovec/audio_worker_search.py @@ -28,9 +28,10 @@ def worker(ch, method, properties, body): file_content = json.loads(body) audio_path = AudioFactory.make_from_url(file_content['path']) try: + log.info("Processsing File") audio_vec = audio_vec_embedding.run(audio_path) search_result = feluda.store.find("audio", audio_vec) - print(search_result) + log.info(search_result) report = make_report_indexed(file_content, "searched") feluda.queue.message(feluda.config.queue.parameters.queues[3]['name'], report) ch.basic_ack(delivery_tag=method.delivery_tag) diff --git a/src/worker/vidvec/video_worker.py b/src/worker/vidvec/video_worker.py index e9f02a67..63c2b921 100644 --- a/src/worker/vidvec/video_worker.py +++ b/src/worker/vidvec/video_worker.py @@ -49,12 +49,12 @@ def worker(ch, method, properties, body): file_content = json.loads(body) video_path = VideoFactory.make_from_url(file_content['path']) try: - print("Processing File:", video_path) + log.info("Processing file") video_vec = vid_vec_rep_resnet.run(video_path) doc = generate_document(video_path["path"], video_vec) media_type = MediaType.VIDEO result = feluda.store.store(media_type, doc) - print(result) + log.info(result) report = make_report_indexed(file_content, "indexed") feluda.queue.message(feluda.config.queue.parameters.queues[1]['name'], report) ch.basic_ack(delivery_tag=method.delivery_tag) diff --git a/src/worker/vidvec/video_worker_search.py b/src/worker/vidvec/video_worker_search.py index 099bb2fc..a57e73e0 100644 --- a/src/worker/vidvec/video_worker_search.py +++ b/src/worker/vidvec/video_worker_search.py @@ -28,11 +28,11 @@ def worker(ch, method, properties, body): file_content = json.loads(body) video_path = VideoFactory.make_from_url(file_content['path']) try: - print("Processing File:", video_path) + log.info("Processing File:") video_vec = vid_vec_rep_resnet.run(video_path) average_vector = next(video_vec) search_result = feluda.store.find("video", average_vector.get('vid_vec')) - print(search_result) + log.info(search_result) report = make_report_indexed(file_content, "searched") feluda.queue.message(feluda.config.queue.parameters.queues[3]['name'], report) ch.basic_ack(delivery_tag=method.delivery_tag)