Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Change setup for services initialization #9

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 31 additions & 10 deletions scripts/start_celery_and_flower.sh
Original file line number Diff line number Diff line change
@@ -1,22 +1,43 @@
#!/bin/bash

# Fetch the Rabbitmq IP address by directly invoking the get_amqp_ip function
AMQP_IP=$(python -c 'from poc_celery.get_container_ip import get_amqp_ip; print(get_amqp_ip())')
# Wait for a service to be available on a specific port
wait_for_service() {
local host=$1
local port=$2
local timeout=$3

echo "Waiting for service on $host:$port..."
for ((i=0; i<timeout; i++)); do
if nc -z $host $port > /dev/null 2>&1; then
echo "Service on $host:$port is available."
return 0
fi
sleep 1
done
echo "Timed out waiting for service on $host:$port."
exit 1
}

# Check for required commands
command -v nc >/dev/null 2>&1 || { echo >&2 "This script requires 'nc' but it's not installed. Aborting."; exit 1; }

# Get RabbitMQ IP from a Python utility, assuming the function is reliable and necessary
AMQP_IP=$(python -c 'from src.poc_celery.get_container_ip import get_amqp_ip; print(get_amqp_ip())')

# Validate the fetched IP
if [ -z "$AMQP_IP" ]; then
echo "Failed to get Rabbitmq IP address."
if [[ -z "$AMQP_IP" ]]; then
echo "Failed to get RabbitMQ IP address."
exit 1
fi

echo "Rabbitmq IP: $AMQP_IP"
# Wait for RabbitMQ to be fully operational
wait_for_service $AMQP_IP 5672 60

# Start the Celery worker
# Start Celery and Flower using the RabbitMQ IP
echo "Starting Celery worker..."
celery -A poc_celery.celery_app worker --loglevel=INFO &
celery -A src.poc_celery.celery_app worker --loglevel=INFO &

# Start Flower
echo "Starting Flower with Rabbitmq at $AMQP_IP..."
celery -A poc_celery.celery_app flower --broker=amqp://guest:guest@{AMQP_IP}:5672 &
echo "Starting Flower for monitoring Celery..."
celery -A src.poc_celery.celery_app flower &

echo "Celery and Flower have been started."
25 changes: 25 additions & 0 deletions scripts/stop_celery_and_flower.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#!/bin/bash

echo "Forcefully killing Celery and Flower processes..."

# Function to forcefully kill a process by its name
force_kill_process() {
local process_name=$1
echo "Searching for processes named $process_name to kill..."

# Use pgrep to find all process IDs matching the process name and kill them
pids=$(pgrep -f "$process_name")
if [ ! -z "$pids" ]; then
echo "Found processes with IDs: $pids. Force killing..."
echo $pids | xargs kill -9
echo "$process_name processes have been forcefully terminated."
else
echo "No $process_name processes found running."
fi
}

# Specific names or part of the command that was used to start Celery and Flower
force_kill_process "celery -A src.poc_celery.celery_app worker"
force_kill_process "celery -A src.poc_celery.celery_app flower"

echo "All relevant Celery and Flower processes have been forcefully killed."
1 change: 1 addition & 0 deletions src/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from src import *
4 changes: 2 additions & 2 deletions src/poc_celery/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
from poc_celery.celery_app import app as celery_app
# from poc_celery.celery_app import app as celery_app

__all__ = ("celery_app",)
# __all__ = ("celery_app",)
6 changes: 3 additions & 3 deletions src/poc_celery/celery_app.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from celery import Celery

from poc_celery.get_container_ip import get_amqp_ip, get_redis_ip
from src.poc_celery.get_container_ip import get_amqp_ip, get_redis_ip

# Get the Rabbitmq container IP address
AMQP_IP = get_amqp_ip()
Expand All @@ -12,8 +12,8 @@
broker=f"amqp://guest:guest@{AMQP_IP}:5672",
backend=f"redis://{REDIS_IP}:6379/0",
include=[
"poc_celery.tasks_async",
"poc_celery.tasks_collectors",
"src.poc_celery.tasks_collectors",
"src.poc_celery.tasks_async",
],
)

Expand Down
2 changes: 1 addition & 1 deletion src/poc_celery/tasks_async.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from pathlib import Path

from poc_celery.celery_app import app
from src.poc_celery.celery_app import app

# app = Celery('tasks', broker='your_broker_url', backend='your_backend_url')
DATA_DIR = Path(__file__).parent.parent / "data"
Expand Down
2 changes: 1 addition & 1 deletion src/poc_celery/tasks_collectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

from celery import chord, group

from poc_celery.celery_app import app
from src.poc_celery.celery_app import app


def generate_collector_request(topic: str) -> str:
Expand Down
Loading