From 2204615e3fec3d5131f3beee50fc0f51dc5bf107 Mon Sep 17 00:00:00 2001 From: Tigran Najaryan Date: Wed, 18 Dec 2024 13:40:50 -0500 Subject: [PATCH] Eliminate Callbacks interface The interface has the following downsides: - Impossible to define non-trivial default behavior. Here is an example where it was needed: https://github.com/open-telemetry/opamp-go/pull/269#discussion_r1874634261 - Adding new callbacks requires expanding the interface, which is a breaking change for existing client users. Getting rid of the interface and keeping just a struct for callbacks solves both problems: - Arbitrarily complex default behavior can be now defined on the struct if the user does not provide the particular callback func. - Adding new callback funcs is not a braking change, existing users won't be affected. --- client/clientimpl_test.go | 72 ++++---- client/internal/clientcommon.go | 5 +- client/internal/httpsender_test.go | 26 +-- client/internal/receivedprocessor.go | 170 +++++++++--------- client/internal/wsreceiver_test.go | 17 +- client/types/callbacks.go | 108 +++-------- client/wsclient.go | 6 +- client/wsclient_test.go | 18 +- internal/examples/agent/agent/agent.go | 16 +- .../supervisor/supervisor/supervisor.go | 12 +- 10 files changed, 195 insertions(+), 255 deletions(-) diff --git a/client/clientimpl_test.go b/client/clientimpl_test.go index a0328ef8..dea6bd6c 100644 --- a/client/clientimpl_test.go +++ b/client/clientimpl_test.go @@ -167,8 +167,8 @@ func TestOnConnectFail(t *testing.T) { testClients(t, func(t *testing.T, client OpAMPClient) { var connectErr atomic.Value settings := createNoServerSettings() - settings.Callbacks = types.CallbacksStruct{ - OnConnectFailedFunc: func(ctx context.Context, err error) { + settings.Callbacks = types.Callbacks{ + OnConnectFailed: func(ctx context.Context, err error) { connectErr.Store(err) }, } @@ -244,8 +244,8 @@ func TestConnectWithServer(t *testing.T) { // Start a client. var connected int64 settings := types.StartSettings{ - Callbacks: types.CallbacksStruct{ - OnConnectFunc: func(ctx context.Context) { + Callbacks: types.Callbacks{ + OnConnect: func(ctx context.Context) { atomic.StoreInt64(&connected, 1) }, }, @@ -282,12 +282,12 @@ func TestConnectWithServer503(t *testing.T) { var clientConnected int64 var connectErr atomic.Value settings := types.StartSettings{ - Callbacks: types.CallbacksStruct{ - OnConnectFunc: func(ctx context.Context) { + Callbacks: types.Callbacks{ + OnConnect: func(ctx context.Context) { atomic.StoreInt64(&clientConnected, 1) assert.Fail(t, "Client should not be able to connect") }, - OnConnectFailedFunc: func(ctx context.Context, err error) { + OnConnectFailed: func(ctx context.Context, err error) { connectErr.Store(err) }, }, @@ -484,11 +484,11 @@ func TestFirstStatusReport(t *testing.T) { // Start a client. var connected, remoteConfigReceived int64 settings := types.StartSettings{ - Callbacks: types.CallbacksStruct{ - OnConnectFunc: func(ctx context.Context) { + Callbacks: types.Callbacks{ + OnConnect: func(ctx context.Context) { atomic.AddInt64(&connected, 1) }, - OnMessageFunc: func(ctx context.Context, msg *types.MessageData) { + OnMessage: func(ctx context.Context, msg *types.MessageData) { // Verify that the client received exactly the remote config that // the Server sent. assert.True(t, proto.Equal(remoteConfig, msg.RemoteConfig)) @@ -537,8 +537,8 @@ func TestIncludesDetailsOnReconnect(t *testing.T) { var connected int64 settings := types.StartSettings{ - Callbacks: types.CallbacksStruct{ - OnConnectFunc: func(ctx context.Context) { + Callbacks: types.Callbacks{ + OnConnect: func(ctx context.Context) { atomic.AddInt64(&connected, 1) }, }, @@ -589,8 +589,8 @@ func TestSetEffectiveConfig(t *testing.T) { // Start a client. sendConfig := createEffectiveConfig() settings := types.StartSettings{ - Callbacks: types.CallbacksStruct{ - GetEffectiveConfigFunc: func(ctx context.Context) (*protobufs.EffectiveConfig, error) { + Callbacks: types.Callbacks{ + GetEffectiveConfig: func(ctx context.Context) (*protobufs.EffectiveConfig, error) { return sendConfig, nil }, }, @@ -822,8 +822,8 @@ func TestServerOfferConnectionSettings(t *testing.T) { // Start a client. settings := types.StartSettings{ - Callbacks: types.CallbacksStruct{ - OnMessageFunc: func(ctx context.Context, msg *types.MessageData) { + Callbacks: types.Callbacks{ + OnMessage: func(ctx context.Context, msg *types.MessageData) { assert.True(t, proto.Equal(metricsSettings, msg.OwnMetricsConnSettings)) assert.True(t, proto.Equal(tracesSettings, msg.OwnTracesConnSettings)) assert.True(t, proto.Equal(logsSettings, msg.OwnLogsConnSettings)) @@ -834,7 +834,7 @@ func TestServerOfferConnectionSettings(t *testing.T) { atomic.AddInt64(&gotOtherSettings, 1) }, - OnOpampConnectionSettingsFunc: func( + OnOpampConnectionSettings: func( ctx context.Context, settings *protobufs.OpAMPConnectionSettings, ) error { assert.True(t, proto.Equal(opampSettings, settings)) @@ -891,8 +891,8 @@ func TestClientRequestConnectionSettings(t *testing.T) { // Start a client. settings := types.StartSettings{ - Callbacks: types.CallbacksStruct{ - OnOpampConnectionSettingsFunc: func( + Callbacks: types.Callbacks{ + OnOpampConnectionSettings: func( ctx context.Context, settings *protobufs.OpAMPConnectionSettings, ) error { assert.True(t, proto.Equal(opampSettings, settings)) @@ -1073,8 +1073,8 @@ func TestReportEffectiveConfig(t *testing.T) { // Start a client. settings := types.StartSettings{ OpAMPServerURL: "ws://" + srv.Endpoint, - Callbacks: types.CallbacksStruct{ - GetEffectiveConfigFunc: func(ctx context.Context) (*protobufs.EffectiveConfig, error) { + Callbacks: types.Callbacks{ + GetEffectiveConfig: func(ctx context.Context) (*protobufs.EffectiveConfig, error) { return clientEffectiveConfig, nil }, }, @@ -1139,8 +1139,8 @@ func verifyRemoteConfigUpdate(t *testing.T, successCase bool, expectStatus *prot // Start a client. settings := types.StartSettings{ OpAMPServerURL: "ws://" + srv.Endpoint, - Callbacks: types.CallbacksStruct{ - OnMessageFunc: func(ctx context.Context, msg *types.MessageData) { + Callbacks: types.Callbacks{ + OnMessage: func(ctx context.Context, msg *types.MessageData) { if msg.RemoteConfig != nil { if successCase { client.SetRemoteConfigStatus( @@ -1355,8 +1355,8 @@ func verifyUpdatePackages(t *testing.T, testCase packageTestCase) { // Start a client. settings := types.StartSettings{ OpAMPServerURL: "ws://" + srv.Endpoint, - Callbacks: types.CallbacksStruct{ - OnMessageFunc: onMessageFunc, + Callbacks: types.Callbacks{ + OnMessage: onMessageFunc, }, PackagesStateProvider: localPackageState, Capabilities: protobufs.AgentCapabilities_AgentCapabilities_AcceptsPackages | @@ -1542,8 +1542,8 @@ func TestMissingCapabilities(t *testing.T) { // Start a client. settings := types.StartSettings{ - Callbacks: types.CallbacksStruct{ - OnMessageFunc: func(ctx context.Context, msg *types.MessageData) { + Callbacks: types.Callbacks{ + OnMessage: func(ctx context.Context, msg *types.MessageData) { // These fields must not be set since we did not define the capabilities to accept them. assert.Nil(t, msg.RemoteConfig) assert.Nil(t, msg.OwnLogsConnSettings) @@ -1552,7 +1552,7 @@ func TestMissingCapabilities(t *testing.T) { assert.Nil(t, msg.OtherConnSettings) assert.Nil(t, msg.PackagesAvailable) }, - OnOpampConnectionSettingsFunc: func( + OnOpampConnectionSettings: func( ctx context.Context, settings *protobufs.OpAMPConnectionSettings, ) error { assert.Fail(t, "should not be called since capability is not set to accept it") @@ -1613,7 +1613,7 @@ func TestMissingPackagesStateProvider(t *testing.T) { testClients(t, func(t *testing.T, client OpAMPClient) { // Start a client. settings := types.StartSettings{ - Callbacks: types.CallbacksStruct{}, + Callbacks: types.Callbacks{}, Capabilities: protobufs.AgentCapabilities_AgentCapabilities_AcceptsPackages | protobufs.AgentCapabilities_AgentCapabilities_ReportsPackageStatuses, } @@ -1624,7 +1624,7 @@ func TestMissingPackagesStateProvider(t *testing.T) { // Start a client. localPackageState := internal.NewInMemPackagesStore() settings = types.StartSettings{ - Callbacks: types.CallbacksStruct{}, + Callbacks: types.Callbacks{}, PackagesStateProvider: localPackageState, Capabilities: protobufs.AgentCapabilities_AgentCapabilities_AcceptsPackages, } @@ -1634,7 +1634,7 @@ func TestMissingPackagesStateProvider(t *testing.T) { // Start a client. settings = types.StartSettings{ - Callbacks: types.CallbacksStruct{}, + Callbacks: types.Callbacks{}, PackagesStateProvider: localPackageState, Capabilities: protobufs.AgentCapabilities_AgentCapabilities_ReportsPackageStatuses, } @@ -1668,8 +1668,8 @@ func TestOfferUpdatedVersion(t *testing.T) { // Start a client. settings := types.StartSettings{ OpAMPServerURL: "ws://" + srv.Endpoint, - Callbacks: types.CallbacksStruct{ - OnMessageFunc: onMessageFunc, + Callbacks: types.Callbacks{ + OnMessage: onMessageFunc, }, PackagesStateProvider: localPackageState, Capabilities: protobufs.AgentCapabilities_AgentCapabilities_AcceptsPackages | @@ -1747,8 +1747,8 @@ func TestReportCustomCapabilities(t *testing.T) { // Start a client. settings := types.StartSettings{ OpAMPServerURL: "ws://" + srv.Endpoint, - Callbacks: types.CallbacksStruct{ - OnMessageFunc: func(ctx context.Context, msg *types.MessageData) { + Callbacks: types.Callbacks{ + OnMessage: func(ctx context.Context, msg *types.MessageData) { clientRcvCustomMessage.Store(msg.CustomMessage) }, }, @@ -1843,7 +1843,7 @@ func TestReportCustomCapabilities(t *testing.T) { func TestSendCustomMessage(t *testing.T) { testClients(t, func(t *testing.T, client OpAMPClient) { settings := types.StartSettings{ - Callbacks: types.CallbacksStruct{}, + Callbacks: types.Callbacks{}, } prepareClient(t, &settings, client) clientCustomCapabilities := &protobufs.CustomCapabilities{ diff --git a/client/internal/clientcommon.go b/client/internal/clientcommon.go index 5369cf15..283e3b01 100644 --- a/client/internal/clientcommon.go +++ b/client/internal/clientcommon.go @@ -132,10 +132,7 @@ func (c *ClientCommon) PrepareStart( // Prepare callbacks. c.Callbacks = settings.Callbacks - if c.Callbacks == nil { - // Make sure it is always safe to call Callbacks. - c.Callbacks = types.CallbacksStruct{} - } + c.Callbacks.SetDefaults() if c.Capabilities&protobufs.AgentCapabilities_AgentCapabilities_ReportsHeartbeat != 0 && settings.HeartbeatInterval != nil { if err := c.sender.SetHeartbeatInterval(*settings.HeartbeatInterval); err != nil { diff --git a/client/internal/httpsender_test.go b/client/internal/httpsender_test.go index 191a8289..78ff2f87 100644 --- a/client/internal/httpsender_test.go +++ b/client/internal/httpsender_test.go @@ -12,11 +12,12 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" + "github.com/open-telemetry/opamp-go/client/types" sharedinternal "github.com/open-telemetry/opamp-go/internal" "github.com/open-telemetry/opamp-go/internal/testhelpers" "github.com/open-telemetry/opamp-go/protobufs" - "github.com/stretchr/testify/assert" ) func TestHTTPSenderRetryForStatusTooManyRequests(t *testing.T) { @@ -46,10 +47,10 @@ func TestHTTPSenderRetryForStatusTooManyRequests(t *testing.T) { }}, } }) - sender.callbacks = types.CallbacksStruct{ - OnConnectFunc: func(ctx context.Context) { + sender.callbacks = types.Callbacks{ + OnConnect: func(ctx context.Context) { }, - OnConnectFailedFunc: func(ctx context.Context, _ error) { + OnConnectFailed: func(ctx context.Context, _ error) { }, } sender.url = url @@ -163,10 +164,10 @@ func TestHTTPSenderRetryForFailedRequests(t *testing.T) { }}, } }) - sender.callbacks = types.CallbacksStruct{ - OnConnectFunc: func(ctx context.Context) { + sender.callbacks = types.Callbacks{ + OnConnect: func(ctx context.Context) { }, - OnConnectFailedFunc: func(ctx context.Context, _ error) { + OnConnectFailed: func(ctx context.Context, _ error) { }, } sender.url = url @@ -197,7 +198,8 @@ func TestRequestInstanceUidFlagReset(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) sender := NewHTTPSender(&sharedinternal.NopLogger{}) - sender.callbacks = types.CallbacksStruct{} + sender.callbacks = types.Callbacks{} + sender.callbacks.SetDefaults() // Set the RequestInstanceUid flag on the tracked state to request the server for a new ID to use. clientSyncedState := &ClientSyncedState{} @@ -248,8 +250,8 @@ func TestPackageUpdatesInParallel(t *testing.T) { var messages atomic.Int32 var mux sync.Mutex - sender.callbacks = types.CallbacksStruct{ - OnMessageFunc: func(ctx context.Context, msg *types.MessageData) { + sender.callbacks = types.Callbacks{ + OnMessage: func(ctx context.Context, msg *types.MessageData) { err := msg.PackageSyncer.Sync(ctx) assert.NoError(t, err) messages.Add(1) @@ -320,8 +322,8 @@ func TestPackageUpdatesWithError(t *testing.T) { localPackageState := types.PackagesStateProvider(nil) var messages atomic.Int32 var mux sync.Mutex - sender.callbacks = types.CallbacksStruct{ - OnMessageFunc: func(ctx context.Context, msg *types.MessageData) { + sender.callbacks = types.Callbacks{ + OnMessage: func(ctx context.Context, msg *types.MessageData) { // Make sure the call to Sync will return an error due to a nil PackageStateProvider err := msg.PackageSyncer.Sync(ctx) assert.Error(t, err) diff --git a/client/internal/receivedprocessor.go b/client/internal/receivedprocessor.go index a851ba15..aee05a81 100644 --- a/client/internal/receivedprocessor.go +++ b/client/internal/receivedprocessor.go @@ -57,119 +57,117 @@ func newReceivedProcessor( // the received message and performs any processing necessary based on what fields are set. // This function will call any relevant callbacks. func (r *receivedProcessor) ProcessReceivedMessage(ctx context.Context, msg *protobufs.ServerToAgent) { - if r.callbacks != nil { - // Note that anytime we add a new command capabilities we need to add a check here. - // This is because we want to ignore commands that the agent does not have the capability - // to process. - if msg.Command != nil { - if r.hasCapability(protobufs.AgentCapabilities_AgentCapabilities_AcceptsRestartCommand) { - r.rcvCommand(ctx, msg.Command) - // If a command message exists, other messages will be ignored - return - } else { - r.logger.Debugf(ctx, "Ignoring Command, agent does not have AcceptsCommands capability") - } + // Note that anytime we add a new command capabilities we need to add a check here. + // This is because we want to ignore commands that the agent does not have the capability + // to process. + if msg.Command != nil { + if r.hasCapability(protobufs.AgentCapabilities_AgentCapabilities_AcceptsRestartCommand) { + r.rcvCommand(ctx, msg.Command) + // If a command message exists, other messages will be ignored + return + } else { + r.logger.Debugf(ctx, "Ignoring Command, agent does not have AcceptsCommands capability") } + } - scheduled, err := r.rcvFlags(ctx, protobufs.ServerToAgentFlags(msg.Flags)) - if err != nil { - r.logger.Errorf(ctx, "cannot processed received flags:%v", err) - } + scheduled, err := r.rcvFlags(ctx, protobufs.ServerToAgentFlags(msg.Flags)) + if err != nil { + r.logger.Errorf(ctx, "cannot processed received flags:%v", err) + } - msgData := &types.MessageData{} + msgData := &types.MessageData{} - if msg.RemoteConfig != nil { - if r.hasCapability(protobufs.AgentCapabilities_AgentCapabilities_AcceptsRemoteConfig) { - msgData.RemoteConfig = msg.RemoteConfig - } else { - r.logger.Debugf(ctx, "Ignoring RemoteConfig, agent does not have AcceptsRemoteConfig capability") - } + if msg.RemoteConfig != nil { + if r.hasCapability(protobufs.AgentCapabilities_AgentCapabilities_AcceptsRemoteConfig) { + msgData.RemoteConfig = msg.RemoteConfig + } else { + r.logger.Debugf(ctx, "Ignoring RemoteConfig, agent does not have AcceptsRemoteConfig capability") } + } - if msg.ConnectionSettings != nil { - if msg.ConnectionSettings.OwnMetrics != nil { - if r.hasCapability(protobufs.AgentCapabilities_AgentCapabilities_ReportsOwnMetrics) { - msgData.OwnMetricsConnSettings = msg.ConnectionSettings.OwnMetrics - } else { - r.logger.Debugf(ctx, "Ignoring OwnMetrics, agent does not have ReportsOwnMetrics capability") - } - } - - if msg.ConnectionSettings.OwnTraces != nil { - if r.hasCapability(protobufs.AgentCapabilities_AgentCapabilities_ReportsOwnTraces) { - msgData.OwnTracesConnSettings = msg.ConnectionSettings.OwnTraces - } else { - r.logger.Debugf(ctx, "Ignoring OwnTraces, agent does not have ReportsOwnTraces capability") - } - } - - if msg.ConnectionSettings.OwnLogs != nil { - if r.hasCapability(protobufs.AgentCapabilities_AgentCapabilities_ReportsOwnLogs) { - msgData.OwnLogsConnSettings = msg.ConnectionSettings.OwnLogs - } else { - r.logger.Debugf(ctx, "Ignoring OwnLogs, agent does not have ReportsOwnLogs capability") - } + if msg.ConnectionSettings != nil { + if msg.ConnectionSettings.OwnMetrics != nil { + if r.hasCapability(protobufs.AgentCapabilities_AgentCapabilities_ReportsOwnMetrics) { + msgData.OwnMetricsConnSettings = msg.ConnectionSettings.OwnMetrics + } else { + r.logger.Debugf(ctx, "Ignoring OwnMetrics, agent does not have ReportsOwnMetrics capability") } + } - if msg.ConnectionSettings.OtherConnections != nil { - if r.hasCapability(protobufs.AgentCapabilities_AgentCapabilities_AcceptsOtherConnectionSettings) { - msgData.OtherConnSettings = msg.ConnectionSettings.OtherConnections - } else { - r.logger.Debugf(ctx, "Ignoring OtherConnections, agent does not have AcceptsOtherConnectionSettings capability") - } + if msg.ConnectionSettings.OwnTraces != nil { + if r.hasCapability(protobufs.AgentCapabilities_AgentCapabilities_ReportsOwnTraces) { + msgData.OwnTracesConnSettings = msg.ConnectionSettings.OwnTraces + } else { + r.logger.Debugf(ctx, "Ignoring OwnTraces, agent does not have ReportsOwnTraces capability") } } - if msg.PackagesAvailable != nil { - if r.hasCapability(protobufs.AgentCapabilities_AgentCapabilities_AcceptsPackages) { - msgData.PackagesAvailable = msg.PackagesAvailable - msgData.PackageSyncer = NewPackagesSyncer( - r.logger, - msgData.PackagesAvailable, - r.sender, - r.clientSyncedState, - r.packagesStateProvider, - r.packageSyncMutex, - ) + if msg.ConnectionSettings.OwnLogs != nil { + if r.hasCapability(protobufs.AgentCapabilities_AgentCapabilities_ReportsOwnLogs) { + msgData.OwnLogsConnSettings = msg.ConnectionSettings.OwnLogs } else { - r.logger.Debugf(ctx, "Ignoring PackagesAvailable, agent does not have AcceptsPackages capability") + r.logger.Debugf(ctx, "Ignoring OwnLogs, agent does not have ReportsOwnLogs capability") } } - if msg.AgentIdentification != nil { - err := r.rcvAgentIdentification(ctx, msg.AgentIdentification) - if err != nil { - r.logger.Errorf(ctx, "Failed to set agent ID: %v", err) + if msg.ConnectionSettings.OtherConnections != nil { + if r.hasCapability(protobufs.AgentCapabilities_AgentCapabilities_AcceptsOtherConnectionSettings) { + msgData.OtherConnSettings = msg.ConnectionSettings.OtherConnections } else { - msgData.AgentIdentification = msg.AgentIdentification + r.logger.Debugf(ctx, "Ignoring OtherConnections, agent does not have AcceptsOtherConnectionSettings capability") } } + } - if msg.CustomCapabilities != nil { - msgData.CustomCapabilities = msg.CustomCapabilities + if msg.PackagesAvailable != nil { + if r.hasCapability(protobufs.AgentCapabilities_AgentCapabilities_AcceptsPackages) { + msgData.PackagesAvailable = msg.PackagesAvailable + msgData.PackageSyncer = NewPackagesSyncer( + r.logger, + msgData.PackagesAvailable, + r.sender, + r.clientSyncedState, + r.packagesStateProvider, + r.packageSyncMutex, + ) + } else { + r.logger.Debugf(ctx, "Ignoring PackagesAvailable, agent does not have AcceptsPackages capability") } + } - if msg.CustomMessage != nil { - // ensure that the agent supports the capability - if r.clientSyncedState.HasCustomCapability(msg.CustomMessage.Capability) { - msgData.CustomMessage = msg.CustomMessage - } else { - r.logger.Debugf(ctx, "Ignoring CustomMessage, agent does not have %s capability", msg.CustomMessage.Capability) - } + if msg.AgentIdentification != nil { + err := r.rcvAgentIdentification(ctx, msg.AgentIdentification) + if err != nil { + r.logger.Errorf(ctx, "Failed to set agent ID: %v", err) + } else { + msgData.AgentIdentification = msg.AgentIdentification } + } - r.callbacks.OnMessage(ctx, msgData) - - r.rcvOpampConnectionSettings(ctx, msg.ConnectionSettings) + if msg.CustomCapabilities != nil { + msgData.CustomCapabilities = msg.CustomCapabilities + } - if scheduled { - r.sender.ScheduleSend() + if msg.CustomMessage != nil { + // ensure that the agent supports the capability + if r.clientSyncedState.HasCustomCapability(msg.CustomMessage.Capability) { + msgData.CustomMessage = msg.CustomMessage + } else { + r.logger.Debugf(ctx, "Ignoring CustomMessage, agent does not have %s capability", msg.CustomMessage.Capability) } } - err := msg.GetErrorResponse() - if err != nil { - r.processErrorResponse(ctx, err) + r.callbacks.OnMessage(ctx, msgData) + + r.rcvOpampConnectionSettings(ctx, msg.ConnectionSettings) + + if scheduled { + r.sender.ScheduleSend() + } + + errResponse := msg.GetErrorResponse() + if errResponse != nil { + r.processErrorResponse(ctx, errResponse) } } diff --git a/client/internal/wsreceiver_test.go b/client/internal/wsreceiver_test.go index a1c21dc7..c8ade427 100644 --- a/client/internal/wsreceiver_test.go +++ b/client/internal/wsreceiver_test.go @@ -72,8 +72,8 @@ func TestServerToAgentCommand(t *testing.T) { t.Run(fmt.Sprint(i), func(t *testing.T) { action := none - callbacks := types.CallbacksStruct{ - OnCommandFunc: func(ctx context.Context, command *protobufs.ServerToAgentCommand) error { + callbacks := types.Callbacks{ + OnCommand: func(ctx context.Context, command *protobufs.ServerToAgentCommand) error { switch command.Type { case protobufs.CommandType_CommandType_Restart: action = restart @@ -83,6 +83,7 @@ func TestServerToAgentCommand(t *testing.T) { return nil }, } + callbacks.SetDefaults() clientSyncedState := ClientSyncedState{ remoteConfigStatus: &protobufs.RemoteConfigStatus{}, } @@ -132,12 +133,12 @@ func TestServerToAgentCommandExclusive(t *testing.T) { calledCommand := false calledOnMessageConfig := false - callbacks := types.CallbacksStruct{ - OnCommandFunc: func(ctx context.Context, command *protobufs.ServerToAgentCommand) error { + callbacks := types.Callbacks{ + OnCommand: func(ctx context.Context, command *protobufs.ServerToAgentCommand) error { calledCommand = true return nil }, - OnMessageFunc: func(ctx context.Context, msg *types.MessageData) { + OnMessage: func(ctx context.Context, msg *types.MessageData) { calledOnMessageConfig = true }, } @@ -199,7 +200,7 @@ func TestReceiverLoopStop(t *testing.T) { var receiverLoopStopped atomic.Bool - callbacks := types.CallbacksStruct{} + callbacks := types.Callbacks{} clientSyncedState := ClientSyncedState{ remoteConfigStatus: &protobufs.RemoteConfigStatus{}, } @@ -236,8 +237,8 @@ func TestWSPackageUpdatesInParallel(t *testing.T) { <-blockSyncCh } } - callbacks := types.CallbacksStruct{ - OnMessageFunc: func(ctx context.Context, msg *types.MessageData) { + callbacks := types.Callbacks{ + OnMessage: func(ctx context.Context, msg *types.MessageData) { err := msg.PackageSyncer.Sync(ctx) assert.NoError(t, err) messages.Add(1) diff --git a/client/types/callbacks.go b/client/types/callbacks.go index 662c4707..02cef82d 100644 --- a/client/types/callbacks.go +++ b/client/types/callbacks.go @@ -48,23 +48,23 @@ type MessageData struct { // Callbacks is an interface for the Client to handle messages from the Server. // Callbacks are expected to honour the context passed to them, meaning they should be aware of cancellations. -type Callbacks interface { +type Callbacks struct { // OnConnect is called when the connection is successfully established to the Server. // May be called after Start() is called and every time a connection is established to the Server. // For WebSocket clients this is called after the handshake is completed without any error. // For HTTP clients this is called for any request if the response status is OK. - OnConnect(ctx context.Context) + OnConnect func(ctx context.Context) // OnConnectFailed is called when the connection to the Server cannot be established. // May be called after Start() is called and tries to connect to the Server. // May also be called if the connection is lost and reconnection attempt fails. - OnConnectFailed(ctx context.Context, err error) + OnConnectFailed func(ctx context.Context, err error) // OnError is called when the Server reports an error in response to some previously // sent request. Useful for logging purposes. The Agent should not attempt to process // the error by reconnecting or retrying previous operations. The client handles the // ErrorResponse_UNAVAILABLE case internally by performing retries as necessary. - OnError(ctx context.Context, err *protobufs.ServerErrorResponse) + OnError func(ctx context.Context, err *protobufs.ServerErrorResponse) // OnMessage is called when the Agent receives a message that needs processing. // See MessageData definition for the data that may be available for processing. @@ -74,7 +74,7 @@ type Callbacks interface { // These functions may also be called after OnMessage returns. This is advisable // if processing can take a long time. In that case returning quickly is preferable // to avoid blocking the OpAMPClient. - OnMessage(ctx context.Context, msg *MessageData) + OnMessage func(ctx context.Context, msg *MessageData) // OnOpampConnectionSettings is called when the Agent receives an OpAMP // connection settings offer from the Server. Typically, the settings can specify @@ -87,7 +87,7 @@ type Callbacks interface { // // Only one OnOpampConnectionSettings call can be active at any time. // See OnRemoteConfig for the behavior. - OnOpampConnectionSettings( + OnOpampConnectionSettings func( ctx context.Context, settings *protobufs.OpAMPConnectionSettings, ) error @@ -101,96 +101,40 @@ type Callbacks interface { // returned a success or error. // The Agent must remember this RemoteConfigStatus and supply in the future // calls to Start() in StartSettings.RemoteConfigStatus. - SaveRemoteConfigStatus(ctx context.Context, status *protobufs.RemoteConfigStatus) + SaveRemoteConfigStatus func(ctx context.Context, status *protobufs.RemoteConfigStatus) // GetEffectiveConfig returns the current effective config. Only one // GetEffectiveConfig call can be active at any time. Until GetEffectiveConfig // returns it will not be called again. - GetEffectiveConfig(ctx context.Context) (*protobufs.EffectiveConfig, error) + GetEffectiveConfig func(ctx context.Context) (*protobufs.EffectiveConfig, error) // OnCommand is called when the Server requests that the connected Agent perform a command. - OnCommand(ctx context.Context, command *protobufs.ServerToAgentCommand) error + OnCommand func(ctx context.Context, command *protobufs.ServerToAgentCommand) error } -// CallbacksStruct is a struct that implements Callbacks interface and allows -// to override only the methods that are needed. If a method is not overridden then it is a no-op. -type CallbacksStruct struct { - OnConnectFunc func(ctx context.Context) - OnConnectFailedFunc func(ctx context.Context, err error) - OnErrorFunc func(ctx context.Context, err *protobufs.ServerErrorResponse) - - OnMessageFunc func(ctx context.Context, msg *MessageData) - - OnOpampConnectionSettingsFunc func( - ctx context.Context, - settings *protobufs.OpAMPConnectionSettings, - ) error - - OnCommandFunc func(ctx context.Context, command *protobufs.ServerToAgentCommand) error - - SaveRemoteConfigStatusFunc func(ctx context.Context, status *protobufs.RemoteConfigStatus) - GetEffectiveConfigFunc func(ctx context.Context) (*protobufs.EffectiveConfig, error) -} - -var _ Callbacks = (*CallbacksStruct)(nil) - -// OnConnect implements Callbacks.OnConnect. -func (c CallbacksStruct) OnConnect(ctx context.Context) { - if c.OnConnectFunc != nil { - c.OnConnectFunc(ctx) +func (c *Callbacks) SetDefaults() { + if c.OnConnect == nil { + c.OnConnect = func(ctx context.Context) {} } -} - -// OnConnectFailed implements Callbacks.OnConnectFailed. -func (c CallbacksStruct) OnConnectFailed(ctx context.Context, err error) { - if c.OnConnectFailedFunc != nil { - c.OnConnectFailedFunc(ctx, err) + if c.OnConnectFailed == nil { + c.OnConnectFailed = func(ctx context.Context, err error) {} } -} - -// OnError implements Callbacks.OnError. -func (c CallbacksStruct) OnError(ctx context.Context, err *protobufs.ServerErrorResponse) { - if c.OnErrorFunc != nil { - c.OnErrorFunc(ctx, err) + if c.OnError == nil { + c.OnError = func(ctx context.Context, err *protobufs.ServerErrorResponse) {} } -} - -// OnMessage implements Callbacks.OnMessage. -func (c CallbacksStruct) OnMessage(ctx context.Context, msg *MessageData) { - if c.OnMessageFunc != nil { - c.OnMessageFunc(ctx, msg) + if c.OnMessage == nil { + c.OnMessage = func(ctx context.Context, msg *MessageData) {} } -} - -// SaveRemoteConfigStatus implements Callbacks.SaveRemoteConfigStatus. -func (c CallbacksStruct) SaveRemoteConfigStatus(ctx context.Context, status *protobufs.RemoteConfigStatus) { - if c.SaveRemoteConfigStatusFunc != nil { - c.SaveRemoteConfigStatusFunc(ctx, status) + if c.OnOpampConnectionSettings == nil { + c.OnOpampConnectionSettings = func(ctx context.Context, settings *protobufs.OpAMPConnectionSettings) error { return nil } } -} - -// GetEffectiveConfig implements Callbacks.GetEffectiveConfig. -func (c CallbacksStruct) GetEffectiveConfig(ctx context.Context) (*protobufs.EffectiveConfig, error) { - if c.GetEffectiveConfigFunc != nil { - return c.GetEffectiveConfigFunc(ctx) + if c.OnCommand == nil { + c.OnCommand = func(ctx context.Context, command *protobufs.ServerToAgentCommand) error { return nil } } - return nil, nil -} - -// OnOpampConnectionSettings implements Callbacks.OnOpampConnectionSettings. -func (c CallbacksStruct) OnOpampConnectionSettings( - ctx context.Context, settings *protobufs.OpAMPConnectionSettings, -) error { - if c.OnOpampConnectionSettingsFunc != nil { - return c.OnOpampConnectionSettingsFunc(ctx, settings) + if c.GetEffectiveConfig == nil { + c.GetEffectiveConfig = func(ctx context.Context) (*protobufs.EffectiveConfig, error) { return nil, nil } } - return nil -} - -// OnCommand implements Callbacks.OnCommand. -func (c CallbacksStruct) OnCommand(ctx context.Context, command *protobufs.ServerToAgentCommand) error { - if c.OnCommandFunc != nil { - return c.OnCommandFunc(ctx, command) + if c.SaveRemoteConfigStatus == nil { + c.SaveRemoteConfigStatus = func(ctx context.Context, status *protobufs.RemoteConfigStatus) {} } - return nil } diff --git a/client/wsclient.go b/client/wsclient.go index dc703dcb..6219a28f 100644 --- a/client/wsclient.go +++ b/client/wsclient.go @@ -158,7 +158,7 @@ func (c *wsClient) tryConnectOnce(ctx context.Context) (retryAfter sharedinterna var resp *http.Response conn, resp, err := c.dialer.DialContext(ctx, c.url.String(), c.getHeader()) if err != nil { - if c.common.Callbacks != nil && !c.common.IsStopping() { + if !c.common.IsStopping() { c.common.Callbacks.OnConnectFailed(ctx, err) } if resp != nil { @@ -192,9 +192,7 @@ func (c *wsClient) tryConnectOnce(ctx context.Context) (retryAfter sharedinterna c.connMutex.Lock() c.conn = conn c.connMutex.Unlock() - if c.common.Callbacks != nil { - c.common.Callbacks.OnConnect(ctx) - } + c.common.Callbacks.OnConnect(ctx) return sharedinternal.OptionalDuration{Defined: false}, nil } diff --git a/client/wsclient_test.go b/client/wsclient_test.go index a8322a9f..cc9fd87d 100644 --- a/client/wsclient_test.go +++ b/client/wsclient_test.go @@ -162,11 +162,11 @@ func TestDisconnectWSByServer(t *testing.T) { var connected int64 var connectErr atomic.Value settings := types.StartSettings{ - Callbacks: types.CallbacksStruct{ - OnConnectFunc: func(ctx context.Context) { + Callbacks: types.Callbacks{ + OnConnect: func(ctx context.Context) { atomic.StoreInt64(&connected, 1) }, - OnConnectFailedFunc: func(ctx context.Context, err error) { + OnConnectFailed: func(ctx context.Context, err error) { connectErr.Store(err) }, }, @@ -212,13 +212,13 @@ func TestVerifyWSCompress(t *testing.T) { // Start an OpAMP/WebSocket client. var clientGotRemoteConfig atomic.Value settings := types.StartSettings{ - Callbacks: types.CallbacksStruct{ - OnMessageFunc: func(ctx context.Context, msg *types.MessageData) { + Callbacks: types.Callbacks{ + OnMessage: func(ctx context.Context, msg *types.MessageData) { if msg.RemoteConfig != nil { clientGotRemoteConfig.Store(msg.RemoteConfig) } }, - GetEffectiveConfigFunc: func(ctx context.Context) (*protobufs.EffectiveConfig, error) { + GetEffectiveConfig: func(ctx context.Context) (*protobufs.EffectiveConfig, error) { // If the client already received a remote config offer make sure to report // the effective config back to the server. var effCfg []byte @@ -355,11 +355,11 @@ func TestRedirectWS(t *testing.T) { var connected int64 var connectErr atomic.Value settings := types.StartSettings{ - Callbacks: types.CallbacksStruct{ - OnConnectFunc: func(ctx context.Context) { + Callbacks: types.Callbacks{ + OnConnect: func(ctx context.Context) { atomic.StoreInt64(&connected, 1) }, - OnConnectFailedFunc: func(ctx context.Context, err error) { + OnConnectFailed: func(ctx context.Context, err error) { if err != websocket.ErrBadHandshake { connectErr.Store(err) } diff --git a/internal/examples/agent/agent/agent.go b/internal/examples/agent/agent/agent.go index 41dfcab1..7b5bfd02 100644 --- a/internal/examples/agent/agent/agent.go +++ b/internal/examples/agent/agent/agent.go @@ -106,24 +106,24 @@ func (agent *Agent) connect() error { OpAMPServerURL: "wss://127.0.0.1:4320/v1/opamp", TLSConfig: tlsConfig, InstanceUid: types.InstanceUid(agent.instanceId), - Callbacks: types.CallbacksStruct{ - OnConnectFunc: func(ctx context.Context) { + Callbacks: types.Callbacks{ + OnConnect: func(ctx context.Context) { agent.logger.Debugf(ctx, "Connected to the server.") }, - OnConnectFailedFunc: func(ctx context.Context, err error) { + OnConnectFailed: func(ctx context.Context, err error) { agent.logger.Errorf(ctx, "Failed to connect to the server: %v", err) }, - OnErrorFunc: func(ctx context.Context, err *protobufs.ServerErrorResponse) { + OnError: func(ctx context.Context, err *protobufs.ServerErrorResponse) { agent.logger.Errorf(ctx, "Server returned an error response: %v", err.ErrorMessage) }, - SaveRemoteConfigStatusFunc: func(_ context.Context, status *protobufs.RemoteConfigStatus) { + SaveRemoteConfigStatus: func(_ context.Context, status *protobufs.RemoteConfigStatus) { agent.remoteConfigStatus = status }, - GetEffectiveConfigFunc: func(ctx context.Context) (*protobufs.EffectiveConfig, error) { + GetEffectiveConfig: func(ctx context.Context) (*protobufs.EffectiveConfig, error) { return agent.composeEffectiveConfig(), nil }, - OnMessageFunc: agent.onMessage, - OnOpampConnectionSettingsFunc: agent.onOpampConnectionSettings, + OnMessage: agent.onMessage, + OnOpampConnectionSettings: agent.onOpampConnectionSettings, }, RemoteConfigStatus: agent.remoteConfigStatus, Capabilities: protobufs.AgentCapabilities_AgentCapabilities_AcceptsRemoteConfig | diff --git a/internal/examples/supervisor/supervisor/supervisor.go b/internal/examples/supervisor/supervisor/supervisor.go index 1d794b18..18c014ca 100644 --- a/internal/examples/supervisor/supervisor/supervisor.go +++ b/internal/examples/supervisor/supervisor/supervisor.go @@ -138,20 +138,20 @@ func (s *Supervisor) startOpAMP() error { InsecureSkipVerify: true, }, InstanceUid: types.InstanceUid(s.instanceId), - Callbacks: types.CallbacksStruct{ - OnConnectFunc: func(ctx context.Context) { + Callbacks: types.Callbacks{ + OnConnect: func(ctx context.Context) { s.logger.Debugf(ctx, "Connected to the server.") }, - OnConnectFailedFunc: func(ctx context.Context, err error) { + OnConnectFailed: func(ctx context.Context, err error) { s.logger.Errorf(ctx, "Failed to connect to the server: %v", err) }, - OnErrorFunc: func(ctx context.Context, err *protobufs.ServerErrorResponse) { + OnError: func(ctx context.Context, err *protobufs.ServerErrorResponse) { s.logger.Errorf(ctx, "Server returned an error response: %v", err.ErrorMessage) }, - GetEffectiveConfigFunc: func(ctx context.Context) (*protobufs.EffectiveConfig, error) { + GetEffectiveConfig: func(ctx context.Context) (*protobufs.EffectiveConfig, error) { return s.createEffectiveConfigMsg(), nil }, - OnMessageFunc: s.onMessage, + OnMessage: s.onMessage, }, Capabilities: protobufs.AgentCapabilities_AgentCapabilities_AcceptsRemoteConfig | protobufs.AgentCapabilities_AgentCapabilities_ReportsRemoteConfig |