Skip to content

Commit

Permalink
linkedingo: improve reporting of realtime connect errors
Browse files Browse the repository at this point in the history
Signed-off-by: Sumner Evans <[email protected]>
  • Loading branch information
sumnerevans committed Feb 19, 2025
1 parent caf6c30 commit 882927b
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 21 deletions.
31 changes: 26 additions & 5 deletions pkg/connector/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,10 @@ func NewLinkedInClient(ctx context.Context, lc *LinkedInConnector, login *bridge
ClientConnection: func(context.Context, *linkedingo.ClientConnection) {
login.BridgeState.Send(status.BridgeState{StateEvent: status.StateConnected})
},
RealtimeConnectError: client.onRealtimeConnectError,
DecoratedEvent: client.onDecoratedEvent,
TransientDisconnect: client.onTransientDisconnect,
BadCredentials: client.onBadCredentials,
UnknownError: client.onUnknownError,
DecoratedEvent: client.onDecoratedEvent,
},
)

Expand Down Expand Up @@ -137,14 +139,33 @@ func (l *LinkedInClient) Connect(ctx context.Context) {
}
}

func (l *LinkedInClient) onRealtimeConnectError(ctx context.Context, err error) {
func (l *LinkedInClient) onTransientDisconnect(ctx context.Context, err error) {
zerolog.Ctx(ctx).Err(err).Msg("failed to read from event stream")
// TODO probably don't do this unconditionally
l.userLogin.BridgeState.Send(status.BridgeState{
StateEvent: status.StateTransientDisconnect,
Error: "linkedin-transient-disconnect",
Message: err.Error(),
})
}

func (l *LinkedInClient) onBadCredentials(ctx context.Context, err error) {
zerolog.Ctx(ctx).Err(err).Msg("bad credentials")
l.userLogin.BridgeState.Send(status.BridgeState{
StateEvent: status.StateBadCredentials,
Error: "linkedin-no-auth",
Error: "linkedin-bad-credentials",
Message: err.Error(),
})
l.Disconnect()
}

func (l *LinkedInClient) onUnknownError(ctx context.Context, err error) {
zerolog.Ctx(ctx).Err(err).Msg("unknown error")
l.userLogin.BridgeState.Send(status.BridgeState{
StateEvent: status.StateUnknownError,
Error: "linkedin-unknown-error",
Message: err.Error(),
})
// TODO probably don't do this unconditionally?
l.Disconnect()
}

Expand Down
28 changes: 21 additions & 7 deletions pkg/linkedingo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,12 @@ func NewClient(ctx context.Context, userEntityURN types.URN, jar *stringcookieja
}

type Handlers struct {
Heartbeat func(context.Context)
ClientConnection func(context.Context, *ClientConnection)
RealtimeConnectError func(context.Context, error)
DecoratedEvent func(context.Context, *DecoratedEvent)
Heartbeat func(context.Context)
ClientConnection func(context.Context, *ClientConnection)
TransientDisconnect func(context.Context, error)
BadCredentials func(context.Context, error)
UnknownError func(context.Context, error)
DecoratedEvent func(context.Context, *DecoratedEvent)
}

func (h Handlers) onHeartbeat(ctx context.Context) {
Expand All @@ -81,9 +83,21 @@ func (h Handlers) onClientConnection(ctx context.Context, conn *ClientConnection
}
}

func (h Handlers) onRealtimeConnectError(ctx context.Context, err error) {
if h.RealtimeConnectError != nil {
h.RealtimeConnectError(ctx, err)
func (h Handlers) onTransientDisconnect(ctx context.Context, err error) {
if h.TransientDisconnect != nil {
h.TransientDisconnect(ctx, err)
}
}

func (h Handlers) onBadCredentials(ctx context.Context, err error) {
if h.BadCredentials != nil {
h.BadCredentials(ctx, err)
}
}

func (h Handlers) onUnknownError(ctx context.Context, err error) {
if h.UnknownError != nil {
h.UnknownError(ctx, err)
}
}

Expand Down
25 changes: 16 additions & 9 deletions pkg/linkedingo/realtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,11 +230,15 @@ func (c *Client) realtimeConnectLoop(ctx context.Context) {
WithHeader("Accept", contentTypeTextEventStream).
Do(ctx)
if err != nil {
c.handlers.onRealtimeConnectError(ctx, err)
c.handlers.onUnknownError(ctx, fmt.Errorf("failed to connect: %w", err))
return
}
if c.realtimeResp.StatusCode != http.StatusOK {
c.handlers.onRealtimeConnectError(ctx, fmt.Errorf("failed to connect due to status code %d", c.realtimeResp.StatusCode))
} else if c.realtimeResp.StatusCode != http.StatusOK {
switch c.realtimeResp.StatusCode {
case http.StatusUnauthorized:
c.handlers.onBadCredentials(ctx, fmt.Errorf("got %d on connect", c.realtimeResp.StatusCode))
default:
c.handlers.onUnknownError(ctx, fmt.Errorf("failed to connect due to status code %d", c.realtimeResp.StatusCode))
}
return
}

Expand All @@ -245,12 +249,15 @@ func (c *Client) realtimeConnectLoop(ctx context.Context) {
if err != nil {
if errors.Is(err, context.Canceled) {
return
}
if errors.Is(err, io.EOF) {
} else if errors.Is(err, io.EOF) {
log.Info().
Stringer("realtime_session_id", c.realtimeSessionID).
Msg("Realtime stream closed")
break
} else {
c.handlers.onTransientDisconnect(ctx, fmt.Errorf("failed to read realtime stream: %w", err))
break
}
c.handlers.onRealtimeConnectError(ctx, err)
break
}

if !bytes.HasPrefix(line, []byte("data:")) {
Expand All @@ -259,7 +266,7 @@ func (c *Client) realtimeConnectLoop(ctx context.Context) {

var realtimeEvent RealtimeEvent
if err = json.Unmarshal(line[6:], &realtimeEvent); err != nil {
c.handlers.onRealtimeConnectError(ctx, err)
c.handlers.onTransientDisconnect(ctx, fmt.Errorf("failed to unmarshal realtime event: %w", err))
break
}

Expand Down

0 comments on commit 882927b

Please sign in to comment.