大数据技术原理与应用第七章MapReduce分析课件.ppt

上传人(卖家):三亚风情 文档编号:3335947 上传时间:2022-08-20 格式:PPT 页数:45 大小:4.69MB
下载 相关 举报
大数据技术原理与应用第七章MapReduce分析课件.ppt_第1页
第1页 / 共45页
大数据技术原理与应用第七章MapReduce分析课件.ppt_第2页
第2页 / 共45页
大数据技术原理与应用第七章MapReduce分析课件.ppt_第3页
第3页 / 共45页
大数据技术原理与应用第七章MapReduce分析课件.ppt_第4页
第4页 / 共45页
大数据技术原理与应用第七章MapReduce分析课件.ppt_第5页
第5页 / 共45页
点击查看更多>>
资源描述

1、第七章 MapReduce提纲7.1 概述概述7.2 MapReduce体系结构体系结构7.3 MapReduce工作流程工作流程7.4 实例分析:实例分析:WordCount7.5 MapReduce的具体应用的具体应用7.6 MapReduce编程实践编程实践7.1概述n7.1.1 分布式并行编程n7.1.2 MapReduce模型简介n7.1.3 Map和Reduce函数7.1.1 分布式并行编程“摩尔定律”,CPU性能大约每隔18个月翻一番从2005年开始摩尔定律逐渐失效,需要处理的数据量快速增加,人们开始借助于分布式并行编程来提高程序性能 分布式程序运行在大规模计算机集群上,可以并行

2、执行大规模数据处理任务,从而获得海量的计算能力谷歌公司最先提出了分布式并行编程模型MapReduce,Hadoop MapReduce是它的开源实现,后者比前者使用门槛低很多 7.1.1 分布式并行编程问题:在MapReduce出现之前,已经有像MPI这样非常成熟的并行计算框架了,那么为什么Google还需要MapReduce?MapReduce相较于传统的并行计算框架有什么优势?传统并行计算框架传统并行计算框架MapReduce集群架构/容错性共享式(共享内存/共享存储),容错性差非共享式,容错性好硬件/价格/扩展性刀片服务器、高速网、SAN,价格贵,扩展性差普通PC机,便宜,扩展性好编程/

3、学习难度what-how,难what,简单适用场景实时、细粒度计算、计算密集型批处理、非实时、数据密集型7.1.2 MapReduce模型简介MapReduce将复杂的、运行于大规模集群上的并行计算过程高度地抽象到了两个函数:Map和Reduce编程容易,不需要掌握分布式并行编程细节,也可以很容易把自己的程序运行在分布式系统上,完成海量数据的计算MapReduce采用“分而治之分而治之”策略,一个存储在分布式文件系统中的大规模数据集,会被切分成许多独立的分片(split),这些分片可以被多个Map任务并行处理MapReduce设计的一个理念就是“计算向数据靠拢计算向数据靠拢”,而不是“数据向计

4、算靠拢”,因为,移动数据需要大量的网络传输开销MapReduce框架采用了Master/Slave架构,包括一个Master和若干个Slave。Master上运行JobTracker,Slave上运行TaskTracker Hadoop框架是用Java实现的,但是,MapReduce应用程序则不一定要用Java来写 7.1.3 Map和Reduce函数函数函数输入输入输出输出说明说明Map如:List()如:1.将小数据集进一步解析成一批对,输入Map函数中进行处理2.每一个输入的会输出一批。是计算的中间结果Reduce 如:“a”,输入的中间结果中的List(v2)表示是一批属于同一个k2的

5、value7.2 MapReduce的体系结构MapReduce体系结构主要由四个部分组成,分别是:Client、JobTracker、TaskTracker以及Task7.2 MapReduce的体系结构MapReduce主要有以下4个部分组成:1)Client用户编写的MapReduce程序通过Client提交到JobTracker端用户可通过Client提供的一些接口查看作业运行状态2)JobTrackerJobTracker负责资源监控和作业调度JobTracker 监控所有TaskTracker与Job的健康状况,一旦发现失败,就将相应的任务转移到其他节点JobTracker 会跟踪

6、任务的执行进度、资源使用量等信息,并将这些信息告诉任务调度器(TaskScheduler),而调度器会在资源出现空闲时,选择合适的任务去使用这些资源7.2 MapReduce的体系结构3)TaskTrackerTaskTracker 会周期性地通过“心跳”将本节点上资源的使用情况和任务的运行进度汇报给JobTracker,同时接收JobTracker 发送过来的命令并执行相应的操作(如启动新任务、杀死任务等)TaskTracker 使用“slot”等量划分本节点上的资源量(CPU、内存等)。一个Task 获取到一个slot 后才有机会运行,而Hadoop调度器的作用就是将各个TaskTrack

7、er上的空闲slot分配给Task使用。slot 分为Map slot 和Reduce slot 两种,分别供MapTask 和Reduce Task 使用4)TaskTask 分为Map Task 和Reduce Task 两种,均由TaskTracker 启动7.3MapReduce工作流程n7.3.1 工作流程概述n7.3.2 MapReduce各个执行阶段n7.3.3 Shuffle过程详解7.3.1 工作流程概述图7-1 MapReduce工作流程Shuffle7.3.1 工作流程概述不同的Map任务之间不会进行通信不同的Reduce任务之间也不会发生任何信息交换用户不能显式地从一台

8、机器向另一台机器发送消息所有的数据交换都是通过MapReduce框架自身去实现的7.3.2 MapReduce各个执行阶段7.3.2 MapReduce各个执行阶段HDFS 以固定大小的block 为基本单位存储数据,而对于MapReduce 而言,其处理单位是split。split 是一个逻辑概念,它只包含一些元数据信息,比如数据起始位置、数据长度、数据所在节点等。它的划分方法完全由用户自己决定。关于关于Split(分片)(分片)7.3.2 MapReduce各个执行阶段Reduce任务的数量任务的数量最优的Reduce任务个数取决于集群中可用的reduce任务槽(slot)的数目通常设置比

9、reduce任务槽数目稍微小一些的Reduce任务个数(这样可以预留一些系统资源处理可能发生的错误)Map任务的数量任务的数量Hadoop为每个split创建一个Map任务,split 的多少决定了Map任务的数目。大多数情况下,理想的分片大小是一个HDFS块7.3.3 Shuffle过程详解图7-3 Shuffle过程 1.Shuffle过程简介过程简介7.3.3 Shuffle过程详解2.Map端的端的Shuffle过程过程每个Map任务分配一个缓存MapReduce默认100MB缓存设置溢写比例0.8分区默认采用哈希函数排序是默认的操作排序后可以合并(Combine)合并不能改变最终结果

10、在Map任务全部结束之前进行归并归并得到一个大的文件,放在本地磁盘文件归并时,如果溢写文件数量大于预定值(默认是3)则可以再次启动Combiner,少于3不需要JobTracker会一直监测Map任务的执行,并通知Reduce任务来领取数据合并(Combine)和归并(Merge)的区别:两个键值对和,如果合并,会得到,如果归并,会得到“a”,7.3.3 Shuffle过程详解3.Reduce端的端的Shuffle过程过程Reduce任务通过RPC向JobTracker询问Map任务是否已经完成,若完成,则领取数据Reduce领取数据先放入缓存,来自不同Map机器,先归并,再合并,写入磁盘多个

11、溢写文件归并成一个或多个大文件,文件中的键值对是排序的当数据很少时,不需要溢写到磁盘,直接在缓存中归并,然后输出给Reduce7.3.3 Shuffle过程详解3.Reduce端的端的Shuffle过程过程图7-5 Reduce端的Shuffle过程 7.3.4 MapReduce应用程序执行过程7.4实例分析:WordCountn7.4.1 WordCount程序任务n7.4.2 WordCount设计思路n7.4.3 一个WordCount执行过程的实例7.4.1 WordCount程序任务表7-2 WordCount程序任务程序WordCount输入一个包含大量单词的文本文件输出文件中每

12、个单词及其出现次数(频数),并按照单词字母顺序排序,每个单词和其频数占一行,单词和频数之间有间隔表7-3 一个WordCount的输入和输出实例输入输出Hello WorldHello HadoopHello MapReduceHadoop 1Hello 3MapReduce 1World 17.4.2 WordCount设计思路n首先,需要检查WordCount程序任务是否可以采用MapReduce来实现n其次,确定MapReduce程序的设计思路n最后,确定MapReduce程序的执行过程7.4.3一个WordCount执行过程的实例图7-7 Map过程示意图 7.4.3一个WordCou

13、nt执行过程的实例图7-8 用户没有定义Combiner时的Reduce过程示意图 7.4.3一个WordCount执行过程的实例图7-9 用户有定义Combiner时的Reduce过程示意图 7.5 MapReduce的具体应用MapReduce可以很好地应用于各种计算问题n 关系代数运算(选择、投影、并、交、差、连接)n 分组与聚合运算n 矩阵-向量乘法n 矩阵乘法7.5 MapReduce的具体应用用用MapReduce实现关系的自然连接实现关系的自然连接7.5 MapReduce的具体应用用用MapReduce实现关系的自然连接实现关系的自然连接n 假设有关系R(A,B)和S(B,C)

14、,对二者进行自然连接操作n 使用Map过程,把来自R的每个元组转换成一个键值对b,,其中的键就是属性B的值。把关系R包含到值中,这样做使得我们可以在Reduce阶段,只把那些来自R的元组和来自S的元组进行匹配。类似地,使用Map过程,把来自S的每个元组,转换成一个键值对b,n 所有具有相同B值的元组被发送到同一个Reduce进程中,Reduce进程的任务是,把来自关系R和S的、具有相同属性B值的元组进行合并n Reduce进程的输出则是连接后的元组,输出被写到一个单独的输出文件中7.5 MapReduce的具体应用用用MapReduce实现关系的自然连接实现关系的自然连接7.6 MapRedu

15、ce编程实践n 7.6.1任务要求n 7.6.2编写Map处理逻辑n 7.6.3编写Reduce处理逻辑n 7.6.4 编写main方法n 7.6.5 编译打包代码以及运行程序n 7.6.6 Hadoop中执行MapReduce任务的几种方式7.6.1 任务要求文件文件A的内容如下:的内容如下:China is my motherlandI love China文件文件B的内容如下:的内容如下:I am from China期望结果期望结果如如右侧所示右侧所示:I 2is 1China 3my 1love 1am 1from 1motherland 17.6.2 编写Map处理逻辑Map输入类

16、型为期望的Map输出类型为Map输入类型最终确定为Map输出类型最终确定为public static class MyMapper extends Mapper private final static IntWritable one=new IntWritable(1);private Text word=new Text();public void map(Object key,Text value,Context context)throws IOException,InterruptedException StringTokenizer itr=new StringTokenizer(

17、value.toString();while(itr.hasMoreTokens()word.set(itr.nextToken();context.write(word,one);7.6.3 编写编写ReduceReduce处理逻辑处理逻辑在Reduce处理数据之前,Map的结果首先通过Shuffle阶段进行整理Reduce阶段的任务:对输入数字序列进行求和Reduce的输入数据为Reduce任务的输入数据:”I”,”China”,public static class MyReducer extends Reducer private IntWritable result=new IntW

18、ritable();public void reduce(Text key,Iterable values,Context context)throws IOException,InterruptedException int sum=0;for(IntWritable val:values)sum+=val.get();result.set(sum);context.write(key,result);7.6.3 编写编写ReduceReduce处理逻辑处理逻辑7.6.4 编写main方法public static void main(String args)throws Exception

19、 Configuration conf=new Configuration();/程序运行时参数 String otherArgs=new GenericOptionsParser(conf,args).getRemainingArgs();if(otherArgs.length!=2)System.err.println(Usage:wordcount );System.exit(2);Job job=new Job(conf,word count);/设置环境参数 job.setJarByClass(WordCount.class);/设置整个程序的类名 job.setMapperClas

20、s(MyMapper.class);/添加MyMapper类 job.setReducerClass(MyReducer.class);/添加MyReducer类 job.setOutputKeyClass(Text.class);/设置输出类型 job.setOutputValueClass(IntWritable.class);/设置输出类型 FileInputFormat.addInputPath(job,new Path(otherArgs0);/设置输入文件 FileOutputFormat.setOutputPath(job,new Path(otherArgs1);/设置输出文件

21、 System.exit(job.waitForCompletion(true)?0:1);import java.io.IOException;import java.util.StringTokenizer;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;impor

22、t org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.util.GenericOptionsParser;public class WordCount/WordCount类的具体代码见下一页完整代

23、码public class WordCount public static class MyMapper extends Mapper private final static IntWritable one=new IntWritable(1);private Text word=new Text();public void map(Object key,Text value,Context context)throws IOException,InterruptedException StringTokenizer itr=new StringTokenizer(value.toStrin

24、g();while(itr.hasMoreTokens()word.set(itr.nextToken();context.write(word,one);public static class MyReducer extends Reducer private IntWritable result=new IntWritable();public void reduce(Text key,Iterable values,Context context)throws IOException,InterruptedException int sum=0;for(IntWritable val:v

25、alues)sum+=val.get();result.set(sum);context.write(key,result);public static void main(String args)throws Exception Configuration conf=new Configuration();String otherArgs=new GenericOptionsParser(conf,args).getRemainingArgs();if(otherArgs.length!=2)System.err.println(Usage:wordcount );System.exit(2

26、);Job job=new Job(conf,word count);job.setJarByClass(WordCount.class);job.setMapperClass(MyMapper.class);job.setReducerClass(MyReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);FileInputFormat.addInputPath(job,new Path(otherArgs0);FileOutputFormat.setOutputP

27、ath(job,new Path(otherArgs1);System.exit(job.waitForCompletion(true)?0:1);7.6.5 编译打包代码以及运行程序实验步骤实验步骤:使用java编译程序,生成.class文件将.class文件打包为jar包运行jar包(需要启动Hadoop)查看结果7.6.5 编译打包代码以及运行程序Hadoop 2.x 版本中的依赖版本中的依赖 jarHadoop 2.x 版本中 jar 不再集中在一个 hadoop-core*.jar 中,而是分成多个 jar,如使用 Hadoop 2.6.0 运行 WordCount 实例至少需要如下

28、三个 jar:$HADOOP_HOME/share/hadoop/common/hadoop-common-2.6.0.jar$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.6.0.jar$HADOOP_HOME/share/hadoop/common/lib/commons-cli-1.2.jar通过命令 hadoop classpath 可以得到运行 Hadoop 程序所需的全部 classpath信息7.6.5 编译打包代码以及运行程序将 Hadoop 的 classhpath 信息添加到 CLASSP

29、ATH 变量中,在/.bashrc 中增加如下几行:export HADOOP_HOME=/usr/local/hadoop export CLASSPATH=$($HADOOP_HOME/bin/hadoop classpath):$CLASSPATH 执行 source/.bashrc 使变量生效,接着就可以通过 javac 命令编译 WordCount.java接着把.class 文件打包成 jar,才能在 Hadoop 中运行:运行程序:7.6.6 Hadoop中执行MapReduce任务的几种方式Hadoop jarPigHivePythonShell脚本在解决问题的过程中,开发效率

30、、执行效率都是要考虑的因素,不要太局限于某一种方法本章小结n 本章介绍了MapReduce编程模型的相关知识。MapReduce将复杂的、运行于大规模集群上的并行计算过程高度地抽象到了两个函数:Map和Reduce,并极大地方便了分布式编程工作,编程人员在不会分布式并行编程的情况下,也可以很容易将自己的程序运行在分布式系统上,完成海量数据集的计算n MapReduce执行的全过程包括以下几个主要阶段:从分布式文件系统读入数据、执行Map任务输出中间结果、通过 Shuffle阶段把中间结果分区排序整理后发送给Reduce任务、执行Reduce任务得到最终结果并写入分布式文件系统。在这几个阶段中,Shuffle阶段非常关键,必须深刻理解这个阶段的详细执行过程n MapReduce具有广泛的应用,比如关系代数运算、分组与聚合运算、矩阵-向量乘法、矩阵乘法等n 本章最后以一个单词统计程序为实例,详细演示了如何编写MapReduce程序代码以及如何运行程序

展开阅读全文
相关资源
猜你喜欢
相关搜索

当前位置:首页 > 办公、行业 > 各类PPT课件(模板)
版权提示 | 免责声明

1,本文(大数据技术原理与应用第七章MapReduce分析课件.ppt)为本站会员(三亚风情)主动上传,163文库仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对上载内容本身不做任何修改或编辑。
2,用户下载本文档,所消耗的文币(积分)将全额增加到上传者的账号。
3, 若此文所含内容侵犯了您的版权或隐私,请立即通知163文库(发送邮件至3464097650@qq.com或直接QQ联系客服),我们立即给予删除!


侵权处理QQ:3464097650--上传资料QQ:3464097650

【声明】本站为“文档C2C交易模式”,即用户上传的文档直接卖给(下载)用户,本站只是网络空间服务平台,本站所有原创文档下载所得归上传人所有,如您发现上传作品侵犯了您的版权,请立刻联系我们并提供证据,我们将在3个工作日内予以改正。


163文库-Www.163Wenku.Com |网站地图|