Skip to content

Commit

Permalink
remote unreachable event when message fails to be sent remotely (#172)
Browse files Browse the repository at this point in the history
* remote unreachable event

* fix tests
  • Loading branch information
tprifti authored Nov 20, 2024
1 parent d199384 commit 7ba1326
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 19 deletions.
4 changes: 1 addition & 3 deletions remote/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ type Remote struct {
addr string
engine *actor.Engine
config Config
streamReader *streamReader
streamRouterPID *actor.PID
stopCh chan struct{} // Stop closes this channel to signal the remote to stop listening.
stopWg *sync.WaitGroup
Expand All @@ -57,7 +56,6 @@ func New(addr string, config Config) *Remote {
config: config,
}
r.state.Store(stateInitialized)
r.streamReader = newStreamReader(r)
return r
}

Expand All @@ -81,7 +79,7 @@ func (r *Remote) Start(e *actor.Engine) error {
}
slog.Debug("listening", "addr", r.addr)
mux := drpcmux.New()
err = DRPCRegisterRemote(mux, r.streamReader)
err = DRPCRegisterRemote(mux, newStreamReader(r))
if err != nil {
return fmt.Errorf("failed to register remote: %w", err)
}
Expand Down
31 changes: 31 additions & 0 deletions remote/remote_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,37 @@ func TestWeird(t *testing.T) {
wg.Wait() // wait for the actor to stop.
}

func TestStreamWriterRemoteUnreachableEvent(t *testing.T) {
a, _, err := makeRemoteEngine(getRandomLocalhostAddr())
assert.NoError(t, err)
b, rb, err := makeRemoteEngine(getRandomLocalhostAddr())
assert.NoError(t, err)

wg := sync.WaitGroup{}
wg.Add(1)

a.SpawnFunc(func(c *actor.Context) {
switch c.Message().(type) {
case actor.Started:
c.Engine().Subscribe(c.PID())
case actor.RemoteUnreachableEvent:
wg.Done()
}
}, "listener")

bPID := b.SpawnFunc(func(c *actor.Context) {
switch c.Message().(type) {
case actor.Started:
c.Engine().Subscribe(c.PID())
case *TestMessage:
rb.Stop()
}
}, "listener")

a.Send(bPID, &TestMessage{Data: []byte("test")})
wg.Wait()
}

func makeRemoteEngine(listenAddr string) (*actor.Engine, *Remote, error) {
var e *actor.Engine
r := New(listenAddr, NewConfig())
Expand Down
16 changes: 6 additions & 10 deletions remote/stream_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,6 @@ type streamDeliver struct {
msg any
}

type terminateStream struct {
address string
}

type streamRouter struct {
engine *actor.Engine
// streams is a map of remote address to stream writer pid.
Expand All @@ -41,16 +37,16 @@ func (s *streamRouter) Receive(ctx *actor.Context) {
s.pid = ctx.PID()
case *streamDeliver:
s.deliverStream(msg)
case terminateStream:
case actor.RemoteUnreachableEvent:
s.handleTerminateStream(msg)
}
}

func (s *streamRouter) handleTerminateStream(msg terminateStream) {
streamWriterPID := s.streams[msg.address]
delete(s.streams, msg.address)
slog.Debug("terminating stream",
"remote", msg.address,
func (s *streamRouter) handleTerminateStream(msg actor.RemoteUnreachableEvent) {
streamWriterPID := s.streams[msg.ListenAddr]
delete(s.streams, msg.ListenAddr)
slog.Debug("stream terminated",
"remote", msg.ListenAddr,
"pid", streamWriterPID,
)
}
Expand Down
11 changes: 5 additions & 6 deletions remote/stream_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ func (s *streamWriter) init() {
)
for i := 0; i < maxRetries; i++ {
// Here we try to connect to the remote address.
// Todo: can we make an Event here in case of failure?
switch s.tlsConfig {
case nil:
rawconn, err = net.Dial("tcp", s.writeToAddr)
Expand All @@ -142,10 +141,6 @@ func (s *streamWriter) init() {
// We could not reach the remote after retrying N times. Hence, shutdown the stream writer.
// and notify RemoteUnreachableEvent.
if rawconn == nil {
evt := actor.RemoteUnreachableEvent{
ListenAddr: s.writeToAddr,
}
s.engine.BroadcastEvent(evt)
s.Shutdown(nil)
return
}
Expand Down Expand Up @@ -183,8 +178,12 @@ func (s *streamWriter) init() {
}()
}

// TODO: is there a way that stream router can listen to event stream
// instead of sending the event itself?
func (s *streamWriter) Shutdown(wg *sync.WaitGroup) {
s.engine.Send(s.routerPID, terminateStream{address: s.writeToAddr})
evt := actor.RemoteUnreachableEvent{ListenAddr: s.writeToAddr}
s.engine.Send(s.routerPID, evt)
s.engine.BroadcastEvent(evt)
if s.stream != nil {
s.stream.Close()
}
Expand Down

0 comments on commit 7ba1326

Please sign in to comment.