Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Limit to single inflight package syncing operation #289

Merged
merged 12 commits into from
Aug 29, 2024
1 change: 1 addition & 0 deletions client/httpclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ func (c *httpClient) runUntilStopped(ctx context.Context) {
&c.common.ClientSyncedState,
c.common.PackagesStateProvider,
c.common.Capabilities,
&c.common.PackageSyncMutex,
)
}

Expand Down
3 changes: 3 additions & 0 deletions client/internal/clientcommon.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ type ClientCommon struct {
// PackagesStateProvider provides access to the local state of packages.
PackagesStateProvider types.PackagesStateProvider

// PackageSyncMutex makes sure only one package syncing operation happens at a time.
PackageSyncMutex sync.Mutex

// The transport-specific sender.
sender Sender

Expand Down
4 changes: 3 additions & 1 deletion client/internal/httpsender.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"fmt"
"io"
"net/http"
"sync"
"sync/atomic"
"time"

Expand Down Expand Up @@ -91,10 +92,11 @@ func (h *HTTPSender) Run(
clientSyncedState *ClientSyncedState,
packagesStateProvider types.PackagesStateProvider,
capabilities protobufs.AgentCapabilities,
packageSyncMutex *sync.Mutex,
) {
h.url = url
h.callbacks = callbacks
h.receiveProcessor = newReceivedProcessor(h.logger, callbacks, h, clientSyncedState, packagesStateProvider, capabilities)
h.receiveProcessor = newReceivedProcessor(h.logger, callbacks, h, clientSyncedState, packagesStateProvider, capabilities, packageSyncMutex)

for {
pollingTimer := time.NewTimer(time.Millisecond * time.Duration(atomic.LoadInt64(&h.pollingIntervalMs)))
Expand Down
104 changes: 103 additions & 1 deletion client/internal/httpsender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func TestRequestInstanceUidFlagReset(t *testing.T) {
clientSyncedState := &ClientSyncedState{}
clientSyncedState.SetFlags(protobufs.AgentToServerFlags_AgentToServerFlags_RequestInstanceUid)
capabilities := protobufs.AgentCapabilities_AgentCapabilities_Unspecified
sender.receiveProcessor = newReceivedProcessor(&sharedinternal.NopLogger{}, sender.callbacks, sender, clientSyncedState, nil, capabilities)
sender.receiveProcessor = newReceivedProcessor(&sharedinternal.NopLogger{}, sender.callbacks, sender, clientSyncedState, nil, capabilities, new(sync.Mutex))

// If we process a message with a nil AgentIdentification, or an incorrect NewInstanceUid.
sender.receiveProcessor.ProcessReceivedMessage(ctx,
Expand All @@ -208,3 +208,105 @@ func TestRequestInstanceUidFlagReset(t *testing.T) {
assert.Equal(t, sender.receiveProcessor.clientSyncedState.flags, protobufs.AgentToServerFlags_AgentToServerFlags_Unspecified)
cancel()
}

func TestPackageUpdatesInParallel(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
localPackageState := NewInMemPackagesStore()
sender := NewHTTPSender(&sharedinternal.NopLogger{})

var messages atomic.Int32
var mux sync.Mutex
sender.callbacks = types.CallbacksStruct{
OnMessageFunc: func(ctx context.Context, msg *types.MessageData) {
err := msg.PackageSyncer.Sync(ctx)
assert.NoError(t, err)
messages.Add(1)
},
}

clientSyncedState := &ClientSyncedState{}
capabilities := protobufs.AgentCapabilities_AgentCapabilities_AcceptsPackages
sender.receiveProcessor = newReceivedProcessor(&sharedinternal.NopLogger{}, sender.callbacks, sender, clientSyncedState, localPackageState, capabilities, &mux)

sender.receiveProcessor.ProcessReceivedMessage(ctx,
&protobufs.ServerToAgent{
PackagesAvailable: &protobufs.PackagesAvailable{
Packages: map[string]*protobufs.PackageAvailable{
"package1": {
Type: protobufs.PackageType_PackageType_TopLevel,
Version: "1.0.0",
File: &protobufs.DownloadableFile{
DownloadUrl: "foo",
ContentHash: []byte{4, 5},
},
Hash: []byte{1, 2, 3},
},
},
AllPackagesHash: []byte{1, 2, 3, 4, 5},
},
})
sender.receiveProcessor.ProcessReceivedMessage(ctx,
&protobufs.ServerToAgent{
PackagesAvailable: &protobufs.PackagesAvailable{
Packages: map[string]*protobufs.PackageAvailable{
"package22": {
Type: protobufs.PackageType_PackageType_TopLevel,
Version: "1.0.0",
File: &protobufs.DownloadableFile{
DownloadUrl: "bar",
ContentHash: []byte{4, 5},
},
Hash: []byte{1, 2, 3},
},
},
AllPackagesHash: []byte{1, 2, 3, 4, 5},
},
})

assert.Eventually(t, func() bool {
return messages.Load() == 2
}, 2*time.Second, 100*time.Millisecond, "both messages must have been processed successfully")

cancel()
}

func TestPackageUpdatesWithError(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
sender := NewHTTPSender(&sharedinternal.NopLogger{})

// We'll pass in a nil PackageStateProvider to force the Sync call to return with an error.
localPackageState := types.PackagesStateProvider(nil)
var messages atomic.Int32
var mux sync.Mutex
sender.callbacks = types.CallbacksStruct{
OnMessageFunc: func(ctx context.Context, msg *types.MessageData) {
// Make sure the call to Sync will return an error due to a nil PackageStateProvider
err := msg.PackageSyncer.Sync(ctx)
assert.Error(t, err)
messages.Add(1)
},
}

clientSyncedState := &ClientSyncedState{}

capabilities := protobufs.AgentCapabilities_AgentCapabilities_AcceptsPackages
sender.receiveProcessor = newReceivedProcessor(&sharedinternal.NopLogger{}, sender.callbacks, sender, clientSyncedState, localPackageState, capabilities, &mux)

// Send two messages in parallel.
sender.receiveProcessor.ProcessReceivedMessage(ctx,
&protobufs.ServerToAgent{
PackagesAvailable: &protobufs.PackagesAvailable{},
})
sender.receiveProcessor.ProcessReceivedMessage(ctx,
&protobufs.ServerToAgent{
PackagesAvailable: &protobufs.PackagesAvailable{},
})

// Make sure that even though the call to Sync errored out early, the lock
// was still released properly for both messages to be processed.
assert.Eventually(t, func() bool {
return messages.Load() == 2
}, 5*time.Second, 100*time.Millisecond, "both messages must have been processed successfully")

cancel()
}
19 changes: 18 additions & 1 deletion client/internal/packagessyncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"errors"
"fmt"
"net/http"
"sync"

"github.com/open-telemetry/opamp-go/client/types"
"github.com/open-telemetry/opamp-go/protobufs"
Expand All @@ -20,6 +21,7 @@
sender Sender

statuses *protobufs.PackageStatuses
mux *sync.Mutex
doneCh chan struct{}
}

Expand All @@ -30,6 +32,7 @@
sender Sender,
clientSyncedState *ClientSyncedState,
packagesStateProvider types.PackagesStateProvider,
mux *sync.Mutex,
) *packagesSyncer {
return &packagesSyncer{
logger: logger,
Expand All @@ -38,6 +41,7 @@
clientSyncedState: clientSyncedState,
localState: packagesStateProvider,
doneCh: make(chan struct{}),
mux: mux,
}
}

Expand All @@ -49,15 +53,24 @@
}()

// Prepare package statuses.
// Grab a lock to make sure that package statuses are not overriden by
// another call to Sync running in parallel.
// In case Sync returns early with an error, take care of unlocking the
// mutex in this goroutine; otherwise it will be unlocked at the end
// of the sync operation.
s.mux.Lock()
if err := s.initStatuses(); err != nil {
s.mux.Unlock()
return err
}

if err := s.clientSyncedState.SetPackageStatuses(s.statuses); err != nil {
s.mux.Unlock()

Check warning on line 68 in client/internal/packagessyncer.go

View check run for this annotation

Codecov / codecov/patch

client/internal/packagessyncer.go#L68

Added line #L68 was not covered by tests
return err
}

// Now do the actual syncing in the background.
// Now do the actual syncing in the background and release the lock from
// inside of the goroutine.
go s.doSync(ctx)

return nil
Expand Down Expand Up @@ -99,6 +112,10 @@

// doSync performs the actual syncing process.
func (s *packagesSyncer) doSync(ctx context.Context) {
// Once doSync returns in a separate goroutine, make sure to release the
// mutex so that a new syncing process can take place.
defer s.mux.Unlock()

hash, err := s.localState.AllPackagesHash()
if err != nil {
s.logger.Errorf(ctx, "Package syncing failed: %V", err)
Expand Down
7 changes: 7 additions & 0 deletions client/internal/receivedprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package internal
import (
"context"
"fmt"
"sync"

"github.com/open-telemetry/opamp-go/client/types"
"github.com/open-telemetry/opamp-go/protobufs"
Expand All @@ -24,6 +25,9 @@ type receivedProcessor struct {

packagesStateProvider types.PackagesStateProvider

// packageSyncMutex protects against multiple package syncing operations at the same time.
packageSyncMutex *sync.Mutex

// Agent's capabilities defined at Start() time.
capabilities protobufs.AgentCapabilities
}
Expand All @@ -35,6 +39,7 @@ func newReceivedProcessor(
clientSyncedState *ClientSyncedState,
packagesStateProvider types.PackagesStateProvider,
capabilities protobufs.AgentCapabilities,
packageSyncMutex *sync.Mutex,
) receivedProcessor {
return receivedProcessor{
logger: logger,
Expand All @@ -43,6 +48,7 @@ func newReceivedProcessor(
clientSyncedState: clientSyncedState,
packagesStateProvider: packagesStateProvider,
capabilities: capabilities,
packageSyncMutex: packageSyncMutex,
}
}

Expand Down Expand Up @@ -122,6 +128,7 @@ func (r *receivedProcessor) ProcessReceivedMessage(ctx context.Context, msg *pro
r.sender,
r.clientSyncedState,
r.packagesStateProvider,
r.packageSyncMutex,
)
} else {
r.logger.Debugf(ctx, "Ignoring PackagesAvailable, agent does not have AcceptsPackages capability")
Expand Down
4 changes: 3 additions & 1 deletion client/internal/wsreceiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package internal
import (
"context"
"fmt"
"sync"

"github.com/gorilla/websocket"
"github.com/open-telemetry/opamp-go/client/types"
Expand Down Expand Up @@ -32,13 +33,14 @@ func NewWSReceiver(
clientSyncedState *ClientSyncedState,
packagesStateProvider types.PackagesStateProvider,
capabilities protobufs.AgentCapabilities,
packageSyncMutex *sync.Mutex,
) *wsReceiver {
w := &wsReceiver{
conn: conn,
logger: logger,
sender: sender,
callbacks: callbacks,
processor: newReceivedProcessor(logger, callbacks, sender, clientSyncedState, packagesStateProvider, capabilities),
processor: newReceivedProcessor(logger, callbacks, sender, clientSyncedState, packagesStateProvider, capabilities, packageSyncMutex),
stopped: make(chan struct{}),
}

Expand Down
66 changes: 63 additions & 3 deletions client/internal/wsreceiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package internal
import (
"context"
"fmt"
"sync"
"sync/atomic"
"testing"
"time"
Expand Down Expand Up @@ -87,7 +88,7 @@ func TestServerToAgentCommand(t *testing.T) {
}
sender := WSSender{}
srikanthccv marked this conversation as resolved.
Show resolved Hide resolved
capabilities := protobufs.AgentCapabilities_AgentCapabilities_AcceptsRestartCommand
receiver := NewWSReceiver(TestLogger{t}, callbacks, nil, &sender, &clientSyncedState, nil, capabilities)
receiver := NewWSReceiver(TestLogger{t}, callbacks, nil, &sender, &clientSyncedState, nil, capabilities, new(sync.Mutex))
receiver.processor.ProcessReceivedMessage(context.Background(), &protobufs.ServerToAgent{
Command: test.command,
})
Expand Down Expand Up @@ -141,7 +142,7 @@ func TestServerToAgentCommandExclusive(t *testing.T) {
},
}
clientSyncedState := ClientSyncedState{}
receiver := NewWSReceiver(TestLogger{t}, callbacks, nil, nil, &clientSyncedState, nil, test.capabilities)
receiver := NewWSReceiver(TestLogger{t}, callbacks, nil, nil, &clientSyncedState, nil, test.capabilities, new(sync.Mutex))
receiver.processor.ProcessReceivedMessage(context.Background(), &protobufs.ServerToAgent{
Command: &protobufs.ServerToAgentCommand{
Type: protobufs.CommandType_CommandType_Restart,
Expand Down Expand Up @@ -204,7 +205,7 @@ func TestReceiverLoopStop(t *testing.T) {
}
sender := WSSender{}
capabilities := protobufs.AgentCapabilities_AgentCapabilities_AcceptsRestartCommand
receiver := NewWSReceiver(TestLogger{t}, callbacks, conn, &sender, &clientSyncedState, nil, capabilities)
receiver := NewWSReceiver(TestLogger{t}, callbacks, conn, &sender, &clientSyncedState, nil, capabilities, new(sync.Mutex))
ctx, cancel := context.WithCancel(context.Background())

go func() {
Expand All @@ -217,3 +218,62 @@ func TestReceiverLoopStop(t *testing.T) {
return receiverLoopStopped.Load()
}, 2*time.Second, 100*time.Millisecond, "ReceiverLoop should stop when context is cancelled")
}

func TestWSPackageUpdatesInParallel(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
var messages atomic.Int32
var mux sync.Mutex
localPackageState := NewInMemPackagesStore()
tigrannajaryan marked this conversation as resolved.
Show resolved Hide resolved
callbacks := types.CallbacksStruct{
OnMessageFunc: func(ctx context.Context, msg *types.MessageData) {
err := msg.PackageSyncer.Sync(ctx)
assert.NoError(t, err)
messages.Add(1)
},
}
clientSyncedState := &ClientSyncedState{}
capabilities := protobufs.AgentCapabilities_AgentCapabilities_AcceptsPackages
sender := NewSender(&internal.NopLogger{})
receiver := NewWSReceiver(&internal.NopLogger{}, callbacks, nil, sender, clientSyncedState, localPackageState, capabilities, &mux)

receiver.processor.ProcessReceivedMessage(ctx,
&protobufs.ServerToAgent{
PackagesAvailable: &protobufs.PackagesAvailable{
Packages: map[string]*protobufs.PackageAvailable{
"package1": {
Type: protobufs.PackageType_PackageType_TopLevel,
Version: "1.0.0",
File: &protobufs.DownloadableFile{
DownloadUrl: "foo",
ContentHash: []byte{4, 5},
},
Hash: []byte{1, 2, 3},
},
},
AllPackagesHash: []byte{1, 2, 3, 4, 5},
},
})
receiver.processor.ProcessReceivedMessage(ctx,
&protobufs.ServerToAgent{
PackagesAvailable: &protobufs.PackagesAvailable{
Packages: map[string]*protobufs.PackageAvailable{
"package22": {
Type: protobufs.PackageType_PackageType_TopLevel,
Version: "1.0.0",
File: &protobufs.DownloadableFile{
DownloadUrl: "bar",
ContentHash: []byte{4, 5},
},
Hash: []byte{1, 2, 3},
},
},
AllPackagesHash: []byte{1, 2, 3, 4, 5},
},
})

assert.Eventually(t, func() bool {
return messages.Load() == 2
}, 2*time.Second, 100*time.Millisecond, "both messages must have been processed successfully")

cancel()
}
1 change: 1 addition & 0 deletions client/wsclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@ func (c *wsClient) runOneCycle(ctx context.Context) {
&c.common.ClientSyncedState,
c.common.PackagesStateProvider,
c.common.Capabilities,
&c.common.PackageSyncMutex,
)

// When the wsclient is closed, the context passed to runOneCycle will be canceled.
Expand Down
Loading