Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: provide key columns as scalars (vs. vectors) to RollingFormula #6375

Merged
merged 4 commits into from
Nov 19, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Addressed PR comments.
lbooker42 committed Nov 18, 2024
commit 1e671a05163566cfc849791520529f71463777f8
Original file line number Diff line number Diff line change
@@ -1460,7 +1460,7 @@ private UpdateByOperator makeRollingFormulaMultiColumnOperator(
cd.getDataType())));
}

// Get the input column names from the formula and provide them to the groupBy operator
// Get the input column names from the formula and provide them to the rolling formula operator
final String[] allInputColumns =
selectColumn.initDef(vectorColumnDefinitions, compilationProcessor).toArray(String[]::new);
if (!selectColumn.getColumnArrays().isEmpty()) {
@@ -1494,10 +1494,9 @@ private UpdateByOperator makeRollingFormulaMultiColumnOperator(

final String[] affectingColumns;
if (rs.revWindowScale().timestampCol() == null) {
affectingColumns = ArrayUtils.addAll(inputKeyColumns, inputNonKeyColumns);
affectingColumns = inputNonKeyColumns;
} else {
affectingColumns = ArrayUtils.add(ArrayUtils.addAll(inputKeyColumns, inputNonKeyColumns),
rs.revWindowScale().timestampCol());
affectingColumns = ArrayUtils.add(inputNonKeyColumns, rs.revWindowScale().timestampCol());
}

// Create a new column pair with the same name for the left and right columns
Original file line number Diff line number Diff line change
@@ -13,6 +13,7 @@
import io.deephaven.engine.table.impl.QueryTable;
import io.deephaven.engine.table.impl.TableUpdateImpl;
import io.deephaven.engine.table.impl.util.RowRedirection;
import io.deephaven.util.type.ArrayTypeUtils;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

@@ -69,7 +70,8 @@ public class ZeroKeyUpdateByManager extends UpdateBy {

// create an updateby bucket instance directly from the source table
zeroKeyUpdateBy = new UpdateByBucketHelper(bucketDescription, source, windows, resultSources,
timestampColumnName, control, (oe, se) -> deliverUpdateError(oe, se, true), new Object[0]);
timestampColumnName, control, (oe, se) -> deliverUpdateError(oe, se, true),
ArrayTypeUtils.EMPTY_OBJECT_ARRAY);
buckets.offer(zeroKeyUpdateBy);

// make the source->result transformer
@@ -87,7 +89,7 @@ public class ZeroKeyUpdateByManager extends UpdateBy {
zeroKeyUpdateBy = new UpdateByBucketHelper(bucketDescription, source, windows, resultSources,
timestampColumnName, control, (oe, se) -> {
throw new IllegalStateException("Update failure from static zero key updateBy");
}, new Object[0]);
}, ArrayTypeUtils.EMPTY_OBJECT_ARRAY);
result = zeroKeyUpdateBy.result;
buckets.offer(zeroKeyUpdateBy);
sourceListener = null;
Original file line number Diff line number Diff line change
@@ -19,7 +19,6 @@
import io.deephaven.engine.table.impl.updateby.rollingformulamulticolumn.windowconsumer.RingBufferWindowConsumer;
import io.deephaven.engine.table.impl.util.ChunkUtils;
import io.deephaven.engine.table.impl.util.RowRedirection;
import io.deephaven.util.type.TypeUtils;
import io.deephaven.vector.Vector;
import org.apache.commons.lang3.ArrayUtils;
import org.jetbrains.annotations.NotNull;
@@ -55,7 +54,8 @@ private class Context extends UpdateByOperator.Context {
private final IntConsumer outputSetter;
private final IntConsumer outputNullSetter;

private final SingleValueColumnSource<?>[] keyValueSources;
@SuppressWarnings("rawtypes")
private final SingleValueColumnSource[] keyValueSources;
private final RingBufferWindowConsumer[] inputConsumers;

@SuppressWarnings("unused")
@@ -222,7 +222,8 @@ public void close() {

private void setBucketKeyValues(final Object[] bucketKeyValues) {
for (int i = 0; i < keyValueSources.length; i++) {
assignKeyValue(bucketKeyValues[i], keyValueSources[i]);
// noinspection unchecked
keyValueSources[i].set(bucketKeyValues[i]);
}
}
}
@@ -398,48 +399,6 @@ protected static IntConsumer getChunkNullSetter(final WritableChunk<? extends Va
return index -> objectChunk.set(index, null);
}
}

/**
* This assigns the value at {@code index} in {@code valueChunk} to the {@code inputColumnSource}. This is not
* particularly efficient but is only be called once per bucket.
*/
private static void assignKeyValue(
@NotNull final Object keyValue,
@NotNull final SingleValueColumnSource<?> inputColumnSource) {

final ChunkType chunkType = inputColumnSource.getChunkType();
switch (chunkType) {
case Boolean:
throw new IllegalStateException(
"Input chunk type should not be Boolean but should have been reinterpreted to byte");
case Byte:
inputColumnSource.set(TypeUtils.unbox((Byte) keyValue));
break;
case Char:
inputColumnSource.set(TypeUtils.unbox((Character) keyValue));
break;
case Double:
inputColumnSource.set(TypeUtils.unbox((Double) keyValue));
break;
case Float:
inputColumnSource.set(TypeUtils.unbox((Float) keyValue));
break;
case Int:
inputColumnSource.set(TypeUtils.unbox((Integer) keyValue));
break;
case Long:
inputColumnSource.set(TypeUtils.unbox((Long) keyValue));
break;
case Short:
inputColumnSource.set(TypeUtils.unbox((Short) keyValue));
break;
default:
// noinspection unchecked
final ObjectSingleValueSource<Object> source = (ObjectSingleValueSource<Object>) inputColumnSource;
source.set(keyValue);
break;
}
}
// endregion value-setters

@Override