diff --git a/.github/workflows/docker-push-media-worker-staging.yml b/.github/workflows/docker-push-media-worker-staging.yml new file mode 100644 index 00000000..3e0f651a --- /dev/null +++ b/.github/workflows/docker-push-media-worker-staging.yml @@ -0,0 +1,46 @@ +name: Publish Media Worker to Dockerhub for Staging + +permissions: + contents: read + +on: workflow_dispatch + +jobs: + docker: + runs-on: ubuntu-latest + steps: + - name: Set up QEMU + uses: docker/setup-qemu-action@68827325e0b33c7199eb31dd4e31fbe9023e06e3 # v3.0.0 + + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@2b51285047da1547ffb1b2203d8be4c0af6b1f20 # v3.2.0 + + - name: Login to Docker Hub + uses: docker/login-action@e92390c5fb421da1463c202d546fed0ec5c39f20 # v.3.1.0 + with: + username: ${{ secrets.DOCKERHUB_USERNAME }} + password: ${{ secrets.DOCKERHUB_TOKEN }} + + - name: Build and push amd64 + uses: docker/build-push-action@2cdde995de11925a030ce8070c3d77a52ffcf1c0 # v5.3.0 + with: + context: "{{defaultContext}}:src/" + file: worker/media/Dockerfile.media_worker + platforms: linux/amd64 + build-args: | + "UID=1000" + "GID=1000" + push: true + tags: tattletech/feluda-operator-media:worker-amd64-latest + + - name: Build and push arm64 + uses: docker/build-push-action@2cdde995de11925a030ce8070c3d77a52ffcf1c0 # v5.3.0 + with: + context: "{{defaultContext}}:src/" + file: worker/media/Dockerfile.media_worker_graviton + platforms: linux/arm64 + build-args: | + "UID=1000" + "GID=1000" + push: true + tags: tattletech/feluda-operator-media:worker-arm64-latest diff --git a/CHANGELOG.md b/CHANGELOG.md index f6a8cbb4..edd9eda8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1084,45 +1084,45 @@ refactor: merging development to main ([`22bb325`](https://github.com/tattle-mad * feat: feluda store supports audio (#78) -* feat: feluda store supports audio - -* fix: delete and refresh for ES - +* feat: feluda store supports audio + +* fix: delete and refresh for ES + * dhore: profiling audio operator ([`f6987a6`](https://github.com/tattle-made/feluda/commit/f6987a6d3aa4ff018b5ebac248a4469437df80d3)) * feat: add poc multiprocess test ([`f43646b`](https://github.com/tattle-made/feluda/commit/f43646b4145af6cd8c6ea718be895ecbca77d271)) * feat: audio operator to extract embedding vectors (#59) -* feat: audio emebddings - -* chore: deleting music files - -* chore: renaming files - -* docs: documentation for audio embedding operator - +* feat: audio emebddings + +* chore: deleting music files + +* chore: renaming files + +* docs: documentation for audio embedding operator + * docs: adding work to be done for the operator ([`484d5ae`](https://github.com/tattle-made/feluda/commit/484d5aed902b46d627c060625fad2a64a6246461)) * feat: c-profiling test for video vec (#60) -* feat: c-profiling test for video vec - +* feat: c-profiling test for video vec + * feat: test to find time taken for video vec ([`247f5db`](https://github.com/tattle-made/feluda/commit/247f5db90dc708f04bd0818f790d95c3e2c67a42)) * feat: add workflow to push vidvec specific operator to dockerhub ([`17e0d57`](https://github.com/tattle-made/feluda/commit/17e0d576492e379b2e4165e2b728868ab3fad455)) * feat: operator to detect objects using YOLO (#44) -* feat: operator to detect objects using YOLO - -* test file comment main function - +* feat: operator to detect objects using YOLO + +* test file comment main function + * chore: moving ultralytics install to opreator ([`17b9d10`](https://github.com/tattle-made/feluda/commit/17b9d107464f24875ca7008c4cf81cf0466e45d3)) * feat: operator to extract text in images using tesseract (#40) -* feat: opreator to detect text in images using tesseract +* feat: opreator to detect text in images using tesseract * chore: adding test images and making test multilingual ([`edec4a9`](https://github.com/tattle-made/feluda/commit/edec4a97763dd81e7dd7013833c690890563a1e9)) * feat: add license ([`a44e233`](https://github.com/tattle-made/feluda/commit/a44e233bf36fb4ad0d5bbd41a29523dc1f5364aa)) @@ -1194,8 +1194,8 @@ refactor: merging development to main ([`22bb325`](https://github.com/tattle-mad * fix: video search (#52) -* chore: moving test files to a folder -* fix: video search +* chore: moving test files to a folder +* fix: video search * docs: commenting TODO in search.py ([`af54ac0`](https://github.com/tattle-made/feluda/commit/af54ac0b7e2ef1afc139939e88cfc0f3fcc2dbfc)) * fix: search api as client ([`2573490`](https://github.com/tattle-made/feluda/commit/25734905ed04d6f302542ae152a3954d20e7ad31)) @@ -1238,10 +1238,10 @@ refactor: merging development to main ([`22bb325`](https://github.com/tattle-mad * refactor: benchmark test sh file (#64) -* refactor: benchmark test sh file - -* ci: dockerfile udpate for benchmark.sh - +* refactor: benchmark test sh file + +* ci: dockerfile udpate for benchmark.sh + * chore: echo statements for benchmark file ([`37e768a`](https://github.com/tattle-made/feluda/commit/37e768a38af2dcc169c395dace49bd708eeeaef1)) * refactor: cleanup deprecated thigns. ([`4c67853`](https://github.com/tattle-made/feluda/commit/4c67853e75b8511cfb89158a358f60523851b138)) @@ -1264,8 +1264,8 @@ refactor: merging development to main ([`22bb325`](https://github.com/tattle-mad * test: worker to queue and index video files (#84) -* refactor: small improvements - +* refactor: small improvements + * test: worker to queue and index video vec ([`6eaf19b`](https://github.com/tattle-made/feluda/commit/6eaf19b39298762b6f9a3f34e50858d7314d02e6)) ### Unknown @@ -1437,8 +1437,8 @@ Add ElasticSearch benchmarking ([`03915d3`](https://github.com/tattle-made/felud * [WIP] test: evaluating audio vec ES index and search (#77) -* test: evaluating audio vec ES index and search - +* test: evaluating audio vec ES index and search + * docs: delete stored documents ([`ad94ad7`](https://github.com/tattle-made/feluda/commit/ad94ad745d85c734fe1c7671ce43c4f2a9e28876)) * Merge pull request #76 from duggalsu/add_arch_to_docker_tag diff --git a/src/core/config.py b/src/core/config.py index b344ffae..8f8b689d 100644 --- a/src/core/config.py +++ b/src/core/config.py @@ -8,16 +8,15 @@ """ import logging -from typing import List, Optional +from typing import List, Optional, Union import yaml from dataclasses import dataclass from dacite import from_dict log = logging.getLogger(__name__) - @dataclass -class StoreParameters: +class StoreESParameters: host_name: str image_index_name: str text_index_name: str @@ -26,10 +25,20 @@ class StoreParameters: @dataclass -class StoreConfig: +class StorePostgresParameters: + table_names: List[str] + + +@dataclass +class StoreEntity: label: str type: str - parameters: StoreParameters + parameters: Union[StoreESParameters, StorePostgresParameters] + + +@dataclass +class StoreConfig: + entities: List[StoreEntity] @dataclass diff --git a/src/core/feluda.py b/src/core/feluda.py index fb2de750..d4c36846 100644 --- a/src/core/feluda.py +++ b/src/core/feluda.py @@ -22,7 +22,7 @@ def __init__(self, configPath): if self.config.store: from core import store - self.store = store.get_store(self.config.store) + self.store = store.get_stores(self.config.store) if self.config.queue: # print("---> 1", self.config.queue) from core.queue import Queue @@ -61,8 +61,9 @@ def start_component(self, component_type: ComponentType): if component_type == ComponentType.SERVER and self.server: self.server.start() elif component_type == ComponentType.STORE and self.store: - self.store.connect() - self.store.optionally_create_index() + for store in self.store: + self.store[store].connect() + self.store[store].initialise() elif component_type == ComponentType.QUEUE and self.queue: self.queue.connect() self.queue.initialize() diff --git a/src/core/models/media_factory.py b/src/core/models/media_factory.py index b3bc18d5..52b3cea9 100644 --- a/src/core/models/media_factory.py +++ b/src/core/models/media_factory.py @@ -6,10 +6,11 @@ from werkzeug.datastructures import FileStorage import wget from core.models.media import MediaType +from core.models.s3_utils import AWSS3Utils import logging import os import tempfile -import boto3 +from pydub import AudioSegment log = logging.getLogger(__name__) @@ -70,23 +71,6 @@ def make_from_file_in_memory(image_data: FileStorage): pass class VideoFactory: - aws_access_key_id = os.getenv('AWS_ACCESS_KEY_ID') - aws_secret_access_key = os.getenv('AWS_SECRET_ACCESS_KEY') - aws_region = os.getenv('AWS_REGION') - aws_bucket = os.getenv('AWS_BUCKET') - session = boto3.Session( - aws_access_key_id=aws_access_key_id, - aws_secret_access_key=aws_secret_access_key, - region_name=aws_region - ) - s3 = session.client('s3') - @staticmethod - def download_file_from_s3(bucket_name, file_key, local_file_path): - try: - VideoFactory.s3.download_file(bucket_name, file_key, local_file_path) - print(f"File {file_key} downloaded successfully as {local_file_path}") - except Exception as e: - print(f"Error downloading file {file_key}: {e}") @staticmethod def make_from_url(video_url): @@ -104,13 +88,13 @@ def make_from_url(video_url): print("Error downloading video:", e) raise Exception("Error Downloading Video") else: - bucket_name = VideoFactory.aws_bucket + bucket_name = AWSS3Utils.aws_bucket file_key = video_url file_name = file_key.split("/")[-1] file_path = os.path.join(temp_dir, file_name) try: print("Downloading video from S3") - VideoFactory.download_file_from_s3(bucket_name, file_key, file_path) + AWSS3Utils.download_file_from_s3(bucket_name, file_key, file_path) print("Video downloaded") except Exception as e: print("Error downloading video from S3:", e) @@ -134,21 +118,61 @@ class AudioFactory: @staticmethod def make_from_url(audio_url): temp_dir = tempfile.gettempdir() + + if audio_url.startswith("http"): + temp_url = audio_url.split("?")[0] + file_name = temp_url.split("/")[-1] + ".wav" + file_path = os.path.join(temp_dir, file_name) + try: + print("Downloading audio from URL") + wget.download(audio_url, out=file_path) + print("Audio downloaded") + except Exception as e: + print("Error downloading audio:", e) + raise Exception("Error Downloading audio") + else: + bucket_name = AWSS3Utils.aws_bucket + file_key = audio_url + file_name = file_key.split("/")[-1] + file_path = os.path.join(temp_dir, file_name) + try: + print("Downloading audio from S3") + AWSS3Utils.download_file_from_s3(bucket_name, file_key, file_path) + print("Audio downloaded") + except Exception as e: + print("Error downloading audio from S3:", e) + raise Exception("Error Downloading audio") + + return {"path": file_path} + + @staticmethod + def make_from_url_to_wav(audio_url): + temp_dir = tempfile.gettempdir() temp_url = audio_url.split("?")[0] - file_name = temp_url.split("/")[-1] + ".wav" - audio_file = temp_dir + os.sep + file_name + file_name = temp_url.split("/")[-1] + audio_file = os.path.join(temp_dir, file_name) + try: - print("Downloading audio from url") + print("Downloading audio from URL") wget.download(audio_url, out=audio_file) - print("audio downloaded") + print("\naudio downloaded") + + _, file_extension = os.path.splitext(file_name) + if file_extension != '.wav': + audio = AudioSegment.from_file(audio_file, format=file_extension[1:]) + wav_file = os.path.splitext(audio_file)[0] + '.wav' + audio.export(wav_file, format='wav') + os.remove(audio_file) + audio_file = wav_file except Exception as e: - log.exception("Error downloading audio:", e) - raise Exception("Error Downloading audio") + logging.exception("Error downloading or converting audio:", e) + raise Exception("Error downloading or converting audio") return {"path": audio_file} @staticmethod def make_from_file_on_disk(audio_path): return {"path": audio_path} + media_factory = { diff --git a/src/core/models/s3_utils.py b/src/core/models/s3_utils.py new file mode 100644 index 00000000..df4eff13 --- /dev/null +++ b/src/core/models/s3_utils.py @@ -0,0 +1,23 @@ +import boto3 +import os + +class AWSS3Utils: + aws_access_key_id = os.getenv('AWS_ACCESS_KEY_ID') + aws_secret_access_key = os.getenv('AWS_SECRET_ACCESS_KEY') + aws_region = os.getenv('AWS_REGION') + aws_bucket = os.getenv('AWS_BUCKET') + session = boto3.Session( + aws_access_key_id=aws_access_key_id, + aws_secret_access_key=aws_secret_access_key, + region_name=aws_region + ) + s3 = session.client('s3') + + @staticmethod + def download_file_from_s3(bucket_name, file_key, local_file_path): + try: + AWSS3Utils.s3.download_file(bucket_name, file_key, local_file_path) + print(f"File {file_key} downloaded successfully!") + except Exception as e: + print(f"Error downloading file {file_key}: {e}") + raise Exception("Error Downloading file from S3") \ No newline at end of file diff --git a/src/core/operators/media_file_hash.py b/src/core/operators/media_file_hash.py index 4fb5d01a..513a5903 100644 --- a/src/core/operators/media_file_hash.py +++ b/src/core/operators/media_file_hash.py @@ -1,16 +1,27 @@ def initialize(param): global hashlib import hashlib - + global os + import os + global contextmanager + from contextlib import contextmanager def run(media_path): file_path = media_path["path"] - with open(file_path, "rb") as f: - file_hash = hashlib.blake2b() - while chunk := f.read(4092): - file_hash.update(chunk) - return file_hash.hexdigest() + @contextmanager + def process_file(file_path): + try: + with open(file_path, "rb") as f: + file_hash = hashlib.blake2b() + while chunk := f.read(4092): + file_hash.update(chunk) + yield file_hash.hexdigest() + finally: + os.remove(file_path) + + with process_file(file_path) as hash_value: + return hash_value # if __name__ == "__main__": diff --git a/src/core/operators/test_media_file_hash.py b/src/core/operators/test_media_file_hash.py index 1a1aa14b..f58371e0 100644 --- a/src/core/operators/test_media_file_hash.py +++ b/src/core/operators/test_media_file_hash.py @@ -1,5 +1,5 @@ import unittest -# from unittest.case import skip +from unittest.case import skip from core.operators import media_file_hash from core.models.media_factory import VideoFactory @@ -15,6 +15,7 @@ def tearDownClass(cls): # delete config files pass + @skip def test_sample_media_from_disk(self): media_file_path = VideoFactory.make_from_file_on_disk("core/operators/sample_data/sample-cat-video.mp4") hash = media_file_hash.run(media_file_path) diff --git a/src/core/queue/__init__.py b/src/core/queue/__init__.py index fb57fd06..f053e025 100644 --- a/src/core/queue/__init__.py +++ b/src/core/queue/__init__.py @@ -1,11 +1,13 @@ import logging from . import rabbit_mq +from . import amazon_mq from core.config import QueueConfig + # from os import environ log = logging.getLogger(__name__) -queues = {"rabbitmq": rabbit_mq.RabbitMQ} +queues = {"rabbitmq": rabbit_mq.RabbitMQ, "amazonmq": amazon_mq.AmazonMQ} class Queue: diff --git a/src/core/queue/amazon_mq.py b/src/core/queue/amazon_mq.py new file mode 100644 index 00000000..96375fe4 --- /dev/null +++ b/src/core/queue/amazon_mq.py @@ -0,0 +1,74 @@ +import ssl +import pika +import logging +import json +from core.config import QueueConfig +from os import environ + +log = logging.getLogger(__name__) + + +class AmazonMQ: + def __init__(self, param: QueueConfig): + try: + self.rabbitmq_user = environ.get("MQ_USERNAME") + self.rabbitmq_password = environ.get("MQ_PASSWORD") + self.rabbitmq_broker_id = environ.get("MQ_BROKER_ID") + self.region = environ.get("MQ_REGION") + self.queues = [] + for queue in param.parameters.queues: + self.queues.append(queue["name"]) + except Exception: + print("Invalid parameter") + raise TypeError("Invalid parameters passed to AmazonMQ") + + def connect(self): + try: + # SSL Context for TLS configuration of Amazon MQ for RabbitMQ + ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2) + ssl_context.set_ciphers("ECDHE+AESGCM:!ECDSA") + + url = f"amqps://{self.rabbitmq_user}:{self.rabbitmq_password}@{self.rabbitmq_broker_id}.mq.{self.region}.amazonaws.com:5671" + parameters = pika.URLParameters(url) + parameters.ssl_options = pika.SSLOptions(context=ssl_context) + + self.connection = pika.BlockingConnection(parameters) + self.channel = self.connection.channel() + print("----> 2 ", "Success Connecting to AmazonMQ") + except Exception: + log.exception("Error Connecting to AmazonMQ") + print("Error Connecting to AmazonMQ") + raise Exception("Error connecting to AmazonMQ") + + def initialize(self): + for queue_name in self.queues: + self.channel.queue_declare(queue=queue_name) + print("Queue Declared : ", queue_name) + + def is_connected(self): + return self.channel.is_open + + def message(self, queue_name, payload): + try: + channel = self.connection.channel() + channel.basic_publish( + exchange="", + routing_key=queue_name, + body=json.dumps(payload), + ) + print("Sent message") + except Exception: + log.exception("Error sending message") + print("Error sending message") + raise Exception("Error sending message") + + def listen(self, queue_name, callback): + self.channel.basic_consume( + queue=queue_name, on_message_callback=callback, auto_ack=True + ) + print(" [*] Waiting for messages. To exit press CTRL+C") + self.channel.start_consuming() + + def close(self): + self.channel.close() + self.connection.close() diff --git a/src/core/store/__init__.py b/src/core/store/__init__.py index 9d48590f..7e5b652e 100644 --- a/src/core/store/__init__.py +++ b/src/core/store/__init__.py @@ -1,8 +1,11 @@ from core.config import StoreConfig from . import es_vec +from . import postgresql -stores = {"es_vec": es_vec.ES} +stores = {"es_vec": es_vec.ES, "postgresql": postgresql.PostgreSQLManager} - -def get_store(config: StoreConfig): - return stores[config.type](config) +def get_stores(config: StoreConfig): + stores_dict = {} + for store in config.entities: + stores_dict[store.type] = stores[store.type](store) + return stores_dict \ No newline at end of file diff --git a/src/core/store/es_vec.py b/src/core/store/es_vec.py index 04be99f0..b71f0f37 100644 --- a/src/core/store/es_vec.py +++ b/src/core/store/es_vec.py @@ -14,7 +14,6 @@ class ES: def __init__(self, config: StoreConfig): - # self.es_host = config.parameters.host_name self.es_host = os.environ.get("ES_HOST") self.indices = { "text": config.parameters.text_index_name, @@ -158,3 +157,6 @@ def reset(self): def stats(self): indices = self.get_indices() return indices + + def initialise(self): + self.optionally_create_index() diff --git a/src/core/store/postgresql.py b/src/core/store/postgresql.py index 81f43139..75d357e2 100644 --- a/src/core/store/postgresql.py +++ b/src/core/store/postgresql.py @@ -1,15 +1,17 @@ import psycopg2 +from core.config import StoreConfig from dotenv import load_dotenv import os load_dotenv() class PostgreSQLManager: - def __init__(self, port=5432): + def __init__(self, param: StoreConfig, port=5432, ): self.host = os.getenv("PG_HOST") self.dbname = os.getenv("PG_DB") self.user = os.getenv("PG_USER") self.password = os.getenv("PG_PASS") self.port = port + self.table_name = param.parameters.table_names[0] self.conn = None self.cur = None @@ -101,13 +103,13 @@ def create_trigger(self, table_name): else: print("Not connected to the database. Call connect() first.") - def store(self, table_name, value_column_value, worker_column_value): + def store(self, value_column_value, worker_column_value): if self.cur: try: prepared_stmt = None - if table_name == "user_message_inbox_duplicate": + if self.table_name == "user_message_inbox_duplicate": prepared_stmt = "INSERT INTO user_message_inbox_duplicate (value, worker_name) VALUES (%s, %s)" - elif table_name == "user_message_inbox_perceptually_similar": + elif self.table_name == "user_message_inbox_perceptually_similar": prepared_stmt = "INSERT INTO user_message_inbox_perceptually_similar (value, worker_name) VALUES (%s, %s)" self.cur.execute( @@ -189,14 +191,7 @@ def delete_table(self, table_name): else: print("Not connected to the database. Call connect() first.") - -# if __name__ == "__main__": -# pg_manager = PostgreSQLManager() -# pg_manager.connect() -# pg_manager.create_trigger_function() -# pg_manager.create_table("user_message_inbox_duplicate") -# pg_manager.create_trigger("user_message_inbox_duplicate") -# #pg_manager.store("user_message_inbox_duplicate", "hash_val", "blake2b_hash_value") -# pg_manager.update("user_message_inbox_duplicate", 1, "some_new_hash", "blake2b_hash_value") -# pg_manager.update("user_message_inbox_perceptually_similar", 1, "some_new_hash", "video_vector_crc") -# pg_manager.close_connection() \ No newline at end of file + def initialise(self): + self.create_trigger_function() + self.create_table(self.table_name) + self.create_trigger(self.table_name) \ No newline at end of file diff --git a/src/requirements.in b/src/requirements.in index 259c62fc..a38a1e84 100644 --- a/src/requirements.in +++ b/src/requirements.in @@ -16,4 +16,5 @@ requests==2.31.0 locust==2.23.1 nose2==0.14.1 psycopg2-binary==2.9.9 -boto3==1.34.64 \ No newline at end of file +boto3==1.34.64 +pydub==0.25.1 \ No newline at end of file diff --git a/src/requirements.txt b/src/requirements.txt index e12a3f27..fddb0fb3 100644 --- a/src/requirements.txt +++ b/src/requirements.txt @@ -880,6 +880,10 @@ psycopg2-binary==2.9.9 \ --hash=sha256:f7fc5a5acafb7d6ccca13bfa8c90f8c51f13d8fb87d95656d3950f0158d3ce53 \ --hash=sha256:f9b5571d33660d5009a8b3c25dc1db560206e2d2f89d3df1cb32d72c0d117d52 # via -r requirements.in +pydub==0.25.1 \ + --hash=sha256:65617e33033874b59d87db603aa1ed450633288aefead953b30bded59cb599a6 \ + --hash=sha256:980a33ce9949cab2a569606b65674d748ecbca4f0796887fd6f46173a7b0d30f + # via -r requirements.in pygments==2.17.2 \ --hash=sha256:b27c2826c47d0f3219f29554824c30c5e8945175d888647acd804ddd04af846c \ --hash=sha256:da46cec9fd2de5be3a8a784f434e4c4ab670b4ff54d605c4c2717e9d49c4c367 diff --git a/src/core/store/query.py b/src/worker/hash/__init__.py similarity index 100% rename from src/core/store/query.py rename to src/worker/hash/__init__.py diff --git a/src/worker/hash/config.yml b/src/worker/hash/config.yml index 5ac5fe4e..590382e6 100644 --- a/src/worker/hash/config.yml +++ b/src/worker/hash/config.yml @@ -1,3 +1,11 @@ +store: + entities: + - label: "Postgres Store" + type: "postgresql" + parameters: + table_names: + - "user_message_inbox_duplicate" + queue : label : "Queue" type : "rabbitmq" diff --git a/src/worker/hash/hash_payload_writer.py b/src/worker/hash/hash_payload_writer.py index 926f80dd..c9cf76e0 100644 --- a/src/worker/hash/hash_payload_writer.py +++ b/src/worker/hash/hash_payload_writer.py @@ -2,6 +2,7 @@ from core.logger import Logger from time import sleep import uuid +import sys log = Logger(__name__) try: @@ -9,15 +10,25 @@ feluda.setup() count_queue = feluda.config.queue.parameters.queues[0]['name'] feluda.start_component(ComponentType.QUEUE) + # take media_type from command line + media_type = sys.argv[1] if len(sys.argv) > 1 else "video" + media_paths = { + "video": "https://raw.githubusercontent.com/tattle-made/feluda/main/src/core/operators/sample_data/sample-cat-video.mp4", + "audio": "https://raw.githubusercontent.com/tattle-made/feluda/main/src/core/operators/sample_data/audio.wav", + } + + path = media_paths.get(media_type) + if path is None: + raise ValueError("Unsupported media type") for _ in range(1): unique_id = str(uuid.uuid4()) dummy_payload = { "id": unique_id, - "path": 'https://raw.githubusercontent.com/tattle-made/feluda/main/src/core/operators/sample_data/sample-cat-video.mp4' + "path": path, + "media_type": media_type, } feluda.queue.message(count_queue, dummy_payload) sleep(0.1) - except Exception as e: print("Error Initializing Indexer", e) \ No newline at end of file diff --git a/src/worker/hash/hash_worker.py b/src/worker/hash/hash_worker.py index b34eeb83..4f0fe644 100644 --- a/src/worker/hash/hash_worker.py +++ b/src/worker/hash/hash_worker.py @@ -2,19 +2,24 @@ from core.logger import Logger from core.operators import media_file_hash import json -from core.models.media_factory import VideoFactory -from core.store.postgresql import PostgreSQLManager +from core.models.media_factory import VideoFactory, AudioFactory from time import sleep + log = Logger(__name__) -def make_report_indexed(data, status): + +def make_report_indexed(data, status, hash_value=None): report = {} report["indexer_id"] = 1 report["post_id"] = data["id"] + report["media_type"] = data["media_type"] + if hash_value is not None: + report["hash_value"] = hash_value report["status"] = status report["status_code"] = 200 return json.dumps(report) + def make_report_failed(data, status): report = {} report["indexer_id"] = 1 @@ -23,6 +28,7 @@ def make_report_failed(data, status): report["status_code"] = 400 return json.dumps(report) + def handle_exception(feluda, queue_name, worker_func, retries, max_retries): retry_interval = 60 if retries < max_retries: @@ -39,46 +45,103 @@ def handle_exception(feluda, queue_name, worker_func, retries, max_retries): else: print("Failed to re-establish connection after maximum retries.") + def indexer(feluda): def worker(ch, method, properties, body): print("MESSAGE RECEIVED") + video_hash_value = None + audio_hash_value = None file_content = json.loads(body) - video_path = VideoFactory.make_from_url(file_content['path']) - try: - log.info("Processing file") - hash = media_file_hash.run(video_path) - log.debug(hash) - pg_manager.store("user_message_inbox_duplicate", str(hash), "blake2b_hash_value") - log.info("Hash value added to PostgreSQL") - report = make_report_indexed(file_content, "indexed") - feluda.queue.message(feluda.config.queue.parameters.queues[1]['name'], report) - ch.basic_ack(delivery_tag=method.delivery_tag) - except Exception as e: - print("Error indexing media", e) + file_media_type = file_content["media_type"] + if file_media_type == "video": + log.info("Media Type is Video") + try: + # download the video from url (supports s3) + video_path = VideoFactory.make_from_url(file_content["path"]) + # extrach hash value + video_hash_value = media_file_hash.run(video_path) + log.info(video_hash_value) + # add hash value to database + if feluda.config.store and "postgresql" in feluda.store: + feluda.store["postgresql"].store( + str(video_hash_value), "blake2b_hash_value" + ) + log.info("Hash value added to PostgreSQL") + # send indexed report to report queue + report = make_report_indexed(file_content, "indexed", video_hash_value) + feluda.queue.message( + feluda.config.queue.parameters.queues[1]["name"], report + ) + # send ack + ch.basic_ack(delivery_tag=method.delivery_tag) + except Exception as e: + print("Error indexing media", e) + # send failed report to report queue + report = make_report_failed(file_content, "failed") + feluda.queue.message( + feluda.config.queue.parameters.queues[1]["name"], report + ) + # requeue the media file + ch.basic_ack(delivery_tag=method.delivery_tag) + elif file_media_type == "audio": + log.info("Media Type is Audio") + try: + # download audio file from url (supports S3) + audio_path = AudioFactory.make_from_url(file_content["path"]) + # extrach hash value + audio_hash_value = media_file_hash.run(audio_path) + log.info(audio_hash_value) + # add hash value to database + if feluda.config.store and "postgresql" in feluda.store: + feluda.store["postgresql"].store( + str(audio_hash_value), "blake2b_hash_value" + ) + log.info("Hash value added to PostgreSQL") + # send indexed report to report queue + report = make_report_indexed(file_content, "indexed", audio_hash_value) + feluda.queue.message( + feluda.config.queue.parameters.queues[1]["name"], report + ) + # send ack + ch.basic_ack(delivery_tag=method.delivery_tag) + except Exception as e: + print("Error indexing media", e) + # send failed report to report queue + report = make_report_failed(file_content, "failed") + feluda.queue.message( + feluda.config.queue.parameters.queues[1]["name"], report + ) + # requeue the media file + ch.basic_ack(delivery_tag=method.delivery_tag) + else: + log.info("This media type is not supported currently") + # TODO: send a customised report and then report it to the queue with a ack report = make_report_failed(file_content, "failed") - feluda.queue.message(feluda.config.queue.parameters.queues[1]['name'], report) - # requeue the media file - ch.basic_nack(delivery_tag=method.delivery_tag) + feluda.queue.message( + feluda.config.queue.parameters.queues[1]["name"], report + ) + ch.basic_ack(delivery_tag=method.delivery_tag) + return worker + feluda = None -pg_manager = None count_queue = None try: + # Init Feluda and load config feluda = Feluda("worker/hash/config.yml") feluda.setup() - pg_manager = PostgreSQLManager() - pg_manager.connect() - pg_manager.create_trigger_function() - pg_manager.create_table("user_message_inbox_duplicate") - pg_manager.create_trigger("user_message_inbox_duplicate") - count_queue = feluda.config.queue.parameters.queues[0]['name'] + count_queue = feluda.config.queue.parameters.queues[0]["name"] + # setup Components + feluda.start_component(ComponentType.STORE) feluda.start_component(ComponentType.QUEUE) + # init hash operator media_file_hash.initialize(param=None) + # start listening to the queue feluda.queue.listen(count_queue, indexer(feluda)) except Exception as e: print("Error Initializing Indexer", e) + # Try connecting to Queue again retries = 0 max_retries = 10 handle_exception(feluda, count_queue, indexer(feluda), retries, max_retries) - pg_manager.close_connection() \ No newline at end of file diff --git a/src/worker/media/Dockerfile.media_worker b/src/worker/media/Dockerfile.media_worker new file mode 100644 index 00000000..aeb6d710 --- /dev/null +++ b/src/worker/media/Dockerfile.media_worker @@ -0,0 +1,75 @@ +### BUILDER IMAGE ### +FROM --platform=$TARGETPLATFORM python:3.11-slim-bullseye@sha256:47863f26a5f2e0bfa903e7b658355940250979bd555b5e4f9f25da81647daff8 AS builder +ARG UID +ARG GID + +# Fetch OS packages updates, upgrade packages, and install packages required for build +RUN apt-get update \ + && apt-get -y upgrade \ + && apt-get install -y \ + --no-install-recommends gcc build-essential \ + --no-install-recommends libgl1-mesa-glx libglib2.0-0 \ + --no-install-recommends python3-dev + +# Set python user +RUN groupadd -g $GID python \ + && useradd --create-home -r -u $UID -g python python \ + && mkdir /home/python/app \ + && chown -R python:python /home/python/app/ +# Set working dir +WORKDIR /home/python/app + +# Create venv and change ownership recursively +RUN python -m venv /home/python/app/venv \ + && chown -R python:python /home/python/app/venv +# Set venv in path +ENV PATH="/home/python/app/venv/bin:$PATH" + +# Copy core and operator requirements +COPY --chown=python:python requirements.txt /home/python/app/requirements.txt +COPY --chown=python:python ./core/operators/vid_vec_rep_resnet_requirements.txt /home/python/app/core/operators/vid_vec_rep_resnet_requirements.txt +COPY --chown=python:python ./core/operators/audio_vec_embedding_requirements.txt /home/python/app/core/operators/audio_vec_embedding_requirements.txt + +# Run pip install +RUN pip install --no-cache-dir --upgrade pip \ + && pip install --no-cache-dir --require-hashes --no-deps -r /home/python/app/requirements.txt \ + && pip install --no-cache-dir --require-hashes --no-deps -r /home/python/app/core/operators/vid_vec_rep_resnet_requirements.txt \ + && pip install --no-cache-dir --require-hashes --no-deps -r /home/python/app/core/operators/audio_vec_embedding_requirements.txt + +##################################### + + +### PRODUCTION IMAGE ### +FROM --platform=$TARGETPLATFORM python:3.11-slim-bullseye@sha256:47863f26a5f2e0bfa903e7b658355940250979bd555b5e4f9f25da81647daff8 AS production +ARG UID +ARG GID + +# Update image, install required utils, and remove cache +RUN apt-get update \ + && apt-get -y upgrade \ + && apt-get install -y --no-install-recommends vim zsh curl \ + && apt-get install -y ffmpeg \ + && rm -rf /var/lib/apt/lists/* + +# Set python group and user, create home dir, create app dir, change ownership of app to user +RUN groupadd -g $GID python \ + && useradd --create-home -r -u $UID -g python python \ + && mkdir /home/python/app \ + && chown -R python:python /home/python/app/ + +# Set working dir +WORKDIR /home/python/app + +# Copy output from builder stage +COPY --from=builder /home/python/app /home/python/app + +# Copy all files and change ownership to unprivileged user +COPY --chown=python:python . /home/python/app + +# Set venv path +ENV PATH="/home/python/app/venv/bin:$PATH" + +# Set unprivileged user with group membership +USER python:python + +################################# \ No newline at end of file diff --git a/src/worker/media/Dockerfile.media_worker_graviton b/src/worker/media/Dockerfile.media_worker_graviton new file mode 100644 index 00000000..436ea4da --- /dev/null +++ b/src/worker/media/Dockerfile.media_worker_graviton @@ -0,0 +1,99 @@ +### BUILDER IMAGE ### +FROM --platform=$TARGETPLATFORM python:3.11-slim-bullseye@sha256:47863f26a5f2e0bfa903e7b658355940250979bd555b5e4f9f25da81647daff8 AS builder +ARG UID +ARG GID + +# Fetch OS packages updates, upgrade packages, and install packages required for build +RUN apt-get update \ + && apt-get -y upgrade \ + && apt-get install -y \ + --no-install-recommends gcc build-essential \ + --no-install-recommends libgl1-mesa-glx libglib2.0-0 \ + --no-install-recommends python3-dev + +# Set python user +RUN groupadd -g $GID python \ + && useradd --create-home -r -u $UID -g python python \ + && mkdir /home/python/app \ + && chown -R python:python /home/python/app/ +# Set working dir +WORKDIR /home/python/app + +# Create venv and change ownership recursively +RUN python -m venv /home/python/app/venv \ + && chown -R python:python /home/python/app/venv +# Set venv in path +ENV PATH="/home/python/app/venv/bin:$PATH" + +# Copy core and operator requirements +COPY --chown=python:python requirements.txt /home/python/app/requirements.txt +COPY --chown=python:python ./core/operators/vid_vec_rep_resnet_requirements.txt /home/python/app/core/operators/vid_vec_rep_resnet_requirements.txt +COPY --chown=python:python ./core/operators/audio_vec_embedding_requirements.txt /home/python/app/core/operators/audio_vec_embedding_requirements.txt + +# Run pip install +RUN pip install --no-cache-dir --upgrade pip \ + && pip install --no-cache-dir --require-hashes --no-deps -r /home/python/app/requirements.txt \ + && pip install --no-cache-dir --require-hashes --no-deps -r /home/python/app/core/operators/vid_vec_rep_resnet_requirements.txt \ + && pip install --no-cache-dir --require-hashes --no-deps -r /home/python/app/core/operators/audio_vec_embedding_requirements.txt + +##################################### + + +### PRODUCTION IMAGE ### +FROM --platform=$TARGETPLATFORM python:3.11-slim-bullseye@sha256:47863f26a5f2e0bfa903e7b658355940250979bd555b5e4f9f25da81647daff8 AS production +ARG UID +ARG GID + +# Update image, install required utils, and remove cache +RUN apt-get update \ + && apt-get -y upgrade \ + && apt-get install -y --no-install-recommends vim zsh curl \ + && apt-get install -y ffmpeg \ + && rm -rf /var/lib/apt/lists/* + +# Set python group and user, create home dir, and create app dir for user +RUN groupadd -g $GID python \ + && useradd --create-home -r -u $UID -g python python \ + && mkdir /home/python/app \ + && chown -R python:python /home/python/app/ + +# Set working dir +WORKDIR /home/python/app + +# Copy output from builder stage +COPY --from=builder /home/python/app /home/python/app + +# Copy all files and change ownership to unprivileged user +COPY --chown=python:python . /home/python/app + +# Set venv path +ENV PATH="/home/python/app/venv/bin:$PATH" + +### AWS Graviton Optimization ### + +# Graviton3(E) (e.g. c7g, c7gn and Hpc7g instances) supports BF16 format for ML acceleration. This can be enabled in oneDNN by setting the below environment variable +ENV DNNL_DEFAULT_FPMATH_MODE=BF16 + +# Enable primitive caching to avoid the redundant primitive allocation +# latency overhead. Please note this caching feature increases the +# memory footprint. Tune this cache capacity to a lower value to +# reduce the additional memory requirement. +ENV LRU_CACHE_CAPACITY=1024 + +# Enable Transparent huge page allocations from PyTorch C10 allocator +ENV THP_MEM_ALLOC_ENABLE=1 + +# Make sure the openmp threads are distributed across all the processes for multi process applications to avoid over subscription for the vcpus. For example if there is a single application process, then num_processes should be set to '1' so that all the vcpus are assigned to it with one-to-one mapping to omp threads +# RUN num_vcpus=8 +# RUN num_processes=1 +# RUN temp = $((1 > ($num_vcpus/$num_processes) ? 1 : ($num_vcpus/$num_processes))) +ENV OMP_NUM_THREADS=1 +ENV OMP_PROC_BIND=false +ENV OMP_PLACES=cores +### + + +# Set unprivileged user with group membership +USER python:python + +################################# \ No newline at end of file diff --git a/src/worker/media/__init__.py b/src/worker/media/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/worker/media/config.yml b/src/worker/media/config.yml new file mode 100644 index 00000000..af8e07f6 --- /dev/null +++ b/src/worker/media/config.yml @@ -0,0 +1,34 @@ +store: + entities: + - label: "Data Store" + type: "es_vec" + parameters: + host_name: "es" + image_index_name: "image" + text_index_name: "text" + video_index_name: "video" + audio_index_name: "audio" + - label: "Postgres Store" + type: "postgresql" + parameters: + table_names: + - "user_message_inbox_perceptually_similar" + +queue: + label: "Queue" + type: "rabbitmq" + parameters: + host_name: "rabbitmq" + queues: + - name: "media-index-queue" + - name: "report-queue" + +operators: + label: "Operators" + parameters: + - name: "Video Vector Representation" + type: "vid_vec_rep_resnet" + parameters: { index_name: "video" } + - name: "Audio Vector Representation" + type: "audio_vec_embedding" + parameters: { index_name: "audio" } \ No newline at end of file diff --git a/src/worker/media/media_payload_writer.py b/src/worker/media/media_payload_writer.py new file mode 100644 index 00000000..874da57c --- /dev/null +++ b/src/worker/media/media_payload_writer.py @@ -0,0 +1,34 @@ +from core.feluda import ComponentType, Feluda +from core.logger import Logger +from time import sleep +import uuid +import sys + +log = Logger(__name__) + +try: + feluda = Feluda("worker/media/config.yml") + feluda.setup() + media_index_queue = feluda.config.queue.parameters.queues[0]["name"] + feluda.start_component(ComponentType.QUEUE) + # take media_type from command line + media_type = sys.argv[1] if len(sys.argv) > 1 else "video" + media_paths = { + "video": "https://raw.githubusercontent.com/tattle-made/feluda/main/src/core/operators/sample_data/sample-cat-video.mp4", + "audio": "https://raw.githubusercontent.com/tattle-made/feluda/main/src/core/operators/sample_data/audio.wav", + } + path = media_paths.get(media_type) + if path is None: + raise ValueError("Unsupported media type") + + for _ in range(1): + unique_id = str(uuid.uuid4()) + dummy_payload = { + "id": unique_id, + "path": path, + "media_type": media_type, + } + feluda.queue.message(media_index_queue, dummy_payload) + sleep(0.3) +except Exception as e: + print("Error Sending Payload", e) diff --git a/src/worker/media/media_worker.py b/src/worker/media/media_worker.py new file mode 100755 index 00000000..c7b71121 --- /dev/null +++ b/src/worker/media/media_worker.py @@ -0,0 +1,222 @@ +from core.feluda import ComponentType, Feluda +from core.logger import Logger +from core.operators import vid_vec_rep_resnet +from core.operators import audio_vec_embedding +import json +from datetime import datetime +from core.models.media import MediaType +from core.models.media_factory import VideoFactory +from core.models.media_factory import AudioFactory +from time import sleep +import numpy as np +import binascii + +log = Logger(__name__) + + +def make_report_indexed(data, status, crc_value=None): + report = {} + report["indexer_id"] = 1 + report["post_id"] = data["id"] + report["media_type"] = data["media_type"] + if crc_value is not None: + report["crc_value"] = crc_value + report["status"] = status + report["status_code"] = 200 + return json.dumps(report) + + +def make_report_failed(data, status): + report = {} + report["indexer_id"] = 1 + report["post_id"] = data["id"] + report["media_type"] = data["media_type"] + report["status"] = status + report["status_code"] = 400 + return json.dumps(report) + + +def generate_document(post_id: str, representation: any): + base_doc = { + "e_kosh_id": "", + "dataset": post_id, + "metadata": None, + "date_added": datetime.now().isoformat(), + } + + def generator_doc(): + for vector in representation: + base_doc["_index"] = "video" + base_doc["vid_vec"] = vector["vid_vec"] + base_doc["is_avg"] = vector["is_avg"] + base_doc["duration"] = vector["duration"] + base_doc["n_keyframes"] = vector["n_keyframes"] + yield base_doc + + return generator_doc + + +def calc_video_vec_crc(video_vec_gen): + count = 0 + combined_vec = [[]] + for vector in video_vec_gen: + if count == 0: + # skip first vector - mean of keyframes + count += 1 + else: + combined_vec.append(vector["vid_vec"]) + # remove first list which is empty + combined_vec = combined_vec[1:] + combined_vec_arr = np.asarray(combined_vec) + arr_crc = binascii.crc32(combined_vec_arr.tobytes(order="C")) + return arr_crc + + +def calc_audio_vec_crc(audio_vector): + vec_arr = np.asarray(audio_vector) + arr_crc = binascii.crc32(vec_arr.tobytes(order="C")) + return arr_crc + + +def indexer(feluda): + def worker(ch, method, properties, body): + print("MESSAGE RECEIVED") + video_vec_crc = None + audio_vec_crc = None + file_content = json.loads(body) + file_media_type = file_content["media_type"] + if file_media_type == "video": + log.info("Media Type is Video") + try: + # download the video from url (supports s3) + video_path = VideoFactory.make_from_url(file_content["path"]) + # extract video vectors + video_vec = vid_vec_rep_resnet.run(video_path) + # add crc to database + if feluda.config.store and "postgresql" in feluda.store: + video_vec_crc = calc_video_vec_crc(video_vec) + feluda.store["postgresql"].store( + str(video_vec_crc), "video_vector_crc" + ) + log.info("Video CRC value added to PostgreSQL") + # store in ES + if feluda.config.store and "es_vec" in feluda.store: + # generate document + doc = generate_document(video_path["path"], video_vec) + media_type = MediaType.VIDEO + result = feluda.store["es_vec"].store(media_type, doc) + log.info(result) + # send indexed report to report queue + report = make_report_indexed(file_content, "indexed", video_vec_crc) + feluda.queue.message( + feluda.config.queue.parameters.queues[1]["name"], report + ) + # send ack + ch.basic_ack(delivery_tag=method.delivery_tag) + except Exception as e: + print("Error indexing media", e) + # send failed report to report queue + report = make_report_failed(file_content, "failed") + feluda.queue.message( + feluda.config.queue.parameters.queues[1]["name"], report + ) + # requeue the media file + ch.basic_ack(delivery_tag=method.delivery_tag) + elif file_media_type == "audio": + log.info("Media Type is Audio") + try: + # download audio file from url (supports S3) + audio_path = AudioFactory.make_from_url(file_content["path"]) + # generate audio vec + audio_vec = audio_vec_embedding.run(audio_path) + # add crc to database + if feluda.config.store and "postgresql" in feluda.store: + audio_vec_crc = calc_audio_vec_crc(audio_vec) + feluda.store["postgresql"].store( + str(audio_vec_crc), "audio_vector_crc" + ) + log.info("Audio CRC value added to PostgreSQL") + # store in ES + if feluda.config.store and "es_vec" in feluda.store: + # generate document + doc = { + "e_kosh_id": str(1231231), + "dataset": "test-dataset-id", + "metadata": {}, + "audio_vec": audio_vec, + "date_added": datetime.utcnow(), + } + media_type = MediaType.AUDIO + result = feluda.store["es_vec"].store(media_type, doc) + log.info(result) + # send indexed report to report queue + report = make_report_indexed(file_content, "indexed", audio_vec_crc) + feluda.queue.message( + feluda.config.queue.parameters.queues[1]["name"], report + ) + ch.basic_ack(delivery_tag=method.delivery_tag) + except Exception as e: + print("Error indexing media", e) + # send failed report to report queue + report = make_report_failed(file_content, "failed") + feluda.queue.message( + feluda.config.queue.parameters.queues[1]["name"], report + ) + # requeue the media file + ch.basic_ack(delivery_tag=method.delivery_tag) + else: + log.info("This media type is not supported currently") + # TODO: send a customised report and then report it to the queue with a ack + report = make_report_failed(file_content, "failed") + feluda.queue.message( + feluda.config.queue.parameters.queues[1]["name"], report + ) + ch.basic_ack(delivery_tag=method.delivery_tag) + + return worker + + +def handle_exception(feluda, queue_name, worker_func, retries, max_retries): + retry_interval = 60 + if retries < max_retries: + print("Inside Handle Exception") + try: + feluda.start_component(ComponentType.QUEUE) + feluda.queue.listen(queue_name, worker_func) + return + except Exception as e: + print("Error handling exception:", e) + retries = retries + 1 + sleep(retry_interval) + handle_exception(feluda, queue_name, worker_func, retries, max_retries) + else: + print("Failed to re-establish connection after maximum retries.") + + +feluda = None +media_index_queue = None +try: + # Init Feluda and load config + feluda = Feluda("worker/media/config.yml") + feluda.setup() + media_index_queue = feluda.config.queue.parameters.queues[0]["name"] + # setup Components + feluda.start_component(ComponentType.STORE) + feluda.start_component(ComponentType.QUEUE) + # init all operators + vid_vec_rep_resnet.initialize(param=None) + audio_vec_embedding.initialize(param=None) + # start listening to the queue + feluda.queue.listen(media_index_queue, indexer(feluda)) +except Exception as e: + print("Error Initializing Indexer", e) + # Try connecting to Queue again + retries = 0 + max_retries = 10 + handle_exception( + feluda, + media_index_queue, + indexer(feluda), + retries, + max_retries, + ) \ No newline at end of file