Commit efc6a8f

Committed by Jordan Epstein

Move deleted files to Hadoop trash if configured

As of now, HadoopFileIO uses the Java delete API, which always bypasses any configured trash directory. If the table's Hadoop configuration has trash enabled, we should use it. For file deletions, we aim to mimic existing behavior by throwing an error when attempting to delete a directory non-recursively. In the single-file delete case, this notably adds an RPC call (to check whether the path is a directory) when trash is enabled.

1 parent e268df6 commit efc6a8f
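
For context, enabling the new behavior takes two settings: Hadoop's own trash interval (the default trash policy treats trash as enabled when fs.trash.interval is greater than zero) and the opt-in flag introduced by this commit. A minimal sketch of the opt-in, with an illustrative warehouse path:

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.hadoop.HadoopFileIO;

Configuration conf = new Configuration();
// Hadoop-side switch: trash retention in minutes; > 0 enables trash.
conf.set("fs.trash.interval", "60");
// Iceberg-side opt-in added by this commit; defaults to false.
conf.set("iceberg.hadoop.use-trash-if-configured", "true");

HadoopFileIO io = new HadoopFileIO(conf);
// With both switches on, the file is moved to trash instead of being deleted outright.
io.deleteFile("file:///tmp/warehouse/db/table/data/00000-0.parquet");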

File tree

2 files changed (+137, -2)

core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java

Lines changed: 25 additions & 2 deletions

@@ -30,6 +30,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.Trash;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.io.BulkDeletionFailureException;
 import org.apache.iceberg.io.DelegateFileIO;
@@ -47,6 +48,7 @@

 public class HadoopFileIO implements HadoopConfigurable, DelegateFileIO {

+  public static final String USE_TRASH_IF_CONFIGURED = "iceberg.hadoop.use-trash-if-configured";
   private static final Logger LOG = LoggerFactory.getLogger(HadoopFileIO.class);
   private static final String DELETE_FILE_PARALLELISM = "iceberg.hadoop.delete-file-parallelism";
   private static final String DELETE_FILE_POOL_NAME = "iceberg-hadoopfileio-delete";
@@ -102,7 +104,7 @@ public void deleteFile(String path) {
     Path toDelete = new Path(path);
     FileSystem fs = Util.getFs(toDelete, getConf());
     try {
-      fs.delete(toDelete, false /* not recursive */);
+      deletePath(fs, toDelete, false);
     } catch (IOException e) {
       throw new RuntimeIOException(e, "Failed to delete file: %s", path);
     }
@@ -167,7 +169,7 @@ public void deletePrefix(String prefix) {
     FileSystem fs = Util.getFs(prefixToDelete, getConf());

     try {
-      fs.delete(prefixToDelete, true /* recursive */);
+      deletePath(fs, prefixToDelete, true);
     } catch (IOException e) {
       throw new UncheckedIOException(e);
     }
@@ -211,6 +213,27 @@ private ExecutorService executorService() {
     return executorService;
   }

+  private void deletePath(FileSystem fs, Path toDelete, boolean recursive) throws IOException {
+    if (!shouldUseTrashIfConfigured()) {
+      fs.delete(toDelete, recursive);
+      return;
+    }
+    Trash trash = new Trash(fs, getConf());
+    if (!trash.isEnabled()) {
+      fs.delete(toDelete, recursive);
+    } else {
+      if (!recursive && fs.getFileStatus(toDelete).isDirectory()) {
+        throw new IOException(
+            String.format("Cannot move directory %s to trash", toDelete.toString()));
+      }
+      trash.moveToTrash(toDelete);
+    }
+  }
+
+  private boolean shouldUseTrashIfConfigured() {
+    return Boolean.parseBoolean(getConf().get(USE_TRASH_IF_CONFIGURED, "false"));
+  }
+
   /**
    * This class is a simple adaptor to allow for using Hadoop's RemoteIterator as an Iterator.
    *
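
The new deletePath helper keeps default behavior unchanged: it falls back to a plain fs.delete() when the Iceberg flag is off or the filesystem's trash is not enabled, and it rejects non-recursive deletes of directories before moving anything to trash, per the commit message's goal of mimicking existing behavior. The tests below assert the resulting trash layout: a trashed file lands under the trash root's Current directory at its original scheme-less path. A hedged sketch of that lookup, with an illustrative path:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

FileSystem fs = FileSystem.getLocal(new Configuration());
Path deleted = new Path("/tmp/warehouse/db/table/data/00000-0.parquet"); // hypothetical file
// Default trash policy layout: <trash root>/Current/<path without scheme and authority>.
Path inTrash =
    new Path(
        fs.getTrashRoot(deleted).toString()
            + "/Current"
            + Path.getPathWithoutSchemeAndAuthority(deleted));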

core/src/test/java/org/apache/iceberg/hadoop/TestHadoopFileIO.java

Lines changed: 112 additions & 0 deletions

@@ -18,7 +18,10 @@
  */
 package org.apache.iceberg.hadoop;

+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY;
+import static org.apache.iceberg.hadoop.HadoopFileIO.USE_TRASH_IF_CONFIGURED;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatNoException;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;

 import java.io.File;
@@ -126,6 +129,47 @@ public void testDeletePrefix() {
         .hasMessageContaining("java.io.FileNotFoundException");
   }

+  @Test
+  public void testDeletePrefixWithTrashEnabled() throws IOException {
+    Configuration conf = new Configuration();
+    conf.set(FS_TRASH_INTERVAL_KEY, "60");
+    conf.set(USE_TRASH_IF_CONFIGURED, "true");
+    fs = FileSystem.getLocal(conf);
+
+    hadoopFileIO = new HadoopFileIO(conf);
+    Path parent = new Path(tempDir.toURI());
+
+    List<Integer> scaleSizes = Lists.newArrayList(1, 1000, 2500);
+
+    scaleSizes.parallelStream()
+        .forEach(
+            scale -> {
+              Path scalePath = new Path(parent, Integer.toString(scale));
+
+              List<Path> filesCreated = createRandomFiles(scalePath, scale);
+              hadoopFileIO.deletePrefix(scalePath.toUri().toString());
+
+              // Hadoop filesystem will throw if the path does not exist
+              assertThatThrownBy(
+                      () -> hadoopFileIO.listPrefix(scalePath.toUri().toString()).iterator())
+                  .isInstanceOf(UncheckedIOException.class)
+                  .hasMessageContaining("java.io.FileNotFoundException");
+              filesCreated.forEach(
+                  file -> {
+                    String fileSuffix = Path.getPathWithoutSchemeAndAuthority(file).toString();
+                    String trashPath =
+                        fs.getTrashRoot(scalePath).toString() + "/Current" + fileSuffix;
+                    assertThat(hadoopFileIO.newInputFile(trashPath).exists()).isTrue();
+                  });
+            });
+
+    hadoopFileIO.deletePrefix(parent.toUri().toString());
+    // Hadoop filesystem will throw if the path does not exist
+    assertThatThrownBy(() -> hadoopFileIO.listPrefix(parent.toUri().toString()).iterator())
+        .isInstanceOf(UncheckedIOException.class)
+        .hasMessageContaining("java.io.FileNotFoundException");
+  }
+
   @Test
   public void testDeleteFiles() {
     Path parent = new Path(tempDir.toURI());
@@ -136,6 +180,74 @@ public void testDeleteFiles() {
         file -> assertThat(hadoopFileIO.newInputFile(file.toString()).exists()).isFalse());
   }

+  @Test
+  public void testDeleteFilesWithTrashEnabled() throws IOException {
+    Configuration conf = new Configuration();
+    conf.set(FS_TRASH_INTERVAL_KEY, "60");
+    conf.set(USE_TRASH_IF_CONFIGURED, "true");
+    fs = FileSystem.getLocal(conf);
+
+    hadoopFileIO = new HadoopFileIO(conf);
+    Path parent = new Path(tempDir.toURI());
+    List<Path> filesCreated = createRandomFiles(parent, 10);
+    hadoopFileIO.deleteFiles(
+        filesCreated.stream().map(Path::toString).collect(Collectors.toList()));
+    filesCreated.forEach(
+        file -> assertThat(hadoopFileIO.newInputFile(file.toString()).exists()).isFalse());
+    filesCreated.forEach(
+        file -> {
+          String fileSuffix = Path.getPathWithoutSchemeAndAuthority(file).toString();
+          String trashPath = fs.getTrashRoot(parent).toString() + "/Current" + fileSuffix;
+          assertThat(hadoopFileIO.newInputFile(trashPath).exists()).isTrue();
+        });
+  }
+
+  @Test
+  public void testDeleteFilesWithoutIcebergTrashToggleDoesNotMoveToTrash() throws IOException {
+    Configuration conf = new Configuration();
+    conf.set(FS_TRASH_INTERVAL_KEY, "60");
+    fs = FileSystem.getLocal(conf);
+
+    hadoopFileIO = new HadoopFileIO(conf);
+    Path parent = new Path(tempDir.toURI());
+    List<Path> filesCreated = createRandomFiles(parent, 10);
+    hadoopFileIO.deleteFiles(
+        filesCreated.stream().map(Path::toString).collect(Collectors.toList()));
+    filesCreated.forEach(
+        file -> assertThat(hadoopFileIO.newInputFile(file.toString()).exists()).isFalse());
+    filesCreated.forEach(
+        file -> {
+          String fileSuffix = Path.getPathWithoutSchemeAndAuthority(file).toString();
+          String trashPath = fs.getTrashRoot(parent).toString() + "/Current" + fileSuffix;
+          assertThat(hadoopFileIO.newInputFile(trashPath).exists()).isFalse();
+        });
+  }
+
+  @Test
+  public void testDeleteDirectoryWithTrashEnabledShouldThrow() throws IOException {
+    Configuration conf = new Configuration();
+    conf.set(FS_TRASH_INTERVAL_KEY, "60");
+    conf.set(USE_TRASH_IF_CONFIGURED, "true");
+    fs = FileSystem.getLocal(conf);
+
+    hadoopFileIO = new HadoopFileIO(conf);
+    Path parent = new Path(tempDir.toURI());
+    assertThatThrownBy(() -> hadoopFileIO.deleteFile(parent.toString()))
+        .hasCauseInstanceOf(IOException.class);
+  }
+
+  @Test
+  public void testDeleteDirectoryWithTrashEnabledShouldNotThrow() throws IOException {
+    Configuration conf = new Configuration();
+    conf.set(FS_TRASH_INTERVAL_KEY, "60");
+    conf.set(USE_TRASH_IF_CONFIGURED, "true");
+    fs = FileSystem.getLocal(conf);
+
+    hadoopFileIO = new HadoopFileIO(conf);
+    Path parent = new Path(tempDir.toURI());
+    assertThatNoException().isThrownBy(() -> hadoopFileIO.deletePrefix(parent.toString()));
+  }
+
   @Test
   public void testDeleteFilesErrorHandling() {
     List<String> filesCreated =
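
The last two tests pin down the directory contract when trash is enabled: deleteFile on a directory fails (the IOException from deletePath surfaces as the cause of a RuntimeIOException), while deletePrefix moves the whole tree to trash without complaint. A small sketch of that asymmetry, assuming the same configuration as the tests; the directory path is illustrative:

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.hadoop.HadoopFileIO;

Configuration conf = new Configuration();
conf.set("fs.trash.interval", "60");
conf.set(HadoopFileIO.USE_TRASH_IF_CONFIGURED, "true");
HadoopFileIO io = new HadoopFileIO(conf);

String dir = "file:///tmp/warehouse/db/table"; // hypothetical existing directory
try {
  io.deleteFile(dir); // non-recursive delete of a directory: throws
} catch (RuntimeIOException e) {
  // cause is the IOException "Cannot move directory ... to trash"
}
io.deletePrefix(dir); // recursive delete: the directory tree moves to trash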
