Skip to content

Commit

Permalink
feat: Implement webhooks library and perform webhook call on project …
Browse files Browse the repository at this point in the history
…event (#96)

* Init basic framework for webhooks

* Implement WebhookManager

* Modify mlp project service to integrate webhook

* Refactor interface name

* Move setup code into package

* Add documentation

* Add basic test cases

* Set webhook default method to POST

* Add retry, configuration to specify payload

* Refactor, add comments

* Add readme to explain webhook use cases

* Update mock clients

* Add test case for webhook validation

* Fix test cases

* Modify exposed methods, add unit tests in project_service

* Move mocks into test table runner

* Add missing param arsg in project_api_test.go

* Fix linting issues

* Fix lines issue

* Add noOp on success callback, fix formatting

* Clean up comments

* Address first round of comments

* Initialize async and sync webhooks at the start

* Fix failing test cases

* Add test, fix typo

* Remove unneccessary comment

* Remove webhooks from default config

* Address PR comments

* Use validator package, remove onError

* Fix broken test

* Update readme

* Fire async webhook in manager instead

* Use context in retry

* Add webhook for UpdateProject

* Add update project with webhook test

* Handle case where webhook event may not be set
  • Loading branch information
shydefoo authored Jun 5, 2024
1 parent 0256c00 commit 529cb48
Show file tree
Hide file tree
Showing 16 changed files with 1,421 additions and 16 deletions.
8 changes: 4 additions & 4 deletions api/api/projects_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ func TestCreateProject(t *testing.T) {
_, err := prjRepository.Save(tC.existingProject)
assert.NoError(t, err)
}
projectService, err := service.NewProjectsService(mlflowTrackingURL, prjRepository, nil, false)
projectService, err := service.NewProjectsService(mlflowTrackingURL, prjRepository, nil, false, nil)
assert.NoError(t, err)

appCtx := &AppContext{
Expand Down Expand Up @@ -316,7 +316,7 @@ func TestListProjects(t *testing.T) {
assert.NoError(t, err)
}
}
projectService, err := service.NewProjectsService(mlflowTrackingURL, prjRepository, nil, false)
projectService, err := service.NewProjectsService(mlflowTrackingURL, prjRepository, nil, false, nil)
assert.NoError(t, err)

appCtx := &AppContext{
Expand Down Expand Up @@ -476,7 +476,7 @@ func TestUpdateProject(t *testing.T) {
_, err := prjRepository.Save(tC.existingProject)
assert.NoError(t, err)
}
projectService, err := service.NewProjectsService(mlflowTrackingURL, prjRepository, nil, false)
projectService, err := service.NewProjectsService(mlflowTrackingURL, prjRepository, nil, false, nil)
assert.NoError(t, err)

appCtx := &AppContext{
Expand Down Expand Up @@ -595,7 +595,7 @@ func TestGetProject(t *testing.T) {
_, err := prjRepository.Save(tC.existingProject)
assert.NoError(t, err)
}
projectService, err := service.NewProjectsService(mlflowTrackingURL, prjRepository, nil, false)
projectService, err := service.NewProjectsService(mlflowTrackingURL, prjRepository, nil, false, nil)
assert.NoError(t, err)

appCtx := &AppContext{
Expand Down
11 changes: 10 additions & 1 deletion api/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/caraml-dev/mlp/api/pkg/authz/enforcer"
"github.com/caraml-dev/mlp/api/pkg/instrumentation/newrelic"
"github.com/caraml-dev/mlp/api/pkg/secretstorage"
"github.com/caraml-dev/mlp/api/pkg/webhooks"
"github.com/caraml-dev/mlp/api/repository"
"github.com/caraml-dev/mlp/api/service"
"github.com/caraml-dev/mlp/api/validation"
Expand Down Expand Up @@ -61,11 +62,19 @@ func NewAppContext(db *gorm.DB, cfg *config.Config) (ctx *AppContext, err error)
return nil, fmt.Errorf("failed to initialize applications service: %v", err)
}

var projectsWebhookManager webhooks.WebhookManager
if cfg.Webhooks != nil && cfg.Webhooks.Enabled {
projectsWebhookManager, err = webhooks.InitializeWebhooks(cfg.Webhooks, service.EventList)
if err != nil {
return nil, fmt.Errorf("failed to initialize projects webhook manager: %v", err)
}
}

projectsService, err := service.NewProjectsService(
cfg.Mlflow.TrackingURL,
repository.NewProjectRepository(db),
authEnforcer,
cfg.Authorization.Enabled)
cfg.Authorization.Enabled, projectsWebhookManager)

if err != nil {
return nil, fmt.Errorf("failed to initialize projects service: %v", err)
Expand Down
2 changes: 2 additions & 0 deletions api/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/caraml-dev/mlp/api/models"
modelsv2 "github.com/caraml-dev/mlp/api/models/v2"
"github.com/caraml-dev/mlp/api/pkg/webhooks"
)

type Config struct {
Expand All @@ -33,6 +34,7 @@ type Config struct {
Mlflow *MlflowConfig `validate:"required"`
DefaultSecretStorage *SecretStorage `validate:"required"`
UI *UIConfig
Webhooks *webhooks.Config
}

// SecretStorage represents the configuration for a secret storage.
Expand Down
147 changes: 147 additions & 0 deletions api/pkg/webhooks/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
# Webhooks

- The package is meant to be used across caraml components (e.g. merlin, turing, mlp) to call webhooks when specific events occur.
- The package contains the webhook client implementation and abstracts the logic from the user. It provides some helper functions for the user to call in their code when specific events occur.
- The payload to the webhook server and the response can be arbitrary, and it is up to the user to choose what payload to send to the webhook server(s), but only 1 response will be used in the callback

### How to use?

1. In the caller package (eg, mlp, merlin), define the list of events that requires webhooks. For example:

```go
const (
ProjectCreatedEvent wh.EventType = "OnProjectCreated"
ProjectUpdatedEvent wh.EventType = "OnProjectUpdated"
)

var EventList = []wh.EventType{
ProjectCreatedEvent,
ProjectUpdatedEvent,
}
```

2. Define the event to webhook configuration. Optionally, the configuration can be provided in a yaml file and parsed via the `Config` struct. In the config file, define the event to webhook mapping for those events as required. For example, if projects need extra labels from an external source, we define the webhook config for the `OnProjectCreated` event

```yaml
webhooks:
enabled: true
config:
OnProjectCreated:
- url: http://localhost:8081/project_created
method: POST
finalResponse: true
name: webhook1
```
3. Call InitializeWebhooks() to get a WebhookManager instance.
This method will initialize the webhook clients for each event type based on the mapping provided
```go
projectsWebhookManager, err := webhooks.InitializeWebhooks(cfg.Webhooks, service.EventList)
```

4. Call

```go
InvokeWebhooks(context.Context, EventType, payload interface{}, onSuccess func([]byte) error, onError func(error) error) error
```

method in the caller code based on the event.

#### Optional webhooks events

In the event that there are multiple events to be configured, for example `OnProjectCreated` and `OnProjectUpdated`, and only `OnProjectCreated` webhooks should be fired, use the `IsEventConfigured()` method provided by the `WebhookManager` to check if the event is set before calling `InvokeWebhooks()`

For example:

```go
if webhookManager == nil || !webhookManager.IsEventConfigured(ProjectUpdatedEvent) {
// do step if webhooks disabled, or event not set
...
} else {
err := webhookManager.InvokeWebhooks(ctx, ProjectUpdatedEvent, project, func(p []byte) error {
// onSuccess steps
...
}, webhooks.NoOpErrorHandler)
}
```

### Single Webhook Configuration

```yaml
webhooks:
enabled: true
config:
OnProjectCreated:
- name: webhook1
url: http://webhook1
method: POST
finalResponse: true
```
- This configuration is the most straight forward. It configures 1 webhook client to be called when the `OnProjectCreated` event happens.
- The payload to the webhook is the json payload of the `payload` argument passed to `InvokeWebhooks`.
- The response from this webhook is used as the final response to the callback passed to the `onSuccess` argument.

### Multiple Webhooks use case

- The library supports multiple webhooks per event to a certain extent.

#### Use case 1

- sync and async webhook
- This can be specified by:

```yaml
webhooks:
enabled: true
config:
OnProjectCreated:
- name: webhook1
url: http://webhook1
method: POST
finalResponse: true
- name: webhook2
url: http://webhook2
method: POST
async: true
```

- The async webhook2 will be called only after webhook1 completes.
- If there are multiple sync and async webhooks, the async webhooks will be called only after all sync webhooks have completed.

#### Use case 2

- 3 sync clients, where the response of the first webhook is used as the payload for the second webhook.
- This can be specified by:

```yaml
webhooks:
enabled: true
config:
OnProjectCreated:
- url: http://webhook1
method: POST
finalResponse: true
name: webhook1
- url: http://webhook2
method: POST
useDataFrom: webhook1 # <-- specify to use data from webhook1
name: webhook2
- url: http://webhook3
method: POST
name: webhook3
```

- The order of webhook matters, webhook1 will be called before webhook2. If webhook2 is defined before webhook1 but uses the response from webhook1, there will be a validation error on initialization.
- Since `useDataFrom` for webhook1 is not set, webhook1 uses the original payload passed to `InvokeWebhooks` function.
- webhook2 will use the response from webhook1 as its payload. The response from webhook2 is not used.
- webhook3 will use the same payload as webhook1, but will only be called after webhook2
- Here, the finalResponse is set to true for webhook1. This means that the response from webhook1 will be passed as an argument to the `onSuccess` function

### Error Handling

- For synchronous webhooks, all webhooks must be successful before the `onSuccess` handler is called. This means that the caller of this package
only needs to consider how to handle the successful response.
- In the event any sync webhooks fail, the `onError` handler is called
- For webhooks that do not need to succeed (for whatever reason), pass them as async webhooks.
155 changes: 155 additions & 0 deletions api/pkg/webhooks/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
// package webhooks provides a webhook manager that can be used to invoke webhooks for different events.

/*
Usage:
1. In the caller package (eg, mlp, merlin), define the list of events that requires webhooks. For example:
```go
const (
ProjectCreatedEvent wh.EventType = "OnProjectCreated"
ProjectUpdatedEvent wh.EventType = "OnProjectUpdated"
)
var EventList = []wh.EventType{
ProjectCreatedEvent,
ProjectUpdatedEvent,
}
```
2. Define the event to webhook configuration. Optionally, the configuration can be provided in a yaml file
and parsed via the `Config` struct.
In the config file, define the event to webhook mapping for those events as required.
For example, if projects need extra labels from an external source,
we define the webhook config for the `OnProjectCreated` event
```go
webhooks:
enabled: true
config:
OnProjectCreated:
- url: http://localhost:8081/project_created
method: POST
onError: abort
```
3. Call InitializeWebhooks() to get a WebhookManager instance.
This method will initialize the webhook clients for each event type based on the mapping provided
```go
projectsWebhookManager, err := webhooks.InitializeWebhooks(cfg.Webhooks, service.EventList)
```
4. Call `InvokeWebhooks()` method in the caller code based on the event
*/

package webhooks

import (
"bytes"
"context"
"fmt"
"io"
"net/http"
"time"

"github.com/avast/retry-go/v4"
"github.com/caraml-dev/mlp/api/log"
"github.com/go-playground/validator/v10"
)

type EventType string
type ServiceType string

type WebhookClient interface {
Invoke(context.Context, []byte) ([]byte, error)
IsAsync() bool
IsFinalResponse() bool
GetUseDataFrom() string
GetName() string
}

type simpleWebhookClient struct {
WebhookConfig
}

func NoOpErrorHandler(err error) error { return err }
func NoOpCallback([]byte) error { return nil }

func (g *simpleWebhookClient) Invoke(ctx context.Context, payload []byte) ([]byte, error) {
// create http request to webhook
var content []byte
err := retry.Do(
func() error {
client := http.Client{
Timeout: time.Duration(*g.Timeout) * time.Second,
}
req, err := http.NewRequestWithContext(ctx, g.Method, g.URL, bytes.NewBuffer(payload))
// TODO: Add option for authentication headers
if err != nil {
return err
}
resp, err := client.Do(req)
if err != nil {
log.Errorf("Error making client request %s", err)
return err
}
defer resp.Body.Close()
content, err = io.ReadAll(resp.Body)
if err != nil {
return err
}
if err := validateWebhookResponse(content); err != nil {
return err
}
// check http status code
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("response status code %d not 200", resp.StatusCode)
}
return nil

}, retry.Attempts(uint(g.NumRetries)), retry.Context(ctx),
)
if err != nil {
return nil, err
}
return content, nil
}

func (g *simpleWebhookClient) IsAsync() bool {
return g.Async
}

func (g *simpleWebhookClient) IsFinalResponse() bool {
return g.FinalResponse
}

func (g *simpleWebhookClient) GetUseDataFrom() string {
return g.UseDataFrom
}

func (g *simpleWebhookClient) GetName() string {
return g.Name
}

func validateWebhookConfig(webhookConfig *WebhookConfig) error {
validate := validator.New()

err := validate.Struct(webhookConfig)
if err != nil {
return fmt.Errorf("failed to validate configuration: %s", err)
}
if webhookConfig.NumRetries < 0 {
return fmt.Errorf("numRetries must be a non-negative integer")
}
return nil
}

func setDefaults(webhookConfig *WebhookConfig) {
if webhookConfig.Method == "" {
webhookConfig.Method = http.MethodPost // Default to POST, TODO: decide if GET is allowed
}
if webhookConfig.Timeout == nil {
def := 10
webhookConfig.Timeout = &def
}
}
Loading

0 comments on commit 529cb48

Please sign in to comment.