From a130ff65c731f7494551f3b3dd0d1e8567ec8b3c Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Tue, 3 Oct 2023 15:24:09 -0700 Subject: [PATCH] Update the item only if it exists in the cache Signed-off-by: Kevin Su --- flytestdlib/cache/auto_refresh.go | 70 ++++++++++++++++++++----------- 1 file changed, 46 insertions(+), 24 deletions(-) diff --git a/flytestdlib/cache/auto_refresh.go b/flytestdlib/cache/auto_refresh.go index 3ed258f97c..15765d7d5c 100644 --- a/flytestdlib/cache/auto_refresh.go +++ b/flytestdlib/cache/auto_refresh.go @@ -3,6 +3,7 @@ package cache import ( "context" "fmt" + "sync" "time" "github.com/flyteorg/flyte/flytestdlib/contextutils" @@ -122,6 +123,7 @@ type autoRefresh struct { syncPeriod time.Duration workqueue workqueue.RateLimitingInterface parallelizm int + lock sync.RWMutex } func getEvictionFunction(counter prometheus.Counter) func(key interface{}, value interface{}) { @@ -173,6 +175,24 @@ func (w *autoRefresh) Start(ctx context.Context) error { return nil } +// Update updates the item in the cache only if it exists, return true if we updated the item. +func (w *autoRefresh) Update(id ItemID, item Item) (ok bool) { + w.lock.Lock() + ok = w.lruMap.Contains(id) + if ok { + w.lruMap.Add(id, item) + } + w.lock.Unlock() + return ok +} + +func (w *autoRefresh) Delete(key interface{}) { + w.lock.Lock() + w.toDelete.Remove(key) + w.lruMap.Remove(key) + w.lock.Unlock() +} + func (w *autoRefresh) Get(id ItemID) (Item, error) { if val, ok := w.lruMap.Get(id); ok { w.metrics.CacheHit.Inc() @@ -212,8 +232,7 @@ func (w *autoRefresh) enqueueBatches(ctx context.Context) error { snapshot := make([]ItemWrapper, 0, len(keys)) for _, k := range keys { if w.toDelete.Contains(k) { - w.lruMap.Remove(k) - w.toDelete.Remove(k) + w.Delete(k) continue } // If not ok, it means evicted between the item was evicted between getting the keys and this update loop @@ -234,10 +253,8 @@ func (w *autoRefresh) enqueueBatches(ctx context.Context) error { } for _, batch := range batches { - for _, b := range batch { - logger.Debugf(ctx, "Enqueuing batch with id: %v", b.GetID()) - w.workqueue.Add(b.GetID()) - } + b := batch + w.workqueue.Add(&b) } return nil @@ -275,29 +292,35 @@ func (w *autoRefresh) sync(ctx context.Context) (err error) { case <-ctx.Done(): return nil default: - itemID, shutdown := w.workqueue.Get() + item, shutdown := w.workqueue.Get() if shutdown { logger.Debugf(ctx, "Shutting down worker") return nil } + // Since we create batches every time we sync, we will just remove the item from the queue here + // regardless of whether it succeeded the sync or not. + w.workqueue.Forget(item) + w.workqueue.Done(item) + t := w.metrics.SyncLatency.Start() - logger.Debugf(ctx, "Syncing item with id [%v]", itemID) - item, ok := w.lruMap.Get(itemID) - if !ok { - logger.Debugf(ctx, "item with id [%v] not found in cache", itemID) - t.Stop() - continue + batch := *item.(*Batch) + if len(batch) == 1 { + itemID := batch[0].GetID() + item, ok := w.lruMap.Get(itemID) + if !ok { + logger.Debugf(ctx, "item with id [%v] not found in cache", itemID) + t.Stop() + continue + } + if item.(Item).IsTerminal() { + logger.Debugf(ctx, "item with id [%v] is terminal", itemID) + t.Stop() + continue + } } - updatedBatch, err := w.syncCb(ctx, Batch{itemWrapper{ - id: itemID.(ItemID), - item: item.(Item), - }}) - // Since we create batches every time we sync, we will just remove the item from the queue here - // regardless of whether it succeeded the sync or not. - w.workqueue.Forget(itemID) - w.workqueue.Done(itemID) + updatedBatch, err := w.syncCb(ctx, *item.(*Batch)) if err != nil { w.metrics.SyncErrors.Inc() @@ -309,13 +332,12 @@ func (w *autoRefresh) sync(ctx context.Context) (err error) { for _, item := range updatedBatch { if item.Action == Update { // Add adds the item if it has been evicted or updates an existing one. - w.lruMap.Add(item.ID, item.Item) + w.Update(item.ID, item.Item) } } w.toDelete.Range(func(key interface{}) bool { - w.lruMap.Remove(key) - w.toDelete.Remove(key) + w.Delete(key) return true })