1、计算机综合实验Spark 简介饶国政 博士 副教授http:/ 计算机科学与技术学院2015年11月19日入门介绍入门介绍什么是什么是spark?Apache Spark是一个是一个围绕速度、易用性和复杂围绕速度、易用性和复杂分析构建的大数据处理框分析构建的大数据处理框架。最初在架。最初在2009年由加州年由加州大学伯克利分校的大学伯克利分校的AMP实实验室开发,被标榜为验室开发,被标榜为“快快如闪电的集群计算如闪电的集群计算”,可,可用来构建大型的、低延迟用来构建大型的、低延迟的数据分析应用程序。它的数据分析应用程序。它拥有一个繁荣的开源社区,拥有一个繁荣的开源社区,并且是目前最活跃的并且是
2、目前最活跃的Apache项目。项目。Spark FrameworkSpark FrameworkSpeedRun programs up to 100 x faster than Hadoop MapReduce in memory,or 10 x faster on disk.2014年年11月,月,Spark在在Daytona Gray Sort 100TB Benchmark竞竞赛中打破了由赛中打破了由Hadoop MapReduce保持的排序记录。保持的排序记录。Spark利用利用1/10的节点数,把的节点数,把100TB数据的排序时间从数据的排序时间从72分钟提高到了分钟提高到了23
3、分钟。分钟。Ease of UseWrite applications quickly in Java,Scala or Python.GeneralityCombine SQL,streaming,and complex analytics.Runs EverywhereSpark runs on Hadoop,Mesos,standalone,or in the cloud.It can access diverse data sources including HDFS,Cassandra,HBase,S3.Spark FrameworkSpark FrameworkProject Co
4、mponentsSpark 1.5.2 released(Nov 9,2015)与与MapReduce相比相比:Spark是基于内存并行大数据框架,中间结果存放到内存,对于迭代数据是基于内存并行大数据框架,中间结果存放到内存,对于迭代数据Spark效率高。效率高。Spark可以避免不必要的排序所带来的开销可以避免不必要的排序所带来的开销支持比支持比Map和和Reduce更多的函数。更多的函数。Spark提供了一个更快、更通用的数据处理平台。和提供了一个更快、更通用的数据处理平台。和Hadoop相比,相比,Spark可以可以让你的程序在内存中运行时速度提升让你的程序在内存中运行时速度提升100倍
5、,或者在磁盘上运行时速度提升倍,或者在磁盘上运行时速度提升10倍。在倍。在100 TB Daytona GraySort比赛中,比赛中,Spark战胜了战胜了Hadoop,它只使用了十分之一的,它只使用了十分之一的机器,但运行速度提升了机器,但运行速度提升了3倍。倍。其他其他优化优化任意操作算子图(任意操作算子图(operator graphs)可以帮助优化整体数据处理流程的大数据查询的延迟计算。可以帮助优化整体数据处理流程的大数据查询的延迟计算。提供简明、一致的提供简明、一致的Scala,Java和和Python API。提供交互式提供交互式Scala和和Python Shell。Spark
6、生态圈生态圈Spark FrameworkSpark FrameworkApache Sparkan open-source cluster computing framework originally developed in the AMPLab at UC BerkeleyIn contrast to Hadoops two-stage disk-based MapReduce paradigm,Sparks in-memory primitives provide performance up to 100 times faster for certain applicationsBy
7、 allowing user programs to load data into a clusters memory and query it repeatedly,Spark is well suited to machine learning algorithmsSpark can interface with a wide variety of file or storage systems,including Hadoop Distributed File System(HDFS),Cassandra,OpenStack Swift,or Amazon S3.In February
8、2014,Spark became an Apache Top-Level Project.Spark FrameworkSpark FrameworkProject ComponentsSpark Core and Resilient Distributed Datasets(RDDs)The fundamental programming abstraction is called Resilient Distributed Datasets,a logical collection of data partitioned across machines.RDDs can be creat
9、ed by referencing datasets in external storage systems,or by applying coarse-grained transformations(e.g.map,filter,reduce,join)on existing RDDs.The RDD abstraction is exposed through a language-integrated API in Java,Python,Scala similar to local,in-process collections.This simplifies programming c
10、omplexity because the way applications manipulate RDDs is similar to manipulating local collections of data.Spark CoreSpark Core包含Spark的基本功能;尤其是定义RDD的API、操作以及这两者上的动作。其他Spark的库都是构建在RDD和Spark Core之上的。RDD介绍介绍 RDD(Resilient Distributed Datasets弹性分布式数据集),是一个只读、分弹性分布式数据集),是一个只读、分区记录的集合,是区记录的集合,是spark中最重要的
11、概念,可以把他理解为一个存储数据的数据结构!中最重要的概念,可以把他理解为一个存储数据的数据结构!在在Spark中一切基于中一切基于RDD。用户可以简单的把。用户可以简单的把RDD理解成一个提供了许多操作接口理解成一个提供了许多操作接口的数据集合,和一般数据集不同的是,其实际数据分布存储于一批机器中(内存或的数据集合,和一般数据集不同的是,其实际数据分布存储于一批机器中(内存或磁盘中)。它的功能还包括容错、集合内的数据可以并行处理等。磁盘中)。它的功能还包括容错、集合内的数据可以并行处理等。RDD是Spark中对数据和计算的抽象,是Spark中最核心的概念,它表示已被分片/划分(partiti
12、on),不可变的并能够被并行操作的数据集合。对RDD的操作分为两种transformation和action。Transformation操作是通过转换从一个或多个RDD生成新的RDD。Action操作是从RDD生成最后的计算结果。在Spark最新的版本中,提供丰富的transformation和action操作,比起MapReduce计算模型中仅有的两种操作,会大大简化程序开发的难度。RDD的生成方式只有两种,一是从数据源读入,另一种就是从其它RDD通过transformation操作转换。一个典型的Spark程序就是通过Spark上下文环境(SparkContext)生成一个或多个RDD,
13、在这些RDD上通过一系列的transformation操作生成最终的RDD,最后通过调用最终RDD的action方法输出结果。每个RDD都可以用下面5个特性来表示,其中后两个为可选的:分片列表(数据块列表)计算每个分片的函数对父RDD的依赖列表对key-value类型的RDD的分片器(Partitioner)(可选)每个数据分片的预定义地址列表(如HDFS上的数据块的地址)(可选)Spark FrameworkSpark FrameworkProject ComponentsSpark SQLSpark SQL is a component on top of Spark Core that
14、introduces a new data abstraction called SchemaRDD,which provides support for structured and semi-structured data.Spark SQL provides a domain-specific language to manipulate SchemaRDDs in Scala,Java,or Python.It also provides SQL language support,with command-line interfaces and ODBC/JDBC server.Spa
15、rk SQL:提供通过Apache Hive的SQL变体Hive查询语言(HiveQL)与Spark进行交互的API。每个数据库表被当做一个RDD,Spark SQL查询被转换为Spark操作。对熟悉Hive和HiveQL的人,Spark可以拿来就用。Spark FrameworkSpark FrameworkProject ComponentsSpark StreamingSpark Streaming leverages Spark Cores fast scheduling capability to perform streaming analytics.It ingests data
16、 in mini-batches and performs RDD transformations on those mini-batches of data.This design enables the same set of application code written for batch analytics to be used in streaming analytics,on a single engine.Spark Streaming:允许对实时数据流进行处理和控制。很多实时数据库(如Apache Store)可以处理实时数据。Spark Streaming允许程序能够像普
17、通RDD一样处理实时数据。Spark FrameworkSpark FrameworkProject ComponentsMLlib Machine Learning LibraryMLlib is a distributed machine learning framework on top of Spark that because of the distributed memory-based Spark architecture is ten times as fast as Hadoop disk-based Apache MahoutMLlib:一个常用机器学习算法库,算法被实现为
18、对RDD的Spark操作。这个库包含可扩展的学习算法,比如分类、回归等需要对大量数据集进行迭代的操作。之前可选的大数据机器学习库Mahout,将会转到Spark,并在未来实现。Spark FrameworkSpark FrameworkProject ComponentsGraphXGraphX is a distributed graph processing framework on top of Spark.It provides an API for expressing graph computation that can model the Pregel abstraction.G
19、raphX:控制图、并行图操作和计算的一组算法和工具的集合。:控制图、并行图操作和计算的一组算法和工具的集合。GraphX扩展扩展了了RDD API,包含控制图、创建子图、访问路径上所有顶点的操作。,包含控制图、创建子图、访问路径上所有顶点的操作。运行时的运行时的SPARK Driver程序启动多个程序启动多个Worker,Worker从文件系从文件系统加载数据并产生统加载数据并产生RDD并并按照不同分区按照不同分区Cache到内存到内存中中逻辑上的计算过程:DAG 主要应用场景配置介绍配置介绍配置配置spark环境步骤:环境步骤:1.SSH无密钥登陆无密钥登陆2.jdk3.scala4.Ha
20、doop5.Spark6.eclipse配置介绍配置介绍:SSH无密钥登陆无密钥登陆为防止权限错误,进入管理员模式:为防止权限错误,进入管理员模式:sudo passwd root两次密码确认两次密码确认suSSH无密钥登陆无密钥登陆1.配置主机名:配置主机名:需要分别将需要分别将linux系统的主机名重命名系统的主机名重命名,重命重命名主机名的方法:名主机名的方法:gedit /etc/hostname通过修改通过修改hostname文件即可,所有节点都文件即可,所有节点都要修改,以示区分。如,主机可分别命名为:要修改,以示区分。如,主机可分别命名为:master,node1,node2,n
21、ode32.查看主机查看主机ip:ifconfig3.修改修改hostsSudo gedit/etc/hosts如图,如图,localhost部分不动,只修改主节点部分不动,只修改主节点和其他节点的主机名和和其他节点的主机名和IP配置介绍配置介绍:SSH无密钥登陆无密钥登陆4.生成密钥:生成密钥:ssh-keygen t rsa两次回车设置无密码两次回车设置无密码,密钥自密钥自动保存在动保存在/root/.ssh/5.在在master节点上节点上 复制复制id_rsa.pub分分发到发到node节点节点:cd.ssh scp id_rsa.pub node1:/home在在node1节点上:节
22、点上:cat/home/id_rsa.pub /root/.ssh/authorized_keys在其他在其他node节点重复这一过程,全部节点重复这一过程,全部结束后在结束后在master节点上:节点上:以以ssh node1的形式,登陆所有节点,的形式,登陆所有节点,如果不需要密码输入则配置成功。登陆如果不需要密码输入则配置成功。登陆后用后用exit登出。登出。图中用图中用slave1代替了代替了node1 配置介绍配置介绍:配置配置jdk和和scala1,去去oracle官网下载官网下载jdk,解压在解压在home目录下目录下2.去去scala官网下载官网下载scala,解压在,解压在h
23、ome目录下目录下3.gedit/etc/profile 添加如下代码添加如下代码 export JAVA_HOME=/home/zhao/jdk1.7.0_80 /jdk解压目解压目录录export JRE_HOME=$JAVA_HOME/jre export CLASSPATH=.:$JAVA_HOME/lib:$JRE_HOME/lib export PATH=$JAVA_HOME/bin:$PATHexport SCALA_HOME=/home/zhao/scala-2.11.7 /scala目目录录export PATH=$SCALA_HOME/bin:$PATH保存关闭后通过保存关
24、闭后通过 source/etc/profile使路径生效使路径生效在所有节点上重复这一过程在所有节点上重复这一过程配置介绍配置介绍:配置配置hadoop1.到到apache官网下载官网下载Hadoop 解压后修改名字为解压后修改名字为hadoop:mv Hadoop-2.2.0 Hadoop2.修改修改profile文件:文件:gedit/etc/profile添加如下代码:添加如下代码:export HADOOP_INSTALL=/home/zhao/hadoopexport PATH=$PATH:$HADOOP_INSTALL/binexport PATH=$PATH:$HADOOP_IN
25、STALL/sbinexport HADOOP_MAPRED_HOME=$HADOOP_INSTALLexport HADOOP_COMMON_HOME=$HADOOP_INSTALLexport HADOOP_HDFS_HOME=$HADOOP_INSTALLexport YARN_HOME=$HADOOP_INSTALL保存关闭后,保存关闭后,source/etc/profile使生效使生效配置介绍配置介绍:配置配置hadoop3进入进入hadoop/etc/hadoop目录,编辑目录,编辑hadoop-env.sh文件文件加入加入 export JAVA_HOME=/home/zhao/
26、jdk1.7.0_804编辑编辑core-site.xml文件,加入如下代码:文件,加入如下代码:fs.default.namehdfs:/master:9000hadoop.tmp.dir/home/zhao/hadoop/tmp 配置介绍配置介绍:配置配置hadoop5.编辑编辑yarn-site.xml文件:文件:yarn.nodemanager.aux-services mapreduce_shuffle yarn.nodemanager.aux-services.mapreduce.shuffle.class org.apache.hadoop.mapred.ShuffleHandl
27、er yarn.resourcemanager.scheduler.address master:8030配置介绍配置介绍:配置配置hadoop yarn.resourcemanager.address master:8032 yarn.resourcemanager.webapp.address master:8088 yarn.resourcemanager.resource-tracker.address master:8031 yarn.resourcemanager.admin.address master:8033配置介绍配置介绍:配置配置hadoop6.在在hadoop根目录下建
28、立文件夹根目录下建立文件夹hdfs,再在再在hdfs中建立两个文件夹中建立两个文件夹 namenode和和datanode进入进入hdoop/etc/hadoop目录,编辑目录,编辑hdfs-site.xml文件文件 dfs.replication 3 dfs.namenode.name.dir file:/home/zhao/hadoop/hdfs/namenode dfs.dataname.data.dir file:/home/zhao/hadoop/hdfs/datanode 配置介绍配置介绍:配置配置hadoop dfs.namenode.secondary.http-address
29、 master:9001 dfs.webhdfs.enabled true 配置介绍配置介绍:配置配置hadoop7.修改修改slaves文件,将从节点名改为文件,将从节点名改为node1node2node38.通过通过cd.返回返回hadoop根目录上一级目录。根目录上一级目录。通过通过scp r./hadoop node1:/home/zhao将将hadoop文件复制文件复制给给node1,其他节点类似。其他节点类似。9.在在master节点上的节点上的hadoop根目录下格式化根目录下格式化nodename./bin/Hadoop namenode format出现出现successfu
30、lly formatted为成功为成功10.启动:启动:在在hadoop根目录下根目录下:./sbin/start-all.sh在在jdk根目录下执行根目录下执行 bin/jps如果有以下几个进程则:如果有以下几个进程则:JpsnameNddesecondaryNameNoderesourceManager在从节点检查,如果有:在从节点检查,如果有:Datanode Jps NodeManager三个进三个进程则成功程则成功./sbin/stop-all.sh为关闭为关闭hadoop命令命令配置介绍配置介绍:配置配置spark1.到到apache官网下载官网下载spark,注意匹配注意匹配ha
31、doop和和scala版本,下载版本,下载prebuild版版2.解压解压 tar xzvf spark-*-bin-Hadoop.tgz3.配置配置conf/spark-env.shexport SCALA_HOME=/home/zhao/scala-2.10.4export SPARK_WORKER_MEMORY=250mexport SPARK_MASTER_IP=192.168.67.129export MASTER=spark:/192.168.67.129:7077这里的这里的ip地址为地址为master节点的节点的4.配置配置slaves文件文件将节点的主机名加到将节点的主机名加
32、到slaves文件中文件中node1node2node3配置介绍配置介绍:配置配置spark启动启动spark:1.先进入先进入hadoop根目录启动根目录启动Hadoop2.进入进入spark根目录,根目录,./sbin/start-all.sh正常状态下的正常状态下的master节点用节点用jps命令查看为:命令查看为:JpsMasterNameNodeSecondaryNameNodeResourceManagerWorker节点为节点为WorkerDataNodeJpsNodeManager配置介绍配置介绍:开发开发spark应用程序环境应用程序环境初试:可以通过初试:可以通过 ./b
33、in/run-example org.apache.spark.examples.SparkPi测试测试开发:可以使用开发:可以使用eclipse搭载搭载Scala插件开发,也可以用插件开发,也可以用spark自带的自带的shell来来开发开发将将Eclipse Scala IDE插件中插件中features和和plugins两个目录下的所有文件拷贝到两个目录下的所有文件拷贝到Eclipse解压后对应的目录中解压后对应的目录中步骤步骤$重新启动重新启动Eclipse,点击,点击eclipse右上角方框按钮,如下图所示,展开后,点右上角方框按钮,如下图所示,展开后,点击击“Other.”,查看是
34、否有,查看是否有“Scala”一项,有的话,直接点击打开,否则进行步一项,有的话,直接点击打开,否则进行步骤骤#操作。操作。步骤步骤#:在:在Eclipse中,依次选择中,依次选择“Help”“Install New Software”,在打开的,在打开的卡里填入卡里填入http:/download.scala-ide.org/sdk/e38/scala29/stable/site,并按回车,并按回车键,可看到以下内容,选择前两项进行安装即可。(由于步骤键,可看到以下内容,选择前两项进行安装即可。(由于步骤3已经将已经将jar包拷贝到包拷贝到eclipse中,安装很快,只是疏通一下)安装完后,
35、重复操作一遍步骤中,安装很快,只是疏通一下)安装完后,重复操作一遍步骤$便可。便可。配置介绍配置介绍:开发开发spark应用程序环境应用程序环境使用使用Scala语言开发语言开发Spark程序程序 在在eclipse中,依次选择中,依次选择“File”“New”“Other”“Scala Wizard”“Scala Project”,创建一个,创建一个Scala工程,并命名为工程,并命名为“SparkScala”。右击右击“SaprkScala”工程,选择工程,选择“Properties”,在弹出的框中,按照下图所示,在弹出的框中,按照下图所示,依次选择依次选择“Java Build Path
36、”“Libraties”“Add External JARs”,导入文,导入文章章“Apache Spark:将:将Spark部署到部署到Hadoop 2.2.0上上”中给出的中给出的assembly/target/scala-2.9.3/目录下的目录下的spark-assembly-0.8.1-incubating-hadoop2.2.0.jar,这个,这个jar包也可以自己编译包也可以自己编译spark生成,放在生成,放在spark目录下的目录下的assembly/target/scala-2.9.3/目录中。目录中。跟创建跟创建Scala工程类似,在工程中增加一个工程类似,在工程中增加一
37、个Scala Class,命名为:,命名为:WordCount,整个工程结构如下:整个工程结构如下:配置介绍配置介绍:开发开发spark应用程序环境应用程序环境WordCount就是最经典的词频统计程序,它将统计输入目录中所有单词出现的总就是最经典的词频统计程序,它将统计输入目录中所有单词出现的总次数,次数,Scala代码如下:代码如下:import org.apache.spark._ import SparkContext._ object WordCount def main(args:ArrayString)if(args.length!=3)println(usage is org.
38、test.WordCount )return val sc=new SparkContext(args(0),WordCount,System.getenv(SPARK_HOME),Seq(System.getenv(SPARK_TEST_JAR)val textFile=sc.textFile(args(1)val result=textFile.flatMap(line=line.split(s+).map(word=(word,1).reduceByKey(_+_)result.saveAsTextFile(args(2)在在Scala工程中,右击工程中,右击“WordCount.scala”,选择,选择“Export”,并在弹出框中选择,并在弹出框中选择“Java”“JAR File”,进而将该程序编译成,进而将该程序编译成jar包包Source:Wikipedia(Tianjin Eye)Questions?大作业提交及验收1月14日下午3:00-5:00提交相关材料联系人:赵振宇 55B3241月15日晚6:00-9:00第一批展示1月22日晚6:00-9:00第二批展示地点:55教学楼A-113实践、认识、再实践、再认识,实践、认识、再实践、再认识,这种形式,循环往复以至无穷这种形式,循环往复以至无穷毛泽东实践论毛泽东实践论