Skip to content

Commit

Permalink
Merge pull request kube-arbiter#24 from 0xff-dev/main
Browse files Browse the repository at this point in the history
resource tagger: rename field name
  • Loading branch information
nkwangleiGIT authored Oct 28, 2022
2 parents 9b5c1ba + 6dbdd4f commit 142f14c
Show file tree
Hide file tree
Showing 24 changed files with 244 additions and 278 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ COPY staging/ staging/

## Copy the go source and vendor
COPY cmd/server/server.go cmd/server/server.go
COPY internal/ internal/
COPY pkg/ pkg/

RUN ./build-resource-tagger.sh ${CGO} ${ARCH} ${GOOS}

Expand Down
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@ import (
"google.golang.org/grpc"
"k8s.io/klog/v2"

"github.com/kube-arbiter/arbiter-plugins/executor-plugins/resource-tagger/internal/service"
"github.com/kube-arbiter/arbiter-plugins/executor-plugins/default-plugins/pkg"
pb "github.com/kube-arbiter/arbiter/pkg/proto/lib/executor"

_ "github.com/kube-arbiter/arbiter-plugins/executor-plugins/default-plugins/pkg/plugins/resource-updater"
)

const (
Expand All @@ -38,7 +40,6 @@ func main() {
// Load flags from command line
klog.InitFlags(nil)
flag.Parse()

cleanup := func() {
if _, err := os.Stat(sockAddr); err == nil {
if err := os.RemoveAll(sockAddr); err != nil {
Expand All @@ -55,7 +56,7 @@ func main() {
}

server := grpc.NewServer()
execute := service.NewExecuteService()
execute := pkg.NewExecuteService()

pb.RegisterExecuteServer(server, execute)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
module github.com/kube-arbiter/arbiter-plugins/executor-plugins/resource-tagger
module github.com/kube-arbiter/arbiter-plugins/executor-plugins/default-plugins

go 1.18

Expand Down
File renamed without changes.
81 changes: 81 additions & 0 deletions executor-plugins/default-plugins/pkg/executor_wrapper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
Copyright 2022 The Arbiter Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package pkg

import (
"context"
"flag"
"fmt"

"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/klog/v2"

executors "github.com/kube-arbiter/arbiter-plugins/executor-plugins/default-plugins/pkg/plugins/resource-updater"
pb "github.com/kube-arbiter/arbiter/pkg/proto/lib/executor"
)

type ExecuteServiceImpl struct {
pb.UnimplementedExecuteServer
}

var (
_ pb.ExecuteServer = (*ExecuteServiceImpl)(nil)
kubeconfig = flag.String("kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.")
)

func NewExecuteService() pb.ExecuteServer {
return new(ExecuteServiceImpl)
}

func (e *ExecuteServiceImpl) Execute(ctx context.Context, message *pb.ExecuteMessage) (*pb.ExecuteResponse, error) {
klog.V(10).Infof("kubeconfig path: %s\n", *kubeconfig)
klog.V(4).Infof("ResourceName: %s, namespace: %s, exprval: %f, condval: %v, actionData: %v, executors: %v\n",
message.ResourceName, message.Namespace, message.ExprVal, message.CondVal, message.ActionData, message.Executors)
resourceBaseFormat := fmt.Sprintf("%s/%s/%s:%s", message.Group, message.Version, message.Resources, message.ResourceName)

if len(message.Executors) == 0 {
klog.Warningf("%s executor is empty, return..", resourceBaseFormat)
return &pb.ExecuteResponse{}, nil
}
var (
config *rest.Config
err error
response *pb.ExecuteResponse
)
if *kubeconfig != "" {
config, err = clientcmd.BuildConfigFromFlags("", *kubeconfig)
} else {
config, err = rest.InClusterConfig()
}
if err != nil {
klog.Fatalf("error when building kubeconfig: %s", err.Error())
}
for _, executor := range message.Executors {
instance, ok := executors.GetExecutor(executor)
if !ok {
klog.Warningf("%s executor is %s, don't match target 'resourceUpdater'", resourceBaseFormat, executor)
continue
}

if response, err = instance.Execute(ctx, config, message); err != nil {
klog.Errorf("%s run %s error: %s\n", resourceBaseFormat, executor, err)
break
}
}
return response, err
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,10 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package service
package label

import (
"context"
"flag"
"fmt"

"k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -27,56 +26,23 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/klog/v2"

pb "github.com/kube-arbiter/arbiter/pkg/proto/lib/executor"
)

type ExecuteServiceImpl struct {
pb.UnimplementedExecuteServer
type LabelExecutor struct {
name string
}

var (
_ pb.ExecuteServer = (*ExecuteServiceImpl)(nil)
kubeconfig = flag.String("kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.")
)

func NewExecuteService() pb.ExecuteServer {
return new(ExecuteServiceImpl)
}

func (e *ExecuteServiceImpl) ExecuteAction(ctx context.Context, message *pb.ExecuteActionMessage) (*pb.ExecuteActionResponse, error) {
return &pb.ExecuteActionResponse{
Action: []string{"label", "none"},
}, nil
func (l *LabelExecutor) Name() string {
return l.name
}

func (e *ExecuteServiceImpl) Execute(ctx context.Context, message *pb.ExecuteMessage) (*pb.ExecuteResponse, error) {
klog.V(10).Infof("kubeconfig path: %s\n", *kubeconfig)
klog.V(4).Infof("ResourceName: %s, namespace: %s, exprval: %f, condval: %v, actionData: %v, behavior: %s\n",
message.ResourceName, message.Namespace, message.ExprVal, message.CondVal, message.ActionData, message.Action)
func (l *LabelExecutor) Execute(ctx context.Context, cfg *rest.Config, message *pb.ExecuteMessage) (*pb.ExecuteResponse, error) {
resourceBaseFormat := fmt.Sprintf("%s/%s/%s:%s", message.Group, message.Version, message.Resources, message.ResourceName)

if message.Action == "none" || message.Action == "" {
klog.Infof("%s action is %s, skip.", resourceBaseFormat, message.Action)
return &pb.ExecuteResponse{Data: ""}, nil
}

var (
config *rest.Config
err error
)

if *kubeconfig != "" {
config, err = clientcmd.BuildConfigFromFlags("", *kubeconfig)
} else {
config, err = rest.InClusterConfig()
}
if err != nil {
klog.Fatalf("error when building kubeconfig: %s", err.Error())
}
dynamicClient, err := dynamic.NewForConfig(config)
dynamicClient, err := dynamic.NewForConfig(cfg)
if err != nil {
panic(err)
}
Expand All @@ -93,16 +59,16 @@ func (e *ExecuteServiceImpl) Execute(ctx context.Context, message *pb.ExecuteMes
resouceToUpdate, err = namespaceableInterface.Get(context.Background(), message.ResourceName, metav1.GetOptions{})
}
if err != nil {
klog.Errorf("get resource %s (int namespace %s) error: %s\n", resourceBaseFormat, message.Namespace, err)
klog.Errorf("get resource %s (in namespace %s) error: %s\n", resourceBaseFormat, message.Namespace, err)
if errors.IsNotFound(err) {
response.Data = fmt.Sprintf("Resource %s not found in namespace %s", resourceBaseFormat, message.Namespace)
return response, nil
}
response.Data = fmt.Sprintf("get resource %s error: %s", resourceBaseFormat, err)
return response, err
}
// Let the custom code to handle how to update the resource
err = e.updateResource(resouceToUpdate, message)

err = UpdateResource(resouceToUpdate, message)
if err != nil {
response.Data = err.Error()
return response, err
Expand All @@ -116,5 +82,10 @@ func (e *ExecuteServiceImpl) Execute(ctx context.Context, message *pb.ExecuteMes
response.Data = fmt.Sprintf("update resource %s error: %s", resourceBaseFormat, err)
return response, err
}

return response, nil
}

func NewLabelExecutor(name string) *LabelExecutor {
return &LabelExecutor{name: name}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package service
package label

import (
"fmt"
Expand All @@ -31,7 +31,7 @@ import (
actionData defines the resource passed to the plugins, it can have arbitrary structure
You can use the .Raw data, and marshel a json object
*/
func (e *ExecuteServiceImpl) updateResource(resouceToUpdate *unstructured.Unstructured, message *pb.ExecuteMessage) (err error) {
func UpdateResource(resouceToUpdate *unstructured.Unstructured, message *pb.ExecuteMessage) (err error) {
resourceName := fmt.Sprintf("%s/%s", resouceToUpdate.GetKind(), resouceToUpdate.GetName())
klog.Infof("start processing resource %s", resourceName)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
Copyright 2022 The Arbiter Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package resourceupdater

import (
"context"
"sync"

"k8s.io/client-go/rest"
"k8s.io/klog/v2"

pb "github.com/kube-arbiter/arbiter/pkg/proto/lib/executor"
)

type Executor interface {
Name() string
Execute(context.Context, *rest.Config, *pb.ExecuteMessage) (*pb.ExecuteResponse, error)
}

var (
once sync.Once
mustRegister map[string]Executor
)

func Register(name string, instance Executor) {
once.Do(func() {
mustRegister = make(map[string]Executor)
})
if _, ok := mustRegister[name]; ok {
klog.Warningf("Executor %s already exists", name)
}

mustRegister[name] = instance
}

func GetExecutor(name string) (Executor, bool) {
v, ok := mustRegister[name]
return v, ok
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
Copyright 2022 The Arbiter Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package resourceupdater

import (
"github.com/kube-arbiter/arbiter-plugins/executor-plugins/default-plugins/pkg/plugins/label"
)

// init: registry some Executor instances.
func init() {
Register("resourceUpdater", label.NewLabelExecutor("resourceUpdater"))
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,9 @@ package execute;
option go_package = "lib/executor";

service Execute {
rpc ExecuteAction(ExecuteActionMessage) returns (ExecuteActionResponse) {}
rpc Execute (ExecuteMessage) returns (ExecuteResponse) {}
}

message ExecuteActionMessage {
}

message ExecuteActionResponse {
repeated string action = 1;
}

enum Kind {
pod = 0;
node = 1;
Expand All @@ -32,7 +24,7 @@ message ExecuteMessage {
string group = 7;
string version = 8;
string resources = 9;
string action = 10;
repeated string executors = 10;
}

message ExecuteResponse {
Expand Down
Loading

0 comments on commit 142f14c

Please sign in to comment.