adding CombineInputFileFormat; only single use case so far #3
base: master
@@ -0,0 +1,83 @@
package com.bc.calvalus.processing.l2;

import com.bc.calvalus.JobClientsMap;
import com.bc.calvalus.commons.CalvalusLogger;
import com.bc.calvalus.commons.InputPathResolver;
import com.bc.calvalus.inventory.hadoop.FileSystemPathIterator;
import com.bc.calvalus.inventory.hadoop.HdfsFileSystemService;
import com.bc.calvalus.processing.JobConfigNames;
import com.bc.calvalus.processing.geodb.GeodbScanMapper;
import com.bc.calvalus.processing.hadoop.NoRecordReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
/**
 * An InputFormat that combines all inputs matching a path pattern into one single split.
 *
 * @author thomas
 */
public class CombineFileInputFormat extends InputFormat {

Review comment: Is there a relation to org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat?
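For context on that question: the stock org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat groups many small files into size-bounded CombineFileSplits and reads each packed file through a record reader, whereas the class in this PR always emits exactly one split and a NoRecordReader. A minimal sketch of the stock API for comparison (class and reader names here are illustrative, not part of this PR):

```java
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReaderWrapper;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

import java.io.IOException;

// Sketch: the stock class caps split sizes and delivers the records of
// every packed file through a delegating record reader.
public class SizeBoundedCombineFormat extends CombineFileInputFormat<LongWritable, Text> {

    public SizeBoundedCombineFormat() {
        setMaxSplitSize(128L * 1024 * 1024);  // group files into splits of at most ~128 MB
    }

    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context)
            throws IOException {
        return new CombineFileRecordReader<>((CombineFileSplit) split, context, LineReaderWrapper.class);
    }

    // Delegates each file in the combined split to a plain text line reader.
    public static class LineReaderWrapper extends CombineFileRecordReaderWrapper<LongWritable, Text> {
        public LineReaderWrapper(CombineFileSplit split, TaskAttemptContext context, Integer idx)
                throws IOException, InterruptedException {
            super(new TextInputFormat(), split, context, idx);
        }
    }
}
```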

    /**
     * Creates a single split from a given pattern.
     */
    @Override
    public List<InputSplit> getSplits(JobContext context) throws IOException {
Review comment: What about our other methods to determine inputs, in particular those using the geo-inventory? I know that PatternBasedInputFormat needs refactoring and decomposition, but I think the other ways to determine inputs are required.
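One conceivable shape of such a dispatch, sketched with hypothetical names (the config key "calvalus.input.geoInventory" and the queryGeoInventory helper are assumptions, not Calvalus API shown in this PR): getSplits could branch between pattern-based and geo-inventory-based input resolution.

```java
// Hypothetical sketch only, not code from this PR.
private List<String> resolveInputs(Configuration conf) throws IOException {
    String geoInventory = conf.get("calvalus.input.geoInventory");  // assumed config key
    if (geoInventory != null) {
        return queryGeoInventory(geoInventory, conf);               // hypothetical helper
    }
    String inputPathPattern = conf.get(JobConfigNames.CALVALUS_INPUT_PATH_PATTERNS);
    return new InputPathResolver().resolve(inputPathPattern);
}

private List<String> queryGeoInventory(String geoInventory, Configuration conf) {
    // a real implementation would consult the Calvalus geo-inventory here
    return new ArrayList<>();
}
```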
        Configuration conf = context.getConfiguration();
        String inputPathPattern = conf.get(JobConfigNames.CALVALUS_INPUT_PATH_PATTERNS);

        List<InputSplit> splits = new ArrayList<>(1);
        JobClientsMap jobClientsMap = new JobClientsMap(new JobConf(conf));
        HdfsFileSystemService hdfsFileSystemService = new HdfsFileSystemService(jobClientsMap);
        List<String> inputPatterns = new InputPathResolver().resolve(inputPathPattern);
        RemoteIterator<LocatedFileStatus> fileStatusIt = getFileStatuses(hdfsFileSystemService, inputPatterns, conf, null);
        addSplit(fileStatusIt, splits);
        CalvalusLogger.getLogger().info(String.format("Created %d split(s).", splits.size()));
        return splits;
    }

    private void addSplit(RemoteIterator<LocatedFileStatus> fileStatuses, List<InputSplit> splits) throws IOException {
        List<Path> filePaths = new ArrayList<>();
        List<Long> fileLengths = new ArrayList<>();
        while (fileStatuses.hasNext()) {
            LocatedFileStatus fileStatus = fileStatuses.next();
            Path path = fileStatus.getPath();
            filePaths.add(path);
            fileLengths.add(fileStatus.getLen());
        }
        // pack all files into one CombineFileSplit covering their full lengths
        CombineFileSplit combineFileSplit = new CombineFileSplit(filePaths.toArray(new Path[filePaths.size()]),
                                                                 fileLengths.stream().mapToLong(Long::longValue).toArray());
        splits.add(combineFileSplit);
    }

    protected RemoteIterator<LocatedFileStatus> getFileStatuses(HdfsFileSystemService fileSystemService,
                                                                List<String> inputPatterns,
                                                                Configuration conf,
                                                                Set<String> existingPathes) throws IOException {
        FileSystemPathIterator.FileStatusFilter extraFilter = null;
        if (existingPathes != null && existingPathes.size() > 0) {
            // skip files whose geo-DB entry already exists
            extraFilter = fileStatus -> {
                String dbPath = GeodbScanMapper.getDBPath(fileStatus.getPath(), conf);
                return !existingPathes.contains(dbPath);
            };
        }
        return fileSystemService.globFileStatusIterator(inputPatterns, conf, extraFilter);
    }

    @Override
    public RecordReader createRecordReader(InputSplit split, TaskAttemptContext context) {
        return new NoRecordReader();
    }
}
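A sketch of how this input format might be wired into a job, and how a mapper could unpack the single CombineFileSplit. The job name, path pattern, and mapper are illustrative; only CombineFileInputFormat and JobConfigNames come from this PR. Since createRecordReader returns a NoRecordReader, presumably no records reach map(), so per-file work would live in setup(), cleanup(), or run():

```java
import com.bc.calvalus.processing.JobConfigNames;
import com.bc.calvalus.processing.l2.CombineFileInputFormat;  // the class from this PR
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;

import java.io.IOException;

public class CombineFormatUsage {

    public static Job configureJob(Configuration conf) throws IOException {
        // the same constant that getSplits reads above; the pattern value is illustrative
        conf.set(JobConfigNames.CALVALUS_INPUT_PATH_PATTERNS, "hdfs:///calvalus/eodata/MER_RR__1P/r03/2008/.*\\.N1");
        Job job = Job.getInstance(conf, "combine-input-example");
        job.setInputFormatClass(CombineFileInputFormat.class);
        job.setMapperClass(ListFilesMapper.class);
        job.setNumReduceTasks(0);
        return job;
    }

    // Illustrative mapper: lists the files packed into the single split.
    public static class ListFilesMapper extends Mapper<Object, Object, Object, Object> {
        @Override
        protected void setup(Context context) {
            CombineFileSplit split = (CombineFileSplit) context.getInputSplit();
            for (int i = 0; i < split.getNumPaths(); i++) {
                Path path = split.getPath(i);
                long length = split.getLength(i);
                System.out.println(path + " (" + length + " bytes)");
            }
        }
    }
}
```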
Review comment: Here, it seems the logic has unintentionally changed: the null test used to sit before the second copyFileToLocal assignment, and it can never be true now.
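A hypothetical reconstruction of the pattern that comment describes (copyFileToLocal and the surrounding code are assumed shapes; the actual diff of that file is not shown above):

```java
import java.io.File;
import java.io.IOException;

// Hypothetical reconstruction only; copyFileToLocal is a stand-in for the real helper.
public class NullCheckOrderSketch {

    // Before: the null test sat between the two assignments, so the second
    // call acted as a fallback that ran only when the first one failed.
    static File resolveBefore(String primary, String fallback) throws IOException {
        File local = copyFileToLocal(primary);
        if (local == null) {
            local = copyFileToLocal(fallback);
        }
        return local;
    }

    // After the change, as the reviewer reads it: the second assignment always
    // runs before the test, so the test can no longer be true.
    static File resolveAfter(String primary, String fallback) throws IOException {
        File local = copyFileToLocal(primary);
        local = copyFileToLocal(fallback);
        if (local == null) {  // dead check if copyFileToLocal never returns null here
            throw new IOException("could not copy " + primary);
        }
        return local;
    }

    static File copyFileToLocal(String path) {  // stand-in
        return new File(path);
    }
}
```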