feat: workers to search audio and video files #136

Merged
merged 13 commits on Mar 7, 2024
112 changes: 57 additions & 55 deletions .ci/docker-compose.yml
@@ -4,59 +4,61 @@ services:
context: ./../src
dockerfile: Dockerfile.test
target: test
env_file: ./../src/test.env
command: bash -c "python -c 'import time';
while ! curl store:9200 | grep -q 'You Know, for Search'
&& ! curl queue:15672 | grep -q '<!DOCTYPE html>';
do python -c 'time.sleep(10)';
done;
nose2 tests"
links:
- store
- queue
depends_on:
store:
condition: service_started
queue:
condition: service_started
# env_file: ./../src/test.env
command: bash -c "python -m unittest discover -s ./tests/core/models -p "test_*.py";
python -m unittest core.operators.test_vid_vec_rep_resnet;
python -m unittest core.operators.test_audio_vec_embedding"
# bash -c "python -c 'import time';
# while ! curl store:9200 | grep -q 'You Know, for Search'
# && ! curl queue:15672 | grep -q '<!DOCTYPE html>';
# do python -c 'time.sleep(10)';
# done;
# links:
# - store
# - queue
# depends_on:
# store:
# condition: service_started
# queue:
# condition: service_started

store:
container_name: es
image: docker.elastic.co/elasticsearch/elasticsearch:8.12.0
volumes:
- ./.docker/es/data:/var/lib/elasticsearch/data
# ports:
# - "9300:9300"
# - "9200:9200"
environment:
- xpack.security.enabled=false
- discovery.type=single-node
- http.cors.enabled=true
- http.cors.allow-origin=http://localhost:1358,http://127.0.0.1:1358
- http.cors.allow-headers=X-Requested-With,X-Auth-Token,Content-Type,Content-Length,Authorization
- http.cors.allow-credentials=true
- ES_JAVA_OPTS=-Xms750m -Xmx2g
ulimits:
memlock:
soft: -1
hard: -1
nofile:
soft: 65536
hard: 65536
cap_add:
- IPC_LOCK

queue:
image: rabbitmq:3.12.12-management
container_name: rabbitmq
hostname: rabbit
volumes:
- ./.docker/rabbitmq/data:/var/lib/rabbitmq
- ./.docker/rabbitmq/logs:/var/log/rabbitmq
environment:
RABBITMQ_ERLANG_COOKIE: "secret-cookie"
RABBITMQ_DEFAULT_USER: "admin"
RABBITMQ_DEFAULT_PASS: "Admin123"
# ports:
# - "5672:5672"
# - "15672:15672"
# store:
# container_name: es
# image: docker.elastic.co/elasticsearch/elasticsearch:8.12.0
# volumes:
# - ./.docker/es/data:/var/lib/elasticsearch/data
## ports:
## - "9300:9300"
## - "9200:9200"
# environment:
# - xpack.security.enabled=false
# - discovery.type=single-node
# - http.cors.enabled=true
# - http.cors.allow-origin=http://localhost:1358,http://127.0.0.1:1358
# - http.cors.allow-headers=X-Requested-With,X-Auth-Token,Content-Type,Content-Length,Authorization
# - http.cors.allow-credentials=true
# - ES_JAVA_OPTS=-Xms750m -Xmx2g
# ulimits:
# memlock:
# soft: -1
# hard: -1
# nofile:
# soft: 65536
# hard: 65536
# cap_add:
# - IPC_LOCK
#
# queue:
# image: rabbitmq:3.12.12-management
# container_name: rabbitmq
# hostname: rabbit
# volumes:
# - ./.docker/rabbitmq/data:/var/lib/rabbitmq
# - ./.docker/rabbitmq/logs:/var/log/rabbitmq
# environment:
# RABBITMQ_ERLANG_COOKIE: "secret-cookie"
# RABBITMQ_DEFAULT_USER: "admin"
# RABBITMQ_DEFAULT_PASS: "Admin123"
## ports:
## - "5672:5672"
## - "15672:15672"
70 changes: 40 additions & 30 deletions .github/workflows/merge-main.yml
@@ -26,64 +26,74 @@ jobs:
with:
python-version: '3.11'

- name: Dry Run to get next release version
id: next_version
run: |
pip install python-semantic-release
export NEXT_TAG_VERSION=$(semantic-release --noop version --print)
echo "new_tag_version=${NEXT_TAG_VERSION}" >> $GITHUB_OUTPUT
# - name: Dry Run to get next release version
# id: next_version
# shell: bash
# run: |
# pip install python-semantic-release
# echo "new_tag_version=$(semantic-release --noop version --print)" >> $GITHUB_OUTPUT

- name: Python Semantic Release
id: release
uses: python-semantic-release/python-semantic-release@master
with:
github_token: ${{ secrets.GITHUB_TOKEN }}

# - name: Declare some variables
# id: vars
# shell: bash
# run: |
# echo "setting variables"
# echo "sha_short=$(git rev-parse --short HEAD)" >> $GITHUB_OUTPUT
# echo "the release status is: ${{ steps.release.outputs.released }}"
# echo "the new version is: ${{ steps.release.outputs.version }}"
# echo "the new tag version is: ${{ steps.release.outputs.tag }}"

- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3

- name: Publish vidvec amd64 benchmark to dockerhub
#if: steps.release.outcome == 'success'
- name: Publish vidvec amd64 worker to dockerhub
if: steps.release.outputs.released == 'true'
uses: elgohr/Publish-Docker-Github-Action@main
with:
username: tattletech
password: ${{ secrets.DOCKER_PASSWORD }}
name: tattletech/feluda-operator-vidvec
workdir: src/
dockerfile: benchmark/vidvec/Dockerfile.vid_vec_rep_resnet
tags: benchmark-amd64-${{ steps.next_version.outputs.new_tag_version }}
dockerfile: worker/vidvec/Dockerfile.video_worker
tags: worker-amd64-${{ steps.release.outputs.tag }}
platforms: linux/amd64

- name: Publish vidvec arm64 benchmark to dockerhub
#if: steps.release.outcome == 'success'
- name: Publish vidvec arm64 worker to dockerhub
if: steps.release.outputs.released == 'true'
uses: elgohr/Publish-Docker-Github-Action@main
with:
username: tattletech
password: ${{ secrets.DOCKER_PASSWORD }}
name: tattletech/feluda-operator-vidvec
workdir: src/
dockerfile: benchmark/vidvec/Dockerfile.vid_vec_rep_resnet.graviton
tags: benchmark-arm64-${{ steps.next_version.outputs.new_tag_version }}
dockerfile: worker/vidvec/Dockerfile.video_worker.graviton
tags: worker-arm64-${{ steps.release.outputs.tag }}
platforms: linux/arm64

# - name: Publish vidvec amd64 worker to dockerhub
# #if: steps.release.outcome == 'success'
# uses: elgohr/Publish-Docker-Github-Action@main
# with:
# username: tattletech
# password: ${{ secrets.DOCKER_PASSWORD }}
# name: tattletech/feluda-operator-vidvec
# workdir: src/
# dockerfile: worker/vidvec/Dockerfile.video_worker
# tags: worker-amd64-${{ steps.next_version.outputs.new_tag_version }}
# platforms: linux/amd64
- name: Publish audiovec amd64 worker to dockerhub
if: steps.release.outputs.released == 'true'
uses: elgohr/Publish-Docker-Github-Action@main
with:
username: tattletech
password: ${{ secrets.DOCKER_PASSWORD }}
name: tattletech/feluda-operator-audiovec
workdir: src/
dockerfile: worker/audiovec/Dockerfile.audio_worker
tags: worker-amd64-${{ steps.release.outputs.tag }}
platforms: linux/amd64

- name: Publish audiovec arm64 worker to dockerhub
if: steps.release.outputs.released == 'true'
uses: elgohr/Publish-Docker-Github-Action@main
with:
username: tattletech
password: ${{ secrets.DOCKER_PASSWORD }}
name: tattletech/feluda-operator-audiovec
workdir: src/
dockerfile: worker/audiovec/Dockerfile.audio_worker.graviton
tags: worker-arm64-${{ steps.release.outputs.tag }}
platforms: linux/arm64

# - name: deploy to cluster
# uses: steebchen/[email protected]
2 changes: 1 addition & 1 deletion .github/workflows/pr-tests.yml
@@ -26,7 +26,7 @@ jobs:
- name: Launch containers to run tests
id: vars
run: |
docker compose -f ./.ci/docker-compose.yml -p ci up -d store queue sut
docker compose -f ./.ci/docker-compose.yml -p ci up -d sut
docker logs -f ci-sut-1
echo "sut_output=$(docker wait ci-sut-1)" >> $GITHUB_OUTPUT

7 changes: 5 additions & 2 deletions src/core/logger.py
@@ -11,8 +11,8 @@ def __init__(self, moduleName):
self.environment = os.environ.get("ENVIRONMENT", "DEVELOPMENT")
self.log = logging.getLogger(moduleName)

def info(self, msg):
self.log.info(msg)
def info(self, msg, *args, **kwargs):
self.log.info(msg, *args, **kwargs)

def debug(self, msg):
if self.environment == "DEVELOPMENT":
@@ -23,3 +23,6 @@ def exception(self, msg):

def prettyprint(self, msg):
pp.pprint(msg)

def error(self, msg, *args, **kwargs):
self.log.error(msg, *args, **kwargs)
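Forwarding *args and **kwargs means the Logger wrapper now behaves like the standard logging calls; a minimal usage sketch (the messages below are illustrative, not from the codebase):

from core.logger import Logger

log = Logger(__name__)
log.info("indexed %s documents in %.2f seconds", 42, 1.37)    # lazy %-formatting is now forwarded
log.error("failed to index %s", "sample.wav", exc_info=True)  # keyword args like exc_info pass through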
4 changes: 2 additions & 2 deletions src/core/operators/test_audio_vec_embedding.py
@@ -14,7 +14,7 @@ def tearDownClass(cls):
# delete config files
pass

# @skip
@skip
def test_sample_audio_from_disk(self):
audio_file_path = AudioFactory.make_from_file_on_disk(r'core/operators/sample_data/audio.wav')
audio_emb = audio_vec_embedding.run(audio_file_path)
@@ -26,4 +26,4 @@ def test_sample_audio_from_url(self):
"https://raw.githubusercontent.com/tattle-made/feluda/main/src/core/operators/sample_data/audio.wav"
)
audio_emb = audio_vec_embedding.run(audio_path)
self.assertEqual(2048, len(audio_emb))
self.assertEqual(2048, len(audio_emb))
2 changes: 2 additions & 0 deletions src/tests/core/store/test_audio_es_vec.py
@@ -19,6 +19,8 @@
curl -X POST "http://es:9200/test_audio/_delete_by_query" -H 'Content-Type: application/json' -d'{"query":{"match_all":{}}}'
Delete the indice
curl -X DELETE "http://es:9200/test_audio"
Refresh the indice
curl -X POST "http://es:9200/test_audio/_refresh"
'''

class TestAudioES(unittest.TestCase):
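The curl helpers documented in the test docstring above can also be driven from Python; a rough equivalent (assuming the requests package and the same http://es:9200/test_audio index used in the tests):

import requests

ES_INDEX = "http://es:9200/test_audio"

requests.post(f"{ES_INDEX}/_refresh")                 # refresh the index
requests.post(f"{ES_INDEX}/_delete_by_query",
              json={"query": {"match_all": {}}})      # delete all documents
requests.delete(ES_INDEX)                             # delete the index itself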
22 changes: 22 additions & 0 deletions src/worker/audiovec/audio_payload_search_writer.py
@@ -0,0 +1,22 @@
from core.feluda import ComponentType, Feluda
from core.logger import Logger
log = Logger(__name__)
from time import sleep

try:
feluda = Feluda("worker/audiovec/config.yml")
feluda.setup()
audio_index_queue = feluda.config.queue.parameters.queues[2]['name']
feluda.start_component(ComponentType.STORE)
feluda.start_component(ComponentType.QUEUE)

for _ in range(1):
dummy_payload = {
"id": str(12345),
"path": 'https://raw.githubusercontent.com/tattle-made/feluda/main/src/core/operators/sample_data/audio.wav'
}
feluda.queue.message(audio_index_queue, dummy_payload)
sleep(0.3)

except Exception as e:
print("Error Initializing Indexer", e)
2 changes: 1 addition & 1 deletion src/worker/audiovec/audio_payload_writer.py
@@ -10,7 +10,7 @@
feluda.start_component(ComponentType.STORE)
feluda.start_component(ComponentType.QUEUE)

for _ in range(25):
for _ in range(10):
dummy_payload = {
"id": str(12345),
"path": 'https://raw.githubusercontent.com/tattle-made/feluda/main/src/core/operators/sample_data/audio.wav'
5 changes: 2 additions & 3 deletions src/worker/audiovec/audio_worker.py
@@ -2,7 +2,6 @@
from core.logger import Logger
from core.operators import audio_vec_embedding
import json
from datetime import datetime
from core.models.media import MediaType
from core.models.media_factory import AudioFactory
from time import sleep
@@ -31,6 +30,7 @@ def worker(ch, method, properties, body):
file_content = json.loads(body)
audio_path = AudioFactory.make_from_url(file_content['path'])
try:
log.info("Processing File")
media_type = MediaType.AUDIO
audio_vec = audio_vec_embedding.run(audio_path)
doc = {
@@ -41,9 +41,8 @@
"date_added": datetime.utcnow(),
}
result = feluda.store.store(media_type, doc)
print(result)
log.info(result)
report = make_report_indexed(file_content, "indexed")
print(report)
feluda.queue.message(feluda.config.queue.parameters.queues[1]['name'], report)
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception as e: