Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

expose kubeconfig flag to run the agent binary on a host #94

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 102 additions & 46 deletions cmd/maestro/agent/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,33 +8,44 @@ import (
"os/signal"
"syscall"

"github.com/golang/glog"
"github.com/openshift/library-go/pkg/controller/controllercmd"
"github.com/openshift/library-go/pkg/operator/events"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
utilflag "k8s.io/component-base/cli/flag"
"k8s.io/component-base/version"
"k8s.io/klog/v2"
ocmfeature "open-cluster-management.io/api/feature"
"open-cluster-management.io/ocm/pkg/common/options"
"open-cluster-management.io/ocm/pkg/features"
"open-cluster-management.io/ocm/pkg/work/spoke"
)

var (
commonOptions = options.NewAgentOptions()
agentOption = spoke.NewWorkloadAgentOptions()
)

// by default uses 1M as the limit for state feedback
var maxJSONRawLength int32 = 1024 * 1024

func NewAgentCommand() *cobra.Command {
agentOptions := NewAgentOptions()

cmd := &cobra.Command{
Use: "agent",
Short: "Start the maestro agent",
Long: "Start the maestro agent.",
Run: runAgent,
Run: func(cmd *cobra.Command, args []string) {
ctx, cancel := context.WithCancel(context.Background())

stopCh := make(chan os.Signal, 1)
signal.Notify(stopCh, syscall.SIGINT, syscall.SIGTERM)
go func() {
defer cancel()
<-stopCh
}()

if err := agentOptions.Run(ctx); err != nil {
klog.Fatal(err)
}

<-ctx.Done()
},
}

// check if the flag is already registered to avoid duplicate flag define error
Expand All @@ -47,53 +58,98 @@ func NewAgentCommand() *cobra.Command {
fs := cmd.PersistentFlags()
fs.SetNormalizeFunc(utilflag.WordSepNormalizeFunc)
fs.AddGoFlagSet(flag.CommandLine)
commonOptions.CommoOpts.AddFlags(fs)
addFlags(fs)
agentOptions.AddFlags(fs)

utilruntime.Must(features.SpokeMutableFeatureGate.Add(ocmfeature.DefaultSpokeWorkFeatureGates))
utilruntime.Must(features.SpokeMutableFeatureGate.Set(fmt.Sprintf("%s=true", ocmfeature.RawFeedbackJsonString)))

return cmd
}

func runAgent(cmd *cobra.Command, args []string) {
ctx, cancel := context.WithCancel(context.Background())

stopCh := make(chan os.Signal, 1)
signal.Notify(stopCh, syscall.SIGINT, syscall.SIGTERM)
go func() {
defer cancel()
<-stopCh
}()
type AgentOptions struct {
CommonOptions *options.AgentOptions
WorkOptions *spoke.WorkloadAgentOptions
KubeConfigFile string
Namespace string
}

func NewAgentOptions() *AgentOptions {
workOptions := spoke.NewWorkloadAgentOptions()
// use 1M as the default limit for state feedback
workOptions.MaxJSONRawLength = 1024 * 1024
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I remembered it was set in agent options. Do you need to reset it again?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, but the default value is small for maestro case, we need increase it here

// use mqtt as the default driver
agentOption.MaxJSONRawLength = maxJSONRawLength
cfg := spoke.NewWorkAgentConfig(commonOptions, agentOption)
cmdConfig := commonOptions.CommoOpts.
NewControllerCommandConfig("maestro-agent", version.Get(), cfg.RunWorkloadAgent)
cmdConfig.DisableLeaderElection = true

if err := cmdConfig.StartController(ctx); err != nil {
glog.Fatalf("error running command: %v", err)
workOptions.WorkloadSourceDriver = "mqtt"
// use manifest as the default codec
workOptions.CloudEventsClientCodecs = []string{"manifest"}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you update to support manifest and manifestbundle by default?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure, we should enable both codecs at the same time, for maestro, it should only handle the manifest, right?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let us handle it in separate issue #96


return &AgentOptions{
CommonOptions: options.NewAgentOptions(),
WorkOptions: workOptions,
}
}

func addFlags(fs *pflag.FlagSet) {
func (o *AgentOptions) Run(ctx context.Context) error {
kubeConfig, err := rest.InClusterConfig()
if err != nil {
klog.Warningf("failed to get kubeconfig from cluster inside, will use '--kubeconfig' to build client")

kubeConfig, err = clientcmd.BuildConfigFromFlags("", o.KubeConfigFile)
if err != nil {
return fmt.Errorf("unable to load kubeconfig from file %q: %v", o.KubeConfigFile, err)
}
}

namespace := o.Namespace
if len(namespace) == 0 {
namespace, err = getComponentNamespace()
if err != nil {
return err
}
}

controllerContext := &controllercmd.ControllerContext{
KubeConfig: kubeConfig,
EventRecorder: events.NewLoggingEventRecorder("maestro-agent"),
OperatorNamespace: namespace,
}

return spoke.NewWorkAgentConfig(o.CommonOptions, o.WorkOptions).
RunWorkloadAgent(ctx, controllerContext)
}

func (o *AgentOptions) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&o.KubeConfigFile, "kubeconfig",
o.KubeConfigFile, "Location of the kubeconfig file")
fs.StringVar(&o.Namespace, "namespace",
o.Namespace, "Namespace where the agent runs")
// workloadAgentOptions
fs.Int32Var(&maxJSONRawLength, "max-json-raw-length",
maxJSONRawLength, "The maximum size of the JSON raw string returned from status feedback")
fs.DurationVar(&agentOption.StatusSyncInterval, "status-sync-interval",
agentOption.StatusSyncInterval, "Interval to sync resource status to hub")
fs.DurationVar(&agentOption.AppliedManifestWorkEvictionGracePeriod, "resource-eviction-grace-period",
agentOption.AppliedManifestWorkEvictionGracePeriod, "Grace period for resource eviction")
fs.StringVar(&commonOptions.SpokeClusterName, "consumer-name",
commonOptions.SpokeClusterName, "Name of the consumer")
fs.Float32Var(&o.CommonOptions.CommoOpts.QPS, "kube-api-qps",
o.CommonOptions.CommoOpts.QPS, "QPS to use while talking with apiserver on spoke cluster")
fs.IntVar(&o.CommonOptions.CommoOpts.Burst, "kube-api-burst",
o.CommonOptions.CommoOpts.Burst, "Burst to use while talking with apiserver on spoke cluster")
fs.Int32Var(&o.WorkOptions.MaxJSONRawLength, "max-json-raw-length",
o.WorkOptions.MaxJSONRawLength, "The maximum size of the JSON raw string returned from status feedback")
fs.DurationVar(&o.WorkOptions.StatusSyncInterval, "status-sync-interval",
o.WorkOptions.StatusSyncInterval, "Interval to sync resource status to hub")
fs.DurationVar(&o.WorkOptions.AppliedManifestWorkEvictionGracePeriod, "resource-eviction-grace-period",
o.WorkOptions.AppliedManifestWorkEvictionGracePeriod, "Grace period for resource eviction")
fs.StringVar(&o.CommonOptions.SpokeClusterName, "consumer-name",
o.CommonOptions.SpokeClusterName, "Name of the consumer")
// message broker config file
fs.StringVar(&agentOption.WorkloadSourceConfig, "message-broker-config-file",
agentOption.WorkloadSourceConfig, "The config file path of the message broker, it can be mqtt broker or kafka broker")
fs.StringVar(&agentOption.WorkloadSourceDriver, "message-broker-type", "mqtt", "Message broker type (default: mqtt)")
fs.StringVar(&agentOption.CloudEventsClientID, "agent-client-id",
agentOption.CloudEventsClientID, "The ID of the agent client, by default it is <consumer-id>-work-agent")
fs.StringSliceVar(&agentOption.CloudEventsClientCodecs, "agent-client-codecs",
[]string{"manifest"}, "The codecs of the agent client. The valid codecs are manifest and manifestbundle")
fs.StringVar(&o.WorkOptions.WorkloadSourceConfig, "message-broker-config-file",
o.WorkOptions.WorkloadSourceConfig, "The config file path of the message broker, it can be mqtt broker or kafka broker")
fs.StringVar(&o.WorkOptions.WorkloadSourceDriver, "message-broker-type",
o.WorkOptions.WorkloadSourceDriver, "Message broker type")
fs.StringVar(&o.WorkOptions.CloudEventsClientID, "agent-client-id",
o.WorkOptions.CloudEventsClientID, "The ID of the agent client, by default it is <consumer-id>-work-agent")
fs.StringSliceVar(&o.WorkOptions.CloudEventsClientCodecs, "agent-client-codecs",

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

o.WorkOptions.CloudEventsClientCodecs, "The codecs of the agent client. The valid codecs are manifest and manifestbundle")
}

func getComponentNamespace() (string, error) {
nsBytes, err := os.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/namespace")
if err != nil {
return "", err
}
return string(nsBytes), nil
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ require (
github.com/onsi/ginkgo/v2 v2.17.1
github.com/onsi/gomega v1.32.0
github.com/openshift-online/ocm-sdk-go v0.1.334
github.com/openshift/library-go v0.0.0-20240116081341-964bcb3f545c
github.com/prometheus/client_golang v1.18.0
github.com/segmentio/ksuid v1.0.2
github.com/spf13/cobra v1.8.0
Expand Down Expand Up @@ -105,7 +106,6 @@ require (
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/openshift/api v0.0.0-20231218131639-7a5aa77cc72d // indirect
github.com/openshift/client-go v0.0.0-20231218140158-47f6d749b9d9 // indirect
github.com/openshift/library-go v0.0.0-20240116081341-964bcb3f545c // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pkg/profile v1.3.0 // indirect
github.com/prometheus/client_model v0.5.0 // indirect
Expand Down
Empty file modified go.sum
100755 → 100644
Empty file.
Loading