Skip to content

Commit

Permalink
Ability to log/store raw exchange updates for debugging purposes
Browse files Browse the repository at this point in the history
  • Loading branch information
bmoscon committed Nov 15, 2019
1 parent 54174d4 commit e324b51
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 17 deletions.
3 changes: 2 additions & 1 deletion CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
## Changelog

### 1.1.0
### 1.1.0 (2019-11-14)
* Feature: User enabled logging of exchange messages on error
* Refactor: Overhaul of backends - new base classes and simplified code
* Bugfix: Handle i messages from poloniex more correctly
* Bugfix: Report bittrex errors correctly
* Feature: New exchange: Bitcoin.com
* Feature: New exchange: BinanceUS
* Feature: New exchange: Bitmax
* Feature: Ability to store raw messages from exchanges

### 1.0.1 (2019-09-30)
* Feature: Backfill Bitmex historical trade data from S3 Bucket
Expand Down
39 changes: 24 additions & 15 deletions cryptofeed/feedhandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,21 +58,24 @@


class FeedHandler:
def __init__(self, retries=10, timeout_interval=10, log_messages_on_error=False):
def __init__(self, retries=10, timeout_interval=10, log_messages_on_error=False, raw_message_capture=None):
"""
retries: int
number of times the connection will be retried (in the event of a disconnect or other failure)
timeout_interval: int
number of seconds between checks to see if a feed has timed out
log_messages_on_error: boolean
if true, log the message from the exchange on exceptions
raw_message_capture: callback
if defined, callback to save/process/handle raw message (primarily for debugging purposes)
"""
self.feeds = []
self.retries = retries
self.timeout = {}
self.last_msg = {}
self.timeout_interval = timeout_interval
self.log_messages_on_error = log_messages_on_error
self.raw_message_capture = raw_message_capture

def add_feed(self, feed, timeout=120, **kwargs):
"""
Expand Down Expand Up @@ -205,20 +208,26 @@ async def _connect(self, feed):
raise ExhaustedRetries()

async def _handler(self, websocket, handler, feed_id):
async for message in websocket:
self.last_msg[feed_id] = time()
try:
await handler(message, self.last_msg[feed_id])
except Exception:
if self.log_messages_on_error:
if feed_id in {HUOBI, HUOBI_US, HUOBI_DM}:
message = zlib.decompress(message, 16+zlib.MAX_WBITS)
elif feed_id in {OKCOIN, OKEX}:
message = zlib.decompress(message, -15)
LOG.error("%s: error handling message %s", feed_id, message)
# exception will be logged with traceback when connection handler
# retries the connection
raise
try:
if self.raw_message_capture:
async for message in websocket:
self.last_msg[feed_id] = time()
await self.raw_message_capture(message, self.last_msg[feed_id], feed_id)
await handler(message, self.last_msg[feed_id])
else:
async for message in websocket:
self.last_msg[feed_id] = time()
await handler(message, self.last_msg[feed_id])
except Exception:
if self.log_messages_on_error:
if feed_id in {HUOBI, HUOBI_US, HUOBI_DM}:
message = zlib.decompress(message, 16+zlib.MAX_WBITS)
elif feed_id in {OKCOIN, OKEX}:
message = zlib.decompress(message, -15)
LOG.error("%s: error handling message %s", feed_id, message)
# exception will be logged with traceback when connection handler
# retries the connection
raise

def _do_bitmax_subscribe(self, feed, timeout: int, **kwargs):
"""
Expand Down
41 changes: 41 additions & 0 deletions cryptofeed/util/async_file.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
'''
Copyright (C) 2017-2019 Bryant Moscon - [email protected]
Please see the LICENSE file for the terms and conditions
associated with this software.
'''
import atexit
from collections import defaultdict

import aiofiles
import aiofiles.os


class AsyncFileCallback:
def __init__(self, path, length=10000, rotate=1024 * 1024 * 100):
self.path = path
self.length = length
self.data = defaultdict(list)
self.rotate = rotate
self.count = defaultdict(int)
atexit.register(self.__del__)

def __del__(self):
for uuid in list(self.data.keys()):
with open(f"{self.path}/{uuid}.{self.count[uuid]}", 'a') as fp:
fp.write("\n".join(self.data[uuid]))

async def write(self, uuid):
p = f"{self.path}/{uuid}.{self.count[uuid]}"
async with aiofiles.open(p, mode='a') as fp:
await fp.write("\n".join(self.data[uuid]))
self.data[uuid] = []

stats = await aiofiles.os.stat(p)
if stats.st_size >= self.rotate:
self.count[uuid] += 1

async def __call__(self, data: str, timestamp: float, uuid: str):
self.data[uuid].append(f"{timestamp}: {data}")
if len(self.data[uuid]) >= self.length:
await self.write(uuid)
21 changes: 21 additions & 0 deletions examples/demo_raw_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
'''
Copyright (C) 2017-2019 Bryant Moscon - [email protected]
Please see the LICENSE file for the terms and conditions
associated with this software.
'''
from cryptofeed.util.async_file import AsyncFileCallback
from cryptofeed import FeedHandler
from cryptofeed.exchanges import Coinbase
from cryptofeed.defines import L2_BOOK


def main():
f = FeedHandler(raw_message_capture=AsyncFileCallback('./'))
f.add_feed(Coinbase(pairs=['BTC-USD'], channels=[L2_BOOK]))

f.run()


if __name__ == '__main__':
main()
3 changes: 2 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ def run_tests(self):
"pyyaml",
"aiohttp",
"aiodns",
"cchardet"
"cchardet",
"aiofiles"
],
extras_require={
'redis': ['aioredis'],
Expand Down

0 comments on commit e324b51

Please sign in to comment.