Skip to content

Commit

Permalink
[#178] Tests and edge cases
Browse files Browse the repository at this point in the history
- Added tests for all new components in the branch
- UpdateOrder mods now hold the order's new quantity instead of the
change
- Support more cases for modifies/executes
- Minor refactoring/renaming + comments
  • Loading branch information
Lercerss committed Mar 9, 2020
1 parent fa28a41 commit 6a9e897
Show file tree
Hide file tree
Showing 11 changed files with 435 additions and 93 deletions.
2 changes: 1 addition & 1 deletion app/src/constants/Constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import bigInt from 'big-integer';
export const ENVIRONMENT = 'DEV';
export const APP_NAME = 'Graphelier';
export const BACKEND_PORT = 5050;
let host = '18.218.121.174';
let host = 'localhost';
if (process.env.NODE_ENV === 'production') {
host = '18.218.121.174';
}
Expand Down
22 changes: 11 additions & 11 deletions core/graphelier-service/api/hndlrs/obhandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"github.com/stretchr/testify/assert"
)

var ob_messages []*models.Message = []*models.Message{
var obMessages []*models.Message = []*models.Message{
MakeMsg(DirectionAsk, OrderID(12), SodOffset(1)),
MakeMsg(DirectionBid, OrderID(13), SodOffset(2)),
MakeMsg(DirectionAsk, OrderID(15), SodOffset(3)),
Expand All @@ -24,13 +24,13 @@ func TestFetchOrderbookDeltaSuccess(t *testing.T) {

mockedDB.EXPECT().
GetSingleMessage("test", int64(1)).
Return(ob_messages[0], nil)
Return(obMessages[0], nil)
mockedDB.EXPECT().
GetOrderbook("test", uint64(1)).
Return(&models.Orderbook{}, nil)
mockedDB.EXPECT().
GetMessagesWithPagination("test", &models.Paginator{NMessages: 3, SodOffset: 0}).
Return(ob_messages, nil)
Return(obMessages, nil)

var deltabook models.Orderbook
err := MakeRequest(
Expand Down Expand Up @@ -105,7 +105,7 @@ func TestFetchOrderbookDeltaNegativeNMessages(t *testing.T) {
Return(&models.Orderbook{Timestamp: 0, LastSodOffset: 0, Instrument: "test"}, nil)
mockedDB.EXPECT().
GetMessagesWithPagination("test", &models.Paginator{NMessages: 5, SodOffset: 0}).
Return(ob_messages, nil)
Return(obMessages, nil)

var deltabook models.Orderbook
err := MakeRequest(
Expand Down Expand Up @@ -139,23 +139,23 @@ func TestFetchOrderbookDeltaExecuteHiddenOrder(t *testing.T) {
mockedDB := MockDb(t)
defer Ctrl.Finish()

test_messages := make([]*models.Message, len(ob_messages))
copy(test_messages, ob_messages)
test_messages = append(
ob_messages,
testMessages := make([]*models.Message, len(obMessages))
copy(testMessages, obMessages)
testMessages = append(
obMessages,
// Add an execute for hideen order
MakeMsg(OrderID(0), SodOffset(4), TypeExecute),
)

mockedDB.EXPECT().
GetSingleMessage("test", int64(1)).
Return(ob_messages[0], nil)
Return(obMessages[0], nil)
mockedDB.EXPECT().
GetOrderbook("test", uint64(1)).
Return(&models.Orderbook{}, nil)
mockedDB.EXPECT().
GetMessagesWithPagination("test", &models.Paginator{NMessages: 4, SodOffset: 0}).
Return(test_messages, nil)
Return(testMessages, nil)

var deltabook models.Orderbook
err := MakeRequest(
Expand Down Expand Up @@ -184,7 +184,7 @@ func TestFetchOrderbookSuccess(t *testing.T) {
Return(&models.Orderbook{Instrument: "test"}, nil)
mockedDB.EXPECT().
GetMessagesByTimestamp("test", uint64(1)).
Return(ob_messages, nil)
Return(obMessages, nil)

var orderbook models.Orderbook
err := MakeRequest(
Expand Down
37 changes: 29 additions & 8 deletions core/graphelier-service/api/hndlrs/playbackhandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package hndlrs
import (
"graphelier/core/graphelier-service/db"
"graphelier/core/graphelier-service/models"
"net"
"net/http"
"strconv"
"time"
Expand All @@ -13,8 +14,17 @@ import (
"github.com/gorilla/websocket"
)

// SocketConn : Indirection layer for access to websocket.Conn
type SocketConn interface {
RemoteAddr() net.Addr
WriteJSON(interface{}) error
ReadMessage() (int, []byte, error)
Close() error
}

// PlaybackSession : Handles lifetime of a playback session with the client over a websocket connection
type PlaybackSession struct {
Socket *websocket.Conn
Socket SocketConn
Orderbook *models.Orderbook
Delay uint64
Loader MessageLoader
Expand All @@ -23,11 +33,13 @@ type PlaybackSession struct {
running bool
}

// MessageLoader : Interface for asynchronous retrieval of messages, each call to LoadMessages should be stateful and move the bounds
type MessageLoader interface {
LoadMessages()
Init(*models.Orderbook, chan []*models.Message)
}

// TimeIntervalLoader : Loads messages by time intervals
type TimeIntervalLoader struct {
Instrument string
Datastore db.Datastore
Expand All @@ -36,6 +48,7 @@ type TimeIntervalLoader struct {
currentTimestamp uint64
}

// CountIntervalLoader : Loads messages by regular count intervals
type CountIntervalLoader struct {
Instrument string
Datastore db.Datastore
Expand All @@ -56,7 +69,7 @@ func StreamPlayback(env *Env, w http.ResponseWriter, r *http.Request) error {
delay *= 1e9
params := mux.Vars(r)
instrument := params["instrument"]
startTimestamp, err := strconv.ParseUint(params["start_timestamp"], 10, 64)
startTimestamp, err := strconv.ParseUint(params["timestamp"], 10, 64)
if err != nil {
return StatusError{400, err}
}
Expand Down Expand Up @@ -132,14 +145,14 @@ func (pb *PlaybackSession) handleStreaming() error {
// receive messages from the client and aborts the session on close
func (pb *PlaybackSession) handleSocketControl() {
for pb.running {
type_, _, err := pb.Socket.ReadMessage()
sockMsgType, _, err := pb.Socket.ReadMessage()
if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseNoStatusReceived, websocket.CloseGoingAway) {
log.Errorf("Unexpected error while waiting for input: %v\n", err)
}
pb.running = false
}
switch type_ {
switch sockMsgType {
case websocket.CloseMessage:
log.Infof("Terminating playback session with %s\n", pb.Socket.RemoteAddr())
pb.running = false
Expand Down Expand Up @@ -183,8 +196,11 @@ func (pb *PlaybackSession) Close() {

// LoadMessages : Reads the next batch of messages from the datastore, by timestamp range
func (loader *TimeIntervalLoader) LoadMessages() {
loader.currentTimestamp += loader.Interval
log.Debugf("Loading messages for {%d, %d}\n", loader.currentTimestamp, loader.currentTimestamp+loader.Interval)
log.Debugf(
"Loading messages for {%d, %d}\n",
loader.currentTimestamp,
loader.currentTimestamp+loader.Interval,
)
messages, err := loader.Datastore.GetMessagesByTimestampRange(
loader.Instrument,
loader.currentTimestamp,
Expand All @@ -195,6 +211,7 @@ func (loader *TimeIntervalLoader) LoadMessages() {
close(loader.messages)
return
}
loader.currentTimestamp += loader.Interval
loader.messages <- messages
}

Expand All @@ -206,8 +223,11 @@ func (loader *TimeIntervalLoader) Init(orderbook *models.Orderbook, messages cha

// LoadMessages : Reads the next batch of messages from the datastore, by number of messages
func (loader *CountIntervalLoader) LoadMessages() {
loader.currentPage.SodOffset += loader.currentPage.NMessages
log.Debugf("Loading messages for {%d, %d}\n", loader.currentPage.SodOffset, loader.currentPage.SodOffset+loader.currentPage.NMessages)
log.Debugf(
"Loading messages for {%d, %d}\n",
loader.currentPage.SodOffset,
loader.currentPage.SodOffset+loader.currentPage.NMessages,
)
messages, err := loader.Datastore.GetMessagesWithPagination(
loader.Instrument,
&loader.currentPage,
Expand All @@ -217,6 +237,7 @@ func (loader *CountIntervalLoader) LoadMessages() {
close(loader.messages)
return
}
loader.currentPage.SodOffset += loader.currentPage.NMessages
loader.messages <- messages
}

Expand Down
169 changes: 169 additions & 0 deletions core/graphelier-service/api/hndlrs/playbackhandler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
package hndlrs_test

import (
"net"
"testing"
"time"

"graphelier/core/graphelier-service/api/hndlrs"
"graphelier/core/graphelier-service/models"
. "graphelier/core/graphelier-service/utils/test_utils"

"github.com/stretchr/testify/assert"
)

var pbMessages []*models.Message = []*models.Message{
MakeMsg(Timestamp(101)),
}

func TestCounterIntervalLoader(t *testing.T) {
mockedDB := MockDb(t)
defer Ctrl.Finish()

loader := hndlrs.CountIntervalLoader{
Instrument: "test",
Datastore: mockedDB,
Count: 3,
}

orderbook := &models.Orderbook{LastSodOffset: 10, Timestamp: 5}
output := make(chan []*models.Message, 1)
loader.Init(orderbook, output)

mockedDB.EXPECT().
GetMessagesWithPagination("test", &models.Paginator{NMessages: 3, SodOffset: 10}).
Return(pbMessages, nil)

loader.LoadMessages() // Channels require concurrency
assert.Equal(t, pbMessages, <-output)
}

func TestTimeIntervalLoader(t *testing.T) {
mockedDB := MockDb(t)
defer Ctrl.Finish()

loader := hndlrs.TimeIntervalLoader{
Instrument: "test",
Datastore: mockedDB,
Interval: uint64(3),
}

orderbook := &models.Orderbook{LastSodOffset: 10, Timestamp: 5}
output := make(chan []*models.Message, 1)
loader.Init(orderbook, output)

mockedDB.EXPECT().
GetMessagesByTimestampRange("test", uint64(5), uint64(8)).
Return(pbMessages, nil)

loader.LoadMessages()
assert.Equal(t, pbMessages, <-output)
}

func TestIntervalLoaderFailure(t *testing.T) {
mockedDB := MockDb(t)
defer Ctrl.Finish()

loader := hndlrs.TimeIntervalLoader{
Instrument: "test",
Datastore: mockedDB,
Interval: uint64(3),
}

orderbook := &models.Orderbook{LastSodOffset: 10, Timestamp: 5}
output := make(chan []*models.Message, 1)
loader.Init(orderbook, output)

mockedDB.EXPECT().
GetMessagesByTimestampRange("test", uint64(5), uint64(8)).
Return(nil, assert.AnError)

loader.LoadMessages()
_, ok := <-output // Channel should be closed
assert.False(t, ok)
}

func TestNoRateParam(t *testing.T) {
mockedDB := MockDb(t)
defer Ctrl.Finish()

err := MakeRequest(
hndlrs.StreamPlayback, // Function under test
mockedDB,
"GET",
"/playback/test/1",
map[string]string{
"instrument": "test",
"timestamp": "1",
},
nil,
)

assert.Error(t, err)
assert.IsType(t, hndlrs.ParamError{}, err.(hndlrs.StatusError).Err)
}

type MockSocket struct {
Output []interface{}
}

func (socket *MockSocket) RemoteAddr() net.Addr {
return nil
}

func (socket *MockSocket) WriteJSON(message interface{}) error {
socket.Output = append(socket.Output, message)
return nil
}

func (socket *MockSocket) ReadMessage() (int, []byte, error) {
<-time.After(50 * time.Millisecond)
return 8, nil, assert.AnError // Close connection
}

func (socket *MockSocket) Close() error {
return nil
}

type MockLoader struct {
messages chan []*models.Message
}

func (loader *MockLoader) Init(orderbook *models.Orderbook, messages chan []*models.Message) {
loader.messages = messages
}

func (loader *MockLoader) LoadMessages() {
loader.messages <- pbMessages
}

func TestPlaybackSession(t *testing.T) {
mockedDB := MockDb(t)
defer Ctrl.Finish()

socket := &MockSocket{}
session := hndlrs.PlaybackSession{
Delay: 100000000,
Loader: &MockLoader{},
Socket: socket,
}
mockedDB.EXPECT().
GetOrderbook("test", uint64(100)).
Return(&models.Orderbook{Timestamp: 100}, nil)
mockedDB.EXPECT().
GetMessagesByTimestamp("test", uint64(100)).
Return([]*models.Message{}, nil)

err := session.LoadOrderBook(mockedDB, "test", 100)
assert.Nil(t, err)

err = session.Start() // Function under test
assert.Nil(t, err)

assert.GreaterOrEqual(t, len(socket.Output), 1)
result := socket.Output[0].(*models.Modifications)
assert.Equal(t, uint64(100), result.Timestamp)
modification := result.Modifications[0]
assert.Equal(t, uint64(1), modification.Offset)
assert.Equal(t, models.AddOrderType, modification.Type)
}
2 changes: 1 addition & 1 deletion core/graphelier-service/api/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ var routes = Routes{
Route{
"Playback",
"GET",
"/playback/{instrument}/{start_timestamp}/",
"/playback/{instrument}/{timestamp}/",
hndlrs.CustomHandler{H: hndlrs.StreamPlayback},
},
}
Loading

0 comments on commit 6a9e897

Please sign in to comment.