Skip to content

Commit

Permalink
Merge pull request #24 from kvukelic/env_relocation_2022
Browse files Browse the repository at this point in the history
Changes in response to UOF production environment relocation
  • Loading branch information
gljubojevic authored Sep 29, 2022
2 parents d9778c9 + df5e894 commit 21bffaa
Show file tree
Hide file tree
Showing 13 changed files with 105 additions and 20 deletions.
18 changes: 16 additions & 2 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@ import (
)

const (
stagingServer = "stgapi.betradar.com"
productionServer = "api.betradar.com"
stagingServer = "stgapi.betradar.com"
productionServer = "api.betradar.com"
productionServerGlobal = "global.api.betradar.com"
)

var RequestTimeout = 32 * time.Second
Expand All @@ -37,6 +38,8 @@ func Dial(ctx context.Context, env uof.Environment, token string) (*API, error)
return Staging(ctx, token)
case uof.Production:
return Production(ctx, token)
case uof.ProductionGlobal:
return ProductionGlobal(ctx, token)
default:
return nil, uof.Notice("queue dial", fmt.Errorf("unknown environment %d", env))
}
Expand Down Expand Up @@ -64,6 +67,17 @@ func Production(exitSig context.Context, token string) (*API, error) {
return a, a.Ping()
}

// Production connects to the production system
func ProductionGlobal(exitSig context.Context, token string) (*API, error) {
a := &API{
server: productionServerGlobal,
token: token,
exitSig: exitSig,
client: client(),
}
return a, a.Ping()
}

func client() *retryablehttp.Client {
c := retryablehttp.NewClient()
c.Logger = nil
Expand Down
2 changes: 1 addition & 1 deletion cmd/client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func logMessages(in <-chan *uof.Message) error {
func logMessage(m *uof.Message) {
switch m.Type {
case uof.MessageTypeConnection:
fmt.Printf("%-25s status: %s\n", m.Type, m.Connection.Status)
fmt.Printf("%-25s status: %s, server: %s, local: %s, network: %s, tls: %s\n", m.Type, m.Connection.Status, m.Connection.ServerName, m.Connection.LocalAddr, m.Connection.Network, m.Connection.TLSVersionToString())
case uof.MessageTypeFixture:
fmt.Printf("%-25s lang: %s, urn: %s raw: %d\n", m.Type, m.Lang, m.Fixture.URN, len(m.Raw))
case uof.MessageTypeMarkets:
Expand Down
1 change: 1 addition & 0 deletions enum.go
Original file line number Diff line number Diff line change
Expand Up @@ -489,4 +489,5 @@ const (
Production Environment = iota
Staging
Replay
ProductionGlobal
)
24 changes: 22 additions & 2 deletions internal.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,28 @@
package uof

import "crypto/tls"

type Connection struct {
Status ConnectionStatus `json:"status"`
Timestamp int `json:"timestamp,omitempty"`
Status ConnectionStatus `json:"status"`
Timestamp int `json:"timestamp,omitempty"`
ServerName string `json:"servername,omitempty"`
LocalAddr string `json:"localaddr,omitempty"`
Network string `json:"network,omitempty"`
TLSVersion uint16 `json:"tlsversion,omitempty"`
}

func (c Connection) TLSVersionToString() string {
switch c.TLSVersion {
case tls.VersionTLS10:
return "1.0"
case tls.VersionTLS11:
return "1.1"
case tls.VersionTLS12:
return "1.2"
case tls.VersionTLS13:
return "1.3"
}
return "unknown"
}

type ConnectionStatus int8
Expand Down
23 changes: 22 additions & 1 deletion message.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ func NewPlayerMessage(lang Lang, player *Player, requestedAt int) *Message {
}
}

func NewConnnectionMessage(status ConnectionStatus) *Message {
func NewSimpleConnnectionMessage(status ConnectionStatus) *Message {
ts := uniqTimestamp()
return &Message{
Header: Header{
Expand All @@ -273,6 +273,27 @@ func NewConnnectionMessage(status ConnectionStatus) *Message {
}
}

func NewDetailedConnnectionMessage(status ConnectionStatus, serverName, localAddr, network string, tlsVersion uint16) *Message {
ts := uniqTimestamp()
return &Message{
Header: Header{
Type: MessageTypeConnection,
Scope: MessageScopeSystem,
ReceivedAt: ts,
},
Body: Body{
Connection: &Connection{
Status: status,
Timestamp: ts,
ServerName: serverName,
LocalAddr: localAddr,
Network: network,
TLSVersion: tlsVersion,
},
},
}
}

func NewProducersChangeMessage(pc ProducersChange) *Message {
return &Message{
Header: Header{
Expand Down
2 changes: 1 addition & 1 deletion message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ func TestUIDWithLang(t *testing.T) {
}

func TestNewMessage(t *testing.T) {
m := NewConnnectionMessage(ConnectionStatusUp)
m := NewSimpleConnnectionMessage(ConnectionStatusUp)
assert.True(t, m.Is(MessageTypeConnection))

m = NewPlayerMessage(LangEN, nil, 0)
Expand Down
2 changes: 1 addition & 1 deletion pipe/fixture_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestFixturePipe(t *testing.T) {
out, _ := f(in)

// this type of message is passing through
m := uof.NewConnnectionMessage(uof.ConnectionStatusUp)
m := uof.NewSimpleConnnectionMessage(uof.ConnectionStatusUp)
in <- m
om := <-out
assert.Equal(t, m, om)
Expand Down
2 changes: 1 addition & 1 deletion pipe/market_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func TestMarketsPipe(t *testing.T) {
out, _ := ms(in)

// this type of message is passing through
m := uof.NewConnnectionMessage(uof.ConnectionStatusUp)
m := uof.NewSimpleConnnectionMessage(uof.ConnectionStatusUp)
in <- m
om := <-out
assert.Equal(t, m, om)
Expand Down
2 changes: 1 addition & 1 deletion pipe/player_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func TestPlayerPipe(t *testing.T) {
out, _ := p(in)

// this type of message is passing through
m := uof.NewConnnectionMessage(uof.ConnectionStatusUp)
m := uof.NewSimpleConnnectionMessage(uof.ConnectionStatusUp)
in <- m
om := <-out
assert.Equal(t, m, om)
Expand Down
2 changes: 1 addition & 1 deletion pipe/recovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func TestRecoveryRequests(t *testing.T) {
go r.loop(in, out, errc)

// 1. connection status triggers recovery requests
in <- uof.NewConnnectionMessage(uof.ConnectionStatusUp)
in <- uof.NewSimpleConnnectionMessage(uof.ConnectionStatusUp)
recoveryRequestPrematch := <-m.calls
recoveryRequestLive := <-m.calls
if recoveryRequestPrematch.producer == uof.ProducerLiveOdds {
Expand Down
32 changes: 27 additions & 5 deletions queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@ import (
)

const (
replayServer = "replaymq.betradar.com:5671"
stagingServer = "stgmq.betradar.com:5671"
productionServer = "mq.betradar.com:5671"
queueExchange = "unifiedfeed"
bindingKeyAll = "#"
replayServer = "replaymq.betradar.com:5671"
stagingServer = "stgmq.betradar.com:5671"
productionServer = "mq.betradar.com:5671"
productionServerGlobal = "global.mq.betradar.com:5671"
queueExchange = "unifiedfeed"
bindingKeyAll = "#"
)

// Dial connects to the queue chosen by environment
Expand All @@ -31,6 +32,8 @@ func Dial(ctx context.Context, env uof.Environment, bookmakerID, token string) (
return DialStaging(ctx, bookmakerID, token)
case uof.Production:
return DialProduction(ctx, bookmakerID, token)
case uof.ProductionGlobal:
return DialProductionGlobal(ctx, bookmakerID, token)
default:
return nil, uof.Notice("queue dial", fmt.Errorf("unknown environment %d", env))
}
Expand All @@ -41,6 +44,11 @@ func DialProduction(ctx context.Context, bookmakerID, token string) (*Connection
return dial(ctx, productionServer, bookmakerID, token)
}

// Dial connects to the production queue
func DialProductionGlobal(ctx context.Context, bookmakerID, token string) (*Connection, error) {
return dial(ctx, productionServerGlobal, bookmakerID, token)
}

// DialStaging connects to the staging queue
func DialStaging(ctx context.Context, bookmakerID, token string) (*Connection, error) {
return dial(ctx, stagingServer, bookmakerID, token)
Expand All @@ -55,6 +63,14 @@ type Connection struct {
msgs <-chan amqp.Delivery
errs <-chan *amqp.Error
reDial func() (*Connection, error)
info ConnectionInfo
}

type ConnectionInfo struct {
server string
local string
network string
tlsVersion uint16
}

func (c *Connection) Listen() (<-chan *uof.Message, <-chan error) {
Expand Down Expand Up @@ -154,6 +170,12 @@ func dial(ctx context.Context, server, bookmakerID, token string) (*Connection,
reDial: func() (*Connection, error) {
return dial(ctx, server, bookmakerID, token)
},
info: ConnectionInfo{
server: server,
local: conn.LocalAddr().String(),
network: conn.LocalAddr().Network(),
tlsVersion: conn.ConnectionState().Version,
},
}

go func() {
Expand Down
6 changes: 4 additions & 2 deletions queue/reconnect.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,14 @@ func WithReconnect(ctx context.Context, conn *Connection) func() (<-chan *uof.Me
defer close(out)
defer close(errc)
for {
out <- uof.NewConnnectionMessage(uof.ConnectionStatusUp) // signal connect
// signal connect
out <- uof.NewDetailedConnnectionMessage(uof.ConnectionStatusUp, conn.info.server, conn.info.local, conn.info.network, conn.info.tlsVersion)
conn.drain(out, errc)
if done() {
return
}
out <- uof.NewConnnectionMessage(uof.ConnectionStatusDown) // signal connection lost
// signal connection lost
out <- uof.NewSimpleConnnectionMessage(uof.ConnectionStatusDown)
if err := withBackoff(ctx, reconnect, maxInterval, maxElapsedTime); err != nil {
return
}
Expand Down
9 changes: 7 additions & 2 deletions sdk/sdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ type Config struct {
Stages []pipe.InnerStage
Replay func(*api.ReplayAPI) error
Env uof.Environment
Staging bool
Languages []uof.Lang
ErrorListener ErrorListenerFunc
}
Expand Down Expand Up @@ -128,11 +127,17 @@ func Languages(langs []uof.Lang) Option {
}
}

// Global forces use of global production environment.
func Global() Option {
return func(c *Config) {
c.Env = uof.ProductionGlobal
}
}

// Staging forces use of staging environment instead of production.
func Staging() Option {
return func(c *Config) {
c.Env = uof.Staging
c.Staging = true
}
}

Expand Down

0 comments on commit 21bffaa

Please sign in to comment.