diff --git a/Makefile b/Makefile index 0f2255718..35f973165 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,6 @@ -.PHONY: compile test unit-test integration-test protogen +.PHONY: compile test unit-test integration-test integration-test-timeseries protogen + +RIAK_PORT ?= 8087 compile: mvn clean compile @@ -9,7 +11,13 @@ unit-test: mvn test integration-test: - mvn -Pitest,default -Dcom.basho.riak.2i=true -Dcom.basho.riak.yokozuna=true -Dcom.basho.riak.buckettype=true -Dcom.basho.riak.crdt=true -Dcom.basho.riak.lifecycle=true verify + mvn -Pitest,default -Dcom.basho.riak.2i=true -Dcom.basho.riak.yokozuna=true -Dcom.basho.riak.buckettype=true -Dcom.basho.riak.crdt=true -Dcom.basho.riak.lifecycle=true -Dcom.basho.riak.pbcport=$(RIAK_PORT) verify + +integration-test-timeseries: + mvn -Pitest,default -Dcom.basho.riak.buckettype=true -Dcom.basho.riak.crdt=true -Dcom.basho.riak.lifecycle=true -Dcom.basho.riak.timeseries=true -Dcom.basho.riak.pbcport=$(RIAK_PORT) verify + +integration-test-security: + mvn -Pitest,default -Dcom.basho.riak.security=true -Dcom.basho.riak.security.clientcert=true -Dcom.basho.riak.pbcport=$(RIAK_PORT) test-compile failsafe:integration-test protogen: mvn -Pprotobuf-generate generate-sources diff --git a/pom.xml b/pom.xml index 1e4d96bdb..e8fe8d21a 100755 --- a/pom.xml +++ b/pom.xml @@ -431,5 +431,10 @@ antlr-complete 3.5.2 + + org.erlang.otp + jinterface + 1.5.6 + diff --git a/src/main/java/com/basho/riak/client/core/DefaultNodeManager.java b/src/main/java/com/basho/riak/client/core/DefaultNodeManager.java index 454020873..d9d45a634 100644 --- a/src/main/java/com/basho/riak/client/core/DefaultNodeManager.java +++ b/src/main/java/com/basho/riak/client/core/DefaultNodeManager.java @@ -20,23 +20,24 @@ import java.util.List; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantReadWriteLock; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * - * The default {@link NodeManager} used by {@link RiakCluster} if none is + * The default {@link NodeManager} used by {@link RiakCluster} if none is * specified. - * - * This NodeManager round-robins through a list of {@link RiakNode}s and attempts - * to execute the operation passed to it. If a node reports that it is - * health checking it is removed from the list until it sends an update that it - * is again running. If the selected node cannot accept the operation because all - * connections are in use or it unable to make a new connection, the next node in - * the list is tried until either the operation is accepted or all nodes have - * been tried. If no nodes are able to accept the operation its setException() + * + * This NodeManager round-robins through a list of {@link RiakNode}s and attempts + * to execute the operation passed to it. If a node reports that it is + * health checking it is removed from the list until it sends an update that it + * is again running. If the selected node cannot accept the operation because all + * connections are in use or it unable to make a new connection, the next node in + * the list is tried until either the operation is accepted or all nodes have + * been tried. If no nodes are able to accept the operation its setException() * method is called with a {@link NoNodesAvailableException}. - * + * * @author Brian Roach * @since 2.0 */ @@ -47,7 +48,7 @@ public class DefaultNodeManager implements NodeManager, NodeStateListener private final AtomicInteger index = new AtomicInteger(); private final Logger logger = LoggerFactory.getLogger(DefaultNodeManager.class); private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true); - + @Override public void init(List nodes) { @@ -73,7 +74,7 @@ public boolean executeOnNode(FutureOperation operation, RiakNode previousNode) { int startIndex = index.getAndIncrement(); int currentIndex = startIndex; - + do { if (healthy.get(Math.abs(currentIndex % healthy.size())).execute(operation)) @@ -89,7 +90,7 @@ else if (healthy.size() == 1) { executed = healthy.get(0).execute(operation); } - + return executed; } finally @@ -97,7 +98,7 @@ else if (healthy.size() == 1) lock.readLock().unlock(); } } - + @Override public void nodeStateChanged(RiakNode node, State state) { @@ -110,7 +111,7 @@ public void nodeStateChanged(RiakNode node, State state) if (unhealthy.remove(node)) { healthy.add(node); - logger.info("NodeManager moved node to healthy list; {}:{}", + logger.info("NodeManager moved node to healthy list; {}:{}", node.getRemoteAddress(), node.getPort()); } } @@ -126,7 +127,7 @@ public void nodeStateChanged(RiakNode node, State state) if (healthy.remove(node)) { unhealthy.add(node); - logger.info("NodeManager moved node to unhealthy list; {}:{}", + logger.info("NodeManager moved node to unhealthy list; {}:{}", node.getRemoteAddress(), node.getPort()); } } @@ -174,7 +175,7 @@ public void addNode(RiakNode newNode) { lock.writeLock().unlock(); } - + } @Override @@ -194,12 +195,12 @@ public boolean removeNode(RiakNode node) { lock.writeLock().unlock(); } - + if (removed) { node.removeStateListener(this); node.shutdown(); - logger.info("NodeManager removed and shutdown node; {}:{}", + logger.info("NodeManager removed and shutdown node; {}:{}", node.getRemoteAddress(), node.getPort()); } return removed; diff --git a/src/main/java/com/basho/riak/client/core/FutureOperation.java b/src/main/java/com/basho/riak/client/core/FutureOperation.java index a3c21919f..49d112b9d 100644 --- a/src/main/java/com/basho/riak/client/core/FutureOperation.java +++ b/src/main/java/com/basho/riak/client/core/FutureOperation.java @@ -16,9 +16,6 @@ package com.basho.riak.client.core; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.util.Arrays; import java.util.HashSet; import java.util.LinkedList; @@ -28,6 +25,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.locks.ReentrantLock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * @author Brian Roach @@ -262,7 +261,7 @@ synchronized final void setException(Throwable t) public synchronized final Object channelMessage() { - Object message = createChannelMessage(); + final Object message = createChannelMessage(); state = State.WRITTEN; return message; } @@ -309,17 +308,35 @@ public final T get() throws InterruptedException, ExecutionException { latch.await(); + throwExceptionIfSet(); + + if (null == converted) + { + tryConvertResponse(); + } + + return converted; + } + + private void throwExceptionIfSet() throws ExecutionException + { if (exception != null) { throw new ExecutionException(exception); } - else if (null == converted) + } + + private void tryConvertResponse() throws ExecutionException + { + try { converted = convert(rawResponse); - } - - return converted; + catch(IllegalArgumentException ex) + { + exception = ex; + throwExceptionIfSet(); + } } @Override @@ -331,13 +348,12 @@ public final T get(long timeout, TimeUnit unit) throws InterruptedException, Exe { throw new TimeoutException(); } - else if (exception != null) - { - throw new ExecutionException(exception); - } - else if (null == converted) + + throwExceptionIfSet(); + + if (null == converted) { - converted = convert(rawResponse); + tryConvertResponse(); } return converted; @@ -373,6 +389,16 @@ public final void await(long timeout, TimeUnit unit) throws InterruptedException latch.await(timeout, unit); } + protected U checkAndGetSingleResponse(List responses) + { + if (responses.size() > 1) + { + LoggerFactory.getLogger(this.getClass()).error("Received {} responses when only one was expected.", + responses.size()); + } + + return responses.get(0); + } private void stateCheck(State... allowedStates) { diff --git a/src/main/java/com/basho/riak/client/core/RiakMessage.java b/src/main/java/com/basho/riak/client/core/RiakMessage.java index d24e3f526..af27d1901 100644 --- a/src/main/java/com/basho/riak/client/core/RiakMessage.java +++ b/src/main/java/com/basho/riak/client/core/RiakMessage.java @@ -1,43 +1,150 @@ -/* - * Copyright 2013 Basho Technologies Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ package com.basho.riak.client.core; +import com.basho.riak.client.core.netty.RiakResponseException; +import com.basho.riak.protobuf.RiakMessageCodes; +import com.basho.riak.protobuf.RiakPB; +import com.ericsson.otp.erlang.OtpErlangDecodeException; +import com.ericsson.otp.erlang.OtpExternal; +import com.ericsson.otp.erlang.OtpInputStream; +import com.google.protobuf.InvalidProtocolBufferException; +import java.nio.charset.StandardCharsets; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * Encapsulates the raw bytes sent to or received from Riak. + * * @author Brian Roach + * @author Sergey Galkin * @since 2.0 */ public final class RiakMessage { + private static final Logger logger = LoggerFactory.getLogger(RiakMessage.class); private final byte code; private final byte[] data; - + private final RiakResponseException riakError; + private static final String ERROR_RESP = "rpberrorresp"; + public RiakMessage(byte code, byte[] data) + { + this(code, data, true); + } + + public RiakMessage(byte code, byte[] data, boolean doErrorCheck) { this.code = code; this.data = data; + + if(doErrorCheck) + { + switch (this.code) + { + case RiakMessageCodes.MSG_ErrorResp: + this.riakError = getRiakErrorFromPbuf(this.data); + break; + case RiakMessageCodes.MSG_TsTtbMsg: + OtpInputStream ttbInputStream = new OtpInputStream(data); + this.riakError = getRiakErrorFromTtb(ttbInputStream); + break; + default: + this.riakError = null; + } + } + else + { + this.riakError = null; + } + } + + private static RiakResponseException getRiakErrorFromPbuf(byte[] data) + { + try + { + RiakPB.RpbErrorResp err = RiakPB.RpbErrorResp.parseFrom(data); + return new RiakResponseException(err.getErrcode(), err.getErrmsg().toStringUtf8()); + } + catch (InvalidProtocolBufferException ex) + { + logger.error("exception", ex); + return new RiakResponseException(0, "Could not parse protocol buffers error"); + } } - + public byte getCode() { return code; } - + public byte[] getData() { return data; } + + public boolean isRiakError() + { + return this.riakError != null; + } + + public RiakResponseException getRiakError() + { + return this.riakError; + } + + private RiakResponseException getRiakErrorFromTtb(OtpInputStream ttbInputStream) + { + final String decodeErrorMsg = "Error decoding Riak TTB Response, unexpected format."; + int ttbMsgArity; + + try + { + int firstByte = ttbInputStream.read1skip_version(); + + if(firstByte != OtpExternal.smallTupleTag && firstByte != OtpExternal.largeTupleTag) + { + return null; + } + + ttbInputStream.reset(); + + ttbMsgArity = ttbInputStream.read_tuple_head(); + } + catch (OtpErlangDecodeException ex) + { + logger.error(decodeErrorMsg + " Was expecting a tuple head.", ex); + throw new IllegalArgumentException(decodeErrorMsg, ex); + } + + if (ttbMsgArity == 3) + { + // NB: may be an error response + String atom; + try + { + atom = ttbInputStream.read_atom(); + } + catch (OtpErlangDecodeException ex) + { + logger.error(decodeErrorMsg + " Was expecting an atom.", ex); + throw new IllegalArgumentException(decodeErrorMsg, ex); + } + + if (ERROR_RESP.equals(atom)) + { + try + { + String errMsg = new String(ttbInputStream.read_binary(), StandardCharsets.UTF_8); + int errCode = ttbInputStream.read_int(); + return new RiakResponseException(errCode, errMsg); + } + catch (OtpErlangDecodeException ex) + { + logger.error(decodeErrorMsg, ex); + throw new IllegalArgumentException(decodeErrorMsg, ex); + } + } + } + + return null; + } } diff --git a/src/main/java/com/basho/riak/client/core/codec/InvalidTermToBinaryException.java b/src/main/java/com/basho/riak/client/core/codec/InvalidTermToBinaryException.java new file mode 100644 index 000000000..97146d42b --- /dev/null +++ b/src/main/java/com/basho/riak/client/core/codec/InvalidTermToBinaryException.java @@ -0,0 +1,18 @@ +package com.basho.riak.client.core.codec; + +import com.ericsson.otp.erlang.OtpErlangDecodeException; + +import java.io.IOException; + +public class InvalidTermToBinaryException extends IOException +{ + public InvalidTermToBinaryException(String errorMessage, OtpErlangDecodeException cause) + { + super(errorMessage, cause); + } + + public InvalidTermToBinaryException(String errorMessage) + { + super(errorMessage); + } +} diff --git a/src/main/java/com/basho/riak/client/core/codec/TermToBinaryCodec.java b/src/main/java/com/basho/riak/client/core/codec/TermToBinaryCodec.java new file mode 100644 index 000000000..d477733ad --- /dev/null +++ b/src/main/java/com/basho/riak/client/core/codec/TermToBinaryCodec.java @@ -0,0 +1,331 @@ +package com.basho.riak.client.core.codec; + +import com.basho.riak.client.core.query.timeseries.Cell; +import com.basho.riak.client.core.query.timeseries.QueryResult; +import com.basho.riak.client.core.query.timeseries.Row; +import com.basho.riak.client.core.util.CharsetUtils; +import com.basho.riak.protobuf.RiakTsPB; +import com.ericsson.otp.erlang.*; +import com.google.protobuf.ByteString; +import java.nio.charset.StandardCharsets; +import java.util.*; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TermToBinaryCodec +{ + private static final String TS_GET_REQ = "tsgetreq"; + private static final String TS_GET_RESP = "tsgetresp"; + private static final String TS_QUERY_REQ = "tsqueryreq"; + private static final String TS_QUERY_RESP = "tsqueryresp"; + private static final String TS_INTERPOLATION = "tsinterpolation"; + private static final String TS_PUT_REQ = "tsputreq"; + private static final String UNDEFINED = "undefined"; + private static final Logger logger = LoggerFactory.getLogger(TermToBinaryCodec.class); + + public static OtpOutputStream encodeTsGetRequest(String tableName, Collection keyValues, int timeout) + { + final OtpOutputStream os = new OtpOutputStream(); + os.write(OtpExternal.versionTag); // NB: this is the reqired 0x83 (131) value + + // NB: TsGetReq is a 4-tuple: tsgetreq, tableName, [key values], timeout + os.write_tuple_head(4); + os.write_atom(TS_GET_REQ); + os.write_binary(tableName.getBytes(StandardCharsets.UTF_8)); + + os.write_list_head(keyValues.size()); + for (Cell cell : keyValues) + { + writeTsCellToStream(os, cell); + } + os.write_nil(); // NB: finishes the list + + os.write_long(timeout); + + return os; + } + + public static QueryResult decodeTsResultResponse(byte[] response) + throws OtpErlangDecodeException, InvalidTermToBinaryException + { + return decodeTsResponse(response); + } + + public static OtpOutputStream encodeTsQueryRequest(String queryText) + { + final OtpOutputStream os = new OtpOutputStream(); + os.write(OtpExternal.versionTag); // NB: this is the reqired 0x83 (131) value + + // TsQueryReq is a 4-tuple: {'tsqueryreq', TsInt, boolIsStreaming, bytesCoverContext} + os.write_tuple_head(4); + os.write_atom(TS_QUERY_REQ); + + // TsInterpolation is a 3-tuple + // {'tsinterpolation', query, []} empty list is interpolations + os.write_tuple_head(3); + os.write_atom(TS_INTERPOLATION); + os.write_binary(queryText.getBytes(StandardCharsets.UTF_8)); + // interpolations is an empty list + os.write_nil(); + + // streaming is false for now + os.write_boolean(false); + + // cover_context is an undefined atom + os.write_atom(UNDEFINED); + + return os; + } + + public static OtpOutputStream encodeTsPutRequest(String tableName, Collection rows) + { + final OtpOutputStream os = new OtpOutputStream(); + os.write(OtpExternal.versionTag); // NB: this is the reqired 0x83 (131) value + + // TsPutReq is a 4-tuple: {'tsputreq', tableName, [], [rows]} + // columns is empte + os.write_tuple_head(4); + os.write_atom(TS_PUT_REQ); + os.write_binary(tableName.getBytes(StandardCharsets.UTF_8)); + // columns is an empty list + os.write_nil(); + + // write a list of rows + // each row is a tuple of cells + os.write_list_head(rows.size()); + for (Row row : rows) + { + os.write_tuple_head(row.getCellsCount()); + for (Cell cell : row) + { + if (cell == null) + { + // NB: Null cells are represented as empty lists + os.write_nil(); + } + else + { + writeTsCellToStream(os, cell); + } + } + } + os.write_nil(); + + return os; + } + + private static void writeTsCellToStream(OtpOutputStream stream, Cell cell) + { + if (cell.hasVarcharValue()) + { + stream.write_binary(cell.getVarcharAsUTF8String().getBytes(CharsetUtils.UTF_8)); + } + else if (cell.hasLong()) + { + stream.write_long(cell.getLong()); + } + else if (cell.hasTimestamp()) + { + stream.write_long(cell.getTimestamp()); + } + else if (cell.hasBoolean()) + { + stream.write_boolean(cell.getBoolean()); + } + else if (cell.hasDouble()) + { + stream.write_double(cell.getDouble()); + } + else + { + logger.error("Unknown TS cell type encountered."); + throw new IllegalArgumentException("Unknown TS cell type encountered."); + } + } + + private static QueryResult decodeTsResponse(byte[] response) + throws OtpErlangDecodeException, InvalidTermToBinaryException + { + QueryResult result = null; + + OtpInputStream is = new OtpInputStream(response); + + int firstByte = is.read1skip_version(); + is.reset(); + + if(firstByte != OtpExternal.smallTupleTag && firstByte != OtpExternal.largeTupleTag) + { + return parseAtomResult(is); + } + + return parseTupleResult(is); + } + + private static QueryResult parseAtomResult(OtpInputStream is) + throws OtpErlangDecodeException, InvalidTermToBinaryException + { + final String responseAtom = is.read_atom(); + + if(Objects.equals(responseAtom, TS_QUERY_RESP)) + { + return QueryResult.EMPTY; + } + + throw new InvalidTermToBinaryException("Invalid Response atom encountered: " + + responseAtom + ". Was expecting tsqueryresp"); + + } + + private static QueryResult parseTupleResult(OtpInputStream is) + throws OtpErlangDecodeException, InvalidTermToBinaryException + { + QueryResult result; + final int msgArity = is.read_tuple_head(); + // Response is: + // {'rpberrorresp', ErrMsg, ErrCode} + // {'tsgetresp', {ColNames, ColTypes, Rows}} + // {'tsqueryresp', {ColNames, ColTypes, Rows}} + final String respAtom = is.read_atom(); + switch (respAtom) + { + case TS_GET_RESP: + case TS_QUERY_RESP: + assert (msgArity == 2); + + final int dataArity = is.read_tuple_head(); + assert (dataArity == 3); + + final ArrayList columnDescriptions = parseColumnDescriptions(is); + + final ArrayList rows = parseRows(is, columnDescriptions); + + result = new QueryResult(columnDescriptions, rows); + + break; + default: + final String errorMsg = "Unsupported response message received: " + respAtom; + logger.error(errorMsg); + throw new IllegalArgumentException(errorMsg); + } + return result; + } + + private static ArrayList parseColumnDescriptions(OtpInputStream is) + throws OtpErlangDecodeException + { + final int colNameCount = is.read_list_head(); + final String[] columnNames = new String[colNameCount]; + for (int colNameIdx = 0; colNameIdx < colNameCount; colNameIdx++) + { + final String colName = new String(is.read_binary(), StandardCharsets.UTF_8); + columnNames[colNameIdx] = colName; + } + + if (colNameCount > 0) + { + is.read_nil(); + } + + + final int colTypeCount = is.read_list_head(); + assert (colNameCount == colTypeCount); + final String[] columnTypes = new String[colTypeCount]; + for (int colTypeIdx = 0; colTypeIdx < colTypeCount; colTypeIdx++) + { + final String colType = is.read_atom(); + columnTypes[colTypeIdx] = colType; + } + + if (colTypeCount > 0) + { + is.read_nil(); + } + + final ArrayList columnDescriptions = new ArrayList<>(colNameCount); + for (int colDescIdx = 0; colDescIdx < colNameCount; colDescIdx++) + { + final RiakTsPB.TsColumnDescription.Builder descBuilder = RiakTsPB.TsColumnDescription.newBuilder(); + + descBuilder.setName(ByteString.copyFromUtf8(columnNames[colDescIdx])); + descBuilder.setType(RiakTsPB.TsColumnType.valueOf(columnTypes[colDescIdx].toUpperCase(Locale.US))); + + columnDescriptions.add(descBuilder.build()); + } + return columnDescriptions; + } + + private static ArrayList parseRows(OtpInputStream is, + List columnDescriptions) + throws OtpErlangDecodeException, InvalidTermToBinaryException + { + final int rowCount = is.read_list_head(); + final ArrayList rows = new ArrayList<>(rowCount); + + for (int rowIdx = 0; rowIdx < rowCount; rowIdx++) + { + rows.add(parseRow(is, columnDescriptions)); + } + return rows; + } + + private static RiakTsPB.TsRow parseRow(OtpInputStream is, List columnDescriptions) + throws OtpErlangDecodeException, InvalidTermToBinaryException + { + final int rowDataCount = is.read_tuple_head(); + assert (columnDescriptions.size() == rowDataCount); + + final Cell[] cells = new Cell[rowDataCount]; + for (int j = 0; j < rowDataCount; j++) + { + final OtpErlangObject cell = is.read_any(); + cells[j] = parseCell(columnDescriptions, j, cell); + } + + return new Row(cells).getPbRow(); + } + + private static Cell parseCell(List columnDescriptions, int j, OtpErlangObject cell) + throws InvalidTermToBinaryException + { + if (cell instanceof OtpErlangBinary) + { + OtpErlangBinary v = (OtpErlangBinary) cell; + String s = new String(v.binaryValue(), StandardCharsets.UTF_8); + return new Cell(s); + } + else if (cell instanceof OtpErlangLong) + { + OtpErlangLong v = (OtpErlangLong) cell; + if (columnDescriptions.get(j).getType() == RiakTsPB.TsColumnType.TIMESTAMP) + { + return Cell.newTimestamp(v.longValue()); + } + else + { + return new Cell(v.longValue()); + } + } + else if (cell instanceof OtpErlangDouble) + { + OtpErlangDouble v = (OtpErlangDouble) cell; + return new Cell(v.doubleValue()); + } + else if (cell instanceof OtpErlangAtom) + { + OtpErlangAtom v = (OtpErlangAtom) cell; + return new Cell(v.booleanValue()); + } + else if (cell instanceof OtpErlangList) + { + final OtpErlangList l = (OtpErlangList) cell; + assert (l.arity() == 0); + return null; + } + else + { + throw new InvalidTermToBinaryException("Unknown cell type encountered: " + cell.toString() + ", unable to" + + " continue parsing."); + } + } +} diff --git a/src/main/java/com/basho/riak/client/core/netty/RiakMessageCodec.java b/src/main/java/com/basho/riak/client/core/netty/RiakMessageCodec.java index 2ae37b35d..c635826c4 100644 --- a/src/main/java/com/basho/riak/client/core/netty/RiakMessageCodec.java +++ b/src/main/java/com/basho/riak/client/core/netty/RiakMessageCodec.java @@ -33,7 +33,7 @@ protected void encode(ChannelHandlerContext ctx, RiakMessage msg, ByteBuf out) t int length = msg.getData().length + 1; out.writeInt(length); out.writeByte(msg.getCode()); - out.writeBytes(msg.getData()); + out.writeBytes(msg.getData()); } @Override @@ -44,7 +44,7 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) t { in.markReaderIndex(); int length = in.readInt(); - + // See if we have the full frame. if (in.readableBytes() < length) { @@ -58,8 +58,8 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) t in.readBytes(array); out.add(new RiakMessage(code,array)); } - + } } - + } diff --git a/src/main/java/com/basho/riak/client/core/netty/RiakOperationEncoder.java b/src/main/java/com/basho/riak/client/core/netty/RiakOperationEncoder.java index 619c98b1d..35b5eeec2 100644 --- a/src/main/java/com/basho/riak/client/core/netty/RiakOperationEncoder.java +++ b/src/main/java/com/basho/riak/client/core/netty/RiakOperationEncoder.java @@ -33,5 +33,5 @@ protected void encode(ChannelHandlerContext ctx, FutureOperation operation, List { out.add(operation.channelMessage()); } - + } diff --git a/src/main/java/com/basho/riak/client/core/netty/RiakResponseHandler.java b/src/main/java/com/basho/riak/client/core/netty/RiakResponseHandler.java index f98f7edf6..6b295d470 100644 --- a/src/main/java/com/basho/riak/client/core/netty/RiakResponseHandler.java +++ b/src/main/java/com/basho/riak/client/core/netty/RiakResponseHandler.java @@ -1,28 +1,9 @@ -/* - * Copyright 2013 Basho Technologies Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ package com.basho.riak.client.core.netty; import com.basho.riak.client.core.RiakMessage; import com.basho.riak.client.core.RiakResponseListener; -import com.basho.riak.protobuf.RiakMessageCodes; -import com.basho.riak.protobuf.RiakPB; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * @@ -31,9 +12,7 @@ */ public class RiakResponseHandler extends ChannelInboundHandlerAdapter { - private RiakResponseListener listener; - private final Logger logger = LoggerFactory.getLogger(RiakResponseHandler.class); public RiakResponseHandler(RiakResponseListener listener) { @@ -45,16 +24,9 @@ public RiakResponseHandler(RiakResponseListener listener) public void channelRead(ChannelHandlerContext chc, Object message) throws Exception { RiakMessage riakMessage = (RiakMessage) message; - if (riakMessage.getCode() == RiakMessageCodes.MSG_ErrorResp) - { - RiakPB.RpbErrorResp error = RiakPB.RpbErrorResp.parseFrom(riakMessage.getData()); - - listener.onRiakErrorResponse(chc.channel(), - new RiakResponseException(error.getErrcode(), - error.getErrmsg().toStringUtf8())); - } - else - { + if (riakMessage.isRiakError()) { + listener.onRiakErrorResponse(chc.channel(), riakMessage.getRiakError()); + } else { listener.onSuccess(chc.channel(), riakMessage); } } @@ -68,5 +40,4 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) listener.onException(ctx.channel(), cause); ctx.close(); } - -} +} \ No newline at end of file diff --git a/src/main/java/com/basho/riak/client/core/operations/DeleteOperation.java b/src/main/java/com/basho/riak/client/core/operations/DeleteOperation.java index 61b1350c5..9e018da3d 100644 --- a/src/main/java/com/basho/riak/client/core/operations/DeleteOperation.java +++ b/src/main/java/com/basho/riak/client/core/operations/DeleteOperation.java @@ -24,7 +24,7 @@ import java.util.List; -import static com.basho.riak.client.core.operations.Operations.checkMessageType; +import static com.basho.riak.client.core.operations.Operations.checkPBMessageType; import com.basho.riak.client.core.query.Location; /** @@ -46,7 +46,7 @@ private DeleteOperation(Builder builder) } @Override - protected Void convert(List rawResponse) + protected Void convert(List rawResponse) { return null; } @@ -54,7 +54,7 @@ protected Void convert(List rawResponse) @Override protected Void decode(RiakMessage rawResponse) { - checkMessageType(rawResponse, RiakMessageCodes.MSG_DelResp); + checkPBMessageType(rawResponse, RiakMessageCodes.MSG_DelResp); return null; } @@ -85,7 +85,7 @@ public Builder(Location location) { throw new IllegalArgumentException("Location can not be null"); } - + reqBuilder.setBucket(ByteString.copyFrom(location.getNamespace().getBucketName().unsafeGetValue())); reqBuilder.setKey(ByteString.copyFrom(location.getKey().unsafeGetValue())); reqBuilder.setType(ByteString.copyFrom(location.getNamespace().getBucketType().unsafeGetValue())); diff --git a/src/main/java/com/basho/riak/client/core/operations/DtFetchOperation.java b/src/main/java/com/basho/riak/client/core/operations/DtFetchOperation.java index 0cd46a6f6..6e5a7849c 100644 --- a/src/main/java/com/basho/riak/client/core/operations/DtFetchOperation.java +++ b/src/main/java/com/basho/riak/client/core/operations/DtFetchOperation.java @@ -74,7 +74,7 @@ protected RiakMessage createChannelMessage() @Override protected RiakDtPB.DtFetchResp decode(RiakMessage rawMessage) { - Operations.checkMessageType(rawMessage, RiakMessageCodes.MSG_DtFetchResp); + Operations.checkPBMessageType(rawMessage, RiakMessageCodes.MSG_DtFetchResp); try { return RiakDtPB.DtFetchResp.parseFrom(rawMessage.getData()); @@ -106,7 +106,7 @@ public Builder(Location location) { throw new IllegalArgumentException("Location can not be null"); } - + reqBuilder.setBucket(ByteString.copyFrom(location.getNamespace().getBucketName().unsafeGetValue())); reqBuilder.setKey(ByteString.copyFrom(location.getKey().unsafeGetValue())); reqBuilder.setType(ByteString.copyFrom(location.getNamespace().getBucketType().unsafeGetValue())); @@ -238,7 +238,7 @@ public DtFetchOperation build() } - public static class Response + public static class Response { private final BinaryValue context; private final RiakDatatype crdtElement; @@ -276,7 +276,7 @@ protected static abstract class Init> protected abstract T self(); protected abstract Response build(); - + T withContext(BinaryValue context) { if (context != null) diff --git a/src/main/java/com/basho/riak/client/core/operations/DtUpdateOperation.java b/src/main/java/com/basho/riak/client/core/operations/DtUpdateOperation.java index ecc767cbc..cf13ac748 100644 --- a/src/main/java/com/basho/riak/client/core/operations/DtUpdateOperation.java +++ b/src/main/java/com/basho/riak/client/core/operations/DtUpdateOperation.java @@ -47,7 +47,7 @@ private DtUpdateOperation(Builder builder) } @Override - protected Response convert(List rawResponse) + protected Response convert(List rawResponse) { if (rawResponse.size() != 1) { @@ -57,8 +57,8 @@ protected Response convert(List rawResponse) RiakDtPB.DtUpdateResp response = rawResponse.iterator().next(); CrdtResponseConverter converter = new CrdtResponseConverter(); RiakDatatype element = converter.convert(response); - - Response.Builder responseBuilder = + + Response.Builder responseBuilder = new Response.Builder().withCrdtElement(element); if (response.hasKey()) @@ -66,7 +66,7 @@ protected Response convert(List rawResponse) BinaryValue key = BinaryValue.unsafeCreate(response.getKey().toByteArray()); responseBuilder.withGeneratedKey(key); } - + if (response.hasContext()) { BinaryValue context = BinaryValue.unsafeCreate(response.getContext().toByteArray()); @@ -86,7 +86,7 @@ protected RiakMessage createChannelMessage() @Override protected RiakDtPB.DtUpdateResp decode(RiakMessage rawMessage) { - Operations.checkMessageType(rawMessage, RiakMessageCodes.MSG_DtUpdateResp); + Operations.checkPBMessageType(rawMessage, RiakMessageCodes.MSG_DtUpdateResp); try { RiakDtPB.DtUpdateResp resp = RiakDtPB.DtUpdateResp.parseFrom(rawMessage.getData()); @@ -109,7 +109,7 @@ public static class Builder private final RiakDtPB.DtUpdateReq.Builder reqBuilder = RiakDtPB.DtUpdateReq.newBuilder(); private final Location location; private boolean removeOpPresent = false; - + /** * Construct a builder for a DtUpdateOperation. * @param location The location of the object in Riak. @@ -124,11 +124,11 @@ else if (location.getNamespace().getBucketTypeAsString().equals(Namespace.DEFAUL { throw new IllegalArgumentException("Default bucket type does not accept CRDTs"); } - + reqBuilder.setBucket(ByteString.copyFrom(location.getNamespace().getBucketName().unsafeGetValue())); reqBuilder.setType(ByteString.copyFrom(location.getNamespace().getBucketType().unsafeGetValue())); reqBuilder.setKey(ByteString.copyFrom(location.getKey().unsafeGetValue())); - + this.location = location; } @@ -142,17 +142,17 @@ else if (namespace.getBucketTypeAsString().equals(Namespace.DEFAULT_BUCKET_TYPE) { throw new IllegalArgumentException("Default bucket type does not accept CRDTs"); } - + // This is simply for the returned query info Location loc = new Location(namespace, "RIAK_GENERATED"); - + reqBuilder.setBucket(ByteString.copyFrom(loc.getNamespace().getBucketName().unsafeGetValue())); reqBuilder.setType(ByteString.copyFrom(loc.getNamespace().getBucketType().unsafeGetValue())); - + this.location = loc; - + } - + /** * Set the context for this operation. * @@ -269,7 +269,7 @@ public DtUpdateOperation build() { throw new IllegalStateException("Remove operations cannot be performed without a context."); } - + return new DtUpdateOperation(this); } @@ -293,7 +293,7 @@ RiakDtPB.SetOp getSetOp(SetOp op) { setOpBuilder.addRemoves(ByteString.copyFrom(element.unsafeGetValue())); } - + if (setOpBuilder.getRemovesCount() > 0) { removeOpPresent = true; @@ -348,7 +348,7 @@ RiakDtPB.MapOp getMapOp(MapOp op) { mapOpBuilder.addRemoves(getMapField(field)); } - + if (mapOpBuilder.getRemovesCount() > 0) { removeOpPresent = true; @@ -437,38 +437,38 @@ private Builder withOp(SetOp op) } } - + public static class Response extends DtFetchOperation.Response { private final BinaryValue generatedKey; - + private Response(Init builder) { super(builder); this.generatedKey = builder.generatedKey; } - + public boolean hasGeneratedKey() { return generatedKey != null; } - + public BinaryValue getGeneratedKey() { return generatedKey; } - + protected static abstract class Init> extends DtFetchOperation.Response.Init { private BinaryValue generatedKey; - + T withGeneratedKey(BinaryValue generatedKey) { this.generatedKey = generatedKey; return self(); } } - + static class Builder extends Init { @Override @@ -476,14 +476,14 @@ protected Builder self() { return this; } - + @Override protected Response build() { return new Response(this); } - + } } - + } diff --git a/src/main/java/com/basho/riak/client/core/operations/FetchBucketPropsOperation.java b/src/main/java/com/basho/riak/client/core/operations/FetchBucketPropsOperation.java index dff694e43..c119f7fd1 100644 --- a/src/main/java/com/basho/riak/client/core/operations/FetchBucketPropsOperation.java +++ b/src/main/java/com/basho/riak/client/core/operations/FetchBucketPropsOperation.java @@ -35,17 +35,17 @@ public class FetchBucketPropsOperation extends FutureOperation rawResponse) + protected Response convert(List rawResponse) { - // This isn't streaming, there will only be one response. + // This isn't streaming, there will only be one response. RiakPB.RpbBucketProps pbProps = rawResponse.get(0).getProps(); return new Response(BucketPropertiesConverter.convert(pbProps)); } @@ -60,7 +60,7 @@ protected RiakMessage createChannelMessage() @Override protected RiakPB.RpbGetBucketResp decode(RiakMessage rawMessage) { - Operations.checkMessageType(rawMessage, RiakMessageCodes.MSG_GetBucketResp); + Operations.checkPBMessageType(rawMessage, RiakMessageCodes.MSG_GetBucketResp); try { return RiakPB.RpbGetBucketResp.parseFrom(rawMessage.getData()); @@ -76,13 +76,13 @@ public Namespace getQueryInfo() { return namespace; } - + public static class Builder { - private final RiakPB.RpbGetBucketReq.Builder reqBuilder = + private final RiakPB.RpbGetBucketReq.Builder reqBuilder = RiakPB.RpbGetBucketReq.newBuilder(); private final Namespace namespace; - + /** * Construct a builder for a FetchBucketPropsOperation. * @param namespace The namespace for the bucket. @@ -97,13 +97,13 @@ public Builder(Namespace namespace) reqBuilder.setType(ByteString.copyFrom(namespace.getBucketType().unsafeGetValue())); this.namespace = namespace; } - + public FetchBucketPropsOperation build() { return new FetchBucketPropsOperation(this); } } - + /** * Response from Fetching a bucket's properties. */ @@ -114,7 +114,7 @@ private Response(BucketProperties props) { this.props = props; } - + /** * Returns the fetched BucketProperties. * @return the BucketProperties. diff --git a/src/main/java/com/basho/riak/client/core/operations/FetchOperation.java b/src/main/java/com/basho/riak/client/core/operations/FetchOperation.java index 09e940de4..bc0784be6 100644 --- a/src/main/java/com/basho/riak/client/core/operations/FetchOperation.java +++ b/src/main/java/com/basho/riak/client/core/operations/FetchOperation.java @@ -16,7 +16,6 @@ package com.basho.riak.client.core.operations; import com.basho.riak.client.api.cap.BasicVClock; -import com.basho.riak.client.api.cap.VClock; import com.basho.riak.client.core.FutureOperation; import com.basho.riak.client.core.RiakMessage; import com.basho.riak.client.core.converters.RiakObjectConverter; @@ -41,7 +40,7 @@ public class FetchOperation extends FutureOperation responses) + protected FetchOperation.Response convert(List responses) { // This is not a streaming op, there will only be one response if (responses.size() > 1) { logger.error("Received {} responses when only one was expected.", responses.size()); } - + RiakKvPB.RpbGetResp response = responses.get(0); - - FetchOperation.Response.Builder responseBuilder = + + FetchOperation.Response.Builder responseBuilder = new FetchOperation.Response.Builder(); - - // If the response is null ... it means not found. Riak only sends + + // If the response is null ... it means not found. Riak only sends // a message code and zero bytes when that's the case. (See: decode() ) // Because that makes sense! if (null == response) { responseBuilder.withNotFound(true); } - else + else { // To unify the behavior of having just a tombstone vs. siblings // that include a tombstone, we create an empty object and mark @@ -104,7 +103,7 @@ protected FetchOperation.Response convert(List responses) RiakObject ro = new RiakObject() .setDeleted(true) .setVClock(new BasicVClock(response.getVclock().toByteArray())); - + responseBuilder.addObject(ro); } else @@ -115,7 +114,7 @@ protected FetchOperation.Response convert(List responses) responseBuilder.withUnchanged(response.hasUnchanged() ? response.getUnchanged() : false); } - + return responseBuilder.build(); } @@ -131,13 +130,13 @@ public Location getQueryInfo() { return location; } - + public static class Builder { - private final RiakKvPB.RpbGetReq.Builder reqBuilder = + private final RiakKvPB.RpbGetReq.Builder reqBuilder = RiakKvPB.RpbGetReq.newBuilder(); private final Location location; - + /** * Construct a FetchOperation that will retrieve an object from Riak stored * at the provided Location. @@ -149,14 +148,14 @@ public Builder(Location location) { throw new IllegalArgumentException("Location can not be null."); } - + reqBuilder.setKey(ByteString.copyFrom(location.getKey().unsafeGetValue())); reqBuilder.setBucket(ByteString.copyFrom(location.getNamespace().getBucketName().unsafeGetValue())); reqBuilder.setType(ByteString.copyFrom(location.getNamespace().getBucketType().unsafeGetValue())); this.location = location; - + } - + /** * Set the R value for this FetchOperation. * If not asSet the bucket default is used. @@ -203,7 +202,7 @@ public Builder withNotFoundOK(boolean notFoundOk) * Set the basic_quorum value. *

* The parameter controls whether a read request should return early in - * some fail cases. + * some fail cases. * E.g. If a quorum of nodes has already * returned notfound/error, don't wait around for the rest. *

@@ -232,7 +231,7 @@ public Builder withReturnDeletedVClock(boolean returnDeletedVClock) *

* Causes Riak to only return the metadata for the object. The value * will be asSet to null. - * @param headOnly true to return only metadata. + * @param headOnly true to return only metadata. * @return a reference to this object. */ public Builder withHeadOnly(boolean headOnly) @@ -242,9 +241,9 @@ public Builder withHeadOnly(boolean headOnly) } /** - * Do not return the object if the supplied vclock matches. + * Do not return the object if the supplied vclock matches. * @param vclock the vclock to match on - * @return a refrence to this object. + * @return a refrence to this object. */ public Builder withIfNotModified(byte[] vclock) { @@ -290,42 +289,42 @@ public Builder withSloppyQuorum(boolean sloppyQuorum) reqBuilder.setSloppyQuorum(sloppyQuorum); return this; } - + public FetchOperation build() { return new FetchOperation(this); } - - + + } - + protected static abstract class KvResponseBase { private final List objectList; - + protected KvResponseBase(Init builder) { this.objectList = builder.objectList; } - + public List getObjectList() { return objectList; } - - protected static abstract class Init> + + protected static abstract class Init> { private final List objectList = new LinkedList(); protected abstract T self(); protected abstract KvResponseBase build(); - + T addObject(RiakObject object) { objectList.add(object); return self(); } - + T addObjects(List objects) { objectList.addAll(objects); @@ -333,48 +332,48 @@ T addObjects(List objects) } } } - - + + public static class Response extends KvResponseBase { private final boolean notFound; private final boolean unchanged; - + private Response(Init builder) { super(builder); this.notFound = builder.notFound; this.unchanged = builder.unchanged; } - + public boolean isNotFound() { return notFound; } - + public boolean isUnchanged() { return unchanged; } - + protected static abstract class Init> extends KvResponseBase.Init { private boolean notFound; private boolean unchanged; - + T withNotFound(boolean notFound) { this.notFound = notFound; return self(); } - + T withUnchanged(boolean unchanged) { this.unchanged = unchanged; return self(); } } - + static class Builder extends Init { @Override diff --git a/src/main/java/com/basho/riak/client/core/operations/ListBucketsOperation.java b/src/main/java/com/basho/riak/client/core/operations/ListBucketsOperation.java index 2baf3585e..6592832f6 100644 --- a/src/main/java/com/basho/riak/client/core/operations/ListBucketsOperation.java +++ b/src/main/java/com/basho/riak/client/core/operations/ListBucketsOperation.java @@ -31,7 +31,7 @@ public class ListBucketsOperation extends FutureOperation rawResponse) + protected ListBucketsOperation.Response convert(List rawResponse) { List buckets = new ArrayList(rawResponse.size()); for (RiakKvPB.RpbListBucketsResp resp : rawResponse) @@ -69,7 +69,7 @@ protected RiakKvPB.RpbListBucketsResp decode(RiakMessage rawMessage) { try { - Operations.checkMessageType(rawMessage, RiakMessageCodes.MSG_ListBucketsResp); + Operations.checkPBMessageType(rawMessage, RiakMessageCodes.MSG_ListBucketsResp); return RiakKvPB.RpbListBucketsResp.parseFrom(rawMessage.getData()); } catch (InvalidProtocolBufferException e) @@ -84,19 +84,19 @@ public BinaryValue getQueryInfo() { return bucketType; } - + public static class Builder { - private final RiakKvPB.RpbListBucketsReq.Builder reqBuilder = + private final RiakKvPB.RpbListBucketsReq.Builder reqBuilder = RiakKvPB.RpbListBucketsReq.newBuilder().setStream(true); private BinaryValue bucketType = BinaryValue.create(Namespace.DEFAULT_BUCKET_TYPE); - + /** * Create a Builder for a ListBucketsOperation. */ public Builder() {} - + /** * Provide a timeout for this operation. * @param timeout value in milliseconds @@ -111,10 +111,10 @@ public Builder withTimeout(int timeout) reqBuilder.setTimeout(timeout); return this; } - + /** * Set the bucket type. - * If unset {@link Namespace#DEFAULT_BUCKET_TYPE} is used. + * If unset {@link Namespace#DEFAULT_BUCKET_TYPE} is used. * @param bucketType the bucket type to use * @return A reference to this object. */ @@ -128,34 +128,34 @@ public Builder withBucketType(BinaryValue bucketType) this.bucketType = bucketType; return this; } - + public ListBucketsOperation build() { return new ListBucketsOperation(this); } - + } - + public static class Response { private final BinaryValue bucketType; private final List buckets; - + Response(BinaryValue bucketType, List buckets) { this.bucketType = bucketType; this.buckets = buckets; } - + public BinaryValue getBucketType() { return bucketType; } - + public List getBuckets() { return buckets; } } - + } diff --git a/src/main/java/com/basho/riak/client/core/operations/ListKeysOperation.java b/src/main/java/com/basho/riak/client/core/operations/ListKeysOperation.java index f8cc7f561..2ebd99904 100644 --- a/src/main/java/com/basho/riak/client/core/operations/ListKeysOperation.java +++ b/src/main/java/com/basho/riak/client/core/operations/ListKeysOperation.java @@ -66,7 +66,7 @@ protected RiakKvPB.RpbListKeysResp decode(RiakMessage rawMessage) { try { - Operations.checkMessageType(rawMessage, RiakMessageCodes.MSG_ListKeysResp); + Operations.checkPBMessageType(rawMessage, RiakMessageCodes.MSG_ListKeysResp); return RiakKvPB.RpbListKeysResp.parseFrom(rawMessage.getData()); } catch (InvalidProtocolBufferException e) diff --git a/src/main/java/com/basho/riak/client/core/operations/MapReduceOperation.java b/src/main/java/com/basho/riak/client/core/operations/MapReduceOperation.java index 0ee1fb182..388364eed 100644 --- a/src/main/java/com/basho/riak/client/core/operations/MapReduceOperation.java +++ b/src/main/java/com/basho/riak/client/core/operations/MapReduceOperation.java @@ -43,7 +43,7 @@ public class MapReduceOperation extends FutureOperation rawResponse) { - // Riak streams the result back. Each message from Riak contains a int + // Riak streams the result back. Each message from Riak contains a int // that tells you what phase the result is from. The result from a phase // can span multiple messages. Each result chunk is a JSON array. - + final JsonNodeFactory factory = JsonNodeFactory.instance; final ObjectMapper mapper = new ObjectMapper(); final Map resultMap = new LinkedHashMap(); - + int phase = 0; - + for (RiakKvPB.RpbMapRedResp response : rawResponse) { if (response.hasPhase()) @@ -76,12 +76,12 @@ protected Response convert(List rawResponse) { jsonArray = resultMap.get(phase); } - else + else { jsonArray = factory.arrayNode(); resultMap.put(phase, jsonArray); } - + JsonNode responseJson; try { @@ -92,7 +92,7 @@ protected Response convert(List rawResponse) logger.error("Mapreduce job returned non-JSON; {}",response.getResponse().toStringUtf8()); throw new RuntimeException("Non-JSON response from MR job", ex); } - + if (responseJson.isArray()) { jsonArray.addAll((ArrayNode)responseJson); @@ -116,7 +116,7 @@ protected RiakMessage createChannelMessage() @Override protected RiakKvPB.RpbMapRedResp decode(RiakMessage rawMessage) { - Operations.checkMessageType(rawMessage, RiakMessageCodes.MSG_MapRedResp); + Operations.checkPBMessageType(rawMessage, RiakMessageCodes.MSG_MapRedResp); try { return RiakKvPB.RpbMapRedResp.parseFrom(rawMessage.getData()); @@ -138,13 +138,13 @@ public BinaryValue getQueryInfo() { return mapReduce; } - + public static class Builder { private final RiakKvPB.RpbMapRedReq.Builder reqBuilder = RiakKvPB.RpbMapRedReq.newBuilder(); private final BinaryValue mapReduce; - + /** * Create a MapReduce operation builder with the given function. * @@ -156,33 +156,33 @@ public Builder(BinaryValue mapReduce) { throw new IllegalArgumentException("MapReduce can not be null or empty."); } - - + + reqBuilder.setRequest(ByteString.copyFrom(mapReduce.unsafeGetValue())) .setContentType(ByteString.copyFromUtf8("application/json")); this.mapReduce = mapReduce; - + } - + public MapReduceOperation build() { return new MapReduceOperation(this); } } - + public static class Response { private final Map resultMap; - + Response(Map results) { this.resultMap = results; } - + public Map getResults() { return resultMap; } - + } } diff --git a/src/main/java/com/basho/riak/client/core/operations/Operations.java b/src/main/java/com/basho/riak/client/core/operations/Operations.java index 2a5f77607..fb72f9d77 100644 --- a/src/main/java/com/basho/riak/client/core/operations/Operations.java +++ b/src/main/java/com/basho/riak/client/core/operations/Operations.java @@ -21,7 +21,7 @@ public class Operations { - public static void checkMessageType(RiakMessage msg, byte expected) + public static void checkPBMessageType(RiakMessage msg, byte expected) { byte pbMessageCode = msg.getCode(); if (pbMessageCode != expected) @@ -47,7 +47,7 @@ public static short getUnsignedByteValue(byte b) /** * Convert a Java signed int to unsigned. - * Returns the unsigned value as a (signed) long. + * Returns the unsigned value as a (signed) long. * @param i a java signed int * @return a long containing the converted value. */ diff --git a/src/main/java/com/basho/riak/client/core/operations/PBFutureOperation.java b/src/main/java/com/basho/riak/client/core/operations/PBFutureOperation.java index f2b862eaf..c111be6f5 100644 --- a/src/main/java/com/basho/riak/client/core/operations/PBFutureOperation.java +++ b/src/main/java/com/basho/riak/client/core/operations/PBFutureOperation.java @@ -7,8 +7,6 @@ import com.google.protobuf.InvalidProtocolBufferException; import org.slf4j.LoggerFactory; -import java.util.List; - /** * An abstract PB operation that introduces generic encoding/decoding * @@ -23,7 +21,7 @@ public abstract class PBFutureOperation extends FutureOperation { protected final Builder reqBuilder; private final com.google.protobuf.Parser respParser; - private final byte reqMessageCode; + protected final byte reqMessageCode; private final byte respMessageCode; @@ -45,7 +43,7 @@ protected RiakMessage createChannelMessage() { @Override protected U decode(RiakMessage rawMessage) { - Operations.checkMessageType(rawMessage, respMessageCode); + Operations.checkPBMessageType(rawMessage, respMessageCode); try { byte[] data = rawMessage.getData(); @@ -65,15 +63,4 @@ protected U decode(RiakMessage rawMessage) { throw new IllegalArgumentException("Invalid message received", e); } } - - protected U checkAndGetSingleResponse(List responses) - { - if (responses.size() > 1) - { - LoggerFactory.getLogger(this.getClass()).error("Received {} responses when only one was expected.", - responses.size()); - } - - return responses.get(0); - } } diff --git a/src/main/java/com/basho/riak/client/core/operations/ResetBucketPropsOperation.java b/src/main/java/com/basho/riak/client/core/operations/ResetBucketPropsOperation.java index 6c4fec32d..f12590ce0 100644 --- a/src/main/java/com/basho/riak/client/core/operations/ResetBucketPropsOperation.java +++ b/src/main/java/com/basho/riak/client/core/operations/ResetBucketPropsOperation.java @@ -32,15 +32,15 @@ public class ResetBucketPropsOperation extends FutureOperation rawResponse) + protected Void convert(List rawResponse) { return null; } @@ -48,16 +48,16 @@ protected Void convert(List rawResponse) @Override protected RiakMessage createChannelMessage() { - RiakPB.RpbResetBucketReq req = + RiakPB.RpbResetBucketReq req = reqBuilder.build(); - + return new RiakMessage(RiakMessageCodes.MSG_ResetBucketReq, req.toByteArray()); } @Override protected Void decode(RiakMessage rawMessage) { - Operations.checkMessageType(rawMessage, RiakMessageCodes.MSG_ResetBucketResp); + Operations.checkPBMessageType(rawMessage, RiakMessageCodes.MSG_ResetBucketResp); return null; } @@ -66,15 +66,15 @@ public Namespace getQueryInfo() { return namespace; } - + public static class Builder { - private final RiakPB.RpbResetBucketReq.Builder reqBuilder = + private final RiakPB.RpbResetBucketReq.Builder reqBuilder = RiakPB.RpbResetBucketReq.newBuilder(); private final Namespace namespace; - + /** - * Construct a builder for a ResetBucketPropsOperation. + * Construct a builder for a ResetBucketPropsOperation. * @param namespace The namespace in Riak. */ public Builder(Namespace namespace) @@ -87,7 +87,7 @@ public Builder(Namespace namespace) reqBuilder.setType(ByteString.copyFrom(namespace.getBucketType().unsafeGetValue())); this.namespace = namespace; } - + public ResetBucketPropsOperation build() { return new ResetBucketPropsOperation(this); diff --git a/src/main/java/com/basho/riak/client/core/operations/SearchOperation.java b/src/main/java/com/basho/riak/client/core/operations/SearchOperation.java index 65b2facf5..6f5f29a06 100644 --- a/src/main/java/com/basho/riak/client/core/operations/SearchOperation.java +++ b/src/main/java/com/basho/riak/client/core/operations/SearchOperation.java @@ -57,7 +57,7 @@ private SearchOperation(Builder builder) } @Override - protected SearchOperation.Response convert(List rawResponse) + protected SearchOperation.Response convert(List rawResponse) { // This isn't a streaming op, there will only be one protobuf RiakSearchPB.RpbSearchQueryResp resp = rawResponse.get(0); @@ -92,7 +92,7 @@ protected RiakMessage createChannelMessage() @Override protected RiakSearchPB.RpbSearchQueryResp decode(RiakMessage rawMessage) { - Operations.checkMessageType(rawMessage, RiakMessageCodes.MSG_SearchQueryResp); + Operations.checkPBMessageType(rawMessage, RiakMessageCodes.MSG_SearchQueryResp); try { return RiakSearchPB.RpbSearchQueryResp.parseFrom(rawMessage.getData()); @@ -274,7 +274,7 @@ public static class Response implements Iterable private final List>> results; private final float maxScore; private final int numResults; - + Response(List>> results, float maxScore, int numResults) { this.results = results; @@ -308,13 +308,13 @@ public int numResults() /** * Returns the entire list of results from the search query. - * @return a list containing all the result sets. + * @return a list containing all the result sets. */ public List>> getAllResults() { return results; } - + } - + } diff --git a/src/main/java/com/basho/riak/client/core/operations/SecondaryIndexQueryOperation.java b/src/main/java/com/basho/riak/client/core/operations/SecondaryIndexQueryOperation.java index f19961a6c..49fe22ca0 100644 --- a/src/main/java/com/basho/riak/client/core/operations/SecondaryIndexQueryOperation.java +++ b/src/main/java/com/basho/riak/client/core/operations/SecondaryIndexQueryOperation.java @@ -17,7 +17,6 @@ import com.basho.riak.client.core.FutureOperation; import com.basho.riak.client.core.RiakMessage; -import com.basho.riak.client.core.query.Location; import com.basho.riak.client.core.query.Namespace; import com.basho.riak.client.core.util.BinaryValue; import com.basho.riak.protobuf.RiakMessageCodes; @@ -38,7 +37,7 @@ public class SecondaryIndexQueryOperation extends FutureOperation rawResponse) { - SecondaryIndexQueryOperation.Response.Builder responseBuilder = + SecondaryIndexQueryOperation.Response.Builder responseBuilder = new SecondaryIndexQueryOperation.Response.Builder(); - + for (RiakKvPB.RpbIndexResp pbEntry : rawResponse) { /** - * The 2i API is inconsistent on the Riak side. If it's not - * a range query, return_terms is ignored it only returns the + * The 2i API is inconsistent on the Riak side. If it's not + * a range query, return_terms is ignored it only returns the * list of object keys and you have to have * preserved the index key if you want to return it to the user - * with the results. - * + * with the results. + * * Also, the $key index queries just ignore return_terms altogether. */ - + if (pbReq.getReturnTerms() && !query.indexName.toString().equalsIgnoreCase("$key")) { if (pbReq.hasRangeMin()) { for (RpbPair pair : pbEntry.getResultsList()) { - responseBuilder.addEntry(new Response.Entry(BinaryValue.unsafeCreate(pair.getKey().toByteArray()), + responseBuilder.addEntry(new Response.Entry(BinaryValue.unsafeCreate(pair.getKey().toByteArray()), BinaryValue.unsafeCreate(pair.getValue().toByteArray()))); } } @@ -94,7 +93,7 @@ protected SecondaryIndexQueryOperation.Response convert(List @@ -445,9 +444,9 @@ public Builder withContinuation(BinaryValue continuation) *

*

* Note that this is not recommended for queries that could return a large - * result set; the overhead in Riak is substantial. + * result set; the overhead in Riak is substantial. *

- * + * * @param orderByKey true to sort the results, false to return as-is. * @return a reference to this object. */ @@ -467,7 +466,7 @@ public Builder withRegexTermFilter(BinaryValue filter) this.termFilter = filter; return this; } - + /** * Set the timeout for the query. *

@@ -481,7 +480,7 @@ public Builder withTimeout(int timeout) this.timeout = timeout; return this; } - + public Query build() { // sanity checks @@ -511,7 +510,7 @@ else if (termFilter != null && indexName.toStringUtf8().endsWith("_int")) } } } - + public static class Response { private final BinaryValue continuation; @@ -522,7 +521,7 @@ private Response(Builder builder) this.continuation = builder.continuation; this.entryList = builder.entryList; } - + public boolean hasContinuation() { return continuation != null; @@ -532,12 +531,12 @@ public BinaryValue getContinuation() { return continuation; } - + public List getEntryList() { return entryList; } - + public static class Entry { private final BinaryValue indexKey; @@ -569,31 +568,31 @@ public BinaryValue getObjectKey() return objectKey; } } - + static class Builder { private BinaryValue continuation; - private List entryList = + private List entryList = new ArrayList(); - + Builder withContinuation(BinaryValue continuation) { this.continuation = continuation; return this; } - + Builder addEntry(Response.Entry entry) { entryList.add(entry); return this; } - + Response build() { return new Response(this); } } - + } - + } diff --git a/src/main/java/com/basho/riak/client/core/operations/StoreBucketPropsOperation.java b/src/main/java/com/basho/riak/client/core/operations/StoreBucketPropsOperation.java index 9f35c32f6..3f738316b 100644 --- a/src/main/java/com/basho/riak/client/core/operations/StoreBucketPropsOperation.java +++ b/src/main/java/com/basho/riak/client/core/operations/StoreBucketPropsOperation.java @@ -42,7 +42,7 @@ private StoreBucketPropsOperation(Builder builder) } @Override - protected Void convert(List rawResponse) + protected Void convert(List rawResponse) { return null; } @@ -57,7 +57,7 @@ protected RiakMessage createChannelMessage() @Override protected Void decode(RiakMessage rawMessage) { - Operations.checkMessageType(rawMessage, RiakMessageCodes.MSG_SetBucketResp); + Operations.checkPBMessageType(rawMessage, RiakMessageCodes.MSG_SetBucketResp); return null; } @@ -437,7 +437,7 @@ public static class Builder extends PropsBuilder private final RiakPB.RpbSetBucketReq.Builder reqBuilder = RiakPB.RpbSetBucketReq.newBuilder(); private final Namespace namespace; - + /** * Constructs a builder for a StoreBucketPropsOperation. * @param namespace The namespace in Riak. @@ -452,13 +452,13 @@ public Builder(Namespace namespace) reqBuilder.setType(ByteString.copyFrom(namespace.getBucketType().unsafeGetValue())); this.namespace = namespace; } - + @Override protected Builder self() { return this; } - + public StoreBucketPropsOperation build() { reqBuilder.setProps(propsBuilder); diff --git a/src/main/java/com/basho/riak/client/core/operations/StoreOperation.java b/src/main/java/com/basho/riak/client/core/operations/StoreOperation.java index b9b6b2dcb..d12e27742 100644 --- a/src/main/java/com/basho/riak/client/core/operations/StoreOperation.java +++ b/src/main/java/com/basho/riak/client/core/operations/StoreOperation.java @@ -27,7 +27,7 @@ import java.util.List; -import static com.basho.riak.client.core.operations.Operations.checkMessageType; +import static com.basho.riak.client.core.operations.Operations.checkPBMessageType; import com.basho.riak.client.core.query.Location; import com.basho.riak.client.core.query.Namespace; @@ -42,7 +42,7 @@ public class StoreOperation extends FutureOperation responses) + protected Response convert(List responses) { // There should only be one response message from Riak. if (responses.size() != 1) @@ -59,29 +59,29 @@ protected Response convert(List responses) } RiakKvPB.RpbPutResp response = responses.get(0); - - StoreOperation.Response.Builder responseBuilder = + + StoreOperation.Response.Builder responseBuilder = new StoreOperation.Response.Builder(); - + // This only exists if no key was specified in the put request if (response.hasKey()) { responseBuilder.withGeneratedKey(BinaryValue.unsafeCreate(response.getKey().toByteArray())); } - + // Is there a case where this isn't true? Can a delete interleve? if (response.getContentCount() > 0) { responseBuilder.addObjects(RiakObjectConverter.convert(response.getContentList(), response.getVclock())); } - + return responseBuilder.build(); } @Override protected RiakKvPB.RpbPutResp decode(RiakMessage rawMessage) { - checkMessageType(rawMessage, RiakMessageCodes.MSG_PutResp); + checkPBMessageType(rawMessage, RiakMessageCodes.MSG_PutResp); try { return RiakKvPB.RpbPutResp.parseFrom(rawMessage.getData()); @@ -110,7 +110,7 @@ public static class Builder { private final RiakKvPB.RpbPutReq.Builder reqBuilder = RiakKvPB.RpbPutReq.newBuilder(); private final Location location; - + /** * Constructs a builder for a StoreOperation * @param location The location in Riak at which to store. @@ -121,15 +121,15 @@ public Builder(Location location) { throw new IllegalArgumentException("Location cannot be null"); } - + reqBuilder.setType(ByteString.copyFrom(location.getNamespace().getBucketType().unsafeGetValue())); reqBuilder.setBucket(ByteString.copyFrom(location.getNamespace().getBucketName().unsafeGetValue())); reqBuilder.setKey(ByteString.copyFrom(location.getKey().unsafeGetValue())); - + this.location = location; - + } - + /** * Constructs a builder for a StoreOperation. *

@@ -145,28 +145,28 @@ public Builder(Namespace namespace) } reqBuilder.setType(ByteString.copyFrom(namespace.getBucketType().unsafeGetValue())); reqBuilder.setBucket(ByteString.copyFrom(namespace.getBucketName().unsafeGetValue())); - + this.location = new Location(namespace, "RIAK_GENERATED"); - + } - - + + public Builder withContent(RiakObject content) { if (null == content) { throw new IllegalArgumentException("Object cannot be null."); } - + reqBuilder.setContent(RiakObjectConverter.convert(content)); - + if (content.getVClock() != null) { reqBuilder.setVclock(ByteString.copyFrom(content.getVClock().getBytes())); } return this; } - + /** * Set the W value for this StoreOperation. * If not asSet the bucket default is used. @@ -205,7 +205,7 @@ public Builder withPw(int pw) /** * Return the object after storing (including any siblings). - * @param returnBody true to return the object. + * @param returnBody true to return the object. * @return a reference to this object. */ public Builder withReturnBody(boolean returnBody) @@ -219,7 +219,7 @@ public Builder withReturnBody(boolean returnBody) *

* Causes Riak to only return the metadata for the object. The value * will be asSet to null. - * @param returnHead true to return only metadata. + * @param returnHead true to return only metadata. * @return a reference to this object. */ public Builder withReturnHead(boolean returnHead) @@ -231,7 +231,7 @@ public Builder withReturnHead(boolean returnHead) /** * Set the if_not_modified flag for this StoreOperation. *

- * Setting this to true means to update the value only if the + * Setting this to true means to update the value only if the * vclock in the supplied object matches the one in the database. *

*

@@ -250,13 +250,13 @@ public Builder withIfNotModified(boolean ifNotModified) /** * Set the if_none_match flag value for this StoreOperation. *

- * Setting this to true means store the value only if this - * bucket/key combination are not already defined. + * Setting this to true means store the value only if this + * bucket/key combination are not already defined. *

- * Be aware that there are several cases where + * Be aware that there are several cases where * this may not actually happen. Use of this option is discouraged. *

- * + * * @param ifNoneMatch the if_non-match value. * @return a reference to this object. */ @@ -318,49 +318,49 @@ public Builder withSloppyQuorum(boolean sloppyQuorum) reqBuilder.setSloppyQuorum(sloppyQuorum); return this; } - + public StoreOperation build() { return new StoreOperation(this); } - + } - + /** * Response returned from a StoreOperation */ public static class Response extends FetchOperation.KvResponseBase { private final BinaryValue generatedKey; - + private Response(Init builder) { super(builder); this.generatedKey = builder.generatedKey; } - + public boolean hasGeneratedKey() { return generatedKey != null; } - + public BinaryValue getGeneratedKey() { return generatedKey; } - + protected static abstract class Init> extends FetchOperation.KvResponseBase.Init { private BinaryValue generatedKey; - + T withGeneratedKey(BinaryValue key) { this.generatedKey = key; return self(); } - + } - + static class Builder extends Init { @Override @@ -368,7 +368,7 @@ protected Builder self() { return this; } - + @Override protected Response build() { diff --git a/src/main/java/com/basho/riak/client/core/operations/TTBFutureOperation.java b/src/main/java/com/basho/riak/client/core/operations/TTBFutureOperation.java new file mode 100644 index 000000000..1d757a868 --- /dev/null +++ b/src/main/java/com/basho/riak/client/core/operations/TTBFutureOperation.java @@ -0,0 +1,60 @@ +package com.basho.riak.client.core.operations; + +import com.basho.riak.client.core.FutureOperation; +import com.basho.riak.client.core.RiakMessage; +import com.basho.riak.protobuf.RiakMessageCodes; + +/** + * An abstract TTB operation that introduces generic encoding/decoding + * + * @author Alex Moore + * @param The type the operation returns + * @param Query info type + + * @since 2.0.6 + */ + +public abstract class TTBFutureOperation extends FutureOperation +{ + protected final byte reqMessageCode = RiakMessageCodes.MSG_TsTtbMsg; + protected final byte respMessageCode = RiakMessageCodes.MSG_TsTtbMsg; + protected final TTBEncoder requestBuilder; + protected final TTBParser responseParser; + + protected TTBFutureOperation(TTBEncoder requestBuilder, TTBParser responseParser) + { + this.requestBuilder = requestBuilder; + this.responseParser = responseParser; + } + + @Override + protected RiakMessage createChannelMessage() + { + return new RiakMessage(reqMessageCode, requestBuilder.build(), false); + } + + @Override + protected byte[] decode(RiakMessage rawMessage) + { + Operations.checkPBMessageType(rawMessage, respMessageCode); + + byte[] data = rawMessage.getData(); + + if (data.length == 0) // not found + { + return null; + } + + return data; + } + + public interface TTBEncoder + { + byte[] build(); + } + + public interface TTBParser + { + T parseFrom(byte[] data); + } +} diff --git a/src/main/java/com/basho/riak/client/core/operations/YzDeleteIndexOperation.java b/src/main/java/com/basho/riak/client/core/operations/YzDeleteIndexOperation.java index a3af69ccc..165ffd0e1 100644 --- a/src/main/java/com/basho/riak/client/core/operations/YzDeleteIndexOperation.java +++ b/src/main/java/com/basho/riak/client/core/operations/YzDeleteIndexOperation.java @@ -30,15 +30,15 @@ public class YzDeleteIndexOperation extends FutureOperation { private final RiakYokozunaPB.RpbYokozunaIndexDeleteReq.Builder reqBuilder; private final String indexName; - + private YzDeleteIndexOperation(Builder builder) { this.reqBuilder = builder.reqBuilder; this.indexName = builder.indexName; } - + @Override - protected Void convert(List rawResponse) + protected Void convert(List rawResponse) { return null; } @@ -48,13 +48,13 @@ protected RiakMessage createChannelMessage() { RiakYokozunaPB.RpbYokozunaIndexDeleteReq req = reqBuilder.build(); return new RiakMessage(RiakMessageCodes.MSG_YokozunaIndexDeleteReq, req.toByteArray()); - + } @Override protected Void decode(RiakMessage rawMessage) { - Operations.checkMessageType(rawMessage, RiakMessageCodes.MSG_DelResp); + Operations.checkPBMessageType(rawMessage, RiakMessageCodes.MSG_DelResp); return null; } @@ -63,13 +63,13 @@ public String getQueryInfo() { return indexName; } - + public static class Builder { private RiakYokozunaPB.RpbYokozunaIndexDeleteReq.Builder reqBuilder = RiakYokozunaPB.RpbYokozunaIndexDeleteReq.newBuilder(); private final String indexName; - + public Builder(String indexName) { if (null == indexName || indexName.length() == 0) @@ -79,7 +79,7 @@ public Builder(String indexName) reqBuilder.setName(ByteString.copyFromUtf8(indexName)); this.indexName = indexName; } - + public YzDeleteIndexOperation build() { return new YzDeleteIndexOperation(this); diff --git a/src/main/java/com/basho/riak/client/core/operations/YzFetchIndexOperation.java b/src/main/java/com/basho/riak/client/core/operations/YzFetchIndexOperation.java index 77191d0a9..1872d90a2 100644 --- a/src/main/java/com/basho/riak/client/core/operations/YzFetchIndexOperation.java +++ b/src/main/java/com/basho/riak/client/core/operations/YzFetchIndexOperation.java @@ -29,30 +29,30 @@ * @author Brian Roach */ public class YzFetchIndexOperation extends FutureOperation -{ +{ private final RiakYokozunaPB.RpbYokozunaIndexGetReq.Builder reqBuilder; private final String indexName; - + private YzFetchIndexOperation(Builder builder) { this.reqBuilder = builder.reqBuilder; this.indexName = builder.indexName; } - + @Override protected Response convert(List rawResponse) { // This isn't a streaming op, so there's only one protobuf in the list RiakYokozunaPB.RpbYokozunaIndexGetResp response = rawResponse.get(0); List indexList = new ArrayList(response.getIndexCount()); - + for (RiakYokozunaPB.RpbYokozunaIndex pbIndex : response.getIndexList()) { indexList.add(new YokozunaIndex(pbIndex.getName().toStringUtf8(), pbIndex.getSchema().toStringUtf8()) .withNVal(pbIndex.getNVal())); } - + return new Response(indexList); } @@ -61,13 +61,13 @@ protected RiakMessage createChannelMessage() { RiakYokozunaPB.RpbYokozunaIndexGetReq req = reqBuilder.build(); return new RiakMessage(RiakMessageCodes.MSG_YokozunaIndexGetReq, req.toByteArray()); - + } @Override protected RiakYokozunaPB.RpbYokozunaIndexGetResp decode(RiakMessage rawMessage) { - Operations.checkMessageType(rawMessage, RiakMessageCodes.MSG_YokozunaIndexGetResp); + Operations.checkPBMessageType(rawMessage, RiakMessageCodes.MSG_YokozunaIndexGetResp); try { return RiakYokozunaPB.RpbYokozunaIndexGetResp.parseFrom(rawMessage.getData()); @@ -76,7 +76,7 @@ protected RiakYokozunaPB.RpbYokozunaIndexGetResp decode(RiakMessage rawMessage) { throw new IllegalArgumentException("Invalid message received", ex); } - + } @Override @@ -84,17 +84,17 @@ public String getQueryInfo() { return indexName; } - + public static class Builder { - private final RiakYokozunaPB.RpbYokozunaIndexGetReq.Builder reqBuilder = + private final RiakYokozunaPB.RpbYokozunaIndexGetReq.Builder reqBuilder = RiakYokozunaPB.RpbYokozunaIndexGetReq.newBuilder(); private String indexName = "All Indexes"; - + public Builder() {} - - public Builder withIndexName(String indexName) + + public Builder withIndexName(String indexName) { if (null == indexName || indexName.length() == 0) { @@ -104,22 +104,22 @@ public Builder withIndexName(String indexName) this.indexName = indexName; return this; } - + public YzFetchIndexOperation build() { return new YzFetchIndexOperation(this); } } - + public static class Response { private final List indexList; - + Response(List indexList) { this.indexList = indexList; } - + public List getIndexes() { return this.indexList; diff --git a/src/main/java/com/basho/riak/client/core/operations/YzGetSchemaOperation.java b/src/main/java/com/basho/riak/client/core/operations/YzGetSchemaOperation.java index 57eb53118..f725086ea 100644 --- a/src/main/java/com/basho/riak/client/core/operations/YzGetSchemaOperation.java +++ b/src/main/java/com/basho/riak/client/core/operations/YzGetSchemaOperation.java @@ -38,16 +38,16 @@ private YzGetSchemaOperation(Builder builder) this.reqBuilder = builder.reqBuilder; this.schemaName = builder.schemaName; } - + @Override protected YzGetSchemaOperation.Response convert(List rawResponse) { // This isn't a streaming op, so there's only one protobuf in the list RiakYokozunaPB.RpbYokozunaSchemaGetResp response = rawResponse.get(0); - + return new Response(new YokozunaSchema(response.getSchema().getName().toStringUtf8(), response.getSchema().getContent().toStringUtf8())); - + } @Override @@ -55,13 +55,13 @@ protected RiakMessage createChannelMessage() { RiakYokozunaPB.RpbYokozunaSchemaGetReq req = reqBuilder.build(); return new RiakMessage(RiakMessageCodes.MSG_YokozunaSchemaGetReq, req.toByteArray()); - + } @Override protected RiakYokozunaPB.RpbYokozunaSchemaGetResp decode(RiakMessage rawMessage) { - Operations.checkMessageType(rawMessage, RiakMessageCodes.MSG_YokozunaSchemaGetResp); + Operations.checkPBMessageType(rawMessage, RiakMessageCodes.MSG_YokozunaSchemaGetResp); try { return RiakYokozunaPB.RpbYokozunaSchemaGetResp.parseFrom(rawMessage.getData()); @@ -70,7 +70,7 @@ protected RiakYokozunaPB.RpbYokozunaSchemaGetResp decode(RiakMessage rawMessage) { throw new IllegalArgumentException("Invalid message received", ex); } - + } @Override @@ -78,13 +78,13 @@ public String getQueryInfo() { return schemaName; } - + public static class Builder { - private final RiakYokozunaPB.RpbYokozunaSchemaGetReq.Builder reqBuilder = + private final RiakYokozunaPB.RpbYokozunaSchemaGetReq.Builder reqBuilder = RiakYokozunaPB.RpbYokozunaSchemaGetReq.newBuilder(); private final String schemaName; - + public Builder(String schemaName) { if (null == schemaName || schemaName.length() == 0) @@ -94,22 +94,22 @@ public Builder(String schemaName) reqBuilder.setName(ByteString.copyFromUtf8(schemaName)); this.schemaName = schemaName; } - + public YzGetSchemaOperation build() { return new YzGetSchemaOperation(this); } } - + public static class Response { private final YokozunaSchema schema; - + Response(YokozunaSchema schema) { this.schema = schema; } - + public YokozunaSchema getSchema() { return schema; diff --git a/src/main/java/com/basho/riak/client/core/operations/YzPutIndexOperation.java b/src/main/java/com/basho/riak/client/core/operations/YzPutIndexOperation.java index e50859733..1dd16ae04 100644 --- a/src/main/java/com/basho/riak/client/core/operations/YzPutIndexOperation.java +++ b/src/main/java/com/basho/riak/client/core/operations/YzPutIndexOperation.java @@ -31,13 +31,13 @@ public class YzPutIndexOperation extends FutureOperation rawResponse) { @@ -49,13 +49,13 @@ protected RiakMessage createChannelMessage() { RiakYokozunaPB.RpbYokozunaIndexPutReq req = reqBuilder.build(); return new RiakMessage(RiakMessageCodes.MSG_YokozunaIndexPutReq, req.toByteArray()); - + } @Override protected Void decode(RiakMessage rawMessage) { - Operations.checkMessageType(rawMessage, RiakMessageCodes.MSG_PutResp); + Operations.checkPBMessageType(rawMessage, RiakMessageCodes.MSG_PutResp); return null; } @@ -64,7 +64,7 @@ public YokozunaIndex getQueryInfo() { return index; } - + public static class Builder { private final RiakYokozunaPB.RpbYokozunaIndexPutReq.Builder reqBuilder = @@ -82,7 +82,7 @@ public Builder(YokozunaIndex index) final RiakYokozunaPB.RpbYokozunaIndex.Builder indexBuilder = RiakYokozunaPB.RpbYokozunaIndex.newBuilder(); indexBuilder.setName(ByteString.copyFromUtf8(index.getName())); - // A null schema is valid; the default will be used + // A null schema is valid; the default will be used if (index.getSchema() != null) { indexBuilder.setSchema(ByteString.copyFromUtf8(index.getSchema())); diff --git a/src/main/java/com/basho/riak/client/core/operations/YzPutSchemaOperation.java b/src/main/java/com/basho/riak/client/core/operations/YzPutSchemaOperation.java index c19f9efd4..8e4c5da65 100644 --- a/src/main/java/com/basho/riak/client/core/operations/YzPutSchemaOperation.java +++ b/src/main/java/com/basho/riak/client/core/operations/YzPutSchemaOperation.java @@ -31,13 +31,13 @@ public class YzPutSchemaOperation extends FutureOperation rawResponse) { @@ -54,7 +54,7 @@ protected RiakMessage createChannelMessage() @Override protected Void decode(RiakMessage rawMessage) { - Operations.checkMessageType(rawMessage, RiakMessageCodes.MSG_PutResp); + Operations.checkPBMessageType(rawMessage, RiakMessageCodes.MSG_PutResp); return null; } @@ -63,24 +63,24 @@ public YokozunaSchema getQueryInfo() { return schema; } - + public static class Builder { private final RiakYokozunaPB.RpbYokozunaSchemaPutReq.Builder reqBuilder = RiakYokozunaPB.RpbYokozunaSchemaPutReq.newBuilder(); private final YokozunaSchema schema; - + public Builder(YokozunaSchema schema) { - RiakYokozunaPB.RpbYokozunaSchema.Builder schemaBuilder = + RiakYokozunaPB.RpbYokozunaSchema.Builder schemaBuilder = RiakYokozunaPB.RpbYokozunaSchema.newBuilder(); - + schemaBuilder.setName(ByteString.copyFromUtf8(schema.getName())); schemaBuilder.setContent(ByteString.copyFromUtf8(schema.getContent())); reqBuilder.setSchema(schemaBuilder); this.schema = schema; } - + public YzPutSchemaOperation build() { return new YzPutSchemaOperation(this); diff --git a/src/main/java/com/basho/riak/client/core/operations/ts/FetchOperation.java b/src/main/java/com/basho/riak/client/core/operations/ts/FetchOperation.java index 61795773b..6da295f4c 100644 --- a/src/main/java/com/basho/riak/client/core/operations/ts/FetchOperation.java +++ b/src/main/java/com/basho/riak/client/core/operations/ts/FetchOperation.java @@ -1,10 +1,10 @@ package com.basho.riak.client.core.operations.ts; -import com.basho.riak.client.core.operations.PBFutureOperation; +import com.basho.riak.client.core.RiakMessage; +import com.basho.riak.client.core.operations.TTBFutureOperation; import com.basho.riak.client.core.query.timeseries.*; import com.basho.riak.protobuf.RiakTsPB; import com.basho.riak.protobuf.RiakMessageCodes; -import com.google.protobuf.ByteString; import java.util.List; @@ -15,28 +15,23 @@ * @author Sergey Galkin * @since 2.0.3 */ -public class FetchOperation extends PBFutureOperation +public class FetchOperation extends TTBFutureOperation { private final Builder builder; private String queryInfoMessage; private FetchOperation(Builder builder) { - super(RiakMessageCodes.MSG_TsGetReq, - RiakMessageCodes.MSG_TsGetResp, - builder.reqBuilder, - RiakTsPB.TsGetResp.PARSER); - + super(new TTBConverters.FetchEncoder(builder), new TTBConverters.QueryResultDecoder()); this.builder = builder; } @Override - protected QueryResult convert(List responses) + protected QueryResult convert(List responses) { // This is not a streaming op, there will only be one response - final RiakTsPB.TsGetResp response = checkAndGetSingleResponse(responses); - - return PbResultFactory.convertPbGetResp(response); + final byte[] bytes = checkAndGetSingleResponse(responses); + return this.responseParser.parseFrom(bytes); } @Override @@ -71,8 +66,7 @@ public static class Builder { private final String tableName; private final Iterable keyValues; - - private final RiakTsPB.TsGetReq.Builder reqBuilder = RiakTsPB.TsGetReq.newBuilder(); + private int timeout = 0; public Builder(String tableName, Iterable keyValues) { @@ -86,16 +80,13 @@ public Builder(String tableName, Iterable keyValues) throw new IllegalArgumentException("Key Values cannot be null or an empty."); } - this.reqBuilder.setTable(ByteString.copyFromUtf8(tableName)); - this.reqBuilder.addAllKey(ConvertibleIterable.asIterablePbCell(keyValues)); - this.tableName = tableName; this.keyValues = keyValues; } public Builder withTimeout(int timeout) { - this.reqBuilder.setTimeout(timeout); + this.timeout = timeout; return this; } @@ -103,5 +94,20 @@ public FetchOperation build() { return new FetchOperation(this); } + + public String getTableName() + { + return tableName; + } + + public Iterable getKeyValues() + { + return keyValues; + } + + public int getTimeout() + { + return timeout; + } } } diff --git a/src/main/java/com/basho/riak/client/core/operations/ts/QueryOperation.java b/src/main/java/com/basho/riak/client/core/operations/ts/QueryOperation.java index 37406131f..9b83a8281 100644 --- a/src/main/java/com/basho/riak/client/core/operations/ts/QueryOperation.java +++ b/src/main/java/com/basho/riak/client/core/operations/ts/QueryOperation.java @@ -1,9 +1,7 @@ package com.basho.riak.client.core.operations.ts; -import com.basho.riak.client.core.operations.PBFutureOperation; -import com.basho.riak.client.core.query.timeseries.PbResultFactory; +import com.basho.riak.client.core.operations.TTBFutureOperation; import com.basho.riak.client.core.query.timeseries.QueryResult; -import com.basho.riak.protobuf.RiakMessageCodes; import com.basho.riak.protobuf.RiakTsPB; import com.google.protobuf.ByteString; @@ -16,26 +14,23 @@ * @author Sergey Galkin * @since 2.0.3 */ -public class QueryOperation extends PBFutureOperation +public class QueryOperation extends TTBFutureOperation { private final String queryText; private QueryOperation(Builder builder) { - super(RiakMessageCodes.MSG_TsQueryReq, - RiakMessageCodes.MSG_TsQueryResp, - RiakTsPB.TsQueryReq.newBuilder().setQuery(builder.interpolationBuilder), - RiakTsPB.TsQueryResp.PARSER); + super(new TTBConverters.QueryEncoder(builder), new TTBConverters.QueryResultDecoder()); this.queryText = builder.queryText; } @Override - protected QueryResult convert(List responses) + protected QueryResult convert(List responses) { // This is not a streaming op, there will only be one response - final RiakTsPB.TsQueryResp response = checkAndGetSingleResponse(responses); - return PbResultFactory.convertPbQueryResp(response); + final byte[] response = checkAndGetSingleResponse(responses); + return this.responseParser.parseFrom(response); } @Override @@ -59,6 +54,11 @@ public Builder(String queryText) this.interpolationBuilder.setBase(ByteString.copyFromUtf8(queryText)); } + public String getQueryText() + { + return queryText; + } + public QueryOperation build() { return new QueryOperation(this); diff --git a/src/main/java/com/basho/riak/client/core/operations/ts/StoreOperation.java b/src/main/java/com/basho/riak/client/core/operations/ts/StoreOperation.java index dcc3f57a7..d5e9c410e 100644 --- a/src/main/java/com/basho/riak/client/core/operations/ts/StoreOperation.java +++ b/src/main/java/com/basho/riak/client/core/operations/ts/StoreOperation.java @@ -1,13 +1,8 @@ package com.basho.riak.client.core.operations.ts; -import com.basho.riak.client.core.operations.PBFutureOperation; -import com.basho.riak.client.core.query.timeseries.CollectionConverters; +import com.basho.riak.client.core.operations.TTBFutureOperation; import com.basho.riak.client.core.query.timeseries.ColumnDescription; -import com.basho.riak.client.core.query.timeseries.ConvertibleIterable; import com.basho.riak.client.core.query.timeseries.Row; -import com.basho.riak.protobuf.RiakMessageCodes; -import com.basho.riak.protobuf.RiakTsPB; -import com.google.protobuf.ByteString; import java.util.Collection; import java.util.List; @@ -19,30 +14,22 @@ * @author Sergey Galkin * @since 2.0.3 */ -public class StoreOperation extends PBFutureOperation +public class StoreOperation extends TTBFutureOperation { - private final String tableName; - private final int rowCount; + private final Builder builder; private String queryInfoMessage; - - private StoreOperation(Builder builder) + private StoreOperation(final Builder builder) { - super(RiakMessageCodes.MSG_TsPutReq, - RiakMessageCodes.MSG_TsPutResp, - builder.reqBuilder, - RiakTsPB.TsPutResp.PARSER); - - this.tableName = builder.reqBuilder.getTable().toStringUtf8(); - this.rowCount = builder.reqBuilder.getRowsCount(); + super(new TTBConverters.StoreEncoder(builder), new TTBConverters.VoidDecoder()); + this.builder = builder; } @Override - protected Void convert(List responses) + protected Void convert(List responses) { // This is not a streaming op, there will only be one response checkAndGetSingleResponse(responses); - return null; } @@ -59,12 +46,13 @@ public String getQueryInfo() private String createQueryInfoMessage() { - return "INSERT <" + this.rowCount + " rows> into " + this.tableName; + return "INSERT into " + builder.tableName; } public static class Builder { - private final RiakTsPB.TsPutReq.Builder reqBuilder; + private final String tableName; + private Collection rows; public Builder(String tableName) { @@ -73,27 +61,33 @@ public Builder(String tableName) throw new IllegalArgumentException("TableName can not be null or empty"); } - this.reqBuilder = RiakTsPB.TsPutReq.newBuilder(); - this.reqBuilder.setTable(ByteString.copyFromUtf8(tableName)); + this.tableName = tableName; } public Builder withColumns(Collection columns) { - this.reqBuilder.addAllColumns(CollectionConverters.convertColumnDescriptionsToPb(columns)); - return this; + throw new UnsupportedOperationException(); } - public Builder withRows(Iterable rows) + public Builder withRows(Collection rows) { - this.reqBuilder.addAllRows(ConvertibleIterable.asIterablePbRow(rows)); + this.rows = rows; return this; } + public String getTableName() + { + return tableName; + } + + public Collection getRows() + { + return rows; + } + public StoreOperation build() { return new StoreOperation(this); } } } - - diff --git a/src/main/java/com/basho/riak/client/core/operations/ts/TTBConverters.java b/src/main/java/com/basho/riak/client/core/operations/ts/TTBConverters.java new file mode 100644 index 000000000..30de0ba28 --- /dev/null +++ b/src/main/java/com/basho/riak/client/core/operations/ts/TTBConverters.java @@ -0,0 +1,115 @@ +package com.basho.riak.client.core.operations.ts; + +import com.basho.riak.client.core.codec.InvalidTermToBinaryException; +import com.basho.riak.client.core.codec.TermToBinaryCodec; +import com.basho.riak.client.core.operations.TTBFutureOperation; +import com.basho.riak.client.core.query.timeseries.Cell; +import com.basho.riak.client.core.query.timeseries.QueryResult; +import com.ericsson.otp.erlang.OtpErlangDecodeException; +import com.ericsson.otp.erlang.OtpOutputStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.LinkedList; + +class TTBConverters +{ + private static Logger logger = LoggerFactory.getLogger(TTBConverters.class); + + private static abstract class BuilderTTBEncoder implements TTBFutureOperation.TTBEncoder + { + protected final T builder; + + BuilderTTBEncoder(T builder) + { + this.builder = builder; + } + + abstract OtpOutputStream buildMessage(); + + @Override + public byte[] build() + { + return buildMessage().toByteArray(); + } + } + + static class StoreEncoder extends BuilderTTBEncoder + { + StoreEncoder(StoreOperation.Builder builder) + { + super(builder); + } + + @Override + OtpOutputStream buildMessage() + { + return TermToBinaryCodec.encodeTsPutRequest(builder.getTableName(), builder.getRows()); + } + } + + static class FetchEncoder extends BuilderTTBEncoder + { + FetchEncoder(FetchOperation.Builder builder) + { + super(builder); + } + + @Override + OtpOutputStream buildMessage() + { + // TODO: Remove this later + LinkedList list = new LinkedList<>(); + for (Cell c : builder.getKeyValues()) + { + list.add(c); + } + return TermToBinaryCodec.encodeTsGetRequest(builder.getTableName(), list, builder.getTimeout()); + } + } + + static class QueryEncoder extends BuilderTTBEncoder + { + QueryEncoder(QueryOperation.Builder builder) + { + super(builder); + } + + @Override + OtpOutputStream buildMessage() + { + return TermToBinaryCodec.encodeTsQueryRequest(builder.getQueryText()); + } + } + + static class VoidDecoder implements TTBFutureOperation.TTBParser + { + @Override + public Void parseFrom(byte[] data) + { + return null; + } + } + + static class QueryResultDecoder implements TTBFutureOperation.TTBParser + { + @Override + public QueryResult parseFrom(byte[] data) + { + QueryResult rv; + + try + { + rv = TermToBinaryCodec.decodeTsResultResponse(data); + } + catch (OtpErlangDecodeException | InvalidTermToBinaryException ex) + { + final String errorMsg = "Error decoding Riak TTB response"; + logger.error(errorMsg, ex); + throw new IllegalArgumentException(errorMsg, ex); + } + + return rv; + } + } +} diff --git a/src/main/java/com/basho/riak/client/core/query/timeseries/Cell.java b/src/main/java/com/basho/riak/client/core/query/timeseries/Cell.java index ccdaf7dd9..1f2bf05a3 100644 --- a/src/main/java/com/basho/riak/client/core/query/timeseries/Cell.java +++ b/src/main/java/com/basho/riak/client/core/query/timeseries/Cell.java @@ -1,6 +1,7 @@ package com.basho.riak.client.core.query.timeseries; import com.basho.riak.client.core.util.BinaryValue; +import com.basho.riak.client.core.util.CharsetUtils; import com.basho.riak.protobuf.RiakTsPB; import com.google.protobuf.ByteString; @@ -26,9 +27,18 @@ public class Cell { - static final Cell NullCell = new Cell(RiakTsPB.TsCell.newBuilder().build()); - - private final RiakTsPB.TsCell pbCell; + private static final int VARCHAR_MASK = 0x00000001; + private static final int SINT64_MASK = 0x00000002; + private static final int DOUBLE_MASK = 0x00000004; + private static final int TIMESTAMP_MASK = 0x00000008; + private static final int BOOLEAN_MASK = 0x00000010; + private int typeBitfield = 0x0; + + private String varcharValue = ""; + private long sint64Value = 0L; + private double doubleValue = 0.0; + private long timestampValue = 0L; + private boolean booleanValue = false; /** * Creates a new "Varchar" Cell, based on the UTF8 binary encoding of the provided String. @@ -42,8 +52,7 @@ public Cell(String varcharValue) throw new IllegalArgumentException("String value cannot be NULL."); } - final ByteString varcharByteString = ByteString.copyFromUtf8(varcharValue); - this.pbCell = RiakTsPB.TsCell.newBuilder().setVarcharValue(varcharByteString).build(); + initVarchar(varcharValue); } /** @@ -58,8 +67,8 @@ public Cell(BinaryValue varcharValue) throw new IllegalArgumentException("BinaryValue value cannot be NULL."); } - final ByteString varcharByteString = ByteString.copyFrom(varcharValue.getValue()); - this.pbCell = RiakTsPB.TsCell.newBuilder().setVarcharValue(varcharByteString).build(); + initVarchar(varcharValue.toStringUtf8()); + } /** @@ -69,7 +78,7 @@ public Cell(BinaryValue varcharValue) */ public Cell(long sint64Value) { - this.pbCell = RiakTsPB.TsCell.newBuilder().setSint64Value(sint64Value).build(); + initSInt64(sint64Value); } /** @@ -79,7 +88,7 @@ public Cell(long sint64Value) */ public Cell(double doubleValue) { - this.pbCell = RiakTsPB.TsCell.newBuilder().setDoubleValue(doubleValue).build(); + initDouble(doubleValue); } /** @@ -89,7 +98,7 @@ public Cell(double doubleValue) */ public Cell(boolean booleanValue) { - this.pbCell = RiakTsPB.TsCell.newBuilder().setBooleanValue(booleanValue).build(); + initBoolean(booleanValue); } /** @@ -104,7 +113,7 @@ public Cell(Calendar timestampValue) throw new IllegalArgumentException("Calendar object for timestamp value cannot be NULL."); } - this.pbCell = RiakTsPB.TsCell.newBuilder().setTimestampValue(timestampValue.getTimeInMillis()).build(); + initTimestamp(timestampValue.getTimeInMillis()); } /** @@ -119,12 +128,39 @@ public Cell(Date timestampValue) throw new IllegalArgumentException("Date object for timestamp value cannot be NULL."); } - this.pbCell = RiakTsPB.TsCell.newBuilder().setTimestampValue(timestampValue.getTime()).build(); + initTimestamp(timestampValue.getTime()); } Cell(RiakTsPB.TsCell pbCell) { - this.pbCell = pbCell; + if (pbCell.hasBooleanValue()) + { + initBoolean(pbCell.getBooleanValue()); + } + else if (pbCell.hasDoubleValue()) + { + initDouble(pbCell.getDoubleValue()); + } + else if (pbCell.hasSint64Value()) + { + initSInt64(pbCell.getSint64Value()); + } + else if (pbCell.hasTimestampValue()) + { + initTimestamp(pbCell.getTimestampValue()); + } + else if (pbCell.hasVarcharValue()) + { + initVarchar(pbCell.getVarcharValue().toStringUtf8()); + } + else + { + throw new IllegalArgumentException("Unknown PB Cell encountered."); + } + } + + private Cell() + { } /** @@ -135,68 +171,132 @@ public Cell(Date timestampValue) */ public static Cell newTimestamp(long rawTimestampValue) { - final RiakTsPB.TsCell tsCell = RiakTsPB.TsCell.newBuilder().setTimestampValue(rawTimestampValue).build(); - return new Cell(tsCell); + final Cell cell = new Cell(); + cell.initTimestamp(rawTimestampValue); + return cell; + } + + private void initBoolean(boolean booleanValue) + { + setBitfieldType(BOOLEAN_MASK); + this.booleanValue = booleanValue; + } + + private void initTimestamp(long timestampValue) + { + setBitfieldType(TIMESTAMP_MASK); + this.timestampValue = timestampValue; + } + + private void initDouble(double doubleValue) + { + setBitfieldType(DOUBLE_MASK); + this.doubleValue = doubleValue; + } + + private void initSInt64(long longValue) + { + setBitfieldType(SINT64_MASK); + this.sint64Value = longValue; + } + + private void initVarchar(String stringValue) + { + setBitfieldType(VARCHAR_MASK); + this.varcharValue = stringValue; + } + + private void setBitfieldType(int mask) + { + typeBitfield |= mask; + } + + private boolean bitfieldHasType(int mask) + { + return (typeBitfield & mask) == mask; } public boolean hasVarcharValue() { - return pbCell.hasVarcharValue(); + return bitfieldHasType(VARCHAR_MASK); } public boolean hasLong() { - return pbCell.hasSint64Value(); + return bitfieldHasType(SINT64_MASK); } - public boolean hasTimestamp() + public boolean hasDouble() { - return pbCell.hasTimestampValue(); + return bitfieldHasType(DOUBLE_MASK); } - public boolean hasBoolean() + public boolean hasTimestamp() { - return pbCell.hasBooleanValue(); + return bitfieldHasType(TIMESTAMP_MASK); } - public boolean hasDouble() + public boolean hasBoolean() { - return pbCell.hasDoubleValue(); + return bitfieldHasType(BOOLEAN_MASK); } public String getVarcharAsUTF8String() { - return pbCell.getVarcharValue().toStringUtf8(); + return varcharValue; } public BinaryValue getVarcharValue() { - return BinaryValue.unsafeCreate(pbCell.getVarcharValue().toByteArray()); + return BinaryValue.unsafeCreate(varcharValue.getBytes(CharsetUtils.UTF_8)); } public long getLong() { - return pbCell.getSint64Value(); + return sint64Value; } public double getDouble() { - return pbCell.getDoubleValue(); + return doubleValue; } public long getTimestamp() { - return pbCell.getTimestampValue(); + return timestampValue; } public boolean getBoolean() { - return pbCell.getBooleanValue(); + return booleanValue; } RiakTsPB.TsCell getPbCell() { - return pbCell; + final RiakTsPB.TsCell.Builder builder = RiakTsPB.TsCell.newBuilder(); + + if (hasVarcharValue()) + { + builder.setVarcharValue(ByteString.copyFromUtf8(varcharValue)); + } + else if (hasLong()) + { + builder.setSint64Value(sint64Value); + } + else if (hasTimestamp()) + { + builder.setTimestampValue(timestampValue); + } + else if (hasBoolean()) + { + builder.setBooleanValue(booleanValue); + } + else if (hasDouble()) + { + builder.setDoubleValue(doubleValue); + } + + return builder.build(); } @Override @@ -252,13 +352,42 @@ public boolean equals(Object o) Cell cell = (Cell) o; - return !(pbCell != null ? !pbCell.equals(cell.pbCell) : cell.pbCell != null); + if (sint64Value != cell.sint64Value) + { + return false; + } + if (Double.compare(cell.doubleValue, doubleValue) != 0) + { + return false; + } + if (timestampValue != cell.timestampValue) + { + return false; + } + if (booleanValue != cell.booleanValue) + { + return false; + } + if (typeBitfield != cell.typeBitfield) + { + return false; + } + return varcharValue.equals(cell.varcharValue); } @Override public int hashCode() { - return pbCell != null ? pbCell.hashCode() : 0; + int result; + long temp; + result = varcharValue.hashCode(); + result = 31 * result + (int) (sint64Value ^ (sint64Value >>> 32)); + temp = Double.doubleToLongBits(doubleValue); + result = 31 * result + (int) (temp ^ (temp >>> 32)); + result = 31 * result + (int) (timestampValue ^ (timestampValue >>> 32)); + result = 31 * result + (booleanValue ? 1 : 0); + result = 31 * result + typeBitfield; + return result; } } diff --git a/src/main/java/com/basho/riak/client/core/query/timeseries/ConvertibleIterator.java b/src/main/java/com/basho/riak/client/core/query/timeseries/ConvertibleIterator.java index ad70b5307..358a175e1 100644 --- a/src/main/java/com/basho/riak/client/core/query/timeseries/ConvertibleIterator.java +++ b/src/main/java/com/basho/riak/client/core/query/timeseries/ConvertibleIterator.java @@ -11,6 +11,7 @@ */ public abstract class ConvertibleIterator implements Iterator { + private static final RiakTsPB.TsCell NullTSCell = RiakTsPB.TsCell.newBuilder().build(); private final Iterator iterator; public ConvertibleIterator(Iterator iterator) @@ -48,7 +49,7 @@ public ImmutablePBCellIterator(Iterator iterator) { protected RiakTsPB.TsCell convert(Cell cell) { if (cell == null) { - return Cell.NullCell.getPbCell(); + return NullTSCell; } return cell.getPbCell(); @@ -74,7 +75,7 @@ public ImmutableCellIterator(Iterator iterator) { @Override protected Cell convert(RiakTsPB.TsCell pbCell) { - if (pbCell.equals(Cell.NullCell.getPbCell())) + if (pbCell.equals(NullTSCell)) { return null; } diff --git a/src/main/java/com/basho/riak/client/core/query/timeseries/QueryResult.java b/src/main/java/com/basho/riak/client/core/query/timeseries/QueryResult.java index 5b69addc0..d9fa76d06 100644 --- a/src/main/java/com/basho/riak/client/core/query/timeseries/QueryResult.java +++ b/src/main/java/com/basho/riak/client/core/query/timeseries/QueryResult.java @@ -3,6 +3,8 @@ import com.basho.riak.protobuf.RiakTsPB; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.Iterator; import java.util.List; @@ -18,14 +20,16 @@ public class QueryResult implements Iterable { public static final QueryResult EMPTY = new QueryResult(); private final Iterable pbRows; - private final int pbRowsCount; + private final Row[] rows; + private final int rowCount; private final List pbColumnDescriptions; private QueryResult() { this.pbRows = Collections.emptyList(); - this.pbRowsCount = 0; + this.rowCount = 0; this.pbColumnDescriptions = Collections.emptyList(); + this.rows = null; } public QueryResult(List tsRows) @@ -37,14 +41,23 @@ public QueryResult(Iterable tsRowsIterator, int rowCount) { this.pbColumnDescriptions = null; this.pbRows = tsRowsIterator; - this.pbRowsCount = rowCount; + this.rowCount = rowCount; + this.rows = null; } public QueryResult(List columnsList, List rowsList) { this.pbColumnDescriptions = columnsList; this.pbRows = rowsList; - this.pbRowsCount = rowsList.size(); + this.rowCount = rowsList.size(); + this.rows = null; + } + + public QueryResult(Row[] rows) { + this.rows = rows; + this.rowCount = rows.length; + this.pbRows = Collections.emptyList(); + this.pbColumnDescriptions = Collections.emptyList(); } /** @@ -62,7 +75,11 @@ public List getColumnDescriptionsCopy() */ public Iterator iterator() { - return ConvertibleIterator.iterateAsRow(this.pbRows.iterator()); + if (this.rows != null) { + return Arrays.asList(this.rows).iterator(); + } else { + return ConvertibleIterator.iterateAsRow(this.pbRows.iterator()); + } } /** @@ -71,7 +88,7 @@ public Iterator iterator() */ public int getRowsCount() { - return this.pbRowsCount; + return this.rowCount; } /** diff --git a/src/main/java/com/basho/riak/client/core/query/timeseries/Row.java b/src/main/java/com/basho/riak/client/core/query/timeseries/Row.java index 8a015c6e3..0a5790c45 100644 --- a/src/main/java/com/basho/riak/client/core/query/timeseries/Row.java +++ b/src/main/java/com/basho/riak/client/core/query/timeseries/Row.java @@ -14,6 +14,8 @@ public class Row implements Iterable { private final RiakTsPB.TsRow pbRow; + private final Iterable cells; + private final int cellCount; /** * Create a new row. @@ -21,11 +23,16 @@ public class Row implements Iterable */ public Row(Iterable cells) { - final RiakTsPB.TsRow.Builder builder = RiakTsPB.TsRow.newBuilder(); + pbRow = null; + this.cells = cells; + int cellCount = 0; - builder.addAllCells(ConvertibleIterable.asIterablePbCell(cells)); + for (Cell ignored : this.cells) + { + cellCount++; + } - this.pbRow = builder.build(); + this.cellCount = cellCount; } /** @@ -34,16 +41,16 @@ public Row(Iterable cells) */ public Row(Cell... cells) { - final RiakTsPB.TsRow.Builder builder = RiakTsPB.TsRow.newBuilder(); - // TODO: consider avoiding ArrayList creation under the hood of Arrays.asList - builder.addAllCells(ConvertibleIterable.asIterablePbCell(Arrays.asList(cells))); - - this.pbRow = builder.build(); + pbRow = null; + this.cells = Arrays.asList(cells); + cellCount = cells.length; } Row(RiakTsPB.TsRow pbRow) { this.pbRow = pbRow; + cells = null; + cellCount = pbRow.getCellsCount(); } /** @@ -52,7 +59,7 @@ public Row(Cell... cells) */ public int getCellsCount() { - return pbRow.getCellsCount(); + return cellCount; } /** @@ -70,9 +77,16 @@ public List getCellsCopy() return cells; } - RiakTsPB.TsRow getPbRow() + public RiakTsPB.TsRow getPbRow() { - return pbRow; + if(pbRow != null) + { + return pbRow; + } + + RiakTsPB.TsRow.Builder builder = RiakTsPB.TsRow.newBuilder(); + builder.addAllCells(ConvertibleIterable.asIterablePbCell(cells)); + return builder.build(); } /** @@ -82,7 +96,14 @@ RiakTsPB.TsRow getPbRow() @Override public Iterator iterator() { - return ConvertibleIterator.iterateAsCell(pbRow.getCellsList().iterator()); + if(cells != null) + { + return cells.iterator(); + } + else // if (pbRow != null) + { + return ConvertibleIterator.iterateAsCell(pbRow.getCellsList().iterator()); + } } @Override @@ -97,15 +118,22 @@ public boolean equals(Object o) return false; } - Row cells = (Row) o; + Row cells1 = (Row) o; - return !(pbRow != null ? !pbRow.equals(cells.pbRow) : cells.pbRow != null); + if (cellCount != cells1.cellCount) + { + return false; + } + return getCellsCopy().equals(cells1.getCellsCopy()); } @Override public int hashCode() { - return pbRow != null ? pbRow.hashCode() : 0; + int result = pbRow != null ? pbRow.hashCode() : 0; + result = 31 * result + (cells != null ? cells.hashCode() : 0); + result = 31 * result + cellCount; + return result; } } diff --git a/src/test/java/com/basho/riak/client/api/commands/itest/ITestTimeSeries.java b/src/test/java/com/basho/riak/client/api/commands/itest/ITestTimeSeries.java index bb26c624e..2a69115a3 100644 --- a/src/test/java/com/basho/riak/client/api/commands/itest/ITestTimeSeries.java +++ b/src/test/java/com/basho/riak/client/api/commands/itest/ITestTimeSeries.java @@ -4,13 +4,18 @@ import com.basho.riak.client.api.commands.buckets.FetchBucketProperties; import com.basho.riak.client.api.commands.buckets.StoreBucketProperties; import com.basho.riak.client.api.commands.timeseries.*; +import com.basho.riak.client.core.RiakCluster; import com.basho.riak.client.core.RiakFuture; +import com.basho.riak.client.core.RiakNode; import com.basho.riak.client.core.operations.FetchBucketPropsOperation; +import com.basho.riak.client.core.operations.itest.ITestBase; import com.basho.riak.client.core.operations.itest.ts.ITestTsBase; import com.basho.riak.client.core.query.Namespace; import com.basho.riak.client.core.query.timeseries.*; import org.junit.FixMethodOrder; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import org.junit.runners.MethodSorters; import java.util.*; @@ -18,6 +23,7 @@ import java.util.concurrent.TimeUnit; import static org.junit.Assert.*; +import static org.junit.Assume.assumeTrue; /** * Time Series Commands Integration Tests @@ -70,6 +76,9 @@ private RiakFuture createTableAsync(final RiakClient client, Strin return client.executeAsync(cmd); } + @Rule + public ExpectedException thrown= ExpectedException.none(); + @Test public void test_a_TestCreateTableAndChangeNVal() throws InterruptedException, ExecutionException { @@ -381,7 +390,26 @@ public void test_r_TestDescribeTableCommandForNonExistingTable() throws Interrup final String message = describeFuture.cause().getMessage(); assertTrue(message.toLowerCase().contains(BAD_TABLE_NAME.toLowerCase())); - assertTrue(message.toLowerCase().contains("not an active table.")); + assertTrue(message.toLowerCase().contains("not an active table")); + } + + @Test + public void test_z_TestPBCErrorsReturnWhenSecurityIsOn() throws InterruptedException, ExecutionException + { + assumeTrue(security); + + thrown.expect(ExecutionException.class); + thrown.expectMessage("Security is enabled, please STARTTLS first"); + + // Build connection WITHOUT security + final RiakNode node = new RiakNode.Builder().withRemoteAddress(hostname).withRemotePort(pbcPort).build(); + final RiakCluster cluster = new RiakCluster.Builder(node).build(); + cluster.start(); + final RiakClient client = new RiakClient(cluster); + + Query query = new Query.Builder("DESCRIBE " + tableName).build(); + + final QueryResult result = client.execute(query); } private static List GetCreatedTableFullDescriptions() diff --git a/src/test/java/com/basho/riak/client/core/RiakMessageTest.java b/src/test/java/com/basho/riak/client/core/RiakMessageTest.java new file mode 100644 index 000000000..9416732bc --- /dev/null +++ b/src/test/java/com/basho/riak/client/core/RiakMessageTest.java @@ -0,0 +1,40 @@ +package com.basho.riak.client.core; + +import com.basho.riak.client.core.netty.RiakResponseException; +import com.basho.riak.protobuf.RiakMessageCodes; +import com.basho.riak.protobuf.RiakPB; +import com.google.protobuf.ByteString; +import org.junit.Assert; +import org.junit.Test; + +/** + * + * @author Luke Bakken + */ +public class RiakMessageTest +{ + @Test + public void parsesPbufErrorCorrectly() { + RiakPB.RpbErrorResp.Builder b = RiakPB.RpbErrorResp.newBuilder(); + b.setErrcode(1234); + b.setErrmsg(ByteString.copyFromUtf8("this is an error")); + RiakPB.RpbErrorResp rpbErrorResp = b.build(); + + RiakMessage msg = new RiakMessage(RiakMessageCodes.MSG_ErrorResp, rpbErrorResp.toByteArray()); + RiakResponseException err = msg.getRiakError(); + Assert.assertEquals("this is an error", err.getMessage()); + Assert.assertEquals(1234, err.getCode()); + } + + @Test + public void parsesTtbErrorCorrectly() { + final byte[] TTB_ERROR = {(byte)131, 104, 3, 100, 0, 12, 114, 112, 98, 101, 114, + 114, 111, 114, 114, 101, 115, 112, 109, 0, 0, 0, 16, 116, 104, 105, 115, 32, 105, 115, + 32, 97, 110, 32, 101, 114, 114, 111, 114, 98, 0, 0, 4, (byte)210}; + + RiakMessage msg = new RiakMessage(RiakMessageCodes.MSG_TsTtbMsg, TTB_ERROR); + RiakResponseException err = msg.getRiakError(); + Assert.assertEquals("this is an error", err.getMessage()); + Assert.assertEquals(1234, err.getCode()); + } +} \ No newline at end of file diff --git a/src/test/java/com/basho/riak/client/core/codec/TermToBinaryCodecTest.java b/src/test/java/com/basho/riak/client/core/codec/TermToBinaryCodecTest.java new file mode 100644 index 000000000..9742f0822 --- /dev/null +++ b/src/test/java/com/basho/riak/client/core/codec/TermToBinaryCodecTest.java @@ -0,0 +1,217 @@ +package com.basho.riak.client.core.codec; + +import com.basho.riak.client.api.RiakException; +import com.basho.riak.client.core.query.timeseries.Cell; +import com.basho.riak.client.core.query.timeseries.ColumnDescription; +import com.basho.riak.client.core.query.timeseries.QueryResult; +import com.basho.riak.client.core.query.timeseries.Row; +import com.basho.riak.protobuf.RiakTsPB; +import com.ericsson.otp.erlang.OtpErlangDecodeException; +import com.ericsson.otp.erlang.OtpOutputStream; +import com.google.protobuf.ByteString; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Calendar; +import java.util.List; + +/** + * + * @author Luke Bakken + */ +public class TermToBinaryCodecTest +{ + private static final String TABLE_NAME = "test_table"; + private static final String QUERY = "SELECT * FROM FRAZZLE"; + + @Test + public void encodesPutRequestCorrectly_1() { + // {tsputreq, <<"test_table">>, [], [{<<"varchar">>, 12345678, 12.34, true, 12345}, {<<"string">>, 8765432, 43.21, false, 543321}]} + final byte[] exp = {(byte)131, 104, 4, 100, 0, 8, 116, 115, 112, 117, 116, 114, 101, 113, 109, 0, + 0, 0, 10, 116, 101, 115, 116, 95, 116, 97, 98, 108, 101, 106, 108, 0, 0, + 0, 2, 104, 5, 109, 0, 0, 0, 7, 118, 97, 114, 99, 104, 97, 114, 98, 0, (byte)188, + 97, 78, + 70, 64, 65, 38, 102, 102, 102, 102, 102, + 100, 0, 4, 116, + 114, 117, 101, 98, 0, 0, 48, 57, 104, 5, 109, 0, 0, 0, 6, 115, 116, 114, + 105, 110, 103, 98, 0, (byte)133, (byte)191, (byte)248, + 70, 64, 65, 38, 102, 102, 102, 102, 102, + 100, 0, 5, 102, 97, 108, 115, 101, 98, 0, 8, 74, 89, 106}; + + Cell c1 = new Cell("varchar"); + Cell c2 = new Cell(12345678L); + Cell c3 = new Cell(34.3); + Cell c4 = new Cell(true); + Calendar cal1 = Calendar.getInstance(); + cal1.setTimeInMillis(12345); + Cell c5 = new Cell(cal1); + Row r1 = new Row(c1, c2, c3, c4, c5); + + Cell c6 = new Cell("string"); + Cell c7 = new Cell(8765432L); + Cell c8 = new Cell(34.3); + Cell c9 = new Cell(false); + Calendar cal2 = Calendar.getInstance(); + cal2.setTimeInMillis(543321); + Cell c10 = new Cell(cal2); + Row r2 = new Row(c6, c7, c8, c9, c10); + Row[] rows = { r1, r2 }; + + try { + OtpOutputStream os = TermToBinaryCodec.encodeTsPutRequest(TABLE_NAME, Arrays.asList(rows)); + os.flush(); + byte[] msg = os.toByteArray(); + Assert.assertArrayEquals(exp, msg); + } catch (IOException ex) { + Assert.fail(ex.getMessage()); + } + } + + @Test + public void encodesPutRequestCorrectly_2() { + // A = riakc_ts_put_operator:serialize(<<"test_table">>,[{<<"series">>, <<"family">>, 12345678, 1, true, 34.3, []}], true). + // A = {tsputreq,<<"test_table">>,[],[{<<"series">>,<<"family">>,12345678,1,true,34.3,[]}]} + final byte[] exp = {(byte)131,104,4, // outer tuple arity 4 + 100,0,8,116,115,112,117,116,114,101,113, // tsputreq atom + 109,0,0,0,10,116,101,115,116,95,116,97,98,108,101, // table name binary + 106, // empty list + 108,0,0,0,1, // list start arity 1 + 104,7, // row tuple arity 7 + 109,0,0,0,6,115,101,114,105,101,115, // series binary + 109,0,0,0,6,102,97,109,105,108,121, // family binary + 98,0,(byte)188,97,78, // integer + 97,1, // small integer + 100,0,4,116,114,117,101, // true atom + // NB: this is what Erlang generates, an old-style float + // 99,51,46,52,50,57,57,57,57,57,57,57,57,57,57,57,57,57,55,49,53,55,56,101,43,48,49,0,0,0,0,0, // float_ext len 31 + // NB: this is what JInterface generates, a new-style float + 70, 64, 65, 38, 102, 102, 102, 102, 102, + 106, // null cell empty list + 106}; // list arity 1 end + + final ArrayList rows = new ArrayList<>(1); + rows.add(new Row(new Cell("series"), new Cell("family"), Cell.newTimestamp(12345678), + new Cell(1L), new Cell(true), new Cell(34.3), null)); + + try { + OtpOutputStream os = TermToBinaryCodec.encodeTsPutRequest(TABLE_NAME, rows); + os.flush(); + byte[] msg = os.toByteArray(); + Assert.assertArrayEquals(exp, msg); + } catch (IOException ex) { + Assert.fail(ex.getMessage()); + } + } + + @Test + public void encodesGetRequestCorrectly() { + // {tsgetreq, <<"test_table">>, [<<"series">>, <<"family">>, 12345678], 5000} + final byte[] exp = {(byte)131, 104, 4, 100, 0, 8, 116, 115, 103, 101, 116, 114, 101, 113, 109, 0, + 0, 0, 10, 116, 101, 115, 116, 95, 116, 97, 98, 108, 101, 108, 0, 0, 0, 3, + 109, 0, 0, 0, 6, 115, 101, 114, 105, 101, 115, 109, 0, 0, 0, 6, 102, 97, + 109, 105, 108, 121, 98, 0, (byte)188, 97, 78, 106, 98, 0, 0, 19, (byte)136}; + + Cell k1 = new Cell("series"); + Cell k2 = new Cell("family"); + Cell k3 = new Cell(12345678); + Cell[] key = {k1, k2, k3}; + + try { + OtpOutputStream os = TermToBinaryCodec.encodeTsGetRequest(TABLE_NAME, Arrays.asList(key), 5000); + os.flush(); + byte[] msg = os.toByteArray(); + Assert.assertArrayEquals(exp, msg); + } catch (IOException ex) { + Assert.fail(ex.getMessage()); + } + } + + @Test + public void encodesQueryRequestCorrectly() { + // {tsqueryreq, {tsinterpolation, <<"SELECT * FROM FRAZZLE">>, []}, false, []} + final byte[] exp = {(byte)131,104,4,100,0,10,116,115,113,117,101,114,121,114,101, + 113,104,3,100,0,15,116,115,105,110,116,101,114,112,111, + 108,97,116,105,111,110,109,0,0,0,21,83,69,76,69,67,84, + 32,42,32,70,82,79,77,32,70,82,65,90,90,76,69,106,100,0, + 5,102,97,108,115,101,100,0,9,117,110,100,101,102,105, + 110,101,100}; + + try { + OtpOutputStream os = TermToBinaryCodec.encodeTsQueryRequest(QUERY); + os.flush(); + byte[] msg = os.toByteArray(); + Assert.assertArrayEquals(exp, msg); + } catch (IOException ex) { + Assert.fail(ex.getMessage()); + } + } + + @Test + public void decodesQueryResultCorrectly() throws OtpErlangDecodeException + { + + /* MSG = {tsqueryresp, DATA} + DATA = {COLUMN_NAMES, COLUMN_TYPES, ROWS} + COLUMN_NAMES = [binary, ...] + COLUMN_TYPES = [atom, ...] + ROWS = [ ROW, ...] + ROW = { binary :: numeric :: atom :: [], ... } + + { tsqueryresp, + { [<<"geohash">>,<<"user">>,<<"time">>,<<"weather">>,<<"temperature">>,<<"uv_index">>,<<"observed">>], + [varchar,varchar,timestamp,varchar,double,sint64,boolean], + [ + {<<"hash1">>,<<"user2">>,1443806600000,<<"cloudy">>,[],[],true} + ] + } + } + */ + + final byte[] input = + {-125, 104, 2, 100, 0, 11, 116, 115, 113, 117, 101, 114, 121, 114, 101, 115, 112, 104, 3, 108, 0, 0, + 0, 7, 109, 0, 0, 0, 7, 103, 101, 111, 104, 97, 115, 104, 109, 0, 0, 0, 4, 117, 115, 101, 114, 109, + 0, 0, 0, 4, 116, 105, 109, 101, 109, 0, 0, 0, 7, 119, 101, 97, 116, 104, 101, 114, 109, 0, 0, 0, 11, + 116, 101, 109, 112, 101, 114, 97, 116, 117, 114, 101, 109, 0, 0, 0, 8, 117, 118, 95, 105, 110, 100, + 101, 120, 109, 0, 0, 0, 8, 111, 98, 115, 101, 114, 118, 101, 100, 106, 108, 0, 0, 0, 7, 100, 0, 7, + 118, 97, 114, 99, 104, 97, 114, 100, 0, 7, 118, 97, 114, 99, 104, 97, 114, 100, 0, 9, 116, 105, 109, + 101, 115, 116, 97, 109, 112, 100, 0, 7, 118, 97, 114, 99, 104, 97, 114, 100, 0, 6, 100, 111, 117, + 98, 108, 101, 100, 0, 6, 115, 105, 110, 116, 54, 52, 100, 0, 7, 98, 111, 111, 108, 101, 97, 110, + 106, 108, 0, 0, 0, 1, 104, 7, 109, 0, 0, 0, 5, 104, 97, 115, 104, 49, 109, 0, 0, 0, 5, 117, 115, + 101, 114, 50, 110, 6, 0, 64, 91, -108, 41, 80, 1, 109, 0, 0, 0, 6, 99, 108, 111, 117, 100, 121, 106, + 106, 100, 0, 4, 116, 114, 117, 101, 106}; + + final ColumnDescription[] expectedColumnDescriptions = new ColumnDescription[7]; + expectedColumnDescriptions[0] = new ColumnDescription("geohash", ColumnDescription.ColumnType.VARCHAR); + expectedColumnDescriptions[1] = new ColumnDescription("user", ColumnDescription.ColumnType.VARCHAR); + expectedColumnDescriptions[2] = new ColumnDescription("time", ColumnDescription.ColumnType.TIMESTAMP); + expectedColumnDescriptions[3] = new ColumnDescription("weather", ColumnDescription.ColumnType.VARCHAR); + expectedColumnDescriptions[4] = new ColumnDescription("temperature", ColumnDescription.ColumnType.DOUBLE); + expectedColumnDescriptions[5] = new ColumnDescription("uv_index", ColumnDescription.ColumnType.SINT64); + expectedColumnDescriptions[6] = new ColumnDescription("observed", ColumnDescription.ColumnType.BOOLEAN); + + final Row row = new Row(new Cell("hash1"), new Cell("user2"), Cell.newTimestamp(1443806600000L), new Cell("cloudy"), null, null, new Cell(true)); + final Row[] expectedRows = new Row[1]; + expectedRows[0] = (row); + + try + { + final QueryResult actual = TermToBinaryCodec.decodeTsResultResponse(input); + + final List actualColumnDescriptions = actual.getColumnDescriptionsCopy(); + final List actualRows = actual.getRowsCopy(); + + Assert.assertArrayEquals(expectedColumnDescriptions, + actualColumnDescriptions.toArray(new ColumnDescription[actualColumnDescriptions.size()])); + + Assert.assertArrayEquals(expectedRows, actualRows.toArray(new Row[actualRows.size()])); + } + catch (InvalidTermToBinaryException ex) + { + Assert.fail(ex.getMessage()); + } + + } +} diff --git a/src/test/java/com/basho/riak/client/core/operations/OperationsTest.java b/src/test/java/com/basho/riak/client/core/operations/OperationsTest.java index 84e34ef9f..d5656f57f 100644 --- a/src/test/java/com/basho/riak/client/core/operations/OperationsTest.java +++ b/src/test/java/com/basho/riak/client/core/operations/OperationsTest.java @@ -9,7 +9,7 @@ /** * Operations Class Unit Tests - + * * @author Alex Moore * @since 2.0.3 */ @@ -19,11 +19,20 @@ public class OperationsTest public void testThatMessageCodesAreWrittenCorrectlyInErrorMessages() { // MSG_StartTls = 255 - RiakMessage msg = new RiakMessage(RiakMessageCodes.MSG_StartTls, new byte[0]); + RiakMessage msg = null; + + try + { + msg = new RiakMessage(RiakMessageCodes.MSG_StartTls, new byte[0]); + } + catch (Exception ex) + { + assertTrue("unexpected exception", false); + } try { - Operations.checkMessageType(msg, RiakMessageCodes.MSG_GetReq); + Operations.checkPBMessageType(msg, RiakMessageCodes.MSG_GetReq); } catch (IllegalStateException ex) { @@ -34,8 +43,8 @@ public void testThatMessageCodesAreWrittenCorrectlyInErrorMessages() @Test public void testThatSingedToUnsignedConversionIsCorrect() { - assertEquals(Operations.getUnsignedByteValue((byte)0x00), 0); - assertEquals(Operations.getUnsignedByteValue((byte)0xFF), 255); + assertEquals(Operations.getUnsignedByteValue((byte) 0x00), 0); + assertEquals(Operations.getUnsignedByteValue((byte) 0xFF), 255); assertEquals(Operations.getUnsignedIntValue(0x00000000), 0l); assertEquals(Operations.getUnsignedIntValue(0xffffffff), 4294967295l); } diff --git a/src/test/java/com/basho/riak/client/core/operations/itest/ts/ITestQueryOperation.java b/src/test/java/com/basho/riak/client/core/operations/itest/ts/ITestQueryOperation.java index 7c0056724..f77d54d51 100644 --- a/src/test/java/com/basho/riak/client/core/operations/itest/ts/ITestQueryOperation.java +++ b/src/test/java/com/basho/riak/client/core/operations/itest/ts/ITestQueryOperation.java @@ -7,6 +7,7 @@ import org.junit.BeforeClass; import org.junit.Test; +import java.util.Random; import java.util.concurrent.ExecutionException; import static org.junit.Assert.*; @@ -67,4 +68,22 @@ public void querySomeMatches() throws ExecutionException, InterruptedException assertEquals(7, queryResult.getColumnDescriptionsCopy().size()); assertEquals(1, queryResult.getRowsCount()); } + + @Test + public void queryCreateTable() throws ExecutionException, InterruptedException + { + final String queryText = "CREATE TABLE BobbyTables" + Integer.toString(new Random().nextInt()) + + " (" + + " k1 varchar not null," + + " k2 varchar not null," + + " k3 timestamp not null, " + + " PRIMARY KEY((k1, k2, quantum(k3, 15, 'm')),k1, k2, k3)" + + " )"; + + final QueryResult queryResult = executeQuery(new QueryOperation.Builder(queryText)); + + assertNotNull(queryResult); + assertEquals(0, queryResult.getColumnDescriptionsCopy().size()); + assertEquals(0, queryResult.getRowsCount()); + } }