Skip to content

Commit

Permalink
Feat/subscriber: Interfaces in subscriber (#4145)
Browse files Browse the repository at this point in the history
* feat: add interfaces to events

Signed-off-by: SohamRatnaparkhi <[email protected]>

* feat: add interfaces to subscriber gql

Signed-off-by: SohamRatnaparkhi <[email protected]>

* feat: add interfaces to k8s

Signed-off-by: SohamRatnaparkhi <[email protected]>

* feat: add interfaces to utils

Signed-off-by: SohamRatnaparkhi <[email protected]>

* feat: add interfaces to requests

Signed-off-by: SohamRatnaparkhi <[email protected]>

* feat: use interfaces in main file

Signed-off-by: SohamRatnaparkhi <[email protected]>

* feat: add interfaces to requests

Signed-off-by: SohamRatnaparkhi <[email protected]>

* feat: add interfaces to utils

Signed-off-by: SohamRatnaparkhi <[email protected]>

* feat: use new interfaces

Signed-off-by: SohamRatnaparkhi <[email protected]>

* fix: add global variables to reciever

Signed-off-by: SohamRatnaparkhi <[email protected]>

* refractor: events struct to subscriberEvents

Signed-off-by: SohamRatnaparkhi <[email protected]>

* refractor: gql struct nd getter

Signed-off-by: SohamRatnaparkhi <[email protected]>

* refractor: subscriber k8s object getter

Signed-off-by: SohamRatnaparkhi <[email protected]>

* refractor: define local vars with :=

Signed-off-by: SohamRatnaparkhi <[email protected]>

* fix: absent event in event interface

Signed-off-by: SohamRatnaparkhi <[email protected]>

---------

Signed-off-by: SohamRatnaparkhi <[email protected]>
  • Loading branch information
SohamRatnaparkhi authored Nov 2, 2023
1 parent f2275e4 commit d4550bd
Show file tree
Hide file tree
Showing 16 changed files with 267 additions and 126 deletions.
33 changes: 16 additions & 17 deletions chaoscenter/subscriber/pkg/events/chaosengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"errors"
"fmt"
"strconv"
"subscriber/pkg/k8s"
"subscriber/pkg/types"

wfclientset "github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned"
Expand All @@ -23,13 +22,13 @@ import (
)

// ChaosEventWatcher initializes the Litmus ChaosEngine event watcher
func ChaosEventWatcher(stopCh chan struct{}, stream chan types.WorkflowEvent, infraData map[string]string) {
func (ev *subscriberEvents) ChaosEventWatcher(stopCh chan struct{}, stream chan types.WorkflowEvent, infraData map[string]string) {
startTime, err := strconv.Atoi(infraData["START_TIME"])
if err != nil {
logrus.WithError(err).Fatal("failed to parse startTime")
}

cfg, err := k8s.GetKubeConfig()
cfg, err := ev.subscriberK8s.GetKubeConfig()
if err != nil {
logrus.WithError(err).Fatal("could not get kube config")
}
Expand All @@ -55,17 +54,17 @@ func ChaosEventWatcher(stopCh chan struct{}, stream chan types.WorkflowEvent, in
informer = f.Litmuschaos().V1alpha1().ChaosEngines().Informer()
}

go startWatchEngine(stopCh, informer, stream, int64(startTime))
go ev.startWatchEngine(stopCh, informer, stream, int64(startTime))
}

// handles the different events events - add, update and delete
func startWatchEngine(stopCh <-chan struct{}, s cache.SharedIndexInformer, stream chan types.WorkflowEvent, startTime int64) {
// handles the different*subscriberEvents - add, update and delete
func (ev *subscriberEvents) startWatchEngine(stopCh <-chan struct{}, s cache.SharedIndexInformer, stream chan types.WorkflowEvent, startTime int64) {
handlers := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
chaosEventHandler(obj, "ADD", stream, startTime)
ev.chaosEventHandler(obj, "ADD", stream, startTime)
},
UpdateFunc: func(oldObj, obj interface{}) {
chaosEventHandler(obj, "UPDATE", stream, startTime)
ev.chaosEventHandler(obj, "UPDATE", stream, startTime)
},
}

Expand All @@ -74,7 +73,7 @@ func startWatchEngine(stopCh <-chan struct{}, s cache.SharedIndexInformer, strea
}

// responsible for extracting the required data from the event and streaming
func chaosEventHandler(obj interface{}, eventType string, stream chan types.WorkflowEvent, startTime int64) {
func (ev *subscriberEvents) chaosEventHandler(obj interface{}, eventType string, stream chan types.WorkflowEvent, startTime int64) {
workflowObj := obj.(*chaosTypes.ChaosEngine)
if workflowObj.Labels["workflow_id"] == "" {
logrus.WithFields(map[string]interface{}{
Expand All @@ -89,7 +88,7 @@ func chaosEventHandler(obj interface{}, eventType string, stream chan types.Work
return
}

cfg, err := k8s.GetKubeConfig()
cfg, err := ev.subscriberK8s.GetKubeConfig()
if err != nil {
logrus.WithError(err).Fatal("could not get kube config")
}
Expand All @@ -104,12 +103,12 @@ func chaosEventHandler(obj interface{}, eventType string, stream chan types.Work
var cd *types.ChaosData = nil

//extracts chaos data
cd, err = getChaosData(v1alpha1.NodeStatus{StartedAt: workflowObj.ObjectMeta.CreationTimestamp}, workflowObj.Name, workflowObj.Namespace, chaosClient)
cd, err = ev.getChaosData(v1alpha1.NodeStatus{StartedAt: workflowObj.ObjectMeta.CreationTimestamp}, workflowObj.Name, workflowObj.Namespace, chaosClient)
if err != nil {
logrus.WithError(err).Print("FAILED PARSING CHAOS ENGINE CRD")
}

// considering chaos events has only 1 artifact with manifest as raw data
// considering chaos*subscriberEvents has only 1 artifact with manifest as raw data
finTime := int64(-1)
if workflowObj.Status.EngineStatus == chaosTypes.EngineStatusCompleted || workflowObj.Status.EngineStatus == chaosTypes.EngineStatusStopped {
if len(workflowObj.Status.Experiments) > 0 {
Expand Down Expand Up @@ -157,7 +156,7 @@ func chaosEventHandler(obj interface{}, eventType string, stream chan types.Work
}

// StopChaosEngineState is used to patch all the chaosEngines with engineState=stop
func StopChaosEngineState(namespace string, workflowRunID *string) error {
func (ev *subscriberEvents) StopChaosEngineState(namespace string, workflowRunID *string) error {
ctx := context.TODO()

//Define the GVR
Expand All @@ -168,7 +167,7 @@ func StopChaosEngineState(namespace string, workflowRunID *string) error {
}

//Generate the dynamic client
_, dynamicClient, err := k8s.GetDynamicAndDiscoveryClient()
_, dynamicClient, err := ev.subscriberK8s.GetDynamicAndDiscoveryClient()
if err != nil {
return errors.New("failed to get dynamic client, error: " + err.Error())
}
Expand All @@ -185,7 +184,7 @@ func StopChaosEngineState(namespace string, workflowRunID *string) error {
return errors.New("failed to list chaosengines: " + err.Error())
}

//Foe every chaosEngine patch the engineState to Stop
//Foe every subscriber patch the engineState to Stop
for _, val := range chaosEngines.Items {
patch := []byte(`{"spec":{"engineState":"stop"}}`)
patched, err := dynamicClient.Resource(resourceType).Namespace(namespace).Patch(ctx, val.GetName(), mergeType.MergePatchType, patch, v1.PatchOptions{})
Expand All @@ -200,9 +199,9 @@ func StopChaosEngineState(namespace string, workflowRunID *string) error {
}

// StopWorkflow will patch the workflow based on workflow name using the shutdown strategy
func StopWorkflow(wfName string, namespace string) error {
func (ev *subscriberEvents) StopWorkflow(wfName string, namespace string) error {

conf, err := k8s.GetKubeConfig()
conf, err := ev.subscriberK8s.GetKubeConfig()
wfClient := wfclientset.NewForConfigOrDie(conf).ArgoprojV1alpha1().Workflows(namespace)
patch := []byte(`{"spec":{"shutdown":"Stop"}}`)
wf, err := wfClient.Patch(context.TODO(), wfName, mergeType.MergePatchType, patch, v1.PatchOptions{})
Expand Down
38 changes: 38 additions & 0 deletions chaoscenter/subscriber/pkg/events/definations.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package events

import (
"subscriber/pkg/graphql"
"subscriber/pkg/types"

"subscriber/pkg/k8s"

"github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
v1alpha13 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
v1alpha12 "github.com/litmuschaos/chaos-operator/pkg/client/clientset/versioned/typed/litmuschaos/v1alpha1"
)

type SubscriberEvents interface {
ChaosEventWatcher(stopCh chan struct{}, stream chan types.WorkflowEvent, infraData map[string]string)
StopChaosEngineState(namespace string, workflowRunID *string) error
CheckChaosData(nodeStatus v1alpha13.NodeStatus, workflowNS string, chaosClient *v1alpha12.LitmuschaosV1alpha1Client) (string, *types.ChaosData, error)
GetWorkflowObj(uid string) (*v1alpha1.Workflow, error)
ListWorkflowObject(wfid string) (*v1alpha1.WorkflowList, error)
GenerateWorkflowPayload(cid, accessKey, version, completed string, wfEvent types.WorkflowEvent) ([]byte, error)
WorkflowEventWatcher(stopCh chan struct{}, stream chan types.WorkflowEvent, infraData map[string]string)
WorkflowEventHandler(workflowObj *v1alpha1.Workflow, eventType string, startTime int64) (types.WorkflowEvent, error)
SendWorkflowUpdates(infraData map[string]string, event types.WorkflowEvent) (string, error)
WorkflowUpdates(infraData map[string]string, event chan types.WorkflowEvent)
StopWorkflow(wfName string, namespace string) error
}

type subscriberEvents struct {
gqlSubscriberServer graphql.SubscriberGql
subscriberK8s k8s.SubscriberK8s
}

func NewSubscriberEventsOperator(gqlSubscriberServer graphql.SubscriberGql, subscriberK8s k8s.SubscriberK8s) SubscriberEvents {
return &subscriberEvents{
gqlSubscriberServer: gqlSubscriberServer,
subscriberK8s: subscriberK8s,
}
}
19 changes: 9 additions & 10 deletions chaoscenter/subscriber/pkg/events/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"regexp"
"strconv"
"strings"
"subscriber/pkg/k8s"
"subscriber/pkg/types"

"github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
Expand All @@ -24,7 +23,7 @@ import (
)

// util function, extracts the chaos data using the litmus go-client
func getChaosData(nodeStatus v1alpha13.NodeStatus, engineName, engineNS string, chaosClient *v1alpha12.LitmuschaosV1alpha1Client) (*types.ChaosData, error) {
func (ev *subscriberEvents) getChaosData(nodeStatus v1alpha13.NodeStatus, engineName, engineNS string, chaosClient *v1alpha12.LitmuschaosV1alpha1Client) (*types.ChaosData, error) {
cd := &types.ChaosData{}
cd.EngineName = engineName
cd.Namespace = engineNS
Expand Down Expand Up @@ -79,7 +78,7 @@ func getChaosData(nodeStatus v1alpha13.NodeStatus, engineName, engineNS string,
}

// CheckChaosData util function, checks if event is a chaos-exp event, if so - extract the chaos data
func CheckChaosData(nodeStatus v1alpha13.NodeStatus, workflowNS string, chaosClient *v1alpha12.LitmuschaosV1alpha1Client) (string, *types.ChaosData, error) {
func (ev *subscriberEvents) CheckChaosData(nodeStatus v1alpha13.NodeStatus, workflowNS string, chaosClient *v1alpha12.LitmuschaosV1alpha1Client) (string, *types.ChaosData, error) {
nodeType := string(nodeStatus.Type)
var cd *types.ChaosData = nil
// considering chaos events has only 1 artifact with manifest as raw data
Expand All @@ -92,7 +91,7 @@ func CheckChaosData(nodeStatus v1alpha13.NodeStatus, workflowNS string, chaosCli
if nodeStatus.Phase != "Pending" {
name := obj.GetName()
if obj.GetGenerateName() != "" {
log, err := k8s.GetLogs(nodeStatus.ID, workflowNS, "main")
log, err := ev.subscriberK8s.GetLogs(nodeStatus.ID, workflowNS, "main")
if err != nil {
return nodeType, nil, err
}
Expand All @@ -101,7 +100,7 @@ func CheckChaosData(nodeStatus v1alpha13.NodeStatus, workflowNS string, chaosCli
return nodeType, nil, errors.New("Chaos-Engine Generated Name couldn't be retrieved")
}
}
cd, err = getChaosData(nodeStatus, name, obj.GetNamespace(), chaosClient)
cd, err = ev.getChaosData(nodeStatus, name, obj.GetNamespace(), chaosClient)
return nodeType, cd, err
}
}
Expand Down Expand Up @@ -130,9 +129,9 @@ func StrConvTime(time int64) string {
}
}

func GetWorkflowObj(uid string) (*v1alpha1.Workflow, error) {
func (ev *subscriberEvents) GetWorkflowObj(uid string) (*v1alpha1.Workflow, error) {
ctx := context.TODO()
conf, err := k8s.GetKubeConfig()
conf, err := ev.subscriberK8s.GetKubeConfig()
if err != nil {
return nil, err
}
Expand All @@ -153,9 +152,9 @@ func GetWorkflowObj(uid string) (*v1alpha1.Workflow, error) {
return nil, nil
}

func ListWorkflowObject(wfid string) (*v1alpha1.WorkflowList, error) {
func (ev *subscriberEvents) ListWorkflowObject(wfid string) (*v1alpha1.WorkflowList, error) {
ctx := context.TODO()
conf, err := k8s.GetKubeConfig()
conf, err := ev.subscriberK8s.GetKubeConfig()
if err != nil {
return nil, err
}
Expand All @@ -171,7 +170,7 @@ func ListWorkflowObject(wfid string) (*v1alpha1.WorkflowList, error) {
}

// GenerateWorkflowPayload generate graphql mutation payload for events event
func GenerateWorkflowPayload(cid, accessKey, version, completed string, wfEvent types.WorkflowEvent) ([]byte, error) {
func (ev *subscriberEvents) GenerateWorkflowPayload(cid, accessKey, version, completed string, wfEvent types.WorkflowEvent) ([]byte, error) {
infraID := `{infraID: \"` + cid + `\", version: \"` + version + `\", accessKey: \"` + accessKey + `\"}`
for id, event := range wfEvent.Nodes {
event.Message = strings.Replace(event.Message, `"`, ``, -1)
Expand Down
33 changes: 15 additions & 18 deletions chaoscenter/subscriber/pkg/events/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,6 @@ import (
"strconv"
"time"

"subscriber/pkg/graphql"

"subscriber/pkg/k8s"
"subscriber/pkg/types"

"github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
Expand Down Expand Up @@ -39,12 +36,12 @@ var (
)

// WorkflowEventWatcher initializes the Argo Workflow event watcher
func WorkflowEventWatcher(stopCh chan struct{}, stream chan types.WorkflowEvent, infraData map[string]string) {
func (ev *subscriberEvents) WorkflowEventWatcher(stopCh chan struct{}, stream chan types.WorkflowEvent, infraData map[string]string) {
startTime, err := strconv.Atoi(infraData["START_TIME"])
if err != nil {
logrus.WithError(err).Fatal("Failed to parse START_TIME")
}
cfg, err := k8s.GetKubeConfig()
cfg, err := ev.subscriberK8s.GetKubeConfig()
if err != nil {
logrus.WithError(err).Fatal("Could not get kube config")
}
Expand All @@ -67,15 +64,15 @@ func WorkflowEventWatcher(stopCh chan struct{}, stream chan types.WorkflowEvent,
informer = f.Argoproj().V1alpha1().Workflows().Informer()
// Start Event Watch
}
go startWatchWorkflow(stopCh, informer, stream, int64(startTime))
go ev.startWatchWorkflow(stopCh, informer, stream, int64(startTime))
}

// handles the different events events - add, update and delete
func startWatchWorkflow(stopCh <-chan struct{}, s cache.SharedIndexInformer, stream chan types.WorkflowEvent, startTime int64) {
func (ev *subscriberEvents) startWatchWorkflow(stopCh <-chan struct{}, s cache.SharedIndexInformer, stream chan types.WorkflowEvent, startTime int64) {
handlers := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
workflowObj := obj.(*v1alpha1.Workflow)
workflow, err := WorkflowEventHandler(workflowObj, "ADD", startTime)
workflow, err := ev.WorkflowEventHandler(workflowObj, "ADD", startTime)
if err != nil {
logrus.Error(err)
}
Expand All @@ -86,7 +83,7 @@ func startWatchWorkflow(stopCh <-chan struct{}, s cache.SharedIndexInformer, str
},
UpdateFunc: func(oldObj, obj interface{}) {
workflowObj := obj.(*v1alpha1.Workflow)
workflow, err := WorkflowEventHandler(workflowObj, "UPDATE", startTime)
workflow, err := ev.WorkflowEventHandler(workflowObj, "UPDATE", startTime)
if err != nil {
logrus.Error(err)
}
Expand All @@ -100,7 +97,7 @@ func startWatchWorkflow(stopCh <-chan struct{}, s cache.SharedIndexInformer, str
}

// WorkflowEventHandler is responsible for extracting the required data from the event and streaming
func WorkflowEventHandler(workflowObj *v1alpha1.Workflow, eventType string, startTime int64) (types.WorkflowEvent, error) {
func (ev *subscriberEvents) WorkflowEventHandler(workflowObj *v1alpha1.Workflow, eventType string, startTime int64) (types.WorkflowEvent, error) {
if workflowObj.Labels["workflow_id"] == "" {
logrus.WithFields(map[string]interface{}{
"uid": string(workflowObj.ObjectMeta.UID),
Expand All @@ -114,7 +111,7 @@ func WorkflowEventHandler(workflowObj *v1alpha1.Workflow, eventType string, star
return types.WorkflowEvent{}, errors.New("startTime of subscriber is greater than experiment creation timestamp")
}

cfg, err := k8s.GetKubeConfig()
cfg, err := ev.subscriberK8s.GetKubeConfig()
if err != nil {
logrus.WithError(err).Fatal("Could not get kube config")
}
Expand All @@ -137,7 +134,7 @@ func WorkflowEventHandler(workflowObj *v1alpha1.Workflow, eventType string, star
// considering chaos events has only 1 artifact with manifest as raw data
if nodeStatus.Type == "Pod" && nodeStatus.Inputs != nil && len(nodeStatus.Inputs.Artifacts) == 1 && nodeStatus.Inputs.Artifacts[0].Raw != nil {
//extracts chaos data
nodeType, cd, err = CheckChaosData(nodeStatus, workflowObj.ObjectMeta.Namespace, chaosClient)
nodeType, cd, err = ev.CheckChaosData(nodeStatus, workflowObj.ObjectMeta.Namespace, chaosClient)
if err != nil {
logrus.WithError(err).Print("Failed to parse ChaosEngine CRD")
}
Expand Down Expand Up @@ -202,7 +199,7 @@ func WorkflowEventHandler(workflowObj *v1alpha1.Workflow, eventType string, star
}

// SendWorkflowUpdates generates graphql mutation to send events updates to graphql server
func SendWorkflowUpdates(infraData map[string]string, event types.WorkflowEvent) (string, error) {
func (ev *subscriberEvents) SendWorkflowUpdates(infraData map[string]string, event types.WorkflowEvent) (string, error) {
if wfEvent, ok := eventMap[event.UID]; ok {
for key, node := range wfEvent.Nodes {
if node.Type == "ChaosEngine" && node.ChaosExp != nil && event.Nodes[key].ChaosExp == nil {
Expand Down Expand Up @@ -230,28 +227,28 @@ func SendWorkflowUpdates(infraData map[string]string, event types.WorkflowEvent)
eventMap[event.UID] = event

// generate graphql payload
payload, err := GenerateWorkflowPayload(infraData["INFRA_ID"], infraData["ACCESS_KEY"], infraData["VERSION"], "false", event)
payload, err := ev.GenerateWorkflowPayload(infraData["INFRA_ID"], infraData["ACCESS_KEY"], infraData["VERSION"], "false", event)
if err != nil {
return "", errors.New("Error while generating graphql payload from the workflow event" + err.Error())
}

if event.FinishedAt != "" {
payload, err = GenerateWorkflowPayload(infraData["INFRA_ID"], infraData["ACCESS_KEY"], infraData["VERSION"], "true", event)
payload, err = ev.GenerateWorkflowPayload(infraData["INFRA_ID"], infraData["ACCESS_KEY"], infraData["VERSION"], "true", event)
delete(eventMap, event.UID)
}

body, err := graphql.SendRequest(infraData["SERVER_ADDR"], payload)
body, err := ev.gqlSubscriberServer.SendRequest(infraData["SERVER_ADDR"], payload)
if err != nil {
return "", err
}

return body, nil
}

func WorkflowUpdates(infraData map[string]string, event chan types.WorkflowEvent) {
func (ev *subscriberEvents) WorkflowUpdates(infraData map[string]string, event chan types.WorkflowEvent) {
// listen on the channel for streaming event updates
for eventData := range event {
response, err := SendWorkflowUpdates(infraData, eventData)
response, err := ev.SendWorkflowUpdates(infraData, eventData)
if err != nil {
logrus.Print(err.Error())
}
Expand Down
13 changes: 13 additions & 0 deletions chaoscenter/subscriber/pkg/graphql/definations.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package graphql

type SubscriberGql interface {
SendRequest(server string, payload []byte) (string, error)
MarshalGQLData(gqlData interface{}) (string, error)
}

type subscriberGql struct {
}

func NewSubscriberGql() SubscriberGql {
return &subscriberGql{}
}
4 changes: 2 additions & 2 deletions chaoscenter/subscriber/pkg/graphql/operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"strings"
)

func SendRequest(server string, payload []byte) (string, error) {
func (gql *subscriberGql) SendRequest(server string, payload []byte) (string, error) {
req, err := http.NewRequest("POST", server, bytes.NewBuffer(payload))
if err != nil {
return "", err
Expand All @@ -30,7 +30,7 @@ func SendRequest(server string, payload []byte) (string, error) {
}

// MarshalGQLData processes event data into proper format acceptable by graphql
func MarshalGQLData(gqlData interface{}) (string, error) {
func (gql *subscriberGql) MarshalGQLData(gqlData interface{}) (string, error) {
data, err := json.Marshal(gqlData)
if err != nil {
return "", err
Expand Down
Loading

0 comments on commit d4550bd

Please sign in to comment.