Skip to content

Commit

Permalink
Add checked exceptions to Runnel decoding and tweak related APIs
Browse files Browse the repository at this point in the history
  • Loading branch information
ChrisGreenaway committed Sep 7, 2018
1 parent ff8ae08 commit 04824ce
Show file tree
Hide file tree
Showing 53 changed files with 819 additions and 413 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public void messageFromServerInformsAboutReconnectedStatus() {
}

@Test
public void supplyReconnectDataAndInformReconnectListenerWhenReconnectionStarts() {
public void supplyReconnectDataAndInformReconnectListenerWhenReconnectionStarts() throws Exception {
when(reconnectDataSupplier.getReconnectData()).thenReturn(new LeaseReconnectData(1));
LeaseEndpointDelegate delegate = new LeaseEndpointDelegate(reconnectListener, reconnectDataSupplier);
byte[] bytes = delegate.createExtendedReconnectData();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.terracotta.runnel.StructBuilder;
import org.terracotta.runnel.decoding.StructDecoder;
import org.terracotta.runnel.encoding.StructEncoder;
import org.terracotta.runnel.utils.RunnelDecodingException;

import java.nio.ByteBuffer;

Expand All @@ -43,9 +44,13 @@ public byte[] encodeMessage(LeaseMessage leaseMessage) throws MessageCodecExcept

@Override
public LeaseMessage decodeMessage(byte[] bytes) throws MessageCodecException {
StructDecoder<Void> decoder = messageStruct.decoder(ByteBuffer.wrap(bytes));
LeaseMessageType type = decoder.<LeaseMessageType>enm("messageType").get();
return type.decode(decoder);
try {
StructDecoder<Void> decoder = messageStruct.decoder(ByteBuffer.wrap(bytes));
LeaseMessageType type = decoder.<LeaseMessageType>mandatoryEnm("messageType").get();
return type.decode(decoder);
} catch (RunnelDecodingException e) {
throw new MessageCodecException(e.getMessage(), e);
}
}

@Override
Expand All @@ -58,9 +63,13 @@ public byte[] encodeResponse(LeaseResponse leaseResponse) throws MessageCodecExc

@Override
public LeaseResponse decodeResponse(byte[] bytes) throws MessageCodecException {
StructDecoder<Void> decoder = responseStruct.decoder(ByteBuffer.wrap(bytes));
LeaseResponseType type = decoder.<LeaseResponseType>enm("responseType").get();
return type.decode(decoder);
try {
StructDecoder<Void> decoder = responseStruct.decoder(ByteBuffer.wrap(bytes));
LeaseResponseType type = decoder.<LeaseResponseType>mandatoryEnm("responseType").get();
return type.decode(decoder);
} catch (RunnelDecodingException e) {
throw new MessageCodecException(e.getMessage(), e);
}
}

private static Struct createMessageStruct() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
package org.terracotta.lease;

import org.terracotta.runnel.decoding.StructDecoder;
import org.terracotta.runnel.utils.RunnelDecodingException;

public interface LeaseMessageDecoder {
LeaseMessage decode(StructDecoder<Void> parentDecoder);
LeaseMessage decode(StructDecoder<Void> parentDecoder) throws RunnelDecodingException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,18 @@
package org.terracotta.lease;

import org.terracotta.runnel.decoding.StructDecoder;
import org.terracotta.runnel.utils.RunnelDecodingException;

enum LeaseMessageType {
LEASE_REQUEST(new LeaseMessageDecoder() {
@Override
public LeaseMessage decode(StructDecoder<Void> parentDecoder) {
public LeaseMessage decode(StructDecoder<Void> parentDecoder) throws RunnelDecodingException {
return LeaseRequest.decode(parentDecoder);
}
}),
LEASE_RECONNECT_FINISHED(new LeaseMessageDecoder() {
@Override
public LeaseMessage decode(StructDecoder<Void> parentDecoder) {
public LeaseMessage decode(StructDecoder<Void> parentDecoder) throws RunnelDecodingException {
return LeaseReconnectFinished.decode(parentDecoder);
}
});
Expand All @@ -37,7 +38,7 @@ public LeaseMessage decode(StructDecoder<Void> parentDecoder) {
this.leaseMessageDecoder = leaseMessageDecoder;
}

public LeaseMessage decode(StructDecoder<Void> decoder) {
public LeaseMessage decode(StructDecoder<Void> decoder) throws RunnelDecodingException {
return leaseMessageDecoder.decode(decoder);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.terracotta.runnel.StructBuilder;
import org.terracotta.runnel.decoding.StructDecoder;
import org.terracotta.runnel.encoding.StructEncoder;
import org.terracotta.runnel.utils.RunnelDecodingException;

import java.nio.ByteBuffer;

Expand Down Expand Up @@ -47,9 +48,9 @@ public byte[] encode() {
return encoder.encode().array();
}

public static LeaseReconnectData decode(byte[] bytes) {
public static LeaseReconnectData decode(byte[] bytes) throws RunnelDecodingException {
StructDecoder<Void> decoder = reconnectStruct.decoder(ByteBuffer.wrap(bytes));
long connectionSequenceNumber = decoder.int64("connectionSequenceNumber");
long connectionSequenceNumber = decoder.mandatoryInt64("connectionSequenceNumber");
return new LeaseReconnectData(connectionSequenceNumber);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.terracotta.runnel.StructBuilder;
import org.terracotta.runnel.decoding.StructDecoder;
import org.terracotta.runnel.encoding.StructEncoder;
import org.terracotta.runnel.utils.RunnelDecodingException;

import java.util.UUID;

Expand Down Expand Up @@ -59,10 +60,10 @@ public void encode(StructEncoder<Void> parentEncoder) {
encoder.end();
}

public static LeaseMessage decode(StructDecoder<Void> parentDecoder) {
StructDecoder<StructDecoder<Void>> decoder = parentDecoder.struct("leaseReconnectFinished");
long uuidMSB = decoder.int64("uuidMSB");
long uuidLSB = decoder.int64("uuidLSB");
public static LeaseMessage decode(StructDecoder<Void> parentDecoder) throws RunnelDecodingException {
StructDecoder<StructDecoder<Void>> decoder = parentDecoder.mandatoryStruct("leaseReconnectFinished");
long uuidMSB = decoder.mandatoryInt64("uuidMSB");
long uuidLSB = decoder.mandatoryInt64("uuidLSB");

UUID uuid = new UUID(uuidMSB, uuidLSB);
return new LeaseReconnectFinished(uuid);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.terracotta.runnel.StructBuilder;
import org.terracotta.runnel.decoding.StructDecoder;
import org.terracotta.runnel.encoding.StructEncoder;
import org.terracotta.runnel.utils.RunnelDecodingException;

/**
* A message to send from the client entity to the server entity to request a lease.
Expand Down Expand Up @@ -54,9 +55,9 @@ public void encode(StructEncoder<Void> parentEncoder) {
encoder.end();
}

public static LeaseMessage decode(StructDecoder<Void> parentDecoder) {
StructDecoder<StructDecoder<Void>> decoder = parentDecoder.struct("leaseRequest");
long connectionSequenceNumber = decoder.int64("connectionSequenceNumber");
public static LeaseMessage decode(StructDecoder<Void> parentDecoder) throws RunnelDecodingException {
StructDecoder<StructDecoder<Void>> decoder = parentDecoder.mandatoryStruct("leaseRequest");
long connectionSequenceNumber = decoder.mandatoryInt64("connectionSequenceNumber");
return new LeaseRequest(connectionSequenceNumber);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.terracotta.runnel.StructBuilder;
import org.terracotta.runnel.decoding.StructDecoder;
import org.terracotta.runnel.encoding.StructEncoder;
import org.terracotta.runnel.utils.RunnelDecodingException;

/**
* A message sent from the the server entity to the client entity to indicate the response to the LeaseRequest.
Expand Down Expand Up @@ -88,11 +89,11 @@ public void encode(StructEncoder<Void> parentEncoder) {
encoder.end();
}

public static LeaseResponse decode(StructDecoder<Void> parentDecoder) {
StructDecoder<StructDecoder<Void>> decoder = parentDecoder.struct("leaseRequestResult");
boolean connectionGood = decoder.bool("connectionGood");
boolean leaseGranted = decoder.bool("leaseGranted");
long leaseLength = decoder.int64("leaseLength");
public static LeaseResponse decode(StructDecoder<Void> parentDecoder) throws RunnelDecodingException {
StructDecoder<StructDecoder<Void>> decoder = parentDecoder.mandatoryStruct("leaseRequestResult");
boolean connectionGood = decoder.mandatoryBool("connectionGood");
boolean leaseGranted = decoder.mandatoryBool("leaseGranted");
long leaseLength = decoder.mandatoryInt64("leaseLength");
return new LeaseRequestResult(connectionGood, leaseGranted, leaseLength);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
package org.terracotta.lease;

import org.terracotta.runnel.decoding.StructDecoder;
import org.terracotta.runnel.utils.RunnelDecodingException;

public interface LeaseResponseDecoder {
LeaseResponse decode(StructDecoder<Void> parentDecoder);
LeaseResponse decode(StructDecoder<Void> parentDecoder) throws RunnelDecodingException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@
package org.terracotta.lease;

import org.terracotta.runnel.decoding.StructDecoder;
import org.terracotta.runnel.utils.RunnelDecodingException;

public enum LeaseResponseType {
LEASE_REQUEST_RESULT(new LeaseResponseDecoder() {
@Override
public LeaseResponse decode(StructDecoder<Void> parentDecoder) {
public LeaseResponse decode(StructDecoder<Void> parentDecoder) throws RunnelDecodingException {
return LeaseRequestResult.decode(parentDecoder);
}
}),
Expand All @@ -43,7 +44,7 @@ public LeaseResponse decode(StructDecoder<Void> parentDecoder) {
this.leaseResponseDecoder = leaseResponseDecoder;
}

public LeaseResponse decode(StructDecoder<Void> decoder) {
public LeaseResponse decode(StructDecoder<Void> decoder) throws RunnelDecodingException {
return leaseResponseDecoder.decode(decoder);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@
import org.terracotta.entity.IEntityMessenger;
import org.terracotta.entity.MessageCodecException;
import org.terracotta.entity.PassiveSynchronizationChannel;
import org.terracotta.entity.ReconnectRejectedException;
import org.terracotta.lease.service.LeaseResult;
import org.terracotta.lease.service.LeaseService;
import org.terracotta.runnel.utils.RunnelDecodingException;

import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -104,7 +106,12 @@ private boolean isLatestConnection(ClientDescriptor clientDescriptor, LeaseReque
@Override
public ActiveServerEntity.ReconnectHandler startReconnect() {
return (ClientDescriptor clientDescriptor, byte[] bytes)->{
LeaseReconnectData reconnectData = LeaseReconnectData.decode(bytes);
LeaseReconnectData reconnectData;
try {
reconnectData = LeaseReconnectData.decode(bytes);
} catch (RunnelDecodingException e) {
throw new ReconnectRejectedException(e.getMessage(), e);
}

long connectionSequenceNumber = reconnectData.getConnectionSequenceNumber();
connectionSequenceNumbers.put(clientDescriptor, connectionSequenceNumber);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.times;
Expand All @@ -43,16 +42,16 @@ public void callsCheckLeasesPeriodically() throws Exception {
assertTrue(leaseMonitorThread.isDaemon());
leaseMonitorThread.start();

verify(timeSource, timeout(10_000L).times(1)).sleep(200L);
verify(timeSource, timeout(20_000L).times(1)).sleep(200L);
verify(leaseState, times(1)).checkLeases();

timeSource.tickMillis(200L);

verify(timeSource, timeout(10_000L).times(2)).sleep(200L);
verify(timeSource, timeout(20_000L).times(2)).sleep(200L);
verify(leaseState, times(2)).checkLeases();

leaseMonitorThread.interrupt();
leaseMonitorThread.join(10_000L);
leaseMonitorThread.join(20_000L);
assertFalse(leaseMonitorThread.isAlive());
}
}
5 changes: 3 additions & 2 deletions runnel/src/main/java/org/terracotta/runnel/Struct.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package org.terracotta.runnel;

import org.terracotta.runnel.utils.RunnelDecodingException;
import org.terracotta.runnel.decoding.StructDecoder;
import org.terracotta.runnel.decoding.fields.Field;
import org.terracotta.runnel.decoding.fields.StructField;
Expand Down Expand Up @@ -60,7 +61,7 @@ public StructEncoder<Void> encoder() {
* @param byteBuffer the byte buffer containing the data to be decoded.
* @return the decoder.
*/
public StructDecoder<Void> decoder(ByteBuffer byteBuffer) {
public StructDecoder<Void> decoder(ByteBuffer byteBuffer) throws RunnelDecodingException {
root.checkFullyInitialized();
return new StructDecoder<Void>(root, new ReadBuffer(byteBuffer));
}
Expand All @@ -71,7 +72,7 @@ public StructDecoder<Void> decoder(ByteBuffer byteBuffer) {
* @param byteBuffer the byte buffer containing the data to be dumped.
* @param out the print stream to print to.
*/
public void dump(ByteBuffer byteBuffer, PrintStream out) {
public void dump(ByteBuffer byteBuffer, PrintStream out) throws RunnelDecodingException {
root.checkFullyInitialized();
Map<Integer, Field> fieldsByInteger = root.getMetadata().buildFieldsByIndexMap();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,23 @@

import org.terracotta.runnel.decoding.fields.ValueField;
import org.terracotta.runnel.utils.ReadBuffer;
import org.terracotta.runnel.utils.RunnelDecodingException;

import java.util.NoSuchElementException;

/**
* @author Ludovic Orban
*/
public class ArrayDecoder<T, P> {
public class ArrayDecoder<T, P> implements DecodingIterator<T> {

private final ValueField<T> arrayedField;
private final ReadBuffer readBuffer;
private final P parent;
private final int length;

public ArrayDecoder(ValueField<T> arrayedField, ReadBuffer readBuffer, P parent) {
private int count = 0;

public ArrayDecoder(ValueField<T> arrayedField, ReadBuffer readBuffer, P parent) throws RunnelDecodingException {
this.arrayedField = arrayedField;
this.parent = parent;
int size = readBuffer.getVlqInt();
Expand All @@ -37,11 +42,19 @@ public ArrayDecoder(ValueField<T> arrayedField, ReadBuffer readBuffer, P parent)
this.length = readBuffer.getVlqInt();
}

public int length() {
return length;
@Override
public boolean hasNext() {
return count < length;
}

public T value() {
@Override
public T next() throws RunnelDecodingException {
if (count >= length) {
throw new NoSuchElementException();
}

count++;

return arrayedField.decode(readBuffer);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright Terracotta, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.terracotta.runnel.decoding;

import org.terracotta.runnel.utils.RunnelDecodingException;

public interface DecodingIterator<T> {
boolean hasNext();

T next() throws RunnelDecodingException;
}
Loading

0 comments on commit 04824ce

Please sign in to comment.