Skip to content

Commit

Permalink
Add Connect and Disconnect functionality
Browse files Browse the repository at this point in the history
GODRIVER-285

Change-Id: I1d2eae1cc94a93c0664665541b12682131a49018
  • Loading branch information
skriptble committed Apr 18, 2018
1 parent 9bee77c commit 94ae2dc
Show file tree
Hide file tree
Showing 21 changed files with 1,388 additions and 225 deletions.
8 changes: 8 additions & 0 deletions core/connection/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,14 @@ type Dialer interface {
DialContext(ctx context.Context, network, address string) (net.Conn, error)
}

// DialerFunc is a type implemented by functions that can be used as a Dialer.
type DialerFunc func(ctx context.Context, network, address string) (net.Conn, error)

// DialContext implements the Dialer interface.
func (df DialerFunc) DialContext(ctx context.Context, network, address string) (net.Conn, error) {
return df(ctx, network, address)
}

// DefaultDialer is the Dialer implementation that is used by this package. Changing this
// will also change the Dialer used for this package. This should only be changed why all
// of the connections being made need to use a different Dialer. Most of the time, using a
Expand Down
69 changes: 63 additions & 6 deletions core/connection/connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,26 +7,83 @@
package connection

import (
"context"
"net"
"sync"
"testing"
)

// bootstrapConnection creates a listener that will listen for a single connection
// on the return address. The user provided run function will be called with the accepted
// connection. The user is responsible for closing the connection.
func bootstrapConnection(t *testing.T, run func(net.Conn)) net.Addr {
l, err := net.Listen("tcp", ":0")
func bootstrapConnections(t *testing.T, num int, run func(net.Conn)) net.Addr {
l, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Errorf("Could not set up a listener: %v", err)
t.FailNow()
}
go func() {
c, err := l.Accept()
if err != nil {
t.Errorf("Could not accept a connection: %v", err)
for i := 0; i < num; i++ {
c, err := l.Accept()
if err != nil {
t.Errorf("Could not accept a connection: %v", err)
}
go run(c)
}
_ = l.Close()
run(c)
}()
return l.Addr()
}

type netconn struct {
net.Conn
closed chan struct{}
d *dialer
}

func (nc *netconn) Close() error {
nc.closed <- struct{}{}
nc.d.connclosed(nc)
return nc.Conn.Close()
}

type dialer struct {
Dialer
opened map[*netconn]struct{}
closed map[*netconn]struct{}
sync.Mutex
}

func newdialer(d Dialer) *dialer {
return &dialer{Dialer: d, opened: make(map[*netconn]struct{}), closed: make(map[*netconn]struct{})}
}

func (d *dialer) DialContext(ctx context.Context, network, address string) (net.Conn, error) {
d.Lock()
defer d.Unlock()
c, err := d.Dialer.DialContext(ctx, network, address)
if err != nil {
return nil, err
}
nc := &netconn{Conn: c, closed: make(chan struct{}, 1), d: d}
d.opened[nc] = struct{}{}
return nc, nil
}

func (d *dialer) connclosed(nc *netconn) {
d.Lock()
defer d.Unlock()
d.closed[nc] = struct{}{}
}

func (d *dialer) lenopened() int {
d.Lock()
defer d.Unlock()
return len(d.opened)
}

func (d *dialer) lenclosed() int {
d.Lock()
defer d.Unlock()
return len(d.closed)
}
5 changes: 5 additions & 0 deletions core/connection/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,8 @@ type NetworkError struct {
func (ne NetworkError) Error() string {
return fmt.Sprintf("connection(%s): %s", ne.ConnectionID, ne.Wrapped.Error())
}

// PoolError is an error returned from a Pool method.
type PoolError string

func (pe PoolError) Error() string { return string(pe) }
Loading

0 comments on commit 94ae2dc

Please sign in to comment.