diff --git a/go.sum b/go.sum index b4a0c6f8..a1fc9627 100644 --- a/go.sum +++ b/go.sum @@ -1,6 +1,5 @@ github.com/Azure/azure-amqp-common-go/v3 v3.0.1 h1:mXh+eyOxGLBfqDtfmbtby0l7XfG/6b2NkuZ3B7i6zHA= github.com/Azure/azure-amqp-common-go/v3 v3.0.1/go.mod h1:PBIGdzcO1teYoufTKMcGibdKaYZv4avS+O6LNIp8bq0= -github.com/Azure/azure-pipeline-go v0.1.8 h1:KmVRa8oFMaargVesEuuEoiLCQ4zCCwQ8QX/xg++KS20= github.com/Azure/azure-pipeline-go v0.1.8/go.mod h1:XA1kFWRVhSK+KNFiOhfv83Fv8L9achrP7OxIzeTn1Yg= github.com/Azure/azure-pipeline-go v0.1.9 h1:u7JFb9fFTE6Y/j8ae2VK33ePrRqJqoCM/IWkQdAZ+rg= github.com/Azure/azure-pipeline-go v0.1.9/go.mod h1:XA1kFWRVhSK+KNFiOhfv83Fv8L9achrP7OxIzeTn1Yg= @@ -14,14 +13,11 @@ github.com/Azure/go-amqp v0.13.1/go.mod h1:qj+o8xPCz9tMSbQ83Vp8boHahuRDl5mkNHyt1 github.com/Azure/go-autorest v14.2.0+incompatible h1:V5VMDjClD3GiElqLWO7mz2MxNAK/vTfRHdAubSIPRgs= github.com/Azure/go-autorest v14.2.0+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSWATqVooLgysK6ZNox3g/xq24= github.com/Azure/go-autorest/autorest v0.9.0/go.mod h1:xyHB1BMZT0cuDHU7I0+g046+BFDTQ8rEZB0s4Yfa6bI= -github.com/Azure/go-autorest/autorest v0.9.3 h1:OZEIaBbMdUE/Js+BQKlpO81XlISgipr6yDJ+PSwsgi4= github.com/Azure/go-autorest/autorest v0.9.3/go.mod h1:GsRuLYvwzLjjjRoWEIyMUaYq8GNUx2nRB378IPt/1p0= github.com/Azure/go-autorest/autorest v0.11.3 h1:fyYnmYujkIXUgv88D9/Wo2ybE4Zwd/TmQd5sSI5u2Ws= github.com/Azure/go-autorest/autorest v0.11.3/go.mod h1:JFgpikqFJ/MleTTxwepExTKnFUKKszPS8UavbQYUMuw= github.com/Azure/go-autorest/autorest/adal v0.5.0/go.mod h1:8Z9fGy2MpX0PvDjB1pEgQTmVqjGhiHBW7RJJEciWzS0= -github.com/Azure/go-autorest/autorest/adal v0.8.0 h1:CxTzQrySOxDnKpLjFJeZAS5Qrv/qFPkgLjx5bOAi//I= github.com/Azure/go-autorest/autorest/adal v0.8.0/go.mod h1:Z6vX6WXXuyieHAXwMj0S6HY6e6wcHn37qQMBQlvY3lc= -github.com/Azure/go-autorest/autorest/adal v0.8.1 h1:pZdL8o72rK+avFWl+p9nE8RWi1JInZrWJYlnpfXJwHk= github.com/Azure/go-autorest/autorest/adal v0.8.1/go.mod h1:ZjhuQClTqx435SRJ2iMlOxPYt3d2C/T/7TiQCVZSn3Q= github.com/Azure/go-autorest/autorest/adal v0.9.0 h1:SigMbuFNuKgc1xcGhaeapbh+8fgsu+GxgDRFyg7f5lM= github.com/Azure/go-autorest/autorest/adal v0.9.0/go.mod h1:/c022QCutn2P7uY+/oQWWNcK9YU+MH96NgK+jErpbcg= @@ -30,13 +26,11 @@ github.com/Azure/go-autorest/autorest/azure/auth v0.4.2/go.mod h1:90gmfKdlmKgfjU github.com/Azure/go-autorest/autorest/azure/cli v0.3.1 h1:LXl088ZQlP0SBppGFsRZonW6hSvwgL5gRByMbvUbx8U= github.com/Azure/go-autorest/autorest/azure/cli v0.3.1/go.mod h1:ZG5p860J94/0kI9mNJVoIoLgXcirM2gF5i2kWloofxw= github.com/Azure/go-autorest/autorest/date v0.1.0/go.mod h1:plvfp3oPSKwf2DNjlBjWF/7vwR+cUD/ELuzDCXwHUVA= -github.com/Azure/go-autorest/autorest/date v0.2.0 h1:yW+Zlqf26583pE43KhfnhFcdmSWlm5Ew6bxipnr/tbM= github.com/Azure/go-autorest/autorest/date v0.2.0/go.mod h1:vcORJHLJEh643/Ioh9+vPmf1Ij9AEBM5FuBIXLmIy0g= github.com/Azure/go-autorest/autorest/date v0.3.0 h1:7gUk1U5M/CQbp9WoqinNzJar+8KY+LPI6wiWrP/myHw= github.com/Azure/go-autorest/autorest/date v0.3.0/go.mod h1:BI0uouVdmngYNUzGWeSYnokU+TrmwEsOqdt8Y6sso74= github.com/Azure/go-autorest/autorest/mocks v0.1.0/go.mod h1:OTyCOPRA2IgIlWxVYxBee2F5Gr4kF2zd2J5cFRaIDN0= github.com/Azure/go-autorest/autorest/mocks v0.2.0/go.mod h1:OTyCOPRA2IgIlWxVYxBee2F5Gr4kF2zd2J5cFRaIDN0= -github.com/Azure/go-autorest/autorest/mocks v0.3.0 h1:qJumjCaCudz+OcqE9/XtEPfvtOjOmKaui4EOpFI6zZc= github.com/Azure/go-autorest/autorest/mocks v0.3.0/go.mod h1:a8FDP3DYzQ4RYfVAxAN3SVSiiO77gL2j2ronKKP0syM= github.com/Azure/go-autorest/autorest/mocks v0.4.0 h1:z20OWOSG5aCye0HEkDp6TPmP17ZcfeMxPi6HnSALa8c= github.com/Azure/go-autorest/autorest/mocks v0.4.0/go.mod h1:LTp+uSrOhSkaKrUy935gNZuuIPPVsHlr9DSOxSayd+k= @@ -44,11 +38,9 @@ github.com/Azure/go-autorest/autorest/to v0.3.0 h1:zebkZaadz7+wIQYgC7GXaz3Wb28yK github.com/Azure/go-autorest/autorest/to v0.3.0/go.mod h1:MgwOyqaIuKdG4TL/2ywSsIWKAfJfgHDo8ObuUk3t5sA= github.com/Azure/go-autorest/autorest/validation v0.2.0 h1:15vMO4y76dehZSq7pAaOLQxC6dZYsSrj2GQpflyM/L4= github.com/Azure/go-autorest/autorest/validation v0.2.0/go.mod h1:3EEqHnBxQGHXRYq3HT1WyXAvT7LLY3tl70hw6tQIbjI= -github.com/Azure/go-autorest/logger v0.1.0 h1:ruG4BSDXONFRrZZJ2GUXDiUyVpayPmb1GnWeHDdaNKY= github.com/Azure/go-autorest/logger v0.1.0/go.mod h1:oExouG+K6PryycPJfVSxi/koC6LSNgds39diKLz7Vrc= github.com/Azure/go-autorest/logger v0.2.0 h1:e4RVHVZKC5p6UANLJHkM4OfR1UKZPj8Wt8Pcx+3oqrE= github.com/Azure/go-autorest/logger v0.2.0/go.mod h1:T9E3cAhj2VqvPOtCYAvby9aBXkZmbF5NWuPV8+WeEW8= -github.com/Azure/go-autorest/tracing v0.5.0 h1:TRn4WjSnkcSy5AEG3pnbtFSwNtwzjr4VYyQflFE619k= github.com/Azure/go-autorest/tracing v0.5.0/go.mod h1:r/s2XiOKccPW3HrqB+W0TQzfbtp2fGCgRFtBroKn4Dk= github.com/Azure/go-autorest/tracing v0.6.0 h1:TYi4+3m5t6K48TGI9AUdb+IzbnSxvnvUMfuitfgcfuo= github.com/Azure/go-autorest/tracing v0.6.0/go.mod h1:+vhtPC754Xsa23ID7GlGsrdKBpUA79WCAKPPZVC2DeU= @@ -82,22 +74,17 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN github.com/sirupsen/logrus v1.2.0 h1:juTguoYk5qI21pwyTXY3B3Y5cOTH3ZUyZCg1v/mihuo= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/objx v0.1.1 h1:2vfRuCMp5sSVIDSqO8oNnWJq7mPa6KVP3iPIwFBuy8A= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= -github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= -golang.org/x/crypto v0.0.0-20191206172530-e9b2fee46413 h1:ULYEB3JvPRE/IfO+9uO7vKV/xzVTO7XPAwm8xbf4w2g= golang.org/x/crypto v0.0.0-20191206172530-e9b2fee46413/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200728195943-123391ffb6de h1:ikNHVSjEfnvz6sxdSPCaPt572qowuyMDMJLLm3Db3ig= golang.org/x/crypto v0.0.0-20200728195943-123391ffb6de/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3 h1:0GoQqolDA55aaLxZyTzK/Y2ePZzZTUrRacwib7cNsYQ= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859 h1:R/3boaszxrf1GEUWTVDzSKVwLmSJpwZ1yqXm8j0v2QI= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= @@ -105,7 +92,6 @@ golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d h1:+R4KGOnez64A81RvjARKc4UT5/tI9ujCIVX+P5KiHuI= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/hub_examples_test.go b/hub_examples_test.go index d9dabbc0..a0a977a1 100644 --- a/hub_examples_test.go +++ b/hub_examples_test.go @@ -8,7 +8,7 @@ import ( "github.com/joho/godotenv" - "github.com/Azure/azure-event-hubs-go/v3" + eventhub "github.com/Azure/azure-event-hubs-go/v3" ) func init() { @@ -65,7 +65,12 @@ func ExampleHub_helloWorld() { } // wait for the first handler to get called with "Hello World!" - <-exit + select { + case <-exit: + // test completed + case <-ctx.Done(): + // test timed out + } err = hub.Close(ctx) if err != nil { fmt.Println(err) @@ -122,7 +127,12 @@ func ExampleHub_webSocket() { } // wait for the first handler to get called with "Hello World!" - <-exit + select { + case <-exit: + // test completed + case <-ctx.Done(): + // test timed out + } err = hub.Close(ctx) if err != nil { fmt.Println(err) diff --git a/hub_test.go b/hub_test.go index f8c05a71..c3570dc8 100644 --- a/hub_test.go +++ b/hub_test.go @@ -306,6 +306,27 @@ func (suite *eventHubSuite) TestWebSocket() { } } +func (suite *eventHubSuite) TestConcurrency() { + tests := map[string]func(context.Context, *testing.T, *Hub, string){ + "TestConcurrentSendWithRecover": testConcurrentSendWithRecover, + } + + for name, testFunc := range tests { + setupTestTeardown := func(t *testing.T) { + hub, cleanup := suite.RandomHub() + defer cleanup() + partitionID := (*hub.PartitionIds)[0] + client, closer := suite.newClient(t, *hub.Name, HubWithPartitionedSender(partitionID)) + defer closer() + ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout) + defer cancel() + testFunc(ctx, t, client, partitionID) + } + + suite.T().Run(name, setupTestTeardown) + } +} + func testBasicSend(ctx context.Context, t *testing.T, client *Hub, _ string) { err := client.Send(ctx, NewEventFromString("Hello!")) assert.NoError(t, err) @@ -404,6 +425,22 @@ func testBasicSendAndReceive(ctx context.Context, t *testing.T, client *Hub, par } } +func testConcurrentSendWithRecover(ctx context.Context, t *testing.T, client *Hub, _ string) { + var wg sync.WaitGroup + for i := 0; i < 100; i++ { + wg.Add(1) + go func() { + defer wg.Done() + err := client.Send(ctx, NewEventFromString("Hello!")) + assert.NoError(t, err) + err = client.sender.Recover(ctx) + assert.NoError(t, err) + }() + } + end, _ := ctx.Deadline() + waitUntil(t, &wg, time.Until(end)) +} + func (suite *eventHubSuite) TestEpochReceivers() { tests := map[string]func(context.Context, *testing.T, *Hub, []string, string){ "TestEpochGreaterThenLess": testEpochGreaterThenLess, diff --git a/sender.go b/sender.go index da234e82..b5f4c72e 100644 --- a/sender.go +++ b/sender.go @@ -27,6 +27,8 @@ import ( "fmt" "net" "strconv" + "sync" + "sync/atomic" "time" "github.com/Azure/azure-amqp-common-go/v3/uuid" @@ -46,10 +48,13 @@ type ( hub *Hub connection *amqp.Client session *session - sender *amqp.Sender + sender atomic.Value // holds a *amqp.Sender partitionID *string Name string recoveryBackoff *backoff.Backoff + // cond and recovering are used to atomically implement Recover() + cond *sync.Cond + recovering bool } // SendOption provides a way to customize a message on sending @@ -74,24 +79,57 @@ func (h *Hub) newSender(ctx context.Context) (*sender, error) { Max: 4 * time.Second, Jitter: true, }, + cond: sync.NewCond(&sync.Mutex{}), } tab.For(ctx).Debug(fmt.Sprintf("creating a new sender for entity path %s", s.getAddress())) err := s.newSessionAndLink(ctx) return s, err } -// Recover will attempt to close the current session and link, then rebuild them +func (s *sender) amqpSender() *amqp.Sender { + return s.sender.Load().(*amqp.Sender) +} + +// Recover will attempt to close the current session and link, then rebuild them. +// Note that while the implementation will ensure that Recover() is goroutine safe +// it won't prevent excessive connection recovery. E.g. if a Recover() is in progress +// and is in block 2, any additional calls to Recover() will wait at block 1 to +// restart the recovery process once block 2 exits. func (s *sender) Recover(ctx context.Context) error { span, ctx := s.startProducerSpanFromContext(ctx, "eh.sender.Recover") defer span.End() - - // we expect the sender, session or client is in an error state, ignore errors - closeCtx, cancel := context.WithTimeout(ctx, 10*time.Second) - defer cancel() - _ = s.sender.Close(closeCtx) - _ = s.session.Close(closeCtx) - _ = s.connection.Close() - return s.newSessionAndLink(ctx) + recover := false + // acquire exclusive lock to see if this goroutine should recover + s.cond.L.Lock() // block 1 + if !s.recovering { + // another goroutine isn't recovering, so this one will + tab.For(ctx).Debug("will recover connection") + s.recovering = true + recover = true + } else { + // wait for the recovery to finish + tab.For(ctx).Debug("waiting for connection to recover") + s.cond.Wait() + } + s.cond.L.Unlock() + var err error + if recover { + tab.For(ctx).Debug("recovering connection") + // we expect the sender, session or client is in an error state, ignore errors + closeCtx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + // update shared state + s.cond.L.Lock() // block 2 + _ = s.amqpSender().Close(closeCtx) + _ = s.session.Close(closeCtx) + _ = s.connection.Close() + err = s.newSessionAndLink(ctx) + s.recovering = false + s.cond.L.Unlock() + // signal to waiters that recovery is complete + s.cond.Broadcast() + } + return err } // Close will close the AMQP connection, session and link of the sender @@ -99,7 +137,7 @@ func (s *sender) Close(ctx context.Context) error { span, _ := s.startProducerSpanFromContext(ctx, "eh.sender.Close") defer span.End() - err := s.sender.Close(ctx) + err := s.amqpSender().Close(ctx) if err != nil { tab.For(ctx).Error(err) if sessionErr := s.session.Close(ctx); sessionErr != nil { @@ -170,17 +208,25 @@ func (s *sender) trySend(ctx context.Context, evt eventer) error { sp.AddAttributes(tab.StringAttribute("he.message_id", str)) } + // create a per goroutine copy as Duration() and Reset() modify its state + backoff := *s.recoveryBackoff recvr := func(err error, recover bool) { - duration := s.recoveryBackoff.Duration() + duration := backoff.Duration() tab.For(ctx).Debug("amqp error, delaying " + strconv.FormatInt(int64(duration/time.Millisecond), 10) + " millis: " + err.Error()) - time.Sleep(duration) + select { + case <-time.After(duration): + // ok, continue to recover + case <-ctx.Done(): + // context expired, exit + return + } if recover { err = s.Recover(ctx) if err != nil { tab.For(ctx).Debug("failed to recover connection") } else { tab.For(ctx).Debug("recovered connection") - s.recoveryBackoff.Reset() + backoff.Reset() } } } @@ -191,7 +237,7 @@ func (s *sender) trySend(ctx context.Context, evt eventer) error { return ctx.Err() default: // try as long as the context is not dead - err := s.sender.Send(ctx, msg) + err := s.amqpSender().Send(ctx, msg) if err == nil { // successful send return err @@ -272,7 +318,7 @@ func (s *sender) newSessionAndLink(ctx context.Context) error { return err } - s.sender = amqpSender + s.sender.Store(amqpSender) return nil }