Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pipe & Load: Enable SQL load with "load-with-mods" param & Fixed the bug that mods are not deleted when loading a TsFile if exceptions occur & Fixed the potential NPE in the air gap agent close() method #14363

Open
wants to merge 17 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -816,6 +816,59 @@ public void testLoadWithMods() throws Exception {
}
}

@Test
public void testLoadWithoutMods() throws Exception {
  // Generate a TsFile with one non-aligned device (DEVICE_0) and one aligned
  // device (DEVICE_1), then append a deletion (mods) entry for DEVICE_0.
  final long pointCountBeforeDeletion;
  try (final TsFileGenerator tsFileGenerator =
      new TsFileGenerator(new File(tmpDir, "1-0-0-0.tsfile"))) {
    tsFileGenerator.registerTimeseries(
        SchemaConfig.DEVICE_0,
        Arrays.asList(
            SchemaConfig.MEASUREMENT_00,
            SchemaConfig.MEASUREMENT_01,
            SchemaConfig.MEASUREMENT_02,
            SchemaConfig.MEASUREMENT_03,
            SchemaConfig.MEASUREMENT_04,
            SchemaConfig.MEASUREMENT_05,
            SchemaConfig.MEASUREMENT_06,
            SchemaConfig.MEASUREMENT_07));
    tsFileGenerator.registerAlignedTimeseries(
        SchemaConfig.DEVICE_1,
        Arrays.asList(
            SchemaConfig.MEASUREMENT_10,
            SchemaConfig.MEASUREMENT_11,
            SchemaConfig.MEASUREMENT_12,
            SchemaConfig.MEASUREMENT_13,
            SchemaConfig.MEASUREMENT_14,
            SchemaConfig.MEASUREMENT_15,
            SchemaConfig.MEASUREMENT_16,
            SchemaConfig.MEASUREMENT_17));
    tsFileGenerator.generateData(
        SchemaConfig.DEVICE_0, 100000, PARTITION_INTERVAL / 10_000, false);
    tsFileGenerator.generateData(
        SchemaConfig.DEVICE_1, 100000, PARTITION_INTERVAL / 10_000, true);
    // Capture the total BEFORE the deletion is generated: since the load below
    // ignores mods, every written point should remain visible after loading.
    pointCountBeforeDeletion = tsFileGenerator.getTotalNumber();
    tsFileGenerator.generateDeletion(SchemaConfig.DEVICE_0, 10);
  }

  try (final Connection connection = EnvFactory.getEnv().getConnection();
      final Statement statement = connection.createStatement()) {

    // Load with 'load-with-mods'='false' so the deletion file is not applied.
    statement.execute(
        String.format(
            "load \"%s\" with ('database-level'='2', 'load-with-mods'='false')",
            tmpDir.getAbsolutePath()));

    try (final ResultSet resultSet =
        statement.executeQuery("select count(*) from root.** group by level=1,2")) {
      if (!resultSet.next()) {
        Assert.fail("This ResultSet is empty.");
      }
      // The full pre-deletion point count must be present.
      Assert.assertEquals(
          pointCountBeforeDeletion, resultSet.getLong("count(root.sg.test_0.*.*)"));
    }
  }
}

@Test
public void testLoadWithEmptyTsFile() throws Exception {
try (final TsFileGenerator ignored = new TsFileGenerator(new File(tmpDir, "1-0-0-0.tsfile"))) {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -84,7 +85,9 @@ public void start() throws StartupException {
@Override
public void stop() {
try {
serverSocket.close();
if (Objects.nonNull(serverSocket)) {
serverSocket.close();
}
} catch (final IOException e) {
LOGGER.warn("Failed to close IoTDBAirGapReceiverAgent's server socket", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
import org.apache.iotdb.db.storageengine.load.LoadTsFileManager;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.rpc.TSStatusCode;

import org.apache.commons.io.FileUtils;
import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -185,7 +185,7 @@ public Optional<TSStatus> visitLoadFile(
}

if (loadTsFileStatement.isDeleteAfterLoad()) {
loadTsFileStatement.getTsFiles().forEach(FileUtils::deleteQuietly);
loadTsFileStatement.getTsFiles().forEach(LoadTsFileManager::cleanTsFile);
}

LOGGER.warn(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
import org.apache.iotdb.db.storageengine.load.LoadTsFileManager;
import org.apache.iotdb.rpc.TSStatusCode;

import org.apache.commons.io.FileUtils;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.write.record.Tablet;
import org.slf4j.Logger;
Expand Down Expand Up @@ -150,7 +150,7 @@ file, new IoTDBTreePattern(null), Long.MIN_VALUE, Long.MAX_VALUE, null, null)) {
}

if (loadTsFileStatement.isDeleteAfterLoad()) {
loadTsFileStatement.getTsFiles().forEach(FileUtils::deleteQuietly);
loadTsFileStatement.getTsFiles().forEach(LoadTsFileManager::cleanTsFile);
}

LOGGER.warn(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public synchronized PipeMemoryBlock forceAllocate(long sizeInBytes)
return forceAllocate(sizeInBytes, PipeMemoryBlockType.NORMAL);
}

public PipeTabletMemoryBlock forceAllocateForTabletWithRetry(long tabletSizeInBytes)
public PipeTabletMemoryBlock forceAllocateForTabletWithRetry(final long tabletSizeInBytes)
throws PipeRuntimeOutOfMemoryCriticalException {
if (!PIPE_MEMORY_MANAGEMENT_ENABLED) {
// No need to calculate the tablet size, skip it to save time
Expand All @@ -104,7 +104,7 @@ public PipeTabletMemoryBlock forceAllocateForTabletWithRetry(long tabletSizeInBy

try {
Thread.sleep(MEMORY_ALLOCATE_RETRY_INTERVAL_IN_MS);
} catch (InterruptedException ex) {
} catch (final InterruptedException ex) {
Thread.currentThread().interrupt();
LOGGER.warn("forceAllocateWithRetry: interrupted while waiting for available memory", ex);
}
Expand All @@ -127,7 +127,7 @@ public PipeTabletMemoryBlock forceAllocateForTabletWithRetry(long tabletSizeInBy
}
}

public PipeTsFileMemoryBlock forceAllocateForTsFileWithRetry(long tsFileSizeInBytes)
public PipeTsFileMemoryBlock forceAllocateForTsFileWithRetry(final long tsFileSizeInBytes)
throws PipeRuntimeOutOfMemoryCriticalException {
if (!PIPE_MEMORY_MANAGEMENT_ENABLED) {
return new PipeTsFileMemoryBlock(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2999,7 +2999,8 @@ public Analysis visitPipeEnrichedStatement(
}

@Override
public Analysis visitLoadFile(LoadTsFileStatement loadTsFileStatement, MPPQueryContext context) {
public Analysis visitLoadFile(
final LoadTsFileStatement loadTsFileStatement, final MPPQueryContext context) {
context.setQueryType(QueryType.WRITE);

final long startTime = System.nanoTime();
Expand All @@ -3023,7 +3024,7 @@ public Analysis visitLoadFile(LoadTsFileStatement loadTsFileStatement, MPPQueryC
}

private LoadTsFileAnalyzer getAnalyzer(
LoadTsFileStatement loadTsFileStatement, MPPQueryContext context) {
final LoadTsFileStatement loadTsFileStatement, final MPPQueryContext context) {
if (Objects.equals(loadTsFileStatement.getModel(), LoadTsFileConfigurator.MODEL_TREE_VALUE)) {
// Load to tree-model
return new LoadTsFileToTreeModelAnalyzer(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public abstract class LoadTsFileAnalyzer implements AutoCloseable {
protected final boolean isConvertOnTypeMismatch;

protected final boolean isAutoCreateDatabase;

protected final boolean isLoadWithMods;
protected final int databaseLevel;

protected final String database;
Expand All @@ -93,6 +93,7 @@ public abstract class LoadTsFileAnalyzer implements AutoCloseable {
this.isDeleteAfterLoad = loadTsFileStatement.isDeleteAfterLoad();
this.isConvertOnTypeMismatch = loadTsFileStatement.isConvertOnTypeMismatch();
this.isAutoCreateDatabase = loadTsFileStatement.isAutoCreateDatabase();
this.isLoadWithMods = loadTsFileStatement.isLoadWithMods();
this.databaseLevel = loadTsFileStatement.getDatabaseLevel();
this.database = loadTsFileStatement.getDatabase();

Expand All @@ -103,7 +104,9 @@ public abstract class LoadTsFileAnalyzer implements AutoCloseable {
}

LoadTsFileAnalyzer(
LoadTsFile loadTsFileTableStatement, boolean isGeneratedByPipe, MPPQueryContext context) {
final LoadTsFile loadTsFileTableStatement,
final boolean isGeneratedByPipe,
final MPPQueryContext context) {
this.loadTsFileTableStatement = loadTsFileTableStatement;
this.tsFiles = loadTsFileTableStatement.getTsFiles();
this.statementString = loadTsFileTableStatement.toString();
Expand All @@ -113,16 +116,17 @@ public abstract class LoadTsFileAnalyzer implements AutoCloseable {
this.isAutoCreateDatabase = loadTsFileTableStatement.isAutoCreateDatabase();
this.databaseLevel = loadTsFileTableStatement.getDatabaseLevel();
this.database = loadTsFileTableStatement.getDatabase();
this.isLoadWithMods = loadTsFileTableStatement.isLoadWithMods();

this.loadTsFileTreeStatement = null;
this.isTableModelStatement = true;
this.isGeneratedByPipe = isGeneratedByPipe;
this.context = context;
}

public abstract IAnalysis analyzeFileByFile(IAnalysis analysis);
public abstract IAnalysis analyzeFileByFile(final IAnalysis analysis);

protected boolean doAnalyzeFileByFile(IAnalysis analysis) {
protected boolean doAnalyzeFileByFile(final IAnalysis analysis) {
// analyze tsfile metadata file by file
for (int i = 0, tsfileNum = tsFiles.size(); i < tsfileNum; i++) {
final File tsFile = tsFiles.get(i);
Expand All @@ -146,22 +150,22 @@ protected boolean doAnalyzeFileByFile(IAnalysis analysis) {
"Load - Analysis Stage: {}/{} tsfiles have been analyzed, progress: {}%",
i + 1, tsfileNum, String.format("%.3f", (i + 1) * 100.00 / tsfileNum));
}
} catch (AuthException e) {
} catch (final AuthException e) {
setFailAnalysisForAuthException(analysis, e);
return false;
} catch (VerifyMetadataTypeMismatchException e) {
executeDataTypeConversionOnTypeMismatch(analysis, e);
// just return false to STOP the analysis process,
// the real result on the conversion will be set in the analysis.
return false;
} catch (BufferUnderflowException e) {
} catch (final BufferUnderflowException e) {
LOGGER.warn(
"The file {} is not a valid tsfile. Please check the input file.", tsFile.getPath(), e);
throw new SemanticException(
String.format(
"The file %s is not a valid tsfile. Please check the input file.",
tsFile.getPath()));
} catch (Exception e) {
} catch (final Exception e) {
final String exceptionMessage =
String.format(
"Loading file %s failed. Detail: %s",
Expand Down Expand Up @@ -237,7 +241,7 @@ protected void addTsFileResource(TsFileResource tsFileResource) {
}
}

protected void addWritePointCount(long writePointCount) {
protected void addWritePointCount(final long writePointCount) {
if (isTableModelStatement) {
loadTsFileTableStatement.addWritePointCount(writePointCount);
} else {
Expand All @@ -262,19 +266,19 @@ protected int getDatabaseLevel() {
}

protected long getWritePointCount(
Map<IDeviceID, List<TimeseriesMetadata>> device2TimeseriesMetadata) {
return device2TimeseriesMetadata.values().stream()
final Map<IDeviceID, List<TimeseriesMetadata>> device2TimeSeriesMetadata) {
return device2TimeSeriesMetadata.values().stream()
.flatMap(List::stream)
.mapToLong(t -> t.getStatistics().getCount())
.sum();
}

protected void setFailAnalysisForAuthException(IAnalysis analysis, AuthException e) {
protected void setFailAnalysisForAuthException(final IAnalysis analysis, final AuthException e) {
analysis.setFinishQueryAfterAnalyze(true);
analysis.setFailStatus(RpcUtils.getStatus(e.getCode(), e.getMessage()));
}

protected void checkBeforeAnalyzeFileByFile(IAnalysis analysis) {
protected void checkBeforeAnalyzeFileByFile(final IAnalysis analysis) {
if (TSFileDescriptor.getInstance().getConfig().getEncryptFlag()) {
analysis.setFinishQueryAfterAnalyze(true);
analysis.setFailStatus(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
import org.apache.iotdb.db.storageengine.dataregion.utils.TsFileResourceUtils;
import org.apache.iotdb.db.storageengine.load.LoadTsFileManager;
import org.apache.iotdb.db.utils.TimestampPrecisionUtils;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;

import com.google.common.util.concurrent.ListenableFuture;
import org.apache.commons.io.FileUtils;
import org.apache.tsfile.encrypt.EncryptParameter;
import org.apache.tsfile.encrypt.EncryptUtils;
import org.apache.tsfile.file.metadata.IDeviceID;
Expand Down Expand Up @@ -90,7 +90,7 @@ public LoadTsFileToTableModelAnalyzer(
}

@Override
public IAnalysis analyzeFileByFile(IAnalysis analysis) {
public IAnalysis analyzeFileByFile(final IAnalysis analysis) {
checkBeforeAnalyzeFileByFile(analysis);
if (analysis.isFinishQueryAfterAnalyze()) {
return analysis;
Expand Down Expand Up @@ -137,7 +137,7 @@ protected void analyzeSingleTsFile(final File tsFile)
}

// check whether the encrypt type of the tsfile is supported
EncryptParameter param = reader.getEncryptParam();
final EncryptParameter param = reader.getEncryptParam();
if (!Objects.equals(param.getType(), EncryptUtils.encryptParam.getType())
|| !Arrays.equals(param.getKey(), EncryptUtils.encryptParam.getKey())) {
throw new SemanticException("The encryption way of the TsFile is not supported.");
Expand All @@ -149,7 +149,7 @@ protected void analyzeSingleTsFile(final File tsFile)
schemaCache.setDatabase(database);
schemaCache.setCurrentModificationsAndTimeIndex(tsFileResource, reader);

for (Map.Entry<String, org.apache.tsfile.file.metadata.TableSchema> name2Schema :
for (final Map.Entry<String, org.apache.tsfile.file.metadata.TableSchema> name2Schema :
reader.readFileMetadata().getTableSchemaMap().entrySet()) {
final TableSchema fileSchema =
TableSchema.fromTsFileTableSchema(name2Schema.getKey(), name2Schema.getValue());
Expand All @@ -162,7 +162,7 @@ protected void analyzeSingleTsFile(final File tsFile)
final Map<IDeviceID, List<TimeseriesMetadata>> device2TimeseriesMetadata =
timeseriesMetadataIterator.next();

for (IDeviceID deviceId : device2TimeseriesMetadata.keySet()) {
for (final IDeviceID deviceId : device2TimeseriesMetadata.keySet()) {
schemaCache.autoCreateAndVerify(deviceId);
}

Expand All @@ -184,7 +184,7 @@ protected void analyzeSingleTsFile(final File tsFile)
} catch (final LoadEmptyFileException loadEmptyFileException) {
LOGGER.warn("Failed to load empty file: {}", tsFile.getAbsolutePath());
if (isDeleteAfterLoad) {
FileUtils.deleteQuietly(tsFile);
LoadTsFileManager.cleanTsFile(tsFile);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
import org.apache.iotdb.db.storageengine.dataregion.utils.TsFileResourceUtils;
import org.apache.iotdb.db.storageengine.load.LoadTsFileManager;
import org.apache.iotdb.db.utils.TimestampPrecisionUtils;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;

import org.apache.commons.io.FileUtils;
import org.apache.tsfile.encrypt.EncryptParameter;
import org.apache.tsfile.encrypt.EncryptUtils;
import org.apache.tsfile.file.metadata.IDeviceID;
Expand Down Expand Up @@ -70,7 +70,7 @@ public LoadTsFileToTreeModelAnalyzer(
}

@Override
public IAnalysis analyzeFileByFile(IAnalysis analysis) {
public IAnalysis analyzeFileByFile(final IAnalysis analysis) {
checkBeforeAnalyzeFileByFile(analysis);
if (analysis.isFinishQueryAfterAnalyze()) {
return analysis;
Expand All @@ -83,13 +83,13 @@ public IAnalysis analyzeFileByFile(IAnalysis analysis) {

try {
schemaAutoCreatorAndVerifier.flush();
} catch (AuthException e) {
} catch (final AuthException e) {
setFailAnalysisForAuthException(analysis, e);
return analysis;
} catch (VerifyMetadataTypeMismatchException e) {
executeDataTypeConversionOnTypeMismatch(analysis, e);
return analysis;
} catch (Exception e) {
} catch (final Exception e) {
final String exceptionMessage =
String.format(
"Auto create or verify schema error when executing statement %s. Detail: %s.",
Expand Down Expand Up @@ -173,7 +173,7 @@ protected void analyzeSingleTsFile(final File tsFile)
} catch (final LoadEmptyFileException loadEmptyFileException) {
LOGGER.warn("Failed to load empty file: {}", tsFile.getAbsolutePath());
if (isDeleteAfterLoad) {
FileUtils.deleteQuietly(tsFile);
LoadTsFileManager.cleanTsFile(tsFile);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ private void doAutoCreateAndVerify()
if (loadTsFileAnalyzer.isVerifySchema()) {
verifySchema(schemaTree);
}
} catch (AuthException e) {
} catch (final AuthException e) {
throw e;
} catch (VerifyMetadataTypeMismatchException e) {
if (loadTsFileAnalyzer.isConvertOnTypeMismatch()) {
Expand All @@ -243,7 +243,7 @@ private void doAutoCreateAndVerify()
} else {
handleException(e, loadTsFileAnalyzer.getStatementString());
}
} catch (Exception e) {
} catch (final Exception e) {
handleException(e, loadTsFileAnalyzer.getStatementString());
}
}
Expand Down
Loading
Loading