1、第八章Table API&SQL的介绍和使用第二章大数据必备编程知识l 案例:动物类(Animal)和鱼类(Fish)l 继承关系保证所有动物子类都具有动物类的属性和方法l 子类有自己的属性和方法。l 除了动物,还有很多其他事物也会移动,使用接口(interface)来抽象“移动”。继承l Java的继承:继承类 extends 实现接口 implements继承public class ClassA implements Move Override public void move().实现接口public class Dog extends Animal private String do
2、gData;public Dog(String myName,String myDescription,String myDogData)this.name=myName;this.description=myDescription;this.dogData=myDogData 继承类interfaceclassl 重写:子类和父类都定义同名方法,子类的方法会覆盖父类中已有的方法。l 重载:多个同名方法,这些方法名字相同、参数不同、返回类型不同。重写与重载public class ClassA implements Move Override public void move().Overri
3、de:在子类中重写父类中的同名方法public class Overloading /无参数,返回值为int类型 public int test()System.out.println(test);return 1;/有一个参数 public void test(int a)System.out.println(test +a);/有两个参数和一个返回值 public String test(int a,String s)System.out.println(test +a +s);return a+s;同名方法重载:一个类中多个方法都名为test,但是参数类型和返回值类型不同。l 案例:Ja
4、va中的List和ArrayListl ArrayList是一个泛型类,List是一个泛型接口l ArrayList泛型是一种集合容器,可以向这个集合容器中添加String、Double以及其他各类数据类型。没必要创建StringArrayList、DoubleArrayList等类。泛型ist strList=new ArrayList();List doubleList=new LinkedList();l 类名后面加上 l 类内部的一些属性和方法都可以使用泛型Tl 泛型规范:T 代表一般的任何类。E 代表元素(Element)或异常(Exception)。K 或KEY代表键(Key)。V
5、 代表值(Value),通常与K一起配合使用。Java泛型类public class MyArrayList private int size;T elements;public MyArrayList(int capacity)this.size=capacity;this.elements=(T)new Objectcapacity;public void set(T element,int position)elementsposition=element;Override public String toString()String result=;for(int i=0;i size
6、;i+)result+=elementsi.toString();return result;l 与泛型类类似,使用符号l 可以继承并实现这个接口Java泛型接口public interface List .public List subList(int fromIndex,int toIndex);public class ArrayList implements List .public List subList(int fromIndex,int toIndex)./返回一个List类型值 要实现的子类是泛型的 public class DoubleList implements Lis
7、t .public List subList(int fromIndex,int toIndex)./返回一个List类型值 要实现的子类不是泛型的,而是有确定类型的 l 泛型方法可以存在于泛型类中,也可以存在于普通的类中。l 泛型方法的类型E和泛型类中的类型T可以不一样。泛型方法是泛型类的一个成员,泛型方法既可以继续使用类的类型T,也可以自己定义新的类型E。Java泛型方法public class MyArrayList ./public关键字后的表明该方法是一个泛型方法 /泛型方法中的类型E和泛型类中的类型T可以不一样 public E processElement(E element).
8、return E;l Java泛型信息只存在于代码编译阶段,当程序运行到JVM上时,与泛型相关的信息会被擦除。l 对于绝大多数应用系统开发者来说影响不太大,对于框架开发者来说,必须要注意。类型擦除Class strListClass=new ArrayList().getClass();Class intListClass=new ArrayList().getClass();/输出:class java.util.ArrayListSystem.out.println(strListClass);/输出:class java.util.ArrayListSystem.out.println(
9、intListClass);/输出:trueSystem.out.println(strListClass.equals(intListClass);泛型擦除:无法区别strListClass和intListClass这两个类型 l 适合进行并行计算的一种编程范式l 非函数式编程:创建中间变量,分步执行l 函数式编程:与数学表达式更相似实现单个函数,将零到多个输入转换成零到多个输出。比如,add()将两个输入转化为一个输出。将多个函数连接起来,实现所需业务逻辑。比如,将add()、multiply()连接到一起。函数式编程addResult=x+yresult=addResult*z非函数式编
10、程result=add(x,y).multiply(z)函数式编程l Lambda表达式被一些编程语言用来实现函数式编程。l 一个箭头符号-,两边连接着输入参数和函数体。Lambda表达式(parameters)-body Java的Lambda表达式的语法规则/接收2个int类型参数,返回它们的和(int x,int y)-x+y/接收1个String类型参数,将其输出到控制台,不返回任何值(String s)-System.out.print(s);/参数为圆半径,返回圆面积,返回值为double类型(double r)-double pi=3.1415;return r*r*pi;几个J
11、ava Lambda表达式案例l 输入参数:接收零到多个输入参数程序员可以提供输入类型,也可以不提供类型,让代码根据上下文去推断参数可以放在圆括号()中,多个参数通过英文逗号,隔开l 函数体:可以有一到多行语句函数体有多行内容,必须使用花括号 输出的类型与所需要的类型相匹配Java Lambda表达式l Lambda表达式本质是一种接口,它要实现一个函数式接口(Functional Interface)中的虚方法l 函数式接口是一种接口,并且它只有一个虚方法。l FunctionalInterface 注解函数式接口FunctionalInterfaceinterface AddInterfa
12、ce T add(T a,T b);public static class MyAdd implements AddInterface Override public Double add(Double a,Double b)return a+b;如果没有Lambda表达式(Integer a,Integer b)-a+b;使用Lambda表达式l Java 8之后推出的,专注于对集合(Collection)对象的操作。l 右侧案例:数据先经过stream()方法被转换为一个Stream类型,后经过filter()、map()、collect()等处理逻辑,生成我们所需的输出。各个操作之间使用
13、英文点号.来连接,这种方式被称作链式调用(Method Chaining)。l 链式调用:将多个函数连接起来。l Flink的API是面向数据集或数据流的操作。这些操作分布在大数据集群的多个节点上,并行地分布式执行。Java Stream APIList strings=Arrays.asList(abc,bc,12345,efg,abcd,jkl);List lengths=strings .stream().filter(string-!string.isEmpty().map(s-s.length().collect(Collectors.toList();lengths.forEach
14、(s)-System.out.println(s);第九章Flink的部署和配置部署模式 Standalone集群:集群内只部署Flink Hadoop YARN集群:兼容Hadoop生态,可以部署Hadoop、Hive、Spark、Flink Kubernetes集群:兼容各类容器l 至少一个Master,至少一个Workerl Master:Dispatcher、ResourceManager和JobManagerl Worker:TaskManagerl 安装好Java、配置好SSH免密码访问l 配置Flink主目录的conf/flink-conf.yaml和conf/slaves两个文
15、件l 将Flink主目录部署到每个节点的相同路径,或者部署到所有节点都可访问的共享目录l 启动这个集群:bin/start-cluster.sh Standalone集群一个Standalone集群拓扑样例$./bin/flink run-m 192.168.0.1:8081./examples/batch/WordCount.jar提交作业:使用针对Hadoop的Flink,设置HADOOP_CONF_DIR和YARN_CONF_DIR 等与Hadoop相关的配置启动好YARN集群三种作业提交方式:Per-Job:每次向YARN提交一个作业,YARN为这个作业单独分配资源,基于这些资源启动一
16、个Flink集群,作业运行结束,相应的资源会被释放。Session:在YARN上启动一个长期运行的Flink集群,用户可以向这个集群提交多个作业。Application:在Per-Job模式上做了一些优化。Hadoop YARNl Client将作业提交给YARN的ResourceManagerl YARN为这个作业生成一个Application Master以运行Fink Master,里面运行这运行着JobManager和Flink-YARN ResourceManager l JobManager会根据本次作业所需资源向Flink-YARN ResourceManager申请Slot资源
17、 l Flink-YARN ResourceManager会向YARN申请所需的资源作为TaskManagerl TaskManager将这些Slot注册到Flink-YARN ResourceManagerl JobManager将作业的计算任务部署到各TaskManager上 YARN Per-JobYARN Per-Job模式l 在Client上,使用bin/yarn-session.sh启动一个YARN Session,Flink向YARN ResourceManager申请一个Application Masterl 用户提交一个作业,作业被发送给Dispatcherl Dispatc
18、her会启动针对该作业的JobManagerl JobManager向Flink-YARN ResourceManager上申请所需资源,启动TaskManagerl TaskManager注册l JobManager将计算任务部署到各TaskManager上l 如果用户提交下一个作业,那么Dispatcher启动新的JobManager,新的JobManager负责新作业的资源申请和任务调度。YARN SessionYARN Session模式l Per-Job和Session模式作业提交的过程依赖Client,main()方法在Client上执行,main()方法会将作业的各个依赖下载到本
19、地,生成JobGraph,并将依赖以及JobGraph发送到Flink集群,负载很重。l Application模式允许main()方法在JobManager上执行,这样可以分担Client的压力 YARN Applicationl 两种方式:Kubernetes和Kubernets原生l Kubernetes原生Session模式:使用bin/kubernetes-session.sh启动一个Kubernetes SessionKubernetes相关组件将进行初始化,生成Flink Master(Dispatcher、Flink-Kubernetes ResourceManager)用户提
20、交作业,申请资源,将作业运行到TaskManager上l Kubernetes原生Application模式KubernetesKubernetes原生Session模式l 使用Key-Value来设置参数,很多进程会读取这个文件l 从Flink官网下载的Flink主目录里的flink-conf.yaml有一些默认设置,针对单机环境,在自己环境中使用,需要修改这个文件conf/flink-conf.yaml l 安装Java,设置$JAVA_HOMEl 类加载:将Java的.class文件加载到JVM虚拟机l 一个Flink作业主要加载两种类:Java Classpath:JDK核心类库和Fl
21、ink主目录下lib文件夹中的类用户类(User Code):用户编写的应用作业中的类l 类加载策略:子类优先(Child-first):优先加载用户编写的应用作业中的类,Flink默认使用Child-first父类优先(Parent-first):优先加载Java Classpath中的类Java和类加载l 并行度(parallelism):每个算子都会被切分为parallelism个子任务,分布到多个Slot上。根据计算规模大小,调整并行度。l 如果作业开启了算子链和槽位共享,一个Slot上运行着一个作业所有算子组成的流水线(Pipeline),这个作业需要parallelism个Slot
22、。l 槽位划分:将TaskManager划分成多少个Slot。使用 taskmanager.numberOfTaskSlots 设置默认值为1,Standalone集群官方建议将参数值配置为与CPU核心数相等或成比例 配置taskmanager.numberOfTaskSlots没有绝对的准则:每个TaskManager下有一个Slot,该Slot会独立运行在一个JVM进程中。隔离性好。每个TaskManager下有多个Slot,那么多个Slot同时运行在一个JVM进程中。多个Slot可以共享资源,隔离性差。并行度和槽位划分l 堆区(Heap):JVM虚拟化之后的内存,存储Java对象实例,使
23、用Java垃圾回收(Garbage Collection,GC)机制来清理内存中的不再使用的对象。l 某个时间点,必须进行一次Full GC,Full GC影响Java应用的性能。l 悖论:Heap越大,Full GC时间越长。Heap太小,会出现OutOfMemoryError异常。l 堆外(Off-Heap):直接由操作系统管理的内存,适合读写操作频繁的场景。使用、监控和调试更复杂。Full GC影响性能时,可以考虑Off-Heap。Java内存l JVM进程:Master、TaskManagerl Flink占用的内存(Total Flink Memory):Flink可以使用的内存JV
24、M HeapOff-heap MemoryFlink Managed Memory:TaskManager个别场景使用 Direct Memory:网络通信缓存使用的内存 l JVM相关内存(JVM Specific Memory):Java程序都需要的一块内存区域l Flink将内存管理部分做了封装,用户在绝大多数情况下不用关注数据到底是如何写入内存的Flink内存模型l Master占用内存不大l 最简单的配置方法:设置Total Process Memory(jobmanager.memory.process.size),根据默认分配比例,将内存分配给各个模块。设置Total Flink
25、 Memory(jobmanager.memory.flink.size),如果Master进程需要管理多个作业,需要增大这个配置。Master的内存配置 l TaskManager涉及数据处理,对内存的需求很高l 根据是否为Flink框架所用,分为Flink框架和非Flink框架,框架使用的内存,用户任务无法使用(Framework Heap,Framework Off-heap)用户任务使用的内存(Task Heap、Task Off-heap、Flink Managed Memory和Network)l 根据堆区非堆区,分为Heap和Off-heapTask Heap:用户程序内存Fli
26、nk Managed Memory:流处理下RocksDB的State Backend,批处理下排序、中间数据缓存等Network:Netty缓存TaskManager的内存配置 l 粗粒度的配置方法:配置Total Process Memory或Total Flink Memory两者中的任意一个。各个子模块根据默认比例获得相应的内存。Total Process Memory:整个进程的内存,适合容器化部署方式。(taskmanager.memory.process.size)Total Flink Memory:Flink可用内存,适合Standalone集群方式。(taskmanager
27、.memory.flink.size)l 细粒度的配置方法同时配置Task Heap和Flink Managed Memory两个内存。这两个内存服务于具体的计算任务。(taskmanager.memory.task.heap.size和taskmanager.memory.managed.size)l 以上3个参数不要同时配置,否则会引起冲突。TaskManager的内存配置 l Flink会将一部分数据写到本地磁盘,比如:日志信息、RocksDB数据等。l io.tmp.dirs 配置了本地磁盘读写位置,默认会使用JVM的参数java.io.tmpdir l Linux下一般为/tmp磁盘
28、l Flink使用算子链将尽可能多的上、下游算子链接到一起。上、下游算子会被捆绑到一起,作为一个线程执行。可以提高资源利用率。l 如果两个算子不做算子链,算子间数据通信存在序列化和反序列化,通信成本高。l 数据不发生交换,才可以进行算子链。l 两个算子负载都很高,不应该进行算子链。l 默认开启算子链。env.disableOperatorChaining():关闭算子链。l startNewChain():对特定算子开启新的算子链。算子链和槽位共享l Flink默认开启了槽位共享:从Source到Sink的所有算子子任务组成的Pipline可以共享一个Slot。l 也可以手动设置槽位共享组(S
29、lot Sharing Group)。槽位共享整个作业并行度为2,从Source到Sink所有算子共享一个Slot。给Window Aggreagtion设置Slot Sharing Group,该算子及之后的算子被划分到另外的Slot stream.timeWindow(.).sum(.).slotSharingGroup(“A”);l 生产环境中,一般使用命令行管理作业,名为 flink,放在Flink主目录下的bin目录下。l 功能:提交、取消、罗列当前作业,获取作业信息,设置Savepoint。l 会从conf/flink-conf.yaml里读取配置信息。l 使用方式:l ACTIO
30、N包括run(提交作业)、stop(取消作业)等。OPTIONS为一些预置的选项,ARGUMENTS是用户传入的参数。命令行工具$./bin/flink OPTIONS ARGUMENTSl 模板:l 需要使用Maven对用户代码打包,得到JAR包。l 使用WordCount的例子:l 使用-c参数,设置程序入口类:l 在命令行中用-p选项设置这个作业的并行度提交作业$./bin/flink run OPTIONS ARGUMENTS$./bin/flink run./examples/streaming/WordCount.jar$./bin/flink run./examples/stre
31、aming/WordCount.jar-c org.apache.flink.streaming.examples.wordcount.WordCount$./bin/flink run-p 2./examples/streaming/WordCount.jarl 传入参数,参数会写入main方法的参数String中l 使用-m选项来设置向具体哪个Master提交提交作业$./bin/flink run./examples/streaming/WordCount.jar-c org.apache.flink.streaming.examples.wordcount.WordCount-inpu
32、t/tmp/a.log-output/tmp/b.log$./bin/flink run-m myJMHost:8081 ./examples/streaming/WordCount.jarl 罗列当前作业:l 触发一个作业执行Savepoint,savepointDirectory为目录:l 关停一个Flink作业:l 从一个Savepoint恢复一个应用作业:管理作业$./bin/flink list$./bin/flink savepoint savepointDirectory$./bin/flink cancel$./bin/flink run-s OPTIONS l 确认已经在机器
33、上安装好Hadoop,配置好环境变量HADOOP_CONF_DIR:l 将Hadoop依赖包添加到Flink中,两种方法:在环境变量中添加Hadoop Classpath,Flink从Hadoop Classpath中读取所需依赖包。将所需的Hadoop JAR包添加到Flink主目录下的lib目录中。与Hadoop集成HADOOP_CONF_DIR=/path/to/etc/hadoopl HADOOP_CLASSPATH存储Hadoop相关类的路径l hadoop是Hadoop提供的命令,hadoop classpath返回Hadoop所有相关的JAR包和依赖l 下面的命令会添加环境变量:
34、l Flink启动时会从$HADOOP_CLASSPATH中寻找所需依赖包。直接使用已经安装的Hadoop。l Flink所需要的依赖和已经安装的Hadoop提供的依赖有可能发生冲突。添加Hadoop Classpathexport HADOOP_CLASSPATH=hadoop classpathl Flink主目录下有一个lib目录,专门存放各类第三方的依赖包。l 可以将Hadoop依赖包添加到这个目录下:从Flink官网下载预打包的Hadoop依赖包:一些通用的Hadoop版本 从源码编译:一些定制的Hadoop将Hadoop添加到lib文件夹下第六章状态和检查点什么是有状态的计算 有状
35、态计算的潜在场景数据去重:需要记录哪些数据已经流入过应用,当新数据流入时,根据已流入数据去重检查输入流是否符合某个特定模式:之前流入的数据以状态的形式缓存下来对一个窗口内的数据进行聚合分析,比如分析一小时内某项指标75分位值或99分位值 Flink分布式计算,一个算子有多个算子子任务 状态可以被理解为某个算子子任务在当前实例上的一个变量,变量记录了数据流的历史信息,新数据流入,可以结合历史信息来进行计算接收输入流/获取对应状态/更新状态状态管理的难点 要解决问题:实时性,延迟不能太高数据不丢不重、恰好计算一次,尤其发生故障恢复后程序的可靠性要高,保证7*24小时稳定运行 难点不能将状态直接交由
36、内存,因为内存空间有限用持久化的系统备份状态,出现故障时,如何从备份中恢复需要考虑扩展到多个节点时的伸缩性 Flink解决了上述问题,提供有状态的计算APIl Managed State 和 Raw State托管状态(Managed State)是由Flink管理的,Flink帮忙存储、恢复和优化原生状态(Raw State)是开发者自己管理的,需要自己序列化 l Managed State又细分为Keyed State和Operator StateFlink的几种状态类型ManagedStateRawState状态管理方式Flink Runtime托管,自动存储、自动恢复、自动伸缩用户自己
37、管理状态数据结构Flink提供的常用数据结构,如ListState、MapState等字节数组:byte使用场景绝大多数Flink函数用户自定义函数l Keyed State是KeyedStream上的状态,每个Key共享一个状态l Operator State每个算子子任务共享一个状态Keyed State和Operator StateKeyed State相同Key的数据可以访问、更新这个状态Operator State流入这个算子子任务的所有数据可以访问、更新这个状态l Keyed State和Operator State都是基于本地的,每个算子子任务维护着自身的状态,不能访问其他算子子
38、任务的状态l 具体的实现层面,Keyed State需要重写Rich Function函数类,Operator State需要实现CheckpointedFunction等接口Keyed State和Operator StateKeyedStateOperatorState适用算子类型只适用于KeyedStream上的算子可以用于所有算子状态分配每个Key对应一个状态一个算子子任务对应一个状态创建和访问方式重写Rich Function,通过里面的RuntimeContext访问实现CheckpointedFunction等接口横向扩展状态随着Key自动在多个算子子任务上迁移有多种状态重新分配
39、的方式支持的数据结构ValueState、ListState、MapState等ListState、BroadcastState等l 修改Flink应用的并行度:每个算子的并行算子子任务数发生了变化,整个应用需要关停和启动一些算子子任务l 某份在原来某个算子子任务的状态需要平滑更新到新的算子子任务上l Flink的Checkpoint可以辅助状态数据在算子子任务之间迁移算子子任务生成快照(Snapshot)保存到分布式存储上子任务重启后,相应的状态在分布式存储上重建(Restore)l Keyed State与Operator State的横向扩展方式稍有不同横向扩展问题l Flink提供了封
40、装好的数据结构供我们使用,包括ValueState、ListState等l 主要有:ValueState:单值MapState:Key-Value对ListState:列表ReducingState和AggregatingState:合并l Keyed State由于跟Key绑定,Key自动分布到不同算子子任务,Keyed State也可以根据Key分发到不同算子子任务上Keyed Statel 实现RichFunction函数类,比如RichFlatMapFunctionl 创建StateDescriptor,StateDescriptor描述状态的名字和状态的数据结构,每种类型的状态有对应
41、的StateDescriptorl 通过StateDescriptor,从RuntimeContext中获取状态l 调用状态提供的方法,获取状态,更新数据Keyed State/创建StateDescriptor MapStateDescriptor behaviorMapStateDescriptor=new MapStateDescriptor(behaviorMap,Types.STRING,Types.INT);/通过StateDescriptor获取运行时上下文中的状态 behaviorMapState=getRuntimeContext().getMapState(behavior
42、MapStateDescriptor);MapState:UV get(UK key)void put(UK key,UV value)boolean contains(UK key)l 案例:统计电商用户行为UserBehavior场景下,某个用户(userId)下某种用户行为(behavior)的数量Keyed State/*MapStateFunction继承并实现RichFlatMapFunction*两个泛型分别为输入数据类型和输出数据类型*/public static class MapStateFunction extends RichFlatMapFunctionUserBeh
43、avior,Tuple3 /指向MapState的句柄 private MapState behaviorMapState;Override public void open(Configuration configuration)/创建StateDescriptor MapStateDescriptor behaviorMapStateDescriptor=new MapStateDescriptor(behaviorMap,Types.STRING,Types.INT);/通过StateDescriptor获取运行时上下文中的状态 behaviorMapState=getRuntimeCo
44、ntext().getMapState(behaviorMapStateDescriptor);Override public void flatMap(UserBehavior input,CollectorTuple3 out)throws Exception int behaviorCnt=1;/behavior有可能为pv、cart、fav、buy等/判断状态中是否有该behavior if(behaviorMapState.contains(input.behavior)behaviorCnt=behaviorMapState.get(input.behavior)+1;/更新状态
45、behaviorMapState.put(input.behavior,behaviorCnt);out.collect(Tuple3.of(input.userId,input.behavior,behaviorCnt);使用MapState记录某个behavior下的数量 l UserBehavior案例l 先基于userId进行keyByl 再使用有状态的MapStateFunction进行处理Keyed Stateenv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);DataStream userBehaviorSt
46、ream=./生成一个KeyedStream KeyedStream keyedStream=userBehaviorStream.keyBy(user-user.userId);/在KeyedStream上进行flatMap DataStreamTuple3 behaviorCountStream=keyedStream.flatMap(new MapStateFunction();l 状态:算子子任务的本地数据在Checkpoint过程时写入存储,这个过程被称为备份(Snapshot)初始化或重启一个Flink作业时,以一定逻辑从存储中读出并变为算子子任务的本地数据,这个过程被称为重建(R
47、estore)l Keyed State开箱即用:数据划分基于Key,Snapshot和Restore过程可以基于Key在多个算子子任务之间做数据迁移l Operator State每个算子子任务管理自己的状态,流入到这个算子子任务上的所有数据可以访问和修改Operator State故障重启后,数据流中某个元素不一定流入重启前的算子子任务上需要根据具体业务场景设计Snapshot和Restore的逻辑使用CheckpointedFunction接口类Operator Statel Flink定期执行Checkpoint,会将状态数据Snapshot到存储上l 每次执行Snapshot,会调用
48、snapshotState()方法,因此我们要实现一些Snapshot逻辑,比如将哪些状态持久化l initializeState()在算子子任务初始化状态时调用,有两种被调用的可能:整个Flink作业第一次执行,状态数据需要初始化一个默认值Flink作业遇到故障重启,基于之前已经持久化的状态恢复l ListState/UnionListStatel BroadcastStateOperator Statepublic interface CheckpointedFunction /Checkpoint时会调用这个方法,我们要实现具体的snapshot逻辑,比如将哪些本地状态持久化 void
49、snapshotState(FunctionSnapshotContext context)throws Exception;/初始化时会调用这个方法,向本地状态中填充数据 void initializeState(FunctionInitializationContext context)throws Exception;CheckpointedFunction源码l 状态以列表的形式序列化并存储l 单个状态为S,每个算子子任务有零到多个状态,共同组成一个列表 ListStateS,Snapshot时将这些状态以列表形式写入存储l 包含所有状态的大列表,当作业重启时,将这个大列表重新分布到各
50、个算子子任务上l ListState:将大列表按照Round-Ribon模式均匀分布到各个算子子任务上,每个算子子任务得到的是大列表的子集l UnionListState:将大列表广播给所有算子子任务l 应用场景:Source上保存流入数据的偏移量,Sink上对输出数据做缓存Operator State ListState、UnionListStateOperator State 使用方法l 重点实现snapshotState()和initializeState()两个方法l 在initializeState()方法里初始化并获取状态注册StateDescriptor,指定状态名字和数据类型从