Skip to content

Commit

Permalink
Implement lightweight method to check is peer has update channel (net…
Browse files Browse the repository at this point in the history
…birdio#1351)

Instead of GetAllConnectedPeers that need to traverse the whole
connections map in order to find one channel there.
  • Loading branch information
surik authored Dec 5, 2023
1 parent 05cc589 commit 7643490
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 47 deletions.
6 changes: 6 additions & 0 deletions management/server/account.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ type AccountManager interface {
LoginPeer(login PeerLogin) (*nbpeer.Peer, *NetworkMap, error) // used by peer gRPC API
SyncPeer(sync PeerSync) (*nbpeer.Peer, *NetworkMap, error) // used by peer gRPC API
GetAllConnectedPeers() (map[string]struct{}, error)
HasConnectedChannel(peerID string) bool
GetExternalCacheManager() ExternalCacheManager
}

Expand Down Expand Up @@ -1676,6 +1677,11 @@ func (am *DefaultAccountManager) GetAllConnectedPeers() (map[string]struct{}, er
return am.peersUpdateManager.GetAllConnectedPeers(), nil
}

// HasConnectedChannel returns true if peers has channel in update manager, otherwise false
func (am *DefaultAccountManager) HasConnectedChannel(peerID string) bool {
return am.peersUpdateManager.HasChannel(peerID)
}

var invalidDomainRegexp = regexp.MustCompile(`^([a-z0-9]+(-[a-z0-9]+)*\.)+[a-z]{2,}$`)

func isDomainValid(domain string) bool {
Expand Down
7 changes: 1 addition & 6 deletions management/server/http/peers_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,9 @@ func NewPeersHandler(accountManager server.AccountManager, authCfg AuthCfg) *Pee
func (h *PeersHandler) checkPeerStatus(peer *nbpeer.Peer) (*nbpeer.Peer, error) {
peerToReturn := peer.Copy()
if peer.Status.Connected {
statuses, err := h.accountManager.GetAllConnectedPeers()
if err != nil {
return peerToReturn, err
}

// Although we have online status in store we do not yet have an updated channel so have to show it as disconnected
// This may happen after server restart when not all peers are yet connected
if _, connected := statuses[peerToReturn.ID]; !connected {
if !h.accountManager.HasConnectedChannel(peer.ID) {
peerToReturn.Status.Connected = false
}
}
Expand Down
6 changes: 3 additions & 3 deletions management/server/http/peers_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,16 +81,16 @@ func initTestMetaData(peers ...*nbpeer.Peer) *PeersHandler {
},
}, user, nil
},

GetAllConnectedPeersFunc: func() (map[string]struct{}, error) {
HasConnectedChannelFunc: func(peerID string) bool {
statuses := make(map[string]struct{})
for _, peer := range peers {
if peer.ID == noUpdateChannelTestPeerID {
break
}
statuses[peer.ID] = struct{}{}
}
return statuses, nil
_, ok := statuses[peerID]
return ok
},
},
claimsExtractor: jwtclaims.NewClaimsExtractor(
Expand Down
9 changes: 9 additions & 0 deletions management/server/mock_server/account_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ type MockAccountManager struct {
SyncPeerFunc func(sync server.PeerSync) (*nbpeer.Peer, *server.NetworkMap, error)
InviteUserFunc func(accountID string, initiatorUserID string, targetUserEmail string) error
GetAllConnectedPeersFunc func() (map[string]struct{}, error)
HasConnectedChannelFunc func(peerID string) bool
GetExternalCacheManagerFunc func() server.ExternalCacheManager
}

Expand Down Expand Up @@ -622,6 +623,14 @@ func (am *MockAccountManager) GetAllConnectedPeers() (map[string]struct{}, error
return nil, status.Errorf(codes.Unimplemented, "method GetAllConnectedPeers is not implemented")
}

// HasconnectedChannel mocks HasConnectedChannel of the AccountManager interface
func (am *MockAccountManager) HasConnectedChannel(peerID string) bool {
if am.HasConnectedChannelFunc != nil {
return am.HasConnectedChannelFunc(peerID)
}
return false
}

// StoreEvent mocks StoreEvent of the AccountManager interface
func (am *MockAccountManager) StoreEvent(initiatorID, targetID, accountID string, activityID activity.Activity, meta map[string]any) {
if am.StoreEventFunc != nil {
Expand Down
48 changes: 10 additions & 38 deletions management/server/telemetry/updatechannel_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,48 +11,29 @@ import (

// UpdateChannelMetrics represents all metrics related to the UpdateChannel
type UpdateChannelMetrics struct {
createChannelDurationMs syncint64.Histogram
createChannelDurationMicro syncint64.Histogram
closeChannelDurationMs syncint64.Histogram
closeChannelDurationMicro syncint64.Histogram
closeChannelsDurationMs syncint64.Histogram
closeChannelsDurationMicro syncint64.Histogram
closeChannels syncint64.Histogram
sendUpdateDurationMs syncint64.Histogram
sendUpdateDurationMicro syncint64.Histogram
getAllConnectedPeersDurationMs syncint64.Histogram
getAllConnectedPeersDurationMicro syncint64.Histogram
getAllConnectedPeers syncint64.Histogram
hasChannelDurationMicro syncint64.Histogram
ctx context.Context
}

// NewUpdateChannelMetrics creates an instance of UpdateChannel
func NewUpdateChannelMetrics(ctx context.Context, meter metric.Meter) (*UpdateChannelMetrics, error) {
createChannelDurationMs, err := meter.SyncInt64().Histogram("management.updatechannel.create.duration.ms")
if err != nil {
return nil, err
}

createChannelDurationMicro, err := meter.SyncInt64().Histogram("management.updatechannel.create.duration.micro")
if err != nil {
return nil, err
}

closeChannelDurationMs, err := meter.SyncInt64().Histogram("management.updatechannel.close.one.duration.ms")
if err != nil {
return nil, err
}

closeChannelDurationMicro, err := meter.SyncInt64().Histogram("management.updatechannel.close.one.duration.micro")
if err != nil {
return nil, err
}

closeChannelsDurationMs, err := meter.SyncInt64().Histogram("management.updatechannel.close.multiple.duration.ms")
if err != nil {
return nil, err
}

closeChannelsDurationMicro, err := meter.SyncInt64().Histogram("management.updatechannel.close.multiple.duration.micro")
if err != nil {
return nil, err
Expand All @@ -63,64 +44,52 @@ func NewUpdateChannelMetrics(ctx context.Context, meter metric.Meter) (*UpdateCh
return nil, err
}

sendUpdateDurationMs, err := meter.SyncInt64().Histogram("management.updatechannel.send.duration.ms")
if err != nil {
return nil, err
}

sendUpdateDurationMicro, err := meter.SyncInt64().Histogram("management.updatechannel.send.duration.micro")
if err != nil {
return nil, err
}

getAllConnectedPeersDurationMs, err := meter.SyncInt64().Histogram("management.updatechannel.get.all.duration.ms")
getAllConnectedPeersDurationMicro, err := meter.SyncInt64().Histogram("management.updatechannel.get.all.duration.micro")
if err != nil {
return nil, err
}

getAllConnectedPeersDurationMicro, err := meter.SyncInt64().Histogram("management.updatechannel.get.all.duration.micro")
getAllConnectedPeers, err := meter.SyncInt64().Histogram("management.updatechannel.get.all.peers")
if err != nil {
return nil, err
}

getAllConnectedPeers, err := meter.SyncInt64().Histogram("management.updatechannel.get.all.peers")
hasChannelDurationMicro, err := meter.SyncInt64().Histogram("management.updatechannel.haschannel.duration.micro")
if err != nil {
return nil, err
}

return &UpdateChannelMetrics{
createChannelDurationMs: createChannelDurationMs,
createChannelDurationMicro: createChannelDurationMicro,
closeChannelDurationMs: closeChannelDurationMs,
closeChannelDurationMicro: closeChannelDurationMicro,
closeChannelsDurationMs: closeChannelsDurationMs,
closeChannelsDurationMicro: closeChannelsDurationMicro,
closeChannels: closeChannels,
sendUpdateDurationMs: sendUpdateDurationMs,
sendUpdateDurationMicro: sendUpdateDurationMicro,
getAllConnectedPeersDurationMs: getAllConnectedPeersDurationMs,
getAllConnectedPeersDurationMicro: getAllConnectedPeersDurationMicro,
getAllConnectedPeers: getAllConnectedPeers,
hasChannelDurationMicro: hasChannelDurationMicro,
ctx: ctx,
}, nil
}

// CountCreateChannelDuration counts the duration of the CreateChannel method,
// closed indicates if existing channel was closed before creation of a new one
func (metrics *UpdateChannelMetrics) CountCreateChannelDuration(duration time.Duration, closed bool) {
metrics.createChannelDurationMs.Record(metrics.ctx, duration.Milliseconds(), attribute.Bool("closed", closed))
metrics.createChannelDurationMicro.Record(metrics.ctx, duration.Microseconds(), attribute.Bool("closed", closed))
}

// CountCloseChannelDuration counts the duration of the CloseChannel method
func (metrics *UpdateChannelMetrics) CountCloseChannelDuration(duration time.Duration) {
metrics.closeChannelDurationMs.Record(metrics.ctx, duration.Milliseconds())
metrics.closeChannelDurationMicro.Record(metrics.ctx, duration.Microseconds())
}

// CountCloseChannelsDuration counts the duration of the CloseChannels method and the number of channels have been closed
func (metrics *UpdateChannelMetrics) CountCloseChannelsDuration(duration time.Duration, channels int) {
metrics.closeChannelsDurationMs.Record(metrics.ctx, duration.Milliseconds())
metrics.closeChannelsDurationMicro.Record(metrics.ctx, duration.Microseconds())
metrics.closeChannels.Record(metrics.ctx, int64(channels))
}
Expand All @@ -129,13 +98,16 @@ func (metrics *UpdateChannelMetrics) CountCloseChannelsDuration(duration time.Du
// found indicates if peer had channel, dropped indicates if the message was dropped due channel buffer overload
func (metrics *UpdateChannelMetrics) CountSendUpdateDuration(duration time.Duration, found, dropped bool) {
attrs := []attribute.KeyValue{attribute.Bool("found", found), attribute.Bool("dropped", dropped)}
metrics.sendUpdateDurationMs.Record(metrics.ctx, duration.Milliseconds(), attrs...)
metrics.sendUpdateDurationMicro.Record(metrics.ctx, duration.Microseconds(), attrs...)
}

// CountGetAllConnectedPeersDuration counts the duration of the GetAllConnectedPeers method and the number of peers have been returned
func (metrics *UpdateChannelMetrics) CountGetAllConnectedPeersDuration(duration time.Duration, peers int) {
metrics.getAllConnectedPeersDurationMs.Record(metrics.ctx, duration.Milliseconds())
metrics.getAllConnectedPeersDurationMicro.Record(metrics.ctx, duration.Microseconds())
metrics.getAllConnectedPeers.Record(metrics.ctx, int64(peers))
}

// CountHasChannelDuration counts the duration of the HasChannel method
func (metrics *UpdateChannelMetrics) CountHasChannelDuration(duration time.Duration) {
metrics.hasChannelDurationMicro.Record(metrics.ctx, duration.Microseconds())
}
18 changes: 18 additions & 0 deletions management/server/updatechannel.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,3 +151,21 @@ func (p *PeersUpdateManager) GetAllConnectedPeers() map[string]struct{} {

return m
}

// HasChannel returns true if peers has channel in update manager, otherwise false
func (p *PeersUpdateManager) HasChannel(peerID string) bool {
start := time.Now()

p.channelsMux.Lock()

defer func() {
p.channelsMux.Unlock()
if p.metrics != nil {
p.metrics.UpdateChannelMetrics().CountHasChannelDuration(time.Since(start))
}
}()

_, ok := p.peerChannels[peerID]

return ok
}

0 comments on commit 7643490

Please sign in to comment.