diff --git a/dora/core/client/fs/src/main/java/alluxio/client/file/cache/LocalCacheManager.java b/dora/core/client/fs/src/main/java/alluxio/client/file/cache/LocalCacheManager.java index cf8264d3b002..21c9b4440f34 100644 --- a/dora/core/client/fs/src/main/java/alluxio/client/file/cache/LocalCacheManager.java +++ b/dora/core/client/fs/src/main/java/alluxio/client/file/cache/LocalCacheManager.java @@ -114,6 +114,7 @@ public class LocalCacheManager implements CacheManager { */ private final AtomicReference mState = new AtomicReference<>(); private final CacheManagerOptions mOptions; + private Predicate mTtlPredicate; /** * @param options the options of local cache manager @@ -128,13 +129,13 @@ public static LocalCacheManager create(CacheManagerOptions options, if (manager.mInitService.isPresent()) { manager.mInitService.get().submit(() -> { try { - manager.restoreOrInit(pageStoreDirs); + manager.restoreOrInit(pageStoreDirs, options); } catch (IOException e) { LOG.error("Failed to restore LocalCacheManager", e); } }); } else { - manager.restoreOrInit(pageStoreDirs); + manager.restoreOrInit(pageStoreDirs, options); } return manager; } @@ -163,9 +164,7 @@ public static LocalCacheManager create(CacheManagerOptions options, options.isAsyncRestoreEnabled() ? Optional.of(Executors.newSingleThreadExecutor()) : Optional.empty(); if (options.isTtlEnabled()) { - mTtlEnforcerExecutor = Optional.of(newScheduledThreadPool(1)); - mTtlEnforcerExecutor.get().scheduleAtFixedRate(() -> - LocalCacheManager.this.invalidate(pageInfo -> { + mTtlPredicate = pageInfo -> { try { return System.currentTimeMillis() - pageInfo.getCreatedTimestamp() >= options.getTtlThresholdSeconds() * 1000; @@ -173,7 +172,10 @@ public static LocalCacheManager create(CacheManagerOptions options, // In case of any exception, do not invalidate the cache return false; } - }), 0, options.getTtlCheckIntervalSeconds(), SECONDS); + }; + mTtlEnforcerExecutor = Optional.of(newScheduledThreadPool(1)); + mTtlEnforcerExecutor.get().scheduleAtFixedRate(() -> + LocalCacheManager.this.invalidate(mTtlPredicate), 0, options.getTtlCheckIntervalSeconds(), SECONDS); } else { mTtlEnforcerExecutor = Optional.empty(); } @@ -689,6 +691,10 @@ public boolean delete(PageId pageId, boolean isTemporary) { Metrics.DELETE_ERRORS.inc(); return false; } + return deleteInternal(pageId, isTemporary); + } + + private boolean deleteInternal(PageId pageId, boolean isTemporary) { ReadWriteLock pageLock = getPageLock(pageId); try (LockResource r = new LockResource(pageLock.writeLock())) { PageInfo pageInfo; @@ -749,10 +755,10 @@ public boolean append(PageId pageId, int appendAt, byte[] page, CacheContext cac * * @param pageStoreDirs */ - private void restoreOrInit(List pageStoreDirs) throws IOException { + private void restoreOrInit(List pageStoreDirs, CacheManagerOptions options) throws IOException { Preconditions.checkState(mState.get() == READ_ONLY); for (PageStoreDir pageStoreDir : pageStoreDirs) { - if (!restore(pageStoreDir)) { + if (!restore(pageStoreDir, options)) { try (LockResource r = new LockResource(mPageMetaStore.getLock().writeLock())) { mPageMetaStore.reset(); } @@ -771,7 +777,7 @@ private void restoreOrInit(List pageStoreDirs) throws IOException Metrics.STATE.inc(); } - private boolean restore(PageStoreDir pageStoreDir) { + private boolean restore(PageStoreDir pageStoreDir, CacheManagerOptions options) { long restoredPages = mPageMetaStore.numPages(); long restoredBytes = mPageMetaStore.bytes(); long discardPages = Metrics.PAGE_DISCARDED.getCount(); @@ -784,9 +790,25 @@ private boolean restore(PageStoreDir pageStoreDir) { return false; } try { - pageStoreDir.scanPages(pageInfo -> { - if (pageInfo.isPresent()) { - addPageToDir(pageStoreDir, pageInfo.get()); + + pageStoreDir.scanPages(optionalPageInfo -> { + if (optionalPageInfo.isPresent()) { + PageInfo pageInfo = optionalPageInfo.get(); + boolean tested = mTtlPredicate.test(pageInfo); + boolean isPageDeleted = false; + if (!tested) { + addPageToDir(pageStoreDir, pageInfo); + } + else { + isPageDeleted = deleteInternal(pageInfo.getPageId(), false); + } + if (isPageDeleted) { + MetricsSystem.meter(MetricKey.CLIENT_CACHE_PAGES_INVALIDATED.getName()).mark(); + } + else { + MetricsSystem.histogram(MetricKey.CLIENT_CACHE_PAGES_AGES.getName()) + .update(System.currentTimeMillis() - pageInfo.getCreatedTimestamp()); + } } }); } catch (IOException | RuntimeException e) { @@ -866,6 +888,11 @@ public void deleteTempFile(String fileId) { @Override public void invalidate(Predicate predicate) { + if (mState.get() != READ_WRITE) { + Metrics.DELETE_NOT_READY_ERRORS.inc(); + Metrics.DELETE_ERRORS.inc(); + return; + } mPageStoreDirs.forEach(dir -> { try { dir.scanPages(pageInfoOpt -> {