Skip to content

Commit

Permalink
Clean up example script (#747)
Browse files Browse the repository at this point in the history
  • Loading branch information
kerinin authored Sep 11, 2023
1 parent a63c372 commit 14f7837
Showing 1 changed file with 14 additions and 12 deletions.
26 changes: 14 additions & 12 deletions examples/event-api/server.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#!/usr/bin/env python

import json, asyncio, time, uuid, asyncio
import pyarrow as pa
import kaskada as kd
from aiohttp import web

Expand All @@ -12,7 +13,12 @@ async def main():

# Initialize event source with schema from historical data.
events = kd.sources.PyDict(
rows = [{"ts": start, "user": "user_1", "request_id": "12345678-1234-5678-1234-567812345678"}],
rows = [],
schema = pa.schema([
pa.field("ts", pa.float64()),
pa.field("user", pa.string()),
pa.field("request_id", pa.string()),
]),
time_column = "ts",
key_column = "user",
time_unit = "s",
Expand All @@ -29,6 +35,7 @@ async def main():
"ts": events.col("ts"),
}))


# Receive JSON messages in real-time
async def handle_http(req: web.Request) -> web.Response:
data = await req.json()
Expand All @@ -37,20 +44,18 @@ async def handle_http(req: web.Request) -> web.Response:
data["ts"] = time.time()

# Create a future so the aggregated result can be returned in the API response
request_id = str(uuid.uuid4())
requestmap[request_id] = asyncio.Future()
request_id = str(uuid.uuid4())
fut = asyncio.Future()
requestmap[request_id] = fut
data["request_id"] = request_id

# Send the event to Kaskada to be processed as a stream
print(f"Got data: {data}")
events.add_rows(data)

# Wait for the response to be completed by the Kaskada handler
print(f"Waiting for response")
resp = await requestmap[request_id]
resp = await fut

# Return result as the response body
print(f"Sending response: {resp}")
return web.Response(text = json.dumps(resp))

# Setup the async web server
Expand All @@ -63,15 +68,12 @@ async def handle_http(req: web.Request) -> web.Response:


# Handle each conversation as it occurs
print(f"Waiting for events...")
async for row in output.run(materialize=True).iter_rows_async():
async for row in output.run_iter(kind='row', mode='live'):
try:
# Ignore historical rows
if row["ts"] <= start:
continue

print(f"Recieved from K*: {row}")

request_id = row["request_id"]
fut = requestmap.pop(request_id, None)
if fut == None:
Expand All @@ -81,7 +83,7 @@ async def handle_http(req: web.Request) -> web.Response:
fut.set_result(row["response"])

except Exception as e:
print(f"Failed to handle live event from Kaskada: {e}")
print(f"Failed to handle live event from Kaskada: {e}")

# Wait for web server to terminate gracefully
await runner.cleanup()
Expand Down

0 comments on commit 14f7837

Please sign in to comment.