From 79fa7f8cb5771791f2bf19ff7c968cc198a5ff81 Mon Sep 17 00:00:00 2001 From: "sachin.vm" Date: Wed, 29 May 2024 10:13:26 -0700 Subject: [PATCH 1/4] [CLIENT-2776] Support info commands for proxy client --- proxy/pom.xml | 2 +- .../client/proxy/AerospikeClientProxy.java | 6 +- .../client/proxy/InfoCommandProxy.java | 86 +++++++++++++++++++ .../com/aerospike/client/proxy/Parser.java | 17 ++-- .../client/proxy/grpc/GrpcConversions.java | 12 +-- 5 files changed, 111 insertions(+), 12 deletions(-) create mode 100644 proxy/src/com/aerospike/client/proxy/InfoCommandProxy.java diff --git a/proxy/pom.xml b/proxy/pom.xml index 84c64ea2d..319cb25cc 100644 --- a/proxy/pom.xml +++ b/proxy/pom.xml @@ -22,7 +22,7 @@ com.aerospike aerospike-proxy-stub - 1.0.1 + 1.0.2-SNAPSHOT diff --git a/proxy/src/com/aerospike/client/proxy/AerospikeClientProxy.java b/proxy/src/com/aerospike/client/proxy/AerospikeClientProxy.java index 889e1c502..eaef8d072 100644 --- a/proxy/src/com/aerospike/client/proxy/AerospikeClientProxy.java +++ b/proxy/src/com/aerospike/client/proxy/AerospikeClientProxy.java @@ -2536,7 +2536,11 @@ public void dropIndex( */ @Override public void info(EventLoop eventLoop, InfoListener listener, InfoPolicy policy, Node node, String... commands) { - throw new AerospikeException(NotSupported + "info"); + if (policy == null) { + policy = infoPolicyDefault; + } + InfoCommandProxy infoCommand = new InfoCommandProxy(executor, listener, policy, commands); + infoCommand.execute(); } //----------------------------------------------------------------- diff --git a/proxy/src/com/aerospike/client/proxy/InfoCommandProxy.java b/proxy/src/com/aerospike/client/proxy/InfoCommandProxy.java new file mode 100644 index 000000000..a5e352d4f --- /dev/null +++ b/proxy/src/com/aerospike/client/proxy/InfoCommandProxy.java @@ -0,0 +1,86 @@ +package com.aerospike.client.proxy; + +import com.aerospike.client.AerospikeException; +import com.aerospike.client.Info; +import com.aerospike.client.ResultCode; +import com.aerospike.client.cluster.Node; +import com.aerospike.client.command.Command; +import com.aerospike.client.listener.InfoListener; +import com.aerospike.client.policy.InfoPolicy; +import com.aerospike.client.policy.Policy; +import com.aerospike.client.policy.ScanPolicy; +import com.aerospike.client.proxy.grpc.GrpcCallExecutor; +import com.aerospike.client.proxy.grpc.GrpcConversions; +import com.aerospike.proxy.client.Kvs; +import com.aerospike.proxy.client.InfoGrpc; + +import java.util.HashMap; +import java.util.Map; + +public class InfoCommandProxy extends SingleCommandProxy { + + private final InfoListener listener; + private final String[] commands; + private Map map; + + private final InfoPolicy infoPolicy; + + public InfoCommandProxy(GrpcCallExecutor executor, InfoListener listener, InfoPolicy policy, String... commands) { + super(InfoGrpc.getInfoMethod(), executor, createPolicy(policy)); + this.infoPolicy = policy; + this.listener = listener; + this.commands = commands; + } + + private static Policy createPolicy(InfoPolicy policy) { + Policy p = new Policy(); + + if (policy == null) { + p.setTimeout(1000); + } + else { + p.setTimeout(policy.timeout); + } + return p; + } + + @Override + Kvs.AerospikeRequestPayload.Builder getRequestBuilder() { + Kvs.AerospikeRequestPayload.Builder builder = Kvs.AerospikeRequestPayload.newBuilder(); + Kvs.InfoRequest.Builder infoRequestBuilder = Kvs.InfoRequest.newBuilder(); + + if(commands != null){ + for(String command: commands){ + infoRequestBuilder.addCommands(command); + } + } + infoRequestBuilder.setInfoPolicy(GrpcConversions.toGrpc(infoPolicy)); + builder.setInfoRequest(infoRequestBuilder.build()); + return builder; + } + + @Override + void writeCommand(Command command) { + // Nothing to do since there is no Aerospike payload. + } + + @Override + void onFailure(AerospikeException ae) { + listener.onFailure(ae); + } + + @Override + void parseResult(Parser parser) { + int resultCode = parser.parseResultCode(); + if (resultCode != ResultCode.OK) { + throw new AerospikeException(resultCode); + } + Map infoCommandResponse = parser.parseInfoResult(); + try { + listener.onSuccess(infoCommandResponse); + } + catch (Throwable t) { + logOnSuccessError(t); + } + } +} diff --git a/proxy/src/com/aerospike/client/proxy/Parser.java b/proxy/src/com/aerospike/client/proxy/Parser.java index 940c6f82d..dd3a47c59 100644 --- a/proxy/src/com/aerospike/client/proxy/Parser.java +++ b/proxy/src/com/aerospike/client/proxy/Parser.java @@ -16,18 +16,16 @@ */ package com.aerospike.client.proxy; +import java.util.HashMap; import java.util.LinkedHashMap; import java.util.Map; import java.util.zip.DataFormatException; import java.util.zip.Inflater; +import com.aerospike.client.*; +import com.aerospike.client.Record; import org.luaj.vm2.LuaValue; -import com.aerospike.client.AerospikeException; -import com.aerospike.client.Key; -import com.aerospike.client.Record; -import com.aerospike.client.ResultCode; -import com.aerospike.client.Value; import com.aerospike.client.command.Buffer; import com.aerospike.client.command.Command; import com.aerospike.client.command.Command.OpResults; @@ -172,6 +170,15 @@ public Key parseKey(BVal bVal) { return new Key(namespace, digest, setName, userKey); } + public Map parseInfoResult(){ + HashMap responses; + Info info = new Info(buffer, receiveSize); + responses = info.parseMultiResponse(); + return responses; + } + + + public Record parseRecord(boolean isOperation) { Map bins = new LinkedHashMap<>(); diff --git a/proxy/src/com/aerospike/client/proxy/grpc/GrpcConversions.java b/proxy/src/com/aerospike/client/proxy/grpc/GrpcConversions.java index 33c7fa1c3..6f1c6e140 100644 --- a/proxy/src/com/aerospike/client/proxy/grpc/GrpcConversions.java +++ b/proxy/src/com/aerospike/client/proxy/grpc/GrpcConversions.java @@ -20,11 +20,7 @@ import com.aerospike.client.Operation; import com.aerospike.client.ResultCode; import com.aerospike.client.Value; -import com.aerospike.client.policy.Policy; -import com.aerospike.client.policy.QueryDuration; -import com.aerospike.client.policy.QueryPolicy; -import com.aerospike.client.policy.ScanPolicy; -import com.aerospike.client.policy.WritePolicy; +import com.aerospike.client.policy.*; import com.aerospike.client.query.Filter; import com.aerospike.client.query.PartitionFilter; import com.aerospike.client.query.PartitionStatus; @@ -139,6 +135,12 @@ public static Kvs.QueryPolicy toGrpc(QueryPolicy queryPolicy) { return queryPolicyBuilder.build(); } + public static Kvs.InfoPolicy toGrpc(InfoPolicy infoPolicy){ + Kvs.InfoPolicy.Builder infoPolicyBuilder = Kvs.InfoPolicy.newBuilder(); + infoPolicyBuilder.setTimeout(infoPolicy.timeout); + return infoPolicyBuilder.build(); + } + /** * Convert a value to packed bytes. * From cda0148c5605787658aa6c33e0e94133b9a21c34 Mon Sep 17 00:00:00 2001 From: "sachin.vm" Date: Wed, 3 Jul 2024 10:11:16 -0700 Subject: [PATCH 2/4] changes from streaming to single call --- proxy/pom.xml | 2 +- .../aerospike/client/proxy/CommandProxy.java | 3 +- .../client/proxy/InfoCommandProxy.java | 79 +++++++++++++++++-- 3 files changed, 75 insertions(+), 9 deletions(-) diff --git a/proxy/pom.xml b/proxy/pom.xml index 319cb25cc..be91cffc8 100644 --- a/proxy/pom.xml +++ b/proxy/pom.xml @@ -22,7 +22,7 @@ com.aerospike aerospike-proxy-stub - 1.0.2-SNAPSHOT + 1.1.0 diff --git a/proxy/src/com/aerospike/client/proxy/CommandProxy.java b/proxy/src/com/aerospike/client/proxy/CommandProxy.java index a722dca80..684b4802f 100644 --- a/proxy/src/com/aerospike/client/proxy/CommandProxy.java +++ b/proxy/src/com/aerospike/client/proxy/CommandProxy.java @@ -27,6 +27,7 @@ import com.aerospike.client.proxy.grpc.GrpcConversions; import com.aerospike.client.proxy.grpc.GrpcStreamingCall; import com.aerospike.client.util.Util; +import com.aerospike.proxy.client.InfoGrpc; import com.aerospike.proxy.client.Kvs; import com.google.protobuf.ByteString; @@ -57,7 +58,7 @@ public CommandProxy( this.numExpectedResponses = numExpectedResponses; } - final void execute() { + void execute() { if (policy.totalTimeout > 0) { deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(policy.totalTimeout); sendTimeoutMillis = (policy.socketTimeout > 0 && policy.socketTimeout < policy.totalTimeout)? diff --git a/proxy/src/com/aerospike/client/proxy/InfoCommandProxy.java b/proxy/src/com/aerospike/client/proxy/InfoCommandProxy.java index a5e352d4f..9daad38fe 100644 --- a/proxy/src/com/aerospike/client/proxy/InfoCommandProxy.java +++ b/proxy/src/com/aerospike/client/proxy/InfoCommandProxy.java @@ -1,35 +1,41 @@ package com.aerospike.client.proxy; import com.aerospike.client.AerospikeException; -import com.aerospike.client.Info; +import com.aerospike.client.Log; import com.aerospike.client.ResultCode; -import com.aerospike.client.cluster.Node; import com.aerospike.client.command.Command; import com.aerospike.client.listener.InfoListener; import com.aerospike.client.policy.InfoPolicy; import com.aerospike.client.policy.Policy; -import com.aerospike.client.policy.ScanPolicy; import com.aerospike.client.proxy.grpc.GrpcCallExecutor; import com.aerospike.client.proxy.grpc.GrpcConversions; +import com.aerospike.client.proxy.grpc.GrpcStreamingCall; +import com.aerospike.proxy.client.AboutGrpc; import com.aerospike.proxy.client.Kvs; import com.aerospike.proxy.client.InfoGrpc; +import io.grpc.*; +import io.grpc.stub.StreamObserver; -import java.util.HashMap; import java.util.Map; +import java.util.concurrent.TimeUnit; public class InfoCommandProxy extends SingleCommandProxy { private final InfoListener listener; private final String[] commands; - private Map map; - private final InfoPolicy infoPolicy; + private final GrpcCallExecutor executor; + private final MethodDescriptor methodDescriptor; + final Policy policy; public InfoCommandProxy(GrpcCallExecutor executor, InfoListener listener, InfoPolicy policy, String... commands) { super(InfoGrpc.getInfoMethod(), executor, createPolicy(policy)); + this.executor = executor; this.infoPolicy = policy; this.listener = listener; this.commands = commands; + this.methodDescriptor = InfoGrpc.getInfoMethod(); + this.policy = createPolicy(policy); } private static Policy createPolicy(InfoPolicy policy) { @@ -64,9 +70,68 @@ void writeCommand(Command command) { // Nothing to do since there is no Aerospike payload. } + @Override + void execute(){ + executeCommand(); + } + + private void executeCommand() { + Kvs.AerospikeRequestPayload.Builder builder = getRequestBuilder(); + + ManagedChannel channel = executor.getChannel(); + InfoGrpc.InfoBlockingStub stub = InfoGrpc.newBlockingStub(channel); + try{ + Kvs.AerospikeRequestPayload request = builder.build(); + Kvs.AerospikeResponsePayload response = stub.info(request); + inDoubt |= response.getInDoubt(); + onResponse(response); + }catch (Throwable t) { + inDoubt = true; + onFailure(t); + } finally { + // Shut down the channel + channel.shutdown(); + } + } + + @Override void onFailure(AerospikeException ae) { - listener.onFailure(ae); + + } + + @Override + void onFailure(Throwable t) { + AerospikeException ae; + + try { + if (t instanceof AerospikeException) { + ae = (AerospikeException)t; + ae.setPolicy(policy); + } + else if (t instanceof StatusRuntimeException) { + StatusRuntimeException sre = (StatusRuntimeException)t; + Status.Code code = sre.getStatus().getCode(); + + if (code == Status.Code.UNAVAILABLE) { + if (retry()) { + return; + } + } + ae = GrpcConversions.toAerospike(sre, policy, 1); + } + else { + ae = new AerospikeException(ResultCode.CLIENT_ERROR, t); + } + } + catch (AerospikeException ae2) { + ae = ae2; + } + catch (Throwable t2) { + ae = new AerospikeException(ResultCode.CLIENT_ERROR, t2); + } + + notifyFailure(ae); } @Override From 14c64d16693ded149b30198f37fe6a54bc89918d Mon Sep 17 00:00:00 2001 From: Brian Nichols Date: Wed, 3 Jul 2024 16:09:16 -0400 Subject: [PATCH 3/4] Change grpc version to 1.64 to be consistent with stubs. Put info in AsyncPutGet. Fix compile error in InfoCommandProxy. --- .../com/aerospike/examples/AsyncPutGet.java | 26 +++++++++++++++++++ pom.xml | 2 +- .../client/proxy/InfoCommandProxy.java | 1 - 3 files changed, 27 insertions(+), 2 deletions(-) diff --git a/examples/src/com/aerospike/examples/AsyncPutGet.java b/examples/src/com/aerospike/examples/AsyncPutGet.java index 405a12494..ffe0ddfd9 100644 --- a/examples/src/com/aerospike/examples/AsyncPutGet.java +++ b/examples/src/com/aerospike/examples/AsyncPutGet.java @@ -18,6 +18,7 @@ import java.io.IOException; import java.net.ConnectException; +import java.util.Map; import com.aerospike.client.AerospikeException; import com.aerospike.client.Bin; @@ -25,8 +26,10 @@ import com.aerospike.client.Key; import com.aerospike.client.Record; import com.aerospike.client.async.EventLoop; +import com.aerospike.client.listener.InfoListener; import com.aerospike.client.listener.RecordListener; import com.aerospike.client.listener.WriteListener; +import com.aerospike.client.policy.InfoPolicy; public class AsyncPutGet extends AsyncExample { /** @@ -34,11 +37,34 @@ public class AsyncPutGet extends AsyncExample { */ @Override public void runExample(IAerospikeClient client, EventLoop eventLoop) { + InfoListener listener = new InfoListener() { + @Override + public void onSuccess(Map map) { + System.out.println("Success"); + for (String cmd: map.keySet()){ + System.out.println("Command: "+cmd+" Output: "+map.get(cmd)); + } + } + @Override + public void onFailure(AerospikeException e) { + System.err.println("Info command failed: " + e.getMessage()); + } + }; + + String[] commands = new String[] { "build" }; + + InfoPolicy infoPolicy = new InfoPolicy(); + infoPolicy.setTimeout(5000); + + System.out.println("Call async info"); + client.info(eventLoop, listener, infoPolicy, null, commands); + /* Key key = new Key(params.namespace, params.set, "putgetkey"); Bin bin = new Bin("putgetbin", "value"); runPutGetInline(client, eventLoop, key, bin); runPutGetWithRetry(client, eventLoop, key, bin); + */ } // Inline asynchronous put/get calls. diff --git a/pom.xml b/pom.xml index 6c462c20b..0775371bc 100644 --- a/pom.xml +++ b/pom.xml @@ -41,7 +41,7 @@ 4.1.108.Final 2.0.62.Final - 1.59.0 + 1.64.0 3.0.1 0.4 1.7.0 diff --git a/proxy/src/com/aerospike/client/proxy/InfoCommandProxy.java b/proxy/src/com/aerospike/client/proxy/InfoCommandProxy.java index 9daad38fe..bc82cc1b6 100644 --- a/proxy/src/com/aerospike/client/proxy/InfoCommandProxy.java +++ b/proxy/src/com/aerospike/client/proxy/InfoCommandProxy.java @@ -100,7 +100,6 @@ void onFailure(AerospikeException ae) { } - @Override void onFailure(Throwable t) { AerospikeException ae; From 33496931d23abb566b997ed27098f3801d2b0633 Mon Sep 17 00:00:00 2001 From: "sachin.vm" Date: Fri, 5 Jul 2024 10:00:57 -0700 Subject: [PATCH 4/4] [CLIENT-2776] Added the new parser for info command response --- .../client/proxy/InfoCommandProxy.java | 56 ++++++++++++++----- 1 file changed, 43 insertions(+), 13 deletions(-) diff --git a/proxy/src/com/aerospike/client/proxy/InfoCommandProxy.java b/proxy/src/com/aerospike/client/proxy/InfoCommandProxy.java index bc82cc1b6..0d0efc3ce 100644 --- a/proxy/src/com/aerospike/client/proxy/InfoCommandProxy.java +++ b/proxy/src/com/aerospike/client/proxy/InfoCommandProxy.java @@ -1,7 +1,6 @@ package com.aerospike.client.proxy; import com.aerospike.client.AerospikeException; -import com.aerospike.client.Log; import com.aerospike.client.ResultCode; import com.aerospike.client.command.Command; import com.aerospike.client.listener.InfoListener; @@ -9,15 +8,14 @@ import com.aerospike.client.policy.Policy; import com.aerospike.client.proxy.grpc.GrpcCallExecutor; import com.aerospike.client.proxy.grpc.GrpcConversions; -import com.aerospike.client.proxy.grpc.GrpcStreamingCall; -import com.aerospike.proxy.client.AboutGrpc; import com.aerospike.proxy.client.Kvs; import com.aerospike.proxy.client.InfoGrpc; import io.grpc.*; -import io.grpc.stub.StreamObserver; +import java.util.HashMap; import java.util.Map; -import java.util.concurrent.TimeUnit; +import java.util.regex.Matcher; +import java.util.regex.Pattern; public class InfoCommandProxy extends SingleCommandProxy { @@ -25,7 +23,6 @@ public class InfoCommandProxy extends SingleCommandProxy { private final String[] commands; private final InfoPolicy infoPolicy; private final GrpcCallExecutor executor; - private final MethodDescriptor methodDescriptor; final Policy policy; public InfoCommandProxy(GrpcCallExecutor executor, InfoListener listener, InfoPolicy policy, String... commands) { @@ -34,7 +31,6 @@ public InfoCommandProxy(GrpcCallExecutor executor, InfoListener listener, InfoPo this.infoPolicy = policy; this.listener = listener; this.commands = commands; - this.methodDescriptor = InfoGrpc.getInfoMethod(); this.policy = createPolicy(policy); } @@ -134,12 +130,9 @@ else if (t instanceof StatusRuntimeException) { } @Override - void parseResult(Parser parser) { - int resultCode = parser.parseResultCode(); - if (resultCode != ResultCode.OK) { - throw new AerospikeException(resultCode); - } - Map infoCommandResponse = parser.parseInfoResult(); + void onResponse(Kvs.AerospikeResponsePayload response){ + String infoResponse = String.valueOf(response.getPayload()); + Map infoCommandResponse = createInfoMap(infoResponse); try { listener.onSuccess(infoCommandResponse); } @@ -147,4 +140,41 @@ void parseResult(Parser parser) { logOnSuccessError(t); } } + + public static Map createInfoMap(String byteStringRepresentation) { + Map infoMap = new HashMap<>(); + + String contents = getContents(byteStringRepresentation); + + if (contents != null && !contents.isEmpty()) { + String[] commands = contents.split("\\\\n"); + + for (String command : commands) { + String[] keyValue = command.split("\\\\t", 2); + + if (keyValue.length == 2) { + infoMap.put(keyValue[0], keyValue[1]); + } + } + } + return infoMap; + } + + public static String getContents(String byteStringRepresentation) { + String regex = "contents=\"(.*?)\""; + + Pattern pattern = Pattern.compile(regex); + Matcher matcher = pattern.matcher(byteStringRepresentation); + + if (matcher.find()) { + return matcher.group(1); + } + + return null; + } + + @Override + void parseResult(Parser parser) { + + } }