1、n插上翅膀的大象(Spark on Hadoop)内容n三种计算框架nSpark产生背景nSpark特点nSpark生态系统nSpark核心概念RDDnSpark程序设计实例n进一步理解Spark核心概念RDD三种计算框架n根据伯克利大学提出的关于数据分析的软件栈BDAS(Berkeley Data Analytics Stack),目前的大数据处理可以分为如以下三个类型 n批处理计算n流式计算n交互式计算批处理(Batch)计算n成批处理数据,特点是吞吐量大,但处理速度慢,实时性差。在三种计算框架中实时性最低,响应时间在分钟级到数十分钟级,有时甚至达到数小时。n批处理计算和其它两种计算模式的
2、差别就好比火车和飞机的差别,火车一次载人多,吞吐量大,但速度较慢;飞机一次载人少,但实时性好。n典型实例:MapReduce、Hive、Pign适用场合:n适合PB级以上海量数据的离线处理。比如MapReduce的输入数据必须是静态的(即离线的),不能动态变化。n很多搜索引擎在统计过去一年或半年时间跨度内的最流行的K个搜索词时用到基于批处理的MapReduce计算框架(历史数据分析而不是实时数据分析)。流式计算(Streaming)n快速实时小批量地处理数据。在三种计算框架中实时性最高,响应时间在数百毫秒级到数秒级。n典型实例:Storm、Spark Streaming。n适用场合:n适合处理
3、大量在线流式数据,并返回实时的结果。n比如在电子商务中统计过去1分钟内访问最多的5件商品;n淘宝双11(光棍节)时实时统计网站商品的总交易额;n社交网络趋势追踪;n网站指标统计、点击日志分析等。交互式计算(Interactive)n以近实时方式处理SQL或类SQL交互式数据。在三种计算框架中实时性居中,响应时间在数十秒到数分钟之间。n典型实例:Impala、Spark SQL。n适用场合:n适合以请求-响应的交互方式处理大量结构化和半结构化数据。n比如使用SQL查询结构化的数据,很容易地完成包括商业智能BI在内的各种复杂的数据分析算法。内容n三种计算框架nSpark产生背景nSpark特点nS
4、park生态系统nSpark核心概念RDDnSpark程序设计实例n进一步理解Spark核心概念RDDSpark产生产生背景:MapReduce局限性nMapReduce框架局限性n仅支持Map和Reduce两种操作n迭代计算效率低(如不适应机器学习、图计算等需要大量迭代的计算)n不适合交互式处理(如数据挖掘)n不适合流式处理(如点击日志分析)nMapReduce编程不够灵活n尝试scala函数式编程语言Spark产生产生背景:框架多样化n现有的各种计算框架各自为战n批处理:MapReduce、Hive、Pign流式计算:Stormn交互式计算:Impalan能否有一种灵活的框架可同时进行批处
5、理、流式计算、交互式计算等?产生背景:大统一系统n在一个统一的框架下,进行批处理、流式计算、交互式计算内容n三种计算框架nSpark产生背景nSpark特点nSpark生态系统nSpark核心概念RDDnSpark程序设计实例n进一步理解Spark核心概念RDDSpark特点n高效n内存计算引擎nDAG图n比MapReduce快10100倍n易用n提供了丰富的API,支持Java,Scala,Python等多种等多种语言n同一功能实现,Scala代码量比MapReduce少25倍n与Hadoop集成n读写HDFS/Hbasen与YARN集成Spark特点1:高效MapReducen慢是由于Ma
6、pReduce用磁盘存放中间结果,而且写3份,用户的查询请求需要多次频繁地访问某个数据集时磁盘I/O慢,数据的复制和序列化也导致运行速度慢。Spark特点1:高效SparknSpark快的原因是把输入数据一次性读取到分布式内存中,以后用户的的多次查询和处理都从内存中读写,其速度是MapReduce网络复制和磁盘I/O速度的10到100倍。n由于技术的进步,机器的内存空间越来越大,而且内存条也越来越便宜,使Spark这种基于内存的计算成为可能。n但当数据量很大时,内存空间若放不下,Spark有相关机制可以把热点数据(比如一天读取几百次的数据)放入内存而把非热点数据(比如一个月读取一次的数据)放入
7、磁盘。Spark特点1:高效Sparkn当迭代次数达到30次时,Hadoop的运行时间大约是4000s,而Spark的运行时间大约是400s,相差10倍左右。Spark特点2:易用MapReduceSpark特点2:易用SparkSpark特点3:与Hadoop集成内容n三种计算框架nSpark产生背景nSpark特点nSpark生态系统nSpark核心概念RDDnSpark程序设计实例n进一步理解Spark核心概念RDDSpark生态系统生态系统Alluxio(原名Tachyon)n定义:Tachyon(超光子)是一种内存式的文件系统,可以认为是搭建在HDFS上的分布式缓存。它可以在集群里以
8、访问内存的速度来访问存在Tachyon里的文件。n架构:Tachyon是架构在最底层的分布式文件存储和上层的各种计算框架之间的一种中间件。具体地说,是在分布式文件存储系统如HDFS、S3等之上,在Spark、MapReduce、Impala等各种计算框架之下。引入Tachyon的原因n1)提高不同任务或框架间数据交换的速度n不同任务或不同计算框架间的数据共享情况在所难免,例如Spark的分属不同Stage的两个任务,或Spark与MapReduce框架的数据交互。n在这种情况下,一般就需要通过磁盘来完成数据交换,而这通常是效率很低的。n而引入Tachyon中间层后,数据交换实际上是在内存中进行
9、的。引入Tachyon的原因n2)使Spark的执行引擎和存储引擎分开nSpark作为内存计算框架,为什么还需要再加一层内存管理的文件系统?n因为Spark其实只提供了强大的内存计算能力,但未提供存储能力。n那么默认让Spark自己直接在内存管理数据不行吗?n让Spark自己来管理内存会出现的问题。默认情况下,Spark的任务执行和数据本身都在一个进程内。当执行出现问题时就会导致整个进程崩溃,并丢失进程内的所有数据。n而Tachyon这一中间层的引入,就相当于将存储引擎从Spark中抽离出来,从而每个任务进程只负责执行。进程的崩溃不会丢失数据,因为数据都在Tachyon里面了。引入Tachyo
10、n的原因n3)避免数据被重复加载n不同的Spark任务可能会访问同样的数据,例如两个任务都要访问HDFS中的某些Block。这时每个任务都要自己去磁盘加载数据到内存中。n而Tachyon可以只保存一份数据在内存中供加载,而且它还使用堆外内存,避免GC(垃圾收集)开销。Mesos和YARNnMesos是一个开源的资源管理系统,可以对集群中的资源做弹性管理。目前Twitter,Apple等公司在使用Mesos管理集群资源。Apple的siri的后端便是采用Mesos进行资源管理。n目前看来,Hadoop YARN要比Mesos更主流,前景更广阔。nYARN在实现资源管理的前提下,能够跟Hadoop
11、生态系统完美结合。YARN定位为大数据中的数据操作系统,能够更好地为上层各类应用程序(MapReduce/Spark)提供资源管理和调度功能。n另外,非常重要的一点是,YARN的社区力量要比Mesos强大的多,它的参与人员众多,周边系统的建设非常完善。Shark和Spark SQLnSpark SQL 是分布式SQL查询引擎,用于处理交互式数据流,把SQL命令分解成多个任务(Task)交给Hadoop集群处理。n自2013年3月面世以来,Spark SQL已经成为除Spark Core以外最大的Spark组件。n在2014年7月1日的Spark Summit上,Databricks公司宣布终止
12、对Shark的开发,将重点放到Spark SQL上。nSpark SQL将涵盖Shark的所有特性,用户可以从Shark 0.9进行无缝的升级。n除了接过Shark的接力棒,继续为Spark用户提供高性能的SQL on Hadoop解决方案之外,Spark SQL还为Spark带来了通用、高效、多元一体的结构化数据处理能力。nSpark SQL可加载和查询各种数据源,比如Hive数据、Parquet列式存储格式数据、JSON格式数据、通过JDBC和ODBC等连接各种数据源Spark StreamingnSpark Streaming是大规模流式数据处理的新贵,将流式计算分解成一系列短小的批处理
13、作业。nSpark Streaming类似于Apache Storm,用于流式数据的处理。Spark Streaming有高吞吐量和容错能力强这两个特点。nSpark Streaming支持的数据输入源很多,例如:HDFS、Kafka、Flume、Twitter、ZeroMQ和简单的TCP套接字等。数据输入后可以用Spark的高度抽象原语如map、reduce、join、window等进行运算。而结果也能保存在很多地方,如HDFS、数据库等。n另外Spark Streaming也能和MLlib(机器学习)以及GraphX完美融合。Spark Streaming支持的数据输入源和输出地Graph
14、XnSpark GraphX是一个分布式图处理框架,Spark GraphX基于Spark平台提供对图计算和图挖掘简洁易用的而丰富多彩的接口,极大的方便了大家对分布式图处理的需求。n社交网络中人与人之间有很多关系链,例如Twitter、Facebook、微博、微信,这些都是大数据产生的地方,都需要图计算,现在的图处理基本都是分布式的图处理,而并非单机处理。n图的分布式或者并行处理其实是把这张图拆分成很多的子图,然后我们分别对这些子图进行计算,计算的时候可以分别迭代进行分阶段的计算,即对图进行并行计算。nGraphX使用的是一种点和边都带有属性的有向多重图,有Table和Graph两种视图,只需
15、一份物理存储。MLBasenMLBase是Spark生态圈的一部分,专注于机器学习,包含三个组件:MLlib、MLI、ML Optimizer。nMLlib是Spark的机器学习库,包括分类、聚类、回归算法、决策树、推荐算法等各种机器学习的核心算法的实现。nMLI是MLlib的测试床,是用于特征提取和算法开发的实验性API。nMLBase的核心是ML Optimizer,把声明式的任务转化成复杂的学习计划,输出最优的模型和计算结果。nMLBase与其它机器学习系统Mahout的不同:n2014年4月,Mahout告别MapReduce实现,转而采用Spark作为底层实现基础。nMLBase是自
16、动化的,Mahout需要使用者具备机器学习技能,来选择自己想要的算法和参数来做处理。nMLBase提供了不同抽象程度的接口,可以扩充ML(机器学习)算法。内容n三种计算框架nSpark产生背景nSpark特点nSpark生态系统nSpark核心概念RDDnSpark程序设计实例n进一步理解Spark核心概念RDDSpark核心概念RDDnRDD(Resilient Distributed Datasets,弹性分布式数据集)n分布在集群中的只读对象集合,由多个Partition(分区)构成,可以存储在磁盘或内存中(有多种存储级别)。n可以由Scala集合创建RDD,比如nsc.parallel
17、ize(List(1,2,3)/对集合List(1,2,3)进行并行化生成RDD,默认Partition数量为1nsc.parallelize(1 to n,5)/对集合1到n(n可以是很大的整数)进行并行化生成RDD,指定Partition数量为5利用本地文件或HDFS文件文件创建RDDn1.由文本文件(TextInputFormat)创建RDDnsc.textFile(“file.txt”)/将本地文本文件加载成RDDnsc.textFile(“directory/*.txt”)/将某类文本文件加载成RDDnsc.textFile(“hdfs:/nn:9000/path/file”)/将h
18、dfs文件或目录加载成RDDnsc.textFile(“file:/file.txt”)/将本地文本文件加载成RDD,用file:/指定本地文件n2.由sequenceFile文件(SequenceFileInputFormat)创建RDDnsc.sequenceFile(“file.txt”)/将本地二进制文件加载成RDDnsc.sequenceFileString,Int(“hdfs:/nn:9000/path/file”)/将hdfs二进制文件加载成RDDRDDnRDD是只读数据集,不支持修改,不考虑并发和加锁。n那么如何对RDD进行操作呢?n有两种基本操作(operator):nTra
19、nsformation(转换)nAction(行动)TransformationnTransformation(转换)n可通过Scala集合或者Hadoop数据文件构造一个新的RDDn通过已有的RDD产生新的RDD(输入是RDD,输出还是RDD,则为Transformation)n举例:map,filter,groupBy,reduceByAction(行动)n通过RDD计算得到一个或者一组值(输入是RDD,输出不是RDD,而是一个或一组值或写入存储,则为Action)n举例:count,collect,savenTransformation操作和Action操作的重要区别:n无论执行了多少次
20、Transformation操作,RDD都不会真正执行运算(只做标记,不触发计算),只有当Action操作被执行时,运算才会触发。n这也称为Spark的惰性执行,即一段Spark代码不会执行,直到遇到第一个Action n惰性执行的好处是优化的概率提高,即看到的步骤越多,最终执行时优化的可能性就越高。两段代码执行效果有何不同nbeacons=spark.textFile(“hdfs:/.”)ncachedBeacons=beacons.cache()ncachedBeacons.filter(_.contains(“HouseOfCards”)ncachedBeacons.filter(_.c
21、ontains(“GameOfThrone”)nnbeacons=spark.textFile(“hdfs:/.”)ncachedBeacons=beacons.cache()ncachedBeacons.filter(_.contains(“HouseOfCards”).countncachedBeacons.filter(_.contains(“GameOfThrone”).countnn这两段代码都从HDFS文件中生成RDD,然后用cache操作把RDD数据缓存到内存,接着执行了两个filter操作,对缓存到内存中的RDD数据进行过滤,找出包含特定字符串的数据。n但第二段代码在filte
22、r操作之后用了count操作,注意到count是Action操作,所以会真正触发计算,从而得到包含特定字符串的数据个数。Operator示例nRDD1包含集合1到7的整数,由3个Partition组成。经过第一个map(+1)操作,生成RDD2,其中集合的每个整数加1,但该操作的结果并不真的生成,而是只做了标记,不在内存或磁盘生成该中间结果。因为map是Transformation操作。nRDD2接着执行SAVEASTEXTFILE(“HDFS:/”)操作,该操作是Action操作,因此计算被触发,把RDD中的加1后的数据保存到存储系统HDFS指定的文件中。RDD transformation
23、举例举例n1)/创建RDD,用集合创建,用集合创建RDDnval nums=sc.parallelize(List(1,2,3)n2)/将RDD传入map函数,求变量的平方,生成新的RDDnval squares=nums.map(x=x*x)/1,4,9n3)/对RDD中元素进行过滤,求模2结果为0的偶数,生成新的RDDnval even=squares.filter(_%2=0)/4n4)/利用利用flatMap将一个元素映射成多个,生成新的RDDnnums.flatMap(x=1 to x)/=1,1,2,1,2,3RDD Action举例举例n1)/用集合用集合创建新的RDDnval
24、nums=sc.parallelize(List(1,2,3)n2)/将RDD保存为本地数组,collect将分布式的将分布式的RDD返回为一个单返回为一个单机的数组,机的数组,返回到driver程序所在的机器程序所在的机器nnums.collect()/=Array(1,2,3)n3)/返回前K个元素,返回一个数组,该操作目前并非并行执行,而是由driver程序所在机器执行的nnums.take(2)/=Array(1,2)n4)/计算元素总数nnums.count()/=3n5)/合并集合元素nnums.reduce(_+_)/=6n6)/将RDD写到HDFS中nnums.saveAsTe
25、xtFile(“hdfs:/nn:8020/output”)nnums.saveAsSequenceFile(“hdfs:/nn:8020/output”)Key/Value类型的RDDn1)val pets=sc.parallelize(List(“cat”,1),(“dog”,1),(“cat”,2)n2)pets.reduceByKey(_+_)/=(cat,3),(dog,1)n3)pets.groupByKey()/=(cat,Seq(1,2),(dog,Seq(1)n4)pets.sortByKey()/=(cat,1),(cat,2),(dog,1)Transformation和
26、和Action内容n三种计算框架nSpark产生背景nSpark特点nSpark生态系统nSpark核心概念RDDnSpark程序设计实例n进一步理解Spark核心概念RDD实例1:wordcountn/导入相关类,_相当于*号,即导入一个包中所有的类nimport org.apache.spark._nimport SparkContext._n/定义对象WordCountnobject WordCount n/定义main函数,冒号后为数据类型ndef main(args:ArrayString)n/如果输入的参数个数不等于3,则打印输入命令格式的提示信息,n/参数0为master地址,参
27、数1为输入数据所在目录,比如:n/hdfs:/host:port/input/data,参数2为数据输出目录,比n/如:hdfs:/host:port/output/datanif(args.length!=3)nprintln(usage is org.test.WordCount )nreturnn实例1:wordcountn/创建SparkContext对象,封装了spark执行环境信息,依次包括n/master地址、作业名称、Spark安装目录、作业依赖的jar包nval sc=new SparkContext(args(0),WordCount,System.getenv(SPARK
28、_HOME),Seq(System.getenv(SPARK_TEST_JAR)n/用输入的文本文件创建RDDnval textFile=sc.textFile(args(1)n/单词计数的程序主体。其中split根据给定的正则表达式的匹配拆分n/字符串,s表示空格、回车、换行等分隔符,+表示一个或多个的意n/思nval result=textFile.flatMap(line=line.split(s+).map(word=(word,1).reduceByKey(_+_)n/把单词计数的结果存放到输出文件中nresult.saveAsTextFile(args(2)nnSpark程序设计基
29、本流程n1)创建SparkContext对象n这是Spark程序的入口,封装了spark执行环境信息。有点类似数据库编程中的建立连接操作。n2)创建RDDn可从Scala集合或Hadoop数据文件上创建n3)在RDD之上进行转换和actionnSpark提供了多种转换和action函数n4)返回结果n保存到HDFS中,或直接打印出来Spark程序设计Scalan用函数式编程的方式处理集合:n1)var list=List(1,2,3)n/“=”操作的含义可理解为把左边的变量装到右边的表达式中操作的含义可理解为把左边的变量装到右边的表达式中n2)list.foreach(x=println(x)
30、/打印1,2,3nlist.foreach(println)/与上式等价,形式更简练与上式等价,形式更简练n3)list.map(x=x+2)/=List(3,4,5)nlist.map(_+2)/与上式等价。此处与上式等价。此处“_”称为占位符,对集合称为占位符,对集合中的每个元素作用一次。中的每个元素作用一次。n4)list.filter(x=x%2=1)/=List(1,3)nlist.filter(_%2=1)/与上式等价。与上式等价。n5)list.reduce(x,y)=x+y)/6nlist.reduce(_+_)/与上式等价。与上式等价。Wordcount程序运行过程实例程序运
31、行过程实例实例2:分布式估算Pi-蒙特卡洛算法n它的具体定义是:在广场上画一个边长一米的正方形,在正方形内部随意用粉笔画一个不规则的形状,现在要计算这个不规则图形的面积,怎么计算呢?n蒙特卡洛(Monte Carlo)方法告诉我们,均匀的向该正方形内撒N(N 是一个很大的自然数)个黄豆,随后数数有多少个黄豆在这个不规则几何形状内部,比如说有M个,那么,这个奇怪形状的面积便近似于M/N,N越大,算出来的值便越精确。n在这里我们要假定豆子都在一个平面上,相互之间没有重叠。n蒙特卡洛方法可用于近似计算圆周率公式推导n1)假设正方形边长为d,则:n正方形面积为:d*dn圆的面积为:pi*(d/2)*(
32、d/2)n正方形与圆两者面积之比为:4/pin2)随机产生位于正方形内的点n个,假设落到圆中的有count个,则:nPi=4*count/nn3)当n-时,Pi逼近真实值分布式估算Pi调用“Spark Pi”程序程序n/定义对象SparkPinobject SparkPi n/定义函数定义函数mainndef main(args:ArrayString)n/创建创建SparkConf,封装了,封装了Spark配置信息:应用程配置信息:应用程序名称序名称nval conf=new SparkConf().setAppName(Spark Pi)n/创建创建SparkContext,封装了调度器等
33、信息封装了调度器等信息nval spark=new SparkContext(conf)分布式估算Pi-计算n/启动一定数量的启动一定数量的map task进行并行处理,默认数进行并行处理,默认数量为量为2nval slices=if(args.length 0)args(0).toInt else 2n/初始化初始化nnval n=100000*slicesn/计算落入园内的点数计算落入园内的点数nval count=spark.parallelize(1 to n,slices).map i=nval x=random*2-1nval y=random*2-1nif(x*x+y*y nva
34、l x=random*2-1nval y=random*2-1nif(x*x+y*y val data=Array(1,2,3,4,5)/产生datanscala val distData=sc.parallelize(data)/将data处理成RDDnscala distData.reduce(_+_)提交Spark程序(运行在YARN上)nexport YARN_CONF_DIR=/opt/hadoop/yarn-client/etc/hadoopnbin/spark-submit n-master yarn-cluster n-class com.hulu.examples.Spark
35、Pi n-name sparkpi n-driver-memory 2g n-executor-memory 3g n-executor-cores 2 n-num-executors 2 n-queue spark n-conf spark.pi.iterators=500000 n-conf spark.pi.slices=10 n$FWDIR/target/scala-2.10/spark-example-assembly-1.0.jarnMaster参数指定程序运行模式,比如local,yarn-client,yarn-cluster。该参数必选。nclass参数指定应用程序主类。该参
36、数必选。nname参数指定作业名称。该参数可选。ndriver-memory参数指定Driver需要的内存。该参数可选,默认为512MBnexecutor-memory参数指定每个executor需要的内存。该参数可选,默认为1GBnexecutor-cores参数指定每个executor线程数,相当于每个executor中的task数。该参数可选,默认为1nnum-executors参数指定需启动的Executor总数。该参数可选,默认为2nqueue参数指定提交应用程序给哪个YARN队列。该参数可选,默认为default队列。nconf参数指定用户自定义配置。n最后一行指定用户应用程序所在
37、jar包,必选,且一定要放在整个spark-submit提交命令的最后。提交Spark程序(运行在YARN上)内容n三种计算框架nSpark产生背景nSpark特点nSpark生态系统nSpark核心概念RDDnSpark程序设计实例n进一步理解Spark核心概念RDDRDD与DAGnSpark的计算发生在RDD的action操作,而对action之前的所有transformation,Spark只是记录下RDD生成的轨迹,而不会触发真正的计算。nSpark内核会在需要计算发生的时刻绘制一张关于计算路径的有向无环图,也就是DAG。即Action算子触发之后,将以所有累积的transformat
38、ion算子之间的RDD为节点形成一个有向无环图DAG。n举个例子,在图2中,从输入中逻辑上生成A和C两个RDD,经过一系列transformation操作,逻辑上生成了F,注意,我们说的是逻辑上,因为这时候计算没有发生,Spark内核做的事情只是记录了RDD的生成和依赖关系。当F要进行输出时,也就是F进行了action操作,Spark会根据RDD的依赖生成DAG,并从起点开始真正的计算。n图2 逻辑上的计算过程:DAG 划分Stagen有了计算的DAG图,Spark内核下一步的任务就是根据DAG图将计算划分成任务集,也就是Stage,这样可以将任务提交到计算节点进行真正的计算。nSpark计算
39、的中间结果默认是保存在内存中的,Spark在划分Stage的时候会充分考虑在分布式计算中可流水线计算(pipeline)的部分来提高计算的效率,而在这个过程中,主要的根据就是RDD的依赖类型。划分Stagen根据不同的transformation操作,RDD的依赖可以分为窄依赖(Narrow Dependency)和宽依赖(Wide Dependency,在代码中为ShuffleDependency)两种类型。n窄依赖指的是生成的RDD中每个partition只依赖于父RDD(s)固定的partition。n宽依赖指的是生成的RDD的每一个partition都依赖于父 RDD(s)所有part
40、ition。n窄依赖典型的操作有map,filter,union等,宽依赖典型的操作有groupByKey,sortByKey等。n可以看到,宽依赖往往意味着shuffle操作,这也是Spark划分stage的主要边界。n对于窄依赖,Spark会将其尽量划分在同一个stage中,因为它们可以进行流水线计算。宽依赖和窄依赖n图3 RDD的宽依赖和窄依赖 nRDD内的方框代表分区Partition Stage划分n我们再通过图4详细解释一下Spark中的Stage划分。n我们从HDFS中读入数据生成3个不同的RDD,通过一系列transformation操作后再将计算结果保存回HDFS。n可以看到这幅DAG中只有join操作是一个宽依赖,Spark内核会以此为边界将其前后划分成不同的Stage.n同时我们可以注意到,在图中Stage2中,从map到union都是窄依赖,这两步操作可以形成一个流水线操作,通过map操作生成的partition可以不用等待整个RDD计算结束,而是继续进行union操作,这样大大提高了计算的效率。n图4 Spark中的Stage划分 nA、B、C、D等代表不同的RDD,RDD内的方框代表分区Partition