I am using Hadoop 0.20.2 (that cannot be changed) and I want to add a filter to my input path. The data looks as follows:
/path1/test_a1
/path1/test_a2
/path1/train_a1
/path1/train_a2
and I only want to process all files with train in them.
A look at the FileInputFormat class suggests using:
FileInputFormat.setInputPathFilter(Job job, Class<? extends PathFilter> filter)
This is where my problem starts: PathFilter is an interface. I could extend the interface, but then I would still have no implementation, so I implemented it instead:
class TrainFilter implements PathFilter
{
    // Interface methods are implicitly public, so the implementation must be declared public.
    public boolean accept(Path path)
    {
        return path.toString().contains("train");
    }
}
With TrainFilter as the PathFilter the code compiles, but at run time I get an exception saying the input path does not exist. Without the filter, my job processes all files below /path1; with the filter set, it throws:
InvalidInputException: Input path does not exist hdfs://localhost:9000/path1
Here is how I set it up in the driver code:
job.setMapperClass(....class);
job.setInputFormatClass(....class);
job.setMapOutputKeyClass(...class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.setInputPathFilter(job, TrainFilter.class);
FileInputFormat.addInputPath(job, new Path("/path1/"));
FileOutputFormat.setOutputPath(job, new Path("/path2/"));
job.waitForCompletion(true);
Any suggestions of what I am doing wrong here?
EDIT: I found the problem. The first call to the PathFilter is always for the directory itself (/path1). Since that path does not contain "train", the directory is rejected and the exception is thrown. Which brings me to another question: how can I test whether an arbitrary path is a directory? As far as I know, I need a reference to the FileSystem, and PathFilter.accept only receives the Path.
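The decision rule implied by the edit above (always let directories through, and match files on their name rather than on the whole path string) can be sketched in plain Java without any Hadoop dependency. The class name TrainFilterSketch, the helper lastComponent and the isDirectory flag are hypothetical names introduced for illustration; in a real PathFilter the flag would have to come from the FileSystem, which is exactly the open question above.

```java
public class TrainFilterSketch {

    // Returns the last component of a slash-separated path,
    // ignoring a single trailing slash (e.g. "/path1/" -> "path1").
    public static String lastComponent(String path) {
        String trimmed = (path.length() > 1 && path.endsWith("/"))
                ? path.substring(0, path.length() - 1)
                : path;
        return trimmed.substring(trimmed.lastIndexOf('/') + 1);
    }

    // Assumed decision rule: accept every directory so the input directory
    // itself passes the filter, and accept a file only if its name
    // (not its full path) starts with "train".
    public static boolean accept(String path, boolean isDirectory) {
        if (isDirectory) {
            return true;
        }
        return lastComponent(path).startsWith("train");
    }

    public static void main(String[] args) {
        // The directory is accepted even though it does not contain "train".
        System.out.println(accept("/path1", true));
        // Files are filtered by name.
        System.out.println(accept("/path1/train_a1", false));
        System.out.println(accept("/path1/test_a1", false));
    }
}
```

Matching on the file name also avoids a side effect of path.toString().contains("train"), which would accept a file such as /training/test_a1 because of its parent directory.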
Alternatively, you can list the files in the given directory whose names begin with train and add them as input paths explicitly. For example:
Job job = new Job(conf, "myJob");
List<Path> inputPaths = new ArrayList<Path>();
String basePath = "/user/hadoop/path";
FileSystem fs = FileSystem.get(conf);
// Expand the glob to every entry under basePath whose name starts with "train".
FileStatus[] listStatus = fs.globStatus(new Path(basePath + "/train*"));
for (FileStatus fstat : listStatus) {
    inputPaths.add(fstat.getPath());
}
// toArray(Path[]) already returns a Path[], so no cast is needed.
FileInputFormat.setInputPaths(job,
        inputPaths.toArray(new Path[inputPaths.size()]));
A quick fix: blacklist paths instead of whitelisting them. For example, return false if the path contains "test" and true otherwise, so that the directory /path1 itself is still accepted.