
Commit b0ba8b9

Author: Jordan Epstein

Move deleted files to Hadoop trash if configured
HadoopFileIO currently calls the Java FileSystem delete API directly, which always bypasses a configured trash directory. If the table's Hadoop configuration has trash enabled, we should use it. For file deletions we mimic the existing behavior by throwing an error when asked to delete a directory. Note that in the single-file delete case this adds an RPC call (to check whether the path is a directory) when trash is enabled.
1 parent: e268df6

2 files changed: +83, -2 lines

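For context, Hadoop's trash is an opt-in, client-side feature: it counts as enabled when fs.trash.interval (minutes until a trash checkpoint is purged) is greater than zero, and Trash.moveToTrash then renames a path under the user's trash root rather than removing it. Below is a minimal sketch of the Trash API this commit builds on, using the local filesystem; the class name and throwaway temp file are illustrative, not part of the change.

import java.io.File;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Trash;

public class TrashApiSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.set("fs.trash.interval", "60"); // minutes; any value > 0 enables trash
    FileSystem fs = FileSystem.getLocal(conf);

    // Illustrative throwaway file; any existing path works.
    Path file = new Path(File.createTempFile("trash-sketch", ".tmp").toURI());

    Trash trash = new Trash(fs, conf);
    System.out.println("enabled: " + trash.isEnabled());       // true
    System.out.println("moved: " + trash.moveToTrash(file));   // renamed into trash, not deleted
  }
}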

core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java

Lines changed: 16 additions & 2 deletions
@@ -30,6 +30,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.Trash;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.io.BulkDeletionFailureException;
 import org.apache.iceberg.io.DelegateFileIO;
@@ -102,7 +103,15 @@ public void deleteFile(String path) {
     Path toDelete = new Path(path);
     FileSystem fs = Util.getFs(toDelete, getConf());
     try {
-      fs.delete(toDelete, false /* not recursive */);
+      Trash trash = new Trash(fs, getConf());
+      if (!trash.isEnabled()) {
+        fs.delete(toDelete, false /* not recursive */);
+      } else {
+        if (fs.getFileStatus(toDelete).isDirectory()) {
+          throw new IOException(String.format("Cannot move directory %s to trash", path));
+        }
+        trash.moveToTrash(toDelete);
+      }
     } catch (IOException e) {
       throw new RuntimeIOException(e, "Failed to delete file: %s", path);
     }
@@ -167,7 +176,12 @@ public void deletePrefix(String prefix) {
     FileSystem fs = Util.getFs(prefixToDelete, getConf());

     try {
-      fs.delete(prefixToDelete, true /* recursive */);
+      Trash trash = new Trash(fs, getConf());
+      if (!trash.isEnabled()) {
+        fs.delete(prefixToDelete, true /* recursive */);
+      } else {
+        trash.moveToTrash(prefixToDelete);
+      }
     } catch (IOException e) {
       throw new UncheckedIOException(e);
     }
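With the change above, callers get soft deletes from deleteFile without any API change, simply by enabling trash in the Hadoop configuration handed to HadoopFileIO. A caller-side sketch follows; the namenode address and file path are illustrative and assume a reachable HDFS cluster.

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.hadoop.HadoopFileIO;

public class DeleteToTrashSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.set("fs.trash.interval", "60"); // 0 (the default) keeps hard deletes

    HadoopFileIO io = new HadoopFileIO(conf);
    // Moves the file into the user's trash instead of deleting it outright;
    // one extra getFileStatus RPC first verifies the path is not a directory.
    io.deleteFile("hdfs://namenode:8020/warehouse/db/tbl/data/part-00000.parquet");
  }
}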

core/src/test/java/org/apache/iceberg/hadoop/TestHadoopFileIO.java

Lines changed: 67 additions & 0 deletions
@@ -18,6 +18,7 @@
  */
 package org.apache.iceberg.hadoop;

+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;

@@ -30,6 +31,7 @@
 import java.util.UUID;
 import java.util.Vector;
 import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -126,6 +128,48 @@ public void testDeletePrefix() {
         .hasMessageContaining("java.io.FileNotFoundException");
   }

+  @Test
+  public void testDeletePrefixWithTrashEnabled() throws IOException {
+    Configuration conf = new Configuration();
+    conf.set(FS_TRASH_INTERVAL_KEY, "60");
+    fs = FileSystem.getLocal(conf);
+
+    hadoopFileIO = new HadoopFileIO(conf);
+    Path parent = new Path(tempDir.toURI());
+
+    List<Integer> scaleSizes = Lists.newArrayList(1, 1000, 2500);
+
+    scaleSizes.parallelStream()
+        .forEach(
+            scale -> {
+              Path scalePath = new Path(parent, Integer.toString(scale));
+
+              createRandomFiles(scalePath, scale);
+              hadoopFileIO.deletePrefix(scalePath.toUri().toString());
+
+              // Hadoop filesystem will throw if the path does not exist
+              assertThatThrownBy(
+                      () -> hadoopFileIO.listPrefix(scalePath.toUri().toString()).iterator())
+                  .isInstanceOf(UncheckedIOException.class)
+                  .hasMessageContaining("java.io.FileNotFoundException");
+              assertThat(
+                      StreamSupport.stream(
+                              hadoopFileIO
+                                  .listPrefix(
+                                      fs.getTrashRoot(scalePath).toString() + "/Current" + tempDir)
+                                  .spliterator(),
+                              false)
+                          .count())
+                  .isEqualTo((long) scale);
+            });
+
+    hadoopFileIO.deletePrefix(parent.toUri().toString());
+    // Hadoop filesystem will throw if the path does not exist
+    assertThatThrownBy(() -> hadoopFileIO.listPrefix(parent.toUri().toString()).iterator())
+        .isInstanceOf(UncheckedIOException.class)
+        .hasMessageContaining("java.io.FileNotFoundException");
+  }
+
   @Test
   public void testDeleteFiles() {
     Path parent = new Path(tempDir.toURI());
@@ -136,6 +180,29 @@ public void testDeleteFiles() {
         file -> assertThat(hadoopFileIO.newInputFile(file.toString()).exists()).isFalse());
   }

+  @Test
+  public void testDeleteFilesWithTrashEnabled() throws IOException {
+    Configuration conf = new Configuration();
+    conf.set(FS_TRASH_INTERVAL_KEY, "60");
+    fs = FileSystem.getLocal(conf);
+
+    hadoopFileIO = new HadoopFileIO(conf);
+    Path parent = new Path(tempDir.toURI());
+    List<Path> filesCreated = createRandomFiles(parent, 10);
+    hadoopFileIO.deleteFiles(
+        filesCreated.stream().map(Path::toString).collect(Collectors.toList()));
+    filesCreated.forEach(
+        file -> assertThat(hadoopFileIO.newInputFile(file.toString()).exists()).isFalse());
+    assertThat(
+            StreamSupport.stream(
+                    hadoopFileIO
+                        .listPrefix(fs.getTrashRoot(parent).toString() + "/Current" + tempDir)
+                        .spliterator(),
+                    false)
+                .count())
+        .isEqualTo(10L);
+  }
+
   @Test
   public void testDeleteFilesErrorHandling() {
     List<String> filesCreated =
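The assertions above locate trashed files by hand, relying on the fact that a trashed path keeps its full original path under <trash root>/Current; that is the layout the expression fs.getTrashRoot(...) + "/Current" + tempDir encodes. A small sketch of that layout on the local filesystem; the class name and demo path are illustrative.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Trash;

public class TrashLayoutSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.set("fs.trash.interval", "60");
    FileSystem fs = FileSystem.getLocal(conf);

    Path file = new Path("/tmp/iceberg-trash-demo/data.txt"); // illustrative
    fs.mkdirs(file.getParent());
    fs.create(file).close();

    new Trash(fs, conf).moveToTrash(file);

    // e.g. file:/home/<user>/.Trash/Current/tmp/iceberg-trash-demo/data.txt
    Path trashed = new Path(fs.getTrashRoot(file) + "/Current" + file.toUri().getPath());
    System.out.println(trashed + " exists: " + fs.exists(trashed));
  }
}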
