1、 大数据导论第十一章CONTENTS目录PART 01 分布式图计算PART 02 Spark Graphx简介PART 03 Graphx实现PART 04 Graphx实例PART 05 Spark Graphx的优势PART 06 作业PART 01 分布式图计算分布式图计算n数据并行与图并行计算1.数据并行系统,像MapReduce和Spark等计算框架,主要用于对数据集进行各种运算,在数据内部之间关联度不高的计算场景下能够进行很高效的处理。2.图并行计算对存在较高关联度的数据处理非常合适分布式图计算n图存储模式1.边分割:每个顶点都存储一次,但有的边会被打断分到两台机器上。2.点分割
2、:每条边只存储一次,都只会出现在一台机器上。邻居多的点会被复制到多台机器上分布式图计算n图计算模式1.图计算框架基本上都遵循BSP(Bulk Synchronous Parallell)计算模式。2.在BSP中,一次计算过程由一系列全局超步组成3.超步分为三个阶段:本地执行阶段 全局通信阶段 栅栏同步阶段分布式图计算nPregel模型:像顶点一样思考Pregel框架以顶点为中心,对边进行切割,将图数据分成若干个分区,每一个分区包含一组顶点以及由这组顶点为源顶点构成的边,并不断在顶点上进行算法迭代和数据同步分布式图计算nPregel计算过程读取输入数据,初始化图数据,在图数据上运行一系列的超步运
3、算直至整个计算结束,输出结果。当一个节点结束计算之后,该节点停止运行,如果有新任务分配,则又重新开始运行,然后再次停止。当所有节点全部停止运行,并且没有新任务分配的时候,整个算法停止分布式图计算nGAS模型:邻居更新模型GAS模型是以节点为中心的图计算编程模型,某个顶点可能被部署到多台机器上,其中一台机器上的为主顶点(Master),其余机器上的为镜像顶点(Mirror),与主顶点的数据保持同步,将边唯一部署在某一台机器上分布式图计算nGAS模型计算阶段划分 收集阶段:工作顶点的边,可以是出边或入边,也可以同时包含入边和出边,从邻接顶点和自身收集数据,并对收集的数据使用用户定义的函数进行运算。
4、这一阶段对顶点和边都是只读的。执行阶段:镜像顶点将收集阶段的计算结果发送给主顶点,主顶点对从各个镜像顶点收集的数据进行聚集运算,并利用聚集结果和上一步的顶点数据,按照用户定义的更新函数进行计算,更新主顶点的数据,并同步给镜像顶点。在执行阶段中,工作顶点可修改,边不可修改。分发阶段:工作顶点更新完成之后,更新边上的数据,通知对其有依赖的邻接边更新状态。在分发阶段,工作顶点只读,边上数据可写。PART 02 Spark GraphX简介Spark GraphX是一个分布式图处理框架,它是基于Spark平台提供对图计算和图挖掘简洁易用的而丰富的接口,极大的方便了对分布式图处理的需求Spark Gra
5、phX简介nGraphX的应用背景 在社交网络中人与人之间存在有很多关系链,例如微博、微信、QQ、Twitter、Facebook、Linkedin等,这些都是大数据产生的地方,都需要图计算。因为图的结构复杂、数据量大,只有分布式的图处理才能胜任。由于Spark GraphX底层是基于Spark来处理的,所以天然就是一个分布式的图处理系统。Spark GraphX简介nGraphX的框架GraphX的核心抽象是Resilient Distributed Property Graph,一种点和边都带属性的有向多重图。它扩展了Spark RDD的抽象,具有Table和Graph两种视图,而只需要一
6、份物理存储Spark GraphX简介nGraphX的框架Spark GraphX简介nGraphX的设计要点 1.对GraphX视图的所有操作,最终都会转换成其关联的Table视图的RDD操作来完成 2.两种视图底层共用的物理数据,由RDDVertexPartition和RDDEdgePartition这两个RDD组成 3.图的分布式存储采用点分割模式,而且使用partitionBy方法,由用户指定不同的划分策略Spark GraphX简介nGraphX的设计要点PART 03 GraphX的实现GraphX公开一组基本的功能操作以及Pregel API的一个优化。另外,GraphX包含了一
7、个日益增长的图算法和图builders的集合,用以简化图分析任务。GraphX的实现nGraphX的存储模式Graphx使用点分割方式存储图,用三个RDD存储图数据信息:VertexTable(id,data)id为Vertex id,data为Edge data;EdgeTable(pid,src,dst,data)pid为Partion id,src为源顶点id,dst为目的顶点id;RoutingTable(id,pid)id为Vertex id,pid为Partion idGraphX的实现n GraphX的计算模式GraphX的Graph类提供了丰富的图运算符GraphX的实现1.图
8、的缓存 var g=.var prevG:GraphVD,ED=null while(.)prevG=g g=g.()g.cache()prevG.unpersistVertices(blocking=false)prevG.edges.unpersist(blocking=false)GraphX的默认接口只提供unpersistVertices方法。如果要释放边,调用g.edges.unpersist()方法才行,根据GraphX中Graph的不变性,对g做操作并赋回给g之后,g已不是原来的g了,而且会在下一轮迭代使用,所以必须cacheGraphX的实现2.邻边聚合mrTriplets是
9、GraphX中最核心的一个接口,它的计算过程为:Map:应用于每一个Triplet上,生成一个或者多个消息,消息以Triplet关联的两个顶点中的任意一个或两个为目标顶点;Reduce:应用于每一个Vertex上,将发送给每一个顶点的消息合并起来。MrTriplets:最后返回的是一个VertexRDDA,包含每一个顶点聚合之后的消息(类型为A),没有接收到消息的顶点不会包含在返回的VertexRDD中。GraphX的实现2.邻边聚合def mapReduceTripletsA(map:EdgeTripletVD,ED=Iterator(VertexId,A),reduce:(A,A)=A):
10、VertexRDDAGraphX的实现3.进化的Pregel模式这种基于mrTrilets方法的Pregel模式,与标准Pregel的最大区别是,它的第2段参数体接收的是3个函数参数,而不接收messageList。它不会在单个顶点上进行消息遍历,而是将顶点的多个Ghost副本收到的消息聚合后,发送给Master副本,再使用vprog函数来更新点值。消息的接收和发送都被自动并行化处理,无需担心超级节点的问题GraphX的实现3.进化的Pregel模式def pregelA(initialMsg:A,maxIterations:Int,activeDirection:EdgeDirection)
11、(vprog:(VertexID,VD,A)=VD,sendMsg:EdgeTripletVD,ED=Iterator(VertexID,A),mergeMsg:(A,A)=A):GraphVD,EDGraphX的实现/更新顶点 vprog(vid:Long,vert:Vertex,msg:Double):Vertex=v.score=msg+(1-ALPHA)*v.weight /发送消息 sendMsg(edgeTriplet:EdgeTriplet):Iterator(Long,Double)(destId,ALPHA*edgeTriplet.srcAttr.score*edgeTrip
12、let.attr.weight)/合并消息 mergeMsg(v1:Double,v2:Double):Double=v1+v2 GraphX的实现4.图算法工具包GraphX提供了一套图算法工具包,方便用户对图进行分析。目前最新版本已支持PageRank、数三角形、最大连通图和最短路径等6种经典的图算法。这些算法的代码实现,目的和重点在于通用性。如果要获得最佳性能,可以参考其实现进行修改和扩展满足业务需求。另外,研读这些代码,也是理解GraphX编程最佳实践的好方法。PART 04 GraphX实例GraphX实例n 例子介绍有6个人,每个人有名字和年龄,这些人根据社会关系形成8条边,每条边
13、有其属性。在以下例子演示中将构建顶点、边和图,打印图的属性、转换操作、结构操作、连接操作、聚合操作,并结合实际要求进行演示。GraphX实例代码和操作解释 开始开始的第一步是引入Spark和GraphX到项目中:import org.apache.log4j.Level,Loggerimport org.apache.spark.SparkContext,SparkConfimport org.apache.spark.graphx._import org.apache.spark.rdd.RDDGraphX实例代码和操作解释 属性图属性图是一个有向多重图,属性图通过vertex(VD)和ed
14、ge(ED)类型参数化。逻辑上的属性图对应于一对类型化的集合(RDD),这个集合编码了每一个顶点和边的属性。因此,图类包含访问图中顶点和边的成员。属性图类的定义如下:class GraphVD,ED val vertices:VertexRDDVD val edges:EdgeRDDED 属性图在GraphX实例中,假设我们想构造一个包括用户社会关系的属性图。顶点属性可能包含用户名和年龄。我们可以用描述用户关系之间的密切关系的长度来标注边。所得的图形将具有类型签名:GraphX实例代码和操作解释 属性图从RDD集合生成属性图/设置运行环境 val conf=new SparkConf().se
15、tAppName(SimpleGraphX).setMaster(local)val sc=new SparkContext(conf)/设置顶点和边,注意顶点和边都是用元组定义的Array /顶点的数据类型是VD:(String,Int)val vertexArray=Array(1L,(Alice,28),(2L,(Bob,27),(3L,(Charlie,65),(4L,(David,42),(5L,(Ed,55),(6L,(Fran,50)/边的数据类型ED:Int val edgeArray=Array(Edge(2L,1L,7),Edge(2L,4L,2),Edge(3L,2L,4
16、),Edge(3L,6L,3),Edge(4L,1L,1),Edge(5L,2L,2),Edge(5L,3L,8),Edge(5L,6L,3)/构造vertexRDD和edgeRDD val vertexRDD:RDD(Long,(String,Int)=sc.parallelize(vertexArray)val edgeRDD:RDDEdgeInt=sc.parallelize(edgeArray)/构造图GraphVD,ED val graph:Graph(String,Int),Int=Graph(vertexRDD,edgeRDD)GraphX实例代码和操作解释 属性图分别用grap
17、h.vertices和graph.edges成员将一个图解构为相应的顶点和边。println(属性演示)println(找出图中年龄大于30的顶点:)graph.vertices.filter case(id,(name,age)=age 30.collect.foreach case(id,(name,age)=println(s$name is$age)/边操作:找出图中属性大于5的边 println(找出图中属性大于5的边:)graph.edges.filter(e=e.attr 5).collect.foreach(e=println(s$e.srcId to$e.dstId att$
18、e.attr)printlnGraphX实例代码和操作解释 属性图除了属性图的顶点和边视图,GraphX也包含了一个三元组视图,三元视图逻辑上将顶点和边的属性保存为一个RDDEdgeTripletVD,ED,它包含EdgeTriplet类的实例。/triplets 操作,(srcId,srcAttr),(dstId,dstAttr),attr)println(列出边属性 5 的 tripltes:)for(triplet t.attr 5).collect)println(s$triplet.srcAttr._1 likes$triplet.dstAttr._1)println GraphX实
19、例代码和操作解释 属性操作属性图也有基本的集合操作,这些操作采用用户自定义的函数并产生包含转换特征和结构的新图。如RDD的map操作一样,属性图包含下面的操作:class GraphVD,ED def mapVerticesVD2(map:(VertexId,VD)=VD2):GraphVD2,ED def mapEdgesED2(map:EdgeED=ED2):GraphVD,ED2 def mapTripletsED2(map:EdgeTripletVD,ED=ED2):GraphVD,ED2GraphX实例代码和操作解释 结构性操作当前的GraphX仅仅支持一组简单的常用结构性操作。下面
20、是基本的结构性操作列表。class GraphVD,ED def reverse:GraphVD,ED def subgraph(epred:EdgeTripletVD,ED=Boolean,vpred:(VertexId,VD)=Boolean):GraphVD,ED def maskVD2,ED2(other:GraphVD2,ED2):GraphVD,ED def groupEdges(merge:(ED,ED)=ED):GraphVD,EDGraphX实例代码和操作解释 连接操作在许多情况下,有必要将外部数据加入到图中。例如,我们可能有额外的用户属性需要合并到已有的图中或者我们可能想从
21、一个图中取出顶点特征加入到另外一个图中。这些任务可以用join操作完成。下面列出的是主要的join操作。class GraphVD,ED def joinVerticesU(table:RDD(VertexId,U)(map:(VertexId,VD,U)=VD):GraphVD,ED def outerJoinVerticesU,VD2(table:RDD(VertexId,U)(map:(VertexId,VD,OptionU)=VD2):GraphVD2,EDGraphX实例代码和操作解释 相邻聚合图分析任务的一个关键步骤是汇总每个顶点附近的信息。例如我们可能想知道每个用户的追随者的数量
22、或者每个用户的追随者的平均年龄。许多迭代图算法(如PageRank,最短路径和连通体)多次聚合相邻顶点的属性。/*聚合操作 println(聚合操作)println(找出年纪最大的追求者:)val oldestFollower:VertexRDD(String,Int)=userGraph.mapReduceTriplets(String,Int)(/将源顶点的属性发送给目标顶点,map过程 edge=Iterator(edge.dstId,(edge.srcAttr.name,edge.srcAttr.age),/得到最大追求者,reduce过程 (a,b)=if(a._2 b._2)a e
23、lse b )userGraph.vertices.leftJoin(oldestFollower)(id,user,optOldestFollower)=optOldestFollower match case None=s$user.name does not have any followers.case Some(name,age)=s$name is the oldest follower of$user.name.collect.foreach case(id,str)=println(str)printlnGraphX实例代码和操作解释 实用操作下面的代码找出顶点5到其他各顶点最
24、短的边:/*实用操作 println(聚合操作)println(找出5到各顶点的最短:)val sourceId:VertexId=5L/定义源点 val initialGraph=graph.mapVertices(id,_)=if(id=sourceId)0.0 else Double.PositiveInfinity)val sssp=initialGraph.pregel(Double.PositiveInfinity)(id,dist,newDist)=math.min(dist,newDist),triplet=/计算权重 if(triplet.srcAttr+triplet.at
25、tr math.min(a,b)/最短距离 )println(sssp.vertices.collect.mkString(n)GraphX实例代码和操作解释 实用操作下面的代码找出顶点5到其他各顶点最短的边:/*实用操作 println(聚合操作)println(找出5到各顶点的最短:)val sourceId:VertexId=5L/定义源点 val initialGraph=graph.mapVertices(id,_)=if(id=sourceId)0.0 else Double.PositiveInfinity)val sssp=initialGraph.pregel(Double.
26、PositiveInfinity)(id,dist,newDist)=math.min(dist,newDist),triplet=/计算权重 if(triplet.srcAttr+triplet.attr math.min(a,b)/最短距离 )println(sssp.vertices.collect.mkString(n)GraphX实例代码和操作解释PART 05 Spark GraphX的优势 Spark GraphX的优势1.Spark GraphX能够把表格和图进行互相转换2.能够用更少的框架解决更多的问题解释:Spark GraphX基于Spark,可以和MLlib、Spark
27、 SQL等进行协作3.处理效率更高解释:由于基于spark,中间结果不必每次都写入磁盘,结构化数据不必重构,统一了表格和图视图,可以轻松做流水线操作PART 06 作业 作业作业:1.什么是数据并行计算框架,什么是图并行计算框架?各自的主要应用场合是什么?2.图存储的模式有哪两种?各自的优缺点是什么?为什么点分割模式现在更流行?3.BSP(Bulk Synchronous Parallell)计算模式的三个阶段是什么?4.请描述Pregel计算模型?它的主要优缺点是什么?5.图11.17是一个有向图,每一个节点有一个数字,顶点的算法是当节点收到了一个比自己的数字更小的数字,该节点将数字修改成这
28、个较小值,请使用Pregel计算模型画出每一个超步时候各节点的状态值。作业作业:6.什么是GAS模型?其三个处理阶段是什么?7.什么是Spark GraphX?其主要应用背景是什么?8.什么是GraphX的核心抽象?它有哪两种视图?其主要优势是什么?9.GraphX的主要设计要点有哪三条?10.请画出图11.18图的GraphX的属性图,Vertex Table和Edge Table作业作业:作业作业:11.GraphX的属性操作主要有哪些?12.GraphX的结构性操作主要有哪些?13.GraphX的连接操作主要有哪些?14.GraphX的相邻聚合操作的功能是什么?15.GraphX的主要优势有哪三点?16.请编写完成求单词计数的任务。Spark Streaming将监控某目录中的文件,获取在间隔时间段内变化的数据,然后通过Spark Streaming计算出改时间段内单词统计数。谢谢FOR YOUR LISTENINGHandge CO.LTD.2016.12.09