Skip to content

Commit

Permalink
Reintroduce changes of PR #120
Browse files Browse the repository at this point in the history
Signed-off-by: Paschalis Tsilias <[email protected]>
  • Loading branch information
tpaschalis committed Jun 25, 2024
1 parent 8f7a652 commit 2bf6a4d
Show file tree
Hide file tree
Showing 8 changed files with 29 additions and 5 deletions.
1 change: 1 addition & 0 deletions client/httpclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ func (c *httpClient) runUntilStopped(ctx context.Context) {
&c.common.ClientSyncedState,
c.common.PackagesStateProvider,
c.common.Capabilities,
&c.common.PackageSyncMutex,
)
}

Expand Down
3 changes: 3 additions & 0 deletions client/internal/clientcommon.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ type ClientCommon struct {
// PackagesStateProvider provides access to the local state of packages.
PackagesStateProvider types.PackagesStateProvider

// PackageSyncMutex makes sure only one package syncing operation happens at a time.
PackageSyncMutex sync.Mutex

// The transport-specific sender.
sender Sender

Expand Down
4 changes: 3 additions & 1 deletion client/internal/httpsender.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"fmt"
"io"
"net/http"
"sync"
"sync/atomic"
"time"

Expand Down Expand Up @@ -91,10 +92,11 @@ func (h *HTTPSender) Run(
clientSyncedState *ClientSyncedState,
packagesStateProvider types.PackagesStateProvider,
capabilities protobufs.AgentCapabilities,
packageSyncMutex *sync.Mutex,
) {
h.url = url
h.callbacks = callbacks
h.receiveProcessor = newReceivedProcessor(h.logger, callbacks, h, clientSyncedState, packagesStateProvider, capabilities)
h.receiveProcessor = newReceivedProcessor(h.logger, callbacks, h, clientSyncedState, packagesStateProvider, capabilities, packageSyncMutex)

for {
pollingTimer := time.NewTimer(time.Millisecond * time.Duration(atomic.LoadInt64(&h.pollingIntervalMs)))
Expand Down
7 changes: 7 additions & 0 deletions client/internal/packagessyncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"net/http"
"sync"

"github.com/open-telemetry/opamp-go/client/types"
"github.com/open-telemetry/opamp-go/protobufs"
Expand All @@ -20,6 +21,7 @@ type packagesSyncer struct {
sender Sender

statuses *protobufs.PackageStatuses
mut *sync.Mutex
doneCh chan struct{}
}

Expand All @@ -30,6 +32,7 @@ func NewPackagesSyncer(
sender Sender,
clientSyncedState *ClientSyncedState,
packagesStateProvider types.PackagesStateProvider,
mut *sync.Mutex,
) *packagesSyncer {
return &packagesSyncer{
logger: logger,
Expand All @@ -38,6 +41,7 @@ func NewPackagesSyncer(
clientSyncedState: clientSyncedState,
localState: packagesStateProvider,
doneCh: make(chan struct{}),
mut: mut,
}
}

Expand All @@ -49,6 +53,7 @@ func (s *packagesSyncer) Sync(ctx context.Context) error {
}()

// Prepare package statuses.
s.mut.Lock()
if err := s.initStatuses(); err != nil {
return err
}
Expand Down Expand Up @@ -99,6 +104,8 @@ func (s *packagesSyncer) initStatuses() error {

// doSync performs the actual syncing process.
func (s *packagesSyncer) doSync(ctx context.Context) {
defer s.mut.Unlock()

hash, err := s.localState.AllPackagesHash()
if err != nil {
s.logger.Errorf(ctx, "Package syncing failed: %V", err)
Expand Down
7 changes: 7 additions & 0 deletions client/internal/receivedprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package internal
import (
"context"
"fmt"
"sync"

"github.com/open-telemetry/opamp-go/client/types"
"github.com/open-telemetry/opamp-go/protobufs"
Expand All @@ -24,6 +25,9 @@ type receivedProcessor struct {

packagesStateProvider types.PackagesStateProvider

// packageSyncMutex protects against multiple package syncing operations at the same time.
packageSyncMutex *sync.Mutex

// Agent's capabilities defined at Start() time.
capabilities protobufs.AgentCapabilities
}
Expand All @@ -35,6 +39,7 @@ func newReceivedProcessor(
clientSyncedState *ClientSyncedState,
packagesStateProvider types.PackagesStateProvider,
capabilities protobufs.AgentCapabilities,
packageSyncMutex *sync.Mutex,
) receivedProcessor {
return receivedProcessor{
logger: logger,
Expand All @@ -43,6 +48,7 @@ func newReceivedProcessor(
clientSyncedState: clientSyncedState,
packagesStateProvider: packagesStateProvider,
capabilities: capabilities,
packageSyncMutex: packageSyncMutex,
}
}

Expand Down Expand Up @@ -122,6 +128,7 @@ func (r *receivedProcessor) ProcessReceivedMessage(ctx context.Context, msg *pro
r.sender,
r.clientSyncedState,
r.packagesStateProvider,
r.packageSyncMutex,
)
} else {
r.logger.Debugf(ctx, "Ignoring PackagesAvailable, agent does not have AcceptsPackages capability")
Expand Down
4 changes: 3 additions & 1 deletion client/internal/wsreceiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package internal
import (
"context"
"fmt"
"sync"

"github.com/gorilla/websocket"
"github.com/open-telemetry/opamp-go/client/types"
Expand Down Expand Up @@ -32,13 +33,14 @@ func NewWSReceiver(
clientSyncedState *ClientSyncedState,
packagesStateProvider types.PackagesStateProvider,
capabilities protobufs.AgentCapabilities,
packageSyncMutex *sync.Mutex,
) *wsReceiver {
w := &wsReceiver{
conn: conn,
logger: logger,
sender: sender,
callbacks: callbacks,
processor: newReceivedProcessor(logger, callbacks, sender, clientSyncedState, packagesStateProvider, capabilities),
processor: newReceivedProcessor(logger, callbacks, sender, clientSyncedState, packagesStateProvider, capabilities, packageSyncMutex),
stopped: make(chan struct{}),
}

Expand Down
7 changes: 4 additions & 3 deletions client/internal/wsreceiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package internal
import (
"context"
"fmt"
"sync"
"sync/atomic"
"testing"
"time"
Expand Down Expand Up @@ -87,7 +88,7 @@ func TestServerToAgentCommand(t *testing.T) {
}
sender := WSSender{}
capabilities := protobufs.AgentCapabilities_AgentCapabilities_AcceptsRestartCommand
receiver := NewWSReceiver(TestLogger{t}, callbacks, nil, &sender, &clientSyncedState, nil, capabilities)
receiver := NewWSReceiver(TestLogger{t}, callbacks, nil, &sender, &clientSyncedState, nil, capabilities, new(sync.Mutex))
receiver.processor.ProcessReceivedMessage(context.Background(), &protobufs.ServerToAgent{
Command: test.command,
})
Expand Down Expand Up @@ -141,7 +142,7 @@ func TestServerToAgentCommandExclusive(t *testing.T) {
},
}
clientSyncedState := ClientSyncedState{}
receiver := NewWSReceiver(TestLogger{t}, callbacks, nil, nil, &clientSyncedState, nil, test.capabilities)
receiver := NewWSReceiver(TestLogger{t}, callbacks, nil, nil, &clientSyncedState, nil, test.capabilities, new(sync.Mutex))
receiver.processor.ProcessReceivedMessage(context.Background(), &protobufs.ServerToAgent{
Command: &protobufs.ServerToAgentCommand{
Type: protobufs.CommandType_CommandType_Restart,
Expand Down Expand Up @@ -204,7 +205,7 @@ func TestReceiverLoopStop(t *testing.T) {
}
sender := WSSender{}
capabilities := protobufs.AgentCapabilities_AgentCapabilities_AcceptsRestartCommand
receiver := NewWSReceiver(TestLogger{t}, callbacks, conn, &sender, &clientSyncedState, nil, capabilities)
receiver := NewWSReceiver(TestLogger{t}, callbacks, conn, &sender, &clientSyncedState, nil, capabilities, new(sync.Mutex))
ctx, cancel := context.WithCancel(context.Background())

go func() {
Expand Down
1 change: 1 addition & 0 deletions client/wsclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,7 @@ func (c *wsClient) runOneCycle(ctx context.Context) {
&c.common.ClientSyncedState,
c.common.PackagesStateProvider,
c.common.Capabilities,
&c.common.PackageSyncMutex,
)

// When the wsclient is closed, the context passed to runOneCycle will be canceled.
Expand Down

0 comments on commit 2bf6a4d

Please sign in to comment.