Skip to content

Commit

Permalink
backport of "Fix memory leak in service mirror" to 2.12 branch (#11345)
Browse files Browse the repository at this point in the history
* build(deps): NOT bump sigs.k8s.io/gateway-api from 0.5.1 to 0.6.0 (#10038)

* build(deps): bump sigs.k8s.io/gateway-api from 0.5.1 to 0.6.0

** NOTE **
This was cherry-picked from 62d6d7c in
order to acquire the `AddEventHandler` changes that went there. The
actualy gateway-api bump was discarded.

Bumps [sigs.k8s.io/gateway-api](https://github.com/kubernetes-sigs/gateway-api) from 0.5.1 to 0.6.0.
- [Release notes](https://github.com/kubernetes-sigs/gateway-api/releases)
- [Changelog](https://github.com/kubernetes-sigs/gateway-api/blob/main/CHANGELOG.md)
- [Commits](kubernetes-sigs/gateway-api@v0.5.1...v0.6.0)

---
updated-dependencies:
- dependency-name: sigs.k8s.io/gateway-api
  dependency-type: direct:production
  update-type: version-update:semver-minor
...


* Account for possible errors returned from `AddEventHandler`

* client-go v0.26.0 removed the openstack plugin

* Fix memory leak in service mirror (#10833)


* Bump go to 1.19 in go.mod

---------

Signed-off-by: dependabot[bot] <[email protected]>
Signed-off-by: tomasz.ziolkowski <[email protected]>
Signed-off-by: Alejandro Pedraza <[email protected]>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Oliver Gould <[email protected]>
Co-authored-by: ziollek <[email protected]>
  • Loading branch information
4 people authored Sep 8, 2023
1 parent 1b04e03 commit 6b86f18
Show file tree
Hide file tree
Showing 28 changed files with 258 additions and 273 deletions.
12 changes: 6 additions & 6 deletions .github/workflows/integration.yml
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,9 @@ jobs:
runs-on: ubuntu-20.04
timeout-minutes: 15
steps:
- uses: actions/setup-go@268d8c0ca0432bb2cf416faae41297df9d262d7f
- uses: actions/setup-go@6edd4406fa81c3da01a34fa6f6343087c207a568
with:
go-version: '1.17'
go-version: '1.19'
- uses: actions/checkout@93ea575cb5d8a053eaa0ac8fa3b40d7e05a33cc8
- uses: actions/download-artifact@fb598a63ae348fa914e94cd0ff38f362e927b741
with:
Expand Down Expand Up @@ -262,9 +262,9 @@ jobs:
runs-on: ubuntu-20.04
timeout-minutes: 15
steps:
- uses: actions/setup-go@268d8c0ca0432bb2cf416faae41297df9d262d7f
- uses: actions/setup-go@6edd4406fa81c3da01a34fa6f6343087c207a568
with:
go-version: '1.17'
go-version: '1.19'
- uses: actions/checkout@93ea575cb5d8a053eaa0ac8fa3b40d7e05a33cc8
- uses: actions/download-artifact@fb598a63ae348fa914e94cd0ff38f362e927b741
with:
Expand Down Expand Up @@ -303,9 +303,9 @@ jobs:
runs-on: ubuntu-20.04
timeout-minutes: 15
steps:
- uses: actions/setup-go@268d8c0ca0432bb2cf416faae41297df9d262d7f
- uses: actions/setup-go@6edd4406fa81c3da01a34fa6f6343087c207a568
with:
go-version: '1.17'
go-version: '1.19'
- uses: actions/checkout@93ea575cb5d8a053eaa0ac8fa3b40d7e05a33cc8
- uses: actions/download-artifact@fb598a63ae348fa914e94cd0ff38f362e927b741
with:
Expand Down
12 changes: 6 additions & 6 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,9 @@ jobs:
steps:
- name: Checkout code
uses: actions/checkout@93ea575cb5d8a053eaa0ac8fa3b40d7e05a33cc8
- uses: actions/setup-go@268d8c0ca0432bb2cf416faae41297df9d262d7f
- uses: actions/setup-go@6edd4406fa81c3da01a34fa6f6343087c207a568
with:
go-version: '1.18'
go-version: '1.19'
- name: Download image archives
uses: actions/download-artifact@fb598a63ae348fa914e94cd0ff38f362e927b741
with:
Expand Down Expand Up @@ -178,9 +178,9 @@ jobs:
steps:
- name: Checkout code
uses: actions/checkout@93ea575cb5d8a053eaa0ac8fa3b40d7e05a33cc8
- uses: actions/setup-go@268d8c0ca0432bb2cf416faae41297df9d262d7f
- uses: actions/setup-go@6edd4406fa81c3da01a34fa6f6343087c207a568
with:
go-version: '1.18'
go-version: '1.19'
- name: Set environment variables from scripts
run: |
TAG='${{ needs.tag.outputs.tag }}'
Expand All @@ -202,9 +202,9 @@ jobs:
timeout-minutes: 30
steps:
- uses: actions/checkout@93ea575cb5d8a053eaa0ac8fa3b40d7e05a33cc8
- uses: actions/setup-go@c4a742cab115ed795e34d4513e2cf7d472deb55f
- uses: actions/setup-go@6edd4406fa81c3da01a34fa6f6343087c207a568
with:
go-version: '1.18'
go-version: '1.19'
- uses: docker/setup-buildx-action@94ab11c41e45d028884a99163086648e898eed25
- name: Pull linkerd binary
run: |
Expand Down
1 change: 0 additions & 1 deletion bin/install-deps
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ CGO_ENABLED=0 GOOS=linux GOARCH=$arch go install -mod=readonly \
k8s.io/client-go/plugin/pkg/client/auth/azure \
k8s.io/client-go/plugin/pkg/client/auth/gcp \
k8s.io/client-go/plugin/pkg/client/auth/oidc \
k8s.io/client-go/plugin/pkg/client/auth/openstack \
k8s.io/client-go/rest \
k8s.io/client-go/rest/watch \
k8s.io/client-go/tools/auth \
Expand Down
20 changes: 16 additions & 4 deletions controller/api/destination/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,22 @@ func NewServer(
return nil, err
}

endpoints := watcher.NewEndpointsWatcher(k8sAPI, log, enableEndpointSlices)
opaquePorts := watcher.NewOpaquePortsWatcher(k8sAPI, log, defaultOpaquePorts)
profiles := watcher.NewProfileWatcher(k8sAPI, log)
servers := watcher.NewServerWatcher(k8sAPI, log)
endpoints, err := watcher.NewEndpointsWatcher(k8sAPI, log, enableEndpointSlices)
if err != nil {
return nil, err
}
opaquePorts, err := watcher.NewOpaquePortsWatcher(k8sAPI, log, defaultOpaquePorts)
if err != nil {
return nil, err
}
profiles, err := watcher.NewProfileWatcher(k8sAPI, log)
if err != nil {
return nil, err
}
servers, err := watcher.NewServerWatcher(k8sAPI, log)
if err != nil {
return nil, err
}

srv := server{
pb.UnimplementedDestinationServer{},
Expand Down
20 changes: 16 additions & 4 deletions controller/api/destination/test_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,10 +381,22 @@ spec:
t.Fatalf("initializeIndexers returned an error: %s", err)
}

endpoints := watcher.NewEndpointsWatcher(k8sAPI, log, false)
opaquePorts := watcher.NewOpaquePortsWatcher(k8sAPI, log, defaultOpaquePorts)
profiles := watcher.NewProfileWatcher(k8sAPI, log)
servers := watcher.NewServerWatcher(k8sAPI, log)
endpoints, err := watcher.NewEndpointsWatcher(k8sAPI, log, false)
if err != nil {
t.Fatalf("can't create Endpoints watcher: %s", err)
}
opaquePorts, err := watcher.NewOpaquePortsWatcher(k8sAPI, log, defaultOpaquePorts)
if err != nil {
t.Fatalf("can't create opaque ports watcher: %s", err)
}
profiles, err := watcher.NewProfileWatcher(k8sAPI, log)
if err != nil {
t.Fatalf("can't create profile watcher: %s", err)
}
servers, err := watcher.NewServerWatcher(k8sAPI, log)
if err != nil {
t.Fatalf("can't create Server watcher: %s", err)
}

// Sync after creating watchers so that the the indexers added get updated
// properly
Expand Down
24 changes: 18 additions & 6 deletions controller/api/destination/watcher/endpoints_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ var undefinedEndpointPort = Port(0)
// NewEndpointsWatcher creates an EndpointsWatcher and begins watching the
// k8sAPI for pod, service, and endpoint changes. An EndpointsWatcher will
// watch on Endpoints or EndpointSlice resources, depending on cluster configuration.
func NewEndpointsWatcher(k8sAPI *k8s.API, log *logging.Entry, enableEndpointSlices bool) *EndpointsWatcher {
func NewEndpointsWatcher(k8sAPI *k8s.API, log *logging.Entry, enableEndpointSlices bool) (*EndpointsWatcher, error) {
ew := &EndpointsWatcher{
publishers: make(map[ServiceID]*servicePublisher),
k8sAPI: k8sAPI,
Expand All @@ -144,34 +144,46 @@ func NewEndpointsWatcher(k8sAPI *k8s.API, log *logging.Entry, enableEndpointSlic
}),
}

k8sAPI.Svc().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
_, err := k8sAPI.Svc().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: ew.addService,
DeleteFunc: ew.deleteService,
UpdateFunc: func(_, obj interface{}) { ew.addService(obj) },
})
if err != nil {
return nil, err
}

k8sAPI.Srv().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
_, err = k8sAPI.Srv().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: ew.addServer,
DeleteFunc: ew.deleteServer,
UpdateFunc: func(_, obj interface{}) { ew.addServer(obj) },
})
if err != nil {
return nil, err
}

if ew.enableEndpointSlices {
ew.log.Debugf("Watching EndpointSlice resources")
k8sAPI.ES().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
_, err := k8sAPI.ES().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: ew.addEndpointSlice,
DeleteFunc: ew.deleteEndpointSlice,
UpdateFunc: ew.updateEndpointSlice,
})
if err != nil {
return nil, err
}
} else {
ew.log.Debugf("Watching Endpoints resources")
k8sAPI.Endpoint().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
_, err = k8sAPI.Endpoint().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: ew.addEndpoints,
DeleteFunc: ew.deleteEndpoints,
UpdateFunc: func(_, obj interface{}) { ew.addEndpoints(obj) },
})
if err != nil {
return nil, err
}
}
return ew
return ew, nil
}

////////////////////////
Expand Down
40 changes: 32 additions & 8 deletions controller/api/destination/watcher/endpoints_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -665,7 +665,10 @@ status:
t.Fatalf("NewFakeAPI returned an error: %s", err)
}

watcher := NewEndpointsWatcher(k8sAPI, logging.WithField("test", t.Name()), false)
watcher, err := NewEndpointsWatcher(k8sAPI, logging.WithField("test", t.Name()), false)
if err != nil {
t.Fatalf("can't create Endpoints watcher: %s", err)
}

k8sAPI.Sync(nil)

Expand Down Expand Up @@ -1281,7 +1284,10 @@ status:
t.Fatalf("NewFakeAPI returned an error: %s", err)
}

watcher := NewEndpointsWatcher(k8sAPI, logging.WithField("test", t.Name()), true)
watcher, err := NewEndpointsWatcher(k8sAPI, logging.WithField("test", t.Name()), true)
if err != nil {
t.Fatalf("can't create Endpoints watcher: %s", err)
}

k8sAPI.Sync(nil)

Expand Down Expand Up @@ -1398,7 +1404,10 @@ status:
t.Fatalf("NewFakeAPI returned an error: %s", err)
}

watcher := NewEndpointsWatcher(k8sAPI, logging.WithField("test", t.Name()), false)
watcher, err := NewEndpointsWatcher(k8sAPI, logging.WithField("test", t.Name()), false)
if err != nil {
t.Fatalf("can't create Endpoints watcher: %s", err)
}

k8sAPI.Sync(nil)

Expand Down Expand Up @@ -1519,7 +1528,10 @@ status:
t.Fatalf("NewFakeAPI returned an error: %s", err)
}

watcher := NewEndpointsWatcher(k8sAPI, logging.WithField("test", t.Name()), true)
watcher, err := NewEndpointsWatcher(k8sAPI, logging.WithField("test", t.Name()), true)
if err != nil {
t.Fatalf("can't create Endpoints watcher: %s", err)
}

k8sAPI.Sync(nil)

Expand Down Expand Up @@ -1746,7 +1758,10 @@ subsets:
t.Fatalf("NewFakeAPI returned an error: %s", err)
}

watcher := NewEndpointsWatcher(k8sAPI, logging.WithField("test", t.Name()), tt.enableEndpointSlices)
watcher, err := NewEndpointsWatcher(k8sAPI, logging.WithField("test", t.Name()), tt.enableEndpointSlices)
if err != nil {
t.Fatalf("can't create Endpoints watcher: %s", err)
}

k8sAPI.Sync(nil)

Expand Down Expand Up @@ -1906,7 +1921,10 @@ subsets:
t.Fatalf("NewFakeAPI returned an error: %s", err)
}

watcher := NewEndpointsWatcher(k8sAPI, logging.WithField("test", t.Name()), false)
watcher, err := NewEndpointsWatcher(k8sAPI, logging.WithField("test", t.Name()), false)
if err != nil {
t.Fatalf("can't create Endpoints watcher: %s", err)
}

k8sAPI.Sync(nil)

Expand Down Expand Up @@ -2029,7 +2047,10 @@ status:
t.Fatalf("NewFakeAPI returned an error: %s", err)
}

watcher := NewEndpointsWatcher(k8sAPI, logging.WithField("test", t.Name()), false)
watcher, err := NewEndpointsWatcher(k8sAPI, logging.WithField("test", t.Name()), false)
if err != nil {
t.Fatalf("can't create Endpoints watcher: %s", err)
}

k8sAPI.Sync(nil)

Expand Down Expand Up @@ -2122,7 +2143,10 @@ status:
t.Fatalf("NewFakeAPI returned an error: %s", err)
}

watcher := NewEndpointsWatcher(k8sAPI, logging.WithField("test", t.Name()), true)
watcher, err := NewEndpointsWatcher(k8sAPI, logging.WithField("test", t.Name()), true)
if err != nil {
t.Fatalf("can't create Endpoints watcher: %s", err)
}

k8sAPI.Sync(nil)

Expand Down
10 changes: 7 additions & 3 deletions controller/api/destination/watcher/opaque_ports_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,19 +37,23 @@ type (

// NewOpaquePortsWatcher creates a OpaquePortsWatcher and begins watching for
// k8sAPI for service changes.
func NewOpaquePortsWatcher(k8sAPI *k8s.API, log *logging.Entry, opaquePorts map[uint32]struct{}) *OpaquePortsWatcher {
func NewOpaquePortsWatcher(k8sAPI *k8s.API, log *logging.Entry, opaquePorts map[uint32]struct{}) (*OpaquePortsWatcher, error) {
opw := &OpaquePortsWatcher{
subscriptions: make(map[ServiceID]*svcSubscriptions),
k8sAPI: k8sAPI,
log: log.WithField("component", "opaque-ports-watcher"),
defaultOpaquePorts: opaquePorts,
}
k8sAPI.Svc().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
_, err := k8sAPI.Svc().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: opw.addService,
DeleteFunc: opw.deleteService,
UpdateFunc: func(_, obj interface{}) { opw.addService(obj) },
})
return opw
if err != nil {
return nil, err
}

return opw, nil
}

// Subscribe subscribes a listener to a service; each time the service
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,10 @@ func TestOpaquePortsWatcher(t *testing.T) {
if err != nil {
t.Fatalf("NewFakeAPI returned an error: %s", err)
}
watcher := NewOpaquePortsWatcher(k8sAPI, logging.WithField("test", t.Name()), defaultOpaquePorts)
watcher, err := NewOpaquePortsWatcher(k8sAPI, logging.WithField("test", t.Name()), defaultOpaquePorts)
if err != nil {
t.Fatalf("can't create opaque ports watcher: %s", err)
}
k8sAPI.Sync(nil)
listener := newTestOpaquePortsListener()
watcher.Subscribe(tt.service, listener)
Expand Down
9 changes: 6 additions & 3 deletions controller/api/destination/watcher/profile_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,22 +44,25 @@ var profileVecs = newMetricsVecs("profile", []string{"namespace", "profile"})

// NewProfileWatcher creates a ProfileWatcher and begins watching the k8sAPI for
// service profile changes.
func NewProfileWatcher(k8sAPI *k8s.API, log *logging.Entry) *ProfileWatcher {
func NewProfileWatcher(k8sAPI *k8s.API, log *logging.Entry) (*ProfileWatcher, error) {
watcher := &ProfileWatcher{
profileLister: k8sAPI.SP().Lister(),
profiles: make(map[ProfileID]*profilePublisher),
log: log.WithField("component", "profile-watcher"),
}

k8sAPI.SP().Informer().AddEventHandler(
_, err := k8sAPI.SP().Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: watcher.addProfile,
UpdateFunc: watcher.updateProfile,
DeleteFunc: watcher.deleteProfile,
},
)
if err != nil {
return nil, err
}

return watcher
return watcher, nil
}

//////////////////////
Expand Down
10 changes: 8 additions & 2 deletions controller/api/destination/watcher/profile_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,10 @@ func TestProfileWatcherUpdates(t *testing.T) {
t.Fatalf("NewFakeAPI returned an error: %s", err)
}

watcher := NewProfileWatcher(k8sAPI, logging.WithField("test", t.Name()))
watcher, err := NewProfileWatcher(k8sAPI, logging.WithField("test", t.Name()))
if err != nil {
t.Fatalf("can't create profile watcher: %s", err)
}

k8sAPI.Sync(nil)

Expand Down Expand Up @@ -136,7 +139,10 @@ func TestProfileWatcherDeletes(t *testing.T) {
t.Fatalf("NewFakeAPI returned an error: %s", err)
}

watcher := NewProfileWatcher(k8sAPI, logging.WithField("test", t.Name()))
watcher, err := NewProfileWatcher(k8sAPI, logging.WithField("test", t.Name()))
if err != nil {
t.Fatalf("can't create profile watcher: %s", err)
}
k8sAPI.Sync(nil)

listener := NewDeletingProfileListener()
Expand Down
Loading

0 comments on commit 6b86f18

Please sign in to comment.