forked from hashicorp/memberlist
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtransport.go
127 lines (106 loc) · 4.47 KB
/
transport.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package memberlist
import (
"fmt"
"net"
"time"
)
// Packet carries one inbound packet from a peer together with the metadata
// recorded about it when it arrived over a packet connection.
type Packet struct {
	// Buf holds the packet's raw bytes.
	Buf []byte

	// From is the sending peer's address. It is kept as a net.Addr so that
	// concrete details about the sender remain available to consumers.
	From net.Addr

	// Timestamp records when the packet was received. It should be captured
	// as close to the actual receipt as possible, because probe RTT
	// measurements are derived from it.
	Timestamp time.Time
}
// Transport abstracts how memberlist talks to other peers. It exposes two
// channels of communication: a packet interface, which is assumed to be
// best-effort, and a stream interface, which is assumed to be reliable.
type Transport interface {
	// FinalAdvertiseAddr takes the user's configured values (either of which
	// may be empty) and returns the IP and port that should be advertised to
	// the rest of the cluster.
	FinalAdvertiseAddr(ip string, port int) (net.IP, int, error)

	// WriteTo sends the payload b to addr in a connectionless,
	// packet-oriented fashion. The returned time stamp should be as close
	// as possible to the moment the packet was transmitted, so that probes
	// can make accurate RTT measurements.
	//
	// The method resembles net.PacketConn, but that interface's full set of
	// required methods is deliberately not exposed in order to keep
	// assumptions about the underlying plumbing to a minimum. The address
	// is a string, as with Dial, which keeps it network neutral; it is
	// usually of the form "host:port".
	WriteTo(b []byte, addr string) (time.Time, error)

	// PacketCh returns a channel from which incoming packets from other
	// peers can be read. How listening is set up is left to the concrete
	// transport implementation.
	PacketCh() <-chan *Packet

	// DialTimeout creates a connection that allows two-way communication
	// with a peer. This is generally more expensive than the packet
	// interface, so it is reserved for less frequent operations such as
	// anti-entropy, or fallback probes after a packet-oriented probe failed.
	DialTimeout(addr string, timeout time.Duration) (net.Conn, error)

	// StreamCh returns a channel from which incoming stream connections
	// from other peers can be read. How listening is set up is left to the
	// concrete transport implementation.
	StreamCh() <-chan net.Conn

	// Shutdown is called when memberlist is shutting down, giving the
	// transport a chance to clean up any listeners.
	Shutdown() error
}
// Address identifies a peer by its network address and, optionally, by the
// name of the node being addressed.
type Address struct {
	// Addr is the network address in string form, as accepted by Dial;
	// usually "host:port". It is required.
	Addr string

	// Name is the name of the node being addressed. It is optional, although
	// individual transports may require it.
	Name string
}
// String renders the address for display. When no Name is set the result is
// just Addr; otherwise it is the name followed by the address in parentheses,
// e.g. "node-a (10.0.0.1:7946)".
func (a *Address) String() string {
	if a.Name == "" {
		return a.Addr
	}
	return fmt.Sprintf("%s (%s)", a.Name, a.Addr)
}
// IngestionAwareTransport is a Transport that can additionally be handed
// connections from which it ingests packets and streams.
type IngestionAwareTransport interface {
	Transport

	// IngestPacket pulls a single packet off conn. The conn is closed only
	// when shouldClose is true.
	IngestPacket(conn net.Conn, addr net.Addr, now time.Time, shouldClose bool) error

	// IngestStream hands conn off to the transport without closing it.
	IngestStream(conn net.Conn) error
}
// NodeAwareTransport is an IngestionAwareTransport whose write and dial
// operations are also available in a form that takes an Address (network
// address plus optional node name) instead of a bare address string.
type NodeAwareTransport interface {
	IngestionAwareTransport

	// WriteToAddress is the Address-taking counterpart of WriteTo.
	WriteToAddress(b []byte, addr Address) (time.Time, error)

	// DialAddressTimeout is the Address-taking counterpart of DialTimeout.
	DialAddressTimeout(addr Address, timeout time.Duration) (net.Conn, error)
}
// shimNodeAwareTransport wraps a plain Transport so that it can be used where
// a NodeAwareTransport is required. The embedded Transport supplies the base
// Transport methods directly.
type shimNodeAwareTransport struct {
	Transport
}

// Compile-time check that the shim satisfies NodeAwareTransport.
var _ NodeAwareTransport = (*shimNodeAwareTransport)(nil)
// IngestPacket forwards the call, arguments unchanged, to the wrapped
// Transport when that transport implements IngestionAwareTransport, and
// returns whatever error it reports.
//
// The shim has no ingestion logic of its own, so it panics if the wrapped
// transport does not implement IngestionAwareTransport.
func (t *shimNodeAwareTransport) IngestPacket(conn net.Conn, addr net.Addr, now time.Time, shouldClose bool) error {
	if ingester, supported := t.Transport.(IngestionAwareTransport); supported {
		return ingester.IngestPacket(conn, addr, now, shouldClose)
	}
	panic("shimNodeAwareTransport does not support IngestPacket")
}
// IngestStream forwards conn to the wrapped Transport when that transport
// implements IngestionAwareTransport, and returns whatever error it reports.
//
// The shim has no ingestion logic of its own, so it panics if the wrapped
// transport does not implement IngestionAwareTransport.
func (t *shimNodeAwareTransport) IngestStream(conn net.Conn) error {
	if ingester, supported := t.Transport.(IngestionAwareTransport); supported {
		return ingester.IngestStream(conn)
	}
	panic("shimNodeAwareTransport does not support IngestStream")
}
// WriteToAddress sends b through the wrapped Transport's WriteTo and returns
// its results unchanged. Only addr.Addr is passed on; addr.Name is not used.
func (t *shimNodeAwareTransport) WriteToAddress(b []byte, addr Address) (time.Time, error) {
	return t.Transport.WriteTo(b, addr.Addr)
}
// DialAddressTimeout dials through the wrapped Transport's DialTimeout and
// returns its results unchanged. Only addr.Addr is passed on; addr.Name is
// not used.
func (t *shimNodeAwareTransport) DialAddressTimeout(addr Address, timeout time.Duration) (net.Conn, error) {
	return t.Transport.DialTimeout(addr.Addr, timeout)
}