Skip to content

Commit

Permalink
invalidate outdated page when restore
Browse files Browse the repository at this point in the history
  • Loading branch information
jja725 committed Apr 4, 2024
1 parent 7d46848 commit 8a37d56
Showing 1 changed file with 63 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ public class LocalCacheManager implements CacheManager {
*/
private final AtomicReference<CacheManager.State> mState = new AtomicReference<>();
private final CacheManagerOptions mOptions;
private Optional<Predicate<PageInfo>> mPagePredicate = Optional.empty();

/**
* @param options the options of local cache manager
Expand Down Expand Up @@ -163,17 +164,19 @@ public static LocalCacheManager create(CacheManagerOptions options,
options.isAsyncRestoreEnabled() ? Optional.of(Executors.newSingleThreadExecutor()) :
Optional.empty();
if (options.isTtlEnabled()) {
Predicate<PageInfo> ttl = pageInfo -> {
try {
return System.currentTimeMillis() - pageInfo.getCreatedTimestamp()
>= options.getTtlThresholdSeconds() * 1000;
} catch (Exception ex) {
// In case of any exception, do not invalidate the cache
return false;
}
};
mPagePredicate = Optional.of(ttl);
mTtlEnforcerExecutor = Optional.of(newScheduledThreadPool(1));
mTtlEnforcerExecutor.get().scheduleAtFixedRate(() ->
LocalCacheManager.this.invalidate(pageInfo -> {
try {
return System.currentTimeMillis() - pageInfo.getCreatedTimestamp()
>= options.getTtlThresholdSeconds() * 1000;
} catch (Exception ex) {
// In case of any exception, do not invalidate the cache
return false;
}
}), 0, options.getTtlCheckIntervalSeconds(), SECONDS);
LocalCacheManager.this.invalidate(ttl), 0, options.getTtlCheckIntervalSeconds(), SECONDS);
} else {
mTtlEnforcerExecutor = Optional.empty();
}
Expand Down Expand Up @@ -689,27 +692,7 @@ public boolean delete(PageId pageId, boolean isTemporary) {
Metrics.DELETE_ERRORS.inc();
return false;
}
ReadWriteLock pageLock = getPageLock(pageId);
try (LockResource r = new LockResource(pageLock.writeLock())) {
PageInfo pageInfo;
try (LockResource r1 = new LockResource(mPageMetaStore.getLock().writeLock())) {
try {
pageInfo = mPageMetaStore.removePage(pageId, isTemporary);
} catch (PageNotFoundException e) {
LOG.debug("Failed to delete page {} from metaStore ", pageId, e);
Metrics.DELETE_NON_EXISTING_PAGE_ERRORS.inc();
Metrics.DELETE_ERRORS.inc();
return false;
}
}
boolean ok = deletePage(pageInfo, isTemporary);
LOG.debug("delete({}) exits, success: {}", pageId, ok);
if (!ok) {
Metrics.DELETE_STORE_DELETE_ERRORS.inc();
Metrics.DELETE_ERRORS.inc();
}
return ok;
}
return deleteInternal(pageId, isTemporary);
}

@Override
Expand Down Expand Up @@ -784,9 +767,27 @@ private boolean restore(PageStoreDir pageStoreDir) {
return false;
}
try {
pageStoreDir.scanPages(pageInfo -> {
if (pageInfo.isPresent()) {
addPageToDir(pageStoreDir, pageInfo.get());
pageStoreDir.scanPages(optionalPageInfo -> {
if (optionalPageInfo.isPresent()) {
PageInfo pageInfo = optionalPageInfo.get();
boolean tested = false;
if (mPagePredicate.isPresent()) {
tested = mPagePredicate.get().test(pageInfo);
}
boolean isPageDeleted = false;
if (!tested) {
addPageToDir(pageStoreDir, pageInfo);
}
else {
isPageDeleted = deleteInternal(pageInfo.getPageId(), false);
}
if (isPageDeleted) {
MetricsSystem.meter(MetricKey.CLIENT_CACHE_PAGES_INVALIDATED.getName()).mark();
}
else {
MetricsSystem.histogram(MetricKey.CLIENT_CACHE_PAGES_AGES.getName())
.update(System.currentTimeMillis() - pageInfo.getCreatedTimestamp());
}
}
});
} catch (IOException | RuntimeException e) {
Expand Down Expand Up @@ -866,6 +867,11 @@ public void deleteTempFile(String fileId) {

@Override
public void invalidate(Predicate<PageInfo> predicate) {
if (mState.get() != READ_WRITE) {
Metrics.DELETE_NOT_READY_ERRORS.inc();
Metrics.DELETE_ERRORS.inc();
return;
}
mPageStoreDirs.forEach(dir -> {
try {
dir.scanPages(pageInfoOpt -> {
Expand Down Expand Up @@ -902,6 +908,30 @@ public void close() throws Exception {
mTtlEnforcerExecutor.ifPresent(ExecutorService::shutdownNow);
}

private boolean deleteInternal(PageId pageId, boolean isTemporary) {
ReadWriteLock pageLock = getPageLock(pageId);
try (LockResource r = new LockResource(pageLock.writeLock())) {
PageInfo pageInfo;
try (LockResource r1 = new LockResource(mPageMetaStore.getLock().writeLock())) {
try {
pageInfo = mPageMetaStore.removePage(pageId, isTemporary);
} catch (PageNotFoundException e) {
LOG.debug("Failed to delete page {} from metaStore ", pageId, e);
Metrics.DELETE_NON_EXISTING_PAGE_ERRORS.inc();
Metrics.DELETE_ERRORS.inc();
return false;
}
}
boolean ok = deletePage(pageInfo, isTemporary);
LOG.debug("delete({}) exits, success: {}", pageId, ok);
if (!ok) {
Metrics.DELETE_STORE_DELETE_ERRORS.inc();
Metrics.DELETE_ERRORS.inc();
}
return ok;
}
}

/**
* Attempts to delete a page from the page store. The page lock must be acquired before calling
* this method. The metastore must be updated before calling this method.
Expand Down

0 comments on commit 8a37d56

Please sign in to comment.