Skip to content

Commit

Permalink
Fix sender.Recover() to be goroutine safe (#218)
Browse files Browse the repository at this point in the history
* Fix sender.Recover() to be goroutine safe

Tidy go.sum

* add tracing to recovery code

* honor context while sleeping

* add comments detailing limitations

* add concurrency test

fix hang when test misbehaves
  • Loading branch information
jhendrixMSFT authored Jul 8, 2021
1 parent d9338b7 commit 0eb7b61
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 33 deletions.
14 changes: 0 additions & 14 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
github.com/Azure/azure-amqp-common-go/v3 v3.0.1 h1:mXh+eyOxGLBfqDtfmbtby0l7XfG/6b2NkuZ3B7i6zHA=
github.com/Azure/azure-amqp-common-go/v3 v3.0.1/go.mod h1:PBIGdzcO1teYoufTKMcGibdKaYZv4avS+O6LNIp8bq0=
github.com/Azure/azure-pipeline-go v0.1.8 h1:KmVRa8oFMaargVesEuuEoiLCQ4zCCwQ8QX/xg++KS20=
github.com/Azure/azure-pipeline-go v0.1.8/go.mod h1:XA1kFWRVhSK+KNFiOhfv83Fv8L9achrP7OxIzeTn1Yg=
github.com/Azure/azure-pipeline-go v0.1.9 h1:u7JFb9fFTE6Y/j8ae2VK33ePrRqJqoCM/IWkQdAZ+rg=
github.com/Azure/azure-pipeline-go v0.1.9/go.mod h1:XA1kFWRVhSK+KNFiOhfv83Fv8L9achrP7OxIzeTn1Yg=
Expand All @@ -14,14 +13,11 @@ github.com/Azure/go-amqp v0.13.1/go.mod h1:qj+o8xPCz9tMSbQ83Vp8boHahuRDl5mkNHyt1
github.com/Azure/go-autorest v14.2.0+incompatible h1:V5VMDjClD3GiElqLWO7mz2MxNAK/vTfRHdAubSIPRgs=
github.com/Azure/go-autorest v14.2.0+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSWATqVooLgysK6ZNox3g/xq24=
github.com/Azure/go-autorest/autorest v0.9.0/go.mod h1:xyHB1BMZT0cuDHU7I0+g046+BFDTQ8rEZB0s4Yfa6bI=
github.com/Azure/go-autorest/autorest v0.9.3 h1:OZEIaBbMdUE/Js+BQKlpO81XlISgipr6yDJ+PSwsgi4=
github.com/Azure/go-autorest/autorest v0.9.3/go.mod h1:GsRuLYvwzLjjjRoWEIyMUaYq8GNUx2nRB378IPt/1p0=
github.com/Azure/go-autorest/autorest v0.11.3 h1:fyYnmYujkIXUgv88D9/Wo2ybE4Zwd/TmQd5sSI5u2Ws=
github.com/Azure/go-autorest/autorest v0.11.3/go.mod h1:JFgpikqFJ/MleTTxwepExTKnFUKKszPS8UavbQYUMuw=
github.com/Azure/go-autorest/autorest/adal v0.5.0/go.mod h1:8Z9fGy2MpX0PvDjB1pEgQTmVqjGhiHBW7RJJEciWzS0=
github.com/Azure/go-autorest/autorest/adal v0.8.0 h1:CxTzQrySOxDnKpLjFJeZAS5Qrv/qFPkgLjx5bOAi//I=
github.com/Azure/go-autorest/autorest/adal v0.8.0/go.mod h1:Z6vX6WXXuyieHAXwMj0S6HY6e6wcHn37qQMBQlvY3lc=
github.com/Azure/go-autorest/autorest/adal v0.8.1 h1:pZdL8o72rK+avFWl+p9nE8RWi1JInZrWJYlnpfXJwHk=
github.com/Azure/go-autorest/autorest/adal v0.8.1/go.mod h1:ZjhuQClTqx435SRJ2iMlOxPYt3d2C/T/7TiQCVZSn3Q=
github.com/Azure/go-autorest/autorest/adal v0.9.0 h1:SigMbuFNuKgc1xcGhaeapbh+8fgsu+GxgDRFyg7f5lM=
github.com/Azure/go-autorest/autorest/adal v0.9.0/go.mod h1:/c022QCutn2P7uY+/oQWWNcK9YU+MH96NgK+jErpbcg=
Expand All @@ -30,25 +26,21 @@ github.com/Azure/go-autorest/autorest/azure/auth v0.4.2/go.mod h1:90gmfKdlmKgfjU
github.com/Azure/go-autorest/autorest/azure/cli v0.3.1 h1:LXl088ZQlP0SBppGFsRZonW6hSvwgL5gRByMbvUbx8U=
github.com/Azure/go-autorest/autorest/azure/cli v0.3.1/go.mod h1:ZG5p860J94/0kI9mNJVoIoLgXcirM2gF5i2kWloofxw=
github.com/Azure/go-autorest/autorest/date v0.1.0/go.mod h1:plvfp3oPSKwf2DNjlBjWF/7vwR+cUD/ELuzDCXwHUVA=
github.com/Azure/go-autorest/autorest/date v0.2.0 h1:yW+Zlqf26583pE43KhfnhFcdmSWlm5Ew6bxipnr/tbM=
github.com/Azure/go-autorest/autorest/date v0.2.0/go.mod h1:vcORJHLJEh643/Ioh9+vPmf1Ij9AEBM5FuBIXLmIy0g=
github.com/Azure/go-autorest/autorest/date v0.3.0 h1:7gUk1U5M/CQbp9WoqinNzJar+8KY+LPI6wiWrP/myHw=
github.com/Azure/go-autorest/autorest/date v0.3.0/go.mod h1:BI0uouVdmngYNUzGWeSYnokU+TrmwEsOqdt8Y6sso74=
github.com/Azure/go-autorest/autorest/mocks v0.1.0/go.mod h1:OTyCOPRA2IgIlWxVYxBee2F5Gr4kF2zd2J5cFRaIDN0=
github.com/Azure/go-autorest/autorest/mocks v0.2.0/go.mod h1:OTyCOPRA2IgIlWxVYxBee2F5Gr4kF2zd2J5cFRaIDN0=
github.com/Azure/go-autorest/autorest/mocks v0.3.0 h1:qJumjCaCudz+OcqE9/XtEPfvtOjOmKaui4EOpFI6zZc=
github.com/Azure/go-autorest/autorest/mocks v0.3.0/go.mod h1:a8FDP3DYzQ4RYfVAxAN3SVSiiO77gL2j2ronKKP0syM=
github.com/Azure/go-autorest/autorest/mocks v0.4.0 h1:z20OWOSG5aCye0HEkDp6TPmP17ZcfeMxPi6HnSALa8c=
github.com/Azure/go-autorest/autorest/mocks v0.4.0/go.mod h1:LTp+uSrOhSkaKrUy935gNZuuIPPVsHlr9DSOxSayd+k=
github.com/Azure/go-autorest/autorest/to v0.3.0 h1:zebkZaadz7+wIQYgC7GXaz3Wb28yKYfVkkBKwc38VF8=
github.com/Azure/go-autorest/autorest/to v0.3.0/go.mod h1:MgwOyqaIuKdG4TL/2ywSsIWKAfJfgHDo8ObuUk3t5sA=
github.com/Azure/go-autorest/autorest/validation v0.2.0 h1:15vMO4y76dehZSq7pAaOLQxC6dZYsSrj2GQpflyM/L4=
github.com/Azure/go-autorest/autorest/validation v0.2.0/go.mod h1:3EEqHnBxQGHXRYq3HT1WyXAvT7LLY3tl70hw6tQIbjI=
github.com/Azure/go-autorest/logger v0.1.0 h1:ruG4BSDXONFRrZZJ2GUXDiUyVpayPmb1GnWeHDdaNKY=
github.com/Azure/go-autorest/logger v0.1.0/go.mod h1:oExouG+K6PryycPJfVSxi/koC6LSNgds39diKLz7Vrc=
github.com/Azure/go-autorest/logger v0.2.0 h1:e4RVHVZKC5p6UANLJHkM4OfR1UKZPj8Wt8Pcx+3oqrE=
github.com/Azure/go-autorest/logger v0.2.0/go.mod h1:T9E3cAhj2VqvPOtCYAvby9aBXkZmbF5NWuPV8+WeEW8=
github.com/Azure/go-autorest/tracing v0.5.0 h1:TRn4WjSnkcSy5AEG3pnbtFSwNtwzjr4VYyQflFE619k=
github.com/Azure/go-autorest/tracing v0.5.0/go.mod h1:r/s2XiOKccPW3HrqB+W0TQzfbtp2fGCgRFtBroKn4Dk=
github.com/Azure/go-autorest/tracing v0.6.0 h1:TYi4+3m5t6K48TGI9AUdb+IzbnSxvnvUMfuitfgcfuo=
github.com/Azure/go-autorest/tracing v0.6.0/go.mod h1:+vhtPC754Xsa23ID7GlGsrdKBpUA79WCAKPPZVC2DeU=
Expand Down Expand Up @@ -82,30 +74,24 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN
github.com/sirupsen/logrus v1.2.0 h1:juTguoYk5qI21pwyTXY3B3Y5cOTH3ZUyZCg1v/mihuo=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1 h1:2vfRuCMp5sSVIDSqO8oNnWJq7mPa6KVP3iPIwFBuy8A=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191206172530-e9b2fee46413 h1:ULYEB3JvPRE/IfO+9uO7vKV/xzVTO7XPAwm8xbf4w2g=
golang.org/x/crypto v0.0.0-20191206172530-e9b2fee46413/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200728195943-123391ffb6de h1:ikNHVSjEfnvz6sxdSPCaPt572qowuyMDMJLLm3Db3ig=
golang.org/x/crypto v0.0.0-20200728195943-123391ffb6de/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3 h1:0GoQqolDA55aaLxZyTzK/Y2ePZzZTUrRacwib7cNsYQ=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859 h1:R/3boaszxrf1GEUWTVDzSKVwLmSJpwZ1yqXm8j0v2QI=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d h1:+R4KGOnez64A81RvjARKc4UT5/tI9ujCIVX+P5KiHuI=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
Expand Down
16 changes: 13 additions & 3 deletions hub_examples_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (

"github.com/joho/godotenv"

"github.com/Azure/azure-event-hubs-go/v3"
eventhub "github.com/Azure/azure-event-hubs-go/v3"
)

func init() {
Expand Down Expand Up @@ -65,7 +65,12 @@ func ExampleHub_helloWorld() {
}

// wait for the first handler to get called with "Hello World!"
<-exit
select {
case <-exit:
// test completed
case <-ctx.Done():
// test timed out
}
err = hub.Close(ctx)
if err != nil {
fmt.Println(err)
Expand Down Expand Up @@ -122,7 +127,12 @@ func ExampleHub_webSocket() {
}

// wait for the first handler to get called with "Hello World!"
<-exit
select {
case <-exit:
// test completed
case <-ctx.Done():
// test timed out
}
err = hub.Close(ctx)
if err != nil {
fmt.Println(err)
Expand Down
37 changes: 37 additions & 0 deletions hub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,27 @@ func (suite *eventHubSuite) TestWebSocket() {
}
}

// TestConcurrency runs each concurrency test case as a subtest. Every case
// gets its own hub from suite.RandomHub, a client created with
// HubWithPartitionedSender for the first ID in hub.PartitionIds, and a
// context bounded by defaultTimeout.
func (suite *eventHubSuite) TestConcurrency() {
	cases := map[string]func(context.Context, *testing.T, *Hub, string){
		"TestConcurrentSendWithRecover": testConcurrentSendWithRecover,
	}

	for caseName, run := range cases {
		suite.T().Run(caseName, func(t *testing.T) {
			// Deferred calls unwind in reverse order: cancel, closer, cleanup.
			hub, cleanup := suite.RandomHub()
			defer cleanup()

			partitionID := (*hub.PartitionIds)[0]
			client, closer := suite.newClient(t, *hub.Name, HubWithPartitionedSender(partitionID))
			defer closer()

			ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout)
			defer cancel()

			run(ctx, t, client, partitionID)
		})
	}
}

func testBasicSend(ctx context.Context, t *testing.T, client *Hub, _ string) {
err := client.Send(ctx, NewEventFromString("Hello!"))
assert.NoError(t, err)
Expand Down Expand Up @@ -404,6 +425,22 @@ func testBasicSendAndReceive(ctx context.Context, t *testing.T, client *Hub, par
}
}

// testConcurrentSendWithRecover starts 100 goroutines that each send one
// "Hello!" event through client and then call Recover on client.sender,
// asserting that neither call returns an error. The partition ID argument
// is unused.
func testConcurrentSendWithRecover(ctx context.Context, t *testing.T, client *Hub, _ string) {
	const workers = 100

	var pending sync.WaitGroup
	pending.Add(workers)
	for n := 0; n < workers; n++ {
		go func() {
			defer pending.Done()

			sendErr := client.Send(ctx, NewEventFromString("Hello!"))
			assert.NoError(t, sendErr)

			recoverErr := client.sender.Recover(ctx)
			assert.NoError(t, recoverErr)
		}()
	}

	// The wait is bounded by whatever time remains on ctx; the "has deadline"
	// flag is ignored, so ctx is expected to carry one.
	// NOTE(review): waitUntil is defined elsewhere; presumably it fails the
	// test if the group is not done in time — confirm against its definition.
	deadline, _ := ctx.Deadline()
	waitUntil(t, &pending, time.Until(deadline))
}

func (suite *eventHubSuite) TestEpochReceivers() {
tests := map[string]func(context.Context, *testing.T, *Hub, []string, string){
"TestEpochGreaterThenLess": testEpochGreaterThenLess,
Expand Down
78 changes: 62 additions & 16 deletions sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
"fmt"
"net"
"strconv"
"sync"
"sync/atomic"
"time"

"github.com/Azure/azure-amqp-common-go/v3/uuid"
Expand All @@ -46,10 +48,13 @@ type (
hub *Hub
connection *amqp.Client
session *session
sender *amqp.Sender
sender atomic.Value // holds a *amqp.Sender
partitionID *string
Name string
recoveryBackoff *backoff.Backoff
// cond and recovering are used to atomically implement Recover()
cond *sync.Cond
recovering bool
}

// SendOption provides a way to customize a message on sending
Expand All @@ -74,32 +79,65 @@ func (h *Hub) newSender(ctx context.Context) (*sender, error) {
Max: 4 * time.Second,
Jitter: true,
},
cond: sync.NewCond(&sync.Mutex{}),
}
tab.For(ctx).Debug(fmt.Sprintf("creating a new sender for entity path %s", s.getAddress()))
err := s.newSessionAndLink(ctx)
return s, err
}

// Recover will attempt to close the current session and link, then rebuild them
func (s *sender) amqpSender() *amqp.Sender {
	// s.sender is an atomic.Value holding a *amqp.Sender. The assertion is
	// unchecked, so this panics if nothing has been stored yet.
	current := s.sender.Load()
	return current.(*amqp.Sender)
}

// Recover will attempt to close the current session and link, then rebuild them.
// Note that while the implementation will ensure that Recover() is goroutine safe
// it won't prevent excessive connection recovery. E.g. if a Recover() is in progress
// and is in block 2, any additional calls to Recover() will wait at block 1 to
// restart the recovery process once block 2 exits.
//
// Only the goroutine that performs the rebuild returns the error from
// newSessionAndLink. A goroutine that found s.recovering already set waits on
// s.cond and then returns nil, whether or not the rebuild succeeded.
//
// NOTE(review): this page shows removed and added lines together without +/-
// markers, so the body below contains both the old and the new implementation.
// Confirm against the actual file.
func (s *sender) Recover(ctx context.Context) error {
span, ctx := s.startProducerSpanFromContext(ctx, "eh.sender.Recover")
defer span.End()

// NOTE(review): the six statements below, through `return s.newSessionAndLink(ctx)`,
// appear to be the pre-change body removed by this commit. They call Close on
// s.sender directly, which no longer matches a field of type atomic.Value.
// we expect the sender, session or client is in an error state, ignore errors
closeCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
_ = s.sender.Close(closeCtx)
_ = s.session.Close(closeCtx)
_ = s.connection.Close()
return s.newSessionAndLink(ctx)
// The local name below shadows Go's builtin recover within this function.
recover := false
// acquire exclusive lock to see if this goroutine should recover
s.cond.L.Lock() // block 1
if !s.recovering {
// another goroutine isn't recovering, so this one will
tab.For(ctx).Debug("will recover connection")
s.recovering = true
recover = true
} else {
// wait for the recovery to finish
tab.For(ctx).Debug("waiting for connection to recover")
// Wait releases s.cond.L while blocked and re-acquires it before returning.
// It takes no context, so cancelling ctx does not release a waiter.
s.cond.Wait()
}
s.cond.L.Unlock()
// err stays nil for goroutines that only waited.
var err error
if recover {
tab.For(ctx).Debug("recovering connection")
// we expect the sender, session or client is in an error state, ignore errors
closeCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
// cancel is deferred to function return, not to the end of this if block.
defer cancel()
// update shared state
s.cond.L.Lock() // block 2
_ = s.amqpSender().Close(closeCtx)
_ = s.session.Close(closeCtx)
_ = s.connection.Close()
// The rebuild uses ctx, not the 10-second closeCtx used for the closes above.
err = s.newSessionAndLink(ctx)
// The flag is cleared even when the rebuild failed, so a later call can retry.
s.recovering = false
s.cond.L.Unlock()
// signal to waiters that recovery is complete
s.cond.Broadcast()
}
return err
}

// Close will close the AMQP connection, session and link of the sender
func (s *sender) Close(ctx context.Context) error {
span, _ := s.startProducerSpanFromContext(ctx, "eh.sender.Close")
defer span.End()

err := s.sender.Close(ctx)
err := s.amqpSender().Close(ctx)
if err != nil {
tab.For(ctx).Error(err)
if sessionErr := s.session.Close(ctx); sessionErr != nil {
Expand Down Expand Up @@ -170,17 +208,25 @@ func (s *sender) trySend(ctx context.Context, evt eventer) error {
sp.AddAttributes(tab.StringAttribute("he.message_id", str))
}

// create a per goroutine copy as Duration() and Reset() modify its state
backoff := *s.recoveryBackoff
recvr := func(err error, recover bool) {
duration := s.recoveryBackoff.Duration()
duration := backoff.Duration()
tab.For(ctx).Debug("amqp error, delaying " + strconv.FormatInt(int64(duration/time.Millisecond), 10) + " millis: " + err.Error())
time.Sleep(duration)
select {
case <-time.After(duration):
// ok, continue to recover
case <-ctx.Done():
// context expired, exit
return
}
if recover {
err = s.Recover(ctx)
if err != nil {
tab.For(ctx).Debug("failed to recover connection")
} else {
tab.For(ctx).Debug("recovered connection")
s.recoveryBackoff.Reset()
backoff.Reset()
}
}
}
Expand All @@ -191,7 +237,7 @@ func (s *sender) trySend(ctx context.Context, evt eventer) error {
return ctx.Err()
default:
// try as long as the context is not dead
err := s.sender.Send(ctx, msg)
err := s.amqpSender().Send(ctx, msg)
if err == nil {
// successful send
return err
Expand Down Expand Up @@ -272,7 +318,7 @@ func (s *sender) newSessionAndLink(ctx context.Context) error {
return err
}

s.sender = amqpSender
s.sender.Store(amqpSender)
return nil
}

Expand Down

0 comments on commit 0eb7b61

Please sign in to comment.