Skip to content

Commit

Permalink
implements 'scan' command
Browse files Browse the repository at this point in the history
Implements 'scan' command which receives three paramaters

 - table: table name
 - options: Erlang proplist to specify scan options.
 - ref: Erlang reference to distinguish rows from multiple scans

API returns `ok` after starting scan asyncronously. It returns each rows
with {Ref, row, [ColumnList]} messages for each cells. If there is an
error occures during scan, it sends {Ref, error, Name, Msg} and
terminates scan. If all rows are successfully returned, it sends {Ref,
done} message.
  • Loading branch information
yjh0502 committed Jul 25, 2016
1 parent a364d72 commit 44f692c
Show file tree
Hide file tree
Showing 3 changed files with 197 additions and 5 deletions.
152 changes: 152 additions & 0 deletions java_src/main/java/me/cmoz/diver/AsyncScanner.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
package me.cmoz.diver;

import com.ericsson.otp.erlang.*;
import com.stumbleupon.async.Callback;
import org.hbase.async.*;

import java.util.ArrayList;

class AsyncScanner implements Callback<Object, ArrayList<ArrayList<KeyValue>>> {
private static final OtpErlangAtom ROW_ATOM = new OtpErlangAtom("row");
private static final OtpErlangAtom DONE_ATOM = new OtpErlangAtom("done");

private final OtpErlangTuple from;
private final OtpMbox mbox;
private final OtpErlangRef ref;
private final Scanner scanner;
int numRows = Integer.MAX_VALUE;

public AsyncScanner(OtpErlangTuple from, OtpMbox mbox, OtpErlangRef ref, Scanner scanner, OtpErlangList options)
throws OtpErlangDecodeException {
this.from = from;
this.mbox = mbox;
this.ref = ref;
this.scanner = scanner;

// prevent returning partial row by default
scanner.setMaxNumKeyValues(-1);

for (final OtpErlangObject option : options) {
final OtpErlangTuple tuple = (OtpErlangTuple) option;
final OtpErlangObject[] tupleElements = tuple.elements();
final String optionName = ((OtpErlangAtom) tupleElements[0]).atomValue();
final OtpErlangObject optionValue = tupleElements[1];

switch(optionName) {
case "num_rows":
numRows = (int)((OtpErlangLong) optionValue).longValue();
scanner.setMaxNumRows(numRows);
break;

// TODO: setFamilies
case "family":
scanner.setFamily(((OtpErlangBinary) optionValue).binaryValue());
break;
// TODO: setFilter
case "key_regexp":
scanner.setKeyRegexp(new String(((OtpErlangBinary) optionValue).binaryValue()));
break;
// TODO: setKeyRegexp(regesp, charset)
case "max_num_bytes":
scanner.setMaxTimestamp(((OtpErlangLong) optionValue).longValue());
break;
case "max_num_keyvalues":
scanner.setMaxNumKeyValues((int)((OtpErlangLong) optionValue).longValue());
break;
case "max_num_rows":
scanner.setMaxNumRows((int)((OtpErlangLong) optionValue).longValue());
break;
case "max_timestamp":
scanner.setMaxTimestamp(((OtpErlangLong) optionValue).longValue());
break;
case "max_versions":
scanner.setMaxVersions((int)((OtpErlangLong) optionValue).longValue());
break;
case "qualifier":
scanner.setQualifier(((OtpErlangBinary) optionValue).binaryValue());
break;
// TODO: setQualifiers
case "server_block_cache":
scanner.setServerBlockCache(((OtpErlangLong) optionValue).longValue() != 0);
break;
case "start_key":
scanner.setStartKey(((OtpErlangBinary) optionValue).binaryValue());
break;
case "stop_key":
scanner.setStopKey(((OtpErlangBinary) optionValue).binaryValue());
break;
case "time_range":
final OtpErlangObject[] timeRangeElems = ((OtpErlangTuple)optionValue).elements();
scanner.setTimeRange(
((OtpErlangLong) timeRangeElems[0]).longValue(),
((OtpErlangLong) timeRangeElems[1]).longValue());
break;
default:
final String message = String.format("Invalid scan option: \"%s\"", tuple);
throw new OtpErlangDecodeException(message);
}
}
}

public void start() {
scanner.nextRows()
.addCallback(this)
.addErrback(new ScannerErrback(from, ref, mbox));
}

@Override
public Object call(ArrayList<ArrayList<KeyValue>> rows) throws Exception {
if (rows == null) {
sendDone();
return null;
}

for(final ArrayList<KeyValue> row : rows) {
sendRow(row);

numRows -= 1;
if(numRows == 0) {
sendDone();
return null;
}
}

scanner.nextRows()
.addCallback(this)
.addErrback(new ScannerErrback(from, ref, mbox));
return null;
}

public void sendDone() {
final OtpErlangObject[] body = new OtpErlangObject[] {
ref,
DONE_ATOM,
};

mbox.send((OtpErlangPid) from.elementAt(0), new OtpErlangTuple(body));
}

public void sendRow(final ArrayList<KeyValue> data) throws Exception {
final OtpErlangObject[] items = new OtpErlangObject[data.size()];
int i = 0;
for (final KeyValue keyValue : data) {
final OtpErlangObject[] erldata = new OtpErlangObject[] {
new OtpErlangBinary(keyValue.key()),
new OtpErlangBinary(keyValue.family()),
new OtpErlangBinary(keyValue.qualifier()),
new OtpErlangBinary(keyValue.value()),
new OtpErlangLong(keyValue.timestamp())
};
items[i] = new OtpErlangTuple(erldata);
i++;
}

final OtpErlangObject[] body = new OtpErlangObject[] {
ref,
ROW_ATOM,
new OtpErlangList(items)
};

mbox.send((OtpErlangPid) from.elementAt(0), new OtpErlangTuple(body));
}
}
31 changes: 31 additions & 0 deletions java_src/main/java/me/cmoz/diver/AsyncScannerErrback.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package me.cmoz.diver;

import com.ericsson.otp.erlang.*;
import com.stumbleupon.async.Callback;
import lombok.RequiredArgsConstructor;

@RequiredArgsConstructor
class ScannerErrback implements Callback<Object, Exception> {

private static final OtpErlangAtom ERROR_ATOM = new OtpErlangAtom("error");

private final OtpErlangTuple from;

private final OtpErlangRef ref;

private final OtpMbox mbox;

@Override
public Object call(final Exception e) throws Exception {
final OtpErlangObject[] body = new OtpErlangObject[] {
ref,
ERROR_ATOM,
new OtpErlangString(e.getClass().getSimpleName()),
new OtpErlangString(e.getLocalizedMessage())
};

mbox.send((OtpErlangPid) from.elementAt(0), new OtpErlangTuple(body));
return null;
}

}
19 changes: 14 additions & 5 deletions java_src/main/java/me/cmoz/diver/JavaServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import com.stumbleupon.async.Callback;
import com.stumbleupon.async.Deferred;
import lombok.extern.slf4j.Slf4j;
import org.hbase.async.HBaseClient;
import org.hbase.async.*;

@Slf4j
class JavaServer extends AbstractExecutionThreadService {
Expand Down Expand Up @@ -97,6 +97,15 @@ private void handleCall(final OtpErlangTuple from, final OtpErlangTuple req)
.addCallback(new GenServerGetCallback(from, mbox))
.addErrback(new GenServerErrback(from, mbox));
break;
case "scan":
final OtpErlangBinary table5 = (OtpErlangBinary) elements[1];
final OtpErlangList options = (OtpErlangList) elements[2];
final OtpErlangRef ref = (OtpErlangRef) elements[3];
final Scanner scanner = hbaseClient.newScanner(table5.binaryValue());
final AsyncScanner asyncScanner = new AsyncScanner(from, mbox, ref, scanner, options);
asyncScanner.start();
reply(from, new OtpErlangAtom("ok"));
break;
case "get_flush_interval":
final short flushInterval1 = hbaseClient.getFlushInterval();
reply(from, TypeUtil.tuple(new OtpErlangAtom("ok"), new OtpErlangShort(flushInterval1)));
Expand All @@ -109,18 +118,18 @@ private void handleCall(final OtpErlangTuple from, final OtpErlangTuple req)
reply(from, TypeUtil.tuple(reqType, mbox.self()));
break;
case "prefetch_meta":
final OtpErlangBinary table5 = (OtpErlangBinary) elements[1];
hbaseClient.prefetchMeta(table5.binaryValue())
final OtpErlangBinary table6 = (OtpErlangBinary) elements[1];
hbaseClient.prefetchMeta(table6.binaryValue())
.addCallback(new GenServerOkCallback(from, mbox))
.addErrback(new GenServerErrback(from, mbox));
break;
case "put":
final OtpErlangBinary table6 = (OtpErlangBinary) elements[1];
final OtpErlangBinary table7 = (OtpErlangBinary) elements[1];
final OtpErlangBinary key4 = (OtpErlangBinary) elements[2];
final OtpErlangBinary family3 = (OtpErlangBinary) elements[3];
final OtpErlangList qualifiers2 = (OtpErlangList) elements[4];
final OtpErlangList values2 = (OtpErlangList) elements[5];
hbaseClient.put(TypeUtil.putRequest(table6, key4, family3, qualifiers2, values2))
hbaseClient.put(TypeUtil.putRequest(table7, key4, family3, qualifiers2, values2))
.addCallback(new GenServerOkCallback(from, mbox))
.addErrback(new GenServerErrback(from, mbox));
break;
Expand Down

0 comments on commit 44f692c

Please sign in to comment.