Flink原理与实践-DataStream-API的介绍和使用课件.pptx

上传人(卖家):晟晟文业 文档编号:5218050 上传时间:2023-02-17 格式:PPTX 页数:39 大小:1.24MB
下载 相关 举报
Flink原理与实践-DataStream-API的介绍和使用课件.pptx_第1页
第1页 / 共39页
Flink原理与实践-DataStream-API的介绍和使用课件.pptx_第2页
第2页 / 共39页
Flink原理与实践-DataStream-API的介绍和使用课件.pptx_第3页
第3页 / 共39页
Flink原理与实践-DataStream-API的介绍和使用课件.pptx_第4页
第4页 / 共39页
Flink原理与实践-DataStream-API的介绍和使用课件.pptx_第5页
第5页 / 共39页
点击查看更多>>
资源描述

1、第四章DataStream API的介绍和使用Flink程序的骨架结构1.初始化运行环境2.读取一到多个Source数据源3.根据业务逻辑对数据流进行Transformation转换4.将结果输出到Sink5.调用作业执行函数执行环境是作业与集群交互的入口设置并行度关闭算子链时间、Checkpoint流处理和批处理的执行环境不一样Java、Scala两套API设置执行环境/创建Flink执行环境 StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();env.setParalleli

2、sm(2);env.disableOperatorChaining();Source、Transformation和SinkSource读取数据源统称为Source文件系统、消息队列、数据库等Transformation使用Flink提供的各类函数,进行有状态的计算数据流的分组、窗口和聚合操作等Sink将计算结果输出到外部系统,统称为Sink目的地可以是文件系统、消息队列、数据库等Flink是延迟执行(Lazy Evaluation)的调用execute()方法,Flink才会真正执行否则无法得到计算结果字符串参数为当前作业名执行/executeenv.execute(kafka stream

3、ing word count);l 单数据流转换l 基于Key的分组转换l 多数据流转换l 数据重分布转换l DataStream 泛型T为数据流中每个元素的类型四类Tranformation转换l 每个输入元素对应一个输出元素l 重写MapFunction或RichMapFunctionl MapFunction T为输入类型O为输出类型l 实现其中的map()虚方法l 主逻辑中调用该函数单数据流转换-mapFunctionalInterface public interface MapFunction extends Function,Serializable /调用这个API就是继承并实

4、现这个虚函数 O map(T value)throws Exception;/第一个泛型是输入类型,第二个泛型是输出类型 public static class DoubleMapFunction implements MapFunction Override public String map(Integer input)return function input:+input+,output:+(input*2);DataStream functionDataStream=dataStream.map(new DoubleMapFunction();MapFunction源代码一个MapF

5、unction的实现l 直接继承接口类并实现map虚方法上页所示l 使用匿名类l 使用Lambda表达式单数据流转换-map/匿名类 DataStream anonymousDataStream=dataStream.map(new MapFunction()Override public String map(Integer input)throws Exception return anonymous function input:+input+,output:+(input*2););/使用Lambda表达式 DataStream lambdaStream=dataStream.map(

6、input-lambda input:+input+,output:+(input*2);匿名类实现MapFunctionLambda表达式实现MapFunctionl 对输入元素进行过滤l 继承并实现FilterFunction或RichFilterFunctionl 重写filter虚方法l True 保留l False 过滤单数据流转换-filterDataStream dataStream=senv.fromElements(1,2,-3,0,5,-9,8);/使用-构造Lambda表达式 DataStream lambda=dataStream.filter(input-input

7、0);public static class MyFilterFunction extends RichFilterFunction /limit参数可以从外部传入 private Integer limit;public MyFilterFunction(Integer limit)this.limit=limit;Override public boolean filter(Integer input)return input this.limit;Lambda表达式实现FilterFunction实现FilterFunctionl 与map()相似l 输出零个、一个或多个元素l 可对列表

8、结果展平单数据流转换-flatMap苹果,梨,香蕉.map(去皮)去皮苹果,去皮梨,去皮香蕉 mapflatMap苹果,梨,香蕉.flatMap(切碎)苹果碎片1,苹果碎片2,梨碎片1,梨碎片2,梨碎片3,香蕉碎片1 苹果碎片1,苹果碎片2,梨碎片1,梨碎片2,梨碎片3,香蕉碎片1l 使用Lambda表达式l Collector用来收集元素flatMap()虚方法中不使用return返回数据,使用Collector收集返回数据Collector中的泛型String为返回数据类型l 将flatMap()看做map()和filter()更一般的形式map()和filter()的语义更明确单数据流转

9、换-flatMapDataStream dataStream=senv.fromElements(Hello World,Hello this is Flink);/split函数的输入为 Hello World 输出为 Hello 和 World 组成的列表 Hello,World/flatMap将列表中每个元素提取出来/最后输出为 Hello,World,Hello,this,is,Flink DataStream words=dataStream.flatMap(String input,Collector collector)-for(String word:input.split()

10、collector.collect(word);).returns(Types.STRING);l 数据分组后可进行聚合操作l keyBy()将一个DataStream转化为一个KeyedStreaml 聚合操作将KeyedStream转化为DataStreaml KeyedStream继承自DataStream基于Key的分组转换l 根据某种属性或数据的某些字段对数据进行分组l 对一个分组内的数据进行处理l 股票:相同股票代号的数据分组到一起l 相同Key的数据被分配到同一算子实例上l 需要指定Key数字位置字段名KeySelector基于Key的分组转换-keyByDataStreamTu

11、ple2 dataStream=senv.fromElements(Tuple2.of(1,1.0),Tuple2.of(2,3.2),Tuple2.of(1,5.5),Tuple2.of(3,10.0),Tuple2.of(3,12.5);/使用数字位置定义Key 按照第一个字段进行分组 DataStreamTuple2 keyedStream=dataStream.keyBy(0).sum(1);l KeySelectorl 重写getKey()方法单数据流转换-keyBy/IN为数据流元素,KEY为所选择的Key FunctionalInterface public interface

12、KeySelector extends Function,Serializable /选择一个字段作为Key KEY getKey(IN value)throws Exception;public class Word public String word;public int count;/使用KeySelector DataStream keySelectorStream=wordStream.keyBy(new KeySelector()Override public String getKey(Word in)return in.word;).sum(count);KeySelecto

13、r源码一个KeySelector的实现l sum()、max()、min()等l 指定字段,对该字段进行聚合KeySelectorl 流数据上的聚合实时不断输出到下游状态存储中间数据单数据流转换 Aggregationsl 将某个字段加和l 结果保存到该字段上l 不关心其他字段的计算结果单数据流转换 sumDataStreamTuple3 tupleStream=senv.fromElements(Tuple3.of(0,0,0),Tuple3.of(0,1,1),Tuple3.of(0,2,2),Tuple3.of(1,0,6),Tuple3.of(1,1,7),Tuple3.of(1,0,

14、8);/按第一个字段分组,对第二个字段求和,打印出来的结果如下:/(0,0,0)/(0,1,0)/(0,3,0)/(1,0,6)/(1,1,6)/(1,1,6)DataStreamTuple3 sumStream=tupleStream.keyBy(0).sum(1);l max()对该字段求最大值结果保存到该字段上不保证其他字段的计算结果l maxBy()对该字段求最大值其他字段保留最大值元素的值单数据流转换 max/maxByDataStreamTuple3 tupleStream=senv.fromElements(Tuple3.of(0,0,0),Tuple3.of(0,1,1),Tu

15、ple3.of(0,2,2),Tuple3.of(1,0,6),Tuple3.of(1,1,7),Tuple3.of(1,0,8);/按第一个字段分组,对第三个字段求最大值max,打印出来的结果如下:/(0,0,0)/(0,0,1)/(0,0,2)/(1,0,6)/(1,0,7)/(1,0,8)DataStreamTuple3 maxStream=tupleStream.keyBy(0).max(2);/按第一个字段分组,对第三个字段求最大值maxBy,打印出来的结果如下:/(0,0,0)/(0,1,1)/(0,2,2)/(1,0,6)/(1,1,7)/(1,0,8)DataStreamTup

16、le3 maxByStream=tupleStream.keyBy(0).maxBy(2);l 比Aggregation更通用l 在KeyedStream上生效l 接受两个输入,生成一个输出l 两两合一地汇总操作基于Key的分组转换-reducel 实现ReduceFunction基于Key的分组转换-reducepublic static class MyReduceFunction implements ReduceFunction Override public Score reduce(Score s1,Score s2)return Score.of(s1.name,Sum,s1.s

17、core+s2.score);DataStream dataStream=senv.fromElements(Score.of(Li,English,90),Score.of(Wang,English,88),Score.of(Li,Math,85),Score.of(Wang,Math,92),Score.of(Liu,Math,91),Score.of(Liu,English,87);/实现ReduceFunction DataStream sumReduceFunctionStream=dataStream.keyBy(name).reduce(new MyReduceFunction(

18、);/使用 Lambda 表达式 DataStream sumLambdaStream=dataStream.keyBy(name).reduce(s1,s2)-Score.of(s1.name,Sum,s1.score+s2.score);l 将多个同类型的DataStream合并为一个DataStreaml 数据按照先进先出(FIFO)合并多数据流转换-unionDataStream shenzhenStockStream=.DataStream hongkongStockStream=.DataStream shanghaiStockStream=.DataStream unionSto

19、ckStream=shenzhenStockStream.union(hongkongStockStream,shanghaiStockStream);l 只能连接两个DataStream数据流l 两个数据流类型可以不一致l 两个DataStream经过connect()之后转化为ConnectedStreams,ConnectedStreams会对两个流的数据应用不同的处理方法,且双流之间可以共享状态l 应用场景为:使用一个控制流对另一个数据流进行控制多数据流转换-connectl 重写CoMapFunction或CoFlatMapFunctionl 三个泛型,分别对应第一个输入流的数据类型

20、、第二个输入流的数据类型和输出流的数据类型l 对于CoFlatMapFunction,flatMap1()方法处理第一个流的数据,flatMap2()方法处理第二个流的数据l 可以做到类似SQL Join的效果多数据流转换-connect/IN1为第一个输入流的数据类型/IN2为第二个输入流的数据类型/OUT为输出类型 public interface CoFlatMapFunction extends Function,Serializable /处理第一个流的数据 void flatMap1(IN1 value,Collector out)throws Exception;/处理第二个流的

21、数据 void flatMap2(IN2 value,Collector out)throws Exception;/CoMapFunction三个泛型分别对应第一个流的输入、第二个流的输入,map之后的输出 public static class MyCoMapFunction implements CoMapFunction Override public String map1(Integer input1)return input1.toString();Override public String map2(String input2)return input2;CoFlatMapF

22、unction源代码一个CoFlatMapFunction实现l 并行度逻辑视图中的算子被切分为多个算子子任务每个算子子任务处理一部分数据可以在整个作业的执行环境层面设置也可以对某个算子单独设置并行度StreamExecutionEnvironment senv=StreamExecutionEnvironment.getExecutionEnvironment();/获取当前执行环境的默认并行度 int defaultParalleism=senv.getParallelism();/设置所有算子的并行度为4,表示所有算子的并行执行的实例数为4 senv.setParallelism(4);

23、在执行环境中设置并行度:对某个算子单独设置:dataStream.map(new MyMapper().setParallelism(defaultParallelism*2);l 默认情况下,数据自动分布到多个实例(或者称之为分区)上l 手动在多个实例上进行数据分配避免数据倾斜l 输入是DataStream,输出也是DataStream数据重分布dataStream.shuffle();基于正态分布,将数据随机分配到下游各算子实例上:dataStream.broadcast();数据会被复制并广播发送给下游的所有实例上:dataStream.global();将所有数据发送给下游算子的第一个

24、实例上:l rebalance()使用Round-Ribon思想将数据均匀分配到各实例上l rescale()就近发送给下游每个实例数据重分布rebalance()将数据轮询式地分布到下游子任务上 当上游有2个子任务、下游有4个子任务时使用rescale()l partitionCustom()自定义数据重分布逻辑l PartitionerK中泛型K为根据哪个字段进行分区对一个Score类型数据流重分布,希望按照id均匀分配到下游各实例,那么泛型K就为id的数据类型Long重写partition()方法数据重分布FunctionalInterface public interface Part

25、itioner extends java.io.Serializable,Function /根据key决定该数据分配到下游第几个分区(实例)int partition(K key,int numPartitions);/*Partitioner 其中泛型T为指定的字段类型 *重写partiton函数,并根据T字段对数据流中的所有元素进行数据重分配 */public static class MyPartitioner implements Partitioner private Random rand=new Random();private Pattern pattern=Ppile(.*

26、d+.*);/*key 泛型T 即根据哪个字段进行数据重分配,本例中是Tuple2(Int,String)中的String *numPartitons 为当前有多少个并行实例 *函数返回值是一个Int 为该元素将被发送给下游第几个实例 */Override public int partition(String key,int numPartitions)int randomNum=rand.nextInt(numPartitions/2);Matcher m=pattern.matcher(key);if(m.matches()return randomNum;else return ran

27、domNum+numPartitions/2;/对(Int,String)中的第二个字段使用 MyPartitioner 中的重分布逻辑 DataStreamTuple2 partitioned=dataStream.partitionCustom(new MyPartitioner(),1);Partitioner源码 一个Partitioner的实现l 数据传输、持久化l 序列化:将内存对象转换成二进制串、网络可传输或可持久化l 反序列化:将二进制串转换为内存对象,可直接在编程语言中读写和操作l 常见序列化方式:JSONJava、Kryo、Avro、Thrift、Protobufl Fli

28、nk开发了自己的序列化框架更早地完成类型检查节省数据存储空间序列化和反序列化l基础类型Java、Scala基础数据类型l数组l复合类型Scala case classJava POJOTuplel辅助类型Option、List、Mapl泛型和其他类型GenericFlink支持的数据类型l TypeInformaton用来表示数据类型,创建序列化器l 每种数据类型都对应一个TypeInfomationTupleTypeInfo、PojoTypeInfo TypeInformationl Flink会自动推断类型,调用对应的序列化器,对数据进行序列化和反序列化类型推断和序列化package mo

29、n.typeinfo;public class Types /java.lang.Void public static final TypeInformation VOID=BasicTypeInfo.VOID_TYPE_INFO;/java.lang.String public static final TypeInformation STRING=BasicTypeInfo.STRING_TYPE_INFO;/java.lang.Boolean public static final TypeInformation BOOLEAN=BasicTypeInfo.BOOLEAN_TYPE_IN

30、FO;/java.lang.Integer public static final TypeInformation INT=BasicTypeInfo.INT_TYPE_INFO;/java.lang.Long public static final TypeInformation LONG=BasicTypeInfo.LONG_TYPE_INFO;.一些基础类型的TypeInformation:l Types.STRING 是用来表示 java.lang.String 的TypeInformationl Types.STRING 被定义为 BasicTypeInfo.STRING_TYPE_

31、INFOl STRING_TYPE_INFO:使用何种序列化器和比较器类型推断和序列化public static final BasicTypeInfo STRING_TYPE_INFO=new BasicTypeInfo(String.class,new Class,StringSerializer.INSTANCE,StringComparator.class);STRING_TYPE_INFO定义使用何种序列化器和比较器:l 在声明式文件中定义Schemal 使用工具将Schema转换为Java可用的类l Avro Specific生成的类与POJO类似有getter、setter方法在

32、Flink中可以像使用POJO一样使用Avro Specific模式l Avro Generic不生成具体的类用GenericRecord封装所有用户定义的数据结构必须给Flink提供Schema信息Avro namespace:org.apache.flink.tutorials.avro,type:record,name:MyPojo,fields:name:id,type:int,name:name,type:string Avro声明式文件:l Kryo是大数据领域经常使用的序列化框架l Flink无法推断出数据类型时,将该数据类型定义为GenericTypeInfo,使用Kryo作为

33、后备选项进行序列化l 最好实现自己的序列化器,并对数据类型和序列化器进行注册l Kryo在有些场景效率不高l env.getConfig.disableGenericTypes()禁用Kryo,可以定位到具体哪个类型无法被Flink自动推断,然后针对该类型创建更高效的序列化器Kryo注册数据类型和序列化器:/将MyCustomType类进行注册 env.getConfig().registerKryoType(MyCustomType.class);/或者使用下面的方式并且实现自定义序列化器 env.getConfig().registerTypeWithKryoSerializer(MyCu

34、stomType.class,MyCustomSerializer.class);static class MyClassSerializer extends Serializer implements Serializable private static final long serialVersionUID=.Override public void write(Kryo kryo,Output output,MyCustomType myCustomType).Override public MyCustomType read(Kryo kryo,Input input,Class t

35、ype).l 与Avro Specific模式相似,使用声明式语言定义Schema,使用工具将声明式语言转化为Java类l 有人已经实现好Kryo的序列化器l 案例:MyCustomType是使用Thrift工具生成的Java类,TBaseSerializer是com.twitter:chill-thrift包中别人实现好的序列化器,该序列化器基于Kryo的Serializer。l 注意在pom.xml中添加相应的依赖Thrift、Protobuf/Google Protobuf/MyCustomType类是使用Protobuf生成的Java类/ProtobufSerializer是别人实现好

36、的序列化器 env.getConfig().registerTypeWithKryoSerializer(MyCustomType.class,ProtobufSerializer.class);/Apache Thrift/MyCustomType是使用Thrift生成的Java类/TBaseSerializer是别人实现好的序列化器 env.getConfig().addDefaultKryoSerializer(MyCustomType.class,TBaseSerializer.class);l Flink的数据类型:Java、Scala、Table API分别有自己的数据类型体系l

37、绝大多数情况下,程序员不需要关心使用何种TypeInformation,只需要使用自己所需的数据类型l Flink会做类型推断、选择对应的序列化器l 当自动类型推断失效,用户需要关注TypeInformationl 数据类型选择:需要考虑:上下游的数据结构、序列化器的性能、状态数据的持续迭代能力POJO和Tuple等内置类型性能更好Avro、Thrift和Protobuf对上下游数据的兼容性更好,不需要在Flink应用中重新设计一套POJOPOJO和Avro对Flink状态数据的持续迭代更友好数据类型小结l 用户自定义函数的三种方式:继承并实现函数类 使用Lambda表达式 继承并实现Rich

38、函数类用户自定义函数l 对于map()、flatMap()、reduce()等函数,我们可以实现MapFunction、FlatMapFunction、ReduceFunction等interface接口。l 以FlatMapFunction函数式接口为例:继承了Flink的Function函数式接口函数在运行过程中要发送到各个实例上,发送前后要进行序列化和反序列化,一定要保证函数内的所有内容都可以被序列化两个泛型T和O,T是输入,O是输出,要设置好输入和输出数据类型,否则会报错重写虚方法flatMap()Collector收集输出数据函数类package mon.functions;Func

39、tionalInterface public interface FlatMapFunction extends Function,Serializable void flatMap(T value,Collector out)throws Exception;/使用FlatMapFunction实现过滤逻辑,只对字符串长度大于 limit 的内容进行词频统计 public static class WordSplitFlatMap implements FlatMapFunction private Integer limit;public WordSplitFlatMap(Integer

40、limit)this.limit=limit;Override public void flatMap(String input,Collector collector)throws Exception if(input.length()limit)for(String word:input.split()collector.collect(word);DataStream dataStream=senv.fromElements(Hello World,Hello this is Flink);DataStream functionStream=dataStream.flatMap(new

41、WordSplitFlatMap(10);FlatMapFunction源码一个FlatMapFunction实现l 简洁紧凑l Scala对Lambda表达式支持更好l Java 8之后也开始支持Lambda表达式,有类型擦除问题使用returns 提供类型信息Lambda表达式DataStream words=dataStream.flatMap(String input,Collector collector)-for(String word:input.split()collector.collect(word);)/提供类型信息以解决类型擦除问题.returns(Types.STRI

42、NG);val lambda=dataStream.flatMap(value:String,out:CollectorString)=if(value.size 10)value.split().foreach(out.collect)Scala:Java:l RichMapFunction、RichFlatMapFunction、RichReduceFunctionl 增加了更多功能:open()方法:初始化close()方法:算子最后执行这个方法,可以释放一些资源getRuntimeContext()方法:获取算子子任务的运行时上下文l 累加器例子:分布式计算环境下,计算是分布在多台节点

43、上的,每个节点处理一部分数据,使用for循环无法满足累加器功能Rich函数类/实现RichFlatMapFunction类/添加了累加器 Accumulator public static class WordSplitRichFlatMap extends RichFlatMapFunction private int limit;/创建一个累加器 private IntCounter numOfLines=new IntCounter(0);public WordSplitRichFlatMap(Integer limit)this.limit=limit;Override public

44、void open(Configuration parameters)throws Exception super.open(parameters);/在RuntimeContext中注册累加器 getRuntimeContext().addAccumulator(num-of-lines,this.numOfLines);Override public void flatMap(String input,Collector collector)throws Exception /运行过程中调用累加器 this.numOfLines.add(1);if(input.length()limit)for(String word:input.split()collector.collect(word);

展开阅读全文
相关资源
猜你喜欢
相关搜索

当前位置:首页 > 办公、行业 > 各类PPT课件(模板)
版权提示 | 免责声明

1,本文(Flink原理与实践-DataStream-API的介绍和使用课件.pptx)为本站会员(晟晟文业)主动上传,163文库仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对上载内容本身不做任何修改或编辑。
2,用户下载本文档,所消耗的文币(积分)将全额增加到上传者的账号。
3, 若此文所含内容侵犯了您的版权或隐私,请立即通知163文库(发送邮件至3464097650@qq.com或直接QQ联系客服),我们立即给予删除!


侵权处理QQ:3464097650--上传资料QQ:3464097650

【声明】本站为“文档C2C交易模式”,即用户上传的文档直接卖给(下载)用户,本站只是网络空间服务平台,本站所有原创文档下载所得归上传人所有,如您发现上传作品侵犯了您的版权,请立刻联系我们并提供证据,我们将在3个工作日内予以改正。


163文库-Www.163Wenku.Com |网站地图|