1、图计算流计算什么是流计算流计算处理流程流计算应用实例流计算框架Twitter Storm流计算框架汇总参考资料大数据时代数据处理及业务的变化初期:数据量小,业务简单 少量人力、服务器就可以满足需求过渡期:数据量有所膨胀,业务较复杂 需要增加大量服务器以支撑业务大数据时期:数据量急剧膨胀,业务很复杂 传统方案扛不住,简单的增加服务器已不能满足需求挑战数据量膨胀所带来的质变个性化、实时化的需求流计算来自于一个信念: 数据的价值随着时间的流逝而降低,所以事件出现后必须尽快地对它们进行处理,最好数据出现时便立刻对其进行处理,发生一个事件进行一次处理,而不是缓存起来成一批再处理。流计算的概念:流计算是针
2、对流式数据的实时计算。流式数据(流数据):是指将数据看作数据流的形式来处理。数据流是在时间分布和数量上无限的一系列动态数据集合体;数据记录是数据流的最小组成单元。流数据具有数据实时持续不断到达、到达次序独立、数据来源众多格式复杂、数据规模大且不十分关注存储、注重数据的整体价值而不关注个别数据等特点。流计算应用场景流计算是针对流数据的实时计算,其主要应用在于产生大量流数据、同时对实时性要求高的领域。流计算一方面可应用于处理金融服务如股票交易、银行交易等产生的大量实时数据。另一方面流计算主要应用于各种实时Web服务中,如搜索引擎、购物网站的实时广告推荐,SNS社交类网站的实时个性化内容推荐,大型网
3、站、网店的实时用户访问情况分析等。流计算:对流数据实时分析,从而获取有价值的实时信息主要区别有如下几个方面:流中的数据元素在线到达;系统无法控制将要处理的新到达的数据元素的顺序;数据流的潜在大小也许是无穷无尽的;一旦数据流中的某个元素经过处理,要么被丢弃,要么被归档存储。因此,除非该数据被直接存储在内存中,否则将不容易被检索。相对于数据流的大小,这是一种典型的极小相关。对于一个流计算系统来说,它应达到如下需求:高性能:处理大数据的基本要求,如每秒处理几十万条数据。海量式:支持TB级甚至是PB级的数据规模。实时性:必须保证一个较低的延迟时间,达到秒级别,甚至是毫秒级别。分布式:支持大数据的基本架
4、构,必须能够平滑扩展。易用性:能够快速进行开发和部署。可靠性:能可靠地处理流数据。针对不同的应用场景,相应的流计算系统会有不同的需求,但是,针对海量数据的流计算,无论在数据采集、数据处理中都应达到秒级别的要求。Hadoop的批量化处理是人们喜爱它的地方,但这在某些领域仍显不足,尤其是在例如移动、Web客户端或金融、网页广告等需要实时计算的领域。这些领域产生的数据量极大,没有足够的存储空间来存储每个业务收到的数据。而流计算则可以实时对数据进行分析,并决定是否抛弃无用的数据,而这无需经过Map/Reduce的环节。MapReduce框架为批处理做了高度优化,系统典型地通过调度批量任务来操作静态数据
5、,任务不是常驻服务,数据也不是实时流入;而数据流计算的典型范式之一是不确定数据速率的事件流流入系统,系统处理能力必须与事件流量匹配。数据流实时处理的模式决定了要和批处理使用非常不同的架构,试图搭建一个既适合流式计算又适合批处理的通用平台,结果可能会是一个高度复杂的系统,并且最终系统可能对两种计算都不理想。基于MapReduce的业务不得不面对处理延迟的问题。有一种想法是将基于MapReduce的批量处理转为小批量处理,将输入数据切成小的片段,每隔一个周期就启动一次MapReduce作业,这种实现需要减少每个片段的延迟,并且需要考虑系统的复杂度:将输入数据分隔成固定大小的片段,再由MapRedu
6、ce平台处理,缺点在于处理延迟与数据片段的长度、初始化处理任务的开销成正比。小的分段是会降低延迟,但是,也增加附加开销,并且分段之间的依赖管理更加复杂(例如一个分段可能会需要前一个分段的信息);反之,大的分段会增加延迟。最优化的分段大小取决于具体应用。为了支持流式处理,MapReduce需要被改造成Pipeline的模式,而不是reduce直接输出;考虑到效率,中间结果最好只保存在内存中等等。这些改动使得原有的MapReduce框架的复杂度大大增加,不利于系统的维护和扩展。用户被迫使用MapReduce的接口来定义流式作业,这使得用户程序的可伸缩性降低。传统数据处理流程示意图传统的数据操作,首
7、先将数据采集并存储在DBMS中,然后通过query和DBMS进行交互,得到用户想要的结果。这样的一个流程隐含了两个前提:Data is old。当对数据做查询的时候,里面数据其实是过去某一个。当对数据做查询的时候,里面数据其实是过去某一个时刻数据的一个时刻数据的一个snapshot,数据可能已经过期了;,数据可能已经过期了;这样的流程需要人们主动发出query。也就是说用户是主动的,而DBMS系统是被动的。系统是被动的。流计算一般有三个处理流程:数据实时采集、数据实时计算、实时查询服务。实时计算三个阶段阶段一:数据实时采集为流计算提供实时数据,要保证实时性、低延迟、稳定可靠。许多开源分布式日志
8、收集系统均可满足每秒数百MB的数据采集和传输需求。Hadoop的ChukwaFacebook的Scribe LinkedIn的KafkaCloudera的Flume 淘宝的TimeTunnel阶段一:数据实时采集数据采集系统基本架构一般由三部分组成Agent: 主动采集数据,并把数据推送到collectorCollector: 接收多个Agent的数据,并实现有序、可靠、高性能的转发Store: 存储Collector的数据(对于流计算来说,这边接收的数据一般直接用于计算)实时采集系统基本架构阶段二:数据实时计算现在大量存在的实时数据,人们需要根据当前的数据实时的作出判断。流计算在流数据不断变
9、化的运动过程中实时地进行分析,捕捉到可能对用户有用的信息,并把结果发送出去,在这种情况下:能对流数据做出实时回应;用户是被动的而DBMS是主动的。是主动的。数据实时计算示意图阶段三:实时查询服务经由流计算框架得出的结果可供实时查询、展示或存储。分析系统传统的分析系统都是离线计算,即将数据全部保存下来,然后每隔一定时间进行离线分析,再将结果保存。但这样会有一定的延时,这取决于离线计算的间隔时间和计算时长。而通过流计算,能在秒级别内得到实时分析结果,有利于根据实时分析结果及时做出决策、调整。基于分析系统的应用场景广告系统:如搜索引擎和购物网站,实时分析用户信息,展示更佳的相关广告。个性化推荐:如社
10、交网站,实时统计和分析用户行为,精确推荐,增加用户粘性。挑战实时计算处理数据3T/日离线分布式计算处理数据超过20T/日服务超过百万的淘宝卖家问题离线计算分析延时太大,对于需要实时分析数据的应用场景(如双11,双12,一年就一次,需要实时数据来帮助调整决策),如何实现秒级别的实时分析?Super Mario 2.0流计算框架流计算框架海量数据实时计算引擎、实时流传输框架基于Erlang+Zookeeper开发低延迟、高可靠性Super Mario 2.0(监控界面)实时数据处理流程Log数据由TimeTunnel在毫秒级别内实时送达。实时数据经由Super Mario流计算框架进行处理。HBa
11、se输出、存储结果实现效果可处理每天TB级的实时流数据。从用户发起请求到数据展示,延时控制在2-3秒内。量子恒道实时数据处理示意图IBM的流计算平台InfoSphereStreams能够广泛应用于制造、零售、交通运输、金融证券以及监管各行各业的解决方案之中,使得实时快速做出决策的理念得以实现。汇总来自不同源的实时数据InfoSphere Stream界面Streams应用于斯德哥尔摩的交通信息管理,通过结合来自不同源的实时数据,Streams可以生成动态的、多方位的看待交通流量的方式,为城市规划者和乘客提供实时交通状况查看。通过InfoSphere Streams分析实时交通信息流计算框架要求
12、高性能处理大数据的基本要求,如每秒处理几十万条数据海量式支持TB级数据,甚至是PB级实时性保证较低延迟事件,达到秒级,最好是毫秒级分布式支持大数据的基本架构,必须能平滑扩展易用性可靠性免费、开源的分布式实时计算系统简单、高效、可靠地处理大量的流数据Storm对于实时计算的意义类似于Hadoop对于批处理的意义基于Clojure和Java开发Storm 流式计算Twitter数据系统分层处理架构为了处理最近的数据,需要一个实时系统和批处理系统同时运行。要计算一个查询函数,需要查询批处理视图和实时视图,并把它们合并起来以得到最终的数据。Twitter中进行实时计算的系统就是Storm,它在数据流上
13、进行持续计算,并且对这种流式数据处理提供了有力保障。Twitter分层的数据处理架构由Hadoop和ElephantDB组成批处理系统,Storm和Cassandra组成实时系统,实时系统处理的结果最终会由批处理系统来修正,正是这个观点使得Storm的设计与众不同。Twitter数据系统分层处理架构流计算(Stream processing)实时分析(Real-time analytics)连续计算(Continuous computation)分布式远程过程调用(Distributed RPC)在线机器学习(Online machine learning)更多简单的编程模型:Storm降低了
14、进行实时处理的复杂性。支持各种编程语言:默认支持Clojure、Java、Ruby和Python。要增加对其他语言的支持,只需实现一个简单的Storm通信协议即可。容错性:Storm会自动管理工作进程和节点的故障。水平扩展:计算是在多个线程、进程和服务器之间并行进行的。可靠的消息处理:Storm保证每个消息至少能得到一次完整处理。快速:系统的设计保证了消息能得到快速的处理。本地模式:Storm有一个“本地模式”,可以在处理过程中完全模拟Storm集群,这样可以快速进行开发和单元测试。容易部署:Storm集群易于部署,只需少量的安装和配置就可运行。Storm对于流Stream的抽象:流是一个不间
15、断的无界的连续Tuple(元组,是元素有序列表)。Stream消息流,是一个没有边界的Tuple序列,这些Tuples会被以一种分布式的方式并行地创建和处理。Stream消息流,是一个没有边界的Tuple序列,这些Tuples会被以一种分布式的方式并行地创建和处理。Storm认为每个Stream都有一个源头,它将这个源头抽象为Spouts。Spouts流数据源,它会从外部读取流数据并发出Tuple。Spouts流数据源,它会从外部读取流数据并发出Tuple。Storm将流的中间状态转换抽象为Bolts,Bolts可以处理Tuples,同时它也可以发送新的流给其他Bolts使用。Bolts消息处
16、理者,所有的消息处理逻辑被封装在Bolts里面,处理输入的数据流并产生输出的新数据流,可执行过滤,聚合,查询数据库等操作。Bolts消息处理者,所有的消息处理逻辑被封装在Bolts里面,处理输入的数据流并产生输出的新数据流,可执行过滤,聚合,查询数据库等操作。为了提高效率,在Spout源接上多个Bolts处理器。Storm将这样的无向环图抽象为Topology(拓扑)。Topology是Storm中最高层次的抽象概念,它可以被提交到Storm集群执行,一个拓扑就是一个流转换图。图中的边表示Bolt订阅了哪些流。当Spout或者Bolt发送元组到流时,它就发送元组到每个订阅了该流的Bolt上进行
17、处理。Topology实现:Storm中拓扑定义仅仅是一些Thrift结构体(Thrift是基于二进制的高性能的通讯中间件),这样一来就可以使用其他语言来创建和提交拓扑。Tuple实现:一个Tuple就是一个值列表。列表中的每个value都有一个name,并且该value可以是基本类型,字符类型,字节数组等,也可以是其他可序列化的类型。拓扑的每个节点都要说明它所发射出的元组字段的name,其他节点只需要订阅该name就可以接收数据。Stream groupings(消息分发策略):定义一个Stream应该如何分配给Bolts,解决两个组件(Spout和Bolt)之间发送tuple元组的问题。T
18、ask(任务):每一个Spout和Bolt会被当作很多task在整个集群里面执行,每一个task对应到一个线程。Stream groupings示意图Task示意图一个Topology的完整示意图Storm集群表面类似Hadoop集群。在Hadoop上运行的是“MapReduce jobs”,在Storm上运行的是“Topologies”。两者大不相同,一个关键不同是一个MapReduce的Job最终会结束,而一个Topology永远处理消息(或直到kill它)。Storm集群有两种节点:控制(Master)节点和工作者(Worker)节点。控制节点运行一个称之为“Nimbus”的后台程序,负
19、责在集群范围内分发代码、为worker分配任务和故障监测。每个工作者节点运行一个称之“Supervisor”的后台程序,监听分配给它所在机器的工作,基于Nimbus分配给它的事情来决定启动或停止工作者进程。一个Zookeeper集群负责Nimbus和多个Supervisor之间的所有协调工作(一个完整的拓扑可能被分为多个子拓扑并由多个supervisor完成)。Nimbus后台程序和Supervisor后台程序都是快速失败(fail-fast)和无状态的,所有状态维持在Zookeeper或本地磁盘。这种设计中master并没有直接和worker通信,而是借助中介Zookeeper,这样一来可以
20、分离master和worker的依赖,将状态信息存放在zookeeper集群内以快速回复任何失败的一方。这意味着你可以kill杀掉nimbus进程和supervisor进程,然后重启,它们将恢复状态并继续工作,这种设计使Storm极其稳定。Storm工作流程示意图单词统计编程模型非常简单,通过Topology定义整个处理逻辑。Topology中定义了一个Spout和两个处理消息的Bolt。Bolt通过订阅通过订阅Tuple的的name值来接收数据值来接收数据单词统计Shuffle Grouping是随机分组,表示Tuple会被随机的分发给Bolt。Fields Grouping是按字段分组,保
21、证具有相同field值的Tuple会分发给同一个Task进行统计,保证统计的准确性。SplitSentenceWordCount每个从spout发送出来的消息(英文句子)都会触发很多的task被创建。Bolts将句子分解为独立的单词,然后发射这些单词。最后,实时的输出每个单词以及它出现过的次数。一个句子经单词统计后的统计结果示意图使用Storm的公司和项目淘宝、阿里巴巴将流计算广泛应用于业务监控、广告推荐、买家实时数据分析等场景。淘宝数据部新架构IBM InfoSphere Streams:商业级高级计算平台,帮助用户开发的应:商业级高级计算平台,帮助用户开发的应用程序快速摄取、分析和关联来自
22、数千个实时源的信息。用程序快速摄取、分析和关联来自数千个实时源的信息。http:/ StreamBase:IBM开发的另一款商业流计算系统,在金融部门和开发的另一款商业流计算系统,在金融部门和政府部门使用。政府部门使用。http:/Twitter Storm:免费、开源的分布式实时计算系统,可简单、高效、:免费、开源的分布式实时计算系统,可简单、高效、可靠地处理大量的流数据可靠地处理大量的流数据http:/storm- S4(Simple Scalable Streaming System):开源流计算平):开源流计算平台,是通用的、分布式的、可扩展的、分区容错的、可插拔的流式系统。台,是通用
23、的、分布式的、可扩展的、分区容错的、可插拔的流式系统。http:/incubator.apache.org/s4/Facebook Puma:Facebook使用使用Puma和和Hbase相结合来处理实时相结合来处理实时数据。数据。DStream:百度正在开发的属于百度的通用实时数据流计算系统。:百度正在开发的属于百度的通用实时数据流计算系统。银河流数据处理平台:淘宝开发的通用流数据实时计算系统。Super Mario:基于:基于erlang语言和语言和zookeeper模块开发的高性能数据流模块开发的高性能数据流处理框架。处理框架。Hstream、Esper、SQLstream等等关于流计算
24、的文章对互联网海量数据实时计算的理解http:/ MapReduce:谈2011年风靡的数据流计算系统http:/ Stormhttp:/storm- Pregel图计算模型图计算模型Pregel的的C+ APIPregel模型的基本体系结构模型的基本体系结构Pregel模型的应用实例模型的应用实例改进的图计算模型改进的图计算模型参考资料参考资料大型图(像社交网络和网络图等)常常作为现在系统计算需要的一部分。现在存在许多图计算问题像最短路径、集群、网页排名、最小切割、连通分支等等,但还没有一个可扩展的通用系统来解决这些问题。解决这些问题的算法的特点:它们常常表现为比较差的内存访问局部性、针对单
25、个顶点的处理工作过少、以及计算过程中伴随着的并行度的改变等问题。可能的解决方法:为特定的图应用定制相应的分布式实现基于现有的分布式计算平台使用单机的图算法库如BGL,LEAD,NetworkX,JDSL,Standford,GraphBase,FGL等使用已有的并行图计算系统如Parallel BGL,CGMgraph等目前通用的图处理软件主要包括两种。一种主要基于遍历算法、实时的图数据库,如Neo4j , OrientDB , DEX , 和InfiniteGraph .另一种则是以图顶点为中心的消息传递批处理的并行引擎,如Hama , Golden Orb , Giraph , 和Preg
26、el .第一种基本都基于tinkerpop的图基础框架,tinkerpop项目关系如图1所示:以图顶点为中心的消息传递批处理的并行引擎主要是基于BSP(Bulk Synchronous Parallel)模型所实现的并行图处理包。BSP是由哈佛大学Viliant和牛津大学Bill McColl提出的并行计算模型。一个BSP模型由大量相互关联的处理器(processor)所组成,它们之间形成了一个通信网络。每个处理器都有快速的本地内存和不同的计算线程。一次BSP计算过程由一系列全局超步组成,超步就是计算中一次迭代。每个超步主要包括三个组件:并发计算(Concurrent computation)
27、:每个参与的处理器都有自身的计算任务,它们只读取存储在本地内存的值。这些计算都是异步并且独立的。通讯(Communication): 处理器群相互交换数据,交换的形式:由一方发起推送(put)和获取(get)操作。栅栏同步(Barrier synchronisation): 当一个处理器遇到路障,会等到其他所有处理器完成它们的计算步骤。每一次同步也是一个超步的完成和下一个超步的开始。Pregel是由Google开发的一个用于分布式图计算的计算框架,主要用于图遍历(BFS)、最短路径(SSSP)、PageRank计算等等。共享内存的运行库有很多,但是对于Google来说,一台机器早已经放不下需要
28、计算的数据了,所以需要分布式的这样一个计算环境。没有Pregel之前,可以选择用MapReduce来做,但是效率很低。下面简单介绍一下PageRank算法在Pregel和MapReduce中的实现。PageRank算法作为Google的网页链接排名算法,具体公式如下: 对任意一个链接,其PR值为链入到该链接的源链接的PR值对该链接的贡献和(分母Ni为第i个源链接的链出度)。Pregel的计算模型主要来源于BSP并行计算模型的启发。要用Pregel计算模型实现PageRank算法,也就是将网页排名算法映射到图计算中,这其实是很自然的,网络链接是一个连通图。上图就是四个网页(A,B,C,D)互相链
29、入链出组成的联通图。根据Pregel的计算模型,将计算定义到顶点(vertex)即A,B,C,D上来,对应一个对象,即一个计算单元。每一个计算单元包含三个成员变量:Vertex value:顶点对应的PR值Out edge:只需要表示一条边,可以不取值Message:传递的消息,因为需要将本vertex对其它vertex的PR贡献传递给目标vertex每一个计算单元包含一个成员函数:Compute:该函数定义了vertex上的运算,包括该vertex的PR值计算,以及从该vertex发送消息到其链出vertexclass PageRankVertex : public Vertex publi
30、c: virtual void Compute(MessageIterator* msgs) if (superstep() = 1) double sum = 0; for (; !msgs-Done(); msgs-Next()sum += msgs-Value(); *MutableValue() =0.15 / NumVertices() + 0.85 * sum; if (superstep() 30) const int64 n = GetOutEdgeIterator().size(); SendMessageToAllNeighbors(GetValue() / n); els
31、e VoteToHalt(); ;Pregel的执行包含PageRankVertex类,它继承了Vertex类。该类顶点值的类型是double,用来存储暂定的PageRank,消息类型也是double,用来传递PageRank的部分。图在第0个超步中被初始化,所以它的每个顶点值为1.0。在每个超步中,每个顶点都会沿着它的出射边发送它的PageRank值除以出射边数后的结果值。从第1个超步开始,每个顶点会将到达的消息中的值加到sum值中,同时将它的PageRank值设为0.15/ NumVertices()+0.85*sum。为了收敛,可以设置一个超步数量的限制或用aggregators来检查是
32、否满足收敛条件阶段1:解析网页Map task把(URL,page content)对映射为(URL,(PRinit,list-of-urls)PRinit是URL的“seed”PageRank。list-of-urls包含通过URL指向的所有页。Reduce task只是恒等函数。阶段2:PageRank分配Map task得到(URL,(cur_rank,url_list)对于每一个url_list中的u,输出(u,cur_rank/|url_list|)。输出(URL,url_list)通过迭代器来获取列表的指向。Reduce task 获得(URL,url_list)和很多(URL,v
33、ar)值对汇总vals,乘上d(0.85)。输出(URL,(new_rank,url_list)。最后阶段:一个非并行组件决定是否达到收敛。如果达到收敛,写出PageRank生成的列表。否则,回退到第2阶段的输出,进行另一个第2阶段的迭代。MapReduce也是Google提出的一种计算模型,它是为全量计算而设计。它实现MapReduce需要以下三个阶段:下面是第二阶段,把网页链接映射到key-value对的伪代码:Mapper函数的伪码:input - PageA, PageB, PageC. / 链接关系begin Nn:= the number of outlinksfor PageN;
34、 for each outlinkPageK output PageK- / 同时输出链接关系,用于迭代 output PageN- PageA, PageB, PageC.EndMapper的输出如下(已经排序,所以PageK的数据排在一起,最后一列则是链接关系对):PageK- PageK- .PageK- Reduce函数的伪码:input mappers outputbegin RankK:= 0; for each inlinkPageNi RankK+= RankNi/Nni* beta / output the PageKand its new Rank for the next
35、 iteration output - end总结:简单地来讲,Pregel将PageRank处理对象看成是连通图,而MapReduce则将其看成是Key-Value对。Pregel将计算细化到顶点vertex,同时在vertex内控制循环迭代次数,而MapReduce则将计算批量化处理,按任务进行循环迭代控制。PageRank算法如果用MapReduce实现,需要一系列的MapReduce的调用。从一个阶段到下一个阶段,它需要传递整个图的状态,这样就需要许多的通信和随之而来的序列化和反序列化的开销。另外,这一连串的MapReduce作业各执行阶段需要的协同工作也增加了编程复杂度,而Prege
36、l使用超步简化了这个过程。一个典型的Pregel计算过程如下:读取输入初始化该图,当图被初始化好后,运行一系列的超步直到整个计算结束,这些超步之间通过一些全局的同步点分隔开,输出结果结束计算。在每一个超步中,计算框架都会针对每个顶点调用用户自定义的函数,这个过程是并行的。该函数描述的是一个顶点V在一个超步S中需要执行的操作。函数可以:读取前一个超步(S-1)中发送给V的消息发送消息给其他顶点,这些消息将会在下一个超步(S+1)中被接收修改顶点V及其出射边的状态发生拓扑变化整个Pregel程序的输出是所有顶点输出的集合。顶点:每一个顶点都有一个相应的由String描述的顶点标识符。每一个顶点都有
37、一个与之对应的可修改的用户自定义值。边:每一条有向边都和其源顶点关联,还记录了其目标顶点的标识符。每一条有向边拥有一个可修改的用户自定义值。在第0个超步,所有顶点都处于active状态只有active顶点参与对应超步中的计算顶点通过将其自身的status设置成“halt”来进入inactive状态inactive顶点收到其它顶点传送的消息被唤醒进入active状态整个计算在所有顶点都达到“inactive”状态,并且没有message在传送的时候宣告结束。计算模型是一种纯消息传递模型,忽略远程数据读取和其他共享内存的方式,有两个原因。第一,消息传递模型足够表达所有图算法。第二,出于性能的考虑。
38、在一个集群环境中,从远程机器上读取一个值是会有很高的延迟的。而我们的消息传递模式通过异步的方式传输批量消息,可以减少远程读取的延迟。通过一个简单的例子来说明这些基本概念:给定一个强连通图,图中每个顶点都包含一个值,它会将最大值传播到每个顶点。在每个超步中,顶点会从接收到的消息中选出一个最大值,并将这个值传送给其所有的相邻顶点。当某个超步中已经没有顶点更新其值,那么算法就宣告结束。编写一个Pregel程序需要继承Pregel中已预定义好的一个基类Vertex类template class Vertex public:virtual void Compute(MessageIterator* ms
39、gs) = 0; const string& vertex_id() const; int64 superstep() const; const VertexValue& GetValue(); VertexValue* MutableValue(); OutEdgeIterator GetOutEdgeIterator(); void SendMessageTo(const string& dest_vertex,const MessageValue& message); void VoteToHalt();用户覆写Vertex类的虚函数Compute(),该函数会在每一个超步中对每一个顶点
40、进行调用。Compute()方法可以通过调用GetValue()方法来得到当前顶点的值,或者通过调用MutableValue()方法来修改当前顶点的值。还可以通过由出射边的迭代器提供的方法来查看修改出射边对应的值。顶点之间的通信是直接通过发送消息,每条消息都包含了消息值和目标顶点的名称。消息值的数据类型是由用户通过Vertex类的模版参数来指定。在一个超步中,一个顶点可以发送任意多的消息。在该迭代器中并不保证消息的顺序,但是可以保证消息一定会被传送并且不会重复。消息可以传给任意标识符已知的顶点发送消息时,尤其是当目标顶点在另外一台机器时,会产生一些开销。某些情况可以用combiner降低这种开
41、销。比方说,假如Compute() 收到许多的int 值消息,而它仅仅关心的是这些值的和,而不是每一个int的值,这种情况下,系统可以将发往同一个顶点的多个消息combine成一个消息,该消息中仅包含它们的和值,这样就可以减少传输和缓存的开销。Combiners在默认情况下并没有被开启,而用户如果想要开启Combiner的功能,可以通过重载Combine()方法实现。框架并不会确保哪些消息会被Combine而哪些不会,也不会确保传送给Combine()的值和Combining操作的执行顺序。所以Combiner只应该对那些满足交换律和结合律的操作打开。例子:假设我们想统计在一组相关联的页面中所
42、有页面的链接数。在第一个迭代中,对从每一个顶点(页面)的链接,我们会向目标页面发送一个消息。这里输入消息队列上的count函数可以通过一个combiner来优化性能。在这个求最大值的例子中,一个Max combiner可以减少通信负荷。Pregel的aggregators是一种提供全局通信,监控和数据查看的机制。在一个超步S中,每一个顶点都可以向一个aggregator提供一个数据,系统会使用一种reduce操作来负责聚合这些值,而产生的值将会对所有的顶点在超步S+1中可见。Aggregators可以用来做统计和全局协同。Aggregators可以通过把Aggregator类子类化来实现。应该
43、满足交换律和结合律默认情况下,一个aggregator仅仅会对来自同一个超步的输入进行聚合。例子:Sum 运算符应用于每个顶点的出射边数可以用来生成图中边的总数并使它能与所有的顶点相通信。更复杂的Reduce运算符甚至可以产生直方图。在求最大值得例子中,我们我们可以通过运用一个Max aggregator在一个超步中完成整个程序。在一个超步中完成整个程序。Compute()算法也可以用来修改图的拓扑结构。在请求发出后在该超步中发生拓扑变化。拓扑变化的顺序:删除操作在添加操作之前删除边操作在删除顶点操作之前添加顶点操作在添加边操作之前这种局部有序性解决了很多冲突,其余的冲突由用户自定义的hand
44、lers解决。同一种handler机制将被用于解决由于多个顶点删除请求或多个边增加请求或删除请求而造成的冲突。Pregel的协同机制是惰性的,全局的拓扑改变在被apply之前不需要进行协调这种设计的选择是为了优化流式处理。直观来讲就是对顶点V的修改引发的冲突由V自己来处理。Pregel同样也支持纯local的拓扑改变,Local的拓扑改变不会引发冲突,并且顶点或边的本地增减能够立即生效,很大程度上简化了分布式的编程。可以采用多种格式进行图的保存,比如可以用text文件,关系数据库,或者Bigtable中的行。类似的,结果可以以任何一种格式输出并根据应用程序选择最适合的存储方式。用户可以通过继承
45、Reader和Writer类来定义他们自己的读写方式。Pregel是为Google的集群架构而设计的。应用程序通常通过一个集群管理系统执行,该管理系统会通过调度作业来优化集群资源的使用率,有时候会杀掉一些任务或将任务迁移到其他机器上去。持久化的数据被存储在GFS或Bigtable中,而临时文件比如缓存的消息则存储在本地磁盘中。Pregel library将一张图划分成许多的partitions,每一个partition包含了一些顶点和以这些顶点为起点的边。将一个顶点分配到某个partition上去取决于该顶点的ID。默认的partition函数为hash(ID) mod N,N为所有parti
46、tion总数。接下来描述一个Pregel程序执行的几个阶段。1.用户程序的多个copy开始在集群中的机器上执行。其中一个copy充当masterMaster不被分配图的任意部分,它负责协调worker的活动2.master将图进行分区,然后将一个或多个partition分给worker;每一个worker会在内存中维护分配到其之上的graph partition的状态。执行它的顶点上的用户定义的Compute()方法并管理来自或发给其他顶点的消息。执行过程3.Master为每个worker分配用户输入的一部分。输入被看做一系列的记录,每个记录包含任意数量的顶点和边。在输入完成加载后,所有的顶点
47、被标记为active。4.在一个超步中,master通知每一个worker去执行,只要存在active顶点worker一直执行,并为每一个active状态的顶点调用compute()方法。它也会传送以前的超步发送的消息。当worker完成后,它会向master作出响应,告诉master在下一个超步中active顶点的数量。5.计算结束后,master会通知所有的worker保存它那部分的计算结果。容错是通过checkpointing来实现的。在每个超步的开始阶段,master命令worker让它保存它上面的partitions的状态到持久存储设备,包括顶点值,边值,以及接收到的消息。Maste
48、r通过ping消息检测worker是否故障当一个或多个worker出现故障时,和它们关联的分区的当前状态就会丢失。Master重新分配图的partition到当前可用的worker集合上。所有的partition会从最近的某超步S开始时写出的checkpoint中重新加载状态信息。该超步可能比在出故障的worker上最后运行的超步S早好几个阶段整个系统从该超步重新开始Confined recovery可以改进恢复执行的开销和延迟。除了基本的checkpoint,worker同时还会将其在加载图的过程中和超步中发送出去的消息写入日志。这样恢复就会被限制在丢掉的那些partitions上。一个wo
49、rker机器会在内存中维护分配到其之上的graph partition的状态。当Compute()请求发送一个消息到其他顶点时,worker首先确认目标顶点是属于远程的worker机器,还是当前worker。如果是在远程的worker机器上,那么消息就会被缓存,当缓存大小达到一个阈值,最大的那些缓存数据将会被异步地flush出去,作为单独的一个网络消息传输到目标worker。如果是在当前worker,那么就可以做相应的优化:消息就会直接被放到目标顶点的输入消息队列中。如果用户提供了Combiner,那么在消息被加入到输出队列或者到达输入队列时,会执行combiner函数。后一种情况并不会节省网
50、络开销,但是会节省用于消息存储的空间。Master主要负责的worker之间的工作协调,每一个worker在其注册到master的时候会被分配一个唯一的ID。Master内部维护着一个当前活动的worker列表,master中保存这些信息的数据结构大小与partitions的个数相关,与图中的顶点和边的数目无关。绝大部分的master的工作,包括输入,输出,计算,保存以及从checkpoint中恢复,都将会在一个叫做barriers的地方终止:Master同时还保存着整个计算过程以及整个graph的状态的统计数据。为方便用户监控,Master在内部运行了一个HTTP服务器来显示这些信息。每个A