diff --git a/Makefile b/Makefile index 0444b69..3dfca25 100644 --- a/Makefile +++ b/Makefile @@ -3,4 +3,8 @@ LDFLAGS := "-s -w -X 'github.com/database-mesh/waterline/pkg/vesrion.GitCommit=` build: mkdir -p bin go build -ldflags=${LDFLAGS} -o bin/waterline cmd/waterline/main.go +linux: + mkdir -p bin + GOOS=linux GOARCH=amd64 go build -ldflags=${LDFLAGS} -o bin/waterline cmd/waterline/main.go + diff --git a/api/v1alpha1/sqltrafficqos_types.go b/api/v1alpha1/trafficqos_types.go similarity index 75% rename from api/v1alpha1/sqltrafficqos_types.go rename to api/v1alpha1/trafficqos_types.go index da65405..a40979c 100644 --- a/api/v1alpha1/sqltrafficqos_types.go +++ b/api/v1alpha1/trafficqos_types.go @@ -41,20 +41,20 @@ type TrafficQoSGroup struct { Ceil string `json:"ceil",omitempty` } -// SQLTrafficQoSSpec defines the desired state of SQLTrafficQoS -type SQLTrafficQoSSpec struct { +// TrafficQoSSpec defines the desired state of TrafficQoS +type TrafficQoSSpec struct { // INSERT ADDITIONAL SPEC FIELDS - desired state of cluster // Important: Run "make" to regenerate code after modifying this file - // Foo is an example field of SQLTrafficQoS. Edit sqltrafficqos_types.go to remove/update + // Foo is an example field of TrafficQoS. Edit trafficqos_types.go to remove/update NetworkDevice string `json:"networkDevice"` QoSClass QoSClassType `json:"qosClass,omitempty"` Strategy TrafficQoSStrategy `json:"strategy",omitempty` Groups []TrafficQoSGroup `json:"groups"` } -// SQLTrafficQoSStatus defines the observed state of SQLTrafficQoS -type SQLTrafficQoSStatus struct { +// TrafficQoSStatus defines the observed state of TrafficQoS +type TrafficQoSStatus struct { // INSERT ADDITIONAL STATUS FIELD - define observed state of cluster // Important: Run "make" to regenerate code after modifying this file @@ -64,24 +64,24 @@ type SQLTrafficQoSStatus struct { //+kubebuilder:object:root=true //+kubebuilder:subresource:status -// SQLTrafficQoS is the Schema for the sqltrafficqos API -type SQLTrafficQoS struct { +// TrafficQoS is the Schema for the trafficqos API +type TrafficQoS struct { metav1.TypeMeta `json:",inline"` metav1.ObjectMeta `json:"metadata,omitempty"` - Spec SQLTrafficQoSSpec `json:"spec,omitempty"` - Status SQLTrafficQoSStatus `json:"status,omitempty"` + Spec TrafficQoSSpec `json:"spec,omitempty"` + Status TrafficQoSStatus `json:"status,omitempty"` } //+kubebuilder:object:root=true -// SQLTrafficQoSList contains a list of SQLTrafficQoS -type SQLTrafficQoSList struct { +// TrafficQoSList contains a list of TrafficQoS +type TrafficQoSList struct { metav1.TypeMeta `json:",inline"` metav1.ListMeta `json:"metadata,omitempty"` - Items []SQLTrafficQoS `json:"items"` + Items []TrafficQoS `json:"items"` } func init() { - SchemeBuilder.Register(&SQLTrafficQoS{}, &SQLTrafficQoSList{}) + SchemeBuilder.Register(&TrafficQoS{}, &TrafficQoSList{}) } diff --git a/api/v1alpha1/sqltrafficqos_webhook.go b/api/v1alpha1/trafficqos_webhook.go similarity index 64% rename from api/v1alpha1/sqltrafficqos_webhook.go rename to api/v1alpha1/trafficqos_webhook.go index 0c3ae15..5919bb3 100644 --- a/api/v1alpha1/sqltrafficqos_webhook.go +++ b/api/v1alpha1/trafficqos_webhook.go @@ -22,9 +22,9 @@ import ( ) // log is for logging in this package. -var sqltrafficqoslog = logf.Log.WithName("sqltrafficqos-resource") +var trafficqoslog = logf.Log.WithName("trafficqos-resource") -func (r *SQLTrafficQoS) SetupWebhookWithManager(mgr ctrl.Manager) error { +func (r *TrafficQoS) SetupWebhookWithManager(mgr ctrl.Manager) error { return ctrl.NewWebhookManagedBy(mgr). For(r). Complete() @@ -32,25 +32,25 @@ func (r *SQLTrafficQoS) SetupWebhookWithManager(mgr ctrl.Manager) error { // TODO(user): EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN! -//+kubebuilder:webhook:path=/mutate-database-mesh-io-database-mesh-io-v1alpha1-sqltrafficqos,mutating=true,failurePolicy=fail,sideEffects=None,groups=database-mesh.io.database-mesh.io,resources=sqltrafficqos,verbs=create;update,versions=v1alpha1,name=msqltrafficqos.kb.io,admissionReviewVersions=v1 +//+kubebuilder:webhook:path=/mutate-database-mesh-io-database-mesh-io-v1alpha1-trafficqos,mutating=true,failurePolicy=fail,sideEffects=None,groups=database-mesh.io.database-mesh.io,resources=trafficqos,verbs=create;update,versions=v1alpha1,name=mtrafficqos.kb.io,admissionReviewVersions=v1 -var _ webhook.Defaulter = &SQLTrafficQoS{} +var _ webhook.Defaulter = &TrafficQoS{} // Default implements webhook.Defaulter so a webhook will be registered for the type -func (r *SQLTrafficQoS) Default() { - sqltrafficqoslog.Info("default", "name", r.Name) +func (r *TrafficQoS) Default() { + trafficqoslog.Info("default", "name", r.Name) // TODO(user): fill in your defaulting logic. } // TODO(user): change verbs to "verbs=create;update;delete" if you want to enable deletion validation. -//+kubebuilder:webhook:path=/validate-database-mesh-io-database-mesh-io-v1alpha1-sqltrafficqos,mutating=false,failurePolicy=fail,sideEffects=None,groups=database-mesh.io.database-mesh.io,resources=sqltrafficqos,verbs=create;update,versions=v1alpha1,name=vsqltrafficqos.kb.io,admissionReviewVersions=v1 +//+kubebuilder:webhook:path=/validate-database-mesh-io-database-mesh-io-v1alpha1-trafficqos,mutating=false,failurePolicy=fail,sideEffects=None,groups=database-mesh.io.database-mesh.io,resources=trafficqos,verbs=create;update,versions=v1alpha1,name=vtrafficqos.kb.io,admissionReviewVersions=v1 -var _ webhook.Validator = &SQLTrafficQoS{} +var _ webhook.Validator = &TrafficQoS{} // ValidateCreate implements webhook.Validator so a webhook will be registered for the type -func (r *SQLTrafficQoS) ValidateCreate() error { - sqltrafficqoslog.Info("validate create", "name", r.Name) +func (r *TrafficQoS) ValidateCreate() error { + trafficqoslog.Info("validate create", "name", r.Name) // TODO(user): fill in your validation logic upon object creation. @@ -58,16 +58,16 @@ func (r *SQLTrafficQoS) ValidateCreate() error { } // ValidateUpdate implements webhook.Validator so a webhook will be registered for the type -func (r *SQLTrafficQoS) ValidateUpdate(old runtime.Object) error { - sqltrafficqoslog.Info("validate update", "name", r.Name) +func (r *TrafficQoS) ValidateUpdate(old runtime.Object) error { + trafficqoslog.Info("validate update", "name", r.Name) // TODO(user): fill in your validation logic upon object update. return nil } // ValidateDelete implements webhook.Validator so a webhook will be registered for the type -func (r *SQLTrafficQoS) ValidateDelete() error { - sqltrafficqoslog.Info("validate delete", "name", r.Name) +func (r *TrafficQoS) ValidateDelete() error { + trafficqoslog.Info("validate delete", "name", r.Name) // TODO(user): fill in your validation logic upon object deletion. return nil diff --git a/api/v1alpha1/trafficqosmapping_webhook.go b/api/v1alpha1/trafficqosmapping_webhook.go new file mode 100644 index 0000000..65f69b0 --- /dev/null +++ b/api/v1alpha1/trafficqosmapping_webhook.go @@ -0,0 +1,74 @@ +// Copyright 2022 Database Mesh Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package v1alpha1 + +import ( + "k8s.io/apimachinery/pkg/runtime" + ctrl "sigs.k8s.io/controller-runtime" + logf "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/webhook" +) + +// log is for logging in this package. +var trafficqosmappinglog = logf.Log.WithName("trafficqosmapping-resource") + +func (r *TrafficQoSMapping) SetupWebhookWithManager(mgr ctrl.Manager) error { + return ctrl.NewWebhookManagedBy(mgr). + For(r). + Complete() +} + +// TODO(user): EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN! + +//+kubebuilder:webhook:path=/mutate-database-mesh-io-database-mesh-io-v1alpha1-TrafficQoSMapping,mutating=true,failurePolicy=fail,sideEffects=None,groups=database-mesh.io.database-mesh.io,resources=trafficqosmapping,verbs=create;update,versions=v1alpha1,name=mtrafficqosmapping.kb.io,admissionReviewVersions=v1 + +var _ webhook.Defaulter = &TrafficQoSMapping{} + +// Default implements webhook.Defaulter so a webhook will be registered for the type +func (r *TrafficQoSMapping) Default() { + trafficqosmappinglog.Info("default", "name", r.Name) + + // TODO(user): fill in your defaulting logic. +} + +// TODO(user): change verbs to "verbs=create;update;delete" if you want to enable deletion validation. +//+kubebuilder:webhook:path=/validate-database-mesh-io-database-mesh-io-v1alpha1-TrafficQoSMapping,mutating=false,failurePolicy=fail,sideEffects=None,groups=database-mesh.io.database-mesh.io,resources=trafficqosmapping,verbs=create;update,versions=v1alpha1,name=vtrafficqosmapping.kb.io,admissionReviewVersions=v1 + +var _ webhook.Validator = &TrafficQoSMapping{} + +// ValidateCreate implements webhook.Validator so a webhook will be registered for the type +func (r *TrafficQoSMapping) ValidateCreate() error { + trafficqosmappinglog.Info("validate create", "name", r.Name) + + // TODO(user): fill in your validation logic upon object creation. + + return nil +} + +// ValidateUpdate implements webhook.Validator so a webhook will be registered for the type +func (r *TrafficQoSMapping) ValidateUpdate(old runtime.Object) error { + trafficqosmappinglog.Info("validate update", "name", r.Name) + + // TODO(user): fill in your validation logic upon object update. + return nil +} + +// ValidateDelete implements webhook.Validator so a webhook will be registered for the type +func (r *TrafficQoSMapping) ValidateDelete() error { + trafficqosmappinglog.Info("validate delete", "name", r.Name) + + // TODO(user): fill in your validation logic upon object deletion. + return nil +} diff --git a/api/v1alpha1/trafficqosmappping_types.go b/api/v1alpha1/trafficqosmappping_types.go new file mode 100644 index 0000000..94f6ef9 --- /dev/null +++ b/api/v1alpha1/trafficqosmappping_types.go @@ -0,0 +1,66 @@ +// Copyright 2022 Database Mesh Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package v1alpha1 + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN! +// NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized. + +// TrafficQoSMappingSpec defines the desired state of TrafficQoSMapping +type TrafficQoSMappingSpec struct { + // INSERT ADDITIONAL SPEC FIELDS - desired state of cluster + // Important: Run "make" to regenerate code after modifying this file + + // Foo is an example field of TrafficQoSMapping. Edit trafficqos_types.go to remove/update + References map[string]string `json:"references` + + // + // 1:1 <- virtualdatabase namespace / name +} + +// TrafficQoSMappingStatus defines the observed state of TrafficQoSMapping +type TrafficQoSMappingStatus struct { + // INSERT ADDITIONAL STATUS FIELD - define observed state of cluster + // Important: Run "make" to regenerate code after modifying this file + + //TODO: add ObservedGeneration +} + +//+kubebuilder:object:root=true +//+kubebuilder:subresource:status + +// TrafficQoSMapping is the Schema for the trafficqos API +type TrafficQoSMapping struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty"` + + Spec TrafficQoSMappingSpec `json:"spec,omitempty"` +} + +//+kubebuilder:object:root=true + +// TrafficQoSMappingList contains a list of TrafficQoSMapping +type TrafficQoSMappingList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata,omitempty"` + Items []TrafficQoSMapping `json:"items"` +} + +func init() { + SchemeBuilder.Register(&TrafficQoSMapping{}, &TrafficQoSMappingList{}) +} diff --git a/api/v1alpha1/virtualdatabase_types.go b/api/v1alpha1/virtualdatabase_types.go index 9d35e47..51f0376 100644 --- a/api/v1alpha1/virtualdatabase_types.go +++ b/api/v1alpha1/virtualdatabase_types.go @@ -46,18 +46,18 @@ type VirtualDatabaseSpec struct { // Important: Run "make" to regenerate code after modifying this file // Foo is an example field of VirtualDatabase. Edit virtualdatabase_types.go to remove/update - Selector map[string]string `json:"selector"` + Selector map[string]string `json:"selector"` // TODO: Is it needed ? Or is it applied with Endpoints ? Server VirtualDatabaseServer `json:"server"` QoS string `json:"qos"` - Bandwidth string `json:"bandwidth` + Bandwidth string `json:"bandwidth"` } // VirtualDatabaseStatus defines the observed state of VirtualDatabase type VirtualDatabaseStatus struct { // INSERT ADDITIONAL STATUS FIELD - define observed state of cluster // Important: Run "make" to regenerate code after modifying this file - PodList []string `json:"podList"` - ObservedGeneration int64 `json:"observedGeneration,omitempty"` + ClassInfo string `json:"classInfo"` + ObservedGeneration int64 `json:"observedGeneration,omitempty"` } type Pod string diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index 4611627..a92a304 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -23,8 +23,10 @@ import ( "k8s.io/apimachinery/pkg/runtime" ) + + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *SQLTrafficQoS) DeepCopyInto(out *SQLTrafficQoS) { +func (in *TrafficQoS) DeepCopyInto(out *TrafficQoS) { *out = *in out.TypeMeta = in.TypeMeta in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) @@ -32,18 +34,18 @@ func (in *SQLTrafficQoS) DeepCopyInto(out *SQLTrafficQoS) { out.Status = in.Status } -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SQLTrafficQoS. -func (in *SQLTrafficQoS) DeepCopy() *SQLTrafficQoS { +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TrafficQoS. +func (in *TrafficQoS) DeepCopy() *TrafficQoS { if in == nil { return nil } - out := new(SQLTrafficQoS) + out := new(TrafficQoS) in.DeepCopyInto(out) return out } // DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. -func (in *SQLTrafficQoS) DeepCopyObject() runtime.Object { +func (in *TrafficQoS) DeepCopyObject() runtime.Object { if c := in.DeepCopy(); c != nil { return c } @@ -51,31 +53,31 @@ func (in *SQLTrafficQoS) DeepCopyObject() runtime.Object { } // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *SQLTrafficQoSList) DeepCopyInto(out *SQLTrafficQoSList) { +func (in *TrafficQoSList) DeepCopyInto(out *TrafficQoSList) { *out = *in out.TypeMeta = in.TypeMeta in.ListMeta.DeepCopyInto(&out.ListMeta) if in.Items != nil { in, out := &in.Items, &out.Items - *out = make([]SQLTrafficQoS, len(*in)) + *out = make([]TrafficQoS, len(*in)) for i := range *in { (*in)[i].DeepCopyInto(&(*out)[i]) } } } -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SQLTrafficQoSList. -func (in *SQLTrafficQoSList) DeepCopy() *SQLTrafficQoSList { +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TrafficQoSList. +func (in *TrafficQoSList) DeepCopy() *TrafficQoSList { if in == nil { return nil } - out := new(SQLTrafficQoSList) + out := new(TrafficQoSList) in.DeepCopyInto(out) return out } // DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. -func (in *SQLTrafficQoSList) DeepCopyObject() runtime.Object { +func (in *TrafficQoSList) DeepCopyObject() runtime.Object { if c := in.DeepCopy(); c != nil { return c } @@ -83,7 +85,7 @@ func (in *SQLTrafficQoSList) DeepCopyObject() runtime.Object { } // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *SQLTrafficQoSSpec) DeepCopyInto(out *SQLTrafficQoSSpec) { +func (in *TrafficQoSSpec) DeepCopyInto(out *TrafficQoSSpec) { *out = *in if in.Groups != nil { in, out := &in.Groups, &out.Groups @@ -92,27 +94,27 @@ func (in *SQLTrafficQoSSpec) DeepCopyInto(out *SQLTrafficQoSSpec) { } } -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SQLTrafficQoSSpec. -func (in *SQLTrafficQoSSpec) DeepCopy() *SQLTrafficQoSSpec { +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TrafficQoSSpec. +func (in *TrafficQoSSpec) DeepCopy() *TrafficQoSSpec { if in == nil { return nil } - out := new(SQLTrafficQoSSpec) + out := new(TrafficQoSSpec) in.DeepCopyInto(out) return out } // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *SQLTrafficQoSStatus) DeepCopyInto(out *SQLTrafficQoSStatus) { +func (in *TrafficQoSStatus) DeepCopyInto(out *TrafficQoSStatus) { *out = *in } -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SQLTrafficQoSStatus. -func (in *SQLTrafficQoSStatus) DeepCopy() *SQLTrafficQoSStatus { +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TrafficQoSStatus. +func (in *TrafficQoSStatus) DeepCopy() *TrafficQoSStatus { if in == nil { return nil } - out := new(SQLTrafficQoSStatus) + out := new(TrafficQoSStatus) in.DeepCopyInto(out) return out } @@ -252,11 +254,6 @@ func (in *VirtualDatabaseSpec) DeepCopy() *VirtualDatabaseSpec { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *VirtualDatabaseStatus) DeepCopyInto(out *VirtualDatabaseStatus) { *out = *in - if in.PodList != nil { - in, out := &in.PodList, &out.PodList - *out = make([]string, len(*in)) - copy(*out, *in) - } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new VirtualDatabaseStatus. @@ -267,4 +264,97 @@ func (in *VirtualDatabaseStatus) DeepCopy() *VirtualDatabaseStatus { out := new(VirtualDatabaseStatus) in.DeepCopyInto(out) return out -} \ No newline at end of file +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *TrafficQoSMapping) DeepCopyInto(out *TrafficQoSMapping) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) + in.Spec.DeepCopyInto(&out.Spec) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TrafficQoS. +func (in *TrafficQoSMapping) DeepCopy() *TrafficQoSMapping { + if in == nil { + return nil + } + out := new(TrafficQoSMapping) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *TrafficQoSMapping) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *TrafficQoSMappingList) DeepCopyInto(out *TrafficQoSMappingList) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ListMeta.DeepCopyInto(&out.ListMeta) + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = make([]TrafficQoSMapping, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TrafficQoSList. +func (in *TrafficQoSMappingList) DeepCopy() *TrafficQoSMappingList { + if in == nil { + return nil + } + out := new(TrafficQoSMappingList) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *TrafficQoSMappingList) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *TrafficQoSMappingSpec) DeepCopyInto(out *TrafficQoSMappingSpec) { + *out = *in + if in.References != nil { + in, out := &in.References, &out.References + *out = map[string]string{} + *out = *in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TrafficQoSSpec. +func (in *TrafficQoSMappingSpec) DeepCopy() *TrafficQoSMappingSpec { + if in == nil { + return nil + } + out := new(TrafficQoSMappingSpec) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *TrafficQoSMappingStatus) DeepCopyInto(out *TrafficQoSMappingStatus) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TrafficQoSStatus. +func (in *TrafficQoSMappingStatus) DeepCopy() *TrafficQoSMappingStatus { + if in == nil { + return nil + } + out := new(TrafficQoSMappingStatus) + in.DeepCopyInto(out) + return out +} diff --git a/cmd/waterline/main.go b/cmd/waterline/main.go index f68a976..dee59cf 100644 --- a/cmd/waterline/main.go +++ b/cmd/waterline/main.go @@ -54,5 +54,7 @@ func main() { log.Fatalf("new server error") } - s.Run() + if err := s.Run(); err != nil { + log.Fatalf("server run error: %s", err) + } } diff --git a/go.mod b/go.mod index 7e458b4..5e9a21a 100644 --- a/go.mod +++ b/go.mod @@ -22,6 +22,7 @@ require ( github.com/mlycore/log v0.2.16 github.com/pkg/errors v0.9.1 github.com/vishvananda/netlink v1.1.1-0.20210330154013-f5de75959ad5 + github.com/vishvananda/netns v0.0.0-20210104183010-2eb08e3e575f golang.org/x/sync v0.0.0-20210220032951-036812b2e83c k8s.io/api v0.23.0 k8s.io/apimachinery v0.23.0 @@ -62,7 +63,6 @@ require ( github.com/prometheus/procfs v0.7.3 // indirect github.com/sirupsen/logrus v1.8.1 // indirect github.com/spf13/pflag v1.0.5 // indirect - github.com/vishvananda/netns v0.0.0-20210104183010-2eb08e3e575f // indirect golang.org/x/net v0.0.0-20211216030914-fe4d6282115f // indirect golang.org/x/oauth2 v0.0.0-20210819190943-2bc19b11175f // indirect golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e // indirect diff --git a/manifests/traffic-qos-mapping-v1alpha1.yaml b/manifests/traffic-qos-mapping-v1alpha1.yaml new file mode 100644 index 0000000..36cfe3d --- /dev/null +++ b/manifests/traffic-qos-mapping-v1alpha1.yaml @@ -0,0 +1,54 @@ + +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + annotations: + controller-gen.kubebuilder.io/version: v0.7.0 + creationTimestamp: null + name: trafficqosmapping.database-mesh.io +spec: + group: database-mesh.io + names: + kind: TrafficQoSMapping + listKind: TrafficQoSMappingList + plural: trafficqosmapping + singular: trafficqosmapping + scope: Namespaced + versions: + - name: v1alpha1 + schema: + openAPIV3Schema: + description: TrafficQoSMapping is the Schema for the trafficqos API + properties: + apiVersion: + description: 'APIVersion defines the versioned schema of this representation + of an object. Servers should convert recognized schemas to the latest + internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources' + type: string + kind: + description: 'Kind is a string value representing the REST resource this + object represents. Servers may infer this from the endpoint the client + submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds' + type: string + metadata: + type: object + spec: + description: TrafficQoSMappingSpec defines the desired state of TrafficQoSMapping + properties: + references: + type: object + required: + - references + type: object + type: object + served: true + storage: true + subresources: + status: {} +status: + acceptedNames: + kind: "" + plural: "" + conditions: [] + storedVersions: [] diff --git a/manifests/sql-traffic-qos-v1alpha1.yaml b/manifests/traffic-qos-v1alpha1.yaml similarity index 69% rename from manifests/sql-traffic-qos-v1alpha1.yaml rename to manifests/traffic-qos-v1alpha1.yaml index 52c6ae4..8ef694e 100644 --- a/manifests/sql-traffic-qos-v1alpha1.yaml +++ b/manifests/traffic-qos-v1alpha1.yaml @@ -6,20 +6,20 @@ metadata: annotations: controller-gen.kubebuilder.io/version: v0.7.0 creationTimestamp: null - name: sqltrafficqos.database-mesh.io + name: trafficqos.database-mesh.io spec: group: database-mesh.io names: - kind: SQLTrafficQoS - listKind: SQLTrafficQoSList - plural: sqltrafficqos - singular: sqltrafficqos + kind: TrafficQoS + listKind: TrafficQoSList + plural: trafficqos + singular: trafficqos scope: Namespaced versions: - name: v1alpha1 schema: openAPIV3Schema: - description: SQLTrafficQoS is the Schema for the sqltrafficqos API + description: TrafficQoS is the Schema for the trafficqos API properties: apiVersion: description: 'APIVersion defines the versioned schema of this representation @@ -34,41 +34,33 @@ spec: metadata: type: object spec: - description: SQLTrafficQoSSpec defines the desired state of SQLTrafficQoS + description: TrafficQoSSpec defines the desired state of TrafficQoS properties: groups: items: properties: ceil: type: string - classId: - type: string - networkDevice: - type: string - parent: - type: string rate: type: string required: - - ceil - - classId - - networkDevice - - parent - rate type: object type: array - qosClass: - description: Foo is an example field of SQLTrafficQoS. Edit sqltrafficqos_types.go + networkDevice: + description: Foo is an example field of TrafficQoS. Edit trafficqos_types.go to remove/update type: string + qosClass: + type: string strategy: type: string required: - groups - - strategy + - networkDevice type: object status: - description: SQLTrafficQoSStatus defines the observed state of SQLTrafficQoS + description: TrafficQoSStatus defines the observed state of TrafficQoS type: object type: object served: true diff --git a/manifests/virtual-database-v1alpha1.yaml b/manifests/virtual-database-v1alpha1.yaml index b8f2cd5..df9c568 100644 --- a/manifests/virtual-database-v1alpha1.yaml +++ b/manifests/virtual-database-v1alpha1.yaml @@ -36,11 +36,17 @@ spec: spec: description: VirtualDatabaseSpec defines the desired state of VirtualDatabase properties: + bandwidth: + type: string qos: type: string - server: + selector: + additionalProperties: + type: string description: Foo is an example field of VirtualDatabase. Edit virtualdatabase_types.go to remove/update + type: object + server: properties: backends: items: @@ -70,11 +76,26 @@ spec: - protocol type: object required: + - bandwidth - qos + - selector - server type: object status: description: VirtualDatabaseStatus defines the observed state of VirtualDatabase + properties: + observedGeneration: + format: int64 + type: integer + podList: + description: 'INSERT ADDITIONAL STATUS FIELD - define observed state + of cluster Important: Run "make" to regenerate code after modifying + this file' + items: + type: string + type: array + required: + - podList type: object type: object served: true diff --git a/pkg/bpf/load.go b/pkg/bpf/load.go index e9db47b..473f25e 100644 --- a/pkg/bpf/load.go +++ b/pkg/bpf/load.go @@ -18,12 +18,15 @@ import ( "bytes" "encoding/binary" "fmt" + + // "C" + "net" + "syscall" + "github.com/cilium/ebpf" "github.com/cilium/ebpf/ringbuf" "github.com/mlycore/log" "golang.org/x/sys/unix" - "net" - "syscall" ) const ( @@ -31,13 +34,19 @@ const ( TcPktMap = "/sys/fs/bpf/tc/globals/my_pkt" ) -func Load(ifaceName string, port uint16) error { - tcMap, err := loadTcPktMap() +type Loader struct { +} + +func (l *Loader) Load(ifaceName string, port uint16) error { + tcMap, err := l.LoadTcPkgMap() + // TODO: add loader to load this program to net dev + // TODO: add port + if err != nil { return err } - if err := loadSockFilter(ifaceName, port, tcMap); err != nil { + if err := l.LoadSockFilter(ifaceName, port, tcMap); err != nil { return err } @@ -54,7 +63,7 @@ type Objs struct { MyPktEvt *ebpf.Map `ebpf:"my_pkt_evt"` } -func loadSockFilter(ifaceName string, port uint16, tcPkt *ebpf.Map) error { +func (l *Loader) LoadSockFilter(ifaceName string, port uint16, tcPkt *ebpf.Map) error { spec, err := ebpf.LoadCollectionSpec(SockFilter) if err != nil { return err @@ -73,7 +82,7 @@ func loadSockFilter(ifaceName string, port uint16, tcPkt *ebpf.Map) error { return fmt.Errorf("set filter port error: %v", err) } - sock, err := openRawSock(ifaceName) + sock, err := l.OpenRawSock(ifaceName) if err != nil { return err } @@ -92,23 +101,24 @@ func loadSockFilter(ifaceName string, port uint16, tcPkt *ebpf.Map) error { } for { - evt, query, err := readRecord(objs, reader) + evt, query, err := l.ReadRecord(objs, reader) if err != nil { log.Warnln(err) continue } evt_value := EventValue{ - ClassId: calcQos(query), + ClassId: l.CalcQoS(query), } if err := tcPkt.Update(&evt, &evt_value, ebpf.UpdateAny); err != nil { log.Warnln(err) } } + return nil } -func openRawSock(ifaceName string) (int, error) { +func (l *Loader) OpenRawSock(ifaceName string) (int, error) { sock, err := syscall.Socket(syscall.AF_PACKET, syscall.SOCK_RAW|syscall.SOCK_NONBLOCK|syscall.SOCK_CLOEXEC, int(htons(syscall.ETH_P_ALL))) if err != nil { @@ -132,11 +142,11 @@ func openRawSock(ifaceName string) (int, error) { return sock, nil } -func loadTcPktMap() (*ebpf.Map, error) { +func (l *Loader) LoadTcPkgMap() (*ebpf.Map, error) { return ebpf.LoadPinnedMap(TcPktMap, nil) } - -func calcQos(query string) uint32 { +func (l *Loader) CalcQoS(query string) uint32 { + //TODO: virtual database tc argument return 0 } @@ -156,7 +166,7 @@ type EventValue struct { } // readRecord read record from ringbuf -func readRecord(objs Objs, reader *ringbuf.Reader) (EventKey, string, error) { +func (l *Loader) ReadRecord(objs Objs, reader *ringbuf.Reader) (EventKey, string, error) { record, err := reader.Read() if err != nil { return EventKey{}, "", fmt.Errorf("reading from reader: %s", err) diff --git a/pkg/bpf/sock_filter.c b/pkg/bpf/sock_filter.c deleted file mode 100644 index 321166c..0000000 --- a/pkg/bpf/sock_filter.c +++ /dev/null @@ -1,193 +0,0 @@ -// Copyright 2022 Database Mesh Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include -#include -#include -#include -#include -#include "headers/bpf_helpers.h" -#include "headers/bpf_endian.h" - -struct bpf_map_def SEC("maps") filter_helper = { - .type = BPF_MAP_TYPE_HASH, - .key_size = sizeof(__u8), - .value_size = sizeof(__u16), - .max_entries = 2, -}; - -struct bpf_map_def SEC("maps") buf = { - .type = BPF_MAP_TYPE_ARRAY, - .key_size = sizeof(__u32), - .value_size = sizeof(__u8), - .max_entries = 1<<16, -}; - -struct bpf_map_def SEC("maps") jmp_table = { - .type = BPF_MAP_TYPE_PROG_ARRAY, - .key_size = sizeof(__u32), - .value_size = sizeof(__u32), - .max_entries = 100, -}; - -struct event_key { - __u8 seq; - __u16 sport; - __u16 dport; - __u32 saddr; - __u32 daddr; -}; - -struct event_value { - __u32 payload_offset; - __u32 my_pkt_len; - __u32 class_id; -}; - -struct bpf_map_def SEC("maps") my_pkt = { - .type = BPF_MAP_TYPE_HASH, - .key_size = sizeof(struct event_key), - .value_size = sizeof(struct event_value), - .max_entries = 2048, -}; - -// used by tail call -struct bpf_map_def SEC("maps") tmp_pkt_evt = { - .type = BPF_MAP_TYPE_HASH, - .key_size = sizeof(__u8), - .value_size = sizeof(struct event_key), - .max_entries = 1, -}; - -struct bpf_map_def SEC("maps") my_pkt_evt = { - .type = BPF_MAP_TYPE_RINGBUF, - .max_entries = 4096, -}; - - -SEC("socket") -int sql_filter(struct __sk_buff *skb) { - if (skb->protocol != bpf_htons(ETH_P_IP)) { - return 0; - } - - struct iphdr iph; - bpf_skb_load_bytes(skb, ETH_HLEN, &iph, sizeof(iph)); - - if (iph.protocol != IPPROTO_TCP) { - return 0; - } - - __u32 ip_hlen = iph.ihl << 2; - - struct tcphdr tcph; - - bpf_skb_load_bytes(skb, ETH_HLEN + sizeof(iph), &tcph, sizeof(tcph)); - - __u32 tcp_hlen = tcph.doff << 2; - - __u8 filter_port_key = 0; - __u16 *port = bpf_map_lookup_elem(&filter_helper, &filter_port_key); - if (!port) return 0; - - if (bpf_ntohs(tcph.dest) != *port) { - return 0; - } - - __u32 payload_offset = ETH_HLEN + ip_hlen + tcp_hlen; - - __u8 header[5]; - bpf_skb_load_bytes(skb, payload_offset, &header, sizeof(header)); - - if (header[4] != 3) { - return 0; - } - - __u32 my_pkt_len = header[0] | header[1] << 8 | header[2] << 16; - - struct event_key evt_key; - __builtin_memset(&evt_key, 0, sizeof(struct event_key)); - evt_key.saddr = iph.saddr; - evt_key.sport = tcph.source, - evt_key.daddr = iph.daddr, - evt_key.dport = tcph.dest, - evt_key.seq = header[3]; - - - struct event_value evt_value; - __builtin_memset(&evt_value, 0, sizeof(struct event_value)); - evt_value.payload_offset = payload_offset; - evt_value.my_pkt_len = my_pkt_len; - evt_value.class_id = 0; - - bpf_map_update_elem(&my_pkt, &evt_key, &evt_value, BPF_ANY); - - __u8 tmp_key = 0; - bpf_map_update_elem(&tmp_pkt_evt, &tmp_key, &evt_key, BPF_ANY); - - __u8 b; - int i = 0; - __u32 buf_idx = 0; - #pragma unroll - for (i = 0; i < 200; i++) { - if (buf_idx == my_pkt_len - 1) { - bpf_ringbuf_output(&my_pkt_evt, &evt_key, sizeof(struct event_key), 0); - return -1; - } - if (bpf_skb_load_bytes(skb, payload_offset+5+i, &b, sizeof(b)) == 0) { - if (bpf_map_update_elem(&buf, &buf_idx, &b, BPF_ANY) == 0) { - buf_idx++; - } - } - } - - bpf_tail_call(skb, &jmp_table, 0); - - return -1; -} - -SEC("socket_1") -int sql_filter_1(struct __sk_buff *skb) { - __u8 tmp_key = 0; - - struct event_key *tmp_evt_key = bpf_map_lookup_elem(&tmp_pkt_evt, &tmp_key); - - if (!tmp_key) return -1; - - struct event_value *tmp_value = bpf_map_lookup_elem(&tmp_pkt_evt, &tmp_evt_key); - - if (!tmp_value) return -1; - - __u32 payload_offset = tmp_value->payload_offset; - __u32 my_pkt_len = tmp_value->my_pkt_len; - __u8 b; - __u32 buf_idx = 200; - #pragma unroll - for (int i = 200; i < 2048; i++) { - if (buf_idx == my_pkt_len - 1) { - bpf_ringbuf_output(&my_pkt_evt, tmp_value, sizeof(*tmp_value), 0); - return -1; - } - - if (bpf_skb_load_bytes(skb, payload_offset+5+i, &b, sizeof(b)) == 0) { - if (bpf_map_update_elem(&buf, &buf_idx, &b, BPF_ANY) == 0) { - buf_idx++; - } - } - } - - return -1; -} - -char LICENSE[] SEC("license") = "GPL"; \ No newline at end of file diff --git a/pkg/bpf/tc.c b/pkg/bpf/tc.c deleted file mode 100644 index 59b10a6..0000000 --- a/pkg/bpf/tc.c +++ /dev/null @@ -1,88 +0,0 @@ -#include -#include -#include -#include -#include -#include -#include "headers/bpf_helpers.h" -#include "headers/bpf_endian.h" - -// bpf_elf map definition from https://github.com/shemminger/iproute2/blob/main/include/bpf_elf.h -struct bpf_elf_map { - unsigned int type; - unsigned int size_key; - unsigned int size_value; - unsigned int max_elem; - unsigned int flags; - unsigned int id; - unsigned int pinning; - unsigned int inner_id; - unsigned int inner_idx; -}; - -struct event_key { - __u8 seq; - __u16 sport; - __u16 dport; - __u32 saddr; - __u32 daddr; -}; - -struct event_value { - __u32 payload_offset; - __u32 my_pkt_len; - __u32 class_id; -}; - -struct bpf_elf_map SEC("maps") my_pkt = { - .type = BPF_MAP_TYPE_HASH, - .size_key = sizeof(struct event_key), - .size_value = sizeof(struct event_value), - // pin path default is /sys/fs/bpf/tc/globals/my_pkt - // waterline can write qos rule to my_pkt by pin path - .pinning = 2, -}; - -// attach to eth0 || cni0 || docker0 -SEC("classifier") -int tc_egress(struct __sk_buff *skb) { - if (skb->protocol != bpf_htons(ETH_P_IP)) { - return TC_ACT_OK; - } - - struct iphdr iph; - bpf_skb_load_bytes(skb, ETH_HLEN, &iph, sizeof(iph)); - - if (iph.protocol != IPPROTO_TCP) { - return TC_ACT_OK; - } - - //__u32 ip_hlen = iph.ihl << 2; - - struct tcphdr tcph; - - bpf_skb_load_bytes(skb, ETH_HLEN + sizeof(iph), &tcph, sizeof(tcph)); - - //__u32 tcp_hlen = tcph.doff << 2; - - struct event_key evt_key; - __builtin_memset(&evt_key, 0, sizeof(struct event_key)); - evt_key.saddr = iph.daddr; - evt_key.sport = tcph.dest; - evt_key.daddr = iph.saddr; - evt_key.dport = tcph.source; - evt_key.seq = 0; - - struct event_value *evt_value = bpf_map_lookup_elem(&my_pkt, &evt_key); - if (!evt_value) return TC_ACT_OK; - - __u32 payload_offset = evt_value->payload_offset; - __u8 header[4]; - bpf_skb_load_bytes(skb, payload_offset, &header, sizeof(header)); - if (header[3] < 1) return TC_ACT_OK; - - skb->tc_classid = evt_value->class_id; - return TC_ACT_OK; -} - -char LICENSE[] SEC("license") = "GPL"; \ No newline at end of file diff --git a/pkg/kubernetes/controllers/trafficqos/controller.go b/pkg/kubernetes/controllers/trafficqos/controller.go new file mode 100644 index 0000000..7ec76e3 --- /dev/null +++ b/pkg/kubernetes/controllers/trafficqos/controller.go @@ -0,0 +1,95 @@ +// Copyright 2022 Database Mesh Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kubernetes + +import ( + "context" + + "k8s.io/apimachinery/pkg/runtime" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + + // "sigs.k8s.io/controller-runtime/pkg/log" + "github.com/mlycore/log" + + "github.com/database-mesh/waterline/api/v1alpha1" + "github.com/database-mesh/waterline/pkg/tc" +) + +// TrafficQoSReconciler reconciles a TrafficQoS object +type TrafficQoSReconciler struct { + client.Client + Scheme *runtime.Scheme +} + +//+kubebuilder:rbac:groups=database-mesh.io.my.domain,resources=sqltrafficqos,verbs=get;list;watch;create;update;patch;delete +//+kubebuilder:rbac:groups=database-mesh.io.my.domain,resources=sqltrafficqos/status,verbs=get;update;patch +//+kubebuilder:rbac:groups=database-mesh.io.my.domain,resources=sqltrafficqos/finalizers,verbs=update + +// Reconcile is part of the main kubernetes reconciliation loop which aims to +// move the current state of the cluster closer to the desired state. +// TODO(user): Modify the Reconcile function to compare the state specified by +// the TrafficQoS object against the actual cluster state, and then +// perform operations to make the cluster state reflect the state specified by +// the user. +// +// For more details, check Reconcile and its Result here: +// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.10.0/pkg/reconcile +func (r *TrafficQoSReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + // _ = log.FromContext(ctx) + + // TODO(user): your logic here + obj := &v1alpha1.TrafficQoS{} + if err := r.Client.Get(ctx, req.NamespacedName, obj); err != nil { + log.Errorf("get TrafficQos error: %s", err) + return ctrl.Result{}, nil + } + + // TODO: sync TrafficQoSStatus + defer func() { + + }() + + // TODO: add logic, remove VirtualDatabase. + // Read TrafficQoS for basic QoS class up. + // Read VirtualDatabase for application-level QoS after a Pod was scheduled on this Node + + //TODO: all rules should be removed once the resource was deleted + //TODO: is there an exception when more than one resource was created + + //TODO: add check for existed class rules + + shaper, err := tc.NewTcShaper(*obj, "1000M") + if err != nil { + log.Errorf("get shaper error: %s", err) + return ctrl.Result{Requeue: true}, nil + } + + if err = shaper.AddClasses(); err != nil { + log.Errorf("add classes error: %s", err) + return ctrl.Result{Requeue: true}, nil + } + + log.Infof("TrafficQoS: %#v", obj) + + return ctrl.Result{}, nil +} + +// SetupWithManager sets up the controller with the Manager. +func (r *TrafficQoSReconciler) SetupWithManager(mgr ctrl.Manager) error { + return ctrl.NewControllerManagedBy(mgr). + For(&v1alpha1.TrafficQoS{}). + Complete(r) +} diff --git a/pkg/kubernetes/controllers/sqltrafficqos/controller.go b/pkg/kubernetes/controllers/trafficqosmapping/controller.go similarity index 72% rename from pkg/kubernetes/controllers/sqltrafficqos/controller.go rename to pkg/kubernetes/controllers/trafficqosmapping/controller.go index 7c75ef2..c8a7a26 100644 --- a/pkg/kubernetes/controllers/sqltrafficqos/controller.go +++ b/pkg/kubernetes/controllers/trafficqosmapping/controller.go @@ -27,8 +27,8 @@ import ( "github.com/database-mesh/waterline/api/v1alpha1" ) -// SQLTrafficQoSReconciler reconciles a SQLTrafficQoS object -type SQLTrafficQoSReconciler struct { +// TrafficQoSMappingReconciler reconciles a TrafficQoSMapping object +type TrafficQoSMappingReconciler struct { client.Client Scheme *runtime.Scheme } @@ -40,49 +40,42 @@ type SQLTrafficQoSReconciler struct { // Reconcile is part of the main kubernetes reconciliation loop which aims to // move the current state of the cluster closer to the desired state. // TODO(user): Modify the Reconcile function to compare the state specified by -// the SQLTrafficQoS object against the actual cluster state, and then +// the TrafficQoSMapping object against the actual cluster state, and then // perform operations to make the cluster state reflect the state specified by // the user. // // For more details, check Reconcile and its Result here: // - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.10.0/pkg/reconcile -func (r *SQLTrafficQoSReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { +func (r *TrafficQoSMappingReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { // _ = log.FromContext(ctx) // TODO(user): your logic here - obj := &v1alpha1.SQLTrafficQoS{} - + obj := &v1alpha1.TrafficQoSMapping{} if err := r.Client.Get(ctx, req.NamespacedName, obj); err != nil { - log.Errorf("get resources error: %s", err) + log.Errorf("get TrafficQos error: %s", err) return ctrl.Result{}, nil } - // TODO: sync SQLTrafficQoSStatus + // TODO: sync TrafficQoSMappingStatus defer func() { }() // TODO: add logic, remove VirtualDatabase. - // Read SQLTrafficQoS for basic QoS class up. + // Read TrafficQoSMapping for basic QoS class up. // Read VirtualDatabase for application-level QoS after a Pod was scheduled on this Node - // err := r.SetTcs(ctx, obj) - // if err != nil { - // return ctrl.Result{Requeue: true}, nil - // } - log.Infof("SQLTrafficQoS: %#v", obj) + //TODO: all rules should be removed once the resource was deleted + //TODO: is there an exception when more than one resource was created - return ctrl.Result{}, nil -} + log.Infof("TrafficQoSMapping: %#v", obj) -func (r *SQLTrafficQoSReconciler) SetTcs(ctx context.Context, qos *v1alpha1.SQLTrafficQoS) error { - //TODO: add TC operations - return nil + return ctrl.Result{}, nil } // SetupWithManager sets up the controller with the Manager. -func (r *SQLTrafficQoSReconciler) SetupWithManager(mgr ctrl.Manager) error { +func (r *TrafficQoSMappingReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). - For(&v1alpha1.SQLTrafficQoS{}). + For(&v1alpha1.TrafficQoSMapping{}). Complete(r) } diff --git a/pkg/kubernetes/controllers/virtualdatabase/controller.go b/pkg/kubernetes/controllers/virtualdatabase/controller.go index 99347df..5a5b1fa 100644 --- a/pkg/kubernetes/controllers/virtualdatabase/controller.go +++ b/pkg/kubernetes/controllers/virtualdatabase/controller.go @@ -23,6 +23,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "github.com/database-mesh/waterline/api/v1alpha1" + // "github.com/database-mesh/waterline/pkg/bpf" ) // VirtualDatabaseReconciler reconciles a VirtualDatabase object @@ -46,6 +47,8 @@ type VirtualDatabaseReconciler struct { // - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.10.0/pkg/reconcile func (r *VirtualDatabaseReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { // TODO(user): your logic here + // TODO: all rules should be removed once the resource was deleted + // however the rules were applied with the Pod scheduled obj := &v1alpha1.VirtualDatabase{} if err := r.Client.Get(ctx, req.NamespacedName, obj); err != nil { @@ -57,6 +60,13 @@ func (r *VirtualDatabaseReconciler) Reconcile(ctx context.Context, req ctrl.Requ }() + //TODO: add tc class argument + //TODO: read mapping + + // TODO: load SockFilter + // l := &bpf.Loader{} + // l.Load() + // pod := &corev1.Pod{} // err := r.Client.Get(ctx, types.NamespacedName{ // Name: obj.Name, diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go index c4ea797..8f5b2aa 100644 --- a/pkg/manager/manager.go +++ b/pkg/manager/manager.go @@ -15,12 +15,23 @@ package manager import ( - sqltrafficqos "github.com/database-mesh/waterline/pkg/kubernetes/controllers/sqltrafficqos" + "context" + "os" + "strings" + + "github.com/database-mesh/waterline/api/v1alpha1" + "github.com/database-mesh/waterline/pkg/bpf" + "github.com/database-mesh/waterline/pkg/cri" + trafficqos "github.com/database-mesh/waterline/pkg/kubernetes/controllers/trafficqos" + trafficqosmapping "github.com/database-mesh/waterline/pkg/kubernetes/controllers/trafficqosmapping" virtualdatabase "github.com/database-mesh/waterline/pkg/kubernetes/controllers/virtualdatabase" "github.com/database-mesh/waterline/pkg/kubernetes/watcher" + "github.com/database-mesh/waterline/pkg/tc" "github.com/mlycore/log" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/watch" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/healthz" ctrlmgr "sigs.k8s.io/controller-runtime/pkg/manager" ) @@ -28,6 +39,7 @@ import ( type Manager struct { Pod *watcher.PodWatcher Mgr ctrlmgr.Manager + CRI cri.ContainerRuntimeInterfaceClient } func (m *Manager) WatchAndHandle() error { @@ -35,8 +47,17 @@ func (m *Manager) WatchAndHandle() error { select { case event := <-m.Pod.Core.ResultChan(): { + pod := event.Object.(*corev1.Pod) log.Infof("[%s] pod event: %#v", event.Type, event.Object.(*corev1.Pod).Name) //TODO: Handle different types of events + switch event.Type { + case watch.Added: + handleAdded(pod, m.Mgr.GetClient(), m.CRI) + case watch.Modified: + handleModified(pod, m.Mgr.GetClient(), m.CRI) + case watch.Deleted: + handleDeleted(pod, m.Mgr.GetClient(), m.CRI) + } } } } @@ -44,11 +65,19 @@ func (m *Manager) WatchAndHandle() error { } func (m *Manager) Bootstrap() error { - if err := (&sqltrafficqos.SQLTrafficQoSReconciler{ + if err := (&trafficqos.TrafficQoSReconciler{ + Client: m.Mgr.GetClient(), + Scheme: m.Mgr.GetScheme(), + }).SetupWithManager(m.Mgr); err != nil { + log.Errorf("trafficqos setupWithManager error: %s", err) + return err + } + + if err := (&trafficqosmapping.TrafficQoSMappingReconciler{ Client: m.Mgr.GetClient(), Scheme: m.Mgr.GetScheme(), }).SetupWithManager(m.Mgr); err != nil { - log.Errorf("sqltrafficqos setupWithManager error: %s", err) + log.Errorf("trafficqosmapping setupWithManager error: %s", err) return err } @@ -72,3 +101,66 @@ func (m *Manager) Bootstrap() error { } return nil } + +func handleAdded(pod *corev1.Pod, c client.Client, cr cri.ContainerRuntimeInterfaceClient) error { + //TODO: add related rules + hostname, err := os.Hostname() + if err != nil { + return err + } + + if hostname == pod.Spec.Hostname { + list := &v1alpha1.VirtualDatabaseList{Items: []v1alpha1.VirtualDatabase{}} + + if err := c.List(context.TODO(), list, &client.ListOptions{Namespace: pod.Namespace}); err != nil { + log.Errorf("get SQLTrafficQos error: %s", err) + return err + } + + for _, db := range list.Items { + var found bool + for k, v := range db.Spec.Selector { + if pod.Labels[k] == v { + found = true + } else { + found = false + } + } + + if found { + l := &bpf.Loader{} + containerId := strings.Split(pod.Status.ContainerStatuses[0].ContainerID, "containerd://")[1] + pid, err := cr.GetPidFromContainer(context.TODO(), containerId) + if err != nil { + return err + } + ifname, err := tc.GetNetworkDeviceFromPid(pid) + if err != nil { + return err + } + // Add veth egress + err = l.Load(ifname, uint16(db.Spec.Server.Port)) + if err != nil { + return err + } + + } + + // TODO: add loader + // db.Spec.Server.Port + // db.Spec.QoS + } + + } + return nil +} + +func handleModified(pod *corev1.Pod, c client.Client, cr cri.ContainerRuntimeInterfaceClient) { + +} + +func handleDeleted(pod *corev1.Pod, c client.Client, cr cri.ContainerRuntimeInterfaceClient) { + //TODO: remove related rules + // move it to a queue ? + +} diff --git a/pkg/server/server.go b/pkg/server/server.go index 9372813..486858f 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -84,11 +84,14 @@ func (s *Server) Run() error { return err } manager := &manager.Manager{ + CRI: s.ContainerRuntimeClient, Pod: w, Mgr: mgr, } var eg errgroup.Group + // mgr.GetClient() + // apply to Pod eg.Go(func() error { log.Infof("starting controllers") diff --git a/pkg/tc/tc.go b/pkg/tc/tc.go index 7ce311f..8853231 100644 --- a/pkg/tc/tc.go +++ b/pkg/tc/tc.go @@ -15,20 +15,24 @@ package tc import ( + // "C" + "sort" + "strings" + v1alpha1 "github.com/database-mesh/waterline/api/v1alpha1" + "github.com/mlycore/log" "github.com/vishvananda/netlink" "golang.org/x/sys/unix" "k8s.io/apimachinery/pkg/api/resource" - "sort" ) type Shaper struct { - qos v1alpha1.SQLTrafficQoS + qos v1alpha1.TrafficQoS link netlink.Link totalBandWidth string } -func NewTcShaper(qos v1alpha1.SQLTrafficQoS, totalBandWidth string) (*Shaper, error) { +func NewTcShaper(qos v1alpha1.TrafficQoS, totalBandWidth string) (*Shaper, error) { link, err := netlink.LinkByName(qos.Spec.NetworkDevice) if err != nil { return nil, err @@ -50,6 +54,7 @@ func (t *Shaper) addHtbQdisc() error { } qdisc := netlink.NewHtb(attrs) + log.Infof("htb qdisc attrs: %#v, qdisc: %#v", attrs, qdisc) return netlink.QdiscReplace(qdisc) } @@ -71,15 +76,20 @@ func (t *Shaper) addRootHandle() error { } class := netlink.NewHtbClass(attrs, htbClassAttrs) + log.Infof("add root handle attrs: %#v, htbClassAttrs: %#v, qdisc: %#v", attrs, htbClassAttrs, class) return netlink.ClassReplace(class) } -func (t *Shaper) AddClasses() error { - if err := t.addHtbQdisc(); err != nil { +func (t *Shaper) AddClasses() ([]int, error) { + err := t.addHtbQdisc() + if err != nil && !strings.Contains(err.Error(), "invalid argument") { + log.Errorf("add htb qdisc error: %s", err) return err } - if err := t.addRootHandle(); err != nil { + err = t.addRootHandle() + if err != nil && !strings.Contains(err.Error(), "invalid argument") { + log.Errorf("add root handle error: %s", err) return err } @@ -89,8 +99,25 @@ func (t *Shaper) AddClasses() error { return rules[i].Rate < rules[j].Rate }) + classes, err := t.ListClass() + if err != nil { + log.Errorf("list class error: %s", err) + return err + } + + var base uint16 + for _, c := range classes { + // c.Attrs().Handle + _, minor := netlink.MajorMinor(c.Attrs().Handle) + if base < minor { + base = minor + } + } + + //TODO: add error handling. for idx, rule := range rules { - if err := t.addClass(idx, rule); err != nil { + if err := t.addClass(uint16(idx)+base, rule); err != nil { + log.Errorf("add class error: %s, rule: %s", err, rule) return err } } @@ -99,12 +126,12 @@ func (t *Shaper) AddClasses() error { } // add htb class -func (t *Shaper) addClass(idx int, rule v1alpha1.TrafficQoSGroup) error { +func (t *Shaper) addClass(idx uint16, rule v1alpha1.TrafficQoSGroup) error { attrs := netlink.ClassAttrs{ LinkIndex: t.link.Attrs().Index, Parent: netlink.MakeHandle(1, 1), //exclude 0, 1 - Handle: netlink.MakeHandle(1, uint16(idx+2)), + Handle: netlink.MakeHandle(1, idx+1), } rateValue, err := resource.ParseQuantity(rule.Rate) diff --git a/pkg/tc/utils.go b/pkg/tc/utils.go index 0f6877f..b6f17e8 100644 --- a/pkg/tc/utils.go +++ b/pkg/tc/utils.go @@ -20,7 +20,7 @@ import ( "strings" ) -func getNetworkDeviceFromPid(pid uint32) (string, error) { +func GetNetworkDeviceFromPid(pid uint32) (string, error) { igmpArgs := []string{ fmt.Sprintf("/proc/%d/net/igmp", pid), }