spark分享分析-共48页课件.ppt

上传人(卖家):三亚风情 文档编号:3592688 上传时间:2022-09-22 格式:PPT 页数:48 大小:3.92MB
下载 相关 举报
spark分享分析-共48页课件.ppt_第1页
第1页 / 共48页
spark分享分析-共48页课件.ppt_第2页
第2页 / 共48页
spark分享分析-共48页课件.ppt_第3页
第3页 / 共48页
spark分享分析-共48页课件.ppt_第4页
第4页 / 共48页
spark分享分析-共48页课件.ppt_第5页
第5页 / 共48页
点击查看更多>>
资源描述

1、目录Spark简介Spark批处理Spark集群模式SparkSQLSpark StreamingSpark简介Spark是什么Spark特点Spark生态系统Spark与Hadoop的区别PPT模板下载:1ppt/moban/Spark是什么官网介绍:官网介绍:Apache Spark is a fast and general engine for large-scale data processing.Spark是加州大学伯克利分校AMP实验室开发的通用内存并行计算框架,分布式资源管理工作交由集群管理工具(Mesos、YARN)PPT模板下载:1ppt/moban/Spark特点1.先进

2、架构:Spark采用Scala语言编写,底层采用actor mode的akka作为通讯架构,代码十分简洁高效。基于DAG图的执行引擎,减少多次计算之间中间结果写到HDFS的开销。建立在统一抽象的RDD(分布式内存抽象)之上,使得它可以以基本一致的方式应对不同的大数据处理场景。2.运行速度快:提供Cache机制来支持需要反复迭代的计算,减少数据读取的IO开销3.易用性好:Spark提供广泛的数据集操作类型(各种转换算子,行动算子等)Spark支持Java,Python和Scala API,支持交互式Python和Scala的shell4.通用性强:以其RDD模型的强大表现能力,逐渐形成了一套自己

3、的生态圈,提供了full-stack的解决方案。主要包括Spark内存中批处理,Spark SQL交互式查询,Spark Streaming流式计算,Mllib机器学习算法,GraphX图计算。5.与Hadoop无缝衔接:Spark可以使用YARN作为它的集群管理器读取HDFS,HBASE的Hadoop的数据PPT模板下载:1ppt/moban/Spark生态圈也称为BDAS(伯克利数据分析栈),是伯克利APMLab实验室打造的,力图在算法(Algorithms)、机器(Machines)、人(People)之间通过大规模集成来展现大数据应用的一个平台。Spark生态圈以Spark Core为

4、核心,从HDFS、Cassandra、Amazon S3和HBase等持久层读取数据,以MESS、YARN和自身携带的Standalone为资源管理器调度Job完成Spark应用程序的计算。这些应用程序可以来自于不同的组件,如Spark Shell/Spark Submit的批处理、Spark Streaming的实时流处理应用、Spark SQL的即席查询、MLlib的机器学习、GraphX的图处理和SparkR的数学计算等等。Spark生态系统PPT模板下载:1ppt/moban/Spark与Hadoop的区别HadoopSpark抽象层次低,需要手工编写代码来完成,使用上难以上手基于RD

5、D的抽象,使数据处理逻辑的代码非常简短只提供两个操作,Map和Reduce,表达力欠缺提供很多转换和动作,很多基本操作如Join,GroupBy已经在RDD转换和动作中实现中间结果也放在HDFS文件系统中中间结果放在内存中,内存放不下了会写入本地磁盘ReduceTask需要等待所有MapTask都完成后才可以开始分区相同的转换构成流水线放在一个Task中运行,分区不同的转换需要Shuffle,被划分到不同的Stage中,需要等待前面的Stage完成后才可以开始时延高,只适用Batch数据处理,对于交互式数据处理,实时数据处理的支持不够通过将流拆成小的batch提供Discretized Str

6、eam处理流数据对于迭代式数据处理性能比较差通过在内存中缓存数据,提高迭代式计算的性能历经10年发展,已在生产环境稳定运行多年运行不够稳定PPT模板下载:1ppt/moban/Spark与Hadoop的区别Hadoop数据抽取运算模型:反复读写,磁盘IO是瓶颈Spark数据抽取运算模型:Spark批处理RDD简介Spark程序入口创建RDDRDD操作TransformationsActionsPPT模板下载:1ppt/moban/RDD(Resilient Distributed Dataset):弹性分布式数据集-分布式内存抽象的概念RDD是Spark对数据的核心抽象,是Spark的基石。R

7、DD是一个可容错、只读的、已被分区的、可并行操作的分布式元素集合RDD的特点:1.只读:状态不可变,不能修改2.分区:支持元素根据 Key 来分区(Partitioning),保存到多个结点上,还原时只会重新计算丢失分区的数据,而不会影响整个系统3.RDD必须是可序列化的4.路径:在 RDD 中叫血统(lineage),即 RDD 有充足的信息关于它是如何从其他 RDD 产生而来的5.持久化:可以控制存储级别(内存、磁盘等)来进行持久化在RDD的内部实现中每个RDD都可以使用5个方面的特性来表示,即数据分区的集合,能根据本地性快速访问到数据的偏好位置,依赖关系,计算方法,是否是哈希/范围分区的

8、元数据:RDD简介操作操作含义含义partitions()返回一组Partition对象preferredLocations(p)根据数据存放的位置,返回分区p在哪些节点访问更快dependencies()返回一组依赖iterator(p,parentIters)按照父分区的迭代器,逐个计算分区p的元素(计算函数)partitioner()返回RDD是否hash/range分区的元数据信息PPT模板下载:1ppt/moban/SparkContext:Spark应用程序需要做的第一件事就是创建一个 SparkContext 对象,SparkContext对象决定了Spark如何访问集群。而要新

9、建一个SparkContext对象,你还得需要构造一个 SparkConf 对象,SparkConf对象包含了你的应用程序的配置信息。每个JVM进程中,只能有一个活跃(active)的SparkContext对象。如果你非要再新建一个,那首先必须将之前那个活跃的SparkContext 对象stop()掉。Spark中已经有一个创建好的SparkContext,简称scScala:import org.apache.spark.SparkContext import org.apache.spark.SparkConfval conf=new SparkConf().setAppName(ap

10、pName).setMaster(master)val sc=new SparkContext(conf)Python:from pyspark import SparkConf,SparkContextconf=SparkConf().setAppName(appName).setMaster(master)sc=SparkContext(conf=conf)Spark程序入口PPT模板下载:1ppt/moban/一、并行化集合一、并行化集合并行化集合是以一个已有的集合对象(例如:Scala Seq)为参数,调用 SparkContext.parallelize()方法创建得到的RDD。集合

11、对象中所有的元素都将被复制到一个可并行操作的分布式数据集中。Scala:Python:val data=Array(1,2,3,4,5)data=1,2,3,4,5val distData=sc.parallelize(data)distData=sc.parallelize(data)二、外部数据集二、外部数据集Spark 可以通过Hadoop所支持的任何数据源来创建分布式数据集,包括:本地文件系统、HDFS、Cassandra、HBase、Amazon S3 等。Spark 支持的文件格式包括:文本文件(text files)、SequenceFiles,以及其他 Hadoop 支持的输入

12、格式(InputFormat)。Scala:val distFile=sc.textFile(data.txt)Python:distFile=sc.textFile(data.txt)三、通过转换现有的三、通过转换现有的RDD得到得到四、改变现有四、改变现有RDD的持久性的持久性(cache、save)创建RDDPPT模板下载:1ppt/moban/日志挖掘:val lines=sc.textFile(“hdfs:/.”)val errors=lines.filter(_.startsWith(“ERROR”)errors.cache()errors.filter(_.contains(“H

13、DFS”).map(_.split(t)(3).take(10)作用于RDD上的操作分为转换(transformation)和动作(action)。Spark中的所有transformation都是惰性的,在执行transformation,并不会提交Job,只有在执行action操作,才会被提交到集群中真正的被执行。Transformation:将已有RDD转换得到一个新的RDD。Action:计算,返回结果或把RDD数据写到存储系统中。RDD操作PPT模板下载:1ppt/moban/窄依赖(narrow dependencies):父RDD的每个分区都只被子RDD的一个分区所依赖比如map

14、、filter、union等宽依赖(wide dependencies):父RDD的分区被多个子RDD的分区所依赖。比如groupByKey、reduceByKey等1.程序优化:窄依赖支持在一个节点上管道化执行。如在filter之后执行map2.容错:窄依赖支持更高效的故障还原。只有丢失的父RDD的分区需要重新计算宽依赖需要所有父RDD的分区,因此就需要完全重新执行Checkpoint:Lineage链较长、宽依赖的RDD需要采用检查点机制。RDD依赖PPT模板下载:1ppt/moban/Transformation作用作用示例示例结果结果map(func)返回一个新的数据集,其中每个元素都

15、是由源RDD中一个元素经func转换得到的Rdd.map(x=x+1)1,2,3,32,3,4,4filter(func)返回一个新的数据集,其中包含的元素来自源RDD中元素经func过滤后的结果rdd.filter(x=x!=1)1,2,3,32,3,3flatMap(func)类似于map,但每个输入元素可以映射到0到n个输出元素Rdd.flatMap(x=x.split(“)“hello world”,”hi”“hello”,”world”,”hi”union(otherDataset)返回源数据集和参数数据集的并集Rdd.union(other)1,2,3 3,4,51,2,3,3,4

16、,5distinct(numTasks)返回对源数据集做元素去重后的新数据集Rdd.distinct()1,2,3,31,2,3groupByKey(numTasks)若源RDD包含(K,V)对,则返回一个新的数据集包含(K,Iterable)对Rdd.groupByKey()Rdd=(1,2),(3,4),(3,6)(1,2),(3,4,6)reduceByKey(func,numTasks)若源RDD为(K,V)对,则为(K,V)对的RDD,每个key对应的value是经过func聚合后的结果Rdd.reduceByKey(x,y)=x+y)Rdd=(1,2),(3,4),(3,6)(1,

17、2),(3,10)join(otherDataset,numTasks)若源RDD为(K,V)且参数RDD为(K,W),则返回的新RDD中将包含内关联后key对应的(K,(V,W)对Rdd.join(other)Rdd=(1,2),(3,4),(3,6)Other=(3,9)(3,(4,9),(3,(6,9)TransformationsPPT模板下载:1ppt/moban/Action作用作用示例示例1,2,3,3结果结果reduce(func)将RDD中元素按func进行聚合Rdd.reduce(x,y)=x+y)9collect()将数据集中所有元素以数组形式返回驱动器(driver)程

18、序。Rdd.collect()1,2,3,3count()返回数据集中元素个数Rdd.count()4first()返回数据集中首个元素(类似于 take(1))Rdd.first()1take(n)返回数据集中前 n个元素Rdd.take(2)1,2saveAsTextFile(path)将数据集中元素保存到指定目录下的文本文件中,支持本地文件系统、HDFS 或者其他任何Hadoop支持的文件系统。saveAsSequenceFile(path)将数据集中元素保存到指定目录下的Hadoop Sequence文件中,支持本地文件系统、HDFS 或者其他任何Hadoop支持的文件系统。saveA

19、sObjectFile(path)将RDD元素以Java序列化的格式保存成文件,保存结果文件可以使用 SparkContext.objectFile 来读取。ActionsSpark集群模式Spark完整示例集群模式概览术语解释Stage划分Spark任务调度Spark运行模式PPT模板下载:1ppt/moban/Spark完整示例Scala:submit提交:spark-submit-master yarn-class com.xxx AppName.jarimport org.apache.spark.SparkContext import org.apache.spark.SparkCo

20、nf object SimpleApp def main(args:ArrayString)val logFile=YOUR_SPARK_HOME/README.md val conf=new SparkConf().setAppName(Simple Application)val sc=new SparkContext(conf)val logData=sc.textFile(logFile,2).cache()val numAs=logData.filter(line=line.contains(a).count()val numBs=logData.filter(line=line.c

21、ontains(b).count()println(Lines with a:%s,Lines with b:%s.format(numAs,numBs)Python:submit提交:spark-submit-master yarn-executor-memory 10g AppName.pyfrom pyspark import SparkContextlogFile=YOUR_SPARK_HOME/README.md sc=SparkContext(local,Simple App)logData=sc.textFile(logFile).cache()numAs=logData.fil

22、ter(lambda s:a in s).count()numBs=logData.filter(lambda s:b in s).count()print(Lines with a:%i,lines with b:%i%(numAs,numBs)PPT模板下载:1ppt/moban/在Spark中由SparkContext负责和ClusterManager通信,进行资源的申请、任务的分配和监控等;当Executor部分运行完毕后,Driver负责将SparkContext关闭。Spark会为该应用在各个集群节点上申请executor,用于执行计算任务和存储数据。接下来,Spark将应用程序代

23、码(JAR包或者Python文件)发送给所申请到的executor。最后SparkContext将分割出的task发送给各个executor去运行。集群模式概览PPT模板下载:1ppt/moban/注意:1.每个Spark应用程序都有其对应的多个executor进程,executor进程在整个应用程序生命周期内,都保持运行状态,并以多线程方式运行所收到的任务。好处:可以隔离各个Spark应用,从调度角度来看,每个driver可以独立调度本应用程序内部的任务,从执行器角度来看,不同的Spark应用对应的任务将会在不同的JVM中运行。坏处:多个Spark应用程序之间无法共享数据,除非把数据写到外部

24、存储中。2.Spark对底层的ClusterManager一无所知。只要Spark能申请到executor进程,并且能与之通信即可。3.driver在整个生命周期内必须监听并接受其对应的各个Executor的连接请求。因此,driver必须能够被所有worker节点访问到。4.因为集群上的任务是由driver来调度的,所以driver应该和worker节点距离近一些,最好在同一个本地局域网中。如果你需要远程对集群发起请求,最好还是在driver节点上启动RPC服务,来响应这些远程请求,同时把driver本身放在集群worker节点比较近的机器上。集群模式概览PPT模板下载:1ppt/moban

25、/术语术语描述描述Application用户编写的Spark应用程序,包含一个Driver program和分布在集群中多个节点上运行的若干ExecutorDriver program运行Application的main()函数并且创建SparkContext Cluster manager在集群上获取资源的外部服务(例如:Standalone、Mesos、Yarn)Worker node集群中任何可以运行Application代码的节点ExecutorApplication运行在Worker 节点上的一个进程,该进程负责运行Task,并且负责将数据存在内存或者磁盘上,每个Applicatio

26、n都有各自独立的一批ExecutorTask被送到某个Executor上的工作单元Job包含多个Task组成的并行计算,往往由Spark Action催生Stage每个Job会被拆分很多组Task,每组任务被称为Stage,也可称TaskSet,一个作业分为多个阶段DAGScheduler实现将Spark作业分解成一到多个Stage,每个Stage根据RDD的Partition个数决定Task的个数,然后生成相应的Taskset放到TaskScheduler中TaskScheduler与DAGScheduler交互,实现Task分配到Executor上执行术语解释PPT模板下载:1ppt/mo

27、ban/Stage划分stage的边界有两种情况:1.宽依赖上的Shuffle操作;2.已缓存分区,它可以缩短父RDD的计算过程。一个stage的开始就是从外部存储或shuffle结果中读取数据,一个stage的结束就是发生shuffle或生成结果时PPT模板下载:1ppt/moban/1.创建RDD,经过一系列Transformation,最后Action2.Action会触发SparkContext的rujob方法,交给DAGScheduler处理3.DAGScheduler将DAG划分成Stage4.将Stage交给TaskScheduler5.集群的Executor上运行Spark任务

28、调度PPT模板下载:1ppt/moban/一个按 A-Z 首字母分类,查找相同首字母下不同姓名总个数的例子:步骤 1:创建 RDD 上面的例子除去最后一个 collect 是个动作,不会创建 RDD 之外,前面四个转换都会创建出新的 RDD。因此第一步就是创建好所有 RDD(内部的五项信息)。步骤 2:创建执行计划 Spark 会尽可能地管道化,并基于是否要重新组织数据来划分 阶段(stage),例如本例中的 groupByKey()转换就会将整个执行计划划分成两阶段执行。最终会产生一个 作为逻辑执行计划。步骤 3:调度任务将各阶段划分成不同的 任务(task),每个任务都是数据和计算的合体。

29、在进行下一阶段前,当前阶段的所有任务都要执行完成。sc.textFile(“hdfs:/names”).map(name=(name.charAt(0),name).groupByKey().mapValues(name=names.toSet.size).collect()RDD执行过程PPT模板下载:1ppt/moban/运行环境运行环境模式模式描述描述Local单机模式常用于本地开发测试,分为local单线程和local-cluster多线程Standalone集群模式Spark自带,最简单的集群模式Hadoop Yarn集群模式运行在Yarn资源管理器之上,由Yarn负责资源管理,Sp

30、ark负责任务调度和计算Apache Mesos集群模式运行在Mesos资源管理器之上,由Yarn负责资源管理,Spark负责任务调度和计算Amazon Ec2集群模式运行在云端的集群Spark运行模式Spark SQLSparkSQL简介SparkSQL入口DataFrame简介DataFrame创建DataFrame操作和RDD互操作PPT模板下载:1ppt/moban/SQL-on-Hadoop:Hive是SQL-on-Hadoop最常用的工具,但是MapReduce计算过程中大量的中间磁盘落地过程消耗了大量的I/O,降低的运行效率,为了提高SQL-on-Hadoop的效率,大量的SQL

31、-on-Hadoop工具开始产生,其中表现较为突出的是:MapR的DrillCloudera的ImpalaHortonworks的Hive on TezFacebook的PrestoSparkSQL(Shark-Hive on Spark)但Shark对于Hive的太多依赖,制约了Spark各个组件的相互集成,所以提出了SparkSQL项目。SparkSQL简介PPT模板下载:1ppt/moban/入口:入口:SQLContext:SparkSQL的所有功能入口都是SQLContext类及其子类。要创建一个SQLContext对象,首先需要有一个SparkContext对象。Scala:Val

32、 sc:SparkContext/假设已经有一个SparkContext对象Val sqlContext=new org.apache.spark.sql.SQLContext(sc)Python:sqlContext=SQLContext(sc)HiveContext:HiveContext继承自SQLContext,除了SQLContext的功能之外,HiveContext还提供了完整的HiveQL语法,UDF使用,以及对Hive表中数据的访问。Scala:Val HiveContext=new org.apache.spark.sql.hive.HiveContext(sc)Python

33、:HiveContext=HiveContext(sc)SparkSQL入口PPT模板下载:1ppt/moban/DataFrame:是一种分布式数据集合,每一条数据都由几个命名字段组成。概念上来说,她和关系型数据库的表 或者 R和Python中的data frame等价,只不过在底层,DataFrame采用了更多优化。DataFrame可以从很多数据源加载数据并构造得到,如:结构化数据文件,Hive中的表,外部数据库,或者已有的RDD。相对于RDD,DataFrame有几个特点:1.包含schema信息,能够进行针对性的优化。2.对用户有更加友好、更直观的API。3.与外部数据源 API紧密

34、集成,可以用作多种存储格式和存储系统间的数据交换媒介。DataFrame简介PPT模板下载:1ppt/moban/创建DataFrame1.从json文件创建DataFrame:val df=hiveContext.read.json(examples/src/main/resources/people.json)2.从parquet文件创建DataFrame:val df=hiveContext.read.parquet(examples/src/main/resources/people.parquet)3.从orc文件创建DataFrame:val df=hiveContext.read

35、.orc(examples/src/main/resources/people.orc)4.从hive表创建DataFrame:val df=hiveContext.table(gdpi)5.从txt文件创建DataFrame:val df=hiveContext.read.text(/path/to/spark/README.md)PPT模板下载:1ppt/moban/DataFrame操作1.展示 DataFrame 的内容 df.show()/age name/null Michael/30 Andy/19 Justin2.展示所有人,但所有人的 age 都加1 df.select(df

36、(name),df(age)+1).show()/name (age+1)/Michael null/Andy 31/Justin 20 3.计算各个年龄的人数 df.groupBy(age).count().show()/age count/null 1/19 1/30 1PPT模板下载:1ppt/moban/DataFrame操作SQL操作:1.首先把DataFrame注册为临时表df.registerTempTable(people)2.HiveContext.sql执行SQL查询,并返回DataFrame,语法与Hql一致val teenagers=hiveContext.sql(SE

37、LECT name,age FROM people WHERE age=13 AND age Person(p(0),p(1).trim.toInt).toDF()people.registerTempTable(people)/sqlContext.sql方法可以直接执行SQL语句 val teenagers=sqlContext.sql(SELECT name,age FROM people WHERE age=13 AND age StructField(fieldName,StringType,true)/将RDDpeople的各个记录转换为Rows,即:得到一个包含Row对象的RDD

38、val rowRDD=people.map(_.split(,).map(p=Row(p(0),p(1).trim)/将schema应用到包含Row对象的RDD上,得到一个DataFrame val peopleDataFrame=sqlContext.createDataFrame(rowRDD,schema)/将DataFrame注册为table peopleDataFrame.registerTempTable(people)/执行SQL语句 val results=sqlContext.sql(SELECT name FROM people)PPT模板下载:1ppt/moban/和RD

39、D互操作编程方式定义编程方式定义Schema:Python:From pyspark.sql import SQLContextFrom pyspar.sql.types import*sqlContext=SQLContext(sc)/加载文件Lines=sc.textFile(“examples/src/main/resources/people.txt”)Parts=lines.map(lambda l:l.split(“,”)/转换每一行为元组People=parts.map(lambda p:(p0,p1.strip()/定义schemaschemaString=“name age”

40、Fields=StructField(field_name,StringType(),True)for field_name in schemaString.split()Schema=StructType(fields)/将schema应用到RDD上schemaPeople=sqlContext.createDataFrame(people,schema)/注册为表schemaPeople.registerTempTable(“people”)Results=sqlContext.sql(“SELECT name FROM people”)Spark StreamingSpark Strea

41、ming简介Spark Streaming与Strom的对比Spark Streaming工作原理Spark Streaming编程离散数据流转换和输出算子PPT模板下载:1ppt/moban/Spark Streaming简介Spark Streaming能够实现对实时数据流的流式处理,并具有很好的可扩展性、高吞吐量和容错性。支持从多种数据源获取数据,包括Kafk、Flume、Twitter、ZeroMQ、Kinesis 以及TCP sockets,从数据源获取数据之后,可以使用诸如map、reduce、join和window等高级函数进行复杂算法的处理。最后还可以将处理结果存储到文件系统,

42、数据库数据库和现场仪表盘。在“One Stack rule them all”的基础上,还可以使用Spark的其他子框架,如机器学习、图计算等,对流数据进行处理。PPT模板下载:1ppt/moban/Spark Streaming与Strom的区别StromSpark Streaming传入一个处理一个,吞吐量相对小批处理,吞吐量大实现真正流式实时的处理数据,延迟在秒级以下,实时性很高本质还是批量处理,在短的时间窗口内进行数据实时处理,延迟在秒级左右,实时性相对较弱每个单独的记录当它通过系统时必须被跟踪,所以Storm能够至少保证每个记录将被处理一次,但是在从错误中恢复过来时候允许出现重复记录

43、能够保证每个批处理的所有数据只处理一次,保证数据不会在恢复的时候错乱(批处理重新执行)Clojure语言开发Scala语言开发提供java API提供java、python API2019年开始就在Twitter内部生产环境中使用,已经非常成熟2019年才陆续有一些公司开始试用,但最近越来越多公司已经在生产中使用PPT模板下载:1ppt/moban/Spark Streaming工作原理Spark Streaming从实时数据流接入数据,再将其划分为一个个小批量供后续Spark engine处理,最终得到处理后的一批批结果数据。处理流程:1.Spark Streaming把实时输入数据流以时间

44、片t(如1秒)为单位切分成块2.Spark Streaming会把每块数据作为一个RDD,并使用RDD操作处理每一小块数据3.每个块都会生成一个Spark Job处理4.最终结果也返回多块PPT模板下载:1ppt/moban/Spark Streaming编程入口:StreamingContext 是Spark Streaming的入口。val conf=new SparkConf().setMaster(local2).setAppName(NetworkWordCount)val ssc=new StreamingContext(conf,Seconds(1)/已有spark contex

45、t的情况val ssc=new StreamingContext(sc,Seconds(1)Python:Sc=SparkContext(“local2”,”NetworkWordCount”)Ssc=StreamingContext(sc,1)步骤:1.创建DStream对象,并定义好输入数据源。2.基于数据源DStream定义好计算逻辑和输出。3.调用streamingContext.start()启动接收并处理数据。4.调用streamingContext.awaitTermination()等待流式处理结束。5.你可以主动调用 streamingContextssc.stop()来手动

46、停止处理流程。注意点:1.一旦streamingContext启动,就不能再对其计算逻辑进行添加或修改。2.一旦streamingContext被stop掉,就不能restart。3.单个JVM虚机同一时间只能包含一个active的StreamingContext。PPT模板下载:1ppt/moban/Spark Streaming编程Spark-shell中:import org.apache.spark._import org.apache.spark.streaming._val ssc=new StreamingContext(sc,Seconds(1)/创建StreamingCont

47、ext,批次间隔为1秒val lines=ssc.socketTextStream(“192.168.5.2,9998)/创建一个连接到hostname:port的Dstreamval words=lines.flatMap(_.split(“”)/将每一行分割成多个单词val pairs=words.map(word=(word,1)/对每一批次中的单词进行计数val wordCounts=pairs.reduceByKey(_+_)wordCounts.print()/将该DStream产生的RDD的头十个元素打印到控制台上ssc.start()/启动流式计算ssc.awaitTermin

48、ation()/等待直到计算终止Python:From pyspark import SparkComtextFrom pyspark.streaming import StreamingContextWords=lines.flatMap(lambda line:line.split(“”)/将每一行分割成多个单词Pairs=words.map(lambda word:(word,1)/对每一批次中的单词进行计数wordCounts=pairs.reduceByKey(lambda x,y:x+y)wordCount.pprint()/将该DStream产生的RDD的头十个元素打印到控制台上

49、PPT模板下载:1ppt/moban/离散数据流离散数据流(DStreams)Dstream:离散数据流是Spark Streaming最基本的抽象。它代表了一种连续的数据流,要么从某种数据源提取数据,要么从其他数据流映射转换而来。DStream内部是由一系列连续的RDD组成的,每个RDD都是不可变、分布式的数据集。每个RDD都包含了特定时间间隔内的一批数据。任何作用于DStream的算子,其实都会被转化为对其内部RDD的操作。例如,在前面的例子中,我们将 lines 这个DStream转成words DStream对象,其实作用于lines上的flatMap算子,会施加于lines中的每个R

50、DD上,并生成新的对应的RDD,而这些新生成的RDD对象就组成了words这个DStream对象。PPT模板下载:1ppt/moban/Transformation算子Transformation用途用途transform(func)返回一个新的DStream,其包含的RDD为源RDD经过func操作后得到的结果。利用该算子可以对DStream施加任意的操作updateStateByKey(func)返回一个包含新”状态”的DStream。源DStream中每个key及其对应的values会作为func的输入,而func可以用于对每个key的“状态”数据作任意的更新操作。Transform算子

展开阅读全文
相关资源
猜你喜欢
相关搜索

当前位置:首页 > 办公、行业 > 各类PPT课件(模板)
版权提示 | 免责声明

1,本文(spark分享分析-共48页课件.ppt)为本站会员(三亚风情)主动上传,163文库仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对上载内容本身不做任何修改或编辑。
2,用户下载本文档,所消耗的文币(积分)将全额增加到上传者的账号。
3, 若此文所含内容侵犯了您的版权或隐私,请立即通知163文库(发送邮件至3464097650@qq.com或直接QQ联系客服),我们立即给予删除!


侵权处理QQ:3464097650--上传资料QQ:3464097650

【声明】本站为“文档C2C交易模式”,即用户上传的文档直接卖给(下载)用户,本站只是网络空间服务平台,本站所有原创文档下载所得归上传人所有,如您发现上传作品侵犯了您的版权,请立刻联系我们并提供证据,我们将在3个工作日内予以改正。


163文库-Www.163Wenku.Com |网站地图|