2021-03-23 09:15:23

Hadoop入门教程(十三):Hadoop MapReduce 的切片与并行


Hadoop入门教程(十三):Hadoop MapReduce 的切片与并行

在前面《Hadoop入门教程(十一):编程的方式使用 MapReduce 实现 WordCount 案例》中我们使用编程的方式体验了 MapReduce,MapReduce 的工作流程大概是下面这样的:

  • InputFormat:将文件整理成[K,V]值

  • Shuffle:将同Key的数据合并整理

  • OutputFormat:把[K,V]值输出

InputFormat数据输入

InputFormat 需要将文件整理成[K,V]值,而文件又以数据库块 Block 存储在各个集群节点上,那么它如何能做到高效处理呢?那就需要将文件切分给各个节点并行执行任务。

怎么切片

之前的文章中讲过一个数据库块 Block 默认的大小是 128M,说明文件都是以 128M 一块分布在集群的各个节点上,那我们如何切片才能提高效率呢?

平均分?加入有一个 300M 的文件需要处理,平均分成 3个 100M 处理?看似好像这样负载在各个节点很均衡,但其实不然,因为:

第一个数据库块 Block 切分 100M 以后剩余 28M,需要传给第二个节点,第二个节点 28M+128M 再切 100M 剩余 56M 又需要传输给下一个节点,这就增加了大量的 网络IO,而网络带宽也是十分宝贵的资源,不但浪费网络还会增加任务处理时间。

所以默认就是一个一个数据库块 Block 的大小作为切片大小,任务的Map阶段在客户端提交任务时,就做好了数据切片的规划,这个可以在源码中看到,我列一下我跟踪的路线:

先是 job.waitForCompletion(true),然后到达 org.apache.hadoop.mapreduce.Job#waitForCompletion,在里面有 this.submit(),到达 org.apache.hadoop.mapreduce.Job#submit,里面又执行了一句:

this.status = (JobStatus)this.ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
    public JobStatus run() throws IOException, InterruptedException, ClassNotFoundException {
        return submitter.submitJobInternal(Job.this, Job.this.cluster);
    }
});

主要是 submitter.submitJobInternal(Job.this, Job.this.cluster) 这句去到了:org.apache.hadoop.mapreduce.JobSubmitter#submitJobInternal,在这里面执行了一句:int maps = this.writeSplits(job, submitJobDir),这个就是切片的代码了,maps就是我们要切成几片,启动几个 MapTask,继续下钻进去:

private int writeSplits(JobContext job, Path jobSubmitDir) throws IOException, InterruptedException, ClassNotFoundException {
    JobConf jConf = (JobConf)job.getConfiguration();
    int maps;
    if (jConf.getUseNewMapper()) {
        maps = this.writeNewSplits(job, jobSubmitDir);
    } else {
        maps = this.writeOldSplits(jConf, jobSubmitDir);
    }
    return maps;
}

里面执行了 maps = this.writeNewSplits(job, jobSubmitDir),也就是 org.apache.hadoop.mapreduce.JobSubmitter#writeNewSplits,重点来啦,注意看里面的代码:

private <T extends InputSplit> int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException, InterruptedException, ClassNotFoundException {
    Configuration conf = job.getConfiguration();
    InputFormat<?, ?> input = (InputFormat)ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
    List<InputSplit> splits = input.getSplits(job);
    T[] array = (InputSplit[])((InputSplit[])splits.toArray(new InputSplit[splits.size()]));
    Arrays.sort(array, new JobSubmitter.SplitComparator());
    JobSplitWriter.createSplitFiles(jobSubmitDir, conf, jobSubmitDir.getFileSystem(conf), array);
    return array.length;
}

里面这句 List<InputSplit> splits = input.getSplits(job),就是关键内容,执行了 org.apache.hadoop.mapreduce.InputFormat#getSplits,也就是说 InputFormat 提供了切片的方法,这就引入了我们开篇的结构,我们需要进一步了解 InputFormat 的相关知识,完美回到主题,我都感觉快跑偏了。

org.apache.hadoop.mapreduce.InputFormat 是个抽象类,所以我们需要找它的实现,比较常见的就是对文件的操作,也就是 org.apache.hadoop.mapreduce.lib.input.FileInputFormat,他实现了这个接口,我们看看 FileInputFormat 的 getSplits,代码太多我就不粘了啊,直奔重点,里面执行了一句:long splitSize = computeSplitSize(blockSize, minSize, maxSize);,我们进去看看里面干嘛了:

protected long computeSplitSize(long blockSize, long minSize,
                                long maxSize) {
    return Math.max(minSize, Math.min(maxSize, blockSize));
}

这是啥意思呢?看名字,我们能猜出来,是最小、最大和块大小,最大和块大小之间取最小值,肯定就是块大小,对吧,它不可能超过最大值,然后就是最小值和块大小之间取最大值,那肯定也是块大小,块大小不可能小于最小值吧,所以其实就是取了三个数之间中间的那个值。

说了这么多其实就是想给大家说明 InputFormat 干了什么,它也很重要,既然 Hadoop 的 InputFormat 是个抽象类,那意味着什么?是的,我们可以写自己的 InputFormat!

自定义 InputFormat

官方的 InputFormat 我就不再赘述了,大家可以直接看看源码,网上一搜也有。

org.apache.hadoop.mapreduce.InputFormat 其实就两步,getSplits 负责切片,createRecordReader 负责转换成 Mappe r接收的[K,V]值。

由于是入门级的教程,我就不深入探究了,就给大家简单演示一下,更深入的玩法还需要自己去研究。

本文源码公开在:https://github.com/renfei/demo/tree/master/hadoop/hadoop_api/src/main/java/net/renfei/hadoop/inputformat

偷个懒,为了更快的实现自定义 InputFormat,继承 FileInputFormat 来一个,首先新建一个 DemoInputFormat,继承 FileInputFormat,这个时候我们需要重写 createRecordReader 方法:

public class DemoInputFormat extends FileInputFormat<Text, BytesWritable> {
    @Override
    public RecordReader createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        return new DemoRecordReader();
    }
}

这里我们还需要一个 RecordReader,再新建一个类 DemoRecordReader:

public class DemoRecordReader extends RecordReader<Text, BytesWritable> {
    private boolean readed = false;
    private Text key = new Text();
    private BytesWritable value = new BytesWritable();
    private FileSplit fileSplit;
    private FSDataInputStream inputStream;
    /**
     * 初始化方法,初始化的时候会被调用一次
     *
     * @param split
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        // 切片类型转为文件切片
        // 此处强转 FileSplit 是因为 net.renfei.hadoop.inputformat.DemoInputFormat 继承了 org.apache.hadoop.mapreduce.lib.input.FileInputFormat
        fileSplit = (FileSplit) split;
        // 获取切片路径
        Path path = fileSplit.getPath();
        // 通过路径获取文件系统
        FileSystem fileSystem = path.getFileSystem(context.getConfiguration());
        // 打开文件流
        inputStream = fileSystem.open(path);
    }
    /**
     * 读取下一组 KV 值
     *
     * @return
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (readed) {
            return false;
        } else {
            // 读取数据
            // 读 Key,因为是演示,Text 也就是 Key 我们就取文件路径了,没什么意义
            key.set(fileSplit.getPath().toString());
            // 读 Value,直接一次读取完,所以是 fileSplit.getLength()
            byte[] buf = new byte[(int) fileSplit.getLength()];
            inputStream.read(buf);
            value.set(buf, 0, buf.length);
            readed = true;
            return true;
        }
    }
    /**
     * 获取当前读取到的Key
     *
     * @return
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        return key;
    }
    /**
     * 读取当前的Value
     *
     * @return
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return value;
    }
    /**
     * 当前数据读取的进度
     *
     * @return
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    public float getProgress() throws IOException, InterruptedException {
        return readed ? 1 : 0;
    }
    /**
     * 关闭资源
     *
     * @throws IOException
     */
    @Override
    public void close() throws IOException {
        IOUtils.closeStream(inputStream);
    }
}

创建一个 Driver 类:

public class DemoDriver {
    /**
     * 程序入口
     *
     * @param args
     */
    public static void main(String[] args) throws IOException {
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(DemoDriver.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(BytesWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);
        // 设置使用我们自定义的 DemoInputFormat
        job.setInputFormatClass(DemoInputFormat.class);
        // 因为 DemoInputFormat 继承了 FileInputFormat,所以可以使用 FileInputFormat 设置
        FileInputFormat.setInputPaths(job, new Path("/Users/renfei/Downloads/demo.txt"));
        FileOutputFormat.setOutputPath(job, new Path("/Users/renfei/Downloads/demoout"));
    }
}



商业用途请联系作者获得授权。
版权声明:本文为博主「任霏」原创文章,遵循 CC BY-NC-SA 4.0 版权协议,转载请附上原文出处链接及本声明。
原文链接:https://www.renfei.net/posts/1003473
评论与留言
以下内容均由网友提交发布,版权与真实性无法查证,请自行辨别。

本站有缓存策略,时间约2小时后能看到您的评论。本站使用自动审核机制,如果您的内容包含广告/谩骂/恐怖/暴力/涉政等不和谐内容将无法展示!


本站有缓存策略,时间约2小时后能看到您的评论。本站使用自动审核机制,如果您的内容包含广告/谩骂/恐怖/暴力/涉政等不和谐内容将无法展示!

关注任霏博客
扫码关注「任霏博客」微信订阅号
微博:任霏博客网
Twitter:@renfeii
Facebook:任霏
最新留言 优先级低的并不代表一定要等到优先级高的运行完才能运行,只是cpu分配的资源少了而已。 /lib64/ld-linux-x86-64.so.2: No such file or directory 报了这个错误,怎么解决呢 对于一个布道 DevOps 多年的选手来讲,看到这个报告,还是想继续布道布道。虽然是各种对比哈,但是我感觉与 DevOps 太像了(可能是职业病犯了哈)。首先声明本人不是GitLab 用户(因为不免费,没法薅羊毛啊),本人是 GitHub 忠实用户。 首先,你这是田忌赛马的对比,中文对比一事,着实有点可笑 1 土生土长和外来户能立马拉到同一个起跑线上吗? 2 一个真正的开发者应该去提升自己的英语能力,而不是拿全部是中文文档说事。大家都知道现在开源非常热,开发者是开源的主力军,如果要贡献优秀的开源项目(诸如Linux 内核,Kubernetes),英语就是个硬门槛。如果我是你,我倒希望公司内部的系统是英文的,最起码能让我锻炼英语,在看开源项目文档的时候不至于看不懂,提 PR 的时候不至于提交代码的内容描述不清楚而没法被 Merge。 其次,阿里云效、Coding 大家都知道背后站的是谁,很容易造成厂商绑定,现在很多企业都希望不要被厂商绑定。 再者,有一个点需要明白,GitLab 是一个 DevOps 平台,什么叫做 DevOps 平台(DevOps 走到现在,确切的说应该叫做 DevSecOps)?就是覆盖了软件开发生命周期全阶段的,从项目管理到代码托管到安全再到日志监控、甚至包含现在的云原生能力。不仅仅是说一个 CI/CD 就能概括的了的。这一点是 DevOps 布道的真正误区,我见过太多了,我在这儿再布道一哈,CI/CD 不等于 DevOps,他只是 DevOps 落地实践的核心能力。仅凭借一个 CI/CD 能有现成模版就判断出哪个好坏,过于牵强了吧。相信大家真正到项目用的时候,模版是满足不了要求的吧,毕竟大家都很特性化。 最后,还是一个很热的话题,开源,open source。GitLab 是开源的,Coding 和 云效这方面我没看到相关的开源内容(可能是我孤陋寡闻)。大家可以看看国内有多少用 GitLb 的,GitLab 的 CE 版,然后私有化部署,就是很多公司的代码托管 + DevOps 解决方案。 个人愚见,做一些对比报告的时候,还是先需要明白这个产品的定位,去深入挖掘一些真正有意义的对比,这样的对比报告才能有意义。作为一个常年写博客、文章的人来说。你写的每个字、每篇文章,你要想到你的思想会影响到别人。有可能因为你的片面之词,让别人错失一些学习的好机会。 docker run 那一长串后,出来一个字符串,然后去 docker containers 下面看 显示 exited(1);logs 下就一行错误 initdb failed 感谢🙏,第一个问题是空格的问题应该,我逐字敲完后可以构建了.第二个问题是我docker环境的问题,docker更新为最新版后需要重置配置文件.现已经正常使用,再次感谢您的分享和您的细心解答,期待下次相遇😄 还有一个问题可以请教下吗?就是我在容器里建文件夹没有权限,su root后密码不知道是多少,sudo mkdir xxx 提示我,没有sudo命令,请问有好的解决方法吗?谢谢解答 -v 后面可以指定文件吗 我的也是报错,还有。我执行了这个:@localhost kingbase-es-v8-r3-docker % docker run -d --name kingbase -p 54321:54321 -e SYSTEM_PWD=SYSTEM -v /opt/kingbase/data:/opt/kingbase/data -v /opt/kingbase:/opt/kingbase/Server/bin kingbase:v8r3 docker: 'run -d --name kingbase -p 54321:54321 -e SYSTEM_PWD=SYSTEM -v /opt/kingbase/data:/opt/kingbase/data -v /opt/kingbase:/opt/kingbase/Server/bin kingbase:v8r3' is not a docker command. See 'docker --help' 麻烦帮忙看下,是不是我写的命令有问题,还是版本问题,谢谢啦 请问我build的时候一直报错,是资源没了吗?failed to solve with frontend dockerfile.v0: failed to create LLB definition: failed to do request: Head "https://reg-mirror.qiniu.com/v2/library/centos/manifests/7?ns=docker.io": Moved Permanently 能不能在代码那里详细解释一下啊,没完全懂呀 en 按照路径上的来操作的,但是启动时一直报:zsh: no such file or directory: docker run -d --name kingbase -p 54321:54321 -e SYSTEM_PWD=SYSTEM -v /Volumes/installation/opt/kingbase/data:/opt/kingbase/data -v /Volumes/installation/opt/kingbase/bin/license.dat:/opt/kingbase/Server/bin/license.dat kingbase:v8r3 错误