-
Notifications
You must be signed in to change notification settings - Fork 0
/
driver.go
121 lines (102 loc) · 2.99 KB
/
driver.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
package main
import (
"encoding/json"
"flag"
"fmt"
"io/ioutil"
"log"
"mime"
"net/http"
"os"
"os/signal"
"syscall"
"github.com/vmware/dispatch/pkg/events"
"github.com/vmware/dispatch/pkg/events/driverclient"
)
type validationEvent struct {
Data struct {
ValidationCode string `json:"validationCode"`
ValidationURL string `json:"validationUrl"`
} `json:"data"`
EventType string `json:"eventType"`
Topic string `json:"topic"`
}
type validationResponse struct {
ValidationResponse string `json:"validationResponse"`
}
// debug
var dryRun = flag.Bool("dry-run", false, "Debug, pull messages and do not send Dispatch events")
var org = flag.String("org", "default", "organization of this event driver")
var dispatchEndpoint = flag.String("dispatch-api-endpoint", "localhost:8080", "dispatch server host")
var port = flag.Int("port", 80, "Port to listen on")
var sharedSecret = flag.String("shared-secret", "", "A token or shared secret that the client should pass")
func getDriverClient() driverclient.Client {
if *dryRun {
return nil
}
token := os.Getenv(driverclient.AuthToken)
client, err := driverclient.NewHTTPClient(driverclient.WithGateway(*dispatchEndpoint), driverclient.WithToken(token))
if err != nil {
log.Fatalf("Error when creating the events client: %s", err.Error())
}
log.Println("Event driver initialized.")
return client
}
func main() {
flag.Parse()
client := getDriverClient()
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
contentType, _, err := mime.ParseMediaType(r.Header.Get("content-type"))
if err != nil {
w.WriteHeader(http.StatusBadRequest)
}
eType := r.Header.Get("aeg-event-type")
if contentType == "application/json" && eType == "SubscriptionValidation" {
// validation handshake
var e []validationEvent
bytes, err := ioutil.ReadAll(r.Body)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
json.Unmarshal(bytes, &e)
if len(e) != 1 {
w.WriteHeader(http.StatusBadRequest)
return
}
resp := json.NewEncoder(w)
err = resp.Encode(validationResponse{ValidationResponse: e[0].Data.ValidationCode})
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
}
if contentType == "application/cloudevents+json" {
// an actual cloud event
bytes, err := ioutil.ReadAll(r.Body)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
evt := events.CloudEvent{}
err = json.Unmarshal(bytes, &evt)
if err != nil {
log.Printf("Error unmarshalling event grid event: %s", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
err = client.SendOne(&evt)
if err != nil {
log.Printf("Error sending event grid event: %s", err)
}
log.Printf("Sent event successfully %v", evt)
}
})
// Create chan signal
done := make(chan os.Signal, 1)
signal.Notify(done, syscall.SIGINT, syscall.SIGTERM)
go func() {
log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", *port), nil))
}()
<-done
}