From 39df548d437b035337e4f4ad6a9186eaf27473fd Mon Sep 17 00:00:00 2001 From: shaoyue Date: Fri, 25 Oct 2024 15:09:21 +0800 Subject: [PATCH] support enable etcd authn (#201) Signed-off-by: haorenfsa --- config/samples/milvus_etcd_auth.yaml | 24 +++++++++++++++ pkg/controllers/conditions.go | 42 ++++++++++++++++++++------ pkg/controllers/conditions_test.go | 12 ++++---- pkg/controllers/status_cluster.go | 2 +- pkg/controllers/status_cluster_test.go | 9 ------ test/min-milvus-feature.yaml | 9 ++++++ 6 files changed, 72 insertions(+), 26 deletions(-) create mode 100644 config/samples/milvus_etcd_auth.yaml diff --git a/config/samples/milvus_etcd_auth.yaml b/config/samples/milvus_etcd_auth.yaml new file mode 100644 index 00000000..2508012c --- /dev/null +++ b/config/samples/milvus_etcd_auth.yaml @@ -0,0 +1,24 @@ +# This is a sample to enable etcd auth for milvus +apiVersion: milvus.io/v1beta1 +kind: Milvus +metadata: + name: my-release + labels: + app: milvus +spec: + dependencies: + etcd: + inCluster: + values: + replicaCount: 1 + auth: + rbac: + enabled: true + rootPassword: myrootpass + components: {} + config: + etcd: + auth: + enabled: true + userName: root + password: myrootpass diff --git a/pkg/controllers/conditions.go b/pkg/controllers/conditions.go index 7663d6bd..2212589e 100644 --- a/pkg/controllers/conditions.go +++ b/pkg/controllers/conditions.go @@ -49,8 +49,20 @@ var ( wrapKafkaConditonGetter = func(ctx context.Context, logger logr.Logger, p v1beta1.MilvusKafka, cfg external.CheckKafkaConfig) func() v1beta1.MilvusCondition { return func() v1beta1.MilvusCondition { return GetKafkaCondition(ctx, logger, p, cfg) } } - wrapEtcdConditionGetter = func(ctx context.Context, endpoints []string) func() v1beta1.MilvusCondition { - return func() v1beta1.MilvusCondition { return GetEtcdCondition(ctx, endpoints) } + wrapEtcdConditionGetter = func(ctx context.Context, m *v1beta1.Milvus, endpoints []string) func() v1beta1.MilvusCondition { + sslEnabled, _ := util.GetBoolValue(m.Spec.Conf.Data, "etcd", "ssl", "enabled") + if sslEnabled { + return external.NewTCPDialConditionGetter(v1beta1.EtcdReady, endpoints).GetCondition + } + authEnabled, _ := util.GetBoolValue(m.Spec.Conf.Data, "etcd", "auth", "enabled") + userName, _ := util.GetStringValue(m.Spec.Conf.Data, "etcd", "auth", "userName") + password, _ := util.GetStringValue(m.Spec.Conf.Data, "etcd", "auth", "password") + authCfg := EtcdAuthConfig{ + Enabled: authEnabled, + Username: userName, + Password: password, + } + return func() v1beta1.MilvusCondition { return GetEtcdCondition(ctx, authCfg, endpoints) } } wrapMinioConditionGetter = func(ctx context.Context, logger logr.Logger, cli client.Client, info StorageConditionInfo) func() v1beta1.MilvusCondition { return func() v1beta1.MilvusCondition { return GetMinioCondition(ctx, logger, cli, info) } @@ -140,8 +152,8 @@ type EtcdConditionInfo struct { Endpoints []string } -func GetEtcdCondition(ctx context.Context, endpoints []string) v1beta1.MilvusCondition { - health := GetEndpointsHealth(endpoints) +func GetEtcdCondition(ctx context.Context, authCfg EtcdAuthConfig, endpoints []string) v1beta1.MilvusCondition { + health := GetEndpointsHealth(authCfg, endpoints) etcdReady := false var msg string for _, ep := range endpoints { @@ -174,19 +186,29 @@ var etcdNewClient NewEtcdClientFunc = func(cfg clientv3.Config) (EtcdClient, err const etcdHealthKey = "health" -func GetEndpointsHealth(endpoints []string) map[string]EtcdEndPointHealth { +type EtcdAuthConfig struct { + Enabled bool + Username string + Password string +} + +func GetEndpointsHealth(authConfig EtcdAuthConfig, endpoints []string) map[string]EtcdEndPointHealth { hch := make(chan EtcdEndPointHealth, len(endpoints)) var wg sync.WaitGroup for _, ep := range endpoints { wg.Add(1) go func(ep string) { defer wg.Done() - + cliCfg := clientv3.Config{ + Endpoints: []string{ep}, + DialTimeout: 5 * time.Second, + } + if authConfig.Enabled { + cliCfg.Username = authConfig.Username + cliCfg.Password = authConfig.Password + } var checkEtcd = func() error { - cli, err := etcdNewClient(clientv3.Config{ - Endpoints: []string{ep}, - DialTimeout: 5 * time.Second, - }) + cli, err := etcdNewClient(cliCfg) if err != nil { return errors.Wrap(err, "failed to create etcd client") } diff --git a/pkg/controllers/conditions_test.go b/pkg/controllers/conditions_test.go index b456bb0a..8c2d41c7 100644 --- a/pkg/controllers/conditions_test.go +++ b/pkg/controllers/conditions_test.go @@ -72,7 +72,7 @@ func TestWrapGetters(t *testing.T) { fn() }) t.Run("etcd", func(t *testing.T) { - fn := wrapEtcdConditionGetter(ctx, []string{}) + fn := wrapEtcdConditionGetter(ctx, &v1beta1.Milvus{}, []string{}) fn() }) t.Run("minio", func(t *testing.T) { @@ -202,7 +202,7 @@ func TestGetEtcdCondition(t *testing.T) { errTest := errors.New("test") // no endpoint - ret := GetEtcdCondition(ctx, []string{}) + ret := GetEtcdCondition(ctx, EtcdAuthConfig{}, []string{}) assert.Equal(t, corev1.ConditionFalse, ret.Status) assert.Equal(t, v1beta1.ReasonEtcdNotReady, ret.Reason) @@ -210,7 +210,7 @@ func TestGetEtcdCondition(t *testing.T) { t.Run("new client failed", func(t *testing.T) { stubs := gostub.Stub(&etcdNewClient, getMockNewEtcdClient(nil, errTest)) defer stubs.Reset() - ret = GetEtcdCondition(ctx, []string{"etcd:2379"}) + ret = GetEtcdCondition(ctx, EtcdAuthConfig{}, []string{"etcd:2379"}) assert.Equal(t, corev1.ConditionFalse, ret.Status) assert.Equal(t, v1beta1.ReasonEtcdNotReady, ret.Reason) }) @@ -221,7 +221,7 @@ func TestGetEtcdCondition(t *testing.T) { defer stubs.Reset() mockEtcdCli.EXPECT().Get(gomock.Any(), etcdHealthKey, gomock.Any()).Return(nil, errTest).AnyTimes() mockEtcdCli.EXPECT().Close().AnyTimes() - ret = GetEtcdCondition(ctx, []string{"etcd:2379"}) + ret = GetEtcdCondition(ctx, EtcdAuthConfig{}, []string{"etcd:2379"}) assert.Equal(t, corev1.ConditionFalse, ret.Status) assert.Equal(t, v1beta1.ReasonEtcdNotReady, ret.Reason) @@ -229,7 +229,7 @@ func TestGetEtcdCondition(t *testing.T) { mockEtcdCli.EXPECT().Get(gomock.Any(), etcdHealthKey, gomock.Any()).Return(nil, rpctypes.ErrPermissionDenied).AnyTimes() mockEtcdCli.EXPECT().AlarmList(gomock.Any()).Return(nil, errTest).AnyTimes() mockEtcdCli.EXPECT().Close().AnyTimes() - ret = GetEtcdCondition(ctx, []string{"etcd:2379"}) + ret = GetEtcdCondition(ctx, EtcdAuthConfig{}, []string{"etcd:2379"}) assert.Equal(t, corev1.ConditionFalse, ret.Status) assert.Equal(t, v1beta1.ReasonEtcdNotReady, ret.Reason) @@ -241,7 +241,7 @@ func TestGetEtcdCondition(t *testing.T) { }, }, nil).AnyTimes() mockEtcdCli.EXPECT().Close().AnyTimes() - ret = GetEtcdCondition(ctx, []string{"etcd:2379"}) + ret = GetEtcdCondition(ctx, EtcdAuthConfig{}, []string{"etcd:2379"}) assert.Equal(t, corev1.ConditionFalse, ret.Status) assert.Equal(t, v1beta1.ReasonEtcdNotReady, ret.Reason) diff --git a/pkg/controllers/status_cluster.go b/pkg/controllers/status_cluster.go index 8765e3c8..8244aafa 100644 --- a/pkg/controllers/status_cluster.go +++ b/pkg/controllers/status_cluster.go @@ -485,7 +485,7 @@ func (r *MilvusStatusSyncer) GetMinioCondition( } func (r *MilvusStatusSyncer) GetEtcdCondition(ctx context.Context, mc v1beta1.Milvus) (v1beta1.MilvusCondition, error) { - getter := wrapEtcdConditionGetter(ctx, mc.Spec.Dep.Etcd.Endpoints) + getter := wrapEtcdConditionGetter(ctx, &mc, mc.Spec.Dep.Etcd.Endpoints) return GetCondition(getter, mc.Spec.Dep.Etcd.Endpoints), nil } diff --git a/pkg/controllers/status_cluster_test.go b/pkg/controllers/status_cluster_test.go index bf18d2c0..d00108ca 100644 --- a/pkg/controllers/status_cluster_test.go +++ b/pkg/controllers/status_cluster_test.go @@ -5,7 +5,6 @@ import ( "errors" "testing" - "github.com/go-logr/logr" "github.com/milvus-io/milvus-operator/apis/milvus.io/v1beta1" "github.com/milvus-io/milvus-operator/pkg/util" "github.com/prashantv/gostub" @@ -438,14 +437,6 @@ func mockConditionGetter() v1beta1.MilvusCondition { return v1beta1.MilvusCondition{Reason: "update"} } -func TestWrapGetter(t *testing.T) { - var getter func() v1beta1.MilvusCondition - getter = wrapEtcdConditionGetter(nil, []string{}) - assert.NotNil(t, getter) - getter = wrapMinioConditionGetter(nil, logr.Logger{}, nil, StorageConditionInfo{}) - assert.NotNil(t, getter) -} - func Test_updateMetrics(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() diff --git a/test/min-milvus-feature.yaml b/test/min-milvus-feature.yaml index 5c8e835c..38fa1b8e 100644 --- a/test/min-milvus-feature.yaml +++ b/test/min-milvus-feature.yaml @@ -66,6 +66,10 @@ spec: pvcDeletion: true values: replicaCount: 1 + auth: + rbac: + enabled: true + rootPassword: myrootpass storage: inCluster: deletionPolicy: Delete @@ -186,3 +190,8 @@ spec: pulsar: authPlugin: token authParams: file:/milvus/pulsar/token + etcd: + auth: + enabled: true + userName: root + password: myrootpass