From c6da2f30e8f33c570173b7ac649a829173187481 Mon Sep 17 00:00:00 2001 From: Adarsh Sanjeev Date: Tue, 13 Aug 2024 14:04:41 +0530 Subject: [PATCH] Add fieldReader for row based frames (#16707) Add a new fieldReaders#makeRAC for RowBasedFrameRowsAndColumns. --- .../druid/frame/field/ComplexFieldReader.java | 97 ++++- .../frame/field/DoubleArrayFieldReader.java | 10 + .../druid/frame/field/DoubleFieldReader.java | 140 +++++++ .../frame/field/FieldPositionHelper.java | 65 ++++ .../apache/druid/frame/field/FieldReader.java | 12 +- .../frame/field/FloatArrayFieldReader.java | 10 + .../druid/frame/field/FloatFieldReader.java | 140 +++++++ .../frame/field/LongArrayFieldReader.java | 10 + .../druid/frame/field/LongFieldReader.java | 141 ++++++- .../frame/field/NumericArrayFieldReader.java | 2 +- .../druid/frame/field/NumericFieldReader.java | 7 +- .../druid/frame/field/StringFieldReader.java | 347 +++++++++++++++--- .../concrete/RowBasedFrameRowsAndColumns.java | 17 +- .../frame/field/DoubleFieldReaderTest.java | 30 ++ .../druid/frame/field/FieldReaderRACTest.java | 80 ++++ .../frame/field/FloatFieldReaderTest.java | 30 ++ .../frame/field/LongFieldReaderTest.java | 31 ++ .../rowsandcols/RowsAndColumnsTestBase.java | 5 +- .../ColumnBasedFrameRowsAndColumnsTest.java | 6 +- .../RowBasedFrameRowsAndColumnsTest.java | 80 ++++ 20 files changed, 1190 insertions(+), 70 deletions(-) create mode 100644 processing/src/main/java/org/apache/druid/frame/field/FieldPositionHelper.java create mode 100644 processing/src/test/java/org/apache/druid/frame/field/FieldReaderRACTest.java create mode 100644 processing/src/test/java/org/apache/druid/query/rowsandcols/concrete/RowBasedFrameRowsAndColumnsTest.java diff --git a/processing/src/main/java/org/apache/druid/frame/field/ComplexFieldReader.java b/processing/src/main/java/org/apache/druid/frame/field/ComplexFieldReader.java index 29bf0945adbe..75ad70d5cb42 100644 --- a/processing/src/main/java/org/apache/druid/frame/field/ComplexFieldReader.java +++ b/processing/src/main/java/org/apache/druid/frame/field/ComplexFieldReader.java @@ -21,24 +21,32 @@ import com.google.common.base.Preconditions; import org.apache.datasketches.memory.Memory; +import org.apache.druid.frame.Frame; +import org.apache.druid.frame.write.RowBasedFrameWriter; import org.apache.druid.java.util.common.ISE; import org.apache.druid.query.extraction.ExtractionFn; import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; +import org.apache.druid.query.rowsandcols.column.Column; +import org.apache.druid.query.rowsandcols.column.ColumnAccessor; +import org.apache.druid.query.rowsandcols.column.accessor.ObjectColumnAccessorBase; import org.apache.druid.segment.ColumnValueSelector; import org.apache.druid.segment.DimensionSelector; import org.apache.druid.segment.ObjectColumnSelector; import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.RowSignature; import org.apache.druid.segment.column.ValueType; import org.apache.druid.segment.serde.ComplexMetricSerde; import org.apache.druid.segment.serde.ComplexMetrics; +import javax.annotation.Nonnull; import javax.annotation.Nullable; +import java.util.Comparator; /** * Reads values written by {@link ComplexFieldWriter}. - * + *

* Format: - * + *

* - 1 byte: {@link ComplexFieldWriter#NULL_BYTE} or {@link ComplexFieldWriter#NOT_NULL_BYTE} * - 4 bytes: length of serialized complex value, little-endian int * - N bytes: serialized complex value @@ -121,7 +129,7 @@ public static Object readFieldFromByteArray( * Alternative interface to read the field from the memory without creating a selector and field pointer */ @Nullable - public static Object readFieldFromMemory( + public static T readFieldFromMemory( final ComplexMetricSerde serde, final Memory memory, final long position @@ -136,7 +144,8 @@ public static Object readFieldFromMemory( final byte[] bytes = new byte[length]; memory.getByteArray(position + ComplexFieldWriter.HEADER_SIZE, bytes, 0, length); - return serde.fromBytes(bytes, 0, length); + //noinspection unchecked + return (T) serde.fromBytes(bytes, 0, length); } else { throw new ISE("Unexpected null byte [%s]", nullByte); } @@ -166,8 +175,8 @@ private Selector(Memory memory, ReadableFieldPointer fieldPointer, ComplexMetric @Override public T getObject() { - //noinspection unchecked - return (T) readFieldFromMemory(serde, memory, fieldPointer.position()); + final long fieldPosition = fieldPointer.position(); + return readFieldFromMemory(serde, memory, fieldPosition); } @Override @@ -183,4 +192,80 @@ public void inspectRuntimeShape(RuntimeShapeInspector inspector) // Do nothing. } } + + @Override + public Column makeRACColumn(Frame frame, RowSignature signature, String columnName) + { + return new ComplexFieldReaderColumn(frame, signature.indexOf(columnName), signature.size()); + } + + private class ComplexFieldReaderColumn implements Column + { + private final Frame frame; + private final Memory dataRegion; + private final ColumnType type; + private final FieldPositionHelper coach; + + public ComplexFieldReaderColumn(Frame frame, int columnIndex, int numFields) + { + this.frame = frame; + dataRegion = frame.region(RowBasedFrameWriter.ROW_DATA_REGION); + + this.type = ColumnType.ofComplex(serde.getTypeName()); + this.coach = new FieldPositionHelper( + frame, + frame.region(RowBasedFrameWriter.ROW_OFFSET_REGION), + dataRegion, + columnIndex, + numFields + ); + } + + @Nonnull + @Override + public ColumnAccessor toAccessor() + { + return new ObjectColumnAccessorBase() + { + @Override + public ColumnType getType() + { + return type; + } + + @Override + public int numRows() + { + return frame.numRows(); + } + + @Override + public boolean isNull(int rowNum) + { + final long fieldPosition = coach.computeFieldPosition(rowNum); + return dataRegion.getByte(fieldPosition) == ComplexFieldWriter.NULL_BYTE; + } + + @Override + protected Object getVal(int rowNum) + { + return readFieldFromMemory(serde, dataRegion, coach.computeFieldPosition(rowNum)); + } + + @Override + protected Comparator getComparator() + { + return serde.getTypeStrategy(); + } + + }; + } + + @Nullable + @Override + public T as(Class clazz) + { + return null; + } + } } diff --git a/processing/src/main/java/org/apache/druid/frame/field/DoubleArrayFieldReader.java b/processing/src/main/java/org/apache/druid/frame/field/DoubleArrayFieldReader.java index ec7de095e12c..2e57a3e9a5c1 100644 --- a/processing/src/main/java/org/apache/druid/frame/field/DoubleArrayFieldReader.java +++ b/processing/src/main/java/org/apache/druid/frame/field/DoubleArrayFieldReader.java @@ -20,7 +20,11 @@ package org.apache.druid.frame.field; import org.apache.datasketches.memory.Memory; +import org.apache.druid.error.NotYetImplemented; +import org.apache.druid.frame.Frame; +import org.apache.druid.query.rowsandcols.column.Column; import org.apache.druid.segment.ColumnValueSelector; +import org.apache.druid.segment.column.RowSignature; import javax.annotation.Nullable; @@ -60,4 +64,10 @@ public int getIndividualFieldSize() } }; } + + @Override + public Column makeRACColumn(Frame frame, RowSignature signature, String columnName) + { + throw NotYetImplemented.ex(null, "Class cannot create an RAC column."); + } } diff --git a/processing/src/main/java/org/apache/druid/frame/field/DoubleFieldReader.java b/processing/src/main/java/org/apache/druid/frame/field/DoubleFieldReader.java index 7f7a3f8639eb..3afadebe7111 100644 --- a/processing/src/main/java/org/apache/druid/frame/field/DoubleFieldReader.java +++ b/processing/src/main/java/org/apache/druid/frame/field/DoubleFieldReader.java @@ -21,11 +21,20 @@ import org.apache.datasketches.memory.Memory; import org.apache.druid.common.config.NullHandling; +import org.apache.druid.frame.Frame; +import org.apache.druid.frame.write.RowBasedFrameWriter; import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; +import org.apache.druid.query.rowsandcols.column.Column; +import org.apache.druid.query.rowsandcols.column.ColumnAccessor; import org.apache.druid.segment.ColumnValueSelector; import org.apache.druid.segment.DoubleColumnSelector; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.RowSignature; import org.apache.druid.segment.column.ValueType; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + /** * Reads the values produced by {@link DoubleFieldWriter} * @@ -99,4 +108,135 @@ public boolean isNull() return super._isNull(); } } + + @Override + public Column makeRACColumn(Frame frame, RowSignature signature, String columnName) + { + return new DoubleFieldReaderColumn(frame, signature.indexOf(columnName), signature.size()); + } + + private class DoubleFieldReaderColumn implements Column + { + private final Frame frame; + private final Memory dataRegion; + private final FieldPositionHelper coach; + + public DoubleFieldReaderColumn(Frame frame, int columnIndex, int numFields) + { + this.frame = frame; + dataRegion = frame.region(RowBasedFrameWriter.ROW_DATA_REGION); + + this.coach = new FieldPositionHelper( + frame, + frame.region(RowBasedFrameWriter.ROW_OFFSET_REGION), + dataRegion, + columnIndex, + numFields + ); + } + + @Nonnull + @Override + public ColumnAccessor toAccessor() + { + return new ColumnAccessor() + { + @Override + public ColumnType getType() + { + return ColumnType.DOUBLE; + } + + @Override + public int numRows() + { + return frame.numRows(); + } + + @Override + public boolean isNull(int rowNum) + { + final long fieldPosition = coach.computeFieldPosition(rowNum); + return dataRegion.getByte(fieldPosition) == getNullIndicatorByte(); + } + + @Nullable + @Override + public Object getObject(int rowNum) + { + final long fieldPosition = coach.computeFieldPosition(rowNum); + + if (dataRegion.getByte(fieldPosition) == getNullIndicatorByte()) { + return null; + } else { + return getDoubleAtPosition(fieldPosition); + } + } + + @Override + public double getDouble(int rowNum) + { + final long fieldPosition = coach.computeFieldPosition(rowNum); + + if (dataRegion.getByte(fieldPosition) == getNullIndicatorByte()) { + return 0L; + } else { + return getDoubleAtPosition(fieldPosition); + } + } + + @Override + public float getFloat(int rowNum) + { + return (float) getDouble(rowNum); + } + + @Override + public long getLong(int rowNum) + { + return (long) getDouble(rowNum); + } + + @Override + public int getInt(int rowNum) + { + return (int) getDouble(rowNum); + } + + @Override + public int compareRows(int lhsRowNum, int rhsRowNum) + { + long lhsPosition = coach.computeFieldPosition(lhsRowNum); + long rhsPosition = coach.computeFieldPosition(rhsRowNum); + + final byte nullIndicatorByte = getNullIndicatorByte(); + if (dataRegion.getByte(lhsPosition) == nullIndicatorByte) { + if (dataRegion.getByte(rhsPosition) == nullIndicatorByte) { + return 0; + } else { + return -1; + } + } else { + if (dataRegion.getByte(rhsPosition) == nullIndicatorByte) { + return 1; + } else { + return Double.compare(getDoubleAtPosition(lhsPosition), getDoubleAtPosition(rhsPosition)); + } + } + } + + private double getDoubleAtPosition(long lhsPosition) + { + return TransformUtils.detransformToDouble(dataRegion.getLong(lhsPosition + Byte.BYTES)); + } + }; + } + + @Nullable + @Override + public T as(Class clazz) + { + return null; + } + } } diff --git a/processing/src/main/java/org/apache/druid/frame/field/FieldPositionHelper.java b/processing/src/main/java/org/apache/druid/frame/field/FieldPositionHelper.java new file mode 100644 index 000000000000..d4abe5300b49 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/frame/field/FieldPositionHelper.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.frame.field; + +import org.apache.datasketches.memory.Memory; +import org.apache.druid.frame.Frame; + +/** + * Helps compute the field position for a frame from the different regions in the frame. + */ +public class FieldPositionHelper +{ + private final Frame frame; + private final Memory offsetRegion; + private final Memory dataRegion; + private final int columnIndex; + private final long fieldsBytesSize; + + public FieldPositionHelper( + Frame frame, + Memory offsetRegion, + Memory dataRegion, + int columnIndex, + int numFields + ) + { + this.frame = frame; + this.offsetRegion = offsetRegion; + this.dataRegion = dataRegion; + this.columnIndex = columnIndex; + this.fieldsBytesSize = this.columnIndex == 0 + ? ((long) numFields) * Integer.BYTES + : ((long) (this.columnIndex - 1)) * Integer.BYTES; + } + + public long computeFieldPosition(int rowNum) + { + rowNum = frame.physicalRow(rowNum); + final long rowPosition = rowNum == 0 ? 0 : offsetRegion.getLong(((long) rowNum - 1) * Long.BYTES); + final long fieldPosition; + if (columnIndex == 0) { + fieldPosition = rowPosition + fieldsBytesSize; + } else { + fieldPosition = rowPosition + dataRegion.getInt(rowPosition + fieldsBytesSize); + } + return fieldPosition; + } +} diff --git a/processing/src/main/java/org/apache/druid/frame/field/FieldReader.java b/processing/src/main/java/org/apache/druid/frame/field/FieldReader.java index bc9d631361f3..8cf0378071d3 100644 --- a/processing/src/main/java/org/apache/druid/frame/field/FieldReader.java +++ b/processing/src/main/java/org/apache/druid/frame/field/FieldReader.java @@ -20,24 +20,32 @@ package org.apache.druid.frame.field; import org.apache.datasketches.memory.Memory; +import org.apache.druid.frame.Frame; import org.apache.druid.frame.key.RowKey; import org.apache.druid.frame.key.RowKeyReader; import org.apache.druid.query.extraction.ExtractionFn; +import org.apache.druid.query.rowsandcols.column.Column; import org.apache.druid.segment.ColumnValueSelector; import org.apache.druid.segment.DimensionSelector; +import org.apache.druid.segment.column.RowSignature; import javax.annotation.Nullable; /** * Embeds the logic to read a specific field from row-based frames or from {@link RowKey}. - * + *

* Most callers should use {@link org.apache.druid.frame.read.FrameReader} or * {@link RowKeyReader} rather than using this interface directly. - * + *

* Stateless and immutable. */ public interface FieldReader { + /** + * Create a {@link Column} which provides accses to the rows in the frame, via the {@link Column#toAccessor()}. + */ + Column makeRACColumn(Frame frame, RowSignature signature, String columnName); + /** * Create a {@link ColumnValueSelector} backed by some memory and a moveable pointer. */ diff --git a/processing/src/main/java/org/apache/druid/frame/field/FloatArrayFieldReader.java b/processing/src/main/java/org/apache/druid/frame/field/FloatArrayFieldReader.java index e97af071824e..7252265ba8a8 100644 --- a/processing/src/main/java/org/apache/druid/frame/field/FloatArrayFieldReader.java +++ b/processing/src/main/java/org/apache/druid/frame/field/FloatArrayFieldReader.java @@ -20,7 +20,11 @@ package org.apache.druid.frame.field; import org.apache.datasketches.memory.Memory; +import org.apache.druid.error.NotYetImplemented; +import org.apache.druid.frame.Frame; +import org.apache.druid.query.rowsandcols.column.Column; import org.apache.druid.segment.ColumnValueSelector; +import org.apache.druid.segment.column.RowSignature; import javax.annotation.Nullable; @@ -60,4 +64,10 @@ public int getIndividualFieldSize() } }; } + + @Override + public Column makeRACColumn(Frame frame, RowSignature signature, String columnName) + { + throw NotYetImplemented.ex(null, "Class cannot create an RAC column."); + } } diff --git a/processing/src/main/java/org/apache/druid/frame/field/FloatFieldReader.java b/processing/src/main/java/org/apache/druid/frame/field/FloatFieldReader.java index 6617d563d679..3fc7213c73ed 100644 --- a/processing/src/main/java/org/apache/druid/frame/field/FloatFieldReader.java +++ b/processing/src/main/java/org/apache/druid/frame/field/FloatFieldReader.java @@ -21,11 +21,20 @@ import org.apache.datasketches.memory.Memory; import org.apache.druid.common.config.NullHandling; +import org.apache.druid.frame.Frame; +import org.apache.druid.frame.write.RowBasedFrameWriter; import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; +import org.apache.druid.query.rowsandcols.column.Column; +import org.apache.druid.query.rowsandcols.column.ColumnAccessor; import org.apache.druid.segment.ColumnValueSelector; import org.apache.druid.segment.FloatColumnSelector; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.RowSignature; import org.apache.druid.segment.column.ValueType; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + /** * Reads values written by {@link FloatFieldWriter}. * @@ -99,4 +108,135 @@ public boolean isNull() return super._isNull(); } } + + @Override + public Column makeRACColumn(Frame frame, RowSignature signature, String columnName) + { + return new FloatFieldReaderColumn(frame, signature.indexOf(columnName), signature.size()); + } + + private class FloatFieldReaderColumn implements Column + { + private final Frame frame; + private final Memory dataRegion; + private final FieldPositionHelper coach; + + public FloatFieldReaderColumn(Frame frame, int columnIndex, int numFields) + { + this.frame = frame; + dataRegion = frame.region(RowBasedFrameWriter.ROW_DATA_REGION); + + this.coach = new FieldPositionHelper( + frame, + frame.region(RowBasedFrameWriter.ROW_OFFSET_REGION), + dataRegion, + columnIndex, + numFields + ); + } + + @Nonnull + @Override + public ColumnAccessor toAccessor() + { + return new ColumnAccessor() + { + @Override + public ColumnType getType() + { + return ColumnType.FLOAT; + } + + @Override + public int numRows() + { + return frame.numRows(); + } + + @Override + public boolean isNull(int rowNum) + { + final long fieldPosition = coach.computeFieldPosition(rowNum); + return dataRegion.getByte(fieldPosition) == getNullIndicatorByte(); + } + + @Nullable + @Override + public Object getObject(int rowNum) + { + final long fieldPosition = coach.computeFieldPosition(rowNum); + + if (dataRegion.getByte(fieldPosition) == getNullIndicatorByte()) { + return null; + } else { + return getFloatAtPosition(fieldPosition); + } + } + + @Override + public double getDouble(int rowNum) + { + return getFloat(rowNum); + } + + @Override + public float getFloat(int rowNum) + { + final long fieldPosition = coach.computeFieldPosition(rowNum); + + if (dataRegion.getByte(fieldPosition) == getNullIndicatorByte()) { + return 0L; + } else { + return getFloatAtPosition(fieldPosition); + } + } + + @Override + public long getLong(int rowNum) + { + return (long) getFloat(rowNum); + } + + @Override + public int getInt(int rowNum) + { + return (int) getFloat(rowNum); + } + + @Override + public int compareRows(int lhsRowNum, int rhsRowNum) + { + long lhsPosition = coach.computeFieldPosition(lhsRowNum); + long rhsPosition = coach.computeFieldPosition(rhsRowNum); + + final byte nullIndicatorByte = getNullIndicatorByte(); + if (dataRegion.getByte(lhsPosition) == nullIndicatorByte) { + if (dataRegion.getByte(rhsPosition) == nullIndicatorByte) { + return 0; + } else { + return -1; + } + } else { + if (dataRegion.getByte(rhsPosition) == nullIndicatorByte) { + return 1; + } else { + return Float.compare(getFloatAtPosition(lhsPosition), getFloatAtPosition(rhsPosition)); + } + } + } + + private float getFloatAtPosition(long rhsPosition) + { + return TransformUtils.detransformToFloat(dataRegion.getInt(rhsPosition + Byte.BYTES)); + } + }; + } + + @Nullable + @Override + public T as(Class clazz) + { + return null; + } + } } diff --git a/processing/src/main/java/org/apache/druid/frame/field/LongArrayFieldReader.java b/processing/src/main/java/org/apache/druid/frame/field/LongArrayFieldReader.java index 8f7578c07d38..ee77223aefe4 100644 --- a/processing/src/main/java/org/apache/druid/frame/field/LongArrayFieldReader.java +++ b/processing/src/main/java/org/apache/druid/frame/field/LongArrayFieldReader.java @@ -20,7 +20,11 @@ package org.apache.druid.frame.field; import org.apache.datasketches.memory.Memory; +import org.apache.druid.error.NotYetImplemented; +import org.apache.druid.frame.Frame; +import org.apache.druid.query.rowsandcols.column.Column; import org.apache.druid.segment.ColumnValueSelector; +import org.apache.druid.segment.column.RowSignature; import javax.annotation.Nullable; @@ -60,4 +64,10 @@ public int getIndividualFieldSize() } }; } + + @Override + public Column makeRACColumn(Frame frame, RowSignature signature, String columnName) + { + throw NotYetImplemented.ex(null, "Class cannot create an RAC column."); + } } diff --git a/processing/src/main/java/org/apache/druid/frame/field/LongFieldReader.java b/processing/src/main/java/org/apache/druid/frame/field/LongFieldReader.java index 8f3bbbf04517..9b514c930873 100644 --- a/processing/src/main/java/org/apache/druid/frame/field/LongFieldReader.java +++ b/processing/src/main/java/org/apache/druid/frame/field/LongFieldReader.java @@ -21,11 +21,20 @@ import org.apache.datasketches.memory.Memory; import org.apache.druid.common.config.NullHandling; +import org.apache.druid.frame.Frame; +import org.apache.druid.frame.write.RowBasedFrameWriter; import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; +import org.apache.druid.query.rowsandcols.column.Column; +import org.apache.druid.query.rowsandcols.column.ColumnAccessor; import org.apache.druid.segment.ColumnValueSelector; import org.apache.druid.segment.LongColumnSelector; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.RowSignature; import org.apache.druid.segment.column.ValueType; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + /** * Reads values written by {@link LongFieldWriter}. * @@ -68,7 +77,6 @@ public ColumnValueSelector getColumnValueSelector( private static class LongFieldSelector extends NumericFieldReader.Selector implements LongColumnSelector { - final Memory dataRegion; final ReadableFieldPointer fieldPointer; @@ -99,4 +107,135 @@ public boolean isNull() return super._isNull(); } } + + @Override + public Column makeRACColumn(Frame frame, RowSignature signature, String columnName) + { + return new LongFieldReaderColumn(frame, signature.indexOf(columnName), signature.size()); + } + + private class LongFieldReaderColumn implements Column + { + private final Frame frame; + private final Memory dataRegion; + private final FieldPositionHelper coach; + + public LongFieldReaderColumn(Frame frame, int columnIndex, int numFields) + { + this.frame = frame; + this.dataRegion = frame.region(RowBasedFrameWriter.ROW_DATA_REGION); + + this.coach = new FieldPositionHelper( + frame, + frame.region(RowBasedFrameWriter.ROW_OFFSET_REGION), + dataRegion, + columnIndex, + numFields + ); + } + + @Nonnull + @Override + public ColumnAccessor toAccessor() + { + return new ColumnAccessor() + { + @Override + public ColumnType getType() + { + return ColumnType.LONG; + } + + @Override + public int numRows() + { + return frame.numRows(); + } + + @Override + public boolean isNull(int rowNum) + { + final long fieldPosition = coach.computeFieldPosition(rowNum); + return dataRegion.getByte(fieldPosition) == getNullIndicatorByte(); + } + + @Nullable + @Override + public Object getObject(int rowNum) + { + final long fieldPosition = coach.computeFieldPosition(rowNum); + + if (dataRegion.getByte(fieldPosition) == getNullIndicatorByte()) { + return null; + } else { + return getLongAtPosition(fieldPosition); + } + } + + @Override + public double getDouble(int rowNum) + { + return getLong(rowNum); + } + + @Override + public float getFloat(int rowNum) + { + return getLong(rowNum); + } + + @Override + public long getLong(int rowNum) + { + final long fieldPosition = coach.computeFieldPosition(rowNum); + + if (dataRegion.getByte(fieldPosition) == getNullIndicatorByte()) { + return 0L; + } else { + return getLongAtPosition(fieldPosition); + } + } + + @Override + public int getInt(int rowNum) + { + return (int) getLong(rowNum); + } + + @Override + public int compareRows(int lhsRowNum, int rhsRowNum) + { + long lhsPosition = coach.computeFieldPosition(lhsRowNum); + long rhsPosition = coach.computeFieldPosition(rhsRowNum); + + final byte nullIndicatorByte = getNullIndicatorByte(); + if (dataRegion.getByte(lhsPosition) == nullIndicatorByte) { + if (dataRegion.getByte(rhsPosition) == nullIndicatorByte) { + return 0; + } else { + return -1; + } + } else { + if (dataRegion.getByte(rhsPosition) == nullIndicatorByte) { + return 1; + } else { + return Long.compare(getLongAtPosition(lhsPosition), getLongAtPosition(rhsPosition)); + } + } + } + + private long getLongAtPosition(long rhsPosition) + { + return TransformUtils.detransformToLong(dataRegion.getLong(rhsPosition + Byte.BYTES)); + } + }; + } + + @Nullable + @Override + public T as(Class clazz) + { + return null; + } + } } diff --git a/processing/src/main/java/org/apache/druid/frame/field/NumericArrayFieldReader.java b/processing/src/main/java/org/apache/druid/frame/field/NumericArrayFieldReader.java index 8d6f3958b1e4..92dfbc98596d 100644 --- a/processing/src/main/java/org/apache/druid/frame/field/NumericArrayFieldReader.java +++ b/processing/src/main/java/org/apache/druid/frame/field/NumericArrayFieldReader.java @@ -29,7 +29,7 @@ /** * Reader class for the fields written by {@link NumericArrayFieldWriter}. See the Javadoc for the writer for more * information on the format - * + *

* The numeric array fields are byte comparable */ public abstract class NumericArrayFieldReader implements FieldReader diff --git a/processing/src/main/java/org/apache/druid/frame/field/NumericFieldReader.java b/processing/src/main/java/org/apache/druid/frame/field/NumericFieldReader.java index 1e11cfa65f37..bb0cc4faed61 100644 --- a/processing/src/main/java/org/apache/druid/frame/field/NumericFieldReader.java +++ b/processing/src/main/java/org/apache/druid/frame/field/NumericFieldReader.java @@ -50,6 +50,11 @@ public NumericFieldReader(boolean forArray) } } + public byte getNullIndicatorByte() + { + return nullIndicatorByte; + } + @Override public ColumnValueSelector makeColumnValueSelector(Memory memory, ReadableFieldPointer fieldPointer) { @@ -94,7 +99,7 @@ public abstract ColumnValueSelector getColumnValueSelector( /** * Helper class which allows the inheritors to fetch the nullity of the field located at fieldPointer's position in * the dataRegion. - * + *

* The implementations of the column value selectors returned by the {@link #getColumnValueSelector} can inherit this * class and call {@link #_isNull()} in their {@link ColumnValueSelector#isNull()} to offload the responsibility of * detecting null elements to this Selector, instead of reworking the null handling diff --git a/processing/src/main/java/org/apache/druid/frame/field/StringFieldReader.java b/processing/src/main/java/org/apache/druid/frame/field/StringFieldReader.java index 2513a2d24444..1c51a914e0d5 100644 --- a/processing/src/main/java/org/apache/druid/frame/field/StringFieldReader.java +++ b/processing/src/main/java/org/apache/druid/frame/field/StringFieldReader.java @@ -23,51 +23,64 @@ import it.unimi.dsi.fastutil.objects.ObjectArrays; import org.apache.datasketches.memory.Memory; import org.apache.druid.common.config.NullHandling; +import org.apache.druid.error.DruidException; +import org.apache.druid.error.NotYetImplemented; +import org.apache.druid.frame.Frame; import org.apache.druid.frame.read.FrameReaderUtils; +import org.apache.druid.frame.write.RowBasedFrameWriter; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.query.extraction.ExtractionFn; import org.apache.druid.query.filter.DruidPredicateFactory; import org.apache.druid.query.filter.ValueMatcher; import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; +import org.apache.druid.query.rowsandcols.column.Column; +import org.apache.druid.query.rowsandcols.column.ColumnAccessor; +import org.apache.druid.query.rowsandcols.column.accessor.ObjectColumnAccessorBase; import org.apache.druid.segment.ColumnValueSelector; import org.apache.druid.segment.DimensionSelector; import org.apache.druid.segment.DimensionSelectorUtils; import org.apache.druid.segment.IdLookup; import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.RowSignature; import org.apache.druid.segment.column.ValueType; import org.apache.druid.segment.data.IndexedInts; import org.apache.druid.segment.data.RangeIndexedInts; +import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; +import java.util.Comparator; import java.util.List; /** * Reads fields written by {@link StringFieldWriter} or {@link StringArrayFieldWriter}. - * + *

* Strings are written in UTF8 and terminated by {@link StringFieldWriter#VALUE_TERMINATOR}. Note that this byte * appears in valid UTF8 encodings if and only if the string contains a NUL (char 0). Therefore, this field writer * cannot write out strings containing NUL characters. - * + *

* All rows are terminated by {@link StringFieldWriter#ROW_TERMINATOR}. - * + *

* Empty rows are represented in one byte: solely that {@link StringFieldWriter#ROW_TERMINATOR}. Rows that are null * themselves (i.e., a null array) are represented as a {@link StringFieldWriter#NULL_ROW} followed by a * {@link StringFieldWriter#ROW_TERMINATOR}. This encoding for null arrays is decoded by older readers as an * empty array; null arrays are a feature that did not exist in earlier versions of the code. - * + *

* Null strings are stored as {@link StringFieldWriter#NULL_BYTE}. All other strings are prepended by * {@link StringFieldWriter#NOT_NULL_BYTE} byte to differentiate them from nulls. - * + *

* This encoding allows the encoded data to be compared as bytes in a way that matches the behavior of * {@link org.apache.druid.segment.StringDimensionHandler#DIMENSION_SELECTOR_COMPARATOR}, except null and * empty list are not considered equal. */ public class StringFieldReader implements FieldReader { + public static final byte[] EXPECTED_BYTES_FOR_NULL = { + StringFieldWriter.NULL_BYTE, StringFieldWriter.VALUE_TERMINATOR, StringFieldWriter.ROW_TERMINATOR + }; private final boolean asArray; public StringFieldReader() @@ -123,6 +136,16 @@ public boolean isNull(Memory memory, long position) } } + @Override + public Column makeRACColumn(Frame frame, RowSignature signature, String columnName) + { + if (asArray) { + return new StringArrayFieldReaderColumn(frame, signature.indexOf(columnName), signature.size()); + } else { + return new StringFieldReaderColumn(frame, signature.indexOf(columnName), signature.size()); + } + } + /** * Selector that reads a value from a location pointed to by {@link ReadableFieldPointer}. */ @@ -297,70 +320,296 @@ private void updateCurrentUtf8Strings(final long fieldPosition) { currentUtf8StringsIsNull = false; currentUtf8Strings.clear(); + currentUtf8StringsIsNull = addStringsToList(memory, fieldPosition, currentUtf8Strings); + } + } - long position = fieldPosition; - long limit = memory.getCapacity(); + private static class StringFieldReaderColumn implements Column + { + private final Frame frame; + private final Memory dataRegion; + private final FieldPositionHelper coach; - boolean rowTerminatorSeen = false; + public StringFieldReaderColumn(Frame frame, int columnIndex, int numFields) + { + this.frame = frame; + this.dataRegion = frame.region(RowBasedFrameWriter.ROW_DATA_REGION); + + this.coach = new FieldPositionHelper( + frame, + frame.region(RowBasedFrameWriter.ROW_OFFSET_REGION), + dataRegion, + columnIndex, + numFields + ); + } - while (position < limit && !rowTerminatorSeen) { - final byte kind = memory.getByte(position); - position++; + @Nonnull + @Override + public ColumnAccessor toAccessor() + { + return new ObjectColumnAccessorBase() + { + @Override + public ColumnType getType() + { + return ColumnType.STRING; + } - switch (kind) { - case StringFieldWriter.VALUE_TERMINATOR: // Or NULL_ROW (same byte value) - if (position == fieldPosition + 1) { - // It was NULL_ROW. - currentUtf8StringsIsNull = true; + @Override + public int numRows() + { + return frame.numRows(); + } + + @Override + public boolean isNull(int rowNum) + { + final long fieldPosition = coach.computeFieldPosition(rowNum); + byte[] nullBytes = new byte[3]; + dataRegion.getByteArray(fieldPosition, nullBytes, 0, 3); + return Arrays.equals(nullBytes, EXPECTED_BYTES_FOR_NULL); + } + + @Override + public int compareRows(int lhsRowNum, int rhsRowNum) + { + ByteBuffer lhs = getUTF8BytesAtPosition(coach.computeFieldPosition(lhsRowNum)); + ByteBuffer rhs = getUTF8BytesAtPosition(coach.computeFieldPosition(rhsRowNum)); + + if (lhs == null) { + if (rhs == null) { + return 0; + } else { + return -1; + } + } else { + if (rhs == null) { + return 1; + } else { + return lhs.compareTo(rhs); } + } + } - // Skip; next byte will be a null/not-null byte or a row terminator. - break; + @Override + protected Object getVal(int rowNum) + { + return getStringAtPosition(coach.computeFieldPosition(rowNum)); + } - case StringFieldWriter.ROW_TERMINATOR: - // Skip; this is the end of the row, so we'll fall through to the return statement. - rowTerminatorSeen = true; - break; + @Override + protected Comparator getComparator() + { + // we implement compareRows and thus don't need to actually implement this method + throw new UnsupportedOperationException(); + } - case StringFieldWriter.NULL_BYTE: - currentUtf8Strings.add(null); - break; + @Nullable + private String getStringAtPosition(long fieldPosition) + { + return StringUtils.fromUtf8Nullable(getUTF8BytesAtPosition(fieldPosition)); + } - case StringFieldWriter.NOT_NULL_BYTE: - for (long i = position; ; i++) { - if (i >= limit) { - throw new ISE("Value overrun"); - } + @Nullable + private ByteBuffer getUTF8BytesAtPosition(long fieldPosition) + { + ArrayList buffers = new ArrayList<>(); + final boolean isNull = addStringsToList(dataRegion, fieldPosition, buffers); + if (isNull) { + return null; + } else { + if (buffers.size() > 1) { + throw DruidException.defensive( + "Can only work with single-valued strings, should use a COMPLEX or ARRAY typed Column instead" + ); + } + return buffers.get(0); + } + } + }; + } - final byte b = memory.getByte(i); + @Nullable + @Override + public T as(Class clazz) + { + return null; + } + } - if (b == StringFieldWriter.VALUE_TERMINATOR) { - final int len = Ints.checkedCast(i - position); + private static class StringArrayFieldReaderColumn implements Column + { + private final Frame frame; + private final Memory dataRegion; + private final FieldPositionHelper coach; + + public StringArrayFieldReaderColumn(Frame frame, int columnIndex, int numFields) + { + this.frame = frame; + this.dataRegion = frame.region(RowBasedFrameWriter.ROW_DATA_REGION); + + this.coach = new FieldPositionHelper( + frame, + frame.region(RowBasedFrameWriter.ROW_OFFSET_REGION), + this.dataRegion, + columnIndex, + numFields + ); + } + + @Nonnull + @Override + public ColumnAccessor toAccessor() + { + return new ObjectColumnAccessorBase() + { + @Override + public ColumnType getType() + { + return ColumnType.STRING_ARRAY; + } + + @Override + public int numRows() + { + return frame.numRows(); + } + + @Override + public boolean isNull(int rowNum) + { + final long fieldPosition = coach.computeFieldPosition(rowNum); + byte[] nullBytes = new byte[3]; + dataRegion.getByteArray(fieldPosition, nullBytes, 0, 3); + return Arrays.equals(nullBytes, EXPECTED_BYTES_FOR_NULL); + } + + @Override + public int compareRows(int lhsRowNum, int rhsRowNum) + { + throw NotYetImplemented.ex( + null, + "Should implement this by comparing the actual bytes in the frame, they should be comparable" + ); + } + + @Override + protected Object getVal(int rowNum) + { + return getStringsAtPosition(coach.computeFieldPosition(rowNum)); + } + + @Override + protected Comparator getComparator() + { + // we implement compareRows and thus don't need to actually implement this method + throw new UnsupportedOperationException(); + } + + @Nullable + private List getStringsAtPosition(long fieldPosition) + { + final List bufs = getUTF8BytesAtPosition(fieldPosition); + if (bufs == null) { + return null; + } + + final ArrayList retVal = new ArrayList<>(bufs.size()); + for (ByteBuffer buf : bufs) { + retVal.add(StringUtils.fromUtf8Nullable(buf)); + } + return retVal; + } - if (len == 0 && NullHandling.replaceWithDefault()) { - // Empty strings and nulls are the same in this mode. - currentUtf8Strings.add(null); - } else { - final ByteBuffer buf = FrameReaderUtils.readByteBuffer(memory, position, len); - currentUtf8Strings.add(buf); - } + @Nullable + private List getUTF8BytesAtPosition(long fieldPosition) + { + ArrayList buffers = new ArrayList<>(); + final boolean isNull = addStringsToList(dataRegion, fieldPosition, buffers); + if (isNull) { + return null; + } else { + return buffers; + } + } + }; + } - position += len; + @Nullable + @Override + public T as(Class clazz) + { + return null; + } + } - break; + private static boolean addStringsToList(Memory memory, long fieldPosition, List list) + { + long position = fieldPosition; + long limit = memory.getCapacity(); + + boolean rowTerminatorSeen = false; + boolean isEffectivelyNull = false; + + while (position < limit && !rowTerminatorSeen) { + final byte kind = memory.getByte(position); + position++; + + switch (kind) { + case StringFieldWriter.VALUE_TERMINATOR: // Or NULL_ROW (same byte value) + if (position == fieldPosition + 1) { + // It was NULL_ROW. + isEffectivelyNull = true; + } + + // Skip; next byte will be a null/not-null byte or a row terminator. + break; + + case StringFieldWriter.ROW_TERMINATOR: + // Skip; this is the end of the row, so we'll fall through to the return statement. + rowTerminatorSeen = true; + break; + + case StringFieldWriter.NULL_BYTE: + list.add(null); + break; + + case StringFieldWriter.NOT_NULL_BYTE: + for (long i = position; ; i++) { + if (i >= limit) { + throw new ISE("Value overrun"); + } + + final byte b = memory.getByte(i); + + if (b == StringFieldWriter.VALUE_TERMINATOR) { + final int len = Ints.checkedCast(i - position); + + if (len == 0 && NullHandling.replaceWithDefault()) { + // Empty strings and nulls are the same in this mode. + list.add(null); + } else { + final ByteBuffer buf = FrameReaderUtils.readByteBuffer(memory, position, len); + list.add(buf); } + + position += len; + + break; } + } - break; + break; - default: - throw new ISE("Invalid value start byte [%s]", kind); - } + default: + throw new ISE("Invalid value start byte [%s]", kind); } + } - if (!rowTerminatorSeen) { - throw new ISE("Unexpected end of field"); - } + if (!rowTerminatorSeen) { + throw new ISE("Unexpected end of field"); } + return isEffectivelyNull; } } diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/RowBasedFrameRowsAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/RowBasedFrameRowsAndColumns.java index 234410bc070c..fa17984e9ba5 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/RowBasedFrameRowsAndColumns.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/RowBasedFrameRowsAndColumns.java @@ -19,12 +19,13 @@ package org.apache.druid.query.rowsandcols.concrete; +import org.apache.druid.error.DruidException; import org.apache.druid.frame.Frame; import org.apache.druid.frame.FrameType; +import org.apache.druid.frame.field.FieldReader; +import org.apache.druid.frame.field.FieldReaders; import org.apache.druid.frame.read.FrameReader; -import org.apache.druid.frame.read.columnar.FrameColumnReaders; import org.apache.druid.frame.segment.FrameStorageAdapter; -import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.query.rowsandcols.RowsAndColumns; import org.apache.druid.query.rowsandcols.column.Column; @@ -65,6 +66,7 @@ public int numRows() @Override public Column findColumn(String name) { + // Use contains so that we can negative cache. if (!colCache.containsKey(name)) { final int columnIndex = signature.indexOf(name); if (columnIndex < 0) { @@ -72,9 +74,16 @@ public Column findColumn(String name) } else { final ColumnType columnType = signature .getColumnType(columnIndex) - .orElseThrow(() -> new ISE("just got the id, why is columnType not there?")); + .orElseThrow( + () -> DruidException.defensive( + "just got the id [%s][%s], why is columnType not there?", + columnIndex, + name + ) + ); - colCache.put(name, FrameColumnReaders.create(name, columnIndex, columnType).readRACColumn(frame)); + final FieldReader reader = FieldReaders.create(name, columnType); + colCache.put(name, reader.makeRACColumn(frame, signature, name)); } } return colCache.get(name); diff --git a/processing/src/test/java/org/apache/druid/frame/field/DoubleFieldReaderTest.java b/processing/src/test/java/org/apache/druid/frame/field/DoubleFieldReaderTest.java index 00fafe53f62c..175e9de2a56e 100644 --- a/processing/src/test/java/org/apache/druid/frame/field/DoubleFieldReaderTest.java +++ b/processing/src/test/java/org/apache/druid/frame/field/DoubleFieldReaderTest.java @@ -21,14 +21,20 @@ import org.apache.datasketches.memory.WritableMemory; import org.apache.druid.common.config.NullHandling; +import org.apache.druid.frame.key.KeyOrder; +import org.apache.druid.frame.write.FrameWriterTestData; import org.apache.druid.java.util.common.ISE; import org.apache.druid.query.extraction.SubstringDimExtractionFn; import org.apache.druid.query.filter.DruidObjectPredicate; import org.apache.druid.query.filter.StringPredicateDruidPredicateFactory; +import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns; +import org.apache.druid.query.rowsandcols.column.ColumnAccessor; +import org.apache.druid.query.rowsandcols.concrete.RowBasedFrameRowsAndColumnsTest; import org.apache.druid.segment.BaseDoubleColumnValueSelector; import org.apache.druid.segment.ColumnValueSelector; import org.apache.druid.segment.DimensionDictionarySelector; import org.apache.druid.segment.DimensionSelector; +import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.data.IndexedInts; import org.apache.druid.testing.InitializedNullHandlingTest; import org.junit.After; @@ -42,6 +48,9 @@ import org.mockito.junit.MockitoRule; import org.mockito.quality.Strictness; +import java.util.List; +import java.util.Objects; + public class DoubleFieldReaderTest extends InitializedNullHandlingTest { private static final long MEMORY_POSITION = 1; @@ -143,6 +152,27 @@ public void test_makeDimensionSelector_defaultOrNull() } } + @Test + public void testCompareRows() + { + final List rows = FrameWriterTestData.TEST_DOUBLES.getData(KeyOrder.ASCENDING); + + final ColumnAccessor accessor = + RowBasedFrameRowsAndColumnsTest.MAKER.apply( + MapOfColumnsRowsAndColumns.builder() + .add("dim1", rows.toArray(), ColumnType.DOUBLE) + .build() + ).findColumn("dim1").toAccessor(); + + for (int i = 1; i < rows.size(); i++) { + if (Objects.equals(accessor.getObject(i - 1), accessor.getObject(i))) { + Assert.assertEquals(0, accessor.compareRows(i - 1, i)); + } else { + Assert.assertTrue(accessor.compareRows(i - 1, i) < 0); + } + } + } + @Test public void test_makeDimensionSelector_aValue() { diff --git a/processing/src/test/java/org/apache/druid/frame/field/FieldReaderRACTest.java b/processing/src/test/java/org/apache/druid/frame/field/FieldReaderRACTest.java new file mode 100644 index 000000000000..70ff7037b8cb --- /dev/null +++ b/processing/src/test/java/org/apache/druid/frame/field/FieldReaderRACTest.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.frame.field; + +import org.apache.druid.error.DruidExceptionMatcher; +import org.apache.druid.frame.Frame; +import org.apache.druid.frame.FrameType; +import org.apache.druid.frame.testutil.FrameTestUtil; +import org.apache.druid.query.rowsandcols.column.ColumnAccessor; +import org.apache.druid.query.rowsandcols.concrete.RowBasedFrameRowsAndColumns; +import org.apache.druid.segment.ColumnValueSelector; +import org.apache.druid.segment.QueryableIndex; +import org.apache.druid.segment.QueryableIndexStorageAdapter; +import org.apache.druid.segment.SimpleAscendingOffset; +import org.apache.druid.segment.TestIndex; +import org.apache.druid.segment.column.BaseColumn; +import org.apache.druid.segment.column.ColumnHolder; +import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.testing.InitializedNullHandlingTest; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; + +public class FieldReaderRACTest extends InitializedNullHandlingTest +{ + final DruidExceptionMatcher noArraysMatcher = DruidExceptionMatcher + .defensive() + .expectMessageIs("Can only work with single-valued strings, should use a COMPLEX or ARRAY typed Column instead"); + + @Test + public void testDataSet() throws IOException + { + final QueryableIndex index = TestIndex.getMMappedTestIndex(); + final QueryableIndexStorageAdapter storageAdapter = new QueryableIndexStorageAdapter(index); + final Frame frame = FrameTestUtil.adapterToFrame(storageAdapter, FrameType.ROW_BASED); + + final RowSignature siggy = storageAdapter.getRowSignature(); + final RowBasedFrameRowsAndColumns rowBasedRAC = new RowBasedFrameRowsAndColumns(frame, siggy); + + for (String columnName : siggy.getColumnNames()) { + final ColumnHolder colHolder = index.getColumnHolder(columnName); + final boolean multiValue = colHolder.getCapabilities().hasMultipleValues().isTrue(); + + try (BaseColumn col = colHolder.getColumn()) { + final ColumnAccessor racCol = rowBasedRAC.findColumn(columnName).toAccessor(); + + final SimpleAscendingOffset offset = new SimpleAscendingOffset(racCol.numRows()); + final ColumnValueSelector selector = col.makeColumnValueSelector(offset); + while (offset.withinBounds()) { + if (multiValue) { + noArraysMatcher.assertThrowsAndMatches(() -> racCol.getObject(offset.getOffset())); + } else { + final Object racObj = racCol.getObject(offset.getOffset()); + Assert.assertEquals(racCol.isNull(offset.getOffset()), racCol.getObject(offset.getOffset()) == null); + Assert.assertEquals(selector.getObject(), racObj); + } + offset.increment(); + } + } + } + } +} diff --git a/processing/src/test/java/org/apache/druid/frame/field/FloatFieldReaderTest.java b/processing/src/test/java/org/apache/druid/frame/field/FloatFieldReaderTest.java index 4b8d10ca0e00..3669c89232d6 100644 --- a/processing/src/test/java/org/apache/druid/frame/field/FloatFieldReaderTest.java +++ b/processing/src/test/java/org/apache/druid/frame/field/FloatFieldReaderTest.java @@ -21,14 +21,20 @@ import org.apache.datasketches.memory.WritableMemory; import org.apache.druid.common.config.NullHandling; +import org.apache.druid.frame.key.KeyOrder; +import org.apache.druid.frame.write.FrameWriterTestData; import org.apache.druid.java.util.common.ISE; import org.apache.druid.query.extraction.SubstringDimExtractionFn; import org.apache.druid.query.filter.DruidObjectPredicate; import org.apache.druid.query.filter.StringPredicateDruidPredicateFactory; +import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns; +import org.apache.druid.query.rowsandcols.column.ColumnAccessor; +import org.apache.druid.query.rowsandcols.concrete.RowBasedFrameRowsAndColumnsTest; import org.apache.druid.segment.BaseFloatColumnValueSelector; import org.apache.druid.segment.ColumnValueSelector; import org.apache.druid.segment.DimensionDictionarySelector; import org.apache.druid.segment.DimensionSelector; +import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.data.IndexedInts; import org.apache.druid.testing.InitializedNullHandlingTest; import org.junit.After; @@ -42,6 +48,9 @@ import org.mockito.junit.MockitoRule; import org.mockito.quality.Strictness; +import java.util.List; +import java.util.Objects; + public class FloatFieldReaderTest extends InitializedNullHandlingTest { private static final long MEMORY_POSITION = 1; @@ -75,6 +84,27 @@ public void test_isNull_defaultOrNull() Assert.assertEquals(NullHandling.sqlCompatible(), FloatFieldReader.forPrimitive().isNull(memory, MEMORY_POSITION)); } + @Test + public void testCompareRows() + { + final List rows = FrameWriterTestData.TEST_FLOATS.getData(KeyOrder.ASCENDING); + + final ColumnAccessor accessor = + RowBasedFrameRowsAndColumnsTest.MAKER.apply( + MapOfColumnsRowsAndColumns.builder() + .add("dim1", rows.toArray(), ColumnType.FLOAT) + .build() + ).findColumn("dim1").toAccessor(); + + for (int i = 1; i < rows.size(); i++) { + if (Objects.equals(accessor.getObject(i - 1), accessor.getObject(i))) { + Assert.assertEquals(0, accessor.compareRows(i - 1, i)); + } else { + Assert.assertTrue(accessor.compareRows(i - 1, i) < 0); + } + } + } + @Test public void test_isNull_aValue() { diff --git a/processing/src/test/java/org/apache/druid/frame/field/LongFieldReaderTest.java b/processing/src/test/java/org/apache/druid/frame/field/LongFieldReaderTest.java index aab55654e5af..266d86e12833 100644 --- a/processing/src/test/java/org/apache/druid/frame/field/LongFieldReaderTest.java +++ b/processing/src/test/java/org/apache/druid/frame/field/LongFieldReaderTest.java @@ -21,14 +21,20 @@ import org.apache.datasketches.memory.WritableMemory; import org.apache.druid.common.config.NullHandling; +import org.apache.druid.frame.key.KeyOrder; +import org.apache.druid.frame.write.FrameWriterTestData; import org.apache.druid.java.util.common.ISE; import org.apache.druid.query.extraction.SubstringDimExtractionFn; import org.apache.druid.query.filter.DruidObjectPredicate; import org.apache.druid.query.filter.StringPredicateDruidPredicateFactory; +import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns; +import org.apache.druid.query.rowsandcols.column.ColumnAccessor; +import org.apache.druid.query.rowsandcols.concrete.RowBasedFrameRowsAndColumnsTest; import org.apache.druid.segment.BaseLongColumnValueSelector; import org.apache.druid.segment.ColumnValueSelector; import org.apache.druid.segment.DimensionDictionarySelector; import org.apache.druid.segment.DimensionSelector; +import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.data.IndexedInts; import org.apache.druid.testing.InitializedNullHandlingTest; import org.junit.After; @@ -42,6 +48,9 @@ import org.mockito.junit.MockitoRule; import org.mockito.quality.Strictness; +import java.util.List; +import java.util.Objects; + public class LongFieldReaderTest extends InitializedNullHandlingTest { private static final long MEMORY_POSITION = 1; @@ -202,6 +211,28 @@ public void test_makeDimensionSelector_aValue_extractionFn() Assert.assertFalse(readSelector.makeValueMatcher(StringPredicateDruidPredicateFactory.equalTo("2")).matches(false)); } + + @Test + public void testCompareRows() + { + final List rows = FrameWriterTestData.TEST_LONGS.getData(KeyOrder.ASCENDING); + + final ColumnAccessor accessor = + RowBasedFrameRowsAndColumnsTest.MAKER.apply( + MapOfColumnsRowsAndColumns.builder() + .add("dim1", rows.toArray(), ColumnType.LONG) + .build() + ).findColumn("dim1").toAccessor(); + + for (int i = 1; i < rows.size(); i++) { + if (Objects.equals(accessor.getObject(i - 1), accessor.getObject(i))) { + Assert.assertEquals(0, accessor.compareRows(i - 1, i)); + } else { + Assert.assertTrue(accessor.compareRows(i - 1, i) < 0); + } + } + } + private void writeToMemory(final Long value) { Mockito.when(writeSelector.isNull()).thenReturn(value == null); diff --git a/processing/src/test/java/org/apache/druid/query/rowsandcols/RowsAndColumnsTestBase.java b/processing/src/test/java/org/apache/druid/query/rowsandcols/RowsAndColumnsTestBase.java index 56be3d50f20e..7b639b3d48d4 100644 --- a/processing/src/test/java/org/apache/druid/query/rowsandcols/RowsAndColumnsTestBase.java +++ b/processing/src/test/java/org/apache/druid/query/rowsandcols/RowsAndColumnsTestBase.java @@ -24,6 +24,8 @@ import org.apache.druid.java.util.common.ISE; import org.apache.druid.query.rowsandcols.concrete.ColumnBasedFrameRowsAndColumns; import org.apache.druid.query.rowsandcols.concrete.ColumnBasedFrameRowsAndColumnsTest; +import org.apache.druid.query.rowsandcols.concrete.RowBasedFrameRowsAndColumns; +import org.apache.druid.query.rowsandcols.concrete.RowBasedFrameRowsAndColumnsTest; import org.junit.Assert; import org.junit.Test; @@ -67,7 +69,8 @@ private static ArrayList getMakers() new Object[]{ConcatRowsAndColumns.class, ConcatRowsAndColumnsTest.MAKER}, new Object[]{RearrangedRowsAndColumns.class, RearrangedRowsAndColumnsTest.MAKER}, new Object[]{ColumnBasedFrameRowsAndColumns.class, ColumnBasedFrameRowsAndColumnsTest.MAKER}, - new Object[]{StorageAdapterRowsAndColumns.class, StorageAdapterRowsAndColumnsTest.MAKER} + new Object[]{StorageAdapterRowsAndColumns.class, StorageAdapterRowsAndColumnsTest.MAKER}, + new Object[]{RowBasedFrameRowsAndColumns.class, RowBasedFrameRowsAndColumnsTest.MAKER} ); } diff --git a/processing/src/test/java/org/apache/druid/query/rowsandcols/concrete/ColumnBasedFrameRowsAndColumnsTest.java b/processing/src/test/java/org/apache/druid/query/rowsandcols/concrete/ColumnBasedFrameRowsAndColumnsTest.java index cd1bb1b81ecb..acfcbe6f83ed 100644 --- a/processing/src/test/java/org/apache/druid/query/rowsandcols/concrete/ColumnBasedFrameRowsAndColumnsTest.java +++ b/processing/src/test/java/org/apache/druid/query/rowsandcols/concrete/ColumnBasedFrameRowsAndColumnsTest.java @@ -33,17 +33,13 @@ public ColumnBasedFrameRowsAndColumnsTest() super(ColumnBasedFrameRowsAndColumns.class); } - public static Function MAKER = input -> { - - return buildFrame(input); - }; + public static Function MAKER = ColumnBasedFrameRowsAndColumnsTest::buildFrame; public static ColumnBasedFrameRowsAndColumns buildFrame(MapOfColumnsRowsAndColumns input) { LazilyDecoratedRowsAndColumns rac = new LazilyDecoratedRowsAndColumns(input, null, null, null, OffsetLimit.limit(Integer.MAX_VALUE), null, null); rac.numRows(); // materialize - return (ColumnBasedFrameRowsAndColumns) rac.getBase(); } } diff --git a/processing/src/test/java/org/apache/druid/query/rowsandcols/concrete/RowBasedFrameRowsAndColumnsTest.java b/processing/src/test/java/org/apache/druid/query/rowsandcols/concrete/RowBasedFrameRowsAndColumnsTest.java new file mode 100644 index 000000000000..867e83d9e008 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/query/rowsandcols/concrete/RowBasedFrameRowsAndColumnsTest.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.rowsandcols.concrete; + +import org.apache.druid.frame.Frame; +import org.apache.druid.frame.allocation.ArenaMemoryAllocatorFactory; +import org.apache.druid.frame.write.FrameWriter; +import org.apache.druid.frame.write.FrameWriters; +import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns; +import org.apache.druid.query.rowsandcols.RowsAndColumnsTestBase; +import org.apache.druid.query.rowsandcols.column.Column; +import org.apache.druid.query.rowsandcols.semantic.ColumnSelectorFactoryMaker; +import org.apache.druid.segment.ColumnSelectorFactory; +import org.apache.druid.segment.column.RowSignature; + +import java.util.Collections; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; + +public class RowBasedFrameRowsAndColumnsTest extends RowsAndColumnsTestBase +{ + public RowBasedFrameRowsAndColumnsTest() + { + super(RowBasedFrameRowsAndColumns.class); + } + + public static Function MAKER = RowBasedFrameRowsAndColumnsTest::buildFrame; + + private static RowBasedFrameRowsAndColumns buildFrame(MapOfColumnsRowsAndColumns rac) + { + final AtomicInteger rowId = new AtomicInteger(0); + final int numRows = rac.numRows(); + final ColumnSelectorFactoryMaker csfm = ColumnSelectorFactoryMaker.fromRAC(rac); + final ColumnSelectorFactory selectorFactory = csfm.make(rowId); + + final RowSignature.Builder sigBob = RowSignature.builder(); + final ArenaMemoryAllocatorFactory memFactory = new ArenaMemoryAllocatorFactory(200 << 20); + + + for (String column : rac.getColumnNames()) { + final Column racColumn = rac.findColumn(column); + if (racColumn == null) { + continue; + } + sigBob.add(column, racColumn.toAccessor().getType()); + } + + final RowSignature signature = sigBob.build(); + final FrameWriter frameWriter = FrameWriters.makeRowBasedFrameWriterFactory( + memFactory, + signature, + Collections.emptyList(), + false + ).newFrameWriter(selectorFactory); + + rowId.set(0); + for (; rowId.get() < numRows; rowId.incrementAndGet()) { + frameWriter.addSelection(); + } + + return new RowBasedFrameRowsAndColumns(Frame.wrap(frameWriter.toByteArray()), signature); + } +}