Skip to content

Commit

Permalink
Update binance futures, fix bug in poloniex (#460)
Browse files Browse the repository at this point in the history
* Update binance futures, fix bug in poloniex

* Flake8 additions
  • Loading branch information
bmoscon authored Apr 2, 2021
1 parent 05b66bd commit d1c9fb6
Show file tree
Hide file tree
Showing 7 changed files with 43 additions and 11 deletions.
5 changes: 5 additions & 0 deletions cryptofeed/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
'''
import asyncio
from contextlib import asynccontextmanager
import logging
import time
from typing import Callable, Union, List
import uuid
Expand All @@ -14,6 +15,9 @@
import websockets


LOG = logging.getLogger('feedhandler')


class AsyncConnection:
def __init__(self, address: Union[str, List[str]], identifier: str, delay: float = 1.0, sleep: float = 0.0, **kwargs):
"""
Expand Down Expand Up @@ -58,6 +62,7 @@ async def connect(self):
if self.conn_type == "ws":
if self.raw_cb:
await self.raw_cb(None, time.time(), self.uuid, connect=self.address)
LOG.debug("Connecting (websocket) to %s", self.address)
self.conn = await websockets.connect(self.address, **self.kwargs)

try:
Expand Down
15 changes: 12 additions & 3 deletions cryptofeed/exchange/binance.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,22 @@ def _address(self) -> Union[str, Dict]:
address = self.ws_endpoint + '/stream?streams='

for chan in self.channels if not self.subscription else self.subscription:
if normalize_channel(self.id, chan) == OPEN_INTEREST:
normalized_chan = normalize_channel(self.id, chan)

if normalize_channel == OPEN_INTEREST:
continue

for pair in self.symbols if not self.subscription else self.subscription[chan]:
if normalize_channel(self.id, chan) == CANDLES:
if normalized_chan == CANDLES:
chan = f"{chan}{self.candle_interval}"
pair = pair.lower()

# for everything but premium index the symbols need to be lowercase.
if pair.startswith("p"):
if normalized_chan != CANDLES:
raise ValueError("Premium Index Symbols only allowed on Candle data feed")
else:
pair = pair.lower()

stream = f"{pair}@{chan}/"
address += stream
counter += 1
Expand Down
2 changes: 2 additions & 0 deletions cryptofeed/exchange/binance_futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,5 +104,7 @@ async def message_handler(self, msg: str, conn: AsyncConnection, timestamp: floa
await self._liquidations(msg, timestamp)
elif msg_type == 'markPriceUpdate':
await self._funding(msg, timestamp)
elif msg['e'] == 'kline':
await self._candle(msg, timestamp)
else:
LOG.warning("%s: Unexpected message received: %s", self.id, msg)
14 changes: 11 additions & 3 deletions cryptofeed/exchange/poloniex.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,17 @@ def __reset(self):
self.seq_no = {}

async def _ticker(self, msg: dict, timestamp: float):
# currencyPair, last, lowestAsk, highestBid, percentChange, baseVolume,
# quoteVolume, isFrozen, 24hrHigh, 24hrLow
pair_id, _, ask, bid, _, _, _, _, _, _ = msg
"""
Format:
currencyPair, last, lowestAsk, highestBid, percentChange, baseVolume,
quoteVolume, isFrozen, 24hrHigh, 24hrLow, postOnly, maintenance mode
The postOnly field indicates that new orders posted to the market must be non-matching orders (no taker orders).
Any orders that would match will be rejected. Maintenance mode indicates that maintenace is being performed
and orders will be rejected
"""
pair_id, _, ask, bid, _, _, _, _, _, _, _, _ = msg
if pair_id not in self.pair_mapping:
# Ignore new trading pairs that are added during long running sessions
return
Expand Down
5 changes: 3 additions & 2 deletions cryptofeed/standards.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,8 @@ def timestamp_normalize(exchange, ts):
OKEX: ORDER_INFO
},
CANDLES: {
BINANCE: 'kline_'
BINANCE: 'kline_',
BINANCE_FUTURES: 'kline_',
}
}

Expand Down Expand Up @@ -335,7 +336,7 @@ def normalize_channel(exchange: str, feed: str) -> str:
if exchange in entries:
if entries[exchange] == feed:
return chan
return None
raise ValueError('Unable to normalize channel %s', feed)


def is_authenticated_channel(channel: str) -> bool:
Expand Down
9 changes: 8 additions & 1 deletion cryptofeed/symbols.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,14 @@ def binance_us_symbols() -> Dict[str, str]:


def binance_futures_symbols() -> Dict[str, str]:
return _binance_symbols('https://fapi.binance.com/fapi/v1/exchangeInfo', BINANCE_FUTURES)
base = _binance_symbols('https://fapi.binance.com/fapi/v1/exchangeInfo', BINANCE_FUTURES)
add = {}
for symbol, orig in base.items():
if "_" in orig:
continue
add[f"{symbol}-PINDEX"] = f"p{orig}"
base.update(add)
return base


def binance_delivery_symbols() -> Dict[str, str]:
Expand Down
4 changes: 2 additions & 2 deletions examples/demo.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

from cryptofeed.symbols import binance_symbols
from cryptofeed import FeedHandler
from cryptofeed.callback import CandleCallback, BookCallback, FundingCallback, TickerCallback, TradeCallback, FuturesIndexCallback, OpenInterestCallback
from cryptofeed.callback import BookCallback, FundingCallback, TickerCallback, TradeCallback, FuturesIndexCallback, OpenInterestCallback
from cryptofeed.defines import CANDLES, BID, ASK, BLOCKCHAIN, COINBASE, FUNDING, GEMINI, L2_BOOK, L3_BOOK, OPEN_INTEREST, TICKER, TRADES, VOLUME, FUTURES_INDEX, BOOK_DELTA
from cryptofeed.exchanges import (FTX, Binance, BinanceFutures, Bitfinex, Bitflyer, Bitmax, Bitmex, Bitstamp, Bittrex, Coinbase, Gateio,
HitBTC, Huobi, HuobiDM, HuobiSwap, Kraken, OKCoin, OKEx, Poloniex, Bybit)
Expand Down Expand Up @@ -104,7 +104,7 @@ def main():
f.add_feed(Bitmax(symbols=['XRP-USDT', 'BTC-USDT'], channels=[L2_BOOK], callbacks={TRADES: trade, L2_BOOK: book}))
f.add_feed(Bitflyer(symbols=['BTC-JPY'], channels=[L2_BOOK, TRADES, TICKER], callbacks={L2_BOOK: book, BOOK_DELTA: delta, TICKER: ticker, TRADES: trade}))
f.add_feed(BinanceFutures(symbols=['BTC-USDT'], channels=[TICKER], callbacks={TICKER: ticker}))
f.add_feed(Binance(candle_closed_only=True, symbols=['BTC-USDT'], channels=[CANDLES], callbacks={CANDLES: CandleCallback(candle_callback)}))
f.add_feed(BinanceFutures(subscription={TRADES: ['BTC-USDT'], CANDLES: ['BTC-USDT', 'BTC-USDT-PINDEX']}, callbacks={CANDLES: candle_callback, TRADES: trade}))

f.run()

Expand Down

0 comments on commit d1c9fb6

Please sign in to comment.