Skip to content

Commit

Permalink
feat: decoupling apiserver and clustersynchro-manager by message midd…
Browse files Browse the repository at this point in the history
…leware

Signed-off-by: zhouhaoA1 <[email protected]>
  • Loading branch information
zhouhao authored and zhouhaoA1 committed Oct 30, 2023
1 parent e788be4 commit 4177173
Show file tree
Hide file tree
Showing 289 changed files with 205,784 additions and 286 deletions.
34 changes: 32 additions & 2 deletions cmd/apiserver/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,20 @@ import (
genericapiserver "k8s.io/apiserver/pkg/server"
genericoptions "k8s.io/apiserver/pkg/server/options"
"k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/leaderelection/resourcelock"
cliflag "k8s.io/component-base/cli/flag"
componentbaseconfig "k8s.io/component-base/config"
"k8s.io/component-base/config/options"
"k8s.io/component-base/featuregate"
"k8s.io/component-base/logs"
logsapi "k8s.io/component-base/logs/api/v1"

"github.com/clusterpedia-io/clusterpedia/pkg/apiserver"
"github.com/clusterpedia-io/clusterpedia/pkg/storage"
storageoptions "github.com/clusterpedia-io/clusterpedia/pkg/storage/options"
"github.com/clusterpedia-io/clusterpedia/pkg/watcher"
watchcomponents "github.com/clusterpedia-io/clusterpedia/pkg/watcher/components"
watchoptions "github.com/clusterpedia-io/clusterpedia/pkg/watcher/options"
)

type ClusterPediaServerOptions struct {
Expand All @@ -38,6 +44,10 @@ type ClusterPediaServerOptions struct {
// Traces *genericoptions.TracingOptions

Storage *storageoptions.StorageOptions

LeaderElection componentbaseconfig.LeaderElectionConfiguration

Subscriber *watchoptions.MiddlerwareOptions
}

func NewServerOptions() *ClusterPediaServerOptions {
Expand Down Expand Up @@ -65,6 +75,13 @@ func NewServerOptions() *ClusterPediaServerOptions {
// Traces: genericoptions.NewTracingOptions(),

Storage: storageoptions.NewStorageOptions(),
LeaderElection: componentbaseconfig.LeaderElectionConfiguration{
LeaderElect: false,
ResourceName: "clusterpedia-clustersynchro-manager",
ResourceNamespace: "clusterpedia-system",
ResourceLock: resourcelock.LeasesResourceLock,
},
Subscriber: watchoptions.NewMiddlerwareOptions(),
}
}

Expand Down Expand Up @@ -109,10 +126,20 @@ func (o *ClusterPediaServerOptions) Config() (*apiserver.Config, error) {
return nil, err
}

return &apiserver.Config{
config := &apiserver.Config{
GenericConfig: genericConfig,
StorageFactory: storage,
}, nil
LeaderElection: o.LeaderElection,
}

err = watcher.NewSubscriber(o.Subscriber)
watchcomponents.InitEventCacheSize(o.Subscriber.CacheSize)

if err != nil {
return nil, err
}

return config, nil
}

func (o *ClusterPediaServerOptions) genericOptionsApplyTo(config *genericapiserver.RecommendedConfig) error {
Expand Down Expand Up @@ -153,6 +180,8 @@ func (o *ClusterPediaServerOptions) Flags() cliflag.NamedFlagSets {
genericfs.IntVar(&o.MaxMutatingRequestsInFlight, "max-mutating-requests-inflight", o.MaxMutatingRequestsInFlight, ""+
"this flag limits the maximum number of mutating requests in flight, or a zero value disables the limit completely.")

options.BindLeaderElectionFlags(&o.LeaderElection, genericfs)

o.CoreAPI.AddFlags(fss.FlagSet("global"))
o.SecureServing.AddFlags(fss.FlagSet("secure serving"))
o.Authentication.AddFlags(fss.FlagSet("authentication"))
Expand All @@ -165,6 +194,7 @@ func (o *ClusterPediaServerOptions) Flags() cliflag.NamedFlagSets {
// o.Traces.AddFlags(fss.FlagSet("traces"))

o.Storage.AddFlags(fss.FlagSet("storage"))
o.Subscriber.AddFlags(fss.FlagSet("middleware"))
return fss
}

Expand Down
71 changes: 70 additions & 1 deletion cmd/clustersynchro-manager/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
genericoptions "k8s.io/apiserver/pkg/server/options"
"k8s.io/apiserver/pkg/util/feature"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
Expand All @@ -17,6 +19,7 @@ import (
componentbaseconfig "k8s.io/component-base/config"
"k8s.io/component-base/config/options"
componentbaseconfigv1alpha1 "k8s.io/component-base/config/v1alpha1"
"k8s.io/component-base/featuregate"
"k8s.io/component-base/logs"
logsapi "k8s.io/component-base/logs/api/v1"

Expand All @@ -26,6 +29,8 @@ import (
"github.com/clusterpedia-io/clusterpedia/pkg/metrics"
"github.com/clusterpedia-io/clusterpedia/pkg/storage"
storageoptions "github.com/clusterpedia-io/clusterpedia/pkg/storage/options"
"github.com/clusterpedia-io/clusterpedia/pkg/watcher"
watchoptions "github.com/clusterpedia-io/clusterpedia/pkg/watcher/options"
)

const (
Expand All @@ -45,13 +50,32 @@ type Options struct {
KubeStateMetrics *kubestatemetrics.Options

WorkerNumber int // WorkerNumber is the number of worker goroutines
Publisher *watchoptions.MiddlerwareOptions

MaxRequestsInFlight int
MaxMutatingRequestsInFlight int
SecureServing *genericoptions.SecureServingOptionsWithLoopback
Authentication *genericoptions.DelegatingAuthenticationOptions
Authorization *genericoptions.DelegatingAuthorizationOptions
Audit *genericoptions.AuditOptions
Features *genericoptions.FeatureOptions
CoreAPI *genericoptions.CoreAPIOptions
FeatureGate featuregate.FeatureGate
Admission *genericoptions.AdmissionOptions
}

func NewClusterSynchroManagerOptions() (*Options, error) {
var (
leaderElection componentbaseconfigv1alpha1.LeaderElectionConfiguration
clientConnection componentbaseconfigv1alpha1.ClientConnectionConfiguration
)
//internal apiserver
sso := genericoptions.NewSecureServingOptions()
// We are composing recommended options for an aggregated api-server,
// whose client is typically a proxy multiplexing many operations ---
// notably including long-running ones --- into one HTTP/2 connection
// into this server. So allow many concurrent operations.
sso.HTTP2MaxStreamsPerConnection = 1000
componentbaseconfigv1alpha1.RecommendedDefaultLeaderElectionConfiguration(&leaderElection)
componentbaseconfigv1alpha1.RecommendedDefaultClientConnectionConfiguration(&clientConnection)

Expand All @@ -77,6 +101,17 @@ func NewClusterSynchroManagerOptions() (*Options, error) {
options.KubeStateMetrics = kubestatemetrics.NewOptions()

options.WorkerNumber = 5
options.MaxRequestsInFlight = 0
options.MaxMutatingRequestsInFlight = 0
options.SecureServing = sso.WithLoopback()
options.Authentication = genericoptions.NewDelegatingAuthenticationOptions()
options.Authorization = genericoptions.NewDelegatingAuthorizationOptions()
options.Audit = genericoptions.NewAuditOptions()
options.Features = genericoptions.NewFeatureOptions()
options.CoreAPI = genericoptions.NewCoreAPIOptions()
options.FeatureGate = feature.DefaultFeatureGate
options.Admission = genericoptions.NewAdmissionOptions()
options.Publisher = watchoptions.NewMiddlerwareOptions()
return &options, nil
}

Expand All @@ -88,6 +123,10 @@ func (o *Options) Flags() cliflag.NamedFlagSets {
genericfs.Float32Var(&o.ClientConnection.QPS, "kube-api-qps", o.ClientConnection.QPS, "QPS to use while talking with kubernetes apiserver.")
genericfs.Int32Var(&o.ClientConnection.Burst, "kube-api-burst", o.ClientConnection.Burst, "Burst to use while talking with kubernetes apiserver.")
genericfs.IntVar(&o.WorkerNumber, "worker-number", o.WorkerNumber, "The number of worker goroutines.")
genericfs.IntVar(&o.MaxRequestsInFlight, "max-requests-inflight", o.MaxRequestsInFlight, ""+
"Otherwise, this flag limits the maximum number of non-mutating requests in flight, or a zero value disables the limit completely.")
genericfs.IntVar(&o.MaxMutatingRequestsInFlight, "max-mutating-requests-inflight", o.MaxMutatingRequestsInFlight, ""+
"this flag limits the maximum number of mutating requests in flight, or a zero value disables the limit completely.")

options.BindLeaderElectionFlags(&o.LeaderElection, genericfs)

Expand All @@ -96,10 +135,16 @@ func (o *Options) Flags() cliflag.NamedFlagSets {
fs.StringVar(&o.Kubeconfig, "kubeconfig", o.Kubeconfig, "Path to kubeconfig file with authorization and master location information.")

logsapi.AddFlags(o.Logs, fss.FlagSet("logs"))

o.CoreAPI.AddFlags(fss.FlagSet("global"))
o.SecureServing.AddFlags(fss.FlagSet("secure serving"))
o.Authentication.AddFlags(fss.FlagSet("authentication"))
o.Authorization.AddFlags(fss.FlagSet("authorization"))
o.Audit.AddFlags(fss.FlagSet("auditing"))
o.Features.AddFlags(fss.FlagSet("features"))
o.Storage.AddFlags(fss.FlagSet("storage"))
o.Metrics.AddFlags(fss.FlagSet("metrics server"))
o.KubeStateMetrics.AddFlags(fss.FlagSet("kube state metrics"))
o.Publisher.AddFlags(fss.FlagSet("middleware"))
return fss
}

Expand All @@ -108,6 +153,7 @@ func (o *Options) Validate() error {
errs = append(errs, o.Storage.Validate()...)
errs = append(errs, o.Metrics.Validate()...)
errs = append(errs, o.KubeStateMetrics.Validate()...)
errs = append(errs, o.validateGenericOptions()...)

if o.WorkerNumber <= 0 {
errs = append(errs, fmt.Errorf("worker-number must be greater than 0"))
Expand All @@ -125,6 +171,11 @@ func (o *Options) Config() (*config.Config, error) {
return nil, err
}

err = watcher.NewPulisher(o.Publisher)
if err != nil {
return nil, err
}

kubeconfig, err := clientcmd.BuildConfigFromFlags(o.Master, o.Kubeconfig)
if err != nil {
return nil, err
Expand Down Expand Up @@ -170,3 +221,21 @@ func (o *Options) Config() (*config.Config, error) {
LeaderElection: o.LeaderElection,
}, nil
}

func (o *Options) validateGenericOptions() []error {
errors := []error{}
if o.MaxRequestsInFlight < 0 {
errors = append(errors, fmt.Errorf("--max-requests-inflight can not be negative value"))
}
if o.MaxMutatingRequestsInFlight < 0 {
errors = append(errors, fmt.Errorf("--max-mutating-requests-inflight can not be negative value"))
}

errors = append(errors, o.CoreAPI.Validate()...)
errors = append(errors, o.SecureServing.Validate()...)
errors = append(errors, o.Authentication.Validate()...)
errors = append(errors, o.Authorization.Validate()...)
errors = append(errors, o.Audit.Validate()...)
errors = append(errors, o.Features.Validate()...)
return errors
}
134 changes: 75 additions & 59 deletions deploy/clusterpedia_apiserver_deployment.yaml
Original file line number Diff line number Diff line change
@@ -1,59 +1,75 @@
apiVersion: v1
kind: ServiceAccount
metadata:
name: clusterpedia-apiserver
namespace: clusterpedia-system
---
apiVersion: v1
kind: Service
metadata:
name: clusterpedia-apiserver
namespace: clusterpedia-system
spec:
ports:
- port: 443
protocol: TCP
targetPort: 443
selector:
app: clusterpedia-apiserver
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: clusterpedia-apiserver
namespace: clusterpedia-system
labels:
app: clusterpedia-apiserver
spec:
replicas: 1
selector:
matchLabels:
app: clusterpedia-apiserver
template:
metadata:
labels:
app: clusterpedia-apiserver
spec:
containers:
- name: apiserver
image: ghcr.io/clusterpedia-io/clusterpedia/apiserver:v0.7.0
command:
- /usr/local/bin/apiserver
- --secure-port=443
- --storage-config=/etc/clusterpedia/storage/internalstorage-config.yaml
- -v=3
env:
- name: DB_PASSWORD
valueFrom:
secretKeyRef:
name: internalstorage-password
key: password
volumeMounts:
- name: internalstorage-config
mountPath: /etc/clusterpedia/storage
readOnly: true
serviceAccountName: clusterpedia-apiserver
volumes:
- name: internalstorage-config
configMap:
name: clusterpedia-internalstorage
apiVersion: v1
kind: ServiceAccount
metadata:
name: clusterpedia-apiserver
namespace: clusterpedia-system
---
apiVersion: v1
kind: Service
metadata:
name: clusterpedia-apiserver
namespace: clusterpedia-system
spec:
ports:
- port: 6443
protocol: TCP
targetPort: 6443
selector:
app: clusterpedia-apiserver
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: clusterpedia-apiserver
namespace: clusterpedia-system
labels:
app: clusterpedia-apiserver
spec:
replicas: 3
selector:
matchLabels:
app: clusterpedia-apiserver
template:
metadata:
labels:
app: clusterpedia-apiserver
spec:
containers:
- name: apiserver
image: ghcr.io/clusterpedia-io/clusterpedia/apiserver:v0.7.0
command:
- /usr/local/bin/apiserver
- --secure-port=6443
- --storage-config=/etc/clusterpedia/storage/internalstorage-config.yaml
- -v=3
- --middleware-name=rabbitmq
- --middleware-serverIp=rabbitmq-cluster.rabbitmq-system.svc
- --middleware-user=$(RABBITMQ_USER)
- --middleware-serverPort=5672
- --middleware-password=$(RABBITMQ_PASSWD)
- --cache-size=200
env:
- name: DB_PASSWORD
valueFrom:
secretKeyRef:
name: internalstorage-password
key: password
- name: RABBITMQ_USER
valueFrom:
secretKeyRef:
name: rabbitmq-connect
key: user
- name: RABBITMQ_PASSWD
valueFrom:
secretKeyRef:
name: rabbitmq-connect
key: password
volumeMounts:
- name: internalstorage-config
mountPath: /etc/clusterpedia/storage
readOnly: true
serviceAccountName: clusterpedia-apiserver
volumes:
- name: internalstorage-config
configMap:
name: clusterpedia-internalstorage
7 changes: 7 additions & 0 deletions deploy/clusterpedia_clustersynchro_manager_configmap.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
apiVersion: v1
kind: ConfigMap
metadata:
name: clustersynchro-manager-kubeconfig
namespace: clusterpedia-system
data:
manager-kubeconfig: |
Loading

0 comments on commit 4177173

Please sign in to comment.