Skip to content

Commit

Permalink
rename messageManager to messageSenderImpl and renamed dht.messageMgr to
Browse files Browse the repository at this point in the history
dht.msgSender
  • Loading branch information
aschmahmann committed Jan 4, 2021
1 parent 6fca41f commit ebd2d69
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 15 deletions.
6 changes: 3 additions & 3 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ type IpfsDHT struct {
proc goprocess.Process

protoMessenger *pb.ProtocolMessenger
messageMgr *messageManager
msgSender *messageSenderImpl

plk sync.Mutex

Expand Down Expand Up @@ -188,12 +188,12 @@ func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error)
dht.disableFixLowPeers = cfg.disableFixLowPeers

dht.Validator = cfg.validator
dht.messageMgr = &messageManager{
dht.msgSender = &messageSenderImpl{
host: h,
strmap: make(map[peer.ID]*peerMessageSender),
protocols: dht.protocols,
}
dht.protoMessenger, err = pb.NewProtocolMessenger(dht.messageMgr, pb.WithValidator(dht.Validator))
dht.protoMessenger, err = pb.NewProtocolMessenger(dht.msgSender, pb.WithValidator(dht.Validator))
if err != nil {
return nil, err
}
Expand Down
8 changes: 4 additions & 4 deletions dht_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -570,14 +570,14 @@ func TestInvalidMessageSenderTracking(t *testing.T) {
defer dht.Close()

foo := peer.ID("asdasd")
_, err := dht.messageMgr.messageSenderForPeer(ctx, foo)
_, err := dht.msgSender.messageSenderForPeer(ctx, foo)
if err == nil {
t.Fatal("that shouldnt have succeeded")
}

dht.messageMgr.smlk.Lock()
mscnt := len(dht.messageMgr.strmap)
dht.messageMgr.smlk.Unlock()
dht.msgSender.smlk.Lock()
mscnt := len(dht.msgSender.strmap)
dht.msgSender.smlk.Unlock()

if mscnt > 0 {
t.Fatal("should have no message senders in map")
Expand Down
14 changes: 7 additions & 7 deletions message_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,16 @@ import (
"go.opencensus.io/tag"
)

// messageManager is responsible for sending requests and messages to peers efficiently, including reuse of streams.
// messageSenderImpl is responsible for sending requests and messages to peers efficiently, including reuse of streams.
// It also tracks metrics for sent requests and messages.
type messageManager struct {
type messageSenderImpl struct {
host host.Host // the network services we need
smlk sync.Mutex
strmap map[peer.ID]*peerMessageSender
protocols []protocol.ID
}

func (m *messageManager) streamDisconnect(ctx context.Context, p peer.ID) {
func (m *messageSenderImpl) streamDisconnect(ctx context.Context, p peer.ID) {
m.smlk.Lock()
defer m.smlk.Unlock()
ms, ok := m.strmap[p]
Expand All @@ -49,7 +49,7 @@ func (m *messageManager) streamDisconnect(ctx context.Context, p peer.ID) {

// SendRequest sends out a request, but also makes sure to
// measure the RTT for latency measurements.
func (m *messageManager) SendRequest(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
func (m *messageSenderImpl) SendRequest(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
ctx, _ = tag.New(ctx, metrics.UpsertMessageType(pmes))

ms, err := m.messageSenderForPeer(ctx, p)
Expand Down Expand Up @@ -84,7 +84,7 @@ func (m *messageManager) SendRequest(ctx context.Context, p peer.ID, pmes *pb.Me
}

// SendMessage sends out a message
func (m *messageManager) SendMessage(ctx context.Context, p peer.ID, pmes *pb.Message) error {
func (m *messageSenderImpl) SendMessage(ctx context.Context, p peer.ID, pmes *pb.Message) error {
ctx, _ = tag.New(ctx, metrics.UpsertMessageType(pmes))

ms, err := m.messageSenderForPeer(ctx, p)
Expand Down Expand Up @@ -113,7 +113,7 @@ func (m *messageManager) SendMessage(ctx context.Context, p peer.ID, pmes *pb.Me
return nil
}

func (m *messageManager) messageSenderForPeer(ctx context.Context, p peer.ID) (*peerMessageSender, error) {
func (m *messageSenderImpl) messageSenderForPeer(ctx context.Context, p peer.ID) (*peerMessageSender, error) {
m.smlk.Lock()
ms, ok := m.strmap[p]
if ok {
Expand Down Expand Up @@ -151,7 +151,7 @@ type peerMessageSender struct {
r msgio.ReadCloser
lk ctxMutex
p peer.ID
m *messageManager
m *messageSenderImpl

invalid bool
singleMes int
Expand Down
2 changes: 1 addition & 1 deletion subscriber_notifee.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func (nn *subscriberNotifee) Disconnected(n network.Network, v network.Conn) {
return
}

dht.messageMgr.streamDisconnect(dht.Context(), p)
dht.msgSender.streamDisconnect(dht.Context(), p)
}

func (nn *subscriberNotifee) Connected(network.Network, network.Conn) {}
Expand Down

0 comments on commit ebd2d69

Please sign in to comment.