forked from skycoin/dmsg
-
Notifications
You must be signed in to change notification settings - Fork 0
/
client.go
454 lines (379 loc) · 11.9 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
package dmsg
import (
"context"
"errors"
"fmt"
"net"
"sync"
"time"
"github.com/skycoin/skycoin/src/util/logging"
"github.com/skycoin/dmsg/cipher"
"github.com/skycoin/dmsg/disc"
"github.com/skycoin/dmsg/netutil"
)
// TODO(evanlinjin): We should implement exponential backoff at some point.
const serveWait = time.Second
// SessionDialCallback is triggered BEFORE a session is dialed to.
// If a non-nil error is returned, the session dial is instantly terminated.
type SessionDialCallback func(network, addr string) (err error)
// SessionDisconnectCallback triggers after a session is closed.
type SessionDisconnectCallback func(network, addr string, err error)
// ClientCallbacks contains callbacks which a Client uses.
type ClientCallbacks struct {
OnSessionDial SessionDialCallback
OnSessionDisconnect SessionDisconnectCallback
}
func (sc *ClientCallbacks) ensure() {
if sc.OnSessionDial == nil {
sc.OnSessionDial = func(network, addr string) (err error) { return nil }
}
if sc.OnSessionDisconnect == nil {
sc.OnSessionDisconnect = func(network, addr string, err error) {}
}
}
// Config configures a dmsg client entity.
type Config struct {
MinSessions int
UpdateInterval time.Duration // Duration between discovery entry updates.
Callbacks *ClientCallbacks
}
// Ensure ensures all config values are set.
func (c *Config) Ensure() {
if c.MinSessions == 0 {
c.MinSessions = DefaultMinSessions
}
if c.UpdateInterval == 0 {
c.UpdateInterval = DefaultUpdateInterval
}
if c.Callbacks == nil {
c.Callbacks = new(ClientCallbacks)
}
c.Callbacks.ensure()
}
// DefaultConfig returns the default configuration for a dmsg client entity.
func DefaultConfig() *Config {
conf := &Config{
MinSessions: DefaultMinSessions,
UpdateInterval: DefaultUpdateInterval,
}
return conf
}
// Client represents a dmsg client entity.
type Client struct {
ready chan struct{}
readyOnce sync.Once
EntityCommon
conf *Config
porter *netutil.Porter
errCh chan error
done chan struct{}
once sync.Once
sesMx sync.Mutex
}
// NewClient creates a dmsg client entity.
func NewClient(pk cipher.PubKey, sk cipher.SecKey, dc disc.APIClient, conf *Config) *Client {
c := new(Client)
c.ready = make(chan struct{})
c.porter = netutil.NewPorter(netutil.PorterMinEphemeral)
c.errCh = make(chan error, 10)
c.done = make(chan struct{})
log := logging.MustGetLogger("dmsg_client")
// Init config.
if conf == nil {
conf = DefaultConfig()
}
conf.Ensure()
c.conf = conf
// Init common fields.
c.EntityCommon.init(pk, sk, dc, log, conf.UpdateInterval)
// Init callback: on set session.
c.EntityCommon.setSessionCallback = func(ctx context.Context, sessionCount int) error {
if err := c.EntityCommon.updateClientEntry(ctx, c.done); err != nil {
return err
}
// Client is 'ready' once we have successfully updated the discovery entry
// with at least one delegated server.
c.readyOnce.Do(func() { close(c.ready) })
return nil
}
// Init callback: on delete session.
c.EntityCommon.delSessionCallback = func(ctx context.Context, sessionCount int) error {
err := c.EntityCommon.updateClientEntry(ctx, c.done)
return err
}
return c
}
// Type returns the client's type (should always be "dmsg").
func (*Client) Type() string {
return Type
}
// Serve serves the client.
// It blocks until the client is closed.
func (ce *Client) Serve(ctx context.Context) {
defer func() {
ce.log.Info("Stopped serving client!")
}()
cancellabelCtx, cancel := context.WithCancel(ctx)
defer cancel()
go func(ctx context.Context) {
select {
case <-ctx.Done():
case <-ce.done:
cancel()
}
}(cancellabelCtx)
// Ensure we start updateClientEntryLoop once only.
updateEntryLoopOnce := new(sync.Once)
for {
if isClosed(ce.done) {
return
}
ce.log.Info("Discovering dmsg servers...")
entries, err := ce.discoverServers(cancellabelCtx)
if err != nil {
ce.log.WithError(err).Warn("Failed to discover dmsg servers.")
if err == context.Canceled || err == context.DeadlineExceeded {
return
}
time.Sleep(time.Second) // TODO(evanlinjin): Implement exponential back off.
continue
}
if len(entries) == 0 {
ce.log.Warnf("No entries found. Retrying after %s...", serveWait.String())
time.Sleep(serveWait)
}
for _, entry := range entries {
if isClosed(ce.done) {
return
}
// If we have enough sessions, we wait for error or done signal.
if ce.SessionCount() >= ce.conf.MinSessions {
select {
case <-ce.done:
return
case err := <-ce.errCh:
ce.log.WithError(err).Info("Session stopped.")
if isClosed(ce.done) {
return
}
}
}
if err := ce.ensureSession(cancellabelCtx, entry); err != nil {
ce.log.WithField("remote_pk", entry.Static).WithError(err).Warn("Failed to establish session.")
if err == context.Canceled || err == context.DeadlineExceeded {
return
}
time.Sleep(serveWait)
}
// Only start the update entry loop once we have at least one session established.
updateEntryLoopOnce.Do(func() { go ce.updateClientEntryLoop(cancellabelCtx, ce.done) })
}
}
}
// Ready returns a chan which blocks until the client has at least one delegated server and has an entry in the
// dmsg discovery.
func (ce *Client) Ready() <-chan struct{} {
return ce.ready
}
func (ce *Client) discoverServers(ctx context.Context) (entries []*disc.Entry, err error) {
err = netutil.NewDefaultRetrier(ce.log.WithField("func", "discoverServers")).Do(ctx, func() error {
entries, err = ce.dc.AvailableServers(ctx)
return err
})
return entries, err
}
// Close closes the dmsg client entity.
// TODO(evanlinjin): Have waitgroup.
func (ce *Client) Close() error {
if ce == nil {
return nil
}
ce.once.Do(func() {
close(ce.done)
ce.sesMx.Lock()
close(ce.errCh)
ce.sesMx.Unlock()
ce.sessionsMx.Lock()
for _, dSes := range ce.sessions {
ce.log.
WithError(dSes.Close()).
Info("Session closed.")
}
ce.sessions = make(map[cipher.PubKey]*SessionCommon)
ce.log.Info("All sessions closed.")
ce.sessionsMx.Unlock()
ce.porter.CloseAll(ce.log)
})
return nil
}
// Listen listens on a given dmsg port.
func (ce *Client) Listen(port uint16) (*Listener, error) {
lis := newListener(ce.porter, Addr{PK: ce.pk, Port: port})
ok, doneFn := ce.porter.Reserve(port, lis)
if !ok {
lis.close()
return nil, ErrPortOccupied
}
lis.addCloseCallback(doneFn)
return lis, nil
}
// Dial wraps DialStream to output net.Conn instead of *Stream.
func (ce *Client) Dial(ctx context.Context, addr Addr) (net.Conn, error) {
return ce.DialStream(ctx, addr)
}
// DialStream dials to a remote client entity with the given address.
func (ce *Client) DialStream(ctx context.Context, addr Addr) (*Stream, error) {
entry, err := getClientEntry(ctx, ce.dc, addr.PK)
if err != nil {
return nil, err
}
// Range client's delegated servers.
// See if we are already connected to a delegated server.
for _, srvPK := range entry.Client.DelegatedServers {
if dSes, ok := ce.clientSession(ce.porter, srvPK); ok {
return dSes.DialStream(addr)
}
}
// Range client's delegated servers.
// Attempt to connect to a delegated server.
for _, srvPK := range entry.Client.DelegatedServers {
dSes, err := ce.EnsureAndObtainSession(ctx, srvPK)
if err != nil {
continue
}
return dSes.DialStream(addr)
}
return nil, ErrCannotConnectToDelegated
}
// Session obtains an established session.
func (ce *Client) Session(pk cipher.PubKey) (ClientSession, bool) {
return ce.clientSession(ce.porter, pk)
}
// AllSessions obtains all established sessions.
func (ce *Client) AllSessions() []ClientSession {
return ce.allClientSessions(ce.porter)
}
// ConnectedServers obtains all the servers client is connected to.
//
// Deprecated: we can now obtain the remote TCP address of a session from the ClientSession struct directly.
func (ce *Client) ConnectedServers() []string {
sessions := ce.allClientSessions(ce.porter)
addrs := make([]string, len(sessions))
for i, s := range sessions {
addrs[i] = s.RemoteTCPAddr().String()
}
return addrs
}
// EnsureAndObtainSession attempts to obtain a session.
// If the session does not exist, we will attempt to establish one.
// It returns an error if the session does not exist AND cannot be established.
func (ce *Client) EnsureAndObtainSession(ctx context.Context, srvPK cipher.PubKey) (ClientSession, error) {
ce.sesMx.Lock()
defer ce.sesMx.Unlock()
if dSes, ok := ce.clientSession(ce.porter, srvPK); ok {
return dSes, nil
}
srvEntry, err := getServerEntry(ctx, ce.dc, srvPK)
if err != nil {
return ClientSession{}, err
}
return ce.dialSession(ctx, srvEntry)
}
// ensureSession ensures the existence of a session.
// It returns an error if the session does not exist AND cannot be established.
func (ce *Client) ensureSession(ctx context.Context, entry *disc.Entry) error {
ce.sesMx.Lock()
defer ce.sesMx.Unlock()
// If session with server of pk already exists, skip.
if _, ok := ce.clientSession(ce.porter, entry.Static); ok {
return nil
}
// Dial session.
_, err := ce.dialSession(ctx, entry)
return err
}
// It is expected that the session is created and served before the context cancels, otherwise an error will be returned.
// NOTE: This should not be called directly as it may lead to session duplicates.
// Only `ensureSession` or `EnsureAndObtainSession` should call this function.
func (ce *Client) dialSession(ctx context.Context, entry *disc.Entry) (cs ClientSession, err error) {
ce.log.WithField("remote_pk", entry.Static).Info("Dialing session...")
const network = "tcp"
// Trigger dial callback.
if err := ce.conf.Callbacks.OnSessionDial(network, entry.Server.Address); err != nil {
return ClientSession{}, fmt.Errorf("session dial is rejected by callback: %w", err)
}
defer func() {
if err != nil {
// Trigger disconnect callback when dial fails.
ce.conf.Callbacks.OnSessionDisconnect(network, entry.Server.Address, err)
}
}()
conn, err := net.Dial(network, entry.Server.Address)
if err != nil {
return ClientSession{}, err
}
dSes, err := makeClientSession(&ce.EntityCommon, ce.porter, conn, entry.Static)
if err != nil {
return ClientSession{}, err
}
if !ce.setSession(ctx, dSes.SessionCommon) {
_ = dSes.Close() //nolint:errcheck
return ClientSession{}, errors.New("session already exists")
}
go func() {
ce.log.WithField("remote_pk", dSes.RemotePK()).Info("Serving session.")
err := dSes.serve()
if !isClosed(ce.done) {
// We should only report an error when client is not closed.
// Also, when the client is closed, it will automatically delete all sessions.
ce.errCh <- fmt.Errorf("failed to serve dialed session to %s: %v", dSes.RemotePK(), err)
ce.delSession(ctx, dSes.RemotePK())
}
// Trigger disconnect callback.
ce.conf.Callbacks.OnSessionDisconnect(network, entry.Server.Address, err)
}()
return dSes, nil
}
// AllStreams returns all the streams of the current client.
func (ce *Client) AllStreams() (out []*Stream) {
fn := func(port uint16, pv netutil.PorterValue) (next bool) {
if str, ok := pv.Value.(*Stream); ok {
out = append(out, str)
return true
}
for _, v := range pv.Children {
if str, ok := v.(*Stream); ok {
out = append(out, str)
}
}
return true
}
ce.porter.RangePortValuesAndChildren(fn)
return out
}
// ConnectionsSummary associates connected clients, and the servers that connect such clients.
// Key: Client PK, Value: Slice of Server PKs
type ConnectionsSummary map[cipher.PubKey][]cipher.PubKey
// ConnectionsSummary returns a summary of all connected clients, and the associated servers that connect them.
func (ce *Client) ConnectionsSummary() ConnectionsSummary {
streams := ce.AllStreams()
out := make(ConnectionsSummary, len(streams))
for _, s := range streams {
cPK := s.RawRemoteAddr().PK
sPK := s.ServerPK()
sPKs, ok := out[cPK]
if ok && hasPK(sPKs, sPK) {
continue
}
out[cPK] = append(sPKs, sPK)
}
return out
}
func hasPK(pks []cipher.PubKey, pk cipher.PubKey) bool {
for _, oldPK := range pks {
if oldPK == pk {
return true
}
}
return false
}