Skip to content

Commit

Permalink
Merge pull request #361 from streamdal/blinktag/relay_fixes
Browse files Browse the repository at this point in the history
Relay Fixes
  • Loading branch information
blinktag committed Aug 2, 2023
2 parents f15555b + 806449d commit 90ee8c8
Show file tree
Hide file tree
Showing 13 changed files with 194 additions and 43 deletions.
2 changes: 2 additions & 0 deletions actions/actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ type IActions interface {
StopTunnel(ctx context.Context, tunnelID string) (*types.Tunnel, error)
UpdateTunnel(ctx context.Context, tunnelID string, tunnelOpts *opts.TunnelOptions) (*types.Tunnel, error)
DeleteTunnel(ctx context.Context, tunnelID string) error

UpdateConnection(ctx context.Context, connectionID string, connOpts *opts.ConnectionOptions) (*types.Connection, error)
}

func New(cfg *Config) (IActions, error) {
Expand Down
83 changes: 83 additions & 0 deletions actions/actionsfakes/fake_iactions.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

46 changes: 46 additions & 0 deletions actions/connection.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package actions

import (
"context"

"github.com/batchcorp/plumber-schemas/build/go/protos/opts"

"github.com/batchcorp/plumber/server/types"
)

func (a *Actions) UpdateConnection(_ context.Context, connectionID string, connOpts *opts.ConnectionOptions) (*types.Connection, error) {

conn := &types.Connection{Connection: connOpts}

// Update connection in persistent config
a.cfg.PersistentConfig.SetConnection(connectionID, conn)
_ = a.cfg.PersistentConfig.Save()

// Starting/stopping needs to lock this mutex, so copy it for access
a.cfg.PersistentConfig.RelaysMutex.RLock()
relays := make(map[string]*types.Relay)
for k, v := range a.cfg.PersistentConfig.Relays {
relays[k] = v
}
a.cfg.PersistentConfig.RelaysMutex.RUnlock()

// Restart all relays that use this connection and are active
// Inactive relays will pick up the new connection details whenever they get resumed
for _, relay := range relays {
if relay.Options.ConnectionId == connectionID && relay.Active {
// Don't use the request context, use a fresh one
if _, err := a.StopRelay(context.Background(), relay.Options.XRelayId); err != nil {
a.log.Errorf("unable to stop relay '%s': %s", relay.Options.XRelayId, err)
continue
}

// Don't use the request context, use a fresh one
if _, err := a.ResumeRelay(context.Background(), relay.Options.XRelayId); err != nil {
a.log.Errorf("unable to resume relay '%s': %s", relay.Options.XRelayId, err)
continue
}
}
}

return conn, nil
}
46 changes: 35 additions & 11 deletions actions/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ func (a *Actions) StopRelay(ctx context.Context, relayID string) (*types.Relay,

relay.Active = false
relay.Options.XActive = false
relay.CancelCtx = nil
relay.CancelFunc = nil

// Update persistent storage
a.cfg.PersistentConfig.SetRelay(relay.Id, relay)
Expand All @@ -104,6 +106,22 @@ func (a *Actions) ResumeRelay(ctx context.Context, relayID string) (*types.Relay
return nil, validate.ErrRelayAlreadyActive
}

conn := a.cfg.PersistentConfig.GetConnection(relay.Options.ConnectionId)
if conn == nil {
return nil, validate.ErrConnectionNotFound
}

// Try to create a backend from given connection options
be, err := backends.New(conn.Connection)
if err != nil {
return nil, errors.Wrap(err, "unable to create backend")
}

relay.Backend = be

shutdownCtx, shutdownFunc := context.WithCancel(context.Background())
relay.CancelFunc = shutdownFunc
relay.CancelCtx = shutdownCtx
if err := relay.StartRelay(time.Millisecond * 100); err != nil {
return nil, errors.Wrap(err, "unable to start relay")
}
Expand Down Expand Up @@ -168,21 +186,13 @@ func (a *Actions) UpdateRelay(ctx context.Context, relayID string, relayOpts *op
time.Sleep(time.Second)

prometheus.DecrPromGauge(prometheus.PlumberRelayWorkers)
}

// Get stored connection information
conn := a.cfg.PersistentConfig.GetConnection(relayOpts.ConnectionId)
if conn == nil {
return nil, validate.ErrConnectionNotFound
}
relay.CancelCtx = nil
relay.CancelFunc = nil
_ = relay.Backend.Close(context.Background())

// Try to create a backend from given connection options
be, err := backends.New(conn.Connection)
if err != nil {
return nil, errors.Wrap(err, "unable to create backend")
}

relay.Backend = be
relay.Options = relayOpts

// New contexts
Expand All @@ -191,6 +201,20 @@ func (a *Actions) UpdateRelay(ctx context.Context, relayID string, relayOpts *op
relay.CancelFunc = cancelFunc

if relayOpts.XActive {
// Get stored connection information
conn := a.cfg.PersistentConfig.GetConnection(relayOpts.ConnectionId)
if conn == nil {
return nil, validate.ErrConnectionNotFound
}

// Try to create a backend from given connection options
be, err := backends.New(conn.Connection)
if err != nil {
return nil, errors.Wrap(err, "unable to create backend")
}

relay.Backend = be

if err := relay.StartRelay(5 * time.Second); err != nil {
relay.Options.XActive = false
return nil, errors.Wrap(err, "unable to start relay")
Expand Down
2 changes: 1 addition & 1 deletion actions/tunnel.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (a *Actions) ResumeTunnel(ctx context.Context, tunnelID string) (*types.Tun
return d, nil
}

func (a Actions) StopTunnel(ctx context.Context, tunnelID string) (*types.Tunnel, error) {
func (a *Actions) StopTunnel(ctx context.Context, tunnelID string) (*types.Tunnel, error) {
d := a.cfg.PersistentConfig.GetTunnel(tunnelID)
if d == nil {
return nil, errors.New("Tunnel replay does not exist")
Expand Down
4 changes: 4 additions & 0 deletions backends/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,10 @@ func newDialer(connArgs *args.KafkaConn) (*skafka.Dialer, error) {
Timeout: time.Duration(connArgs.TimeoutSeconds) * time.Second,
}

if connArgs.UseTls {
dialer.TLS = &tls.Config{}
}

if connArgs.TlsSkipVerify {
dialer.TLS = &tls.Config{
InsecureSkipVerify: true,
Expand Down
6 changes: 4 additions & 2 deletions backends/kafka/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,14 @@ func (k *Kafka) Relay(ctx context.Context, relayOpts *opts.RelayOptions, relayCh

defer reader.Close()

llog := k.log.WithField("relay-id", relayOpts.XRelayId)

for {
msg, err := reader.ReadMessage(ctx)
if err != nil {
// Shutdown cancelled, exit so we don't spam logs with context cancelled errors
if err == context.Canceled {
k.log.Debug("Received shutdown signal, exiting relayer")
llog.Debug("Received shutdown signal, exiting relayer")
break
}

Expand All @@ -48,7 +50,7 @@ func (k *Kafka) Relay(ctx context.Context, relayOpts *opts.RelayOptions, relayCh
prometheus.IncrPromCounter("plumber_read_errors", 1)

wrappedErr := fmt.Errorf("unable to read kafka message: %s; retrying in %s", err, RetryReadInterval)
util.WriteError(k.log, errorCh, wrappedErr)
util.WriteError(llog, errorCh, wrappedErr)

time.Sleep(RetryReadInterval)

Expand Down
17 changes: 6 additions & 11 deletions bus/broadcast_consumer_connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,25 +30,20 @@ func (b *Bus) doCreateConnection(_ context.Context, msg *Message) error {
return nil
}

func (b *Bus) doUpdateConnection(_ context.Context, msg *Message) error {
b.log.Debugf("running doCreateConnection handler for msg emitted by %s", msg.EmittedBy)
func (b *Bus) doUpdateConnection(ctx context.Context, msg *Message) error {
b.log.Debugf("running doUpdateonnection handler for msg emitted by %s", msg.EmittedBy)

connOpts := &opts.ConnectionOptions{}
if err := proto.Unmarshal(msg.Data, connOpts); err != nil {
return errors.Wrap(err, "unable to unmarshal message into opts.ConnectionOptions")
}

// Update connection in in-memory map
b.config.PersistentConfig.SetConnection(connOpts.XId, &types.Connection{
Connection: connOpts,
})
if _, err := b.config.Actions.UpdateConnection(ctx, connOpts.XId, connOpts); err != nil {
return errors.Wrap(err, "unable to update connection")
}

b.log.Debugf("updated connection '%s'", connOpts.Name)

// TODO: some way to signal reads/relays to restart? How will GRPC streams handle this?

// TODO: Some more work here

return nil
}

Expand All @@ -74,7 +69,7 @@ func (b *Bus) doDeleteConnection(ctx context.Context, msg *Message) error {
}
}

b.log.Debugf("running doCreateConnection handler for msg emitted by %s", msg.EmittedBy)
b.log.Debugf("running doDeleteConnection handler for msg emitted by %s", msg.EmittedBy)

connOpts := &opts.ConnectionOptions{}
if err := proto.Unmarshal(msg.Data, connOpts); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion bus/broadcast_consumer_relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (b *Bus) doResumeRelay(ctx context.Context, msg *Message) error {
return fmt.Errorf("unable to resume relay '%s': %s", relayOptions.XRelayId, err)
}

b.log.Infof("stopped relay '%s' (from broadcast msg)", relayOptions.XRelayId)
b.log.Infof("resumed relay '%s' (from broadcast msg)", relayOptions.XRelayId)

return nil
}
Expand Down
12 changes: 5 additions & 7 deletions server/connections_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,15 +154,13 @@ func (s *Server) UpdateConnection(ctx context.Context, req *protos.UpdateConnect
return nil, CustomError(common.Code_INVALID_ARGUMENT, err.Error())
}

// Re-assign connection options so we can update in-mem + etcd
conn.Connection = req.Options

// Update conf
s.PersistentConfig.SetConnection(conn.Connection.XId, conn)
s.PersistentConfig.Save()
if _, err := s.Actions.UpdateConnection(ctx, req.ConnectionId, req.Options); err != nil {
return nil, CustomError(common.Code_INTERNAL, fmt.Sprintf("unable to update connection: %s", err))
}

//Publish UpdateConnection event
if err := s.Bus.PublishUpdateConnection(ctx, conn.Connection); err != nil {
req.Options.XId = req.ConnectionId
if err := s.Bus.PublishUpdateConnection(context.Background(), req.Options); err != nil {
s.Log.Error(err)
}

Expand Down
8 changes: 7 additions & 1 deletion server/connections_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/batchcorp/plumber-schemas/build/go/protos/common"
"github.com/batchcorp/plumber-schemas/build/go/protos/opts"

"github.com/batchcorp/plumber/actions"
"github.com/batchcorp/plumber/bus/busfakes"
"github.com/batchcorp/plumber/config"
stypes "github.com/batchcorp/plumber/server/types"
Expand Down Expand Up @@ -169,6 +170,10 @@ var _ = Describe("Connection", func() {
fakeBus := &busfakes.FakeIBus{}
p.Bus = fakeBus

a, err := actions.New(&actions.Config{PersistentConfig: p.PersistentConfig})
Expect(err).ToNot(HaveOccurred())
p.Actions = a

conn := &opts.ConnectionOptions{
XId: connID,
Name: "testing",
Expand All @@ -189,11 +194,12 @@ var _ = Describe("Connection", func() {
}},
}

_, err := p.UpdateConnection(context.Background(), &protos.UpdateConnectionRequest{
_, err = p.UpdateConnection(context.Background(), &protos.UpdateConnectionRequest{
Auth: &common.Auth{Token: "streamdal"},
ConnectionId: connID,
Options: newConn,
})
Expect(err).ToNot(HaveOccurred())

updateConn := p.PersistentConfig.GetConnection(connID)

Expand Down
Loading

0 comments on commit 90ee8c8

Please sign in to comment.