diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBAlignByDeviceWithTemplateAggregationIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBAlignByDeviceWithTemplateAggregationIT.java new file mode 100644 index 000000000000..5ec4566c54ce --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBAlignByDeviceWithTemplateAggregationIT.java @@ -0,0 +1,542 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.it.alignbydevice; + +import org.apache.iotdb.it.env.EnvFactory; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.ClusterIT; +import org.apache.iotdb.itbase.category.LocalStandaloneIT; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.sql.Connection; +import java.sql.Statement; + +import static org.apache.iotdb.db.it.utils.TestUtils.assertTestFail; +import static org.apache.iotdb.db.it.utils.TestUtils.resultSetEqualTest; + +@RunWith(IoTDBTestRunner.class) +@Category({LocalStandaloneIT.class, ClusterIT.class}) +public class IoTDBAlignByDeviceWithTemplateAggregationIT { + private static final String[] sqls = + new String[] { + // non-aligned template + "CREATE database root.sg1;", + "CREATE schema template t1 (s1 FLOAT encoding=RLE, s2 BOOLEAN encoding=PLAIN compression=SNAPPY, s3 INT32);", + "SET SCHEMA TEMPLATE t1 to root.sg1;", + "INSERT INTO root.sg1.d1(timestamp,s1,s2,s3) values(1,1.1,false,1), (2,2.2,false,2), (5,5.5,true,5), (1314000000000,13.14,true,1314);", + "INSERT INTO root.sg1.d2(timestamp,s1,s2,s3) values(1,11.1,false,11), (2,22.2,false,22), (5,50.0,false,5), (1314000000001,13.15,false,1315);", + "flush;", + "INSERT INTO root.sg1.d3(timestamp,s1,s2,s3) values(1,111.1,true,null), (4,444.4,true,44), (8,8.8,false,4), (1314000000002,13.16,false,1316);", + "INSERT INTO root.sg1.d4(timestamp,s1,s2,s3) values(1,1111.1,true,1111), (5,5555.5,false,5555), (8,0.8,true,10), (1314000000003,13.14,true,1314);", + + // aligned template + "CREATE database root.sg2;", + "CREATE schema template t2 aligned (s1 FLOAT encoding=RLE, s2 BOOLEAN encoding=PLAIN compression=SNAPPY, s3 INT32);", + "SET SCHEMA TEMPLATE t2 to root.sg2;", + "INSERT INTO root.sg2.d1(timestamp,s1,s2,s3) values(1,1.1,false,1), (2,2.2,false,2), (5,5.5,true,5), (1314000000000,13.14,true,1314);", + "INSERT INTO root.sg2.d2(timestamp,s1,s2,s3) values(1,11.1,false,11), (2,22.2,false,22), (5,50.0,false,5), (1314000000001,13.15,false,1315);", + "flush;", + "INSERT INTO root.sg2.d3(timestamp,s1,s2,s3) values(1,111.1,true,null), (4,444.4,true,44), (8,8.8,false,4), (1314000000002,13.16,false,1316);", + "INSERT INTO root.sg2.d4(timestamp,s1,s2,s3) values(1,1111.1,true,1111), (5,5555.5,false,5555), (8,0.8,true,10), (1314000000003,13.14,true,1314);", + }; + + @BeforeClass + public static void setUp() throws Exception { + EnvFactory.getEnv().initClusterEnvironment(); + insertData(); + } + + @AfterClass + public static void tearDown() throws Exception { + EnvFactory.getEnv().cleanClusterEnvironment(); + } + + @Test + public void aggregationTest() { + // only descending test + // no value filter + String[] expectedHeader = new String[] {"Device,max_time(s1),last_value(s1),last_value(s2)"}; + String[] retArray = + new String[] { + "root.sg1.d1,1314000000000,13.14,true,", + "root.sg1.d2,1314000000001,13.15,false,", + "root.sg1.d3,1314000000002,13.16,false,", + "root.sg1.d4,1314000000003,13.14,true,", + }; + resultSetEqualTest( + "SELECT max_time(s1), last_value(s1), last_value(s2) FROM root.sg1.** align by device;", + expectedHeader, + retArray); + + expectedHeader = new String[] {"Device,max_time(s1),last_value(s1),last_value(s2)"}; + retArray = + new String[] { + "root.sg2.d1,1314000000000,13.14,true,", + "root.sg2.d2,1314000000001,13.15,false,", + "root.sg2.d3,1314000000002,13.16,false,", + "root.sg2.d4,1314000000003,13.14,true,", + }; + resultSetEqualTest( + "SELECT max_time(s1), last_value(s1), last_value(s2) FROM root.sg2.** align by device;", + expectedHeader, + retArray); + + // __endTime result is ambiguous + + // not supported: group by session, condition, agg(*), agg(s1+1), count(s1+s2), non-aligned + // template + } + + @Test + public void filterTest() { + String[] expectedHeader = new String[] {"Device,max_time(s1),last_value(s1),last_value(s2)"}; + String[] retArray = + new String[] { + "root.sg1.d2,1314000000001,13.15,false,", + "root.sg1.d3,1314000000002,13.16,false,", + "root.sg1.d4,5,5555.5,false,", + }; + resultSetEqualTest( + "SELECT max_time(s1), last_value(s1), last_value(s2) FROM root.sg1.** where s3+1=1316 or s2=false having avg(s1)>2 align by device;", + expectedHeader, + retArray); + + retArray = + new String[] { + "root.sg2.d2,1314000000001,13.15,false,", + "root.sg2.d3,1314000000002,13.16,false,", + "root.sg2.d4,5,5555.5,false,", + }; + resultSetEqualTest( + "SELECT max_time(s1), last_value(s1), last_value(s2) FROM root.sg2.** where s3+1=1316 or s2=false having avg(s1)>2 align by device;", + expectedHeader, + retArray); + + // supported: ascending with descending aggregation descriptors + expectedHeader = new String[] {"Device,max_time(s1),last_value(s1),count(s2),first_value(s3)"}; + retArray = + new String[] { + "root.sg1.d2,1314000000001,13.15,4,11,", + "root.sg1.d3,1314000000002,13.16,2,4,", + "root.sg1.d4,5,5555.5,1,5555,", + }; + resultSetEqualTest( + "SELECT max_time(s1), last_value(s1), count(s2), first_value(s3) FROM root.sg1.** where s3+1=1316 or s2=false having avg(s1)>2 align by device;", + expectedHeader, + retArray); + + retArray = + new String[] { + "root.sg2.d2,1314000000001,13.15,4,11,", + "root.sg2.d3,1314000000002,13.16,2,4,", + "root.sg2.d4,5,5555.5,1,5555,", + }; + resultSetEqualTest( + "SELECT max_time(s1), last_value(s1), count(s2), first_value(s3) FROM root.sg2.** where s3+1=1316 or s2=false having avg(s1)>2 align by device;", + expectedHeader, + retArray); + + // duplicate select expressions + expectedHeader = + new String[] { + "Device,max_time(s1),last_value(s1),count(s2),first_value(s3),last_value(s1)" + }; + retArray = + new String[] { + "root.sg1.d2,1314000000001,13.15,4,11,13.15,", + "root.sg1.d3,1314000000002,13.16,2,4,13.16,", + "root.sg1.d4,5,5555.5,1,5555,5555.5,", + }; + resultSetEqualTest( + "SELECT max_time(s1), last_value(s1), count(s2), first_value(s3), last_value(s1) FROM root.sg1.** where s3+1=1316 or s2=false having avg(s1)>2 align by device;", + expectedHeader, + retArray); + retArray = + new String[] { + "root.sg2.d2,1314000000001,13.15,4,11,13.15,", + "root.sg2.d3,1314000000002,13.16,2,4,13.16,", + "root.sg2.d4,5,5555.5,1,5555,5555.5,", + }; + resultSetEqualTest( + "SELECT max_time(s1), last_value(s1), count(s2), first_value(s3), last_value(s1) FROM root.sg2.** where s3+1=1316 or s2=false having avg(s1)>2 align by device;", + expectedHeader, + retArray); + + // alias + expectedHeader = new String[] {"Device,c1,first_value(s3),c2"}; + retArray = + new String[] { + "root.sg1.d2,4,11,4,", "root.sg1.d3,2,4,2,", "root.sg1.d4,1,5555,1,", + }; + resultSetEqualTest( + "SELECT count(s1) as c1, first_value(s3), count(s1) as c2 FROM root.sg1.** where s3+1=1316 or s2=false having avg(s1)>2 align by device;", + expectedHeader, + retArray); + retArray = + new String[] { + "root.sg2.d2,4,11,4,", "root.sg2.d3,2,4,2,", "root.sg2.d4,1,5555,1,", + }; + resultSetEqualTest( + "SELECT count(s1) as c1, first_value(s3), count(s1) as c2 FROM root.sg2.** where s3+1=1316 or s2=false having avg(s1)>2 align by device;", + expectedHeader, + retArray); + + // arithmetic expression + expectedHeader = + new String[] {"Device,max_time(s1),count(s1),last_value(s2),count(s1) + last_value(s3)"}; + retArray = + new String[] { + "root.sg1.d2,1314000000001,4,false,1319.0,", + "root.sg1.d3,1314000000002,2,false,1318.0,", + "root.sg1.d4,5,1,false,5556.0,", + }; + resultSetEqualTest( + "SELECT max_time(s1), count(s1), last_value(s2), count(s1)+last_value(s3) FROM root.sg1.** where s3+1=1316 or s2=false having avg(s1)+sum(s3)>5 align by device;", + expectedHeader, + retArray); + retArray = + new String[] { + "root.sg2.d2,1314000000001,4,false,1319.0,", + "root.sg2.d3,1314000000002,2,false,1318.0,", + "root.sg2.d4,5,1,false,5556.0,", + }; + resultSetEqualTest( + "SELECT max_time(s1), count(s1), last_value(s2), count(s1)+last_value(s3) FROM root.sg2.** where s3+1=1316 or s2=false having avg(s1)+sum(s3)>5 align by device;", + expectedHeader, + retArray); + + expectedHeader = new String[] {"Device,max_time(s1),last_value(s1),count(s2),first_value(s3)"}; + retArray = + new String[] { + "root.sg1.d1,1314000000000,13.14,3,2,", + "root.sg1.d2,1314000000001,13.15,4,11,", + "root.sg1.d3,1314000000002,13.16,3,44,", + "root.sg1.d4,1314000000003,13.14,4,1111,", + }; + resultSetEqualTest( + "SELECT max_time(s1), last_value(s1), count(s2), first_value(s3) FROM root.sg1.** where s3>1 align by device;", + expectedHeader, + retArray); + retArray = + new String[] { + "root.sg2.d1,1314000000000,13.14,3,2,", + "root.sg2.d2,1314000000001,13.15,4,11,", + "root.sg2.d3,1314000000002,13.16,3,44,", + "root.sg2.d4,1314000000003,13.14,4,1111,", + }; + resultSetEqualTest( + "SELECT max_time(s1), last_value(s1), count(s2), first_value(s3) FROM root.sg2.** where s3>1 align by device;", + expectedHeader, + retArray); + } + + @Test + public void countTimeTest() { + // no filter + String[] expectedHeader = new String[] {"Device,count_time(*)"}; + String[] retArray = + new String[] { + "root.sg1.d1,4,", "root.sg1.d2,4,", "root.sg1.d3,4,", "root.sg1.d4,4,", + }; + resultSetEqualTest( + "SELECT count_time(*) FROM root.sg1.** align by device;", expectedHeader, retArray); + retArray = + new String[] { + "root.sg2.d1,4,", "root.sg2.d2,4,", "root.sg2.d3,4,", "root.sg2.d4,4,", + }; + resultSetEqualTest( + "SELECT count_time(*) FROM root.sg2.** align by device;", expectedHeader, retArray); + + // and filter + expectedHeader = new String[] {"Device,count_time(*)"}; + retArray = + new String[] { + "root.sg1.d1,2,", "root.sg1.d2,4,", "root.sg1.d3,2,", "root.sg1.d4,1,", + }; + resultSetEqualTest( + "SELECT count_time(*) FROM root.sg1.** where s3>0 and s2=false align by device;", + expectedHeader, + retArray); + retArray = + new String[] { + "root.sg2.d1,2,", "root.sg2.d2,4,", "root.sg2.d3,2,", "root.sg2.d4,1,", + }; + resultSetEqualTest( + "SELECT count_time(*) FROM root.sg2.** where s3>0 and s2=false align by device;", + expectedHeader, + retArray); + + // or filter + expectedHeader = new String[] {"Device,count_time(*)"}; + retArray = + new String[] { + "root.sg1.d1,2,", "root.sg1.d2,4,", "root.sg1.d3,2,", "root.sg1.d4,1,", + }; + resultSetEqualTest( + "SELECT count_time(*) FROM root.sg1.** where s3+1=1316 or s2=false align by device;", + expectedHeader, + retArray); + retArray = + new String[] { + "root.sg2.d1,2,", "root.sg2.d2,4,", "root.sg2.d3,2,", "root.sg2.d4,1,", + }; + resultSetEqualTest( + "SELECT count_time(*) FROM root.sg2.** where s3+1=1316 or s2=false align by device;", + expectedHeader, + retArray); + + // group by + expectedHeader = new String[] {"Time,Device,count_time(*)"}; + retArray = + new String[] { + "1,root.sg1.d1,2,", + "6,root.sg1.d1,0,", + "1,root.sg1.d2,3,", + "6,root.sg1.d2,0,", + "1,root.sg1.d3,0,", + "6,root.sg1.d3,1,", + "1,root.sg1.d4,1,", + "6,root.sg1.d4,0,", + }; + resultSetEqualTest( + "SELECT count_time(*) FROM root.sg1.** where s3+1=1316 or s2=false group by ([1,10), 5ms) align by device;", + expectedHeader, + retArray); + expectedHeader = new String[] {"Time,Device,count_time(*)"}; + retArray = + new String[] { + "1,root.sg2.d1,2,", + "6,root.sg2.d1,0,", + "1,root.sg2.d2,3,", + "6,root.sg2.d2,0,", + "1,root.sg2.d3,0,", + "6,root.sg2.d3,1,", + "1,root.sg2.d4,1,", + "6,root.sg2.d4,0,", + }; + resultSetEqualTest( + "SELECT count_time(*) FROM root.sg2.** where s3+1=1316 or s2=false group by ([1,10), 5ms) align by device;", + expectedHeader, + retArray); + } + + @Test + public void groupByTest() { + String[] expectedHeader = + new String[] {"Time,Device,max_time(s1),last_value(s1),last_value(s2)"}; + String[] retArray = + new String[] { + "1,root.sg1.d1,2,2.2,false,", + "5,root.sg1.d1,5,5.5,true,", + "1,root.sg1.d2,2,22.2,false,", + "5,root.sg1.d2,5,50.0,false,", + "1,root.sg1.d3,1,111.1,true,", + }; + resultSetEqualTest( + "select max_time(s1), last_value(s1), last_value(s2) from root.sg1.** group by ([1,10), 2ms) having last_value(s2) is not null limit 5 align by device;", + expectedHeader, + retArray); + expectedHeader = new String[] {"Time,Device,max_time(s1),last_value(s1),last_value(s2)"}; + retArray = + new String[] { + "1,root.sg2.d1,2,2.2,false,", + "5,root.sg2.d1,5,5.5,true,", + "1,root.sg2.d2,2,22.2,false,", + "5,root.sg2.d2,5,50.0,false,", + "1,root.sg2.d3,1,111.1,true,", + }; + resultSetEqualTest( + "select max_time(s1), last_value(s1), last_value(s2) from root.sg2.** group by ([1,10), 2ms) having last_value(s2) is not null limit 5 align by device;", + expectedHeader, + retArray); + + // sliding window + expectedHeader = new String[] {"Time,Device,max_time(s1),last_value(s1),last_value(s2)"}; + retArray = + new String[] { + "1,root.sg1.d1,2,2.2,false,", + "1,root.sg1.d2,2,22.2,false,", + "3,root.sg1.d2,5,50.0,false,", + "5,root.sg1.d2,5,50.0,false,", + "7,root.sg1.d3,8,8.8,false,", + "3,root.sg1.d4,5,5555.5,false,", + "5,root.sg1.d4,5,5555.5,false,", + }; + resultSetEqualTest( + "SELECT max_time(s1), last_value(s1), last_value(s2) FROM root.sg1.** where s3+1=1316 or s2=false group by ([1,10),3ms,2ms) having avg(s1)>0 align by device;", + expectedHeader, + retArray); + expectedHeader = new String[] {"Time,Device,max_time(s1),last_value(s1),last_value(s2)"}; + retArray = + new String[] { + "1,root.sg2.d1,2,2.2,false,", + "1,root.sg2.d2,2,22.2,false,", + "3,root.sg2.d2,5,50.0,false,", + "5,root.sg2.d2,5,50.0,false,", + "7,root.sg2.d3,8,8.8,false,", + "3,root.sg2.d4,5,5555.5,false,", + "5,root.sg2.d4,5,5555.5,false,", + }; + resultSetEqualTest( + "SELECT max_time(s1), last_value(s1), last_value(s2) FROM root.sg2.** where s3+1=1316 or s2=false group by ([1,10),3ms,2ms) having avg(s1)>0 align by device;", + expectedHeader, + retArray); + } + + @Test + public void havingTest() { + String[] expectedHeader = new String[] {"Device,max_time(s1),last_value(s1),last_value(s2)"}; + String[] retArray = + new String[] { + "root.sg1.d1,2,2.2,false,", + "root.sg1.d2,1314000000001,13.15,false,", + "root.sg1.d3,1314000000002,13.16,false,", + "root.sg1.d4,5,5555.5,false,", + }; + resultSetEqualTest( + "SELECT max_time(s1), last_value(s1), last_value(s2) FROM root.sg1.** where s2=false having avg(s3) > 1 align by device;", + expectedHeader, + retArray); + retArray = + new String[] { + "root.sg2.d1,2,2.2,false,", + "root.sg2.d2,1314000000001,13.15,false,", + "root.sg2.d3,1314000000002,13.16,false,", + "root.sg2.d4,5,5555.5,false,", + }; + resultSetEqualTest( + "SELECT max_time(s1), last_value(s1), last_value(s2) FROM root.sg2.** where s2=false having avg(s3) > 1 align by device;", + expectedHeader, + retArray); + + retArray = + new String[] { + "root.sg1.d1,2,2.2,false,", + "root.sg1.d2,1314000000001,13.15,false,", + "root.sg1.d3,1314000000002,13.16,false,", + }; + resultSetEqualTest( + "SELECT max_time(s1), last_value(s1), last_value(s2) FROM root.sg1.** where s2=false having count(s3)+count(s1) > 2 align by device;", + expectedHeader, + retArray); + retArray = + new String[] { + "root.sg2.d1,2,2.2,false,", + "root.sg2.d2,1314000000001,13.15,false,", + "root.sg2.d3,1314000000002,13.16,false,", + }; + resultSetEqualTest( + "SELECT max_time(s1), last_value(s1), last_value(s2) FROM root.sg2.** where s2=false having count(s3)+count(s1) > 2 align by device;", + expectedHeader, + retArray); + + retArray = + new String[] { + "root.sg2.d2,1314000000001,13.15,false,", + }; + resultSetEqualTest( + "SELECT max_time(s1), last_value(s1), last_value(s2) FROM root.sg2.** where s2=false having count(s3+s1) > 2 align by device;", + expectedHeader, + retArray); + } + + @Test + public void orderByTest() { + String[] expectedHeader = new String[] {"Time,Device,sum(s3)"}; + String[] retArray = + new String[] { + "4,root.sg1.d1,5.0,", + "4,root.sg1.d2,5.0,", + "4,root.sg1.d3,44.0,", + "4,root.sg1.d4,5555.0,", + "2,root.sg1.d1,2.0,", + }; + resultSetEqualTest( + "select sum(s3) from root.sg1.** where s1>1 GROUP BY([0, 10), 2ms) order by time desc offset 8 limit 5 align by device;", + expectedHeader, + retArray); + + expectedHeader = new String[] {"Time,Device,sum(s3)"}; + retArray = + new String[] { + "4,root.sg2.d1,5.0,", + "4,root.sg2.d2,5.0,", + "4,root.sg2.d3,44.0,", + "4,root.sg2.d4,5555.0,", + "2,root.sg2.d1,2.0,", + }; + resultSetEqualTest( + "select sum(s3) from root.sg2.** where s1>1 GROUP BY([0, 10), 2ms) order by time desc offset 8 limit 5 align by device;", + expectedHeader, + retArray); + + expectedHeader = new String[] {"Time,Device,sum(s3)"}; + retArray = + new String[] { + "0,root.sg1.d1,1.0,", + "2,root.sg1.d1,2.0,", + "4,root.sg1.d1,5.0,", + "0,root.sg1.d2,11.0,", + "2,root.sg1.d2,22.0,", + }; + resultSetEqualTest( + "select sum(s3) from root.sg1.** where s1>1 GROUP BY([0, 10), 2ms) order by count(s2) desc limit 5 align by device;", + expectedHeader, + retArray); + + expectedHeader = new String[] {"Time,Device,sum(s3)"}; + retArray = + new String[] { + "0,root.sg2.d1,1.0,", + "2,root.sg2.d1,2.0,", + "4,root.sg2.d1,5.0,", + "0,root.sg2.d2,11.0,", + "2,root.sg2.d2,22.0,", + }; + resultSetEqualTest( + "select sum(s3) from root.sg2.** where s1>1 GROUP BY([0, 10), 2ms) order by count(s2) desc limit 5 align by device;", + expectedHeader, + retArray); + + // order by non-existent measurement + assertTestFail( + "select sum(s3) from root.sg1.** where s1>1 order by count(s_null) desc limit 5 align by device;", + "count(s_null) in order by clause doesn't exist."); + assertTestFail( + "select sum(s3) from root.sg2.** where s1>1 order by count(s_null) desc limit 5 align by device;", + "count(s_null) in order by clause doesn't exist."); + } + + protected static void insertData() { + try (Connection connection = EnvFactory.getEnv().getConnection(); + Statement statement = connection.createStatement()) { + + for (String sql : sqls) { + statement.execute(sql); + } + } catch (Exception e) { + e.printStackTrace(); + } + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java index be5c98525bbf..5897f78b0ddd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java @@ -293,7 +293,7 @@ aggregation results last_value(temperature) and last_value(status), whereas buck private Template deviceTemplate; // when deviceTemplate is not empty and all expressions in this query are templated measurements, // i.e. no aggregation and arithmetic expression - private boolean onlyQueryTemplateMeasurements = true; + private boolean noWhereAndAggregation = true; // if it is wildcard query in templated align by device query private boolean templateWildCardQuery; // all queried measurementList and schemaList in deviceTemplate. @@ -437,8 +437,8 @@ public TSDataType getType(Expression expression) { return null; } - if (isAllDevicesInOneTemplate() - && (isOnlyQueryTemplateMeasurements() || expression instanceof TimeSeriesOperand)) { + if (allDevicesInOneTemplate() + && (noWhereAndAggregation() || expression instanceof TimeSeriesOperand)) { TimeSeriesOperand seriesOperand = (TimeSeriesOperand) expression; return deviceTemplate.getSchemaMap().get(seriesOperand.getPath().getMeasurement()).getType(); } @@ -921,7 +921,7 @@ public List getDeviceList() { // All Queries Devices Set In One Template ///////////////////////////////////////////////////////////////////////////////////////////////// - public boolean isAllDevicesInOneTemplate() { + public boolean allDevicesInOneTemplate() { return this.deviceTemplate != null; } @@ -933,12 +933,12 @@ public void setDeviceTemplate(Template template) { this.deviceTemplate = template; } - public boolean isOnlyQueryTemplateMeasurements() { - return onlyQueryTemplateMeasurements; + public boolean noWhereAndAggregation() { + return noWhereAndAggregation; } - public void setOnlyQueryTemplateMeasurements(boolean onlyQueryTemplateMeasurements) { - this.onlyQueryTemplateMeasurements = onlyQueryTemplateMeasurements; + public void setNoWhereAndAggregation(boolean value) { + this.noWhereAndAggregation = value; } public List getMeasurementList() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java index 3ed7bbfd169f..90908302cf1d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java @@ -209,7 +209,7 @@ public class AnalyzeVisitor extends StatementVisitor static final Expression DEVICE_EXPRESSION = TimeSeriesOperand.constructColumnHeaderExpression(DEVICE, TSDataType.TEXT); - static final Expression END_TIME_EXPRESSION = + public static final Expression END_TIME_EXPRESSION = TimeSeriesOperand.constructColumnHeaderExpression(ENDTIME, TSDataType.INT64); private final List lastQueryColumnNames = @@ -1904,7 +1904,8 @@ private void checkGroupByConditionExpressionType( && rightExpression instanceof ConstantOperand)) { throw new SemanticException( String.format( - "Please check the keep condition ([%s]),it need to be a constant or a compare expression constructed by 'keep' and a long number.", + "Please check the keep condition ([%s]), " + + "it need to be a constant or a compare expression constructed by 'keep' and a long number.", keepExpression.getExpressionString())); } return; @@ -1912,12 +1913,13 @@ private void checkGroupByConditionExpressionType( if (!(keepExpression instanceof ConstantOperand)) { throw new SemanticException( String.format( - "Please check the keep condition ([%s]),it need to be a constant or a compare expression constructed by 'keep' and a long number.", + "Please check the keep condition ([%s]), " + + "it need to be a constant or a compare expression constructed by 'keep' and a long number.", keepExpression.getExpressionString())); } } - private void analyzeGroupByTime(Analysis analysis, QueryStatement queryStatement) { + static void analyzeGroupByTime(Analysis analysis, QueryStatement queryStatement) { if (!queryStatement.isGroupByTime()) { return; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ExpressionTypeAnalyzer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ExpressionTypeAnalyzer.java index ed292c03870e..f5dbad88a827 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ExpressionTypeAnalyzer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ExpressionTypeAnalyzer.java @@ -65,7 +65,10 @@ private ExpressionTypeAnalyzer() {} public static TSDataType analyzeExpression(Analysis analysis, Expression expression) { if (!analysis.getExpressionTypes().containsKey(NodeRef.of(expression))) { ExpressionTypeAnalyzer analyzer = new ExpressionTypeAnalyzer(); - analyzer.analyze(expression, null); + + Map context = + analysis.allDevicesInOneTemplate() ? analysis.getDeviceTemplate().getSchemaMap() : null; + analyzer.analyze(expression, context); addExpressionTypes(analysis, analyzer); } @@ -96,7 +99,9 @@ public static void analyzeExpressionUsingTemplatedInfo( Expression expression, TemplatedInfo templatedInfo) { ExpressionTypeAnalyzer analyzer = new ExpressionTypeAnalyzer(); - analyzer.analyze(expression, templatedInfo.getSchemaMap()); + + Map schemaMap = templatedInfo.getSchemaMap(); + analyzer.analyze(expression, schemaMap); types.putAll(analyzer.getExpressionTypes()); } @@ -369,6 +374,7 @@ public TSDataType visitTimeSeriesOperand( return setExpressionType( timeSeriesOperand, context.get(timeSeriesOperand.getOutputSymbol()).getType()); } + return setExpressionType(timeSeriesOperand, timeSeriesOperand.getPath().getSeriesType()); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedAggregationAnalyze.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedAggregationAnalyze.java new file mode 100644 index 000000000000..3f6175908128 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedAggregationAnalyze.java @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.queryengine.plan.analyze; + +import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.db.exception.sql.SemanticException; +import org.apache.iotdb.db.queryengine.common.MPPQueryContext; +import org.apache.iotdb.db.queryengine.common.NodeRef; +import org.apache.iotdb.db.queryengine.common.schematree.ISchemaTree; +import org.apache.iotdb.db.queryengine.plan.expression.Expression; +import org.apache.iotdb.db.queryengine.plan.expression.leaf.ConstantOperand; +import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimeSeriesOperand; +import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimestampOperand; +import org.apache.iotdb.db.queryengine.plan.expression.multi.FunctionExpression; +import org.apache.iotdb.db.queryengine.plan.statement.component.ResultColumn; +import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement; +import org.apache.iotdb.db.schemaengine.template.Template; + +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.utils.Pair; +import org.apache.tsfile.write.schema.IMeasurementSchema; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Set; + +import static org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant.ENDTIME; +import static org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeVisitor.DEVICE_EXPRESSION; +import static org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeVisitor.analyzeExpressionType; +import static org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeVisitor.analyzeGroupByTime; +import static org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeVisitor.analyzeOutput; +import static org.apache.iotdb.db.queryengine.plan.analyze.ExpressionAnalyzer.normalizeExpression; +import static org.apache.iotdb.db.queryengine.plan.analyze.ExpressionAnalyzer.searchAggregationExpressions; +import static org.apache.iotdb.db.queryengine.plan.analyze.TemplatedAnalyze.analyzeDataPartition; +import static org.apache.iotdb.db.queryengine.plan.analyze.TemplatedAnalyze.analyzeDeviceToWhere; +import static org.apache.iotdb.db.queryengine.plan.analyze.TemplatedAnalyze.analyzeDeviceViewOutput; +import static org.apache.iotdb.db.queryengine.plan.analyze.TemplatedAnalyze.analyzeFrom; +import static org.apache.iotdb.db.queryengine.plan.optimization.LimitOffsetPushDown.canPushDownLimitOffsetInGroupByTimeForDevice; +import static org.apache.iotdb.db.queryengine.plan.optimization.LimitOffsetPushDown.pushDownLimitOffsetInGroupByTimeForDevice; + +/** Methods in this class are used for aggregation, templated with align by device situation. */ +public class TemplatedAggregationAnalyze { + + static boolean canBuildAggregationPlanUseTemplate( + Analysis analysis, + QueryStatement queryStatement, + IPartitionFetcher partitionFetcher, + ISchemaTree schemaTree, + MPPQueryContext context, + Template template) { + + // not support order by expression and non-aligned template + if (queryStatement.hasOrderByExpression() || !template.isDirectAligned()) { + return false; + } + + analysis.setNoWhereAndAggregation(false); + + List deviceList = analyzeFrom(queryStatement, schemaTree); + + if (canPushDownLimitOffsetInGroupByTimeForDevice(queryStatement)) { + // remove the device which won't appear in resultSet after limit/offset + deviceList = pushDownLimitOffsetInGroupByTimeForDevice(deviceList, queryStatement); + } + + List> outputExpressions = new ArrayList<>(); + boolean valid = analyzeSelect(queryStatement, analysis, outputExpressions, template); + if (!valid) { + analysis.setDeviceTemplate(null); + return false; + } + + analyzeDeviceToWhere(analysis, queryStatement); + if (deviceList.isEmpty()) { + analysis.setFinishQueryAfterAnalyze(true); + return true; + } + analysis.setDeviceList(deviceList); + + if (analysis.getWhereExpression() != null + && ConstantOperand.FALSE.equals(analysis.getWhereExpression())) { + analyzeOutput(analysis, queryStatement, outputExpressions); + analysis.setFinishQueryAfterAnalyze(true); + return true; + } + + valid = analyzeHaving(analysis, queryStatement); + if (!valid) { + analysis.setDeviceTemplate(null); + return false; + } + + analyzeDeviceToExpressions(analysis); + + analyzeDeviceViewOutput(analysis, queryStatement); + + // generate result set header according to output expressions + analyzeOutput(analysis, queryStatement, outputExpressions); + + analyzeGroupByTime(analysis, queryStatement); + context.generateGlobalTimeFilter(analysis); + + // fetch partition information + analyzeDataPartition(analysis, schemaTree, partitionFetcher, context.getGlobalTimeFilter()); + return true; + } + + private static boolean analyzeSelect( + QueryStatement queryStatement, + Analysis analysis, + List> outputExpressions, + Template template) { + LinkedHashSet selectExpressions = new LinkedHashSet<>(); + selectExpressions.add(DEVICE_EXPRESSION); + if (queryStatement.isOutputEndTime()) { + return false; + } + + analysis.setDeviceTemplate(template); + + ColumnPaginationController paginationController = + new ColumnPaginationController( + queryStatement.getSeriesLimit(), queryStatement.getSeriesOffset()); + + Set aggregationExpressions = new LinkedHashSet<>(); + for (ResultColumn resultColumn : queryStatement.getSelectComponent().getResultColumns()) { + if (paginationController.hasCurOffset()) { + paginationController.consumeOffset(); + } else if (paginationController.hasCurLimit()) { + Expression selectExpression = resultColumn.getExpression(); + outputExpressions.add(new Pair<>(selectExpression, resultColumn.getAlias())); + selectExpressions.add(selectExpression); + aggregationExpressions.add(selectExpression); + if (selectExpression instanceof FunctionExpression + && "count_time" + .equalsIgnoreCase(((FunctionExpression) selectExpression).getFunctionName())) { + analysis.getExpressionTypes().put(NodeRef.of(selectExpression), TSDataType.INT64); + ((FunctionExpression) selectExpression) + .setExpressions(Collections.singletonList(new TimestampOperand())); + } else { + analyzeExpressionType(analysis, selectExpression); + } + } else { + break; + } + } + + List measurementList = new ArrayList<>(); + List measurementSchemaList = new ArrayList<>(); + Set measurementSet = new HashSet<>(); + + if (queryStatement.isCountTimeAggregation()) { + measurementList = new ArrayList<>(template.getSchemaMap().keySet()); + measurementSchemaList = new ArrayList<>(template.getSchemaMap().values()); + } else { + int idx = 0; + for (Expression selectExpression : selectExpressions) { + idx++; + if (idx == 1 + || (idx == 2 && ENDTIME.equalsIgnoreCase(selectExpression.getOutputSymbol()))) { + continue; + } + + String measurement = selectExpression.getExpressions().get(0).getOutputSymbol(); + // not support agg(*), agg(s1+1) now + if (!template.getSchemaMap().containsKey(measurement)) { + return false; + } + + // for agg1(s1) + agg2(s1), only record s1 for one time + if (!measurementSet.contains(measurement)) { + measurementSet.add(measurement); + measurementList.add(measurement); + measurementSchemaList.add(template.getSchemaMap().get(measurement)); + } + } + } + + analysis.setMeasurementList(measurementList); + analysis.setMeasurementSchemaList(measurementSchemaList); + analysis.setAggregationExpressions(aggregationExpressions); + analysis.setOutputExpressions(outputExpressions); + analysis.setSelectExpressions(selectExpressions); + return true; + } + + private static boolean analyzeHaving(Analysis analysis, QueryStatement queryStatement) { + if (!queryStatement.hasHaving()) { + return true; + } + + Set measurementSet = new HashSet<>(analysis.getMeasurementList()); + Set aggregationExpressions = analysis.getAggregationExpressions(); + Expression havingExpression = queryStatement.getHavingCondition().getPredicate(); + for (Expression aggregationExpression : searchAggregationExpressions(havingExpression)) { + Expression normalizedAggregationExpression = normalizeExpression(aggregationExpression); + + // not support having agg(s1+s2) temporarily + if (!((normalizedAggregationExpression).getExpressions().get(0) + instanceof TimeSeriesOperand)) { + return false; + } + + String measurement = + normalizedAggregationExpression.getExpressions().get(0).getOutputSymbol(); + if (!measurementSet.contains(measurement)) { + // adapt this case: select agg(s1) from xx having agg(s3) + measurementSet.add(measurement); + analysis.getMeasurementList().add(measurement); + analysis + .getMeasurementSchemaList() + .add(analysis.getDeviceTemplate().getSchema(measurement)); + } + + analyzeExpressionType(analysis, aggregationExpression); + analyzeExpressionType(analysis, normalizedAggregationExpression); + + aggregationExpressions.add(aggregationExpression); + } + + TSDataType outputType = analyzeExpressionType(analysis, havingExpression); + if (outputType != TSDataType.BOOLEAN) { + throw new SemanticException( + String.format( + "The output type of the expression in HAVING clause should be BOOLEAN, actual data type: %s.", + outputType)); + } + analysis.setHavingExpression(havingExpression); + + return true; + } + + private static void analyzeDeviceToExpressions(Analysis analysis) { + analysis.setDeviceToSourceTransformExpressions(analysis.getDeviceToSelectExpressions()); + + analysis.setDeviceToSourceExpressions(analysis.getDeviceToSelectExpressions()); + analysis.setDeviceToOutputExpressions(analysis.getDeviceToSelectExpressions()); + + analysis.setDeviceToAggregationExpressions(analysis.getDeviceToSelectExpressions()); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedAnalyze.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedAnalyze.java index 87bea8c61a98..a1fcb4c18ee1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedAnalyze.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedAnalyze.java @@ -70,7 +70,9 @@ import static org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeVisitor.getTimePartitionSlotList; import static org.apache.iotdb.db.queryengine.plan.analyze.ExpressionAnalyzer.concatDeviceAndBindSchemaForExpression; import static org.apache.iotdb.db.queryengine.plan.analyze.ExpressionAnalyzer.getMeasurementExpression; +import static org.apache.iotdb.db.queryengine.plan.analyze.ExpressionAnalyzer.searchAggregationExpressions; import static org.apache.iotdb.db.queryengine.plan.analyze.ExpressionTypeAnalyzer.analyzeExpressionForTemplatedQuery; +import static org.apache.iotdb.db.queryengine.plan.analyze.TemplatedAggregationAnalyze.canBuildAggregationPlanUseTemplate; /** * This class provides accelerated implementation for multiple devices align by device query. This @@ -96,9 +98,7 @@ public static boolean canBuildPlanUseTemplate( IPartitionFetcher partitionFetcher, ISchemaTree schemaTree, MPPQueryContext context) { - if (queryStatement.isAggregationQuery() - || queryStatement.isGroupBy() - || queryStatement.isGroupByTime() + if (queryStatement.getGroupByComponent() != null || queryStatement.isSelectInto() || queryStatement.hasFill() || schemaTree.hasNormalTimeSeries()) { @@ -106,62 +106,66 @@ public static boolean canBuildPlanUseTemplate( } List