Skip to content

Commit

Permalink
switch to new protocol, ticker and level2 channels (#12)
Browse files Browse the repository at this point in the history
* Introduces a flat schema for all messages.
* Subscription message moved to gdax module.
* Fixes bad call to logging module in ws4py implementation.
* Reverted to ws4py 0.3.4 due to [possible bug](Lawouach/WebSocket-for-Python#230) in other recent releases.
  • Loading branch information
blbradley authored Sep 12, 2017
1 parent 1187230 commit 4f6b4c2
Show file tree
Hide file tree
Showing 6 changed files with 118 additions and 16 deletions.
13 changes: 12 additions & 1 deletion gdax.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,16 @@
import json

from myproducer import producer


subscription_message = {
'type': 'subscribe',
'product_ids': ['BTC-USD'],
'channels': ['ticker', 'level2'],
}

def create_raw(dt, data):
raw = {'timestamp': dt.isoformat(), 'producerUUID': producer.uuid.bytes, 'data': data}
data_dict = json.loads(data)
extra = {'timestamp': dt.isoformat(), 'producerUUID': producer.uuid.bytes}
raw = {**extra, **data_dict}
return raw
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@ six==1.10.0
urllib3==1.22
websocket-client==0.44.0
websockets==3.3
ws4py==0.4.2
ws4py==0.3.4
91 changes: 90 additions & 1 deletion websocket-raw.avsc
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,97 @@
"type": "bytes"
},
{
"name": "data",
"name": "type",
"type": "string"
},
{
"name": "channels",
"type": ["null", {
"type": "array",
"items": {
"type": "record",
"name": "channel",
"fields": [
{
"name": "name",
"type": "string"
},
{
"name": "product_ids",
"type": {
"type": "array",
"items": "string"
}
}
]
}
}]
},
{
"name": "trade_id",
"type": ["null", "long"]
},
{
"name": "sequence",
"type": ["null", "long"]
},
{
"name": "time",
"type": ["null", "string"]
},
{
"name": "product_id",
"type": ["null", "string"]
},
{
"name": "price",
"type": ["null", "string"]
},
{
"name": "side",
"type": ["null", "string"]
},
{
"name": "last_size",
"type": ["null", "string"]
},
{
"name": "best_bid",
"type": ["null", "string"]
},
{
"name": "best_ask",
"type": ["null", "string"]
},
{
"name": "bids",
"type": ["null", {
"type": "array",
"items": {
"type": "array",
"items": "string"
}
}]
},
{
"name": "asks",
"type": ["null", {
"type": "array",
"items": {
"type": "array",
"items": "string"
}
}]
},
{
"name": "changes",
"type": ["null", {
"type": "array",
"items": {
"type": "array",
"items": "string"
}
}]
}
]
}
8 changes: 5 additions & 3 deletions websocket_client_example.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
import logging
from datetime import datetime
import json
import websocket

from gdax import create_raw
from gdax import create_raw, subscription_message
from myproducer import producer

logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

def on_message(ws, message):
dt = datetime.utcnow()
logging.debug('received websocket message: {}'.format(message))
value = create_raw(dt, message)
producer.produce(
topic='websocket_client-gdax',
Expand All @@ -23,10 +25,10 @@ def on_error(ws, error):
def on_close(ws):
logging.warning("### closed ###")

msg = """{"type": "subscribe","product_ids":["BTC-USD"]}"""

def on_open(ws):
msg = json.dumps(subscription_message)
ws.send(msg)
logging.debug('sent websocket message: {}'.format(msg))

if __name__ == "__main__":
ws = websocket.WebSocketApp("wss://ws-feed.gdax.com",
Expand Down
6 changes: 3 additions & 3 deletions websockets_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@
from datetime import datetime
import asyncio
import websockets
import json

from gdax import create_raw
from gdax import create_raw, subscription_message
from myproducer import producer

async def handler(websocket):
Expand All @@ -22,8 +23,7 @@ async def handler(websocket):

async def main():
async with websockets.connect('wss://ws-feed.gdax.com') as websocket:
msg = """{"type": "subscribe","product_ids":["BTC-USD"]}"""
await websocket.send(msg)
await websocket.send(json.dumps(subscription_message))
await handler(websocket)

if __name__ == "__main__":
Expand Down
14 changes: 7 additions & 7 deletions ws4py_example.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,27 @@
import logging
import uuid
from datetime import datetime
import json

from ws4py.client.threadedclient import WebSocketClient

from gdax import create_raw
from gdax import create_raw, subscription_message
from myproducer import producer

logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

class DummyClient(WebSocketClient):
def opened(self):
def data_provider():
msg = """{"type": "subscribe","product_ids":["BTC-USD"]}"""
return msg

self.send(data_provider())
msg = json.dumps(subscription_message)
self.send(msg)
logging.debug('sent websocket message: {}'.format(msg))

def closed(self, code, reason=None):
logging.warning("Closed down", code, reason)
logging.warning("Closed down, code {}: {}".format(code, reason))

def received_message(self, m):
dt = datetime.utcnow()
logging.debug('received websocket message: {}'.format(m))
value = create_raw(dt, str(m))
producer.produce(
topic='ws4py-gdax',
Expand Down

0 comments on commit 4f6b4c2

Please sign in to comment.