Skip to content

Commit

Permalink
changes from streaming to single call
Browse files Browse the repository at this point in the history
  • Loading branch information
vmsachin committed Jul 3, 2024
1 parent 79fa7f8 commit cda0148
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 9 deletions.
2 changes: 1 addition & 1 deletion proxy/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<dependency>
<groupId>com.aerospike</groupId>
<artifactId>aerospike-proxy-stub</artifactId>
<version>1.0.2-SNAPSHOT</version>
<version>1.1.0</version>
</dependency>

<dependency>
Expand Down
3 changes: 2 additions & 1 deletion proxy/src/com/aerospike/client/proxy/CommandProxy.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.aerospike.client.proxy.grpc.GrpcConversions;
import com.aerospike.client.proxy.grpc.GrpcStreamingCall;
import com.aerospike.client.util.Util;
import com.aerospike.proxy.client.InfoGrpc;
import com.aerospike.proxy.client.Kvs;
import com.google.protobuf.ByteString;

Expand Down Expand Up @@ -57,7 +58,7 @@ public CommandProxy(
this.numExpectedResponses = numExpectedResponses;
}

final void execute() {
void execute() {
if (policy.totalTimeout > 0) {
deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(policy.totalTimeout);
sendTimeoutMillis = (policy.socketTimeout > 0 && policy.socketTimeout < policy.totalTimeout)?
Expand Down
79 changes: 72 additions & 7 deletions proxy/src/com/aerospike/client/proxy/InfoCommandProxy.java
Original file line number Diff line number Diff line change
@@ -1,35 +1,41 @@
package com.aerospike.client.proxy;

import com.aerospike.client.AerospikeException;
import com.aerospike.client.Info;
import com.aerospike.client.Log;
import com.aerospike.client.ResultCode;
import com.aerospike.client.cluster.Node;
import com.aerospike.client.command.Command;
import com.aerospike.client.listener.InfoListener;
import com.aerospike.client.policy.InfoPolicy;
import com.aerospike.client.policy.Policy;
import com.aerospike.client.policy.ScanPolicy;
import com.aerospike.client.proxy.grpc.GrpcCallExecutor;
import com.aerospike.client.proxy.grpc.GrpcConversions;
import com.aerospike.client.proxy.grpc.GrpcStreamingCall;
import com.aerospike.proxy.client.AboutGrpc;
import com.aerospike.proxy.client.Kvs;
import com.aerospike.proxy.client.InfoGrpc;
import io.grpc.*;
import io.grpc.stub.StreamObserver;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

public class InfoCommandProxy extends SingleCommandProxy {

private final InfoListener listener;
private final String[] commands;
private Map<String,String> map;

private final InfoPolicy infoPolicy;
private final GrpcCallExecutor executor;
private final MethodDescriptor<Kvs.AerospikeRequestPayload, Kvs.AerospikeResponsePayload> methodDescriptor;
final Policy policy;

public InfoCommandProxy(GrpcCallExecutor executor, InfoListener listener, InfoPolicy policy, String... commands) {
super(InfoGrpc.getInfoMethod(), executor, createPolicy(policy));
this.executor = executor;
this.infoPolicy = policy;
this.listener = listener;
this.commands = commands;
this.methodDescriptor = InfoGrpc.getInfoMethod();
this.policy = createPolicy(policy);
}

private static Policy createPolicy(InfoPolicy policy) {
Expand Down Expand Up @@ -64,9 +70,68 @@ void writeCommand(Command command) {
// Nothing to do since there is no Aerospike payload.
}

@Override
void execute(){
executeCommand();
}

private void executeCommand() {
Kvs.AerospikeRequestPayload.Builder builder = getRequestBuilder();

ManagedChannel channel = executor.getChannel();
InfoGrpc.InfoBlockingStub stub = InfoGrpc.newBlockingStub(channel);
try{
Kvs.AerospikeRequestPayload request = builder.build();
Kvs.AerospikeResponsePayload response = stub.info(request);
inDoubt |= response.getInDoubt();
onResponse(response);
}catch (Throwable t) {
inDoubt = true;
onFailure(t);
} finally {
// Shut down the channel
channel.shutdown();
}
}


@Override
void onFailure(AerospikeException ae) {
listener.onFailure(ae);

}

@Override
void onFailure(Throwable t) {
AerospikeException ae;

try {
if (t instanceof AerospikeException) {
ae = (AerospikeException)t;
ae.setPolicy(policy);
}
else if (t instanceof StatusRuntimeException) {
StatusRuntimeException sre = (StatusRuntimeException)t;
Status.Code code = sre.getStatus().getCode();

if (code == Status.Code.UNAVAILABLE) {
if (retry()) {
return;
}
}
ae = GrpcConversions.toAerospike(sre, policy, 1);
}
else {
ae = new AerospikeException(ResultCode.CLIENT_ERROR, t);
}
}
catch (AerospikeException ae2) {
ae = ae2;
}
catch (Throwable t2) {
ae = new AerospikeException(ResultCode.CLIENT_ERROR, t2);
}

notifyFailure(ae);
}

@Override
Expand Down

0 comments on commit cda0148

Please sign in to comment.