diff --git a/flytepropeller/pkg/controller/controller.go b/flytepropeller/pkg/controller/controller.go index be26404a98..0cc3936109 100644 --- a/flytepropeller/pkg/controller/controller.go +++ b/flytepropeller/pkg/controller/controller.go @@ -14,6 +14,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "google.golang.org/grpc" _ "google.golang.org/grpc/balancer/roundrobin" //nolint + "google.golang.org/grpc/resolver" apiextensionsclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" apierrors "k8s.io/apimachinery/pkg/api/errors" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -54,6 +55,7 @@ import ( "github.com/flyteorg/flyte/flytestdlib/logger" "github.com/flyteorg/flyte/flytestdlib/promutils" "github.com/flyteorg/flyte/flytestdlib/promutils/labeled" + k8sResolver "github.com/flyteorg/flyte/flytestdlib/resolver" "github.com/flyteorg/flyte/flytestdlib/storage" ) @@ -552,6 +554,8 @@ func StartController(ctx context.Context, cfg *config.Config, defaultNamespace s return errors.Wrapf(err, "error building Kubernetes Clientset") } + resolver.Register(k8sResolver.NewBuilder(ctx, kubeClient, k8sResolver.Schema)) + flyteworkflowClient, err := clientset.NewForConfig(kubecfg) if err != nil { return errors.Wrapf(err, "error building FlyteWorkflow clientset") diff --git a/flytestdlib/go.mod b/flytestdlib/go.mod index 4793b283d2..144e012f23 100644 --- a/flytestdlib/go.mod +++ b/flytestdlib/go.mod @@ -29,6 +29,7 @@ require ( google.golang.org/grpc v1.56.1 google.golang.org/protobuf v1.30.0 gorm.io/gorm v1.22.4 + gotest.tools v2.2.0+incompatible k8s.io/api v0.20.2 k8s.io/apimachinery v0.20.2 k8s.io/client-go v0.0.0-20210217172142-7279fc64d847 @@ -54,6 +55,7 @@ require ( github.com/cespare/xxhash v1.1.0 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect + github.com/evanphx/json-patch v4.9.0+incompatible // indirect github.com/go-logr/logr v0.4.0 // indirect github.com/gofrs/uuid v4.2.0+incompatible // indirect github.com/gogo/protobuf v1.3.2 // indirect @@ -64,6 +66,7 @@ require ( github.com/google/uuid v1.3.0 // indirect github.com/googleapis/enterprise-certificate-proxy v0.2.3 // indirect github.com/googleapis/gax-go/v2 v2.7.1 // indirect + github.com/googleapis/gnostic v0.4.1 // indirect github.com/hashicorp/hcl v1.0.0 // indirect github.com/inconshreveable/mousetrap v1.0.0 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect @@ -92,6 +95,7 @@ require ( golang.org/x/net v0.9.0 // indirect golang.org/x/oauth2 v0.7.0 // indirect golang.org/x/sys v0.7.0 // indirect + golang.org/x/term v0.7.0 // indirect golang.org/x/text v0.9.0 // indirect golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect google.golang.org/api v0.114.0 // indirect @@ -102,7 +106,10 @@ require ( gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect k8s.io/klog/v2 v2.5.0 // indirect + k8s.io/kube-openapi v0.0.0-20201113171705-d219536bb9fd // indirect + k8s.io/utils v0.0.0-20201110183641-67b214c5f920 // indirect sigs.k8s.io/structured-merge-diff/v4 v4.0.3 // indirect + sigs.k8s.io/yaml v1.2.0 // indirect ) replace ( diff --git a/flytestdlib/go.sum b/flytestdlib/go.sum index bbe81e8dd1..28f70b612e 100644 --- a/flytestdlib/go.sum +++ b/flytestdlib/go.sum @@ -133,6 +133,7 @@ github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.m github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/ernesto-jimenez/gogen v0.0.0-20180125220232-d7d4131e6607 h1:cTavhURetDkezJCvxFggiyLeP40Mrk/TtVg2+ycw1Es= github.com/ernesto-jimenez/gogen v0.0.0-20180125220232-d7d4131e6607/go.mod h1:Cg4fM0vhYWOZdgM7RIOSTRNIc8/VT7CXClC3Ni86lu4= +github.com/evanphx/json-patch v4.9.0+incompatible h1:kLcOMZeuLAJvL2BPWLMIj5oaZQobrkAqrL+WFZwQses= github.com/evanphx/json-patch v4.9.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w= github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk= @@ -257,6 +258,7 @@ github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+ github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= github.com/googleapis/gax-go/v2 v2.7.1 h1:gF4c0zjUP2H/s/hEGyLA3I0fA2ZWjzYiONAD6cvPr8A= github.com/googleapis/gax-go/v2 v2.7.1/go.mod h1:4orTrqY6hXxxaUL4LHIPl6lGo8vAE38/qKbhSAKP6QI= +github.com/googleapis/gnostic v0.4.1 h1:DLJCy1n/vrD4HPjOvYcT8aYQXpPIzoRZONaYwyycI+I= github.com/googleapis/gnostic v0.4.1/go.mod h1:LRhVm6pbyptWbWbuZ38d1eyptfvIytN3ir6b65WBswg= github.com/googleapis/google-cloud-go-testing v0.0.0-20200911160855-bcd43fbb19e8/go.mod h1:dvDLG8qkwmyD9a/MJJN3XJcT3xFxOKAvTZGvuZmac9g= github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= @@ -267,6 +269,7 @@ github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+l github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= +github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= @@ -338,8 +341,10 @@ github.com/ncw/swift v1.0.53 h1:luHjjTNtekIEvHg5KdAFIBaH7bWfNkefwFnpDffSIks= github.com/ncw/swift v1.0.53/go.mod h1:23YIA4yWVnGwv2dQlN4bB7egfYX6YLn0Yo/S6zZO/ZM= github.com/onsi/ginkgo v0.0.0-20170829012221-11459a886d9c/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/ginkgo v1.11.0 h1:JAKSXpt1YjtLA7YpPiqO9ss6sNXEsPfSGdwN0UHqzrw= github.com/onsi/ginkgo v1.11.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/gomega v0.0.0-20170829124025-dcabb60a477c/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA= +github.com/onsi/gomega v1.7.0 h1:XPnZz8VVBHjVsy1vzJmRwIcSwiUO+JFfrv/xGiigmME= github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/pelletier/go-toml v1.9.4 h1:tjENF6MfZAg8e4ZmZTeWaWiT2vXtsoO6+iuOjFhECwM= github.com/pelletier/go-toml v1.9.4/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c= @@ -601,6 +606,8 @@ golang.org/x/sys v0.7.0 h1:3jlCCIQZPdOYu1h8BkNvLz8Kgwtae2cagcG/VamtZRU= golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/term v0.7.0 h1:BEvjmm5fURWqcfbSKTdpkDXYBrUS1c0m8agp14W48vQ= +golang.org/x/term v0.7.0/go.mod h1:P32HKFT3hSsZrRxla30E9HqToFYAQPCMs/zFMBUFqPY= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -781,11 +788,13 @@ gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8 gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= +gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc= gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= gopkg.in/ini.v1 v1.66.4 h1:SsAcf+mM7mRZo2nJNGt8mZCjG8ZRaNGMURJw7BsIST4= gopkg.in/ini.v1 v1.66.4/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= @@ -800,6 +809,8 @@ gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gorm.io/gorm v1.22.4 h1:8aPcyEJhY0MAt8aY6Dc524Pn+pO29K+ydu+e/cXSpQM= gorm.io/gorm v1.22.4/go.mod h1:1aeVC+pe9ZmvKZban/gW4QPra7PRoTEssyc922qCAkk= +gotest.tools v2.2.0+incompatible h1:VsBPFP1AI068pPrMxtb/S8Zkgf9xEmTLJjfM+P5UIEo= +gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= @@ -820,7 +831,9 @@ k8s.io/klog/v2 v2.0.0/go.mod h1:PBfzABfn139FHAV07az/IF9Wp1bkk3vpT2XSJ76fSDE= k8s.io/klog/v2 v2.4.0/go.mod h1:Od+F08eJP+W3HUb4pSrPpgp9DGU4GzlpG/TmITuYh/Y= k8s.io/klog/v2 v2.5.0 h1:8mOnjf1RmUPW6KRqQCfYSZq/K20Unmp3IhuZUhxl8KI= k8s.io/klog/v2 v2.5.0/go.mod h1:hy9LJ/NvuK+iVyP4Ehqva4HxZG/oXyIS3n3Jmire4Ec= +k8s.io/kube-openapi v0.0.0-20201113171705-d219536bb9fd h1:sOHNzJIkytDF6qadMNKhhDRpc6ODik8lVC6nOur7B2c= k8s.io/kube-openapi v0.0.0-20201113171705-d219536bb9fd/go.mod h1:WOJ3KddDSol4tAGcJo0Tvi+dK12EcqSLqcWsryKMpfM= +k8s.io/utils v0.0.0-20201110183641-67b214c5f920 h1:CbnUZsM497iRC5QMVkHwyl8s2tB3g7yaSHkYPkpgelw= k8s.io/utils v0.0.0-20201110183641-67b214c5f920/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA= rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8= rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0= diff --git a/flytestdlib/resolver/k8s_resolver.go b/flytestdlib/resolver/k8s_resolver.go new file mode 100644 index 0000000000..e6c5f6f480 --- /dev/null +++ b/flytestdlib/resolver/k8s_resolver.go @@ -0,0 +1,179 @@ +package resolver + +import ( + "context" + "fmt" + "net" + "strings" + "sync" + + "google.golang.org/grpc/grpclog" + "google.golang.org/grpc/resolver" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/kubernetes" + + "github.com/flyteorg/flyte/flytestdlib/logger" +) + +const ( + Schema = "k8s" +) + +type targetInfo struct { + serviceName string + serviceNamespace string + port string +} + +func (t targetInfo) String() string { + return fmt.Sprintf("%s:///%s.%s:%s", Schema, t.serviceNamespace, t.serviceName, t.port) +} + +// NewBuilder creates a kubeBuilder which is used by grpc resolver. +func NewBuilder(ctx context.Context, client kubernetes.Interface, schema string) resolver.Builder { + return &kubeBuilder{ + ctx: ctx, + k8sClient: client, + schema: schema, + } +} + +type kubeBuilder struct { + ctx context.Context + k8sClient kubernetes.Interface + schema string +} + +func splitServicePortNamespace(hpn string) (service, port, namespace string) { + service = hpn + + colon := strings.LastIndexByte(service, ':') + if colon != -1 { + service, port = service[:colon], service[colon+1:] + } + + parts := strings.SplitN(service, ".", 3) + if len(parts) >= 2 { + service, namespace = parts[0], parts[1] + } + + return +} + +func parseResolverTarget(target resolver.Target) (targetInfo, error) { + var service, port, namespace string + if target.URL.Host == "" { + // k8s:///service.namespace:port + service, port, namespace = splitServicePortNamespace(target.Endpoint()) + } else if target.URL.Port() == "" && target.Endpoint() != "" { + // k8s://namespace/service:port + service, port, _ = splitServicePortNamespace(target.Endpoint()) + namespace = target.URL.Hostname() + } else { + // k8s://service.namespace:port + service, port, namespace = splitServicePortNamespace(target.URL.Host) + } + + if service == "" { + return targetInfo{}, fmt.Errorf("target %s must specify a service", &target.URL) + } + + return targetInfo{ + serviceName: service, + serviceNamespace: namespace, + port: port, + }, nil +} + +// Build creates a new resolver for the given target, e.g. k8s:///flyteagent:flyte:8000. +func (b *kubeBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) { + ti, err := parseResolverTarget(target) + if err != nil { + return nil, err + } + + ctx, cancel := context.WithCancel(b.ctx) + r := &kResolver{ + target: ti, + ctx: ctx, + cancel: cancel, + cc: cc, + k8sClient: b.k8sClient, + } + go wait.Until(r.run, 0, ctx.Done()) + return r, nil +} + +// Scheme returns the scheme supported by this resolver. +// Scheme is defined at https://github.com/grpc/grpc/blob/master/doc/naming.md. +func (b *kubeBuilder) Scheme() string { + return b.schema +} + +type kResolver struct { + target targetInfo + ctx context.Context + cancel context.CancelFunc + cc resolver.ClientConn + k8sClient kubernetes.Interface + // wg is used to enforce Close() to return after the watcher() goroutine has finished. + wg sync.WaitGroup +} + +// ResolveNow is a no-op at this point. +func (k *kResolver) ResolveNow(resolver.ResolveNowOptions) {} + +// Close closes the resolver. +func (k *kResolver) Close() { + k.cancel() + k.wg.Wait() + logger.Infof(k.ctx, "k8s resolver: closed") +} + +func (k *kResolver) resolve(e *v1.Endpoints) { + var newAddrs []resolver.Address + for _, subset := range e.Subsets { + port := k.target.port + + for _, address := range subset.Addresses { + newAddrs = append(newAddrs, resolver.Address{ + Addr: net.JoinHostPort(address.IP, port), + ServerName: fmt.Sprintf("%s.%s", k.target.serviceName, k.target.serviceNamespace), + Metadata: nil, + }) + } + } + err := k.cc.UpdateState(resolver.State{Addresses: newAddrs}) + if err != nil { + grpclog.Errorf("k8s resolver: failed : %v", err) + } +} + +func (k *kResolver) run() { + k.wg.Add(1) + defer k.wg.Done() + logger.Infof(k.ctx, "Starting k8s resolver for target: %s", k.target) + watcher, err := k.k8sClient.CoreV1().Endpoints(k.target.serviceNamespace).Watch(k.ctx, metav1.ListOptions{FieldSelector: "metadata.name=" + k.target.serviceName}) + if err != nil { + grpclog.Errorf("k8s resolver: failed to create watcher: %v", err) + return + } + + for { + select { + case <-k.ctx.Done(): + return + case event, ok := <-watcher.ResultChan(): + if !ok { + logger.Debugf(k.ctx, "k8s resolver: watcher closed") + return + } + if event.Object == nil { + continue + } + k.resolve(event.Object.(*v1.Endpoints)) + } + } +} diff --git a/flytestdlib/resolver/k8s_resolver_test.go b/flytestdlib/resolver/k8s_resolver_test.go new file mode 100644 index 0000000000..18aa22c80a --- /dev/null +++ b/flytestdlib/resolver/k8s_resolver_test.go @@ -0,0 +1,177 @@ +package resolver + +import ( + "context" + "fmt" + "log" + "net/url" + "testing" + "time" + + "google.golang.org/grpc/resolver" + "google.golang.org/grpc/serviceconfig" + "gotest.tools/assert" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + testclient "k8s.io/client-go/kubernetes/fake" +) + +func parseTarget(target string) resolver.Target { + u, err := url.Parse(target) + if err != nil { + panic(err) + } + + return resolver.Target{ + URL: *u, + } +} + +type fakeConn struct { + cmp chan struct{} + found []string +} + +func (fc *fakeConn) UpdateState(state resolver.State) error { + for _, a := range state.Addresses { + fc.found = append(fc.found, a.Addr) + } + fc.cmp <- struct{}{} + return nil +} + +func (fc *fakeConn) ReportError(e error) { + log.Println(e) +} + +func (fc *fakeConn) ParseServiceConfig(_ string) *serviceconfig.ParseResult { + return nil +} + +func (fc *fakeConn) NewAddress(_ []resolver.Address) {} + +func (*fakeConn) NewServiceConfig(serviceConfig string) { + fmt.Printf("serviceConfig: %s\n", serviceConfig) +} + +func TestBuilder(t *testing.T) { + k8sClient := testclient.NewSimpleClientset() + builder := NewBuilder(context.Background(), k8sClient, "test") + fc := &fakeConn{ + cmp: make(chan struct{}), + } + k8sResolver, err := builder.Build(parseTarget("test://flyteagent.flyte.svc.cluster.local:8000"), fc, resolver.BuildOptions{}) + if err != nil { + t.Fatal(err) + } + + // Make sure watcher is started before we create the endpoint + time.Sleep(2 * time.Second) + + _, err = k8sClient.CoreV1().Endpoints("flyte").Create(context.Background(), &v1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "flyteagent", + Namespace: "flyte", + }, + Subsets: []v1.EndpointSubset{ + { + Addresses: []v1.EndpointAddress{ + { + IP: "10.0.0.1", + }, + }, + Ports: []v1.EndpointPort{ + { + Name: "grpc", + Port: 8000, + }, + }, + }, + }, + }, metav1.CreateOptions{}) + + <-fc.cmp + assert.NilError(t, err) + assert.Equal(t, len(fc.found), 1) + assert.Equal(t, fc.found[0], "10.0.0.1:8000") + + k8sResolver.Close() +} + +func TestParseResolverTargets(t *testing.T) { + for i, test := range []struct { + target string + want targetInfo + err bool + }{ + {"", targetInfo{}, true}, + {"k8s:///", targetInfo{}, true}, + {"k8s://a:30", targetInfo{"a", "", "30"}, false}, + {"k8s://a/", targetInfo{"a", "", ""}, false}, + {"k8s:///a", targetInfo{"a", "", ""}, false}, + {"k8s://a/b", targetInfo{"b", "a", ""}, false}, + {"k8s://a.b/", targetInfo{"a", "b", ""}, false}, + {"k8s:///a.b:80", targetInfo{"a", "b", "80"}, false}, + {"k8s:///a.b:port", targetInfo{"a", "b", "port"}, false}, + {"k8s:///a:port", targetInfo{"a", "", "port"}, false}, + {"k8s://x/a:port", targetInfo{"a", "x", "port"}, false}, + {"k8s://a.x:30/", targetInfo{"a", "x", "30"}, false}, + {"k8s://a.b.svc.cluster.local", targetInfo{"a", "b", ""}, false}, + {"k8s://a.b.svc.cluster.local:80", targetInfo{"a", "b", "80"}, false}, + {"k8s:///a.b.svc.cluster.local", targetInfo{"a", "b", ""}, false}, + {"k8s:///a.b.svc.cluster.local:80", targetInfo{"a", "b", "80"}, false}, + {"k8s:///a.b.svc.cluster.local:port", targetInfo{"a", "b", "port"}, false}, + } { + got, err := parseResolverTarget(parseTarget(test.target)) + if err == nil && test.err { + t.Errorf("case %d: want error but got nil", i) + continue + } + if err != nil && !test.err { + t.Errorf("case %d:got '%v' error but don't want an error", i, err) + continue + } + if got != test.want { + t.Errorf("case %d: parseTarget(%q) = %+v, want %+v", i, test.target, got, test.want) + } + } +} + +func TestParseTargets(t *testing.T) { + for i, test := range []struct { + target string + want targetInfo + err bool + }{ + {"", targetInfo{}, true}, + {"k8s:///", targetInfo{}, true}, + {"k8s://a:30", targetInfo{"a", "", "30"}, false}, + {"k8s://a/", targetInfo{"a", "", ""}, false}, + {"k8s:///a", targetInfo{"a", "", ""}, false}, + {"k8s://a/b", targetInfo{"b", "a", ""}, false}, + {"k8s://a.b/", targetInfo{"a", "b", ""}, false}, + {"k8s:///a.b:80", targetInfo{"a", "b", "80"}, false}, + {"k8s:///a.b:port", targetInfo{"a", "b", "port"}, false}, + {"k8s:///a:port", targetInfo{"a", "", "port"}, false}, + {"k8s://x/a:port", targetInfo{"a", "x", "port"}, false}, + {"k8s://a.x:30/", targetInfo{"a", "x", "30"}, false}, + {"k8s://a.b.svc.cluster.local", targetInfo{"a", "b", ""}, false}, + {"k8s://a.b.svc.cluster.local:80", targetInfo{"a", "b", "80"}, false}, + {"k8s:///a.b.svc.cluster.local", targetInfo{"a", "b", ""}, false}, + {"k8s:///a.b.svc.cluster.local:80", targetInfo{"a", "b", "80"}, false}, + {"k8s:///a.b.svc.cluster.local:port", targetInfo{"a", "b", "port"}, false}, + } { + got, err := parseResolverTarget(parseTarget(test.target)) + if err == nil && test.err { + t.Errorf("case %d: want error but got nil", i) + continue + } + if err != nil && !test.err { + t.Errorf("case %d:got '%v' error but don't want an error", i, err) + continue + } + if got != test.want { + t.Errorf("case %d: parseTarget(%q) = %+v, want %+v", i, test.target, got, test.want) + } + } +}