From 8a37d561fdfaf872a5b55f72070354858f7b383e Mon Sep 17 00:00:00 2001 From: Jianjian Date: Wed, 3 Apr 2024 16:34:11 -0700 Subject: [PATCH] invalidate outdated page when restore --- .../client/file/cache/LocalCacheManager.java | 96 ++++++++++++------- 1 file changed, 63 insertions(+), 33 deletions(-) diff --git a/dora/core/client/fs/src/main/java/alluxio/client/file/cache/LocalCacheManager.java b/dora/core/client/fs/src/main/java/alluxio/client/file/cache/LocalCacheManager.java index cf8264d3b002..a3a577b2b7f7 100644 --- a/dora/core/client/fs/src/main/java/alluxio/client/file/cache/LocalCacheManager.java +++ b/dora/core/client/fs/src/main/java/alluxio/client/file/cache/LocalCacheManager.java @@ -114,6 +114,7 @@ public class LocalCacheManager implements CacheManager { */ private final AtomicReference mState = new AtomicReference<>(); private final CacheManagerOptions mOptions; + private Optional> mPagePredicate = Optional.empty(); /** * @param options the options of local cache manager @@ -163,17 +164,19 @@ public static LocalCacheManager create(CacheManagerOptions options, options.isAsyncRestoreEnabled() ? Optional.of(Executors.newSingleThreadExecutor()) : Optional.empty(); if (options.isTtlEnabled()) { + Predicate ttl = pageInfo -> { + try { + return System.currentTimeMillis() - pageInfo.getCreatedTimestamp() + >= options.getTtlThresholdSeconds() * 1000; + } catch (Exception ex) { + // In case of any exception, do not invalidate the cache + return false; + } + }; + mPagePredicate = Optional.of(ttl); mTtlEnforcerExecutor = Optional.of(newScheduledThreadPool(1)); mTtlEnforcerExecutor.get().scheduleAtFixedRate(() -> - LocalCacheManager.this.invalidate(pageInfo -> { - try { - return System.currentTimeMillis() - pageInfo.getCreatedTimestamp() - >= options.getTtlThresholdSeconds() * 1000; - } catch (Exception ex) { - // In case of any exception, do not invalidate the cache - return false; - } - }), 0, options.getTtlCheckIntervalSeconds(), SECONDS); + LocalCacheManager.this.invalidate(ttl), 0, options.getTtlCheckIntervalSeconds(), SECONDS); } else { mTtlEnforcerExecutor = Optional.empty(); } @@ -689,27 +692,7 @@ public boolean delete(PageId pageId, boolean isTemporary) { Metrics.DELETE_ERRORS.inc(); return false; } - ReadWriteLock pageLock = getPageLock(pageId); - try (LockResource r = new LockResource(pageLock.writeLock())) { - PageInfo pageInfo; - try (LockResource r1 = new LockResource(mPageMetaStore.getLock().writeLock())) { - try { - pageInfo = mPageMetaStore.removePage(pageId, isTemporary); - } catch (PageNotFoundException e) { - LOG.debug("Failed to delete page {} from metaStore ", pageId, e); - Metrics.DELETE_NON_EXISTING_PAGE_ERRORS.inc(); - Metrics.DELETE_ERRORS.inc(); - return false; - } - } - boolean ok = deletePage(pageInfo, isTemporary); - LOG.debug("delete({}) exits, success: {}", pageId, ok); - if (!ok) { - Metrics.DELETE_STORE_DELETE_ERRORS.inc(); - Metrics.DELETE_ERRORS.inc(); - } - return ok; - } + return deleteInternal(pageId, isTemporary); } @Override @@ -784,9 +767,27 @@ private boolean restore(PageStoreDir pageStoreDir) { return false; } try { - pageStoreDir.scanPages(pageInfo -> { - if (pageInfo.isPresent()) { - addPageToDir(pageStoreDir, pageInfo.get()); + pageStoreDir.scanPages(optionalPageInfo -> { + if (optionalPageInfo.isPresent()) { + PageInfo pageInfo = optionalPageInfo.get(); + boolean tested = false; + if (mPagePredicate.isPresent()) { + tested = mPagePredicate.get().test(pageInfo); + } + boolean isPageDeleted = false; + if (!tested) { + addPageToDir(pageStoreDir, pageInfo); + } + else { + isPageDeleted = deleteInternal(pageInfo.getPageId(), false); + } + if (isPageDeleted) { + MetricsSystem.meter(MetricKey.CLIENT_CACHE_PAGES_INVALIDATED.getName()).mark(); + } + else { + MetricsSystem.histogram(MetricKey.CLIENT_CACHE_PAGES_AGES.getName()) + .update(System.currentTimeMillis() - pageInfo.getCreatedTimestamp()); + } } }); } catch (IOException | RuntimeException e) { @@ -866,6 +867,11 @@ public void deleteTempFile(String fileId) { @Override public void invalidate(Predicate predicate) { + if (mState.get() != READ_WRITE) { + Metrics.DELETE_NOT_READY_ERRORS.inc(); + Metrics.DELETE_ERRORS.inc(); + return; + } mPageStoreDirs.forEach(dir -> { try { dir.scanPages(pageInfoOpt -> { @@ -902,6 +908,30 @@ public void close() throws Exception { mTtlEnforcerExecutor.ifPresent(ExecutorService::shutdownNow); } + private boolean deleteInternal(PageId pageId, boolean isTemporary) { + ReadWriteLock pageLock = getPageLock(pageId); + try (LockResource r = new LockResource(pageLock.writeLock())) { + PageInfo pageInfo; + try (LockResource r1 = new LockResource(mPageMetaStore.getLock().writeLock())) { + try { + pageInfo = mPageMetaStore.removePage(pageId, isTemporary); + } catch (PageNotFoundException e) { + LOG.debug("Failed to delete page {} from metaStore ", pageId, e); + Metrics.DELETE_NON_EXISTING_PAGE_ERRORS.inc(); + Metrics.DELETE_ERRORS.inc(); + return false; + } + } + boolean ok = deletePage(pageInfo, isTemporary); + LOG.debug("delete({}) exits, success: {}", pageId, ok); + if (!ok) { + Metrics.DELETE_STORE_DELETE_ERRORS.inc(); + Metrics.DELETE_ERRORS.inc(); + } + return ok; + } + } + /** * Attempts to delete a page from the page store. The page lock must be acquired before calling * this method. The metastore must be updated before calling this method.