From 61b7fd313c97fe47a42b8a9aeba55f48f687725e Mon Sep 17 00:00:00 2001 From: mlycore Date: Mon, 21 Mar 2022 10:09:48 +0800 Subject: [PATCH 01/31] chore(module): update go module --- go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go.mod b/go.mod index 7e458b4..5e9a21a 100644 --- a/go.mod +++ b/go.mod @@ -22,6 +22,7 @@ require ( github.com/mlycore/log v0.2.16 github.com/pkg/errors v0.9.1 github.com/vishvananda/netlink v1.1.1-0.20210330154013-f5de75959ad5 + github.com/vishvananda/netns v0.0.0-20210104183010-2eb08e3e575f golang.org/x/sync v0.0.0-20210220032951-036812b2e83c k8s.io/api v0.23.0 k8s.io/apimachinery v0.23.0 @@ -62,7 +63,6 @@ require ( github.com/prometheus/procfs v0.7.3 // indirect github.com/sirupsen/logrus v1.8.1 // indirect github.com/spf13/pflag v1.0.5 // indirect - github.com/vishvananda/netns v0.0.0-20210104183010-2eb08e3e575f // indirect golang.org/x/net v0.0.0-20211216030914-fe4d6282115f // indirect golang.org/x/oauth2 v0.0.0-20210819190943-2bc19b11175f // indirect golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e // indirect From 4e8f67b302c06244ce1b67bd123b00fb072dcb97 Mon Sep 17 00:00:00 2001 From: mlycore Date: Mon, 21 Mar 2022 20:49:39 +0800 Subject: [PATCH 02/31] wip: working on apply and remove --- pkg/bpf/load.go | 156 ++++++++++++++---- .../controllers/sqltrafficqos/controller.go | 24 +-- .../controllers/virtualdatabase/controller.go | 5 + pkg/manager/manager.go | 50 ++++++ pkg/server/server.go | 2 + 5 files changed, 192 insertions(+), 45 deletions(-) diff --git a/pkg/bpf/load.go b/pkg/bpf/load.go index c228c2c..305dfe4 100644 --- a/pkg/bpf/load.go +++ b/pkg/bpf/load.go @@ -18,6 +18,7 @@ import ( "bytes" "encoding/binary" "fmt" + "github.com/cilium/ebpf" "github.com/cilium/ebpf/ringbuf" "github.com/mlycore/log" @@ -28,29 +29,27 @@ const ( TcPktMap = "/sys/fs/bpf/tc/globals/my_pkt" ) -func load() error { - m, err := loadTcPktMap() +// TODO: add loader to load this program to net dev + +type Loader struct { +} + +// TODO: add port + +func (l *Loader) Load() error { + m, err := l.LoadTcPkgMap() if err != nil { return err } - if err := loadSockFilter(m); err != nil { + if err := l.LoadSockFilter(m); err != nil { return err } return nil -} -type Objs struct { - Prog *ebpf.Program `ebpf:"sql_filter"` - TailProg1 *ebpf.Program `ebpf:"sql_filter_1"` - FilterHelper *ebpf.Map `ebpf:"filter_helper"` - Buf *ebpf.Map `ebpf:"buf"` - JmpTable *ebpf.Map `ebpf:"jmp_table"` - MyPktEvt *ebpf.Map `ebpf:"my_pkt_evt"` } - -func loadSockFilter(tcPkt *ebpf.Map) error { +func (l *Loader) LoadSockFilter(tcPkt *ebpf.Map) error { spec, err := ebpf.LoadCollectionSpec(SockFilter) if err != nil { return err @@ -71,41 +70,26 @@ func loadSockFilter(tcPkt *ebpf.Map) error { } for { - evt, query, err := readRecord(objs, reader) + evt, query, err := l.ReadRecord(objs, reader) if err != nil { log.Warnln(err) continue } - evt.ClassId = calcQos(query) + evt.ClassId = l.CalcQos(query) if err := tcPkt.Update(uint32(0), &evt, ebpf.UpdateAny); err != nil { log.Warnln(err) } } + return nil } - -func loadTcPktMap() (*ebpf.Map, error) { +func (l *Loader) LoadTcPkgMap() (*ebpf.Map, error) { return ebpf.LoadPinnedMap(TcPktMap, nil) } - -func calcQos(query string) uint32 { +func (l *Loader) CalcQoS(query string) uint32 { return 0 } - -type Event struct { - Seq uint8 - Sport uint16 - Dport uint16 - Saddr uint32 - Daddr uint32 - PktLen uint32 - // tcp payload offset - Offset uint32 - ClassId uint32 -} - -// readRecord read record from ringbuf -func readRecord(objs Objs, reader *ringbuf.Reader) (Event, string, error) { +func (l *Loader) ReadRecord(objs Objs, reader *ringbuf.Reader) (Event, string, error) { record, err := reader.Read() if err != nil { return Event{}, "", fmt.Errorf("reading from reader: %s", err) @@ -131,3 +115,107 @@ func readRecord(objs Objs, reader *ringbuf.Reader) (Event, string, error) { return evt, string(chars), nil } + +// func load() error { +// m, err := loadTcPktMap() +// if err != nil { +// return err +// } + +// if err := loadSockFilter(m); err != nil { +// return err +// } + +// return nil +// } + +type Objs struct { + Prog *ebpf.Program `ebpf:"sql_filter"` + TailProg1 *ebpf.Program `ebpf:"sql_filter_1"` + FilterHelper *ebpf.Map `ebpf:"filter_helper"` + Buf *ebpf.Map `ebpf:"buf"` + JmpTable *ebpf.Map `ebpf:"jmp_table"` + MyPktEvt *ebpf.Map `ebpf:"my_pkt_evt"` +} + +// func loadSockFilter(tcPkt *ebpf.Map) error { +// spec, err := ebpf.LoadCollectionSpec(SockFilter) +// if err != nil { +// return err +// } + +// var objs Objs +// if err := spec.LoadAndAssign(&objs, nil); err != nil { +// return err +// } + +// if err = objs.JmpTable.Update(uint32(0), uint32(objs.TailProg1.FD()), ebpf.UpdateAny); err != nil { +// return fmt.Errorf("jmptable err: %s", err) +// } + +// reader, err := ringbuf.NewReader(objs.MyPktEvt) +// if err != nil { +// return err +// } + +// for { +// evt, query, err := readRecord(objs, reader) +// if err != nil { +// log.Warnln(err) +// continue +// } + +// evt.ClassId = calcQos(query) +// if err := tcPkt.Update(uint32(0), &evt, ebpf.UpdateAny); err != nil { +// log.Warnln(err) +// } +// } +// } + +// func loadTcPktMap() (*ebpf.Map, error) { +// return ebpf.LoadPinnedMap(TcPktMap, nil) +// } + +// func calcQos(query string) uint32 { +// return 0 +// } + +type Event struct { + Seq uint8 + Sport uint16 + Dport uint16 + Saddr uint32 + Daddr uint32 + PktLen uint32 + // tcp payload offset + Offset uint32 + ClassId uint32 +} + +// readRecord read record from ringbuf +// func readRecord(objs Objs, reader *ringbuf.Reader) (Event, string, error) { +// record, err := reader.Read() +// if err != nil { +// return Event{}, "", fmt.Errorf("reading from reader: %s", err) +// } + +// var evt Event +// if err := binary.Read(bytes.NewBuffer(record.RawSample), binary.LittleEndian, &evt); err != nil { +// return Event{}, "", fmt.Errorf("parsing ringbuf event: %s", err) +// } + +// var ( +// key uint32 +// value uint8 +// entries = objs.Buf.Iterate() +// chars = make([]byte, evt.PktLen) +// ) + +// // PktLen contains COM_TYPE 1byte, so evt.PktLen-1 here +// for i := 0; i < int(evt.PktLen-1); i++ { +// entries.Next(&key, &value) +// chars = append(chars, value) +// } + +// return evt, string(chars), nil +// } diff --git a/pkg/kubernetes/controllers/sqltrafficqos/controller.go b/pkg/kubernetes/controllers/sqltrafficqos/controller.go index 7c75ef2..94feaae 100644 --- a/pkg/kubernetes/controllers/sqltrafficqos/controller.go +++ b/pkg/kubernetes/controllers/sqltrafficqos/controller.go @@ -25,6 +25,7 @@ import ( "github.com/mlycore/log" "github.com/database-mesh/waterline/api/v1alpha1" + "github.com/database-mesh/waterline/pkg/tc" ) // SQLTrafficQoSReconciler reconciles a SQLTrafficQoS object @@ -51,9 +52,8 @@ func (r *SQLTrafficQoSReconciler) Reconcile(ctx context.Context, req ctrl.Reques // TODO(user): your logic here obj := &v1alpha1.SQLTrafficQoS{} - if err := r.Client.Get(ctx, req.NamespacedName, obj); err != nil { - log.Errorf("get resources error: %s", err) + log.Errorf("get SQLTrafficQos error: %s", err) return ctrl.Result{}, nil } @@ -66,20 +66,22 @@ func (r *SQLTrafficQoSReconciler) Reconcile(ctx context.Context, req ctrl.Reques // Read SQLTrafficQoS for basic QoS class up. // Read VirtualDatabase for application-level QoS after a Pod was scheduled on this Node - // err := r.SetTcs(ctx, obj) - // if err != nil { - // return ctrl.Result{Requeue: true}, nil - // } + shaper, err := tc.NewTcShaper(*obj, "1000M") + if err != nil { + log.Errorf("get shaper error: %s", err) + return ctrl.Result{Requeue: true}, nil + } + + if err = shaper.AddClasses(); err != nil { + log.Errorf("add classes error: %s", err) + return ctrl.Result{Requeue: true}, nil + } + log.Infof("SQLTrafficQoS: %#v", obj) return ctrl.Result{}, nil } -func (r *SQLTrafficQoSReconciler) SetTcs(ctx context.Context, qos *v1alpha1.SQLTrafficQoS) error { - //TODO: add TC operations - return nil -} - // SetupWithManager sets up the controller with the Manager. func (r *SQLTrafficQoSReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). diff --git a/pkg/kubernetes/controllers/virtualdatabase/controller.go b/pkg/kubernetes/controllers/virtualdatabase/controller.go index 99347df..2a875b4 100644 --- a/pkg/kubernetes/controllers/virtualdatabase/controller.go +++ b/pkg/kubernetes/controllers/virtualdatabase/controller.go @@ -23,6 +23,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "github.com/database-mesh/waterline/api/v1alpha1" + "github.com/database-mesh/waterline/pkg/bpf" ) // VirtualDatabaseReconciler reconciles a VirtualDatabase object @@ -57,6 +58,10 @@ func (r *VirtualDatabaseReconciler) Reconcile(ctx context.Context, req ctrl.Requ }() + // TODO: load SockFilter + l := &bpf.Loader{} + l.Load() + // pod := &corev1.Pod{} // err := r.Client.Get(ctx, types.NamespacedName{ // Name: obj.Name, diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go index c4ea797..e4395a4 100644 --- a/pkg/manager/manager.go +++ b/pkg/manager/manager.go @@ -15,12 +15,18 @@ package manager import ( + "context" + "os" + + "github.com/database-mesh/waterline/api/v1alpha1" sqltrafficqos "github.com/database-mesh/waterline/pkg/kubernetes/controllers/sqltrafficqos" virtualdatabase "github.com/database-mesh/waterline/pkg/kubernetes/controllers/virtualdatabase" "github.com/database-mesh/waterline/pkg/kubernetes/watcher" "github.com/mlycore/log" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/watch" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/healthz" ctrlmgr "sigs.k8s.io/controller-runtime/pkg/manager" ) @@ -35,8 +41,17 @@ func (m *Manager) WatchAndHandle() error { select { case event := <-m.Pod.Core.ResultChan(): { + pod := event.Object.(*corev1.Pod) log.Infof("[%s] pod event: %#v", event.Type, event.Object.(*corev1.Pod).Name) //TODO: Handle different types of events + switch event.Type { + case watch.Added: + handleAdded(pod, m.Mgr.GetClient()) + case watch.Modified: + handleModified(pod, m.Mgr.GetClient()) + case watch.Deleted: + handleDeleted(pod, m.Mgr.GetClient()) + } } } } @@ -72,3 +87,38 @@ func (m *Manager) Bootstrap() error { } return nil } + +func handleAdded(pod *corev1.Pod, c client.Client) error { + //TODO: add related rules + hostname, err := os.Hostname() + if err != nil { + return err + } + + if hostname == pod.Spec.Hostname { + list := &v1alpha1.VirtualDatabaseList{Items: []v1alpha1.VirtualDatabase{}} + + if err := c.List(context.TODO(), list, &client.ListOptions{Namespace: pod.Namespace}); err != nil { + log.Errorf("get SQLTrafficQos error: %s", err) + return err + } + + for _, db := range list.Items { + // TODO: add loader + // db.Spec.Server.Port + // db.Spec.QoS + } + + } + +} + +func handleModified(pod *corev1.Pod, c client.Client) { + +} + +func handleDeleted(pod *corev1.Pod, c client.Client) { + //TODO: remove related rules + // move it to a queue ? + +} diff --git a/pkg/server/server.go b/pkg/server/server.go index 9372813..e126ddf 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -89,6 +89,8 @@ func (s *Server) Run() error { } var eg errgroup.Group + // mgr.GetClient() + // apply to Pod eg.Go(func() error { log.Infof("starting controllers") From b1679eda1b30ecf4d890f371ea3058c5e167143e Mon Sep 17 00:00:00 2001 From: mlycore Date: Mon, 21 Mar 2022 10:09:48 +0800 Subject: [PATCH 03/31] chore(module): update go module --- go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go.mod b/go.mod index 7e458b4..5e9a21a 100644 --- a/go.mod +++ b/go.mod @@ -22,6 +22,7 @@ require ( github.com/mlycore/log v0.2.16 github.com/pkg/errors v0.9.1 github.com/vishvananda/netlink v1.1.1-0.20210330154013-f5de75959ad5 + github.com/vishvananda/netns v0.0.0-20210104183010-2eb08e3e575f golang.org/x/sync v0.0.0-20210220032951-036812b2e83c k8s.io/api v0.23.0 k8s.io/apimachinery v0.23.0 @@ -62,7 +63,6 @@ require ( github.com/prometheus/procfs v0.7.3 // indirect github.com/sirupsen/logrus v1.8.1 // indirect github.com/spf13/pflag v1.0.5 // indirect - github.com/vishvananda/netns v0.0.0-20210104183010-2eb08e3e575f // indirect golang.org/x/net v0.0.0-20211216030914-fe4d6282115f // indirect golang.org/x/oauth2 v0.0.0-20210819190943-2bc19b11175f // indirect golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e // indirect From a186c8757e81f022b956bf13d4b11eca147f6c04 Mon Sep 17 00:00:00 2001 From: mlycore Date: Mon, 21 Mar 2022 20:49:39 +0800 Subject: [PATCH 04/31] wip: working on apply and remove --- pkg/bpf/load.go | 25 ++++++---- .../controllers/sqltrafficqos/controller.go | 24 +++++---- .../controllers/virtualdatabase/controller.go | 5 ++ pkg/manager/manager.go | 50 +++++++++++++++++++ pkg/server/server.go | 2 + 5 files changed, 86 insertions(+), 20 deletions(-) diff --git a/pkg/bpf/load.go b/pkg/bpf/load.go index e9db47b..a17700e 100644 --- a/pkg/bpf/load.go +++ b/pkg/bpf/load.go @@ -18,6 +18,7 @@ import ( "bytes" "encoding/binary" "fmt" + "github.com/cilium/ebpf" "github.com/cilium/ebpf/ringbuf" "github.com/mlycore/log" @@ -31,13 +32,19 @@ const ( TcPktMap = "/sys/fs/bpf/tc/globals/my_pkt" ) -func Load(ifaceName string, port uint16) error { +type Loader struct { +} + +func (l *Loader) Load(ifaceName string, port uint16) error { tcMap, err := loadTcPktMap() + // TODO: add loader to load this program to net dev + // TODO: add port + if err != nil { return err } - if err := loadSockFilter(ifaceName, port, tcMap); err != nil { + if err := l.LoadSockFilter(ifaceName, port, tcMap); err != nil { return err } @@ -54,7 +61,7 @@ type Objs struct { MyPktEvt *ebpf.Map `ebpf:"my_pkt_evt"` } -func loadSockFilter(ifaceName string, port uint16, tcPkt *ebpf.Map) error { +func (l *Loader) LoadSockFilter(ifaceName string, port uint16, tcPkt *ebpf.Map) error { spec, err := ebpf.LoadCollectionSpec(SockFilter) if err != nil { return err @@ -92,20 +99,21 @@ func loadSockFilter(ifaceName string, port uint16, tcPkt *ebpf.Map) error { } for { - evt, query, err := readRecord(objs, reader) + evt, query, err := l.ReadRecord(objs, reader) if err != nil { log.Warnln(err) continue } evt_value := EventValue{ - ClassId: calcQos(query), + ClassId: l.CalcQos(query), } if err := tcPkt.Update(&evt, &evt_value, ebpf.UpdateAny); err != nil { log.Warnln(err) } } + return nil } func openRawSock(ifaceName string) (int, error) { @@ -132,11 +140,10 @@ func openRawSock(ifaceName string) (int, error) { return sock, nil } -func loadTcPktMap() (*ebpf.Map, error) { +func (l *Loader) LoadTcPkgMap() (*ebpf.Map, error) { return ebpf.LoadPinnedMap(TcPktMap, nil) } - -func calcQos(query string) uint32 { +func (l *Loader) CalcQoS(query string) uint32 { return 0 } @@ -156,7 +163,7 @@ type EventValue struct { } // readRecord read record from ringbuf -func readRecord(objs Objs, reader *ringbuf.Reader) (EventKey, string, error) { +func (l *Loader) ReadRecord(objs Objs, reader *ringbuf.Reader) (EventKey, string, error) { record, err := reader.Read() if err != nil { return EventKey{}, "", fmt.Errorf("reading from reader: %s", err) diff --git a/pkg/kubernetes/controllers/sqltrafficqos/controller.go b/pkg/kubernetes/controllers/sqltrafficqos/controller.go index 7c75ef2..94feaae 100644 --- a/pkg/kubernetes/controllers/sqltrafficqos/controller.go +++ b/pkg/kubernetes/controllers/sqltrafficqos/controller.go @@ -25,6 +25,7 @@ import ( "github.com/mlycore/log" "github.com/database-mesh/waterline/api/v1alpha1" + "github.com/database-mesh/waterline/pkg/tc" ) // SQLTrafficQoSReconciler reconciles a SQLTrafficQoS object @@ -51,9 +52,8 @@ func (r *SQLTrafficQoSReconciler) Reconcile(ctx context.Context, req ctrl.Reques // TODO(user): your logic here obj := &v1alpha1.SQLTrafficQoS{} - if err := r.Client.Get(ctx, req.NamespacedName, obj); err != nil { - log.Errorf("get resources error: %s", err) + log.Errorf("get SQLTrafficQos error: %s", err) return ctrl.Result{}, nil } @@ -66,20 +66,22 @@ func (r *SQLTrafficQoSReconciler) Reconcile(ctx context.Context, req ctrl.Reques // Read SQLTrafficQoS for basic QoS class up. // Read VirtualDatabase for application-level QoS after a Pod was scheduled on this Node - // err := r.SetTcs(ctx, obj) - // if err != nil { - // return ctrl.Result{Requeue: true}, nil - // } + shaper, err := tc.NewTcShaper(*obj, "1000M") + if err != nil { + log.Errorf("get shaper error: %s", err) + return ctrl.Result{Requeue: true}, nil + } + + if err = shaper.AddClasses(); err != nil { + log.Errorf("add classes error: %s", err) + return ctrl.Result{Requeue: true}, nil + } + log.Infof("SQLTrafficQoS: %#v", obj) return ctrl.Result{}, nil } -func (r *SQLTrafficQoSReconciler) SetTcs(ctx context.Context, qos *v1alpha1.SQLTrafficQoS) error { - //TODO: add TC operations - return nil -} - // SetupWithManager sets up the controller with the Manager. func (r *SQLTrafficQoSReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). diff --git a/pkg/kubernetes/controllers/virtualdatabase/controller.go b/pkg/kubernetes/controllers/virtualdatabase/controller.go index 99347df..2a875b4 100644 --- a/pkg/kubernetes/controllers/virtualdatabase/controller.go +++ b/pkg/kubernetes/controllers/virtualdatabase/controller.go @@ -23,6 +23,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "github.com/database-mesh/waterline/api/v1alpha1" + "github.com/database-mesh/waterline/pkg/bpf" ) // VirtualDatabaseReconciler reconciles a VirtualDatabase object @@ -57,6 +58,10 @@ func (r *VirtualDatabaseReconciler) Reconcile(ctx context.Context, req ctrl.Requ }() + // TODO: load SockFilter + l := &bpf.Loader{} + l.Load() + // pod := &corev1.Pod{} // err := r.Client.Get(ctx, types.NamespacedName{ // Name: obj.Name, diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go index c4ea797..e4395a4 100644 --- a/pkg/manager/manager.go +++ b/pkg/manager/manager.go @@ -15,12 +15,18 @@ package manager import ( + "context" + "os" + + "github.com/database-mesh/waterline/api/v1alpha1" sqltrafficqos "github.com/database-mesh/waterline/pkg/kubernetes/controllers/sqltrafficqos" virtualdatabase "github.com/database-mesh/waterline/pkg/kubernetes/controllers/virtualdatabase" "github.com/database-mesh/waterline/pkg/kubernetes/watcher" "github.com/mlycore/log" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/watch" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/healthz" ctrlmgr "sigs.k8s.io/controller-runtime/pkg/manager" ) @@ -35,8 +41,17 @@ func (m *Manager) WatchAndHandle() error { select { case event := <-m.Pod.Core.ResultChan(): { + pod := event.Object.(*corev1.Pod) log.Infof("[%s] pod event: %#v", event.Type, event.Object.(*corev1.Pod).Name) //TODO: Handle different types of events + switch event.Type { + case watch.Added: + handleAdded(pod, m.Mgr.GetClient()) + case watch.Modified: + handleModified(pod, m.Mgr.GetClient()) + case watch.Deleted: + handleDeleted(pod, m.Mgr.GetClient()) + } } } } @@ -72,3 +87,38 @@ func (m *Manager) Bootstrap() error { } return nil } + +func handleAdded(pod *corev1.Pod, c client.Client) error { + //TODO: add related rules + hostname, err := os.Hostname() + if err != nil { + return err + } + + if hostname == pod.Spec.Hostname { + list := &v1alpha1.VirtualDatabaseList{Items: []v1alpha1.VirtualDatabase{}} + + if err := c.List(context.TODO(), list, &client.ListOptions{Namespace: pod.Namespace}); err != nil { + log.Errorf("get SQLTrafficQos error: %s", err) + return err + } + + for _, db := range list.Items { + // TODO: add loader + // db.Spec.Server.Port + // db.Spec.QoS + } + + } + +} + +func handleModified(pod *corev1.Pod, c client.Client) { + +} + +func handleDeleted(pod *corev1.Pod, c client.Client) { + //TODO: remove related rules + // move it to a queue ? + +} diff --git a/pkg/server/server.go b/pkg/server/server.go index 9372813..e126ddf 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -89,6 +89,8 @@ func (s *Server) Run() error { } var eg errgroup.Group + // mgr.GetClient() + // apply to Pod eg.Go(func() error { log.Infof("starting controllers") From aecca7d45a27217f6dc49f0a2bc9d42a2cd2b4f0 Mon Sep 17 00:00:00 2001 From: mlycore Date: Tue, 22 Mar 2022 00:01:06 +0800 Subject: [PATCH 05/31] wip: add tc and bpf load utils --- api/v1alpha1/virtualdatabase_types.go | 2 +- .../controllers/sqltrafficqos/controller.go | 3 ++ .../controllers/virtualdatabase/controller.go | 6 ++-- pkg/manager/manager.go | 35 +++++++++++++++---- pkg/server/server.go | 1 + pkg/tc/tc.go | 1 + pkg/tc/utils.go | 2 +- 7 files changed, 40 insertions(+), 10 deletions(-) diff --git a/api/v1alpha1/virtualdatabase_types.go b/api/v1alpha1/virtualdatabase_types.go index 9d35e47..f99ffbe 100644 --- a/api/v1alpha1/virtualdatabase_types.go +++ b/api/v1alpha1/virtualdatabase_types.go @@ -46,7 +46,7 @@ type VirtualDatabaseSpec struct { // Important: Run "make" to regenerate code after modifying this file // Foo is an example field of VirtualDatabase. Edit virtualdatabase_types.go to remove/update - Selector map[string]string `json:"selector"` + Selector map[string]string `json:"selector"` // TODO: Is it needed ? Or is it applied with Endpoints ? Server VirtualDatabaseServer `json:"server"` QoS string `json:"qos"` Bandwidth string `json:"bandwidth` diff --git a/pkg/kubernetes/controllers/sqltrafficqos/controller.go b/pkg/kubernetes/controllers/sqltrafficqos/controller.go index 94feaae..97f995b 100644 --- a/pkg/kubernetes/controllers/sqltrafficqos/controller.go +++ b/pkg/kubernetes/controllers/sqltrafficqos/controller.go @@ -66,6 +66,9 @@ func (r *SQLTrafficQoSReconciler) Reconcile(ctx context.Context, req ctrl.Reques // Read SQLTrafficQoS for basic QoS class up. // Read VirtualDatabase for application-level QoS after a Pod was scheduled on this Node + //TODO: all rules should be removed once the resource was deleted + //TODO: is there an exception when more than one resource was created + shaper, err := tc.NewTcShaper(*obj, "1000M") if err != nil { log.Errorf("get shaper error: %s", err) diff --git a/pkg/kubernetes/controllers/virtualdatabase/controller.go b/pkg/kubernetes/controllers/virtualdatabase/controller.go index 2a875b4..d268560 100644 --- a/pkg/kubernetes/controllers/virtualdatabase/controller.go +++ b/pkg/kubernetes/controllers/virtualdatabase/controller.go @@ -47,6 +47,8 @@ type VirtualDatabaseReconciler struct { // - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.10.0/pkg/reconcile func (r *VirtualDatabaseReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { // TODO(user): your logic here + // TODO: all rules should be removed once the resource was deleted + // however the rules were applied with the Pod scheduled obj := &v1alpha1.VirtualDatabase{} if err := r.Client.Get(ctx, req.NamespacedName, obj); err != nil { @@ -59,8 +61,8 @@ func (r *VirtualDatabaseReconciler) Reconcile(ctx context.Context, req ctrl.Requ }() // TODO: load SockFilter - l := &bpf.Loader{} - l.Load() + // l := &bpf.Loader{} + // l.Load() // pod := &corev1.Pod{} // err := r.Client.Get(ctx, types.NamespacedName{ diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go index e4395a4..7332336 100644 --- a/pkg/manager/manager.go +++ b/pkg/manager/manager.go @@ -46,11 +46,11 @@ func (m *Manager) WatchAndHandle() error { //TODO: Handle different types of events switch event.Type { case watch.Added: - handleAdded(pod, m.Mgr.GetClient()) + handleAdded(pod, m.Mgr.GetClient(), m.Mgr.CRI) case watch.Modified: - handleModified(pod, m.Mgr.GetClient()) + handleModified(pod, m.Mgr.GetClient(), m.Mgr.CRI) case watch.Deleted: - handleDeleted(pod, m.Mgr.GetClient()) + handleDeleted(pod, m.Mgr.GetClient(), m.Mgr.CRI) } } } @@ -88,7 +88,7 @@ func (m *Manager) Bootstrap() error { return nil } -func handleAdded(pod *corev1.Pod, c client.Client) error { +func handleAdded(pod *corev1.Pod, c client.Client, cr cri.ContainerRuntimeInterfaceClient) error { //TODO: add related rules hostname, err := os.Hostname() if err != nil { @@ -104,6 +104,29 @@ func handleAdded(pod *corev1.Pod, c client.Client) error { } for _, db := range list.Items { + var found bool + for k, v := range db.Spec.Selector { + if pod.Label[k] == v { + found = true + } else { + found = false + } + } + + if found { + l := &bpf.Loader{} + containerId := pod.Status.Container + pid := cr.GetPidFromContainer(containerId) + ifname, err := tc.GetNetworkDeviceFromPid() + if err != nil { + return err + } + err = l.Load(ifname, uint16(db.Spec.Server.Port)) + if err != nil { + return err + } + } + // TODO: add loader // db.Spec.Server.Port // db.Spec.QoS @@ -113,11 +136,11 @@ func handleAdded(pod *corev1.Pod, c client.Client) error { } -func handleModified(pod *corev1.Pod, c client.Client) { +func handleModified(pod *corev1.Pod, c client.Client, cr cri.ContainerRuntimeInterfaceClient) { } -func handleDeleted(pod *corev1.Pod, c client.Client) { +func handleDeleted(pod *corev1.Pod, c client.Client, cr cri.ContainerRuntimeInterfaceClient) { //TODO: remove related rules // move it to a queue ? diff --git a/pkg/server/server.go b/pkg/server/server.go index e126ddf..486858f 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -84,6 +84,7 @@ func (s *Server) Run() error { return err } manager := &manager.Manager{ + CRI: s.ContainerRuntimeClient, Pod: w, Mgr: mgr, } diff --git a/pkg/tc/tc.go b/pkg/tc/tc.go index 7ce311f..61b94dd 100644 --- a/pkg/tc/tc.go +++ b/pkg/tc/tc.go @@ -89,6 +89,7 @@ func (t *Shaper) AddClasses() error { return rules[i].Rate < rules[j].Rate }) + //TODO: add error handling. for idx, rule := range rules { if err := t.addClass(idx, rule); err != nil { return err diff --git a/pkg/tc/utils.go b/pkg/tc/utils.go index 0f6877f..b6f17e8 100644 --- a/pkg/tc/utils.go +++ b/pkg/tc/utils.go @@ -20,7 +20,7 @@ import ( "strings" ) -func getNetworkDeviceFromPid(pid uint32) (string, error) { +func GetNetworkDeviceFromPid(pid uint32) (string, error) { igmpArgs := []string{ fmt.Sprintf("/proc/%d/net/igmp", pid), } From 7a4f0d4ab78ba1d292b5f1907fc35179cda1d6b7 Mon Sep 17 00:00:00 2001 From: mlycore Date: Tue, 22 Mar 2022 09:53:24 +0800 Subject: [PATCH 06/31] fix: package dependencies --- pkg/manager/manager.go | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go index 7332336..c2de450 100644 --- a/pkg/manager/manager.go +++ b/pkg/manager/manager.go @@ -17,11 +17,15 @@ package manager import ( "context" "os" + "strings" "github.com/database-mesh/waterline/api/v1alpha1" + "github.com/database-mesh/waterline/pkg/bpf" + "github.com/database-mesh/waterline/pkg/cri" sqltrafficqos "github.com/database-mesh/waterline/pkg/kubernetes/controllers/sqltrafficqos" virtualdatabase "github.com/database-mesh/waterline/pkg/kubernetes/controllers/virtualdatabase" "github.com/database-mesh/waterline/pkg/kubernetes/watcher" + "github.com/database-mesh/waterline/pkg/tc" "github.com/mlycore/log" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/watch" @@ -106,7 +110,7 @@ func handleAdded(pod *corev1.Pod, c client.Client, cr cri.ContainerRuntimeInterf for _, db := range list.Items { var found bool for k, v := range db.Spec.Selector { - if pod.Label[k] == v { + if pod.Labels[k] == v { found = true } else { found = false @@ -115,9 +119,12 @@ func handleAdded(pod *corev1.Pod, c client.Client, cr cri.ContainerRuntimeInterf if found { l := &bpf.Loader{} - containerId := pod.Status.Container - pid := cr.GetPidFromContainer(containerId) - ifname, err := tc.GetNetworkDeviceFromPid() + containerId := strings.Split(pod.Status.ContainerStatuses[0].ContainerID, "containerd://")[1] + pid, err := cr.GetPidFromContainer(context.TODO(), containerId) + if err != nil { + return err + } + ifname, err := tc.GetNetworkDeviceFromPid(pid) if err != nil { return err } From 03e0dbd0d29fa219ed742f7e24fc09fb58387430 Mon Sep 17 00:00:00 2001 From: mlycore Date: Tue, 22 Mar 2022 09:58:41 +0800 Subject: [PATCH 07/31] fix: import C for cgo --- pkg/bpf/load.go | 9 ++++++--- pkg/tc/tc.go | 4 +++- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/pkg/bpf/load.go b/pkg/bpf/load.go index 903ad33..544ad75 100644 --- a/pkg/bpf/load.go +++ b/pkg/bpf/load.go @@ -19,12 +19,14 @@ import ( "encoding/binary" "fmt" + "C" + "net" + "syscall" + "github.com/cilium/ebpf" "github.com/cilium/ebpf/ringbuf" "github.com/mlycore/log" "golang.org/x/sys/unix" - "net" - "syscall" ) const ( @@ -49,6 +51,7 @@ func (l *Loader) Load(ifaceName string, port uint16) error { } return nil +} type Objs struct { Prog *ebpf.Program `ebpf:"sql_filter"` @@ -192,4 +195,4 @@ func (l *Loader) ReadRecord(objs Objs, reader *ringbuf.Reader) (EventKey, string } return evt, string(chars), nil -} \ No newline at end of file +} diff --git a/pkg/tc/tc.go b/pkg/tc/tc.go index 61b94dd..9e83c29 100644 --- a/pkg/tc/tc.go +++ b/pkg/tc/tc.go @@ -15,11 +15,13 @@ package tc import ( + "C" + "sort" + v1alpha1 "github.com/database-mesh/waterline/api/v1alpha1" "github.com/vishvananda/netlink" "golang.org/x/sys/unix" "k8s.io/apimachinery/pkg/api/resource" - "sort" ) type Shaper struct { From 1e17a13ae8d7c2c3275f35ca83e3dccce8f47646 Mon Sep 17 00:00:00 2001 From: mlycore Date: Tue, 22 Mar 2022 09:58:41 +0800 Subject: [PATCH 08/31] fix: import C for cgo --- pkg/bpf/load.go | 11 +++++++---- pkg/tc/tc.go | 4 +++- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/pkg/bpf/load.go b/pkg/bpf/load.go index 903ad33..1e6cb2c 100644 --- a/pkg/bpf/load.go +++ b/pkg/bpf/load.go @@ -19,12 +19,14 @@ import ( "encoding/binary" "fmt" + "C" + "net" + "syscall" + "github.com/cilium/ebpf" "github.com/cilium/ebpf/ringbuf" "github.com/mlycore/log" "golang.org/x/sys/unix" - "net" - "syscall" ) const ( @@ -36,7 +38,7 @@ type Loader struct { } func (l *Loader) Load(ifaceName string, port uint16) error { - tcMap, err := loadTcPktMap() + tcMap, err := l.LoadTcPkgMap() // TODO: add loader to load this program to net dev // TODO: add port @@ -49,6 +51,7 @@ func (l *Loader) Load(ifaceName string, port uint16) error { } return nil +} type Objs struct { Prog *ebpf.Program `ebpf:"sql_filter"` @@ -192,4 +195,4 @@ func (l *Loader) ReadRecord(objs Objs, reader *ringbuf.Reader) (EventKey, string } return evt, string(chars), nil -} \ No newline at end of file +} diff --git a/pkg/tc/tc.go b/pkg/tc/tc.go index 61b94dd..9e83c29 100644 --- a/pkg/tc/tc.go +++ b/pkg/tc/tc.go @@ -15,11 +15,13 @@ package tc import ( + "C" + "sort" + v1alpha1 "github.com/database-mesh/waterline/api/v1alpha1" "github.com/vishvananda/netlink" "golang.org/x/sys/unix" "k8s.io/apimachinery/pkg/api/resource" - "sort" ) type Shaper struct { From bd2a88a1ab532b97aad6523f9987e2fff0a11ff1 Mon Sep 17 00:00:00 2001 From: mlycore Date: Tue, 22 Mar 2022 14:10:35 +0800 Subject: [PATCH 09/31] fix: typo --- pkg/bpf/load.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/bpf/load.go b/pkg/bpf/load.go index 1e6cb2c..144d7e3 100644 --- a/pkg/bpf/load.go +++ b/pkg/bpf/load.go @@ -108,7 +108,7 @@ func (l *Loader) LoadSockFilter(ifaceName string, port uint16, tcPkt *ebpf.Map) } evt_value := EventValue{ - ClassId: l.CalcQos(query), + ClassId: l.CalcQoS(query), } if err := tcPkt.Update(&evt, &evt_value, ebpf.UpdateAny); err != nil { From 039abf9bf1b4107eddf837e0fa97495a2ec36d83 Mon Sep 17 00:00:00 2001 From: mlycore Date: Tue, 22 Mar 2022 14:11:48 +0800 Subject: [PATCH 10/31] fix: remove unused packages --- pkg/kubernetes/controllers/virtualdatabase/controller.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/kubernetes/controllers/virtualdatabase/controller.go b/pkg/kubernetes/controllers/virtualdatabase/controller.go index d268560..e894786 100644 --- a/pkg/kubernetes/controllers/virtualdatabase/controller.go +++ b/pkg/kubernetes/controllers/virtualdatabase/controller.go @@ -23,7 +23,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "github.com/database-mesh/waterline/api/v1alpha1" - "github.com/database-mesh/waterline/pkg/bpf" + // "github.com/database-mesh/waterline/pkg/bpf" ) // VirtualDatabaseReconciler reconciles a VirtualDatabase object From 4940aa9885ded32a8404c2040c4b473bdc9ce392 Mon Sep 17 00:00:00 2001 From: mlycore Date: Tue, 22 Mar 2022 14:13:20 +0800 Subject: [PATCH 11/31] fix: add cri to manager --- pkg/manager/manager.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go index c2de450..b93e11d 100644 --- a/pkg/manager/manager.go +++ b/pkg/manager/manager.go @@ -38,6 +38,7 @@ import ( type Manager struct { Pod *watcher.PodWatcher Mgr ctrlmgr.Manager + CRI cri.ContainerRuntimeInterfaceClient } func (m *Manager) WatchAndHandle() error { From 0798da94dcc0cb437cac7ee4285e3924bbbcebd6 Mon Sep 17 00:00:00 2001 From: mlycore Date: Tue, 22 Mar 2022 14:21:06 +0800 Subject: [PATCH 12/31] fix: cri --- pkg/manager/manager.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go index b93e11d..015f64e 100644 --- a/pkg/manager/manager.go +++ b/pkg/manager/manager.go @@ -51,11 +51,11 @@ func (m *Manager) WatchAndHandle() error { //TODO: Handle different types of events switch event.Type { case watch.Added: - handleAdded(pod, m.Mgr.GetClient(), m.Mgr.CRI) + handleAdded(pod, m.Mgr.GetClient(), m.CRI) case watch.Modified: - handleModified(pod, m.Mgr.GetClient(), m.Mgr.CRI) + handleModified(pod, m.Mgr.GetClient(), m.CRI) case watch.Deleted: - handleDeleted(pod, m.Mgr.GetClient(), m.Mgr.CRI) + handleDeleted(pod, m.Mgr.GetClient(), m.CRI) } } } From 011c6bec13258cec7d909bfd0e0ff00f493166f9 Mon Sep 17 00:00:00 2001 From: mlycore Date: Tue, 22 Mar 2022 14:22:10 +0800 Subject: [PATCH 13/31] fix: fix error handle --- pkg/manager/manager.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go index 015f64e..e303ff3 100644 --- a/pkg/manager/manager.go +++ b/pkg/manager/manager.go @@ -141,7 +141,7 @@ func handleAdded(pod *corev1.Pod, c client.Client, cr cri.ContainerRuntimeInterf } } - + return nil } func handleModified(pod *corev1.Pod, c client.Client, cr cri.ContainerRuntimeInterfaceClient) { From d50dbb4bd7564d90665e53677f69c0d2d88c68d8 Mon Sep 17 00:00:00 2001 From: mlycore Date: Tue, 22 Mar 2022 14:28:07 +0800 Subject: [PATCH 14/31] fix: add error handle in main --- cmd/waterline/main.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/cmd/waterline/main.go b/cmd/waterline/main.go index f68a976..dee59cf 100644 --- a/cmd/waterline/main.go +++ b/cmd/waterline/main.go @@ -54,5 +54,7 @@ func main() { log.Fatalf("new server error") } - s.Run() + if err := s.Run(); err != nil { + log.Fatalf("server run error: %s", err) + } } From 162625a4cae07489428407adaa8d9ccff610f672 Mon Sep 17 00:00:00 2001 From: mlycore Date: Tue, 22 Mar 2022 18:06:05 +0800 Subject: [PATCH 15/31] feat: add error log --- pkg/tc/tc.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/pkg/tc/tc.go b/pkg/tc/tc.go index 9e83c29..77cff21 100644 --- a/pkg/tc/tc.go +++ b/pkg/tc/tc.go @@ -15,10 +15,11 @@ package tc import ( - "C" + // "C" "sort" v1alpha1 "github.com/database-mesh/waterline/api/v1alpha1" + "github.com/mlycore/log" "github.com/vishvananda/netlink" "golang.org/x/sys/unix" "k8s.io/apimachinery/pkg/api/resource" @@ -78,10 +79,12 @@ func (t *Shaper) addRootHandle() error { func (t *Shaper) AddClasses() error { if err := t.addHtbQdisc(); err != nil { + log.Errorf("add htb qdisc error: %s", err) return err } if err := t.addRootHandle(); err != nil { + log.Errorf("add root handle error: %s", err) return err } @@ -94,6 +97,7 @@ func (t *Shaper) AddClasses() error { //TODO: add error handling. for idx, rule := range rules { if err := t.addClass(idx, rule); err != nil { + log.Errorf("add class error: %s, rule: %s", err, rule) return err } } From c3dcc6d1d7b1160ff036604b687729fd437ef09d Mon Sep 17 00:00:00 2001 From: mlycore Date: Tue, 22 Mar 2022 18:11:04 +0800 Subject: [PATCH 16/31] feat: add error log --- pkg/tc/tc.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/tc/tc.go b/pkg/tc/tc.go index 77cff21..d4dbc95 100644 --- a/pkg/tc/tc.go +++ b/pkg/tc/tc.go @@ -53,6 +53,7 @@ func (t *Shaper) addHtbQdisc() error { } qdisc := netlink.NewHtb(attrs) + log.Infof("htb qdisc attrs: %#v, qdisc: %#v", attrs, qdisc) return netlink.QdiscReplace(qdisc) } @@ -74,6 +75,7 @@ func (t *Shaper) addRootHandle() error { } class := netlink.NewHtbClass(attrs, htbClassAttrs) + log.Infof("add root handle attrs: %#v, htbClassAttrs: %#v, qdisc: %#v", attrs, htbClassAttrs, class) return netlink.ClassReplace(class) } From c8e0b61a8b7bf0296d788c3c5f9802c787411008 Mon Sep 17 00:00:00 2001 From: mlycore Date: Fri, 25 Mar 2022 15:00:13 +0800 Subject: [PATCH 17/31] refactor: rename sqltrafficqos to trafficqos --- ...rafficqos_types.go => trafficqos_types.go} | 26 ++++++------- ...icqos_webhook.go => trafficqos_webhook.go} | 28 +++++++------- api/v1alpha1/zz_generated.deepcopy.go | 38 +++++++++---------- ...1alpha1.yaml => traffic-qos-v1alpha1.yaml} | 33 +++++++--------- 4 files changed, 59 insertions(+), 66 deletions(-) rename api/v1alpha1/{sqltrafficqos_types.go => trafficqos_types.go} (75%) rename api/v1alpha1/{sqltrafficqos_webhook.go => trafficqos_webhook.go} (64%) rename manifests/{sql-traffic-qos-v1alpha1.yaml => traffic-qos-v1alpha1.yaml} (70%) diff --git a/api/v1alpha1/sqltrafficqos_types.go b/api/v1alpha1/trafficqos_types.go similarity index 75% rename from api/v1alpha1/sqltrafficqos_types.go rename to api/v1alpha1/trafficqos_types.go index da65405..a40979c 100644 --- a/api/v1alpha1/sqltrafficqos_types.go +++ b/api/v1alpha1/trafficqos_types.go @@ -41,20 +41,20 @@ type TrafficQoSGroup struct { Ceil string `json:"ceil",omitempty` } -// SQLTrafficQoSSpec defines the desired state of SQLTrafficQoS -type SQLTrafficQoSSpec struct { +// TrafficQoSSpec defines the desired state of TrafficQoS +type TrafficQoSSpec struct { // INSERT ADDITIONAL SPEC FIELDS - desired state of cluster // Important: Run "make" to regenerate code after modifying this file - // Foo is an example field of SQLTrafficQoS. Edit sqltrafficqos_types.go to remove/update + // Foo is an example field of TrafficQoS. Edit trafficqos_types.go to remove/update NetworkDevice string `json:"networkDevice"` QoSClass QoSClassType `json:"qosClass,omitempty"` Strategy TrafficQoSStrategy `json:"strategy",omitempty` Groups []TrafficQoSGroup `json:"groups"` } -// SQLTrafficQoSStatus defines the observed state of SQLTrafficQoS -type SQLTrafficQoSStatus struct { +// TrafficQoSStatus defines the observed state of TrafficQoS +type TrafficQoSStatus struct { // INSERT ADDITIONAL STATUS FIELD - define observed state of cluster // Important: Run "make" to regenerate code after modifying this file @@ -64,24 +64,24 @@ type SQLTrafficQoSStatus struct { //+kubebuilder:object:root=true //+kubebuilder:subresource:status -// SQLTrafficQoS is the Schema for the sqltrafficqos API -type SQLTrafficQoS struct { +// TrafficQoS is the Schema for the trafficqos API +type TrafficQoS struct { metav1.TypeMeta `json:",inline"` metav1.ObjectMeta `json:"metadata,omitempty"` - Spec SQLTrafficQoSSpec `json:"spec,omitempty"` - Status SQLTrafficQoSStatus `json:"status,omitempty"` + Spec TrafficQoSSpec `json:"spec,omitempty"` + Status TrafficQoSStatus `json:"status,omitempty"` } //+kubebuilder:object:root=true -// SQLTrafficQoSList contains a list of SQLTrafficQoS -type SQLTrafficQoSList struct { +// TrafficQoSList contains a list of TrafficQoS +type TrafficQoSList struct { metav1.TypeMeta `json:",inline"` metav1.ListMeta `json:"metadata,omitempty"` - Items []SQLTrafficQoS `json:"items"` + Items []TrafficQoS `json:"items"` } func init() { - SchemeBuilder.Register(&SQLTrafficQoS{}, &SQLTrafficQoSList{}) + SchemeBuilder.Register(&TrafficQoS{}, &TrafficQoSList{}) } diff --git a/api/v1alpha1/sqltrafficqos_webhook.go b/api/v1alpha1/trafficqos_webhook.go similarity index 64% rename from api/v1alpha1/sqltrafficqos_webhook.go rename to api/v1alpha1/trafficqos_webhook.go index 0c3ae15..5919bb3 100644 --- a/api/v1alpha1/sqltrafficqos_webhook.go +++ b/api/v1alpha1/trafficqos_webhook.go @@ -22,9 +22,9 @@ import ( ) // log is for logging in this package. -var sqltrafficqoslog = logf.Log.WithName("sqltrafficqos-resource") +var trafficqoslog = logf.Log.WithName("trafficqos-resource") -func (r *SQLTrafficQoS) SetupWebhookWithManager(mgr ctrl.Manager) error { +func (r *TrafficQoS) SetupWebhookWithManager(mgr ctrl.Manager) error { return ctrl.NewWebhookManagedBy(mgr). For(r). Complete() @@ -32,25 +32,25 @@ func (r *SQLTrafficQoS) SetupWebhookWithManager(mgr ctrl.Manager) error { // TODO(user): EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN! -//+kubebuilder:webhook:path=/mutate-database-mesh-io-database-mesh-io-v1alpha1-sqltrafficqos,mutating=true,failurePolicy=fail,sideEffects=None,groups=database-mesh.io.database-mesh.io,resources=sqltrafficqos,verbs=create;update,versions=v1alpha1,name=msqltrafficqos.kb.io,admissionReviewVersions=v1 +//+kubebuilder:webhook:path=/mutate-database-mesh-io-database-mesh-io-v1alpha1-trafficqos,mutating=true,failurePolicy=fail,sideEffects=None,groups=database-mesh.io.database-mesh.io,resources=trafficqos,verbs=create;update,versions=v1alpha1,name=mtrafficqos.kb.io,admissionReviewVersions=v1 -var _ webhook.Defaulter = &SQLTrafficQoS{} +var _ webhook.Defaulter = &TrafficQoS{} // Default implements webhook.Defaulter so a webhook will be registered for the type -func (r *SQLTrafficQoS) Default() { - sqltrafficqoslog.Info("default", "name", r.Name) +func (r *TrafficQoS) Default() { + trafficqoslog.Info("default", "name", r.Name) // TODO(user): fill in your defaulting logic. } // TODO(user): change verbs to "verbs=create;update;delete" if you want to enable deletion validation. -//+kubebuilder:webhook:path=/validate-database-mesh-io-database-mesh-io-v1alpha1-sqltrafficqos,mutating=false,failurePolicy=fail,sideEffects=None,groups=database-mesh.io.database-mesh.io,resources=sqltrafficqos,verbs=create;update,versions=v1alpha1,name=vsqltrafficqos.kb.io,admissionReviewVersions=v1 +//+kubebuilder:webhook:path=/validate-database-mesh-io-database-mesh-io-v1alpha1-trafficqos,mutating=false,failurePolicy=fail,sideEffects=None,groups=database-mesh.io.database-mesh.io,resources=trafficqos,verbs=create;update,versions=v1alpha1,name=vtrafficqos.kb.io,admissionReviewVersions=v1 -var _ webhook.Validator = &SQLTrafficQoS{} +var _ webhook.Validator = &TrafficQoS{} // ValidateCreate implements webhook.Validator so a webhook will be registered for the type -func (r *SQLTrafficQoS) ValidateCreate() error { - sqltrafficqoslog.Info("validate create", "name", r.Name) +func (r *TrafficQoS) ValidateCreate() error { + trafficqoslog.Info("validate create", "name", r.Name) // TODO(user): fill in your validation logic upon object creation. @@ -58,16 +58,16 @@ func (r *SQLTrafficQoS) ValidateCreate() error { } // ValidateUpdate implements webhook.Validator so a webhook will be registered for the type -func (r *SQLTrafficQoS) ValidateUpdate(old runtime.Object) error { - sqltrafficqoslog.Info("validate update", "name", r.Name) +func (r *TrafficQoS) ValidateUpdate(old runtime.Object) error { + trafficqoslog.Info("validate update", "name", r.Name) // TODO(user): fill in your validation logic upon object update. return nil } // ValidateDelete implements webhook.Validator so a webhook will be registered for the type -func (r *SQLTrafficQoS) ValidateDelete() error { - sqltrafficqoslog.Info("validate delete", "name", r.Name) +func (r *TrafficQoS) ValidateDelete() error { + trafficqoslog.Info("validate delete", "name", r.Name) // TODO(user): fill in your validation logic upon object deletion. return nil diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index 4611627..068bd5e 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -24,7 +24,7 @@ import ( ) // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *SQLTrafficQoS) DeepCopyInto(out *SQLTrafficQoS) { +func (in *TrafficQoS) DeepCopyInto(out *TrafficQoS) { *out = *in out.TypeMeta = in.TypeMeta in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) @@ -32,18 +32,18 @@ func (in *SQLTrafficQoS) DeepCopyInto(out *SQLTrafficQoS) { out.Status = in.Status } -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SQLTrafficQoS. -func (in *SQLTrafficQoS) DeepCopy() *SQLTrafficQoS { +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TrafficQoS. +func (in *TrafficQoS) DeepCopy() *TrafficQoS { if in == nil { return nil } - out := new(SQLTrafficQoS) + out := new(TrafficQoS) in.DeepCopyInto(out) return out } // DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. -func (in *SQLTrafficQoS) DeepCopyObject() runtime.Object { +func (in *TrafficQoS) DeepCopyObject() runtime.Object { if c := in.DeepCopy(); c != nil { return c } @@ -51,31 +51,31 @@ func (in *SQLTrafficQoS) DeepCopyObject() runtime.Object { } // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *SQLTrafficQoSList) DeepCopyInto(out *SQLTrafficQoSList) { +func (in *TrafficQoSList) DeepCopyInto(out *TrafficQoSList) { *out = *in out.TypeMeta = in.TypeMeta in.ListMeta.DeepCopyInto(&out.ListMeta) if in.Items != nil { in, out := &in.Items, &out.Items - *out = make([]SQLTrafficQoS, len(*in)) + *out = make([]TrafficQoS, len(*in)) for i := range *in { (*in)[i].DeepCopyInto(&(*out)[i]) } } } -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SQLTrafficQoSList. -func (in *SQLTrafficQoSList) DeepCopy() *SQLTrafficQoSList { +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TrafficQoSList. +func (in *TrafficQoSList) DeepCopy() *TrafficQoSList { if in == nil { return nil } - out := new(SQLTrafficQoSList) + out := new(TrafficQoSList) in.DeepCopyInto(out) return out } // DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. -func (in *SQLTrafficQoSList) DeepCopyObject() runtime.Object { +func (in *TrafficQoSList) DeepCopyObject() runtime.Object { if c := in.DeepCopy(); c != nil { return c } @@ -83,7 +83,7 @@ func (in *SQLTrafficQoSList) DeepCopyObject() runtime.Object { } // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *SQLTrafficQoSSpec) DeepCopyInto(out *SQLTrafficQoSSpec) { +func (in *TrafficQoSSpec) DeepCopyInto(out *TrafficQoSSpec) { *out = *in if in.Groups != nil { in, out := &in.Groups, &out.Groups @@ -92,27 +92,27 @@ func (in *SQLTrafficQoSSpec) DeepCopyInto(out *SQLTrafficQoSSpec) { } } -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SQLTrafficQoSSpec. -func (in *SQLTrafficQoSSpec) DeepCopy() *SQLTrafficQoSSpec { +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TrafficQoSSpec. +func (in *TrafficQoSSpec) DeepCopy() *TrafficQoSSpec { if in == nil { return nil } - out := new(SQLTrafficQoSSpec) + out := new(TrafficQoSSpec) in.DeepCopyInto(out) return out } // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *SQLTrafficQoSStatus) DeepCopyInto(out *SQLTrafficQoSStatus) { +func (in *TrafficQoSStatus) DeepCopyInto(out *TrafficQoSStatus) { *out = *in } -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SQLTrafficQoSStatus. -func (in *SQLTrafficQoSStatus) DeepCopy() *SQLTrafficQoSStatus { +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TrafficQoSStatus. +func (in *TrafficQoSStatus) DeepCopy() *TrafficQoSStatus { if in == nil { return nil } - out := new(SQLTrafficQoSStatus) + out := new(TrafficQoSStatus) in.DeepCopyInto(out) return out } diff --git a/manifests/sql-traffic-qos-v1alpha1.yaml b/manifests/traffic-qos-v1alpha1.yaml similarity index 70% rename from manifests/sql-traffic-qos-v1alpha1.yaml rename to manifests/traffic-qos-v1alpha1.yaml index 52c6ae4..9107984 100644 --- a/manifests/sql-traffic-qos-v1alpha1.yaml +++ b/manifests/traffic-qos-v1alpha1.yaml @@ -6,20 +6,20 @@ metadata: annotations: controller-gen.kubebuilder.io/version: v0.7.0 creationTimestamp: null - name: sqltrafficqos.database-mesh.io + name: trafficqos.database-mesh.io spec: group: database-mesh.io names: - kind: SQLTrafficQoS - listKind: SQLTrafficQoSList - plural: sqltrafficqos - singular: sqltrafficqos + kind: TrafficQoS + listKind: TrafficQoSList + plural: trafficqos + singular: trafficqos scope: Namespaced versions: - name: v1alpha1 schema: openAPIV3Schema: - description: SQLTrafficQoS is the Schema for the sqltrafficqos API + description: TrafficQoS is the Schema for the trafficqos API properties: apiVersion: description: 'APIVersion defines the versioned schema of this representation @@ -34,41 +34,34 @@ spec: metadata: type: object spec: - description: SQLTrafficQoSSpec defines the desired state of SQLTrafficQoS + description: TrafficQoSSpec defines the desired state of TrafficQoS properties: groups: items: properties: ceil: type: string - classId: - type: string - networkDevice: - type: string - parent: - type: string rate: type: string required: - ceil - - classId - - networkDevice - - parent - rate type: object type: array - qosClass: - description: Foo is an example field of SQLTrafficQoS. Edit sqltrafficqos_types.go + networkDevice: + description: Foo is an example field of TrafficQoS. Edit trafficqos_types.go to remove/update type: string + qosClass: + type: string strategy: type: string required: - groups - - strategy + - networkDevice type: object status: - description: SQLTrafficQoSStatus defines the observed state of SQLTrafficQoS + description: TrafficQoSStatus defines the observed state of TrafficQoS type: object type: object served: true From 3fd2469e412d1d7f9dba873f2e3a97c8dfd25e4f Mon Sep 17 00:00:00 2001 From: mlycore Date: Fri, 25 Mar 2022 15:01:37 +0800 Subject: [PATCH 18/31] chore: add linux makefile --- Makefile | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/Makefile b/Makefile index 0444b69..3dfca25 100644 --- a/Makefile +++ b/Makefile @@ -3,4 +3,8 @@ LDFLAGS := "-s -w -X 'github.com/database-mesh/waterline/pkg/vesrion.GitCommit=` build: mkdir -p bin go build -ldflags=${LDFLAGS} -o bin/waterline cmd/waterline/main.go +linux: + mkdir -p bin + GOOS=linux GOARCH=amd64 go build -ldflags=${LDFLAGS} -o bin/waterline cmd/waterline/main.go + From feefd444077c52f316f7fdbbd61bac8fa087d1fe Mon Sep 17 00:00:00 2001 From: mlycore Date: Mon, 28 Mar 2022 11:53:37 +0800 Subject: [PATCH 19/31] feat(virtualdatabase): add bandwidth --- api/v1alpha1/virtualdatabase_types.go | 6 +++--- manifests/virtual-database-v1alpha1.yaml | 23 ++++++++++++++++++++++- 2 files changed, 25 insertions(+), 4 deletions(-) diff --git a/api/v1alpha1/virtualdatabase_types.go b/api/v1alpha1/virtualdatabase_types.go index f99ffbe..51f0376 100644 --- a/api/v1alpha1/virtualdatabase_types.go +++ b/api/v1alpha1/virtualdatabase_types.go @@ -49,15 +49,15 @@ type VirtualDatabaseSpec struct { Selector map[string]string `json:"selector"` // TODO: Is it needed ? Or is it applied with Endpoints ? Server VirtualDatabaseServer `json:"server"` QoS string `json:"qos"` - Bandwidth string `json:"bandwidth` + Bandwidth string `json:"bandwidth"` } // VirtualDatabaseStatus defines the observed state of VirtualDatabase type VirtualDatabaseStatus struct { // INSERT ADDITIONAL STATUS FIELD - define observed state of cluster // Important: Run "make" to regenerate code after modifying this file - PodList []string `json:"podList"` - ObservedGeneration int64 `json:"observedGeneration,omitempty"` + ClassInfo string `json:"classInfo"` + ObservedGeneration int64 `json:"observedGeneration,omitempty"` } type Pod string diff --git a/manifests/virtual-database-v1alpha1.yaml b/manifests/virtual-database-v1alpha1.yaml index b8f2cd5..df9c568 100644 --- a/manifests/virtual-database-v1alpha1.yaml +++ b/manifests/virtual-database-v1alpha1.yaml @@ -36,11 +36,17 @@ spec: spec: description: VirtualDatabaseSpec defines the desired state of VirtualDatabase properties: + bandwidth: + type: string qos: type: string - server: + selector: + additionalProperties: + type: string description: Foo is an example field of VirtualDatabase. Edit virtualdatabase_types.go to remove/update + type: object + server: properties: backends: items: @@ -70,11 +76,26 @@ spec: - protocol type: object required: + - bandwidth - qos + - selector - server type: object status: description: VirtualDatabaseStatus defines the observed state of VirtualDatabase + properties: + observedGeneration: + format: int64 + type: integer + podList: + description: 'INSERT ADDITIONAL STATUS FIELD - define observed state + of cluster Important: Run "make" to regenerate code after modifying + this file' + items: + type: string + type: array + required: + - podList type: object type: object served: true From bd9fbfd7a4d6fc7ac49a6aab7c3fa4edd42011c8 Mon Sep 17 00:00:00 2001 From: mlycore Date: Mon, 28 Mar 2022 11:54:08 +0800 Subject: [PATCH 20/31] feat(traffocqosmapping): add trafficqosmapping definition and manifests --- api/v1alpha1/trafficqosmapping_webhook.go | 74 +++++++++++++++++++++ api/v1alpha1/trafficqosmappping_types.go | 63 ++++++++++++++++++ manifests/traffic-qos-mapping-v1alpha1.yaml | 54 +++++++++++++++ 3 files changed, 191 insertions(+) create mode 100644 api/v1alpha1/trafficqosmapping_webhook.go create mode 100644 api/v1alpha1/trafficqosmappping_types.go create mode 100644 manifests/traffic-qos-mapping-v1alpha1.yaml diff --git a/api/v1alpha1/trafficqosmapping_webhook.go b/api/v1alpha1/trafficqosmapping_webhook.go new file mode 100644 index 0000000..65f69b0 --- /dev/null +++ b/api/v1alpha1/trafficqosmapping_webhook.go @@ -0,0 +1,74 @@ +// Copyright 2022 Database Mesh Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package v1alpha1 + +import ( + "k8s.io/apimachinery/pkg/runtime" + ctrl "sigs.k8s.io/controller-runtime" + logf "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/webhook" +) + +// log is for logging in this package. +var trafficqosmappinglog = logf.Log.WithName("trafficqosmapping-resource") + +func (r *TrafficQoSMapping) SetupWebhookWithManager(mgr ctrl.Manager) error { + return ctrl.NewWebhookManagedBy(mgr). + For(r). + Complete() +} + +// TODO(user): EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN! + +//+kubebuilder:webhook:path=/mutate-database-mesh-io-database-mesh-io-v1alpha1-TrafficQoSMapping,mutating=true,failurePolicy=fail,sideEffects=None,groups=database-mesh.io.database-mesh.io,resources=trafficqosmapping,verbs=create;update,versions=v1alpha1,name=mtrafficqosmapping.kb.io,admissionReviewVersions=v1 + +var _ webhook.Defaulter = &TrafficQoSMapping{} + +// Default implements webhook.Defaulter so a webhook will be registered for the type +func (r *TrafficQoSMapping) Default() { + trafficqosmappinglog.Info("default", "name", r.Name) + + // TODO(user): fill in your defaulting logic. +} + +// TODO(user): change verbs to "verbs=create;update;delete" if you want to enable deletion validation. +//+kubebuilder:webhook:path=/validate-database-mesh-io-database-mesh-io-v1alpha1-TrafficQoSMapping,mutating=false,failurePolicy=fail,sideEffects=None,groups=database-mesh.io.database-mesh.io,resources=trafficqosmapping,verbs=create;update,versions=v1alpha1,name=vtrafficqosmapping.kb.io,admissionReviewVersions=v1 + +var _ webhook.Validator = &TrafficQoSMapping{} + +// ValidateCreate implements webhook.Validator so a webhook will be registered for the type +func (r *TrafficQoSMapping) ValidateCreate() error { + trafficqosmappinglog.Info("validate create", "name", r.Name) + + // TODO(user): fill in your validation logic upon object creation. + + return nil +} + +// ValidateUpdate implements webhook.Validator so a webhook will be registered for the type +func (r *TrafficQoSMapping) ValidateUpdate(old runtime.Object) error { + trafficqosmappinglog.Info("validate update", "name", r.Name) + + // TODO(user): fill in your validation logic upon object update. + return nil +} + +// ValidateDelete implements webhook.Validator so a webhook will be registered for the type +func (r *TrafficQoSMapping) ValidateDelete() error { + trafficqosmappinglog.Info("validate delete", "name", r.Name) + + // TODO(user): fill in your validation logic upon object deletion. + return nil +} diff --git a/api/v1alpha1/trafficqosmappping_types.go b/api/v1alpha1/trafficqosmappping_types.go new file mode 100644 index 0000000..d2a1b53 --- /dev/null +++ b/api/v1alpha1/trafficqosmappping_types.go @@ -0,0 +1,63 @@ +// Copyright 2022 Database Mesh Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package v1alpha1 + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN! +// NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized. + +// TrafficQoSMappingSpec defines the desired state of TrafficQoSMapping +type TrafficQoSMappingSpec struct { + // INSERT ADDITIONAL SPEC FIELDS - desired state of cluster + // Important: Run "make" to regenerate code after modifying this file + + // Foo is an example field of TrafficQoSMapping. Edit trafficqos_types.go to remove/update + References map[string]string `json:"references` +} + +// TrafficQoSMappingStatus defines the observed state of TrafficQoSMapping +type TrafficQoSMappingStatus struct { + // INSERT ADDITIONAL STATUS FIELD - define observed state of cluster + // Important: Run "make" to regenerate code after modifying this file + + //TODO: add ObservedGeneration +} + +//+kubebuilder:object:root=true +//+kubebuilder:subresource:status + +// TrafficQoSMapping is the Schema for the trafficqos API +type TrafficQoSMapping struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty"` + + Spec TrafficQoSMappingSpec `json:"spec,omitempty"` +} + +//+kubebuilder:object:root=true + +// TrafficQoSMappingList contains a list of TrafficQoSMapping +type TrafficQoSMappingList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata,omitempty"` + Items []TrafficQoSMapping `json:"items"` +} + +func init() { + SchemeBuilder.Register(&TrafficQoSMapping{}, &TrafficQoSMappingList{}) +} diff --git a/manifests/traffic-qos-mapping-v1alpha1.yaml b/manifests/traffic-qos-mapping-v1alpha1.yaml new file mode 100644 index 0000000..d1940ff --- /dev/null +++ b/manifests/traffic-qos-mapping-v1alpha1.yaml @@ -0,0 +1,54 @@ + +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + annotations: + controller-gen.kubebuilder.io/version: v0.7.0 + creationTimestamp: null + name: trafficqosmapping.database-mesh.io +spec: + group: database-mesh.io + names: + kind: TrafficQoSMapping + listKind: TrafficQoSMappingList + plural: trafficqosmappings + singular: trafficqosmapping + # scope: Namespaced + versions: + - name: v1alpha1 + schema: + openAPIV3Schema: + description: TrafficQoSMapping is the Schema for the trafficqos API + properties: + apiVersion: + description: 'APIVersion defines the versioned schema of this representation + of an object. Servers should convert recognized schemas to the latest + internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources' + type: string + kind: + description: 'Kind is a string value representing the REST resource this + object represents. Servers may infer this from the endpoint the client + submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds' + type: string + metadata: + type: object + spec: + description: TrafficQoSMappingSpec defines the desired state of TrafficQoSMapping + properties: + references: + type: map + required: + - references + type: object + type: object + served: true + storage: true + subresources: + status: {} +status: + acceptedNames: + kind: "" + plural: "" + conditions: [] + storedVersions: [] From ac4c12a405452d9a5d26a39edab1d9b07f132404 Mon Sep 17 00:00:00 2001 From: mlycore Date: Mon, 28 Mar 2022 11:54:29 +0800 Subject: [PATCH 21/31] feat(crd): update generated deep copy of crds --- api/v1alpha1/zz_generated.deepcopy.go | 102 ++++++++++++++++++++++++-- 1 file changed, 96 insertions(+), 6 deletions(-) diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index 068bd5e..a92a304 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -23,6 +23,8 @@ import ( "k8s.io/apimachinery/pkg/runtime" ) + + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *TrafficQoS) DeepCopyInto(out *TrafficQoS) { *out = *in @@ -252,11 +254,6 @@ func (in *VirtualDatabaseSpec) DeepCopy() *VirtualDatabaseSpec { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *VirtualDatabaseStatus) DeepCopyInto(out *VirtualDatabaseStatus) { *out = *in - if in.PodList != nil { - in, out := &in.PodList, &out.PodList - *out = make([]string, len(*in)) - copy(*out, *in) - } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new VirtualDatabaseStatus. @@ -267,4 +264,97 @@ func (in *VirtualDatabaseStatus) DeepCopy() *VirtualDatabaseStatus { out := new(VirtualDatabaseStatus) in.DeepCopyInto(out) return out -} \ No newline at end of file +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *TrafficQoSMapping) DeepCopyInto(out *TrafficQoSMapping) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) + in.Spec.DeepCopyInto(&out.Spec) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TrafficQoS. +func (in *TrafficQoSMapping) DeepCopy() *TrafficQoSMapping { + if in == nil { + return nil + } + out := new(TrafficQoSMapping) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *TrafficQoSMapping) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *TrafficQoSMappingList) DeepCopyInto(out *TrafficQoSMappingList) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ListMeta.DeepCopyInto(&out.ListMeta) + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = make([]TrafficQoSMapping, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TrafficQoSList. +func (in *TrafficQoSMappingList) DeepCopy() *TrafficQoSMappingList { + if in == nil { + return nil + } + out := new(TrafficQoSMappingList) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *TrafficQoSMappingList) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *TrafficQoSMappingSpec) DeepCopyInto(out *TrafficQoSMappingSpec) { + *out = *in + if in.References != nil { + in, out := &in.References, &out.References + *out = map[string]string{} + *out = *in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TrafficQoSSpec. +func (in *TrafficQoSMappingSpec) DeepCopy() *TrafficQoSMappingSpec { + if in == nil { + return nil + } + out := new(TrafficQoSMappingSpec) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *TrafficQoSMappingStatus) DeepCopyInto(out *TrafficQoSMappingStatus) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TrafficQoSStatus. +func (in *TrafficQoSMappingStatus) DeepCopy() *TrafficQoSMappingStatus { + if in == nil { + return nil + } + out := new(TrafficQoSMappingStatus) + in.DeepCopyInto(out) + return out +} From 70b06c770d08ec49279e9b65b03a9e29d5764408 Mon Sep 17 00:00:00 2001 From: mlycore Date: Mon, 28 Mar 2022 11:55:23 +0800 Subject: [PATCH 22/31] chore(clang): remove c files --- pkg/bpf/load.go | 2 +- pkg/bpf/sock_filter.c | 193 ------------------------------------------ pkg/bpf/tc.c | 88 ------------------- 3 files changed, 1 insertion(+), 282 deletions(-) delete mode 100644 pkg/bpf/sock_filter.c delete mode 100644 pkg/bpf/tc.c diff --git a/pkg/bpf/load.go b/pkg/bpf/load.go index 144d7e3..52feb35 100644 --- a/pkg/bpf/load.go +++ b/pkg/bpf/load.go @@ -19,7 +19,7 @@ import ( "encoding/binary" "fmt" - "C" + // "C" "net" "syscall" diff --git a/pkg/bpf/sock_filter.c b/pkg/bpf/sock_filter.c deleted file mode 100644 index 321166c..0000000 --- a/pkg/bpf/sock_filter.c +++ /dev/null @@ -1,193 +0,0 @@ -// Copyright 2022 Database Mesh Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include -#include -#include -#include -#include -#include "headers/bpf_helpers.h" -#include "headers/bpf_endian.h" - -struct bpf_map_def SEC("maps") filter_helper = { - .type = BPF_MAP_TYPE_HASH, - .key_size = sizeof(__u8), - .value_size = sizeof(__u16), - .max_entries = 2, -}; - -struct bpf_map_def SEC("maps") buf = { - .type = BPF_MAP_TYPE_ARRAY, - .key_size = sizeof(__u32), - .value_size = sizeof(__u8), - .max_entries = 1<<16, -}; - -struct bpf_map_def SEC("maps") jmp_table = { - .type = BPF_MAP_TYPE_PROG_ARRAY, - .key_size = sizeof(__u32), - .value_size = sizeof(__u32), - .max_entries = 100, -}; - -struct event_key { - __u8 seq; - __u16 sport; - __u16 dport; - __u32 saddr; - __u32 daddr; -}; - -struct event_value { - __u32 payload_offset; - __u32 my_pkt_len; - __u32 class_id; -}; - -struct bpf_map_def SEC("maps") my_pkt = { - .type = BPF_MAP_TYPE_HASH, - .key_size = sizeof(struct event_key), - .value_size = sizeof(struct event_value), - .max_entries = 2048, -}; - -// used by tail call -struct bpf_map_def SEC("maps") tmp_pkt_evt = { - .type = BPF_MAP_TYPE_HASH, - .key_size = sizeof(__u8), - .value_size = sizeof(struct event_key), - .max_entries = 1, -}; - -struct bpf_map_def SEC("maps") my_pkt_evt = { - .type = BPF_MAP_TYPE_RINGBUF, - .max_entries = 4096, -}; - - -SEC("socket") -int sql_filter(struct __sk_buff *skb) { - if (skb->protocol != bpf_htons(ETH_P_IP)) { - return 0; - } - - struct iphdr iph; - bpf_skb_load_bytes(skb, ETH_HLEN, &iph, sizeof(iph)); - - if (iph.protocol != IPPROTO_TCP) { - return 0; - } - - __u32 ip_hlen = iph.ihl << 2; - - struct tcphdr tcph; - - bpf_skb_load_bytes(skb, ETH_HLEN + sizeof(iph), &tcph, sizeof(tcph)); - - __u32 tcp_hlen = tcph.doff << 2; - - __u8 filter_port_key = 0; - __u16 *port = bpf_map_lookup_elem(&filter_helper, &filter_port_key); - if (!port) return 0; - - if (bpf_ntohs(tcph.dest) != *port) { - return 0; - } - - __u32 payload_offset = ETH_HLEN + ip_hlen + tcp_hlen; - - __u8 header[5]; - bpf_skb_load_bytes(skb, payload_offset, &header, sizeof(header)); - - if (header[4] != 3) { - return 0; - } - - __u32 my_pkt_len = header[0] | header[1] << 8 | header[2] << 16; - - struct event_key evt_key; - __builtin_memset(&evt_key, 0, sizeof(struct event_key)); - evt_key.saddr = iph.saddr; - evt_key.sport = tcph.source, - evt_key.daddr = iph.daddr, - evt_key.dport = tcph.dest, - evt_key.seq = header[3]; - - - struct event_value evt_value; - __builtin_memset(&evt_value, 0, sizeof(struct event_value)); - evt_value.payload_offset = payload_offset; - evt_value.my_pkt_len = my_pkt_len; - evt_value.class_id = 0; - - bpf_map_update_elem(&my_pkt, &evt_key, &evt_value, BPF_ANY); - - __u8 tmp_key = 0; - bpf_map_update_elem(&tmp_pkt_evt, &tmp_key, &evt_key, BPF_ANY); - - __u8 b; - int i = 0; - __u32 buf_idx = 0; - #pragma unroll - for (i = 0; i < 200; i++) { - if (buf_idx == my_pkt_len - 1) { - bpf_ringbuf_output(&my_pkt_evt, &evt_key, sizeof(struct event_key), 0); - return -1; - } - if (bpf_skb_load_bytes(skb, payload_offset+5+i, &b, sizeof(b)) == 0) { - if (bpf_map_update_elem(&buf, &buf_idx, &b, BPF_ANY) == 0) { - buf_idx++; - } - } - } - - bpf_tail_call(skb, &jmp_table, 0); - - return -1; -} - -SEC("socket_1") -int sql_filter_1(struct __sk_buff *skb) { - __u8 tmp_key = 0; - - struct event_key *tmp_evt_key = bpf_map_lookup_elem(&tmp_pkt_evt, &tmp_key); - - if (!tmp_key) return -1; - - struct event_value *tmp_value = bpf_map_lookup_elem(&tmp_pkt_evt, &tmp_evt_key); - - if (!tmp_value) return -1; - - __u32 payload_offset = tmp_value->payload_offset; - __u32 my_pkt_len = tmp_value->my_pkt_len; - __u8 b; - __u32 buf_idx = 200; - #pragma unroll - for (int i = 200; i < 2048; i++) { - if (buf_idx == my_pkt_len - 1) { - bpf_ringbuf_output(&my_pkt_evt, tmp_value, sizeof(*tmp_value), 0); - return -1; - } - - if (bpf_skb_load_bytes(skb, payload_offset+5+i, &b, sizeof(b)) == 0) { - if (bpf_map_update_elem(&buf, &buf_idx, &b, BPF_ANY) == 0) { - buf_idx++; - } - } - } - - return -1; -} - -char LICENSE[] SEC("license") = "GPL"; \ No newline at end of file diff --git a/pkg/bpf/tc.c b/pkg/bpf/tc.c deleted file mode 100644 index 59b10a6..0000000 --- a/pkg/bpf/tc.c +++ /dev/null @@ -1,88 +0,0 @@ -#include -#include -#include -#include -#include -#include -#include "headers/bpf_helpers.h" -#include "headers/bpf_endian.h" - -// bpf_elf map definition from https://github.com/shemminger/iproute2/blob/main/include/bpf_elf.h -struct bpf_elf_map { - unsigned int type; - unsigned int size_key; - unsigned int size_value; - unsigned int max_elem; - unsigned int flags; - unsigned int id; - unsigned int pinning; - unsigned int inner_id; - unsigned int inner_idx; -}; - -struct event_key { - __u8 seq; - __u16 sport; - __u16 dport; - __u32 saddr; - __u32 daddr; -}; - -struct event_value { - __u32 payload_offset; - __u32 my_pkt_len; - __u32 class_id; -}; - -struct bpf_elf_map SEC("maps") my_pkt = { - .type = BPF_MAP_TYPE_HASH, - .size_key = sizeof(struct event_key), - .size_value = sizeof(struct event_value), - // pin path default is /sys/fs/bpf/tc/globals/my_pkt - // waterline can write qos rule to my_pkt by pin path - .pinning = 2, -}; - -// attach to eth0 || cni0 || docker0 -SEC("classifier") -int tc_egress(struct __sk_buff *skb) { - if (skb->protocol != bpf_htons(ETH_P_IP)) { - return TC_ACT_OK; - } - - struct iphdr iph; - bpf_skb_load_bytes(skb, ETH_HLEN, &iph, sizeof(iph)); - - if (iph.protocol != IPPROTO_TCP) { - return TC_ACT_OK; - } - - //__u32 ip_hlen = iph.ihl << 2; - - struct tcphdr tcph; - - bpf_skb_load_bytes(skb, ETH_HLEN + sizeof(iph), &tcph, sizeof(tcph)); - - //__u32 tcp_hlen = tcph.doff << 2; - - struct event_key evt_key; - __builtin_memset(&evt_key, 0, sizeof(struct event_key)); - evt_key.saddr = iph.daddr; - evt_key.sport = tcph.dest; - evt_key.daddr = iph.saddr; - evt_key.dport = tcph.source; - evt_key.seq = 0; - - struct event_value *evt_value = bpf_map_lookup_elem(&my_pkt, &evt_key); - if (!evt_value) return TC_ACT_OK; - - __u32 payload_offset = evt_value->payload_offset; - __u8 header[4]; - bpf_skb_load_bytes(skb, payload_offset, &header, sizeof(header)); - if (header[3] < 1) return TC_ACT_OK; - - skb->tc_classid = evt_value->class_id; - return TC_ACT_OK; -} - -char LICENSE[] SEC("license") = "GPL"; \ No newline at end of file From 3e744b8ae7283ffc2785c6fa13d9009956883b6d Mon Sep 17 00:00:00 2001 From: mlycore Date: Mon, 28 Mar 2022 11:56:32 +0800 Subject: [PATCH 23/31] refactor(trafficqosmapping): update trafficqos related references --- .../controller.go | 22 ++--- .../trafficqosmapping/controller.go | 93 +++++++++++++++++++ pkg/manager/manager.go | 15 ++- pkg/tc/tc.go | 4 +- 4 files changed, 118 insertions(+), 16 deletions(-) rename pkg/kubernetes/controllers/{sqltrafficqos => trafficqos}/controller.go (81%) create mode 100644 pkg/kubernetes/controllers/trafficqosmapping/controller.go diff --git a/pkg/kubernetes/controllers/sqltrafficqos/controller.go b/pkg/kubernetes/controllers/trafficqos/controller.go similarity index 81% rename from pkg/kubernetes/controllers/sqltrafficqos/controller.go rename to pkg/kubernetes/controllers/trafficqos/controller.go index 97f995b..b38a3d5 100644 --- a/pkg/kubernetes/controllers/sqltrafficqos/controller.go +++ b/pkg/kubernetes/controllers/trafficqos/controller.go @@ -28,8 +28,8 @@ import ( "github.com/database-mesh/waterline/pkg/tc" ) -// SQLTrafficQoSReconciler reconciles a SQLTrafficQoS object -type SQLTrafficQoSReconciler struct { +// TrafficQoSReconciler reconciles a TrafficQoS object +type TrafficQoSReconciler struct { client.Client Scheme *runtime.Scheme } @@ -41,29 +41,29 @@ type SQLTrafficQoSReconciler struct { // Reconcile is part of the main kubernetes reconciliation loop which aims to // move the current state of the cluster closer to the desired state. // TODO(user): Modify the Reconcile function to compare the state specified by -// the SQLTrafficQoS object against the actual cluster state, and then +// the TrafficQoS object against the actual cluster state, and then // perform operations to make the cluster state reflect the state specified by // the user. // // For more details, check Reconcile and its Result here: // - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.10.0/pkg/reconcile -func (r *SQLTrafficQoSReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { +func (r *TrafficQoSReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { // _ = log.FromContext(ctx) // TODO(user): your logic here - obj := &v1alpha1.SQLTrafficQoS{} + obj := &v1alpha1.TrafficQoS{} if err := r.Client.Get(ctx, req.NamespacedName, obj); err != nil { - log.Errorf("get SQLTrafficQos error: %s", err) + log.Errorf("get TrafficQos error: %s", err) return ctrl.Result{}, nil } - // TODO: sync SQLTrafficQoSStatus + // TODO: sync TrafficQoSStatus defer func() { }() // TODO: add logic, remove VirtualDatabase. - // Read SQLTrafficQoS for basic QoS class up. + // Read TrafficQoS for basic QoS class up. // Read VirtualDatabase for application-level QoS after a Pod was scheduled on this Node //TODO: all rules should be removed once the resource was deleted @@ -80,14 +80,14 @@ func (r *SQLTrafficQoSReconciler) Reconcile(ctx context.Context, req ctrl.Reques return ctrl.Result{Requeue: true}, nil } - log.Infof("SQLTrafficQoS: %#v", obj) + log.Infof("TrafficQoS: %#v", obj) return ctrl.Result{}, nil } // SetupWithManager sets up the controller with the Manager. -func (r *SQLTrafficQoSReconciler) SetupWithManager(mgr ctrl.Manager) error { +func (r *TrafficQoSReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). - For(&v1alpha1.SQLTrafficQoS{}). + For(&v1alpha1.TrafficQoS{}). Complete(r) } diff --git a/pkg/kubernetes/controllers/trafficqosmapping/controller.go b/pkg/kubernetes/controllers/trafficqosmapping/controller.go new file mode 100644 index 0000000..4076728 --- /dev/null +++ b/pkg/kubernetes/controllers/trafficqosmapping/controller.go @@ -0,0 +1,93 @@ +// Copyright 2022 Database Mesh Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kubernetes + +import ( + "context" + + "k8s.io/apimachinery/pkg/runtime" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + + // "sigs.k8s.io/controller-runtime/pkg/log" + "github.com/mlycore/log" + + "github.com/database-mesh/waterline/api/v1alpha1" + "github.com/database-mesh/waterline/pkg/tc" +) + +// TrafficQoSMappingReconciler reconciles a TrafficQoSMapping object +type TrafficQoSMappingReconciler struct { + client.Client + Scheme *runtime.Scheme +} + +//+kubebuilder:rbac:groups=database-mesh.io.my.domain,resources=sqltrafficqos,verbs=get;list;watch;create;update;patch;delete +//+kubebuilder:rbac:groups=database-mesh.io.my.domain,resources=sqltrafficqos/status,verbs=get;update;patch +//+kubebuilder:rbac:groups=database-mesh.io.my.domain,resources=sqltrafficqos/finalizers,verbs=update + +// Reconcile is part of the main kubernetes reconciliation loop which aims to +// move the current state of the cluster closer to the desired state. +// TODO(user): Modify the Reconcile function to compare the state specified by +// the TrafficQoSMapping object against the actual cluster state, and then +// perform operations to make the cluster state reflect the state specified by +// the user. +// +// For more details, check Reconcile and its Result here: +// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.10.0/pkg/reconcile +func (r *TrafficQoSMappingReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + // _ = log.FromContext(ctx) + + // TODO(user): your logic here + obj := &v1alpha1.TrafficQoSMapping{} + if err := r.Client.Get(ctx, req.NamespacedName, obj); err != nil { + log.Errorf("get TrafficQos error: %s", err) + return ctrl.Result{}, nil + } + + // TODO: sync TrafficQoSMappingStatus + defer func() { + + }() + + // TODO: add logic, remove VirtualDatabase. + // Read TrafficQoSMapping for basic QoS class up. + // Read VirtualDatabase for application-level QoS after a Pod was scheduled on this Node + + //TODO: all rules should be removed once the resource was deleted + //TODO: is there an exception when more than one resource was created + + shaper, err := tc.NewTcShaper(*obj, "1000M") + if err != nil { + log.Errorf("get shaper error: %s", err) + return ctrl.Result{Requeue: true}, nil + } + + if err = shaper.AddClasses(); err != nil { + log.Errorf("add classes error: %s", err) + return ctrl.Result{Requeue: true}, nil + } + + log.Infof("TrafficQoSMapping: %#v", obj) + + return ctrl.Result{}, nil +} + +// SetupWithManager sets up the controller with the Manager. +func (r *TrafficQoSMappingReconciler) SetupWithManager(mgr ctrl.Manager) error { + return ctrl.NewControllerManagedBy(mgr). + For(&v1alpha1.TrafficQoSMapping{}). + Complete(r) +} diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go index e303ff3..e53c291 100644 --- a/pkg/manager/manager.go +++ b/pkg/manager/manager.go @@ -22,7 +22,8 @@ import ( "github.com/database-mesh/waterline/api/v1alpha1" "github.com/database-mesh/waterline/pkg/bpf" "github.com/database-mesh/waterline/pkg/cri" - sqltrafficqos "github.com/database-mesh/waterline/pkg/kubernetes/controllers/sqltrafficqos" + trafficqos "github.com/database-mesh/waterline/pkg/kubernetes/controllers/trafficqos" + trafficqosmapping "github.com/database-mesh/waterline/pkg/kubernetes/controllers/trafficqosmapping" virtualdatabase "github.com/database-mesh/waterline/pkg/kubernetes/controllers/virtualdatabase" "github.com/database-mesh/waterline/pkg/kubernetes/watcher" "github.com/database-mesh/waterline/pkg/tc" @@ -64,11 +65,19 @@ func (m *Manager) WatchAndHandle() error { } func (m *Manager) Bootstrap() error { - if err := (&sqltrafficqos.SQLTrafficQoSReconciler{ + if err := (&trafficqos.TrafficQoSReconciler{ Client: m.Mgr.GetClient(), Scheme: m.Mgr.GetScheme(), }).SetupWithManager(m.Mgr); err != nil { - log.Errorf("sqltrafficqos setupWithManager error: %s", err) + log.Errorf("trafficqos setupWithManager error: %s", err) + return err + } + + if err := (&trafficqosmapping.TrafficQoSMappingReconciler{ + Client: m.Mgr.GetClient(), + Scheme: m.Mgr.GetScheme(), + }).SetupWithManager(m.Mgr); err != nil { + log.Errorf("trafficqosmapping setupWithManager error: %s", err) return err } diff --git a/pkg/tc/tc.go b/pkg/tc/tc.go index d4dbc95..e60a8fa 100644 --- a/pkg/tc/tc.go +++ b/pkg/tc/tc.go @@ -26,12 +26,12 @@ import ( ) type Shaper struct { - qos v1alpha1.SQLTrafficQoS + qos v1alpha1.TrafficQoS link netlink.Link totalBandWidth string } -func NewTcShaper(qos v1alpha1.SQLTrafficQoS, totalBandWidth string) (*Shaper, error) { +func NewTcShaper(qos v1alpha1.TrafficQoS, totalBandWidth string) (*Shaper, error) { link, err := netlink.LinkByName(qos.Spec.NetworkDevice) if err != nil { return nil, err From 7e1d2bbe05352ab92a97805f12d1e725f7e555af Mon Sep 17 00:00:00 2001 From: mlycore Date: Mon, 28 Mar 2022 14:12:01 +0800 Subject: [PATCH 24/31] fix: remove reconcile in TrafficQoSMapping --- .../controllers/trafficqosmapping/controller.go | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/pkg/kubernetes/controllers/trafficqosmapping/controller.go b/pkg/kubernetes/controllers/trafficqosmapping/controller.go index 4076728..c8a7a26 100644 --- a/pkg/kubernetes/controllers/trafficqosmapping/controller.go +++ b/pkg/kubernetes/controllers/trafficqosmapping/controller.go @@ -25,7 +25,6 @@ import ( "github.com/mlycore/log" "github.com/database-mesh/waterline/api/v1alpha1" - "github.com/database-mesh/waterline/pkg/tc" ) // TrafficQoSMappingReconciler reconciles a TrafficQoSMapping object @@ -69,17 +68,6 @@ func (r *TrafficQoSMappingReconciler) Reconcile(ctx context.Context, req ctrl.Re //TODO: all rules should be removed once the resource was deleted //TODO: is there an exception when more than one resource was created - shaper, err := tc.NewTcShaper(*obj, "1000M") - if err != nil { - log.Errorf("get shaper error: %s", err) - return ctrl.Result{Requeue: true}, nil - } - - if err = shaper.AddClasses(); err != nil { - log.Errorf("add classes error: %s", err) - return ctrl.Result{Requeue: true}, nil - } - log.Infof("TrafficQoSMapping: %#v", obj) return ctrl.Result{}, nil From dd1fcdb6831df158cc3ec5221677d4330345c201 Mon Sep 17 00:00:00 2001 From: mlycore Date: Mon, 28 Mar 2022 14:24:37 +0800 Subject: [PATCH 25/31] chore(crd): fix object type in trafficqosmapping --- manifests/traffic-qos-mapping-v1alpha1.yaml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/manifests/traffic-qos-mapping-v1alpha1.yaml b/manifests/traffic-qos-mapping-v1alpha1.yaml index d1940ff..36cfe3d 100644 --- a/manifests/traffic-qos-mapping-v1alpha1.yaml +++ b/manifests/traffic-qos-mapping-v1alpha1.yaml @@ -12,9 +12,9 @@ spec: names: kind: TrafficQoSMapping listKind: TrafficQoSMappingList - plural: trafficqosmappings + plural: trafficqosmapping singular: trafficqosmapping - # scope: Namespaced + scope: Namespaced versions: - name: v1alpha1 schema: @@ -37,7 +37,7 @@ spec: description: TrafficQoSMappingSpec defines the desired state of TrafficQoSMapping properties: references: - type: map + type: object required: - references type: object From 04313b9816531fd3d49eeb017c3b95aa61da55de Mon Sep 17 00:00:00 2001 From: mlycore Date: Mon, 28 Mar 2022 16:23:33 +0800 Subject: [PATCH 26/31] fix(tc): add invalid argument to tc addclass --- pkg/tc/tc.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/pkg/tc/tc.go b/pkg/tc/tc.go index e60a8fa..84fff99 100644 --- a/pkg/tc/tc.go +++ b/pkg/tc/tc.go @@ -17,6 +17,7 @@ package tc import ( // "C" "sort" + "strings" v1alpha1 "github.com/database-mesh/waterline/api/v1alpha1" "github.com/mlycore/log" @@ -80,12 +81,14 @@ func (t *Shaper) addRootHandle() error { } func (t *Shaper) AddClasses() error { - if err := t.addHtbQdisc(); err != nil { + err := t.addHtbQdisc() + if err != nil && !strings.Contains(err.Error(), "invalid argument") { log.Errorf("add htb qdisc error: %s", err) return err } - if err := t.addRootHandle(); err != nil { + err = t.addRootHandle() + if err != nil && !strings.Contains(err.Error(), "invalid argument") { log.Errorf("add root handle error: %s", err) return err } From 9e1a5c8b72bc0c8ef4e4f2c37142e2103e9ab1b3 Mon Sep 17 00:00:00 2001 From: mlycore Date: Mon, 28 Mar 2022 16:45:44 +0800 Subject: [PATCH 27/31] fix(tc): using minor of current classes for new class --- pkg/tc/tc.go | 21 ++++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/pkg/tc/tc.go b/pkg/tc/tc.go index 84fff99..523aee2 100644 --- a/pkg/tc/tc.go +++ b/pkg/tc/tc.go @@ -99,9 +99,24 @@ func (t *Shaper) AddClasses() error { return rules[i].Rate < rules[j].Rate }) + classes, err := t.ListClass() + if err != nil { + log.Errorf("list class error: %s", err) + return err + } + + var base uint16 + for _, c := range classes { + // c.Attrs().Handle + _, minor := netlink.MajorMinor(c.Attrs().Handle) + if base < minor { + base = minor + } + } + //TODO: add error handling. for idx, rule := range rules { - if err := t.addClass(idx, rule); err != nil { + if err := t.addClass(uint16(idx)+base, rule); err != nil { log.Errorf("add class error: %s, rule: %s", err, rule) return err } @@ -111,12 +126,12 @@ func (t *Shaper) AddClasses() error { } // add htb class -func (t *Shaper) addClass(idx int, rule v1alpha1.TrafficQoSGroup) error { +func (t *Shaper) addClass(idx uint16, rule v1alpha1.TrafficQoSGroup) error { attrs := netlink.ClassAttrs{ LinkIndex: t.link.Attrs().Index, Parent: netlink.MakeHandle(1, 1), //exclude 0, 1 - Handle: netlink.MakeHandle(1, uint16(idx+2)), + Handle: netlink.MakeHandle(1, idx+2), } rateValue, err := resource.ParseQuantity(rule.Rate) From 27d94d3156964367984b4e4caee9d10b8f983468 Mon Sep 17 00:00:00 2001 From: mlycore Date: Mon, 28 Mar 2022 16:52:50 +0800 Subject: [PATCH 28/31] fix(tc): using next classid --- pkg/tc/tc.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/tc/tc.go b/pkg/tc/tc.go index 523aee2..d06a9a7 100644 --- a/pkg/tc/tc.go +++ b/pkg/tc/tc.go @@ -131,7 +131,7 @@ func (t *Shaper) addClass(idx uint16, rule v1alpha1.TrafficQoSGroup) error { LinkIndex: t.link.Attrs().Index, Parent: netlink.MakeHandle(1, 1), //exclude 0, 1 - Handle: netlink.MakeHandle(1, idx+2), + Handle: netlink.MakeHandle(1, idx+1), } rateValue, err := resource.ParseQuantity(rule.Rate) From d2c9cecab5784d3f7e1673f308e39407e3b9c22e Mon Sep 17 00:00:00 2001 From: mlycore Date: Mon, 28 Mar 2022 16:53:34 +0800 Subject: [PATCH 29/31] chore(crd): remove required flag for trafficqos ceil --- manifests/traffic-qos-v1alpha1.yaml | 1 - 1 file changed, 1 deletion(-) diff --git a/manifests/traffic-qos-v1alpha1.yaml b/manifests/traffic-qos-v1alpha1.yaml index 9107984..8ef694e 100644 --- a/manifests/traffic-qos-v1alpha1.yaml +++ b/manifests/traffic-qos-v1alpha1.yaml @@ -44,7 +44,6 @@ spec: rate: type: string required: - - ceil - rate type: object type: array From a8ea40fabb1c27e4b8bb53fc0cde58107c04fc84 Mon Sep 17 00:00:00 2001 From: mlycore Date: Mon, 28 Mar 2022 19:27:21 +0800 Subject: [PATCH 30/31] wip: add controllers --- pkg/bpf/load.go | 1 + pkg/kubernetes/controllers/trafficqos/controller.go | 2 ++ pkg/kubernetes/controllers/virtualdatabase/controller.go | 3 +++ pkg/manager/manager.go | 2 ++ pkg/tc/tc.go | 2 +- 5 files changed, 9 insertions(+), 1 deletion(-) diff --git a/pkg/bpf/load.go b/pkg/bpf/load.go index 52feb35..473f25e 100644 --- a/pkg/bpf/load.go +++ b/pkg/bpf/load.go @@ -146,6 +146,7 @@ func (l *Loader) LoadTcPkgMap() (*ebpf.Map, error) { return ebpf.LoadPinnedMap(TcPktMap, nil) } func (l *Loader) CalcQoS(query string) uint32 { + //TODO: virtual database tc argument return 0 } diff --git a/pkg/kubernetes/controllers/trafficqos/controller.go b/pkg/kubernetes/controllers/trafficqos/controller.go index b38a3d5..7ec76e3 100644 --- a/pkg/kubernetes/controllers/trafficqos/controller.go +++ b/pkg/kubernetes/controllers/trafficqos/controller.go @@ -69,6 +69,8 @@ func (r *TrafficQoSReconciler) Reconcile(ctx context.Context, req ctrl.Request) //TODO: all rules should be removed once the resource was deleted //TODO: is there an exception when more than one resource was created + //TODO: add check for existed class rules + shaper, err := tc.NewTcShaper(*obj, "1000M") if err != nil { log.Errorf("get shaper error: %s", err) diff --git a/pkg/kubernetes/controllers/virtualdatabase/controller.go b/pkg/kubernetes/controllers/virtualdatabase/controller.go index e894786..5a5b1fa 100644 --- a/pkg/kubernetes/controllers/virtualdatabase/controller.go +++ b/pkg/kubernetes/controllers/virtualdatabase/controller.go @@ -60,6 +60,9 @@ func (r *VirtualDatabaseReconciler) Reconcile(ctx context.Context, req ctrl.Requ }() + //TODO: add tc class argument + //TODO: read mapping + // TODO: load SockFilter // l := &bpf.Loader{} // l.Load() diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go index e53c291..8f5b2aa 100644 --- a/pkg/manager/manager.go +++ b/pkg/manager/manager.go @@ -138,10 +138,12 @@ func handleAdded(pod *corev1.Pod, c client.Client, cr cri.ContainerRuntimeInterf if err != nil { return err } + // Add veth egress err = l.Load(ifname, uint16(db.Spec.Server.Port)) if err != nil { return err } + } // TODO: add loader diff --git a/pkg/tc/tc.go b/pkg/tc/tc.go index d06a9a7..8853231 100644 --- a/pkg/tc/tc.go +++ b/pkg/tc/tc.go @@ -80,7 +80,7 @@ func (t *Shaper) addRootHandle() error { return netlink.ClassReplace(class) } -func (t *Shaper) AddClasses() error { +func (t *Shaper) AddClasses() ([]int, error) { err := t.addHtbQdisc() if err != nil && !strings.Contains(err.Error(), "invalid argument") { log.Errorf("add htb qdisc error: %s", err) From 1497bce786fb8fc4ade631989ce571ac6bee6e39 Mon Sep 17 00:00:00 2001 From: mlycore Date: Fri, 1 Apr 2022 16:08:15 +0800 Subject: [PATCH 31/31] chore(crd): remove comments in trafficqosmapping --- api/v1alpha1/trafficqosmappping_types.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/api/v1alpha1/trafficqosmappping_types.go b/api/v1alpha1/trafficqosmappping_types.go index d2a1b53..94f6ef9 100644 --- a/api/v1alpha1/trafficqosmappping_types.go +++ b/api/v1alpha1/trafficqosmappping_types.go @@ -28,6 +28,9 @@ type TrafficQoSMappingSpec struct { // Foo is an example field of TrafficQoSMapping. Edit trafficqos_types.go to remove/update References map[string]string `json:"references` + + // + // 1:1 <- virtualdatabase namespace / name } // TrafficQoSMappingStatus defines the observed state of TrafficQoSMapping