2021-03-24 09:49:27

Hadoop入门教程(十四):Hadoop MapReduce Shuffle 机制


Hadoop入门教程(十四):Hadoop MapReduce Shuffle 机制

在上一篇教程我们引入了 MapReduce 的执行机制,并粗略的讲了 InputFormat,现在该讲一下 Shuffle 了。Map方法之后,Reduce方法之前的数据处理过程称之为Shuffle。

MapReduce 是 Hadoop 的重点,而 Shuffle 是 MapReduce 的重点,很多神奇的操作都在这里发生,因为是入门类教程,我也只简单的了解一下,引导大家入门,先走进来,深入的部分请各位看官综合搜索引擎其他信息学习。

Partition 分区

MapReduce提供Partitioner接口,它的作用就是根据key或value及reduce的数量来决定当前的这对输出数据最终应该交由哪个ReduceTask处理。默认对key hash后再以ReduceTask数量取模。默认的取模方式只是为了平均reduce的处理能力,如果用户自己对Partitioner有需求,可以订制并设置到job上。 

自定义 Partition 分区

想要自定义一个 Partition 分区,只需要继承 Partitioner 就可以开始我们的骚操作了,来个简单的案例,比如按照日志的IP开头进行分区到不同的ReduceTask:

public class MyPartitioner extends Partitioner<Text, DemoEntity> {
@Override
public int getPartition(Text text, DemoEntity demoEntity, int numPartitions) {
    // 111.224.80.24 - - [17/Mar/2021:03:17:49 +0000] "GET /dictionary/gender HTTP/1.1" 200 405 "http://www.renfei.net/index.html" "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.100 Safari/537.36"
    // 取 IP
    String ip = text.toString().split(" ")[0];
    // 假设按照 IP 开头的区别分别分区
    if (ip.startsWith("192.")) {
        return 0;
    } else if (ip.startsWith("10.10.")) {
        return 1;
    } else if (ip.startsWith("10.0.")) {
        return 2;
    } else {
        return 3;
    }
}

使用我们自定义的 MyPartitioner,在任务上设置一下:

Job job = Job.getInstance(new Configuration());
job.setJarByClass(PartitionerDriver.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(DemoEntity.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(DemoEntity.class);
// 此处只演示 自定义 Partition 分区 的使用
// 设置 ReduceTasks 是 4 个,因为我们分区为 0、1、2、3
job.setNumReduceTasks(4);
job.setPartitionerClass(MyPartitioner.class);

WritableComparable 排序

在之前的案例中,我们拿到的输入都是排序好的,无论是 MapTask 还是 ReduceTask 都会按照 Key 进行排序,不管你是否需要,Hadoop 都会进行排序,默认是按照字典排序,使用的排序算法是快速排序。

我们在前面的《Hadoop入门教程(十二):Hadoop 的 Writable 类》尝试创建了自己的Bean对象,如果想要让这个 Bean 支持排序,我们就需要实现 WritableComparable 接口,并重写 compareTo() 方法:

public class DemoEntity implements WritableComparable<DemoEntity> {
    private String ip;
    private String path;
    private int port;
    /**
     * 序列化方法
     *
     * @param dataOutput 框架给我们的数据出口
     * @throws IOException
     */
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(ip);
        dataOutput.writeUTF(path);
        dataOutput.writeInt(port);
    }
    /**
     * 反序列化方法
     *
     * @param dataInput 框架给我们的数据来源
     * @throws IOException
     */
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        ip = dataInput.readUTF();
        path = dataInput.readUTF();
        port = dataInput.readInt();
    }
    // 此处省略 Getter/Setter ....
    /**
     * 排序支持
     *
     * @param o
     * @return
     */
    @Override
    public int compareTo(DemoEntity o) {
        // 假设我们按 port 排序
        return Integer.compare(o.getPort(), this.port);
    }
}

Combiner 合并

Combiner 是 Mapper 和 Reducer 之外的一种,但 Combiner 的父类是 Reducer,但又跟 Reducer 不太一样。Combiner 是在 MapTask 所在节点运行的,可以为每个 MapTask 的输出结果进行汇总合并,减少网络IO,就像压缩功能一样,但默认不会使用 Combiner,因为他会修改 MapTask 的输出结果,如果要使用 Combiner 前提必须是不能影响业务结果!所以得根据自己的业务场景来决定是否使用 Combiner。

如果你需要自己定义一个 Combiner 并使用它,可以继承R educer,重写Reduce方法,然后:job.setCombinerClass(MyCombiner.class);,由于是入门级别,主要是我懒得写了,这个教程写了半个月了,在这就不做演示了。

GroupingComparator分组

分组呢也好理解,有的时候我们有一个销售订单数据,想要知道每个月最高金额的订单是哪个,那我们就是按月份进行分组了,自定义类继承 WritableComparator,重写compare()方法,这里我也懒得演示了,各位用的时候可以再查。



商业用途请联系作者获得授权。
版权声明:本文为博主「任霏」原创文章,遵循 CC BY-NC-SA 4.0 版权协议,转载请附上原文出处链接及本声明。
原文链接:https://www.renfei.net/posts/1003474
评论与留言
以下内容均由网友提交发布,版权与真实性无法查证,请自行辨别。

本站有缓存策略,时间约2小时后能看到您的评论。本站使用自动审核机制,如果您的内容包含广告/谩骂/恐怖/暴力/涉政等不和谐内容将无法展示!


本站有缓存策略,时间约2小时后能看到您的评论。本站使用自动审核机制,如果您的内容包含广告/谩骂/恐怖/暴力/涉政等不和谐内容将无法展示!

关注任霏博客
扫码关注「任霏博客」微信订阅号
微博:任霏博客网
Twitter:@renfeii
Facebook:任霏
最新留言 优先级低的并不代表一定要等到优先级高的运行完才能运行,只是cpu分配的资源少了而已。 /lib64/ld-linux-x86-64.so.2: No such file or directory 报了这个错误,怎么解决呢 对于一个布道 DevOps 多年的选手来讲,看到这个报告,还是想继续布道布道。虽然是各种对比哈,但是我感觉与 DevOps 太像了(可能是职业病犯了哈)。首先声明本人不是GitLab 用户(因为不免费,没法薅羊毛啊),本人是 GitHub 忠实用户。 首先,你这是田忌赛马的对比,中文对比一事,着实有点可笑 1 土生土长和外来户能立马拉到同一个起跑线上吗? 2 一个真正的开发者应该去提升自己的英语能力,而不是拿全部是中文文档说事。大家都知道现在开源非常热,开发者是开源的主力军,如果要贡献优秀的开源项目(诸如Linux 内核,Kubernetes),英语就是个硬门槛。如果我是你,我倒希望公司内部的系统是英文的,最起码能让我锻炼英语,在看开源项目文档的时候不至于看不懂,提 PR 的时候不至于提交代码的内容描述不清楚而没法被 Merge。 其次,阿里云效、Coding 大家都知道背后站的是谁,很容易造成厂商绑定,现在很多企业都希望不要被厂商绑定。 再者,有一个点需要明白,GitLab 是一个 DevOps 平台,什么叫做 DevOps 平台(DevOps 走到现在,确切的说应该叫做 DevSecOps)?就是覆盖了软件开发生命周期全阶段的,从项目管理到代码托管到安全再到日志监控、甚至包含现在的云原生能力。不仅仅是说一个 CI/CD 就能概括的了的。这一点是 DevOps 布道的真正误区,我见过太多了,我在这儿再布道一哈,CI/CD 不等于 DevOps,他只是 DevOps 落地实践的核心能力。仅凭借一个 CI/CD 能有现成模版就判断出哪个好坏,过于牵强了吧。相信大家真正到项目用的时候,模版是满足不了要求的吧,毕竟大家都很特性化。 最后,还是一个很热的话题,开源,open source。GitLab 是开源的,Coding 和 云效这方面我没看到相关的开源内容(可能是我孤陋寡闻)。大家可以看看国内有多少用 GitLb 的,GitLab 的 CE 版,然后私有化部署,就是很多公司的代码托管 + DevOps 解决方案。 个人愚见,做一些对比报告的时候,还是先需要明白这个产品的定位,去深入挖掘一些真正有意义的对比,这样的对比报告才能有意义。作为一个常年写博客、文章的人来说。你写的每个字、每篇文章,你要想到你的思想会影响到别人。有可能因为你的片面之词,让别人错失一些学习的好机会。 docker run 那一长串后,出来一个字符串,然后去 docker containers 下面看 显示 exited(1);logs 下就一行错误 initdb failed 感谢🙏,第一个问题是空格的问题应该,我逐字敲完后可以构建了.第二个问题是我docker环境的问题,docker更新为最新版后需要重置配置文件.现已经正常使用,再次感谢您的分享和您的细心解答,期待下次相遇😄 还有一个问题可以请教下吗?就是我在容器里建文件夹没有权限,su root后密码不知道是多少,sudo mkdir xxx 提示我,没有sudo命令,请问有好的解决方法吗?谢谢解答 -v 后面可以指定文件吗 我的也是报错,还有。我执行了这个:@localhost kingbase-es-v8-r3-docker % docker run -d --name kingbase -p 54321:54321 -e SYSTEM_PWD=SYSTEM -v /opt/kingbase/data:/opt/kingbase/data -v /opt/kingbase:/opt/kingbase/Server/bin kingbase:v8r3 docker: 'run -d --name kingbase -p 54321:54321 -e SYSTEM_PWD=SYSTEM -v /opt/kingbase/data:/opt/kingbase/data -v /opt/kingbase:/opt/kingbase/Server/bin kingbase:v8r3' is not a docker command. See 'docker --help' 麻烦帮忙看下,是不是我写的命令有问题,还是版本问题,谢谢啦 请问我build的时候一直报错,是资源没了吗?failed to solve with frontend dockerfile.v0: failed to create LLB definition: failed to do request: Head "https://reg-mirror.qiniu.com/v2/library/centos/manifests/7?ns=docker.io": Moved Permanently 能不能在代码那里详细解释一下啊,没完全懂呀 en 按照路径上的来操作的,但是启动时一直报:zsh: no such file or directory: docker run -d --name kingbase -p 54321:54321 -e SYSTEM_PWD=SYSTEM -v /Volumes/installation/opt/kingbase/data:/opt/kingbase/data -v /Volumes/installation/opt/kingbase/bin/license.dat:/opt/kingbase/Server/bin/license.dat kingbase:v8r3 错误