Skip to content

Commit

Permalink
refactor places where we return error
Browse files Browse the repository at this point in the history
  • Loading branch information
illia-malachyn committed Dec 12, 2024
1 parent 78ca825 commit 09bf64d
Show file tree
Hide file tree
Showing 6 changed files with 252 additions and 153 deletions.
164 changes: 95 additions & 69 deletions engine/access/rest/websockets/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,21 +157,27 @@ func (c *Controller) readMessages(ctx context.Context) error {
return err
}

c.writeBaseErrorResponse(ctx, err, "")
c.logger.Error().Err(err).Msg("error reading message")
c.writeErrorResponse(
ctx,
err,
wrapErrorMessage(ConnectionRead, "error reading from conn", "", "", ""))
continue
}

validatedMsg, err := c.parseAndValidateMessage(message)
if err != nil {
c.writeBaseErrorResponse(ctx, err, "")
c.logger.Error().Err(err).Msg("failed to parse message")
c.writeErrorResponse(
ctx,
err,
wrapErrorMessage(InvalidMessage, "error parsing message", "", "", ""))
continue
}

if err = c.handleAction(ctx, validatedMsg); err != nil {
c.writeBaseErrorResponse(ctx, err, "")
c.logger.Error().Err(err).Msg("failed to handle action")
c.writeErrorResponse(
ctx,
err,
wrapErrorMessage(InvalidMessage, "error handling action", "", "", ""))
continue
}
}
Expand All @@ -185,21 +191,21 @@ func (c *Controller) parseAndValidateMessage(message json.RawMessage) (interface

var validatedMsg interface{}
switch baseMsg.Action {
case "subscribe":
case models.SubscribeAction:
var subscribeMsg models.SubscribeMessageRequest
if err := json.Unmarshal(message, &subscribeMsg); err != nil {
return nil, fmt.Errorf("error unmarshalling subscribe message: %w", err)
}
validatedMsg = subscribeMsg

case "unsubscribe":
case models.UnsubscribeAction:
var unsubscribeMsg models.UnsubscribeMessageRequest
if err := json.Unmarshal(message, &unsubscribeMsg); err != nil {
return nil, fmt.Errorf("error unmarshalling unsubscribe message: %w", err)
}
validatedMsg = unsubscribeMsg

case "list_subscriptions":
case models.ListSubscriptionsAction:
var listMsg models.ListSubscriptionsMessageRequest
if err := json.Unmarshal(message, &listMsg); err != nil {
return nil, fmt.Errorf("error unmarshalling list subscriptions message: %w", err)
Expand All @@ -221,7 +227,7 @@ func (c *Controller) handleAction(ctx context.Context, message interface{}) erro
case models.UnsubscribeMessageRequest:
c.handleUnsubscribe(ctx, msg)
case models.ListSubscriptionsMessageRequest:
c.handleListSubscriptions(ctx)
c.handleListSubscriptions(ctx, msg)
default:
return fmt.Errorf("unknown message type: %T", msg)
}
Expand All @@ -232,21 +238,35 @@ func (c *Controller) handleSubscribe(ctx context.Context, msg models.SubscribeMe
// register new provider
provider, err := c.dataProviderFactory.NewDataProvider(ctx, msg.Topic, msg.Arguments, c.multiplexedStream)
if err != nil {
c.writeBaseErrorResponse(ctx, err, "subscribe")
c.logger.Error().Err(err).Msg("error creating data provider")
c.writeErrorResponse(
ctx,
err,
wrapErrorMessage(InvalidArgument, "error creating data provider", msg.MessageID, models.SubscribeAction, ""),
)
return
}

c.dataProviders.Add(provider.ID(), provider)
c.writeSubscribeOkResponse(ctx, provider.ID())

// write OK response to client
responseOk := models.SubscribeMessageResponse{
BaseMessageResponse: models.BaseMessageResponse{
MessageID: msg.MessageID,
Success: true,
},
ID: provider.ID().String(),
}
c.writeResponse(ctx, responseOk)

// run provider
c.dataProvidersGroup.Add(1)
go func() {
err = provider.Run()
if err != nil {
c.writeBaseErrorResponse(ctx, err, "")
c.logger.Error().Err(err).Msgf("error while running data provider for topic: %s", msg.Topic)
c.writeErrorResponse(
ctx,
err,
wrapErrorMessage(RunError, "data provider finished with error", msg.MessageID, "", ""),
)
}

c.dataProvidersGroup.Done()
Expand All @@ -255,33 +275,50 @@ func (c *Controller) handleSubscribe(ctx context.Context, msg models.SubscribeMe
}

func (c *Controller) handleUnsubscribe(ctx context.Context, msg models.UnsubscribeMessageRequest) {
id, err := uuid.Parse(msg.ID)
id, err := uuid.Parse(msg.SubscriptionID)
if err != nil {
c.writeBaseErrorResponse(ctx, err, "unsubscribe")
c.logger.Debug().Err(err).Msg("error parsing message ID")
c.writeErrorResponse(
ctx,
err,
wrapErrorMessage(InvalidArgument, "error parsing message ID", msg.MessageID, models.UnsubscribeAction, msg.SubscriptionID),
)
return
}

provider, ok := c.dataProviders.Get(id)
if !ok {
c.writeBaseErrorResponse(ctx, fmt.Errorf("could not find data provider with such id"), "unsubscribe")
c.logger.Debug().Err(err).Msg("no active subscription with such ID found")
c.writeErrorResponse(
ctx,
err,
wrapErrorMessage(NotFound, "provider not found", msg.MessageID, models.UnsubscribeAction, msg.SubscriptionID),
)
return
}

err = provider.Close()
if err != nil {
c.writeBaseErrorResponse(ctx, err, "unsubscribe")
c.writeErrorResponse(
ctx,
err,
wrapErrorMessage(InternalError, "provider close error", msg.MessageID, models.UnsubscribeAction, msg.SubscriptionID),
)
return
}

c.dataProviders.Remove(id)
c.writeUnsubscribeOkResponse(ctx, id)

responseOk := models.UnsubscribeMessageResponse{
BaseMessageResponse: models.BaseMessageResponse{
MessageID: msg.MessageID,
Success: true,
},
SubscriptionID: msg.SubscriptionID,
}
c.writeResponse(ctx, responseOk)
}

func (c *Controller) handleListSubscriptions(ctx context.Context) {
func (c *Controller) handleListSubscriptions(ctx context.Context, msg models.ListSubscriptionsMessageRequest) {
var subs []*models.SubscriptionEntry

err := c.dataProviders.ForEach(func(id uuid.UUID, provider dp.DataProvider) error {
subs = append(subs, &models.SubscriptionEntry{
ID: id.String(),
Expand All @@ -291,39 +328,42 @@ func (c *Controller) handleListSubscriptions(ctx context.Context) {
})

if err != nil {
c.writeBaseErrorResponse(ctx, err, "list_subscriptions")
c.logger.Debug().Err(err).Msg("error listing subscriptions")
c.writeErrorResponse(
ctx,
err,
wrapErrorMessage(NotFound, "error looking for subscription", msg.MessageID, models.ListSubscriptionsAction, ""),
)
return
}

resp := models.ListSubscriptionsMessageResponse{
responseOk := models.ListSubscriptionsMessageResponse{
BaseMessageResponse: models.BaseMessageResponse{
Success: true,
Action: "list_subscriptions",
Success: true,
MessageID: msg.MessageID,
},
Subscriptions: subs,
}
c.writeResponse(ctx, resp)
c.writeResponse(ctx, responseOk)
}

func (c *Controller) shutdownConnection() {
err := c.conn.Close()
if err != nil {
c.logger.Error().Err(err).Msg("error closing connection")
c.logger.Debug().Err(err).Msg("error closing connection")
}

err = c.dataProviders.ForEach(func(_ uuid.UUID, dp dp.DataProvider) error {
//TODO: why did i think it's a good idea to return error in Close()? it's messy now
err = dp.Close()
if err != nil {
c.logger.Error().Err(err).Msg("error closing data provider")
c.logger.Debug().Err(err).Msg("error closing data provider")
}

return nil
})

if err != nil {
c.logger.Error().Err(err).Msg("error closing data provider")
c.logger.Debug().Err(err).Msg("error closing data provider")
}

c.dataProviders.Clear()
Expand Down Expand Up @@ -358,46 +398,19 @@ func (c *Controller) keepalive(ctx context.Context) error {
return err
}

c.writeBaseErrorResponse(ctx, err, "")
c.logger.Debug().Err(err).Msg("failed to send ping")
return fmt.Errorf("failed to write ping message: %w", err)
c.writeErrorResponse(
ctx,
err,
wrapErrorMessage(ConnectionWrite, "error sending ping", "", "", ""))
return fmt.Errorf("error sending ping: %w", err)
}
}
}
}

func (c *Controller) writeBaseErrorResponse(ctx context.Context, err error, action string) {
request := models.BaseMessageResponse{
Action: action,
Success: false,
ErrorMessage: err.Error(),
}

c.writeResponse(ctx, request)
}

func (c *Controller) writeSubscribeOkResponse(ctx context.Context, id uuid.UUID) {
request := models.SubscribeMessageResponse{
BaseMessageResponse: models.BaseMessageResponse{
Action: "subscribe",
Success: true,
},
ID: id.String(),
}

c.writeResponse(ctx, request)
}

func (c *Controller) writeUnsubscribeOkResponse(ctx context.Context, id uuid.UUID) {
request := models.UnsubscribeMessageResponse{
BaseMessageResponse: models.BaseMessageResponse{
Action: "unsubscribe",
Success: true,
},
ID: id.String(),
}

c.writeResponse(ctx, request)
func (c *Controller) writeErrorResponse(ctx context.Context, err error, msg models.BaseMessageResponse) {
c.logger.Debug().Err(err).Msg(msg.Error.Message)
c.writeResponse(ctx, msg)
}

func (c *Controller) writeResponse(ctx context.Context, response interface{}) {
Expand All @@ -407,3 +420,16 @@ func (c *Controller) writeResponse(ctx context.Context, response interface{}) {
case c.multiplexedStream <- response:
}
}

func wrapErrorMessage(code Code, message string, msgId string, action string, subscriptionID string) models.BaseMessageResponse {
return models.BaseMessageResponse{
MessageID: msgId,
Success: false,
Error: models.ErrorMessage{
Code: int(code),
Message: message,
Action: action,
SubscriptionID: subscriptionID,
},
}
}
Loading

0 comments on commit 09bf64d

Please sign in to comment.