kpn/kstreams

Kstreams

kstreams is a library/micro framework for working with Kafka. It has a simple Kafka streams implementation that gives certain guarantees, see below.

Documentation: https://kpn.github.io/kstreams/


Installation

pip install kstreams

You will need a worker. We recommend aiorun:

pip install aiorun

Usage

import aiorun
from kstreams import ConsumerRecord, create_engine

stream_engine = create_engine(title="my-stream-engine")


# Consume from the same topic that produce() writes to.
@stream_engine.stream("local--kstreams")
async def consume(cr: ConsumerRecord):
    print(f"Event consumed: headers: {cr.headers}, payload: {cr.value}")


async def produce():
    payload = b'{"message": "Hello world!"}'

    # Send the payload five times and print the metadata returned for each send.
    for _ in range(5):
        metadata = await stream_engine.send("local--kstreams", value=payload)
        print(f"Message sent: {metadata}")


async def start():
    # Start the engine before producing.
    await stream_engine.start()
    await produce()


async def shutdown(loop):
    # Passed to aiorun as the shutdown callback; stops the engine.
    await stream_engine.stop()


if __name__ == "__main__":
    aiorun.run(start(), stop_on_unhandled_errors=True, shutdown_callback=shutdown)
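
The consumer in the usage example prints cr.value as raw bytes. As a minimal sketch, assuming the payload is UTF-8 encoded JSON like the one sent by produce(), the value could be decoded as follows. decode_payload is a hypothetical helper written for this illustration; it is not part of kstreams.

```python
import json
from typing import Any, Optional


def decode_payload(value: Optional[bytes]) -> Any:
    """Decode a UTF-8 JSON payload (hypothetical helper, not a kstreams API).

    Returns None when the record carries no value.
    """
    if value is None:
        return None
    return json.loads(value.decode("utf-8"))


if __name__ == "__main__":
    event = decode_payload(b'{"message": "Hello world!"}')
    print(event["message"])  # prints "Hello world!"
```

Inside the stream function this would be called as decode_payload(cr.value).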

Features

  • Produce events
  • Consume events with Streams
  • Subscribe to topics by pattern
  • Prometheus metrics and custom monitoring
  • TestClient
  • Custom Serialization and Deserialization
  • Easy to integrate with any async framework. Not tied to any library!
  • Yield events from streams
  • OpenTelemetry instrumentation
  • Middlewares
  • Hooks (on_startup, on_stop, after_startup, after_stop)
  • Store (kafka streams pattern)
  • Stream Join
  • Windowing
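
To illustrate what "Subscribe to topics by pattern" means in practice, the sketch below shows which topic names a regular expression would select. It assumes the pattern is an ordinary regular expression matched against topic names; the topic names and the matching_topics helper are hypothetical, and how a pattern is passed to a stream is described in the kstreams documentation rather than here.

```python
import re
from typing import Iterable, List


def matching_topics(pattern: str, topics: Iterable[str]) -> List[str]:
    """Return the topic names matched by the regular expression.

    Hypothetical helper for illustration; it does not talk to Kafka.
    """
    compiled = re.compile(pattern)
    return [topic for topic in topics if compiled.match(topic)]


if __name__ == "__main__":
    topics = ["local--kstreams", "local--orders", "prod--kstreams"]
    print(matching_topics(r"^local--.*$", topics))
    # prints ['local--kstreams', 'local--orders']
```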

Development

This repo uses poetry instead of pip. Note: if you want the virtualenv to live in the same path as the project, first run poetry config --local virtualenvs.in-project true

To install the dependencies just execute:

poetry install

Then you can activate the virtualenv with

poetry shell

Run the tests:

./scripts/test

Run code formatting with ruff:

./scripts/format

Commit messages

We use conventional commits for commit messages.

The use of commitizen is recommended. Commitizen is part of the dev dependencies.

cz commit
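
For readers unfamiliar with the format, a conventional commit header looks like type(optional-scope): description, for example "fix: handle empty payload". The sketch below checks a header against that shape. The list of allowed types is an assumption (a commonly used set; the specification itself only names feat and fix), and is_conventional is a hypothetical helper, not something shipped with this repo or with commitizen.

```python
import re

# Commonly used commit types; an assumption, the project's own
# commitizen configuration may allow a different set.
TYPES = "build|chore|ci|docs|feat|fix|perf|refactor|revert|style|test"

HEADER_RE = re.compile(
    rf"^(?P<type>{TYPES})"          # commit type
    r"(\((?P<scope>[^()\s]+)\))?"   # optional scope in parentheses
    r"(?P<breaking>!)?"             # optional breaking-change marker
    r": (?P<description>\S.*)$"     # colon, space, non-empty description
)


def is_conventional(message: str) -> bool:
    """Return True if the first line of the message is a conventional commit header."""
    header = message.split("\n", 1)[0]
    return HEADER_RE.match(header) is not None


if __name__ == "__main__":
    for msg in ("feat(streams): add windowing", "fix: handle empty payload", "Added stuff"):
        print(f"{msg!r}: {is_conventional(msg)}")
```

Running cz commit builds a message in this format interactively, so a manual check like this is only useful for messages written by hand.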