Skip to content

Commit

Permalink
PROTON-2287 Improve Symbol decoding cache
Browse files Browse the repository at this point in the history
  • Loading branch information
franz1981 committed Nov 5, 2020
1 parent a72aff3 commit 478e50f
Showing 8 changed files with 333 additions and 33 deletions.
Original file line number Diff line number Diff line change
@@ -145,6 +145,13 @@ public byte get(int index) {
throw new IndexOutOfBoundsException("The given index is not valid: " + index);
}

return _get(index);
}

/**
* Unchecked ie no bound-checks get
*/
private byte _get(int index) {
byte result = 0;

if (index == position) {
@@ -813,8 +820,8 @@ public int hashCode() {
int remaining = remaining();

if (currentArrayIndex < 0 || remaining <= currentArray.length - currentOffset) {
while (remaining > 0) {
hash = 31 * hash + currentArray[currentOffset + --remaining];
if (remaining > 0) {
hash = Hashing.byteBufferCompatibleHashCode(currentArray, currentOffset, currentOffset + remaining);
}
} else {
hash = hashCodeFromComponents();
@@ -875,7 +882,7 @@ public boolean equals(Object other) {
return true;
}

if (hasArray() || remaining <= currentArray.length - currentOffset) {
if (remaining <= currentArray.length - currentOffset || hasArray()) {
// Either there is only one array, or the span to compare is within a single chunk of this buffer,
// allowing the compare to directly access the underlying array instead of using slower get methods.
return equals(currentArray, currentOffset, remaining, buffer);
@@ -885,6 +892,40 @@ public boolean equals(Object other) {
}

private static boolean equals(byte[] buffer, int start, int length, ReadableBuffer other) {
if (other.hasArray()) {
// fast-path: jdk 11 has a vectorized Arrays::equals for ranged comparisons, but
// sadly JDK 8 nope so let's try to save at least bound checks
final int otherStart = other.arrayOffset() + other.position();
return equals(buffer, start, other.array(), otherStart, length);
} else if (other instanceof ByteBufferReader) {
return rawEquals(buffer, start, length, other.byteBuffer());
}
return rawEquals(buffer, start, length, other);
}

private static boolean uncheckedEquals(byte[] buffer, int start, int length, CompositeReadableBuffer other) {
final int position = other.position();
for (int i = 0; i < length; i++) {
if (buffer[start + i] != other._get(position + i)) {
return false;
}
}
return true;
}

private static boolean uncheckedEquals(CompositeReadableBuffer buffer, ByteBuffer other, int length) {
assert buffer.remaining() >= length;
final int otherPosition = other.position();
final int bufferPosition = buffer.position();
for (int i = 0; i < length; i++) {
if (buffer._get(bufferPosition + i) != other.get(otherPosition + i)) {
return false;
}
}
return true;
}

private static boolean rawEquals(byte[] buffer, int start, int length, ByteBuffer other) {
final int position = other.position();
for (int i = 0; i < length; i++) {
if (buffer[start + i] != other.get(position + i)) {
@@ -894,18 +935,47 @@ private static boolean equals(byte[] buffer, int start, int length, ReadableBuff
return true;
}

private static boolean equals(ReadableBuffer buffer, ReadableBuffer other) {
final int origPos = buffer.position();
try {
for (int i = other.position(); buffer.hasRemaining(); i++) {
if (!equals(buffer.get(), other.get(i))) {
return false;
}
private static boolean rawEquals(byte[] buffer, int start, int length, ReadableBuffer other) {
final int position = other.position();
for (int i = 0; i < length; i++) {
if (buffer[start + i] != other.get(position + i)) {
return false;
}
}
return true;
}

private static boolean equals(byte[] a, int aStart, byte[] b, int bStart, int length) {
for (int i = 0; i < length; i++) {
if (a[aStart + i] != b[bStart + i]) {
return false;
}
return true;
} finally {
buffer.position(origPos);
}
return true;
}

private static boolean equals(CompositeReadableBuffer buffer, ReadableBuffer other) {
final int bufferRemaining = buffer.remaining();
if (other.hasArray()) {
final int otherStart = other.arrayOffset() + other.position();
// check if otherEnd is beyond other limits, because the underline array is just limited by the capacity
if (other.limit() < otherStart + bufferRemaining) {
throw new BufferUnderflowException();
}
return uncheckedEquals(other.array(), otherStart, bufferRemaining, buffer);
}
if (other instanceof ByteBufferReader) {
return uncheckedEquals(buffer, other.byteBuffer(), bufferRemaining);
}
// slow path
final int bufferPosition = buffer.position();
final int otherPosition = other.position();
for (int i = 0; i < bufferRemaining; i++) {
if (buffer._get(bufferPosition + i) != other.get(otherPosition + i)) {
return false;
}
}
return true;
}

@Override
@@ -923,10 +993,6 @@ public String toString() {
return builder.toString();
}

private static boolean equals(byte x, byte y) {
return x == y;
}

private void maybeMoveToNextArray() {
if (currentArray.length == currentOffset) {
if (currentArrayIndex >= 0 && currentArrayIndex < (contents.size() - 1)) {
Original file line number Diff line number Diff line change
@@ -1075,8 +1075,18 @@ void readRaw(final byte[] data, final int offset, final int length)

<V> V readRaw(TypeDecoder<V> decoder, int size)
{
V decode = decoder.decode(this, _buffer.slice().limit(size));
_buffer.position(_buffer.position()+size);
final int originalLimit = _buffer.limit();
final int originalPosition = _buffer.position();
final V decode;
try {
decode = decoder.decode(this, _buffer.limit(originalPosition + size));
} catch (Throwable t) {
_buffer.position(originalPosition);
throw t;
} finally {
_buffer.limit(originalLimit);
}
_buffer.position(originalPosition + size);
return decode;
}

125 changes: 125 additions & 0 deletions proton-j/src/main/java/org/apache/qpid/proton/codec/Hashing.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.proton.codec;

import java.nio.ByteBuffer;

public class Hashing {

private Hashing() {

}

// this const propagation should be already handled by the JIT
// but we do this to make it more readable
private static final int PRIME_1 = 31;
private static final int PRIME_2 = PRIME_1 * PRIME_1;
private static final int PRIME_3 = PRIME_2 * PRIME_1;
private static final int PRIME_4 = PRIME_3 * PRIME_1;
private static final int PRIME_5 = PRIME_4 * PRIME_1;
private static final int PRIME_6 = PRIME_5 * PRIME_1;
private static final int PRIME_7 = PRIME_6 * PRIME_1;
private static final int PRIME_8 = PRIME_7 * PRIME_1;

public static int byteBufferCompatibleHashCode(ByteBuffer byteBuffer) {
if (byteBuffer.hasArray()) {
final int arrayOffset = byteBuffer.arrayOffset();
final int arrayPosition = arrayOffset + byteBuffer.position();
final int arrayLimit = arrayOffset + byteBuffer.limit();
return byteBufferCompatibleHashCode(byteBuffer.array(), arrayPosition, arrayLimit);
}
// direct ByteBuffers does have some heavy-weight bound checks and memory barriers that
// we just hope JIT to be better then us!
return byteBuffer.hashCode();
}

public static int byteBufferCompatibleHashCode(byte[] bytes, int position, int limit) {
int h = 1;
int remaining = limit - position;
if (remaining == 0) {
return h;
}
int index = limit - 1;
// unrolled version
final int bytesCount = remaining & 7;
if (bytesCount > 0) {
assert h == 1;
h = unrolledHashCode(bytes, index, bytesCount, 1);
index -= bytesCount;
}
final long longsCount = remaining >>> 3;
// let's break the data dependency of each per element hash code
// and save bound checks by manual unrolling 8 ops at time
for (int i = 0; i < longsCount; i++) {
final byte b7 = bytes[index];
final byte b6 = bytes[index - 1];
final byte b5 = bytes[index - 2];
final byte b4 = bytes[index - 3];
final byte b3 = bytes[index - 4];
final byte b2 = bytes[index - 5];
final byte b1 = bytes[index - 6];
final byte b0 = bytes[index - 7];
h = PRIME_8 * h +
PRIME_7 * b7 +
PRIME_6 * b6 +
PRIME_5 * b5 +
PRIME_4 * b4 +
PRIME_3 * b3 +
PRIME_2 * b2 +
PRIME_1 * b1 +
b0;
index -= Long.BYTES;
}
return h;
}

private static int unrolledHashCode(byte[] bytes, int index, int bytesCount, int h) {
// there is still the hash data dependency but is more friendly
// then a plain loop, given that we know no loop is needed here
assert bytesCount > 0 && bytesCount < 8;
h = PRIME_1 * h + bytes[index];
if (bytesCount == 1) {
return h;
}
h = PRIME_1 * h + bytes[index - 1];
if (bytesCount == 2) {
return h;
}
h = PRIME_1 * h + bytes[index - 2];
if (bytesCount == 3) {
return h;
}
h = PRIME_1 * h + bytes[index - 3];
if (bytesCount == 4) {
return h;
}
h = PRIME_1 * h + bytes[index - 4];
if (bytesCount == 5) {
return h;
}
h = PRIME_1 * h + bytes[index - 5];
if (bytesCount == 6) {
return h;
}
h = PRIME_1 * h + bytes[index - 6];
return h;
}
}
Original file line number Diff line number Diff line change
@@ -330,7 +330,7 @@ public interface ReadableBuffer {

final class ByteBufferReader implements ReadableBuffer {

private ByteBuffer buffer;
private final ByteBuffer buffer;

public static ByteBufferReader allocate(int size) {
ByteBuffer allocated = ByteBuffer.allocate(size);
@@ -522,7 +522,7 @@ public String toString() {

@Override
public int hashCode() {
return buffer.hashCode();
return Hashing.byteBufferCompatibleHashCode(buffer);
}

@Override
Original file line number Diff line number Diff line change
@@ -35,7 +35,7 @@ public class SymbolType extends AbstractPrimitiveType<Symbol>
private final SymbolEncoding _shortSymbolEncoding;

private final Map<ReadableBuffer, Symbol> _symbolCache = new HashMap<ReadableBuffer, Symbol>();
private DecoderImpl.TypeDecoder<Symbol> _symbolCreator =
private final DecoderImpl.TypeDecoder<Symbol> _symbolCreator =
new DecoderImpl.TypeDecoder<Symbol>()
{
@Override
@@ -44,7 +44,7 @@ public Symbol decode(DecoderImpl decoder, ReadableBuffer buffer)
Symbol symbol = _symbolCache.get(buffer);
if (symbol == null)
{
byte[] bytes = new byte[buffer.limit()];
byte[] bytes = new byte[buffer.remaining()];
buffer.get(bytes);

String str = new String(bytes, ASCII_CHARSET);
2 changes: 1 addition & 1 deletion tests/performance-jmh/pom.xml
Original file line number Diff line number Diff line change
@@ -30,7 +30,7 @@
<name>Proton-J JMH Performance Tests</name>

<properties>
<jmh-version>1.19</jmh-version>
<jmh-version>1.25.2</jmh-version>
</properties>

<dependencies>
Original file line number Diff line number Diff line change
@@ -23,6 +23,7 @@
import org.apache.qpid.proton.codec.ReadableBuffer.ByteBufferReader;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
@@ -43,15 +44,16 @@
@State(Scope.Benchmark)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@Warmup(iterations = 5, time = 1)
@Measurement(iterations = 5, time = 1)
@Warmup(iterations = 5, time = 400, timeUnit = TimeUnit.MILLISECONDS)
@Measurement(iterations = 5, time = 400, timeUnit = TimeUnit.MILLISECONDS)
@Fork(2)
public class CompositeReadableBufferEqualsBenchmark {

private CompositeReadableBuffer composite;
@Param({"8", "64", "1024"})
@Param({"8", "16", "64"})
private int size;
private ReadableBuffer.ByteBufferReader bufferReader;
@Param({"false", "true"})
@Param({ "false", "true" })
private boolean direct;
@Param({"1", "2"})
private int chunks;
@@ -97,11 +99,6 @@ public static void main(String[] args) throws RunnerException {
public static void runBenchmark(Class<?> benchmarkClass) throws RunnerException {
final Options opt = new OptionsBuilder()
.include(benchmarkClass.getSimpleName())
.addProfiler(GCProfiler.class)
.shouldDoGC(true)
.warmupIterations(5)
.measurementIterations(5)
.forks(1)
.build();
new Runner(opt).run();
}
Loading

0 comments on commit 478e50f

Please sign in to comment.