Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Integer responseID without nonBlocking support #53

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 24 additions & 68 deletions src/com/esotericsoftware/kryonet/rmi/ObjectSpace.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

Expand All @@ -42,9 +43,6 @@
* not final (note primitives are final) then an extra byte is written for that parameter.
* @author Nathan Sweet <[email protected]> */
public class ObjectSpace {
static private final byte kReturnValMask = (byte)0x80; // 1000 0000
static private final byte kReturnExMask = (byte)0x40; // 0100 0000

static private final Object instancesLock = new Object();
static ObjectSpace[] instances = new ObjectSpace[0];
static private final HashMap<Class, CachedMethod[]> methodCache = new HashMap();
Expand Down Expand Up @@ -196,21 +194,15 @@ protected void invoke (Connection connection, Object target, InvokeMethod invoke
+ "(" + argString + ")");
}

byte responseID = invokeMethod.responseID;
boolean transmitReturnVal = (responseID & kReturnValMask) == kReturnValMask;
boolean transmitExceptions = (responseID & kReturnExMask) == kReturnExMask;
int responseID = invokeMethod.responseID;

Object result = null;
Object result = null;
Method method = invokeMethod.method;
try {
result = method.invoke(target, invokeMethod.args);
// Catch exceptions caused by the Method#invoke
} catch (InvocationTargetException ex) {
if (transmitExceptions)
result = ex.getCause();
else
throw new RuntimeException("Error invoking method: " + method.getDeclaringClass().getName() + "." + method.getName(),
ex);
result = ex.getCause();
} catch (Exception ex) {
throw new RuntimeException("Error invoking method: " + method.getDeclaringClass().getName() + "." + method.getName(), ex);
}
Expand All @@ -221,12 +213,7 @@ protected void invoke (Connection connection, Object target, InvokeMethod invoke
invokeMethodResult.objectID = invokeMethod.objectID;
invokeMethodResult.responseID = responseID;

// Do not return non-primitives if transmitReturnVal is false
if (!transmitReturnVal && !invokeMethod.method.getReturnType().isPrimitive()) {
invokeMethodResult.result = null;
} else {
invokeMethodResult.result = result;
}
invokeMethodResult.result = result;

int length = connection.sendTCP(invokeMethodResult);
if (DEBUG) debug("kryonet", connection + " sent: " + result + " (" + length + ")");
Expand Down Expand Up @@ -271,13 +258,13 @@ static private class RemoteInvocationHandler implements InvocationHandler {
private boolean nonBlocking = false;
private boolean transmitReturnValue = true;
private boolean transmitExceptions = true;
private Byte lastResponseID;
private byte nextResponseNum = 1;
private AtomicInteger lastResponseID;
private AtomicInteger nextResponseNum = new AtomicInteger(1);
private Listener responseListener;

final ReentrantLock lock = new ReentrantLock();
final Condition responseCondition = lock.newCondition();
final ConcurrentHashMap<Byte, InvokeMethodResult> responseTable = new ConcurrentHashMap();
final ConcurrentHashMap<Integer, InvokeMethodResult> responseTable = new ConcurrentHashMap();

public RemoteInvocationHandler (Connection connection, final int objectID) {
super();
Expand Down Expand Up @@ -316,25 +303,16 @@ public Object invoke (Object proxy, Method method, Object[] args) throws Excepti
} else if (name.equals("setResponseTimeout")) {
timeoutMillis = (Integer)args[0];
return null;
} else if (name.equals("setNonBlocking")) {
nonBlocking = (Boolean)args[0];
return null;
} else if (name.equals("setTransmitReturnValue")) {
transmitReturnValue = (Boolean)args[0];
return null;
} else if (name.equals("setTransmitExceptions")) {
transmitExceptions = (Boolean)args[0];
return null;
} else if (name.equals("waitForLastResponse")) {
if (lastResponseID == null) throw new IllegalStateException("There is no last response to wait for.");
return waitForResponse(lastResponseID);
return waitForResponse(lastResponseID.get());
} else if (name.equals("getLastResponseID")) {
if (lastResponseID == null) throw new IllegalStateException("There is no last response ID.");
return lastResponseID;
return lastResponseID.get();
} else if (name.equals("waitForResponse")) {
if (!transmitReturnValue && !transmitExceptions && nonBlocking)
throw new IllegalStateException("This RemoteObject is currently set to ignore all responses.");
return waitForResponse((Byte)args[0]);
return waitForResponse((Integer)args[0]);
} else if (name.equals("getConnection")) {
return connection;
} else {
Expand All @@ -358,20 +336,13 @@ public Object invoke (Object proxy, Method method, Object[] args) throws Excepti
// The only time a invocation doesn't need a response is if it's async
// and no return values or exceptions are wanted back.
boolean needsResponse = transmitReturnValue || transmitExceptions || !nonBlocking;
if (needsResponse) {
byte responseID;
synchronized (this) {
// Increment the response counter and put it into the first six bits of the responseID byte
responseID = nextResponseNum++;
if (nextResponseNum == 64) nextResponseNum = 1; // Keep number under 2^6, avoid 0 (see else statement below)
}
// Pack return value and exception info into the top two bits
if (transmitReturnValue) responseID |= kReturnValMask;
if (transmitExceptions) responseID |= kReturnExMask;
invokeMethod.responseID = responseID;
} else {
invokeMethod.responseID = 0; // A response info of 0 means to not respond
}
int responseID;
synchronized (this) {
responseID = nextResponseNum.getAndIncrement();
nextResponseNum.compareAndSet(Integer.MAX_VALUE, 0);
}
// Pack return value and exception info into the top two bits
invokeMethod.responseID = responseID;
int length = connection.sendTCP(invokeMethod);
if (DEBUG) {
String argString = "";
Expand All @@ -382,22 +353,7 @@ public Object invoke (Object proxy, Method method, Object[] args) throws Excepti
debug("kryonet", connection + " sent: " + method.getDeclaringClass().getSimpleName() + "#" + method.getName() + "("
+ argString + ") (" + length + ")");
}

if (invokeMethod.responseID != 0) lastResponseID = invokeMethod.responseID;
if (nonBlocking) {
Class returnType = method.getReturnType();
if (returnType.isPrimitive()) {
if (returnType == int.class) return 0;
if (returnType == boolean.class) return Boolean.FALSE;
if (returnType == float.class) return 0f;
if (returnType == char.class) return (char)0;
if (returnType == long.class) return 0l;
if (returnType == short.class) return (short)0;
if (returnType == byte.class) return (byte)0;
if (returnType == double.class) return 0d;
}
return null;
}
lastResponseID = new AtomicInteger(invokeMethod.responseID);
try {
Object result = waitForResponse(invokeMethod.responseID);
if (result != null && result instanceof Exception)
Expand All @@ -409,7 +365,7 @@ public Object invoke (Object proxy, Method method, Object[] args) throws Excepti
}
}

private Object waitForResponse (byte responseID) {
private Object waitForResponse (int responseID) {
if (connection.getEndPoint().getUpdateThread() == Thread.currentThread())
throw new IllegalStateException("Cannot wait for an RMI response on the connection's update thread.");

Expand Down Expand Up @@ -451,7 +407,7 @@ static public class InvokeMethod implements FrameworkMessage, KryoSerializable {
// The top two bytes of the ID indicate if the remote invocation should respond with return values and exceptions,
// respectively. The rest is a six bit counter. This means up to 63 responses can be stored before undefined behavior
// occurs due to possible duplicate IDs.
public byte responseID;
public int responseID;

public void write (Kryo kryo, Output output) {
output.writeInt(objectID, true);
Expand All @@ -477,7 +433,7 @@ public void write (Kryo kryo, Output output) {
kryo.writeClassAndObject(output, args[i]);
}

output.writeByte(responseID);
output.writeInt(responseID);
}

public void read (Kryo kryo, Input input) {
Expand All @@ -503,14 +459,14 @@ public void read (Kryo kryo, Input input) {
args[i] = kryo.readClassAndObject(input);
}

responseID = input.readByte();
responseID = input.readInt();
}
}

/** Internal message to return the result of a remotely invoked method. */
static public class InvokeMethodResult implements FrameworkMessage {
public int objectID;
public byte responseID;
public int responseID;
public Object result;
}

Expand Down
29 changes: 2 additions & 27 deletions src/com/esotericsoftware/kryonet/rmi/RemoteObject.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,45 +10,20 @@ public interface RemoteObject {
/** Sets the milliseconds to wait for a method to return value. Default is 3000. */
public void setResponseTimeout (int timeoutMillis);

/** Sets the blocking behavior when invoking a remote method. Default is false.
* @param nonBlocking If false, the invoking thread will wait for the remote method to return or timeout (default). If true,
* the invoking thread will not wait for a response. The method will return immediately and the return value should
* be ignored. If they are being transmitted, the return value or any thrown exception can later be retrieved with
* {@link #waitForLastResponse()} or {@link #waitForResponse(byte)}. The responses will be stored until retrieved, so
* each method call should have a matching retrieve. */
public void setNonBlocking (boolean nonBlocking);

/** Sets whether return values are sent back when invoking a remote method. Default is true.
* @param transmit If true, then the return value for non-blocking method invocations can be retrieved with
* {@link #waitForLastResponse()} or {@link #waitForResponse(byte)}. If false, then non-primitive return values for
* remote method invocations are not sent by the remote side of the connection and the response can never be
* retrieved. This can also be used to save bandwidth if you will not check the return value of a blocking remote
* invocation. Note that an exception could still be returned by {@link #waitForLastResponse()} or
* {@link #waitForResponse(byte)} if {@link #setTransmitExceptions(boolean)} is true. */
public void setTransmitReturnValue (boolean transmit);

/** Sets whether exceptions are sent back when invoking a remote method. Default is true.
* @param transmit If false, exceptions will be unhandled and rethrown as RuntimeExceptions inside the invoking thread. This is
* the legacy behavior. If true, behavior is dependent on whether {@link #setNonBlocking(boolean)}. If non-blocking
* is true, the exception will be serialized and sent back to the call site of the remotely invoked method, where it
* will be re-thrown. If non-blocking is false, an exception will not be thrown in the calling thread but instead can
* be retrieved with {@link #waitForLastResponse()} or {@link #waitForResponse(byte)}, similar to a return value. */
public void setTransmitExceptions (boolean transmit);

/** Waits for the response to the last method invocation to be received or the response timeout to be reached. Must not be
* called from the connection's update thread.
* @see ObjectSpace#getRemoteObject(com.esotericsoftware.kryonet.Connection, int, Class...) */
public Object waitForLastResponse ();

/** Gets the ID of response for the last method invocation. */
public byte getLastResponseID ();
public int getLastResponseID ();

/** Waits for the specified method invocation response to be received or the response timeout to be reached. Must not be called
* from the connection's update thread. Response IDs use a six bit identifier, with one identifier reserved for "no response".
* This means that this method should be called to get the result for a non-blocking call before an additional 63 non-blocking
* calls are made, or risk undefined behavior due to identical IDs.
* @see ObjectSpace#getRemoteObject(com.esotericsoftware.kryonet.Connection, int, Class...) */
public Object waitForResponse (byte responseID);
public Object waitForResponse (int responseID);

/** Causes this RemoteObject to stop listening to the connection for method invocation response messages. */
public void close ();
Expand Down