deserializeMetadata(ByteBuffer buffer)
+ /**
+ * Deserializes the metadata section from the given byte buffer.
+ *
+ * This is the counterpart of {@link #serializeMetadata(DataOutputStream)} and it is guaranteed that the buffer will
+ * be positioned at the start of the metadata section when this method is called.
+ *
+ * Important: It is mandatory for implementations to leave the cursor at the end of the metadata, in
+ * the exact same position as it was when {@link #serializeMetadata(DataOutputStream)} was called.
+ *
+ * Important: This method will be called at the end of the BaseDataConstructor constructor to read
+ * the metadata section. This means that it will be called before the subclass have been constructor
+ * have been called. Therefore it is not possible to use any subclass fields in this method.
+ */
+ protected void deserializeMetadata(ByteBuffer buffer)
throws IOException {
- return Collections.emptyMap();
+ buffer.getInt();
}
private byte[] serializeExceptions()
@@ -572,14 +585,9 @@ private Map deserializeExceptions(ByteBuffer buffer)
@Override
public String toString() {
if (_dataSchema == null) {
- return _metadata.toString();
+ return "{}";
} else {
- StringBuilder stringBuilder = new StringBuilder();
- stringBuilder.append("resultSchema:").append('\n');
- stringBuilder.append(_dataSchema).append('\n');
- stringBuilder.append("numRows: ").append(_numRows).append('\n');
- stringBuilder.append("metadata: ").append(_metadata.toString()).append('\n');
- return stringBuilder.toString();
+ return "resultSchema:" + '\n' + _dataSchema + '\n' + "numRows: " + _numRows + '\n';
}
}
}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/datablock/ColumnarDataBlock.java b/pinot-common/src/main/java/org/apache/pinot/common/datablock/ColumnarDataBlock.java
index 216f4d9d9131..51b01e6b69bf 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/datablock/ColumnarDataBlock.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/datablock/ColumnarDataBlock.java
@@ -20,6 +20,8 @@
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Objects;
import org.apache.pinot.common.utils.DataSchema;
@@ -27,7 +29,7 @@
* Column-wise data table. It stores data in columnar-major format.
*/
public class ColumnarDataBlock extends BaseDataBlock {
- private static final int VERSION = 1;
+ private static final int VERSION = 2;
protected int[] _cumulativeColumnOffsetSizeInBytes;
protected int[] _columnSizeInBytes;
@@ -80,17 +82,21 @@ protected int positionOffsetInVariableBufferAndGetLength(int rowId, int colId) {
}
@Override
- public ColumnarDataBlock toMetadataOnlyDataTable() {
- ColumnarDataBlock metadataOnlyDataTable = new ColumnarDataBlock();
- metadataOnlyDataTable._metadata.putAll(_metadata);
- metadataOnlyDataTable._errCodeToExceptionMap.putAll(_errCodeToExceptionMap);
- return metadataOnlyDataTable;
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof ColumnarDataBlock)) {
+ return false;
+ }
+ ColumnarDataBlock that = (ColumnarDataBlock) o;
+ return Objects.deepEquals(_cumulativeColumnOffsetSizeInBytes, that._cumulativeColumnOffsetSizeInBytes)
+ && Objects.deepEquals(_columnSizeInBytes, that._columnSizeInBytes);
}
@Override
- public ColumnarDataBlock toDataOnlyDataTable() {
- return new ColumnarDataBlock(_numRows, _dataSchema, _stringDictionary, _fixedSizeDataBytes, _variableSizeDataBytes);
+ public int hashCode() {
+ return Objects.hash(Arrays.hashCode(_cumulativeColumnOffsetSizeInBytes), Arrays.hashCode(_columnSizeInBytes));
}
-
- // TODO: add whole-column access methods.
+// TODO: add whole-column access methods.
}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlock.java b/pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlock.java
index 418426b4ac6b..768d6cdb885d 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlock.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlock.java
@@ -80,10 +80,6 @@ byte[] toBytes()
@Nullable
RoaringBitmap getNullRowIds(int colId);
- DataBlock toMetadataOnlyDataTable();
-
- DataBlock toDataOnlyDataTable();
-
enum Type {
ROW(0),
COLUMNAR(1),
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java
index 27f114032849..1a67f980b115 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java
@@ -26,15 +26,20 @@
import org.apache.pinot.common.response.ProcessingException;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public final class DataBlockUtils {
+ private static final Logger LOGGER = LoggerFactory.getLogger(DataBlockUtils.class);
+
private DataBlockUtils() {
}
static final int VERSION_TYPE_SHIFT = 5;
public static MetadataBlock getErrorDataBlock(Exception e) {
+ LOGGER.info("Caught exception while processing query", e);
if (e instanceof ProcessingException) {
return getErrorDataBlock(Collections.singletonMap(((ProcessingException) e).getErrorCode(), extractErrorMsg(e)));
} else {
@@ -51,33 +56,40 @@ private static String extractErrorMsg(Throwable t) {
}
public static MetadataBlock getErrorDataBlock(Map exceptions) {
- MetadataBlock errorBlock = new MetadataBlock(MetadataBlock.MetadataBlockType.ERROR);
- for (Map.Entry exception : exceptions.entrySet()) {
- errorBlock.addException(exception.getKey(), exception.getValue());
- }
- return errorBlock;
+ return MetadataBlock.newError(exceptions);
+ }
+
+ /**
+ * Reads an integer from the given byte buffer.
+ *
+ * The returned integer contains both the version and the type of the data block.
+ * {@link #getVersion(int)} and {@link #getType(int)} can be used to extract the version and the type.
+ * @param byteBuffer byte buffer to read from. A single int will be read
+ */
+ public static int readVersionType(ByteBuffer byteBuffer) {
+ return byteBuffer.getInt();
}
- public static MetadataBlock getEndOfStreamDataBlock() {
- return new MetadataBlock(MetadataBlock.MetadataBlockType.EOS);
+ public static int getVersion(int versionType) {
+ return versionType & ((1 << VERSION_TYPE_SHIFT) - 1);
}
- public static MetadataBlock getEndOfStreamDataBlock(Map stats) {
- return new MetadataBlock(MetadataBlock.MetadataBlockType.EOS, stats);
+ public static DataBlock.Type getType(int versionType) {
+ return DataBlock.Type.fromOrdinal(versionType >> VERSION_TYPE_SHIFT);
}
public static DataBlock getDataBlock(ByteBuffer byteBuffer)
throws IOException {
- int versionType = byteBuffer.getInt();
- int version = versionType & ((1 << VERSION_TYPE_SHIFT) - 1);
- DataBlock.Type type = DataBlock.Type.fromOrdinal(versionType >> VERSION_TYPE_SHIFT);
+ int versionType = readVersionType(byteBuffer);
+ int version = getVersion(versionType);
+ DataBlock.Type type = getType(versionType);
switch (type) {
case COLUMNAR:
return new ColumnarDataBlock(byteBuffer);
case ROW:
return new RowDataBlock(byteBuffer);
case METADATA:
- return new MetadataBlock(byteBuffer);
+ return MetadataBlock.deserialize(byteBuffer, version);
default:
throw new UnsupportedOperationException("Unsupported data table version: " + version + " with type: " + type);
}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/datablock/MetadataBlock.java b/pinot-common/src/main/java/org/apache/pinot/common/datablock/MetadataBlock.java
index 41c2e9d0ea8c..164ca5157701 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/datablock/MetadataBlock.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/datablock/MetadataBlock.java
@@ -18,16 +18,19 @@
*/
package org.apache.pinot.common.datablock;
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
+import java.io.DataOutputStream;
import java.io.IOException;
+import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
-import java.util.HashMap;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
import java.util.Map;
+import java.util.Objects;
+import javax.annotation.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
@@ -36,108 +39,116 @@
*/
public class MetadataBlock extends BaseDataBlock {
- private static final ObjectMapper JSON = new ObjectMapper();
-
+ private static final Logger LOGGER = LoggerFactory.getLogger(MetadataBlock.class);
@VisibleForTesting
- static final int VERSION = 1;
-
- public enum MetadataBlockType {
- /**
- * Indicates that this block is the final block to be sent
- * (End Of Stream) as part of an operator chain computation.
- */
- EOS,
+ static final int VERSION = 2;
+ @Nullable
+ private List _statsByStage;
- /**
- * An {@code ERROR} metadata block indicates that there was
- * some error during computation. To retrieve the error that
- * occurred, use {@link MetadataBlock#getExceptions()}
- */
- ERROR
+ private MetadataBlock() {
+ this(Collections.emptyList());
}
- /**
- * Used to serialize the contents of the metadata block conveniently and in
- * a backwards compatible way. Use JSON because the performance of metadata block
- * SerDe should not be a bottleneck.
- */
- @JsonIgnoreProperties(ignoreUnknown = true)
- @VisibleForTesting
- static class Contents {
-
- private String _type;
- private Map _stats;
-
- @JsonCreator
- public Contents(@JsonProperty("type") String type, @JsonProperty("stats") Map stats) {
- _type = type;
- _stats = stats;
- }
-
- @JsonCreator
- public Contents() {
- this(null, new HashMap<>());
- }
-
- public String getType() {
- return _type;
- }
-
- public void setType(String type) {
- _type = type;
- }
-
- public Map getStats() {
- return _stats;
- }
+ public static MetadataBlock newEos() {
+ return new MetadataBlock();
+ }
- public void setStats(Map stats) {
- _stats = stats;
+ public static MetadataBlock newError(Map exceptions) {
+ MetadataBlock errorBlock = new MetadataBlock();
+ for (Map.Entry exception : exceptions.entrySet()) {
+ errorBlock.addException(exception.getKey(), exception.getValue());
}
+ return errorBlock;
}
- private final Contents _contents;
+ public MetadataBlock(List statsByStage) {
+ super(0, null, new String[0], new byte[0], new byte[0]);
+ _statsByStage = statsByStage;
+ }
- public MetadataBlock(MetadataBlockType type) {
- this(type, new HashMap<>());
+ MetadataBlock(ByteBuffer byteBuffer)
+ throws IOException {
+ super(byteBuffer);
}
- public MetadataBlock(MetadataBlockType type, Map stats) {
- super(0, null, new String[0], new byte[]{0}, toContents(new Contents(type.name(), stats)));
- _contents = new Contents(type.name(), stats);
+ @Override
+ protected void serializeMetadata(DataOutputStream output)
+ throws IOException {
+ if (_statsByStage == null) {
+ output.writeInt(0);
+ return;
+ }
+ int size = _statsByStage.size();
+ output.writeInt(size);
+ if (size > 0) {
+ byte[] bytes = new byte[4096];
+ for (ByteBuffer stat : _statsByStage) {
+ if (stat == null) {
+ output.writeBoolean(false);
+ } else {
+ output.writeBoolean(true);
+ output.writeInt(stat.remaining());
+ ByteBuffer duplicate = stat.duplicate();
+ while (duplicate.hasRemaining()) {
+ int length = Math.min(duplicate.remaining(), bytes.length);
+ duplicate.get(bytes, 0, length);
+ output.write(bytes, 0, length);
+ }
+ }
+ }
+ }
}
- private static byte[] toContents(Contents type) {
- try {
- return JSON.writeValueAsBytes(type);
- } catch (JsonProcessingException e) {
- throw new RuntimeException(e);
+ public static MetadataBlock deserialize(ByteBuffer byteBuffer, int version)
+ throws IOException {
+ switch (version) {
+ case 1:
+ case 2:
+ return new MetadataBlock(byteBuffer);
+ default:
+ throw new IOException("Unsupported metadata block version: " + version);
}
}
- public MetadataBlock(ByteBuffer byteBuffer)
+ @Override
+ protected void deserializeMetadata(ByteBuffer buffer)
throws IOException {
- super(byteBuffer);
- if (_variableSizeDataBytes != null && _variableSizeDataBytes.length > 0) {
- _contents = JSON.readValue(_variableSizeDataBytes, Contents.class);
- } else {
- _contents = new Contents();
+ try {
+ int statsSize = buffer.getInt();
+
+ List stats = new ArrayList<>(statsSize);
+
+ for (int i = 0; i < statsSize; i++) {
+ if (buffer.get() != 0) {
+ int length = buffer.getInt();
+ buffer.limit(buffer.position() + length);
+ stats.add(buffer.slice());
+ buffer.position(buffer.limit());
+ buffer.limit(buffer.capacity());
+ } else {
+ stats.add(null);
+ }
+ }
+ _statsByStage = stats;
+ } catch (BufferUnderflowException e) {
+ LOGGER.info("Failed to read stats from metadata block. Considering it empty", e);;
+ } catch (RuntimeException e) {
+ LOGGER.warn("Failed to read stats from metadata block. Considering it empty", e);;
}
}
public MetadataBlockType getType() {
- String type = _contents.getType();
-
- // if type is null, then we're reading a legacy block where we didn't encode any
- // data. assume that it is an EOS block if there's no exceptions and an ERROR block
- // otherwise
- return type == null
- ? (getExceptions().isEmpty() ? MetadataBlockType.EOS : MetadataBlockType.ERROR)
- : MetadataBlockType.valueOf(type);
+ return _errCodeToExceptionMap.isEmpty() ? MetadataBlockType.EOS : MetadataBlockType.ERROR;
}
- public Map getStats() {
- return _contents.getStats() != null ? _contents.getStats() : new HashMap<>();
+ /**
+ * Returns the list of serialized stats.
+ *
+ * The returned list may contain nulls, which would mean that no stats were available for that stage.
+ */
+ @Nullable
+ public List getStatsByStage() {
+ return _statsByStage;
}
@Override
@@ -156,12 +167,35 @@ protected int positionOffsetInVariableBufferAndGetLength(int rowId, int colId) {
}
@Override
- public MetadataBlock toMetadataOnlyDataTable() {
- return this;
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof MetadataBlock)) {
+ return false;
+ }
+ MetadataBlock that = (MetadataBlock) o;
+ return Objects.equals(_statsByStage, that._statsByStage)
+ && _errCodeToExceptionMap.equals(that._errCodeToExceptionMap);
}
@Override
- public MetadataBlock toDataOnlyDataTable() {
- throw new UnsupportedOperationException();
+ public int hashCode() {
+ return Objects.hash(_statsByStage, _errCodeToExceptionMap);
+ }
+
+ public enum MetadataBlockType {
+ /**
+ * Indicates that this block is the final block to be sent
+ * (End Of Stream) as part of an operator chain computation.
+ */
+ EOS,
+
+ /**
+ * An {@code ERROR} metadata block indicates that there was
+ * some error during computation. To retrieve the error that
+ * occurred, use {@link MetadataBlock#getExceptions()}
+ */
+ ERROR
}
}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/datablock/RowDataBlock.java b/pinot-common/src/main/java/org/apache/pinot/common/datablock/RowDataBlock.java
index f5aa80648936..33a5e77cf764 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/datablock/RowDataBlock.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/datablock/RowDataBlock.java
@@ -20,6 +20,8 @@
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Objects;
import org.apache.pinot.common.utils.DataSchema;
@@ -27,7 +29,7 @@
* Wrapper for row-wise data table. It stores data in row-major format.
*/
public class RowDataBlock extends BaseDataBlock {
- private static final int VERSION = 1;
+ private static final int VERSION = 2;
protected int[] _columnOffsets;
protected int _rowSizeInBytes;
@@ -72,22 +74,25 @@ protected int positionOffsetInVariableBufferAndGetLength(int rowId, int colId) {
return _fixedSizeData.getInt(offset + 4);
}
- @Override
- public RowDataBlock toMetadataOnlyDataTable() {
- RowDataBlock metadataOnlyDataTable = new RowDataBlock();
- metadataOnlyDataTable._metadata.putAll(_metadata);
- metadataOnlyDataTable._errCodeToExceptionMap.putAll(_errCodeToExceptionMap);
- return metadataOnlyDataTable;
+ public int getRowSizeInBytes() {
+ return _rowSizeInBytes;
}
@Override
- public RowDataBlock toDataOnlyDataTable() {
- return new RowDataBlock(_numRows, _dataSchema, _stringDictionary, _fixedSizeDataBytes, _variableSizeDataBytes);
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof RowDataBlock)) {
+ return false;
+ }
+ RowDataBlock that = (RowDataBlock) o;
+ return _rowSizeInBytes == that._rowSizeInBytes && Objects.deepEquals(_columnOffsets, that._columnOffsets);
}
- public int getRowSizeInBytes() {
- return _rowSizeInBytes;
+ @Override
+ public int hashCode() {
+ return Objects.hash(Arrays.hashCode(_columnOffsets), _rowSizeInBytes);
}
-
- // TODO: add whole-row access methods.
+// TODO: add whole-row access methods.
}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java
index 72121611eb04..5fb1018dceae 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java
@@ -110,6 +110,8 @@ enum MetadataKey {
NUM_SEGMENTS_PROCESSED(6, "numSegmentsProcessed", MetadataValueType.INT),
NUM_SEGMENTS_MATCHED(7, "numSegmentsMatched", MetadataValueType.INT),
NUM_CONSUMING_SEGMENTS_QUERIED(8, "numConsumingSegmentsQueried", MetadataValueType.INT),
+ // the timestamp indicating the freshness of the data queried in consuming segments.
+ // This can be ingestion timestamp if provided by the stream, or the last index time
MIN_CONSUMING_FRESHNESS_TIME_MS(9, "minConsumingFreshnessTimeMs", MetadataValueType.LONG),
TOTAL_DOCS(10, "totalDocs", MetadataValueType.LONG),
NUM_GROUPS_LIMIT_REACHED(11, "numGroupsLimitReached", MetadataValueType.STRING),
@@ -135,6 +137,7 @@ enum MetadataKey {
OPERATOR_ID(31, "operatorId", MetadataValueType.STRING),
OPERATOR_EXEC_START_TIME_MS(32, "operatorExecStartTimeMs", MetadataValueType.LONG),
OPERATOR_EXEC_END_TIME_MS(33, "operatorExecEndTimeMs", MetadataValueType.LONG),
+ // Not actually used
MAX_ROWS_IN_JOIN_REACHED(34, "maxRowsInJoinReached", MetadataValueType.STRING);
// We keep this constant to track the max id added so far for backward compatibility.
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/datatable/StatMap.java b/pinot-common/src/main/java/org/apache/pinot/common/datatable/StatMap.java
new file mode 100644
index 000000000000..9d2818958bb7
--- /dev/null
+++ b/pinot-common/src/main/java/org/apache/pinot/common/datatable/StatMap.java
@@ -0,0 +1,498 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.datatable;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Preconditions;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.annotation.Nullable;
+import org.apache.pinot.spi.utils.JsonUtils;
+
+
+/**
+ * A map that stores statistics.
+ *
+ * Statistics must be keyed by an enum that implements {@link StatMap.Key}.
+ *
+ * A stat map efficiently store, serialize and deserialize these statistics.
+ *
+ * Serialization and deserialization is backward and forward compatible as long as the only change in the keys are:
+ *
+ * - Adding new keys
+ * - Change the name of the keys
+ *
+ *
+ * Any other change (like changing the type of key, changing their literal order are not supported or removing keys)
+ * are backward incompatible changes.
+ * @param
+ */
+public class StatMap & StatMap.Key> {
+ private final Class _keyClass;
+ private final Map _map;
+
+ private static final ConcurrentHashMap, Object[]> KEYS_BY_CLASS = new ConcurrentHashMap<>();
+
+ public StatMap(Class keyClass) {
+ _keyClass = keyClass;
+ // TODO: Study whether this is fine or we should impose a single thread policy in StatMaps
+ _map = Collections.synchronizedMap(new EnumMap<>(keyClass));
+ }
+
+ public int getInt(K key) {
+ Preconditions.checkArgument(key.getType() == Type.INT, "Key %s is of type %s, not INT", key, key.getType());
+ Object o = _map.get(key);
+ return o == null ? 0 : (Integer) o;
+ }
+
+ public StatMap merge(K key, int value) {
+ if (key.getType() == Type.LONG) {
+ merge(key, (long) value);
+ return this;
+ }
+ int oldValue = getInt(key);
+ int newValue = key.merge(oldValue, value);
+ if (newValue == 0) {
+ _map.remove(key);
+ } else {
+ _map.put(key, newValue);
+ }
+ return this;
+ }
+
+ public long getLong(K key) {
+ if (key.getType() == Type.INT) {
+ return getInt(key);
+ }
+ Preconditions.checkArgument(key.getType() == Type.LONG, "Key %s is of type %s, not LONG", key, key.getType());
+ Object o = _map.get(key);
+ return o == null ? 0L : (Long) o;
+ }
+
+ public StatMap merge(K key, long value) {
+ Preconditions.checkArgument(key.getType() == Type.LONG, "Key %s is of type %s, not LONG", key, key.getType());
+ long oldValue = getLong(key);
+ long newValue = key.merge(oldValue, value);
+ if (newValue == 0) {
+ _map.remove(key);
+ } else {
+ _map.put(key, newValue);
+ }
+ return this;
+ }
+
+ public boolean getBoolean(K key) {
+ Preconditions.checkArgument(key.getType() == Type.BOOLEAN, "Key %s is of type %s, not BOOLEAN",
+ key, key.getType());
+ Object o = _map.get(key);
+ return o != null && (Boolean) o;
+ }
+
+ public StatMap merge(K key, boolean value) {
+ boolean oldValue = getBoolean(key);
+ boolean newValue = key.merge(oldValue, value);
+ if (!newValue) {
+ _map.remove(key);
+ } else {
+ _map.put(key, Boolean.TRUE);
+ }
+ return this;
+ }
+
+ public String getString(K key) {
+ Preconditions.checkArgument(key.getType() == Type.STRING, "Key %s is of type %s, not STRING", key, key.getType());
+ Object o = _map.get(key);
+ return o == null ? null : (String) o;
+ }
+
+ public StatMap merge(K key, String value) {
+ String oldValue = getString(key);
+ String newValue = key.merge(oldValue, value);
+ if (newValue == null) {
+ _map.remove(key);
+ } else {
+ _map.put(key, newValue);
+ }
+ return this;
+ }
+
+ /**
+ * Returns the value associated with the key.
+ *
+ * Primitives will be boxed, so it is recommended to use the specific methods for each type.
+ */
+ public Object getAny(K key) {
+ switch (key.getType()) {
+ case BOOLEAN:
+ return getBoolean(key);
+ case INT:
+ return getInt(key);
+ case LONG:
+ return getLong(key);
+ case STRING:
+ return getString(key);
+ default:
+ throw new IllegalArgumentException("Unsupported type: " + key.getType());
+ }
+ }
+
+ /**
+ * Modifies this object to merge the values of the other object.
+ *
+ * @param other The object to merge with. This argument will not be modified.
+ * @return this object once it is modified.
+ */
+ public StatMap merge(StatMap other) {
+ Preconditions.checkState(_keyClass.equals(other._keyClass),
+ "Different key classes %s and %s", _keyClass, other._keyClass);
+ for (Map.Entry entry : other._map.entrySet()) {
+ K key = entry.getKey();
+ Object value = entry.getValue();
+ if (value == null) {
+ continue;
+ }
+ switch (key.getType()) {
+ case BOOLEAN:
+ merge(key, (boolean) value);
+ break;
+ case INT:
+ merge(key, (int) value);
+ break;
+ case LONG:
+ merge(key, (long) value);
+ break;
+ case STRING:
+ merge(key, (String) value);
+ break;
+ default:
+ throw new IllegalArgumentException("Unsupported type: " + key.getType());
+ }
+ }
+ return this;
+ }
+
+ public StatMap merge(DataInput input)
+ throws IOException {
+ byte serializedKeys = input.readByte();
+
+ K[] keys = (K[]) KEYS_BY_CLASS.computeIfAbsent(_keyClass, k -> k.getEnumConstants());
+ for (byte i = 0; i < serializedKeys; i++) {
+ int ordinal = input.readByte();
+ K key = keys[ordinal];
+ switch (key.getType()) {
+ case BOOLEAN:
+ merge(key, true);
+ break;
+ case INT:
+ merge(key, input.readInt());
+ break;
+ case LONG:
+ merge(key, input.readLong());
+ break;
+ case STRING:
+ merge(key, input.readUTF());
+ break;
+ default:
+ throw new IllegalStateException("Unknown type " + key.getType());
+ }
+ }
+ return this;
+ }
+
+ public ObjectNode asJson() {
+ ObjectNode node = JsonUtils.newObjectNode();
+
+ for (Map.Entry entry : _map.entrySet()) {
+ K key = entry.getKey();
+ Object value = entry.getValue();
+ switch (key.getType()) {
+ case BOOLEAN:
+ if (value == null) {
+ if (key.includeDefaultInJson()) {
+ node.put(key.getStatName(), false);
+ }
+ } else {
+ node.put(key.getStatName(), (boolean) value);
+ }
+ break;
+ case INT:
+ if (value == null) {
+ if (key.includeDefaultInJson()) {
+ node.put(key.getStatName(), 0);
+ }
+ } else {
+ node.put(key.getStatName(), (int) value);
+ }
+ break;
+ case LONG:
+ if (value == null) {
+ if (key.includeDefaultInJson()) {
+ node.put(key.getStatName(), 0L);
+ }
+ } else {
+ node.put(key.getStatName(), (long) value);
+ }
+ break;
+ case STRING:
+ if (value == null) {
+ if (key.includeDefaultInJson()) {
+ node.put(key.getStatName(), "");
+ }
+ } else {
+ node.put(key.getStatName(), (String) value);
+ }
+ break;
+ default:
+ throw new IllegalArgumentException("Unsupported type: " + key.getType());
+ }
+ }
+
+ return node;
+ }
+
+ public void serialize(DataOutput output)
+ throws IOException {
+
+ assert checkContainsNoDefault() : "No default value should be stored in the map";
+ output.writeByte(_map.size());
+
+ // We use written keys just to fail fast in tests if the number of keys written
+ // is not the same as the number of keys
+ int writtenKeys = 0;
+ K[] keys = (K[]) KEYS_BY_CLASS.computeIfAbsent(_keyClass, k -> k.getEnumConstants());
+ for (int ordinal = 0; ordinal < keys.length; ordinal++) {
+ K key = keys[ordinal];
+ switch (key.getType()) {
+ case BOOLEAN: {
+ if (getBoolean(key)) {
+ writtenKeys++;
+ output.writeByte(ordinal);
+ }
+ break;
+ }
+ case INT: {
+ int value = getInt(key);
+ if (value != 0) {
+ writtenKeys++;
+ output.writeByte(ordinal);
+ output.writeInt(value);
+ }
+ break;
+ }
+ case LONG: {
+ long value = getLong(key);
+ if (value != 0) {
+ writtenKeys++;
+ output.writeByte(ordinal);
+ output.writeLong(value);
+ }
+ break;
+ }
+ case STRING: {
+ String value = getString(key);
+ if (value != null) {
+ writtenKeys++;
+ output.writeByte(ordinal);
+ output.writeUTF(value);
+ }
+ break;
+ }
+ default:
+ throw new IllegalStateException("Unknown type " + key.getType());
+ }
+ }
+ assert writtenKeys == _map.size() : "Written keys " + writtenKeys + " but map size " + _map.size();
+ }
+
+ private boolean checkContainsNoDefault() {
+ for (Map.Entry entry : _map.entrySet()) {
+ K key = entry.getKey();
+ Object value = entry.getValue();
+ switch (key.getType()) {
+ case BOOLEAN:
+ if (value == null || !(boolean) value) {
+ throw new IllegalStateException("Boolean value must be true but " + value + " is stored for key " + key);
+ }
+ break;
+ case INT:
+ if (value == null || (int) value == 0) {
+ throw new IllegalStateException("Int value must be non-zero but " + value + " is stored for key " + key);
+ }
+ break;
+ case LONG:
+ if (value == null || (long) value == 0) {
+ throw new IllegalStateException("Long value must be non-zero but " + value + " is stored for key " + key);
+ }
+ break;
+ case STRING:
+ if (value == null) {
+ throw new IllegalStateException("String value must be non-null but null is stored for key " + key);
+ }
+ break;
+ default:
+ throw new IllegalArgumentException("Unsupported type: " + key.getType());
+ }
+ }
+ return true;
+ }
+
+ public static String getDefaultStatName(Key key) {
+ String name = key.name();
+ StringBuilder result = new StringBuilder();
+ boolean capitalizeNext = false;
+
+ for (char c : name.toCharArray()) {
+ if (c == '_') {
+ capitalizeNext = true;
+ } else {
+ if (capitalizeNext) {
+ result.append(c);
+ capitalizeNext = false;
+ } else {
+ result.append(Character.toLowerCase(c));
+ }
+ }
+ }
+
+ return result.toString();
+ }
+
+ public static & Key> StatMap deserialize(DataInput input, Class keyClass)
+ throws IOException {
+ StatMap result = new StatMap<>(keyClass);
+ result.merge(input);
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ StatMap> statMap = (StatMap>) o;
+ return Objects.equals(_map, statMap._map);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(_map);
+ }
+
+ @Override
+ public String toString() {
+ return asJson().toString();
+ }
+
+ public Class getKeyClass() {
+ return _keyClass;
+ }
+
+ public boolean isEmpty() {
+ return _map.isEmpty();
+ }
+
+ public Iterable keySet() {
+ return _map.keySet();
+ }
+
+ public interface Key {
+ String name();
+
+ /**
+ * The name of the stat used to report it. Names must be unique on the same key family.
+ */
+ default String getStatName() {
+ return getDefaultStatName(this);
+ }
+
+ default int merge(int value1, int value2) {
+ return value1 + value2;
+ }
+
+ default long merge(long value1, long value2) {
+ return value1 + value2;
+ }
+
+ default boolean merge(boolean value1, boolean value2) {
+ return value1 || value2;
+ }
+
+ default String merge(@Nullable String value1, @Nullable String value2) {
+ return value2 != null ? value2 : value1;
+ }
+
+ /**
+ * The type of the values associated to this key.
+ */
+ Type getType();
+
+ default boolean includeDefaultInJson() {
+ return false;
+ }
+
+ static int minPositive(int value1, int value2) {
+ if (value1 == 0 && value2 >= 0) {
+ return value2;
+ }
+ if (value2 == 0 && value1 >= 0) {
+ return value1;
+ }
+ return Math.min(value1, value2);
+ }
+
+ static long minPositive(long value1, long value2) {
+ if (value1 == 0 && value2 >= 0) {
+ return value2;
+ }
+ if (value2 == 0 && value1 >= 0) {
+ return value1;
+ }
+ return Math.min(value1, value2);
+ }
+
+ static int eqNotZero(int value1, int value2) {
+ if (value1 != value2) {
+ if (value1 == 0) {
+ return value2;
+ } else if (value2 == 0) {
+ return value1;
+ } else {
+ throw new IllegalStateException("Cannot merge non-zero values: " + value1 + " and " + value2);
+ }
+ }
+ return value1;
+ }
+ }
+
+ public enum Type {
+ BOOLEAN,
+ INT,
+ LONG,
+ STRING
+ }
+}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java b/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java
index 06f65fc2aa25..0d7ec268a749 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java
@@ -18,9 +18,12 @@
*/
package org.apache.pinot.common.response;
+import java.io.IOException;
import java.util.List;
+import javax.annotation.Nullable;
import org.apache.pinot.common.response.broker.QueryProcessingException;
import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.spi.utils.JsonUtils;
/**
@@ -33,6 +36,8 @@ public interface BrokerResponse {
*/
void setExceptions(List exceptions);
+ void addToExceptions(QueryProcessingException processingException);
+
/**
* Set the number of servers got queried by the broker.
*
@@ -47,21 +52,20 @@ public interface BrokerResponse {
*/
void setNumServersResponded(int numServersResponded);
+ long getTimeUsedMs();
+
/**
* Set the total time used in request handling, into the broker response.
*/
void setTimeUsedMs(long timeUsedMs);
- /**
- * Set the total number of rows in result set
- */
- void setNumRowsResultSet(int numRowsResultSet);
-
/**
* Convert the broker response to JSON String.
*/
- String toJsonString()
- throws Exception;
+ default String toJsonString()
+ throws IOException {
+ return JsonUtils.objectToString(this);
+ }
/**
* Returns the number of servers queried.
@@ -147,12 +151,13 @@ String toJsonString()
* set the result table.
* @param resultTable result table to be set.
*/
- void setResultTable(ResultTable resultTable);
+ void setResultTable(@Nullable ResultTable resultTable);
/**
* Get the result table.
* @return result table.
*/
+ @Nullable
ResultTable getResultTable();
/**
@@ -163,12 +168,10 @@ String toJsonString()
/**
* Get the total number of rows in result set
*/
- int getNumRowsResultSet();
-
- /**
- * Set the total thread cpu time used against offline table in request handling, into the broker response.
- */
- void setOfflineThreadCpuTimeNs(long offlineThreadCpuTimeNs);
+ default int getNumRowsResultSet() {
+ ResultTable resultTable = getResultTable();
+ return resultTable == null ? 0 : resultTable.getRows().size();
+ }
/**
* Get the thread cpu time used against offline table in request handling, from the broker response.
@@ -180,74 +183,43 @@ String toJsonString()
*/
long getRealtimeThreadCpuTimeNs();
- /**
- * Set the total thread cpu time used against realtime table in request handling, into the broker response.
- */
- void setRealtimeThreadCpuTimeNs(long realtimeThreadCpuTimeNs);
-
/**
* Get the system activities cpu time used against offline table in request handling, from the broker response.
*/
long getOfflineSystemActivitiesCpuTimeNs();
- /**
- * Set the system activities cpu time used against offline table in request handling, into the broker response.
- */
- void setOfflineSystemActivitiesCpuTimeNs(long offlineSystemActivitiesCpuTimeNs);
-
/**
* Get the system activities cpu time used against realtime table in request handling, from the broker response.
*/
long getRealtimeSystemActivitiesCpuTimeNs();
- /**
- * Set the system activities cpu time used against realtime table in request handling, into the broker response.
- */
- void setRealtimeSystemActivitiesCpuTimeNs(long realtimeSystemActivitiesCpuTimeNs);
-
/**
* Get the response serialization cpu time used against offline table in request handling, from the broker response.
*/
long getOfflineResponseSerializationCpuTimeNs();
- /**
- * Set the response serialization cpu time used against offline table in request handling, into the broker response.
- */
- void setOfflineResponseSerializationCpuTimeNs(long offlineResponseSerializationCpuTimeNs);
-
/**
* Get the response serialization cpu time used against realtime table in request handling, from the broker response.
*/
long getRealtimeResponseSerializationCpuTimeNs();
- /**
- * Set the response serialization cpu time used against realtime table in request handling, into the broker response.
- */
- void setRealtimeResponseSerializationCpuTimeNs(long realtimeResponseSerializationCpuTimeNs);
-
/**
* Get the total cpu time(thread cpu time + system activities cpu time + response serialization cpu time) used
* against offline table in request handling, from the broker response.
*/
- long getOfflineTotalCpuTimeNs();
-
- /**
- * Set the total cpu time(thread cpu time + system activities cpu time + response serialization cpu time) used
- * against offline table in request handling, into the broker response.
- */
- void setOfflineTotalCpuTimeNs(long offlineTotalCpuTimeNs);
+ default long getOfflineTotalCpuTimeNs() {
+ return getOfflineThreadCpuTimeNs() + getOfflineSystemActivitiesCpuTimeNs()
+ + getOfflineResponseSerializationCpuTimeNs();
+ }
/**
* Get the total cpu time(thread cpu time + system activities cpu time + response serialization cpu time) used
* against realtime table in request handling, from the broker response.
*/
- long getRealtimeTotalCpuTimeNs();
-
- /**
- * Set the total cpu time(thread cpu time + system activities cpu time + response serialization cpu time) used
- * against realtime table in request handling, into the broker response.
- */
- void setRealtimeTotalCpuTimeNs(long realtimeTotalCpuTimeNs);
+ default long getRealtimeTotalCpuTimeNs() {
+ return getRealtimeThreadCpuTimeNs() + getRealtimeSystemActivitiesCpuTimeNs()
+ + getRealtimeResponseSerializationCpuTimeNs();
+ }
/**
* Get the total number of segments pruned on the Broker side
@@ -264,11 +236,6 @@ String toJsonString()
*/
long getNumSegmentsPrunedByServer();
- /**
- * Set the total number of segments pruned on the Server side
- */
- void setNumSegmentsPrunedByServer(long numSegmentsPrunedByServer);
-
/**
* Get the total number of segments pruned due to invalid data or schema.
*
@@ -276,13 +243,6 @@ String toJsonString()
*/
long getNumSegmentsPrunedInvalid();
- /**
- * Set the total number of segments pruned due to invalid data or schema.
- *
- * This value is always lower or equal than {@link #getNumSegmentsPrunedByServer()}
- */
- void setNumSegmentsPrunedInvalid(long numSegmentsPrunedInvalid);
-
/**
* Get the total number of segments pruned by applying the limit optimization.
*
@@ -290,13 +250,6 @@ String toJsonString()
*/
long getNumSegmentsPrunedByLimit();
- /**
- * Set the total number of segments pruned by applying the limit optimization.
- *
- * This value is always lower or equal than {@link #getNumSegmentsPrunedByServer()}
- */
- void setNumSegmentsPrunedByLimit(long numSegmentsPrunedByLimit);
-
/**
* Get the total number of segments pruned applying value optimizations, like bloom filters.
*
@@ -304,33 +257,16 @@ String toJsonString()
*/
long getNumSegmentsPrunedByValue();
- /**
- * Set the total number of segments pruned applying value optimizations, like bloom filters.
- *
- * This value is always lower or equal than {@link #getNumSegmentsPrunedByServer()}
- */
- void setNumSegmentsPrunedByValue(long numSegmentsPrunedByValue);
-
/**
* Get the total number of segments with an EmptyFilterOperator when Explain Plan is called
*/
long getExplainPlanNumEmptyFilterSegments();
- /**
- * Set the total number of segments with an EmptyFilterOperator when Explain Plan is called
- */
- void setExplainPlanNumEmptyFilterSegments(long explainPlanNumEmptyFilterSegments);
-
/**
* Get the total number of segments with a MatchAllFilterOperator when Explain Plan is called
*/
long getExplainPlanNumMatchAllFilterSegments();
- /**
- * Set the total number of segments with a MatchAllFilterOperator when Explain Plan is called
- */
- void setExplainPlanNumMatchAllFilterSegments(long explainPlanNumMatchAllFilterSegments);
-
/**
* get request ID for the query
*/
@@ -354,4 +290,6 @@ String toJsonString()
long getBrokerReduceTimeMs();
void setBrokerReduceTimeMs(long brokerReduceTimeMs);
+
+ boolean isPartialResult();
}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
index 9fe098e26d51..d22ee51c89e1 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
@@ -22,6 +22,7 @@
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
+import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
@@ -37,7 +38,7 @@
/**
* This class implements pinot-broker's response format for any given query.
* All fields either primitive data types, or native objects (as opposed to JSONObjects).
- *
+ *
* Supports serialization via JSON.
*/
@JsonPropertyOrder({
@@ -78,8 +79,6 @@ public class BrokerResponseNative implements BrokerResponse {
private long _totalDocs = 0L;
private boolean _numGroupsLimitReached = false;
- private boolean _maxRowsInJoinReached = false;
- private boolean _partialResult = false;
private long _timeUsedMs = 0L;
private long _offlineThreadCpuTimeNs = 0L;
private long _realtimeThreadCpuTimeNs = 0L;
@@ -131,6 +130,7 @@ public static BrokerResponseNative empty() {
return new BrokerResponseNative();
}
+ @VisibleForTesting
public static BrokerResponseNative fromJsonString(String jsonString)
throws IOException {
return JsonUtils.stringToObject(jsonString, BrokerResponseNative.class);
@@ -143,7 +143,6 @@ public long getOfflineSystemActivitiesCpuTimeNs() {
}
@JsonProperty("offlineSystemActivitiesCpuTimeNs")
- @Override
public void setOfflineSystemActivitiesCpuTimeNs(long offlineSystemActivitiesCpuTimeNs) {
_offlineSystemActivitiesCpuTimeNs = offlineSystemActivitiesCpuTimeNs;
}
@@ -155,7 +154,6 @@ public long getRealtimeSystemActivitiesCpuTimeNs() {
}
@JsonProperty("realtimeSystemActivitiesCpuTimeNs")
- @Override
public void setRealtimeSystemActivitiesCpuTimeNs(long realtimeSystemActivitiesCpuTimeNs) {
_realtimeSystemActivitiesCpuTimeNs = realtimeSystemActivitiesCpuTimeNs;
}
@@ -167,7 +165,6 @@ public long getOfflineThreadCpuTimeNs() {
}
@JsonProperty("offlineThreadCpuTimeNs")
- @Override
public void setOfflineThreadCpuTimeNs(long timeUsedMs) {
_offlineThreadCpuTimeNs = timeUsedMs;
}
@@ -179,7 +176,6 @@ public long getRealtimeThreadCpuTimeNs() {
}
@JsonProperty("realtimeThreadCpuTimeNs")
- @Override
public void setRealtimeThreadCpuTimeNs(long timeUsedMs) {
_realtimeThreadCpuTimeNs = timeUsedMs;
}
@@ -191,7 +187,6 @@ public long getOfflineResponseSerializationCpuTimeNs() {
}
@JsonProperty("offlineResponseSerializationCpuTimeNs")
- @Override
public void setOfflineResponseSerializationCpuTimeNs(long offlineResponseSerializationCpuTimeNs) {
_offlineResponseSerializationCpuTimeNs = offlineResponseSerializationCpuTimeNs;
}
@@ -203,7 +198,6 @@ public long getRealtimeResponseSerializationCpuTimeNs() {
}
@JsonProperty("realtimeResponseSerializationCpuTimeNs")
- @Override
public void setRealtimeResponseSerializationCpuTimeNs(long realtimeResponseSerializationCpuTimeNs) {
_realtimeResponseSerializationCpuTimeNs = realtimeResponseSerializationCpuTimeNs;
}
@@ -215,7 +209,6 @@ public long getOfflineTotalCpuTimeNs() {
}
@JsonProperty("offlineTotalCpuTimeNs")
- @Override
public void setOfflineTotalCpuTimeNs(long offlineTotalCpuTimeNs) {
_offlineTotalCpuTimeNs = offlineTotalCpuTimeNs;
}
@@ -227,7 +220,6 @@ public long getRealtimeTotalCpuTimeNs() {
}
@JsonProperty("realtimeTotalCpuTimeNs")
- @Override
public void setRealtimeTotalCpuTimeNs(long realtimeTotalCpuTimeNs) {
_realtimeTotalCpuTimeNs = realtimeTotalCpuTimeNs;
}
@@ -251,7 +243,6 @@ public long getNumSegmentsPrunedByServer() {
}
@JsonProperty("numSegmentsPrunedByServer")
- @Override
public void setNumSegmentsPrunedByServer(long numSegmentsPrunedByServer) {
_numSegmentsPrunedByServer = numSegmentsPrunedByServer;
}
@@ -263,7 +254,6 @@ public long getNumSegmentsPrunedInvalid() {
}
@JsonProperty("numSegmentsPrunedInvalid")
- @Override
public void setNumSegmentsPrunedInvalid(long numSegmentsPrunedInvalid) {
_numSegmentsPrunedInvalid = numSegmentsPrunedInvalid;
}
@@ -275,7 +265,6 @@ public long getNumSegmentsPrunedByLimit() {
}
@JsonProperty("numSegmentsPrunedByLimit")
- @Override
public void setNumSegmentsPrunedByLimit(long numSegmentsPrunedByLimit) {
_numSegmentsPrunedByLimit = numSegmentsPrunedByLimit;
}
@@ -287,7 +276,6 @@ public long getNumSegmentsPrunedByValue() {
}
@JsonProperty("numSegmentsPrunedByValue")
- @Override
public void setNumSegmentsPrunedByValue(long numSegmentsPrunedByValue) {
_numSegmentsPrunedByValue = numSegmentsPrunedByValue;
}
@@ -299,7 +287,6 @@ public long getExplainPlanNumEmptyFilterSegments() {
}
@JsonProperty("explainPlanNumEmptyFilterSegments")
- @Override
public void setExplainPlanNumEmptyFilterSegments(long explainPlanNumEmptyFilterSegments) {
_explainPlanNumEmptyFilterSegments = explainPlanNumEmptyFilterSegments;
}
@@ -311,7 +298,6 @@ public long getExplainPlanNumMatchAllFilterSegments() {
}
@JsonProperty("explainPlanNumMatchAllFilterSegments")
- @Override
public void setExplainPlanNumMatchAllFilterSegments(long explainPlanNumMatchAllFilterSegments) {
_explainPlanNumMatchAllFilterSegments = explainPlanNumMatchAllFilterSegments;
}
@@ -350,7 +336,6 @@ public int getNumServersQueried() {
}
@JsonProperty("numServersQueried")
- @Override
public void setNumServersQueried(int numServersQueried) {
_numServersQueried = numServersQueried;
}
@@ -448,6 +433,7 @@ public void setNumConsumingSegmentsQueried(long numConsumingSegmentsQueried) {
public long getNumConsumingSegmentsProcessed() {
return _numConsumingSegmentsProcessed;
}
+
@JsonProperty("numConsumingSegmentsProcessed")
public void setNumConsumingSegmentsProcessed(long numConsumingSegmentsProcessed) {
_numConsumingSegmentsProcessed = numConsumingSegmentsProcessed;
@@ -497,27 +483,18 @@ public void setNumGroupsLimitReached(boolean numGroupsLimitReached) {
_numGroupsLimitReached = numGroupsLimitReached;
}
- @JsonProperty("maxRowsInJoinReached")
+ @JsonProperty(access = JsonProperty.Access.READ_ONLY)
public boolean isMaxRowsInJoinReached() {
- return _maxRowsInJoinReached;
- }
-
- @JsonProperty("maxRowsInJoinReached")
- public void setMaxRowsInJoinReached(boolean maxRowsInJoinReached) {
- _maxRowsInJoinReached = maxRowsInJoinReached;
+ return false;
}
- @JsonProperty("partialResult")
+ @JsonProperty(access = JsonProperty.Access.READ_ONLY)
public boolean isPartialResult() {
- return _partialResult;
- }
-
- @JsonProperty("partialResult")
- public void setPartialResult(boolean partialResult) {
- _partialResult = partialResult;
+ return isNumGroupsLimitReached() || getExceptionsSize() > 0 || isMaxRowsInJoinReached();
}
@JsonProperty("timeUsedMs")
+ @Override
public long getTimeUsedMs() {
return _timeUsedMs;
}
@@ -528,16 +505,10 @@ public void setTimeUsedMs(long timeUsedMs) {
_timeUsedMs = timeUsedMs;
}
- @JsonProperty("numRowsResultSet")
+ @JsonProperty(access = JsonProperty.Access.READ_ONLY)
@Override
public int getNumRowsResultSet() {
- return _numRowsResultSet;
- }
-
- @JsonProperty("numRowsResultSet")
- @Override
- public void setNumRowsResultSet(int numRowsResultSet) {
- _numRowsResultSet = numRowsResultSet;
+ return BrokerResponse.super.getNumRowsResultSet();
}
@JsonProperty("segmentStatistics")
@@ -560,15 +531,11 @@ public void setTraceInfo(Map traceInfo) {
_traceInfo = traceInfo;
}
- @Override
- public String toJsonString()
- throws IOException {
- return JsonUtils.objectToString(this);
- }
-
@JsonIgnore
@Override
public void setExceptions(List exceptions) {
+ // TODO: This is incorrect. It is adding and not setting the exceptions
+ // But there is some code that seems to depend on this.
for (ProcessingException exception : exceptions) {
_processingExceptions.add(new QueryProcessingException(exception.getErrorCode(), exception.getMessage()));
}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java
index 60a22460f39a..40f521e25176 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java
@@ -18,16 +18,17 @@
*/
package org.apache.pinot.common.response.broker;
+import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
-import java.io.IOException;
+import com.fasterxml.jackson.databind.node.ObjectNode;
import java.util.ArrayList;
-import java.util.HashMap;
+import java.util.Collections;
import java.util.List;
-import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.datatable.StatMap;
+import org.apache.pinot.common.response.BrokerResponse;
import org.apache.pinot.common.response.ProcessingException;
-import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.spi.utils.JsonUtils;
/**
@@ -46,28 +47,36 @@
"realtimeResponseSerializationCpuTimeNs", "offlineTotalCpuTimeNs", "realtimeTotalCpuTimeNs", "brokerReduceTimeMs",
"segmentStatistics", "traceInfo", "partialResult"
})
-public class BrokerResponseNativeV2 extends BrokerResponseNative {
+public class BrokerResponseNativeV2 implements BrokerResponse {
- private final Map _stageIdStats = new HashMap<>();
+ private ObjectNode _stageStats = null;
+ /**
+ * The max number of rows seen at runtime.
+ *
+ * In single-stage this doesn't make sense given it is the max number of rows read from the table. But in multi-stage
+ * virtual rows can be generated. For example, in a join query, the number of rows can be more than the number of rows
+ * in the table.
+ */
+ private long _maxRowsInOperator = 0;
+ private final StatMap _brokerStats = new StatMap<>(StatKey.class);
+ private final List _processingExceptions;
+ private ResultTable _resultTable;
+ private String _requestId;
+ private String _brokerId;
public BrokerResponseNativeV2() {
+ _processingExceptions = new ArrayList<>();
}
public BrokerResponseNativeV2(ProcessingException exception) {
- super(exception);
+ this(Collections.singletonList(exception));
}
public BrokerResponseNativeV2(List exceptions) {
- super(exceptions);
- }
-
- /** Generate EXPLAIN PLAN output when queries are evaluated by Broker without going to the Server. */
- private static BrokerResponseNativeV2 getBrokerResponseExplainPlanOutput() {
- BrokerResponseNativeV2 brokerResponse = BrokerResponseNativeV2.empty();
- List