From 2f0d4c40a229bfdb4325de968c95768c1ca69827 Mon Sep 17 00:00:00 2001 From: Sidhant Kohli Date: Tue, 21 Nov 2023 12:17:49 -0800 Subject: [PATCH] fix: async source init (#120) Signed-off-by: Sidhant Kohli --- examples/source/async-source/Dockerfile | 54 +++++++++++++++ examples/source/async-source/Makefile | 8 +++ examples/source/async-source/README.md | 6 ++ examples/source/async-source/entry.sh | 4 ++ examples/source/async-source/example.py | 69 +++++++++++++++++++ .../async-source/pipeline-numaflow.yaml | 21 ++++++ examples/source/async-source/pyproject.toml | 17 +++++ pynumaflow/sourcer/async_server.py | 12 ++-- pyproject.toml | 2 +- 9 files changed, 188 insertions(+), 5 deletions(-) create mode 100644 examples/source/async-source/Dockerfile create mode 100644 examples/source/async-source/Makefile create mode 100644 examples/source/async-source/README.md create mode 100644 examples/source/async-source/entry.sh create mode 100644 examples/source/async-source/example.py create mode 100644 examples/source/async-source/pipeline-numaflow.yaml create mode 100644 examples/source/async-source/pyproject.toml diff --git a/examples/source/async-source/Dockerfile b/examples/source/async-source/Dockerfile new file mode 100644 index 00000000..fd0a621a --- /dev/null +++ b/examples/source/async-source/Dockerfile @@ -0,0 +1,54 @@ +#################################################################################################### +# builder: install needed dependencies +#################################################################################################### + +FROM python:3.10-slim-bullseye AS builder + +ENV PYTHONFAULTHANDLER=1 \ + PYTHONUNBUFFERED=1 \ + PYTHONHASHSEED=random \ + PIP_NO_CACHE_DIR=on \ + PIP_DISABLE_PIP_VERSION_CHECK=on \ + PIP_DEFAULT_TIMEOUT=100 \ + POETRY_VERSION=1.2.2 \ + POETRY_HOME="/opt/poetry" \ + POETRY_VIRTUALENVS_IN_PROJECT=true \ + POETRY_NO_INTERACTION=1 \ + PYSETUP_PATH="/opt/pysetup" \ + VENV_PATH="/opt/pysetup/.venv" + +ENV PATH="$POETRY_HOME/bin:$VENV_PATH/bin:$PATH" + +RUN apt-get update \ + && apt-get install --no-install-recommends -y \ + curl \ + wget \ + # deps for building python deps + build-essential \ + && apt-get install -y git \ + && apt-get clean && rm -rf /var/lib/apt/lists/* \ + \ + # install dumb-init + && wget -O /dumb-init https://github.com/Yelp/dumb-init/releases/download/v1.2.5/dumb-init_1.2.5_x86_64 \ + && chmod +x /dumb-init \ + && curl -sSL https://install.python-poetry.org | python3 - + +#################################################################################################### +# udf: used for running the udf vertices +#################################################################################################### +FROM builder AS udf + +WORKDIR $PYSETUP_PATH +COPY pyproject.toml ./ +RUN poetry install --no-cache --no-root && \ + rm -rf ~/.cache/pypoetry/ + +ADD . /app +WORKDIR /app + +RUN chmod +x entry.sh + +ENTRYPOINT ["/dumb-init", "--"] +CMD ["/app/entry.sh"] + +EXPOSE 5000 \ No newline at end of file diff --git a/examples/source/async-source/Makefile b/examples/source/async-source/Makefile new file mode 100644 index 00000000..be5fdf4e --- /dev/null +++ b/examples/source/async-source/Makefile @@ -0,0 +1,8 @@ +.PHONY: image +image: + docker build -t "quay.io/numaio/numaflow-python/async-source:v0.5.4" . +# Github CI runner uses platform linux/amd64. If your local environment don't, the image built by command above might not work +# under the CI E2E test environment. +# To build an image that supports multiple platforms(linux/amd64,linux/arm64) and push to quay.io, use the following command +# docker buildx build -t "quay.io/numaio/numaflow-python/async-source:v0.5.4" --platform linux/amd64,linux/arm64 . --push +# If command failed, refer to https://billglover.me/notes/build-multi-arch-docker-images/ to fix diff --git a/examples/source/async-source/README.md b/examples/source/async-source/README.md new file mode 100644 index 00000000..a0704d4d --- /dev/null +++ b/examples/source/async-source/README.md @@ -0,0 +1,6 @@ +# Example Python User Defined Source +A simple example of a user-defined source. The source maintains an array of messages and implements the Read, +Ack, and Pending methods. +The Read(x) method returns the next x number of messages in the array. +The Ack() method acknowledges the last batch of messages returned by Read(). +The Pending() method returns 0 to indicate that the simple source always has 0 pending messages. \ No newline at end of file diff --git a/examples/source/async-source/entry.sh b/examples/source/async-source/entry.sh new file mode 100644 index 00000000..073b05e3 --- /dev/null +++ b/examples/source/async-source/entry.sh @@ -0,0 +1,4 @@ +#!/bin/sh +set -eux + +python example.py diff --git a/examples/source/async-source/example.py b/examples/source/async-source/example.py new file mode 100644 index 00000000..1cde9e6e --- /dev/null +++ b/examples/source/async-source/example.py @@ -0,0 +1,69 @@ +from datetime import datetime +from collections.abc import AsyncIterable + +import aiorun + +from pynumaflow.sourcer import ( + ReadRequest, + Message, + AckRequest, + PendingResponse, + Offset, + AsyncSourcer, +) + + +class AsyncSource: + """ + AsyncSource is a class for User Defined Source implementation. + """ + + def __init__(self): + """ + to_ack_set: Set to maintain a track of the offsets yet to be acknowledged + read_idx : the offset idx till where the messages have been read + """ + self.to_ack_set = set() + self.read_idx = 0 + + async def read_handler(self, datum: ReadRequest) -> AsyncIterable[Message]: + """ + read_handler is used to read the data from the source and send the data forward + for each read request we process num_records and increment the read_idx to indicate that + the message has been read and the same is added to the ack set + """ + if self.to_ack_set: + return + + for x in range(datum.num_records): + yield Message( + payload=str(self.read_idx).encode(), + offset=Offset(offset=str(self.read_idx).encode(), partition_id="0"), + event_time=datetime.now(), + ) + self.to_ack_set.add(str(self.read_idx)) + self.read_idx += 1 + + async def ack_handler(self, ack_request: AckRequest): + """ + The ack handler is used acknowledge the offsets that have been read, and remove them + from the to_ack_set + """ + for offset in ack_request.offset: + self.to_ack_set.remove(str(offset.offset, "utf-8")) + + async def pending_handler(self) -> PendingResponse: + """ + The simple source always returns zero to indicate there is no pending record. + """ + return PendingResponse(count=0) + + +if __name__ == "__main__": + ud_source = AsyncSource() + grpc_server = AsyncSourcer( + read_handler=ud_source.read_handler, + ack_handler=ud_source.ack_handler, + pending_handler=ud_source.pending_handler, + ) + aiorun.run(grpc_server.start()) diff --git a/examples/source/async-source/pipeline-numaflow.yaml b/examples/source/async-source/pipeline-numaflow.yaml new file mode 100644 index 00000000..b828712f --- /dev/null +++ b/examples/source/async-source/pipeline-numaflow.yaml @@ -0,0 +1,21 @@ +apiVersion: numaflow.numaproj.io/v1alpha1 +kind: Pipeline +metadata: + name: async-source +spec: + vertices: + - name: in + source: + udsource: + container: + # A simple user-defined async source + image: quay.io/numaio/numaflow-python/async-source:v0.5.4 + imagePullPolicy: Always + limits: + readBatchSize: 2 + - name: out + sink: + log: {} + edges: + - from: in + to: out \ No newline at end of file diff --git a/examples/source/async-source/pyproject.toml b/examples/source/async-source/pyproject.toml new file mode 100644 index 00000000..2f0955ff --- /dev/null +++ b/examples/source/async-source/pyproject.toml @@ -0,0 +1,17 @@ +[tool.poetry] +name = "simple-source" +version = "0.2.4" +description = "" +authors = ["Numaflow developers"] + +[tool.poetry.dependencies] +python = "~3.10" +pynumaflow = "~0.5.4" +aiorun = "^2023.7" + + +[tool.poetry.dev-dependencies] + +[build-system] +requires = ["poetry-core>=1.0.0"] +build-backend = "poetry.core.masonry.api" diff --git a/pynumaflow/sourcer/async_server.py b/pynumaflow/sourcer/async_server.py index ded1cf57..72bf0e7e 100644 --- a/pynumaflow/sourcer/async_server.py +++ b/pynumaflow/sourcer/async_server.py @@ -137,7 +137,7 @@ async def AckFn( for offset in request.request.offsets: offsets.append(Offset(offset.offset, offset.partition_id)) try: - await self.__invoke_ack(ack_req=request) + await self.__invoke_ack(ack_req=offsets) except Exception as e: context.set_code(grpc.StatusCode.UNKNOWN) context.set_details(str(e)) @@ -145,12 +145,12 @@ async def AckFn( return source_pb2.AckResponse() - async def __invoke_ack(self, ack_req: AckRequest): + async def __invoke_ack(self, ack_req: list[Offset]): """ Invokes the Source Ack Function. """ try: - await self.__source_ack_handler(ack_req) + await self.__source_ack_handler(AckRequest(offsets=ack_req)) except Exception as err: _LOGGER.critical("UDFError, re-raising the error", exc_info=True) raise err @@ -182,7 +182,11 @@ async def PendingFn( async def __serve_async(self, server) -> None: source_pb2_grpc.add_SourceServicer_to_server( - AsyncSourcer(read_handler=self.__source_read_handler), + AsyncSourcer( + read_handler=self.__source_read_handler, + ack_handler=self.__source_ack_handler, + pending_handler=self.__source_pending_handler, + ), server, ) server.add_insecure_port(self.sock_path) diff --git a/pyproject.toml b/pyproject.toml index adf78afe..a429905d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "pynumaflow" -version = "0.5.3" +version = "0.5.4" description = "Provides the interfaces of writing Python User Defined Functions and Sinks for NumaFlow." authors = ["NumaFlow Developers"] readme = "README.md"