Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package de.javakaffee.web.msm;

import net.spy.memcached.BinaryConnectionFactory;
import net.spy.memcached.DefaultConnectionFactory;

/**
* Created by asodhi on 11/29/2016.
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you're adding class documentation other content would be more useful :-)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will drop the class documentation looks like pretty much self explanatory.

*/
public class MemcachedConnectionFactory implements StorageClientFactory.MemcachedNodeURLBasedConnectionFactory {
@Override
public BinaryConnectionFactory createBinaryConnectionFactory(final long operationTimeout,
final long maxReconnectDelay,
boolean isClientDynamicMode) {
try {
return new BinaryConnectionFactory() {
@Override
public long getOperationTimeout() {
return operationTimeout;
}

@Override
public long getMaxReconnectDelay() {
return maxReconnectDelay;
}
};
} catch (final Exception e) {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

try/catch should not be needed here.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure will drop try/catch.

throw new RuntimeException("Could not create memcached client", e);
}
}

@Override
public DefaultConnectionFactory createDefaultConnectionFactory(final long operationTimeout,
final long maxReconnectDelay,
boolean isClientDynamicMode) {
try {
return new DefaultConnectionFactory() {
@Override
public long getOperationTimeout() {
return operationTimeout;
}

@Override
public long getMaxReconnectDelay() {
return maxReconnectDelay;
}
};
} catch (final Exception e) {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no try/catch needed.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will do.

throw new RuntimeException("Could not create memcached client", e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package de.javakaffee.web.msm;

import net.spy.memcached.BinaryConnectionFactory;
import net.spy.memcached.ConnectionFactory;
import net.spy.memcached.DefaultConnectionFactory;

import java.lang.reflect.*;

/**
* Created by asodhi on 11/29/2016.
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

More useful documentation, or drop it?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will drop it.

*/
public class MemcachedElasticConnectionFactory implements StorageClientFactory.MemcachedNodeURLBasedConnectionFactory {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rename to MemcachedElastiCacheConnectionFactory?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes agree will rename.


@Override
public ConnectionFactory createBinaryConnectionFactory(final long operationTimeout,
final long maxReconnectDelay,
boolean isClientDynamicMode) {
try {
Class clientMode = Class.forName("net.spy.memcached.ClientMode");
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could also add the elasticache-java-cluster-client maven dependency, so that not that much reflection is needed. We already have dependencies for several client libs...

Copy link

@ashishsodhi ashishsodhi Jan 13, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

elasticache-java-cluster-client and java-memcached-client have method signature which are common but do different things.
net.spy.memcached.BinaryConnectionFactory() would create a standard connection with memcached if java-memcached-client is loaded first,

https://github.com/dustin/java-memcached-client/blob/master/src/main/java/net/spy/memcached/BinaryConnectionFactory.java

same method would create a dynamic connection with aws if elasticache-java-cluster-client is loaded first.
https://github.com/awslabs/aws-elasticache-cluster-client-memcached-for-java/blob/master/src/main/java/net/spy/memcached/BinaryConnectionFactory.java

Enum clientModeEnum = getClientMode(clientMode, isClientDynamicMode);
Class connectionFactoryClass = Class.forName("net.spy.memcached.BinaryConnectionFactory");
Constructor<BinaryConnectionFactory> connectionFactoryClassConstructor = connectionFactoryClass.getConstructor(clientMode);
BinaryConnectionFactory binaryConnectionFactory = connectionFactoryClassConstructor.newInstance(clientModeEnum);
return (ConnectionFactory) Proxy.newProxyInstance(
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are you using/creating a Proxy here?

Copy link

@ashishsodhi ashishsodhi Jan 13, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

trying to override the method public long getOperationTimeout() and public long getMaxReconnectDelay() in the BinaryConnectionFactory class. No easy way to override methods when the class is created by reflection.

binaryConnectionFactory.getClass().getClassLoader(), new Class<?>[] {ConnectionFactory.class},
new ConnectionFactoryInvocationHandler(binaryConnectionFactory,operationTimeout,operationTimeout));
} catch (final Exception e) {
throw new RuntimeException("Could not create binary connection factory", e);
}
}

@Override
public ConnectionFactory createDefaultConnectionFactory(final long operationTimeout,
final long maxReconnectDelay,
boolean isClientDynamicMode) {
try {
Class clientMode = Class.forName("net.spy.memcached.ClientMode");
Enum clientModeEnum = getClientMode(clientMode, isClientDynamicMode);
Class connectionFactoryClass = Class.forName("net.spy.memcached.DefaultConnectionFactory");
Constructor<DefaultConnectionFactory> connectionFactoryClassConstructor = connectionFactoryClass.getConstructor(Object.class);
DefaultConnectionFactory defaultConnectionFactory = connectionFactoryClassConstructor.newInstance(clientModeEnum);
return (ConnectionFactory)Proxy.newProxyInstance(
defaultConnectionFactory.getClass().getClassLoader(), new Class<?>[] {ConnectionFactory.class},
new ConnectionFactoryInvocationHandler(defaultConnectionFactory,operationTimeout,operationTimeout));
} catch (final Exception e) {
throw new RuntimeException("Could not create default connection factory", e);
}
}

private Enum getClientMode(Class clientMode, boolean isClientDynamicMode) {
if (isClientDynamicMode) {
return Enum.valueOf(clientMode, "Dynamic");
} else {
return Enum.valueOf(clientMode, "Static");
}
}
public class ConnectionFactoryInvocationHandler implements InvocationHandler {
private final Method TO_STRING_METHOD = getMethod("toString",
(Class<?>[]) null);

long operationTimeout;
long maxReconnectDelay;
DefaultConnectionFactory defaultConnectionFactory;
public ConnectionFactoryInvocationHandler(DefaultConnectionFactory defaultConnectionFactory,
final long operationTimeout,
final long maxReconnectDelay)
{
this.defaultConnectionFactory=defaultConnectionFactory;
this.operationTimeout = operationTimeout;
this.maxReconnectDelay = maxReconnectDelay;
}

@Override
public Object invoke(Object proxy, Method method, Object[] args)
throws Throwable {

Object returnValue;
try {
if (method.getName().equalsIgnoreCase("getOperationTimeout"))
{
returnValue = operationTimeout;
}else if (method.getName().equalsIgnoreCase("getMaxReconnectDelay"))
{
returnValue = maxReconnectDelay;
}else {
returnValue = method.invoke(defaultConnectionFactory, args);
}
} catch (InvocationTargetException ex) {
throw ex.getTargetException();
}
if (TO_STRING_METHOD.equals(method)) {
return "Proxy (" + returnValue + ")";
}
return returnValue;
}

private Method getMethod(String methodName, Class<?>... paramTypes) {
try {
return Object.class.getMethod(methodName, paramTypes);
} catch (NoSuchMethodException e) {
e.printStackTrace();
} catch (SecurityException e) {
e.printStackTrace();
}
return null;
}
}
}
54 changes: 34 additions & 20 deletions core/src/main/java/de/javakaffee/web/msm/StorageClientFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,15 @@ MemcachedClient createCouchbaseClient(MemcachedNodesManager memcachedNodesManage
long maxReconnectDelay, Statistics statistics );
}

static interface MemcachedNodeURLBasedConnectionFactory {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rename to MemcachedConnectionFactory? The implementation class MemcachedConnectionFactory could then be renamed to e.g. StandardMemcachedConnectionFactory.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok will do

ConnectionFactory createBinaryConnectionFactory(long operationTimeout,
long maxReconnectDelay, boolean isClientDynamicMode);
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The isClientDynamicMode should not be part of this API, because its specific to the ElastiCache specific implementation.

Copy link

@ashishsodhi ashishsodhi Jan 13, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Normally the client mode is determined by the node name in aws client. If the node name contains ".cfg." it would be a dynamic mode. Either we pass the Boolean or we pass the node name?

How about

ConnectionFactory createBinaryConnectionFactory(long operationTimeout,
long maxReconnectDelay, String node);

or
I could add a parameter to the constructor of MemcachedElastiCacheConnectionFactory?


ConnectionFactory createDefaultConnectionFactory(long operationTimeout,
long maxReconnectDelay, boolean isClientDynamicMode);
}


protected StorageClient createStorageClient(final MemcachedNodesManager memcachedNodesManager,
final String memcachedProtocol, final String username, final String password, final long operationTimeout,
final long maxReconnectDelay, final Statistics statistics ) {
Expand Down Expand Up @@ -78,9 +87,30 @@ static MemcachedClient createCouchbaseClient(final MemcachedNodesManager memcach
}
}

protected static MemcachedNodeURLBasedConnectionFactory createMemcachedNodeURLBasedConnectionFactory() {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rename to createMemcachedConnectionFactory?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

try {
Class.forName("net.spy.memcached.ClientMode");
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be extracted into a method like isElastiCacheClientLib (which also hides exception handling), so that here would be a simple if/else.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure will make the change.

return Class.forName("de.javakaffee.web.msm.MemcachedElasticConnectionFactory").asSubclass(MemcachedNodeURLBasedConnectionFactory.class).newInstance();
} catch (final ClassNotFoundException e) {
try {
return Class.forName("de.javakaffee.web.msm.MemcachedConnectionFactory").asSubclass(MemcachedNodeURLBasedConnectionFactory.class).newInstance();
} catch (final Exception ex)
{
throw new RuntimeException(ex);
}
} catch (final Exception e) {
throw new RuntimeException(e);
}
}

static ConnectionFactory createConnectionFactory(final MemcachedNodesManager memcachedNodesManager,
final ConnectionType connectionType, final String memcachedProtocol, final String username, final String password, final long operationTimeout,
final long maxReconnectDelay, final Statistics statistics ) {
boolean isClientDynamicMode = false;
if (memcachedNodesManager.getMemcachedNodes().contains(".cfg."))
{
isClientDynamicMode=true;
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is the clientMode set to "Dynamic" statically? And why is this coupled to if (memcachedNodesManager.getMemcachedNodes().contains(".cfg."))?

I'd expect that the environment variable / system property client.mode is evaluated...

And as already said, determining the clientMode should be done inside the MemcachedElastiCacheConnectionFactory.

Copy link

@ashishsodhi ashishsodhi Jan 13, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Normally the client mode is determined by the node name in aws client. If the node name contains ".cfg." it would be a dynamic mode.
Check the implementation in

https://github.com/awslabs/aws-elasticache-cluster-client-memcached-for-java/blob/master/src/main/java/net/spy/memcached/MemcachedClient.java

aws-elasticache-cluster-client-memcachedfor-java -->
/src/main/java/net/spy/memcached/MemcachedClient.java line
...
if(determineClientMode){
if(addrs.size() == 1){
if(addrs.get(0) == null){
throw new NullPointerException("Socket address is null");
}
String hostName = addrs.get(0).getHostName();
//All config endpoints has ".cfg." subdomain in the DNS name.
if(hostName != null && hostName.contains(".cfg.")){
cf = new DefaultConnectionFactory(ClientMode.Dynamic);
}
}
//Fallback to static mode
if(cf == null){
cf = new DefaultConnectionFactory(ClientMode.Static);
}
...

}
if (PROTOCOL_BINARY.equals( memcachedProtocol )) {
if (connectionType.isSASL()) {
final AuthDescriptor authDescriptor = new AuthDescriptor(new String[]{"PLAIN"}, new PlainCallbackHandler(username, password));
Expand All @@ -97,30 +127,14 @@ static ConnectionFactory createConnectionFactory(final MemcachedNodesManager mem
else {
return memcachedNodesManager.isEncodeNodeIdInSessionId() ? new SuffixLocatorBinaryConnectionFactory( memcachedNodesManager,
memcachedNodesManager.getSessionIdFormat(),
statistics, operationTimeout, maxReconnectDelay ) : new BinaryConnectionFactory() {
@Override
public long getOperationTimeout() {
return operationTimeout;
}
@Override
public long getMaxReconnectDelay() {
return maxReconnectDelay;
}
};
statistics, operationTimeout, maxReconnectDelay ) :
createMemcachedNodeURLBasedConnectionFactory().createBinaryConnectionFactory(operationTimeout, maxReconnectDelay,isClientDynamicMode);
}
}
return memcachedNodesManager.isEncodeNodeIdInSessionId()
? new SuffixLocatorConnectionFactory( memcachedNodesManager, memcachedNodesManager.getSessionIdFormat(), statistics, operationTimeout, maxReconnectDelay )
: new DefaultConnectionFactory() {
@Override
public long getOperationTimeout() {
return operationTimeout;
}
@Override
public long getMaxReconnectDelay() {
return maxReconnectDelay;
}
};
: createMemcachedNodeURLBasedConnectionFactory().createDefaultConnectionFactory(operationTimeout, maxReconnectDelay,isClientDynamicMode);

}

}
Expand Down