From c65c32dce37a07b699da30725c4db9debcda5add Mon Sep 17 00:00:00 2001
From: Beinan
Date: Wed, 24 Jan 2024 20:25:49 -0800
Subject: [PATCH] Improve the local cache fallback behavior for corrupted pages

LocalPageStore and MemoryPageStore now throw PageCorruptedException when a
read asks for more bytes than the stored page holds. The bounds check is
evaluated in long arithmetic so pageOffset + bytesToRead cannot overflow.

LocalCacheManager#getPage catches PageCorruptedException, restores the
target buffer offset, deletes the page on a best-effort basis and returns
-1. The same offset restore and delete are applied to short reads, and the
offset is also restored on IOException/PageNotFoundException.
---
 .../client/file/cache/LocalCacheManager.java  | 12 +++++
 .../file/cache/store/LocalPageStore.java      | 11 +++--
 .../file/cache/store/MemoryPageStore.java     |  9 +++-
 .../cache/LocalCacheFileInStreamTest.java     | 17 +++++++
 .../file/cache/LocalCacheManagerTest.java     | 47 ++++++++++++++++++-
 .../file/cache/store/LocalPageStoreTest.java  | 18 ++++++-
 .../file/cache/store/MemoryPageStoreTest.java |  3 +-
 .../file/cache/store/PageStoreTest.java       |  9 ++--
 .../exception/PageCorruptedException.java     | 35 ++++++++++++++
 9 files changed, 150 insertions(+), 11 deletions(-)
 create mode 100644 dora/core/common/src/main/java/alluxio/exception/PageCorruptedException.java

diff --git a/dora/core/client/fs/src/main/java/alluxio/client/file/cache/LocalCacheManager.java b/dora/core/client/fs/src/main/java/alluxio/client/file/cache/LocalCacheManager.java
index 9ecd5ade677b..43b77e32003e 100644
--- a/dora/core/client/fs/src/main/java/alluxio/client/file/cache/LocalCacheManager.java
+++ b/dora/core/client/fs/src/main/java/alluxio/client/file/cache/LocalCacheManager.java
@@ -26,6 +26,7 @@
 import alluxio.collections.ConcurrentHashSet;
 import alluxio.collections.Pair;
 import alluxio.exception.FileDoesNotExistException;
+import alluxio.exception.PageCorruptedException;
 import alluxio.exception.PageNotFoundException;
 import alluxio.exception.status.ResourceExhaustedException;
 import alluxio.file.ByteArrayTargetBuffer;
@@ -919,6 +920,7 @@ private boolean deletePage(PageInfo pageInfo, boolean isTemporary) {
 
   private int getPage(PageInfo pageInfo, int pageOffset, int bytesToRead,
       ReadTargetBuffer target, CacheContext cacheContext) {
+    int originOffset = target.offset();
     try {
       int ret = pageInfo.getLocalCacheDir().getPageStore()
           .get(pageInfo.getPageId(), pageOffset, bytesToRead, target,
@@ -927,10 +929,20 @@ private int getPage(PageInfo pageInfo, int pageOffset, int bytesToRead,
         // data read from page store is inconsistent from the metastore
         LOG.error("Failed to read page {}: supposed to read {} bytes, {} bytes actually read",
             pageInfo.getPageId(), bytesToRead, ret);
+        target.offset(originOffset); // reset the offset
+        // best effort: delete the corrupted page without acquiring the write lock
+        deletePage(pageInfo, false);
         return -1;
       }
+    } catch (PageCorruptedException e) {
+      LOG.error("Data corrupted page {} from pageStore", pageInfo.getPageId(), e);
+      target.offset(originOffset); // reset the offset
+      // best effort: delete the corrupted page without acquiring the write lock
+      deletePage(pageInfo, false);
+      return -1;
     } catch (IOException | PageNotFoundException e) {
       LOG.debug("Failed to get existing page {} from pageStore", pageInfo.getPageId(), e);
+      target.offset(originOffset); // reset the offset
       return -1;
     }
     return bytesToRead;
diff --git a/dora/core/client/fs/src/main/java/alluxio/client/file/cache/store/LocalPageStore.java b/dora/core/client/fs/src/main/java/alluxio/client/file/cache/store/LocalPageStore.java
index 4b927fa4fc69..b972d944a129 100644
--- a/dora/core/client/fs/src/main/java/alluxio/client/file/cache/store/LocalPageStore.java
+++ b/dora/core/client/fs/src/main/java/alluxio/client/file/cache/store/LocalPageStore.java
@@ -15,6 +15,7 @@
 
 import alluxio.client.file.cache.PageId;
 import alluxio.client.file.cache.PageStore;
+import alluxio.exception.PageCorruptedException;
 import alluxio.exception.PageNotFoundException;
 import alluxio.exception.status.ResourceExhaustedException;
 import alluxio.file.ReadTargetBuffer;
@@ -100,11 +101,15 @@ public int get(PageId pageId, int pageOffset, int bytesToRead, ReadTargetBuffer
     }
     Path pagePath = getPagePath(pageId, isTemporary);
     try (RandomAccessFile localFile = new RandomAccessFile(pagePath.toString(), "r")) {
+      long pageLength = localFile.length();
+      if ((long) pageOffset + bytesToRead > pageLength) {
+        throw new PageCorruptedException(String.format(
+            "The page %s (%s) probably has been corrupted, "
+                + "page-offset %s, bytes to read %s, page file length %s",
+            pageId, pagePath, pageOffset, bytesToRead, pageLength));
+      }
       int bytesSkipped = localFile.skipBytes(pageOffset);
       if (pageOffset != bytesSkipped) {
-        long pageLength = pagePath.toFile().length();
-        Preconditions.checkArgument(pageOffset <= pageLength,
-            "page offset %s exceeded page size %s", pageOffset, pageLength);
         throw new IOException(
             String.format("Failed to read page %s (%s) from offset %s: %s bytes skipped",
                 pageId, pagePath, pageOffset, bytesSkipped));
diff --git a/dora/core/client/fs/src/main/java/alluxio/client/file/cache/store/MemoryPageStore.java b/dora/core/client/fs/src/main/java/alluxio/client/file/cache/store/MemoryPageStore.java
index 65dd32101c84..7fde058b6c27 100644
--- a/dora/core/client/fs/src/main/java/alluxio/client/file/cache/store/MemoryPageStore.java
+++ b/dora/core/client/fs/src/main/java/alluxio/client/file/cache/store/MemoryPageStore.java
@@ -13,6 +13,7 @@
 
 import alluxio.client.file.cache.PageId;
 import alluxio.client.file.cache.PageStore;
+import alluxio.exception.PageCorruptedException;
 import alluxio.exception.PageNotFoundException;
 import alluxio.file.ReadTargetBuffer;
 
@@ -68,8 +69,12 @@ public int get(PageId pageId, int pageOffset, int bytesToRead, ReadTargetBuffer
       throw new PageNotFoundException(pageId.getFileId() + "_" + pageId.getPageIndex());
     }
     MemPage page = mPageStoreMap.get(pageKey);
-    Preconditions.checkArgument(pageOffset <= page.getPageLength(),
-        "page offset %s exceeded page size %s", pageOffset, page.getPageLength());
+    if ((long) pageOffset + bytesToRead > page.getPageLength()) {
+      throw new PageCorruptedException(String.format(
+          "The page %s probably has been corrupted, "
+              + "page-offset %s, bytes to read %s, page file length %s",
+          pageId, pageOffset, bytesToRead, page.getPageLength()));
+    }
     int bytesLeft = (int) Math.min(page.getPageLength() - pageOffset, target.remaining());
     bytesLeft = Math.min(bytesLeft, bytesToRead);
     target.writeBytes(page.getPage(), pageOffset, bytesLeft);
diff --git a/dora/core/client/fs/src/test/java/alluxio/client/file/cache/LocalCacheFileInStreamTest.java b/dora/core/client/fs/src/test/java/alluxio/client/file/cache/LocalCacheFileInStreamTest.java
index 6034139bae12..badb1619694a 100644
--- a/dora/core/client/fs/src/test/java/alluxio/client/file/cache/LocalCacheFileInStreamTest.java
+++ b/dora/core/client/fs/src/test/java/alluxio/client/file/cache/LocalCacheFileInStreamTest.java
@@ -583,6 +583,23 @@ public void testUnbuffer() throws Exception {
     Assert.assertEquals(1, manager.mPagesServed);
   }
 
+  @Test
+  public void testPageDataFileCorrupted() throws Exception
+  {
+    int pages = 10;
+    int fileSize = mPageSize * pages;
+    byte[] testData = BufferUtils.getIncreasingByteArray(fileSize);
+    ByteArrayCacheManager manager = new ByteArrayCacheManager();
+    // by default local cache fallback is not enabled, the read should fail for any error
+    LocalCacheFileInStream streamWithOutFallback = setupWithSingleFile(testData, manager);
+
+    sConf.set(PropertyKey.USER_CLIENT_CACHE_FALLBACK_ENABLED, true);
+    LocalCacheFileInStream streamWithFallback = setupWithSingleFile(testData, manager);
+    Assert.assertEquals(100, streamWithFallback.positionedRead(0, new byte[10], 100, 100));
+    Assert.assertEquals(1,
+        MetricsSystem.counter(MetricKey.CLIENT_CACHE_POSITION_READ_FALLBACK.getName()).getCount());
+  }
+
   @Test
   public void testPositionReadFallBack() throws Exception
   {
diff --git a/dora/core/client/fs/src/test/java/alluxio/client/file/cache/LocalCacheManagerTest.java b/dora/core/client/fs/src/test/java/alluxio/client/file/cache/LocalCacheManagerTest.java
index 0d04604c9532..e322688ea909 100644
--- a/dora/core/client/fs/src/test/java/alluxio/client/file/cache/LocalCacheManagerTest.java
+++ b/dora/core/client/fs/src/test/java/alluxio/client/file/cache/LocalCacheManagerTest.java
@@ -36,6 +36,7 @@
 import alluxio.conf.Configuration;
 import alluxio.conf.InstancedConfiguration;
 import alluxio.conf.PropertyKey;
+import alluxio.exception.PageCorruptedException;
 import alluxio.exception.PageNotFoundException;
 import alluxio.exception.status.ResourceExhaustedException;
 import alluxio.file.ByteArrayTargetBuffer;
@@ -981,7 +982,7 @@ public void getTimeout() throws Exception {
   }
 
   @Test
-  public void getFaultyRead() throws Exception {
+  public void getFaultyReadWithNoExceptionManager() throws Exception {
     PageStoreOptions pageStoreOptions = PageStoreOptions.create(mConf).get(0);
     FaultyPageStore pageStore = new FaultyPageStore();
     PageStoreDir dir =
@@ -998,6 +999,40 @@ public void getFaultyRead() throws Exception {
     assertEquals(0, targetBuffer.offset());
   }
 
+  @Test
+  public void getFaultyReadWithLocalCacheManager() throws Exception {
+    PageStoreOptions pageStoreOptions = PageStoreOptions.create(mConf).get(0);
+    FaultyPageStore pageStore = new FaultyPageStore();
+    PageStoreDir dir =
+        new LocalPageStoreDir(pageStoreOptions, pageStore, mEvictor);
+
+    mPageMetaStore = new DefaultPageMetaStore(ImmutableList.of(dir));
+    LocalCacheManager cacheManager = createLocalCacheManager(mConf, mPageMetaStore);
+    cacheManager.put(PAGE_ID1, PAGE1);
+    ByteArrayTargetBuffer targetBuffer = new ByteArrayTargetBuffer(mBuf, 0);
+    pageStore.setGetFaulty(true);
+    assertEquals(-1, cacheManager.get(PAGE_ID1, PAGE1.length,
+        targetBuffer, CacheContext.defaults()));
+    assertEquals(0, targetBuffer.offset());
+  }
+
+  @Test
+  public void getCorruptedReadWithLocalCacheManager() throws Exception {
+    PageStoreOptions pageStoreOptions = PageStoreOptions.create(mConf).get(0);
+    FaultyPageStore pageStore = new FaultyPageStore();
+    PageStoreDir dir =
+        new LocalPageStoreDir(pageStoreOptions, pageStore, mEvictor);
+
+    mPageMetaStore = new DefaultPageMetaStore(ImmutableList.of(dir));
+    LocalCacheManager cacheManager = createLocalCacheManager(mConf, mPageMetaStore);
+    cacheManager.put(PAGE_ID1, PAGE1);
+    ByteArrayTargetBuffer targetBuffer = new ByteArrayTargetBuffer(mBuf, 0);
+    pageStore.setGetCorrupted(true);
+    assertEquals(-1, cacheManager.get(PAGE_ID1, PAGE1.length,
+        targetBuffer, CacheContext.defaults()));
+    assertEquals(0, targetBuffer.offset());
+  }
+
   @Test
   public void deleteTimeout() throws Exception {
     mConf.set(PropertyKey.USER_CLIENT_CACHE_TIMEOUT_DURATION, "2s");
@@ -1157,6 +1192,8 @@ public FaultyPageStore() {
 
     private AtomicBoolean mDeleteFaulty = new AtomicBoolean(false);
     private AtomicBoolean mGetFaulty = new AtomicBoolean(false);
+    private AtomicBoolean mGetCorrupted = new AtomicBoolean(false);
+
     @Override
     public int get(PageId pageId, int pageOffset, int bytesToRead, ReadTargetBuffer target,
         boolean isTemporary) throws IOException, PageNotFoundException {
@@ -1164,6 +1201,10 @@ public int get(PageId pageId, int pageOffset, int bytesToRead, ReadTargetBuffer
         target.offset(target.offset() + 100);
         throw new IOException("Page read fault");
       }
+      if (mGetCorrupted.get()) {
+        target.offset(target.offset() + 100);
+        throw new PageCorruptedException("page corrupted");
+      }
       return super.get(pageId, pageOffset, bytesToRead, target, isTemporary);
     }
 
@@ -1194,6 +1235,10 @@ void setDeleteFaulty(boolean faulty) {
     void setGetFaulty(boolean faulty) {
       mGetFaulty.set(faulty);
     }
+
+    void setGetCorrupted(boolean faulty) {
+      mGetCorrupted.set(faulty);
+    }
   }
 
   /**
diff --git a/dora/core/client/fs/src/test/java/alluxio/client/file/cache/store/LocalPageStoreTest.java b/dora/core/client/fs/src/test/java/alluxio/client/file/cache/store/LocalPageStoreTest.java
index d07c492ca4a3..3adaeda7f3ad 100644
--- a/dora/core/client/fs/src/test/java/alluxio/client/file/cache/store/LocalPageStoreTest.java
+++ b/dora/core/client/fs/src/test/java/alluxio/client/file/cache/store/LocalPageStoreTest.java
@@ -15,10 +15,12 @@
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 
 import alluxio.client.file.cache.PageId;
 import alluxio.client.file.cache.PageStore;
+import alluxio.exception.PageCorruptedException;
 import alluxio.file.ByteArrayTargetBuffer;
 
 import org.junit.Before;
@@ -171,12 +173,26 @@ public void cleanFileAndDirectory() throws Exception {
     assertFalse(Files.exists(p.getParent()));
   }
 
+  @Test
+  public void testCorruptedPages() throws Exception {
+    mOptions.setFileBuckets(1);
+    LocalPageStore pageStore = new LocalPageStore(mOptions);
+    byte[] buf = new byte[1000];
+    PageId id = new PageId("1", 0);
+    pageStore.put(id, "corrupted".getBytes());
+    assertThrows(PageCorruptedException.class, () -> {
+      // reading more bytes than the page file holds means the page is corrupted
+      pageStore.get(id, 0, 100, new ByteArrayTargetBuffer(buf, 0));
+    });
+  }
+
   private void helloWorldTest(PageStore store) throws Exception {
     String msg = "Hello, World!";
     PageId id = new PageId("0", 0);
     store.put(id, msg.getBytes());
     byte[] buf = new byte[1024];
-    assertEquals(msg.getBytes().length, store.get(id, new ByteArrayTargetBuffer(buf, 0)));
+    assertEquals(msg.getBytes().length, store.get(id, 0, msg.length(),
+        new ByteArrayTargetBuffer(buf, 0)));
     assertArrayEquals(msg.getBytes(), Arrays.copyOfRange(buf, 0, msg.getBytes().length));
   }
 }
diff --git a/dora/core/client/fs/src/test/java/alluxio/client/file/cache/store/MemoryPageStoreTest.java b/dora/core/client/fs/src/test/java/alluxio/client/file/cache/store/MemoryPageStoreTest.java
index ed6f4b2aacb5..e20f11f50e5f 100644
--- a/dora/core/client/fs/src/test/java/alluxio/client/file/cache/store/MemoryPageStoreTest.java
+++ b/dora/core/client/fs/src/test/java/alluxio/client/file/cache/store/MemoryPageStoreTest.java
@@ -43,7 +43,8 @@ private void helloWorldTest(PageStore store) throws Exception {
     PageId id = new PageId("0", 0);
     store.put(id, msg.getBytes());
     byte[] buf = new byte[PAGE_SIZE];
-    assertEquals(msg.getBytes().length, store.get(id, new ByteArrayTargetBuffer(buf, 0)));
+    assertEquals(msg.getBytes().length,
+        store.get(id, 0, msg.length(), new ByteArrayTargetBuffer(buf, 0)));
     assertArrayEquals(msg.getBytes(), Arrays.copyOfRange(buf, 0, msg.getBytes().length));
   }
 }
diff --git a/dora/core/client/fs/src/test/java/alluxio/client/file/cache/store/PageStoreTest.java b/dora/core/client/fs/src/test/java/alluxio/client/file/cache/store/PageStoreTest.java
index 40cfe243ce38..3c382d5b4f22 100644
--- a/dora/core/client/fs/src/test/java/alluxio/client/file/cache/store/PageStoreTest.java
+++ b/dora/core/client/fs/src/test/java/alluxio/client/file/cache/store/PageStoreTest.java
@@ -19,6 +19,7 @@
 import alluxio.ProjectConstants;
 import alluxio.client.file.cache.PageId;
 import alluxio.client.file.cache.PageStore;
+import alluxio.exception.PageCorruptedException;
 import alluxio.exception.PageNotFoundException;
 import alluxio.file.ByteArrayTargetBuffer;
 import alluxio.util.io.BufferUtils;
@@ -79,7 +80,8 @@ public void helloWorldTest() throws Exception {
     PageId id = new PageId("0", 0);
     mPageStore.put(id, msgBytes);
     byte[] buf = new byte[1024];
-    assertEquals(msgBytes.length, mPageStore.get(id, new ByteArrayTargetBuffer(buf, 0)));
+    assertEquals(msgBytes.length,
+        mPageStore.get(id, 0, msgBytes.length, new ByteArrayTargetBuffer(buf, 0)));
     assertArrayEquals(msgBytes, Arrays.copyOfRange(buf, 0, msgBytes.length));
     mPageStore.delete(id);
     try {
@@ -97,7 +99,8 @@ public void getOffset() throws Exception {
     mPageStore.put(id, BufferUtils.getIncreasingByteArray(len));
     byte[] buf = new byte[len];
     for (int offset = 1; offset < len; offset++) {
-      int bytesRead = mPageStore.get(id, offset, len, new ByteArrayTargetBuffer(buf, 0), false);
+      int bytesRead = mPageStore.get(id, offset, len - offset,
+          new ByteArrayTargetBuffer(buf, 0), false);
       assertEquals(len - offset, bytesRead);
       assertArrayEquals(BufferUtils.getIncreasingByteArray(offset, len - offset),
           Arrays.copyOfRange(buf, 0, bytesRead));
@@ -111,7 +114,7 @@ public void getOffsetOverflow() throws Exception {
     PageId id = new PageId("0", 0);
     mPageStore.put(id, BufferUtils.getIncreasingByteArray(len));
     byte[] buf = new byte[1024];
-    assertThrows(IllegalArgumentException.class, () ->
+    assertThrows(PageCorruptedException.class, () ->
         mPageStore.get(id, offset, len, new ByteArrayTargetBuffer(buf, 0)));
   }
 
diff --git a/dora/core/common/src/main/java/alluxio/exception/PageCorruptedException.java b/dora/core/common/src/main/java/alluxio/exception/PageCorruptedException.java
new file mode 100644
index 000000000000..45f4be35cac9
--- /dev/null
+++ b/dora/core/common/src/main/java/alluxio/exception/PageCorruptedException.java
@@ -0,0 +1,35 @@
+/*
+ * The Alluxio Open Foundation licenses this work under the Apache License, version 2.0
+ * (the "License"). You may not use this work except in compliance with the License, which is
+ * available at www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied, as more fully set forth in the License.
+ *
+ * See the NOTICE file distributed with this work for information regarding copyright ownership.
+ */
+
+package alluxio.exception;
+
+/**
+ * An exception that should be thrown when the data of a page has been corrupted in store.
+ */
+public class PageCorruptedException extends RuntimeException {
+
+  /**
+   * Construct PageCorruptedException with the specified message.
+   * @param message the detail message
+   */
+  public PageCorruptedException(String message) {
+    super(message);
+  }
+
+  /**
+   * Construct PageCorruptedException with the specified message and cause.
+   * @param message the detail message
+   * @param cause the underlying cause
+   */
+  public PageCorruptedException(String message, Throwable cause) {
+    super(message, cause);
+  }
+}