Hadoop入门教程(十三):Hadoop MapReduce 的切片与并行
在前面《Hadoop入门教程(十一):编程的方式使用 MapReduce 实现 WordCount 案例》中我们使用编程的方式体验了 MapReduce,MapReduce 的工作流程大概是下面这样的:
InputFormat:将文件整理成[K,V]值
Shuffle:将同Key的数据合并整理
OutputFormat:把[K,V]值输出
InputFormat数据输入
InputFormat 需要将文件整理成[K,V]值,而文件又以数据库块 Block 存储在各个集群节点上,那么它如何能做到高效处理呢?那就需要将文件切分给各个节点并行执行任务。
怎么切片
之前的文章中讲过一个数据库块 Block 默认的大小是 128M,说明文件都是以 128M 一块分布在集群的各个节点上,那我们如何切片才能提高效率呢?
平均分?加入有一个 300M 的文件需要处理,平均分成 3个 100M 处理?看似好像这样负载在各个节点很均衡,但其实不然,因为:
第一个数据库块 Block 切分 100M 以后剩余 28M,需要传给第二个节点,第二个节点 28M+128M 再切 100M 剩余 56M 又需要传输给下一个节点,这就增加了大量的 网络IO,而网络带宽也是十分宝贵的资源,不但浪费网络还会增加任务处理时间。
所以默认就是一个一个数据库块 Block 的大小作为切片大小,任务的Map阶段在客户端提交任务时,就做好了数据切片的规划,这个可以在源码中看到,我列一下我跟踪的路线:
先是 job.waitForCompletion(true),然后到达 org.apache.hadoop.mapreduce.Job#waitForCompletion,在里面有 this.submit(),到达 org.apache.hadoop.mapreduce.Job#submit,里面又执行了一句:
this.status = (JobStatus)this.ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
public JobStatus run() throws IOException, InterruptedException, ClassNotFoundException {
return submitter.submitJobInternal(Job.this, Job.this.cluster);
}
});
主要是 submitter.submitJobInternal(Job.this, Job.this.cluster) 这句去到了:org.apache.hadoop.mapreduce.JobSubmitter#submitJobInternal,在这里面执行了一句:int maps = this.writeSplits(job, submitJobDir),这个就是切片的代码了,maps就是我们要切成几片,启动几个 MapTask,继续下钻进去:
private int writeSplits(JobContext job, Path jobSubmitDir) throws IOException, InterruptedException, ClassNotFoundException {
JobConf jConf = (JobConf)job.getConfiguration();
int maps;
if (jConf.getUseNewMapper()) {
maps = this.writeNewSplits(job, jobSubmitDir);
} else {
maps = this.writeOldSplits(jConf, jobSubmitDir);
}
return maps;
}
里面执行了 maps = this.writeNewSplits(job, jobSubmitDir),也就是 org.apache.hadoop.mapreduce.JobSubmitter#writeNewSplits,重点来啦,注意看里面的代码:
private <T extends InputSplit> int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException, InterruptedException, ClassNotFoundException {
Configuration conf = job.getConfiguration();
InputFormat<?, ?> input = (InputFormat)ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
List<InputSplit> splits = input.getSplits(job);
T[] array = (InputSplit[])((InputSplit[])splits.toArray(new InputSplit[splits.size()]));
Arrays.sort(array, new JobSubmitter.SplitComparator());
JobSplitWriter.createSplitFiles(jobSubmitDir, conf, jobSubmitDir.getFileSystem(conf), array);
return array.length;
}
里面这句 List<InputSplit> splits = input.getSplits(job),就是关键内容,执行了 org.apache.hadoop.mapreduce.InputFormat#getSplits,也就是说 InputFormat 提供了切片的方法,这就引入了我们开篇的结构,我们需要进一步了解 InputFormat 的相关知识,完美回到主题,我都感觉快跑偏了。
org.apache.hadoop.mapreduce.InputFormat 是个抽象类,所以我们需要找它的实现,比较常见的就是对文件的操作,也就是 org.apache.hadoop.mapreduce.lib.input.FileInputFormat,他实现了这个接口,我们看看 FileInputFormat 的 getSplits,代码太多我就不粘了啊,直奔重点,里面执行了一句:long splitSize = computeSplitSize(blockSize, minSize, maxSize);,我们进去看看里面干嘛了:
protected long computeSplitSize(long blockSize, long minSize,
long maxSize) {
return Math.max(minSize, Math.min(maxSize, blockSize));
}
这是啥意思呢?看名字,我们能猜出来,是最小、最大和块大小,最大和块大小之间取最小值,肯定就是块大小,对吧,它不可能超过最大值,然后就是最小值和块大小之间取最大值,那肯定也是块大小,块大小不可能小于最小值吧,所以其实就是取了三个数之间中间的那个值。
说了这么多其实就是想给大家说明 InputFormat 干了什么,它也很重要,既然 Hadoop 的 InputFormat 是个抽象类,那意味着什么?是的,我们可以写自己的 InputFormat!
自定义 InputFormat
官方的 InputFormat 我就不再赘述了,大家可以直接看看源码,网上一搜也有。
org.apache.hadoop.mapreduce.InputFormat 其实就两步,getSplits 负责切片,createRecordReader 负责转换成 Mappe r接收的[K,V]值。
由于是入门级的教程,我就不深入探究了,就给大家简单演示一下,更深入的玩法还需要自己去研究。
偷个懒,为了更快的实现自定义 InputFormat,继承 FileInputFormat 来一个,首先新建一个 DemoInputFormat,继承 FileInputFormat,这个时候我们需要重写 createRecordReader 方法:
public class DemoInputFormat extends FileInputFormat<Text, BytesWritable> {
@Override
public RecordReader createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
return new DemoRecordReader();
}
}
这里我们还需要一个 RecordReader,再新建一个类 DemoRecordReader:
public class DemoRecordReader extends RecordReader<Text, BytesWritable> {
private boolean readed = false;
private Text key = new Text();
private BytesWritable value = new BytesWritable();
private FileSplit fileSplit;
private FSDataInputStream inputStream;
/**
* 初始化方法,初始化的时候会被调用一次
*
* @param split
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
// 切片类型转为文件切片
// 此处强转 FileSplit 是因为 net.renfei.hadoop.inputformat.DemoInputFormat 继承了 org.apache.hadoop.mapreduce.lib.input.FileInputFormat
fileSplit = (FileSplit) split;
// 获取切片路径
Path path = fileSplit.getPath();
// 通过路径获取文件系统
FileSystem fileSystem = path.getFileSystem(context.getConfiguration());
// 打开文件流
inputStream = fileSystem.open(path);
}
/**
* 读取下一组 KV 值
*
* @return
* @throws IOException
* @throws InterruptedException
*/
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
if (readed) {
return false;
} else {
// 读取数据
// 读 Key,因为是演示,Text 也就是 Key 我们就取文件路径了,没什么意义
key.set(fileSplit.getPath().toString());
// 读 Value,直接一次读取完,所以是 fileSplit.getLength()
byte[] buf = new byte[(int) fileSplit.getLength()];
inputStream.read(buf);
value.set(buf, 0, buf.length);
readed = true;
return true;
}
}
/**
* 获取当前读取到的Key
*
* @return
* @throws IOException
* @throws InterruptedException
*/
@Override
public Text getCurrentKey() throws IOException, InterruptedException {
return key;
}
/**
* 读取当前的Value
*
* @return
* @throws IOException
* @throws InterruptedException
*/
@Override
public BytesWritable getCurrentValue() throws IOException, InterruptedException {
return value;
}
/**
* 当前数据读取的进度
*
* @return
* @throws IOException
* @throws InterruptedException
*/
@Override
public float getProgress() throws IOException, InterruptedException {
return readed ? 1 : 0;
}
/**
* 关闭资源
*
* @throws IOException
*/
@Override
public void close() throws IOException {
IOUtils.closeStream(inputStream);
}
}
创建一个 Driver 类:
public class DemoDriver {
/**
* 程序入口
*
* @param args
*/
public static void main(String[] args) throws IOException {
Job job = Job.getInstance(new Configuration());
job.setJarByClass(DemoDriver.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(BytesWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(BytesWritable.class);
// 设置使用我们自定义的 DemoInputFormat
job.setInputFormatClass(DemoInputFormat.class);
// 因为 DemoInputFormat 继承了 FileInputFormat,所以可以使用 FileInputFormat 设置
FileInputFormat.setInputPaths(job, new Path("/Users/renfei/Downloads/demo.txt"));
FileOutputFormat.setOutputPath(job, new Path("/Users/renfei/Downloads/demoout"));
}
}
商业用途请联系作者获得授权。
版权声明:本文为博主「任霏」原创文章,遵循 CC BY-NC-SA 4.0 版权协议,转载请附上原文出处链接及本声明。
相关推荐
猜你还喜欢这些内容,不妨试试阅读一下评论与留言
以下内容均由网友提交发布,版权与真实性无法查证,请自行辨别。微信订阅号
扫码关注「任霏博客」微信订阅号- 你写得非常清晰明了,让我很容易理解你的观点。
- 感谢分享!拿走了~
- 您是说 UCClient 类接收来自Discuz的UCenter的消息吧,请求是来自 Discuz 的 UCenter吗?code 为 null 说明请求URL地址中没有 code 参数 (?code=xxx) ,确定是 UCenter 发起的请求吗?
- String code = request.getParameter("code"); code一直是null 这是为什么啊
- 你好,我想问一下如果是分析型的数据库要怎么制作docker镜像呢 是修改V008R003C002B0320版本号吗
- 可以的,我也正在开发分享的程序,可以邮件或群联系我都可以,关于页面里有联系方式:https://www.renfei.net/page/about 。
- 有破解软件的需要可以私下联系您吗?
- 您好,手机APP只是个客户端,用于数据呈现展示,数据均保存在服务器上,只留个APP没有任何用处,无能为力哦。
- 老哥 看你弄了这么多软件好厉害啊。 我有个软件 我买过几个小会员 没用几天 然后商家跑路了,软件服务器关闭了,连不上去 用不了。 你能做成一个打补丁版本可以本地用的么? 方便看下么?https://haodezhe.lanzouw.com/iD0f30h9joza 谢谢老哥!
- 您好,由于版权投诉和我国知识产权法的完善,我已经下架所有破解软件的下载链接了。
- 生花妙笔信手来 – 基于 Amazon SageMaker 使用 Grounded-SAM 加速电商广告素材生成 [1]
- github.renfei.net 不再完整代理 Github 页面改为代理指定文件
- 优雅的源代码管理(三):本地优雅的使用 Git Rebase 变基
- 优雅的源代码管理(二):Git 的工作原理
- 优雅的源代码管理(一):版本控制系统 VCS(Version Control System)与软件配置管理 SCM(Software Configuration Management)
- ChatGPT 开发商 OpenAI 买下极品域名 AI.com
- 火爆的 AI 人工智能 ChatGPT 国内注册教程、使用方式和收费标准
- 解决 SpringCloud 中 bootstrap.yml 不识别 @activatedProperties@ 参数
- Cron表达式书写教程搞定Linux、Spring、Quartz的定时任务
- 阿里云香港可用区C发生史诗级故障