diff --git a/examples/src/com/aerospike/examples/AsyncPutGet.java b/examples/src/com/aerospike/examples/AsyncPutGet.java index 405a12494..ffe0ddfd9 100644 --- a/examples/src/com/aerospike/examples/AsyncPutGet.java +++ b/examples/src/com/aerospike/examples/AsyncPutGet.java @@ -18,6 +18,7 @@ import java.io.IOException; import java.net.ConnectException; +import java.util.Map; import com.aerospike.client.AerospikeException; import com.aerospike.client.Bin; @@ -25,8 +26,10 @@ import com.aerospike.client.Key; import com.aerospike.client.Record; import com.aerospike.client.async.EventLoop; +import com.aerospike.client.listener.InfoListener; import com.aerospike.client.listener.RecordListener; import com.aerospike.client.listener.WriteListener; +import com.aerospike.client.policy.InfoPolicy; public class AsyncPutGet extends AsyncExample { /** @@ -34,11 +37,34 @@ public class AsyncPutGet extends AsyncExample { */ @Override public void runExample(IAerospikeClient client, EventLoop eventLoop) { + InfoListener listener = new InfoListener() { + @Override + public void onSuccess(Map map) { + System.out.println("Success"); + for (String cmd: map.keySet()){ + System.out.println("Command: "+cmd+" Output: "+map.get(cmd)); + } + } + @Override + public void onFailure(AerospikeException e) { + System.err.println("Info command failed: " + e.getMessage()); + } + }; + + String[] commands = new String[] { "build" }; + + InfoPolicy infoPolicy = new InfoPolicy(); + infoPolicy.setTimeout(5000); + + System.out.println("Call async info"); + client.info(eventLoop, listener, infoPolicy, null, commands); + /* Key key = new Key(params.namespace, params.set, "putgetkey"); Bin bin = new Bin("putgetbin", "value"); runPutGetInline(client, eventLoop, key, bin); runPutGetWithRetry(client, eventLoop, key, bin); + */ } // Inline asynchronous put/get calls. diff --git a/pom.xml b/pom.xml index 6c462c20b..0775371bc 100644 --- a/pom.xml +++ b/pom.xml @@ -41,7 +41,7 @@ 4.1.108.Final 2.0.62.Final - 1.59.0 + 1.64.0 3.0.1 0.4 1.7.0 diff --git a/proxy/pom.xml b/proxy/pom.xml index 84c64ea2d..be91cffc8 100644 --- a/proxy/pom.xml +++ b/proxy/pom.xml @@ -22,7 +22,7 @@ com.aerospike aerospike-proxy-stub - 1.0.1 + 1.1.0 diff --git a/proxy/src/com/aerospike/client/proxy/AerospikeClientProxy.java b/proxy/src/com/aerospike/client/proxy/AerospikeClientProxy.java index 889e1c502..eaef8d072 100644 --- a/proxy/src/com/aerospike/client/proxy/AerospikeClientProxy.java +++ b/proxy/src/com/aerospike/client/proxy/AerospikeClientProxy.java @@ -2536,7 +2536,11 @@ public void dropIndex( */ @Override public void info(EventLoop eventLoop, InfoListener listener, InfoPolicy policy, Node node, String... commands) { - throw new AerospikeException(NotSupported + "info"); + if (policy == null) { + policy = infoPolicyDefault; + } + InfoCommandProxy infoCommand = new InfoCommandProxy(executor, listener, policy, commands); + infoCommand.execute(); } //----------------------------------------------------------------- diff --git a/proxy/src/com/aerospike/client/proxy/CommandProxy.java b/proxy/src/com/aerospike/client/proxy/CommandProxy.java index a722dca80..684b4802f 100644 --- a/proxy/src/com/aerospike/client/proxy/CommandProxy.java +++ b/proxy/src/com/aerospike/client/proxy/CommandProxy.java @@ -27,6 +27,7 @@ import com.aerospike.client.proxy.grpc.GrpcConversions; import com.aerospike.client.proxy.grpc.GrpcStreamingCall; import com.aerospike.client.util.Util; +import com.aerospike.proxy.client.InfoGrpc; import com.aerospike.proxy.client.Kvs; import com.google.protobuf.ByteString; @@ -57,7 +58,7 @@ public CommandProxy( this.numExpectedResponses = numExpectedResponses; } - final void execute() { + void execute() { if (policy.totalTimeout > 0) { deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(policy.totalTimeout); sendTimeoutMillis = (policy.socketTimeout > 0 && policy.socketTimeout < policy.totalTimeout)? diff --git a/proxy/src/com/aerospike/client/proxy/InfoCommandProxy.java b/proxy/src/com/aerospike/client/proxy/InfoCommandProxy.java new file mode 100644 index 000000000..0d0efc3ce --- /dev/null +++ b/proxy/src/com/aerospike/client/proxy/InfoCommandProxy.java @@ -0,0 +1,180 @@ +package com.aerospike.client.proxy; + +import com.aerospike.client.AerospikeException; +import com.aerospike.client.ResultCode; +import com.aerospike.client.command.Command; +import com.aerospike.client.listener.InfoListener; +import com.aerospike.client.policy.InfoPolicy; +import com.aerospike.client.policy.Policy; +import com.aerospike.client.proxy.grpc.GrpcCallExecutor; +import com.aerospike.client.proxy.grpc.GrpcConversions; +import com.aerospike.proxy.client.Kvs; +import com.aerospike.proxy.client.InfoGrpc; +import io.grpc.*; + +import java.util.HashMap; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class InfoCommandProxy extends SingleCommandProxy { + + private final InfoListener listener; + private final String[] commands; + private final InfoPolicy infoPolicy; + private final GrpcCallExecutor executor; + final Policy policy; + + public InfoCommandProxy(GrpcCallExecutor executor, InfoListener listener, InfoPolicy policy, String... commands) { + super(InfoGrpc.getInfoMethod(), executor, createPolicy(policy)); + this.executor = executor; + this.infoPolicy = policy; + this.listener = listener; + this.commands = commands; + this.policy = createPolicy(policy); + } + + private static Policy createPolicy(InfoPolicy policy) { + Policy p = new Policy(); + + if (policy == null) { + p.setTimeout(1000); + } + else { + p.setTimeout(policy.timeout); + } + return p; + } + + @Override + Kvs.AerospikeRequestPayload.Builder getRequestBuilder() { + Kvs.AerospikeRequestPayload.Builder builder = Kvs.AerospikeRequestPayload.newBuilder(); + Kvs.InfoRequest.Builder infoRequestBuilder = Kvs.InfoRequest.newBuilder(); + + if(commands != null){ + for(String command: commands){ + infoRequestBuilder.addCommands(command); + } + } + infoRequestBuilder.setInfoPolicy(GrpcConversions.toGrpc(infoPolicy)); + builder.setInfoRequest(infoRequestBuilder.build()); + return builder; + } + + @Override + void writeCommand(Command command) { + // Nothing to do since there is no Aerospike payload. + } + + @Override + void execute(){ + executeCommand(); + } + + private void executeCommand() { + Kvs.AerospikeRequestPayload.Builder builder = getRequestBuilder(); + + ManagedChannel channel = executor.getChannel(); + InfoGrpc.InfoBlockingStub stub = InfoGrpc.newBlockingStub(channel); + try{ + Kvs.AerospikeRequestPayload request = builder.build(); + Kvs.AerospikeResponsePayload response = stub.info(request); + inDoubt |= response.getInDoubt(); + onResponse(response); + }catch (Throwable t) { + inDoubt = true; + onFailure(t); + } finally { + // Shut down the channel + channel.shutdown(); + } + } + + + @Override + void onFailure(AerospikeException ae) { + + } + + void onFailure(Throwable t) { + AerospikeException ae; + + try { + if (t instanceof AerospikeException) { + ae = (AerospikeException)t; + ae.setPolicy(policy); + } + else if (t instanceof StatusRuntimeException) { + StatusRuntimeException sre = (StatusRuntimeException)t; + Status.Code code = sre.getStatus().getCode(); + + if (code == Status.Code.UNAVAILABLE) { + if (retry()) { + return; + } + } + ae = GrpcConversions.toAerospike(sre, policy, 1); + } + else { + ae = new AerospikeException(ResultCode.CLIENT_ERROR, t); + } + } + catch (AerospikeException ae2) { + ae = ae2; + } + catch (Throwable t2) { + ae = new AerospikeException(ResultCode.CLIENT_ERROR, t2); + } + + notifyFailure(ae); + } + + @Override + void onResponse(Kvs.AerospikeResponsePayload response){ + String infoResponse = String.valueOf(response.getPayload()); + Map infoCommandResponse = createInfoMap(infoResponse); + try { + listener.onSuccess(infoCommandResponse); + } + catch (Throwable t) { + logOnSuccessError(t); + } + } + + public static Map createInfoMap(String byteStringRepresentation) { + Map infoMap = new HashMap<>(); + + String contents = getContents(byteStringRepresentation); + + if (contents != null && !contents.isEmpty()) { + String[] commands = contents.split("\\\\n"); + + for (String command : commands) { + String[] keyValue = command.split("\\\\t", 2); + + if (keyValue.length == 2) { + infoMap.put(keyValue[0], keyValue[1]); + } + } + } + return infoMap; + } + + public static String getContents(String byteStringRepresentation) { + String regex = "contents=\"(.*?)\""; + + Pattern pattern = Pattern.compile(regex); + Matcher matcher = pattern.matcher(byteStringRepresentation); + + if (matcher.find()) { + return matcher.group(1); + } + + return null; + } + + @Override + void parseResult(Parser parser) { + + } +} diff --git a/proxy/src/com/aerospike/client/proxy/Parser.java b/proxy/src/com/aerospike/client/proxy/Parser.java index 940c6f82d..dd3a47c59 100644 --- a/proxy/src/com/aerospike/client/proxy/Parser.java +++ b/proxy/src/com/aerospike/client/proxy/Parser.java @@ -16,18 +16,16 @@ */ package com.aerospike.client.proxy; +import java.util.HashMap; import java.util.LinkedHashMap; import java.util.Map; import java.util.zip.DataFormatException; import java.util.zip.Inflater; +import com.aerospike.client.*; +import com.aerospike.client.Record; import org.luaj.vm2.LuaValue; -import com.aerospike.client.AerospikeException; -import com.aerospike.client.Key; -import com.aerospike.client.Record; -import com.aerospike.client.ResultCode; -import com.aerospike.client.Value; import com.aerospike.client.command.Buffer; import com.aerospike.client.command.Command; import com.aerospike.client.command.Command.OpResults; @@ -172,6 +170,15 @@ public Key parseKey(BVal bVal) { return new Key(namespace, digest, setName, userKey); } + public Map parseInfoResult(){ + HashMap responses; + Info info = new Info(buffer, receiveSize); + responses = info.parseMultiResponse(); + return responses; + } + + + public Record parseRecord(boolean isOperation) { Map bins = new LinkedHashMap<>(); diff --git a/proxy/src/com/aerospike/client/proxy/grpc/GrpcConversions.java b/proxy/src/com/aerospike/client/proxy/grpc/GrpcConversions.java index 33c7fa1c3..6f1c6e140 100644 --- a/proxy/src/com/aerospike/client/proxy/grpc/GrpcConversions.java +++ b/proxy/src/com/aerospike/client/proxy/grpc/GrpcConversions.java @@ -20,11 +20,7 @@ import com.aerospike.client.Operation; import com.aerospike.client.ResultCode; import com.aerospike.client.Value; -import com.aerospike.client.policy.Policy; -import com.aerospike.client.policy.QueryDuration; -import com.aerospike.client.policy.QueryPolicy; -import com.aerospike.client.policy.ScanPolicy; -import com.aerospike.client.policy.WritePolicy; +import com.aerospike.client.policy.*; import com.aerospike.client.query.Filter; import com.aerospike.client.query.PartitionFilter; import com.aerospike.client.query.PartitionStatus; @@ -139,6 +135,12 @@ public static Kvs.QueryPolicy toGrpc(QueryPolicy queryPolicy) { return queryPolicyBuilder.build(); } + public static Kvs.InfoPolicy toGrpc(InfoPolicy infoPolicy){ + Kvs.InfoPolicy.Builder infoPolicyBuilder = Kvs.InfoPolicy.newBuilder(); + infoPolicyBuilder.setTimeout(infoPolicy.timeout); + return infoPolicyBuilder.build(); + } + /** * Convert a value to packed bytes. *