From 27ed88f918fe99a600880652ca44d5546d69cbb2 Mon Sep 17 00:00:00 2001 From: Yury Gargay Date: Tue, 5 Dec 2023 14:17:56 +0100 Subject: [PATCH] Implement lightweight method to check is peer has update channel (#1351) Instead of GetAllConnectedPeers that need to traverse the whole connections map in order to find one channel there. --- management/server/account.go | 6 +++ management/server/http/peers_handler.go | 7 +-- management/server/http/peers_handler_test.go | 6 +-- management/server/mock_server/account_mock.go | 9 ++++ .../server/telemetry/updatechannel_metrics.go | 48 ++++--------------- management/server/updatechannel.go | 18 +++++++ 6 files changed, 47 insertions(+), 47 deletions(-) diff --git a/management/server/account.go b/management/server/account.go index baaea2005d1..4c13c853587 100644 --- a/management/server/account.go +++ b/management/server/account.go @@ -114,6 +114,7 @@ type AccountManager interface { LoginPeer(login PeerLogin) (*nbpeer.Peer, *NetworkMap, error) // used by peer gRPC API SyncPeer(sync PeerSync) (*nbpeer.Peer, *NetworkMap, error) // used by peer gRPC API GetAllConnectedPeers() (map[string]struct{}, error) + HasConnectedChannel(peerID string) bool GetExternalCacheManager() ExternalCacheManager } @@ -1676,6 +1677,11 @@ func (am *DefaultAccountManager) GetAllConnectedPeers() (map[string]struct{}, er return am.peersUpdateManager.GetAllConnectedPeers(), nil } +// HasConnectedChannel returns true if peers has channel in update manager, otherwise false +func (am *DefaultAccountManager) HasConnectedChannel(peerID string) bool { + return am.peersUpdateManager.HasChannel(peerID) +} + var invalidDomainRegexp = regexp.MustCompile(`^([a-z0-9]+(-[a-z0-9]+)*\.)+[a-z]{2,}$`) func isDomainValid(domain string) bool { diff --git a/management/server/http/peers_handler.go b/management/server/http/peers_handler.go index c586bd6c8f2..734785e308a 100644 --- a/management/server/http/peers_handler.go +++ b/management/server/http/peers_handler.go @@ -35,14 +35,9 @@ func NewPeersHandler(accountManager server.AccountManager, authCfg AuthCfg) *Pee func (h *PeersHandler) checkPeerStatus(peer *nbpeer.Peer) (*nbpeer.Peer, error) { peerToReturn := peer.Copy() if peer.Status.Connected { - statuses, err := h.accountManager.GetAllConnectedPeers() - if err != nil { - return peerToReturn, err - } - // Although we have online status in store we do not yet have an updated channel so have to show it as disconnected // This may happen after server restart when not all peers are yet connected - if _, connected := statuses[peerToReturn.ID]; !connected { + if !h.accountManager.HasConnectedChannel(peer.ID) { peerToReturn.Status.Connected = false } } diff --git a/management/server/http/peers_handler_test.go b/management/server/http/peers_handler_test.go index d13db447b6b..27978c48754 100644 --- a/management/server/http/peers_handler_test.go +++ b/management/server/http/peers_handler_test.go @@ -81,8 +81,7 @@ func initTestMetaData(peers ...*nbpeer.Peer) *PeersHandler { }, }, user, nil }, - - GetAllConnectedPeersFunc: func() (map[string]struct{}, error) { + HasConnectedChannelFunc: func(peerID string) bool { statuses := make(map[string]struct{}) for _, peer := range peers { if peer.ID == noUpdateChannelTestPeerID { @@ -90,7 +89,8 @@ func initTestMetaData(peers ...*nbpeer.Peer) *PeersHandler { } statuses[peer.ID] = struct{}{} } - return statuses, nil + _, ok := statuses[peerID] + return ok }, }, claimsExtractor: jwtclaims.NewClaimsExtractor( diff --git a/management/server/mock_server/account_mock.go b/management/server/mock_server/account_mock.go index b200af0c3a7..a349b35a9d1 100644 --- a/management/server/mock_server/account_mock.go +++ b/management/server/mock_server/account_mock.go @@ -81,6 +81,7 @@ type MockAccountManager struct { SyncPeerFunc func(sync server.PeerSync) (*nbpeer.Peer, *server.NetworkMap, error) InviteUserFunc func(accountID string, initiatorUserID string, targetUserEmail string) error GetAllConnectedPeersFunc func() (map[string]struct{}, error) + HasConnectedChannelFunc func(peerID string) bool GetExternalCacheManagerFunc func() server.ExternalCacheManager } @@ -622,6 +623,14 @@ func (am *MockAccountManager) GetAllConnectedPeers() (map[string]struct{}, error return nil, status.Errorf(codes.Unimplemented, "method GetAllConnectedPeers is not implemented") } +// HasconnectedChannel mocks HasConnectedChannel of the AccountManager interface +func (am *MockAccountManager) HasConnectedChannel(peerID string) bool { + if am.HasConnectedChannelFunc != nil { + return am.HasConnectedChannelFunc(peerID) + } + return false +} + // StoreEvent mocks StoreEvent of the AccountManager interface func (am *MockAccountManager) StoreEvent(initiatorID, targetID, accountID string, activityID activity.Activity, meta map[string]any) { if am.StoreEventFunc != nil { diff --git a/management/server/telemetry/updatechannel_metrics.go b/management/server/telemetry/updatechannel_metrics.go index ede1671fcff..7abe34354a8 100644 --- a/management/server/telemetry/updatechannel_metrics.go +++ b/management/server/telemetry/updatechannel_metrics.go @@ -11,48 +11,29 @@ import ( // UpdateChannelMetrics represents all metrics related to the UpdateChannel type UpdateChannelMetrics struct { - createChannelDurationMs syncint64.Histogram createChannelDurationMicro syncint64.Histogram - closeChannelDurationMs syncint64.Histogram closeChannelDurationMicro syncint64.Histogram - closeChannelsDurationMs syncint64.Histogram closeChannelsDurationMicro syncint64.Histogram closeChannels syncint64.Histogram - sendUpdateDurationMs syncint64.Histogram sendUpdateDurationMicro syncint64.Histogram - getAllConnectedPeersDurationMs syncint64.Histogram getAllConnectedPeersDurationMicro syncint64.Histogram getAllConnectedPeers syncint64.Histogram + hasChannelDurationMicro syncint64.Histogram ctx context.Context } // NewUpdateChannelMetrics creates an instance of UpdateChannel func NewUpdateChannelMetrics(ctx context.Context, meter metric.Meter) (*UpdateChannelMetrics, error) { - createChannelDurationMs, err := meter.SyncInt64().Histogram("management.updatechannel.create.duration.ms") - if err != nil { - return nil, err - } - createChannelDurationMicro, err := meter.SyncInt64().Histogram("management.updatechannel.create.duration.micro") if err != nil { return nil, err } - closeChannelDurationMs, err := meter.SyncInt64().Histogram("management.updatechannel.close.one.duration.ms") - if err != nil { - return nil, err - } - closeChannelDurationMicro, err := meter.SyncInt64().Histogram("management.updatechannel.close.one.duration.micro") if err != nil { return nil, err } - closeChannelsDurationMs, err := meter.SyncInt64().Histogram("management.updatechannel.close.multiple.duration.ms") - if err != nil { - return nil, err - } - closeChannelsDurationMicro, err := meter.SyncInt64().Histogram("management.updatechannel.close.multiple.duration.micro") if err != nil { return nil, err @@ -63,44 +44,35 @@ func NewUpdateChannelMetrics(ctx context.Context, meter metric.Meter) (*UpdateCh return nil, err } - sendUpdateDurationMs, err := meter.SyncInt64().Histogram("management.updatechannel.send.duration.ms") - if err != nil { - return nil, err - } - sendUpdateDurationMicro, err := meter.SyncInt64().Histogram("management.updatechannel.send.duration.micro") if err != nil { return nil, err } - getAllConnectedPeersDurationMs, err := meter.SyncInt64().Histogram("management.updatechannel.get.all.duration.ms") + getAllConnectedPeersDurationMicro, err := meter.SyncInt64().Histogram("management.updatechannel.get.all.duration.micro") if err != nil { return nil, err } - getAllConnectedPeersDurationMicro, err := meter.SyncInt64().Histogram("management.updatechannel.get.all.duration.micro") + getAllConnectedPeers, err := meter.SyncInt64().Histogram("management.updatechannel.get.all.peers") if err != nil { return nil, err } - getAllConnectedPeers, err := meter.SyncInt64().Histogram("management.updatechannel.get.all.peers") + hasChannelDurationMicro, err := meter.SyncInt64().Histogram("management.updatechannel.haschannel.duration.micro") if err != nil { return nil, err } return &UpdateChannelMetrics{ - createChannelDurationMs: createChannelDurationMs, createChannelDurationMicro: createChannelDurationMicro, - closeChannelDurationMs: closeChannelDurationMs, closeChannelDurationMicro: closeChannelDurationMicro, - closeChannelsDurationMs: closeChannelsDurationMs, closeChannelsDurationMicro: closeChannelsDurationMicro, closeChannels: closeChannels, - sendUpdateDurationMs: sendUpdateDurationMs, sendUpdateDurationMicro: sendUpdateDurationMicro, - getAllConnectedPeersDurationMs: getAllConnectedPeersDurationMs, getAllConnectedPeersDurationMicro: getAllConnectedPeersDurationMicro, getAllConnectedPeers: getAllConnectedPeers, + hasChannelDurationMicro: hasChannelDurationMicro, ctx: ctx, }, nil } @@ -108,19 +80,16 @@ func NewUpdateChannelMetrics(ctx context.Context, meter metric.Meter) (*UpdateCh // CountCreateChannelDuration counts the duration of the CreateChannel method, // closed indicates if existing channel was closed before creation of a new one func (metrics *UpdateChannelMetrics) CountCreateChannelDuration(duration time.Duration, closed bool) { - metrics.createChannelDurationMs.Record(metrics.ctx, duration.Milliseconds(), attribute.Bool("closed", closed)) metrics.createChannelDurationMicro.Record(metrics.ctx, duration.Microseconds(), attribute.Bool("closed", closed)) } // CountCloseChannelDuration counts the duration of the CloseChannel method func (metrics *UpdateChannelMetrics) CountCloseChannelDuration(duration time.Duration) { - metrics.closeChannelDurationMs.Record(metrics.ctx, duration.Milliseconds()) metrics.closeChannelDurationMicro.Record(metrics.ctx, duration.Microseconds()) } // CountCloseChannelsDuration counts the duration of the CloseChannels method and the number of channels have been closed func (metrics *UpdateChannelMetrics) CountCloseChannelsDuration(duration time.Duration, channels int) { - metrics.closeChannelsDurationMs.Record(metrics.ctx, duration.Milliseconds()) metrics.closeChannelsDurationMicro.Record(metrics.ctx, duration.Microseconds()) metrics.closeChannels.Record(metrics.ctx, int64(channels)) } @@ -129,13 +98,16 @@ func (metrics *UpdateChannelMetrics) CountCloseChannelsDuration(duration time.Du // found indicates if peer had channel, dropped indicates if the message was dropped due channel buffer overload func (metrics *UpdateChannelMetrics) CountSendUpdateDuration(duration time.Duration, found, dropped bool) { attrs := []attribute.KeyValue{attribute.Bool("found", found), attribute.Bool("dropped", dropped)} - metrics.sendUpdateDurationMs.Record(metrics.ctx, duration.Milliseconds(), attrs...) metrics.sendUpdateDurationMicro.Record(metrics.ctx, duration.Microseconds(), attrs...) } // CountGetAllConnectedPeersDuration counts the duration of the GetAllConnectedPeers method and the number of peers have been returned func (metrics *UpdateChannelMetrics) CountGetAllConnectedPeersDuration(duration time.Duration, peers int) { - metrics.getAllConnectedPeersDurationMs.Record(metrics.ctx, duration.Milliseconds()) metrics.getAllConnectedPeersDurationMicro.Record(metrics.ctx, duration.Microseconds()) metrics.getAllConnectedPeers.Record(metrics.ctx, int64(peers)) } + +// CountHasChannelDuration counts the duration of the HasChannel method +func (metrics *UpdateChannelMetrics) CountHasChannelDuration(duration time.Duration) { + metrics.hasChannelDurationMicro.Record(metrics.ctx, duration.Microseconds()) +} diff --git a/management/server/updatechannel.go b/management/server/updatechannel.go index 0f14497fd2c..f760c5a7578 100644 --- a/management/server/updatechannel.go +++ b/management/server/updatechannel.go @@ -151,3 +151,21 @@ func (p *PeersUpdateManager) GetAllConnectedPeers() map[string]struct{} { return m } + +// HasChannel returns true if peers has channel in update manager, otherwise false +func (p *PeersUpdateManager) HasChannel(peerID string) bool { + start := time.Now() + + p.channelsMux.Lock() + + defer func() { + p.channelsMux.Unlock() + if p.metrics != nil { + p.metrics.UpdateChannelMetrics().CountHasChannelDuration(time.Since(start)) + } + }() + + _, ok := p.peerChannels[peerID] + + return ok +}