diff --git a/engine/access/rest/websockets/controller.go b/engine/access/rest/websockets/controller.go index fb88106d86b..854df073710 100644 --- a/engine/access/rest/websockets/controller.go +++ b/engine/access/rest/websockets/controller.go @@ -157,21 +157,27 @@ func (c *Controller) readMessages(ctx context.Context) error { return err } - c.writeBaseErrorResponse(ctx, err, "") - c.logger.Error().Err(err).Msg("error reading message") + c.writeErrorResponse( + ctx, + err, + wrapErrorMessage(ConnectionRead, "error reading from conn", "", "", "")) continue } validatedMsg, err := c.parseAndValidateMessage(message) if err != nil { - c.writeBaseErrorResponse(ctx, err, "") - c.logger.Error().Err(err).Msg("failed to parse message") + c.writeErrorResponse( + ctx, + err, + wrapErrorMessage(InvalidMessage, "error parsing message", "", "", "")) continue } if err = c.handleAction(ctx, validatedMsg); err != nil { - c.writeBaseErrorResponse(ctx, err, "") - c.logger.Error().Err(err).Msg("failed to handle action") + c.writeErrorResponse( + ctx, + err, + wrapErrorMessage(InvalidMessage, "error handling action", "", "", "")) continue } } @@ -185,21 +191,21 @@ func (c *Controller) parseAndValidateMessage(message json.RawMessage) (interface var validatedMsg interface{} switch baseMsg.Action { - case "subscribe": + case models.SubscribeAction: var subscribeMsg models.SubscribeMessageRequest if err := json.Unmarshal(message, &subscribeMsg); err != nil { return nil, fmt.Errorf("error unmarshalling subscribe message: %w", err) } validatedMsg = subscribeMsg - case "unsubscribe": + case models.UnsubscribeAction: var unsubscribeMsg models.UnsubscribeMessageRequest if err := json.Unmarshal(message, &unsubscribeMsg); err != nil { return nil, fmt.Errorf("error unmarshalling unsubscribe message: %w", err) } validatedMsg = unsubscribeMsg - case "list_subscriptions": + case models.ListSubscriptionsAction: var listMsg models.ListSubscriptionsMessageRequest if err := json.Unmarshal(message, &listMsg); err != nil { return nil, fmt.Errorf("error unmarshalling list subscriptions message: %w", err) @@ -221,7 +227,7 @@ func (c *Controller) handleAction(ctx context.Context, message interface{}) erro case models.UnsubscribeMessageRequest: c.handleUnsubscribe(ctx, msg) case models.ListSubscriptionsMessageRequest: - c.handleListSubscriptions(ctx) + c.handleListSubscriptions(ctx, msg) default: return fmt.Errorf("unknown message type: %T", msg) } @@ -232,21 +238,35 @@ func (c *Controller) handleSubscribe(ctx context.Context, msg models.SubscribeMe // register new provider provider, err := c.dataProviderFactory.NewDataProvider(ctx, msg.Topic, msg.Arguments, c.multiplexedStream) if err != nil { - c.writeBaseErrorResponse(ctx, err, "subscribe") - c.logger.Error().Err(err).Msg("error creating data provider") + c.writeErrorResponse( + ctx, + err, + wrapErrorMessage(InvalidArgument, "error creating data provider", msg.MessageID, models.SubscribeAction, ""), + ) return } - c.dataProviders.Add(provider.ID(), provider) - c.writeSubscribeOkResponse(ctx, provider.ID()) + + // write OK response to client + responseOk := models.SubscribeMessageResponse{ + BaseMessageResponse: models.BaseMessageResponse{ + MessageID: msg.MessageID, + Success: true, + }, + ID: provider.ID().String(), + } + c.writeResponse(ctx, responseOk) // run provider c.dataProvidersGroup.Add(1) go func() { err = provider.Run() if err != nil { - c.writeBaseErrorResponse(ctx, err, "") - c.logger.Error().Err(err).Msgf("error while running data provider for topic: %s", msg.Topic) + c.writeErrorResponse( + ctx, + err, + wrapErrorMessage(RunError, "data provider finished with error", msg.MessageID, "", ""), + ) } c.dataProvidersGroup.Done() @@ -255,33 +275,50 @@ func (c *Controller) handleSubscribe(ctx context.Context, msg models.SubscribeMe } func (c *Controller) handleUnsubscribe(ctx context.Context, msg models.UnsubscribeMessageRequest) { - id, err := uuid.Parse(msg.ID) + id, err := uuid.Parse(msg.SubscriptionID) if err != nil { - c.writeBaseErrorResponse(ctx, err, "unsubscribe") - c.logger.Debug().Err(err).Msg("error parsing message ID") + c.writeErrorResponse( + ctx, + err, + wrapErrorMessage(InvalidArgument, "error parsing message ID", msg.MessageID, models.UnsubscribeAction, msg.SubscriptionID), + ) return } provider, ok := c.dataProviders.Get(id) if !ok { - c.writeBaseErrorResponse(ctx, fmt.Errorf("could not find data provider with such id"), "unsubscribe") - c.logger.Debug().Err(err).Msg("no active subscription with such ID found") + c.writeErrorResponse( + ctx, + err, + wrapErrorMessage(NotFound, "provider not found", msg.MessageID, models.UnsubscribeAction, msg.SubscriptionID), + ) return } err = provider.Close() if err != nil { - c.writeBaseErrorResponse(ctx, err, "unsubscribe") + c.writeErrorResponse( + ctx, + err, + wrapErrorMessage(InternalError, "provider close error", msg.MessageID, models.UnsubscribeAction, msg.SubscriptionID), + ) return } c.dataProviders.Remove(id) - c.writeUnsubscribeOkResponse(ctx, id) + + responseOk := models.UnsubscribeMessageResponse{ + BaseMessageResponse: models.BaseMessageResponse{ + MessageID: msg.MessageID, + Success: true, + }, + SubscriptionID: msg.SubscriptionID, + } + c.writeResponse(ctx, responseOk) } -func (c *Controller) handleListSubscriptions(ctx context.Context) { +func (c *Controller) handleListSubscriptions(ctx context.Context, msg models.ListSubscriptionsMessageRequest) { var subs []*models.SubscriptionEntry - err := c.dataProviders.ForEach(func(id uuid.UUID, provider dp.DataProvider) error { subs = append(subs, &models.SubscriptionEntry{ ID: id.String(), @@ -291,39 +328,42 @@ func (c *Controller) handleListSubscriptions(ctx context.Context) { }) if err != nil { - c.writeBaseErrorResponse(ctx, err, "list_subscriptions") - c.logger.Debug().Err(err).Msg("error listing subscriptions") + c.writeErrorResponse( + ctx, + err, + wrapErrorMessage(NotFound, "error looking for subscription", msg.MessageID, models.ListSubscriptionsAction, ""), + ) return } - resp := models.ListSubscriptionsMessageResponse{ + responseOk := models.ListSubscriptionsMessageResponse{ BaseMessageResponse: models.BaseMessageResponse{ - Success: true, - Action: "list_subscriptions", + Success: true, + MessageID: msg.MessageID, }, Subscriptions: subs, } - c.writeResponse(ctx, resp) + c.writeResponse(ctx, responseOk) } func (c *Controller) shutdownConnection() { err := c.conn.Close() if err != nil { - c.logger.Error().Err(err).Msg("error closing connection") + c.logger.Debug().Err(err).Msg("error closing connection") } err = c.dataProviders.ForEach(func(_ uuid.UUID, dp dp.DataProvider) error { //TODO: why did i think it's a good idea to return error in Close()? it's messy now err = dp.Close() if err != nil { - c.logger.Error().Err(err).Msg("error closing data provider") + c.logger.Debug().Err(err).Msg("error closing data provider") } return nil }) if err != nil { - c.logger.Error().Err(err).Msg("error closing data provider") + c.logger.Debug().Err(err).Msg("error closing data provider") } c.dataProviders.Clear() @@ -358,46 +398,19 @@ func (c *Controller) keepalive(ctx context.Context) error { return err } - c.writeBaseErrorResponse(ctx, err, "") - c.logger.Debug().Err(err).Msg("failed to send ping") - return fmt.Errorf("failed to write ping message: %w", err) + c.writeErrorResponse( + ctx, + err, + wrapErrorMessage(ConnectionWrite, "error sending ping", "", "", "")) + return fmt.Errorf("error sending ping: %w", err) } } } } -func (c *Controller) writeBaseErrorResponse(ctx context.Context, err error, action string) { - request := models.BaseMessageResponse{ - Action: action, - Success: false, - ErrorMessage: err.Error(), - } - - c.writeResponse(ctx, request) -} - -func (c *Controller) writeSubscribeOkResponse(ctx context.Context, id uuid.UUID) { - request := models.SubscribeMessageResponse{ - BaseMessageResponse: models.BaseMessageResponse{ - Action: "subscribe", - Success: true, - }, - ID: id.String(), - } - - c.writeResponse(ctx, request) -} - -func (c *Controller) writeUnsubscribeOkResponse(ctx context.Context, id uuid.UUID) { - request := models.UnsubscribeMessageResponse{ - BaseMessageResponse: models.BaseMessageResponse{ - Action: "unsubscribe", - Success: true, - }, - ID: id.String(), - } - - c.writeResponse(ctx, request) +func (c *Controller) writeErrorResponse(ctx context.Context, err error, msg models.BaseMessageResponse) { + c.logger.Debug().Err(err).Msg(msg.Error.Message) + c.writeResponse(ctx, msg) } func (c *Controller) writeResponse(ctx context.Context, response interface{}) { @@ -407,3 +420,16 @@ func (c *Controller) writeResponse(ctx context.Context, response interface{}) { case c.multiplexedStream <- response: } } + +func wrapErrorMessage(code Code, message string, msgId string, action string, subscriptionID string) models.BaseMessageResponse { + return models.BaseMessageResponse{ + MessageID: msgId, + Success: false, + Error: models.ErrorMessage{ + Code: int(code), + Message: message, + Action: action, + SubscriptionID: subscriptionID, + }, + } +} diff --git a/engine/access/rest/websockets/controller_test.go b/engine/access/rest/websockets/controller_test.go index c83469d61b9..7e27428ac28 100644 --- a/engine/access/rest/websockets/controller_test.go +++ b/engine/access/rest/websockets/controller_test.go @@ -70,9 +70,12 @@ func (s *WsControllerSuite) TestSubscribeRequest() { Once() request := models.SubscribeMessageRequest{ - BaseMessageRequest: models.BaseMessageRequest{Action: "subscribe"}, - Topic: "blocks", - Arguments: nil, + BaseMessageRequest: models.BaseMessageRequest{ + MessageID: uuid.New().String(), + Action: models.SubscribeAction, + }, + Topic: "blocks", + Arguments: nil, } requestJson, err := json.Marshal(request) require.NoError(t, err) @@ -93,17 +96,20 @@ func (s *WsControllerSuite) TestSubscribeRequest() { conn. On("WriteJSON", mock.Anything). Return(func(msg interface{}) error { + defer close(done) // Signal that response has been sent + response, ok := msg.(models.SubscribeMessageResponse) require.True(t, ok) - require.Equal(t, request.Action, response.Action) require.True(t, response.Success) + require.Equal(t, request.MessageID, response.MessageID) require.Equal(t, id.String(), response.ID) - close(done) // Signal that response has been sent return websocket.ErrCloseSent }) - controller.HandleConnection(context.Background()) + ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second)) + defer cancel() + controller.HandleConnection(ctx) }) s.T().Run("Parse and validate error", func(t *testing.T) { @@ -137,19 +143,20 @@ func (s *WsControllerSuite) TestSubscribeRequest() { conn. On("WriteJSON", mock.Anything). Return(func(msg interface{}) error { + defer close(done) // Signal that response has been sent + response, ok := msg.(models.BaseMessageResponse) require.True(t, ok) - require.Empty(t, response.Action) require.False(t, response.Success) - require.NotNil(t, response.ErrorMessage) //TODO: add kinds of errors for different data flows - - s.T().Log(response.ErrorMessage) + require.NotEmpty(t, response.Error) + require.Equal(t, int(InvalidMessage), response.Error.Code) - close(done) // Signal that response has been sent return websocket.ErrCloseSent }) - controller.HandleConnection(context.Background()) + ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second)) + defer cancel() + controller.HandleConnection(ctx) }) s.T().Run("Error creating data provider", func(t *testing.T) { @@ -168,19 +175,20 @@ func (s *WsControllerSuite) TestSubscribeRequest() { conn. On("WriteJSON", mock.Anything). Return(func(msg interface{}) error { + defer close(done) // Signal that response has been sent + response, ok := msg.(models.BaseMessageResponse) require.True(t, ok) - require.Equal(t, "subscribe", response.Action) require.False(t, response.Success) - require.Equal(t, response.ErrorMessage, "error creating data provider") - - s.T().Log(response.ErrorMessage) + require.NotEmpty(t, response.Error) + require.Equal(t, int(InvalidArgument), response.Error.Code) - close(done) // Signal that response has been sent return websocket.ErrCloseSent }) - controller.HandleConnection(context.Background()) + ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second)) + defer cancel() + controller.HandleConnection(ctx) }) s.T().Run("Run error", func(t *testing.T) { @@ -201,26 +209,27 @@ func (s *WsControllerSuite) TestSubscribeRequest() { Once() done := make(chan struct{}, 1) - s.expectSubscribeRequest(conn) - s.expectSubscribeResponse(conn) + msgID := s.expectSubscribeRequest(conn) + s.expectSubscribeResponse(conn, msgID) s.expectCloseConnection(conn, done) conn. On("WriteJSON", mock.Anything). Return(func(msg interface{}) error { + defer close(done) // Signal that response has been sent + response, ok := msg.(models.BaseMessageResponse) require.True(t, ok) - require.Equal(t, "", response.Action) require.False(t, response.Success) - require.NotNil(t, response.ErrorMessage) //TODO: add kinds of errors for different data flows + require.NotEmpty(t, response.Error) + require.Equal(t, int(RunError), response.Error.Code) - s.T().Log(response.ErrorMessage) - - close(done) // Signal that response has been sent return websocket.ErrCloseSent }) - controller.HandleConnection(context.Background()) + ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second)) + defer cancel() + controller.HandleConnection(ctx) }) } @@ -248,12 +257,15 @@ func (s *WsControllerSuite) TestUnsubscribeRequest() { Return(nil). Once() - s.expectSubscribeRequest(conn) - s.expectSubscribeResponse(conn) + msgID := s.expectSubscribeRequest(conn) + s.expectSubscribeResponse(conn, msgID) request := models.UnsubscribeMessageRequest{ - BaseMessageRequest: models.BaseMessageRequest{Action: "unsubscribe"}, - ID: id.String(), + BaseMessageRequest: models.BaseMessageRequest{ + MessageID: uuid.New().String(), + Action: models.UnsubscribeAction, + }, + SubscriptionID: id.String(), } requestJson, err := json.Marshal(request) require.NoError(s.T(), err) @@ -271,19 +283,21 @@ func (s *WsControllerSuite) TestUnsubscribeRequest() { conn. On("WriteJSON", mock.Anything). Return(func(msg interface{}) error { + defer close(done) + response, ok := msg.(models.UnsubscribeMessageResponse) require.True(t, ok) - require.Equal(t, request.Action, response.Action) require.True(t, response.Success) - require.Empty(t, response.ErrorMessage) - require.Equal(t, request.ID, response.ID) + require.Empty(t, response.Error) + require.Equal(t, request.MessageID, response.MessageID) + require.Equal(t, request.SubscriptionID, response.SubscriptionID) - close(done) return websocket.ErrCloseSent }). Once() s.expectCloseConnection(conn, done) + ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second)) defer cancel() controller.HandleConnection(ctx) @@ -312,12 +326,15 @@ func (s *WsControllerSuite) TestUnsubscribeRequest() { Return(nil). Once() - s.expectSubscribeRequest(conn) - s.expectSubscribeResponse(conn) + msgID := s.expectSubscribeRequest(conn) + s.expectSubscribeResponse(conn, msgID) request := models.UnsubscribeMessageRequest{ - BaseMessageRequest: models.BaseMessageRequest{Action: "unsubscribe"}, - ID: "invalid-uuid", + BaseMessageRequest: models.BaseMessageRequest{ + MessageID: uuid.New().String(), + Action: models.UnsubscribeAction, + }, + SubscriptionID: "invalid-uuid", } requestJson, err := json.Marshal(request) require.NoError(s.T(), err) @@ -335,21 +352,24 @@ func (s *WsControllerSuite) TestUnsubscribeRequest() { conn. On("WriteJSON", mock.Anything). Return(func(msg interface{}) error { + defer close(done) + response, ok := msg.(models.BaseMessageResponse) require.True(t, ok) - require.Equal(t, request.Action, response.Action) require.False(t, response.Success) - require.NotEmpty(t, response.ErrorMessage) + require.NotEmpty(t, response.Error) + require.Equal(t, request.MessageID, response.MessageID) + require.Equal(t, int(InvalidArgument), response.Error.Code) - s.T().Log(response.ErrorMessage) - - close(done) return websocket.ErrCloseSent }). Once() s.expectCloseConnection(conn, done) - controller.HandleConnection(context.Background()) + + ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second)) + defer cancel() + controller.HandleConnection(ctx) }) s.T().Run("Unsubscribe from unknown subscription", func(t *testing.T) { @@ -375,12 +395,15 @@ func (s *WsControllerSuite) TestUnsubscribeRequest() { Return(nil). Once() - s.expectSubscribeRequest(conn) - s.expectSubscribeResponse(conn) + msgID := s.expectSubscribeRequest(conn) + s.expectSubscribeResponse(conn, msgID) request := models.UnsubscribeMessageRequest{ - BaseMessageRequest: models.BaseMessageRequest{Action: "unsubscribe"}, - ID: uuid.New().String(), + BaseMessageRequest: models.BaseMessageRequest{ + MessageID: uuid.New().String(), + Action: models.UnsubscribeAction, + }, + SubscriptionID: uuid.New().String(), } requestJson, err := json.Marshal(request) require.NoError(s.T(), err) @@ -398,21 +421,25 @@ func (s *WsControllerSuite) TestUnsubscribeRequest() { conn. On("WriteJSON", mock.Anything). Return(func(msg interface{}) error { + defer close(done) + response, ok := msg.(models.BaseMessageResponse) require.True(t, ok) - require.Equal(t, request.Action, response.Action) require.False(t, response.Success) - require.NotEmpty(t, response.ErrorMessage) + require.NotEmpty(t, response.Error) - s.T().Log(response.ErrorMessage) + require.Equal(t, request.MessageID, response.MessageID) + require.Equal(t, int(NotFound), response.Error.Code) - close(done) return websocket.ErrCloseSent }). Once() s.expectCloseConnection(conn, done) - controller.HandleConnection(context.Background()) + + ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second)) + defer cancel() + controller.HandleConnection(ctx) }) } @@ -442,11 +469,14 @@ func (s *WsControllerSuite) TestListSubscriptions() { Return(nil). Once() - s.expectSubscribeRequest(conn) - s.expectSubscribeResponse(conn) + msgID := s.expectSubscribeRequest(conn) + s.expectSubscribeResponse(conn, msgID) request := models.ListSubscriptionsMessageRequest{ - BaseMessageRequest: models.BaseMessageRequest{Action: "list_subscriptions"}, + BaseMessageRequest: models.BaseMessageRequest{ + MessageID: uuid.New().String(), + Action: models.ListSubscriptionsAction, + }, } requestJson, err := json.Marshal(request) require.NoError(s.T(), err) @@ -464,22 +494,26 @@ func (s *WsControllerSuite) TestListSubscriptions() { conn. On("WriteJSON", mock.Anything). Return(func(msg interface{}) error { + defer close(done) + response, ok := msg.(models.ListSubscriptionsMessageResponse) require.True(t, ok) + require.True(t, response.Success) + require.Empty(t, response.Error) + require.Equal(t, request.MessageID, response.MessageID) require.Equal(t, 1, len(response.Subscriptions)) require.Equal(t, id.String(), response.Subscriptions[0].ID) require.Equal(t, topic, response.Subscriptions[0].Topic) - require.Equal(t, response.Action, "list_subscriptions") - require.True(t, response.Success) - require.Empty(t, response.ErrorMessage) - close(done) return websocket.ErrCloseSent }). Once() s.expectCloseConnection(conn, done) - controller.HandleConnection(context.Background()) + + ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second)) + defer cancel() + controller.HandleConnection(ctx) }) } @@ -510,8 +544,8 @@ func (s *WsControllerSuite) TestSubscribeBlocks() { Once() done := make(chan struct{}, 1) - s.expectSubscribeRequest(conn) - s.expectSubscribeResponse(conn) + msgID := s.expectSubscribeRequest(conn) + s.expectSubscribeResponse(conn, msgID) s.expectCloseConnection(conn, done) // Expect a valid block to be passed to WriteJSON. @@ -520,15 +554,19 @@ func (s *WsControllerSuite) TestSubscribeBlocks() { conn. On("WriteJSON", mock.Anything). Return(func(msg interface{}) error { + defer close(done) + block, ok := msg.(flow.Block) require.True(t, ok) actualBlock = block - close(done) return websocket.ErrCloseSent }) - controller.HandleConnection(context.Background()) + ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second)) + defer cancel() + controller.HandleConnection(ctx) + require.Equal(t, expectedBlock, actualBlock) }) @@ -558,8 +596,8 @@ func (s *WsControllerSuite) TestSubscribeBlocks() { Once() done := make(chan struct{}, 1) - s.expectSubscribeRequest(conn) - s.expectSubscribeResponse(conn) + msgID := s.expectSubscribeRequest(conn) + s.expectSubscribeResponse(conn, msgID) s.expectCloseConnection(conn, done) i := 0 @@ -585,7 +623,10 @@ func (s *WsControllerSuite) TestSubscribeBlocks() { }). Times(len(expectedBlocks)) - controller.HandleConnection(context.Background()) + ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second)) + defer cancel() + controller.HandleConnection(ctx) + require.Equal(t, expectedBlocks, actualBlocks) }) } @@ -606,24 +647,29 @@ func newControllerMocks(t *testing.T) (*connmock.WebsocketConnection, *dpmock.Da } // expectSubscribeRequest mocks the client's subscription request. -func (s *WsControllerSuite) expectSubscribeRequest(conn *connmock.WebsocketConnection) { - subscribeRequest := models.SubscribeMessageRequest{ - BaseMessageRequest: models.BaseMessageRequest{Action: "subscribe"}, - Topic: "blocks", +func (s *WsControllerSuite) expectSubscribeRequest(conn *connmock.WebsocketConnection) string { + request := models.SubscribeMessageRequest{ + BaseMessageRequest: models.BaseMessageRequest{ + MessageID: uuid.New().String(), + Action: models.SubscribeAction, + }, + Topic: "blocks", } - subscribeRequestJson, err := json.Marshal(subscribeRequest) + requestJson, err := json.Marshal(request) require.NoError(s.T(), err) - // The very first message from a client is a subscribeRequest to subscribe to some topic + // The very first message from a client is a request to subscribe to some topic conn. On("ReadJSON", mock.Anything). Run(func(args mock.Arguments) { msg, ok := args.Get(0).(*json.RawMessage) require.True(s.T(), ok) - *msg = subscribeRequestJson + *msg = requestJson }). Return(nil). Once() + + return request.MessageID } func (s *WsControllerSuite) expectCloseConnection(conn *connmock.WebsocketConnection, done <-chan struct{}) { @@ -640,13 +686,13 @@ func (s *WsControllerSuite) expectCloseConnection(conn *connmock.WebsocketConnec } // expectSubscribeResponse mocks the subscription response sent to the client. -func (s *WsControllerSuite) expectSubscribeResponse(conn *connmock.WebsocketConnection) { +func (s *WsControllerSuite) expectSubscribeResponse(conn *connmock.WebsocketConnection, msgId string) { conn. On("WriteJSON", mock.Anything). Run(func(args mock.Arguments) { response, ok := args.Get(0).(models.SubscribeMessageResponse) require.True(s.T(), ok) - require.Equal(s.T(), "subscribe", response.Action) + require.Equal(s.T(), msgId, response.MessageID) require.Equal(s.T(), true, response.Success) }). Return(nil). diff --git a/engine/access/rest/websockets/error_codes.go b/engine/access/rest/websockets/error_codes.go new file mode 100644 index 00000000000..fa4d1521a3b --- /dev/null +++ b/engine/access/rest/websockets/error_codes.go @@ -0,0 +1,14 @@ +package websockets + +type Code int + +const ( + Ok Code = iota + ConnectionRead + ConnectionWrite + InvalidMessage + NotFound + InvalidArgument + RunError + InternalError +) diff --git a/engine/access/rest/websockets/models/base_message.go b/engine/access/rest/websockets/models/base_message.go index 38a030358bd..88b15ded6a3 100644 --- a/engine/access/rest/websockets/models/base_message.go +++ b/engine/access/rest/websockets/models/base_message.go @@ -2,14 +2,19 @@ package models // BaseMessageRequest represents a base structure for incoming messages. type BaseMessageRequest struct { - Action string `json:"action"` // Action type of the request + Action string `json:"action"` // subscribe, unsubscribe or list_subscriptions + MessageID string `json:"message_id"` // MessageID is a uuid generated by client to identify request/response uniquely } // BaseMessageResponse represents a base structure for outgoing messages. type BaseMessageResponse struct { - Action string `json:"action,omitempty"` // Action type of the response - Success bool `json:"success"` // Indicates success or failure - ErrorMessage string `json:"error_message,omitempty"` // Error message, if any + MessageID string `json:"message_id,omitempty"` // MessageID may be empty in case we send msg by ourselves (e.g. error occurred) + Success bool `json:"success"` + Error ErrorMessage `json:"error,omitempty"` } -// TODO: add Action enum? subscribe, unsubscribe, list +const ( + SubscribeAction = "subscribe" + UnsubscribeAction = "unsubscribe" + ListSubscriptionsAction = "list_subscription" +) diff --git a/engine/access/rest/websockets/models/error.go b/engine/access/rest/websockets/models/error.go new file mode 100644 index 00000000000..a49e90594d5 --- /dev/null +++ b/engine/access/rest/websockets/models/error.go @@ -0,0 +1,8 @@ +package models + +type ErrorMessage struct { + Code int `json:"code"` + Message string `json:"message"` + Action string `json:"action,omitempty"` + SubscriptionID string `json:"subscription_id,omitempty"` +} diff --git a/engine/access/rest/websockets/models/unsubscribe.go b/engine/access/rest/websockets/models/unsubscribe.go index 2024bb922e0..b0b8b8f8e0d 100644 --- a/engine/access/rest/websockets/models/unsubscribe.go +++ b/engine/access/rest/websockets/models/unsubscribe.go @@ -3,11 +3,11 @@ package models // UnsubscribeMessageRequest represents a request to unsubscribe from a topic. type UnsubscribeMessageRequest struct { BaseMessageRequest - ID string `json:"id"` // Unique subscription ID + SubscriptionID string `json:"id"` } // UnsubscribeMessageResponse represents the response to an unsubscription request. type UnsubscribeMessageResponse struct { BaseMessageResponse - ID string `json:"id"` // Unique subscription ID + SubscriptionID string `json:"id"` }