Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[TABLE MODEL] Support user-defined aggregate function #14331

Merged
merged 53 commits into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
53 commits
Select commit Hold shift + click to select a range
ccb57c8
refactor
Cpaulyz Nov 19, 2024
ea028ef
refactor
Cpaulyz Nov 19, 2024
90d34e1
save
Cpaulyz Nov 19, 2024
0d971bc
save
Cpaulyz Nov 19, 2024
4490c55
rollback scalar function definition
Cpaulyz Nov 19, 2024
e1dc67e
add license
Cpaulyz Nov 19, 2024
9983999
add license
Cpaulyz Nov 19, 2024
93ad64b
fix it
Cpaulyz Nov 19, 2024
6a68d33
Merge branch 'table_udsf' into udsf
Cpaulyz Nov 19, 2024
499ae65
save
Cpaulyz Nov 19, 2024
007a2d8
spotless
Cpaulyz Nov 19, 2024
b8a4fb6
save
Cpaulyz Nov 19, 2024
f55f48d
fix builtin
Cpaulyz Nov 19, 2024
7321535
save
Cpaulyz Nov 19, 2024
87dc7ea
fix it
Cpaulyz Nov 19, 2024
fa3c683
fix it
Cpaulyz Nov 19, 2024
c215e63
fix it
Cpaulyz Nov 20, 2024
bc0873d
fix it
Cpaulyz Nov 20, 2024
656dc4c
fix it
Cpaulyz Nov 20, 2024
dbe07ca
fix review
Cpaulyz Nov 26, 2024
f30dda4
split drop function plan
Cpaulyz Nov 26, 2024
54b24c2
fix clear throw NPE
Cpaulyz Nov 26, 2024
fbe6fde
Merge branch 'table_udsf' into udsf
Cpaulyz Nov 27, 2024
0ed1d85
merge master
Cpaulyz Nov 27, 2024
b1912aa
revert useless change
Cpaulyz Nov 27, 2024
1e2ce48
Save
Cpaulyz Nov 27, 2024
e8ea11c
add IT
Cpaulyz Nov 27, 2024
b8d24a2
add license
Cpaulyz Nov 27, 2024
ebcd1b9
rename beforeStart and add getLocalDate
Cpaulyz Nov 28, 2024
5797329
Merge branch 'master' into udsf
Cpaulyz Nov 28, 2024
30be7e5
modify getdatatype
Cpaulyz Nov 28, 2024
69c6ec3
fix review
Cpaulyz Dec 4, 2024
324cac7
spotless
Cpaulyz Dec 4, 2024
736b709
add ut and fix it
Cpaulyz Dec 4, 2024
7fb82cf
save
Cpaulyz Dec 4, 2024
8f067a5
update pom and add date IT
Cpaulyz Dec 4, 2024
60fd763
add license
Cpaulyz Dec 4, 2024
bfb32a1
fix
Cpaulyz Dec 4, 2024
89adefc
fix cpp client
Cpaulyz Dec 4, 2024
2bb60b2
save
Cpaulyz Dec 5, 2024
7ce6d2b
save
Cpaulyz Dec 9, 2024
502775e
merge master
Cpaulyz Dec 9, 2024
b7141d1
fix it
Cpaulyz Dec 9, 2024
3f4e8e0
add aggregate IT
Cpaulyz Dec 10, 2024
864abf1
remove useless code
Cpaulyz Dec 10, 2024
db2b84b
exp
Cpaulyz Dec 11, 2024
0b33fb8
add removable
Cpaulyz Dec 12, 2024
1e7e8cb
delete useless:
Cpaulyz Dec 12, 2024
1374d01
done
Cpaulyz Dec 12, 2024
eb05523
resolve conflict
Cpaulyz Dec 13, 2024
918bbc7
update binary type and to string
Cpaulyz Dec 17, 2024
915331c
fix some review
Cpaulyz Dec 17, 2024
6467e2f
fix it
Cpaulyz Dec 17, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.udf;

import org.apache.iotdb.udf.api.State;
import org.apache.iotdb.udf.api.customizer.config.AggregateFunctionConfig;
import org.apache.iotdb.udf.api.customizer.parameter.FunctionParameters;
import org.apache.iotdb.udf.api.exception.UDFException;
import org.apache.iotdb.udf.api.exception.UDFParameterNotValidException;
import org.apache.iotdb.udf.api.relational.AggregateFunction;
import org.apache.iotdb.udf.api.relational.access.Record;
import org.apache.iotdb.udf.api.type.Type;
import org.apache.iotdb.udf.api.utils.ResultValue;

import java.nio.ByteBuffer;

/**
* This is an internal example of the AggregateFunction implementation.
*
* <p>CREATE DATABASE test;
*
* <p>USE test;
*
* <p>CREATE TABLE t1(device_id STRING ID, s1 TEXT MEASUREMENT, s2 INT32 MEASUREMENT);
*
* <p>INSERT INTO t1(time, device_id, s1, s2) VALUES (1, 'd1', 'a', 1), (2, 'd1', null, 2), (3,
* 'd2', 'c', null);
*
* <p>CREATE FUNCTION my_count AS 'org.apache.iotdb.udf.AggregateFunctionExample';
*
* <p>SHOW FUNCTIONS;
*
* <p>SELECT device_id, my_count(s1) as s1_count, my_count(s2) as s2_count FROM t1 group by
* device_id;
*
* <p>SELECT my_count(s1) as s1_count, my_count(s2) as s2_count FROM t1;
*/
public class AggregateFunctionExample implements AggregateFunction {

  /** Accumulator that holds the number of non-null inputs counted so far. */
  static class CountState implements State {

    /** Running count of non-null inputs. */
    long count;

    /** Sets the counter back to zero. */
    @Override
    public void reset() {
      count = 0;
    }

    /**
     * Encodes the counter as a fixed-width 8-byte array.
     *
     * @return the bytes of {@code count}, written with {@link ByteBuffer#putLong(long)}
     */
    @Override
    public byte[] serialize() {
      // The payload is a single long, so the buffer is sized with Long.BYTES (not Double.BYTES).
      ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
      buffer.putLong(count);
      return buffer.array();
    }

    /**
     * Restores the counter from bytes produced by {@link #serialize()}.
     *
     * @param bytes array whose first 8 bytes hold the serialized counter
     */
    @Override
    public void deserialize(byte[] bytes) {
      ByteBuffer buffer = ByteBuffer.wrap(bytes);
      count = buffer.getLong();
    }
  }

  /**
   * Checks that the function is invoked with exactly one argument.
   *
   * @param parameters the arguments the function was called with
   * @throws UDFException if the number of child expressions is not 1
   */
  @Override
  public void validate(FunctionParameters parameters) throws UDFException {
    if (parameters.getChildExpressionsSize() != 1) {
      throw new UDFParameterNotValidException("Only one parameter is required.");
    }
  }

  /**
   * Declares the output type of this aggregate as {@code INT64}.
   *
   * @param parameters the arguments the function was called with (unused here)
   * @param configurations the configuration object that receives the output data type
   */
  @Override
  public void beforeStart(FunctionParameters parameters, AggregateFunctionConfig configurations) {
    configurations.setOutputDataType(Type.INT64);
  }

  /**
   * Creates a fresh accumulator.
   *
   * @return a new {@link CountState} whose counter is 0
   */
  @Override
  public State createState() {
    return new CountState();
  }

  /**
   * Counts one input row: the counter is incremented only when column 0 is not null.
   *
   * @param state the accumulator to update; must be a {@link CountState}
   * @param input the current input row
   */
  @Override
  public void addInput(State state, Record input) {
    CountState countState = (CountState) state;
    if (!input.isNull(0)) {
      countState.count++;
    }
  }

  /**
   * Adds the count held by {@code rhs} into {@code state}.
   *
   * @param state the accumulator that receives the merged count; must be a {@link CountState}
   * @param rhs the accumulator whose count is added; must be a {@link CountState}
   */
  @Override
  public void combineState(State state, State rhs) {
    CountState countState = (CountState) state;
    CountState rhsCountState = (CountState) rhs;
    countState.count += rhsCountState.count;
  }

  /**
   * Writes the accumulated count as the aggregate result.
   *
   * @param state the accumulator to read; must be a {@link CountState}
   * @param resultValue the holder that receives the count as a long
   */
  @Override
  public void outputFinal(State state, ResultValue resultValue) {
    CountState countState = (CountState) state;
    resultValue.setLong(countState.count);
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.db.query.udf.example.relational;

import org.apache.iotdb.udf.api.State;
import org.apache.iotdb.udf.api.customizer.config.AggregateFunctionConfig;
import org.apache.iotdb.udf.api.customizer.parameter.FunctionParameters;
import org.apache.iotdb.udf.api.exception.UDFException;
import org.apache.iotdb.udf.api.relational.AggregateFunction;
import org.apache.iotdb.udf.api.relational.access.Record;
import org.apache.iotdb.udf.api.type.Type;
import org.apache.iotdb.udf.api.utils.ResultValue;

import java.nio.ByteBuffer;

/**
 * Aggregate function taking three inputs: two numeric columns and a TIMESTAMP column. For each of
 * the two numeric columns it remembers the non-null value with the smallest timestamp, and it
 * outputs the sum of those two values as a DOUBLE.
 */
public class FirstTwoSum implements AggregateFunction {

  /** Message used whenever one of the first two inputs has an unsupported data type. */
  private static final String UNSUPPORTED_VALUE_TYPE_MESSAGE =
      "FirstTwoSum should accept INT32, INT64, FLOAT, DOUBLE as the first two inputs";

  /**
   * Accumulator tracking, per numeric column, the smallest timestamp seen so far and the value
   * that came with it. {@code Long.MAX_VALUE} marks "no value seen yet" for a column.
   */
  static class FirstTwoSumState implements State {
    long firstTime = Long.MAX_VALUE;
    long secondTime = Long.MAX_VALUE;
    double firstValue;
    double secondValue;

    /** Restores the "nothing seen yet" state: both times at the sentinel, both values at 0. */
    @Override
    public void reset() {
      firstTime = Long.MAX_VALUE;
      secondTime = Long.MAX_VALUE;
      firstValue = 0;
      secondValue = 0;
    }

    /**
     * Encodes the state as two longs followed by two doubles (32 bytes in total).
     *
     * @return the bytes of firstTime, secondTime, firstValue and secondValue, in that order
     */
    @Override
    public byte[] serialize() {
      ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES * 2 + Double.BYTES * 2);
      buffer.putLong(firstTime);
      buffer.putLong(secondTime);
      buffer.putDouble(firstValue);
      buffer.putDouble(secondValue);
      return buffer.array();
    }

    /**
     * Restores the state from bytes laid out as written by {@link #serialize()}.
     *
     * @param bytes array holding two longs followed by two doubles
     */
    @Override
    public void deserialize(byte[] bytes) {
      ByteBuffer buffer = ByteBuffer.wrap(bytes);
      firstTime = buffer.getLong();
      secondTime = buffer.getLong();
      firstValue = buffer.getDouble();
      secondValue = buffer.getDouble();
    }
  }

  /**
   * Checks the call signature: exactly three inputs, the first two of type INT32, INT64, FLOAT or
   * DOUBLE, and the third of type TIMESTAMP.
   *
   * @param parameters the arguments the function was called with
   * @throws UDFException if the number or the types of the inputs do not match
   */
  @Override
  public void validate(FunctionParameters parameters) throws UDFException {
    if (parameters.getChildExpressionsSize() != 3) {
      throw new UDFException("FirstTwoSum should accept three column as input");
    }
    for (int i = 0; i < 2; i++) {
      Type type = parameters.getDataType(i);
      if (type != Type.INT32
          && type != Type.INT64
          && type != Type.FLOAT
          && type != Type.DOUBLE) {
        throw new UDFException(UNSUPPORTED_VALUE_TYPE_MESSAGE);
      }
    }
    if (parameters.getDataType(2) != Type.TIMESTAMP) {
      throw new UDFException("FirstTwoSum should accept TIMESTAMP as the third input");
    }
  }

  /**
   * Declares the output type of this aggregate as {@code DOUBLE}.
   *
   * @param parameters the arguments the function was called with (unused here)
   * @param configurations the configuration object that receives the output data type
   */
  @Override
  public void beforeStart(FunctionParameters parameters, AggregateFunctionConfig configurations) {
    configurations.setOutputDataType(Type.DOUBLE);
  }

  /**
   * Creates a fresh accumulator.
   *
   * @return a new {@link FirstTwoSumState} with no value recorded for either column
   */
  @Override
  public State createState() {
    return new FirstTwoSumState();
  }

  /**
   * Folds one input row into the accumulator. For each of columns 0 and 1, the value is recorded
   * when it is non-null and the row's timestamp (column 2) is strictly smaller than the timestamp
   * recorded for that column so far.
   *
   * @param state the accumulator to update; must be a {@link FirstTwoSumState}
   * @param input the current input row
   * @throws UDFException if column 0 or 1 has a data type other than INT32, INT64, FLOAT, DOUBLE
   */
  @Override
  public void addInput(State state, Record input) {
    FirstTwoSumState firstTwoSumState = (FirstTwoSumState) state;
    // A row without a timestamp cannot be ordered against the others, so it is skipped instead of
    // reading a primitive long out of a null cell.
    if (input.isNull(2)) {
      return;
    }
    long time = input.getLong(2);
    if (!input.isNull(0) && time < firstTwoSumState.firstTime) {
      // Read the value before touching the state so a type error leaves the state unchanged.
      double value = readAsDouble(input, 0);
      firstTwoSumState.firstTime = time;
      firstTwoSumState.firstValue = value;
    }
    if (!input.isNull(1) && time < firstTwoSumState.secondTime) {
      double value = readAsDouble(input, 1);
      firstTwoSumState.secondTime = time;
      firstTwoSumState.secondValue = value;
    }
  }

  /** Reads the numeric column at {@code columnIndex} and widens it to a double. */
  private static double readAsDouble(Record input, int columnIndex) {
    switch (input.getDataType(columnIndex)) {
      case INT32:
        return input.getInt(columnIndex);
      case INT64:
        return input.getLong(columnIndex);
      case FLOAT:
        return input.getFloat(columnIndex);
      case DOUBLE:
        return input.getDouble(columnIndex);
      default:
        throw new UDFException(UNSUPPORTED_VALUE_TYPE_MESSAGE);
    }
  }

  /**
   * Merges {@code rhs} into {@code state}: for each column, the entry with the strictly smaller
   * timestamp wins.
   *
   * @param state the accumulator that receives the merged result; must be a {@link
   *     FirstTwoSumState}
   * @param rhs the accumulator to merge in; must be a {@link FirstTwoSumState}
   */
  @Override
  public void combineState(State state, State rhs) {
    FirstTwoSumState firstTwoSumState = (FirstTwoSumState) state;
    FirstTwoSumState rhsState = (FirstTwoSumState) rhs;
    if (rhsState.firstTime < firstTwoSumState.firstTime) {
      firstTwoSumState.firstTime = rhsState.firstTime;
      firstTwoSumState.firstValue = rhsState.firstValue;
    }
    if (rhsState.secondTime < firstTwoSumState.secondTime) {
      firstTwoSumState.secondTime = rhsState.secondTime;
      firstTwoSumState.secondValue = rhsState.secondValue;
    }
  }

  /**
   * Emits the aggregate result. The result is null when neither column recorded a value;
   * otherwise it is {@code firstValue + secondValue}, where a column that never recorded a value
   * contributes its initial 0.
   *
   * @param state the accumulator to read; must be a {@link FirstTwoSumState}
   * @param resultValue the holder that receives the sum or null
   */
  @Override
  public void outputFinal(State state, ResultValue resultValue) {
    FirstTwoSumState firstTwoSumState = (FirstTwoSumState) state;
    if (firstTwoSumState.firstTime == Long.MAX_VALUE
        && firstTwoSumState.secondTime == Long.MAX_VALUE) {
      resultValue.setNull();
    } else {
      resultValue.setDouble(firstTwoSumState.firstValue + firstTwoSumState.secondValue);
    }
  }
}
Loading
Loading