Skip to content

Commit

Permalink
Improve the local cache fallback behavior for corrupted pages
Browse files Browse the repository at this point in the history
  • Loading branch information
beinan committed Jan 31, 2024
1 parent 055b375 commit c65c32d
Show file tree
Hide file tree
Showing 9 changed files with 150 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import alluxio.collections.ConcurrentHashSet;
import alluxio.collections.Pair;
import alluxio.exception.FileDoesNotExistException;
import alluxio.exception.PageCorruptedException;
import alluxio.exception.PageNotFoundException;
import alluxio.exception.status.ResourceExhaustedException;
import alluxio.file.ByteArrayTargetBuffer;
Expand Down Expand Up @@ -919,6 +920,7 @@ private boolean deletePage(PageInfo pageInfo, boolean isTemporary) {

private int getPage(PageInfo pageInfo, int pageOffset, int bytesToRead,
ReadTargetBuffer target, CacheContext cacheContext) {
int originOffset = target.offset();
try {
int ret = pageInfo.getLocalCacheDir().getPageStore()
.get(pageInfo.getPageId(), pageOffset, bytesToRead, target,
Expand All @@ -927,10 +929,20 @@ private int getPage(PageInfo pageInfo, int pageOffset, int bytesToRead,
// data read from page store is inconsistent from the metastore
LOG.error("Failed to read page {}: supposed to read {} bytes, {} bytes actually read",
pageInfo.getPageId(), bytesToRead, ret);
target.offset(originOffset); //reset the offset
//best efforts to delete the corrupted file without acquire the write lock
deletePage(pageInfo, false);
return -1;
}
} catch (PageCorruptedException e) {
LOG.error("Data corrupted page {} from pageStore", pageInfo.getPageId(), e);
target.offset(originOffset); //reset the offset
//best efforts to delete the corrupted file without acquire the write lock
deletePage(pageInfo, false);
return -1;
} catch (IOException | PageNotFoundException e) {
LOG.debug("Failed to get existing page {} from pageStore", pageInfo.getPageId(), e);
target.offset(originOffset); //reset the offset
return -1;
}
return bytesToRead;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import alluxio.client.file.cache.PageId;
import alluxio.client.file.cache.PageStore;
import alluxio.exception.PageCorruptedException;
import alluxio.exception.PageNotFoundException;
import alluxio.exception.status.ResourceExhaustedException;
import alluxio.file.ReadTargetBuffer;
Expand Down Expand Up @@ -100,11 +101,15 @@ public int get(PageId pageId, int pageOffset, int bytesToRead, ReadTargetBuffer
}
Path pagePath = getPagePath(pageId, isTemporary);
try (RandomAccessFile localFile = new RandomAccessFile(pagePath.toString(), "r")) {
long pageLength = localFile.length();
if (pageOffset + bytesToRead > pageLength) {
throw new PageCorruptedException(String.format(
"The page %s (%s) probably has been corrupted, "
+ "page-offset %s, bytes to read %s, page file length %s",
pageId, pagePath, pageOffset, bytesToRead, pageLength));
}
int bytesSkipped = localFile.skipBytes(pageOffset);
if (pageOffset != bytesSkipped) {
long pageLength = pagePath.toFile().length();
Preconditions.checkArgument(pageOffset <= pageLength,
"page offset %s exceeded page size %s", pageOffset, pageLength);
throw new IOException(
String.format("Failed to read page %s (%s) from offset %s: %s bytes skipped",
pageId, pagePath, pageOffset, bytesSkipped));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

import alluxio.client.file.cache.PageId;
import alluxio.client.file.cache.PageStore;
import alluxio.exception.PageCorruptedException;
import alluxio.exception.PageNotFoundException;
import alluxio.file.ReadTargetBuffer;

Expand Down Expand Up @@ -68,8 +69,12 @@ public int get(PageId pageId, int pageOffset, int bytesToRead, ReadTargetBuffer
throw new PageNotFoundException(pageId.getFileId() + "_" + pageId.getPageIndex());
}
MemPage page = mPageStoreMap.get(pageKey);
Preconditions.checkArgument(pageOffset <= page.getPageLength(),
"page offset %s exceeded page size %s", pageOffset, page.getPageLength());
if (pageOffset + bytesToRead > page.getPageLength()) {
throw new PageCorruptedException(String.format(
"The page %s probably has been corrupted, "
+ "page-offset %s, bytes to read %s, page file length %s",
pageId, pageOffset, bytesToRead, page.getPageLength()));
}
int bytesLeft = (int) Math.min(page.getPageLength() - pageOffset, target.remaining());
bytesLeft = Math.min(bytesLeft, bytesToRead);
target.writeBytes(page.getPage(), pageOffset, bytesLeft);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -583,6 +583,23 @@ public void testUnbuffer() throws Exception {
Assert.assertEquals(1, manager.mPagesServed);
}

/**
 * Verifies that, once {@code USER_CLIENT_CACHE_FALLBACK_ENABLED} is set to {@code true},
 * a positioned read on a stream built by {@code setupWithSingleFile} returns the requested
 * 100 bytes and the {@code CLIENT_CACHE_POSITION_READ_FALLBACK} counter reads 1.
 */
@Test
public void testPageDataFileCorrupted() throws Exception
{
int pages = 10;
int fileSize = mPageSize * pages;
byte[] testData = BufferUtils.getIncreasingByteArray(fileSize);
ByteArrayCacheManager manager = new ByteArrayCacheManager();
// By default the local cache fallback is not enabled, so a read should fail on any error.
// NOTE(review): this stream is created but never read from in this test, so the
// "should fail" expectation above is not asserted here.
LocalCacheFileInStream streamWithOutFallback = setupWithSingleFile(testData, manager);

// The flag is flipped before the second stream is created.
sConf.set(PropertyKey.USER_CLIENT_CACHE_FALLBACK_ENABLED, true);
LocalCacheFileInStream streamWithFallback = setupWithSingleFile(testData, manager);
// NOTE(review): the destination array has only 10 elements while the read targets
// offset 100 / length 100; presumably this is what makes the cache read fail and
// triggers the fallback path -- confirm against LocalCacheFileInStream.
Assert.assertEquals(100, streamWithFallback.positionedRead(0, new byte[10], 100, 100));
// Expects exactly one fallback to have been recorded.
// NOTE(review): assumes the counter starts at zero for this test -- confirm it is reset.
Assert.assertEquals(1,
MetricsSystem.counter(MetricKey.CLIENT_CACHE_POSITION_READ_FALLBACK.getName()).getCount());
}

@Test
public void testPositionReadFallBack() throws Exception
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import alluxio.conf.Configuration;
import alluxio.conf.InstancedConfiguration;
import alluxio.conf.PropertyKey;
import alluxio.exception.PageCorruptedException;
import alluxio.exception.PageNotFoundException;
import alluxio.exception.status.ResourceExhaustedException;
import alluxio.file.ByteArrayTargetBuffer;
Expand Down Expand Up @@ -981,7 +982,7 @@ public void getTimeout() throws Exception {
}

@Test
public void getFaultyRead() throws Exception {
public void getFaultyReadWithNoExceptionManager() throws Exception {
PageStoreOptions pageStoreOptions = PageStoreOptions.create(mConf).get(0);
FaultyPageStore pageStore = new FaultyPageStore();
PageStoreDir dir =
Expand All @@ -998,6 +999,40 @@ public void getFaultyRead() throws Exception {
assertEquals(0, targetBuffer.offset());
}

/**
 * A page-store read that throws an {@code IOException} after moving the target offset must
 * be reported by {@code LocalCacheManager#get} as {@code -1}, with the target buffer's
 * offset back at its starting value.
 */
@Test
public void getFaultyReadWithLocalCacheManager() throws Exception {
  PageStoreOptions options = PageStoreOptions.create(mConf).get(0);
  FaultyPageStore faultyStore = new FaultyPageStore();
  PageStoreDir storeDir = new LocalPageStoreDir(options, faultyStore, mEvictor);
  mPageMetaStore = new DefaultPageMetaStore(ImmutableList.of(storeDir));

  LocalCacheManager manager = createLocalCacheManager(mConf, mPageMetaStore);
  // The page is cached while the store is still healthy.
  manager.put(PAGE_ID1, PAGE1);

  ByteArrayTargetBuffer readTarget = new ByteArrayTargetBuffer(mBuf, 0);
  // From here on every get() on the store shifts the target offset and throws IOException.
  faultyStore.setGetFaulty(true);

  assertEquals(-1,
      manager.get(PAGE_ID1, PAGE1.length, readTarget, CacheContext.defaults()));
  // The failed read must not leave the caller's buffer position advanced.
  assertEquals(0, readTarget.offset());
}

/**
 * A page-store read that throws a {@code PageCorruptedException} after moving the target
 * offset must be reported by {@code LocalCacheManager#get} as {@code -1}, with the target
 * buffer's offset back at its starting value.
 */
@Test
public void getCorruptedReadWithLocalCacheManager() throws Exception {
  PageStoreOptions options = PageStoreOptions.create(mConf).get(0);
  FaultyPageStore corruptingStore = new FaultyPageStore();
  PageStoreDir storeDir = new LocalPageStoreDir(options, corruptingStore, mEvictor);
  mPageMetaStore = new DefaultPageMetaStore(ImmutableList.of(storeDir));

  LocalCacheManager manager = createLocalCacheManager(mConf, mPageMetaStore);
  // The page is cached while the store is still healthy.
  manager.put(PAGE_ID1, PAGE1);

  ByteArrayTargetBuffer readTarget = new ByteArrayTargetBuffer(mBuf, 0);
  // From here on every get() on the store shifts the target offset and reports corruption.
  corruptingStore.setGetCorrupted(true);

  assertEquals(-1,
      manager.get(PAGE_ID1, PAGE1.length, readTarget, CacheContext.defaults()));
  // The failed read must not leave the caller's buffer position advanced.
  assertEquals(0, readTarget.offset());
}

@Test
public void deleteTimeout() throws Exception {
mConf.set(PropertyKey.USER_CLIENT_CACHE_TIMEOUT_DURATION, "2s");
Expand Down Expand Up @@ -1157,13 +1192,19 @@ public FaultyPageStore() {
private AtomicBoolean mDeleteFaulty = new AtomicBoolean(false);
private AtomicBoolean mGetFaulty = new AtomicBoolean(false);

private AtomicBoolean mGetCorrupted = new AtomicBoolean(false);

/**
 * Delegates to the parent store unless a failure mode is armed. When one is, the target
 * offset is first moved forward by 100 and then an exception is thrown: an
 * {@code IOException} if the get-fault flag is set, otherwise a
 * {@code PageCorruptedException}. The fault flag takes precedence when both are set.
 */
@Override
public int get(PageId pageId, int pageOffset, int bytesToRead, ReadTargetBuffer target,
    boolean isTemporary) throws IOException, PageNotFoundException {
  final boolean ioFault = mGetFaulty.get();
  // Healthy path first; the corrupted flag is only consulted when no I/O fault is armed.
  if (!ioFault && !mGetCorrupted.get()) {
    return super.get(pageId, pageOffset, bytesToRead, target, isTemporary);
  }

  // Shift the caller's buffer position before failing, so callers have to restore it.
  target.offset(target.offset() + 100);
  if (ioFault) {
    throw new IOException("Page read fault");
  }
  throw new PageCorruptedException("page corrupted");
}

Expand Down Expand Up @@ -1194,6 +1235,10 @@ void setDeleteFaulty(boolean faulty) {
/**
 * Arms or disarms the simulated read failure.
 *
 * @param faulty value stored in the get-fault flag; while it is {@code true},
 *        {@code get} throws an {@code IOException}
 */
void setGetFaulty(boolean faulty) {
mGetFaulty.set(faulty);
}

/**
 * Arms or disarms the simulated page corruption.
 *
 * @param corrupted value stored in the get-corrupted flag; while it is {@code true} (and
 *        the get-fault flag is not set), {@code get} throws a {@code PageCorruptedException}
 */
void setGetCorrupted(boolean corrupted) {
  mGetCorrupted.set(corrupted);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;

import alluxio.client.file.cache.PageId;
import alluxio.client.file.cache.PageStore;
import alluxio.exception.PageCorruptedException;
import alluxio.file.ByteArrayTargetBuffer;

import org.junit.Before;
Expand Down Expand Up @@ -171,12 +173,26 @@ public void cleanFileAndDirectory() throws Exception {
assertFalse(Files.exists(p.getParent()));
}

/**
 * Asking a {@code LocalPageStore} for more bytes than the stored page file contains must
 * raise a {@code PageCorruptedException}.
 */
@Test
public void testCorruptedPages() throws Exception {
  mOptions.setFileBuckets(1);
  LocalPageStore store = new LocalPageStore(mOptions);
  byte[] sink = new byte[1000];
  PageId pageId = new PageId("1", 0);
  store.put(pageId, "corrupted".getBytes());
  // The request (100 bytes from offset 0) is longer than the page file that was just
  // written, which the store treats as a sign that the page is corrupted.
  assertThrows(PageCorruptedException.class,
      () -> store.get(pageId, 0, 100, new ByteArrayTargetBuffer(sink, 0)));
}

private void helloWorldTest(PageStore store) throws Exception {
String msg = "Hello, World!";
PageId id = new PageId("0", 0);
store.put(id, msg.getBytes());
byte[] buf = new byte[1024];
assertEquals(msg.getBytes().length, store.get(id, new ByteArrayTargetBuffer(buf, 0)));
assertEquals(msg.getBytes().length, store.get(id, 0, msg.length(),
new ByteArrayTargetBuffer(buf, 0)));
assertArrayEquals(msg.getBytes(), Arrays.copyOfRange(buf, 0, msg.getBytes().length));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ private void helloWorldTest(PageStore store) throws Exception {
PageId id = new PageId("0", 0);
store.put(id, msg.getBytes());
byte[] buf = new byte[PAGE_SIZE];
assertEquals(msg.getBytes().length, store.get(id, new ByteArrayTargetBuffer(buf, 0)));
assertEquals(msg.getBytes().length,
store.get(id, 0, msg.length(), new ByteArrayTargetBuffer(buf, 0)));
assertArrayEquals(msg.getBytes(), Arrays.copyOfRange(buf, 0, msg.getBytes().length));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import alluxio.ProjectConstants;
import alluxio.client.file.cache.PageId;
import alluxio.client.file.cache.PageStore;
import alluxio.exception.PageCorruptedException;
import alluxio.exception.PageNotFoundException;
import alluxio.file.ByteArrayTargetBuffer;
import alluxio.util.io.BufferUtils;
Expand Down Expand Up @@ -79,7 +80,8 @@ public void helloWorldTest() throws Exception {
PageId id = new PageId("0", 0);
mPageStore.put(id, msgBytes);
byte[] buf = new byte[1024];
assertEquals(msgBytes.length, mPageStore.get(id, new ByteArrayTargetBuffer(buf, 0)));
assertEquals(msgBytes.length,
mPageStore.get(id, 0, msgBytes.length, new ByteArrayTargetBuffer(buf, 0)));
assertArrayEquals(msgBytes, Arrays.copyOfRange(buf, 0, msgBytes.length));
mPageStore.delete(id);
try {
Expand All @@ -97,7 +99,8 @@ public void getOffset() throws Exception {
mPageStore.put(id, BufferUtils.getIncreasingByteArray(len));
byte[] buf = new byte[len];
for (int offset = 1; offset < len; offset++) {
int bytesRead = mPageStore.get(id, offset, len, new ByteArrayTargetBuffer(buf, 0), false);
int bytesRead = mPageStore.get(id, offset, len - offset,
new ByteArrayTargetBuffer(buf, 0), false);
assertEquals(len - offset, bytesRead);
assertArrayEquals(BufferUtils.getIncreasingByteArray(offset, len - offset),
Arrays.copyOfRange(buf, 0, bytesRead));
Expand All @@ -111,7 +114,7 @@ public void getOffsetOverflow() throws Exception {
PageId id = new PageId("0", 0);
mPageStore.put(id, BufferUtils.getIncreasingByteArray(len));
byte[] buf = new byte[1024];
assertThrows(IllegalArgumentException.class, () ->
assertThrows(PageCorruptedException.class, () ->
mPageStore.get(id, offset, len, new ByteArrayTargetBuffer(buf, 0)));
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* The Alluxio Open Foundation licenses this work under the Apache License, version 2.0
* (the "License"). You may not use this work except in compliance with the License, which is
* available at www.apache.org/licenses/LICENSE-2.0
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied, as more fully set forth in the License.
*
* See the NOTICE file distributed with this work for information regarding copyright ownership.
*/

package alluxio.exception;

/**
 * An exception that should be thrown when the data of a page has been corrupted in store.
 *
 * <p>This is an unchecked exception, so methods that throw it do not have to declare it.
 */
public class PageCorruptedException extends RuntimeException {
  // Pins the serialized form so that recompiling the class does not change its stream id.
  private static final long serialVersionUID = 6478274913012349557L;

  /**
   * Constructs a {@code PageCorruptedException} with the specified detail message.
   *
   * @param message the detail message describing the corruption
   */
  public PageCorruptedException(String message) {
    super(message);
  }

  /**
   * Constructs a {@code PageCorruptedException} with the specified detail message and cause.
   *
   * @param message the detail message describing the corruption
   * @param cause the underlying failure that revealed the corruption, may be {@code null}
   */
  public PageCorruptedException(String message, Throwable cause) {
    super(message, cause);
  }
}

0 comments on commit c65c32d

Please sign in to comment.