Skip to content

Commit

Permalink
clientv3: add Ctx() to return context of session
Browse files Browse the repository at this point in the history
Signed-off-by: Wenkang Zhang <[email protected]>
  • Loading branch information
kensou97 committed Jul 18, 2023
1 parent 0c643df commit 03d8fff
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 2 deletions.
14 changes: 12 additions & 2 deletions client/v3/concurrency/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type Session struct {
opts *sessionOptions
id v3.LeaseID

ctx context.Context
cancel context.CancelFunc
donec <-chan struct{}
}
Expand Down Expand Up @@ -61,11 +62,14 @@ func NewSession(client *v3.Client, opts ...SessionOption) (*Session, error) {
}

donec := make(chan struct{})
s := &Session{client: client, opts: ops, id: id, cancel: cancel, donec: donec}
s := &Session{client: client, opts: ops, id: id, ctx: ctx, cancel: cancel, donec: donec}

// keep the lease alive until client error or cancelled context
go func() {
defer close(donec)
defer func() {
close(donec)
cancel()
}()
for range keepAlive {
// eat messages until keep alive channel closes
}
Expand All @@ -82,6 +86,12 @@ func (s *Session) Client() *v3.Client {
// Lease is the lease ID for keys bound to the session.
func (s *Session) Lease() v3.LeaseID { return s.id }

// Ctx is the context attached to the session, it is canceled when the lease is orphaned, expires, or
// is otherwise no longer being refreshed.
func (s *Session) Ctx() context.Context {
return s.ctx
}

// Done returns a channel that closes when the lease is orphaned, expires, or
// is otherwise no longer being refreshed.
func (s *Session) Done() <-chan struct{} { return s.donec }
Expand Down
29 changes: 29 additions & 0 deletions tests/integration/clientv3/concurrency/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,3 +82,32 @@ func TestSessionTTLOptions(t *testing.T) {
}

}

func TestSessionCtx(t *testing.T) {
cli, err := integration2.NewClient(t, clientv3.Config{Endpoints: exampleEndpoints()})
if err != nil {
t.Fatal(err)
}
defer cli.Close()
lease, err := cli.Grant(context.Background(), 100)
if err != nil {
t.Fatal(err)
}
s, err := concurrency.NewSession(cli, concurrency.WithLease(lease.ID))
if err != nil {
t.Fatal(err)
}
defer s.Close()
assert.Equal(t, s.Lease(), lease.ID)

childCtx, cancel := context.WithCancel(s.Ctx())
defer cancel()

go s.Orphan()
select {
case <-childCtx.Done():
case <-time.After(time.Millisecond * 100):
t.Fatal("child context of session context is not canceled")
}
assert.Equal(t, childCtx.Err(), context.Canceled)
}

0 comments on commit 03d8fff

Please sign in to comment.