Add KafkaMessageQueue #148

Merged
merged 29 commits into from
Jul 21, 2024
183 changes: 183 additions & 0 deletions examples/kafka/README.md
Collaborator

Future note: Somehow it feels like our examples should be more simple. Feels a little crazy to write this much code for a toy example. I wonder how much of this is worth baking into other abstractions in the library

Contributor Author

I agree! I think for simple local launch/single examples it's easy.

But when looking to deploy these with some orchestration via docker-compose or kubernetes then I am not sure if it can be as easy as a single script.

It would be really nice to simplify the process of building a multi-agent app where we do "intend" to deploy the microservices on their own.

The pattern I've adopted here is:

  • build a multi-agent app Python package
  • that defines all of your microservices and launches them in their associated script
  • this way we can have a single docker image and just change the command for that image to launch the service that you want to launch

Contributor Author

Added an issue to track this for us: #150 😅

@@ -0,0 +1,183 @@
# Using Apache Kafka as the MessageQueue

The examples contained in this subdirectory make use of the Apache Kafka integration
within `llama-agents`.

## Installation

Simply install `llama-agents` with the `kafka` extra:

```sh
# using pip install
pip install "llama-agents[kafka]"

# using poetry
poetry add llama-agents -E "kafka"
```

The example scripts in `simple-scripts/` need to be run from a virtual
environment that has this installation of `llama-agents`.

## Usage Pattern

```python
from llama_agents.message_queue.apache_kafka import KafkaMessageQueue

message_queue = KafkaMessageQueue(
    url=...
)  # if no url is supplied the default localhost:9092 is used
```

## Examples

### Simple Scripts

A couple of scripts using `LocalLauncher` and a `LocalServer` with
`KafkaMessageQueue` (rather than `SimpleMessageQueue`) are included in this
subdirectory.

Before running any of these scripts, we first need to have a Kafka cluster running.
For a quick setup, we recommend using the official Kafka community [docker image](https://hub.docker.com/r/apache/kafka):

```sh
docker run -p 9092:9092 apache/kafka:3.7.1
```

With our Kafka broker running, we can now run our example scripts.

```sh
# using LocalLauncher
python ./simple-scripts/local_launcher_human_single.py
```

The script above will build a simple multi-agent app, connect it to the Kafka
message queue, and subsequently send the specified task.

### Example App: Pig-Latin Translator

In this example, we build a multi-agent app for translating simple Pig-Latin.
Given a sentence, simple Pig-Latin modifies every word in the sentence with the
following two steps:

1. the first letter is moved to the end of the word
2. the suffix "ay" is added to the end of the word

For example: "hey how is it going" becomes "eyhay owhay siay tiay oinggay" in
simple Pig-Latin.
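
As a minimal sketch of the two steps above (plain Python, not code from this PR; the function name `to_simple_pig_latin` is made up for illustration), the forward transformation could look like this:

```python
def to_simple_pig_latin(sentence: str) -> str:
    """Apply the two simple Pig-Latin steps to every whitespace-separated word."""
    translated = []
    for word in sentence.split():
        # Step 1: move the first letter to the end of the word.
        rotated = word[1:] + word[0]
        # Step 2: append the suffix "ay".
        translated.append(rotated + "ay")
    return " ".join(translated)


print(to_simple_pig_latin("hey how is it going"))
```

The sketch ignores punctuation and capitalization, which the description above does not cover either.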

The multi-agent system translates simple Pig-Latin text by reversing the two
steps described above. It does so by using two agents that work in
sequence: `remove_ay_agent` and `correct_first_character_agent`.
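
To make the division of labour concrete, here is a rough sketch of what each agent is responsible for, written as plain Python functions. These are illustrative stand-ins only: the actual agents in this example are services registered with the control plane, and their implementation may differ (the function names below are assumptions).

```python
def remove_ay(text: str) -> str:
    """Undo step 2: strip the trailing "ay" from every word."""
    return " ".join(w[:-2] if w.endswith("ay") else w for w in text.split())


def correct_first_character(text: str) -> str:
    """Undo step 1: move the last letter of every word back to the front."""
    return " ".join(w[-1] + w[:-1] for w in text.split())


# The two stages run in sequence, mirroring the order of the agents.
intermediate = remove_ay("lamaindexlay siay hetay estbay")
print(correct_first_character(intermediate))
```

The order matters: the "ay" suffix has to be removed before the last letter is moved back to the front.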

This multi-agent system also features a `TaskResultService` that is a consumer
of messages containing the final `TaskResult` data. That is, when a task is
completed, the control plane sends a message containing the results of the task
to a consumer that subscribes to the message type "human". This is precisely
what `TaskResultService` is, and it consumes these messages by appending the
results to a `task_results.jsonl` file that is stored in a `task_results` folder
that gets created in the directory from which the service was launched.
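
The append-to-file behaviour described above can be sketched as follows. This is not the `TaskResultService` implementation; `append_task_result` is a hypothetical helper, and the shape of the result dictionary is an assumption made for illustration.

```python
import json
from pathlib import Path
from typing import Optional


def append_task_result(result: dict, base_dir: Optional[Path] = None) -> Path:
    """Append one result as a JSON line to task_results/task_results.jsonl."""
    # The folder is created relative to the launch directory by default.
    folder = (base_dir or Path.cwd()) / "task_results"
    folder.mkdir(parents=True, exist_ok=True)
    out_file = folder / "task_results.jsonl"
    # Append mode keeps earlier results; each result occupies one line.
    with out_file.open("a", encoding="utf-8") as f:
        f.write(json.dumps(result) + "\n")
    return out_file
```

Writing one JSON object per line means that results can be appended without rewriting the file and read back line by line.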

#### Launching Without Docker

As with running our example simple scripts above, we need to stand up our
Kafka node manually:

```sh
docker run -p 9092:9092 apache/kafka:3.7.1
```

Next, in order to launch this multi-agent system, we first need to set the
required environment variables. To do that, fill in the provided
`template.env.local` file found in the `pig-latin-translation/` folder. After filling
in the file, rename it to `.env.local` (i.e., remove "template" from the name)
and then run the commands that follow.

```sh
# set environment variables
cd pig-latin-translation
set -a && source .env.local

# install the dependencies, then activate the project virtual env
# (`poetry shell` spawns a new shell, so it has to come last)
poetry install && poetry shell
```

Finally, to launch the example multi-agent app:

```sh
python pig_latin_translation/local_launcher.py
```

Once launched, we can send tasks to our multi-agent system using the
`LlamaAgentsClient` (note: the code below introduces a static delay to wait for
the asynchronous call to complete; this is for quick testing only):

```python
from llama_agents import LlamaAgentsClient
import time

client = LlamaAgentsClient("http://0.0.0.0:8001")
task_id = client.create_task("lamaindexlay siay hetay estbay")
time.sleep(10)
task_result = client.get_task_result(task_id)
print(task_result.result)
```
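
Beyond a quick test, the fixed `time.sleep(10)` could be replaced by polling with a timeout. The helper below is a generic sketch and not part of `llama-agents`; `wait_for_result` is a hypothetical name, and it assumes that a result that is not ready yet shows up either as `None` or as an exception from the fetch call.

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def wait_for_result(
    fetch: Callable[[], Optional[T]],
    timeout: float = 30.0,
    interval: float = 0.5,
) -> T:
    """Call `fetch` until it returns a non-None value or `timeout` elapses."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            result = fetch()
        except Exception:
            # Assumption: an error here means "not ready yet".
            result = None
        if result is not None:
            return result
        if time.monotonic() >= deadline:
            raise TimeoutError(f"no result after {timeout} seconds")
        time.sleep(interval)
```

Under those assumptions it could be used as `wait_for_result(lambda: client.get_task_result(task_id))` in place of the sleep and the direct call.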

#### Launching With Docker

_Prerequisites_: Must have docker installed. (See
[here](https://docs.docker.com/get-docker/) for how to install Docker Desktop
which comes with `docker-compose`.)

**NOTE:** In this example, we don't need to run the Kafka server manually. So you
can go ahead and shut down the Kafka docker container that we had running in the
previous launch if you haven't yet done so. The Kafka server is bundled within
the multi-agent deployment defined in the `docker-compose.yml` file.

To launch with Docker, this example makes use of `docker-compose`, which takes
care of launching the individual services and of building a default bridge network
so that the services/containers can communicate with one another by name.

Before building the docker image and launching the services (as with the case
for launching without Docker), we first need to set the required environment
variables. Fill in the values in the `template.env.docker` file and, after doing so, rename
the file to `.env.docker`. Note that there are some variables there that we recommend
not modifying, as they are used by the service definitions established in
`docker-compose.yml`.

This example is provided without a `poetry.lock` file, as recommended in the
[poetry documentation for library developers](https://python-poetry.org/docs/basic-usage/#as-a-library-developer).
Before running docker-compose for the first time, we must create the `poetry.lock`
file. If you are coming from the previous section, where we launched without
Docker, then you already obtained the lock file when running `poetry install`.
If not, then use the command below:

```sh
# while in pig-latin-translation/
poetry install
```

To launch the services we now use the `docker-compose` command line tool.

```sh
docker-compose up --build
```

This command will start the services in sequence: first the Kafka service,
then the control plane, followed by the agent services and the human consumer
service. This sequencing is required because the later services must register
with the message queue and the control plane, which need to be up and running
before they can do so.
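
The sequencing relies on the healthchecks in the compose file, which probe a port until the service answers. As a rough Python equivalent of such a probe (a sketch only; `wait_for_port` is a hypothetical helper, not something shipped with this example):

```python
import socket
import time


def wait_for_port(
    host: str, port: int, timeout: float = 30.0, interval: float = 0.5
) -> bool:
    """Return True once a TCP connection to host:port succeeds, False on timeout."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            # A successful connect means something is listening on the port.
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            # Not listening yet (or unreachable): wait and retry.
            time.sleep(interval)
    return False
```

For instance, `wait_for_port("localhost", 9092)` would wait until the Kafka broker accepts connections before a client script starts sending tasks. An open port only shows that the process is listening, not that it is fully ready to serve requests.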

Once all the services are up and running, we can send tasks to our multi-agent
system:

```python
from llama_agents import LlamaAgentsClient
import time

client = LlamaAgentsClient("http://0.0.0.0:8001")
task_id = client.create_task("lamaindexlay siay hetay estbay")
time.sleep(10)
task_result = client.get_task_result(task_id)
print(task_result.result)
```
11 changes: 11 additions & 0 deletions examples/kafka/pig-latin-translation/.gitignore
@@ -0,0 +1,11 @@
.env.*
.env
poetry.lock
index.html.*
index.html
task_results
.ipynb_checkpoints/
secrets.yaml
Dockerfile.local
docker-compose.local.yml
pyproject.local.toml
49 changes: 49 additions & 0 deletions examples/kafka/pig-latin-translation/Dockerfile
@@ -0,0 +1,49 @@
FROM --platform=linux/amd64 python:3.10-slim as builder

WORKDIR /app

ENV POETRY_VERSION=1.7.1

# Install libraries for necessary python package builds
RUN apt-get update && apt-get --no-install-recommends install build-essential python3-dev libpq-dev -y && \
    pip install --no-cache-dir --upgrade pip && \
    pip install --no-cache-dir --upgrade poetry==${POETRY_VERSION}

# Install ssh
RUN apt-get -yq update && apt-get -yqq install ssh

# Configure Poetry
ENV POETRY_CACHE_DIR=/tmp/poetry_cache
ENV POETRY_NO_INTERACTION=1
ENV POETRY_VIRTUALENVS_IN_PROJECT=true
ENV POETRY_VIRTUALENVS_CREATE=true

# Install dependencies
COPY ./poetry.lock ./pyproject.toml ./

RUN mkdir -p -m 0600 ~/.ssh && ssh-keyscan github.com >> ~/.ssh/known_hosts
RUN --mount=type=secret,id=id_ed25519,dst=/root/.ssh/id_ed25519 poetry install --no-cache --no-root -vvv

RUN poetry install --no-cache --no-root

FROM --platform=linux/amd64 python:3.10-slim as runtime

# Install wget for healthcheck
RUN apt-get update && apt-get install -y wget

RUN apt-get update -y && \
    apt-get install --no-install-recommends libpq5 -y && \
    rm -rf /var/lib/apt/lists/* # Install libpq for psycopg2

RUN groupadd -r appuser && useradd --no-create-home -g appuser -r appuser
USER appuser

WORKDIR /app

ENV VIRTUAL_ENV=/app/.venv
COPY --from=builder ${VIRTUAL_ENV} ${VIRTUAL_ENV}
ENV PATH="${VIRTUAL_ENV}/bin:${PATH}"

# Copy source code
COPY ./logging.ini ./logging.ini
COPY ./pig_latin_translation ./pig_latin_translation
1 change: 1 addition & 0 deletions examples/kafka/pig-latin-translation/README.md
@@ -0,0 +1 @@
# Pig-Latin Translation
142 changes: 142 additions & 0 deletions examples/kafka/pig-latin-translation/docker-compose.yml
@@ -0,0 +1,142 @@
version: "3"
services:
  kafka:
    image: apache/kafka:3.7.1
    hostname: kafka
    container_name: kafka
    ports:
      - "9092:9092"
    env_file:
      - .env.docker
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT"
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT_HOST://localhost:9092,PLAINTEXT://kafka:19092"
      KAFKA_PROCESS_ROLES: "broker,controller"
      KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka:29093"
      KAFKA_LISTENERS: "CONTROLLER://:29093,PLAINTEXT_HOST://:9092,PLAINTEXT://:19092"
      KAFKA_INTER_BROKER_LISTENER_NAME: "PLAINTEXT"
      KAFKA_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_LOG_DIRS: "/tmp/kraft-combined-logs"
    healthcheck:
      test: nc -z localhost 9092 || exit -1
      start_period: 15s
      interval: 30s
      timeout: 10s
      retries: 5
  control_plane:
    image: pig_latin_translation:latest
    command: sh -c "python -m pig_latin_translation.core_services.control_plane"
    env_file:
      - .env.docker
    ports:
      - "8001:8001"
    volumes:
      - ./pig_latin_translation:/app/pig_latin_translation # load local code change to container without the need of rebuild
      - ./logging.ini:/app/logging.ini
    depends_on:
      kafka:
        condition: service_healthy
    platform: linux/amd64
    build:
      context: .
      dockerfile: ./Dockerfile
      secrets:
        - id_ed25519
    healthcheck:
      test: wget --no-verbose --tries=1 http://0.0.0.0:8001/ || exit 1
      interval: 30s
      retries: 5
      start_period: 20s
      timeout: 10s
  ay_agent:
    image: pig_latin_translation:latest
    command: sh -c "python -m pig_latin_translation.agent_services.remove_ay_agent"
    env_file:
      - ./.env.docker
    ports:
      - "8002:8002"
    volumes:
      - ./pig_latin_translation:/app/pig_latin_translation # load local code change to container without the need of rebuild
      - ./logging.ini:/app/logging.ini
    platform: linux/amd64
    depends_on:
      kafka:
        condition: service_healthy
      control_plane:
        condition: service_healthy
    build:
      context: .
      dockerfile: ./Dockerfile
      secrets:
        - id_ed25519
    healthcheck:
      test: wget --no-verbose --tries=1 http://0.0.0.0:8002/ || exit 1
      interval: 30s
      retries: 5
      start_period: 20s
      timeout: 10s
  first_char_agent:
    image: pig_latin_translation:latest
    command: sh -c "python -m pig_latin_translation.agent_services.correct_first_character_agent"
    env_file:
      - ./.env.docker
    ports:
      - "8003:8003"
    volumes:
      - ./pig_latin_translation:/app/pig_latin_translation # load local code change to container without the need of rebuild
      - ./logging.ini:/app/logging.ini
    depends_on:
      kafka:
        condition: service_healthy
      control_plane:
        condition: service_healthy
    platform: linux/amd64
    build:
      context: .
      dockerfile: ./Dockerfile
      secrets:
        - id_ed25519
    healthcheck:
      test: wget --no-verbose --tries=1 http://0.0.0.0:8003/ || exit 1
      interval: 30s
      retries: 5
      start_period: 20s
      timeout: 10s
  human_consumer:
    image: pig_latin_translation:latest
    command: sh -c "python -m pig_latin_translation.additional_services.human_consumer"
    env_file:
      - ./.env.docker
    ports:
      - "8004:8004"
    volumes:
      - ./pig_latin_translation:/app/pig_latin_translation # load local code change to container without the need of rebuild
      - ./logging.ini:/app/logging.ini
      - ./task_results:/app/task_results
    platform: linux/amd64
    depends_on:
      kafka:
        condition: service_healthy
      control_plane:
        condition: service_healthy
    build:
      context: .
      dockerfile: ./Dockerfile
      secrets:
        - id_ed25519
    healthcheck:
      test: wget --no-verbose --tries=1 http://0.0.0.0:8004/ || exit 1
      interval: 30s
      retries: 5
      start_period: 20s
      timeout: 10s
volumes:
  kafka:
secrets:
  id_ed25519:
    file: ~/.ssh/id_ed25519