diff --git a/cryptofeed/connection.py b/cryptofeed/connection.py index a91df3fde..cb00f2e52 100644 --- a/cryptofeed/connection.py +++ b/cryptofeed/connection.py @@ -6,6 +6,7 @@ ''' import asyncio from contextlib import asynccontextmanager +import logging import time from typing import Callable, Union, List import uuid @@ -14,6 +15,9 @@ import websockets +LOG = logging.getLogger('feedhandler') + + class AsyncConnection: def __init__(self, address: Union[str, List[str]], identifier: str, delay: float = 1.0, sleep: float = 0.0, **kwargs): """ @@ -58,6 +62,7 @@ async def connect(self): if self.conn_type == "ws": if self.raw_cb: await self.raw_cb(None, time.time(), self.uuid, connect=self.address) + LOG.debug("Connecting (websocket) to %s", self.address) self.conn = await websockets.connect(self.address, **self.kwargs) try: diff --git a/cryptofeed/exchange/binance.py b/cryptofeed/exchange/binance.py index ac01c5981..115d0748d 100644 --- a/cryptofeed/exchange/binance.py +++ b/cryptofeed/exchange/binance.py @@ -53,13 +53,22 @@ def _address(self) -> Union[str, Dict]: address = self.ws_endpoint + '/stream?streams=' for chan in self.channels if not self.subscription else self.subscription: - if normalize_channel(self.id, chan) == OPEN_INTEREST: + normalized_chan = normalize_channel(self.id, chan) + + if normalize_channel == OPEN_INTEREST: continue for pair in self.symbols if not self.subscription else self.subscription[chan]: - if normalize_channel(self.id, chan) == CANDLES: + if normalized_chan == CANDLES: chan = f"{chan}{self.candle_interval}" - pair = pair.lower() + + # for everything but premium index the symbols need to be lowercase. + if pair.startswith("p"): + if normalized_chan != CANDLES: + raise ValueError("Premium Index Symbols only allowed on Candle data feed") + else: + pair = pair.lower() + stream = f"{pair}@{chan}/" address += stream counter += 1 diff --git a/cryptofeed/exchange/binance_futures.py b/cryptofeed/exchange/binance_futures.py index 8d2258c63..9f7c0cd6f 100644 --- a/cryptofeed/exchange/binance_futures.py +++ b/cryptofeed/exchange/binance_futures.py @@ -104,5 +104,7 @@ async def message_handler(self, msg: str, conn: AsyncConnection, timestamp: floa await self._liquidations(msg, timestamp) elif msg_type == 'markPriceUpdate': await self._funding(msg, timestamp) + elif msg['e'] == 'kline': + await self._candle(msg, timestamp) else: LOG.warning("%s: Unexpected message received: %s", self.id, msg) diff --git a/cryptofeed/exchange/poloniex.py b/cryptofeed/exchange/poloniex.py index b4fb1b366..8ab20b085 100644 --- a/cryptofeed/exchange/poloniex.py +++ b/cryptofeed/exchange/poloniex.py @@ -74,9 +74,17 @@ def __reset(self): self.seq_no = {} async def _ticker(self, msg: dict, timestamp: float): - # currencyPair, last, lowestAsk, highestBid, percentChange, baseVolume, - # quoteVolume, isFrozen, 24hrHigh, 24hrLow - pair_id, _, ask, bid, _, _, _, _, _, _ = msg + """ + Format: + + currencyPair, last, lowestAsk, highestBid, percentChange, baseVolume, + quoteVolume, isFrozen, 24hrHigh, 24hrLow, postOnly, maintenance mode + + The postOnly field indicates that new orders posted to the market must be non-matching orders (no taker orders). + Any orders that would match will be rejected. Maintenance mode indicates that maintenace is being performed + and orders will be rejected + """ + pair_id, _, ask, bid, _, _, _, _, _, _, _, _ = msg if pair_id not in self.pair_mapping: # Ignore new trading pairs that are added during long running sessions return diff --git a/cryptofeed/standards.py b/cryptofeed/standards.py index bf06e8f0e..f3aa9ef6b 100755 --- a/cryptofeed/standards.py +++ b/cryptofeed/standards.py @@ -256,7 +256,8 @@ def timestamp_normalize(exchange, ts): OKEX: ORDER_INFO }, CANDLES: { - BINANCE: 'kline_' + BINANCE: 'kline_', + BINANCE_FUTURES: 'kline_', } } @@ -335,7 +336,7 @@ def normalize_channel(exchange: str, feed: str) -> str: if exchange in entries: if entries[exchange] == feed: return chan - return None + raise ValueError('Unable to normalize channel %s', feed) def is_authenticated_channel(channel: str) -> bool: diff --git a/cryptofeed/symbols.py b/cryptofeed/symbols.py index 62aacadcc..b62298d1d 100644 --- a/cryptofeed/symbols.py +++ b/cryptofeed/symbols.py @@ -94,7 +94,14 @@ def binance_us_symbols() -> Dict[str, str]: def binance_futures_symbols() -> Dict[str, str]: - return _binance_symbols('https://fapi.binance.com/fapi/v1/exchangeInfo', BINANCE_FUTURES) + base = _binance_symbols('https://fapi.binance.com/fapi/v1/exchangeInfo', BINANCE_FUTURES) + add = {} + for symbol, orig in base.items(): + if "_" in orig: + continue + add[f"{symbol}-PINDEX"] = f"p{orig}" + base.update(add) + return base def binance_delivery_symbols() -> Dict[str, str]: diff --git a/examples/demo.py b/examples/demo.py index 0608657b6..4b28b20b8 100644 --- a/examples/demo.py +++ b/examples/demo.py @@ -8,7 +8,7 @@ from cryptofeed.symbols import binance_symbols from cryptofeed import FeedHandler -from cryptofeed.callback import CandleCallback, BookCallback, FundingCallback, TickerCallback, TradeCallback, FuturesIndexCallback, OpenInterestCallback +from cryptofeed.callback import BookCallback, FundingCallback, TickerCallback, TradeCallback, FuturesIndexCallback, OpenInterestCallback from cryptofeed.defines import CANDLES, BID, ASK, BLOCKCHAIN, COINBASE, FUNDING, GEMINI, L2_BOOK, L3_BOOK, OPEN_INTEREST, TICKER, TRADES, VOLUME, FUTURES_INDEX, BOOK_DELTA from cryptofeed.exchanges import (FTX, Binance, BinanceFutures, Bitfinex, Bitflyer, Bitmax, Bitmex, Bitstamp, Bittrex, Coinbase, Gateio, HitBTC, Huobi, HuobiDM, HuobiSwap, Kraken, OKCoin, OKEx, Poloniex, Bybit) @@ -104,7 +104,7 @@ def main(): f.add_feed(Bitmax(symbols=['XRP-USDT', 'BTC-USDT'], channels=[L2_BOOK], callbacks={TRADES: trade, L2_BOOK: book})) f.add_feed(Bitflyer(symbols=['BTC-JPY'], channels=[L2_BOOK, TRADES, TICKER], callbacks={L2_BOOK: book, BOOK_DELTA: delta, TICKER: ticker, TRADES: trade})) f.add_feed(BinanceFutures(symbols=['BTC-USDT'], channels=[TICKER], callbacks={TICKER: ticker})) - f.add_feed(Binance(candle_closed_only=True, symbols=['BTC-USDT'], channels=[CANDLES], callbacks={CANDLES: CandleCallback(candle_callback)})) + f.add_feed(BinanceFutures(subscription={TRADES: ['BTC-USDT'], CANDLES: ['BTC-USDT', 'BTC-USDT-PINDEX']}, callbacks={CANDLES: candle_callback, TRADES: trade})) f.run()