Skip to content

Commit

Permalink
Towards improved caching for binary search
Browse files Browse the repository at this point in the history
  • Loading branch information
Aklakan committed Sep 6, 2024
1 parent bf1924c commit 1c50aa2
Show file tree
Hide file tree
Showing 9 changed files with 118 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ public ReadableChannelOverIterator(ArrayOps<T[]> arrayOps, Iterator<T> it, Runna
this.closeAction = closeAction;
}


/**
 * Replaces the action stored in the {@code closeAction} field.
 *
 * @param closeAction the new close action; NOTE(review): {@code closeActual()} appears to
 *        null-check this field before using it, so {@code null} looks permitted — confirm
 */
public void setCloseAction(Runnable closeAction) {
this.closeAction = closeAction;
}
Expand All @@ -36,7 +35,6 @@ public Iterator<T> getIterator() {
return iterator;
}


@Override
public void closeActual() throws IOException {
if (closeAction != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,11 @@ public static ReadableChannel<byte[]> wrap(InputStream inputStream) {
}

public static <T> ReadableChannel<T[]> wrap(Stream<T> stream, ArrayOps<T[]> arrayOps) {
return new ReadableChannelOverIterator<>(arrayOps, stream.iterator(), stream::close);
return wrap(stream.iterator(), stream::close, arrayOps);
}

/**
 * Exposes an iterator as a {@link ReadableChannel} over arrays of its element type.
 *
 * @param <T> the element type produced by the iterator
 * @param iterator the iterator that supplies the elements
 * @param closeAction the action handed to the channel as its close action
 * @param arrayOps the array operations for {@code T[]}
 * @return a new {@code ReadableChannelOverIterator} built from the three arguments
 */
public static <T> ReadableChannel<T[]> wrap(Iterator<T> iterator, Runnable closeAction, ArrayOps<T[]> arrayOps) {
    ReadableChannel<T[]> channel = new ReadableChannelOverIterator<>(arrayOps, iterator, closeAction);
    return channel;
}

public static <T extends ReadableChannel<byte[]>> ReadableByteChannelAdapter<T> newChannel(T dataStream) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,11 +158,11 @@ public static void main2(String[] args) throws IOException {

// if (true) { return; }

BinarySearcherOverBlockSource binSearcher = new BinarySearcherOverBlockSource(blockSource, BinSearchLevelCache.noCache(), 10000);

try (BufferedReader br = new BufferedReader(new InputStreamReader(binSearcher.search(lookup)))) {
br.lines().forEach(x -> System.out.println(x));
}
// BinarySearcherOverBlockSource binSearcher = new BinarySearcherOverBlockSource(blockSource, BinSearchLevelCache.noCache(), 10000);
//
// try (BufferedReader br = new BufferedReader(new InputStreamReader(binSearcher.search(lookup)))) {
// br.lines().forEach(x -> System.out.println(x));
// }
}

// public static void main3(String[] args) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public static void main(String[] args) throws IOException {
BinarySearcher bs = BinarySearchBuilder.newBuilder()
.setSource(bz2Path)
.setCodec(new BZip2Codec())
.setBinSearchCache(BinSearchLevelCache.dftCache())
// .setBinSearchCache(BinSearchLevelCache.dftCache())
.build();
// BinarySearcher bs = BinarySearchBuilder.newBuilder()
// .setSource(plainPath)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package org.aksw.commons.io.hadoop.binseach.v2;

import java.util.function.Supplier;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;

/**
 * Cache of per-resource binary search caches.
 *
 * <p>Each resource is identified by an arbitrary key and is associated with one
 * {@link CacheEntry}. Entries are created on demand by {@link #cacheFactory} and
 * held in {@link #resourceCache}.
 */
public class BinSearchResourceCache {
    /** Maximum size of the block cache in entries created by the default factory. */
    private static final int DEFAULT_BLOCK_CACHE_SIZE = 16;

    /**
     * The caches kept for a single resource.
     *
     * <p>Declared public because it is part of this class's public API: it is the type
     * argument of the public constructor's parameters and the return type of
     * {@link BinSearchResourceCache#getOrCreate(Object)}. A package-private record would
     * make those members unusable from other packages.
     *
     * @param levelCache the binary search level cache of the resource
     * @param blockCache the block cache of the resource, keyed by {@code Long}
     */
    public record CacheEntry(BinSearchLevelCache levelCache, Cache<Long, Block> blockCache) {}

    /** Maps a resource key to the caches of that resource. */
    protected Cache<Object, CacheEntry> resourceCache;

    /** Factory for the caches of individual resources. */
    protected Supplier<CacheEntry> cacheFactory;

    /**
     * Creates a resource cache bounded to {@code maxCacheSize} resources. New entries get
     * {@code BinSearchLevelCache.dftCache()} and a block cache bounded to
     * {@value #DEFAULT_BLOCK_CACHE_SIZE} elements.
     *
     * @param maxCacheSize the maximum size passed to the Caffeine builder of the resource cache
     */
    public BinSearchResourceCache(int maxCacheSize) {
        this(
            Caffeine.newBuilder().maximumSize(maxCacheSize).build(),
            () -> new CacheEntry(
                BinSearchLevelCache.dftCache(),
                Caffeine.newBuilder().maximumSize(DEFAULT_BLOCK_CACHE_SIZE).build()));
    }

    /**
     * Creates a resource cache from an existing cache and entry factory.
     *
     * @param resourceCache the cache that maps resource keys to their entries
     * @param cacheFactory the supplier invoked to create the entry for a key that has none
     */
    public BinSearchResourceCache(Cache<Object, CacheEntry> resourceCache, Supplier<CacheEntry> cacheFactory) {
        this.resourceCache = resourceCache;
        this.cacheFactory = cacheFactory;
    }

    /**
     * Returns the entry associated with {@code key}, creating it with
     * {@link #cacheFactory} if the resource cache holds none.
     *
     * @param key the key that identifies the resource
     * @return the cached or newly created entry
     */
    public CacheEntry getOrCreate(Object key) {
        // The key is not passed on: the factory builds the same kind of entry for every resource.
        return resourceCache.get(key, k -> cacheFactory.get());
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,11 @@ public static int compareToPrefix(InputStream in, byte[] prefix) throws IOExcept
return cmp;
}


public static InputStream configureStream(SeekableReadableChannel<byte[]> channel, long end, byte[] prefix) throws IOException {
public static InputStream configureStream(
SeekableReadableChannel<byte[]> channel, long end, byte[] prefix, BinSearchLevelCache levelCache) throws IOException {
InputStream result;
SeekableInputStream in = SeekableInputStreams.create(channel);
Match match = BinarySearcherOverPlainSource.binarySearch(in,SearchMode.BOTH, 0, 0, end, (byte)'\n', prefix, BinSearchLevelCache.noCache());
Match match = BinarySearcherOverPlainSource.binarySearch(in,SearchMode.BOTH, 0, 0, end, (byte)'\n', prefix, levelCache);
if (match != null) {
in.position(match.start());

Expand All @@ -73,7 +73,7 @@ public static InputStream configureStream(SeekableReadableChannel<byte[]> channe
new ReadableByteChannelForLinesMatchingPrefix(
SeekableInputStreams.wrap(in), scanState));
} else {
System.err.println("NO MATCH");
System.err.println("NO MATCH for " + new String(prefix));
in.close();
result = InputStream.nullInputStream(); // ReadableChannels.newInputStream(ReadableChannels.limit(in, 0));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
package org.aksw.commons.io.hadoop.binseach.v2;

import java.nio.file.Path;
import java.util.function.Supplier;

import org.aksw.commons.io.binseach.BinarySearcher;
import org.aksw.commons.io.hadoop.binseach.v2.BinSearchResourceCache.CacheEntry;
import org.aksw.commons.io.input.SeekableReadableChannelSource;
import org.apache.hadoop.io.compress.SplittableCompressionCodec;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;

public class BinarySearchBuilder {
protected Path path;
protected SplittableCompressionCodec codec;
protected Cache<Long, Block> blockCache;
protected BinSearchLevelCache binSearchCache;
// protected Cache<Long, Block> blockCache;
//protected BinSearchLevelCache binSearchCache;
protected BinSearchResourceCache resourceCache;

public static BinarySearchBuilder newBuilder() {
return new BinarySearchBuilder();
Expand All @@ -29,39 +31,49 @@ public BinarySearchBuilder setCodec(SplittableCompressionCodec codec) {
return this;
}

public BinarySearchBuilder setBlockCache(Cache<Long, Block> blockCache) {
this.blockCache = blockCache;
public BinarySearchBuilder setResourceCache(BinSearchResourceCache resourceCache) {
this.resourceCache = resourceCache;
return this;
}

public BinarySearchBuilder setBlockCacheSize(int maxSize) {
this.blockCache = Caffeine.newBuilder().maximumSize(maxSize).build();
return this;
}

public BinarySearchBuilder setBinSearchCache(BinSearchLevelCache binSearchCache) {
this.binSearchCache = binSearchCache;
return this;
}
// public BinarySearchBuilder setBlockCache(Cache<Long, Block> blockCache) {
// this.blockCache = blockCache;
// return this;
// }
//
// public BinarySearchBuilder setBlockCacheSize(int maxSize) {
// this.blockCache = Caffeine.newBuilder().maximumSize(maxSize).build();
// return this;
// }
//
// public BinarySearchBuilder setBinSearchCache(BinSearchLevelCache binSearchCache) {
// this.binSearchCache = binSearchCache;
// return this;
// }

public BinarySearcher build() {
BinarySearcher result;

BinSearchLevelCache finalBinSearchCache = binSearchCache != null
? binSearchCache
: BinSearchLevelCache.dftCache();
Supplier<CacheEntry> cacheSupplier = resourceCache != null
? () -> resourceCache.getOrCreate(path)
: () -> new CacheEntry(BinSearchLevelCache.dftCache(), Caffeine.newBuilder().maximumSize(16).build());

// BinSearchLevelCache finalBinSearchCache = binSearchCache != null
// ? binSearchCache
// : BinSearchLevelCache.dftCache();

if (codec == null) {
SeekableReadableChannelSource<byte[]> source = new SeekableReadableChannelSourceOverNio(path);
result = new BinarySearcherOverPlainSource(source, finalBinSearchCache);
result = new BinarySearcherOverPlainSource(source, cacheSupplier);
} else {
BlockSource blockSource = BlockSource.of(path, codec);

Cache<Long, Block> finalBlockCache = blockCache != null
? blockCache
: Caffeine.newBuilder().maximumSize(16).build();
// Cache<Long, Block> finalBlockCache = blockCache != null
// ? blockCache
// : Caffeine.newBuilder().maximumSize(16).build();

result = new BinarySearcherOverBlockSource(blockSource, finalBinSearchCache, finalBlockCache);
result = new BinarySearcherOverBlockSource(blockSource, cacheSupplier);
}
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.function.Supplier;
import java.util.stream.Stream;

import org.aksw.commons.io.binseach.BinarySearcher;
import org.aksw.commons.io.hadoop.SeekableInputStreams;
import org.aksw.commons.io.hadoop.binseach.v2.BinSearchResourceCache.CacheEntry;
import org.aksw.commons.io.input.ReadableChannel;
import org.aksw.commons.io.input.ReadableChannelSources;
import org.aksw.commons.io.input.ReadableChannelSupplier;
Expand All @@ -30,18 +32,18 @@ public class BinarySearcherOverBlockSource
private static final Logger logger = LoggerFactory.getLogger(BinarySearcherOverBlockSource.class);

protected BlockSource blockSource;
protected BinSearchLevelCache cache;
protected Cache<Long, Block> pageCache;
protected Supplier<CacheEntry> cacheSupplier;
// protected BinSearchLevelCache cache;
// protected Cache<Long, Block> pageCache;

public BinarySearcherOverBlockSource(BlockSource blockSource, BinSearchLevelCache cache, int pageCacheSize) {
this(blockSource, cache, Caffeine.newBuilder().maximumSize(pageCacheSize).build());
}
// public BinarySearcherOverBlockSource(BlockSource blockSource, BinSearchLevelCache cache, int pageCacheSize) {
// this(blockSource, cache, Caffeine.newBuilder().maximumSize(pageCacheSize).build());
// }

public BinarySearcherOverBlockSource(BlockSource blockSource, BinSearchLevelCache cache, Cache<Long, Block> pageCache) {
public BinarySearcherOverBlockSource(BlockSource blockSource, Supplier<CacheEntry> cacheSupplier) {
super();
this.blockSource = blockSource;
this.cache = cache;
this.pageCache = pageCache;
this.cacheSupplier = cacheSupplier;
}

@Override
Expand All @@ -50,17 +52,28 @@ public void close() throws Exception {

@Override
public InputStream search(byte[] prefix) throws IOException {
CacheEntry cacheEntry = cacheSupplier.get();
BinSearchLevelCache levelCache = cacheEntry == null ? null : cacheEntry.levelCache();
if (levelCache == null) {
levelCache = BinSearchLevelCache.noCache();
}

InputStream result;
Match match = binarySearch(blockSource, prefix, cache);
Match match = binarySearch(blockSource, prefix, levelCache);
if (match != null) {
// System.out.println("Match found: " + match);

Cache<Long, Block> blockCache = cacheEntry == null ? null : cacheEntry.blockCache();
if (blockCache == null) {
blockCache = Caffeine.newBuilder().maximumSize(16).build();
}

long startBlockId = match.start();

SeekableReadableChannelOverBlocks channel = new SeekableReadableChannelOverBlocks(blockSource, startBlockId, pageCache);
SeekableReadableChannelOverBlocks channel = new SeekableReadableChannelOverBlocks(blockSource, startBlockId, blockCache);
long blockSize = channel.getStartingBlockSize();
blockSize = 900000;
result = BinSearchUtils.configureStream(channel, blockSize * 2, prefix);
result = BinSearchUtils.configureStream(channel, blockSize * 2, prefix, BinSearchLevelCache.noCache());

boolean showKnownBlocks = false;
if (showKnownBlocks) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@
import java.io.InputStream;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.function.Supplier;
import java.util.stream.Stream;

import org.aksw.commons.io.binseach.BinarySearcher;
import org.aksw.commons.io.hadoop.SeekableInputStream;
import org.aksw.commons.io.hadoop.binseach.v2.BinSearchResourceCache.CacheEntry;
import org.aksw.commons.io.input.ReadableChannel;
import org.aksw.commons.io.input.ReadableChannelSources;
import org.aksw.commons.io.input.ReadableChannelSupplier;
Expand All @@ -30,12 +32,12 @@ public class BinarySearcherOverPlainSource
private static final Logger logger = LoggerFactory.getLogger(BinarySearcherOverPlainSource.class);

protected SeekableReadableChannelSource<byte[]> source;
protected BinSearchLevelCache cache;
protected Supplier<CacheEntry> cacheSupplier;

protected BinarySearcherOverPlainSource(SeekableReadableChannelSource<byte[]> source, BinSearchLevelCache cache) {
protected BinarySearcherOverPlainSource(SeekableReadableChannelSource<byte[]> source, Supplier<CacheEntry> cacheSupplier) {
super();
this.source = source;
this.cache = cache;
this.cacheSupplier = cacheSupplier;
}

@Override
Expand All @@ -46,19 +48,25 @@ public static Match binarySearch(SeekableInputStream channel, long end, byte[] p
return binarySearch(channel, SearchMode.BOTH, 0, 0, end, (byte)'\n', prefix, BinSearchLevelCache.noCache());
}

public static BinarySearcherOverPlainSource of(SeekableReadableChannelSource<byte[]> source, BinSearchLevelCache cache) {
return new BinarySearcherOverPlainSource(source, cache);
public static BinarySearcherOverPlainSource of(SeekableReadableChannelSource<byte[]> source, Supplier<CacheEntry> cacheSupplier) {
return new BinarySearcherOverPlainSource(source, cacheSupplier);
}

public static BinarySearcherOverPlainSource of(Path path, BinSearchLevelCache cache) {
return of(new SeekableReadableChannelSourceOverNio(path), cache);
public static BinarySearcherOverPlainSource of(Path path, Supplier<CacheEntry> cacheSupplier) {
return of(new SeekableReadableChannelSourceOverNio(path), cacheSupplier);
}

@Override
public InputStream search(byte[] prefix) throws IOException {
CacheEntry cacheEntry = cacheSupplier.get();
BinSearchLevelCache levelCache = cacheEntry == null ? null : cacheEntry.levelCache();
if (levelCache == null) {
levelCache = BinSearchLevelCache.noCache();
}

SeekableReadableChannel<byte[]> channel = source.newReadableChannel();
long searchRangeEnd = source.size();
InputStream result = BinSearchUtils.configureStream(channel, searchRangeEnd, prefix);
InputStream result = BinSearchUtils.configureStream(channel, searchRangeEnd, prefix, levelCache);
return result;
}

Expand Down

0 comments on commit 1c50aa2

Please sign in to comment.