Skip to content

Commit

Permalink
Compare revision
Browse files Browse the repository at this point in the history
  • Loading branch information
tomleb committed Sep 17, 2024
1 parent b02a6e1 commit 3625f20
Show file tree
Hide file tree
Showing 7 changed files with 45 additions and 24 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.22.0

toolchain go1.22.7

replace github.com/rancher/lasso => github.com/tomleb/rancher-lasso v0.0.0-20240906163555-e567f1ea2d53
replace github.com/rancher/lasso => github.com/tomleb/rancher-lasso v0.0.0-20240916204456-7d2f79a45cda

replace (
github.com/crewjam/saml => github.com/rancher/saml v0.2.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -223,8 +223,8 @@ github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/tomleb/rancher-lasso v0.0.0-20240906163555-e567f1ea2d53 h1:2PYvKjEB664bZn/+1ztEIMpvyDtuj6bUvQvpVva54NU=
github.com/tomleb/rancher-lasso v0.0.0-20240906163555-e567f1ea2d53/go.mod h1:Efx/+BbH3ivmnTPLu5cA3Gc9wT5oyGS0LBcqEuYTx+A=
github.com/tomleb/rancher-lasso v0.0.0-20240916204456-7d2f79a45cda h1:kzbjaj+1ccTE5242HLlGdab7h8TIs0GqbH5jgX6Um94=
github.com/tomleb/rancher-lasso v0.0.0-20240916204456-7d2f79a45cda/go.mod h1:Efx/+BbH3ivmnTPLu5cA3Gc9wT5oyGS0LBcqEuYTx+A=
github.com/urfave/cli v1.22.14 h1:ebbhrRiGK2i4naQJr+1Xj92HXZCrK7MsyTS/ob3HnAk=
github.com/urfave/cli v1.22.14/go.mod h1:X0eDS6pD6Exaclxm99NJ3FiCDRED7vIHpx2mDOHLvkA=
github.com/urfave/cli/v2 v2.27.4 h1:o1owoI+02Eb+K107p27wEX9Bb8eqIoZCfLXloLUSWJ8=
Expand Down
9 changes: 4 additions & 5 deletions pkg/stores/sqlpartition/partition_mocks_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/stores/sqlpartition/partitioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type UnstructuredStore interface {
Update(apiOp *types.APIRequest, schema *types.APISchema, data types.APIObject, id string) (*unstructured.Unstructured, []types.Warning, error)
Delete(apiOp *types.APIRequest, schema *types.APISchema, id string) (*unstructured.Unstructured, []types.Warning, error)

ListByPartitions(apiOp *types.APIRequest, schema *types.APISchema, partitions []partition.Partition) ([]unstructured.Unstructured, int, string, error)
ListByPartitions(apiOp *types.APIRequest, schema *types.APISchema, partitions []partition.Partition) (*unstructured.UnstructuredList, int, string, error)
WatchByPartitions(apiOp *types.APIRequest, schema *types.APISchema, wr types.WatchRequest, partitions []partition.Partition) (chan struct{}, error)
}

Expand Down
5 changes: 3 additions & 2 deletions pkg/stores/sqlpartition/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,13 @@ func (s *Store) List(apiOp *types.APIRequest, schema *types.APISchema) (types.AP

result.Count = total

for _, item := range list {
for _, item := range list.Items {
item := item.DeepCopy()
// the sql cache automatically adds the ID through a transformFunc. Because of this, we have a different set of reserved fields for the SQL cache
result.Objects = append(result.Objects, partition.ToAPI(schema, item, nil, s.sqlReservedFields))
}

result.Revision = ""
result.Revision = list.GetResourceVersion()
result.Continue = continueToken
return result, nil
}
Expand Down Expand Up @@ -145,6 +145,7 @@ func (s *Store) Watch(apiOp *types.APIRequest, schema *types.APISchema, wr types

for range c {
response <- types.APIEvent{
Name: "resource.changes",
ResourceType: schema.ID,
}
}
Expand Down
19 changes: 16 additions & 3 deletions pkg/stores/sqlproxy/proxy_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"io"
"io/ioutil"
"net/http"
"strconv"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -507,7 +508,7 @@ func (s *Store) CacheFor(client dynamic.ResourceInterface, schema *types.APISche
// - the total number of resources (returned list might be a subset depending on pagination options in apiOp)
// - a continue token, if there are more pages after the returned one
// - an error instead of all of the above if anything went wrong
func (s *Store) ListByPartitions(apiOp *types.APIRequest, schema *types.APISchema, partitions []partition.Partition) ([]unstructured.Unstructured, int, string, error) {
func (s *Store) ListByPartitions(apiOp *types.APIRequest, schema *types.APISchema, partitions []partition.Partition) (*unstructured.UnstructuredList, int, string, error) {
opts, err := listprocessor.ParseQuery(apiOp, s.namespaceCache)
if err != nil {
return nil, 0, "", err
Expand All @@ -532,13 +533,22 @@ func (s *Store) ListByPartitions(apiOp *types.APIRequest, schema *types.APISchem
return nil, 0, "", err
}

return list.Items, total, continueToken, nil
return list, total, continueToken, nil
}

// WatchByPartitions returns a channel of events for a list or resource belonging to any of the specified partitions
func (s *Store) WatchByPartitions(apiOp *types.APIRequest, schema *types.APISchema, wr types.WatchRequest, partitions []partition.Partition) (chan struct{}, error) {
ctx := apiOp.Context()

revision := 0
if wr.Revision != "" {
parsedRevision, err := strconv.ParseInt(wr.Revision, 10, 64)
if err != nil {
return nil, fmt.Errorf("invalid revision %q: %w", wr.Revision, err)
}
revision = int(parsedRevision)
}

// XXX: Why was this needed at all??
apiOp = apiOp.Clone().WithContext(ctx)

Expand All @@ -555,7 +565,10 @@ func (s *Store) WatchByPartitions(apiOp *types.APIRequest, schema *types.APISche
}

debounceListener := newDebounceListener(5 * time.Second)
inf.Watch(ctx, debounceListener)
latestRevision := inf.Watch(ctx, debounceListener)
if latestRevision > revision {
debounceListener.NotifyNow()
}
go debounceListener.Run(ctx)

return debounceListener.ch, nil
Expand Down
28 changes: 18 additions & 10 deletions pkg/stores/sqlproxy/watchers.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package sqlproxy

import (
"context"
"sync/atomic"
"sync"
"time"

"github.com/rancher/lasso/pkg/cache/sql/informer"
Expand All @@ -11,8 +11,10 @@ import (
var _ informer.Listener = (*debounceListener)(nil)

type debounceListener struct {
lock sync.Mutex
notified bool

debounceRate time.Duration
notified atomic.Bool
ch chan struct{}
}

Expand All @@ -25,26 +27,32 @@ func newDebounceListener(debounceRate time.Duration) *debounceListener {
}

func (d *debounceListener) Run(ctx context.Context) {
d.ch <- struct{}{}
ticker := time.NewTicker(d.debounceRate)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
close(d.ch)
return
case <-ticker.C:
if !d.notified.Swap(false) {
continue
d.lock.Lock()
if d.notified {
d.ch <- struct{}{}
}
d.ch <- struct{}{}
d.notified = false
d.lock.Unlock()
}
}
}

func (d *debounceListener) Notify() {
d.notified.Store(true)
func (d *debounceListener) NotifyNow() {
d.lock.Lock()
defer d.lock.Unlock()
d.ch <- struct{}{}
}

func (d *debounceListener) Close() {
close(d.ch)
func (d *debounceListener) Notify() {
d.lock.Lock()
defer d.lock.Unlock()
d.notified = true
}

0 comments on commit 3625f20

Please sign in to comment.