1、流计算提纲提纲1 1 流计算概述流计算概述2 2 流计算处理流程流计算处理流程3 3 流计算应用流计算应用4 4 流计算开源框架流计算开源框架 Storm Storm5 5 Spark StreamingSpark Streaming6 6 SamzaSamza7 7 StormStorm、Spark StreamingSpark Streaming和和SamzaSamza的应用场景的应用场景8 8 StormStorm编程实践编程实践1 1流计算概述流计算概述 1.1 静态数据和流数据 1.2 批量计算和实时计算 1.3 流计算概念 1.4 流计算与Hadoop 1.5 流计算框架很多企业为
2、了支持决策分析而构建的数据仓库系统,其中存放的大量历史数据就是静态数据。技术人员可以利用数据挖掘和OLAP(On-Line Analytical Processing)分析工具从静态数据中找到对企业有价值的信息1.1 1.1 静态数据和流数据静态数据和流数据近年来,在Web应用、网络监控、传感监测等领域,兴起了一种新的数据密集型应用流数据,即数据以大量、快速、时变的流形式持续到达实例:PM2.5检测、电子商务网站用户点击流流数据具有如下特征: 数据快速持续到达,潜在大小也许是无穷无尽的 数据来源众多,格式复杂 数据量大,但是不十分关注存储,一旦经过处理,要么被丢弃,要么被归档存储 注重数据的整
3、体价值,不过分关注个别数据 数据顺序颠倒,或者不完整,系统无法控制将要处理的新到达的数据元素的顺序1.1 1.1 静态数据和流数据静态数据和流数据对静态数据和流数据的处理,对应着两种截然不同的计算模式:批量计算和实时计算1.2 1.2 批量计算和实时计算批量计算和实时计算图8-2 数据的两种处理模型批量计算:充裕时间处理静态数据,如Hadoop流数据不适合采用批量计算,因为流数据不适合用传统的关系模型建模流数据必须采用实时计算,响应时间为秒级数据量少时,不是问题,但是,在大数据时代,数据格式复杂、来源众多、数据量巨大,对实时计算提出了很大的挑战。因此,针对流数据的实时计算流计算,应运而生 流计
4、算:实时获取来自不同数据源的海量数据,经过实时分析处理,获得有价值的信息1.3 1.3 流计算概念流计算概念图8-3 流计算示意图流计算秉承一个基本理念,即数据的价值随着时间的流逝而降低数据的价值随着时间的流逝而降低,如用户点击流。因此,当事件出现时就应该立即进行处理,而不是缓存起来进行批量处理。为了及时处理流数据,就需要一个低延迟、可扩展、高可靠的处理引擎对于一个流计算系统来说,它应达到如下需求: 高性能:处理大数据的基本要求,如每秒处理几十万条数据 海量式:支持TB级甚至是PB级的数据规模 实时性:保证较低的延迟时间,达到秒级别,甚至是毫秒级别 分布式:支持大数据的基本架构,必须能够平滑扩
5、展 易用性:能够快速进行开发和部署 可靠性:能可靠地处理流数据1.3 1.3 流计算概念流计算概念Hadoop设计的初衷是面向大规模数据的批量处理,每台机器并行运行MapReduce任务,最后对结果进行汇总输出MapReduce是专门面向静态数据的批量处理的,内部各种实现机制都为批处理做了高度优化,不适合用于处理持续到达的动态数据可能会想到一种“变通”的方案来降低批处理的时间延迟将基于MapReduce的批量处理转为小批量处理,将输入数据切成小的片段,每隔一个周期就启动一次MapReduce作业。但这种方式也无法有效处理流数据 切分成小片段,可以降低延迟,但是也增加了附加开销,还要处理片段之间
6、依赖关系 需要改造MapReduce以支持流式处理1.4 1.4 流计算与流计算与HadoopHadoop结论:鱼和熊掌不可兼得,结论:鱼和熊掌不可兼得,HadoopHadoop擅长批处理,不适合流计算擅长批处理,不适合流计算当前业界诞生了许多专门的流数据实时计算系统来满足各自需求目前有三类常见的流计算框架和平台:商业级的流计算平台、开源流计算框架、公司为支持自身业务开发的流计算框架商业级:IBM InfoSphere Streams和IBM StreamBase较为常见的是开源流计算框架,代表如下: Twitter Storm:免费、开源的分布式实时计算系统,可简单、高效、可靠地处理大量的流
7、数据 Yahoo! S4(Simple Scalable Streaming System):开源流计算平台,是通用的、分布式的、可扩展的、分区容错的、可插拔的流式系统 公司为支持自身业务开发的流计算框架: Facebook Puma Dstream(百度) 银河流数据处理平台(淘宝)1.5 1.5 流计算框架流计算框架2 2流计算处理流程流计算处理流程 2.1 概述 2.2 数据实时采集 2.3 数据实时计算 2.4 实时查询服务传统的数据处理流程,需要先采集数据并存储在关系数据库等数据管理系统中,之后由用户通过查询操作和数据管理系统进行交互传统的数据处理流程隐含了两个前提: 存储的数据是旧
8、的存储的数据是旧的。存储的静态数据是过去某一时刻的快照,这些数据在查询时可能已不具备时效性了 需要用户主动发出查询需要用户主动发出查询来获取结果来获取结果2.1 2.1 数据处理流程数据处理流程传统的数据处理流程示意图流计算的处理流程一般包含三个阶段:数据实时采集、数据实时计算、实时查询服务2.1 2.1 数据处理流程数据处理流程流计算处理流程示意图数据实时采集阶段通常采集多个数据源的海量数据,需要保证实时性、低延迟与稳定可靠以日志数据为例,由于分布式集群的广泛应用,数据分散存储在不同的机器上,因此需要实时汇总来自不同机器上的日志数据目前有许多互联网公司发布的开源分布式日志采集系统均可满足每秒
9、数百MB的数据采集和传输需求,如: Facebook的Scribe LinkedIn的Kafka 淘宝的Time Tunnel 基于Hadoop的Chukwa和Flume2.2 2.2 数据实时采集数据实时采集 数据采集系统的基本架构一般有以下三个部分: Agent:主动采集数据,并把数据推送到Collector部分 Collector:接收多个Agent的数据,并实现有序、可靠、高性能的转发 Store:存储Collector转发过来的数据(对于流计算不存储数据)2.2 2.2 数据实时采集数据实时采集数据采集系统基本架构数据实时计算阶段对采集的数据进行实时的分析和计算,并反馈实时结果经流处
10、理系统处理后的数据,可视情况进行存储,以便之后再进行分析计算。在时效性要求较高的场景中,处理之后的数据也可以直接丢弃2.3 2.3 数据实时数据实时计算计算数据实时计算流程实时查询服务:经由流计算框架得出的结果可供用户进行实时查询、展示或储存传统的数据处理流程,用户需要主动发出查询才能获得想要的结果。而在流处理流程中,实时查询服务可以不断更新结果,并将用户所需的结果实时推送给用户虽然通过对传统的数据处理系统进行定时定时查询,也可以实现不断地更新结果和结果推送,但通过这样的方式获取的结果,仍然是根据过去某一时刻的数据得到的结果,与实时结果有着本质的区别2.3 2.3 实时查询服务实时查询服务 可
11、见,流处理系统与传统的数据处理系统有如下不同: 流处理系统处理的是实时的数据,而传统的数据处理系统处理的是预先存储好的静态数据 用户通过流处理系统获取的是实时结果,而通过传统的数据处理系统,获取的是过去某一时刻的结果 流处理系统无需用户主动发出查询,实时查询服务可以主动将实时结果推送给用户2.3 2.3 实时查询服务实时查询服务流计算是针对流数据的实时计算,可以应用在多种场景中,如Web服务、机器翻译、广告投放、自然语言处理、气候模拟预测等如百度、淘宝等大型网站中,每天都会产生大量流数据,包括用户的搜索内容、用户的浏览记录等数据。采用流计算进行实时数据分析,可以了解每个时刻的流量变化情况,甚至
12、可以分析用户的实时浏览轨迹,从而进行实时个性化内容推荐但是,并不是每个应用场景都需要用到流计算的。流计算适合于需要处理持续到达的流数据、对数据处理有较高实时性要求的场景3 3 流计算的应用流计算的应用传统的业务分析一般采用分布式离线计算的方式,即将数据全部保存起来,然后每隔一定的时间进行离线分析来得到结果。但这样会导致一定的延时,难以保证结果的实时性随着分析业务对实时性要求的提升,离线分析模式已经不适合用于流数据的分析,也不适用于要求实时响应的互联网应用场景如淘宝网“双十一”、“双十二”的促销活动,商家需要根据广告效果来即时调整广告,这就需要对广告的受访情况进行分析。但以往采用分布式离线分析,
13、需要几小时甚至一天的延时才能得到分析结果。而促销活动只持续一天,因此,隔天才能得到的分析结果便失去了价值虽然分布式离线分析带来的小时级的分析延时可以满足大部分商家的需求,但随着实时性要求越来越高,如何实现秒级别的实时分析响应成为业务分析的一大挑战3.1 3.1 应用场景应用场景1: 1: 实时分析实时分析针对流数据,“量子恒道”开发了海量数据实时流计算框架Super Mario。通过该框架,量子恒道可处理每天TB级的实时流数据,并且从用户发出请求到数据展示,整个延时控制在2-3秒内,达到了实时性的要求3.1 3.1 应用场景应用场景1: 1: 实时分析实时分析Super Mario处理流程流计
14、算不仅为互联网带来改变,也能改变我们的生活如提供导航路线,一般的导航路线并没有考虑实时的交通状况,即便在计算路线时有考虑交通状况,往往也只是使用了以往的交通状况数据。要达到根据实时交通状态进行导航的效果,就需要获取海量的实时交通数据并进行实时分析借助于流计算的实时特性,不仅可以根据交通情况制定路线,而且在行驶过程中,也可以根据交通情况的变化实时更新路线,始终为用户提供最佳的行驶路线3.1 3.1 应用场景应用场景2: 2: 实时交通实时交通 4.1 Storm简介 4.2 Storm的特点 4.3 Storm设计思想 4.4 Storm框架设计4 4 开源流计算框架开源流计算框架StormSt
15、orm 以前只有政府机构和金融机构能够通过昂贵的定制系统来满足流数据实时分析计算需求 早期对于流计算的研究多数是基于对传统数据库处理的流式化,即实时数据库,很少研究流计算框架 Yahoo! S4和Twitter Storm的开源,改变了这个情况 在流数据处理上比MapReduce更有优势 批处理系统关注吞吐率,流处理系统关注延时 Yahoo! S4和Twitter Storm改变了开发实时应用的方式 以前既要关注处理逻辑,还要解决实时数据获取、传输、存储 现在可以快速低成本搭建起实时流处理系统4 4 开源流计算框架开源流计算框架StormStormTwitter Storm是一个免费、开源的分
16、布式实时计算系统,Storm对于实时计算的意义类似于Hadoop对于批处理的意义,Storm可以简单、高效、可靠地处理流数据,并支持多种编程语言Storm框架可以方便地与数据库系统进行整合,从而开发出强大的实时计算系统4.1 Storm4.1 Storm简介简介Twitter是全球访问量最大的社交网站之一,Twitter开发Storm流处理框架也是为了应对其不断增长的流数据实时处理需求4.1 Storm4.1 Storm简介简介Twitter的分层数据处理架构4.2 Storm4.2 Storm的特点的特点Storm可用于许多领域中,如实时分析、在线机器学习、持续计算、远程RPC、数据提取加载
17、转换等Storm具有以下主要特点: 整合性:Storm可方便地与队列系统和数据库系统进行整合 简易的API:Storm的API在使用上即简单又方便 可扩展性:Storm的并行特性使其可以运行在分布式集群中 容错性:Storm可自动进行故障节点的重启、任务的重新分配 可靠的消息处理:Storm保证每个消息都能完整处理 支持各种编程语言:Storm支持使用各种编程语言来定义任务 快速部署:Storm可以快速进行部署和使用 免费、开源:Storm是一款开源框架,可以免费使用4.3 Storm4.3 Storm设计思想设计思想Storm主要术语包括Streams、Spouts、Bolts、Topolo
18、gy和Stream GroupingsStreamsStreams:Storm将流数据Stream描述成一个无限的Tuple序列,这些Tuple序列会以分布式的方式并行地创建和处理每个tuple是一堆值,每个值有一个名字,并且每个值可以是任何类型Tuple本来应该是一个Key-Value的Map,由于各个组件间传递的tuple的字段名称已经事先定义好了,所以Tuple只需要按序填入各个Value,所以就是一个Value List(值列表)Field1Field1Field2Field2Field3Field3Field4Field44.3 Storm4.3 Storm设计思想设计思想Spout
19、Spout:Storm认为每个Stream都有一个源头,并把这个源头抽象为Spout通常Spout会从外部数据源(队列、数据库等)读取数据,然后封装成Tuple形式,发送到Stream中。Spout是一个主动的角色,在接口内部有个nextTuple函数,Storm框架会不停的调用该函数4.3 Storm4.3 Storm设计思想设计思想BoltBolt:Storm将Streams的状态转换过程抽象为Bolt。Bolt即可以处理Tuple,也可以将处理后的Tuple作为新的Streams发送给其他BoltBolt可以执行过滤、函数操作、Join、操作数据库等任何操作Bolt是一个被动的角色,其接
20、口中有一个execute(Tuple input)方法,在接收到消息之后会调用此函数,用户可以在此方法中执行自己的处理逻辑4.3 Storm4.3 Storm设计思想设计思想TopologyTopology:Storm将Spouts和Bolts组成的网络抽象成Topology,它可以被提交到Storm集群执行。Topology可视为流转换图,图中节点是一个Spout或Bolt,边则表示Bolt订阅了哪个Stream。当Spout或者Bolt发送元组时,它会把元组发送到每个订阅了该Stream的Bolt上进行处理Topology里面的每个处理组件(Spout或Bolt)都包含处理逻辑, 而组件之
21、间的连接则表示数据流动的方向Topology里面的每一个组件都是并行运行的在Topology里面可以指定每个组件的并行度, Storm会在集群里面分配那么多的线程来同时计算在Topology的具体实现上,Storm中的Topology定义仅仅是一些Thrift结构体(二进制高性能的通信中间件),支持各种编程语言进行定义SpoutSpoutBoltBoltBoltBoltBolt4.3 Storm4.3 Storm设计思想设计思想Stream GroupingsStream Groupings:S Storm中的Stream Groupings用于告知Topology如何在两个组件间(如Spou
22、t和Bolt之间,或者不同的Bolt之间)进行Tuple的传送。每一个Spout和Bolt都可以有多个分布式任务,一个任务在什么时候、以什么方式发送Tuple就是由Stream Groupings来决定的4.3 Storm4.3 Storm设计思想设计思想目前,Storm中的Stream Groupings有如下几种方式:(1)ShuffleGrouping:随机分组,随机分发Stream中的Tuple,保证每个Bolt的Task接收Tuple数量大致一致(2)FieldsGrouping:按照字段分组,保证相同字段的Tuple分配到同一个Task中(3)AllGrouping:广播发送,每一
23、个Task都会收到所有的Tuple(4)GlobalGrouping:全局分组,所有的Tuple都发送到同一个Task中(5)NonGrouping:不分组,和ShuffleGrouping类似,当前Task的执行会和它的被订阅者在同一个线程中执行(6)DirectGrouping:直接分组,直接指定由某个Task来执行Tuple的处理4.4 Storm4.4 Storm框架设计框架设计HadoopHadoopStormStorm应用名称JobTopology系统角色JobTrackerNimbusTaskTrackerSupervisor组件接口Map/ReduceSpout/BoltSto
24、rm和Hadoop架构组件功能对应关系Storm运行任务的方式与Hadoop类似:Hadoop运行的是MapReduce作业,而Storm运行的是“Topology”但两者的任务大不相同,主要的不同是:MapReduce作业最终会完成计算并结束运行,而Topology将持续处理消息(直到人为终止)4.4 Storm4.4 Storm框架设计框架设计Storm集群采用“MasterWorker”的节点方式: Master节点运行名为“Nimbus”的后台程序(类似Hadoop中的“JobTracker”),负责在集群范围内分发代码、为Worker分配任务和监测故障 Worker节点运行名为“Su
25、pervisor”的后台程序,负责监听分配给它所在机器的工作,即根据Nimbus分配的任务来决定启动或停止Worker进程,一个Worker节点上同时运行若干个Worker进程Storm使用Zookeeper来作为分布式协调组件,负责Nimbus和多个Supervisor之间的所有协调工作。借助于Zookeeper,若Nimbus进程或Supervisor进程意外终止,重启时也能读取、恢复之前的状态并继续工作,使得Storm极其稳定4.4 Storm4.4 Storm框架设计框架设计Storm集群架构示意图Worker进程Worker进程Worker进程4.4 Storm4.4 Storm框架
26、设计框架设计(1)worker:每个worker进程都属于一个特定的Topology,每个Supervisor节点的worker可以有多个,每个worker对Topology中的每个组件(Spout或 Bolt)运行一个或者多个executor线程来提供task的运行服务(2)executor:executor是产生于worker进程内部的线程,会执行同一个组件的一个或者多个task。(3)task:实际的数据处理由task完成,在Topology的生命周期中,每个组件的task数目是不会发生变化的,而executor的数目却不一定。executor数目小于等于task的数目,默认情况下,二者
27、是相等的Worker、Executor和Task的关系4.4 Storm4.4 Storm框架设计框架设计基于这样的架构设计,Storm的工作流程如下图所示:Storm工作流程示意图所有Topology任务的提交必须在Storm客户端节点上进行,提交后,由Nimbus节点分配给其他Supervisor节点进行处理Nimbus节点首先将提交的Topology进行分片,分成一个个Task,分配给相应的Supervisor,并将Task和Supervisor相关的信息提交到Zookeeper集群上Supervisor会去Zookeeper集群上认领自己的Task,通知自己的Worker进程进行Tas
28、k的处理说明:在提交了一个Topology之后,Storm就会创建Spout/Bolt实例并进行序列化。之后,将序列化的组件发送给所有的任务所在的机器(即Supervisor节点),在每一个任务上反序列化组件5 Spark Streaming5 Spark Streaming5.1 Spark Streaming设计5.2 Spark Streaming与Storm的对比5.1 Spark Streaming5.1 Spark Streaming设计设计Spark Streaming可整合多种输入数据源,如Kafka、Flume、HDFS,甚至是普通的TCP套接字。经处理后的数据可存储至文件系
29、统、数据库,或显示在仪表盘里图13 Spark Streaming支持的输入、输出数据源5.1 Spark Streaming5.1 Spark Streaming设计设计Spark Streaming的基本原理是将实时输入数据流以时间片(秒级)为单位进行拆分,然后经Spark引擎以类似批处理的方式处理每个时间片数据图14 Spark Streaming执行流程5.1 Spark Streaming5.1 Spark Streaming设计设计Spark Streaming最主要的抽象是DStream(Discretized Stream,离散化数据流),表示连续不断的数据流。在内部实现上,S
30、park Streaming的输入数据按照时间片(如1秒)分成一段一段的DStream,每一段数据转换为Spark中的RDD,并且对DStream的操作都最终转变为对相应的RDD的操作图15 DStream操作示意图5.2 Spark Streaming5.2 Spark Streaming与与StormStorm的对比的对比Spark Streaming和Storm最大的区别在于,Spark Streaming无法实现毫秒级的流计算,而Storm可以实现毫秒级响应Spark Streaming构建在Spark上,一方面是因为Spark的低延迟执行引擎(100ms+)可以用于实时计算,另一方面
31、,相比于Storm,RDD数据集更容易做高效的容错处理Spark Streaming采用的小批量处理的方式使得它可以同时兼容批量和实时数据处理的逻辑和算法,因此,方便了一些需要历史数据和实时数据联合分析的特定应用场合6 6 SamzaSamza6.1 基本概念6.2 系统架构6.1 6.1 基本概念基本概念1.作业一个作业(Job)是对一组输入流进行处理转化成输出流的程序。6.1 6.1 基本概念基本概念2.分区Samza的流数据单位既不是Storm中的元组,也不是Spark Streaming中的DStream,而是一条条消息Samza中的每个流都被分割成一个或多个分区,对于流里的每一个分区
32、而言,都是一个有序的消息序列,后续到达的消息会根据一定规则被追加到其中一个分区里6.1 6.1 基本概念基本概念3.任务一个作业会被进一步分割成多个任务(Task)来执行,其中,每个任务负责处理作业中的一个分区分区之间没有定义顺序,从而允许每一个任务独立执行YARN调度器负责把任务分发给各个机器,最终,一个工作中的多个任务会被分发到多个机器进行分布式并行处理6.1 6.1 基本概念基本概念4.数据流图一个数据流图是由多个作业构成的,其中,图中的每个节点表示包含数据的流,每条边表示数据传输多个作业串联起来就完成了流式的数据处理流程由于采用了异步的消息订阅分发机制,不同任务之间可以独立运行6.2
33、6.2 系统架构系统架构MapReduce批处理架构Samza流处理架构数据层HDFSKafka执行层YARNYARN处理层MapReduceSamza API表 MapReduce批处理架构和Samza流处理架构的类比Samza系统架构主要包括流数据层(Kafka)执行层(YARN)处理层(Samza API)流处理层和执行层都被设计成可插拔的,开发人员可以使用其他框架来替代YARN和Kafka6.2 6.2 系统架构系统架构处理分析过程如下:Samza客户端需要执行一个Samza作业时,它会向YARN的ResouceManager提交作业请求ResouceManager通过与NodeMan
34、ager沟通为该作业分配容器(包含了CPU、内存等资源)来运行Samza ApplicationMasterSamza ApplicationMaster进一步向ResourceManager申请运行任务的容器获得容器后,Samza ApplicationMaster与容器所在的NodeManager沟通,启动该容器,并在其中运行Samza Task RunnerSamza Task Runner负责执行具体的Samza任务,完成流数据处理分析7 Storm7 Storm、Spark StreamingSpark Streaming和和SamzaSamza的应用场景的应用场景从编程的灵活性来讲
35、,Storm是比较理想的选择,它使用Apache Thrift,可以用任何编程语言来编写拓扑结构(Topology)当需要在一个集群中把流计算和图计算、机器学习、SQL查询分析等进行结合时,可以选择Spark Streaming,因为,在Spark上可以统一部署Spark SQL,Spark Streaming、MLlib,GraphX等组件,提供便捷的一体化编程模型当有大量的状态需要处理时,比如每个分区都有数十亿个元组,则可以选择Samza。当应用场景需要毫秒级响应时,可以选择Storm和Samza,因为Spark Streaming无法实现毫秒级的流计算8 Storm8 Storm编程实践
36、编程实践8.1 编写Storm程序8.2 安装Storm的基本过程8.3 运行Storm程序Storm上机实践详细过程,请参考厦门大学数据库实验室建设的“中国高校大数据课程公共服务平台”中的技术文章:流计算学习指南访问地址:http:/ 8.1 编写编写StormStorm程序程序基于Storm的单词统计在形式上与基于MapReduce的单词统计是类似的,MapReduce使用的是Map和Reduce的抽象,而Storm使用的是Spout和Bolt的抽象Storm进行单词统计的整个流程: 从Spout中发送Stream(每个英文句子为一个Tuple) 用于分割单词的Bolt将接收的句子分解为独
37、立的单词,将单词作为Tuple的字段名发送出去 用于计数的Bolt接收表示单词的Tuple,并对其进行统计 输出每个单词以及单词出现过的次数程序任务:单词统计程序任务:单词统计8.1 8.1 编写编写StormStorm程序程序一个句子经Storm的单词统计得出的结果import org.apache.storm.Config;Import public class WordCountTopology public static class RandomSentenceSpout extends BaseRichSpout public static class SplitSentence e
38、xtends ShellBolt implements IRichBolt public static class WordCount extends BaseBasicBolt public static void main(String args) throws Exception TopologyBuilder builder = new TopologyBuilder(); builder.setSpout(sentences, new RandomSentenceSpout(), 5); builder.setBolt(split, new SplitSentence(), 8).s
39、huffleGrouping(sentences); builder.setBolt(count, new WordCount(), 12).fieldsGrouping(split, new Fields(word); 8.1 8.1 编写编写StormStorm程序程序Storm的编程模型非常简单,如下代码即定义了整个单词统计Topology的整体逻辑8.1 8.1 编写编写StormStorm程序程序 main()函数中的处理逻辑Topology中仅定义了整体的计算逻辑,还需要定义具体的处理函数。具体的处理函数可以使用任一编程语言来定义,甚至也可以结合多种编程语言来实现public cl
40、ass RandomSentenceSpout extends BaseRichSpout SpoutOutputCollector _collector; Random _rand;Override public void nextTuple() Utils.sleep(100); String sentences = new String the cow jumped over the moon, an apple a day keeps the doctor away, four score and seven years ago, snow white and the seven dw
41、arfs, i am at two with nature ; String sentence = sentences_rand.nextInt(sentences.length); _collector.emit(new Values(sentence); Override public void declareOutputFields(OutputFieldsDeclarer declarer) declarer.declare(new Fields(“sentences); 8.1 8.1 编写编写StormStorm程序程序备注: 为简单起见,RandomSentenceSpout省略
42、了类中的一些方法RandomSentenceSpout类8.1 8.1 编写编写StormStorm程序程序如SplitSentence()方法虽然是通过Java语言定义的,但具体的操作可通过Python脚本来完成Topology里面的每个组件必须定义它要发射的Tuple的每个字段SplitSentenceSplitSentence类类8.1 8.1 编写编写StormStorm程序程序Python脚本splitsentence.py定义了一个简单的单词分割方法,即通过空格来分割单词。分割后的单词通过emit()方法以Tuple的形式发送给订阅了该Stream的Bolt进行接收和处理split
43、sentence.py8.1 8.1 编写编写StormStorm程序程序单词统计的具体逻辑:首先判断单词是否统计过,若未统计过,需先将count值置为0。若单词已统计过,则每出现一次该单词,count值就加1WordCount类 8.1 8.1 编写编写StormStorm程序程序上述虽然是一个简单的单词统计,但对其进行扩展,便可应用到许多场景中,如微博中的实时热门话题。Twitter也正是使用了Storm框架实现了实时热门话题Twitter实时热门话题处理流程示意图8.2 8.2 安装安装StormStorm的基本过程的基本过程安装Storm的基本过程如下:第一步:安装Java环境第二步:
44、安装 Zookeeper第三步:安装Storm(单机)第四步:关闭Storm本实例中Storm具体运行环境如下:CentOS 6.4Storm 0.9.6Java JDK 1.7ZooKeeper 3.4.6Python 2.6备注:CentOS中已默认安装了Python 2.6,我们还需要安装 JDK 环境以及分布式应用程序协调服务 ZookeeperStorm上机实践详细过程,请参考厦门大学数据库实验室建设的“中国高校大数据课程公共服务平台”中的“大数据课程学生服务站大数据课程学生服务站”中的“学习指南学习指南”栏目:学生服务站地址:http:/ 8.2 安装安装StormStorm的基本
45、过程的基本过程第一步:安装第一步:安装JavaJava环境环境Storm 运行需要 Java 环境,可选择 Oracle 的 JDK,或是 OpenJDK,现在一般 Linux 系统默认安装的基本是 OpenJDK,如 CentOS 6.4 就默认安装了 OpenJDK 1.7。但需要注意的是,CentOS 6.4 中默认安装的只是 Java JRE,而不是 JDK,为了开发方便,我们还是需要通过 yum 进行安装 JDK$ sudo yum install java-1.7.0-openjdk java-1.7.0-openjdk-devel接着需要配置一下 JAVA_HOME 环境变量,为
46、方便,可以在 /.bashrc 中进行设置8.2 8.2 安装安装StormStorm的基本过程的基本过程第二步:安装第二步:安装ZookeeperZookeeper到官网下载Zookeeper,比如下载 “zookeeper-3.4.6.tar.gz” 下载后执行如下命令进行安装 zookeeper(将命令中 3.4.6 改为你下载的版本):$ sudo tar -zxf /下载/zookeeper-3.4.6.tar.gz -C /usr/local$ cd /usr/local$ sudo mv zookeeper-* zookeeper #修改目录名称方便使用$ sudo chown
47、-R hadoop:hadoop ./zookeeper # 此处的hadoop为你的用户名chown命令让hadoop用户拥有zookeeper目录下的所有文件的权限8.2 8.2 安装安装StormStorm的基本过程的基本过程接着执行如下命令进行zookeeper配置:$ cd /usr/local/zookeeper$ mkdir tmp$ cp ./conf/zoo_sample.cfg ./conf/zoo.cfg$ vim ./conf/zoo.cfg进入zoo.cfg文件编辑状态后,将当中的 dataDir=/tmp/zookeeper 更改为 dataDir=/usr/loc
48、al/zookeeper/tmp 。接着执行:$ ./bin/zkServer.sh start第二步:安装第二步:安装ZookeeperZookeeper(续)(续)8.2 8.2 安装安装StormStorm的基本过程的基本过程第三步:安装第三步:安装StormStorm(单机)(单机)到官网下载Storm,比如Storm0.9.6下载后执行如下命令进行安装Storm:$ sudo tar -zxf /下载/apache-storm-0.9.6.tar.gz -C /usr/local$ cd /usr/local$ sudo mv apache-storm-0.9.6 storm$ su
49、do chown -R hadoop:hadoop ./storm # 此处的hadoop为你的用户名接着执行如下命令进行Storm配置:$ cd /usr/local/storm$ vim ./conf/storm.yaml备注:storm的运行有两种模式: 本地模式和分布式模式.在本地模式中, storm用一个进程里面的线程来模拟所有的spout和bolt. 本地模式对开发和测试来说比较有用。在分布式模式下, storm由一堆机器组成。当提交topology给master的时候, master负责分发代码并且负责给topolgoy分配工作进程。如果一个工作进程挂掉了, master节点会把
50、它重新分配到其它节点8.2 8.2 安装安装StormStorm的基本过程的基本过程修改其中的 storm.zookeeper.servers 和 nimbus.host 两个配置项,即取消掉注释且都修改值为 127.0.0.1(我们只需要在单机上运行),如下图所示。第三步:安装第三步:安装StormStorm(单机)(单机) (续)(续)然后就可以启动 Storm 了。执行如下命令启动 nimbus 后台进程:$ ./bin/storm nimbus8.2 8.2 安装安装StormStorm的基本过程的基本过程启动 nimbus 后,终端被该进程占用了,不能再继续执行其他命令了。因此我们需