Skip to content

Commit

Permalink
Keep track of iterators and close them before closing the DB
Browse files Browse the repository at this point in the history
Before closing the RocksDB database, first all open iterators
are closed before closing the main database to avoid an
inconsistent state with invalid databaset iterators.

Related to issue #113.
  • Loading branch information
aecio committed Apr 23, 2019
1 parent 4e1f46d commit d8fe9c1
Show file tree
Hide file tree
Showing 7 changed files with 134 additions and 89 deletions.
Original file line number Diff line number Diff line change
@@ -1,20 +1,27 @@
package focusedCrawler.util.persistence.rocksdb;

import com.google.common.base.Preconditions;
import focusedCrawler.util.CloseableIterator;
import focusedCrawler.util.KV;
import java.io.Closeable;
import java.io.File;
import java.io.UnsupportedEncodingException;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

import com.google.common.io.BaseEncoding;
import org.rocksdb.RocksIterator;

public abstract class AbstractRocksDbHashtable implements Closeable {

protected Options options;
protected RocksDB db;
private List<RocksDBIterator> iterators = new ArrayList<>();

static {
RocksDB.loadLibrary();
Expand Down Expand Up @@ -60,6 +67,11 @@ protected byte[] getBytes(byte[] keyBytes) {
@Override
public synchronized void close() {
if (db != null) {
for (Iterator<RocksDBIterator> listIt = this.iterators.iterator(); listIt.hasNext(); ) {
RocksDBIterator dbIt = listIt.next();
listIt.remove();
dbIt.close();
}
db.close();
db = null;
options.close();
Expand Down Expand Up @@ -104,4 +116,75 @@ static String bytesToString(byte[] bytes) {
}
}

protected RocksDBIterator openIterator() {
RocksDBIterator it = new RocksDBIterator(this.db);
this.iterators.add(it);
return it;
}

public class RocksDBIterator implements CloseableIterator<KV<byte[], byte[]>> {

private final RocksIterator cursor;
private boolean hasNext;
private boolean isOpen;
private byte[] value;
private byte[] key;
private RocksDB db;

private RocksDBIterator(RocksDB db) {
this.db = db;
this.cursor = db.newIterator();
this.cursor.seekToFirst();
this.isOpen = true;
readNextKV(true);
}

private void readNextKV(boolean firstEntry) {
if (!firstEntry) {
cursor.next();
}
if (cursor.isValid()) {
this.hasNext = true;
this.key = cursor.key();
this.value = cursor.value();
} else {
this.close();
}
}

@Override
public void close() {
if (this.isOpen) {
iterators.remove(this);
cursor.close();
this.isOpen = false;
this.hasNext = false;
}
}

@Override
public boolean hasNext() {
return hasNext;
}

@Override
public KV<byte[], byte[]> next() {
if (!hasNext) {
return null;
}
KV<byte[], byte[]> kv = new KV<>(this.key, this.value);
readNextKV(false);
return kv;
}

public void remove() {
try {
db.delete(key);
} catch (RocksDBException e) {
throw new RuntimeException("Failed to remove entry from RocksDb");
}
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public byte[] get(byte[] key) {

@Override
public CloseableIterator<KV<byte[], byte[]>> iterator() {
return new RocksDBIterator(this.db);
return super.openIterator();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public T get(byte[] key) {

@Override
public CloseableIterator<KV<byte[], T>> iterator() {
return new BytesObjectIterator(new RocksDBIterator(super.db));
return new BytesObjectIterator(super.openIterator());
}

private class BytesObjectIterator
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public T get(String key) {

@Override
public CloseableIterator<KV<String, T>> iterator() {
return new StringObjectIterator(new RocksDBIterator(super.db));
return new StringObjectIterator(super.openIterator());
}

protected class StringObjectIterator
Expand Down
27 changes: 13 additions & 14 deletions src/test/java/focusedCrawler/link/frontier/FrontierTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,20 @@
import focusedCrawler.util.persistence.PersistentHashtable.DB;

public class FrontierTest {

@Rule
public TemporaryFolder tempFolder = new TemporaryFolder();

public Path testPath;

private Frontier frontier;

@Before
public void setUp() throws IOException {
testPath = Paths.get(tempFolder.newFolder().toString());
frontier = new Frontier(testPath.toString(), 1000, DB.ROCKSDB);
}

@After
public void tearDown() throws IOException {
frontier.close();
Expand All @@ -45,44 +45,43 @@ public void shouldInsertUrl() throws Exception {
// given
LinkRelevance link1 = new LinkRelevance(new URL("http://www.example1.com/index.html"), 1);
LinkRelevance link2 = new LinkRelevance(new URL("http://www.example2.com/index.html"), 1);

// when
frontier.insert(link1);

// then
assertThat(frontier.exist(link1), is(1d));
assertThat(frontier.exist(link2), is(nullValue()));
}

@Test
public void shouldInsertUrlsAndSelectGivenNumberOfUrls() throws Exception {
// given
LinkRelevance link1 = new LinkRelevance(new URL("http://www.example1.com/index.html"), 1);
LinkRelevance link2 = new LinkRelevance(new URL("http://www.example2.com/index.html"), 2);

// when
frontier.insert(link1);
frontier.insert(link2);

// then
assertThat(frontier.exist(link1), is(notNullValue()));
assertThat(frontier.exist(link1), is(1d));

assertThat(frontier.exist(link2), is(notNullValue()));
assertThat(frontier.exist(link2), is(2d));
}



@Test
public void shouldInsertAndDeleteUrl() throws Exception {
// given
LinkRelevance link1 = new LinkRelevance(new URL("http://www.example1.com/index.html"), 1);

// when
frontier.insert(link1);
// then
assertThat(frontier.exist(link1), is(1d));

// when
frontier.delete(link1);
// then
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package focusedCrawler.util.persistence;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.junit.Assert.assertThat;

import java.net.URL;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
Expand Down Expand Up @@ -159,4 +161,37 @@ public void shoudNotCrashWhenIterateOverEmptyHashtable() throws Exception {
}
}

@Test
public void shouldNotCrashWhenIterateOnClosedHashtable() throws Exception {
// given
PersistentHashtable<Integer> ht = new PersistentHashtable<>(
tempFolder.newFolder().toString(), 1000, Integer.class, database);

// when
ht.put("1", 1);
ht.put("2", 2);
ht.commit();

TupleIterator<Integer> it;

// when we open an iterator
it = ht.iterator();
// and close the hash table (not the iterator)
ht.close();

// (following calls used to crash the JVM before the bug-fix (issue #113)
boolean hasNext1 = it.hasNext();
Tuple<Integer> next1 = it.next();

boolean hasNext2 = it.hasNext();
Tuple<Integer> next2 = it.next();

// then
assertThat(hasNext1, is(false));
assertThat(next1, is(nullValue()));

assertThat(hasNext2, is(false));
assertThat(next2, is(nullValue()));
}

}

0 comments on commit d8fe9c1

Please sign in to comment.