您的当前位置:首页正文

利用CombineFileInputFormat合并小文件

2024-11-30 来源:个人技术集锦

实现一个RecordReader来读取CombineFileSplit包装的文件Block
继承自CombineFileInputFormat实现一个使用我们自定义的RecordReader的输入规格说明类
处理数据的Mapper实现类
配置用来处理海量小文件的MapReduce Job
CombineSmallfileRecordReader类
为CombineFileSplit实现一个RecordReader,并在内部使用Hadoop自带的LineRecordReader来读取小文件的文本行数据,代码实现如下所示:

/**
 * Record reader for a single file chunk inside a {@link CombineFileSplit}.
 *
 * <p>Each instance handles the chunk at position {@code index} of the combined split and
 * delegates the actual line reading to a {@link LineRecordReader}. Keys are the byte offsets
 * reported by the delegate, values are the text lines.
 */
static class CombineSmallfileRecordReader extends RecordReader<LongWritable, Text> {

        private CombineFileSplit combineFileSplit;

        private final LineRecordReader lineRecordReader = new LineRecordReader();

        /** Number of paths contained in the combined split. */
        private int totalLength;

        /** Index, within the combined split, of the file chunk this reader handles. */
        private final int currentIndex;

        private float currentProgress = 0;

        /** Reused output value; refilled on every call to {@link #getCurrentValue()}. */
        private final Text currentValue = new Text();

        /**
         * Creates a reader for one chunk of a combined split.
         *
         * @param combineFileSplit the combined split that contains the chunk
         * @param context the task attempt context (not used by the constructor itself)
         * @param index index of the chunk inside {@code combineFileSplit}
         * @throws IOException declared for signature compatibility; not thrown here
         */
        public CombineSmallfileRecordReader (CombineFileSplit combineFileSplit, TaskAttemptContext context, Integer index)
                throws IOException {

            super();

            this.combineFileSplit = combineFileSplit;

            // Index of the small-file block in the CombineFileSplit that this reader processes.
            this.currentIndex = index;

        }

        /**
         * Builds a {@link FileSplit} for the chunk at {@code currentIndex}, initializes the
         * delegate line reader with it, and publishes the file name of that chunk under the
         * configuration key {@code map.input.file.name}.
         *
         * @param split the combined split; must be a {@link CombineFileSplit}
         * @param context the task attempt context
         */
        @Override
        public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {

            this.combineFileSplit = (CombineFileSplit) split;

            // LineRecordReader works on a FileSplit, so wrap the current chunk of the
            // CombineFileSplit in one before handing it over.
            FileSplit fileSplit = new FileSplit(combineFileSplit.getPath(currentIndex),
                    combineFileSplit.getOffset(currentIndex), combineFileSplit.getLength(currentIndex),
                    combineFileSplit.getLocations());

            lineRecordReader.initialize(fileSplit, context);

            totalLength = combineFileSplit.getPaths().length;

            context.getConfiguration().set("map.input.file.name", combineFileSplit.getPath(currentIndex).getName());

        }

        /**
         * @return the key of the current record as reported by the delegate line reader
         */
        @Override
        public LongWritable getCurrentKey() throws IOException, InterruptedException {

            return lineRecordReader.getCurrentKey();

        }

        /**
         * Copies the current line into the reusable value object and returns it.
         *
         * @return the current line
         */
        @Override
        public Text getCurrentValue() throws IOException, InterruptedException {

            Text line = lineRecordReader.getCurrentValue();

            // Text.getBytes() exposes the backing array, which may be longer than the line;
            // only the first getLength() bytes are valid. Copying the whole array would append
            // stale bytes left over from an earlier, longer line.
            currentValue.set(line.getBytes(), 0, line.getLength());

            return currentValue;

        }

        /**
         * Advances the delegate line reader, provided the chunk index is within the split.
         *
         * @return {@code true} if another record was read, {@code false} otherwise
         */
        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {

            if (currentIndex < 0 || currentIndex >= totalLength) {
                return false;
            }

            return lineRecordReader.nextKeyValue();

        }

        /**
         * Reports progress as the position of this chunk within the combined split
         * ({@code currentIndex / totalLength}).
         *
         * <p>NOTE(review): this does not reflect progress inside the current file; confirm
         * whether the wrapping reader expects per-file progress from the delegate instead.
         *
         * @return the fraction described above, or the last computed value when the index is
         *         out of range
         */
        @Override
        public float getProgress() throws IOException {

            if (currentIndex >= 0 && currentIndex < totalLength) {
                currentProgress = (float) currentIndex / totalLength;
            }

            return currentProgress;

        }

        /** Closes the delegate line reader. */
        @Override
        public void close() throws IOException {

            lineRecordReader.close();

        }
    }

如果存在这样的应用场景:你的小文件具有不同的格式,那么就需要考虑对不同类型的小文件使用不同的内置RecordReader,具体逻辑也是在上面的类中实现。

CombineSmallfileInputFormat类
我们已经为CombineFileSplit实现了一个RecordReader,接下来需要在CombineFileInputFormat中注入这个RecordReader实现类CombineSmallfileRecordReader的对象。为此,需要实现一个CombineFileInputFormat的子类,并重写其createRecordReader方法。我们实现的CombineSmallfileInputFormat代码如下所示:

/**
 * Input format that packs many small files into combined splits and reads each contained
 * file chunk with {@link CombineSmallfileRecordReader}.
 */
static class CombineSmallfileInputFormat extends CombineFileInputFormat<LongWritable, Text> {

        /**
         * Creates a {@link CombineFileRecordReader} that instantiates one
         * {@link CombineSmallfileRecordReader} per file chunk of the combined split.
         *
         * @param split the split to read; must be a {@link CombineFileSplit}
         * @param context the task attempt context
         * @return the initialized combined record reader
         * @throws IOException if the reader cannot be created or initialized
         * @throws RuntimeException if initialization is interrupted
         */
        @Override
        public RecordReader<LongWritable, Text> createRecordReader(InputSplit split,
                TaskAttemptContext context) throws IOException {

            CombineFileSplit combineFileSplit = (CombineFileSplit) split;

            CombineFileRecordReader<LongWritable, Text> recordReader = new CombineFileRecordReader<LongWritable, Text>(
                    combineFileSplit, context, CombineSmallfileRecordReader.class);

            // NOTE(review): the framework normally calls initialize() on the returned reader
            // itself; confirm whether this explicit call is still required.
            try {

                recordReader.initialize(combineFileSplit, context);

            } catch (InterruptedException e) {

                // Restore the interrupt flag and actually propagate the failure; previously the
                // exception object was created but never thrown, hiding the error.
                Thread.currentThread().interrupt();
                throw new RuntimeException("Error to initialize CombineSmallfileRecordReader.", e);

            }

            return recordReader;

        }

    }

以wordcount为例

/**
 * Word-count mapper: splits every input line on commas and emits {@code (token, 1)} for each
 * resulting token.
 */
static class wordCountmap extends Mapper<LongWritable, Text, Text, LongWritable> {

        /**
         * Emits one {@code (token, 1)} pair per comma-separated token of the input line.
         *
         * @param key key of the input record (not used)
         * @param value the input line
         * @param context sink for the emitted pairs
         */
        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context)
                throws IOException, InterruptedException {
            String[] tokens = value.toString().split(",");
            for (int i = 0; i < tokens.length; i++) {
                Text outKey = new Text(tokens[i]);
                LongWritable one = new LongWritable(1);
                context.write(outKey, one);
            }
        }

    }

    /**
     * Word-count reducer: sums all counts received for a word and emits {@code (word, total)}.
     */
    static class WordCountReduce extends Reducer<Text, LongWritable, Text, LongWritable> {

        /**
         * Adds up the partial counts of one word and writes the total.
         *
         * @param key the word
         * @param value the partial counts emitted for {@code key}
         * @param context sink for the aggregated pair
         */
        @Override
        protected void reduce(Text key, Iterable<LongWritable> value,
                Reducer<Text, LongWritable, Text, LongWritable>.Context context)
                throws IOException, InterruptedException {
            long count = 0;
            for (LongWritable longWritable : value) {
                count += longWritable.get();
            }
            context.write(key, new LongWritable(count));
        }

    }

    static class CombineSmallfiles {
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            Configuration conf = new Configuration();
            conf.setInt("mapred.min.split.size", 1);
            conf.setLong("mapred.max.split.size", 26214400); // 25m
            conf.setInt("mapred.reduce.tasks", 2);
            Job job = new Job(conf, "combine smallfiles");
            job.setJarByClass(CombineSmallfiles.class);
        job.setMapperClass(wordcounts.wordCountmap.class);      job.setReducerClass(wordcounts.WordCountReduce.class);
            job.setNumReduceTasks(2);

            job.setOutputKeyClass(Text.class);

            job.setOutputValueClass(LongWritable.class);
以NLineInputFormat的方式对文件进行切片,这里设置的是每五行对文件进行切片
//          NLineInputFormat.setNumLinesPerSplit(job, 5);
//      job.setInputFormatClass(NLineInputFormat.class);    
        job.setInputFormatClass(CombineSmallfileInputFormat.class);
            job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
            FileInputFormat.addInputPath(job, new Path("hdfs://c122:8020/zqc/wordcount"));

            FileOutputFormat.setOutputPath(job, new Path("hdfs://c122:8020/zqc/reduces"));
    int exitFlag = job.waitForCompletion(true) ? 0 : 1;

        System.exit(exitFlag);

        }
显示全文