Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Implement webhooks library and perform webhook call on project event #96

Merged
merged 36 commits into from
Jun 5, 2024
Merged
Show file tree
Hide file tree
Changes from 33 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
056054f
Init basic framework for webhooks
shydefoo May 13, 2024
2860e60
Implement WebhookManager
shydefoo May 13, 2024
6e35d98
Modify mlp project service to integrate webhook
shydefoo May 14, 2024
ff39652
Refactor interface name
shydefoo May 15, 2024
6dfa420
Move setup code into package
shydefoo May 15, 2024
2a445cb
Add documentation
shydefoo May 15, 2024
cd17a8c
Add basic test cases
shydefoo May 15, 2024
1c76313
Set webhook default method to POST
shydefoo May 16, 2024
96c7921
Add retry, configuration to specify payload
shydefoo May 16, 2024
61e071a
Refactor, add comments
shydefoo May 16, 2024
df35d10
Add readme to explain webhook use cases
shydefoo May 16, 2024
137992f
Update mock clients
shydefoo May 16, 2024
570540e
Add test case for webhook validation
shydefoo May 16, 2024
994c431
Fix test cases
shydefoo May 16, 2024
a8824fe
Modify exposed methods, add unit tests in project_service
shydefoo May 17, 2024
4040740
Move mocks into test table runner
shydefoo May 17, 2024
368fd35
Add missing param arsg in project_api_test.go
shydefoo May 17, 2024
56ac7f3
Fix linting issues
shydefoo May 17, 2024
ec1659c
Fix lines issue
shydefoo May 17, 2024
e22b751
Add noOp on success callback, fix formatting
shydefoo May 17, 2024
d567cd3
Clean up comments
shydefoo May 20, 2024
709f6de
Address first round of comments
shydefoo May 28, 2024
4e159e5
Initialize async and sync webhooks at the start
shydefoo May 28, 2024
d58ec3a
Fix failing test cases
shydefoo May 28, 2024
291b291
Add test, fix typo
shydefoo May 28, 2024
5af7d9b
Remove unneccessary comment
shydefoo May 28, 2024
8ce4d35
Remove webhooks from default config
shydefoo May 28, 2024
f9f9350
Address PR comments
shydefoo Jun 4, 2024
720ccf0
Use validator package, remove onError
shydefoo Jun 4, 2024
82de64e
Fix broken test
shydefoo Jun 4, 2024
6d0c96f
Update readme
shydefoo Jun 4, 2024
abfe2eb
Fire async webhook in manager instead
shydefoo Jun 4, 2024
f83c975
Use context in retry
shydefoo Jun 4, 2024
72aaef7
Add webhook for UpdateProject
shydefoo Jun 5, 2024
5f8ef30
Add update project with webhook test
shydefoo Jun 5, 2024
e94db85
Handle case where webhook event may not be set
shydefoo Jun 5, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions api/api/projects_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ func TestCreateProject(t *testing.T) {
_, err := prjRepository.Save(tC.existingProject)
assert.NoError(t, err)
}
projectService, err := service.NewProjectsService(mlflowTrackingURL, prjRepository, nil, false)
projectService, err := service.NewProjectsService(mlflowTrackingURL, prjRepository, nil, false, nil)
assert.NoError(t, err)

appCtx := &AppContext{
Expand Down Expand Up @@ -316,7 +316,7 @@ func TestListProjects(t *testing.T) {
assert.NoError(t, err)
}
}
projectService, err := service.NewProjectsService(mlflowTrackingURL, prjRepository, nil, false)
projectService, err := service.NewProjectsService(mlflowTrackingURL, prjRepository, nil, false, nil)
assert.NoError(t, err)

appCtx := &AppContext{
Expand Down Expand Up @@ -476,7 +476,7 @@ func TestUpdateProject(t *testing.T) {
_, err := prjRepository.Save(tC.existingProject)
assert.NoError(t, err)
}
projectService, err := service.NewProjectsService(mlflowTrackingURL, prjRepository, nil, false)
projectService, err := service.NewProjectsService(mlflowTrackingURL, prjRepository, nil, false, nil)
assert.NoError(t, err)

appCtx := &AppContext{
Expand Down Expand Up @@ -595,7 +595,7 @@ func TestGetProject(t *testing.T) {
_, err := prjRepository.Save(tC.existingProject)
assert.NoError(t, err)
}
projectService, err := service.NewProjectsService(mlflowTrackingURL, prjRepository, nil, false)
projectService, err := service.NewProjectsService(mlflowTrackingURL, prjRepository, nil, false, nil)
assert.NoError(t, err)

appCtx := &AppContext{
Expand Down
11 changes: 10 additions & 1 deletion api/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/caraml-dev/mlp/api/pkg/authz/enforcer"
"github.com/caraml-dev/mlp/api/pkg/instrumentation/newrelic"
"github.com/caraml-dev/mlp/api/pkg/secretstorage"
"github.com/caraml-dev/mlp/api/pkg/webhooks"
"github.com/caraml-dev/mlp/api/repository"
"github.com/caraml-dev/mlp/api/service"
"github.com/caraml-dev/mlp/api/validation"
Expand Down Expand Up @@ -61,11 +62,19 @@ func NewAppContext(db *gorm.DB, cfg *config.Config) (ctx *AppContext, err error)
return nil, fmt.Errorf("failed to initialize applications service: %v", err)
}

var projectsWebhookManager webhooks.WebhookManager
if cfg.Webhooks != nil && cfg.Webhooks.Enabled {
projectsWebhookManager, err = webhooks.InitializeWebhooks(cfg.Webhooks, service.EventList)
if err != nil {
return nil, fmt.Errorf("failed to initialize projects webhook manager: %v", err)
}
}

projectsService, err := service.NewProjectsService(
cfg.Mlflow.TrackingURL,
repository.NewProjectRepository(db),
authEnforcer,
cfg.Authorization.Enabled)
cfg.Authorization.Enabled, projectsWebhookManager)
mbruner marked this conversation as resolved.
Show resolved Hide resolved

if err != nil {
return nil, fmt.Errorf("failed to initialize projects service: %v", err)
Expand Down
2 changes: 2 additions & 0 deletions api/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/caraml-dev/mlp/api/models"
modelsv2 "github.com/caraml-dev/mlp/api/models/v2"
"github.com/caraml-dev/mlp/api/pkg/webhooks"
)

type Config struct {
Expand All @@ -33,6 +34,7 @@ type Config struct {
Mlflow *MlflowConfig `validate:"required"`
DefaultSecretStorage *SecretStorage `validate:"required"`
UI *UIConfig
Webhooks *webhooks.Config
}

// SecretStorage represents the configuration for a secret storage.
Expand Down
129 changes: 129 additions & 0 deletions api/pkg/webhooks/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
# Webhooks

- The package is meant to be used across caraml components (e.g. merlin, turing, mlp) to call webhooks when specific events occur.
- The package contains the webhook client implementation and abstracts the logic from the user. It provides some helper functions for the user to call in their code when specific events occur.
- The payload to the webhook server and the response can be arbitrary, and it is up to the user to choose what payload to send to the webhook server(s), but only 1 response will be used in the callback

### How to use?

1. In the caller package (eg, mlp, merlin), define the list of events that requires webhooks. For example:

```go
const (
ProjectCreatedEvent wh.EventType = "OnProjectCreated"
ProjectUpdatedEvent wh.EventType = "OnProjectUpdated"
)

var EventList = []wh.EventType{
ProjectCreatedEvent,
ProjectUpdatedEvent,
}
```

2. Define the event to webhook configuration. Optionally, the configuration can be provided in a yaml file and parsed via the `Config` struct. In the config file, define the event to webhook mapping for those events as required. For example, if projects need extra labels from an external source, we define the webhook config for the `OnProjectCreated` event

```yaml
webhooks:
enabled: true
config:
OnProjectCreated:
- url: http://localhost:8081/project_created
method: POST
finalResponse: true
leonlnj marked this conversation as resolved.
Show resolved Hide resolved
name: webhook1
```

3. Call InitializeWebhooks() to get a WebhookManager instance.
This method will initialize the webhook clients for each event type based on the mapping provided

```go
projectsWebhookManager, err := webhooks.InitializeWebhooks(cfg.Webhooks, service.EventList)
```

4. Call

```go
InvokeWebhooks(context.Context, EventType, payload interface{}, onSuccess func([]byte) error, onError func(error) error) error
```

method in the caller code based on the event.

### Single Webhook Configuration

```yaml
webhooks:
enabled: true
config:
OnProjectCreated:
- name: webhook1
url: http://webhook1
method: POST
finalResponse: true
```

- This configuration is the most straight forward. It configures 1 webhook client to be called when the `OnProjectCreated` event happens.
- The payload to the webhook is the json payload of the `payload` argument passed to `InvokeWebhooks`.
- The response from this webhook is used as the final response to the callback passed to the `onSuccess` argument.

### Multiple Webhooks use case

- The library supports multiple webhooks per event to a certain extent.

#### Use case 1

- sync and async webhook
- This can be specified by:

```yaml
webhooks:
enabled: true
config:
OnProjectCreated:
- name: webhook1
url: http://webhook1
method: POST
finalResponse: true
- name: webhook2
url: http://webhook2
method: POST
async: true
```

- The async webhook2 will be called only after webhook1 completes.
- If there are multiple sync and async webhooks, the async webhooks will be called only after all sync webhooks have completed.

#### Use case 2

- 3 sync clients, where the response of the first webhook is used as the payload for the second webhook.
- This can be specified by:

```yaml
webhooks:
enabled: true
config:
OnProjectCreated:
- url: http://webhook1
method: POST
finalResponse: true
name: webhook1
- url: http://webhook2
method: POST
useDataFrom: webhook1 # <-- specify to use data from webhook1
name: webhook2
- url: http://webhook3
method: POST
name: webhook3
```

- The order of webhook matters, webhook1 will be called before webhook2. If webhook2 is defined before webhook1 but uses the response from webhook1, there will be a validation error on initialization.
- Since `useDataFrom` for webhook1 is not set, webhook1 uses the original payload passed to `InvokeWebhooks` function.
- webhook2 will use the response from webhook1 as its payload. The response from webhook2 is not used.
- webhook3 will use the same payload as webhook1, but will only be called after webhook2
- Here, the finalResponse is set to true for webhook1. This means that the response from webhook1 will be passed as an argument to the `onSuccess` function

### Error Handling

- For synchronous webhooks, all webhooks must be successful before the `onSuccess` handler is called. This means that the caller of this package
only needs to consider how to handle the successful response.
- In the event any sync webhooks fail, the `onError` handler is called
- For webhooks that do not need to succeed (for whatever reason), pass them as async webhooks.
155 changes: 155 additions & 0 deletions api/pkg/webhooks/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
// package webhooks provides a webhook manager that can be used to invoke webhooks for different events.

/*
Usage:
1. In the caller package (eg, mlp, merlin), define the list of events that requires webhooks. For example:

```go
const (
ProjectCreatedEvent wh.EventType = "OnProjectCreated"
ProjectUpdatedEvent wh.EventType = "OnProjectUpdated"
)

var EventList = []wh.EventType{
ProjectCreatedEvent,
ProjectUpdatedEvent,
}
```

2. Define the event to webhook configuration. Optionally, the configuration can be provided in a yaml file
and parsed via the `Config` struct.
In the config file, define the event to webhook mapping for those events as required.
For example, if projects need extra labels from an external source,
we define the webhook config for the `OnProjectCreated` event

```go
webhooks:
enabled: true
config:
OnProjectCreated:
- url: http://localhost:8081/project_created
method: POST
onError: abort
```

3. Call InitializeWebhooks() to get a WebhookManager instance.
This method will initialize the webhook clients for each event type based on the mapping provided

```go
projectsWebhookManager, err := webhooks.InitializeWebhooks(cfg.Webhooks, service.EventList)
```

4. Call `InvokeWebhooks()` method in the caller code based on the event
*/

package webhooks

import (
"bytes"
"context"
"fmt"
"io"
"net/http"
"time"

"github.com/avast/retry-go/v4"
"github.com/caraml-dev/mlp/api/log"
"github.com/go-playground/validator/v10"
)

type EventType string
type ServiceType string

type WebhookClient interface {
Invoke(context.Context, []byte) ([]byte, error)
IsAsync() bool
IsFinalResponse() bool
GetUseDataFrom() string
GetName() string
}

type simpleWebhookClient struct {
leonlnj marked this conversation as resolved.
Show resolved Hide resolved
WebhookConfig
}
deadlycoconuts marked this conversation as resolved.
Show resolved Hide resolved

func NoOpErrorHandler(err error) error { return err }
mbruner marked this conversation as resolved.
Show resolved Hide resolved
func NoOpCallback([]byte) error { return nil }

func (g *simpleWebhookClient) Invoke(ctx context.Context, payload []byte) ([]byte, error) {
// create http request to webhook
var content []byte
err := retry.Do(
func() error {
client := http.Client{
Timeout: time.Duration(*g.Timeout) * time.Second,
}
req, err := http.NewRequestWithContext(ctx, g.Method, g.URL, bytes.NewBuffer(payload))
leonlnj marked this conversation as resolved.
Show resolved Hide resolved
// TODO: Add option for authentication headers
if err != nil {
return err
}
resp, err := client.Do(req)
if err != nil {
log.Errorf("Error making client request %s", err)
return err
}
defer resp.Body.Close()
content, err = io.ReadAll(resp.Body)
if err != nil {
return err
}
if err := validateWebhookResponse(content); err != nil {
leonlnj marked this conversation as resolved.
Show resolved Hide resolved
return err
}
// check http status code
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("response status code %d not 200", resp.StatusCode)
}
return nil

}, retry.Attempts(uint(g.NumRetries)), retry.Context(ctx),
)
if err != nil {
return nil, err
}
return content, nil
}

func (g *simpleWebhookClient) IsAsync() bool {
return g.Async
}

func (g *simpleWebhookClient) IsFinalResponse() bool {
return g.FinalResponse
}

func (g *simpleWebhookClient) GetUseDataFrom() string {
return g.UseDataFrom
}

func (g *simpleWebhookClient) GetName() string {
return g.Name
}

func validateWebhookConfig(webhookConfig *WebhookConfig) error {
leonlnj marked this conversation as resolved.
Show resolved Hide resolved
validate := validator.New()

err := validate.Struct(webhookConfig)
if err != nil {
return fmt.Errorf("failed to validate configuration: %s", err)
}
if webhookConfig.NumRetries < 0 {
return fmt.Errorf("numRetries must be a non-negative integer")
}
return nil
}

func setDefaults(webhookConfig *WebhookConfig) {
if webhookConfig.Method == "" {
webhookConfig.Method = http.MethodPost // Default to POST, TODO: decide if GET is allowed
}
if webhookConfig.Timeout == nil {
def := 10
webhookConfig.Timeout = &def
}
}
31 changes: 31 additions & 0 deletions api/pkg/webhooks/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package webhooks

type WebhookType string

const (
Async WebhookType = "async"
Sync WebhookType = "sync"
)

// Config is a helper struct to define the webhook config in a configuration file
type Config struct {
Enabled bool
Config map[EventType][]WebhookConfig `validate:"required_if=Enabled True"`
}

// WebhookConfig struct is the configuration for each webhook to be called
type WebhookConfig struct {
Name string `yaml:"name" validate:"required"`
URL string `yaml:"url" validate:"required,url"`
Method string `yaml:"method"`
AuthEnabled bool `yaml:"authEnabled"`
AuthToken string `yaml:"authToken" validate:"required_if=AuthEnabled True"`
Async bool `yaml:"async"`
NumRetries int `yaml:"numRetries"`
Timeout *int `yaml:"timeout"`
// UseDataFrom is the name of the webhook whose response will be used as input to this webhook
UseDataFrom string `yaml:"useDataFrom"`
leonlnj marked this conversation as resolved.
Show resolved Hide resolved

// FinalResponse can be set to use the response from this webhook to the onSuccess callback function
FinalResponse bool `yaml:"finalResponse"`
}
Loading
Loading