Implement a RecordReader that reads the file blocks wrapped in a CombineFileSplit
Extend CombineFileInputFormat with an input format class that uses our custom RecordReader
Implement the Mapper class that processes the data
Configure the MapReduce job that will handle the mass of small files
The CombineSmallfileRecordReader class
We implement a RecordReader for CombineFileSplit that internally uses Hadoop's built-in LineRecordReader to read the lines of text in each small file. The implementation is shown below:
static class CombineSmallfileRecordReader extends RecordReader<LongWritable, Text> {

    private CombineFileSplit combineFileSplit;
    private LineRecordReader lineRecordReader = new LineRecordReader();
    private Path[] paths;
    private int totalLength;
    private int currentIndex;
    private float currentProgress = 0;
    private LongWritable currentKey;
    private Text currentValue = new Text();

    public CombineSmallfileRecordReader(CombineFileSplit combineFileSplit, TaskAttemptContext context, Integer index)
            throws IOException {
        super();
        this.combineFileSplit = combineFileSplit;
        // Index of the small-file block this reader handles within the CombineFileSplit.
        this.currentIndex = index;
    }

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        this.combineFileSplit = (CombineFileSplit) split;
        // Each reader instance handles one small-file block of the CombineFileSplit.
        // LineRecordReader expects a FileSplit, so wrap the block in one before reading.
        FileSplit fileSplit = new FileSplit(combineFileSplit.getPath(currentIndex),
                combineFileSplit.getOffset(currentIndex), combineFileSplit.getLength(currentIndex),
                combineFileSplit.getLocations());
        lineRecordReader.initialize(fileSplit, context);

        this.paths = combineFileSplit.getPaths();
        totalLength = paths.length;
        context.getConfiguration().set("map.input.file.name", combineFileSplit.getPath(currentIndex).getName());
    }

    @Override
    public LongWritable getCurrentKey() throws IOException, InterruptedException {
        currentKey = lineRecordReader.getCurrentKey();
        return currentKey;
    }

    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        // Copy by (bytes, length): Text.getBytes() returns the backing array,
        // which may be longer than the valid contents.
        Text line = lineRecordReader.getCurrentValue();
        currentValue.set(line.getBytes(), 0, line.getLength());
        return currentValue;
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (currentIndex >= 0 && currentIndex < totalLength) {
            return lineRecordReader.nextKeyValue();
        }
        return false;
    }

    @Override
    public float getProgress() throws IOException {
        // Coarse progress: the fraction of small files already processed.
        if (currentIndex >= 0 && currentIndex < totalLength) {
            currentProgress = (float) currentIndex / totalLength;
        }
        return currentProgress;
    }

    @Override
    public void close() throws IOException {
        lineRecordReader.close();
    }
}
If your application has small files in several different formats, you will need to choose a different built-in RecordReader for each file type; that dispatch logic also belongs in the class above, as sketched below.
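A hypothetical sketch of such dispatch inside initialize(): it assumes the fixed lineRecordReader field is replaced by a general `private RecordReader<LongWritable, Text> reader;`, and that files ending in .seq (an illustrative extension, not from the original code) are SequenceFiles readable by Hadoop's built-in SequenceFileRecordReader:

// Sketch only: assumes the field `private RecordReader<LongWritable, Text> reader;`
// and that .seq files hold LongWritable/Text pairs.
FileSplit fileSplit = new FileSplit(combineFileSplit.getPath(currentIndex),
        combineFileSplit.getOffset(currentIndex), combineFileSplit.getLength(currentIndex),
        combineFileSplit.getLocations());
if (fileSplit.getPath().getName().endsWith(".seq")) {
    // binary key/value data: delegate to the built-in SequenceFile reader
    reader = new SequenceFileRecordReader<LongWritable, Text>();
} else {
    // plain text: delegate to LineRecordReader as in the class above
    reader = new LineRecordReader();
}
reader.initialize(fileSplit, context);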
The CombineSmallfileInputFormat class
Now that we have a RecordReader for CombineFileSplit, we need to plug an instance of that implementation class, CombineSmallfileRecordReader, into a CombineFileInputFormat. To do so, we subclass CombineFileInputFormat and override its createRecordReader method. Our CombineSmallfileInputFormat is shown below:
static class CombineSmallfileInputFormat extends CombineFileInputFormat<LongWritable, Text> {

    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit split,
            TaskAttemptContext context) throws IOException {
        CombineFileSplit combineFileSplit = (CombineFileSplit) split;
        // CombineFileRecordReader instantiates one CombineSmallfileRecordReader per
        // small-file block, via the (CombineFileSplit, TaskAttemptContext, Integer) constructor.
        CombineFileRecordReader<LongWritable, Text> recordReader = new CombineFileRecordReader<LongWritable, Text>(
                combineFileSplit, context, CombineSmallfileRecordReader.class);
        try {
            recordReader.initialize(combineFileSplit, context);
        } catch (InterruptedException e) {
            throw new RuntimeException("Failed to initialize CombineSmallfileRecordReader.", e);
        }
        return recordReader;
    }
}
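Because every small file is far below the HDFS block size, it is also common to tell the framework never to split an individual file, so each file stays whole inside one combined split. A minimal sketch of this optional override, added to CombineSmallfileInputFormat (it assumes org.apache.hadoop.mapreduce.JobContext and org.apache.hadoop.fs.Path are imported):

@Override
protected boolean isSplitable(JobContext context, Path file) {
    // Treat each small file as an indivisible unit.
    return false;
}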
A wordcount example
static class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Each input line is a comma-separated list of words; emit (word, 1) for each.
        String[] words = value.toString().split(",");
        for (String word : words) {
            context.write(new Text(word), new LongWritable(1));
        }
    }
}
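For example, the input line apple,banana,apple produces the pairs (apple, 1), (banana, 1) and (apple, 1).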
static class WordCountReduce extends Reducer<Text, LongWritable, Text, LongWritable> {

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        // Sum the per-word counts emitted by the mappers.
        long count = 0;
        for (LongWritable longWritable : values) {
            count += longWritable.get();
        }
        context.write(key, new LongWritable(count));
    }
}
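Since this reduce function only sums its values, and addition is associative and commutative, the same class can optionally be reused as a combiner to shrink shuffle traffic, with one extra line in the driver below:

job.setCombinerClass(WordCountReduce.class);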
static class CombineSmallfiles {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        // Cap each combined split at 25 MB of small files. These are the
        // Hadoop 2 names of the old mapred.min/max.split.size keys.
        conf.setLong("mapreduce.input.fileinputformat.split.minsize", 1);
        conf.setLong("mapreduce.input.fileinputformat.split.maxsize", 26214400); // 25 MB

        Job job = Job.getInstance(conf, "combine smallfiles");
        job.setJarByClass(CombineSmallfiles.class);
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReduce.class);
        job.setNumReduceTasks(2);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // Alternative: slice the input every five lines with NLineInputFormat.
        // NLineInputFormat.setNumLinesPerSplit(job, 5);
        // job.setInputFormatClass(NLineInputFormat.class);

        // Use the combining input format so many small files share one map task;
        // TextInputFormat would create one split per file and defeat the purpose.
        job.setInputFormatClass(CombineSmallfileInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.addInputPath(job, new Path("hdfs://c122:8020/zqc/wordcount"));
        FileOutputFormat.setOutputPath(job, new Path("hdfs://c122:8020/zqc/reduces"));

        int exitFlag = job.waitForCompletion(true) ? 0 : 1;
        System.exit(exitFlag);
    }
}
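To confirm that combining actually happens, one quick check (a sketch only; it reuses the conf and input path above and needs java.util.List and org.apache.hadoop.mapreduce.InputSplit imported) is to ask the input format directly how many splits it produces:

// Sketch: count the combined splits for the input directory. With a 25 MB
// cap per split, a directory of many small files should yield only a few
// map tasks instead of one per file.
Job probe = Job.getInstance(conf, "split probe");
FileInputFormat.addInputPath(probe, new Path("hdfs://c122:8020/zqc/wordcount"));
List<InputSplit> splits = new CombineSmallfileInputFormat().getSplits(probe);
System.out.println("Combined splits: " + splits.size());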