-
Notifications
You must be signed in to change notification settings - Fork 12
/
main.py
executable file
·89 lines (75 loc) · 2.29 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
#!/usr/bin/env python3
from typing import Callable
import sentry_sdk
from cryptofeed import FeedHandler
from cryptofeed.defines import TRADES
from decouple import config
from quant_tick.exchanges import (
Binance,
Bitfinex,
Bitflyer,
Bitmex,
# Bybit,
Coinbase,
Upbit,
)
from quant_tick.trades import (
CandleCallback,
NonSequentialIntegerTradeCallback,
SequentialIntegerTradeCallback,
SignificantTradeCallback,
TradeCallback,
)
sequential_integer_exchanges = {Binance: ["BTCUSDT"], Coinbase: ["BTC-USD"]}
non_sequential_integer_exchanges = {Bitfinex: ["BTCUSD"]}
other_exchanges = {
Bitmex: ["XBTUSD"],
# Bybit: ["BTCUSD"],
Bitflyer: ["BTC/JPY"],
Upbit: ["BTC-KRW"],
}
async def candles(candle: dict, timestamp: float) -> None:
"""Candles."""
print(candle)
def get_callback(exchange: any, significant_trade_filter: int = 1_000) -> Callable:
"""Get callback."""
if exchange == Bitflyer:
significant_trade_filter *= 100
elif exchange == Upbit:
significant_trade_filter *= 1000
return SignificantTradeCallback(
CandleCallback(candles, window_seconds=60),
window_seconds=60,
significant_trade_filter=significant_trade_filter,
)
if __name__ == "__main__":
sentry_sdk.init(config("SENTRY_DSN"), traces_sample_rate=1.0)
fh = FeedHandler()
for exchange, symbols in sequential_integer_exchanges.items():
callback = SequentialIntegerTradeCallback(get_callback(exchange))
fh.add_feed(
exchange(
symbols=symbols,
channels=[TRADES],
callbacks={TRADES: callback},
)
)
for exchange, symbols in non_sequential_integer_exchanges.items():
callback = NonSequentialIntegerTradeCallback(get_callback(exchange))
fh.add_feed(
exchange(
symbols=symbols,
channels=[TRADES],
callbacks={TRADES: callback},
)
)
for exchange, symbols in other_exchanges.items():
callback = TradeCallback(get_callback(exchange))
fh.add_feed(
exchange(
symbols=symbols,
channels=[TRADES],
callbacks={TRADES: callback},
)
)
fh.run()