Skip to content

Commit

Permalink
Merge pull request #4 from jmeagher/Ignore_Files_Regex
Browse files Browse the repository at this point in the history
Add option to ignore files by regex
  • Loading branch information
edwardcapriolo committed Jul 18, 2014
2 parents 4192297 + 70d0616 commit 591f1b8
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 7 deletions.
48 changes: 44 additions & 4 deletions src/main/java/com/m6d/filecrush/crush/Crush.java
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,11 @@ public class Crush extends Configured implements Tool {
* specification.
*/
private List<Matcher> matchers;

/**
 * Matcher for the regular expression given with the --ignore-regex option, or null when that
 * option was not supplied. It is created once against an empty input and then reset with the
 * path of each candidate file; a file whose whole path matches is left out of the crush.
 */
private Matcher ignoredFiles;

/**
* The counters from the completed job.
Expand Down Expand Up @@ -206,6 +211,15 @@ Options buildOptions() {

options.addOption(option);

option = OptionBuilder
.hasArg()
.withArgName("ignore file regex")
.withDescription("Regular expression to apply for filtering out crush candidate files. Any files in the input crush directory matching this will be ignored")
.withLongOpt("ignore-regex")
.create();

options.addOption(option);

option = OptionBuilder
.hasArg()
.withArgName("replacement string")
Expand Down Expand Up @@ -344,6 +358,10 @@ boolean createJobConfAndParseArgs(String... args) throws ParseException, IOExcep
} else {
console = Verbosity.NONE;
}

if (cli.hasOption("ignore-regex")) {
ignoredFiles = Pattern.compile(cli.getOptionValue("ignore-regex")).matcher("");
}

excludeSingleFileDirs = !cli.hasOption("include-single-file-dirs");

Expand Down Expand Up @@ -478,6 +496,10 @@ boolean createJobConfAndParseArgs(String... args) throws ParseException, IOExcep
* Add the crush specs and compression options to the configuration.
*/
job.set("crush.timestamp", crushTimestamp);

if (ignoredFiles != null) {
job.set("crush.ignore-regex", ignoredFiles.pattern().pattern());
}

if (regexes.size() != replacements.size() || replacements.size() != inFormats.size() || inFormats.size() != outFormats.size()) {
throw new IllegalArgumentException("Must be an equal number of regex, replacement, in-format, and out-format options");
Expand Down Expand Up @@ -654,6 +676,14 @@ private void standAlone() throws IOException {

for (FileStatus content : contents) {
if (!content.isDir()) {
if (ignoredFiles != null) {
// Check for files to skip
ignoredFiles.reset(content.getPath().toUri().getPath());
if (ignoredFiles.matches()) {
LOG.trace("Ignoring " + content.getPath().toString());
continue;
}
}
files.add(new Text(content.getPath().toUri().getPath()));
}
}
Expand Down Expand Up @@ -681,17 +711,19 @@ private void standAlone() throws IOException {
CrushReducer reducer = new CrushReducer();

reducer.configure(job);
reducer.reduce(bucket, files.iterator(), new NullOutputCollector<Text, Text>(), Reporter.NULL);
reducer.reduce(bucket, files.iterator(), new NullOutputCollector<Text, Text>(), Reporter.NULL);
reducer.close();

/*
* Use a glob here because the temporary and task attempt work dirs have funny names.
* Include a * at the end to cover wildcards for compressed files.
*/
Path crushOutput = new Path(absOutDir + "/*/*/crush" + absSrcDir + "/" + dest.getName());
Path crushOutput = new Path(absOutDir + "/*/*/crush" + absSrcDir + "/" + dest.getName() + "*");

FileStatus[] statuses = fs.globStatus(crushOutput);

if (statuses == null || 1 != statuses.length) {
throw new AssertionError();
throw new AssertionError("Did not find the expected output in " + crushOutput.toString());
}

rename(statuses[0].getPath(), dest.getParent(), dest.getName());
Expand Down Expand Up @@ -992,7 +1024,15 @@ void writeDirs() throws IOException {

print(Verbosity.INFO, "\n\n" + dir.toUri().getPath());

FileStatus[] contents = fs.listStatus(dir);
FileStatus[] contents = fs.listStatus(dir, new PathFilter() {
/**
 * Accepts every path unless an ignore regex is configured and the path (the path component of
 * its URI) matches that regex in full.
 */
@Override
public boolean accept(Path testPath) {
    // No --ignore-regex given: nothing is filtered out.
    if (ignoredFiles == null) {
        return true;
    }

    final String candidate = testPath.toUri().getPath();

    // reset() re-targets the shared matcher at this path and returns the matcher itself.
    final boolean ignored = ignoredFiles.reset(candidate).matches();

    return !ignored;
}

});

if (contents == null || contents.length == 0) {
print(Verbosity.INFO, " is empty");
Expand Down
44 changes: 44 additions & 0 deletions src/test/java/com/m6d/filecrush/crush/CrushStandAloneTextTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,50 @@ public void noFiles() throws Exception {
assertThat(out.exists(), is(false));
}

/**
 * Runs a crush with --ignore-regex=.*\.index over a folder holding four data files and four
 * ".index" files, then checks that every input file is still present and that the crush output
 * holds only the contents of the four data files.
 */
@Test
public void ignoreRegexTest() throws Exception {

    // Parallel tables: entry i of each array describes one file. The two numbers are the third
    // and fourth createFile arguments; for the data files they are the same pairs that
    // verifyCrushOutput is given below.
    final String[] dataNames = { "lil-0", "lil-1", "big-2", "big-3" };
    final int[] dataKeys = { 0, 1, 2, 3 };
    final int[] dataCounts = { 1, 2, 5, 5 };

    // Files the ignore regex is expected to filter out.
    final String[] indexNames = { "lil-0.index", "lil-1.index", "big-2.index", "big-3.index" };
    final int[] indexKeys = { 0, 1, 2, 3 };
    final int[] indexCounts = { 10, 20, 50, 50 };

    final File inputDir = tmp.newFolder("skip_test");

    for (int i = 0; i < dataNames.length; i++) {
        createFile(inputDir, dataNames[i], dataKeys[i], dataCounts[i]);
    }

    for (int i = 0; i < indexNames.length; i++) {
        createFile(inputDir, indexNames[i], indexKeys[i], indexCounts[i]);
    }

    final File outputDir = new File(tmp.getRoot(), "skip_test_out");

    final String[] crushArgs = {
        "--input-format=text",
        "--output-format=text",
        "--ignore-regex=.*\\.index",
        "--compress=none",

        inputDir.getAbsolutePath(),
        outputDir.getAbsolutePath()
    };

    ToolRunner.run(job, new Crush(), crushArgs);

    /*
     * The crush must leave every original file, ignored or not, in place.
     */
    for (int i = 0; i < dataNames.length; i++) {
        verifyFile(inputDir, dataNames[i], dataKeys[i], dataCounts[i]);
    }

    for (int i = 0; i < indexNames.length; i++) {
        verifyFile(inputDir, indexNames[i], indexKeys[i], indexCounts[i]);
    }

    /*
     * Only the data files may show up in the crush output.
     */
    final int[][] expectedKeyCounts = new int[dataNames.length][];

    for (int i = 0; i < dataNames.length; i++) {
        expectedKeyCounts[i] = new int[] { dataKeys[i], dataCounts[i] };
    }

    verifyCrushOutput(outputDir, expectedKeyCounts);
}

private void verifyCrushOutput(File crushOutput, int[]... keyCounts) throws IOException {

List<String> actual = new ArrayList<String>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -809,6 +809,37 @@ public void executeBackwardsCompatibleSequence() throws Exception {
assertThat(jobCounters.getCounter(ReducerCounter.FILES_CRUSHED), equalTo( 23L));
assertThat(jobCounters.getCounter(ReducerCounter.RECORDS_CRUSHED), equalTo(964L));
}


/**
 * Writes six text files, three of which (file90, file91, file92) match the --ignore-regex
 * value .*9[0-9], runs the crush, and verifies the output against file10, file11 and file12
 * only.
 */
@Test
public void executeIgnoreFile() throws Exception {

    final String[] inputs = {
        // Ones to include
        "in_skip_test/file10",
        "in_skip_test/file11",
        "in_skip_test/file12",
        // Ones to skip
        "in_skip_test/file90",
        "in_skip_test/file91",
        "in_skip_test/file92"
    };

    for (String input : inputs) {
        writeFile(input, Format.TEXT);
    }

    final String[] crushArgs = {
        "--threshold=0.015",
        "--max-file-blocks=1",
        "--verbose",
        "--input-format=text",
        "--output-format=text",
        "--compress=none",
        "--ignore-regex=.*9[0-9]",

        "in_skip_test", "out_skip_test", "20101116153015"
    };

    ToolRunner.run(job, new Crush(), crushArgs);

    // Only the three files that do not match the ignore regex are listed as expected inputs.
    verifyOutput(homeDir + "/out_skip_test", "crushed_file-*-*-*", Format.TEXT, Format.TEXT, null, "file10", "file11", "file12");

}

/**
* Copies data from the given input stream to an HDFS file at the given path. This method will close the input stream.
Expand Down Expand Up @@ -921,7 +952,7 @@ private void verifyOutput(String dir, String crushOutMask, Format inFmt, Format
if (Format.TEXT == outFmt) {
/*
* TextInputFormat will produce keys that are byte offsets and values that are the line. This is not actually what we want.
* We want to preserve the actualy keys and values in the files, just like SequenceFileInputFormat. So, either way, the
* We want to preserve the actual keys and values in the files, just like SequenceFileInputFormat. So, either way, the
* keys and values should be the text representations of what went in.
*/
BufferedReader reader;
Expand All @@ -931,9 +962,9 @@ private void verifyOutput(String dir, String crushOutMask, Format inFmt, Format
Path path = new Path(dir + "/" + crushOutMask);

FileStatus[] globStatus = getFileSystem().globStatus(path);

if (globStatus == null || 1 != globStatus.length || globStatus[0].isDir()) {
fail(crushOutMask);
fail(crushOutMask + " was not found in " + path);
}

crushOut = globStatus[0].getPath();
Expand Down

0 comments on commit 591f1b8

Please sign in to comment.