Skip to content

Commit

Permalink
[CLIENT-2776] Support info commands for proxy client
Browse files Browse the repository at this point in the history
  • Loading branch information
vmsachin committed May 29, 2024
1 parent b9831bf commit 79fa7f8
Show file tree
Hide file tree
Showing 5 changed files with 111 additions and 12 deletions.
2 changes: 1 addition & 1 deletion proxy/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<dependency>
<groupId>com.aerospike</groupId>
<artifactId>aerospike-proxy-stub</artifactId>
<version>1.0.1</version>
<version>1.0.2-SNAPSHOT</version>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2536,7 +2536,11 @@ public void dropIndex(
*/
@Override
public void info(EventLoop eventLoop, InfoListener listener, InfoPolicy policy, Node node, String... commands) {
throw new AerospikeException(NotSupported + "info");
if (policy == null) {
policy = infoPolicyDefault;
}
InfoCommandProxy infoCommand = new InfoCommandProxy(executor, listener, policy, commands);
infoCommand.execute();
}

//-----------------------------------------------------------------
Expand Down
86 changes: 86 additions & 0 deletions proxy/src/com/aerospike/client/proxy/InfoCommandProxy.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package com.aerospike.client.proxy;

import com.aerospike.client.AerospikeException;
import com.aerospike.client.Info;
import com.aerospike.client.ResultCode;
import com.aerospike.client.cluster.Node;
import com.aerospike.client.command.Command;
import com.aerospike.client.listener.InfoListener;
import com.aerospike.client.policy.InfoPolicy;
import com.aerospike.client.policy.Policy;
import com.aerospike.client.policy.ScanPolicy;
import com.aerospike.client.proxy.grpc.GrpcCallExecutor;
import com.aerospike.client.proxy.grpc.GrpcConversions;
import com.aerospike.proxy.client.Kvs;
import com.aerospike.proxy.client.InfoGrpc;

import java.util.HashMap;
import java.util.Map;

public class InfoCommandProxy extends SingleCommandProxy {

private final InfoListener listener;
private final String[] commands;
private Map<String,String> map;

private final InfoPolicy infoPolicy;

public InfoCommandProxy(GrpcCallExecutor executor, InfoListener listener, InfoPolicy policy, String... commands) {
super(InfoGrpc.getInfoMethod(), executor, createPolicy(policy));
this.infoPolicy = policy;
this.listener = listener;
this.commands = commands;
}

private static Policy createPolicy(InfoPolicy policy) {
Policy p = new Policy();

if (policy == null) {
p.setTimeout(1000);
}
else {
p.setTimeout(policy.timeout);
}
return p;
}

@Override
Kvs.AerospikeRequestPayload.Builder getRequestBuilder() {
Kvs.AerospikeRequestPayload.Builder builder = Kvs.AerospikeRequestPayload.newBuilder();
Kvs.InfoRequest.Builder infoRequestBuilder = Kvs.InfoRequest.newBuilder();

if(commands != null){
for(String command: commands){
infoRequestBuilder.addCommands(command);
}
}
infoRequestBuilder.setInfoPolicy(GrpcConversions.toGrpc(infoPolicy));
builder.setInfoRequest(infoRequestBuilder.build());
return builder;
}

@Override
void writeCommand(Command command) {
// Nothing to do since there is no Aerospike payload.
}

@Override
void onFailure(AerospikeException ae) {
listener.onFailure(ae);
}

@Override
void parseResult(Parser parser) {
int resultCode = parser.parseResultCode();
if (resultCode != ResultCode.OK) {
throw new AerospikeException(resultCode);
}
Map<String, String> infoCommandResponse = parser.parseInfoResult();
try {
listener.onSuccess(infoCommandResponse);
}
catch (Throwable t) {
logOnSuccessError(t);
}
}
}
17 changes: 12 additions & 5 deletions proxy/src/com/aerospike/client/proxy/Parser.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,16 @@
*/
package com.aerospike.client.proxy;

import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.zip.DataFormatException;
import java.util.zip.Inflater;

import com.aerospike.client.*;
import com.aerospike.client.Record;
import org.luaj.vm2.LuaValue;

import com.aerospike.client.AerospikeException;
import com.aerospike.client.Key;
import com.aerospike.client.Record;
import com.aerospike.client.ResultCode;
import com.aerospike.client.Value;
import com.aerospike.client.command.Buffer;
import com.aerospike.client.command.Command;
import com.aerospike.client.command.Command.OpResults;
Expand Down Expand Up @@ -172,6 +170,15 @@ public Key parseKey(BVal bVal) {
return new Key(namespace, digest, setName, userKey);
}

public Map<String, String> parseInfoResult(){
HashMap<String, String> responses;
Info info = new Info(buffer, receiveSize);
responses = info.parseMultiResponse();
return responses;
}



public Record parseRecord(boolean isOperation) {
Map<String, Object> bins = new LinkedHashMap<>();

Expand Down
12 changes: 7 additions & 5 deletions proxy/src/com/aerospike/client/proxy/grpc/GrpcConversions.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,7 @@
import com.aerospike.client.Operation;
import com.aerospike.client.ResultCode;
import com.aerospike.client.Value;
import com.aerospike.client.policy.Policy;
import com.aerospike.client.policy.QueryDuration;
import com.aerospike.client.policy.QueryPolicy;
import com.aerospike.client.policy.ScanPolicy;
import com.aerospike.client.policy.WritePolicy;
import com.aerospike.client.policy.*;
import com.aerospike.client.query.Filter;
import com.aerospike.client.query.PartitionFilter;
import com.aerospike.client.query.PartitionStatus;
Expand Down Expand Up @@ -139,6 +135,12 @@ public static Kvs.QueryPolicy toGrpc(QueryPolicy queryPolicy) {
return queryPolicyBuilder.build();
}

public static Kvs.InfoPolicy toGrpc(InfoPolicy infoPolicy){
Kvs.InfoPolicy.Builder infoPolicyBuilder = Kvs.InfoPolicy.newBuilder();
infoPolicyBuilder.setTimeout(infoPolicy.timeout);
return infoPolicyBuilder.build();
}

/**
* Convert a value to packed bytes.
*
Expand Down

0 comments on commit 79fa7f8

Please sign in to comment.