Skip to content

Commit

Permalink
feat: workers to search audio and video files
Browse files Browse the repository at this point in the history
  • Loading branch information
aatmanvaidya committed Mar 7, 2024
1 parent cf801f1 commit cd94344
Show file tree
Hide file tree
Showing 11 changed files with 201 additions and 4 deletions.
3 changes: 3 additions & 0 deletions src/core/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,6 @@ def exception(self, msg):

def prettyprint(self, msg):
pp.pprint(msg)

def error(self, msg):
self.log.error(msg)
2 changes: 2 additions & 0 deletions src/tests/core/store/test_audio_es_vec.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
curl -X POST "http://es:9200/test_audio/_delete_by_query" -H 'Content-Type: application/json' -d'{"query":{"match_all":{}}}'
Delete the indice
curl -X DELETE "http://es:9200/test_audio"
Refresh the indice
curl -X POST "http://es:9200/test_audio/_refresh"
'''

class TestAudioES(unittest.TestCase):
Expand Down
22 changes: 22 additions & 0 deletions src/worker/audiovec/audio_payload_search_writer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from core.feluda import ComponentType, Feluda
from core.logger import Logger
log = Logger(__name__)
from time import sleep

try:
feluda = Feluda("worker/audiovec/config.yml")
feluda.setup()
audio_index_queue = feluda.config.queue.parameters.queues[2]['name']
feluda.start_component(ComponentType.STORE)
feluda.start_component(ComponentType.QUEUE)

for _ in range(1):
dummy_payload = {
"id": str(12345),
"path": 'https://raw.githubusercontent.com/tattle-made/feluda/main/src/core/operators/sample_data/audio.wav'
}
feluda.queue.message(audio_index_queue, dummy_payload)
sleep(0.3)

except Exception as e:
print("Error Initializing Indexer", e)
2 changes: 1 addition & 1 deletion src/worker/audiovec/audio_payload_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
feluda.start_component(ComponentType.STORE)
feluda.start_component(ComponentType.QUEUE)

for _ in range(25):
for _ in range(10):
dummy_payload = {
"id": str(12345),
"path": 'https://raw.githubusercontent.com/tattle-made/feluda/main/src/core/operators/sample_data/audio.wav'
Expand Down
2 changes: 0 additions & 2 deletions src/worker/audiovec/audio_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
from core.logger import Logger
from core.operators import audio_vec_embedding
import json
from datetime import datetime
from core.models.media import MediaType
from core.models.media_factory import AudioFactory
from time import sleep
Expand Down Expand Up @@ -43,7 +42,6 @@ def worker(ch, method, properties, body):
result = feluda.store.store(media_type, doc)
print(result)
report = make_report_indexed(file_content, "indexed")
print(report)
feluda.queue.message(feluda.config.queue.parameters.queues[1]['name'], report)
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception as e:
Expand Down
73 changes: 73 additions & 0 deletions src/worker/audiovec/audio_worker_search.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
from core.feluda import ComponentType, Feluda
from core.logger import Logger
from core.operators import audio_vec_embedding
import json
from core.models.media_factory import AudioFactory
from time import sleep
log = Logger(__name__)

def make_report_indexed(data, status):
report = {}
report["indexer_id"] = 1
report["post_id"] = data["id"]
report["status"] = status
report["status_code"] = 200
return json.dumps(report)

def make_report_failed(data, status):
report = {}
report["indexer_id"] = 1
report["post_id"] = data["id"]
report["status"] = status
report["status_code"] = 400
return json.dumps(report)

def indexer(feluda):
def worker(ch, method, properties, body):
print("MESSAGE RECEIVED")
file_content = json.loads(body)
audio_path = AudioFactory.make_from_url(file_content['path'])
try:
audio_vec = audio_vec_embedding.run(audio_path)
search_result = feluda.store.find("audio", audio_vec)
print(search_result)
report = make_report_indexed(file_content, "searched")
feluda.queue.message(feluda.config.queue.parameters.queues[3]['name'], report)
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception as e:
print("Error indexing media", e)
# requeue the media file
report = make_report_failed(file_content, "failed")
feluda.queue.message(feluda.config.queue.parameters.queues[3]['name'], report)
ch.basic_nack(delivery_tag=method.delivery_tag)
return worker

def handle_exception(feluda, queue_name, worker_func, retries, max_retries):
retry_interval = 60
if retries < max_retries:
print("Inside Handle Exception")
try:
feluda.start_component(ComponentType.QUEUE)
feluda.queue.listen(queue_name, worker_func)
return
except Exception as e:
print("Error handling exception:", e)
retries = retries + 1
sleep(retry_interval)
handle_exception(feluda, queue_name, worker_func, retries, max_retries)
else:
print("Failed to re-establish connection after maximum retries.")

try:
feluda = Feluda("worker/audiovec/config.yml")
feluda.setup()
audio_search_queue = feluda.config.queue.parameters.queues[2]['name']
feluda.start_component(ComponentType.STORE)
feluda.start_component(ComponentType.QUEUE)
audio_vec_embedding.initialize(param=None)
feluda.queue.listen(audio_search_queue, indexer(feluda))
except Exception as e:
print("Error Initializing Indexer", e)
retries = 0
max_retries = 10
handle_exception(feluda, audio_search_queue, indexer(feluda), retries, max_retries)
2 changes: 2 additions & 0 deletions src/worker/audiovec/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ queue :
queues:
- name : "audio-index-queue"
- name : "report-queue"
- name : "audio-search-queue"
- name : "search-result-queue"

operators :
label : "Operators"
Expand Down
2 changes: 2 additions & 0 deletions src/worker/vidvec/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ queue :
queues:
- name : "video-index-queue"
- name : "report-queue"
- name : "video-search-queue"
- name : "search-result-queue"

operators :
label : "Operators"
Expand Down
22 changes: 22 additions & 0 deletions src/worker/vidvec/video_payload_search_writer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from core.feluda import ComponentType, Feluda
from core.logger import Logger
log = Logger(__name__)
from time import sleep

try:
feluda = Feluda("worker/vidvec/config.yml")
feluda.setup()
video_search_queue = feluda.config.queue.parameters.queues[2]['name']
feluda.start_component(ComponentType.STORE)
feluda.start_component(ComponentType.QUEUE)

for _ in range(1):
dummy_payload = {
"id": str(123),
"path": 'https://raw.githubusercontent.com/tattle-made/feluda/main/src/core/operators/sample_data/sample-cat-video.mp4'
}
feluda.queue.message(video_search_queue, dummy_payload)
sleep(0.1)

except Exception as e:
print("Error Initializing Indexer", e)
1 change: 0 additions & 1 deletion src/worker/vidvec/video_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
from core.models.media import MediaType
from core.models.media_factory import VideoFactory
from time import sleep
import subprocess
log = Logger(__name__)

def make_report_indexed(data, status):
Expand Down
74 changes: 74 additions & 0 deletions src/worker/vidvec/video_worker_search.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
from core.feluda import ComponentType, Feluda
from core.logger import Logger
from core.operators import vid_vec_rep_resnet
import json
from core.models.media_factory import VideoFactory
from time import sleep
log = Logger(__name__)

def make_report_indexed(data, status):
report = {}
report["indexer_id"] = 1
report["post_id"] = data["id"]
report["status"] = status
report["status_code"] = 200
return json.dumps(report)

def make_report_failed(data, status):
report = {}
report["indexer_id"] = 1
report["post_id"] = data["id"]
report["status"] = status
report["status_code"] = 400
return json.dumps(report)

def indexer(feluda):
def worker(ch, method, properties, body):
print("MESSAGE RECEIVED")
file_content = json.loads(body)
video_path = VideoFactory.make_from_url(file_content['path'])
try:
print("Processing File:", video_path)
video_vec = vid_vec_rep_resnet.run(video_path)
average_vector = next(video_vec)
search_result = feluda.store.find("video", average_vector.get('vid_vec'))
print(search_result)
report = make_report_indexed(file_content, "searched")
feluda.queue.message(feluda.config.queue.parameters.queues[3]['name'], report)
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception as e:
print("Error indexing media", e)
report = make_report_failed(file_content, "failed")
feluda.queue.message(feluda.config.queue.parameters.queues[3]['name'], report)
ch.basic_nack(delivery_tag=method.delivery_tag)
return worker

def handle_exception(feluda, queue_name, worker_func, retries, max_retries):
retry_interval = 60
if retries < max_retries:
print("Inside Handle Exception")
try:
feluda.start_component(ComponentType.QUEUE)
feluda.queue.listen(queue_name, worker_func)
return
except Exception as e:
print("Error handling exception:", e)
retries = retries + 1
sleep(retry_interval)
handle_exception(feluda, queue_name, worker_func, retries, max_retries)
else:
print("Failed to re-establish connection after maximum retries.")

try:
feluda = Feluda("worker/vidvec/config.yml")
feluda.setup()
video_search_queue = feluda.config.queue.parameters.queues[2]['name']
feluda.start_component(ComponentType.STORE)
feluda.start_component(ComponentType.QUEUE)
vid_vec_rep_resnet.initialize(param=None)
feluda.queue.listen(video_search_queue, indexer(feluda))
except Exception as e:
print("Error Initializing Indexer", e)
retries = 0
max_retries = 10
handle_exception(feluda, video_search_queue, indexer(feluda), retries, max_retries)

0 comments on commit cd94344

Please sign in to comment.