Skip to content

Commit

Permalink
Merge pull request linkedin#133 from exponea/more-configurable-requests
Browse files Browse the repository at this point in the history
More configurable requests
  • Loading branch information
toddpalino authored Nov 24, 2016
2 parents 3726a11 + 68d7cdd commit b6c9dad
Show file tree
Hide file tree
Showing 5 changed files with 149 additions and 121 deletions.
42 changes: 27 additions & 15 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,15 @@ type BurrowConfig struct {
Httpnotifier struct {
Enable bool `gcfg:"enable"`
Groups []string `gcfg:"group"`
Url string `gcfg:"url"`
UrlOpen string `gcfg:"url"`
UrlClose string `gcfg:"url-delete"`
MethodOpen string `gcfg:"method"`
MethodClose string `gcfg:"method-delete"`
Interval int64 `gcfg:"interval"`
Extras []string `gcfg:"extra"`
TemplatePost string `gcfg:"template-post"`
TemplateDelete string `gcfg:"template-delete"`
SendDelete bool `gcfg:"send-delete"`
TemplateOpen string `gcfg:"template-post"`
TemplateClose string `gcfg:"template-delete"`
SendClose bool `gcfg:"send-delete"`
PostThreshold int `gcfg:"post-threshold"`
Timeout int `gcfg:"timeout"`
Keepalive int `gcfg:"keepalive"`
Expand All @@ -126,7 +129,9 @@ func ReadConfig(cfgFile string) *BurrowConfig {
var cfg BurrowConfig

// Set some non-standard defaults
cfg.Httpnotifier.SendDelete = true
cfg.Httpnotifier.MethodOpen = "POST"
cfg.Httpnotifier.SendClose = true
cfg.Httpnotifier.MethodClose = "DELETE"

err := gcfg.ReadFileInto(&cfg, cfgFile)
if err != nil {
Expand Down Expand Up @@ -397,21 +402,28 @@ func ValidateConfig(app *ApplicationContext) error {
}

// HTTP Notifier config
if app.Config.Httpnotifier.Url != "" {
if !validateUrl(app.Config.Httpnotifier.Url) {
if app.Config.Httpnotifier.UrlOpen != "" {
if !validateUrl(app.Config.Httpnotifier.UrlOpen) {
errs = append(errs, "HTTP notifier URL is invalid")
}
if app.Config.Httpnotifier.TemplatePost == "" {
app.Config.Httpnotifier.TemplatePost = "config/default-http-post.tmpl"
if app.Config.Httpnotifier.TemplateOpen == "" {
app.Config.Httpnotifier.TemplateOpen = "config/default-http-post.tmpl"
}
if _, err := os.Stat(app.Config.Httpnotifier.TemplatePost); os.IsNotExist(err) {
errs = append(errs, "HTTP notifier POST template file does not exist")
if _, err := os.Stat(app.Config.Httpnotifier.TemplateOpen); os.IsNotExist(err) {
errs = append(errs, "HTTP notifier template file does not exist")
}
if app.Config.Httpnotifier.TemplateDelete == "" {
app.Config.Httpnotifier.TemplateDelete = "config/default-http-delete.tmpl"
if app.Config.Httpnotifier.TemplateClose == "" {
app.Config.Httpnotifier.TemplateClose = "config/default-http-delete.tmpl"
}
if _, err := os.Stat(app.Config.Httpnotifier.TemplateDelete); os.IsNotExist(err) {
errs = append(errs, "HTTP notifier DELETE template file does not exist")
if app.Config.Httpnotifier.UrlClose == "" {
app.Config.Httpnotifier.UrlClose = app.Config.Httpnotifier.UrlOpen
} else {
if !validateUrl(app.Config.Httpnotifier.UrlClose) {
errs = append(errs, "HTTP notifier close URL is invalid")
}
}
if _, err := os.Stat(app.Config.Httpnotifier.TemplateClose); os.IsNotExist(err) {
errs = append(errs, "HTTP notifier close template file does not exist")
}
if app.Config.Httpnotifier.PostThreshold == 0 {
app.Config.Httpnotifier.PostThreshold = 2
Expand Down
4 changes: 4 additions & 0 deletions notifier/email_notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ func (emailer *EmailNotifier) NotifierName() string {
}

func (emailer *EmailNotifier) Notify(msg Message) error {
if emailer.Ignore(msg) {
return nil
}

if emailer.auth == nil {
switch emailer.AuthType {
case "plain":
Expand Down
199 changes: 101 additions & 98 deletions notifier/http_notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,26 @@ import (
"net/http"
"text/template"
"time"
"fmt"
)

type HttpNotifier struct {
Url string
TemplatePostFile string
TemplateDeleteFile string
RequestOpen HttpNotifierRequest
RequestClose HttpNotifierRequest
Threshold int
SendDelete bool
SendClose bool
Extras map[string]string
HttpClient *http.Client
templatePost *template.Template
templateDelete *template.Template
groupIds map[string]map[string]Event
}

type HttpNotifierRequest struct {
Url string
TemplateFile string
Method string
template *template.Template
}

type Event struct {
Id string
Start time.Time
Expand All @@ -57,23 +62,16 @@ func (notifier *HttpNotifier) Notify(msg Message) error {
"maxlag": maxLagHelper,
}

if notifier.templatePost == nil {
// Compile the templates
templatePost, err := template.New("post").Funcs(fmap).ParseFiles(notifier.TemplatePostFile)
if err != nil {
log.Criticalf("Cannot parse HTTP notifier POST template: %v", err)
return err
}
notifier.templatePost = templatePost.Templates()[0]
err := notifier.RequestOpen.ensureTemplateCompiled("post", fmap)
if err != nil {
log.Criticalf("Cannot parse HTTP notifier open template: %v", err)
return err
}

if notifier.templateDelete == nil {
templateDelete, err := template.New("delete").Funcs(fmap).ParseFiles(notifier.TemplateDeleteFile)
if err != nil {
log.Criticalf("Cannot parse HTTP notifier DELETE template: %v", err)
return err
}
notifier.templateDelete = templateDelete.Templates()[0]
err = notifier.RequestClose.ensureTemplateCompiled("delete", fmap)
if err != nil {
log.Criticalf("Cannot parse HTTP notifier close template: %v", err)
return err
}

if notifier.groupIds == nil {
Expand All @@ -91,73 +89,55 @@ func (notifier *HttpNotifier) sendConsumerGroupStatusNotify(msg Message) error {
// We only use IDs if we are sending deletes
idStr := ""
startTime := time.Now()
if notifier.SendDelete {
if notifier.SendClose {
if _, ok := notifier.groupIds[msg.Cluster]; !ok {
// Create the cluster map
notifier.groupIds[msg.Cluster] = make(map[string]Event)
}
if _, ok := notifier.groupIds[msg.Cluster][msg.Group]; !ok {
// Create Event and Id
eventId := uuid.NewRandom()
idStr = eventId.String()
notifier.groupIds[msg.Cluster][msg.Group] = Event{
Id: idStr,
Start: startTime,
if !notifier.Ignore(msg) {
if _, ok := notifier.groupIds[msg.Cluster][msg.Group]; !ok {
// Create Event and Id
eventId := uuid.NewRandom()
idStr = eventId.String()
notifier.groupIds[msg.Cluster][msg.Group] = Event{
Id: idStr,
Start: startTime,
}
} else {
idStr = notifier.groupIds[msg.Cluster][msg.Group].Id
startTime = notifier.groupIds[msg.Cluster][msg.Group].Start
}
} else {
idStr = notifier.groupIds[msg.Cluster][msg.Group].Id
startTime = notifier.groupIds[msg.Cluster][msg.Group].Start
}
}

// NOTE - I'm leaving the JsonEncode item in here so as not to break compatibility. New helpers go in the FuncMap above
bytesToSend := new(bytes.Buffer)
err := notifier.templatePost.Execute(bytesToSend, struct {
Cluster string
Group string
Id string
Start time.Time
Extras map[string]string
Result Message
JsonEncode func(interface{}) string
}{
Cluster: msg.Cluster,
Group: msg.Group,
Id: idStr,
Start: startTime,
Extras: notifier.Extras,
Result: msg,
JsonEncode: templateJsonEncoder,
})
if err != nil {
log.Errorf("Failed to assemble POST: %v", err)
return err
}

// Send POST to HTTP endpoint
req, err := http.NewRequest("POST", notifier.Url, bytesToSend)
req.Header.Set("Content-Type", "application/json")

resp, err := notifier.HttpClient.Do(req)
if err != nil {
log.Errorf("Failed to send POST for group %s in cluster %s at severity %v (Id %s): %v", msg.Group, msg.Cluster, msg.Status, idStr, err)
return err
}
io.Copy(ioutil.Discard, resp.Body)
resp.Body.Close()
if !notifier.Ignore(msg) {
// NOTE - I'm leaving the JsonEncode item in here so as not to break compatibility. New helpers go in the FuncMap above
err := notifier.RequestOpen.send(struct {
Cluster string
Group string
Id string
Start time.Time
Extras map[string]string
Result Message
JsonEncode func(interface{}) string
}{
Cluster: msg.Cluster,
Group: msg.Group,
Id: idStr,
Start: startTime,
Extras: notifier.Extras,
Result: msg,
JsonEncode: templateJsonEncoder,
}, notifier.HttpClient, fmt.Sprintf("open for group %s in cluster %s at severity %v (Id %s)", msg.Group, msg.Cluster, msg.Status, idStr))

if (resp.StatusCode >= 200) && (resp.StatusCode <= 299) {
log.Debugf("Sent POST for group %s in cluster %s at severity %v (Id %s)", msg.Group, msg.Cluster, msg.Status, idStr)
} else {
log.Errorf("Failed to send POST for group %s in cluster %s at severity %v (Id %s): %s", msg.Group,
msg.Cluster, msg.Status, idStr, resp.Status)
if err != nil {
return err
}
}

if notifier.SendDelete && (msg.Status == protocol.StatusOK) {
if notifier.SendClose && (msg.Status == protocol.StatusOK) {
if _, ok := notifier.groupIds[msg.Cluster][msg.Group]; ok {
// Send DELETE to HTTP endpoint
bytesToSend := new(bytes.Buffer)
err := notifier.templateDelete.Execute(bytesToSend, struct {
err := notifier.RequestClose.send(struct {
Cluster string
Group string
Id string
Expand All @@ -169,36 +149,59 @@ func (notifier *HttpNotifier) sendConsumerGroupStatusNotify(msg Message) error {
Id: notifier.groupIds[msg.Cluster][msg.Group].Id,
Start: notifier.groupIds[msg.Cluster][msg.Group].Start,
Extras: notifier.Extras,
})
if err != nil {
log.Errorf("Failed to assemble DELETE for group %s in cluster %s (Id %s): %v", msg.Group,
msg.Cluster, notifier.groupIds[msg.Cluster][msg.Group].Id, err)
return err
}
}, notifier.HttpClient, fmt.Sprintf("close for group %s in cluster %s (Id %s)", msg.Group,
msg.Cluster, notifier.groupIds[msg.Cluster][msg.Group].Id))

req, err := http.NewRequest("DELETE", notifier.Url, bytesToSend)
req.Header.Set("Content-Type", "application/json")

resp, err := notifier.HttpClient.Do(req)
if err != nil {
log.Errorf("Failed to send DELETE for group %s in cluster %s (Id %s): %v", msg.Group,
msg.Cluster, notifier.groupIds[msg.Cluster][msg.Group].Id, err)
return err
}
io.Copy(ioutil.Discard, resp.Body)
resp.Body.Close()

if (resp.StatusCode >= 200) && (resp.StatusCode <= 299) {
log.Debugf("Sent DELETE for group %s in cluster %s (Id %s)", msg.Group, msg.Cluster,
notifier.groupIds[msg.Cluster][msg.Group].Id)
} else {
log.Errorf("Failed to send DELETE for group %s in cluster %s (Id %s): %s", msg.Group,
msg.Cluster, notifier.groupIds[msg.Cluster][msg.Group].Id, resp.Status)
}

// Remove ID for group that is now clear
delete(notifier.groupIds[msg.Cluster], msg.Group)
}
}
return nil
}

func (request *HttpNotifierRequest) ensureTemplateCompiled(name string, fmap template.FuncMap) error {
if request.template != nil {
return nil
}

template, err := template.New(name).Funcs(fmap).ParseFiles(request.TemplateFile)
if err != nil {
return err
}
request.template = template.Templates()[0]

return nil
}

func (request *HttpNotifierRequest) send(templateData interface{}, httpClient *http.Client, details string) error {
bytesToSend := new(bytes.Buffer)
err := request.template.Execute(bytesToSend, templateData)
if err != nil {
log.Errorf("Failed to assemble %s: %v", details, err)
return err
}

// Send POST to HTTP endpoint
req, err := http.NewRequest(request.Method, request.Url, bytesToSend)
req.Header.Set("Content-Type", "application/json")

resp, err := httpClient.Do(req)
if err != nil {
log.Errorf("Failed to send %s: %v", details, err)
return err
}
io.Copy(ioutil.Discard, resp.Body)
resp.Body.Close()

if (resp.StatusCode >= 200) && (resp.StatusCode <= 299) {
log.Debugf("Sent %s", details)
} else {
log.Errorf("Failed to send %s: %s", details, resp.Status)
}

return nil
}
4 changes: 4 additions & 0 deletions notifier/slack_notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ func (slack *SlackNotifier) Ignore(msg Message) bool {
}

func (slack *SlackNotifier) Notify(msg Message) error {
if slack.Ignore(msg) {
return nil
}

if slack.groupMsgs == nil {
slack.groupMsgs = make(map[string]Message)
}
Expand Down
21 changes: 13 additions & 8 deletions notify_center.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type NotifyCenter struct {

func LoadNotifiers(app *ApplicationContext) error {
notifiers := []notifier.Notifier{}
if app.Config.Httpnotifier.Url != "" {
if app.Config.Httpnotifier.UrlOpen != "" {
if httpNotifier, err := NewHttpNotifier(app); err == nil {
notifiers = append(notifiers, httpNotifier)
}
Expand Down Expand Up @@ -114,9 +114,7 @@ func StopNotifiers(app *ApplicationContext) {
func (nc *NotifyCenter) handleEvaluationResponse(result *protocol.ConsumerGroupStatus) {
msg := notifier.Message(*result)
for _, notifier := range nc.notifiers {
if !notifier.Ignore(msg) {
notifier.Notify(msg)
}
notifier.Notify(msg)
}
}

Expand Down Expand Up @@ -220,11 +218,18 @@ func NewHttpNotifier(app *ApplicationContext) (*notifier.HttpNotifier, error) {
}

return &notifier.HttpNotifier{
Url: httpConfig.Url,
RequestOpen: notifier.HttpNotifierRequest{
Url: httpConfig.UrlOpen,
Method: httpConfig.MethodOpen,
TemplateFile: httpConfig.TemplateOpen,
},
RequestClose: notifier.HttpNotifierRequest{
Url: httpConfig.UrlClose,
Method: httpConfig.MethodClose,
TemplateFile: httpConfig.TemplateClose,
},
Threshold: httpConfig.PostThreshold,
SendDelete: httpConfig.SendDelete,
TemplatePostFile: httpConfig.TemplatePost,
TemplateDeleteFile: httpConfig.TemplateDelete,
SendClose: httpConfig.SendClose,
Extras: extras,
HttpClient: &http.Client{
Timeout: time.Duration(httpConfig.Timeout) * time.Second,
Expand Down

0 comments on commit b6c9dad

Please sign in to comment.