From 44f692c67fe499df2da2f6ab805fbc5635ff20f8 Mon Sep 17 00:00:00 2001 From: Jihyun Yu Date: Mon, 25 Jul 2016 22:47:41 +0900 Subject: [PATCH] implements 'scan' command Implements 'scan' command which receives three paramaters - table: table name - options: Erlang proplist to specify scan options. - ref: Erlang reference to distinguish rows from multiple scans API returns `ok` after starting scan asyncronously. It returns each rows with {Ref, row, [ColumnList]} messages for each cells. If there is an error occures during scan, it sends {Ref, error, Name, Msg} and terminates scan. If all rows are successfully returned, it sends {Ref, done} message. --- .../main/java/me/cmoz/diver/AsyncScanner.java | 152 ++++++++++++++++++ .../me/cmoz/diver/AsyncScannerErrback.java | 31 ++++ .../main/java/me/cmoz/diver/JavaServer.java | 19 ++- 3 files changed, 197 insertions(+), 5 deletions(-) create mode 100644 java_src/main/java/me/cmoz/diver/AsyncScanner.java create mode 100644 java_src/main/java/me/cmoz/diver/AsyncScannerErrback.java diff --git a/java_src/main/java/me/cmoz/diver/AsyncScanner.java b/java_src/main/java/me/cmoz/diver/AsyncScanner.java new file mode 100644 index 0000000..40dc6fd --- /dev/null +++ b/java_src/main/java/me/cmoz/diver/AsyncScanner.java @@ -0,0 +1,152 @@ +package me.cmoz.diver; + +import com.ericsson.otp.erlang.*; +import com.stumbleupon.async.Callback; +import org.hbase.async.*; + +import java.util.ArrayList; + +class AsyncScanner implements Callback>> { + private static final OtpErlangAtom ROW_ATOM = new OtpErlangAtom("row"); + private static final OtpErlangAtom DONE_ATOM = new OtpErlangAtom("done"); + + private final OtpErlangTuple from; + private final OtpMbox mbox; + private final OtpErlangRef ref; + private final Scanner scanner; + int numRows = Integer.MAX_VALUE; + + public AsyncScanner(OtpErlangTuple from, OtpMbox mbox, OtpErlangRef ref, Scanner scanner, OtpErlangList options) + throws OtpErlangDecodeException { + this.from = from; + this.mbox = mbox; + this.ref = ref; + this.scanner = scanner; + + // prevent returning partial row by default + scanner.setMaxNumKeyValues(-1); + + for (final OtpErlangObject option : options) { + final OtpErlangTuple tuple = (OtpErlangTuple) option; + final OtpErlangObject[] tupleElements = tuple.elements(); + final String optionName = ((OtpErlangAtom) tupleElements[0]).atomValue(); + final OtpErlangObject optionValue = tupleElements[1]; + + switch(optionName) { + case "num_rows": + numRows = (int)((OtpErlangLong) optionValue).longValue(); + scanner.setMaxNumRows(numRows); + break; + + // TODO: setFamilies + case "family": + scanner.setFamily(((OtpErlangBinary) optionValue).binaryValue()); + break; + // TODO: setFilter + case "key_regexp": + scanner.setKeyRegexp(new String(((OtpErlangBinary) optionValue).binaryValue())); + break; + // TODO: setKeyRegexp(regesp, charset) + case "max_num_bytes": + scanner.setMaxTimestamp(((OtpErlangLong) optionValue).longValue()); + break; + case "max_num_keyvalues": + scanner.setMaxNumKeyValues((int)((OtpErlangLong) optionValue).longValue()); + break; + case "max_num_rows": + scanner.setMaxNumRows((int)((OtpErlangLong) optionValue).longValue()); + break; + case "max_timestamp": + scanner.setMaxTimestamp(((OtpErlangLong) optionValue).longValue()); + break; + case "max_versions": + scanner.setMaxVersions((int)((OtpErlangLong) optionValue).longValue()); + break; + case "qualifier": + scanner.setQualifier(((OtpErlangBinary) optionValue).binaryValue()); + break; + // TODO: setQualifiers + case "server_block_cache": + scanner.setServerBlockCache(((OtpErlangLong) optionValue).longValue() != 0); + break; + case "start_key": + scanner.setStartKey(((OtpErlangBinary) optionValue).binaryValue()); + break; + case "stop_key": + scanner.setStopKey(((OtpErlangBinary) optionValue).binaryValue()); + break; + case "time_range": + final OtpErlangObject[] timeRangeElems = ((OtpErlangTuple)optionValue).elements(); + scanner.setTimeRange( + ((OtpErlangLong) timeRangeElems[0]).longValue(), + ((OtpErlangLong) timeRangeElems[1]).longValue()); + break; + default: + final String message = String.format("Invalid scan option: \"%s\"", tuple); + throw new OtpErlangDecodeException(message); + } + } + } + + public void start() { + scanner.nextRows() + .addCallback(this) + .addErrback(new ScannerErrback(from, ref, mbox)); + } + + @Override + public Object call(ArrayList> rows) throws Exception { + if (rows == null) { + sendDone(); + return null; + } + + for(final ArrayList row : rows) { + sendRow(row); + + numRows -= 1; + if(numRows == 0) { + sendDone(); + return null; + } + } + + scanner.nextRows() + .addCallback(this) + .addErrback(new ScannerErrback(from, ref, mbox)); + return null; + } + + public void sendDone() { + final OtpErlangObject[] body = new OtpErlangObject[] { + ref, + DONE_ATOM, + }; + + mbox.send((OtpErlangPid) from.elementAt(0), new OtpErlangTuple(body)); + } + + public void sendRow(final ArrayList data) throws Exception { + final OtpErlangObject[] items = new OtpErlangObject[data.size()]; + int i = 0; + for (final KeyValue keyValue : data) { + final OtpErlangObject[] erldata = new OtpErlangObject[] { + new OtpErlangBinary(keyValue.key()), + new OtpErlangBinary(keyValue.family()), + new OtpErlangBinary(keyValue.qualifier()), + new OtpErlangBinary(keyValue.value()), + new OtpErlangLong(keyValue.timestamp()) + }; + items[i] = new OtpErlangTuple(erldata); + i++; + } + + final OtpErlangObject[] body = new OtpErlangObject[] { + ref, + ROW_ATOM, + new OtpErlangList(items) + }; + + mbox.send((OtpErlangPid) from.elementAt(0), new OtpErlangTuple(body)); + } +} diff --git a/java_src/main/java/me/cmoz/diver/AsyncScannerErrback.java b/java_src/main/java/me/cmoz/diver/AsyncScannerErrback.java new file mode 100644 index 0000000..e1107eb --- /dev/null +++ b/java_src/main/java/me/cmoz/diver/AsyncScannerErrback.java @@ -0,0 +1,31 @@ +package me.cmoz.diver; + +import com.ericsson.otp.erlang.*; +import com.stumbleupon.async.Callback; +import lombok.RequiredArgsConstructor; + +@RequiredArgsConstructor +class ScannerErrback implements Callback { + + private static final OtpErlangAtom ERROR_ATOM = new OtpErlangAtom("error"); + + private final OtpErlangTuple from; + + private final OtpErlangRef ref; + + private final OtpMbox mbox; + + @Override + public Object call(final Exception e) throws Exception { + final OtpErlangObject[] body = new OtpErlangObject[] { + ref, + ERROR_ATOM, + new OtpErlangString(e.getClass().getSimpleName()), + new OtpErlangString(e.getLocalizedMessage()) + }; + + mbox.send((OtpErlangPid) from.elementAt(0), new OtpErlangTuple(body)); + return null; + } + +} diff --git a/java_src/main/java/me/cmoz/diver/JavaServer.java b/java_src/main/java/me/cmoz/diver/JavaServer.java index 25d4837..0918eed 100644 --- a/java_src/main/java/me/cmoz/diver/JavaServer.java +++ b/java_src/main/java/me/cmoz/diver/JavaServer.java @@ -7,7 +7,7 @@ import com.stumbleupon.async.Callback; import com.stumbleupon.async.Deferred; import lombok.extern.slf4j.Slf4j; -import org.hbase.async.HBaseClient; +import org.hbase.async.*; @Slf4j class JavaServer extends AbstractExecutionThreadService { @@ -97,6 +97,15 @@ private void handleCall(final OtpErlangTuple from, final OtpErlangTuple req) .addCallback(new GenServerGetCallback(from, mbox)) .addErrback(new GenServerErrback(from, mbox)); break; + case "scan": + final OtpErlangBinary table5 = (OtpErlangBinary) elements[1]; + final OtpErlangList options = (OtpErlangList) elements[2]; + final OtpErlangRef ref = (OtpErlangRef) elements[3]; + final Scanner scanner = hbaseClient.newScanner(table5.binaryValue()); + final AsyncScanner asyncScanner = new AsyncScanner(from, mbox, ref, scanner, options); + asyncScanner.start(); + reply(from, new OtpErlangAtom("ok")); + break; case "get_flush_interval": final short flushInterval1 = hbaseClient.getFlushInterval(); reply(from, TypeUtil.tuple(new OtpErlangAtom("ok"), new OtpErlangShort(flushInterval1))); @@ -109,18 +118,18 @@ private void handleCall(final OtpErlangTuple from, final OtpErlangTuple req) reply(from, TypeUtil.tuple(reqType, mbox.self())); break; case "prefetch_meta": - final OtpErlangBinary table5 = (OtpErlangBinary) elements[1]; - hbaseClient.prefetchMeta(table5.binaryValue()) + final OtpErlangBinary table6 = (OtpErlangBinary) elements[1]; + hbaseClient.prefetchMeta(table6.binaryValue()) .addCallback(new GenServerOkCallback(from, mbox)) .addErrback(new GenServerErrback(from, mbox)); break; case "put": - final OtpErlangBinary table6 = (OtpErlangBinary) elements[1]; + final OtpErlangBinary table7 = (OtpErlangBinary) elements[1]; final OtpErlangBinary key4 = (OtpErlangBinary) elements[2]; final OtpErlangBinary family3 = (OtpErlangBinary) elements[3]; final OtpErlangList qualifiers2 = (OtpErlangList) elements[4]; final OtpErlangList values2 = (OtpErlangList) elements[5]; - hbaseClient.put(TypeUtil.putRequest(table6, key4, family3, qualifiers2, values2)) + hbaseClient.put(TypeUtil.putRequest(table7, key4, family3, qualifiers2, values2)) .addCallback(new GenServerOkCallback(from, mbox)) .addErrback(new GenServerErrback(from, mbox)); break;