
K8s resolver for the gRPC client #4190

Merged: 18 commits, Oct 19, 2023
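This PR teaches the gRPC client to resolve a Kubernetes service directly to the pod endpoints behind it and balance across them. A minimal usage sketch under stated assumptions: the `k8s:///flyteagent.flyte:8000` target is the example from the new file's doc comment, while the in-cluster config and insecure credentials are illustrative only, not part of this PR.

```go
package example

import (
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/resolver"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"

	resolver2 "github.com/flyteorg/flyte/flytepropeller/pkg/resolver"
)

func dialAgent() (*grpc.ClientConn, error) {
	cfg, err := rest.InClusterConfig() // assumes the client runs inside the cluster
	if err != nil {
		return nil, err
	}
	kubeClient, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		return nil, err
	}
	// Register the builder once; any later dial in the process can use the k8s scheme.
	resolver.Register(resolver2.NewBuilder(kubeClient, resolver2.KubernetesSchema))
	// Target form: k8s:///<service>.<namespace>:<port>
	return grpc.Dial("k8s:///flyteagent.flyte:8000",
		grpc.WithTransportCredentials(insecure.NewCredentials())) // insecure creds for illustration only
}
```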
2 changes: 1 addition & 1 deletion flytepropeller/cmd/controller/cmd/root.go
@@ -113,7 +113,7 @@ func logAndExit(err error) {
}

func executeRootCmd(baseCtx context.Context, cfg *config2.Config) error {
-	// set up signals so we handle the first shutdown signal gracefully
+	// set up signals, so we handle the first shutdown signal gracefully
	ctx := signals.SetupSignalHandler(baseCtx)

	// set metric keys
4 changes: 4 additions & 0 deletions flytepropeller/pkg/controller/controller.go
@@ -5,6 +5,8 @@
import (
	"context"
	"fmt"
+	resolver2 "github.com/flyteorg/flyte/flytepropeller/pkg/resolver"
+	"google.golang.org/grpc/resolver"
	"os"
	"runtime/pprof"
	"time"
@@ -562,6 +564,8 @@
return errors.Wrapf(err, "error building Kubernetes Clientset")
}

resolver.Register(resolver2.NewBuilder(kubeClient, resolver2.KubernetesSchema))

	flyteworkflowClient, err := clientset.NewForConfig(kubecfg)
	if err != nil {
		return errors.Wrapf(err, "error building FlyteWorkflow clientset")
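Once this registration runs, every subsequent gRPC dial in the process can use the `k8s` scheme. A small sketch (not part of this PR) of how a caller could assert that the builder is in place, using gRPC's `resolver.Get` lookup:

```go
import "google.golang.org/grpc/resolver"

// Illustrative only: resolver.Get returns the Builder registered for a scheme,
// or nil if nothing is registered under it.
func ensureK8sResolverRegistered() {
	if resolver.Get("k8s") == nil { // "k8s" is the value of resolver2.KubernetesSchema
		panic("k8s gRPC resolver builder is not registered")
	}
}
```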
182 changes: 182 additions & 0 deletions flytepropeller/pkg/resolver/k8s_resolver.go
@@ -0,0 +1,182 @@
package resolver

import (
	"context"
	"fmt"
	"net"
	"strings"
	"sync"

	"google.golang.org/grpc/grpclog"
	"google.golang.org/grpc/resolver"
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/kubernetes"

	"github.com/flyteorg/flyte/flytestdlib/logger"
)

const (
	KubernetesSchema = "k8s"
)

type targetInfo struct {
	serviceName      string
	serviceNamespace string
	port             string
}

// NewBuilder creates a kubeBuilder, which is used by the gRPC resolver.
func NewBuilder(client *kubernetes.Clientset, schema string) resolver.Builder {
	return &kubeBuilder{
		k8sClient: client,
		schema:    schema,
	}
}

type kubeBuilder struct {
	k8sClient *kubernetes.Clientset
	schema    string
}

func splitServicePortNamespace(hpn string) (service, port, namespace string) {
	service = hpn

	colon := strings.LastIndexByte(service, ':')
	if colon != -1 {
		service, port = service[:colon], service[colon+1:]
	}

	// We want to split the remainder into the service name, the namespace, and whatever else is left.
	// This supports fully qualified service names, e.g. {service-name}.<namespace>.svc.<cluster-domain-name>.
	// Since we look up the endpoints by service name and namespace, we don't care about the
	// cluster-domain-name, only that we can parse out the service name and namespace properly.
	parts := strings.SplitN(service, ".", 3)
	if len(parts) >= 2 {
		service, namespace = parts[0], parts[1]
	}

	return
}

func parseResolverTarget(target resolver.Target) (targetInfo, error) {
	var service, port, namespace string
	if target.URL.Host == "" {
		// k8s:///service.namespace:port
		service, port, namespace = splitServicePortNamespace(target.Endpoint())
	} else if target.URL.Port() == "" && target.Endpoint() != "" {
		// k8s://namespace/service:port
		service, port, _ = splitServicePortNamespace(target.Endpoint())
		namespace = target.URL.Hostname()
	} else {
		// k8s://service.namespace:port
		service, port, namespace = splitServicePortNamespace(target.URL.Host)
	}

	if service == "" {
		return targetInfo{}, fmt.Errorf("target %s must specify a service", &target.URL)
	}

	return targetInfo{
		serviceName:      service,
		serviceNamespace: namespace,
		port:             port,
	}, nil
}
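// For illustration (not part of this PR): the three accepted target forms all
// parse to the same targetInfo, using the example service from the Build doc comment:
//
//	k8s:///flyteagent.flyte:8000 -> {serviceName: "flyteagent", serviceNamespace: "flyte", port: "8000"}
//	k8s://flyte/flyteagent:8000  -> {serviceName: "flyteagent", serviceNamespace: "flyte", port: "8000"}
//	k8s://flyteagent.flyte:8000  -> {serviceName: "flyteagent", serviceNamespace: "flyte", port: "8000"}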

// Build creates a new resolver for the given target, e.g. k8s:///flyteagent.flyte:8000.
func (b *kubeBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
	ti, err := parseResolverTarget(target)
	if err != nil {
		return nil, err
	}

	ctx, cancel := context.WithCancel(context.Background())
	r := &kResolver{
		target:    ti,
		ctx:       ctx,
		cancel:    cancel,
		cc:        cc,
		k8sClient: b.k8sClient,
	}
	go wait.Until(r.run, 0, ctx.Done())
	return r, nil
}

// Scheme returns the scheme supported by this resolver.
// Schemes are defined at https://github.com/grpc/grpc/blob/master/doc/naming.md.
func (b *kubeBuilder) Scheme() string {
	return b.schema
}

type kResolver struct {
	target    targetInfo
	ctx       context.Context
	cancel    context.CancelFunc
	cc        resolver.ClientConn
	k8sClient *kubernetes.Clientset
	// wg ensures that Close() returns only after the run() goroutine has finished.
	wg sync.WaitGroup
}

// ResolveNow is a no-op at this point.
func (k *kResolver) ResolveNow(resolver.ResolveNowOptions) {
	logger.Infof(k.ctx, "k8s resolver: resolveNow")
}

// Close closes the resolver.
func (k *kResolver) Close() {
	k.cancel()
	k.wg.Wait()
	logger.Infof(k.ctx, "k8s resolver: closed")
}

func (k *kResolver) resolve(e *v1.Endpoints) {
	var newAddrs []resolver.Address
	for _, subset := range e.Subsets {
		port := k.target.port

		for _, address := range subset.Addresses {
			newAddrs = append(newAddrs, resolver.Address{
				Addr:       net.JoinHostPort(address.IP, port),
				ServerName: fmt.Sprintf("%s.%s", k.target.serviceName, k.target.serviceNamespace),
				Metadata:   nil,
			})
		}
	}
	logger.Infof(k.ctx, "k8s resolver: resolved %d addresses: %v", len(newAddrs), newAddrs)
	err := k.cc.UpdateState(resolver.State{
		Addresses:     newAddrs,
		ServiceConfig: k.cc.ParseServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`),
	})
	if err != nil {
		grpclog.Errorf("k8s resolver: failed to update state: %v", err)
	}
}

func (k *kResolver) run() {
	k.wg.Add(1)
	defer k.wg.Done()
	logger.Infof(k.ctx, "Starting k8s resolver for %s", k.target.serviceName)
	watcher, err := k.k8sClient.CoreV1().Endpoints(k.target.serviceNamespace).Watch(k.ctx, metav1.ListOptions{FieldSelector: "metadata.name=" + k.target.serviceName})
	if err != nil {
		grpclog.Errorf("k8s resolver: failed to create watcher: %v", err)
		return
	}

	for {
		select {
		case <-k.ctx.Done():
			logger.Infof(k.ctx, "k8s resolver: context done")
			return
		case event, ok := <-watcher.ResultChan():
			if !ok {
				logger.Infof(k.ctx, "k8s resolver: watcher closed")
				return
			}
			logger.Infof(k.ctx, "k8s resolver: received event.type: %v", event.Type)
			endpoints, ok := event.Object.(*v1.Endpoints)
			if !ok {
				// the watch can also deliver non-Endpoints objects (e.g. an error status); skip them
				continue
			}
			logger.Infof(k.ctx, "k8s resolver: received event: %v", endpoints)
			k.resolve(endpoints)
		}
	}
}
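For concreteness, a self-contained sketch (assumed data, not part of this PR) of the translation resolve() performs: given an Endpoints object with two pod IPs and the target `k8s:///flyteagent.flyte:8000`, the resolver hands gRPC one address per pod, which the round_robin policy in the service config then balances across.

```go
package main

import (
	"fmt"
	"net"

	v1 "k8s.io/api/core/v1"
)

func main() {
	// Assumed watch payload: two pods backing the flyteagent service.
	e := &v1.Endpoints{
		Subsets: []v1.EndpointSubset{{
			Addresses: []v1.EndpointAddress{{IP: "10.0.0.5"}, {IP: "10.0.0.6"}},
		}},
	}
	// Mirrors resolve(): join each pod IP with the port parsed from the target.
	for _, subset := range e.Subsets {
		for _, address := range subset.Addresses {
			fmt.Println(net.JoinHostPort(address.IP, "8000"))
		}
	}
	// Output:
	// 10.0.0.5:8000
	// 10.0.0.6:8000
}
```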