Skip to content

Commit

Permalink
Adding the new common struct to be sent at connection time to confirm…
Browse files Browse the repository at this point in the history
… the version of the recorder and forwarder
  • Loading branch information
COMTOP1 committed Dec 23, 2023
1 parent 93f0b64 commit 018ffcf
Show file tree
Hide file tree
Showing 11 changed files with 86 additions and 46 deletions.
6 changes: 3 additions & 3 deletions forwarder/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@ require (
github.com/gorilla/websocket v1.5.1
github.com/joho/godotenv v1.5.1
github.com/kelseyhightower/envconfig v1.4.0
github.com/labstack/echo/v4 v4.11.3
github.com/labstack/echo/v4 v4.11.4
github.com/mitchellh/mapstructure v1.5.0
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/ystv/streamer/common v0.0.0-20231219203825-78c9332344d1
github.com/ystv/streamer/common v0.0.0-20231223000419-998018556aed
)

require (
github.com/labstack/gommon v0.4.1 // indirect
github.com/labstack/gommon v0.4.2 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
Expand Down
12 changes: 6 additions & 6 deletions forwarder/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
github.com/kelseyhightower/envconfig v1.4.0 h1:Im6hONhd3pLkfDFsbRgu68RDNkGF1r3dvMUtDTo2cv8=
github.com/kelseyhightower/envconfig v1.4.0/go.mod h1:cccZRl6mQpaq41TPp5QxidR+Sa3axMbJDNb//FQX6Gg=
github.com/labstack/echo/v4 v4.11.3 h1:Upyu3olaqSHkCjs1EJJwQ3WId8b8b1hxbogyommKktM=
github.com/labstack/echo/v4 v4.11.3/go.mod h1:UcGuQ8V6ZNRmSweBIJkPvGfwCMIlFmiqrPqiEBfPYws=
github.com/labstack/gommon v0.4.1 h1:gqEff0p/hTENGMABzezPoPSRtIh1Cvw0ueMOe0/dfOk=
github.com/labstack/gommon v0.4.1/go.mod h1:TyTrpPqxR5KMk8LKVtLmfMjeQ5FEkBYdxLYPw/WfrOM=
github.com/labstack/echo/v4 v4.11.4 h1:vDZmA+qNeh1pd/cCkEicDMrjtrnMGQ1QFI9gWN1zGq8=
github.com/labstack/echo/v4 v4.11.4/go.mod h1:noh7EvLwqDsmh/X/HWKPUl1AjzJrhyptRyEbQJfxen8=
github.com/labstack/gommon v0.4.2 h1:F8qTUNXgG1+6WQmqoUWnz8WiEU60mXVVw0P4ht1WRA0=
github.com/labstack/gommon v0.4.2/go.mod h1:QlUFxVM+SNXhDL/Z7YhocGIBYOiwB0mXm1+1bAPHPyU=
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
Expand All @@ -27,8 +27,8 @@ github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6Kllzaw
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/valyala/fasttemplate v1.2.2 h1:lxLXG0uE3Qnshl9QyaK6XJxMXlQZELvChBOCmQD0Loo=
github.com/valyala/fasttemplate v1.2.2/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+qRAEEKiv+SiQ=
github.com/ystv/streamer/common v0.0.0-20231219203825-78c9332344d1 h1:8xS71VPL2RfPkffUZ6zJtLXiUqGDxUqt1SHrcYsmJBk=
github.com/ystv/streamer/common v0.0.0-20231219203825-78c9332344d1/go.mod h1:zTvpkqiPPvCmjvQn9qYA7gxUtvj/gDHz0keWnm32fB8=
github.com/ystv/streamer/common v0.0.0-20231223000419-998018556aed h1:h1iLeGH/Xu/1x7B/8umtT8eVSu0TzAaIwqCG/ZDqXe0=
github.com/ystv/streamer/common v0.0.0-20231223000419-998018556aed/go.mod h1:zTvpkqiPPvCmjvQn9qYA7gxUtvj/gDHz0keWnm32fB8=
golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k=
golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4=
golang.org/x/net v0.19.0 h1:zTwKpTd2XuCqf8huc7Fo2iSy+4RHPd10s4KzeTnVr1c=
Expand Down
27 changes: 21 additions & 6 deletions forwarder/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ import (

commonTransporter "github.com/ystv/streamer/common/transporter"
"github.com/ystv/streamer/common/transporter/server"
specialTransporter "github.com/ystv/streamer/common/transporter/special"
"github.com/ystv/streamer/common/wsMessages"
specialWSMessage "github.com/ystv/streamer/common/wsMessages/special"
)

type (
Expand Down Expand Up @@ -161,9 +163,22 @@ func (v *Views) run(config Config, interrupt chan os.Signal) {
close(errorChannel)
}
}()
err = c.WriteMessage(websocket.TextMessage, []byte(server.Forwarder))
response := specialTransporter.InitiationTransporter{
Server: server.Forwarder,
Version: Version,
}

var resBytes []byte
resBytes, err = json.Marshal(response)
if err != nil {
log.Printf("failed to marshal initial: %+v", err)
close(errorChannel)
return
}

err = c.WriteMessage(websocket.TextMessage, resBytes)
if err != nil {
log.Printf("failed to write name: %+v", err)
log.Printf("failed to write name and version: %+v", err)
close(errorChannel)
return
}
Expand All @@ -177,12 +192,12 @@ func (v *Views) run(config Config, interrupt chan os.Signal) {
return
}

if string(msg) != wsMessages.Acknowledged.String() {
if string(msg) != specialWSMessage.Acknowledged.String() {
log.Printf("failed to read acknowledgement: %s", string(msg))
close(errorChannel)
return
}
log.Println(wsMessages.Acknowledged)
log.Println(specialWSMessage.Acknowledged)
log.Printf("connected to %s://%s", u.Scheme, u.Host)

for {
Expand All @@ -194,9 +209,9 @@ func (v *Views) run(config Config, interrupt chan os.Signal) {
close(errorChannel)
return
}
if msgType == websocket.TextMessage && string(message) == wsMessages.Ping.String() {
if msgType == websocket.TextMessage && string(message) == specialWSMessage.Ping.String() {
pinging.Store(true)
err = c.WriteMessage(websocket.TextMessage, []byte(wsMessages.Pong))
err = c.WriteMessage(websocket.TextMessage, []byte(specialWSMessage.Pong))
if err != nil {
log.Printf("failed to write pong: %+v", err)
close(errorChannel)
Expand Down
2 changes: 1 addition & 1 deletion recorder/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ require (
github.com/labstack/echo/v4 v4.11.4
github.com/mitchellh/mapstructure v1.5.0
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/ystv/streamer/common v0.0.0-20231220005158-110620c6bc17
github.com/ystv/streamer/common v0.0.0-20231223000419-998018556aed
)

require (
Expand Down
4 changes: 2 additions & 2 deletions recorder/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6Kllzaw
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/valyala/fasttemplate v1.2.2 h1:lxLXG0uE3Qnshl9QyaK6XJxMXlQZELvChBOCmQD0Loo=
github.com/valyala/fasttemplate v1.2.2/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+qRAEEKiv+SiQ=
github.com/ystv/streamer/common v0.0.0-20231220005158-110620c6bc17 h1:5ZFfQcBlqFLaae4tcgQipcRgJCUxJNjIRNIIYl7Zamc=
github.com/ystv/streamer/common v0.0.0-20231220005158-110620c6bc17/go.mod h1:zTvpkqiPPvCmjvQn9qYA7gxUtvj/gDHz0keWnm32fB8=
github.com/ystv/streamer/common v0.0.0-20231223000419-998018556aed h1:h1iLeGH/Xu/1x7B/8umtT8eVSu0TzAaIwqCG/ZDqXe0=
github.com/ystv/streamer/common v0.0.0-20231223000419-998018556aed/go.mod h1:zTvpkqiPPvCmjvQn9qYA7gxUtvj/gDHz0keWnm32fB8=
golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k=
golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4=
golang.org/x/net v0.19.0 h1:zTwKpTd2XuCqf8huc7Fo2iSy+4RHPd10s4KzeTnVr1c=
Expand Down
18 changes: 16 additions & 2 deletions recorder/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

commonTransporter "github.com/ystv/streamer/common/transporter"
"github.com/ystv/streamer/common/transporter/server"
specialTransporter "github.com/ystv/streamer/common/transporter/special"
"github.com/ystv/streamer/common/wsMessages"
specialWSMessage "github.com/ystv/streamer/common/wsMessages/special"
)
Expand Down Expand Up @@ -162,9 +163,22 @@ func (v *Views) run(config Config, interrupt chan os.Signal) {
close(errorChannel)
}
}()
err = c.WriteMessage(websocket.TextMessage, []byte(server.Recorder))
response := specialTransporter.InitiationTransporter{
Server: server.Recorder,
Version: Version,
}

var resBytes []byte
resBytes, err = json.Marshal(response)
if err != nil {
log.Printf("failed to marshal initial: %+v", err)
close(errorChannel)
return
}

err = c.WriteMessage(websocket.TextMessage, resBytes)
if err != nil {
log.Printf("failed to write name: %+v", err)
log.Printf("failed to write name and version: %+v", err)
close(errorChannel)
return
}
Expand Down
2 changes: 1 addition & 1 deletion server/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ require (
github.com/labstack/gommon v0.4.2
github.com/mitchellh/mapstructure v1.5.0
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/ystv/streamer/common v0.0.0-20231220005158-110620c6bc17
github.com/ystv/streamer/common v0.0.0-20231223000419-998018556aed
golang.org/x/crypto v0.17.0
google.golang.org/protobuf v1.32.0
)
Expand Down
8 changes: 2 additions & 6 deletions server/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/golang-jwt/jwt v3.2.2+incompatible h1:IfV12K8xAKAnZqdXVzCZ+TOjboZ2keLg81eXfW3O+oY=
github.com/golang-jwt/jwt v3.2.2+incompatible/go.mod h1:8pz2t5EyA70fFQQSrl6XZXzqecmYZeUEB8OUGHkxJ+I=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY=
Expand Down Expand Up @@ -32,8 +31,8 @@ github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6Kllzaw
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/valyala/fasttemplate v1.2.2 h1:lxLXG0uE3Qnshl9QyaK6XJxMXlQZELvChBOCmQD0Loo=
github.com/valyala/fasttemplate v1.2.2/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+qRAEEKiv+SiQ=
github.com/ystv/streamer/common v0.0.0-20231220005158-110620c6bc17 h1:5ZFfQcBlqFLaae4tcgQipcRgJCUxJNjIRNIIYl7Zamc=
github.com/ystv/streamer/common v0.0.0-20231220005158-110620c6bc17/go.mod h1:zTvpkqiPPvCmjvQn9qYA7gxUtvj/gDHz0keWnm32fB8=
github.com/ystv/streamer/common v0.0.0-20231223000419-998018556aed h1:h1iLeGH/Xu/1x7B/8umtT8eVSu0TzAaIwqCG/ZDqXe0=
github.com/ystv/streamer/common v0.0.0-20231223000419-998018556aed/go.mod h1:zTvpkqiPPvCmjvQn9qYA7gxUtvj/gDHz0keWnm32fB8=
golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k=
golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4=
golang.org/x/net v0.19.0 h1:zTwKpTd2XuCqf8huc7Fo2iSy+4RHPd10s4KzeTnVr1c=
Expand All @@ -50,9 +49,6 @@ golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk=
golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8=
google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
google.golang.org/protobuf v1.32.0 h1:pPC6BG5ex8PDFnkbrGU3EixyhKcQ2aDuBS36lqK/C7I=
google.golang.org/protobuf v1.32.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
Expand Down
2 changes: 2 additions & 0 deletions server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ func main() {
log.Fatal("Failed to create store: ", err)
}

config.Version = Version

r := &Router{
config: config,
router: echo.New(),
Expand Down
1 change: 1 addition & 0 deletions server/views/views.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type (
// Config the global web-auth configuration
Config struct {
Verbose bool
Version string
Forwarder string `envconfig:"FORWARDER"`
Recorder string `envconfig:"RECORDER"`
ForwarderUsername string `envconfig:"FORWARDER_USERNAME"`
Expand Down
50 changes: 31 additions & 19 deletions server/views/websocket.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package views

import (
"encoding/json"
"fmt"
"log"
"time"
Expand All @@ -10,6 +11,7 @@ import (
"github.com/patrickmn/go-cache"

"github.com/ystv/streamer/common/transporter/server"
specialTransporter "github.com/ystv/streamer/common/transporter/special"
specialWSMessage "github.com/ystv/streamer/common/wsMessages/special"
)

Expand All @@ -31,27 +33,37 @@ func (v *Views) Websocket(c echo.Context) error {
return nil
}

name := string(msg)
var responseTransporter specialTransporter.InitiationTransporter

if name != server.Forwarder.String() && name != server.Recorder.String() {
log.Printf("failed connecting %s, invalid name", name)
err = json.Unmarshal(msg, &responseTransporter)
if err != nil {
log.Printf("failed to unmarshal response: %+v", err)
return nil
}

if responseTransporter.Server != server.Forwarder && responseTransporter.Server != server.Recorder {
log.Printf("failed connecting %s, invalid name", responseTransporter.Server)
_ = ws.Close()
return nil
}

log.Println("connecting", name)
log.Println("connecting", responseTransporter.Server)

if responseTransporter.Version != v.conf.Version {
log.Printf("%s has a version mismatch, server version: %s, %s version: %s", responseTransporter.Server, v.conf.Version, responseTransporter.Server, responseTransporter.Version)
}

clientChannel := make(chan []byte)
internalChannel := make(chan []byte)

err = v.cache.Add(name, clientChannel, cache.NoExpiration)
err = v.cache.Add(responseTransporter.Server.String(), clientChannel, cache.NoExpiration)
if err != nil {
c.Logger().Error(err)
_ = ws.Close()
return nil
}

err = v.cache.Add(name+internalChannelNameAppend, internalChannel, cache.NoExpiration)
err = v.cache.Add(responseTransporter.Server.String()+internalChannelNameAppend, internalChannel, cache.NoExpiration)
if err != nil {
c.Logger().Error(err)
_ = ws.Close()
Expand All @@ -65,7 +77,7 @@ func (v *Views) Websocket(c echo.Context) error {
return nil
}

log.Println("connected", name)
log.Println("connected", responseTransporter.Server)

loop := true

Expand All @@ -83,39 +95,39 @@ func (v *Views) Websocket(c echo.Context) error {
c.Logger().Error(err)
close(internalChannel)
close(clientChannel)
v.cache.Delete(name)
v.cache.Delete(name + internalChannelNameAppend)
v.cache.Delete(responseTransporter.Server.String())
v.cache.Delete(responseTransporter.Server.String() + internalChannelNameAppend)
loop = false
}

_, msg, err = ws.ReadMessage()
if err != nil {
c.Logger().Error(err)
internalChannel <- []byte(fmt.Sprintf("ERROR: failed to read message from %s: %+v", name, err))
internalChannel <- []byte(fmt.Sprintf("ERROR: failed to read message from %s: %+v", responseTransporter.Server.String(), err))
close(internalChannel)
close(clientChannel)
v.cache.Delete(name)
v.cache.Delete(name + internalChannelNameAppend)
v.cache.Delete(responseTransporter.Server.String())
v.cache.Delete(responseTransporter.Server.String() + internalChannelNameAppend)
loop = false
}
internalChannel <- msg
log.Printf("Message received from \"%s\": %s", name, msg)
log.Printf("Message received from \"%s\": %s", responseTransporter.Server, msg)
case <-ticker.C:
err = ws.WriteMessage(websocket.TextMessage, []byte(specialWSMessage.Ping))
if err != nil {
log.Printf("failed to write ping for %s: %+v", name, err)
log.Printf("failed to write ping for %s: %+v", responseTransporter.Server, err)
close(clientChannel)
v.cache.Delete(name)
v.cache.Delete(name + internalChannelNameAppend)
v.cache.Delete(responseTransporter.Server.String())
v.cache.Delete(responseTransporter.Server.String() + internalChannelNameAppend)
loop = false
}
var msgType int
msgType, msg, err = ws.ReadMessage()
if err != nil || msgType != websocket.TextMessage || string(msg) != specialWSMessage.Pong.String() {
log.Printf("failed to read pong for %s: %+v", name, err)
log.Printf("failed to read pong for %s: %+v", responseTransporter.Server, err)
close(clientChannel)
v.cache.Delete(name)
v.cache.Delete(name + internalChannelNameAppend)
v.cache.Delete(responseTransporter.Server.String())
v.cache.Delete(responseTransporter.Server.String() + internalChannelNameAppend)
loop = false
}
}
Expand Down

0 comments on commit 018ffcf

Please sign in to comment.