Skip to content

Commit

Permalink
Add fieldReader for row based frames (#16707)
Browse files Browse the repository at this point in the history
Add a new fieldReaders#makeRAC for RowBasedFrameRowsAndColumns.
  • Loading branch information
adarshsanjeev authored Aug 13, 2024
1 parent f67ff92 commit c6da2f3
Show file tree
Hide file tree
Showing 20 changed files with 1,190 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,24 +21,32 @@

import com.google.common.base.Preconditions;
import org.apache.datasketches.memory.Memory;
import org.apache.druid.frame.Frame;
import org.apache.druid.frame.write.RowBasedFrameWriter;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.extraction.ExtractionFn;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.query.rowsandcols.column.ColumnAccessor;
import org.apache.druid.query.rowsandcols.column.accessor.ObjectColumnAccessorBase;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.DimensionSelector;
import org.apache.druid.segment.ObjectColumnSelector;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.serde.ComplexMetricSerde;
import org.apache.druid.segment.serde.ComplexMetrics;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.Comparator;

/**
* Reads values written by {@link ComplexFieldWriter}.
*
* <p>
* Format:
*
* <p>
* - 1 byte: {@link ComplexFieldWriter#NULL_BYTE} or {@link ComplexFieldWriter#NOT_NULL_BYTE}
* - 4 bytes: length of serialized complex value, little-endian int
* - N bytes: serialized complex value
Expand Down Expand Up @@ -121,7 +129,7 @@ public static Object readFieldFromByteArray(
* Alternative interface to read the field from the memory without creating a selector and field pointer
*/
@Nullable
public static Object readFieldFromMemory(
public static <T> T readFieldFromMemory(
final ComplexMetricSerde serde,
final Memory memory,
final long position
Expand All @@ -136,7 +144,8 @@ public static Object readFieldFromMemory(
final byte[] bytes = new byte[length];
memory.getByteArray(position + ComplexFieldWriter.HEADER_SIZE, bytes, 0, length);

return serde.fromBytes(bytes, 0, length);
//noinspection unchecked
return (T) serde.fromBytes(bytes, 0, length);
} else {
throw new ISE("Unexpected null byte [%s]", nullByte);
}
Expand Down Expand Up @@ -166,8 +175,8 @@ private Selector(Memory memory, ReadableFieldPointer fieldPointer, ComplexMetric
@Override
public T getObject()
{
//noinspection unchecked
return (T) readFieldFromMemory(serde, memory, fieldPointer.position());
final long fieldPosition = fieldPointer.position();
return readFieldFromMemory(serde, memory, fieldPosition);
}

@Override
Expand All @@ -183,4 +192,80 @@ public void inspectRuntimeShape(RuntimeShapeInspector inspector)
// Do nothing.
}
}

/**
 * Builds a {@link Column} for the field named {@code columnName}, backed by the given frame.
 *
 * @param frame      frame whose regions the returned column reads from
 * @param signature  signature used to resolve the field's index and the total number of fields
 * @param columnName name of the field to expose
 * @return a new {@link ComplexFieldReaderColumn} for that field
 */
@Override
public Column makeRACColumn(Frame frame, RowSignature signature, String columnName)
{
  final int columnIndex = signature.indexOf(columnName);
  final int numFields = signature.size();
  return new ComplexFieldReaderColumn(frame, columnIndex, numFields);
}

/**
 * {@link Column} exposing one complex-typed field of a frame. Values are read from the frame's
 * {@link RowBasedFrameWriter#ROW_DATA_REGION} region at positions computed by a {@link FieldPositionHelper},
 * and deserialized with the enclosing reader's {@code serde}.
 */
private class ComplexFieldReaderColumn implements Column
{
  private final Frame frame;
  private final Memory rowData;
  private final ColumnType columnType;
  private final FieldPositionHelper positions;

  /**
   * @param frame       frame to read from
   * @param columnIndex index of the field within each row
   * @param numFields   total number of fields per row
   */
  public ComplexFieldReaderColumn(Frame frame, int columnIndex, int numFields)
  {
    this.frame = frame;
    this.rowData = frame.region(RowBasedFrameWriter.ROW_DATA_REGION);
    this.columnType = ColumnType.ofComplex(serde.getTypeName());

    final Memory rowOffsets = frame.region(RowBasedFrameWriter.ROW_OFFSET_REGION);
    this.positions = new FieldPositionHelper(frame, rowOffsets, this.rowData, columnIndex, numFields);
  }

  /**
   * Returns an accessor that resolves the field position for the requested row on every call.
   */
  @Nonnull
  @Override
  public ColumnAccessor toAccessor()
  {
    return new ObjectColumnAccessorBase()
    {
      @Override
      public ColumnType getType()
      {
        return columnType;
      }

      @Override
      public int numRows()
      {
        return frame.numRows();
      }

      @Override
      public boolean isNull(int rowNum)
      {
        // The first byte of the field is compared against the writer's null marker.
        final byte marker = rowData.getByte(positions.computeFieldPosition(rowNum));
        return marker == ComplexFieldWriter.NULL_BYTE;
      }

      @Override
      protected Object getVal(int rowNum)
      {
        final long fieldPosition = positions.computeFieldPosition(rowNum);
        return readFieldFromMemory(serde, rowData, fieldPosition);
      }

      @Override
      protected Comparator<Object> getComparator()
      {
        return serde.getTypeStrategy();
      }
    };
  }

  /**
   * Always returns null; this column offers no alternate representations.
   */
  @Nullable
  @Override
  public <T> T as(Class<? extends T> clazz)
  {
    return null;
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,11 @@
package org.apache.druid.frame.field;

import org.apache.datasketches.memory.Memory;
import org.apache.druid.error.NotYetImplemented;
import org.apache.druid.frame.Frame;
import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.column.RowSignature;

import javax.annotation.Nullable;

Expand Down Expand Up @@ -60,4 +64,10 @@ public int getIndividualFieldSize()
}
};
}

/**
 * Not supported by this reader: always throws the exception built by {@link NotYetImplemented#ex}.
 */
@Override
public Column makeRACColumn(Frame frame, RowSignature signature, String columnName)
{
  final String message = "Class cannot create an RAC column.";
  throw NotYetImplemented.ex(null, message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,20 @@

import org.apache.datasketches.memory.Memory;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.frame.Frame;
import org.apache.druid.frame.write.RowBasedFrameWriter;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.query.rowsandcols.column.ColumnAccessor;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.DoubleColumnSelector;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.column.ValueType;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

/**
* Reads the values produced by {@link DoubleFieldWriter}
*
Expand Down Expand Up @@ -99,4 +108,135 @@ public boolean isNull()
return super._isNull();
}
}

/**
 * Builds a {@link Column} for the field named {@code columnName}, backed by the given frame.
 *
 * @param frame      frame whose regions the returned column reads from
 * @param signature  signature used to resolve the field's index and the total number of fields
 * @param columnName name of the field to expose
 * @return a new {@link DoubleFieldReaderColumn} for that field
 */
@Override
public Column makeRACColumn(Frame frame, RowSignature signature, String columnName)
{
  final int columnIndex = signature.indexOf(columnName);
  final int numFields = signature.size();
  return new DoubleFieldReaderColumn(frame, columnIndex, numFields);
}

/**
 * {@link Column} exposing one double-typed field of a frame. Each field is read from the frame's
 * {@link RowBasedFrameWriter#ROW_DATA_REGION} region as one marker byte followed by a long that is passed
 * through {@link TransformUtils#detransformToDouble}.
 */
private class DoubleFieldReaderColumn implements Column
{
  private final Frame frame;
  private final Memory rowData;
  private final FieldPositionHelper positions;

  /**
   * @param frame       frame to read from
   * @param columnIndex index of the field within each row
   * @param numFields   total number of fields per row
   */
  public DoubleFieldReaderColumn(Frame frame, int columnIndex, int numFields)
  {
    this.frame = frame;
    this.rowData = frame.region(RowBasedFrameWriter.ROW_DATA_REGION);

    final Memory rowOffsets = frame.region(RowBasedFrameWriter.ROW_OFFSET_REGION);
    this.positions = new FieldPositionHelper(frame, rowOffsets, this.rowData, columnIndex, numFields);
  }

  /**
   * Returns an accessor that resolves the field position for the requested row on every call.
   */
  @Nonnull
  @Override
  public ColumnAccessor toAccessor()
  {
    return new ColumnAccessor()
    {
      @Override
      public ColumnType getType()
      {
        return ColumnType.DOUBLE;
      }

      @Override
      public int numRows()
      {
        return frame.numRows();
      }

      @Override
      public boolean isNull(int rowNum)
      {
        return hasNullMarker(positions.computeFieldPosition(rowNum));
      }

      @Nullable
      @Override
      public Object getObject(int rowNum)
      {
        final long position = positions.computeFieldPosition(rowNum);
        if (hasNullMarker(position)) {
          return null;
        }
        return decodeDouble(position);
      }

      @Override
      public double getDouble(int rowNum)
      {
        final long position = positions.computeFieldPosition(rowNum);
        // Null fields read as zero.
        return hasNullMarker(position) ? 0d : decodeDouble(position);
      }

      @Override
      public float getFloat(int rowNum)
      {
        return (float) getDouble(rowNum);
      }

      @Override
      public long getLong(int rowNum)
      {
        return (long) getDouble(rowNum);
      }

      @Override
      public int getInt(int rowNum)
      {
        return (int) getDouble(rowNum);
      }

      /**
       * Orders null fields before non-null ones; two nulls compare equal, and two non-null values are
       * ordered by {@link Double#compare}.
       */
      @Override
      public int compareRows(int lhsRowNum, int rhsRowNum)
      {
        final long left = positions.computeFieldPosition(lhsRowNum);
        final long right = positions.computeFieldPosition(rhsRowNum);

        final byte nullMarker = getNullIndicatorByte();
        final boolean leftNull = rowData.getByte(left) == nullMarker;
        final boolean rightNull = rowData.getByte(right) == nullMarker;

        if (leftNull) {
          return rightNull ? 0 : -1;
        }
        if (rightNull) {
          return 1;
        }
        return Double.compare(decodeDouble(left), decodeDouble(right));
      }

      /**
       * True when the byte at {@code position} equals the reader's null indicator byte.
       */
      private boolean hasNullMarker(long position)
      {
        return rowData.getByte(position) == getNullIndicatorByte();
      }

      /**
       * Decodes the long stored one byte past {@code position} (i.e. after the marker byte).
       */
      private double decodeDouble(long position)
      {
        return TransformUtils.detransformToDouble(rowData.getLong(position + Byte.BYTES));
      }
    };
  }

  /**
   * Always returns null; this column offers no alternate representations.
   */
  @Nullable
  @Override
  public <T> T as(Class<? extends T> clazz)
  {
    return null;
  }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.frame.field;

import org.apache.datasketches.memory.Memory;
import org.apache.druid.frame.Frame;

/**
 * Helps compute the field position for a frame from the different regions in the frame.
 *
 * <p>
 * As read by this class: the offset region holds one long per row, and the long stored for row {@code r - 1}
 * is used as the start of row {@code r} in the data region (row 0 starts at 0). Each row begins with
 * {@code numFields} ints; field 0 is located directly after them, and field {@code i > 0} is located at the
 * row start plus the int stored in slot {@code i - 1} (presumably the end offset of the preceding field;
 * confirm against the row-based frame writer).
 */
public class FieldPositionHelper
{
  private final Frame frame;
  private final Memory offsetRegion;
  private final Memory dataRegion;
  private final int columnIndex;

  /**
   * Row-relative offset, fixed at construction. For column 0 this is the size of the per-row int header
   * (where the first field's data starts). For other columns it is the header slot that holds the field's
   * row-relative start.
   */
  private final long rowRelativeOffset;

  /**
   * @param frame        frame used to map a row number to a physical row
   * @param offsetRegion region holding one long per row
   * @param dataRegion   region holding the row data
   * @param columnIndex  index of the field within each row; must be in {@code [0, numFields)}
   * @param numFields    number of fields per row
   * @throws IllegalArgumentException if {@code columnIndex} is negative (e.g. a "not found" lookup result)
   *                                  or not less than {@code numFields}
   */
  public FieldPositionHelper(
      Frame frame,
      Memory offsetRegion,
      Memory dataRegion,
      int columnIndex,
      int numFields
  )
  {
    // Fail fast: an out-of-range index would otherwise turn into a negative or out-of-header memory offset
    // and yield a garbage field position on every read.
    if (columnIndex < 0 || columnIndex >= numFields) {
      throw new IllegalArgumentException(
          "columnIndex [" + columnIndex + "] is out of range for numFields [" + numFields + "]"
      );
    }

    this.frame = frame;
    this.offsetRegion = offsetRegion;
    this.dataRegion = dataRegion;
    this.columnIndex = columnIndex;
    this.rowRelativeOffset = columnIndex == 0
                             ? ((long) numFields) * Integer.BYTES
                             : ((long) (columnIndex - 1)) * Integer.BYTES;
  }

  /**
   * Computes the position, within the data region, of this helper's field for the given row.
   *
   * @param rowNum row number, mapped through {@link Frame#physicalRow(int)} before use
   * @return position of the field in the data region
   */
  public long computeFieldPosition(int rowNum)
  {
    final int physicalRow = frame.physicalRow(rowNum);
    final long rowPosition = physicalRow == 0
                             ? 0
                             : offsetRegion.getLong(((long) physicalRow - 1) * Long.BYTES);

    if (columnIndex == 0) {
      // First field sits immediately after the per-row header.
      return rowPosition + rowRelativeOffset;
    }
    // Other fields: the header slot holds the field's start relative to the row start.
    return rowPosition + dataRegion.getInt(rowPosition + rowRelativeOffset);
  }
}
Loading

0 comments on commit c6da2f3

Please sign in to comment.