From 6a9e8971cf21f3332622454ae04059c20a56eb67 Mon Sep 17 00:00:00 2001 From: Lucas Turpin Date: Mon, 9 Mar 2020 06:42:01 -0400 Subject: [PATCH] [#178] Tests and edge cases - Added tests for all new components in the branch - UpdateOrder mods now hold the order's new quantity instead of the change - Support more cases for modifies/executes - Minor refactoring/renaming + comments --- app/src/constants/Constants.ts | 2 +- .../api/hndlrs/obhandler_test.go | 22 +-- .../api/hndlrs/playbackhandler.go | 37 +++- .../api/hndlrs/playbackhandler_test.go | 169 ++++++++++++++++++ core/graphelier-service/api/routes.go | 2 +- core/graphelier-service/mock_db/mock_mongo.go | 98 +++++----- core/graphelier-service/models/orderbooks.go | 11 +- .../models/orderbooks_test.go | 25 +++ core/graphelier-service/models/playback.go | 66 ++++--- .../models/playback_test.go | 92 ++++++++++ .../utils/test_utils/httpreq.go | 4 +- 11 files changed, 435 insertions(+), 93 deletions(-) create mode 100644 core/graphelier-service/api/hndlrs/playbackhandler_test.go create mode 100644 core/graphelier-service/models/playback_test.go diff --git a/app/src/constants/Constants.ts b/app/src/constants/Constants.ts index 5950bd51..526b4bef 100644 --- a/app/src/constants/Constants.ts +++ b/app/src/constants/Constants.ts @@ -3,7 +3,7 @@ import bigInt from 'big-integer'; export const ENVIRONMENT = 'DEV'; export const APP_NAME = 'Graphelier'; export const BACKEND_PORT = 5050; -let host = '18.218.121.174'; +let host = 'localhost'; if (process.env.NODE_ENV === 'production') { host = '18.218.121.174'; } diff --git a/core/graphelier-service/api/hndlrs/obhandler_test.go b/core/graphelier-service/api/hndlrs/obhandler_test.go index 91fadc41..c274134b 100644 --- a/core/graphelier-service/api/hndlrs/obhandler_test.go +++ b/core/graphelier-service/api/hndlrs/obhandler_test.go @@ -12,7 +12,7 @@ import ( "github.com/stretchr/testify/assert" ) -var ob_messages []*models.Message = []*models.Message{ +var obMessages []*models.Message = []*models.Message{ MakeMsg(DirectionAsk, OrderID(12), SodOffset(1)), MakeMsg(DirectionBid, OrderID(13), SodOffset(2)), MakeMsg(DirectionAsk, OrderID(15), SodOffset(3)), @@ -24,13 +24,13 @@ func TestFetchOrderbookDeltaSuccess(t *testing.T) { mockedDB.EXPECT(). GetSingleMessage("test", int64(1)). - Return(ob_messages[0], nil) + Return(obMessages[0], nil) mockedDB.EXPECT(). GetOrderbook("test", uint64(1)). Return(&models.Orderbook{}, nil) mockedDB.EXPECT(). GetMessagesWithPagination("test", &models.Paginator{NMessages: 3, SodOffset: 0}). - Return(ob_messages, nil) + Return(obMessages, nil) var deltabook models.Orderbook err := MakeRequest( @@ -105,7 +105,7 @@ func TestFetchOrderbookDeltaNegativeNMessages(t *testing.T) { Return(&models.Orderbook{Timestamp: 0, LastSodOffset: 0, Instrument: "test"}, nil) mockedDB.EXPECT(). GetMessagesWithPagination("test", &models.Paginator{NMessages: 5, SodOffset: 0}). - Return(ob_messages, nil) + Return(obMessages, nil) var deltabook models.Orderbook err := MakeRequest( @@ -139,23 +139,23 @@ func TestFetchOrderbookDeltaExecuteHiddenOrder(t *testing.T) { mockedDB := MockDb(t) defer Ctrl.Finish() - test_messages := make([]*models.Message, len(ob_messages)) - copy(test_messages, ob_messages) - test_messages = append( - ob_messages, + testMessages := make([]*models.Message, len(obMessages)) + copy(testMessages, obMessages) + testMessages = append( + obMessages, // Add an execute for hideen order MakeMsg(OrderID(0), SodOffset(4), TypeExecute), ) mockedDB.EXPECT(). GetSingleMessage("test", int64(1)). - Return(ob_messages[0], nil) + Return(obMessages[0], nil) mockedDB.EXPECT(). GetOrderbook("test", uint64(1)). Return(&models.Orderbook{}, nil) mockedDB.EXPECT(). GetMessagesWithPagination("test", &models.Paginator{NMessages: 4, SodOffset: 0}). - Return(test_messages, nil) + Return(testMessages, nil) var deltabook models.Orderbook err := MakeRequest( @@ -184,7 +184,7 @@ func TestFetchOrderbookSuccess(t *testing.T) { Return(&models.Orderbook{Instrument: "test"}, nil) mockedDB.EXPECT(). GetMessagesByTimestamp("test", uint64(1)). - Return(ob_messages, nil) + Return(obMessages, nil) var orderbook models.Orderbook err := MakeRequest( diff --git a/core/graphelier-service/api/hndlrs/playbackhandler.go b/core/graphelier-service/api/hndlrs/playbackhandler.go index e65ef3da..3645d770 100644 --- a/core/graphelier-service/api/hndlrs/playbackhandler.go +++ b/core/graphelier-service/api/hndlrs/playbackhandler.go @@ -3,6 +3,7 @@ package hndlrs import ( "graphelier/core/graphelier-service/db" "graphelier/core/graphelier-service/models" + "net" "net/http" "strconv" "time" @@ -13,8 +14,17 @@ import ( "github.com/gorilla/websocket" ) +// SocketConn : Indirection layer for access to websocket.Conn +type SocketConn interface { + RemoteAddr() net.Addr + WriteJSON(interface{}) error + ReadMessage() (int, []byte, error) + Close() error +} + +// PlaybackSession : Handles lifetime of a playback session with the client over a websocket connection type PlaybackSession struct { - Socket *websocket.Conn + Socket SocketConn Orderbook *models.Orderbook Delay uint64 Loader MessageLoader @@ -23,11 +33,13 @@ type PlaybackSession struct { running bool } +// MessageLoader : Interface for asynchronous retrieval of messages, each call to LoadMessages should be stateful and move the bounds type MessageLoader interface { LoadMessages() Init(*models.Orderbook, chan []*models.Message) } +// TimeIntervalLoader : Loads messages by time intervals type TimeIntervalLoader struct { Instrument string Datastore db.Datastore @@ -36,6 +48,7 @@ type TimeIntervalLoader struct { currentTimestamp uint64 } +// CountIntervalLoader : Loads messages by regular count intervals type CountIntervalLoader struct { Instrument string Datastore db.Datastore @@ -56,7 +69,7 @@ func StreamPlayback(env *Env, w http.ResponseWriter, r *http.Request) error { delay *= 1e9 params := mux.Vars(r) instrument := params["instrument"] - startTimestamp, err := strconv.ParseUint(params["start_timestamp"], 10, 64) + startTimestamp, err := strconv.ParseUint(params["timestamp"], 10, 64) if err != nil { return StatusError{400, err} } @@ -132,14 +145,14 @@ func (pb *PlaybackSession) handleStreaming() error { // receive messages from the client and aborts the session on close func (pb *PlaybackSession) handleSocketControl() { for pb.running { - type_, _, err := pb.Socket.ReadMessage() + sockMsgType, _, err := pb.Socket.ReadMessage() if err != nil { if websocket.IsUnexpectedCloseError(err, websocket.CloseNoStatusReceived, websocket.CloseGoingAway) { log.Errorf("Unexpected error while waiting for input: %v\n", err) } pb.running = false } - switch type_ { + switch sockMsgType { case websocket.CloseMessage: log.Infof("Terminating playback session with %s\n", pb.Socket.RemoteAddr()) pb.running = false @@ -183,8 +196,11 @@ func (pb *PlaybackSession) Close() { // LoadMessages : Reads the next batch of messages from the datastore, by timestamp range func (loader *TimeIntervalLoader) LoadMessages() { - loader.currentTimestamp += loader.Interval - log.Debugf("Loading messages for {%d, %d}\n", loader.currentTimestamp, loader.currentTimestamp+loader.Interval) + log.Debugf( + "Loading messages for {%d, %d}\n", + loader.currentTimestamp, + loader.currentTimestamp+loader.Interval, + ) messages, err := loader.Datastore.GetMessagesByTimestampRange( loader.Instrument, loader.currentTimestamp, @@ -195,6 +211,7 @@ func (loader *TimeIntervalLoader) LoadMessages() { close(loader.messages) return } + loader.currentTimestamp += loader.Interval loader.messages <- messages } @@ -206,8 +223,11 @@ func (loader *TimeIntervalLoader) Init(orderbook *models.Orderbook, messages cha // LoadMessages : Reads the next batch of messages from the datastore, by number of messages func (loader *CountIntervalLoader) LoadMessages() { - loader.currentPage.SodOffset += loader.currentPage.NMessages - log.Debugf("Loading messages for {%d, %d}\n", loader.currentPage.SodOffset, loader.currentPage.SodOffset+loader.currentPage.NMessages) + log.Debugf( + "Loading messages for {%d, %d}\n", + loader.currentPage.SodOffset, + loader.currentPage.SodOffset+loader.currentPage.NMessages, + ) messages, err := loader.Datastore.GetMessagesWithPagination( loader.Instrument, &loader.currentPage, @@ -217,6 +237,7 @@ func (loader *CountIntervalLoader) LoadMessages() { close(loader.messages) return } + loader.currentPage.SodOffset += loader.currentPage.NMessages loader.messages <- messages } diff --git a/core/graphelier-service/api/hndlrs/playbackhandler_test.go b/core/graphelier-service/api/hndlrs/playbackhandler_test.go new file mode 100644 index 00000000..ed6048ec --- /dev/null +++ b/core/graphelier-service/api/hndlrs/playbackhandler_test.go @@ -0,0 +1,169 @@ +package hndlrs_test + +import ( + "net" + "testing" + "time" + + "graphelier/core/graphelier-service/api/hndlrs" + "graphelier/core/graphelier-service/models" + . "graphelier/core/graphelier-service/utils/test_utils" + + "github.com/stretchr/testify/assert" +) + +var pbMessages []*models.Message = []*models.Message{ + MakeMsg(Timestamp(101)), +} + +func TestCounterIntervalLoader(t *testing.T) { + mockedDB := MockDb(t) + defer Ctrl.Finish() + + loader := hndlrs.CountIntervalLoader{ + Instrument: "test", + Datastore: mockedDB, + Count: 3, + } + + orderbook := &models.Orderbook{LastSodOffset: 10, Timestamp: 5} + output := make(chan []*models.Message, 1) + loader.Init(orderbook, output) + + mockedDB.EXPECT(). + GetMessagesWithPagination("test", &models.Paginator{NMessages: 3, SodOffset: 10}). + Return(pbMessages, nil) + + loader.LoadMessages() // Channels require concurrency + assert.Equal(t, pbMessages, <-output) +} + +func TestTimeIntervalLoader(t *testing.T) { + mockedDB := MockDb(t) + defer Ctrl.Finish() + + loader := hndlrs.TimeIntervalLoader{ + Instrument: "test", + Datastore: mockedDB, + Interval: uint64(3), + } + + orderbook := &models.Orderbook{LastSodOffset: 10, Timestamp: 5} + output := make(chan []*models.Message, 1) + loader.Init(orderbook, output) + + mockedDB.EXPECT(). + GetMessagesByTimestampRange("test", uint64(5), uint64(8)). + Return(pbMessages, nil) + + loader.LoadMessages() + assert.Equal(t, pbMessages, <-output) +} + +func TestIntervalLoaderFailure(t *testing.T) { + mockedDB := MockDb(t) + defer Ctrl.Finish() + + loader := hndlrs.TimeIntervalLoader{ + Instrument: "test", + Datastore: mockedDB, + Interval: uint64(3), + } + + orderbook := &models.Orderbook{LastSodOffset: 10, Timestamp: 5} + output := make(chan []*models.Message, 1) + loader.Init(orderbook, output) + + mockedDB.EXPECT(). + GetMessagesByTimestampRange("test", uint64(5), uint64(8)). + Return(nil, assert.AnError) + + loader.LoadMessages() + _, ok := <-output // Channel should be closed + assert.False(t, ok) +} + +func TestNoRateParam(t *testing.T) { + mockedDB := MockDb(t) + defer Ctrl.Finish() + + err := MakeRequest( + hndlrs.StreamPlayback, // Function under test + mockedDB, + "GET", + "/playback/test/1", + map[string]string{ + "instrument": "test", + "timestamp": "1", + }, + nil, + ) + + assert.Error(t, err) + assert.IsType(t, hndlrs.ParamError{}, err.(hndlrs.StatusError).Err) +} + +type MockSocket struct { + Output []interface{} +} + +func (socket *MockSocket) RemoteAddr() net.Addr { + return nil +} + +func (socket *MockSocket) WriteJSON(message interface{}) error { + socket.Output = append(socket.Output, message) + return nil +} + +func (socket *MockSocket) ReadMessage() (int, []byte, error) { + <-time.After(50 * time.Millisecond) + return 8, nil, assert.AnError // Close connection +} + +func (socket *MockSocket) Close() error { + return nil +} + +type MockLoader struct { + messages chan []*models.Message +} + +func (loader *MockLoader) Init(orderbook *models.Orderbook, messages chan []*models.Message) { + loader.messages = messages +} + +func (loader *MockLoader) LoadMessages() { + loader.messages <- pbMessages +} + +func TestPlaybackSession(t *testing.T) { + mockedDB := MockDb(t) + defer Ctrl.Finish() + + socket := &MockSocket{} + session := hndlrs.PlaybackSession{ + Delay: 100000000, + Loader: &MockLoader{}, + Socket: socket, + } + mockedDB.EXPECT(). + GetOrderbook("test", uint64(100)). + Return(&models.Orderbook{Timestamp: 100}, nil) + mockedDB.EXPECT(). + GetMessagesByTimestamp("test", uint64(100)). + Return([]*models.Message{}, nil) + + err := session.LoadOrderBook(mockedDB, "test", 100) + assert.Nil(t, err) + + err = session.Start() // Function under test + assert.Nil(t, err) + + assert.GreaterOrEqual(t, len(socket.Output), 1) + result := socket.Output[0].(*models.Modifications) + assert.Equal(t, uint64(100), result.Timestamp) + modification := result.Modifications[0] + assert.Equal(t, uint64(1), modification.Offset) + assert.Equal(t, models.AddOrderType, modification.Type) +} diff --git a/core/graphelier-service/api/routes.go b/core/graphelier-service/api/routes.go index e22a1566..a5b1d4ee 100644 --- a/core/graphelier-service/api/routes.go +++ b/core/graphelier-service/api/routes.go @@ -79,7 +79,7 @@ var routes = Routes{ Route{ "Playback", "GET", - "/playback/{instrument}/{start_timestamp}/", + "/playback/{instrument}/{timestamp}/", hndlrs.CustomHandler{H: hndlrs.StreamPlayback}, }, } diff --git a/core/graphelier-service/mock_db/mock_mongo.go b/core/graphelier-service/mock_db/mock_mongo.go index 82fc7412..f95395e4 100644 --- a/core/graphelier-service/mock_db/mock_mongo.go +++ b/core/graphelier-service/mock_db/mock_mongo.go @@ -1,6 +1,7 @@ // Code generated by MockGen. DO NOT EDIT. // Source: graphelier-service/db/mongo.go +// Package mock_db is a generated GoMock package. package mock_db import ( @@ -28,34 +29,38 @@ func NewMockDatastore(ctrl *gomock.Controller) *MockDatastore { } // EXPECT returns an object that allows the caller to indicate expected use -func (_m *MockDatastore) EXPECT() *MockDatastoreMockRecorder { - return _m.recorder +func (m *MockDatastore) EXPECT() *MockDatastoreMockRecorder { + return m.recorder } // GetOrderbook mocks base method -func (_m *MockDatastore) GetOrderbook(instrument string, timestamp uint64) (*models.Orderbook, error) { - ret := _m.ctrl.Call(_m, "GetOrderbook", instrument, timestamp) +func (m *MockDatastore) GetOrderbook(instrument string, timestamp uint64) (*models.Orderbook, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetOrderbook", instrument, timestamp) ret0, _ := ret[0].(*models.Orderbook) ret1, _ := ret[1].(error) return ret0, ret1 } // GetOrderbook indicates an expected call of GetOrderbook -func (_mr *MockDatastoreMockRecorder) GetOrderbook(arg0, arg1 interface{}) *gomock.Call { - return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "GetOrderbook", reflect.TypeOf((*MockDatastore)(nil).GetOrderbook), arg0, arg1) +func (mr *MockDatastoreMockRecorder) GetOrderbook(instrument, timestamp interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetOrderbook", reflect.TypeOf((*MockDatastore)(nil).GetOrderbook), instrument, timestamp) } // GetMessagesByTimestamp mocks base method -func (_m *MockDatastore) GetMessagesByTimestamp(instrument string, timestamp uint64) ([]*models.Message, error) { - ret := _m.ctrl.Call(_m, "GetMessagesByTimestamp", instrument, timestamp) +func (m *MockDatastore) GetMessagesByTimestamp(instrument string, timestamp uint64) ([]*models.Message, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetMessagesByTimestamp", instrument, timestamp) ret0, _ := ret[0].([]*models.Message) ret1, _ := ret[1].(error) return ret0, ret1 } // GetMessagesByTimestamp indicates an expected call of GetMessagesByTimestamp -func (_mr *MockDatastoreMockRecorder) GetMessagesByTimestamp(arg0, arg1 interface{}) *gomock.Call { - return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "GetMessagesByTimestamp", reflect.TypeOf((*MockDatastore)(nil).GetMessagesByTimestamp), arg0, arg1) +func (mr *MockDatastoreMockRecorder) GetMessagesByTimestamp(instrument, timestamp interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetMessagesByTimestamp", reflect.TypeOf((*MockDatastore)(nil).GetMessagesByTimestamp), instrument, timestamp) } // GetMessagesByTimestampRange mocks base method @@ -74,91 +79,90 @@ func (mr *MockDatastoreMockRecorder) GetMessagesByTimestampRange(instrument, sta } // GetMessagesWithPagination mocks base method -func (_m *MockDatastore) GetMessagesWithPagination(instrument string, paginator *models.Paginator) ([]*models.Message, error) { - ret := _m.ctrl.Call(_m, "GetMessagesWithPagination", instrument, paginator) +func (m *MockDatastore) GetMessagesWithPagination(instrument string, paginator *models.Paginator) ([]*models.Message, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetMessagesWithPagination", instrument, paginator) ret0, _ := ret[0].([]*models.Message) ret1, _ := ret[1].(error) return ret0, ret1 } // GetMessagesWithPagination indicates an expected call of GetMessagesWithPagination -func (_mr *MockDatastoreMockRecorder) GetMessagesWithPagination(arg0, arg1 interface{}) *gomock.Call { - return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "GetMessagesWithPagination", reflect.TypeOf((*MockDatastore)(nil).GetMessagesWithPagination), arg0, arg1) +func (mr *MockDatastoreMockRecorder) GetMessagesWithPagination(instrument, paginator interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetMessagesWithPagination", reflect.TypeOf((*MockDatastore)(nil).GetMessagesWithPagination), instrument, paginator) } // GetSingleMessage mocks base method -func (_m *MockDatastore) GetSingleMessage(instrument string, sodOffset int64) (*models.Message, error) { - ret := _m.ctrl.Call(_m, "GetSingleMessage", instrument, sodOffset) +func (m *MockDatastore) GetSingleMessage(instrument string, sodOffset int64) (*models.Message, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetSingleMessage", instrument, sodOffset) ret0, _ := ret[0].(*models.Message) ret1, _ := ret[1].(error) return ret0, ret1 } // GetSingleMessage indicates an expected call of GetSingleMessage -func (_mr *MockDatastoreMockRecorder) GetSingleMessage(arg0, arg1 interface{}) *gomock.Call { - return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "GetSingleMessage", reflect.TypeOf((*MockDatastore)(nil).GetSingleMessage), arg0, arg1) +func (mr *MockDatastoreMockRecorder) GetSingleMessage(instrument, sodOffset interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetSingleMessage", reflect.TypeOf((*MockDatastore)(nil).GetSingleMessage), instrument, sodOffset) } // GetInstruments mocks base method -func (_m *MockDatastore) GetInstruments() ([]string, error) { - ret := _m.ctrl.Call(_m, "GetInstruments") +func (m *MockDatastore) GetInstruments() ([]string, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetInstruments") ret0, _ := ret[0].([]string) ret1, _ := ret[1].(error) return ret0, ret1 } // GetInstruments indicates an expected call of GetInstruments -func (_mr *MockDatastoreMockRecorder) GetInstruments() *gomock.Call { - return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "GetInstruments", reflect.TypeOf((*MockDatastore)(nil).GetInstruments)) +func (mr *MockDatastoreMockRecorder) GetInstruments() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetInstruments", reflect.TypeOf((*MockDatastore)(nil).GetInstruments)) } // RefreshCache mocks base method -func (_m *MockDatastore) RefreshCache() error { - ret := _m.ctrl.Call(_m, "RefreshCache") +func (m *MockDatastore) RefreshCache() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "RefreshCache") ret0, _ := ret[0].(error) return ret0 } // RefreshCache indicates an expected call of RefreshCache -func (_mr *MockDatastoreMockRecorder) RefreshCache() *gomock.Call { - return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "RefreshCache", reflect.TypeOf((*MockDatastore)(nil).RefreshCache)) +func (mr *MockDatastoreMockRecorder) RefreshCache() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RefreshCache", reflect.TypeOf((*MockDatastore)(nil).RefreshCache)) } // GetSingleOrderMessages mocks base method -func (_m *MockDatastore) GetSingleOrderMessages(instrument string, SODTimestamp int64, EODTimestamp int64, orderID int64) ([]*models.Message, error) { - ret := _m.ctrl.Call(_m, "GetSingleOrderMessages", instrument, SODTimestamp, EODTimestamp, orderID) +func (m *MockDatastore) GetSingleOrderMessages(instrument string, SODTimestamp, EODTimestamp, orderID int64) ([]*models.Message, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetSingleOrderMessages", instrument, SODTimestamp, EODTimestamp, orderID) ret0, _ := ret[0].([]*models.Message) ret1, _ := ret[1].(error) return ret0, ret1 } // GetSingleOrderMessages indicates an expected call of GetSingleOrderMessages -func (_mr *MockDatastoreMockRecorder) GetSingleOrderMessages(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { - return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "GetSingleOrderMessages", reflect.TypeOf((*MockDatastore)(nil).GetSingleOrderMessages), arg0, arg1, arg2, arg3) +func (mr *MockDatastoreMockRecorder) GetSingleOrderMessages(instrument, SODTimestamp, EODTimestamp, orderID interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetSingleOrderMessages", reflect.TypeOf((*MockDatastore)(nil).GetSingleOrderMessages), instrument, SODTimestamp, EODTimestamp, orderID) } // GetTopOfBookByInterval mocks base method -func (_m *MockDatastore) GetTopOfBookByInterval(instrument string, startTimestamp uint64, endTimestamp uint64, maxCount int64) ([]*models.Point, error) { - ret := _m.ctrl.Call(_m, "GetTopOfBookByInterval", instrument, startTimestamp, endTimestamp, maxCount) +func (m *MockDatastore) GetTopOfBookByInterval(instrument string, startTimestamp, endTimestamp uint64, maxCount int64) ([]*models.Point, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetTopOfBookByInterval", instrument, startTimestamp, endTimestamp, maxCount) ret0, _ := ret[0].([]*models.Point) ret1, _ := ret[1].(error) return ret0, ret1 } // GetTopOfBookByInterval indicates an expected call of GetTopOfBookByInterval -func (_mr *MockDatastoreMockRecorder) GetTopOfBookByInterval(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { - return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "GetTopOfBookByInterval", reflect.TypeOf((*MockDatastore)(nil).GetTopOfBookByInterval), arg0, arg1, arg2, arg3) -} - -// GetMessagesWithinInterval mocks base method -func (_m *MockDatastore) GetMessagesWithinInterval(instrument string, startTimestamp uint64, endTimestamp uint64) ([]*models.Message, error) { - ret := _m.ctrl.Call(_m, "GetMessagesWithinInterval", instrument, startTimestamp, endTimestamp) - ret0, _ := ret[0].([]*models.Message) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// GetMessagesWithinInterval indicates an expected call of GetMessagesWithinInterval -func (_mr *MockDatastoreMockRecorder) GetMessagesWithinInterval(arg0, arg1, arg2 interface{}) *gomock.Call { - return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "GetMessagesWithinInterval", reflect.TypeOf((*MockDatastore)(nil).GetMessagesWithinInterval), arg0, arg1, arg2) +func (mr *MockDatastoreMockRecorder) GetTopOfBookByInterval(instrument, startTimestamp, endTimestamp, maxCount interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetTopOfBookByInterval", reflect.TypeOf((*MockDatastore)(nil).GetTopOfBookByInterval), instrument, startTimestamp, endTimestamp, maxCount) } diff --git a/core/graphelier-service/models/orderbooks.go b/core/graphelier-service/models/orderbooks.go index 428aa59a..4b87cc99 100644 --- a/core/graphelier-service/models/orderbooks.go +++ b/core/graphelier-service/models/orderbooks.go @@ -281,8 +281,7 @@ func (orderbook *Orderbook) TopBookPerXNano(messages []*Message, pointDistance u messageMultiple := (message.Timestamp - 1) / pointDistance topbook = orderbook.backfillPoints(topbook, pointDistance, lastMultipleCount, messageMultiple) lastMultipleCount = messageMultiple - singleMsgSlice := []*Message{message} - orderbook.ApplyMessagesToOrderbook(singleMsgSlice) + orderbook.ApplyMessage(message) } // handles points after messages topbook = orderbook.backfillPoints(topbook, pointDistance, lastMultipleCount, endTimestamp/pointDistance) @@ -299,7 +298,7 @@ func (orderbook *Orderbook) backfillPoints(topbook []*Point, pointDistance uint6 return topbook } -// YieldModifications: Generates modifications while applying a list of messages +// YieldModifications : Generates modifications while applying a list of messages func (orderbook *Orderbook) YieldModifications(messages []*Message) (modifications *Modifications) { modifications = &Modifications{ Timestamp: orderbook.Timestamp, @@ -312,7 +311,11 @@ func (orderbook *Orderbook) YieldModifications(messages []*Message) (modificatio continue } - if modification := NewModification(messages, currentMessage); modification != nil { + var order *Order = nil + if orders, index := orderbook.getOrders(message); orders != nil && index >= 0 { + order = (*orders)[index] + } + if modification := NewModification(messages, currentMessage, order); modification != nil { modifications.Add(modification, orderbook.Timestamp) } orderbook.ApplyMessage(message) diff --git a/core/graphelier-service/models/orderbooks_test.go b/core/graphelier-service/models/orderbooks_test.go index cca49b04..ddd8da32 100644 --- a/core/graphelier-service/models/orderbooks_test.go +++ b/core/graphelier-service/models/orderbooks_test.go @@ -511,3 +511,28 @@ func TestOrderIdsNotSorted(t *testing.T) { assert.Equal(t, 1, len(orderbook.Asks[0].Orders)) assert.Equal(t, uint64(3), orderbook.Asks[0].Orders[0].ID) } + +func TestYieldModifications(t *testing.T) { + setupExistingOrders() + + messages := []*Message{ + MakeMsg(TypeExecute, ShareQuantity(5)), // UpdateOrder + MakeMsg(TypeExecute, OrderID(0)), // Ignored + MakeMsg(TypeDelete), // DropOrder + MakeMsg(TypeDelete), // Ignored + MakeMsg(TypeDelete, OrderID(2)), // Ignored + MakeMsg(TypeNewOrder, OrderID(4), Price(200.0)), // MoveOrder + } + + result := orderbook.YieldModifications(messages) + assert.Equal(t, 3, len(result.Modifications)) + + modification := result.Modifications[0] + assert.Equal(t, UpdateOrderType, modification.Type) + + modification = result.Modifications[1] + assert.Equal(t, DropOrderType, modification.Type) + + modification = result.Modifications[2] + assert.Equal(t, MoveOrderType, modification.Type) +} diff --git a/core/graphelier-service/models/playback.go b/core/graphelier-service/models/playback.go index 409759dc..83c3d365 100644 --- a/core/graphelier-service/models/playback.go +++ b/core/graphelier-service/models/playback.go @@ -1,5 +1,6 @@ package models +// Modification : Represents an event on a specific order, happening at a given offset time type Modification struct { Type string `json:"type"` Offset uint64 `json:"offset"` @@ -11,60 +12,65 @@ type Modification struct { To *float64 `json:"to,omitempty"` } +// Modifications : Holds a list of Modification, starting at a given time type Modifications struct { Timestamp uint64 `json:"timestamp,string"` Modifications []*Modification `json:"modifications"` } +// Add : Appends a Modification to the current array func (m *Modifications) Add(modification *Modification, timestamp uint64) { modification.Offset = timestamp - m.Timestamp m.Modifications = append(m.Modifications, modification) } +// Modification Type values const ( - ADD_ORDER_TYPE = "add" - DROP_ORDER_TYPE = "drop" - UPDATE_ORDER_TYPE = "update" - MOVE_ORDER_TYPE = "move" + AddOrderType = "add" + DropOrderType = "drop" + UpdateOrderType = "update" + MoveOrderType = "move" ) -func NewModification(messages []*Message, currentMessage int) *Modification { +// NewModification : Generates a Modification for the message at index `currentMessage` affecting given Order +func NewModification(messages []*Message, currentMessage int, order *Order) *Modification { if isMoveModification(messages, currentMessage) { before := messages[currentMessage-1] after := messages[currentMessage] return &Modification{ - Type: MOVE_ORDER_TYPE, + Type: MoveOrderType, To: &after.Price, From: &before.Price, NewID: &after.OrderID, OrderID: before.OrderID, } } - if isSkippable(messages, currentMessage) { + if isSkippable(messages, currentMessage, order) { return nil } - modification := &Modification{} message := messages[currentMessage] + modification := &Modification{ + OrderID: message.OrderID, + Price: &message.Price, + } switch message.Type { case NewOrder: - modification.Type = ADD_ORDER_TYPE - modification.Quantity = &message.ShareQuantity - case Modify: - modification.Type = UPDATE_ORDER_TYPE + modification.Type = AddOrderType modification.Quantity = &message.ShareQuantity case Delete: - modification.Type = DROP_ORDER_TYPE + modification.Type = DropOrderType + case Modify: + modification.fillUpdate(message, order) case Execute: - modification.Type = UPDATE_ORDER_TYPE - modification.Quantity = &message.ShareQuantity + modification.fillUpdate(message, order) default: return nil } - modification.OrderID = message.OrderID - modification.Price = &message.Price return modification } +// Checks if the message at given index corresponds to a move +// i.e. an order gets deleted then re-created at a different price level at the same timestamp func isMoveModification(messages []*Message, currentMessage int) bool { if currentMessage == 0 { return false @@ -77,7 +83,27 @@ func isMoveModification(messages []*Message, currentMessage int) bool { before.ShareQuantity == after.ShareQuantity } -func isSkippable(messages []*Message, currentMessage int) bool { - return currentMessage != len(messages)-1 && - isMoveModification(messages, currentMessage+1) +// Checks if the message should generate a modification or be skipped +func isSkippable(messages []*Message, currentMessage int, order *Order) bool { + return (order == nil && messages[currentMessage].Type != NewOrder) || + (currentMessage != len(messages)-1 && isMoveModification(messages, currentMessage+1)) +} + +func (modification *Modification) fillUpdate(message *Message, order *Order) { + if message.ShareQuantity < order.Quantity && message.ShareQuantity > 0 { + // Reducing order size + modification.Type = UpdateOrderType + modification.Quantity = new(int64) + *modification.Quantity = order.Quantity - message.ShareQuantity + } else if message.ShareQuantity < 0 { + // Order goes to the back of the price level + modification.Type = MoveOrderType + modification.To, modification.From = &message.Price, &message.Price + modification.Price = nil + modification.Quantity = new(int64) + *modification.Quantity = order.Quantity - message.ShareQuantity + } else { + // Deleted order + modification.Type = DropOrderType + } } diff --git a/core/graphelier-service/models/playback_test.go b/core/graphelier-service/models/playback_test.go new file mode 100644 index 00000000..d04d0998 --- /dev/null +++ b/core/graphelier-service/models/playback_test.go @@ -0,0 +1,92 @@ +package models_test + +import ( + . "graphelier/core/graphelier-service/models" + . "graphelier/core/graphelier-service/utils/test_utils" + "testing" + + "github.com/stretchr/testify/assert" +) + +var pbOrder *Order +var pbMessages []*Message + +func setupPB() { + pbOrder = &Order{ID: 1, Quantity: 100} + pbMessages = []*Message{ + MakeMsg(), + MakeMsg(TypeExecute, ShareQuantity(40)), + MakeMsg(TypeDelete, ShareQuantity(60)), + } +} + +func TestNewModifications(t *testing.T) { + setupPB() + // NewOrder msg becomes an AddOrder mod + result := NewModification(pbMessages, 0, nil) + assert.Equal(t, AddOrderType, result.Type) + assert.Equal(t, pbMessages[0].ShareQuantity, *result.Quantity) + + // Execute msg becomes an UpdateOrder mod + result = NewModification(pbMessages, 1, pbOrder) + // Apply message to Order + pbOrder.Quantity -= pbMessages[1].ShareQuantity + assert.Equal(t, UpdateOrderType, result.Type) + assert.Equal(t, pbOrder.Quantity, *result.Quantity) + + // Delete msg becomes a DropOrder mod + result = NewModification(pbMessages, 2, pbOrder) + assert.Equal(t, DropOrderType, result.Type) + assert.Nil(t, result.Quantity) +} + +func TestMoveModification(t *testing.T) { + setupPB() + // Add a NewOrder with same quantity as a delete to form a Move + pbMessages = append(pbMessages, MakeMsg(ShareQuantity(60), Price(101.0), OrderID(2))) + // Delete msg is ignored + result := NewModification(pbMessages, 2, pbOrder) + assert.Nil(t, result) + + // NewOrder msg becomes a MoveOrder mod + result = NewModification(pbMessages, 3, nil) + assert.Equal(t, MoveOrderType, result.Type) + assert.NotEqual(t, result.OrderID, *result.NewID) // Different ID + assert.NotEqual(t, *result.To, *result.From) // Different Price + assert.Nil(t, result.Quantity) // Quantity is unchanged +} + +func TestUpdateModificationCases(t *testing.T) { + setupPB() + // Increase Execute size to completely fill the order + pbMessages[1].ShareQuantity = 100 + result := NewModification(pbMessages, 1, pbOrder) + // Yields a DropOrder instead of an UpdateOrder mod + assert.Equal(t, DropOrderType, result.Type) + assert.Nil(t, result.Quantity) + + // Send a negative Modify msg + pbMessages[1].ShareQuantity = -50 + pbMessages[1].Type = Modify + result = NewModification(pbMessages, 1, pbOrder) + // Yields a MoveOrder instead of an UpdateOrder mod + assert.Equal(t, MoveOrderType, result.Type) + assert.Nil(t, result.Price) + assert.Equal(t, int64(150), *result.Quantity) // Exceptionally has a Quantity + assert.Equal(t, *result.To, *result.From) // Same price +} + +func TestModificationsNilOrder(t *testing.T) { + // Nil order means the msg is not valid, therefore we drop the modification + msg := MakeMsg(TypeDelete) + result := NewModification([]*Message{msg}, 0, nil) + assert.Nil(t, result) + + msg.Type = Modify + result = NewModification([]*Message{msg}, 0, nil) + assert.Nil(t, result) + + msg.Type = Execute + result = NewModification([]*Message{msg}, 0, nil) + assert.Nil(t, result) +} diff --git a/core/graphelier-service/utils/test_utils/httpreq.go b/core/graphelier-service/utils/test_utils/httpreq.go index 7dc2eb17..aa47bfe6 100644 --- a/core/graphelier-service/utils/test_utils/httpreq.go +++ b/core/graphelier-service/utils/test_utils/httpreq.go @@ -11,13 +11,15 @@ import ( "github.com/gorilla/mux" ) +// MakeRequest : Sends a test request to the given handler func MakeRequest( method func(e *handlers.Env, w http.ResponseWriter, r *http.Request) error, connector db.Datastore, verb string, route string, params map[string]string, - result interface{}) error { + result interface{}, +) error { req := httptest.NewRequest(verb, route, nil) writer := httptest.NewRecorder() if params != nil {