Skip to content

Commit

Permalink
[listen] logging improvements (#1229)
Browse files Browse the repository at this point in the history
* add updates to listen

* rpc changes

* rest of rpc changes

* add context

* pass headers along

* final listen changes

* tests and CI

* clean things up

* send event id, not webhook id

* fix test after merge

* add new event

* rename something

* fix seg fault

* send request body and headers to mongo

* omit webhook convo id if empty

* pass event ID
  • Loading branch information
charliecruzan-stripe authored Sep 11, 2024
1 parent 5be357c commit 8d33a69
Show file tree
Hide file tree
Showing 37 changed files with 1,117 additions and 172 deletions.
45 changes: 45 additions & 0 deletions docs/rpc/commands.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
- [common.proto](#common-proto)
- [StripeEvent](#rpc-StripeEvent)
- [StripeEvent.Request](#rpc-StripeEvent-Request)
- [V2StripeEvent](#rpc-V2StripeEvent)
- [V2StripeEvent.RelatedObject](#rpc-V2StripeEvent-RelatedObject)

- [events_resend.proto](#events_resend-proto)
- [EventsResendRequest](#rpc-EventsResendRequest)
Expand Down Expand Up @@ -171,6 +173,44 @@




<a name="rpc-V2StripeEvent"></a>

### V2StripeEvent



| Field | Type | Label | Description |
| ----- | ---- | ----- | ----------- |
| created | [string](#string) | | timestamp event was created |
| data | [bytes](#bytes) | | Object containing data associated with the event. |
| id | [string](#string) | | unique id of the event |
| object | [string](#string) | | The object type, i.e. &#39;event&#39; |
| related_object | [V2StripeEvent.RelatedObject](#rpc-V2StripeEvent-RelatedObject) | | The resource related to the event |
| type | [string](#string) | | Description of the event (e.g., invoice.created or charge.refunded). |
| context | [string](#string) | | Conext of the event |






<a name="rpc-V2StripeEvent-RelatedObject"></a>

### V2StripeEvent.RelatedObject



| Field | Type | Label | Description |
| ----- | ---- | ----- | ----------- |
| id | [string](#string) | | unique ID of the resource |
| type | [string](#string) | | The resource type |
| url | [string](#string) | | The API url to GET the resource details |








Expand Down Expand Up @@ -305,6 +345,10 @@
| live | [bool](#bool) | | Receive live events (default: test) |
| skip_verify | [bool](#bool) | | Skip certificate verification when forwarding to HTTPS endpoints |
| use_configured_webhooks | [bool](#bool) | | Load webhook endpoint configuration from the webhooks API/dashboard |
| thin_events | [string](#string) | repeated | A list of thin-type events to listen for. Defaults to none. |
| forward_thin_to | [string](#string) | | The URL to forward thin webhook events to |
| forward_thin_connect_to | [string](#string) | | The URL to forward thin Connect webhook events to (default: same as normal thin events) |
| api_version | [string](#string) | | The Stripe API version associated with the provided snapshot payload event types |



Expand All @@ -322,6 +366,7 @@
| state | [ListenResponse.State](#rpc-ListenResponse-State) | | Check if the stream ready |
| stripe_event | [StripeEvent](#rpc-StripeEvent) | | A Stripe event |
| endpoint_response | [ListenResponse.EndpointResponse](#rpc-ListenResponse-EndpointResponse) | | A response from an endpoint |
| v2_stripe_event | [V2StripeEvent](#rpc-V2StripeEvent) | | A V2 Stripe event |



Expand Down
59 changes: 54 additions & 5 deletions pkg/cmd/listen.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,24 @@ import (
"github.com/stripe/stripe-cli/pkg/websocket"
)

const webhooksWebSocketFeature = "webhooks"
const timeLayout = "2006-01-02 15:04:05"
const outputFormatJSON = "JSON"
const (
webhooksWebSocketFeature = "webhooks"
destinationsWebSocketFeature = "v2_events"
timeLayout = "2006-01-02 15:04:05"
outputFormatJSON = "JSON"
)

type listenCmd struct {
cmd *cobra.Command

forwardURL string
forwardThinURL string
forwardHeaders []string
forwardConnectHeaders []string
forwardConnectURL string
forwardThinConnectURL string
events []string
thinEvents []string
latestAPIVersion bool
livemode bool
useConfiguredWebhooks bool
Expand Down Expand Up @@ -70,6 +76,12 @@ Stripe account.`,
lc.cmd.Flags().StringVarP(&lc.forwardURL, "forward-to", "f", "", "The URL to forward webhook events to")
lc.cmd.Flags().StringSliceVarP(&lc.forwardHeaders, "headers", "H", []string{}, "A comma-separated list of custom headers to forward. Ex: \"Key1:Value1, Key2:Value2\"")
lc.cmd.Flags().StringVarP(&lc.forwardConnectURL, "forward-connect-to", "c", "", "The URL to forward Connect webhook events to (default: same as normal events)")
lc.cmd.Flags().StringSliceVar(&lc.thinEvents, "thin-events", []string{}, "A comma-separated list of thin events to listen for.")
lc.cmd.Flags().MarkHidden("thin-events")
lc.cmd.Flags().StringVar(&lc.forwardThinURL, "forward-thin-to", "", "The URL to forward thin events to")
lc.cmd.Flags().MarkHidden("forward-thin-to")
lc.cmd.Flags().StringVar(&lc.forwardThinConnectURL, "forward-thin-connect-to", "", "The URL to forward thin Connect events to")
lc.cmd.Flags().MarkHidden("forward-thin-connect-to")
lc.cmd.Flags().BoolVarP(&lc.latestAPIVersion, "latest", "l", false, "Receive events formatted with the latest API version (default: your account's default API version)")
lc.cmd.Flags().BoolVar(&lc.livemode, "live", false, "Receive live events (default: test)")
lc.cmd.Flags().BoolVarP(&lc.printJSON, "print-json", "j", false, "Print full JSON objects to stdout.")
Expand Down Expand Up @@ -153,18 +165,21 @@ func (lc *listenCmd) runListenCmd(cmd *cobra.Command, args []string) error {
Client: client,
DeviceName: deviceName,
ForwardURL: lc.forwardURL,
ForwardThinURL: lc.forwardThinURL,
ForwardHeaders: lc.forwardHeaders,
ForwardConnectURL: lc.forwardConnectURL,
ForwardThinConnectURL: lc.forwardThinConnectURL,
ForwardConnectHeaders: lc.forwardConnectHeaders,
UseConfiguredWebhooks: lc.useConfiguredWebhooks,
WebSocketFeature: webhooksWebSocketFeature,
WebSocketFeatures: lc.getFeatures(),
PrintJSON: lc.printJSON,
UseLatestAPIVersion: lc.latestAPIVersion,
SkipVerify: lc.skipVerify,
Log: logger,
NoWSS: lc.noWSS,
Timeout: lc.timeout,
Events: lc.events,
ThinEvents: lc.thinEvents,
OutCh: proxyOutCh,
})
if err != nil {
Expand Down Expand Up @@ -251,6 +266,18 @@ func createVisitor(logger *log.Logger, format string, printJSON bool) *websocket
},
VisitData: func(de websocket.DataElement) error {
switch data := de.Data.(type) {
case proxy.V2EventPayload:

localTime := time.Now().Format(timeLayout)

color := ansi.Color(os.Stdout)
outputStr := fmt.Sprintf("%s --> %s [%s]",
color.Faint(localTime),
ansi.Linkify(ansi.Bold(data.Type), data.URLForEventType(), logger.Out),
ansi.Linkify(data.ID, data.URLForEventID(), logger.Out),
)
fmt.Println(outputStr)
return nil
case proxy.StripeEvent:
if strings.ToUpper(format) == outputFormatJSON || printJSON {
fmt.Println(de.Marshaled)
Expand All @@ -275,6 +302,14 @@ func createVisitor(logger *log.Logger, format string, printJSON bool) *websocket
case proxy.EndpointResponse:
event := data.Event
resp := data.Resp
v2Event := data.V2Event
var link string
if event != nil {
link = ansi.Linkify(event.ID, event.URLForEventID(), logger.Out)
} else if v2Event != nil {
// todo(@charliecruzan): Add link support once inspector supports v2 events
link = v2Event.ID
}
localTime := time.Now().Format(timeLayout)

color := ansi.Color(os.Stdout)
Expand All @@ -283,7 +318,7 @@ func createVisitor(logger *log.Logger, format string, printJSON bool) *websocket
ansi.ColorizeStatus(resp.StatusCode),
resp.Request.Method,
resp.Request.URL,
ansi.Linkify(event.ID, event.URLForEventID(), logger.Out),
link,
)
fmt.Println(outputStr)
return nil
Expand All @@ -293,3 +328,17 @@ func createVisitor(logger *log.Logger, format string, printJSON bool) *websocket
},
}
}

func (lc *listenCmd) getFeatures() []string {
features := []string{}

if len(lc.events) > 0 {
features = append(features, webhooksWebSocketFeature)
}

if len(lc.thinEvents) > 0 {
features = append(features, destinationsWebSocketFeature)
}

return features
}
2 changes: 1 addition & 1 deletion pkg/logtailing/tailer.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ func (t *Tailer) processRequestLogEvent(msg websocket.IncomingMessage) {
}

// at this point the message is valid so we can acknowledge it
ackMessage := websocket.NewEventAck(requestLogEvent.RequestLogID, "")
ackMessage := websocket.NewEventAck(requestLogEvent.RequestLogID, "", "")
t.webSocketClient.SendMessage(ackMessage)

// Don't show stripecli/sessions logs since they're generated by the CLI
Expand Down
2 changes: 1 addition & 1 deletion pkg/plugins/proto/main.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/plugins/proto/main_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

65 changes: 56 additions & 9 deletions pkg/proxy/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ type EndpointClient struct {

// Optional configuration parameters
cfg *EndpointConfig

isEventDestination bool
}

// SupportsEventType takes an event of a webhook and compares it to the internal
Expand All @@ -84,18 +86,28 @@ func (c *EndpointClient) SupportsEventType(connect bool, eventType string) bool
return false
}

// SupportsContext takes the context string of an event, and determines whether the endpoint supports
// this context
func (c *EndpointClient) SupportsContext(context string) bool {
if c.connect {
return context != ""
}

return context == ""
}

// Post sends a message to the local endpoint.
func (c *EndpointClient) Post(evtCtx eventContext, body string, headers map[string]string) error {
func (c *EndpointClient) Post(evtCtx eventContext) error {
c.cfg.Log.WithFields(log.Fields{
"prefix": "proxy.EndpointClient.Post",
}).Debug("Forwarding event to local endpoint")

req, err := http.NewRequest(http.MethodPost, c.URL, bytes.NewBuffer([]byte(body)))
req, err := http.NewRequest(http.MethodPost, c.URL, bytes.NewBuffer([]byte(evtCtx.requestBody)))
if err != nil {
return err
}

for k, v := range headers {
for k, v := range evtCtx.requestHeaders {
req.Header.Add(k, v)
}

Expand Down Expand Up @@ -123,12 +135,46 @@ func (c *EndpointClient) Post(evtCtx eventContext, body string, headers map[stri
return nil
}

// PostV2 sends a message to a local event destination
func (c *EndpointClient) PostV2(evtCtx eventContext) error {
req, err := http.NewRequest(http.MethodPost, c.URL, bytes.NewBuffer([]byte(evtCtx.requestBody)))
if err != nil {
return err
}

for k, v := range evtCtx.requestHeaders {
req.Header.Add(k, v)
}

// add custom headers
for k, v := range c.headers {
if strings.ToLower(k) == "host" {
req.Host = v
} else {
req.Header.Add(k, v)
}
}

resp, err := http.DefaultClient.Do(req)
if err != nil {
c.cfg.OutCh <- websocket.ErrorElement{
Error: FailedToPostError{Err: err},
}
return err
}
defer resp.Body.Close()

c.cfg.ResponseHandler.ProcessResponse(evtCtx, c.URL, resp)

return nil
}

//
// Public functions
//

// NewEndpointClient returns a new EndpointClient.
func NewEndpointClient(url string, headers []string, connect bool, events []string, cfg *EndpointConfig) *EndpointClient {
func NewEndpointClient(url string, headers []string, connect bool, events []string, isEventDestination bool, cfg *EndpointConfig) *EndpointClient {
if cfg == nil {
cfg = &EndpointConfig{}
}
Expand All @@ -151,11 +197,12 @@ func NewEndpointClient(url string, headers []string, connect bool, events []stri
}

return &EndpointClient{
URL: url,
headers: convertToMapAndSanitize(headers),
connect: connect,
events: convertToMap(events),
cfg: cfg,
URL: url,
headers: convertToMapAndSanitize(headers),
connect: connect,
events: convertToMap(events),
isEventDestination: isEventDestination,
cfg: cfg,
}
}

Expand Down
Loading

0 comments on commit 8d33a69

Please sign in to comment.