Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[IOTDB-6355] Fix query scan will return duplicated timestamp or unordered timestamp while TsFileResource degrading #14458

Merged
merged 1 commit into from
Dec 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,12 @@ public CommonConfig setPipeConnectorRequestSliceThresholdBytes(
return this;
}

@Override
public CommonConfig setQueryMemoryProportion(String queryMemoryProportion) {
setProperty("chunk_timeseriesmeta_free_memory_proportion", queryMemoryProportion);
return this;
}

// For part of the log directory
public String getClusterConfigStr() {
return fromConsensusFullNameToAbbr(properties.getProperty(CONFIG_NODE_CONSENSUS_PROTOCOL_CLASS))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -519,4 +519,11 @@ public CommonConfig setPipeConnectorRequestSliceThresholdBytes(
cnConfig.setPipeConnectorRequestSliceThresholdBytes(pipeConnectorRequestSliceThresholdBytes);
return this;
}

@Override
public CommonConfig setQueryMemoryProportion(String queryMemoryProportion) {
dnConfig.setQueryMemoryProportion(queryMemoryProportion);
cnConfig.setQueryMemoryProportion(queryMemoryProportion);
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -365,4 +365,9 @@ public CommonConfig setPipeConnectorRequestSliceThresholdBytes(
int pipeConnectorRequestSliceThresholdBytes) {
return this;
}

@Override
public CommonConfig setQueryMemoryProportion(String queryMemoryProportion) {
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -161,4 +161,6 @@ CommonConfig setPipeHeartbeatIntervalSecondsForCollectingPipeMeta(

CommonConfig setPipeConnectorRequestSliceThresholdBytes(
int pipeConnectorRequestSliceThresholdBytes);

CommonConfig setQueryMemoryProportion(String queryMemoryProportion);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.db.it;

import org.apache.iotdb.it.env.EnvFactory;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.ClusterIT;
import org.apache.iotdb.itbase.category.LocalStandaloneIT;

import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Locale;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

@RunWith(IoTDBTestRunner.class)
@Category({LocalStandaloneIT.class, ClusterIT.class})
public class IoTDBFileTimeIndexIT {

private static final String[] sqls =
new String[] {
"insert into root.db.d1(time,s1) values(2,2)",
"insert into root.db.d1(time,s1) values(3,3)",
"flush",
"insert into root.db.d2(time,s1) values(5,5)",
"flush",
"insert into root.db.d1(time,s1) values(4,4)",
"flush",
"insert into root.db.d2(time,s1) values(1,1)",
"insert into root.db.d1(time,s1) values(3,30)",
"insert into root.db.d1(time,s1) values(4,40)",
"flush",
"insert into root.db.d2(time,s1) values(2,2)",
"insert into root.db.d1(time,s1) values(4,400)",
"flush",
};

@BeforeClass
public static void setUp() throws Exception {
Locale.setDefault(Locale.ENGLISH);

EnvFactory.getEnv()
.getConfig()
.getCommonConfig()
.setDataRegionGroupExtensionPolicy("CUSTOM")
.setDefaultDataRegionGroupNumPerDatabase(1)
.setEnableSeqSpaceCompaction(false)
.setEnableUnseqSpaceCompaction(false)
.setEnableCrossSpaceCompaction(false)
.setQueryMemoryProportion("1:100:200:50:200:200:0:250");
// Adjust memstable threshold size to make it flush automatically
EnvFactory.getEnv().initClusterEnvironment();
prepareData();
}

private static void prepareData() {
try (Connection connection = EnvFactory.getEnv().getConnection();
Statement statement = connection.createStatement()) {

for (String sql : sqls) {
statement.addBatch(sql);
}
statement.executeBatch();
} catch (Exception e) {
fail(e.getMessage());
}
}

@AfterClass
public static void tearDown() throws Exception {
EnvFactory.getEnv().cleanClusterEnvironment();
}

@Test
public void testQuery() throws SQLException {
long[] time = {2L, 3L, 4L};
double[] value = {2.0f, 30.0f, 400.0f};

try (Connection connection = EnvFactory.getEnv().getConnection();
Statement statement = connection.createStatement();
ResultSet resultSet = statement.executeQuery("select s1 from root.db.d1")) {
int cnt = 0;
while (resultSet.next()) {
assertEquals(time[cnt], resultSet.getLong(1));
assertEquals(value[cnt], resultSet.getDouble(2), 0.00001);
cnt++;
}
assertEquals(time.length, cnt);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.PriorityQueue;
import java.util.function.ToLongFunction;

Expand Down Expand Up @@ -1158,17 +1159,27 @@ private void unpackAllOverlappedTsFilesToTimeSeriesMetadata(long endpointTime)
unpackUnseqTsFileResource();
}
while (orderUtils.hasNextSeqResource() && orderUtils.isCurSeqOverlappedWith(endpointTime)) {
unpackSeqTsFileResource();
Optional<ITimeSeriesMetadata> timeSeriesMetadata = unpackSeqTsFileResource();
// asc: if current seq tsfile's endTime >= endpointTime, we don't need to continue
// desc: if current seq tsfile's startTime <= endpointTime, we don't need to continue
if (timeSeriesMetadata.isPresent()
&& orderUtils.overlappedSeqResourceSearchingNeedStop(
endpointTime, timeSeriesMetadata.get().getStatistics())) {
break;
}
}
}

private void unpackSeqTsFileResource() throws IOException {
private Optional<ITimeSeriesMetadata> unpackSeqTsFileResource() throws IOException {
ITimeSeriesMetadata timeseriesMetadata =
loadTimeSeriesMetadata(orderUtils.getNextSeqFileResource(true), true);
// skip if data type is mismatched which may be caused by delete
if (timeseriesMetadata != null && timeseriesMetadata.typeMatch(getTsDataTypeList())) {
timeseriesMetadata.setSeq(true);
seqTimeSeriesMetadata.add(timeseriesMetadata);
return Optional.of(timeseriesMetadata);
} else {
return Optional.empty();
}
}

Expand Down Expand Up @@ -1336,6 +1347,9 @@ boolean isTakeSeqAsFirst(
TsFileResource getNextUnseqFileResource(boolean isDelete);

void setCurSeqFileIndex(QueryDataSource dataSource);

boolean overlappedSeqResourceSearchingNeedStop(
long endPointTime, Statistics<? extends Object> currentStatistics);
}

class DescTimeOrderUtils implements TimeOrderUtils {
Expand Down Expand Up @@ -1454,6 +1468,12 @@ public TsFileResource getNextUnseqFileResource(boolean isDelete) {
public void setCurSeqFileIndex(QueryDataSource dataSource) {
curSeqFileIndex = dataSource.getSeqResourcesSize() - 1;
}

@Override
public boolean overlappedSeqResourceSearchingNeedStop(
long endPointTime, Statistics<?> currentStatistics) {
return currentStatistics.getStartTime() <= endPointTime;
}
}

class AscTimeOrderUtils implements TimeOrderUtils {
Expand Down Expand Up @@ -1572,6 +1592,12 @@ public TsFileResource getNextUnseqFileResource(boolean isDelete) {
public void setCurSeqFileIndex(QueryDataSource dataSource) {
curSeqFileIndex = 0;
}

@Override
public boolean overlappedSeqResourceSearchingNeedStop(
long endPointTime, Statistics<?> currentStatistics) {
return currentStatistics.getEndTime() >= endPointTime;
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,10 @@ public class QueryDataSource implements IQueryDataSource {

private int curSeqIndex = -1;

// asc: startTime; desc: endTime
// asc: startTime, will be Long.MIN_VALUE if current tsfile resource is degraded
// desc: endTime, will be Long.MAX_VALUE if current tsfile resource is degraded
// if current tsfile resource is degraded, it will always be considered to be overlapping with
// current point
private long curSeqOrderTime = 0;

private Boolean curSeqSatisfied = null;
Expand Down Expand Up @@ -109,7 +112,7 @@ public boolean hasNextSeqResource(int curIndex, boolean ascending, IDeviceID dev
boolean res = ascending ? curIndex < seqResources.size() : curIndex >= 0;
if (res && curIndex != this.curSeqIndex) {
this.curSeqIndex = curIndex;
this.curSeqOrderTime = seqResources.get(curIndex).getOrderTime(deviceID, ascending);
this.curSeqOrderTime = seqResources.get(curIndex).getOrderTimeForSeq(deviceID, ascending);
this.curSeqSatisfied = null;
}
return res;
Expand Down Expand Up @@ -151,7 +154,9 @@ public boolean hasNextUnseqResource(int curIndex, boolean ascending, IDeviceID d
if (res && curIndex != this.curUnSeqIndex) {
this.curUnSeqIndex = curIndex;
this.curUnSeqOrderTime =
unseqResources.get(unSeqFileOrderIndex[curIndex]).getOrderTime(deviceID, ascending);
unseqResources
.get(unSeqFileOrderIndex[curIndex])
.getOrderTimeForUnseq(deviceID, ascending);
this.curUnSeqSatisfied = null;
}
return res;
Expand Down Expand Up @@ -208,7 +213,8 @@ public void fillOrderIndexes(IDeviceID deviceId, boolean ascending) {
int index = 0;
for (TsFileResource resource : unseqResources) {
orderTimeToIndexMap
.computeIfAbsent(resource.getOrderTime(deviceId, ascending), key -> new ArrayList<>())
.computeIfAbsent(
resource.getOrderTimeForUnseq(deviceId, ascending), key -> new ArrayList<>())
.add(index++);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -601,7 +601,17 @@ public long getEndTime(IDeviceID deviceId) {
}
}

public long getOrderTime(IDeviceID deviceId, boolean ascending) {
// cannot use FileTimeIndex
public long getOrderTimeForSeq(IDeviceID deviceId, boolean ascending) {
if (timeIndex instanceof ArrayDeviceTimeIndex) {
return ascending ? getStartTime(deviceId) : getEndTime(deviceId);
} else {
return ascending ? Long.MIN_VALUE : Long.MAX_VALUE;
}
}

// can use FileTimeIndex
public long getOrderTimeForUnseq(IDeviceID deviceId, boolean ascending) {
return ascending ? getStartTime(deviceId) : getEndTime(deviceId);
}

Expand Down
Loading