Skip to content

Commit

Permalink
GH-5148 improved handling of corrupt spoc/posc/... indexes
Browse files Browse the repository at this point in the history
  • Loading branch information
hmottestad committed Oct 24, 2024
1 parent b6215bb commit 0076003
Show file tree
Hide file tree
Showing 6 changed files with 235 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,17 @@
import org.eclipse.rdf4j.sail.nativerdf.model.CorruptIRI;
import org.eclipse.rdf4j.sail.nativerdf.model.CorruptIRIOrBNode;
import org.eclipse.rdf4j.sail.nativerdf.model.CorruptUnknownValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* A statement iterator that wraps a RecordIterator containing statement records and translates these records to
* {@link Statement} objects.
*/
class NativeStatementIterator extends LookAheadIteration<Statement> {

private static final Logger logger = LoggerFactory.getLogger(NativeStatementIterator.class);

/*-----------*
* Variables *
*-----------*/
Expand Down Expand Up @@ -59,7 +63,13 @@ public NativeStatementIterator(RecordIterator btreeIter, ValueStore valueStore)
@Override
public Statement getNextElement() throws SailException {
try {
byte[] nextValue = btreeIter.next();
byte[] nextValue;
try {
nextValue = btreeIter.next();
} catch (AssertionError | Exception e) {
logger.error("Error while reading next value from btree iterator for {}", btreeIter.toString(), e);
throw e;
}

if (nextValue == null) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,9 +287,71 @@ private Set<String> parseIndexSpecList(String indexSpecStr) throws SailException
}

/**
 * Opens the triple indexes named in {@code indexSpecs} (field sequences such as "spoc", "posc").
 * <p>
 * When {@link NativeStore#SOFT_FAIL_ON_CORRUPT_DATA} is {@code true}, indexes that fail to
 * initialize are skipped (and removed from {@code indexSpecs}); otherwise the first failure is
 * rethrown. After opening, indexes that are unexpectedly empty while siblings contain data are
 * either dropped (soft-fail) or reported as corruption via {@link IOException}.
 *
 * @param indexSpecs the index field sequences to open; NOTE(review): this set is mutated in
 *                   soft-fail mode (invalid specs are removed) — callers should not rely on it
 *                   being unchanged.
 * @throws IOException if an index is unexpectedly empty and soft-fail mode is off
 */
private void initIndexes(Set<String> indexSpecs) throws IOException {

	HashSet<String> invalidIndexes = new HashSet<>();

	for (String fieldSeq : indexSpecs) {
		logger.trace("Initializing index '{}'...", fieldSeq);
		try {
			// 'false': open the existing index file; never delete it here.
			indexes.add(new TripleIndex(fieldSeq, false));
		} catch (Exception e) {
			if (NativeStore.SOFT_FAIL_ON_CORRUPT_DATA) {
				invalidIndexes.add(fieldSeq);
				logger.warn("Ignoring index because it failed to initialize index '{}'", fieldSeq, e);
			} else {
				logger.error(
						"Failed to initialize index '{}', consider setting org.eclipse.rdf4j.sail.nativerdf.softFailOnCorruptData to true.",
						fieldSeq, e);
				throw e;
			}

		}

	}

	if (NativeStore.SOFT_FAIL_ON_CORRUPT_DATA) {
		// Drop the specs that failed to open so later code only sees usable indexes.
		indexSpecs.removeAll(invalidIndexes);
	}

	List<TripleIndex> emptyIndexes = new ArrayList<>();
	List<TripleIndex> nonEmptyIndexes = new ArrayList<>();

	checkIfIndexesAreEmptyOrNot(nonEmptyIndexes, emptyIndexes);

	// An empty index alongside non-empty ones suggests the empty one is corrupt/truncated.
	if (!emptyIndexes.isEmpty() && !nonEmptyIndexes.isEmpty()) {
		if (NativeStore.SOFT_FAIL_ON_CORRUPT_DATA) {
			indexes.removeAll(emptyIndexes);
		} else {
			// NOTE(review): this loop throws on its first iteration, so only the first empty
			// index is ever reported even when several are empty.
			for (TripleIndex index : emptyIndexes) {
				throw new IOException("Index '" + new String(index.getFieldSeq())
						+ "' is unexpectedly empty while other indexes are not. Consider setting the system property org.eclipse.rdf4j.sail.nativerdf.softFailOnCorruptData to true. Index file: "
						+ index.getBTree().getFile().getAbsolutePath());
			}
		}
	}

}

/**
 * Partitions {@code indexes} into non-empty and empty, by attempting to read records from each
 * index's BTree. Any error while reading (including corruption-induced {@link Throwable}s)
 * classifies the index as empty, so callers can treat it as corrupt.
 *
 * @param nonEmptyIndexes out-parameter; receives indexes from which two records could be read
 * @param emptyIndexes    out-parameter; receives indexes with fewer than two readable records
 * @throws IOException if closing the record iterator fails
 */
private void checkIfIndexesAreEmptyOrNot(List<TripleIndex> nonEmptyIndexes, List<TripleIndex> emptyIndexes)
		throws IOException {
	for (TripleIndex index : indexes) {
		try (RecordIterator recordIterator = index.getBTree().iterateAll()) {
			try {
				byte[] next = recordIterator.next();
				if (next != null) {
					// NOTE(review): a second record is required before the index counts as
					// non-empty, so an index holding exactly one record is classified as
					// empty here — confirm that is intended.
					next = recordIterator.next();
					if (next != null) {
						nonEmptyIndexes.add(index);
					} else {
						emptyIndexes.add(index);
					}
				} else {
					emptyIndexes.add(index);
				}
			} catch (Throwable ignored) {
				// Reading failed (likely corrupt data); treat as empty rather than propagate.
				emptyIndexes.add(index);
			}
		}
	}
}

Expand Down Expand Up @@ -355,7 +417,7 @@ private void reindex(Set<String> currentIndexSpecs, Set<String> newIndexSpecs) t
for (String fieldSeq : addedIndexSpecs) {
logger.debug("Initializing new index '{}'...", fieldSeq);

TripleIndex addedIndex = new TripleIndex(fieldSeq);
TripleIndex addedIndex = new TripleIndex(fieldSeq, true);
BTree addedBTree = null;
RecordIterator sourceIter = null;
try {
Expand Down Expand Up @@ -1122,7 +1184,17 @@ private class TripleIndex {

private final BTree btree;

public TripleIndex(String fieldSeq) throws IOException {
/**
 * Opens (or creates) the BTree backing this index.
 *
 * @param fieldSeq                the index field sequence, e.g. "spoc"
 * @param deleteExistingIndexFile when {@code true}, any existing .dat/.alloc files for this
 *                                index are removed first so the index is rebuilt from scratch
 * @throws IOException if the BTree cannot be opened
 */
public TripleIndex(String fieldSeq, boolean deleteExistingIndexFile) throws IOException {
	if (deleteExistingIndexFile) {
		// Remove both on-disk artifacts of the index so the BTree starts empty.
		for (String extension : new String[] { ".dat", ".alloc" }) {
			File staleFile = new File(dir, getFilenamePrefix(fieldSeq) + extension);
			if (staleFile.exists()) {
				staleFile.delete();
			}
		}
	}
	tripleComparator = new TripleComparator(fieldSeq);
	btree = new BTree(dir, getFilenamePrefix(fieldSeq), 2048, RECORD_LENGTH, tripleComparator, forceSync);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,12 @@ public BTree(File dataDir, String filenamePrefix, int blockSize, int valueSize,
this.valueSize = buf.getInt();
this.rootNodeID = buf.getInt();

if (rootNodeID == 0) {
if (nioFile.size() >= 1024) {
throw new IllegalStateException("Root node ID is 0 but file is not empty");
}
}

if (Arrays.equals(MAGIC_NUMBER, magicNumber)) {
if (version > FILE_FORMAT_VERSION) {
throw new IOException("Unable to read BTree file " + file + "; it uses a newer file format");
Expand Down Expand Up @@ -1117,4 +1123,11 @@ public void print(PrintStream out) throws IOException {
out.println("#values = " + valueCount);
out.println("---end of BTree file---");
}

/**
 * Identifies this BTree by its backing data file, for diagnostics and log messages.
 */
@Override
public String toString() {
	return "BTree{file=" + getFile() + '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -422,4 +422,11 @@ public boolean nodeMergedWith(Node sourceNode, Node targetNode, int mergeIdx) th

return deregister;
}

/**
 * Names the owning BTree so log messages about failing iterators can identify the index file.
 */
@Override
public String toString() {
	return "RangeIterator{tree=" + tree + '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,15 @@ public void before() throws IOException {
backupFile(dataDir, "values.dat");
backupFile(dataDir, "values.id");
backupFile(dataDir, "values.hash");
backupFile(dataDir, "namespaces.dat");
backupFile(dataDir, "contexts.dat");
backupFile(dataDir, "triples-posc.alloc");
backupFile(dataDir, "triples-posc.dat");
backupFile(dataDir, "triples-spoc.alloc");
backupFile(dataDir, "triples-spoc.dat");

NativeStore.SOFT_FAIL_ON_CORRUPT_DATA = true;

}

public static void overwriteByteInFile(File valuesFile, long pos, int newVal) throws IOException {
Expand Down Expand Up @@ -234,25 +243,136 @@ public void testCorruptValuesIdFile() throws IOException {
/**
 * Corrupts values.hash one byte at a time (from offset 4 to EOF) and verifies the store still
 * returns all 6 statements in soft-fail mode.
 */
@Test
public void testCorruptValuesHashFile() throws IOException {
	repo.shutDown();
	// NOTE(review): the next two lines look like leftovers of the pre-change version of this
	// test (diff-extraction artifact) — they redeclare fileSize and cannot compile together
	// with the lines that follow; confirm against the actual committed file.
	File valuesHashFile = new File(dataDir, "values.hash");
	long fileSize = valuesHashFile.length();
	String file = "values.hash";
	File nativeStoreFile = new File(dataDir, file);
	long fileSize = nativeStoreFile.length();

	// The first four bytes are left untouched, as in the sibling corruption tests.
	for (long i = 4; i < fileSize; i++) {
		restoreFile(dataDir, file);
		overwriteByteInFile(nativeStoreFile, i, 0x0);
		repo.init();
		List<Statement> list = getStatements();
		assertEquals(6, list.size(), "Failed at byte position " + i);
		repo.shutDown();
	}
}

/**
 * Corrupts namespaces.dat one byte at a time and verifies the store still returns all 6
 * statements in soft-fail mode.
 */
@Test
public void testCorruptValuesNamespacesFile() throws IOException {
	repo.shutDown();

	String fileName = "namespaces.dat";
	File target = new File(dataDir, fileName);
	long length = target.length();

	// The first four bytes are left untouched, as in the sibling corruption tests.
	for (long pos = 4; pos < length; pos++) {
		restoreFile(dataDir, fileName);
		overwriteByteInFile(target, pos, 0x0);
		repo.init();
		List<Statement> statements = getStatements();
		assertEquals(6, statements.size(), "Failed at byte position " + pos);
		repo.shutDown();
	}
}

/**
 * Corrupts contexts.dat one byte at a time and verifies the store still returns all 6
 * statements in soft-fail mode.
 */
@Test
public void testCorruptValuesContextsFile() throws IOException {
	repo.shutDown();

	String fileName = "contexts.dat";
	File target = new File(dataDir, fileName);
	long length = target.length();

	// The first four bytes are left untouched, as in the sibling corruption tests.
	for (long pos = 4; pos < length; pos++) {
		restoreFile(dataDir, fileName);
		overwriteByteInFile(target, pos, 0x0);
		repo.init();
		List<Statement> statements = getStatements();
		assertEquals(6, statements.size(), "Failed at byte position " + pos);
		repo.shutDown();
	}
}

/**
 * Corrupts triples-posc.alloc one byte at a time and verifies the store still returns all 6
 * statements in soft-fail mode.
 */
@Test
public void testCorruptValuesPoscAllocFile() throws IOException {
	repo.shutDown();
	String file = "triples-posc.alloc";
	File nativeStoreFile = new File(dataDir, file);
	long fileSize = nativeStoreFile.length();

	for (long i = 4; i < fileSize; i++) {
		// NOTE(review): the next two lines look like leftovers of the old values.hash test
		// (diff-extraction artifact) — 'valuesHashFile' is not declared in this method;
		// confirm against the actual committed file.
		restoreFile(dataDir, "values.hash");
		overwriteByteInFile(valuesHashFile, i, 0x0);
		restoreFile(dataDir, file);
		overwriteByteInFile(nativeStoreFile, i, 0x0);
		repo.init();
		List<Statement> list = getStatements();
		assertEquals(6, list.size(), "Failed at byte position " + i);
		repo.shutDown();
	}
}

/**
 * Corrupts triples-posc.dat one byte at a time and verifies the store still returns all 6
 * statements in soft-fail mode.
 */
@Test
public void testCorruptValuesPoscDataFile() throws IOException {
	repo.shutDown();

	String fileName = "triples-posc.dat";
	File target = new File(dataDir, fileName);
	long length = target.length();

	for (long pos = 4; pos < length; pos++) {
		// Re-assert soft-fail mode each round (also set in before(); kept for parity).
		NativeStore.SOFT_FAIL_ON_CORRUPT_DATA = true;
		restoreFile(dataDir, fileName);
		overwriteByteInFile(target, pos, 0x0);
		repo.init();
		List<Statement> statements = getStatements();
		assertEquals(6, statements.size(), "Failed at byte position " + pos);
		repo.shutDown();
	}
}

/**
 * Corrupts triples-spoc.alloc one byte at a time and verifies the store still returns all 6
 * statements in soft-fail mode.
 */
@Test
public void testCorruptValuesSpocAllocFile() throws IOException {
	repo.shutDown();

	String fileName = "triples-spoc.alloc";
	File target = new File(dataDir, fileName);
	long length = target.length();

	// The first four bytes are left untouched, as in the sibling corruption tests.
	for (long pos = 4; pos < length; pos++) {
		restoreFile(dataDir, fileName);
		overwriteByteInFile(target, pos, 0x0);
		repo.init();
		List<Statement> statements = getStatements();
		assertEquals(6, statements.size(), "Failed at byte position " + pos);
		repo.shutDown();
	}
}

/**
 * Corrupts triples-spoc.dat one byte at a time. If reading fails outright, the test falls back
 * to deleting the corrupt index file and re-initializing, verifying the store can recover and
 * still return all 6 statements.
 */
@Test
public void testCorruptValuesSpocDataFile() throws IOException {
	repo.shutDown();

	String fileName = "triples-spoc.dat";
	File target = new File(dataDir, fileName);
	long length = target.length();

	for (long pos = 4; pos < length; pos++) {
		restoreFile(dataDir, fileName);
		overwriteByteInFile(target, pos, 0x0);
		repo.init();
		try {
			List<Statement> statements = getStatements();
			assertEquals(6, statements.size(), "Failed at byte position " + pos);
		} catch (Throwable ignored) {
			// Soft-fail could not cope with this corruption; delete the index file and
			// verify the store rebuilds it from the remaining indexes.
			repo.shutDown();
			target.delete();
			repo.init();
			List<Statement> statements = getStatements();
			assertEquals(6, statements.size(), "Failed at byte position " + pos);
		}

		repo.shutDown();
	}
}

@NotNull
private List<Statement> getStatements() {
List<Statement> list = new ArrayList<>();

NativeStore.SOFT_FAIL_ON_CORRUPT_DATA = true;

try (RepositoryConnection conn = repo.getConnection()) {
StringWriter stringWriter = new StringWriter();
RDFWriter writer = Rio.createWriter(RDFFormat.NQUADS, stringWriter);
Expand All @@ -266,13 +386,12 @@ private List<Statement> getStatements() {
}
}
return list;
} finally {
NativeStore.SOFT_FAIL_ON_CORRUPT_DATA = false;
}
}

/**
 * Resets the global soft-fail flag (set in before()) so corruption tolerance does not leak
 * into other test classes, then releases the repository.
 */
@AfterEach
public void after() throws IOException {
	NativeStore.SOFT_FAIL_ON_CORRUPT_DATA = false;
	repo.shutDown();
}
}
3 changes: 2 additions & 1 deletion site/content/documentation/programming/repository.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,8 @@ Repository repo = new SailRepository(new NativeStore());
```

In the unlikely event of corruption the system property `org.eclipse.rdf4j.sail.nativerdf.softFailOnCorruptData` can be set to `true` to
allow the NativeStore to output CorruptValue/CorruptIRI/CorruptIRIOrBNode/CorruptLiteral objects.
allow the NativeStore to output CorruptValue/CorruptIRI/CorruptIRIOrBNode/CorruptLiteral objects. Take a backup of all data before setting
this property, since it also allows the NativeStore to delete corrupt indexes in an attempt to recreate them. Consider this feature experimental and use it with caution.

### Elasticsearch RDF Repository

Expand Down

0 comments on commit 0076003

Please sign in to comment.