Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#6 automate tests #13

Merged
merged 20 commits into from
May 24, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
#6 Listen to end session continuously in a separate goroutine
XioZ committed May 21, 2024

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
commit c7b500135aa1bd5f09c65308b3afb56f319cf1bc
35 changes: 17 additions & 18 deletions mino/minows/rpc.go
Original file line number Diff line number Diff line change
@@ -81,9 +81,9 @@ func (r rpc) Call(ctx context.Context, req serde.Message,
return
case env := <-result:
if env.err != nil {
responses <- mino.NewResponseWithError(env.addr, env.err)
responses <- mino.NewResponseWithError(env.from, env.err)
} else {
responses <- mino.NewResponse(env.addr, env.msg)
responses <- mino.NewResponse(env.from, env.msg)
}
}
}
@@ -221,28 +221,27 @@ func (r rpc) createSession(ctx context.Context,
}

go func() {
for {
select {
// Initiator ended session by canceling context
case <-ctx.Done():
for env := range result {
// Cancelling context resets stream and ends session for
// participants
if xerrors.Is(env.err, network.ErrReset) {
close(done)
return
case env := <-result:
// Cancelling context resets stream and ends session for
// participants
if xerrors.Is(env.err, network.ErrReset) {
close(done)
return
}
select {
case <-done:
return
case in <- env:
}
}
select {
case <-done:
return
case in <- env:
}
}
}()

go func() {
// Initiator ended session by canceling context
<-ctx.Done()
close(done)
}()

return &session{
logger: r.logger.With().Stringer("session", xid.New()).Logger(),
myAddr: r.myAddr,
14 changes: 7 additions & 7 deletions mino/minows/session.go
Original file line number Diff line number Diff line change
@@ -15,7 +15,7 @@ import (
const loopbackBufferSize = 16

type envelope struct {
addr mino.Address // todo rename from
from mino.Address
msg serde.Message
err error
}
@@ -53,7 +53,7 @@ func (s session) Send(msg serde.Message, addrs ...mino.Address) <-chan error {
case <-s.done:
return network.ErrReset
default:
s.buffer <- envelope{addr: s.myAddr, msg: msg}
s.buffer <- envelope{from: s.myAddr, msg: msg}
}
return nil
}
@@ -68,7 +68,7 @@ func (s session) Send(msg serde.Message, addrs ...mino.Address) <-chan error {
for _, addr := range addrs {
go func(dest mino.Address) {
err := send(dest)
result <- envelope{addr: dest, err: err}
result <- envelope{from: dest, err: err}
}(addr)
}

@@ -83,10 +83,10 @@ func (s session) Send(msg serde.Message, addrs ...mino.Address) <-chan error {
}
if env.err != nil {
errs <- xerrors.Errorf("could not send to %v: %v",
env.addr, env.err)
env.from, env.err)
continue
}
s.logger.Trace().Stringer("to", env.addr).
s.logger.Trace().Stringer("to", env.from).
Msgf("sent %v", msg)
}
}()
@@ -103,8 +103,8 @@ func (s session) Recv(ctx context.Context) (mino.Address, serde.Message, error)
case <-ctx.Done():
return nil, nil, ctx.Err()
case env := <-s.in:
s.logger.Trace().Stringer("from", env.addr).
s.logger.Trace().Stringer("from", env.from).
Msgf("received %v", env.msg)
return env.addr, env.msg, env.err
return env.from, env.msg, env.err
}
}
8 changes: 2 additions & 6 deletions mino/minows/session_test.go
Original file line number Diff line number Diff line change
@@ -247,17 +247,13 @@ func Test_session_Recv_SessionEnded(t *testing.T) {
mustCreateRPC(t, player, "test", handler)

s, r, stop := mustCreateSession(t, rpc, initiator, player)

errs := s.Send(fake.Message{}, initiator.GetAddress(), player.GetAddress())
_, open := <-errs
require.False(t, open)
stop()

// todo 1 sec
ctx, cancel := context.WithTimeout(context.Background(), time.Hour)
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

stop()
<-s.(*session).done // todo remove
_, _, err := r.Recv(ctx)
require.ErrorIs(t, err, io.EOF)
}