Skip to content

Commit

Permalink
[#178] Rate in msg/s or fraction of realtime
Browse files Browse the repository at this point in the history
- Specified as GET parameters, rateMessages or rateRealtime
- Instantiate a different MessageLoader for each rate
  • Loading branch information
Lercerss committed Mar 8, 2020
1 parent e3d7558 commit fa28a41
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 49 deletions.
3 changes: 2 additions & 1 deletion app/src/tests/samples/socket_client.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
// eslint-disable-next-line
const WebSocket = require('ws');

const endpoint = `ws://localhost:5050/playback/SPY/1577892050000000000/1.00?delay=2.0`;
const endpoint = `ws://localhost:5050/playback/SPY/1577892050000000000/?delay=2.0&rateMessages=5`;
// const endpoint = `ws://localhost:5050/playback/SPY/1577892050000000000/?delay=2.0&rateRealtime=0.5`;
console.log(`Connecting to ${endpoint}`);
const ws = new WebSocket(endpoint);

Expand Down
10 changes: 6 additions & 4 deletions core/graphelier-service/api/hndlrs/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,12 @@ func (se StatusError) Status() int {
return se.Code
}

type WebSocketError interface {
error
type ParamError struct {
Value string
}

func (p ParamError) Error() string {
return p.Value
}

// Env : A struct that represents the database configuration
Expand All @@ -57,8 +61,6 @@ func (h CustomHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
case Error:
log.Errorf("HTTP %d - %s\n", e.Status(), e)
http.Error(w, e.Error(), e.Status())
case WebSocketError:
log.Errorf("WebSocketError: %s\n", err)
default:
log.Errorf("Error: %s\n", err)
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
Expand Down
139 changes: 96 additions & 43 deletions core/graphelier-service/api/hndlrs/playbackhandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,39 +14,66 @@ import (
)

type PlaybackSession struct {
Socket *websocket.Conn
Orderbook *models.Orderbook
Delay uint64
Loader MessageLoader
messages chan []*models.Message
realtime time.Time
running bool
}

type MessageLoader interface {
LoadMessages()
Init(*models.Orderbook, chan []*models.Message)
}

type TimeIntervalLoader struct {
Instrument string
Datastore db.Datastore
Socket *websocket.Conn
Orderbook *models.Orderbook
Speed float64
Delay uint64
Interval uint64
messages chan []*models.Message
currentTimestamp uint64
interval uint64
realtime time.Time
running bool
}

type CountIntervalLoader struct {
Instrument string
Datastore db.Datastore
Count uint64
messages chan []*models.Message
currentPage models.Paginator
}

var upgrader = websocket.Upgrader{}

// StreamPlayback: Initiates a websocket connection and starts streaming order book modifications for playback
// StreamPlayback : Initiates a websocket connection and starts streaming order book modifications for playback
func StreamPlayback(env *Env, w http.ResponseWriter, r *http.Request) error {
delay, err := strconv.ParseFloat(r.URL.Query().Get("delay"), 64)
getParams := r.URL.Query()
delay, err := strconv.ParseFloat(getParams.Get("delay"), 64)
if err != nil {
delay = 0.5
}
delay *= 1e9
params := mux.Vars(r)
instrument := params["instrument"]
startTimestamp, err := strconv.ParseUint(params["start_timestamp"], 10, 64)
if err != nil {
return StatusError{400, err}
}
speed, err := strconv.ParseFloat(params["speed"], 64) // TODO variable rate
if err != nil {
return StatusError{400, err}
var loader MessageLoader
rateMessages, mErr := strconv.ParseUint(getParams.Get("rateMessages"), 10, 64)
rateRealtime, tErr := strconv.ParseFloat(getParams.Get("rateRealtime"), 64)
switch {
case mErr == nil && tErr != nil:
loader = &CountIntervalLoader{Instrument: instrument, Datastore: env.Datastore, Count: rateMessages}
case tErr == nil && mErr != nil:
loader = &TimeIntervalLoader{Instrument: instrument, Datastore: env.Datastore, Interval: uint64(delay / rateRealtime)}
default:
return StatusError{400, ParamError{"One of rateMessages or rateRealtime must be provided"}}
}

session := PlaybackSession{Datastore: env.Datastore, Speed: speed, Delay: uint64(delay * 1e9)}
err = session.LoadOrderBook(instrument, startTimestamp)
session := PlaybackSession{Delay: uint64(delay), Loader: loader}
err = session.LoadOrderBook(env.Datastore, instrument, startTimestamp)
if err != nil {
return StatusError{500, err}
}
Expand All @@ -67,25 +94,24 @@ func StreamPlayback(env *Env, w http.ResponseWriter, r *http.Request) error {
return nil
}

// Start: streams modifications to the client until a close message is received
// Start : streams modifications to the client until a close message is received
func (pb *PlaybackSession) Start() error {
pb.realtime = time.Now()
pb.interval = uint64(float64(pb.Delay) / pb.Speed)
pb.running = true
go pb.handleSocketControl()
return pb.handleStreaming()
}

// generates modifications from messages and sends data to the client at regular intervals
func (pb *PlaybackSession) handleStreaming() error {
go pb.loadMessages()
go pb.Loader.LoadMessages()
for pb.running {
messages, ok := <-pb.messages
if !ok {
log.Debugln("Stopping playback session")
break
}
go pb.loadMessages()
go pb.Loader.LoadMessages()
log.Tracef("Got %d messages\n", len(messages))
modifications := pb.Orderbook.YieldModifications(messages)
log.Tracef("Generated %d modifications\n", len(modifications.Modifications))
Expand Down Expand Up @@ -121,25 +147,7 @@ func (pb *PlaybackSession) handleSocketControl() {
}
}

// reads the next batch of messages from the datastore
func (pb *PlaybackSession) loadMessages() {
// TODO handle different rates
pb.currentTimestamp += pb.interval
log.Debugf("Loading messages for {%d, %d}\n", pb.currentTimestamp, pb.currentTimestamp+pb.interval)
messages, err := pb.Datastore.GetMessagesByTimestampRange(
pb.Orderbook.Instrument,
pb.currentTimestamp,
pb.currentTimestamp+pb.interval,
)
if err != nil {
log.Errorf("Failed to retrieve messages: %s\n", err)
close(pb.messages)
return
}
pb.messages <- messages
}

// InitSocket: Upgrades the http request to a websocket connection
// InitSocket : Upgrades the http request to a websocket connection
func (pb *PlaybackSession) InitSocket(w http.ResponseWriter, r *http.Request) error {
socket, err := upgrader.Upgrade(w, r, nil)
if err != nil {
Expand All @@ -150,25 +158,70 @@ func (pb *PlaybackSession) InitSocket(w http.ResponseWriter, r *http.Request) er
return nil
}

// LoadOrderBook: Establishes the initial state for the playback session
func (pb *PlaybackSession) LoadOrderBook(instrument string, startTimestamp uint64) error {
orderbook, err := pb.Datastore.GetOrderbook(instrument, startTimestamp)
// LoadOrderBook : Establishes the initial state for the playback session
func (pb *PlaybackSession) LoadOrderBook(db db.Datastore, instrument string, startTimestamp uint64) error {
orderbook, err := db.GetOrderbook(instrument, startTimestamp)
if err != nil {
return err
}
messages, err := pb.Datastore.GetMessagesByTimestamp(instrument, startTimestamp)
messages, err := db.GetMessagesByTimestamp(instrument, startTimestamp)
if err != nil {
return err
}
orderbook.ApplyMessagesToOrderbook(messages)
log.Tracef("Loaded initial state for playback at %d\n", orderbook.Timestamp)
pb.Orderbook = orderbook
pb.messages = make(chan []*models.Message, 1)
pb.currentTimestamp = orderbook.Timestamp
pb.Loader.Init(pb.Orderbook, pb.messages)
return nil
}

// Close: Tears down the session's resources
// Close : Tears down the session's resources
func (pb *PlaybackSession) Close() {
pb.Socket.Close()
}

// LoadMessages : Reads the next batch of messages from the datastore, by timestamp range
func (loader *TimeIntervalLoader) LoadMessages() {
loader.currentTimestamp += loader.Interval
log.Debugf("Loading messages for {%d, %d}\n", loader.currentTimestamp, loader.currentTimestamp+loader.Interval)
messages, err := loader.Datastore.GetMessagesByTimestampRange(
loader.Instrument,
loader.currentTimestamp,
loader.currentTimestamp+loader.Interval,
)
if err != nil {
log.Errorf("Failed to retrieve messages: %s\n", err)
close(loader.messages)
return
}
loader.messages <- messages
}

// Init : Initializes the loader's state based on the first snapshot
func (loader *TimeIntervalLoader) Init(orderbook *models.Orderbook, messages chan []*models.Message) {
loader.currentTimestamp = orderbook.Timestamp
loader.messages = messages
}

// LoadMessages : Reads the next batch of messages from the datastore, by number of messages
func (loader *CountIntervalLoader) LoadMessages() {
loader.currentPage.SodOffset += loader.currentPage.NMessages
log.Debugf("Loading messages for {%d, %d}\n", loader.currentPage.SodOffset, loader.currentPage.SodOffset+loader.currentPage.NMessages)
messages, err := loader.Datastore.GetMessagesWithPagination(
loader.Instrument,
&loader.currentPage,
)
if err != nil {
log.Errorf("Failed to retrieve messages: %s\n", err)
close(loader.messages)
return
}
loader.messages <- messages
}

// Init : Initializes the loader's state based on the first snapshot
func (loader *CountIntervalLoader) Init(orderbook *models.Orderbook, messages chan []*models.Message) {
loader.currentPage = models.Paginator{NMessages: int64(loader.Count), SodOffset: int64(orderbook.LastSodOffset)}
loader.messages = messages
}
2 changes: 1 addition & 1 deletion core/graphelier-service/api/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ var routes = Routes{
Route{
"Playback",
"GET",
"/playback/{instrument}/{start_timestamp}/{speed}", // TODO remove speed
"/playback/{instrument}/{start_timestamp}/",
hndlrs.CustomHandler{H: hndlrs.StreamPlayback},
},
}

0 comments on commit fa28a41

Please sign in to comment.