Merge pull request #56 from Lercerss/lercerss/extending_sample
[#25, #34] Extending sample to full days
Lercerss authored Oct 26, 2019
2 parents 84d9649 + 8663363 commit 6618db5
Showing 11 changed files with 412 additions and 70 deletions.
4 changes: 2 additions & 2 deletions core/scripts/Dockerfile
@@ -5,7 +5,7 @@ RUN pip install -r requirements.txt
COPY . .

FROM builder as lint
CMD [ "pylint", "--exit-zero", "scripts" ]
CMD [ "pylint", "scripts" ]

FROM builder as test
CMD [ "python", "-m", "unittest" ]
CMD [ "python", "-m", "unittest", "-b" ]
60 changes: 36 additions & 24 deletions core/scripts/importer.py
@@ -1,39 +1,47 @@
import argparse
import csv
from datetime import datetime

import pymongo
from datetime import datetime

from models.message import LobsterMessageParser
from lobster.extender import Extender, weekdays
from models.order_book import OrderBook
from mongo_db.db_connector import save_messages, save_order_book

parser = argparse.ArgumentParser(
description='Import dataset into', formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('file', type=argparse.FileType())
description='Import dataset into mongodb', formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument(
'start_time', type=lambda x: datetime.strptime(x, '%Y-%m-%d %H:%M:%S'))
parser.add_argument('instrument')
'file', help='Path to sample messages file', type=argparse.FileType())
parser.add_argument('start_time', help='Time at which sample starts, e.g. "2012-06-21 00:00:00"',
type=lambda x: datetime.strptime(x, '%Y-%m-%d %H:%M:%S'))
parser.add_argument('instrument', help='Name of instrument for sample')
parser.add_argument('-e', '--extend', nargs=2,
help='Number of days to extend the sample over and the number of times to duplicate messages',
default=[1, 1])

# Determines how many messages will be parsed before sending them to db
MESSAGE_BATCH_SIZE = 200

EOD = 16 * 60 * 60 * 10**9 # 4:00 PM as ns

def load(file, start_time, instrument, cls=LobsterMessageParser):

def load(file, start_time, instrument, extend):
start_timestamp = start_time.timestamp() * 10**9 # in nanoseconds
message_parser = cls(start_timestamp)
order_book = OrderBook(instrument)
try:
last_multiple = 0
interval = 10 * 10**9 # in nanoseconds
reader = csv.reader(file)
interval = 10 * 10**9 # in nanoseconds

extender = Extender(file, start_timestamp, int(extend[1]))
for day in weekdays(start_timestamp, int(extend[0])):
order_book = OrderBook(instrument)
last_multiple = 1
message_buffer = []
for l in reader:
message = message_parser.parse(l)

day_diff = day - start_timestamp
max_time = day + EOD
for message in extender.extend_sample(day_diff):
if message.time > max_time:
break

current_multiple = message.time // interval
if current_multiple > last_multiple:
print(str(order_book))
timestamp = start_timestamp + current_multiple * interval
last_multiple = current_multiple
save_order_book(order_book)
message_buffer.append(message)
@@ -48,13 +56,17 @@ def load(file, start_time, instrument, cls=LobsterMessageParser):
if len(message_buffer) > 0:
save_messages(message_buffer, instrument)

print('best bid volume={}\tbest ask volume={}'.format(sum(
o.qty for o in order_book.bid_book[order_book.bid]), sum(o.qty for o in order_book.ask_book[order_book.ask])))
except Exception as e:
print('Failed to parse file due to: {}'.format(e))
exit(1)
eod_clear = OrderBook(instrument)
eod_clear.last_time = max_time
# save an empty order_book at the end of the day
save_order_book(eod_clear)

print('best bid volume={}\tbest ask volume={}'.format(
sum(o.qty for o in order_book.bid_book[order_book.bid]),
sum(o.qty for o in order_book.ask_book[order_book.ask])
))


if __name__ == '__main__':
args = parser.parse_args()
load(args.file, args.start_time, args.instrument)
load(args.file, args.start_time, args.instrument, args.extend)
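
For context, a minimal usage sketch of the reworked load() and the new -e/--extend option. It assumes the scripts directory from this commit is on the import path, that a LOBSTER-format message CSV is available locally (the file name below is made up), and that save_messages/save_order_book can reach a running MongoDB:

# Hypothetical usage sketch, not part of the commit.
from datetime import datetime

from importer import load

with open('AAPL_message_sample.csv') as sample:
    # Extend the one-day sample over 5 weekdays, duplicating each message twice;
    # roughly: python importer.py AAPL_message_sample.csv "2012-06-21 00:00:00" AAPL -e 5 2
    load(sample, datetime(2012, 6, 21), 'AAPL', extend=[5, 2])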
Empty file.
173 changes: 173 additions & 0 deletions core/scripts/lobster/extender.py
@@ -0,0 +1,173 @@
import csv
import random

from collections import defaultdict, namedtuple
from datetime import datetime, time, timedelta

from models.message import Message, MessageType
from lobster.parser import LobsterMessageParser


Placement = namedtuple('Placement', ['index', 'message'])


def _endless_copies(l):
i = 0
while True:
i += 1
for m in l:
yield i, m.copy()


def _rand_range(left, right):
return random.randrange(left, right) if left < right else left


def _mix_by_index(base, mix):
j = 0
l = len(mix)
placement = mix[j]
for i, b in enumerate(base):
while j < l and placement.index <= i:
yield placement.message
j += 1
placement = mix[j] if j < l else Placement(-1, -1)
yield b


def weekdays(start: float, n):
current = datetime.fromtimestamp(start / 10 ** 9).date()
for _ in range(n):
yield datetime.combine(current, time()).timestamp() * 10 ** 9
offset = 3 if current.weekday() == 4 else 1
current = current + timedelta(days=offset)


def _initial_qty_for_messages(messages):
"""Calculates inital quantity for an order based on the messages that affect it.
If the order has a NEW_ORDER, return its quantity
Otherwise, sum up the executions, cancels and deletes"""
total = 0
for _, m in messages:
if m.message_type == MessageType.NEW_ORDER:
return m.share_quantity
elif m.message_type == MessageType.EXECUTE:
total += m.share_quantity
elif m.message_type == MessageType.DELETE:
return total + m.share_quantity
elif m.message_type == MessageType.MODIFY:
total += m.share_quantity

# Order has no NEW or DELETE, we will generate a delete for the same amount
return total * 2


def _unfilled_qty_for_messages(messages):
"""Calculates the unfilled quantity for an order based on the messages that affect it.
If the order has a DELETE, return 0
Otherwise, subtract cancels and executions from its initial quantity"""
total = 0
for _, m in reversed(messages):
# Reversed so NEW_ORDER appears on the last index
if m.message_type == MessageType.NEW_ORDER:
return m.share_quantity - total
elif m.message_type == MessageType.EXECUTE:
total += m.share_quantity
elif m.message_type == MessageType.DELETE:
return 0
elif m.message_type == MessageType.MODIFY:
total += m.share_quantity

return total


def _backfill(initial_messages, n_duplicates):
"""Backfills for order ids found in the sample, but without a NEW_ORDER message.
Also adds executions/deletions for order ids that are not closed in the sample."""
id_map = defaultdict(list)
for i, m in enumerate(initial_messages):
if m.id == 0:
continue
id_map[m.id].append(Placement(i, m))

id_diff = max(id_map) - min(id_map) + 1
backfilled = []
for id_, placements in id_map.items():
first_place = placements[0]
if first_place.message.message_type != MessageType.NEW_ORDER:
# Order without a NEW_ORDER message
new_index = _rand_range(
max(0, first_place.index - 500), first_place.index)
time_diff = initial_messages[new_index].time - \
initial_messages[max(0, new_index - 1)].time
_time = initial_messages[new_index].time - \
random.random() * time_diff
backfilled.append(
Placement(
new_index,
Message(_time, MessageType.NEW_ORDER, id_, _initial_qty_for_messages(
placements), first_place.message.price, first_place.message.direction)
)
)
unfilled_qty = _unfilled_qty_for_messages(placements)
if unfilled_qty:
# Unfilled order
new_index = _rand_range(0, first_place.index)
time_diff = initial_messages[new_index].time - \
initial_messages[max(0, new_index - 1)].time
_time = initial_messages[new_index].time + \
random.random() * time_diff
backfilled.append(
Placement(
# Subtract from id to apply on order from previous loop
new_index,
Message(_time, MessageType.DELETE,
id_ - id_diff * n_duplicates, unfilled_qty,
first_place.message.price, first_place.message.direction)
)
)

return sorted(backfilled, key=lambda x: (x.index, x.message.time, x.message.id)), id_diff


class Extender:

def __init__(self, file, start_time: float, n_duplicates: int):
self.n_duplicates = n_duplicates
print('Parsing sample set of messages...')
message_parser = LobsterMessageParser(start_time)
self.initial_messages = [
message_parser.parse(l) for l in csv.reader(file)]
print('Found {} messages in sample.\n'.format(
len(self.initial_messages)))

self.time_diff = self.initial_messages[-1].time - \
self.initial_messages[0].time

print('Backfilling for missing messages in sample...')
backfilled, self.id_diff = _backfill(self.initial_messages, n_duplicates)
self.mixed_messages = sorted(_mix_by_index(self.initial_messages, backfilled),
key=lambda x: x.time)
print('Added {} messages to fill holes in sample.\n'.format(len(backfilled)))

def _yield_n_copies(self, m):
yield m

for i in range(1, self.n_duplicates):
m_copy = m.copy()
if m_copy.id != 0:
m_copy.id += i * self.id_diff
yield m_copy

def extend_sample(self, day_diff: int):
for m in self.initial_messages:
m_ = m.copy()
m_.time += day_diff
yield from self._yield_n_copies(m_)

for loop_count, m in _endless_copies(self.mixed_messages):
m.time += day_diff + self.time_diff * loop_count
if m.id != 0:
m.id += self.id_diff * loop_count * self.n_duplicates

yield from self._yield_n_copies(m)
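
A minimal sketch of how weekdays() and Extender.extend_sample() compose, mirroring the loop in importer.load above. Note that extend_sample() keeps yielding shifted copies of the sample indefinitely (via _endless_copies), so the caller must break once the end of the trading day is reached. The file name and dates here are illustrative:

from datetime import datetime

from lobster.extender import Extender, weekdays

START = datetime(2012, 6, 21).timestamp() * 10**9  # sample start in nanoseconds
EOD = 16 * 60 * 60 * 10**9  # 4:00 PM as ns past midnight

with open('AAPL_message_sample.csv') as sample:  # illustrative file name
    extender = Extender(sample, START, n_duplicates=2)

for day in weekdays(START, 3):  # three trading days, weekends skipped
    for message in extender.extend_sample(day - START):
        if message.time > day + EOD:
            break  # the generator is endless; stop at the end of the day
        ...  # e.g. apply to the order book and save to the database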
36 changes: 36 additions & 0 deletions core/scripts/lobster/parser.py
@@ -0,0 +1,36 @@
from typing import Callable, Tuple

from models.message import Message, MessageType

_lobster_msg_types = {
# Maps lobster message types to MessageType enum
'1': MessageType.NEW_ORDER,
'2': MessageType.MODIFY,
'3': MessageType.DELETE,
'4': MessageType.EXECUTE,
'5': MessageType.EXECUTE,
'7': MessageType.IGNORE,
}


class LobsterMessageParser:
"""Parses messages stored in Lobster data format
See LOBSTER_SampleFiles_ReadMe.txt for official documentation
"""

def __init__(self, start_timestamp: float):
self.line_parsers: Tuple[str, Callable] = (
# Each index corresponds to an index in the line.
# This maps indexes to object keys and the corresponding function to parse it from a string
('time', lambda x: float(x) * 10**9 + start_timestamp),
('message_type', lambda x: _lobster_msg_types.get(x, MessageType.IGNORE)),
('id_', int),
('share_quantity', int),
('price', lambda x: float(x) / 10000),
('direction', int)
)

def parse(self, line: Tuple[str, str, str, str, str, str]) -> Message:
kwargs = {key: parse_fun(
line[index]) for index, (key, parse_fun) in enumerate(self.line_parsers)}
return Message(**kwargs)
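
As a quick illustration of the column mapping in line_parsers, here is a hedged example of parsing one LOBSTER message row; the values are made up but follow the LOBSTER column order (time in seconds, offset by the start timestamp; event type; order id; size; price scaled by 10000; direction):

from lobster.parser import LobsterMessageParser

start_ns = 1_340_236_800 * 10**9  # illustrative start-of-day timestamp in ns
message_parser = LobsterMessageParser(start_ns)

row = ('34200.1896', '1', '16113575', '18', '5859000', '1')
msg = message_parser.parse(row)
# msg.message_type is MessageType.NEW_ORDER, msg.price == 585.9,
# msg.time == start_ns + 34200.1896 * 10**9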
42 changes: 5 additions & 37 deletions core/scripts/models/message.py
@@ -1,6 +1,6 @@
import enum

from datetime import datetime
from typing import Callable, Tuple


class MessageType(enum.IntEnum):
@@ -23,40 +23,8 @@ def __init__(self, time, message_type, id_, share_quantity, price, direction):
def __str__(self):
return ('<Message id="{id}" time="{timef}" type="{message_type}" ' +
'price="{price}" qty="{share_quantity}" direction="{direction}">').format(
**self.__dict__, timef=datetime.fromtimestamp(
self.time // 10**9).strftime('%Y-%m-%d %H:%M:%S'))


_lobster_msg_types = {
# Maps lobster message types to MessageType enum
'1': MessageType.NEW_ORDER,
'2': MessageType.MODIFY,
'3': MessageType.DELETE,
'4': MessageType.EXECUTE,
'5': MessageType.EXECUTE,
'7': MessageType.IGNORE,
}


class LobsterMessageParser:
"""Parses messages stored in Lobster data format
See LOBSTER_SampleFiles_ReadMe.txt for official documentation
"""

def __init__(self, start_timestamp: datetime):
self.line_parsers: Tuple[str, Callable] = (
# Each index corresponds to an index in the line.
# This maps indexes to object keys and the
# corresponding function to parse it from a string
('time', lambda x: float(x) * 10**9 + start_timestamp),
('message_type', lambda x: _lobster_msg_types.get(x, MessageType.IGNORE)),
('id_', int),
('share_quantity', int),
('price', lambda x: float(x) / 10000),
('direction', int)
)
**self.__dict__, timef=datetime.fromtimestamp(
self.time // 10**9).strftime('%Y-%m-%d %H:%M:%S'))

def parse(self, line: Tuple[str, str, str, str, str, str]) -> Message:
kwargs = {key: parse_fun(
line[index]) for index, (key, parse_fun) in enumerate(self.line_parsers)}
return Message(**kwargs)
def copy(self):
return Message(self.time, self.message_type, self.id, self.share_quantity, self.price, self.direction)
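
The new Message.copy() gives the extender a way to shift duplicated messages in time and re-key their ids without mutating the parsed originals. A small sketch with illustrative values:

from models.message import Message, MessageType

original = Message(34200 * 10**9, MessageType.NEW_ORDER, 42, 100, 585.9, 1)
shifted = original.copy()
shifted.time += 24 * 60 * 60 * 10**9  # move the copy one day forward
shifted.id += 1000  # re-key the copy; the original is untouched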