1、 大数据导论第十章CONTENTS目录PART 01 Spark Streaming 简介PART 02 Spark Streaming的执行模型PART 03 编程模型PART 04 DStream的操作PART 05 持久化和性能调优PART 06 编程实战PART 07 作业PART 01 Spark Streaming 简介Spark Streaming是Spark核心API的一个扩展,可以实现高吞吐量的、具备容错机制的实时流数据的处理。概述Spark Streaming是Spark核心API的一个扩展,可以实现高吞吐量的、具备容错机制的实时流数据的处理。支持从多种数据源获取数据,再使
2、用高级函数进行复杂算法的处理。最后还可以将处理结果存储到文件系统,数据库和现场仪表盘。Spark Streaming 数据流:概述Spark Streaming 工作原理:接收实时的输入数据流 根据一定的时间间隔(比如1秒钟)拆分成一批批的数据 然后通过Spark引擎处理这些批数据 最终得到处理后的一批批结果数据概述Spark Streaming支持一个高层的抽象,叫做离散流(Discretized Stream)或者DStream,它代表连续的数据流。在内部,DStream是由一系列RDD组成。对应的批数据,在Spark内核对应一个RDD 实例。因此,对应流数据的DStream可以看成是一组
3、RDD,即RDD的一个序列。术语定义 离散流(Discretized Stream)或 DStream 这是 Spark Streaming对内部持续的实时数据流的抽象描述,即处理的一个实时数据流,在 Spark Streaming中对应于一个DStream 实例。批数据(Batch Data)是化整为零的第一步。将实时流数据以时间片为单位进行分批,将流处理转化为时间片数据的批处理。随着持续时间的推移,这些处理结果就形成了对应的结果数据流了。术语定义 时间片或批处理时间间隔(Batch Interval)这是人为地对流数据进行定量的标准,以时间片作为拆分流数据的依据。一个时间片的数据对应一个R
4、DD 实例。窗口长度(Window Length)一个窗口覆盖的流数据的时间长度。必须是批处理时间间隔的倍数。术语定义 滑动时间间隔 前一个窗口到后一个窗口所经过的时间长度。必须是批处理时间间隔的倍数。Input DStream 一个Input DStream是一个特殊的DStream,将 Spark Streaming连接到一个外部数据源来读取数据。PART 02 Spark Streaming 的执行模型Spark Streaming是一个对实时数据流进行高通量、容错处理的流式处理系统,可以对多种数据源进行类似Map、Reduce等复杂操作,并将结果保存到外部文件系统、数据库或应用到实时仪
5、表盘。传统流处理系统架构传统流处理系统架构传统流处理系统架构大部分传统的流处理系统被设计为连续算子模型。系统包含一系列的工作节点,每组节点运行一至多个连续算子;对于流数据,每个连续算子一次处理一条记录,并且将记录传输给管道中别的算子;源算子(Source Operator)从采集系统接收数据,接着沉算子(Sink Operator)输出到下游系统。连续算子是一种较为简单、自然的模型。然而,在大数据时代,这个传统的架构也面临着严峻的挑战。1.故障恢复问题 2.负载均衡问题3.支持统一的流处理与批处理以及交互工作4.高级分析能力Spark Streaming系统架构Spark Streaming
6、系统架构Spark Streaming系统架构Spark Streaming引入了一个称之为Discretized Streams(离散化的流数据处理)的新结构,它可以直接使用Spark引擎中丰富的库并且拥有优秀的故障容错机制。对于传统流处理中一次处理一条记录的方式而言,Spark Streaming取而代之的是将流数据离散化处理,使之能够进行秒级以下的微型批处理。与传统连续算子模型不同,传统模型是静态分配给一个节点进行计算,而Spark Task可基于数据的来源以及可用资源情况动态分配给工作节点。这能够更好的实现流处理所需要的两个特性:1.负载均衡 2.快速故障恢复此外,Executor除了
7、可以处理Task,还可以将数据存在Cache或者HDFS上面。计算流程Spark Streaming 计算流程计算流程l Spark Streaming首先把输入数据按照批段大小(Batch Size),如1秒,分成一段一段的数据(Discretized Stream)l 每一段数据都转换成Spark中的RDD(Resilient Distributed Dataset)l 然后将Spark Streaming中对DStream的Transformation操作变为针对Spark中对RDD的Transformation操作l 将RDD经过操作变成中间结果保存在内存中。动态负载均衡动态负载均衡动
8、态负载均衡Spark系统将数据划分为小批量,允许对资源进行细粒度分配。例如:当输入数据流需要由一个键值来分区处理。传统流处理系统采用传统静态分配任务给节点,如果其中一个分区计算比别的更密集,那么该节点处理将会遇到性能瓶颈,同时将会减缓管道处理。在Spark Streaming中,作业任务将会动态地平衡分配给各个节点,一些节点会处理数量较少且耗时较长任务,别的节点将会处理数量更多且耗时更短的任务。容错性对于流式计算来说,容错性至关重要。RDD的容错机制:每一个RDD都是一个不可变的分布式可重算的数据集,其记录着确定性的操作继承关系(Lineage),所以只要输入数据是可容错的,那么任意一个RDD
9、的分区(Partition)出错或不可用,都是可以利用原始输入数据通过转换操作而重新计算来得到。容错性RDD的传承关系:容错性故障恢复方式比较实时性Spark Streaming将流式计算分解成多个Spark Job,对于每一段数据的处理都会经过Spark DAG图分解以及Spark的任务集的调度过程。对于目前版本的Spark Streaming而言,其最小的批次大小的选取在0.52秒钟之间,所以Spark Streaming能够满足除对实时性要求非常高(如高频实时交易)之外的所有流式准实时计算场景。扩展性和吞吐量Spark目前在EC2上已能够线性扩展到100个节点(每个节点 4Core),可
10、以以数秒的延迟处理6GB/秒的数据量,60M条记录/秒。PART 03 编程模型DStream作为Spark Streaming的基础抽象,它代表持续性的数据流。这些数据流既可以通过外部输入源来获取,也可以通过现有的Dstream 的 Transformation操作来获得。编程模型在内部实现上,DStream由一组时间序列上连续的RDD来表示。每个RDD都包含了自己特定时间间隔内的数据流:对DStream中数据的各种操作也是映射到内部的RDD上来进行的:如何使用 Spark Streamingimport org.apache.spark._ import org.apache.spark.
11、streaming._ import org.apache.spark.streaming.StreamingContext._/创建一个拥有2个工作线程,时间片长度为1秒的StreamContext/主节点需要2核以免产生饥饿状态发生val conf=new SparkConf().setMaster(local2).setAppName(NetworkWordCount)val ssc=new StreamingContext(conf,Seconds(1)/创建连接到hostname:port的DStreamCreate,比如localhost:9999 val lines=ssc.so
12、cketTextStream(localhost,9999)如何使用 Spark Streaming/把每一行分解为单词val words=lines.flatMap(_.split()import org.apache.spark.streaming.StreamingContext._/计数每一个时间片内的单词量val pairs=words.map(word=(word,1)val wordCounts=pairs.reduceByKey(_+_)/打印该DStream生成的每个RDD中的前10个单词 wordCounts.print()ssc.start()/启动计算ssc.await
13、Termination()/等待计算完成DStream 的输入源在Spark Streaming中所有的操作都是基于流的,而输入源是这一系列操作的起点。输入DStream和DStream接收的流都代表输入数据流的来源,在Spark Streaming提供两种内置数据流来源:l 基础来源 在 StreamingContext API 中直接可用的来源。例如:文件系统、Socket(套接字)连接和Akka actors;l 高级来源 如Kafka、Flume、Kinesis、Twitter等,可以通过额外的实用工具类创建。与RDD类似,DStream也提供了自己的一系列操作方法,这些操作可以分成三
14、类:普通的转换操作、窗口转换操作和输出操作。PART 04 DStream 的操作普通的转换操作转换描述map(func)源DStream的每个元素通过函数func返回一个新的DStream。flatMap(func)类似与map操作,不同的是每个输入元素可以被映射出0或者更多的输出元素。filter(func)在源DSTREAM上选择 Func函数返回仅为true的元素,最终返回一个新的 DSTREAM。repartition(numPartitions)通过输入的参数numPartitions的值来改变 DStream的分区大小。union(otherStream)返回一个包含源DStre
15、am与其他DStream的元素合并后的新DSTREAMcount()对源DStream 内部的所含有的 RDD 的元素数量进行计数,返回一个内部的 RDD只包含一个元素的DStreaam。普通的转换操作转换描述reduce(func)使用函数func(有两个参数并返回一个结果)将源DStream中每个RDD的元素进行聚合操作,返回一个内部所包含的RDD只有一个元 素的新DStream。countByValue()计算DStream中每个RDD内的元素出现的频次并返回新的DStream(K,Long),其中K是RDD中元素的类型,Long是元素出现的频次。reduceByKey(func,num
16、Tasks)当一个类型为(K,V)键值对的DStream被调用的时候,返回类型为类型为(K,V)键值对的新DStream,其中每个键的值V都是使用聚合函数func汇总。可以通过配置 numTasks设置不同的并行任务数。join(otherStream,numTasks)当被调用类型分别为(K,V)和(K,W)键值对的2个DStream时,返回类型为(K,(V,W)键值对的一个新DStream。普通的转换操作转换描述cogroup(otherStream,numTasks)当被调用的两个DStream分别含有(K,V)和(K,W)键值对时,返回一个(K,SeqV,SeqW)类型的新的 DStr
17、eam。transform(func)通过对源DStream的每RDD应用RDD-to-RDD函数返回一个新的DStream,这可以用来在DStream做任意RDD操作。updateStateByKey(func)返回一个新状态的DStream,其中每个键的状态是根据键的前一个状态和键的新值应用给定函数func 后的更新。这个方法可以被用来维持每个键的任何状态数据。窗口转换操作转换描述window(windowLength,slideInterval)返回一个基于源DStream的窗口批次计算后得到新的DStream。countByWindow(windowLength,slideInterv
18、al)返回基于滑动窗口的DStream中的元素的数量。reduceByWindow(func,windowLength,slideInterval)基于滑动窗口对源DStream中的元素进行聚合操作,得到一个新的DStream。reduceByKeyAndWindow(func,windowLength,slideInterval,numTasks)基于滑动窗口对(K,V)键值对类型的 DStream 中的值按K使用聚合函数func进行聚合操作,得到一个新的DStream。窗口转换操作转换描述reduceByKeyAndWindow(func,invFunc,windowLength,slid
19、eInterval,numTasks)一个更高效的reduceByKkeyAndWindow()的实现版本,先对滑动窗口中新的时间间隔内数据增量聚合并移去最早的与新增数据量的时间间隔内的数据统计量。例如,计算t+4秒这个时 刻过去5秒窗口的WordCount,那么我们可以将 t+3时刻过去5秒的统计量加上t+3,t+4的统计量,在减去t-2,t-1的统计 量,这种方法可以复用中间三秒的统计量,提高统计的效率countByValueAndWindow(windowL ength,slideInterval,numTasks)基于滑动窗口计算源 DStream 中每个 RDD内每个元素出现的频次并
20、返回DStream(K,Long),其中 K是RDD中元素的类型,Long是元素频次。reduce任务的数量可以通过一个可选参数进行配置。输出操作转换描述print()在Driver 中打印出 DStream 中数据的前10个元素。saveAsTextFiles(prefix,suffix)将DStream中的内容以文本的形式保存为文本文件,其中每次批处理间隔内产生的文件以 prefix-TIME_IN_MS.suffix的方式命名。saveAsObjectFiles(prefix,suffix)将DStream中的内容按对象序列化并且以SequenceFile的格式保存。其中每次批处理间隔内
21、产生的文件以prefix-TIME_IN_MS.suffix的方式命名。saveAsHadoopFiles(prefix,suffix)将DStream中的内容以文本的形式保存为Hadoop文件,其中每次批处理 间隔内产生的文件以 prefix-TIME_IN_MS.suffix的方式命名。foreachRDD(func)最基本的输出操作,将func函数应用于DStream中的RDD上,这个操作会输出数据到外部系统,比如保存RDD到文件或者网络数据库等。需要注意的是 func函数是在运行该Streaming应用的Driver进程里执行的。持久化和性能调优PART 05 持久化和性能调优持久化与
22、RDD一样,DStream同样也能通过persist()方法将数据流存放在内存中,默认的持久化方式是 MEMORY_ONLY_SER,也就是在内存中存放数据同时序列化的方式。这样做的好处是遇到需要多次迭代计算的程序时,速度优势十分的明显。而对于一些基于窗口的操作,如reduceByWindow、reduceByKeyAndWindow,以及基于状态的操作,如updateStateBykey,其默认的持久化策略就是保存在内存中。对于来自网络的数据源(Kafka、Flume、sockets等),默认的持久化策略是将数据保存在两台机器上,这也是为了容错性而设计的。性能调优性能优化从大方面来分可以分为
23、两种:优化运行时间l 增加并行度l 减少数据序列化,反序列化的负担l 设置合理的批处理时间l 减少因任务提交和分发所带来的负担 优化内存使用l 控制批处理间隔内的数据量l 及时清理不再使用的数据l 观察及适当调整GC策略性能调优n 优化运行时间增加并行度确保使用整个集群的资源,而不是把任务集中在几个特定的节点上。对于包含shuffle的操作,增加其并行度以确保更为充分地使用集群资源。n 优化运行时间减少数据序列化,反序列化的负担序列化和反序列化需要更多的CPU时间,因此更加高效的序列化方式(Kryo)和自定义的系列化接口可以更高效地使用CPU。性能调优n 优化运行时间设置合理的批处理时间在Sp
24、ark Streaming中,Job之间有可能存在依赖关系,后面的Job必须确保前面的作业执行结束后才能提交。若前面的Job执行的时间超出了批处理时间间隔,那么后面的Job就无法按时提交,这样就会进一步拖延接下来的Job,造成后续Job的阻塞。因此设置一个合理的批处理间隔以确保作业能够在这个批处理间隔内结束时必须的。n 优化运行时间减少因任务提交和分发所带来的负担通常情况下,Akka框架能够高效地确保任务及时分发,但是当批处理间隔非常小(500ms)时,提交和分发任务的延迟就变得不可接受了。性能调优n 优化内存使用控制批处理间隔内的数据量Spark Streaming会把批处理间隔内接收到的所
25、有数据存放在Spark内部的可用内存区域中,因此必须确保当前节点Spark的可用内存中能容纳这个批处理时间间隔内的所有数据,否则必须增加新的资源以提高集群的处理能力。n 优化内存使用及时清理不再使用的数据对于处理过的不再需要的数据应及时清理,以确保Spark Streaming有富余的可用内存空间。性能调优n 优化内存使用观察及适当调整GC策略GC会影响Job的正常运行,可能延长Job的执行时间,引起一系列不可预料的问题。观察GC的运行情况,采用不同的GC策略以进一步减小内存回收对Job运行的影响。编程实战PART 06 编程实战流数据模拟器在实例演示中模拟实际情况,需要源源不断地接入流数据,
26、为了在演示过程中更接近真实环境,首先需要定义流数据模拟器。该模拟器主要功能是通过Socket方式监听指定的端口号,当外部程序通过该端口连接并请求数据时,模拟器将定时将指定的文件数据随机获取发送给外部程序。import java.io.PrintWriterimport .ServerSocketimport scala.io.Source object StreamingSimulation /定义随机获取整数的方法 def index(length:Int)=import java.util.Random val rdm=new Random rdm.nextInt(length)def m
27、ain(args:ArrayString)/调用该模拟器需要三个参数,分别为文件路径、端口号和间隔时间(单位:毫秒)if(args.length!=3)System.err.println(Usage:)System.exit(1)/获取指定文件总的行数 val filename=args(0)val lines=Source.fromFile(filename).getLines.toList val filerow=lines.length /指定监听某端口,当外部程序请求时建立连接 val listener=new ServerSocket(args(1).toInt)while(tru
28、e)val socket=listener.accept()new Thread()override def run=println(Got client connected from:+socket.getInetAddress)val out=new PrintWriter(socket.getOutputStream(),true)while(true)Thread.sleep(args(2).toLong)/当该端口接受请求时,随机获取某行数据发送给对方 val content=lines(index(filerow)println(content)out.write(content+
29、n)out.flush()socket.close().start()实例1:读取文件演示在该实例中Spark Streaming将监控某目录中的文件,获取在间隔时间段内变化的数据,然后通过Spark Streaming计算出该时间段内单词统计数。import org.apache.spark.SparkConfimport org.apache.spark.streaming.Seconds,StreamingContextimport org.apache.spark.streaming.StreamingContext._ object FileWordCount def main(ar
30、gs:ArrayString)val sparkConf=new SparkConf().setAppName(FileWordCount).setMaster(local2)/创建Streaming的上下文,包括Spark的配置和时间间隔,这里时间为间隔20秒 val ssc=new StreamingContext(sparkConf,Seconds(20)/指定监控的目录,在这里为/home/hadoop/temp/val lines=ssc.textFileStream(/home/hadoop/temp/)/对指定文件夹变化的数据进行单词统计并且打印 val words=lines.
31、flatMap(_.split()val wordCounts=words.map(x=(x,1).reduceByKey(_+_)wordCounts.print()/启动Streaming ssc.start()ssc.awaitTermination()实例2:网络数据演示在该实例中将由流数据模拟以1秒的频度发送模拟数据,Spark Streaming通过Socket接收流数据并每20秒运行一次用来处理接收到数据,处理完毕后打印该时间段内数据出现的频度,即在各处理段时间之间状态并无关系。import org.apache.spark.SparkContext,SparkConfimpor
32、t org.apache.spark.streaming.Milliseconds,Seconds,StreamingContextimport org.apache.spark.streaming.StreamingContext._import org.apache.spark.storage.StorageLevel object NetworkWordCount def main(args:ArrayString)val conf=new SparkConf().setAppName(NetworkWordCount).setMaster(local2)val sc=new Spark
33、Context(conf)val ssc=new StreamingContext(sc,Seconds(20)/通过Socket获取数据,需要提供Socket的主机名和端口号,数据保存在内存和硬盘中 val lines=ssc.socketTextStream(args(0),args(1).toInt,StorageLevel.MEMORY_AND_DISK_SER)/对读入的数据进行分割、计数 val words=lines.flatMap(_.split(,)val wordCounts=words.map(x=(x,1).reduceByKey(_+_)wordCounts.prin
34、t()ssc.start()ssc.awaitTermination()实例3:Stateful演示该实例为Spark Streaming状态操作,模拟数据由流数据模拟以1秒的频度发送,Spark Streaming通过Socket接收流数据并每5秒运行一次用来处理接收到数据,处理完毕后打印程序启动后单词出现的频度,相比较实例2,在该实例中各时间段之间状态是相关的。import org.apache.log4j.Level,Loggerimport org.apache.spark.SparkContext,SparkConfimport org.apache.spark.streaming.
35、Seconds,StreamingContextimport org.apache.spark.streaming.StreamingContext._ object StatefulWordCount def main(args:ArrayString)if(args.length!=2)System.err.println(Usage:StatefulWordCount )System.exit(1)Logger.getLogger(org.apache.spark).setLevel(Level.ERROR)Logger.getLogger(org.eclipse.jetty.serve
36、r).setLevel(Level.OFF)/定义更新状态方法,参数values为当前批次单词频度,state为以往批次单词频度 val updateFunc=(values:SeqInt,state:OptionInt)=val currentCount=values.foldLeft(0)(_+_)val previousCount=state.getOrElse(0)Some(currentCount+previousCount)val conf=new SparkConf().setAppName(StatefulWordCount).setMaster(local2)val sc=n
37、ew SparkContext(conf)/创建StreamingContext,Spark Steaming运行时间间隔为5秒 val ssc=new StreamingContext(sc,Seconds(5)/定义checkpoint目录为当前目录 ssc.checkpoint(.)/获取从Socket发送过来数据 val lines=ssc.socketTextStream(args(0),args(1).toInt)val words=lines.flatMap(_.split(,)val wordCounts=words.map(x=(x,1)/使用updateStateByKey
38、来更新状态,统计从运行开始以来单词总的次数 val stateDstream=wordCounts.updateStateByKeyInt(updateFunc)stateDstream.print()ssc.start()ssc.awaitTermination()实例4:窗口演示该实例为Spark Streaming窗口操作,模拟数据由流数据模拟以1秒的频度发送,Spark Streaming通过Socket接收流数据并每10秒运行一次用来处理接收到数据,处理完毕后打印程序启动后单词出现的频度。相比前面的实例,Spark Streaming窗口统计是通过reduceByKeyAndWind
39、ow()方法实现的,在该方法中需要指定窗口时间长度和滑动时间间隔。实例4:窗口演示import org.apache.log4j.Level,Loggerimport org.apache.spark.SparkContext,SparkConfimport org.apache.spark.storage.StorageLevelimport org.apache.spark.streaming._import org.apache.spark.streaming.StreamingContext._ object WindowWordCount def main(args:ArrayStr
40、ing)if(args.length!=4)System.err.println(Usage:WindowWorldCount )System.exit(1)Logger.getLogger(org.apache.spark).setLevel(Level.ERROR)Logger.getLogger(org.eclipse.jetty.server).setLevel(Level.OFF)val conf=new SparkConf().setAppName(WindowWordCount).setMaster(local2)val sc=new SparkContext(conf)/创建S
41、treamingContext val ssc=new StreamingContext(sc,Seconds(5)/定义checkpoint目录为当前目录 ssc.checkpoint(.)/通过Socket获取数据,需提供Socket的主机名和端口号,数据保存在内存和硬盘中 val lines=ssc.socketTextStream(args(0),args(1).toInt,StorageLevel.MEMORY_ONLY_SER)val words=lines.flatMap(_.split(,)/windows操作,第一种方式为叠加处理,第二种方式为增量处理 val wordCou
42、nts=words.map(x=(x,1).reduceByKeyAndWindow(a:Int,b:Int)=(a+b),Seconds(args(2).toInt),Seconds(args(3).toInt)/val wordCounts=words.map(x=(x,1).reduceByKeyAndWindow(_+_,_-_,Seconds(args(2).toInt),Seconds(args(3).toInt)wordCounts.print()ssc.start()ssc.awaitTermination()PART 07 作业 作业作业:1.什么是Spark Streami
43、ng?2.请画图描述并解释Spark Streaming的内部处理机制和工作原理。3.请解释下列与Spark Streaming相关的术语:离散流(DStream);批数据(Batch Data);时间片(Batch Interval);窗口长度(Window Length);滑动时间间隔(Slide Interval)4.传统流处理系统架构的主要缺陷是什么?5.请描述Spark Streaming的系统架构,并进行解释。6.请画图描述并解释Spark Streaming的计算流程。7.请阐述Spark Streaming的容错能力。作业作业:8.请描述DStream中的数据操作流程。9.DS
44、tream的输入源有哪些,请举例说明。10.请描述下述DStream普通转换操作的作用:window(windowLength,slideInterval);transform(func);updateByKey(func)11.请描述下述DStream窗口转换操作的作用:map(func);flatMap(func);union(otherStream);countByWindow(windowLength,slideInterval);reduceByWindow(func,windowLength,slideInterval);reduceByKeyAndWindow(func,wind
45、owLength,slideInterval);reduceByKeyAndWindow(func,invFunc,windowLength,slideInterval);countByValueAndWindow(windowL ength,slideInterval,numTasks);作业作业:12.假定图中批处理的时间间隔是1秒钟,请回答窗口间隔是多长,滑动间隔是多长。作业作业:13.请编写完成求单词计数的任务。Spark Streaming将监控某目录中的文件,获取在间隔时间段内变化的数据,然后通过Spark Streaming计算出改时间段内单词统计数。谢谢FOR YOUR LISTENING