Skip to content

Commit

Permalink
invalidate outdated page when restore
Browse files Browse the repository at this point in the history
  • Loading branch information
jja725 committed Apr 4, 2024
1 parent c19feac commit 7296392
Showing 1 changed file with 39 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ public class LocalCacheManager implements CacheManager {
*/
private final AtomicReference<CacheManager.State> mState = new AtomicReference<>();
private final CacheManagerOptions mOptions;
private Predicate<PageInfo> mTtlPredicate;

/**
* @param options the options of local cache manager
Expand All @@ -128,13 +129,13 @@ public static LocalCacheManager create(CacheManagerOptions options,
if (manager.mInitService.isPresent()) {
manager.mInitService.get().submit(() -> {
try {
manager.restoreOrInit(pageStoreDirs);
manager.restoreOrInit(pageStoreDirs, options);
} catch (IOException e) {
LOG.error("Failed to restore LocalCacheManager", e);
}
});
} else {
manager.restoreOrInit(pageStoreDirs);
manager.restoreOrInit(pageStoreDirs, options);
}
return manager;
}
Expand Down Expand Up @@ -163,17 +164,18 @@ public static LocalCacheManager create(CacheManagerOptions options,
options.isAsyncRestoreEnabled() ? Optional.of(Executors.newSingleThreadExecutor()) :
Optional.empty();
if (options.isTtlEnabled()) {
mTtlEnforcerExecutor = Optional.of(newScheduledThreadPool(1));
mTtlEnforcerExecutor.get().scheduleAtFixedRate(() ->
LocalCacheManager.this.invalidate(pageInfo -> {
mTtlPredicate = pageInfo -> {
try {
return System.currentTimeMillis() - pageInfo.getCreatedTimestamp()
>= options.getTtlThresholdSeconds() * 1000;
} catch (Exception ex) {
// In case of any exception, do not invalidate the cache
return false;
}
}), 0, options.getTtlCheckIntervalSeconds(), SECONDS);
};
mTtlEnforcerExecutor = Optional.of(newScheduledThreadPool(1));
mTtlEnforcerExecutor.get().scheduleAtFixedRate(() ->
LocalCacheManager.this.invalidate(mTtlPredicate), 0, options.getTtlCheckIntervalSeconds(), SECONDS);
} else {
mTtlEnforcerExecutor = Optional.empty();
}
Expand Down Expand Up @@ -689,6 +691,10 @@ public boolean delete(PageId pageId, boolean isTemporary) {
Metrics.DELETE_ERRORS.inc();
return false;
}
return deleteInternal(pageId, isTemporary);
}

private boolean deleteInternal(PageId pageId, boolean isTemporary) {
ReadWriteLock pageLock = getPageLock(pageId);
try (LockResource r = new LockResource(pageLock.writeLock())) {
PageInfo pageInfo;
Expand Down Expand Up @@ -749,10 +755,10 @@ public boolean append(PageId pageId, int appendAt, byte[] page, CacheContext cac
*
* @param pageStoreDirs
*/
private void restoreOrInit(List<PageStoreDir> pageStoreDirs) throws IOException {
private void restoreOrInit(List<PageStoreDir> pageStoreDirs, CacheManagerOptions options) throws IOException {
Preconditions.checkState(mState.get() == READ_ONLY);
for (PageStoreDir pageStoreDir : pageStoreDirs) {
if (!restore(pageStoreDir)) {
if (!restore(pageStoreDir, options)) {
try (LockResource r = new LockResource(mPageMetaStore.getLock().writeLock())) {
mPageMetaStore.reset();
}
Expand All @@ -771,7 +777,7 @@ private void restoreOrInit(List<PageStoreDir> pageStoreDirs) throws IOException
Metrics.STATE.inc();
}

private boolean restore(PageStoreDir pageStoreDir) {
private boolean restore(PageStoreDir pageStoreDir, CacheManagerOptions options) {
long restoredPages = mPageMetaStore.numPages();
long restoredBytes = mPageMetaStore.bytes();
long discardPages = Metrics.PAGE_DISCARDED.getCount();
Expand All @@ -784,9 +790,25 @@ private boolean restore(PageStoreDir pageStoreDir) {
return false;
}
try {
pageStoreDir.scanPages(pageInfo -> {
if (pageInfo.isPresent()) {
addPageToDir(pageStoreDir, pageInfo.get());

pageStoreDir.scanPages(optionalPageInfo -> {
if (optionalPageInfo.isPresent()) {
PageInfo pageInfo = optionalPageInfo.get();
boolean tested = mTtlPredicate.test(pageInfo);
boolean isPageDeleted = false;
if (!tested) {
addPageToDir(pageStoreDir, pageInfo);
}
else {
isPageDeleted = deleteInternal(pageInfo.getPageId(), false);
}
if (isPageDeleted) {
MetricsSystem.meter(MetricKey.CLIENT_CACHE_PAGES_INVALIDATED.getName()).mark();
}
else {
MetricsSystem.histogram(MetricKey.CLIENT_CACHE_PAGES_AGES.getName())
.update(System.currentTimeMillis() - pageInfo.getCreatedTimestamp());
}
}
});
} catch (IOException | RuntimeException e) {
Expand Down Expand Up @@ -866,6 +888,11 @@ public void deleteTempFile(String fileId) {

@Override
public void invalidate(Predicate<PageInfo> predicate) {
if (mState.get() != READ_WRITE) {
Metrics.DELETE_NOT_READY_ERRORS.inc();
Metrics.DELETE_ERRORS.inc();
return;
}
mPageStoreDirs.forEach(dir -> {
try {
dir.scanPages(pageInfoOpt -> {
Expand Down

0 comments on commit 7296392

Please sign in to comment.