Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

K8s resolver for the gRPC client #4190

Merged
merged 18 commits into from
Oct 19, 2023
4 changes: 4 additions & 0 deletions flytepropeller/pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
"github.com/prometheus/client_golang/prometheus"
"google.golang.org/grpc"
_ "google.golang.org/grpc/balancer/roundrobin" //nolint
"google.golang.org/grpc/resolver"
apiextensionsclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
apierrors "k8s.io/apimachinery/pkg/api/errors"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -54,6 +55,7 @@
"github.com/flyteorg/flyte/flytestdlib/logger"
"github.com/flyteorg/flyte/flytestdlib/promutils"
"github.com/flyteorg/flyte/flytestdlib/promutils/labeled"
resolver2 "github.com/flyteorg/flyte/flytestdlib/resolver"
pingsutw marked this conversation as resolved.
Show resolved Hide resolved
"github.com/flyteorg/flyte/flytestdlib/storage"
)

Expand Down Expand Up @@ -552,6 +554,8 @@
return errors.Wrapf(err, "error building Kubernetes Clientset")
}

resolver.Register(resolver2.NewBuilder(kubeClient, resolver2.K8sSchema))

Check warning on line 558 in flytepropeller/pkg/controller/controller.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/controller.go#L557-L558

Added lines #L557 - L558 were not covered by tests
flyteworkflowClient, err := clientset.NewForConfig(kubecfg)
if err != nil {
return errors.Wrapf(err, "error building FlyteWorkflow clientset")
Expand Down
7 changes: 7 additions & 0 deletions flytestdlib/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ require (
google.golang.org/grpc v1.56.1
google.golang.org/protobuf v1.30.0
gorm.io/gorm v1.22.4
gotest.tools v2.2.0+incompatible
k8s.io/api v0.20.2
k8s.io/apimachinery v0.20.2
k8s.io/client-go v0.0.0-20210217172142-7279fc64d847
Expand All @@ -54,6 +55,7 @@ require (
github.com/cespare/xxhash v1.1.0 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/evanphx/json-patch v4.9.0+incompatible // indirect
github.com/go-logr/logr v0.4.0 // indirect
github.com/gofrs/uuid v4.2.0+incompatible // indirect
github.com/gogo/protobuf v1.3.2 // indirect
Expand All @@ -64,6 +66,7 @@ require (
github.com/google/uuid v1.3.0 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.2.3 // indirect
github.com/googleapis/gax-go/v2 v2.7.1 // indirect
github.com/googleapis/gnostic v0.4.1 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
Expand Down Expand Up @@ -92,6 +95,7 @@ require (
golang.org/x/net v0.9.0 // indirect
golang.org/x/oauth2 v0.7.0 // indirect
golang.org/x/sys v0.7.0 // indirect
golang.org/x/term v0.7.0 // indirect
golang.org/x/text v0.9.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
google.golang.org/api v0.114.0 // indirect
Expand All @@ -102,7 +106,10 @@ require (
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/klog/v2 v2.5.0 // indirect
k8s.io/kube-openapi v0.0.0-20201113171705-d219536bb9fd // indirect
k8s.io/utils v0.0.0-20201110183641-67b214c5f920 // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.0.3 // indirect
sigs.k8s.io/yaml v1.2.0 // indirect
)

replace (
Expand Down
13 changes: 13 additions & 0 deletions flytestdlib/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.m
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/ernesto-jimenez/gogen v0.0.0-20180125220232-d7d4131e6607 h1:cTavhURetDkezJCvxFggiyLeP40Mrk/TtVg2+ycw1Es=
github.com/ernesto-jimenez/gogen v0.0.0-20180125220232-d7d4131e6607/go.mod h1:Cg4fM0vhYWOZdgM7RIOSTRNIc8/VT7CXClC3Ni86lu4=
github.com/evanphx/json-patch v4.9.0+incompatible h1:kLcOMZeuLAJvL2BPWLMIj5oaZQobrkAqrL+WFZwQses=
github.com/evanphx/json-patch v4.9.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w=
github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk=
Expand Down Expand Up @@ -257,6 +258,7 @@ github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
github.com/googleapis/gax-go/v2 v2.7.1 h1:gF4c0zjUP2H/s/hEGyLA3I0fA2ZWjzYiONAD6cvPr8A=
github.com/googleapis/gax-go/v2 v2.7.1/go.mod h1:4orTrqY6hXxxaUL4LHIPl6lGo8vAE38/qKbhSAKP6QI=
github.com/googleapis/gnostic v0.4.1 h1:DLJCy1n/vrD4HPjOvYcT8aYQXpPIzoRZONaYwyycI+I=
github.com/googleapis/gnostic v0.4.1/go.mod h1:LRhVm6pbyptWbWbuZ38d1eyptfvIytN3ir6b65WBswg=
github.com/googleapis/google-cloud-go-testing v0.0.0-20200911160855-bcd43fbb19e8/go.mod h1:dvDLG8qkwmyD9a/MJJN3XJcT3xFxOKAvTZGvuZmac9g=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
Expand All @@ -267,6 +269,7 @@ github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+l
github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
Expand Down Expand Up @@ -338,8 +341,10 @@ github.com/ncw/swift v1.0.53 h1:luHjjTNtekIEvHg5KdAFIBaH7bWfNkefwFnpDffSIks=
github.com/ncw/swift v1.0.53/go.mod h1:23YIA4yWVnGwv2dQlN4bB7egfYX6YLn0Yo/S6zZO/ZM=
github.com/onsi/ginkgo v0.0.0-20170829012221-11459a886d9c/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.11.0 h1:JAKSXpt1YjtLA7YpPiqO9ss6sNXEsPfSGdwN0UHqzrw=
github.com/onsi/ginkgo v1.11.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/gomega v0.0.0-20170829124025-dcabb60a477c/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA=
github.com/onsi/gomega v1.7.0 h1:XPnZz8VVBHjVsy1vzJmRwIcSwiUO+JFfrv/xGiigmME=
github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/pelletier/go-toml v1.9.4 h1:tjENF6MfZAg8e4ZmZTeWaWiT2vXtsoO6+iuOjFhECwM=
github.com/pelletier/go-toml v1.9.4/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c=
Expand Down Expand Up @@ -601,6 +606,8 @@ golang.org/x/sys v0.7.0 h1:3jlCCIQZPdOYu1h8BkNvLz8Kgwtae2cagcG/VamtZRU=
golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.7.0 h1:BEvjmm5fURWqcfbSKTdpkDXYBrUS1c0m8agp14W48vQ=
golang.org/x/term v0.7.0/go.mod h1:P32HKFT3hSsZrRxla30E9HqToFYAQPCMs/zFMBUFqPY=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
Expand Down Expand Up @@ -781,11 +788,13 @@ gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc=
gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw=
gopkg.in/ini.v1 v1.66.4 h1:SsAcf+mM7mRZo2nJNGt8mZCjG8ZRaNGMURJw7BsIST4=
gopkg.in/ini.v1 v1.66.4/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
Expand All @@ -800,6 +809,8 @@ gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gorm.io/gorm v1.22.4 h1:8aPcyEJhY0MAt8aY6Dc524Pn+pO29K+ydu+e/cXSpQM=
gorm.io/gorm v1.22.4/go.mod h1:1aeVC+pe9ZmvKZban/gW4QPra7PRoTEssyc922qCAkk=
gotest.tools v2.2.0+incompatible h1:VsBPFP1AI068pPrMxtb/S8Zkgf9xEmTLJjfM+P5UIEo=
gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
Expand All @@ -820,7 +831,9 @@ k8s.io/klog/v2 v2.0.0/go.mod h1:PBfzABfn139FHAV07az/IF9Wp1bkk3vpT2XSJ76fSDE=
k8s.io/klog/v2 v2.4.0/go.mod h1:Od+F08eJP+W3HUb4pSrPpgp9DGU4GzlpG/TmITuYh/Y=
k8s.io/klog/v2 v2.5.0 h1:8mOnjf1RmUPW6KRqQCfYSZq/K20Unmp3IhuZUhxl8KI=
k8s.io/klog/v2 v2.5.0/go.mod h1:hy9LJ/NvuK+iVyP4Ehqva4HxZG/oXyIS3n3Jmire4Ec=
k8s.io/kube-openapi v0.0.0-20201113171705-d219536bb9fd h1:sOHNzJIkytDF6qadMNKhhDRpc6ODik8lVC6nOur7B2c=
k8s.io/kube-openapi v0.0.0-20201113171705-d219536bb9fd/go.mod h1:WOJ3KddDSol4tAGcJo0Tvi+dK12EcqSLqcWsryKMpfM=
k8s.io/utils v0.0.0-20201110183641-67b214c5f920 h1:CbnUZsM497iRC5QMVkHwyl8s2tB3g7yaSHkYPkpgelw=
k8s.io/utils v0.0.0-20201110183641-67b214c5f920/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=
rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
Expand Down
177 changes: 177 additions & 0 deletions flytestdlib/resolver/k8s_resolver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
package resolver

import (
"context"
"fmt"
"net"
"strings"
"sync"

"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/resolver"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"

"github.com/flyteorg/flyte/flytestdlib/logger"
)

const (
K8sSchema = "k8s"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: I think this is called scheme.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

)

type targetInfo struct {
serviceName string
serviceNamespace string
port string
}

func (t targetInfo) String() string {
return fmt.Sprintf("kubernetes:///%s.%s:%s", t.serviceNamespace, t.serviceName, t.port)
pingsutw marked this conversation as resolved.
Show resolved Hide resolved
}

// NewBuilder creates a kubeBuilder which is used by grpc resolver.
func NewBuilder(client kubernetes.Interface, schema string) resolver.Builder {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you want to pass ctx here and use it in .Build() ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated it

return &kubeBuilder{
k8sClient: client,
schema: schema,
}
}

type kubeBuilder struct {
k8sClient kubernetes.Interface
schema string
}

func splitServicePortNamespace(hpn string) (service, port, namespace string) {
service = hpn

colon := strings.LastIndexByte(service, ':')
if colon != -1 {
service, port = service[:colon], service[colon+1:]
}

parts := strings.SplitN(service, ".", 3)
if len(parts) >= 2 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we set namespace to default if parts==1 ?

Copy link
Member Author

@pingsutw pingsutw Oct 17, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it will use the namespace where agent is at if the namespace is None

service, namespace = parts[0], parts[1]
}

return
}

func parseResolverTarget(target resolver.Target) (targetInfo, error) {
var service, port, namespace string
if target.URL.Host == "" {
// kubernetes:///service.namespace:port
pingsutw marked this conversation as resolved.
Show resolved Hide resolved
service, port, namespace = splitServicePortNamespace(target.Endpoint())
} else if target.URL.Port() == "" && target.Endpoint() != "" {
// kubernetes://namespace/service:port
pingsutw marked this conversation as resolved.
Show resolved Hide resolved
service, port, _ = splitServicePortNamespace(target.Endpoint())
namespace = target.URL.Hostname()
} else {
// kubernetes://service.namespace:port
pingsutw marked this conversation as resolved.
Show resolved Hide resolved
service, port, namespace = splitServicePortNamespace(target.URL.Host)
}

if service == "" {
return targetInfo{}, fmt.Errorf("target %s must specify a service", &target.URL)
}

return targetInfo{
serviceName: service,
serviceNamespace: namespace,
port: port,
}, nil
}

// Build creates a new resolver for the given target, e.g. kubernetes:///flyteagent:flyte:8000.
func (b *kubeBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
ti, err := parseResolverTarget(target)
if err != nil {
return nil, err
}

Check warning on line 93 in flytestdlib/resolver/k8s_resolver.go

View check run for this annotation

Codecov / codecov/patch

flytestdlib/resolver/k8s_resolver.go#L92-L93

Added lines #L92 - L93 were not covered by tests

ctx, cancel := context.WithCancel(context.Background())
r := &kResolver{
target: ti,
ctx: ctx,
cancel: cancel,
cc: cc,
k8sClient: b.k8sClient,
}
go wait.Until(r.run, 0, ctx.Done())
return r, nil
}

// Scheme returns the scheme supported by this resolver.
// Scheme is defined at https://github.com/grpc/grpc/blob/master/doc/naming.md.
func (b *kubeBuilder) Scheme() string {
return b.schema

Check warning on line 110 in flytestdlib/resolver/k8s_resolver.go

View check run for this annotation

Codecov / codecov/patch

flytestdlib/resolver/k8s_resolver.go#L109-L110

Added lines #L109 - L110 were not covered by tests
}

type kResolver struct {
target targetInfo
ctx context.Context
cancel context.CancelFunc
cc resolver.ClientConn
k8sClient kubernetes.Interface
// wg is used to enforce Close() to return after the watcher() goroutine has finished.
wg sync.WaitGroup
}

// ResolveNow is a no-op at this point.
func (k *kResolver) ResolveNow(resolver.ResolveNowOptions) {}

Check warning on line 124 in flytestdlib/resolver/k8s_resolver.go

View check run for this annotation

Codecov / codecov/patch

flytestdlib/resolver/k8s_resolver.go#L124

Added line #L124 was not covered by tests

// Close closes the resolver.
func (k *kResolver) Close() {
k.cancel()
k.wg.Wait()
logger.Infof(k.ctx, "k8s resolver: closed")
}

func (k *kResolver) resolve(e *v1.Endpoints) {
var newAddrs []resolver.Address
for _, subset := range e.Subsets {
port := k.target.port

for _, address := range subset.Addresses {
newAddrs = append(newAddrs, resolver.Address{
Addr: net.JoinHostPort(address.IP, port),
ServerName: fmt.Sprintf("%s.%s", k.target.serviceName, k.target.serviceNamespace),
Metadata: nil,
})
}
}
err := k.cc.UpdateState(resolver.State{Addresses: newAddrs})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is cc.UpdateState thread safe? do we need to lock its usage in someway?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if err != nil {
grpclog.Errorf("k8s resolver: failed : %v", err)
}

Check warning on line 149 in flytestdlib/resolver/k8s_resolver.go

View check run for this annotation

Codecov / codecov/patch

flytestdlib/resolver/k8s_resolver.go#L148-L149

Added lines #L148 - L149 were not covered by tests
}

func (k *kResolver) run() {
k.wg.Add(1)
defer k.wg.Done()
logger.Infof(k.ctx, "Starting k8s resolver for target: %s", k.target)
watcher, err := k.k8sClient.CoreV1().Endpoints(k.target.serviceNamespace).Watch(k.ctx, metav1.ListOptions{FieldSelector: "metadata.name=" + k.target.serviceName})
if err != nil {
grpclog.Errorf("k8s resolver: failed to create watcher: %v", err)
return
}

Check warning on line 160 in flytestdlib/resolver/k8s_resolver.go

View check run for this annotation

Codecov / codecov/patch

flytestdlib/resolver/k8s_resolver.go#L158-L160

Added lines #L158 - L160 were not covered by tests

for {
select {
case <-k.ctx.Done():
return
case event, ok := <-watcher.ResultChan():
if !ok {
logger.Debugf(k.ctx, "k8s resolver: watcher closed")
return
}

Check warning on line 170 in flytestdlib/resolver/k8s_resolver.go

View check run for this annotation

Codecov / codecov/patch

flytestdlib/resolver/k8s_resolver.go#L168-L170

Added lines #L168 - L170 were not covered by tests
if event.Object == nil {
continue

Check warning on line 172 in flytestdlib/resolver/k8s_resolver.go

View check run for this annotation

Codecov / codecov/patch

flytestdlib/resolver/k8s_resolver.go#L172

Added line #L172 was not covered by tests
}
k.resolve(event.Object.(*v1.Endpoints))
}
}
}
Loading
Loading