From 882927b37fcfba10896e8b0159f579012da27bb7 Mon Sep 17 00:00:00 2001 From: Sumner Evans Date: Wed, 19 Feb 2025 12:47:18 -0700 Subject: [PATCH] linkedingo: improve reporting of realtime connect errors Signed-off-by: Sumner Evans --- pkg/connector/client.go | 31 ++++++++++++++++++++++++++----- pkg/linkedingo/client.go | 28 +++++++++++++++++++++------- pkg/linkedingo/realtime.go | 25 ++++++++++++++++--------- 3 files changed, 63 insertions(+), 21 deletions(-) diff --git a/pkg/connector/client.go b/pkg/connector/client.go index 080d959..8065872 100644 --- a/pkg/connector/client.go +++ b/pkg/connector/client.go @@ -82,8 +82,10 @@ func NewLinkedInClient(ctx context.Context, lc *LinkedInConnector, login *bridge ClientConnection: func(context.Context, *linkedingo.ClientConnection) { login.BridgeState.Send(status.BridgeState{StateEvent: status.StateConnected}) }, - RealtimeConnectError: client.onRealtimeConnectError, - DecoratedEvent: client.onDecoratedEvent, + TransientDisconnect: client.onTransientDisconnect, + BadCredentials: client.onBadCredentials, + UnknownError: client.onUnknownError, + DecoratedEvent: client.onDecoratedEvent, }, ) @@ -137,14 +139,33 @@ func (l *LinkedInClient) Connect(ctx context.Context) { } } -func (l *LinkedInClient) onRealtimeConnectError(ctx context.Context, err error) { +func (l *LinkedInClient) onTransientDisconnect(ctx context.Context, err error) { zerolog.Ctx(ctx).Err(err).Msg("failed to read from event stream") - // TODO probably don't do this unconditionally + l.userLogin.BridgeState.Send(status.BridgeState{ + StateEvent: status.StateTransientDisconnect, + Error: "linkedin-transient-disconnect", + Message: err.Error(), + }) +} + +func (l *LinkedInClient) onBadCredentials(ctx context.Context, err error) { + zerolog.Ctx(ctx).Err(err).Msg("bad credentials") l.userLogin.BridgeState.Send(status.BridgeState{ StateEvent: status.StateBadCredentials, - Error: "linkedin-no-auth", + Error: "linkedin-bad-credentials", + Message: err.Error(), + }) + l.Disconnect() +} + +func (l *LinkedInClient) onUnknownError(ctx context.Context, err error) { + zerolog.Ctx(ctx).Err(err).Msg("unknown error") + l.userLogin.BridgeState.Send(status.BridgeState{ + StateEvent: status.StateUnknownError, + Error: "linkedin-unknown-error", Message: err.Error(), }) + // TODO probably don't do this unconditionally? l.Disconnect() } diff --git a/pkg/linkedingo/client.go b/pkg/linkedingo/client.go index 42c6b56..5df075c 100644 --- a/pkg/linkedingo/client.go +++ b/pkg/linkedingo/client.go @@ -63,10 +63,12 @@ func NewClient(ctx context.Context, userEntityURN types.URN, jar *stringcookieja } type Handlers struct { - Heartbeat func(context.Context) - ClientConnection func(context.Context, *ClientConnection) - RealtimeConnectError func(context.Context, error) - DecoratedEvent func(context.Context, *DecoratedEvent) + Heartbeat func(context.Context) + ClientConnection func(context.Context, *ClientConnection) + TransientDisconnect func(context.Context, error) + BadCredentials func(context.Context, error) + UnknownError func(context.Context, error) + DecoratedEvent func(context.Context, *DecoratedEvent) } func (h Handlers) onHeartbeat(ctx context.Context) { @@ -81,9 +83,21 @@ func (h Handlers) onClientConnection(ctx context.Context, conn *ClientConnection } } -func (h Handlers) onRealtimeConnectError(ctx context.Context, err error) { - if h.RealtimeConnectError != nil { - h.RealtimeConnectError(ctx, err) +func (h Handlers) onTransientDisconnect(ctx context.Context, err error) { + if h.TransientDisconnect != nil { + h.TransientDisconnect(ctx, err) + } +} + +func (h Handlers) onBadCredentials(ctx context.Context, err error) { + if h.BadCredentials != nil { + h.BadCredentials(ctx, err) + } +} + +func (h Handlers) onUnknownError(ctx context.Context, err error) { + if h.UnknownError != nil { + h.UnknownError(ctx, err) } } diff --git a/pkg/linkedingo/realtime.go b/pkg/linkedingo/realtime.go index f0815af..b0478ab 100644 --- a/pkg/linkedingo/realtime.go +++ b/pkg/linkedingo/realtime.go @@ -230,11 +230,15 @@ func (c *Client) realtimeConnectLoop(ctx context.Context) { WithHeader("Accept", contentTypeTextEventStream). Do(ctx) if err != nil { - c.handlers.onRealtimeConnectError(ctx, err) + c.handlers.onUnknownError(ctx, fmt.Errorf("failed to connect: %w", err)) return - } - if c.realtimeResp.StatusCode != http.StatusOK { - c.handlers.onRealtimeConnectError(ctx, fmt.Errorf("failed to connect due to status code %d", c.realtimeResp.StatusCode)) + } else if c.realtimeResp.StatusCode != http.StatusOK { + switch c.realtimeResp.StatusCode { + case http.StatusUnauthorized: + c.handlers.onBadCredentials(ctx, fmt.Errorf("got %d on connect", c.realtimeResp.StatusCode)) + default: + c.handlers.onUnknownError(ctx, fmt.Errorf("failed to connect due to status code %d", c.realtimeResp.StatusCode)) + } return } @@ -245,12 +249,15 @@ func (c *Client) realtimeConnectLoop(ctx context.Context) { if err != nil { if errors.Is(err, context.Canceled) { return - } - if errors.Is(err, io.EOF) { + } else if errors.Is(err, io.EOF) { + log.Info(). + Stringer("realtime_session_id", c.realtimeSessionID). + Msg("Realtime stream closed") + break + } else { + c.handlers.onTransientDisconnect(ctx, fmt.Errorf("failed to read realtime stream: %w", err)) break } - c.handlers.onRealtimeConnectError(ctx, err) - break } if !bytes.HasPrefix(line, []byte("data:")) { @@ -259,7 +266,7 @@ func (c *Client) realtimeConnectLoop(ctx context.Context) { var realtimeEvent RealtimeEvent if err = json.Unmarshal(line[6:], &realtimeEvent); err != nil { - c.handlers.onRealtimeConnectError(ctx, err) + c.handlers.onTransientDisconnect(ctx, fmt.Errorf("failed to unmarshal realtime event: %w", err)) break }