diff --git a/cmd/maestro/agent/cmd.go b/cmd/maestro/agent/cmd.go index 25c0d4e4..cca9b430 100644 --- a/cmd/maestro/agent/cmd.go +++ b/cmd/maestro/agent/cmd.go @@ -8,12 +8,14 @@ import ( "os/signal" "syscall" - "github.com/golang/glog" + "github.com/openshift/library-go/pkg/controller/controllercmd" + "github.com/openshift/library-go/pkg/operator/events" "github.com/spf13/cobra" "github.com/spf13/pflag" utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" utilflag "k8s.io/component-base/cli/flag" - "k8s.io/component-base/version" "k8s.io/klog/v2" ocmfeature "open-cluster-management.io/api/feature" "open-cluster-management.io/ocm/pkg/common/options" @@ -21,20 +23,29 @@ import ( "open-cluster-management.io/ocm/pkg/work/spoke" ) -var ( - commonOptions = options.NewAgentOptions() - agentOption = spoke.NewWorkloadAgentOptions() -) - -// by default uses 1M as the limit for state feedback -var maxJSONRawLength int32 = 1024 * 1024 - func NewAgentCommand() *cobra.Command { + agentOptions := NewAgentOptions() + cmd := &cobra.Command{ Use: "agent", Short: "Start the maestro agent", Long: "Start the maestro agent.", - Run: runAgent, + Run: func(cmd *cobra.Command, args []string) { + ctx, cancel := context.WithCancel(context.Background()) + + stopCh := make(chan os.Signal, 1) + signal.Notify(stopCh, syscall.SIGINT, syscall.SIGTERM) + go func() { + defer cancel() + <-stopCh + }() + + if err := agentOptions.Run(ctx); err != nil { + klog.Fatal(err) + } + + <-ctx.Done() + }, } // check if the flag is already registered to avoid duplicate flag define error @@ -47,53 +58,98 @@ func NewAgentCommand() *cobra.Command { fs := cmd.PersistentFlags() fs.SetNormalizeFunc(utilflag.WordSepNormalizeFunc) fs.AddGoFlagSet(flag.CommandLine) - commonOptions.CommoOpts.AddFlags(fs) - addFlags(fs) + agentOptions.AddFlags(fs) + utilruntime.Must(features.SpokeMutableFeatureGate.Add(ocmfeature.DefaultSpokeWorkFeatureGates)) utilruntime.Must(features.SpokeMutableFeatureGate.Set(fmt.Sprintf("%s=true", ocmfeature.RawFeedbackJsonString))) return cmd } -func runAgent(cmd *cobra.Command, args []string) { - ctx, cancel := context.WithCancel(context.Background()) - - stopCh := make(chan os.Signal, 1) - signal.Notify(stopCh, syscall.SIGINT, syscall.SIGTERM) - go func() { - defer cancel() - <-stopCh - }() +type AgentOptions struct { + CommonOptions *options.AgentOptions + WorkOptions *spoke.WorkloadAgentOptions + KubeConfigFile string + Namespace string +} +func NewAgentOptions() *AgentOptions { + workOptions := spoke.NewWorkloadAgentOptions() + // use 1M as the default limit for state feedback + workOptions.MaxJSONRawLength = 1024 * 1024 // use mqtt as the default driver - agentOption.MaxJSONRawLength = maxJSONRawLength - cfg := spoke.NewWorkAgentConfig(commonOptions, agentOption) - cmdConfig := commonOptions.CommoOpts. - NewControllerCommandConfig("maestro-agent", version.Get(), cfg.RunWorkloadAgent) - cmdConfig.DisableLeaderElection = true - - if err := cmdConfig.StartController(ctx); err != nil { - glog.Fatalf("error running command: %v", err) + workOptions.WorkloadSourceDriver = "mqtt" + // use manifest as the default codec + workOptions.CloudEventsClientCodecs = []string{"manifest"} + + return &AgentOptions{ + CommonOptions: options.NewAgentOptions(), + WorkOptions: workOptions, } } -func addFlags(fs *pflag.FlagSet) { +func (o *AgentOptions) Run(ctx context.Context) error { + kubeConfig, err := rest.InClusterConfig() + if err != nil { + klog.Warningf("failed to get kubeconfig from cluster inside, will use '--kubeconfig' to build client") + + kubeConfig, err = clientcmd.BuildConfigFromFlags("", o.KubeConfigFile) + if err != nil { + return fmt.Errorf("unable to load kubeconfig from file %q: %v", o.KubeConfigFile, err) + } + } + + namespace := o.Namespace + if len(namespace) == 0 { + namespace, err = getComponentNamespace() + if err != nil { + return err + } + } + + controllerContext := &controllercmd.ControllerContext{ + KubeConfig: kubeConfig, + EventRecorder: events.NewLoggingEventRecorder("maestro-agent"), + OperatorNamespace: namespace, + } + + return spoke.NewWorkAgentConfig(o.CommonOptions, o.WorkOptions). + RunWorkloadAgent(ctx, controllerContext) +} + +func (o *AgentOptions) AddFlags(fs *pflag.FlagSet) { + fs.StringVar(&o.KubeConfigFile, "kubeconfig", + o.KubeConfigFile, "Location of the kubeconfig file") + fs.StringVar(&o.Namespace, "namespace", + o.Namespace, "Namespace where the agent runs") // workloadAgentOptions - fs.Int32Var(&maxJSONRawLength, "max-json-raw-length", - maxJSONRawLength, "The maximum size of the JSON raw string returned from status feedback") - fs.DurationVar(&agentOption.StatusSyncInterval, "status-sync-interval", - agentOption.StatusSyncInterval, "Interval to sync resource status to hub") - fs.DurationVar(&agentOption.AppliedManifestWorkEvictionGracePeriod, "resource-eviction-grace-period", - agentOption.AppliedManifestWorkEvictionGracePeriod, "Grace period for resource eviction") - fs.StringVar(&commonOptions.SpokeClusterName, "consumer-name", - commonOptions.SpokeClusterName, "Name of the consumer") + fs.Float32Var(&o.CommonOptions.CommoOpts.QPS, "kube-api-qps", + o.CommonOptions.CommoOpts.QPS, "QPS to use while talking with apiserver on spoke cluster") + fs.IntVar(&o.CommonOptions.CommoOpts.Burst, "kube-api-burst", + o.CommonOptions.CommoOpts.Burst, "Burst to use while talking with apiserver on spoke cluster") + fs.Int32Var(&o.WorkOptions.MaxJSONRawLength, "max-json-raw-length", + o.WorkOptions.MaxJSONRawLength, "The maximum size of the JSON raw string returned from status feedback") + fs.DurationVar(&o.WorkOptions.StatusSyncInterval, "status-sync-interval", + o.WorkOptions.StatusSyncInterval, "Interval to sync resource status to hub") + fs.DurationVar(&o.WorkOptions.AppliedManifestWorkEvictionGracePeriod, "resource-eviction-grace-period", + o.WorkOptions.AppliedManifestWorkEvictionGracePeriod, "Grace period for resource eviction") + fs.StringVar(&o.CommonOptions.SpokeClusterName, "consumer-name", + o.CommonOptions.SpokeClusterName, "Name of the consumer") // message broker config file - fs.StringVar(&agentOption.WorkloadSourceConfig, "message-broker-config-file", - agentOption.WorkloadSourceConfig, "The config file path of the message broker, it can be mqtt broker or kafka broker") - fs.StringVar(&agentOption.WorkloadSourceDriver, "message-broker-type", "mqtt", "Message broker type (default: mqtt)") - fs.StringVar(&agentOption.CloudEventsClientID, "agent-client-id", - agentOption.CloudEventsClientID, "The ID of the agent client, by default it is -work-agent") - fs.StringSliceVar(&agentOption.CloudEventsClientCodecs, "agent-client-codecs", - []string{"manifest"}, "The codecs of the agent client. The valid codecs are manifest and manifestbundle") + fs.StringVar(&o.WorkOptions.WorkloadSourceConfig, "message-broker-config-file", + o.WorkOptions.WorkloadSourceConfig, "The config file path of the message broker, it can be mqtt broker or kafka broker") + fs.StringVar(&o.WorkOptions.WorkloadSourceDriver, "message-broker-type", + o.WorkOptions.WorkloadSourceDriver, "Message broker type") + fs.StringVar(&o.WorkOptions.CloudEventsClientID, "agent-client-id", + o.WorkOptions.CloudEventsClientID, "The ID of the agent client, by default it is -work-agent") + fs.StringSliceVar(&o.WorkOptions.CloudEventsClientCodecs, "agent-client-codecs", + o.WorkOptions.CloudEventsClientCodecs, "The codecs of the agent client. The valid codecs are manifest and manifestbundle") +} +func getComponentNamespace() (string, error) { + nsBytes, err := os.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/namespace") + if err != nil { + return "", err + } + return string(nsBytes), nil } diff --git a/go.mod b/go.mod index 40dae9a1..02384fad 100755 --- a/go.mod +++ b/go.mod @@ -26,6 +26,7 @@ require ( github.com/onsi/ginkgo/v2 v2.17.1 github.com/onsi/gomega v1.32.0 github.com/openshift-online/ocm-sdk-go v0.1.334 + github.com/openshift/library-go v0.0.0-20240116081341-964bcb3f545c github.com/prometheus/client_golang v1.18.0 github.com/segmentio/ksuid v1.0.2 github.com/spf13/cobra v1.8.0 @@ -105,7 +106,6 @@ require ( github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/openshift/api v0.0.0-20231218131639-7a5aa77cc72d // indirect github.com/openshift/client-go v0.0.0-20231218140158-47f6d749b9d9 // indirect - github.com/openshift/library-go v0.0.0-20240116081341-964bcb3f545c // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pkg/profile v1.3.0 // indirect github.com/prometheus/client_model v0.5.0 // indirect diff --git a/go.sum b/go.sum old mode 100755 new mode 100644