
create levels for the poc (#10)
* create levels for the poc

* improve ci

* fix module path

* keep the script flexible

* fix task file name

* fix task file name

* fix import
xmnlab authored Apr 29, 2024
1 parent 267ea3a commit 2e01445
Showing 21 changed files with 334 additions and 62 deletions.
12 changes: 5 additions & 7 deletions .github/workflows/main.yaml
@@ -12,8 +12,9 @@ jobs:
     strategy:
       matrix:
         python_version: ["3.10"]
+        poc_id: ["poc1", "poc2"]
     concurrency:
-      group: ci-tests-${{ github.ref }}
+      group: ci-tests-${{ matrix.poc_id }}-${{ github.ref }}
       cancel-in-progress: true

     defaults:
@@ -45,13 +46,10 @@ jobs:
       run: docker run --name redis -d redis redis-server --save 60 1 --loglevel warning

     - name: Start Celery worker
-      run: celery -A poc_celery.celery_app worker --loglevel=DEBUG &
+      run: celery -A poc_celery.${{ matrix.poc_id }}.celery_app worker --loglevel=DEBUG &

-    - name: Run pytest for Collectors
-      run: pytest -vvv tests/test_tasks_collectors.py
-
-    - name: Run pytest for Async tasks
-      run: pytest -vvv tests/test_tasks_async.py
+    - name: Run pytest
+      run: pytest -vvv tests/${{ matrix.poc_id }}

     - name: Run linter
       run: |
1 change: 1 addition & 0 deletions .gitignore
@@ -160,3 +160,4 @@ cython_debug/
 #.idea/

 /data
+*.db
4 changes: 2 additions & 2 deletions README.md
@@ -165,8 +165,8 @@ bash scripts/setup.sh
 This command executes the script that:

 1. **Starts a Celery Worker**: Launches a Celery worker instance using
-   `poc_celery.celery_app` as the application module. This worker listens for
-   tasks dispatched to the queues and executes them as they arrive.
+   `poc_celery.poc1.celery_app` as the application module. This worker listens
+   for tasks dispatched to the queues and executes them as they arrive.

 2. **Launches Flower**: Initiates Flower on the default port (5555), allowing
    you to access a web-based user interface to monitor and manage the Celery
1 change: 1 addition & 0 deletions conda/dev.yaml
@@ -19,3 +19,4 @@ dependencies:
   - vulture
   - bandit
   - mccabe
+  - sqlalchemy
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -34,7 +34,7 @@ exclude = [
 fix = true

 [tool.ruff.lint]
-ignore = ["PLR0913"]
+ignore = ["PLR0913", "RUF012"]
 select = [
   "E",  # pycodestyle
   "F",  # pyflakes
6 changes: 4 additions & 2 deletions scripts/start_celery_and_flower.sh
@@ -1,5 +1,7 @@
 #!/bin/bash

+POC_ID=${1:-poc1}
+
 # Fetch the RabbitMQ IP address by directly invoking the get_amqp_ip function
 AMQP_IP=$(python -c 'from poc_celery.get_container_ip import get_amqp_ip; print(get_amqp_ip())')

@@ -13,10 +15,10 @@ echo "RabbitMQ IP: $AMQP_IP"

 # Start the Celery worker
 echo "Starting Celery worker..."
-celery -A poc_celery.celery_app worker --loglevel=INFO &
+celery -A poc_celery.${POC_ID}.celery_app worker --loglevel=INFO &

 # Start Flower
 echo "Starting Flower with RabbitMQ at $AMQP_IP..."
-celery -A poc_celery.celery_app flower --broker=amqp://guest:guest@${AMQP_IP}:5672 &
+celery -A poc_celery.${POC_ID}.celery_app flower --broker=amqp://guest:guest@${AMQP_IP}:5672 &

 echo "Celery and Flower have been started."
3 changes: 0 additions & 3 deletions src/poc_celery/__init__.py
@@ -1,3 +0,0 @@
-from poc_celery.celery_app import app as celery_app
-
-__all__ = ("celery_app",)
100 changes: 100 additions & 0 deletions src/poc_celery/db.py
@@ -0,0 +1,100 @@
from __future__ import annotations

from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker

# Base class for declarative class definitions
Base = declarative_base()


class SearchModel(Base):
    __tablename__ = "search"
    id = Column(Integer, primary_key=True)
    query = Column(String, nullable=False)


class ArticleModel(Base):
    __tablename__ = "article"
    id = Column(Integer, primary_key=True)
    search_id = Column(Integer, nullable=False)
    meta = Column(String, nullable=True)


class SimpleORM:
    context = {"session": None}

    @classmethod
    def create(cls, **kwargs) -> SimpleORM:
        """
        Insert a new record into the database.

        Parameters
        ----------
        **kwargs
            Column values for the record being created.
        """
        # Create a new record and commit it
        new_object = cls.model(**kwargs)
        cls.context["session"].add(new_object)
        cls.context["session"].commit()
        return new_object

    @classmethod
    def filter(cls, **kwargs) -> list:
        # Filter data based on the given keyword arguments
        query = cls.context["session"].query(cls.model)

        # Apply filters based on kwargs
        for key, value in kwargs.items():
            if not hasattr(cls.model, key):
                print(
                    f"Warning: '{key}' is not a valid attribute "
                    f"of {cls.model.__name__}"
                )
                continue

            # Use a LIKE filter when the value is a string containing a
            # wildcard character; fall back to an equality filter otherwise.
            if isinstance(value, str) and "%" in value:
                query = query.filter(getattr(cls.model, key).like(value))
            else:
                query = query.filter(getattr(cls.model, key) == value)

        return query.all()

    @classmethod
    def setup(cls, url: str = "sqlite:///example.db"):
        """
        Set up the database by creating tables and initializing the session.

        Parameters
        ----------
        url : str
            The database URL.

        Returns
        -------
        Session
            A SQLAlchemy Session object.
        """
        engine = create_engine(
            url, echo=False
        )  # Set echo=False to turn off verbose logging
        Base.metadata.create_all(engine)  # Create all tables
        Session = sessionmaker(bind=engine)
        cls.context["session"] = Session()
        cls.reset()
        return cls.context["session"]

    @classmethod
    def reset(cls):
        """Reset the database by dropping all tables and recreating them."""
        # Get the engine from the current session
        engine = cls.context["session"].get_bind()
        # Drop all tables
        Base.metadata.drop_all(engine)
        # Create all tables
        Base.metadata.create_all(engine)
        print("Database has been reset.")


class Search(SimpleORM):
    model = SearchModel


class Article(SimpleORM):
    model = ArticleModel
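A minimal usage sketch for this helper (the in-memory URL and sample values are illustrative, not part of the commit):

```python
# Illustrative only: exercise SimpleORM against a throwaway SQLite DB.
from poc_celery.db import Article, Search, SimpleORM

SimpleORM.setup(url="sqlite:///:memory:")  # creates tables, opens the shared session

search = Search.create(query="celery chords")
Article.create(search_id=search.id, meta="Metadata for article 1 from collector_1")

# A string value containing '%' switches the filter to a LIKE match.
hits = Article.filter(meta="Metadata%")
print([a.meta for a in hits])
```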
3 changes: 3 additions & 0 deletions src/poc_celery/poc1/__init__.py
@@ -0,0 +1,3 @@
from poc_celery.poc1.celery_app import app as celery_app

__all__ = ("celery_app",)
23 changes: 23 additions & 0 deletions src/poc_celery/poc1/celery_app.py
@@ -0,0 +1,23 @@
from celery import Celery

from poc_celery.get_container_ip import get_amqp_ip, get_redis_ip

# Get the RabbitMQ and Redis container IP addresses
AMQP_IP = get_amqp_ip()
REDIS_IP = get_redis_ip()

# Create a Celery instance with RabbitMQ as the broker and Redis as the
# result backend
app = Celery(
    "poc-celery",
    broker=f"amqp://guest:guest@{AMQP_IP}:5672",
    backend=f"redis://{REDIS_IP}:6379/0",
    include=[
        "poc_celery.poc1.tasks_async",
        "poc_celery.poc1.tasks_collectors",
    ],
)

# Set broker_connection_retry_on_startup to True to suppress the warning
app.conf.broker_connection_retry_on_startup = True

app.autodiscover_tasks()
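For orientation, a small sketch of how the poc1 app is addressed from Python (assumes the RabbitMQ and Redis containers are up, since the module resolves their IPs at import time):

```python
# Illustrative only: import the poc1 app and list its registered tasks.
import poc_celery.poc1.tasks_async  # noqa: F401  # importing registers its tasks
import poc_celery.poc1.tasks_collectors  # noqa: F401

from poc_celery.poc1.celery_app import app

print(sorted(name for name in app.tasks if name.startswith("poc_celery")))
```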
src/poc_celery/tasks_async.py → src/poc_celery/poc1/tasks_async.py
@@ -1,6 +1,6 @@
 from pathlib import Path

-from poc_celery.celery_app import app
+from poc_celery.poc1.celery_app import app

 # app = Celery('tasks', broker='your_broker_url', backend='your_backend_url')
 DATA_DIR = Path(__file__).parent.parent / "data"
src/poc_celery/tasks_collectors.py → src/poc_celery/poc1/tasks_collectors.py
@@ -3,7 +3,7 @@

 from celery import chord, group

-from poc_celery.celery_app import app
+from poc_celery.poc1.celery_app import app


 def generate_collector_request(topic: str) -> str:
3 changes: 3 additions & 0 deletions src/poc_celery/poc2/__init__.py
@@ -0,0 +1,3 @@
from poc_celery.poc2.celery_app import app as celery_app

__all__ = ("celery_app",)
src/poc_celery/celery_app.py → src/poc_celery/poc2/celery_app.py
@@ -12,8 +12,7 @@
     broker=f"amqp://guest:guest@{AMQP_IP}:5672",
     backend=f"redis://{REDIS_IP}:6379/0",
     include=[
-        "poc_celery.tasks_async",
-        "poc_celery.tasks_collectors",
+        "poc_celery.poc2.tasks",
     ],
 )

88 changes: 88 additions & 0 deletions src/poc_celery/poc2/tasks.py
@@ -0,0 +1,88 @@
from __future__ import annotations

from celery import group, shared_task

from poc_celery.db import Article, Search, SimpleORM

SimpleORM.setup()


@shared_task
def search_task(query: str):
    """
    Start the pipeline.

    Initial task that receives a user's request and triggers collector tasks.
    """
    # with transaction.atomic():
    search_obj = Search.create(query=query)
    search_id = search_obj.id

    collectors = [
        collector_1.s(search_id),
        collector_2.s(search_id),
        collector_3.s(search_id),
    ]
    callback = clean_up.s(search_id=search_id).set(
        link_error=clean_up.s(search_id=search_id)
    )
    # Run the collectors in parallel and trigger the callback once they
    # have all finished (a Celery chord).
    (group(collectors) | callback).delay()


@shared_task(bind=True, max_retries=0)
def collector_1(self, search_id: int):
    """Collect data for collector 1."""
    return execute_collector_tasks(search_id, "collector_1")


@shared_task(bind=True, max_retries=0)
def collector_2(self, search_id: int):
    """Collect data for collector 2."""
    return execute_collector_tasks(search_id, "collector_2")


@shared_task(bind=True, max_retries=0)
def collector_3(self, search_id: int):
    """Collect data for collector 3."""
    return execute_collector_tasks(search_id, "collector_3")


def execute_collector_tasks(search_id: int, collector_name: str):
    """
    Execute collector tasks.

    Helper function to execute get_list and get_article tasks for a collector.
    """
    # Assuming `get_list` generates a list of article IDs for simplicity
    article_ids = get_list(search_id, collector_name)
    for article_id in article_ids:
        get_article.delay(search_id, article_id, collector_name)
    return {"status": "Completed", "collector": collector_name}


@shared_task
def get_list(search_id: int, collector_name: str):
    """Simulated task to get a list of articles."""
    # Simulate getting a list of article IDs
    return [1, 2, 3]  # Example article IDs


@shared_task
def get_article(search_id: int, article_id: int, collector_name: str):
    """Task to fetch and save article metadata."""
    # Simulate fetching article metadata
    metadata = f"Metadata for article {article_id} from {collector_name}"
    # with transaction.atomic():
    Article.create(search_id=search_id, meta=metadata)


@shared_task
def clean_up(*args, search_id: int | None = None):
    """
    Clean up temporary storage.

    Cleanup task to be triggered when all articles from all collectors
    for a specific search are done. The chord callback prepends the
    collector results, hence the flexible ``*args``.
    """
    # Implement cleanup logic here, e.g., removing duplicate articles
    pass
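A sketch of how this pipeline would be kicked off (assumes a worker started with the poc2 app; the query string is illustrative):

```python
# Illustrative only: enqueue the poc2 entry-point task.
from poc_celery.poc2.tasks import search_task

# search_task creates a Search row, fans the three collectors out as a
# chord, and runs clean_up once they have all finished.
result = search_task.delay("example query")
print(result.id)  # AsyncResult id of the entry-point task
```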
39 changes: 39 additions & 0 deletions tests/conftest.py
@@ -0,0 +1,39 @@
import pytest


@pytest.fixture(scope="module")
def celery_config():
    """
    Provide Celery app configuration for testing.

    This fixture is responsible for setting up the Celery app with a
    specific configuration suitable for test runs. It points the broker
    at RabbitMQ and the result backend at Redis, and sets the task
    execution mode to always eager, which means tasks will be executed
    locally and synchronously.

    Returns
    -------
    dict
        A dictionary containing configuration settings for the Celery
        application.
    """
    return {
        "broker_url": "amqp://guest:guest@rabbitmq3:5672",
        "result_backend": "redis://localhost:6379/0",
        "task_always_eager": True,
    }


@pytest.fixture(scope="module")
def celery_enable_logging():
    """
    Activate logging for Celery tasks during testing.

    This fixture ensures that Celery task logs are visible during test
    execution, aiding in debugging and verifying task behavior.

    Returns
    -------
    bool
        True to enable Celery task logging.
    """
    return True
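These fixtures are picked up by Celery's pytest plugin; a hypothetical sketch of a test that relies on them (the `add` task is illustrative, not part of this commit):

```python
# Hypothetical sketch: the plugin's celery_app fixture reads celery_config,
# so task_always_eager=True makes .delay() run synchronously in-process.
def test_eager_addition(celery_app):
    @celery_app.task
    def add(x: int, y: int) -> int:
        return x + y

    assert add.delay(2, 3).get() == 5
```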
Empty file added tests/poc1/__init__.py
4 changes: 2 additions & 2 deletions tests/test_tasks_async.py → tests/poc1/test_tasks_async.py
@@ -4,7 +4,7 @@

 import pytest

-from poc_celery.tasks_async import DATA_DIR, clean_data, create_project
+from poc_celery.poc1.tasks_async import DATA_DIR, clean_data, create_project


 @pytest.fixture
@@ -56,7 +56,7 @@ def test_create_project(mock_file_io):
 async def test_create_project_stress(mock_file_io):
     file_path = str(DATA_DIR / "collectors.txt")

-    num_calls = 100000
+    num_calls = 10

     calls = [
         [1, 1, 3],
