[bugfix](hive) fix data error after insert overwrite hive table (apache#43049)

### What problem does this PR solve?

1. Different remoteFs instances should correspond to different nativeFs instances.
2. If the file type is S3, we do not need to delete the staging directory.
3. When an error occurs while deleting a directory, we need to roll back (see the sketch below).
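
To make points 2 and 3 concrete, here is a minimal sketch (illustrative only — the class name, method names, and the `deleteRecursively` placeholder are not the actual `HMSTransaction` API): the staging path is tracked as an `Optional` that stays empty for S3 writes, so commit-time cleanup is a no-op for S3, and a failed delete throws so the surrounding transaction can roll back.

```java
import java.util.Optional;

class StagingCleanupSketch {
    // Empty for S3 writes: there is no staging directory to clean up later.
    private Optional<String> stagingDirectory = Optional.empty();

    void beginInsert(boolean isS3Write, String writePath) {
        if (isS3Write) {
            stagingDirectory = Optional.empty();
        } else {
            stagingDirectory = Optional.of(writePath);
        }
    }

    void pruneAndDeleteStagingDirectories() {
        stagingDirectory.ifPresent(dir -> {
            if (!deleteRecursively(dir)) {
                // Surface the failure so the caller can roll back instead of
                // committing with a leftover staging directory.
                throw new RuntimeException("Failed to delete staging directory: " + dir);
            }
        });
    }

    // Illustrative placeholder for the real recursive filesystem delete.
    private boolean deleteRecursively(String dir) {
        return true;
    }
}
```

The actual change applies this pattern inside HMSTransaction, as the diff below shows.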
wuwenchi authored Nov 2, 2024
1 parent 2ad6cb7 commit c3507f5
Showing 5 changed files with 48 additions and 14 deletions.
@@ -101,7 +101,7 @@ public class HMSTransaction implements Transaction {
     private final Executor fileSystemExecutor;
     private HmsCommitter hmsCommitter;
     private List<THivePartitionUpdate> hivePartitionUpdates = Lists.newArrayList();
-    private String declaredIntentionsToWrite;
+    private Optional<String> stagingDirectory;
     private boolean isMockedPartitionUpdate = false;
 
     private static class UncompletedMpuPendingUpload {
@@ -184,10 +184,14 @@ public void rollback() {
     }
 
     public void beginInsertTable(HiveInsertCommandContext ctx) {
-        declaredIntentionsToWrite = ctx.getWritePath();
         queryId = ctx.getQueryId();
         isOverwrite = ctx.isOverwrite();
         fileType = ctx.getFileType();
+        if (fileType == TFileType.FILE_S3) {
+            stagingDirectory = Optional.empty();
+        } else {
+            stagingDirectory = Optional.of(ctx.getWritePath());
+        }
     }
 
     public void finishInsertTable(SimpleTableInfo tableInfo) {
@@ -207,10 +211,12 @@ public void finishInsertTable(SimpleTableInfo tableInfo) {
                 }
             });
         } else {
-            fs.makeDir(declaredIntentionsToWrite);
-            setLocation(new THiveLocationParams() {{
-                setWritePath(declaredIntentionsToWrite);
-            }
+            stagingDirectory.ifPresent((v) -> {
+                fs.makeDir(v);
+                setLocation(new THiveLocationParams() {{
+                    setWritePath(v);
+                }
+                });
             });
         }
     }
@@ -643,15 +649,23 @@ private void recursiveDeleteItems(Path directory, boolean deleteEmptyDir, boolean reverse) {
         if (!deleteResult.getNotDeletedEligibleItems().isEmpty()) {
             LOG.warn("Failed to delete directory {}. Some eligible items can't be deleted: {}.",
                     directory.toString(), deleteResult.getNotDeletedEligibleItems());
+            throw new RuntimeException(
+                    "Failed to delete directory for files: " + deleteResult.getNotDeletedEligibleItems());
         } else if (deleteEmptyDir && !deleteResult.dirNotExists()) {
             LOG.warn("Failed to delete directory {} due to dir isn't empty", directory.toString());
+            throw new RuntimeException("Failed to delete directory for empty dir: " + directory.toString());
         }
     }
 
     private DeleteRecursivelyResult recursiveDeleteFiles(Path directory, boolean deleteEmptyDir, boolean reverse) {
         try {
-            if (!fs.directoryExists(directory.toString()).ok()) {
+            Status status = fs.directoryExists(directory.toString());
+            if (status.getErrCode().equals(Status.ErrCode.NOT_FOUND)) {
                 return new DeleteRecursivelyResult(true, ImmutableList.of());
+            } else if (!status.ok()) {
+                ImmutableList.Builder<String> notDeletedEligibleItems = ImmutableList.builder();
+                notDeletedEligibleItems.add(directory.toString() + "/*");
+                return new DeleteRecursivelyResult(false, notDeletedEligibleItems.build());
             }
         } catch (Exception e) {
             ImmutableList.Builder<String> notDeletedEligibleItems = ImmutableList.builder();
@@ -1447,7 +1461,7 @@ private void doUpdateStatisticsTasks() {
     }
 
     private void pruneAndDeleteStagingDirectories() {
-        recursiveDeleteItems(new Path(declaredIntentionsToWrite), true, false);
+        stagingDirectory.ifPresent((v) -> recursiveDeleteItems(new Path(v), true, false));
     }
 
     private void abortMultiUploads() {
@@ -19,6 +19,7 @@
 
 import org.apache.doris.common.CustomThreadFactory;
 
+import com.google.common.collect.Sets;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -27,6 +28,7 @@
 import java.lang.ref.PhantomReference;
 import java.lang.ref.Reference;
 import java.lang.ref.ReferenceQueue;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -63,6 +65,8 @@ public class RemoteFSPhantomManager {
     private static final ConcurrentHashMap<PhantomReference<RemoteFileSystem>, FileSystem> referenceMap
             = new ConcurrentHashMap<>();
 
+    private static final Set<FileSystem> fsSet = Sets.newConcurrentHashSet();
+
     // Flag indicating whether the cleanup thread has been started
     private static final AtomicBoolean isStarted = new AtomicBoolean(false);
 
@@ -77,9 +81,13 @@ public static void registerPhantomReference(RemoteFileSystem remoteFileSystem) {
             start();
             isStarted.set(true);
         }
+        if (fsSet.contains(remoteFileSystem.dfsFileSystem)) {
+            throw new RuntimeException("FileSystem already exists: " + remoteFileSystem.dfsFileSystem.getUri());
+        }
         RemoteFileSystemPhantomReference phantomReference = new RemoteFileSystemPhantomReference(remoteFileSystem,
                 referenceQueue);
         referenceMap.put(phantomReference, remoteFileSystem.dfsFileSystem);
+        fsSet.add(remoteFileSystem.dfsFileSystem);
     }
 
     /**
@@ -102,6 +110,7 @@ public static void start() {
                     if (fs != null) {
                         try {
                             fs.close();
+                            fsSet.remove(fs);
                             LOG.info("Closed file system: {}", fs.getUri());
                         } catch (IOException e) {
                             LOG.warn("Failed to close file system", e);
@@ -20,6 +20,8 @@
 import org.apache.doris.analysis.StorageBackend;
 import org.apache.doris.backup.Status;
 import org.apache.doris.common.UserException;
+import org.apache.doris.common.security.authentication.AuthenticationConfig;
+import org.apache.doris.common.security.authentication.HadoopAuthenticator;
 import org.apache.doris.datasource.property.PropertyConverter;
 import org.apache.doris.fs.obj.S3ObjStorage;
 import org.apache.doris.fs.remote.dfs.DFSFileSystem;
@@ -34,6 +36,7 @@
 import org.apache.logging.log4j.Logger;
 
 import java.io.FileNotFoundException;
+import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 
@@ -74,12 +77,20 @@ protected FileSystem nativeFileSystem(String remotePath) throws UserException {
                 PropertyConverter.convertToHadoopFSProperties(properties).entrySet().stream()
                         .filter(entry -> entry.getKey() != null && entry.getValue() != null)
                         .forEach(entry -> conf.set(entry.getKey(), entry.getValue()));
+                AuthenticationConfig authConfig = AuthenticationConfig.getKerberosConfig(conf);
+                HadoopAuthenticator authenticator = HadoopAuthenticator.getHadoopAuthenticator(authConfig);
                 try {
-                    dfsFileSystem = FileSystem.get(new Path(remotePath).toUri(), conf);
+                    dfsFileSystem = authenticator.doAs(() -> {
+                        try {
+                            return FileSystem.get(new Path(remotePath).toUri(), conf);
+                        } catch (IOException e) {
+                            throw new RuntimeException(e);
+                        }
+                    });
+                    RemoteFSPhantomManager.registerPhantomReference(this);
                 } catch (Exception e) {
                     throw new UserException("Failed to get S3 FileSystem for " + e.getMessage(), e);
                 }
-                RemoteFSPhantomManager.registerPhantomReference(this);
             }
         }
     }
@@ -99,11 +99,11 @@ public FileSystem nativeFileSystem(String remotePath) throws UserException {
                            throw new RuntimeException(e);
                        }
                    });
+                    operations = new HDFSFileOperations(dfsFileSystem);
+                    RemoteFSPhantomManager.registerPhantomReference(this);
                } catch (Exception e) {
-                    throw new UserException(e);
+                    throw new UserException("Failed to get dfs FileSystem for " + e.getMessage(), e);
                }
-                operations = new HDFSFileOperations(dfsFileSystem);
-                RemoteFSPhantomManager.registerPhantomReference(this);
            }
        }
    }
@@ -133,7 +133,7 @@ public void bindDataSink(Optional<InsertCommandContext> insertCtx)
         if (insertCtx.isPresent()) {
             HiveInsertCommandContext context = (HiveInsertCommandContext) insertCtx.get();
             tSink.setOverwrite(context.isOverwrite());
-            context.setWritePath(storageLocation);
+            context.setWritePath(location);
             context.setFileType(fileType);
         }
     } else {
