-
Notifications
You must be signed in to change notification settings - Fork 671
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
K8s resolver for the gRPC client #4190
Changes from all commits
f10763a
dc18a10
2c149c6
e85fdb7
000411c
4f69e6e
621ea23
96c662a
1f1f034
aa938d1
fbce105
c1f00d2
5fffa91
5bcd1bf
8bbcd21
c6fbc6b
b0ab79e
6c4dc83
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,179 @@ | ||
package resolver | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"net" | ||
"strings" | ||
"sync" | ||
|
||
"google.golang.org/grpc/grpclog" | ||
"google.golang.org/grpc/resolver" | ||
v1 "k8s.io/api/core/v1" | ||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||
"k8s.io/apimachinery/pkg/util/wait" | ||
"k8s.io/client-go/kubernetes" | ||
|
||
"github.com/flyteorg/flyte/flytestdlib/logger" | ||
) | ||
|
||
const (
	// Schema is the URI scheme handled by this resolver ("k8s://...").
	Schema = "k8s"
)

// targetInfo identifies a Kubernetes service to resolve: the service name,
// the namespace it lives in, and the port to dial.
type targetInfo struct {
	serviceName      string
	serviceNamespace string
	port             string
}

// String renders the target back into resolver-URL form,
// i.e. "k8s:///<namespace>.<service>:<port>".
func (t targetInfo) String() string {
	authority := t.serviceNamespace + "." + t.serviceName
	return fmt.Sprintf("%s:///%s:%s", Schema, authority, t.port)
}
|
||
// NewBuilder creates a kubeBuilder which is used by grpc resolver. | ||
func NewBuilder(ctx context.Context, client kubernetes.Interface, schema string) resolver.Builder { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If we don't intend to expose the scheme, maybe we could have another builder only for test. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. qq: can't we use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I meant we are exposing |
||
return &kubeBuilder{ | ||
ctx: ctx, | ||
k8sClient: client, | ||
schema: schema, | ||
} | ||
} | ||
|
||
// kubeBuilder implements resolver.Builder on top of a Kubernetes client.
type kubeBuilder struct {
	// ctx is the parent context; each resolver built from this builder derives
	// its own cancellable context from it (see Build).
	ctx context.Context
	// k8sClient is used by resolvers to watch the target service's Endpoints.
	k8sClient kubernetes.Interface
	// schema is the URI scheme this builder reports via Scheme().
	schema string
}
|
||
// splitServicePortNamespace breaks a "service[.namespace[.suffix]][:port]"
// host string into its service, port, and namespace parts. Missing pieces
// come back as the empty string (an absent namespace means "use the
// resolver's local namespace").
func splitServicePortNamespace(hpn string) (service, port, namespace string) {
	service = hpn

	// Peel a trailing ":port" off the end, if one is present.
	if idx := strings.LastIndex(service, ":"); idx >= 0 {
		service, port = service[:idx], service[idx+1:]
	}

	// A dotted name carries the namespace as the second label; anything after
	// that (e.g. ".svc.cluster.local") is ignored.
	if name, rest, dotted := strings.Cut(service, "."); dotted {
		service = name
		namespace, _, _ = strings.Cut(rest, ".")
	}

	return
}
|
||
func parseResolverTarget(target resolver.Target) (targetInfo, error) { | ||
var service, port, namespace string | ||
if target.URL.Host == "" { | ||
// k8s:///service.namespace:port | ||
service, port, namespace = splitServicePortNamespace(target.Endpoint()) | ||
} else if target.URL.Port() == "" && target.Endpoint() != "" { | ||
// k8s://namespace/service:port | ||
service, port, _ = splitServicePortNamespace(target.Endpoint()) | ||
namespace = target.URL.Hostname() | ||
} else { | ||
// k8s://service.namespace:port | ||
service, port, namespace = splitServicePortNamespace(target.URL.Host) | ||
} | ||
|
||
if service == "" { | ||
return targetInfo{}, fmt.Errorf("target %s must specify a service", &target.URL) | ||
} | ||
|
||
return targetInfo{ | ||
serviceName: service, | ||
serviceNamespace: namespace, | ||
port: port, | ||
}, nil | ||
} | ||
|
||
// Build creates a new resolver for the given target, e.g. k8s:///flyteagent:flyte:8000. | ||
func (b *kubeBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) { | ||
ti, err := parseResolverTarget(target) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
ctx, cancel := context.WithCancel(b.ctx) | ||
r := &kResolver{ | ||
target: ti, | ||
ctx: ctx, | ||
cancel: cancel, | ||
cc: cc, | ||
k8sClient: b.k8sClient, | ||
} | ||
go wait.Until(r.run, 0, ctx.Done()) | ||
return r, nil | ||
} | ||
|
||
// Scheme returns the scheme supported by this resolver.
// Scheme is defined at https://github.com/grpc/grpc/blob/master/doc/naming.md.
// The value is whatever was passed to NewBuilder, not the Schema constant.
func (b *kubeBuilder) Scheme() string {
	return b.schema
}
|
||
// kResolver implements resolver.Resolver by watching the Kubernetes Endpoints
// object of the target service and pushing address updates to the gRPC
// ClientConn.
type kResolver struct {
	// target is the parsed service/namespace/port this resolver tracks.
	target targetInfo
	// ctx/cancel bound the watch goroutine; Close() cancels ctx.
	ctx    context.Context
	cancel context.CancelFunc
	// cc receives address-list updates via UpdateState.
	cc resolver.ClientConn
	// k8sClient is used to open the Endpoints watch.
	k8sClient kubernetes.Interface
	// wg is used to enforce Close() to return after the watcher() goroutine has finished.
	wg sync.WaitGroup
}
|
||
// ResolveNow is a no-op at this point: address updates are driven entirely by
// the Endpoints watch in run(), so there is nothing extra to trigger on demand.
func (k *kResolver) ResolveNow(resolver.ResolveNowOptions) {}
|
||
// Close closes the resolver. | ||
func (k *kResolver) Close() { | ||
k.cancel() | ||
k.wg.Wait() | ||
logger.Infof(k.ctx, "k8s resolver: closed") | ||
} | ||
|
||
func (k *kResolver) resolve(e *v1.Endpoints) { | ||
var newAddrs []resolver.Address | ||
for _, subset := range e.Subsets { | ||
port := k.target.port | ||
|
||
for _, address := range subset.Addresses { | ||
newAddrs = append(newAddrs, resolver.Address{ | ||
Addr: net.JoinHostPort(address.IP, port), | ||
ServerName: fmt.Sprintf("%s.%s", k.target.serviceName, k.target.serviceNamespace), | ||
Metadata: nil, | ||
}) | ||
} | ||
} | ||
err := k.cc.UpdateState(resolver.State{Addresses: newAddrs}) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is cc.UpdateState thread safe? do we need to lock its usage in someway? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. gRPC client uses lock under the hood. https://github.com/grpc/grpc-go/blob/e14d5831b59b0f46853d076640ae85f964164b37/clientconn.go#L829 |
||
if err != nil { | ||
grpclog.Errorf("k8s resolver: failed : %v", err) | ||
} | ||
} | ||
|
||
func (k *kResolver) run() { | ||
k.wg.Add(1) | ||
defer k.wg.Done() | ||
logger.Infof(k.ctx, "Starting k8s resolver for target: %s", k.target) | ||
watcher, err := k.k8sClient.CoreV1().Endpoints(k.target.serviceNamespace).Watch(k.ctx, metav1.ListOptions{FieldSelector: "metadata.name=" + k.target.serviceName}) | ||
if err != nil { | ||
grpclog.Errorf("k8s resolver: failed to create watcher: %v", err) | ||
return | ||
} | ||
|
||
for { | ||
select { | ||
case <-k.ctx.Done(): | ||
return | ||
case event, ok := <-watcher.ResultChan(): | ||
if !ok { | ||
logger.Debugf(k.ctx, "k8s resolver: watcher closed") | ||
return | ||
} | ||
if event.Object == nil { | ||
continue | ||
} | ||
k.resolve(event.Object.(*v1.Endpoints)) | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you were going to rename it to
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah that's what I meant.