diff --git a/mongo/integration/change_stream_test.go b/mongo/integration/change_stream_test.go index 868706ad1ef..b3d0469c36f 100644 --- a/mongo/integration/change_stream_test.go +++ b/mongo/integration/change_stream_test.go @@ -770,7 +770,7 @@ func TestChangeStream_ReplicaSet(t *testing.T) { require.NoError(mt, err, "failed to update idValue") }() - nextCtx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) + nextCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) t.Cleanup(cancel) type splitEvent struct { diff --git a/mongo/integration/unified/client_entity.go b/mongo/integration/unified/client_entity.go index e63c891039c..ff7d9d5fc33 100644 --- a/mongo/integration/unified/client_entity.go +++ b/mongo/integration/unified/client_entity.go @@ -66,6 +66,7 @@ type clientEntity struct { eventsCountLock sync.RWMutex serverDescriptionChangedEventsCountLock sync.RWMutex + eventProcessMu sync.RWMutex entityMap *EntityMap @@ -471,6 +472,9 @@ func (c *clientEntity) processPoolEvent(evt *event.PoolEvent) { } func (c *clientEntity) processServerDescriptionChangedEvent(evt *event.ServerDescriptionChangedEvent) { + c.eventProcessMu.Lock() + defer c.eventProcessMu.Unlock() + if !c.getRecordEvents() { return } @@ -487,6 +491,9 @@ func (c *clientEntity) processServerDescriptionChangedEvent(evt *event.ServerDes } func (c *clientEntity) processServerHeartbeatFailedEvent(evt *event.ServerHeartbeatFailedEvent) { + c.eventProcessMu.Lock() + defer c.eventProcessMu.Unlock() + if !c.getRecordEvents() { return } @@ -499,6 +506,9 @@ func (c *clientEntity) processServerHeartbeatFailedEvent(evt *event.ServerHeartb } func (c *clientEntity) processServerHeartbeatStartedEvent(evt *event.ServerHeartbeatStartedEvent) { + c.eventProcessMu.Lock() + defer c.eventProcessMu.Unlock() + if !c.getRecordEvents() { return } @@ -511,6 +521,9 @@ func (c *clientEntity) processServerHeartbeatStartedEvent(evt *event.ServerHeart } func (c *clientEntity) processServerHeartbeatSucceededEvent(evt *event.ServerHeartbeatSucceededEvent) { + c.eventProcessMu.Lock() + defer c.eventProcessMu.Unlock() + if !c.getRecordEvents() { return } @@ -523,6 +536,9 @@ func (c *clientEntity) processServerHeartbeatSucceededEvent(evt *event.ServerHea } func (c *clientEntity) processTopologyDescriptionChangedEvent(evt *event.TopologyDescriptionChangedEvent) { + c.eventProcessMu.Lock() + defer c.eventProcessMu.Unlock() + if !c.getRecordEvents() { return } diff --git a/mongo/integration/unified/logger.go b/mongo/integration/unified/logger.go index 6dcadacf4aa..1d9a612092b 100644 --- a/mongo/integration/unified/logger.go +++ b/mongo/integration/unified/logger.go @@ -7,6 +7,8 @@ package unified import ( + "sync" + "go.mongodb.org/mongo-driver/internal/logger" ) @@ -20,9 +22,19 @@ type orderedLogMessage struct { // Logger is the Sink used to captured log messages for logger verification in // the unified spec tests. type Logger struct { + // bufSize is the number of logs expected to be sent to the logger for a + // unified spec test. + bufSize int + + // lastOrder increments each time the "Info" method is called, and is used to + // determine when to close the logQueue. lastOrder int - logQueue chan orderedLogMessage - bufSize int + + // orderMu guards the order value, which increments each time the "Info" + // method is called. This is necessary since "Info" could be called from + // multiple go routines, e.g. SDAM logs. + orderMu sync.RWMutex + logQueue chan orderedLogMessage } func newLogger(olm *observeLogMessages, bufSize int) *Logger { @@ -44,14 +56,17 @@ func (log *Logger) Info(level int, msg string, args ...interface{}) { return } - defer func() { log.lastOrder++ }() - // If the order is greater than the buffer size, we must return. This // would indicate that the logQueue channel has been closed. if log.lastOrder > log.bufSize { return } + log.orderMu.Lock() + defer log.orderMu.Unlock() + + defer func() { log.lastOrder++ }() + // Add the Diff back to the level, as there is no need to create a // logging offset. level = level + logger.DiffToInfo @@ -68,7 +83,7 @@ func (log *Logger) Info(level int, msg string, args ...interface{}) { logMessage: logMessage, } - // If the order has reached the buffer size, then close the channe. + // If the order has reached the buffer size, then close the channel. if log.lastOrder == log.bufSize { close(log.logQueue) } diff --git a/mongo/integration/unified/unified_spec_runner.go b/mongo/integration/unified/unified_spec_runner.go index 1b1cbeb5339..7b92d07204f 100644 --- a/mongo/integration/unified/unified_spec_runner.go +++ b/mongo/integration/unified/unified_spec_runner.go @@ -224,7 +224,7 @@ func (tc *TestCase) Run(ls LoggerSkipper) error { } // Count the number of expected log messages over all clients. - expectedLogCount := 0 + var expectedLogCount int for _, clientLog := range tc.ExpectLogMessages { expectedLogCount += len(clientLog.LogMessages) }