Hadoop入门教程(十四):Hadoop MapReduce Shuffle 机制
在上一篇教程我们引入了 MapReduce 的执行机制,并粗略的讲了 InputFormat,现在该讲一下 Shuffle 了。Map方法之后,Reduce方法之前的数据处理过程称之为Shuffle。
MapReduce 是 Hadoop 的重点,而 Shuffle 是 MapReduce 的重点,很多神奇的操作都在这里发生,因为是入门类教程,我也只简单的了解一下,引导大家入门,先走进来,深入的部分请各位看官综合搜索引擎其他信息学习。
Partition 分区
MapReduce提供Partitioner接口,它的作用就是根据key或value及reduce的数量来决定当前的这对输出数据最终应该交由哪个ReduceTask处理。默认对key hash后再以ReduceTask数量取模。默认的取模方式只是为了平均reduce的处理能力,如果用户自己对Partitioner有需求,可以订制并设置到job上。
自定义 Partition 分区
想要自定义一个 Partition 分区,只需要继承 Partitioner 就可以开始我们的骚操作了,来个简单的案例,比如按照日志的IP开头进行分区到不同的ReduceTask:
public class MyPartitioner extends Partitioner<Text, DemoEntity> {
@Override
public int getPartition(Text text, DemoEntity demoEntity, int numPartitions) {
// 111.224.80.24 - - [17/Mar/2021:03:17:49 +0000] "GET /dictionary/gender HTTP/1.1" 200 405 "http://www.renfei.net/index.html" "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.100 Safari/537.36"
// 取 IP
String ip = text.toString().split(" ")[0];
// 假设按照 IP 开头的区别分别分区
if (ip.startsWith("192.")) {
return 0;
} else if (ip.startsWith("10.10.")) {
return 1;
} else if (ip.startsWith("10.0.")) {
return 2;
} else {
return 3;
}
}
使用我们自定义的 MyPartitioner,在任务上设置一下:
Job job = Job.getInstance(new Configuration());
job.setJarByClass(PartitionerDriver.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(DemoEntity.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(DemoEntity.class);
// 此处只演示 自定义 Partition 分区 的使用
// 设置 ReduceTasks 是 4 个,因为我们分区为 0、1、2、3
job.setNumReduceTasks(4);
job.setPartitionerClass(MyPartitioner.class);
WritableComparable 排序
在之前的案例中,我们拿到的输入都是排序好的,无论是 MapTask 还是 ReduceTask 都会按照 Key 进行排序,不管你是否需要,Hadoop 都会进行排序,默认是按照字典排序,使用的排序算法是快速排序。
我们在前面的《Hadoop入门教程(十二):Hadoop 的 Writable 类》尝试创建了自己的Bean对象,如果想要让这个 Bean 支持排序,我们就需要实现 WritableComparable 接口,并重写 compareTo() 方法:
public class DemoEntity implements WritableComparable<DemoEntity> {
private String ip;
private String path;
private int port;
/**
* 序列化方法
*
* @param dataOutput 框架给我们的数据出口
* @throws IOException
*/
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeUTF(ip);
dataOutput.writeUTF(path);
dataOutput.writeInt(port);
}
/**
* 反序列化方法
*
* @param dataInput 框架给我们的数据来源
* @throws IOException
*/
@Override
public void readFields(DataInput dataInput) throws IOException {
ip = dataInput.readUTF();
path = dataInput.readUTF();
port = dataInput.readInt();
}
// 此处省略 Getter/Setter ....
/**
* 排序支持
*
* @param o
* @return
*/
@Override
public int compareTo(DemoEntity o) {
// 假设我们按 port 排序
return Integer.compare(o.getPort(), this.port);
}
}
Combiner 合并
Combiner 是 Mapper 和 Reducer 之外的一种,但 Combiner 的父类是 Reducer,但又跟 Reducer 不太一样。Combiner 是在 MapTask 所在节点运行的,可以为每个 MapTask 的输出结果进行汇总合并,减少网络IO,就像压缩功能一样,但默认不会使用 Combiner,因为他会修改 MapTask 的输出结果,如果要使用 Combiner 前提必须是不能影响业务结果!所以得根据自己的业务场景来决定是否使用 Combiner。
如果你需要自己定义一个 Combiner 并使用它,可以继承R educer,重写Reduce方法,然后:job.setCombinerClass(MyCombiner.class);,由于是入门级别,主要是我懒得写了,这个教程写了半个月了,在这就不做演示了。
GroupingComparator分组
分组呢也好理解,有的时候我们有一个销售订单数据,想要知道每个月最高金额的订单是哪个,那我们就是按月份进行分组了,自定义类继承 WritableComparator,重写compare()方法,这里我也懒得演示了,各位用的时候可以再查。
商业用途请联系作者获得授权。
版权声明:本文为博主「任霏」原创文章,遵循 CC BY-NC-SA 4.0 版权协议,转载请附上原文出处链接及本声明。
相关推荐
猜你还喜欢这些内容,不妨试试阅读一下评论与留言
以下内容均由网友提交发布,版权与真实性无法查证,请自行辨别。微信订阅号
扫码关注「任霏博客」微信订阅号- 你写得非常清晰明了,让我很容易理解你的观点。
- 感谢分享!拿走了~
- 您是说 UCClient 类接收来自Discuz的UCenter的消息吧,请求是来自 Discuz 的 UCenter吗?code 为 null 说明请求URL地址中没有 code 参数 (?code=xxx) ,确定是 UCenter 发起的请求吗?
- String code = request.getParameter("code"); code一直是null 这是为什么啊
- 你好,我想问一下如果是分析型的数据库要怎么制作docker镜像呢 是修改V008R003C002B0320版本号吗
- 可以的,我也正在开发分享的程序,可以邮件或群联系我都可以,关于页面里有联系方式:https://www.renfei.net/page/about 。
- 有破解软件的需要可以私下联系您吗?
- 您好,手机APP只是个客户端,用于数据呈现展示,数据均保存在服务器上,只留个APP没有任何用处,无能为力哦。
- 老哥 看你弄了这么多软件好厉害啊。 我有个软件 我买过几个小会员 没用几天 然后商家跑路了,软件服务器关闭了,连不上去 用不了。 你能做成一个打补丁版本可以本地用的么? 方便看下么?https://haodezhe.lanzouw.com/iD0f30h9joza 谢谢老哥!
- 您好,由于版权投诉和我国知识产权法的完善,我已经下架所有破解软件的下载链接了。
- 生花妙笔信手来 – 基于 Amazon SageMaker 使用 Grounded-SAM 加速电商广告素材生成 [1]
- github.renfei.net 不再完整代理 Github 页面改为代理指定文件
- 优雅的源代码管理(三):本地优雅的使用 Git Rebase 变基
- 优雅的源代码管理(二):Git 的工作原理
- 优雅的源代码管理(一):版本控制系统 VCS(Version Control System)与软件配置管理 SCM(Software Configuration Management)
- ChatGPT 开发商 OpenAI 买下极品域名 AI.com
- 火爆的 AI 人工智能 ChatGPT 国内注册教程、使用方式和收费标准
- 解决 SpringCloud 中 bootstrap.yml 不识别 @activatedProperties@ 参数
- Cron表达式书写教程搞定Linux、Spring、Quartz的定时任务
- 阿里云香港可用区C发生史诗级故障