diff --git a/.github/workflows/main.yaml b/.github/workflows/main.yaml
new file mode 100644
index 0000000..314bdc6
--- /dev/null
+++ b/.github/workflows/main.yaml
@@ -0,0 +1,44 @@
+name: main
+on:
+  push:
+    branches: [ main ]
+  pull_request:
+    branches: [ main ]
+
+jobs:
+  deploy:
+    runs-on: ubuntu-latest
+    timeout-minutes: 30
+    strategy:
+      matrix:
+        python_version: ["3.10"]
+    concurrency:
+      group: ci-${{ github.ref }}
+      cancel-in-progress: true
+
+    defaults:
+      run:
+        shell: bash -l {0}
+
+    steps:
+      - uses: actions/checkout@v2
+
+      - uses: conda-incubator/setup-miniconda@v2
+        with:
+          miniconda-version: "latest"
+          mamba-version: "*"
+          environment-file: conda/base.yaml
+          channels: conda-forge,nodefaults
+          activate-environment: celery-collectors
+          use-mamba: true
+          miniforge-variant: Mambaforge
+
+      - name: Run Redis container
+        run: |
+          docker run --name redis -d redis redis-server --save 60 1 --loglevel warning
+
+      - name: Start Celery worker
+        # Detach so the worker runs in the background and the job can proceed to pytest.
+        run: celery -A main.celery_app worker --loglevel=DEBUG --detach
+
+      - name: Run pytest
+        run: pytest -vvv main/tests/tests_tasks.py
diff --git a/README.md b/README.md
index b556442..be3aac9 100644
--- a/README.md
+++ b/README.md
@@ -1 +1,127 @@
-# poc-celery
\ No newline at end of file
+# Celery with Subcollectors
+
+## Overview
+
+This proof of concept (PoC) demonstrates a distributed task queue implementation using Celery with Redis as the message broker. The project simulates a data collection and processing workflow using a system of subcollectors. Each subcollector generates a random list of integers for a given topic, and the results are aggregated, cleaned, and processed.
+
+### Purpose
+
+The primary goal of this PoC is to test and demonstrate the capability of Celery to manage complex workflows involving multiple asynchronous tasks. It showcases how to:
+
+- Dispatch tasks to subcollectors in parallel.
+- Aggregate results from all subcollectors.
+- Perform cleanup and processing on aggregated data.
+
+## Project Structure
+
+- `main/celery_app.py`: Initializes the Celery app and configures its connection to Redis.
+- `main/tasks.py`: Contains the definitions of all Celery tasks, including the subcollectors and the aggregation, cleanup, and processing tasks.
+- `main/scripts/get_container_ip.py`: Helper that resolves the IP address of the Redis Docker container.
+- `main/tests/`: Directory containing test files that validate the functionality of the Celery tasks.
+  - `tests_tasks.py`: Implements tests for the Celery tasks using pytest.
+- `conda/base.yaml`: Conda environment file listing all Python dependencies required by the project.
+
+## Workflow
+
+The Celery workflow in this project orchestrates a series of tasks to simulate a data collection and processing pipeline using subcollector tasks. The workflow leverages Celery's capabilities for asynchronous task execution, task grouping, and result aggregation. Below is a step-by-step breakdown of the workflow:
+
+### 1. Task Initiation with `collector_request`
+
+- **Functionality**: The workflow begins with the `collector_request` task, which initiates the data collection process for a specified topic.
+- **Key Methods**:
+  - `generate_collector_request(topic: str) -> str`: Generates a unique request ID for the collection request.
+  - `group()`: Groups the subcollector tasks (`collector_1`, `collector_2`, `collector_3`) so they can be executed in parallel.
+  - `chord(group())(callback)`: A `chord` is a Celery primitive that takes a group of tasks and a callback task. It ensures that the callback task (`collector_gathering`) is executed only after all tasks in the group have completed. A simplified sketch of this dispatch pattern is shown below.
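+
+For reference, the dispatch pattern inside `collector_request` (defined in `main/tasks.py` in this PR) boils down to roughly the following sketch; the `dispatch` wrapper is only illustrative:
+
+```python
+from celery import chord, group
+
+from main.tasks import (collector_1, collector_2, collector_3,
+                        collector_gathering, generate_collector_request)
+
+
+def dispatch(topic: str) -> None:
+    # Unique ID that ties all subcollector results to one request.
+    request_id = generate_collector_request(topic)
+    # Header: the three subcollectors run in parallel on the worker(s).
+    header = group(
+        collector_1.s(topic, request_id),
+        collector_2.s(topic, request_id),
+        collector_3.s(topic, request_id),
+    )
+    # Body: collector_gathering receives the list of all subcollector
+    # results once every task in the header has finished.
+    chord(header)(collector_gathering.s(request_id))
+```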
+
+### 2. Parallel Execution of Subcollector Tasks
+
+- **Subcollectors**: `collector_1`, `collector_2`, `collector_3`
+- **Functionality**: Each subcollector task generates a random list of integers, simulating the collection of data for the given topic.
+- **Execution**: These tasks are executed in parallel as part of the `group` passed to the `chord`. The chord applies the group asynchronously, so each task can run concurrently without waiting for the others.
+
+### 3. Aggregation and Processing
+
+- **Callback Task**: `collector_gathering`
+  - **Functionality**: Aggregates the results from all subcollector tasks. This task acts as the callback for the `chord`, which means it automatically receives the aggregated results of the group as its input.
+  - **Method Calls**:
+    - `cleanup.delay(combined_result, request_id)`: After aggregation, the `cleanup` task is called asynchronously with the `.delay()` method, passing the combined results for further processing.
+- **Cleanup Task**: `cleanup`
+  - **Functionality**: Performs preliminary processing or cleanup on the aggregated data.
+  - **Method Calls**:
+    - `process.delay(data, request_id)`: Calls the `process` task asynchronously for final processing.
+- **Process Task**: `process`
+  - **Functionality**: Conducts the final processing of the data. In this example, it counts the total items and prints the result.
+  - **Returns**: A dictionary with the `request_id` and the `item_count`, providing a simple summary of the processing outcome.
+
+### Summary
+
+This workflow demonstrates the power of Celery for handling complex asynchronous task pipelines. It showcases task parallelization (`group`), callback execution once a group of tasks has completed (`chord`), and chaining of further processing steps (`.delay()`). Each task performs a specific role within the data collection and processing pipeline, from initiating collection requests to final data processing.
+
+### Best Practices
+
+- **Asynchronous Execution**: Utilize Celery's asynchronous task execution to improve throughput and scalability.
+- **Task Chaining and Callbacks**: Leverage `chord` and `.delay()` for callbacks and task chaining, ensuring tasks are executed in the desired order and only after their prerequisites have completed.
+- **Error Handling**: Implement comprehensive error handling within tasks to manage failures gracefully and maintain workflow integrity.
+
+## Setup Instructions
+
+### Prerequisites
+
+- Docker
+- Conda or Mamba
+
+### Environment Setup
+
+1. Clone the repository and navigate to the project directory.
+
+2. Create a new Conda environment using the `base.yaml` file:
+
+   ```bash
+   mamba env create -f conda/base.yaml
+   ```
+
+   Or, if you're using Conda:
+
+   ```bash
+   conda env create -f conda/base.yaml
+   ```
+
+3. Activate the new environment (use `conda activate` instead if you created it with Conda):
+
+   ```bash
+   mamba activate celery-collectors
+   ```
+
+### Redis Setup Using Docker
+
+Run a Redis container with the following command:
+
+```bash
+docker run --name redis -d redis redis-server --save 60 1 --loglevel warning
+```
+
+This command starts a Redis server in a Docker container named `redis`, configured to persist data every 60 seconds if at least one key has changed, with the log level set to `warning`.
+
+### Starting the Celery App
+
+With the Redis server running and the environment activated, start the Celery worker:
+
+```bash
+celery -A main.celery_app worker --loglevel=info
+```
+
+This command starts a Celery worker that listens for the tasks registered in `main.celery_app`.
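+
+With the worker running, you can queue a collection request from a Python shell to exercise the whole pipeline end to end (the topic string below is just an example):
+
+```python
+from main.tasks import collector_request
+
+# Send the task to the broker; the running worker picks it up, fans the work
+# out to the subcollectors via the chord, and chains cleanup and process.
+collector_request.delay("example_topic")
+```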
+
+### Running Tests
+
+Ensure your Celery worker and Redis server are running, then execute the tests with pytest:
+
+```bash
+pytest -vvv main/tests/tests_tasks.py
+```
+
+This command runs the tests defined in `main/tests/tests_tasks.py` (the same invocation used by the CI workflow), verifying the functionality of the Celery tasks.
+
+## Conclusion
+
+This PoC demonstrates a scalable and efficient way to manage distributed tasks using Celery with Redis as the message broker. It shows a practical application of Celery's capabilities for handling complex workflows with multiple asynchronous tasks, and the system can be adapted to various data processing and aggregation needs.
+
diff --git a/conda/base.yaml b/conda/base.yaml
new file mode 100644
index 0000000..d103949
--- /dev/null
+++ b/conda/base.yaml
@@ -0,0 +1,12 @@
+name: celery-collectors
+channels:
+  - nodefaults
+  - conda-forge
+dependencies:
+  - python 3.10.*
+  - celery
+  - pip
+  - pip:
+    - redis
+    - pytest
+    - pytest-celery
+    - factory_boy
diff --git a/main/__init__.py b/main/__init__.py
new file mode 100644
index 0000000..0c951d8
--- /dev/null
+++ b/main/__init__.py
@@ -0,0 +1,3 @@
+from main.celery_app import app as celery_app
+
+__all__ = ("celery_app",)
diff --git a/main/celery_app.py b/main/celery_app.py
new file mode 100644
index 0000000..472cb56
--- /dev/null
+++ b/main/celery_app.py
@@ -0,0 +1,15 @@
+from celery import Celery
+
+from main.scripts.get_container_ip import get_redis_ip
+
+# Get the Redis container IP address
+REDIS_IP = get_redis_ip()
+
+# Create a Celery instance with Redis as the broker and result backend
+app = Celery('celery-collectors', broker=f'redis://{REDIS_IP}:6379/0', backend=f'redis://{REDIS_IP}:6379/0', include=["main.tasks"])
+
+# Set broker_connection_retry_on_startup to True to suppress the startup warning
+app.conf.broker_connection_retry_on_startup = True
+
+app.autodiscover_tasks()
diff --git a/main/scripts/get_container_ip.py b/main/scripts/get_container_ip.py
new file mode 100644
index 0000000..310ebe9
--- /dev/null
+++ b/main/scripts/get_container_ip.py
@@ -0,0 +1,28 @@
+import subprocess
+import json
+
+def get_redis_ip():
+    """
+    Get the IP address of the Redis container.
+
+    This function runs the docker inspect command to retrieve information about the Redis container,
+    parses the output as JSON, and extracts the IP address from the container information.
+
+    Returns:
+        str: The IP address of the Redis container.
+
+    Raises:
+        subprocess.CalledProcessError: If the docker inspect command fails.
+        json.JSONDecodeError: If the output of the docker inspect command cannot be decoded as JSON.
+        IndexError: If the container information does not contain the expected structure.
+        KeyError: If the container information does not contain the expected keys.
+    """
+    # Run docker inspect; check=True makes a non-zero exit raise CalledProcessError
+    result = subprocess.run(["docker", "inspect", "redis"], capture_output=True, text=True, check=True)
+
+    # Parse the output as JSON
+    container_info = json.loads(result.stdout)
+
+    # Extract the IP address from the container information
+    ip_address = container_info[0]['NetworkSettings']['IPAddress']
+    return ip_address
diff --git a/main/tasks.py b/main/tasks.py
new file mode 100644
index 0000000..a98d5c8
--- /dev/null
+++ b/main/tasks.py
@@ -0,0 +1,165 @@
+import random
+import uuid
+
+from celery import group, chord
+
+from main.celery_app import app
+
+
+def generate_collector_request(topic: str) -> str:
+    """
+    Generate a unique identifier for a collector request based on the given topic.
+
+    Parameters
+    ----------
+    topic : str
+        The topic name for which the collector request is being generated.
+
+    Returns
+    -------
+    str
+        A unique identifier (UUID4) for the collector request.
+    """
+    return str(uuid.uuid4())
+
+
+@app.task
+def collector_request(topic: str):
+    """
+    Initiate the collection workflow for the given topic.
+
+    Dispatches the subcollector tasks in parallel and processes the aggregated
+    results asynchronously using a chord.
+
+    Parameters
+    ----------
+    topic : str
+        The topic for which the collection is requested.
+    """
+    request_id = generate_collector_request(topic)
+    callback = collector_gathering.s(request_id)
+    chord_tasks = group(
+        collector_1.s(topic, request_id),
+        collector_2.s(topic, request_id),
+        collector_3.s(topic, request_id)
+    )
+    chord(chord_tasks)(callback)
+
+
+@app.task
+def collector_1(topic: str, request_id: str) -> list:
+    """
+    Generate a random list of integers for the given topic.
+
+    The first subcollector task; it simulates data collection for the topic.
+
+    Parameters
+    ----------
+    topic : str
+        The topic for which data is being collected.
+    request_id : str
+        A unique identifier for the collection request.
+
+    Returns
+    -------
+    list
+        A list of random integers.
+    """
+    return [random.randint(0, 100) for _ in range(random.randint(0, 10))]
+
+
+@app.task
+def collector_2(topic: str, request_id: str) -> list:
+    """
+    Generate a random list of integers for the given topic.
+
+    The second subcollector task; it simulates data collection for the topic.
+
+    Parameters
+    ----------
+    topic : str
+        The topic for which data is being collected.
+    request_id : str
+        A unique identifier for the collection request.
+
+    Returns
+    -------
+    list
+        A list of random integers.
+    """
+    # Return the result so the chord callback receives it together with the
+    # other subcollectors' results; the callback is never invoked directly here.
+    return [random.randint(0, 100) for _ in range(random.randint(0, 10))]
+
+
+@app.task
+def collector_3(topic: str, request_id: str) -> list:
+    """
+    Generate a random list of integers for the given topic.
+
+    The third subcollector task; it simulates data collection for the topic.
+
+    Parameters
+    ----------
+    topic : str
+        The topic for which data is being collected.
+    request_id : str
+        A unique identifier for the collection request.
+
+    Returns
+    -------
+    list
+        A list of random integers.
+    """
+    return [random.randint(0, 100) for _ in range(random.randint(0, 10))]
+
+
+@app.task
+def collector_gathering(results: list, request_id: str):
+    """
+    Aggregate the results from all subcollectors and proceed with the cleanup step.
+
+    This task is used as the chord callback, so Celery passes the list of
+    subcollector results as the first argument, followed by the partial
+    argument bound in the signature (request_id).
+
+    Parameters
+    ----------
+    results : list
+        Aggregated results from all subcollector tasks.
+    request_id : str
+        A unique identifier for the collection request.
+    """
+    combined_result = [item for sublist in results for item in sublist]
+    cleanup.delay(combined_result, request_id)
+
+
+@app.task
+def cleanup(data: list, request_id: str):
+    """
+    Perform cleanup operations on the aggregated data from the subcollectors.
+
+    Parameters
+    ----------
+    data : list
+        The aggregated data that needs to be cleaned.
+    request_id : str
+        A unique identifier for the collection request.
+    """
+    print(f"Cleanup: {request_id}, Size: {len(data)}")
+    process.delay(data, request_id)
+
+
+@app.task
+def process(data: list, request_id: str):
+    """
+    Process the cleaned data, typically involving further analysis or storage.
+
+    Parameters
+    ----------
+    data : list
+        The cleaned data ready for processing.
+    request_id : str
+        A unique identifier for the collection request.
+
+    Returns
+    -------
+    dict
+        A dictionary containing the request_id and the total item count.
+    """
+    item_count = len(data)
+    print(f"Process: {request_id}, Total Items: {item_count}")
+    return {'request_id': request_id, 'item_count': item_count}
diff --git a/main/tests/__init__.py b/main/tests/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/main/tests/tests_tasks.py b/main/tests/tests_tasks.py
new file mode 100644
index 0000000..4d890f4
--- /dev/null
+++ b/main/tests/tests_tasks.py
@@ -0,0 +1,92 @@
+import logging
+import uuid
+from unittest.mock import patch
+
+import pytest
+
+from main.celery_app import app
+from main.tasks import (
+    collector_request,
+    generate_collector_request,
+)
+
+# Configure logging to ensure visibility of task execution during test runs.
+logging.basicConfig(level=logging.INFO)
+
+
+@pytest.fixture(scope='module')
+def celery_config():
+    """
+    Provide Celery app configuration for testing.
+
+    This fixture is responsible for setting up the Celery app with a configuration
+    suitable for test runs. It points the broker and result backend at Redis and
+    sets the task execution mode to always eager, which means tasks will be
+    executed locally and synchronously.
+
+    Returns
+    -------
+    dict
+        A dictionary containing configuration settings for the Celery application.
+    """
+    return {
+        'broker_url': 'redis://localhost:6379/0',
+        'result_backend': 'redis://localhost:6379/0',
+        'task_always_eager': True,
+    }
+
+
+@pytest.fixture(scope='module')
+def celery_enable_logging():
+    """
+    Activate logging for Celery tasks during testing.
+
+    This fixture ensures that Celery task logs are visible during test execution,
+    aiding in debugging and verifying task behavior.
+
+    Returns
+    -------
+    bool
+        True to enable Celery task logging, False otherwise.
+    """
+    return True
+
+
+def test_generate_collector_request():
+    """
+    Validate that `generate_collector_request` produces a valid UUID string.
+
+    Ensures the `generate_collector_request` function returns a string that
+    parses as a UUID, confirming the generation of unique request identifiers.
+
+    Assertions
+    ----------
+    Asserts the type and format of the return value from `generate_collector_request`.
+    """
+    topic = "test_topicA"
+    request_id = generate_collector_request(topic)
+    assert isinstance(request_id, str), "The request_id should be a string."
+    # uuid.UUID raises ValueError if request_id is not a valid UUID string.
+    assert uuid.UUID(request_id).version == 4, "The request_id should be a UUID4."
+
+
+@patch('main.tasks.chord')
+@patch('main.tasks.collector_gathering.s')
+@patch('main.tasks.group')
+def test_collector_request_triggers_sub_collectors(mock_group, mock_collector_gathering_s, mock_chord):
+    """
+    Test the orchestration within `collector_request` to trigger subcollector tasks.
+
+    This test verifies that the `collector_request` function correctly sets up
+    a group of subcollector tasks and designates `collector_gathering` as the callback
+    using Celery's chord primitive. It mocks `group`, `chord`, and the signature
+    of `collector_gathering` to intercept and assert their usage without any real
+    task execution or broker connection.
+
+    Parameters
+    ----------
+    mock_group : MagicMock
+        Mock object for Celery's `group` method.
+    mock_collector_gathering_s : MagicMock
+        Mock object for the `s()` signature method of the `collector_gathering` task.
+    mock_chord : MagicMock
+        Mock object for Celery's `chord` primitive, preventing any real dispatch.
+
+    Assertions
+    ----------
+    Asserts that `group` is called with the correct tasks.
+    Asserts that `collector_gathering.s()` is called to prepare the callback signature.
+    Asserts that `chord` is applied to the group and invoked with the callback.
+    """
+    topic = "test_topic"
+
+    collector_request(topic)
+
+    mock_group.assert_called_once()
+    mock_collector_gathering_s.assert_called_once()
+    mock_chord.assert_called_once_with(mock_group.return_value)
+    mock_chord.return_value.assert_called_once_with(mock_collector_gathering_s.return_value)