Skip to content

Commit

Permalink
Update the item only if it exists in the cache
Browse files Browse the repository at this point in the history
Signed-off-by: Kevin Su <[email protected]>
  • Loading branch information
pingsutw committed Oct 3, 2023
1 parent 368f997 commit a130ff6
Showing 1 changed file with 46 additions and 24 deletions.
70 changes: 46 additions & 24 deletions flytestdlib/cache/auto_refresh.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package cache
import (
"context"
"fmt"
"sync"
"time"

"github.com/flyteorg/flyte/flytestdlib/contextutils"
Expand Down Expand Up @@ -122,6 +123,7 @@ type autoRefresh struct {
syncPeriod time.Duration
workqueue workqueue.RateLimitingInterface
parallelizm int
lock sync.RWMutex
}

func getEvictionFunction(counter prometheus.Counter) func(key interface{}, value interface{}) {
Expand Down Expand Up @@ -173,6 +175,24 @@ func (w *autoRefresh) Start(ctx context.Context) error {
return nil
}

// Update updates the item in the cache only if it exists, return true if we updated the item.
func (w *autoRefresh) Update(id ItemID, item Item) (ok bool) {
w.lock.Lock()
ok = w.lruMap.Contains(id)
if ok {
w.lruMap.Add(id, item)
}
w.lock.Unlock()
return ok
}

func (w *autoRefresh) Delete(key interface{}) {
w.lock.Lock()
w.toDelete.Remove(key)
w.lruMap.Remove(key)
w.lock.Unlock()
}

func (w *autoRefresh) Get(id ItemID) (Item, error) {
if val, ok := w.lruMap.Get(id); ok {
w.metrics.CacheHit.Inc()
Expand Down Expand Up @@ -212,8 +232,7 @@ func (w *autoRefresh) enqueueBatches(ctx context.Context) error {
snapshot := make([]ItemWrapper, 0, len(keys))
for _, k := range keys {
if w.toDelete.Contains(k) {
w.lruMap.Remove(k)
w.toDelete.Remove(k)
w.Delete(k)
continue
}
// If not ok, it means evicted between the item was evicted between getting the keys and this update loop
Expand All @@ -234,10 +253,8 @@ func (w *autoRefresh) enqueueBatches(ctx context.Context) error {
}

for _, batch := range batches {
for _, b := range batch {
logger.Debugf(ctx, "Enqueuing batch with id: %v", b.GetID())
w.workqueue.Add(b.GetID())
}
b := batch
w.workqueue.Add(&b)
}

return nil
Expand Down Expand Up @@ -275,29 +292,35 @@ func (w *autoRefresh) sync(ctx context.Context) (err error) {
case <-ctx.Done():
return nil
default:
itemID, shutdown := w.workqueue.Get()
item, shutdown := w.workqueue.Get()
if shutdown {
logger.Debugf(ctx, "Shutting down worker")
return nil
}

// Since we create batches every time we sync, we will just remove the item from the queue here
// regardless of whether it succeeded the sync or not.
w.workqueue.Forget(item)
w.workqueue.Done(item)

t := w.metrics.SyncLatency.Start()
logger.Debugf(ctx, "Syncing item with id [%v]", itemID)
item, ok := w.lruMap.Get(itemID)
if !ok {
logger.Debugf(ctx, "item with id [%v] not found in cache", itemID)
t.Stop()
continue
batch := *item.(*Batch)
if len(batch) == 1 {
itemID := batch[0].GetID()
item, ok := w.lruMap.Get(itemID)
if !ok {
logger.Debugf(ctx, "item with id [%v] not found in cache", itemID)
t.Stop()
continue
}
if item.(Item).IsTerminal() {
logger.Debugf(ctx, "item with id [%v] is terminal", itemID)
t.Stop()
continue
}
}
updatedBatch, err := w.syncCb(ctx, Batch{itemWrapper{
id: itemID.(ItemID),
item: item.(Item),
}})

// Since we create batches every time we sync, we will just remove the item from the queue here
// regardless of whether it succeeded the sync or not.
w.workqueue.Forget(itemID)
w.workqueue.Done(itemID)
updatedBatch, err := w.syncCb(ctx, *item.(*Batch))

if err != nil {
w.metrics.SyncErrors.Inc()
Expand All @@ -309,13 +332,12 @@ func (w *autoRefresh) sync(ctx context.Context) (err error) {
for _, item := range updatedBatch {
if item.Action == Update {
// Add adds the item if it has been evicted or updates an existing one.
w.lruMap.Add(item.ID, item.Item)
w.Update(item.ID, item.Item)
}
}

w.toDelete.Range(func(key interface{}) bool {
w.lruMap.Remove(key)
w.toDelete.Remove(key)
w.Delete(key)
return true
})

Expand Down

0 comments on commit a130ff6

Please sign in to comment.