Update the item only if it exists in the cache #4117

Merged · 8 commits · Oct 12, 2023
@@ -37,7 +37,7 @@ func Test_monitor(t *testing.T) {
client.OnStatusMatch(ctx, mock.Anything).Return(core2.PhaseInfoSuccess(nil), nil)

wg := sync.WaitGroup{}
wg.Add(4)
wg.Add(8)
cacheObj, err := cache.NewAutoRefreshCache(rand.String(5), func(ctx context.Context, batch cache.Batch) (updatedBatch []cache.ItemSyncResponse, err error) {
wg.Done()
t.Logf("Syncing Item [%+v]", batch[0])
62 changes: 50 additions & 12 deletions flytestdlib/cache/auto_refresh.go
@@ -3,6 +3,7 @@ package cache
import (
"context"
"fmt"
"sync"
"time"

"github.com/flyteorg/flyte/flytestdlib/contextutils"
@@ -122,6 +123,7 @@ type autoRefresh struct {
syncPeriod time.Duration
workqueue workqueue.RateLimitingInterface
parallelizm int
lock sync.RWMutex
}

func getEvictionFunction(counter prometheus.Counter) func(key interface{}, value interface{}) {
@@ -173,6 +175,25 @@ func (w *autoRefresh) Start(ctx context.Context) error {
return nil
}

// Update updates the item only if it already exists in the cache; it returns true if the item was updated.
func (w *autoRefresh) Update(id ItemID, item Item) (ok bool) {
w.lock.Lock()
defer w.lock.Unlock()
ok = w.lruMap.Contains(id)
if ok {
w.lruMap.Add(id, item)
Review comment on lines +182 to +184 (Contributor):
Why only update if it's already there? What if it has been evicted since?

Reply (Member Author):
If we don't check whether the item exists in the cache, we might hit this scenario:

  worker 1 removes the item from the cache -> worker 2 adds the item back to the cache (the work queue can contain duplicate items).

If we don't use a lock, we might hit this interleaving:

  1. w.toDelete.Remove (worker 1)
  2. w.lruMap.Contains(id) (worker 2)
  3. w.lruMap.Remove(key) (worker 1)
  4. w.lruMap.Add(id, item) (worker 2) # the item is added back to the cache again

}
return ok
}

// Delete deletes the item from the cache if it exists.
func (w *autoRefresh) Delete(key interface{}) {
w.lock.Lock()
defer w.lock.Unlock()
Review comment on lines +191 to +192 (Contributor):
I wanted to avoid locking the entire dataset for these operations... any reason to?

w.toDelete.Remove(key)
w.lruMap.Remove(key)
}
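
To illustrate the locking discussion above, here is a minimal standalone sketch of the "update only if present" and "delete under the same lock" idea. It is not the flytestdlib implementation: the type name, the plain map standing in for the LRU, and the method shapes are assumptions for illustration only.

```go
// A minimal sketch (not the flytestdlib cache) of conditional update and
// delete guarded by one mutex. The existence check and the write happen
// under the same lock, so a concurrent Delete cannot interleave between
// them and resurrect an item that was just removed.
package main

import (
	"fmt"
	"sync"
)

type sketchCache struct {
	lock  sync.RWMutex
	items map[string]interface{} // stand-in for the LRU map
}

// Update writes the value only if the key is still present and reports
// whether an update happened.
func (c *sketchCache) Update(id string, item interface{}) bool {
	c.lock.Lock()
	defer c.lock.Unlock()
	if _, ok := c.items[id]; !ok {
		return false // evicted or deleted in the meantime; drop the update
	}
	c.items[id] = item
	return true
}

// Delete removes the key under the same lock.
func (c *sketchCache) Delete(id string) {
	c.lock.Lock()
	defer c.lock.Unlock()
	delete(c.items, id)
}

func main() {
	c := &sketchCache{items: map[string]interface{}{"a": 1}}
	fmt.Println(c.Update("a", 2)) // true: item exists, value updated
	c.Delete("a")
	fmt.Println(c.Update("a", 3)) // false: item was deleted, update skipped
}
```

The trade-off raised in the review still applies: a single mutex serializes all updates and deletes, which is simple but locks the whole dataset for each operation.
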

func (w *autoRefresh) Get(id ItemID) (Item, error) {
if val, ok := w.lruMap.Get(id); ok {
w.metrics.CacheHit.Inc()
@@ -212,8 +233,7 @@ func (w *autoRefresh) enqueueBatches(ctx context.Context) error {
snapshot := make([]ItemWrapper, 0, len(keys))
for _, k := range keys {
if w.toDelete.Contains(k) {
w.lruMap.Remove(k)
w.toDelete.Remove(k)
w.Delete(k)
continue
}
// If not ok, the item was evicted between getting the keys and this update loop
@@ -273,18 +293,37 @@ func (w *autoRefresh) sync(ctx context.Context) (err error) {
case <-ctx.Done():
return nil
default:
item, shutdown := w.workqueue.Get()
batch, shutdown := w.workqueue.Get()
if shutdown {
logger.Debugf(ctx, "Shutting down worker")
return nil
}

t := w.metrics.SyncLatency.Start()
updatedBatch, err := w.syncCb(ctx, *item.(*Batch))

// Since we create batches every time we sync, we will just remove the item from the queue here
// regardless of whether it succeeded the sync or not.
w.workqueue.Forget(item)
w.workqueue.Done(item)
w.workqueue.Forget(batch)
w.workqueue.Done(batch)

newBatch := make(Batch, 0, len(*batch.(*Batch)))
for _, b := range *batch.(*Batch) {
itemID := b.GetID()
item, ok := w.lruMap.Get(itemID)
if !ok {
logger.Debugf(ctx, "item with id [%v] not found in cache", itemID)
continue
}
if item.(Item).IsTerminal() {
logger.Debugf(ctx, "item with id [%v] is terminal", itemID)
continue
}
newBatch = append(newBatch, b)
}
if len(newBatch) == 0 {
continue
}

t := w.metrics.SyncLatency.Start()
updatedBatch, err := w.syncCb(ctx, newBatch)
Review comment (Contributor):
We should add a check: if len(newBatch) == 0, we don't need to sync at that point...

Reply (Member Author):
Updated it.

if err != nil {
w.metrics.SyncErrors.Inc()
@@ -295,14 +334,13 @@ func (w *autoRefresh) sync(ctx context.Context) (err error) {

for _, item := range updatedBatch {
if item.Action == Update {
// Add adds the item if it has been evicted or updates an existing one.
w.lruMap.Add(item.ID, item.Item)
// Updates an existing item.
w.Update(item.ID, item.Item)
}
}

w.toDelete.Range(func(key interface{}) bool {
w.lruMap.Remove(key)
w.toDelete.Remove(key)
w.Delete(key)
return true
})
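
The filtering step this diff adds to sync() is easier to see in isolation: items that were evicted or are already terminal are dropped before the sync callback runs, and an empty batch skips the callback entirely. The following is a condensed, hedged sketch; the item shape and the plain map are simplified assumptions, not the flytestdlib types.

```go
// A condensed sketch of the batch-filtering idea: skip items missing from
// the cache, skip terminal items, and do not call the sync callback when
// nothing is left.
package main

import "fmt"

type item struct {
	id       string
	terminal bool
}

func filterBatch(batch []item, cached map[string]item) []item {
	newBatch := make([]item, 0, len(batch))
	for _, b := range batch {
		current, ok := cached[b.id]
		if !ok {
			continue // evicted since the batch was enqueued
		}
		if current.terminal {
			continue // terminal items no longer need syncing
		}
		newBatch = append(newBatch, b)
	}
	return newBatch
}

func main() {
	cached := map[string]item{
		"a": {id: "a"},
		"b": {id: "b", terminal: true},
	}
	batch := []item{{id: "a"}, {id: "b"}, {id: "c"}}
	filtered := filterBatch(batch, cached)
	if len(filtered) == 0 {
		fmt.Println("nothing to sync")
		return
	}
	fmt.Printf("syncing %d item(s)\n", len(filtered)) // only "a" remains
}
```
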

30 changes: 29 additions & 1 deletion flytestdlib/cache/auto_refresh_test.go
@@ -62,7 +62,7 @@ func syncTerminalItem(_ context.Context, batch Batch) ([]ItemSyncResponse, error
panic("This should never be called")
}

func TestCacheThree(t *testing.T) {
func TestCacheFour(t *testing.T) {
testResyncPeriod := time.Millisecond
rateLimiter := workqueue.DefaultControllerRateLimiter()

@@ -142,6 +142,34 @@ func TestCacheThree(t *testing.T) {

cancel()
})

t.Run("Test update and delete cache", func(t *testing.T) {
cache, err := NewAutoRefreshCache("fake3", syncTerminalItem, rateLimiter, testResyncPeriod, 10, 2, promutils.NewTestScope())
assert.NoError(t, err)

ctx, cancel := context.WithCancel(context.Background())
assert.NoError(t, cache.Start(ctx))

itemID := "dummy_id"
_, err = cache.GetOrCreate(itemID, terminalCacheItem{
val: 0,
})
assert.NoError(t, err)

// Wait half a second for all resync periods to complete
// If the cache tries to enqueue the item, a panic will be thrown.
time.Sleep(500 * time.Millisecond)

err = cache.DeleteDelayed(itemID)
assert.NoError(t, err)

time.Sleep(500 * time.Millisecond)
item, err := cache.Get(itemID)
assert.Nil(t, item)
assert.Error(t, err)

cancel()
})
}
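
The new test exercises the delayed-delete flow: DeleteDelayed only marks the ID, and the periodic enqueue pass later calls Delete to drop it from the live map. Below is a small, hedged sketch of that flow; the types and method shapes are simplified stand-ins, not the flytestdlib implementation (the real DeleteDelayed, for example, also returns an error).

```go
// A small sketch of "mark now, remove on the next sweep": deletion is
// deferred to the periodic enqueue pass rather than applied immediately.
package main

import "fmt"

type delayedDeleter struct {
	live     map[string]int
	toDelete map[string]struct{}
}

// DeleteDelayed marks an ID for removal without touching the live map yet.
func (d *delayedDeleter) DeleteDelayed(id string) {
	d.toDelete[id] = struct{}{}
}

// enqueuePass mimics the sweep in enqueueBatches: marked IDs are removed
// from both sets before any new sync batch is built.
func (d *delayedDeleter) enqueuePass() {
	for id := range d.toDelete {
		delete(d.live, id)
		delete(d.toDelete, id)
	}
}

func main() {
	d := &delayedDeleter{
		live:     map[string]int{"dummy_id": 1},
		toDelete: map[string]struct{}{},
	}
	d.DeleteDelayed("dummy_id")
	fmt.Println(len(d.live)) // 1: still present until the next sweep
	d.enqueuePass()
	fmt.Println(len(d.live)) // 0: removed on the next enqueue pass
}
```
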

func TestQueueBuildUp(t *testing.T) {
2 changes: 1 addition & 1 deletion rsts/deployment/index.rst
@@ -36,7 +36,7 @@ plugins, authentication, performance tuning, and maintaining Flyte as a producti
:text: 🤖 Agent Setup
:classes: btn-block stretched-link
^^^^^^^^^^^^
Enable Flyte agents to extend Flyte's capabilities, including features like File sesnor, Databricks job, and Snowflake query services.
Enable Flyte agents to extend Flyte's capabilities, including features like File sensor, Databricks job, and Snowflake query services.

---
