Skip to content

Commit

Permalink
feat: log error and stacktrace when panic in goroutine (#1225)
Browse files Browse the repository at this point in the history
  • Loading branch information
qfrank authored Sep 25, 2024
1 parent 798c9c5 commit 8b0e031
Show file tree
Hide file tree
Showing 42 changed files with 113 additions and 0 deletions.
2 changes: 2 additions & 0 deletions library/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
"github.com/waku-org/go-waku/waku/v2/protocol/subscription"
"github.com/waku-org/go-waku/waku/v2/utils"
)

type filterArgument struct {
Expand Down Expand Up @@ -74,6 +75,7 @@ func FilterSubscribe(instance *WakuInstance, filterJSON string, peerID string, m

for _, subscriptionDetails := range subscriptions {
go func(subscriptionDetails *subscription.SubscriptionDetails) {
defer utils.LogOnPanic()
for envelope := range subscriptionDetails.C {
send(instance, "message", toSubscriptionMessage(envelope))
}
Expand Down
2 changes: 2 additions & 0 deletions library/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/utils"
)

// RelayEnoughPeers determines if there are enough peers to publish a message on a topic
Expand Down Expand Up @@ -66,6 +67,7 @@ func relaySubscribe(instance *WakuInstance, filterJSON string) error {

for _, sub := range subscriptions {
go func(subscription *relay.Subscription) {
defer utils.LogOnPanic()
for envelope := range subscription.Ch {
send(instance, "message", toSubscriptionMessage(envelope))
}
Expand Down
3 changes: 3 additions & 0 deletions waku/persistence/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_store/pb"
wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/timesource"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
)
Expand Down Expand Up @@ -186,6 +187,7 @@ func (d *DBStore) Start(ctx context.Context, timesource timesource.Timesource) e
}

func (d *DBStore) updateMetrics(ctx context.Context) {
defer utils.LogOnPanic()
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
defer d.wg.Done()
Expand Down Expand Up @@ -251,6 +253,7 @@ func (d *DBStore) getDeleteOldRowsQuery() string {
}

func (d *DBStore) checkForOlderRecords(ctx context.Context, t time.Duration) {
defer utils.LogOnPanic()
defer d.wg.Done()

ticker := time.NewTicker(t)
Expand Down
5 changes: 5 additions & 0 deletions waku/v2/api/filter/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
"github.com/waku-org/go-waku/waku/v2/protocol/subscription"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -98,6 +99,7 @@ func Subscribe(ctx context.Context, wf *filter.WakuFilterLightNode, contentFilte
}

func (apiSub *Sub) Unsubscribe(contentFilter protocol.ContentFilter) {
defer utils.LogOnPanic()
_, err := apiSub.wf.Unsubscribe(apiSub.ctx, contentFilter)
// The returned result is intentionally not read; revisit if specific error handling is needed.
if err != nil {
Expand All @@ -106,6 +108,7 @@ func (apiSub *Sub) Unsubscribe(contentFilter protocol.ContentFilter) {
}

func (apiSub *Sub) subscriptionLoop(batchInterval time.Duration) {
defer utils.LogOnPanic()
ticker := time.NewTicker(batchInterval)
defer ticker.Stop()
for {
Expand Down Expand Up @@ -213,12 +216,14 @@ func (apiSub *Sub) multiplex(subs []*subscription.SubscriptionDetails) {
for _, subDetails := range subs {
apiSub.subs[subDetails.ID] = subDetails
go func(subDetails *subscription.SubscriptionDetails) {
defer utils.LogOnPanic()
apiSub.log.Debug("new multiplex", zap.String("sub-id", subDetails.ID))
for env := range subDetails.C {
apiSub.DataCh <- env
}
}(subDetails)
go func(subDetails *subscription.SubscriptionDetails) {
defer utils.LogOnPanic()
select {
case <-apiSub.ctx.Done():
return
Expand Down
3 changes: 3 additions & 0 deletions waku/v2/api/filter/filter_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/waku-org/go-waku/waku/v2/onlinechecker"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
"github.com/waku-org/go-waku/waku/v2/utils"
)

// Methods on FilterManager just aggregate filters from application and subscribe to them
Expand Down Expand Up @@ -87,6 +88,7 @@ func NewFilterManager(ctx context.Context, logger *zap.Logger, minPeersPerFilter
}

func (mgr *FilterManager) startFilterSubLoop() {
defer utils.LogOnPanic()
ticker := time.NewTicker(mgr.filterSubBatchDuration)
defer ticker.Stop()
for {
Expand Down Expand Up @@ -157,6 +159,7 @@ func (mgr *FilterManager) SubscribeFilter(filterID string, cf protocol.ContentFi
}

func (mgr *FilterManager) subscribeAndRunLoop(f filterConfig) {
defer utils.LogOnPanic()
ctx, cancel := context.WithCancel(mgr.ctx)
config := FilterConfig{MaxPeers: mgr.minPeersPerFilter}
sub, err := Subscribe(ctx, mgr.node, f.contentFilter, config, mgr.logger, mgr.params)
Expand Down
4 changes: 4 additions & 0 deletions waku/v2/api/missing/missing_messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/store"
"github.com/waku-org/go-waku/waku/v2/timesource"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
)
Expand Down Expand Up @@ -102,6 +103,7 @@ func (m *MissingMessageVerifier) Start(ctx context.Context) {
m.C = c

go func() {
defer utils.LogOnPanic()
t := time.NewTicker(m.params.interval)
defer t.Stop()

Expand All @@ -123,6 +125,7 @@ func (m *MissingMessageVerifier) Start(ctx context.Context) {
default:
semaphore <- struct{}{}
go func(interest criteriaInterest) {
defer utils.LogOnPanic()
m.fetchHistory(c, interest)
<-semaphore
}(interest)
Expand Down Expand Up @@ -276,6 +279,7 @@ func (m *MissingMessageVerifier) fetchMessagesBatch(c chan<- *protocol.Envelope,

wg.Add(1)
go func(messageHashes []pb.MessageHash) {
defer utils.LogOnPanic()
defer wg.Wait()

result, err := m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (*store.Result, error) {
Expand Down
2 changes: 2 additions & 0 deletions waku/v2/api/publish/message_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/store"
"github.com/waku-org/go-waku/waku/v2/timesource"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -145,6 +146,7 @@ func (m *MessageSentCheck) SetStorePeerID(peerID peer.ID) {

// Start periodically checks whether the tracked outgoing messages have been stored
func (m *MessageSentCheck) Start() {
defer utils.LogOnPanic()
ticker := time.NewTicker(m.hashQueryInterval)
defer ticker.Stop()
for {
Expand Down
2 changes: 2 additions & 0 deletions waku/v2/api/publish/message_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"sync"

"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/utils"
)

// MessagePriority determines the ordering for the message priority queue
Expand Down Expand Up @@ -182,6 +183,7 @@ func (m *MessageQueue) Pop(ctx context.Context) <-chan *protocol.Envelope {
ch := make(chan *protocol.Envelope)

go func() {
defer utils.LogOnPanic()
defer close(ch)

select {
Expand Down
2 changes: 2 additions & 0 deletions waku/v2/discv5/discover.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ func (d *DiscoveryV5) listen(ctx context.Context) error {
if d.NAT != nil && !d.udpAddr.IP.IsLoopback() {
d.WaitGroup().Add(1)
go func() {
defer utils.LogOnPanic()
defer d.WaitGroup().Done()
nat.Map(d.NAT, ctx.Done(), "udp", d.udpAddr.Port, d.udpAddr.Port, "go-waku discv5 discovery")
}()
Expand Down Expand Up @@ -217,6 +218,7 @@ func (d *DiscoveryV5) start() error {
if d.params.autoFindPeers {
d.WaitGroup().Add(1)
go func() {
defer utils.LogOnPanic()
defer d.WaitGroup().Done()
d.runDiscoveryV5Loop(d.Context())
}()
Expand Down
2 changes: 2 additions & 0 deletions waku/v2/discv5/mock_peer_discoverer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/libp2p/go-libp2p/core/peer"
"github.com/waku-org/go-waku/waku/v2/service"
"github.com/waku-org/go-waku/waku/v2/utils"
)

// TestPeerDiscoverer is a mock peer discoverer for testing
Expand All @@ -26,6 +27,7 @@ func NewTestPeerDiscoverer() *TestPeerDiscoverer {
// Subscribe is for subscribing to the peer discoverer
func (t *TestPeerDiscoverer) Subscribe(ctx context.Context, ch <-chan service.PeerData) {
go func() {
defer utils.LogOnPanic()
for {
select {
case <-ctx.Done():
Expand Down
2 changes: 2 additions & 0 deletions waku/v2/node/connectedness.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"go.uber.org/zap"

wps "github.com/waku-org/go-waku/waku/v2/peerstore"
"github.com/waku-org/go-waku/waku/v2/utils"
)

// PeerStatis is a map of peer IDs to supported protocols
Expand Down Expand Up @@ -101,6 +102,7 @@ func (c ConnectionNotifier) Close() {
}

func (w *WakuNode) connectednessListener(ctx context.Context) {
defer utils.LogOnPanic()
defer w.wg.Done()

for {
Expand Down
3 changes: 3 additions & 0 deletions waku/v2/node/keepalive.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
"github.com/waku-org/go-waku/logging"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
"golang.org/x/exp/maps"
)
Expand Down Expand Up @@ -40,6 +41,7 @@ func disconnectAllPeers(host host.Host, logger *zap.Logger) {
// This is necessary because TCP connections are automatically closed due to inactivity,
// and doing a ping will avoid this (with a small bandwidth cost)
func (w *WakuNode) startKeepAlive(ctx context.Context, randomPeersPingDuration time.Duration, allPeersPingDuration time.Duration) {
defer utils.LogOnPanic()
defer w.wg.Done()

if !w.opts.enableRelay {
Expand Down Expand Up @@ -168,6 +170,7 @@ func (w *WakuNode) startKeepAlive(ctx context.Context, randomPeersPingDuration t
}

func (w *WakuNode) pingPeer(ctx context.Context, wg *sync.WaitGroup, peerID peer.ID, resultChan chan bool) {
defer utils.LogOnPanic()
defer wg.Done()

logger := w.log.With(logging.HostID("peer", peerID))
Expand Down
3 changes: 3 additions & 0 deletions waku/v2/node/localnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/waku-org/go-waku/waku/v2/protocol"
wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -358,6 +359,7 @@ func (w *WakuNode) watchTopicShards(ctx context.Context) error {
}

go func() {
defer utils.LogOnPanic()
defer evtRelaySubscribed.Close()
defer evtRelayUnsubscribed.Close()

Expand Down Expand Up @@ -411,6 +413,7 @@ func (w *WakuNode) registerAndMonitorReachability(ctx context.Context) {
}
w.wg.Add(1)
go func() {
defer utils.LogOnPanic()
defer myEventSub.Close()
defer w.wg.Done()

Expand Down
4 changes: 4 additions & 0 deletions waku/v2/node/wakunode2.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
func(ctx context.Context, numPeers int) <-chan peer.AddrInfo {
r := make(chan peer.AddrInfo)
go func() {
defer utils.LogOnPanic()
defer close(r)
for ; numPeers != 0; numPeers-- {
select {
Expand Down Expand Up @@ -308,6 +309,7 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
}

func (w *WakuNode) watchMultiaddressChanges(ctx context.Context) {
defer utils.LogOnPanic()
defer w.wg.Done()

addrsSet := utils.MultiAddrSet(w.ListenAddresses()...)
Expand Down Expand Up @@ -550,6 +552,7 @@ func (w *WakuNode) ID() string {
}

func (w *WakuNode) watchENRChanges(ctx context.Context) {
defer utils.LogOnPanic()
defer w.wg.Done()

var prevNodeVal string
Expand Down Expand Up @@ -887,6 +890,7 @@ func (w *WakuNode) PeersByContentTopic(contentTopic string) peer.IDSlice {
}

func (w *WakuNode) findRelayNodes(ctx context.Context) {
defer utils.LogOnPanic()
defer w.wg.Done()

// Feed peers more often right after the bootstrap, then back off
Expand Down
2 changes: 2 additions & 0 deletions waku/v2/peermanager/connection_gater.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -77,6 +78,7 @@ func (c *ConnectionGater) InterceptUpgraded(_ network.Conn) (allow bool, reason

// NotifyDisconnect is called when a connection disconnects.
func (c *ConnectionGater) NotifyDisconnect(addr multiaddr.Multiaddr) {
defer utils.LogOnPanic()
ip, err := manet.ToIP(addr)
if err != nil {
return
Expand Down
3 changes: 3 additions & 0 deletions waku/v2/peermanager/fastest_peer_selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
"github.com/waku-org/go-waku/logging"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -69,9 +70,11 @@ func (r *FastestPeerSelector) FastestPeer(ctx context.Context, peers peer.IDSlic
pinged := make(map[peer.ID]struct{})

go func() {
defer utils.LogOnPanic()
// Ping any peer with no latency recorded
for peerToPing := range pingCh {
go func(p peer.ID) {
defer utils.LogOnPanic()
defer wg.Done()
rtt := time.Hour
result, err := r.PingPeer(ctx, p)
Expand Down
5 changes: 5 additions & 0 deletions waku/v2/peermanager/peer_connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/waku-org/go-waku/waku/v2/onlinechecker"
wps "github.com/waku-org/go-waku/waku/v2/peerstore"
"github.com/waku-org/go-waku/waku/v2/service"
"github.com/waku-org/go-waku/waku/v2/utils"

"go.uber.org/zap"

Expand Down Expand Up @@ -103,6 +104,7 @@ func (c *PeerConnectionStrategy) Subscribe(ctx context.Context, ch <-chan servic
// if running, start a goroutine to consume the subscription
c.WaitGroup().Add(1)
go func() {
defer utils.LogOnPanic()
defer c.WaitGroup().Done()
c.consumeSubscription(subscription{ctx, ch})
}()
Expand Down Expand Up @@ -186,6 +188,7 @@ func (c *PeerConnectionStrategy) consumeSubscriptions() {
for _, subs := range c.subscriptions {
c.WaitGroup().Add(1)
go func(s subscription) {
defer utils.LogOnPanic()
defer c.WaitGroup().Done()
c.consumeSubscription(s)
}(subs)
Expand Down Expand Up @@ -233,6 +236,7 @@ func (c *PeerConnectionStrategy) addConnectionBackoff(peerID peer.ID) {
}

func (c *PeerConnectionStrategy) dialPeers() {
defer utils.LogOnPanic()
defer c.WaitGroup().Done()

maxGoRoutines := c.pm.OutPeersTarget
Expand Down Expand Up @@ -272,6 +276,7 @@ func (c *PeerConnectionStrategy) dialPeers() {
}

func (c *PeerConnectionStrategy) dialPeer(pi peer.AddrInfo, sem chan struct{}) {
defer utils.LogOnPanic()
defer c.WaitGroup().Done()
ctx, cancel := context.WithTimeout(c.Context(), c.dialTimeout)
defer cancel()
Expand Down
Loading

0 comments on commit 8b0e031

Please sign in to comment.