Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Develop2 #22

Open
wants to merge 17 commits into
base: main
Choose a base branch
from
35 changes: 35 additions & 0 deletions DemoAPI/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Flask Web Gateway

[This code sample](https://github.com/quixio/quix-samples/tree/develop/python/sources/web_api_gateway) demonstrates how to run a Flask web gateway and use it to publish to a Kafka topic via HTTP POST requests.

## How to run

Create a [Quix](https://portal.platform.quix.ai/self-sign-up?xlink=github) account or log-in and visit the Samples to use this project

Clicking `Edit code` on the Sample, forks the project to your own Git repo so you can customize it before deploying.

Once you have deployed the Sample within your project there are three things to bear in mind:

1. Your data JSON must contain a "sessionId" key
2. Keys and values must be Strings or Bytes.
3. The Flask gateway endpoint must end with `/data/`. E.g `https://gateway-example-develop.deployments.quix.io/data/`

You can test your endpoint by sending a message via curl:
`curl -X POST -H "Content-Type: application/json" -d '{"sessionId": "000001", "name": "Tony Hawk", "purchase": "skateboard" }' https://<your-deployment-url>/data/
`

## Environment variables

The code sample uses the following environment variables:

- **output**: This is the output topic for hello world data.

## Contribute

Submit forked projects to the Quix [GitHub](https://github.com/quixio/quix-samples) repo. Any new project that we accept will be attributed to you and you'll receive $200 in Quix credit.

## Open source

This project is open source under the Apache 2.0 license and available in our [GitHub](https://github.com/quixio/quix-samples) repo.

Please star us and mention us on social to show your appreciation.
11 changes: 11 additions & 0 deletions DemoAPI/app.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
name: DemoAPI
language: python
variables:
- name: output
inputType: OutputTopic
description: This is the output topic for hello world data
defaultValue: hello-world-source
required: true
dockerfile: dockerfile
runEntryPoint: main.py
defaultFile: main.py
23 changes: 23 additions & 0 deletions DemoAPI/dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
FROM python:3.11.1-slim-buster

# Set environment variables to non-interactive and unbuffered output
ENV DEBIAN_FRONTEND=noninteractive \
PYTHONUNBUFFERED=1 \
PYTHONIOENCODING=UTF-8

# Set the working directory inside the container
WORKDIR /app

# Copy only the requirements file(s) to leverage Docker cache
# Assuming all requirements files are in the root or subdirectories
COPY ./requirements.txt ./

# Install dependencies
# Adding `--no-cache-dir` to avoid storing unnecessary files and potentially reduce image size
RUN pip install --no-cache-dir -r requirements.txt

# Copy the rest of the application
COPY . .

# Set the command to run your application
ENTRYPOINT ["python3", "main.py"]
42 changes: 42 additions & 0 deletions DemoAPI/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import os
import datetime
import json
from flask import Flask, request, Response
from waitress import serve

from setup_logging import get_logger

from quixstreams import Application

# for local dev, load env vars from a .env file
from dotenv import load_dotenv
load_dotenv()

quix_app = Application()
topic = quix_app.topic(os.environ["output"])
producer = quix_app.get_producer()

logger = get_logger()

app = Flask(__name__)


@app.route("/data/", methods=['POST'])
def post_data():

data = request.json

print(data)

logger.info(f"{str(datetime.datetime.utcnow())} posted.")

producer.produce(topic.name, json.dumps(data), "hello-world-stream")

response = Response(status=200)
response.headers.add('Access-Control-Allow-Origin', '*')

return response


if __name__ == '__main__':
serve(app, host="0.0.0.0", port=80)
5 changes: 5 additions & 0 deletions DemoAPI/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
quixstreams<2.6
flask
flask_cors
waitress
python-dotenv
23 changes: 23 additions & 0 deletions DemoAPI/setup_logging.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import logging


def get_logger():

logging.basicConfig(
level=logging.INFO,
format='[%(asctime)s] [%(levelname)s]: %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)

# Set up logging
logger = logging.getLogger('waitress')
logger.setLevel(logging.INFO) # Set to DEBUG for more detailed output
logger.propagate = False # Prevent the log messages from propagating to the root logger

# Create handlers (console and file handler for example)
console_handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)

return logger
28 changes: 28 additions & 0 deletions f1-demo-transformation/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Starter transformation

[This code sample](https://github.com/quixio/quix-samples/tree/develop/python/transformations/starter_transformation) demonstrates how to consume data from a topic, apply a simple transformation to that data and publish the result to an output topic (while printing content to the console output).

Modify the Python code to transform your data on the fly.

## How to run

Create a [Quix](https://portal.platform.quix.ai/self-sign-up?xlink=github) account or log-in and visit the Samples to use this project.

Clicking `Edit code` on the Sample, forks the project to your own Git repo so you can customize it before deploying.

## Environment variables

The code sample uses the following environment variables:

- **input**: Name of the input topic to listen to.
- **output**: Name of the output topic to write to.

## Contribute

Submit forked projects to the Quix [GitHub](https://github.com/quixio/quix-samples) repo. Any new project that we accept will be attributed to you and you'll receive $200 in Quix credit.

## Open source

This project is open source under the Apache 2.0 license and available in our [GitHub](https://github.com/quixio/quix-samples) repo.

Please star us and mention us on social to show your appreciation.
16 changes: 16 additions & 0 deletions f1-demo-transformation/app.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
name: f1-demo-transformation
language: Python
variables:
- name: input
inputType: InputTopic
description: Name of the input topic to listen to.
defaultValue: raw
required: false
- name: output
inputType: OutputTopic
description: Name of the output topic to write to.
defaultValue: f1-demo-normalised
required: false
dockerfile: dockerfile
runEntryPoint: main.py
defaultFile: main.py
23 changes: 23 additions & 0 deletions f1-demo-transformation/dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
FROM python:3.11.1-slim-buster

# Set environment variables to non-interactive and unbuffered output
ENV DEBIAN_FRONTEND=noninteractive \
PYTHONUNBUFFERED=1 \
PYTHONIOENCODING=UTF-8

# Set the working directory inside the container
WORKDIR /app

# Copy only the requirements file(s) to leverage Docker cache
# Assuming all requirements files are in the root or subdirectories
COPY ./requirements.txt ./

# Install dependencies
# Adding `--no-cache-dir` to avoid storing unnecessary files and potentially reduce image size
RUN pip install --no-cache-dir -r requirements.txt

# Copy the rest of the application
COPY . .

# Set the command to run your application
ENTRYPOINT ["python3", "main.py"]
60 changes: 60 additions & 0 deletions f1-demo-transformation/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import os
from quixstreams import Application

import uuid
import json
import datetime
# for local dev, load env vars from a .env file
from dotenv import load_dotenv
load_dotenv()

app = Application(consumer_group=str(uuid.uuid4()), auto_offset_reset="earliest")

input_topic = app.topic(os.environ["input"])
output_topic = app.topic(os.environ["output"])

sdf = app.dataframe(input_topic)

sdf = sdf.apply(lambda row: row["payload"], expand=True)

def transpose(row: dict):

new_row = {
"time": row["time"]
}

for key in row["values"]:
new_row[row["name"] + "-" + key] = row["values"][key]

return new_row

sdf = sdf.apply(transpose)

# put transformation logic here
# see docs for what you can do
# https://quix.io/docs/get-started/quixtour/process-threshold.html

sdf = sdf.hopping_window(10000, 250, 1000).reduce(lambda state, row: {**state, **row}, lambda row: row).final()

sdf = sdf.apply(lambda row: {
"time": row["start"],
**row["value"]
})

#sdf["time"] = sdf["time"].apply(lambda epoch: str(datetime.datetime.fromtimestamp(epoch / 1E9)))


sdf = sdf[sdf.contains("accelerometer-x")]

sdf["accelerometer-total"] = sdf["accelerometer-x"].abs() + sdf["accelerometer-y"].abs() + sdf["accelerometer-z"].abs()

sdf = sdf[["time", "accelerometer-x", "accelerometer-y", "accelerometer-z", "accelerometer-total"]]

sdf = sdf[sdf["accelerometer-x"] > 0]

sdf = sdf.update(lambda row: print(list(row.values())))

sdf = sdf.to_topic(output_topic)

if __name__ == "__main__":
app.run(sdf)
2 changes: 2 additions & 0 deletions f1-demo-transformation/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
quixstreams<2.7
python-dotenv
Loading