Skip to content

Commit

Permalink
Pipe: Unify the case of the table model database and fix the incorrec…
Browse files Browse the repository at this point in the history
…t use of iterators in Load File (#14718)
  • Loading branch information
luoluoyuyu authored Jan 18, 2025
1 parent 141b7ba commit a011b01
Show file tree
Hide file tree
Showing 7 changed files with 26 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ public void testWriteBackSink() throws Exception {
extractorAttributes.put("extractor.table-name", "test.*");

processorAttributes.put("processor", "rename-database-processor");
processorAttributes.put("processor.new-db-name", "test1");
processorAttributes.put("processor.new-db-name", "Test1");

connectorAttributes.put("connector", "write-back-sink");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,9 @@ private void autoCreateDatabaseIfNecessary(final String database) {
final ListenableFuture<ConfigTaskResult> future =
task.execute(ClusterConfigTaskExecutor.getInstance());
final ConfigTaskResult result = future.get();
if (result.getStatusCode().getStatusCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
final int statusCode = result.getStatusCode().getStatusCode();
if (statusCode != TSStatusCode.SUCCESS_STATUS.getStatusCode()
&& statusCode != TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode()) {
throw new PipeException(
String.format(
"Auto create database failed: %s, status code: %s",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.utils.PathUtils;

import javax.validation.constraints.NotNull;

public abstract class PipeInsertionEvent extends EnrichedEvent {

private Boolean isTableModelEvent; // lazy initialization
Expand All @@ -46,7 +48,9 @@ protected PipeInsertionEvent(
super(pipeName, creationTime, pipeTaskMeta, treePattern, tablePattern, startTime, endTime);
this.isTableModelEvent = isTableModelEvent;
this.treeModelDatabaseName = treeModelDatabaseName;
this.tableModelDatabaseName = tableModelDatabaseName;
if (tableModelDatabaseName != null) {
this.tableModelDatabaseName = tableModelDatabaseName.toLowerCase();
}
}

protected PipeInsertionEvent(
Expand Down Expand Up @@ -102,10 +106,10 @@ public String getTableModelDatabaseName() {
: tableModelDatabaseName;
}

public void renameTableModelDatabase(final String tableModelDatabaseName) {
public void renameTableModelDatabase(@NotNull final String tableModelDatabaseName) {
// Please note that if you parse TsFile, you need to use TreeModelDatabaseName, so you need to
// rename TreeModelDatabaseName as well.
this.tableModelDatabaseName = tableModelDatabaseName;
this.tableModelDatabaseName = tableModelDatabaseName.toLowerCase();
this.treeModelDatabaseName = "root." + tableModelDatabaseName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,10 @@ public class RenameDatabaseProcessor implements PipeProcessor {
@Override
public void validate(PipeParameterValidator validator) throws Exception {
validator.validateRequiredAttribute(PROCESSOR_RENAME_DATABASE_NEW_DB_NAME);
newDatabaseName = validator.getParameters().getString(PROCESSOR_RENAME_DATABASE_NEW_DB_NAME);

try {
TableConfigTaskVisitor.validateDatabaseName(newDatabaseName);
TableConfigTaskVisitor.validateDatabaseName(
validator.getParameters().getString(PROCESSOR_RENAME_DATABASE_NEW_DB_NAME));
} catch (final Exception e) {
throw new PipeException(
String.format(
Expand All @@ -66,7 +67,9 @@ public void validate(PipeParameterValidator validator) throws Exception {
@Override
public void customize(PipeParameters parameters, PipeProcessorRuntimeConfiguration configuration)
throws Exception {
// Do nothing
// In order to ensure that the database data is not case sensitive, it is necessary to convert
// it to lowercase.
newDatabaseName = parameters.getString(PROCESSOR_RENAME_DATABASE_NEW_DB_NAME).toLowerCase();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -951,7 +951,9 @@ private void autoCreateDatabaseIfNecessary(final String database) {
final ListenableFuture<ConfigTaskResult> future =
task.execute(ClusterConfigTaskExecutor.getInstance());
final ConfigTaskResult result = future.get();
if (result.getStatusCode().getStatusCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
final int statusCode = result.getStatusCode().getStatusCode();
if (statusCode != TSStatusCode.SUCCESS_STATUS.getStatusCode()
&& statusCode != TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode()) {
throw new PipeException(
String.format(
"Auto create database failed: %s, status code: %s",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -556,7 +557,9 @@ private void convertFailedTsFilesToTabletsAndRetry() {
final LoadTsFileDataTypeConverter loadTsFileDataTypeConverter =
new LoadTsFileDataTypeConverter(isGeneratedByPipe);

for (final int failedLoadTsFileIndex : failedTsFileNodeIndexes) {
final Iterator<Integer> iterator = failedTsFileNodeIndexes.listIterator();
while (iterator.hasNext()) {
final int failedLoadTsFileIndex = iterator.next();
final LoadSingleTsFileNode failedNode = tsFileNodeList.get(failedLoadTsFileIndex);
final String filePath = failedNode.getTsFileResource().getTsFilePath();

Expand All @@ -573,7 +576,7 @@ private void convertFailedTsFilesToTabletsAndRetry() {
.orElse(null);

if (loadTsFileDataTypeConverter.isSuccessful(status)) {
failedTsFileNodeIndexes.remove(failedLoadTsFileIndex);
iterator.remove();
LOGGER.info(
"Load: Successfully converted TsFile {} into tablets and inserted.",
failedNode.getTsFileResource().getTsFilePath());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ public static String qualifyDatabaseName(String databaseName) {

public static String unQualifyDatabaseName(String databaseName) {
if (databaseName != null && databaseName.startsWith("root.")) {
databaseName = databaseName.substring(5);
databaseName = databaseName.substring(5).toLowerCase();
}
return databaseName;
}
Expand Down

0 comments on commit a011b01

Please sign in to comment.