Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flyte webhooks #4431

Draft
wants to merge 44 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
1fb5f4b
wip
pingsutw May 19, 2023
0d4bfa6
wip
pingsutw May 19, 2023
513fbad
wip
pingsutw May 22, 2023
d334e49
wip
pingsutw May 23, 2023
d5a48f7
add log
pingsutw May 23, 2023
126f777
update webhook url
pingsutw May 23, 2023
cfc33fc
wip
pingsutw May 23, 2023
c5442dd
wip
pingsutw May 23, 2023
c75707a
wip
pingsutw May 23, 2023
a646c2f
wip
pingsutw May 23, 2023
d071e75
wip
pingsutw May 23, 2023
a37209f
wip
pingsutw May 24, 2023
e498539
wip
pingsutw May 24, 2023
bb05d3c
wip
pingsutw May 24, 2023
eccd859
wip
pingsutw May 24, 2023
31d11a4
wip
pingsutw May 24, 2023
c966a24
wip
pingsutw May 24, 2023
e0b14d2
webhook poc
pingsutw May 26, 2023
699adce
init
pingsutw Jun 21, 2023
1b3e896
nit
pingsutw Jun 21, 2023
206cefc
More tests
pingsutw Jun 21, 2023
83d7c3a
bump idl
pingsutw Jul 5, 2023
bb703aa
Merged master
pingsutw Jul 5, 2023
cd07b2a
nit
pingsutw Jul 5, 2023
4343654
nit
pingsutw Jul 6, 2023
96e663d
make generate
pingsutw Jul 6, 2023
6cee8cb
lint
pingsutw Jul 6, 2023
412f8cc
lint
pingsutw Jul 6, 2023
e9ab3cf
Merge branch 'master' of github.com:flyteorg/flyteadmin into webhook-v2
pingsutw Jul 19, 2023
99c39c0
Read webhook url from secret
pingsutw Jul 19, 2023
868ce14
add gcp processor
pingsutw Jul 20, 2023
fcfe202
lint
pingsutw Jul 21, 2023
6330314
Fixed tests
pingsutw Jul 21, 2023
d6d5343
nit
pingsutw Jul 21, 2023
a3828c8
lint
pingsutw Jul 21, 2023
0cd80f9
merged master
pingsutw Aug 14, 2023
b754cf8
bump idl
pingsutw Aug 21, 2023
25c3a0f
bump idl
pingsutw Aug 21, 2023
89fba4b
Transfer commits
eapolinario Oct 3, 2023
0f34110
init flyte webhook
pingsutw Nov 15, 2023
664c370
merged master
pingsutw Nov 15, 2023
3cd0cc0
update idl
pingsutw Nov 15, 2023
bcb4e89
fix compile
pingsutw Nov 15, 2023
70a28b3
fix-compile
pingsutw Nov 15, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
182 changes: 0 additions & 182 deletions flyteadmin/go.sum

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions flyteadmin/pkg/async/notifications/email.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ var getTemplateValueFuncs = map[string]GetTemplateValue{
launchPlanVersion: getLaunchPlanVersion,
}

func substituteEmailParameters(message string, request admin.WorkflowExecutionEventRequest, execution *admin.Execution) string {
func SubstituteParameters(message string, request admin.WorkflowExecutionEventRequest, execution *admin.Execution) string {
for template, function := range getTemplateValueFuncs {
message = strings.Replace(message, fmt.Sprintf(substitutionParam, template), function(request, execution), replaceAllInstances)
message = strings.Replace(message, fmt.Sprintf(substitutionParamNoSpaces, template), function(request, execution), replaceAllInstances)
Expand All @@ -117,9 +117,9 @@ func ToEmailMessageFromWorkflowExecutionEvent(
execution *admin.Execution) *admin.EmailMessage {

return &admin.EmailMessage{
SubjectLine: substituteEmailParameters(config.NotificationsEmailerConfig.Subject, request, execution),
SubjectLine: SubstituteParameters(config.NotificationsEmailerConfig.Subject, request, execution),
SenderEmail: config.NotificationsEmailerConfig.Sender,
RecipientsEmail: emailNotification.GetRecipientsEmail(),
Body: substituteEmailParameters(config.NotificationsEmailerConfig.Body, request, execution),
Body: SubstituteParameters(config.NotificationsEmailerConfig.Body, request, execution),
}
}
8 changes: 4 additions & 4 deletions flyteadmin/pkg/async/notifications/email_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,14 @@ func TestSubstituteEmailParameters(t *testing.T) {
},
}
assert.Equal(t, "{{ unused }}. {{project }} and prod and e124 ended up in succeeded.",
substituteEmailParameters(message, request, workflowExecution))
SubstituteParameters(message, request, workflowExecution))
request.Event.OutputResult = &event.WorkflowExecutionEvent_Error{
Error: &core.ExecutionError{
Message: "uh-oh",
},
}
assert.Equal(t, "{{ unused }}. {{project }} and prod and e124 ended up in succeeded. The execution failed with error: [uh-oh].",
substituteEmailParameters(message, request, workflowExecution))
SubstituteParameters(message, request, workflowExecution))
}

func TestSubstituteAllTemplates(t *testing.T) {
Expand Down Expand Up @@ -95,7 +95,7 @@ func TestSubstituteAllTemplates(t *testing.T) {
},
}
assert.Equal(t, strings.Join(desiredResult, ","),
substituteEmailParameters(strings.Join(messageTemplate, ","), request, workflowExecution))
SubstituteParameters(strings.Join(messageTemplate, ","), request, workflowExecution))
}

func TestSubstituteAllTemplatesNoSpaces(t *testing.T) {
Expand Down Expand Up @@ -124,7 +124,7 @@ func TestSubstituteAllTemplatesNoSpaces(t *testing.T) {
},
}
assert.Equal(t, strings.Join(desiredResult, ","),
substituteEmailParameters(strings.Join(messageTemplate, ","), request, workflowExecution))
SubstituteParameters(strings.Join(messageTemplate, ","), request, workflowExecution))
}

func TestToEmailMessageFromWorkflowExecutionEvent(t *testing.T) {
Expand Down
97 changes: 21 additions & 76 deletions flyteadmin/pkg/async/notifications/implementations/aws_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ package implementations

import (
"context"
"encoding/base64"
"encoding/json"
"time"

"github.com/NYTimes/gizmo/pubsub"
Expand All @@ -18,12 +16,11 @@ import (

// TODO: Add a counter that encompasses the publisher stats grouped by project and domain.
type Processor struct {
sub pubsub.Subscriber
email interfaces.Emailer
systemMetrics processorSystemMetrics
email interfaces.Emailer
interfaces.BaseProcessor
}

// Currently only email is the supported notification because slack and pagerduty both use
// StartProcessing Currently only email is the supported notification because slack and pagerduty both use
// email client to trigger those notifications.
// When Pagerduty and other notifications are supported, a publisher per type should be created.
func (p *Processor) StartProcessing() {
Expand All @@ -38,102 +35,50 @@ func (p *Processor) StartProcessing() {
func (p *Processor) run() error {
var emailMessage admin.EmailMessage
var err error
for msg := range p.sub.Start() {
p.systemMetrics.MessageTotal.Inc()
// Currently this is safe because Gizmo takes a string and casts it to a byte array.
for msg := range p.Sub.Start() {
p.SystemMetrics.MessageTotal.Inc()
stringMsg := string(msg.Message())

var snsJSONFormat map[string]interface{}

// At Lyft, SNS populates SQS. This results in the message body of SQS having the SNS message format.
// The message format is documented here: https://docs.aws.amazon.com/sns/latest/dg/sns-message-and-json-formats.html
// The notification published is stored in the message field after unmarshalling the SQS message.
if err := json.Unmarshal(msg.Message(), &snsJSONFormat); err != nil {
p.systemMetrics.MessageDecodingError.Inc()
logger.Errorf(context.Background(), "failed to unmarshall JSON message [%s] from processor with err: %v", stringMsg, err)
p.markMessageDone(msg)
continue
}

var value interface{}
var ok bool
var valueString string

if value, ok = snsJSONFormat["Message"]; !ok {
logger.Errorf(context.Background(), "failed to retrieve message from unmarshalled JSON object [%s]", stringMsg)
p.systemMetrics.MessageDataError.Inc()
p.markMessageDone(msg)
_, messageByte, ok := p.FromSQSMessage(msg)
if !ok {
continue
}

if valueString, ok = value.(string); !ok {
p.systemMetrics.MessageDataError.Inc()
logger.Errorf(context.Background(), "failed to retrieve notification message (in string format) from unmarshalled JSON object for message [%s]", stringMsg)
p.markMessageDone(msg)
continue
}

// The Publish method for SNS Encodes the notification using Base64 then stringifies it before
// setting that as the message body for SNS. Do the inverse to retrieve the notification.
notificationBytes, err := base64.StdEncoding.DecodeString(valueString)
if err != nil {
logger.Errorf(context.Background(), "failed to Base64 decode from message string [%s] from message [%s] with err: %v", valueString, stringMsg, err)
p.systemMetrics.MessageDecodingError.Inc()
p.markMessageDone(msg)
continue
}

if err = proto.Unmarshal(notificationBytes, &emailMessage); err != nil {
logger.Debugf(context.Background(), "failed to unmarshal to notification object from decoded string[%s] from message [%s] with err: %v", valueString, stringMsg, err)
p.systemMetrics.MessageDecodingError.Inc()
p.markMessageDone(msg)
if err = proto.Unmarshal(messageByte, &emailMessage); err != nil {
logger.Debugf(context.Background(), "failed to unmarshal to notification object from decoded string from message [%s] with err: %v", stringMsg, err)
p.SystemMetrics.MessageDecodingError.Inc()
p.MarkMessageDone(msg)
continue
}

if err = p.email.SendEmail(context.Background(), emailMessage); err != nil {
p.systemMetrics.MessageProcessorError.Inc()
p.SystemMetrics.MessageProcessorError.Inc()
logger.Errorf(context.Background(), "Error sending an email message for message [%s] with emailM with err: %v", emailMessage.String(), err)
} else {
p.systemMetrics.MessageSuccess.Inc()
p.SystemMetrics.MessageSuccess.Inc()
}

p.markMessageDone(msg)

p.MarkMessageDone(msg)
}

// According to https://github.com/NYTimes/gizmo/blob/f2b3deec03175b11cdfb6642245a49722751357f/pubsub/pubsub.go#L36-L39,
// the channel backing the subscriber will just close if there is an error. The call to Err() is needed to identify
// there was an error in the channel or there are no more messages left (resulting in no errors when calling Err()).
if err = p.sub.Err(); err != nil {
p.systemMetrics.ChannelClosedError.Inc()
if err = p.Sub.Err(); err != nil {
p.SystemMetrics.ChannelClosedError.Inc()
logger.Warningf(context.Background(), "The stream for the subscriber channel closed with err: %v", err)
}

// If there are no errors, nil will be returned.
return err
}

func (p *Processor) markMessageDone(message pubsub.SubscriberMessage) {
if err := message.Done(); err != nil {
p.systemMetrics.MessageDoneError.Inc()
logger.Errorf(context.Background(), "failed to mark message as Done() in processor with err: %v", err)
}
}

func (p *Processor) StopProcessing() error {
// Note: If the underlying channel is already closed, then Stop() will return an error.
err := p.sub.Stop()
if err != nil {
p.systemMetrics.StopError.Inc()
logger.Errorf(context.Background(), "Failed to stop the subscriber channel gracefully with err: %v", err)
}
return err
}

func NewProcessor(sub pubsub.Subscriber, emailer interfaces.Emailer, scope promutils.Scope) interfaces.Processor {
return &Processor{
sub: sub,
email: emailer,
systemMetrics: newProcessorSystemMetrics(scope.NewSubScope("processor")),
email: emailer,
BaseProcessor: interfaces.BaseProcessor{
Sub: sub,
SystemMetrics: interfaces.NewProcessorSystemMetrics(scope.NewSubScope("processor")),
},
}
}
49 changes: 16 additions & 33 deletions flyteadmin/pkg/async/notifications/implementations/gcp_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,17 @@ import (

// TODO: Add a counter that encompasses the publisher stats grouped by project and domain.
type GcpProcessor struct {
sub pubsub.Subscriber
email interfaces.Emailer
systemMetrics processorSystemMetrics
email interfaces.Emailer
interfaces.BaseProcessor
}

func NewGcpProcessor(sub pubsub.Subscriber, emailer interfaces.Emailer, scope promutils.Scope) interfaces.Processor {
return &GcpProcessor{
sub: sub,
email: emailer,
systemMetrics: newProcessorSystemMetrics(scope.NewSubScope("gcp_processor")),
email: emailer,
BaseProcessor: interfaces.BaseProcessor{
Sub: sub,
SystemMetrics: interfaces.NewProcessorSystemMetrics(scope.NewSubScope("processor")),
},
}
}

Expand All @@ -41,52 +42,34 @@ func (p *GcpProcessor) StartProcessing() {
func (p *GcpProcessor) run() error {
var emailMessage admin.EmailMessage

for msg := range p.sub.Start() {
p.systemMetrics.MessageTotal.Inc()
for msg := range p.Sub.Start() {
p.SystemMetrics.MessageTotal.Inc()

if err := proto.Unmarshal(msg.Message(), &emailMessage); err != nil {
logger.Debugf(context.Background(), "failed to unmarshal to notification object message [%s] with err: %v", string(msg.Message()), err)
p.systemMetrics.MessageDecodingError.Inc()
p.markMessageDone(msg)
p.SystemMetrics.MessageDecodingError.Inc()
p.MarkMessageDone(msg)
continue
}

if err := p.email.SendEmail(context.Background(), emailMessage); err != nil {
p.systemMetrics.MessageProcessorError.Inc()
p.SystemMetrics.MessageProcessorError.Inc()
logger.Errorf(context.Background(), "Error sending an email message for message [%s] with emailM with err: %v", emailMessage.String(), err)
} else {
p.systemMetrics.MessageSuccess.Inc()
p.SystemMetrics.MessageSuccess.Inc()
}

p.markMessageDone(msg)
p.MarkMessageDone(msg)
}

// According to https://github.com/NYTimes/gizmo/blob/f2b3deec03175b11cdfb6642245a49722751357f/pubsub/pubsub.go#L36-L39,
// the channel backing the subscriber will just close if there is an error. The call to Err() is needed to identify
// there was an error in the channel or there are no more messages left (resulting in no errors when calling Err()).
if err := p.sub.Err(); err != nil {
p.systemMetrics.ChannelClosedError.Inc()
if err := p.Sub.Err(); err != nil {
p.SystemMetrics.ChannelClosedError.Inc()
logger.Warningf(context.Background(), "The stream for the subscriber channel closed with err: %v", err)
return err
}

return nil
}

func (p *GcpProcessor) markMessageDone(message pubsub.SubscriberMessage) {
if err := message.Done(); err != nil {
p.systemMetrics.MessageDoneError.Inc()
logger.Errorf(context.Background(), "failed to mark message as Done() in processor with err: %v", err)
}
}

func (p *GcpProcessor) StopProcessing() error {
// Note: If the underlying channel is already closed, then Stop() will return an error.
if err := p.sub.Stop(); err != nil {
p.systemMetrics.StopError.Inc()
logger.Errorf(context.Background(), "Failed to stop the subscriber channel gracefully with err: %v", err)
return err
}

return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func TestGcpProcessor_StartProcessing(t *testing.T) {

// Check fornumber of messages processed.
m := &dto.Metric{}
err := testGcpProcessor.(*GcpProcessor).systemMetrics.MessageSuccess.Write(m)
err := testGcpProcessor.(*GcpProcessor).SystemMetrics.MessageSuccess.Write(m)
assert.Nil(t, err)
assert.Equal(t, "counter:{value:1}", m.String())
}
Expand All @@ -61,7 +61,7 @@ func TestGcpProcessor_StartProcessingNoMessages(t *testing.T) {

// Check fornumber of messages processed.
m := &dto.Metric{}
err := testGcpProcessor.(*GcpProcessor).systemMetrics.MessageSuccess.Write(m)
err := testGcpProcessor.(*GcpProcessor).SystemMetrics.MessageSuccess.Write(m)
assert.Nil(t, err)
assert.Equal(t, "counter:{value:0}", m.String())
}
Expand Down Expand Up @@ -94,7 +94,7 @@ func TestGcpProcessor_StartProcessingEmailError(t *testing.T) {

// Check for an email error stat.
m := &dto.Metric{}
err := testGcpProcessor.(*GcpProcessor).systemMetrics.MessageProcessorError.Write(m)
err := testGcpProcessor.(*GcpProcessor).SystemMetrics.MessageProcessorError.Write(m)
assert.Nil(t, err)
assert.Equal(t, "counter:{value:1}", m.String())
}
Expand Down
Loading
Loading