This repository has been archived by the owner on Jan 12, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathapp.py
72 lines (55 loc) · 2.07 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
import websocket
import os
import json
import logging
from kafka import KafkaProducer, producer, KafkaClient
from kafka.admin import KafkaAdminClient, NewTopic
# Configure logging via basicConfig; NOTSET is the lowest level constant, so
# records are not filtered out by level here.
logging.basicConfig(level=logging.NOTSET)
# Named logger for this script.
logger = logging.getLogger("stream-app")
# Kafka topic that on_message() publishes to and setup_kafka_topic() creates.
topic = 'fin-stream-topic'
# Broker address handed to every Kafka client constructed in this file.
bootstrap_server = 'localhost:9092'
def setup_kafka_topic():
    """Create the Kafka topic named by ``topic`` if the cluster does not list it.

    Cluster metadata is fetched through a ``KafkaClient``. When ``topic`` is
    missing it is created through a ``KafkaAdminClient`` with one partition and
    a replication factor of one. Both clients are closed before returning, so
    no broker connections are left open.

    Returns:
        None
    """
    # Function-scope import: only this function needs the error type, and the
    # kafka package is already a dependency of this file.
    from kafka.errors import TopicAlreadyExistsError

    client = KafkaClient(bootstrap_servers=bootstrap_server)
    try:
        # Request a metadata refresh and poll until that request completes,
        # so the topic list read below reflects the broker's answer.
        future = client.cluster.request_update()
        client.poll(future=future)
        existing_topics = client.cluster.topics()
    finally:
        client.close()

    logger.info("Existing topics: %s", existing_topics)

    # Nothing to do when the topic is already there.
    if topic in existing_topics:
        return

    admin_client = KafkaAdminClient(
        bootstrap_servers=bootstrap_server,
        api_version=(3, 0, 0),
        client_id='client'
    )
    try:
        topic_list = [NewTopic(name=topic, num_partitions=1, replication_factor=1)]
        admin_client.create_topics(new_topics=topic_list, validate_only=False)
    except TopicAlreadyExistsError:
        # The topic appeared between the metadata check and the create call
        # (e.g. another instance created it); that is the desired end state.
        logger.info("Topic %s already exists", topic)
    finally:
        admin_client.close()
def setup_kafka_producers():
    """Build the Kafka producer used to publish stream messages.

    Values handed to the producer are serialized by dumping them to JSON text
    and encoding that text as UTF-8.

    Returns:
        A ``KafkaProducer`` connected to ``bootstrap_server``.
    """
    def _to_json_bytes(payload):
        # JSON-encode the value, then turn the text into UTF-8 bytes.
        return json.dumps(payload).encode('utf-8')

    return KafkaProducer(
        bootstrap_servers=[bootstrap_server],
        api_version=(3, 0, 0),
        value_serializer=_to_json_bytes,
    )
def on_message(ws, message):
    """Forward one websocket message to the Kafka topic and log it.

    Args:
        ws: The websocket app that delivered the message (unused).
        message: The payload received from the websocket.

    Returns:
        None
    """
    def _log_send_failure(exc):
        # Runs only if the asynchronous send fails.
        logger.error("Failed to deliver message to TOPIC: %s (%s)", topic, exc)

    # NOTE(review): message is presumably already JSON text; the producer's
    # value_serializer applies json.dumps again, so the record value would be
    # a JSON-encoded string rather than an object -- confirm consumers expect
    # that before changing it.
    future = producer.send(topic, message)
    # send() returns a future; without an errback a failed delivery would go
    # unnoticed because nothing else inspects the result.
    future.add_errback(_log_send_failure)

    # Lazy %-formatting: the text is only built if the record is emitted.
    logger.info('MSG: %s sent to TOPIC: %s', message, topic)
def on_error(ws, error):
    """Print an error reported by the websocket app.

    Args:
        ws: The websocket app that reported the error (unused).
        error: The error object or message to display.
    """
    text = str(error)
    print(text)
def on_close(ws, close_status_code=None, close_msg=None):
    """Report that the websocket connection has closed.

    Current websocket-client releases invoke this callback with the close
    status code and close message in addition to the app itself; a
    one-argument callback then fails with ``TypeError``. Both extra
    parameters are optional so a caller passing only ``ws`` still works.

    Args:
        ws: The websocket app whose connection closed (unused).
        close_status_code: Close status code, if the caller supplies one.
        close_msg: Close message, if the caller supplies one.
    """
    print("### closed ###")
def on_open(ws):
    """Send the subscribe requests once the websocket connection is open.

    Args:
        ws: The websocket app; each request is passed to its ``send`` method.
    """
    subscribe_requests = (
        '{"type":"subscribe","symbol":"AAPL"}',
        '{"type":"subscribe","symbol":"AMZN"}',
        '{"type":"subscribe","symbol":"BINANCE:BTCUSDT"}',
        '{"type":"subscribe","symbol":"IC MARKETS:1"}',
    )
    # Requests go out one at a time, in the order listed above.
    for request in subscribe_requests:
        ws.send(request)
if __name__ == "__main__":
    # Make sure the destination topic exists before anything is produced.
    setup_kafka_topic()
    # Bound at module level on purpose: on_message() looks up ``producer``
    # as a global.
    producer = setup_kafka_producers()
    try:
        # Run the websocket app; the token is read from the environment and
        # a missing FINNHUB_API_TOKEN raises KeyError here.
        websocket.enableTrace(True)
        ws = websocket.WebSocketApp(
            "wss://ws.finnhub.io?token=" + os.environ['FINNHUB_API_TOKEN'],
            on_open=on_open,
            on_message=on_message,
            on_error=on_error,
            on_close=on_close,
        )
        ws.run_forever()
    finally:
        # Close the producer on every exit path so records still buffered
        # get a chance to be delivered and its connections are released.
        producer.close()