Skip to content

Commit

Permalink
Implement lightweight method to check is peer has update channel (net…
Browse files Browse the repository at this point in the history
…birdio#1351)

Instead of GetAllConnectedPeers that need to traverse the whole
connections map in order to find one channel there.
surik authored Dec 5, 2023

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
1 parent 45fc89b commit 27ed88f
Showing 6 changed files with 47 additions and 47 deletions.
6 changes: 6 additions & 0 deletions management/server/account.go
Original file line number Diff line number Diff line change
@@ -114,6 +114,7 @@ type AccountManager interface {
LoginPeer(login PeerLogin) (*nbpeer.Peer, *NetworkMap, error) // used by peer gRPC API
SyncPeer(sync PeerSync) (*nbpeer.Peer, *NetworkMap, error) // used by peer gRPC API
GetAllConnectedPeers() (map[string]struct{}, error)
HasConnectedChannel(peerID string) bool
GetExternalCacheManager() ExternalCacheManager
}

@@ -1676,6 +1677,11 @@ func (am *DefaultAccountManager) GetAllConnectedPeers() (map[string]struct{}, er
return am.peersUpdateManager.GetAllConnectedPeers(), nil
}

// HasConnectedChannel returns true if peers has channel in update manager, otherwise false
func (am *DefaultAccountManager) HasConnectedChannel(peerID string) bool {
return am.peersUpdateManager.HasChannel(peerID)
}

var invalidDomainRegexp = regexp.MustCompile(`^([a-z0-9]+(-[a-z0-9]+)*\.)+[a-z]{2,}$`)

func isDomainValid(domain string) bool {
7 changes: 1 addition & 6 deletions management/server/http/peers_handler.go
Original file line number Diff line number Diff line change
@@ -35,14 +35,9 @@ func NewPeersHandler(accountManager server.AccountManager, authCfg AuthCfg) *Pee
func (h *PeersHandler) checkPeerStatus(peer *nbpeer.Peer) (*nbpeer.Peer, error) {
peerToReturn := peer.Copy()
if peer.Status.Connected {
statuses, err := h.accountManager.GetAllConnectedPeers()
if err != nil {
return peerToReturn, err
}

// Although we have online status in store we do not yet have an updated channel so have to show it as disconnected
// This may happen after server restart when not all peers are yet connected
if _, connected := statuses[peerToReturn.ID]; !connected {
if !h.accountManager.HasConnectedChannel(peer.ID) {
peerToReturn.Status.Connected = false
}
}
6 changes: 3 additions & 3 deletions management/server/http/peers_handler_test.go
Original file line number Diff line number Diff line change
@@ -81,16 +81,16 @@ func initTestMetaData(peers ...*nbpeer.Peer) *PeersHandler {
},
}, user, nil
},

GetAllConnectedPeersFunc: func() (map[string]struct{}, error) {
HasConnectedChannelFunc: func(peerID string) bool {
statuses := make(map[string]struct{})
for _, peer := range peers {
if peer.ID == noUpdateChannelTestPeerID {
break
}
statuses[peer.ID] = struct{}{}
}
return statuses, nil
_, ok := statuses[peerID]
return ok
},
},
claimsExtractor: jwtclaims.NewClaimsExtractor(
9 changes: 9 additions & 0 deletions management/server/mock_server/account_mock.go
Original file line number Diff line number Diff line change
@@ -81,6 +81,7 @@ type MockAccountManager struct {
SyncPeerFunc func(sync server.PeerSync) (*nbpeer.Peer, *server.NetworkMap, error)
InviteUserFunc func(accountID string, initiatorUserID string, targetUserEmail string) error
GetAllConnectedPeersFunc func() (map[string]struct{}, error)
HasConnectedChannelFunc func(peerID string) bool
GetExternalCacheManagerFunc func() server.ExternalCacheManager
}

@@ -622,6 +623,14 @@ func (am *MockAccountManager) GetAllConnectedPeers() (map[string]struct{}, error
return nil, status.Errorf(codes.Unimplemented, "method GetAllConnectedPeers is not implemented")
}

// HasconnectedChannel mocks HasConnectedChannel of the AccountManager interface
func (am *MockAccountManager) HasConnectedChannel(peerID string) bool {
if am.HasConnectedChannelFunc != nil {
return am.HasConnectedChannelFunc(peerID)
}
return false
}

// StoreEvent mocks StoreEvent of the AccountManager interface
func (am *MockAccountManager) StoreEvent(initiatorID, targetID, accountID string, activityID activity.Activity, meta map[string]any) {
if am.StoreEventFunc != nil {
48 changes: 10 additions & 38 deletions management/server/telemetry/updatechannel_metrics.go
Original file line number Diff line number Diff line change
@@ -11,48 +11,29 @@ import (

// UpdateChannelMetrics represents all metrics related to the UpdateChannel
type UpdateChannelMetrics struct {
createChannelDurationMs syncint64.Histogram
createChannelDurationMicro syncint64.Histogram
closeChannelDurationMs syncint64.Histogram
closeChannelDurationMicro syncint64.Histogram
closeChannelsDurationMs syncint64.Histogram
closeChannelsDurationMicro syncint64.Histogram
closeChannels syncint64.Histogram
sendUpdateDurationMs syncint64.Histogram
sendUpdateDurationMicro syncint64.Histogram
getAllConnectedPeersDurationMs syncint64.Histogram
getAllConnectedPeersDurationMicro syncint64.Histogram
getAllConnectedPeers syncint64.Histogram
hasChannelDurationMicro syncint64.Histogram
ctx context.Context
}

// NewUpdateChannelMetrics creates an instance of UpdateChannel
func NewUpdateChannelMetrics(ctx context.Context, meter metric.Meter) (*UpdateChannelMetrics, error) {
createChannelDurationMs, err := meter.SyncInt64().Histogram("management.updatechannel.create.duration.ms")
if err != nil {
return nil, err
}

createChannelDurationMicro, err := meter.SyncInt64().Histogram("management.updatechannel.create.duration.micro")
if err != nil {
return nil, err
}

closeChannelDurationMs, err := meter.SyncInt64().Histogram("management.updatechannel.close.one.duration.ms")
if err != nil {
return nil, err
}

closeChannelDurationMicro, err := meter.SyncInt64().Histogram("management.updatechannel.close.one.duration.micro")
if err != nil {
return nil, err
}

closeChannelsDurationMs, err := meter.SyncInt64().Histogram("management.updatechannel.close.multiple.duration.ms")
if err != nil {
return nil, err
}

closeChannelsDurationMicro, err := meter.SyncInt64().Histogram("management.updatechannel.close.multiple.duration.micro")
if err != nil {
return nil, err
@@ -63,64 +44,52 @@ func NewUpdateChannelMetrics(ctx context.Context, meter metric.Meter) (*UpdateCh
return nil, err
}

sendUpdateDurationMs, err := meter.SyncInt64().Histogram("management.updatechannel.send.duration.ms")
if err != nil {
return nil, err
}

sendUpdateDurationMicro, err := meter.SyncInt64().Histogram("management.updatechannel.send.duration.micro")
if err != nil {
return nil, err
}

getAllConnectedPeersDurationMs, err := meter.SyncInt64().Histogram("management.updatechannel.get.all.duration.ms")
getAllConnectedPeersDurationMicro, err := meter.SyncInt64().Histogram("management.updatechannel.get.all.duration.micro")
if err != nil {
return nil, err
}

getAllConnectedPeersDurationMicro, err := meter.SyncInt64().Histogram("management.updatechannel.get.all.duration.micro")
getAllConnectedPeers, err := meter.SyncInt64().Histogram("management.updatechannel.get.all.peers")
if err != nil {
return nil, err
}

getAllConnectedPeers, err := meter.SyncInt64().Histogram("management.updatechannel.get.all.peers")
hasChannelDurationMicro, err := meter.SyncInt64().Histogram("management.updatechannel.haschannel.duration.micro")
if err != nil {
return nil, err
}

return &UpdateChannelMetrics{
createChannelDurationMs: createChannelDurationMs,
createChannelDurationMicro: createChannelDurationMicro,
closeChannelDurationMs: closeChannelDurationMs,
closeChannelDurationMicro: closeChannelDurationMicro,
closeChannelsDurationMs: closeChannelsDurationMs,
closeChannelsDurationMicro: closeChannelsDurationMicro,
closeChannels: closeChannels,
sendUpdateDurationMs: sendUpdateDurationMs,
sendUpdateDurationMicro: sendUpdateDurationMicro,
getAllConnectedPeersDurationMs: getAllConnectedPeersDurationMs,
getAllConnectedPeersDurationMicro: getAllConnectedPeersDurationMicro,
getAllConnectedPeers: getAllConnectedPeers,
hasChannelDurationMicro: hasChannelDurationMicro,
ctx: ctx,
}, nil
}

// CountCreateChannelDuration counts the duration of the CreateChannel method,
// closed indicates if existing channel was closed before creation of a new one
func (metrics *UpdateChannelMetrics) CountCreateChannelDuration(duration time.Duration, closed bool) {
metrics.createChannelDurationMs.Record(metrics.ctx, duration.Milliseconds(), attribute.Bool("closed", closed))
metrics.createChannelDurationMicro.Record(metrics.ctx, duration.Microseconds(), attribute.Bool("closed", closed))
}

// CountCloseChannelDuration counts the duration of the CloseChannel method
func (metrics *UpdateChannelMetrics) CountCloseChannelDuration(duration time.Duration) {
metrics.closeChannelDurationMs.Record(metrics.ctx, duration.Milliseconds())
metrics.closeChannelDurationMicro.Record(metrics.ctx, duration.Microseconds())
}

// CountCloseChannelsDuration counts the duration of the CloseChannels method and the number of channels have been closed
func (metrics *UpdateChannelMetrics) CountCloseChannelsDuration(duration time.Duration, channels int) {
metrics.closeChannelsDurationMs.Record(metrics.ctx, duration.Milliseconds())
metrics.closeChannelsDurationMicro.Record(metrics.ctx, duration.Microseconds())
metrics.closeChannels.Record(metrics.ctx, int64(channels))
}
@@ -129,13 +98,16 @@ func (metrics *UpdateChannelMetrics) CountCloseChannelsDuration(duration time.Du
// found indicates if peer had channel, dropped indicates if the message was dropped due channel buffer overload
func (metrics *UpdateChannelMetrics) CountSendUpdateDuration(duration time.Duration, found, dropped bool) {
attrs := []attribute.KeyValue{attribute.Bool("found", found), attribute.Bool("dropped", dropped)}
metrics.sendUpdateDurationMs.Record(metrics.ctx, duration.Milliseconds(), attrs...)
metrics.sendUpdateDurationMicro.Record(metrics.ctx, duration.Microseconds(), attrs...)
}

// CountGetAllConnectedPeersDuration counts the duration of the GetAllConnectedPeers method and the number of peers have been returned
func (metrics *UpdateChannelMetrics) CountGetAllConnectedPeersDuration(duration time.Duration, peers int) {
metrics.getAllConnectedPeersDurationMs.Record(metrics.ctx, duration.Milliseconds())
metrics.getAllConnectedPeersDurationMicro.Record(metrics.ctx, duration.Microseconds())
metrics.getAllConnectedPeers.Record(metrics.ctx, int64(peers))
}

// CountHasChannelDuration counts the duration of the HasChannel method
func (metrics *UpdateChannelMetrics) CountHasChannelDuration(duration time.Duration) {
metrics.hasChannelDurationMicro.Record(metrics.ctx, duration.Microseconds())
}
18 changes: 18 additions & 0 deletions management/server/updatechannel.go
Original file line number Diff line number Diff line change
@@ -151,3 +151,21 @@ func (p *PeersUpdateManager) GetAllConnectedPeers() map[string]struct{} {

return m
}

// HasChannel returns true if peers has channel in update manager, otherwise false
func (p *PeersUpdateManager) HasChannel(peerID string) bool {
start := time.Now()

p.channelsMux.Lock()

defer func() {
p.channelsMux.Unlock()
if p.metrics != nil {
p.metrics.UpdateChannelMetrics().CountHasChannelDuration(time.Since(start))
}
}()

_, ok := p.peerChannels[peerID]

return ok
}

0 comments on commit 27ed88f

Please sign in to comment.