Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improvements #37

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
os:
- linux

language: python
cache: pip

python:
- '2.7.13'
- '3.6.1'

install:
pip install -r requirements.txt

script:
python setup.py test
39 changes: 38 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
# pymarketstore
Python driver for MarketStore

Build Status: ![build status](https://circleci.com/gh/alpacahq/pymarketstore/tree/master.png?971fa5b1079e8af0568db6caf772132c54f04dc2)
Build Status:
![circleci.com](https://circleci.com/gh/alpacahq/pymarketstore/tree/master.png?971fa5b1079e8af0568db6caf772132c54f04dc2)
[![travis-ci.org](https://travis-ci.org/alpacahq/pymarketstore.svg)](https://travis-ci.org/alpacahq/pymarketstore)

Pymarketstore can query and write financial timeseries data from [MarketStore](https://github.com/alpacahq/marketstore)

Expand Down Expand Up @@ -68,6 +70,35 @@ Construct a client object with endpoint.

## Query

`pymkts.Client#create(tbk, datashapes, schema='Symbol/Timeframe/AttributeGroup', row_type="fixed")`

You can create a new time bucket and build the datashapes using `pymkts.DataShapes`.

```python
from pymarketstore import Client, DataShape, DataShapes

cli = Client()

o = DataShape(name='Open', typ='float64')
h = DataShape(name='High', typ='float64')
l = DataShape(name='Low', typ='float64')
c = DataShape(name='Close', typ='float64')
v = DataShape(name='Volume', typ='int64')
e = DataShape(name='Epoch', typ='int64')

shapes = DataShapes()
shapes.add(o)
shapes.add(h)
shapes.add(l)
shapes.add(c)
shapes.add(v)
shapes.add(e)

cli.create('TSLA/15Min/OHLCV', shapes)
```

## Query

`pymkts.Client#query(symbols, timeframe, attrgroup, start=None, end=None, limit=None, limit_from_start=False)`

You can build parameters using `pymkts.Params`.
Expand All @@ -82,6 +113,12 @@ You can build parameters using `pymkts.Params`.

Pass one or multiple instances of `Params` to `Client.query()`. It will return `QueryReply` object which holds internal numpy array data returned from the server.

## Sql

`pymkts.Client#sql(statement)`

You can query with raw SQL statements.

## Write

`pymkts.Client#write(data, tbk)`
Expand Down
27 changes: 27 additions & 0 deletions examples/create_write.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import time
import numpy as np
from pymarketstore import Client, DataShape, DataShapes

cli = Client()

o = DataShape(name='Open', typ='float64')
h = DataShape(name='High', typ='float64')
l = DataShape(name='Low', typ='float64')
c = DataShape(name='Close', typ='float64')
v = DataShape(name='Volume', typ='int64')
e = DataShape(name='Epoch', typ='int64')

shapes = DataShapes()
shapes.add(o)
shapes.add(h)
shapes.add(l)
shapes.add(c)
shapes.add(v)
shapes.add(e)

cli.create('TSLA/15Min/OHLCV', shapes)

data = np.array([(int(time.time()), 0, 1, 1, 1, 1)],
dtype=[('Epoch', 'i8'), ('Open', 'f8'), ('Close', 'f8'),
('High', 'f8'), ('Low', 'f8'), ('Volume', 'i8')])
cli.write(data, 'TSLA/15Min/OHLCV')
2 changes: 1 addition & 1 deletion pymarketstore/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from .client import Client, Params # noqa
from .client import Client, Params, DataShape, DataShapes # noqa

# alias
Param = Params # noqa
Expand Down
87 changes: 78 additions & 9 deletions pymarketstore/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,19 @@ def get_timestamp(value):

class Params(object):

def __init__(self, symbols, timeframe, attrgroup,
start=None, end=None,
limit=None, limit_from_start=None):
if not isiterable(symbols):
def __init__(self, symbols=None, timeframe=None, attrgroup=None,
start=None, end=None, limit=None, limit_from_start=None,
columns=None, is_sqlstatement=None, sql_statement=None):
if symbols and not isiterable(symbols):
symbols = [symbols]
self.tbk = ','.join(symbols) + "/" + timeframe + "/" + attrgroup
if symbols and timeframe and attrgroup:
self.tbk = ','.join(symbols) + "/" + timeframe + "/" + attrgroup
else:
self.tbk = None
self.key_category = None # server default
self.is_sqlstatement = is_sqlstatement
self.sql_statement = sql_statement
self.columns = columns
self.start = get_timestamp(start)
self.end = get_timestamp(end)
self.limit = limit
Expand All @@ -72,31 +78,80 @@ def __repr__(self):
return 'Params({})'.format(content)


class DataShape(object):
def __init__(self, name, typ):
self.name = name
self.typ = typ


class DataShapes(object):
def __init__(self):
self.shapes = dict()

def add(self, shape):
"""
:type shape: DataShape
"""
self.shapes.setdefault(shape.typ, set())
self.shapes[shape.typ].add(shape.name)

def __str__(self):
rtn_strs = []
return ':'.join(['%s/%s' % (','.join(sorted(self.shapes[typ])), typ)
for typ in sorted(self.shapes)])


class Client(object):

def __init__(self, endpoint='http://localhost:5993/rpc'):
def __init__(self, endpoint='http://localhost:5993/rpc', codec='msgpack'):
self.endpoint = endpoint
rpc_client = get_rpc_client('msgpack')
self.codec = codec
rpc_client = get_rpc_client(codec)
self.rpc = rpc_client(self.endpoint)

def _request(self, method, **query):
try:
resp = self.rpc.call(method, **query)
resp.raise_for_status()
rpc_reply = self.rpc.codec.loads(resp.content, encoding='utf-8')
rpc_reply = self.rpc.codec.loads(resp.content)
return self.rpc.response(rpc_reply)
except requests.exceptions.HTTPError as exc:
logger.exception(exc)
raise

def create(self,
tbk,
datashapes,
schema='Symbol/Timeframe/AttributeGroup',
row_type="fixed"):
"""
:type datashapes: DataShapes
"""
tbk = '%s:%s' % (tbk, schema)
request = {
'Key': tbk,
'DataShapes': str(datashapes),
'RowType': row_type
}
requests = {'Requests': [request]}
return self._request('DataService.Create', **requests)

def query(self, params):
if not isiterable(params):
params = [params]
query = self.build_query(params)
reply = self._request('DataService.Query', **query)
return QueryReply(reply)

def sql(self, statement):
params = Params(is_sqlstatement=True, sql_statement=statement)
return self.query(params)

def write(self, recarray, tbk, isvariablelength=False):
if self.codec != 'msgpack':
print("Write action only support for msgpack codec")
return

data = {}
data['types'] = [
recarray.dtype[name].str.replace('<', '')
Expand All @@ -121,7 +176,7 @@ def write(self, recarray, tbk, isvariablelength=False):
except requests.exceptions.ConnectionError:
raise requests.exceptions.ConnectionError(
"Could not contact server")
reply_obj = self.rpc.codec.loads(reply.content, encoding='utf-8')
reply_obj = self.rpc.codec.loads(reply.content)
resp = self.rpc.response(reply_obj)
return resp

Expand All @@ -145,6 +200,12 @@ def build_query(self, params):
req['limit_from_start'] = bool(param.limit_from_start)
if param.functions is not None:
req['functions'] = param.functions
if param.columns is not None:
req['columns'] = param.columns
if param.is_sqlstatement is not None:
req['is_sqlstatement'] = param.is_sqlstatement
if param.sql_statement is not None:
req['sql_statement'] = param.sql_statement
reqs.append(req)
return {
'requests': reqs,
Expand Down Expand Up @@ -177,3 +238,11 @@ def stream(self):

def __repr__(self):
return 'Client("{}")'.format(self.endpoint)


if __name__ == '__main__':
param = Params('BTC', '1Min', 'OHLCV', limit=10)
cli = Client()
reply = cli.query(param)
print(reply)

26 changes: 26 additions & 0 deletions tests/test_client.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import pymarketstore as pymkts
from pymarketstore import DataShape, DataShapes
from pymarketstore import jsonrpc
import numpy as np
try:
Expand All @@ -21,6 +22,31 @@ def test_client_init():
assert isinstance(c.rpc, pymkts.client.MsgpackRpcClient)


@patch('pymarketstore.client.MsgpackRpcClient')
def test_create(MsgpackRpcClient):
cli = pymkts.Client()

o = DataShape(name='Open', typ='float64')
h = DataShape(name='High', typ='float64')
l = DataShape(name='Low', typ='float64')
c = DataShape(name='Close', typ='float64')
v = DataShape(name='Volume', typ='int64')
e = DataShape(name='Epoch', typ='int64')

shapes = DataShapes()
shapes.add(o)
shapes.add(h)
shapes.add(l)
shapes.add(c)
shapes.add(v)
shapes.add(e)

assert str(shapes) == 'Close,High,Low,Open/float64:Epoch,Volume/int64'

cli.create('TSLA/15Min/OHLCV', shapes)
assert MsgpackRpcClient().call.called == 1


@patch('pymarketstore.client.MsgpackRpcClient')
def test_query(MsgpackRpcClient):
c = pymkts.Client()
Expand Down