1、Flume技术原理深入理解技术创新,变革未来智慧ITFlume简介 flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统。支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(比如文本、HDFS、Hbase等)的能力。flume的数据流由事件(Event)贯穿始终。事件是Flume的基本数据单位,它携带日志数据(字节数组形式)并且携带有头信息,这些Event由Agent外部的Source生成,当Source捕获事件后会进行特定的格式化,然后Source会把事件推入(单个或多个)Channel中。你可以把Channel看
2、作是一个缓冲区,它将保存事件直到Sink处理完该事件。Sink负责持久化日志或者把事件推向另一个Source。flume版本对比Flume-og采用了多Master的方式。为了保证配置数据的一致性,Flume引入了ZooKeeper,用于保存配置数据,ZooKeeper本身可保证配置数据的一致性和高可用,另外,在配置数据发生变化时,ZooKeeper可以通知FlumeMaster节点。FlumeMaster间使用gossip协议同步数据。Flume-ng最明显的改动就是取消了集中管理配置的Master和Zookeeper,变为一个纯粹的传输工具。Flume-ng另一个主要的不同点是读入数据和写
3、出数据现在由不同的工作线程处理(称为Runner)。在Flume-og中,读入线程同样做写出工作(除了故障重试)。如果写出慢的话(不是完全失败),它将阻塞Flume接收数据的能力。这种异步的设计使读入线程可以顺畅的工作而无需关注下游的任何问题。flume的优势1.Flume可以将应用产生的数据存储到任何集中存储器中,比如HDFS,HBase2.当收集数据的速度超过将写入数据的时候,也就是当收集信息遇到峰值时,这时候收集的信息非常大,甚至超过了系统的写入数据能力,这时候,Flume会在数据生产者和数据收容器间做出调整,保证其能够在两者之间提供平稳的数据.3.提供上下文路由特征4.Flume的管道
4、是基于事务,保证了数据在传送和接收时的一致性.5.Flume是可靠的,容错性高的,可升级的,易管理的,并且可定制的。flume的特征1. Flume可以高效率的将多个网站服务器中收集的日志信息存入HDFS/HBase中2. 使用Flume,我们可以将从多个服务器中获取的数据迅速的移交给Hadoop中3. 除了日志信息,Flume同时也可以用来接入收集规模宏大的社交网络节点事件数据,比如facebook,twitter,电商网站如亚马逊,flipkart等4. 支持各种接入资源数据的类型以及接出数据类型5. 支持多路径流量,多管道接入流量,多管道接出流量,上下文路由等6. 可以被水平扩展Flum
5、e架构 Flume运行的核心是Agent。Flume以agent为最小的独立运行单位。一个agent就是一个JVM。它是一个完整的数据收集工具,含有三个核心组件,分别是 source、channel、sink。通过这些组件,Event可以从一个地方流向另一个地方,如下图所示。Flume组件 Client:Client生产数据,运行在一个独立的线程。 Event:一个数据单元,消息头和消息体组成。(Events可以是日志记录、avro对象等。) Flow:Event从源点到达目的点的迁移的抽象。 Agent:一个独立的Flume进程,包含组件Source、Channel、Sink。(Agent使
6、用JVM运行Flume。每台机器运行一个agent,但是可以在一个agent中包含多个sources和sinks。) Source:数据收集组件。(source从Client收集数据,传递给Channel) Channel:中转Event的一个临时存储,保存由Source组件传递过来的Event。(Channel连接sources和sinks,这个有点像一个队列。) Sink:从Channel中读取并移除Event,将Event传递到FlowPipeline中的下一个Agent(如果有的话)(Sink从Channel收集数据,运行在一个独立线程。)Source Source是数据的收集端,负责
7、将数据捕获后进行特殊的格式化,将数据封装到事件(event)里,然后将事件推入Channel中。Flume提供了很多内置的Source,支持Avro,log4j,syslog和httppost(body为json格式)。可以让应用程序同已有的Source直接打交道,如AvroSource 如果内置的Source无法满足需要, Flume还支持自定义Source。SourceSource具体作用:AvroSource:监听一个avro服务端口,采集Avro数据序列化后的数据;ThriftSource:监听一个Thrift服务端口,采集Thrift数据序列化后的数据;ExecSource:基于Un
8、ix的command在标准输出上采集数据;JMSSource:Java消息服务数据源,Java消息服务是一个与具体平台无关的API,这是支持jms规范的数据源采集;SpoolingDirectorySource:通过文件夹里的新增的文件作为数据源的采集;【测试header】KafkaSource:从kafka服务中采集数据。NetCatSource:绑定的端口(tcp、udp),将流经端口的每一个文本行数据作为Event输入HTTPSource:监听HTTPPOST和GET产生的数据的采集Channel Channel是连接Source和Sink的组件,大家可以将它看做一个数据的缓冲区(数据队
9、列),它可以将事件暂存到内存中也可以持久化到本地磁盘上,直到Sink处理完该事件。介绍两个较为常用的Channel,MemoryChannel和FileChannel。Channel:一个数据的存储池,中间通道。主要作用:接受source传出的数据,向sink指定的目的地传输。Channel中的数据直到进入到下一个channel中或者进入终端才会被删除。当sink写入失败后,可以自动重写,不会造成数据丢失,因此很可靠。channel的类型很多比如:内存中、jdbc数据源中、文件形式存储等。常见采集的数据类型:MemoryChannel、FileChannel、JDBCChannel、Kafka
10、Channel等详细查看:http:/flume.apache.org/FlumeUserGuide.html#flume-channelsChannel具体作用:MemoryChannel:使用内存作为数据的存储。JDBCChannel:使用jdbc数据源来作为数据的存储。KafkaChannel:使用kafka服务来作为数据的存储。FileChannel:使用文件来作为数据的存储。SpillableMemoryChannel:使用内存和文件作为数据的存储,即:先存在内存中,如果内存中数据达到阀值则flush到文件中。Sink Sink从Channel中取出事件,然后将数据发到别处,可以向文
11、件系统、数据库、hadoop存数据,也可以是其他agent的Source。在日志数据较少时,可以将数据存储在文件系统中,并且设定一定的时间间隔保存数据。Sink:数据的最终的目的地。主要作用:接受channel写入的数据以指定的形式表现出来(或存储或展示)。sink的表现形式很多比如:打印到控制台、hdfs上、avro服务中、文件中等。常见采集的数据类型:HDFSSink、HiveSink、LoggerSink、AvroSink、ThriftSink、FileRollSink、HBaseSink、KafkaSink等详细查看:http:/flume.apache.org/FlumeUserGu
12、ide.html#flume-sinksHDFSSink需要有hdfs的配置文件和类库。一般采取多个sink汇聚到一台采集机器负责推送到hdfs。Sink具体作用:HDFSSink:将数据传输到hdfs集群中。HiveSink:将数据传输到hive的表中。LoggerSink:将数据作为日志处理(根据flume中的设置的日志的级别显示)。AvroSink:数据被转换成AvroEvent,然后发送到配置的RPC端口上。ThriftSink:数据被转换成ThriftEvent,然后发送到配置的RPC端口上。IRCSink:数据向指定的IRC服务和端口中发送。FileRollSink:数据传输到本地
13、文件中。NullSink:取消数据的传输,即不发送到任何目的地。HBaseSink:将数据发往hbase数据库中。MorphlineSolrSink:数据发送到Solr搜索服务器(集群)。ElasticSearchSink:数据发送到ElasticSearch搜索服务器(集群)。KafkaSink:将数据发送到kafka服务中。Flume数据流 Flume的核心是把数据从数据源收集过来,再送到目的地。为了保证输送一定成功,在送到目的地之前,会先缓存数据,待数据真正到达目的地后,删除自己缓存的数据 Flume传输的数据的基本单位是Event,如果是文本文件,通常是一行记录,这也是事务的基本单位。
14、Event从Source,流向Channel,再到Sink,本身为一个byte数组,并可携带headers信息。Event代表着一个数据流的最小完整单元,从外部数据源来,向外部的目的地去。 值得注意的是,Flume提供了大量内置的Source、Channel和Sink类型。不同类型的Source,Channel和Sink可以自由组合。组合方式基于用户设置的配置文件,非常灵活。比如:Channel可以把事件暂存在内存里,也可以持久化到本地硬盘上。Sink可以把日志写入HDFS,HBase,甚至是另外一个Source等等。Flume支持用户建立多级流,也就是说,多个agent可以协同工作,并且支持
15、Fan-in、Fan-out、ContextualRouting、BackupRoutes,这也正是Flume强大之处。如下图所示:Flume可靠性Flume使用事务性的方式保证传送Event整个过程的可靠性。Sink必须在Event被存入Channel后,或者,已经被传达到下一站agent里,又或者,已经被存入外部数据目的地之后,才能把Event从Channel中remove掉。这样数据流里的event无论是在一个agent里还是多个agent之间流转,都能保证可靠,因为以上的事务保证了event会被成功存储起来。比如Flume支持在本地保存一份文件channel作为备份,而memorych
16、annel将event存在内存queue里,速度快,但丢失的话无法恢复。Flume的安装见安装文档flume的使用-案例一 NetCatSourceNetCatSource:监听一个指定的网络端口,即只要应用程序向这个端口里面写数据,这个source组件就可以获取到信息。NetCatSource配置文件配置文件:netcat.conf#指定Agent的组件名称(a),一个进程a.sources=r1a.channels=c1a.sinks=k1a.sources.r1.type=netcata.sources.r1.bind=192.168.8.81a.sources.r1.port=8888
17、a.sources.r1.channels=c1a.channels.c1.type=memorya.channels.c1.capacity=1000a.channels.c1.transactionCapacity=1000a.sinks.k1.channel=c1a.sinks.k1.type=logger在conf目录下启动flumeagenta服务端命令:flume-ngagent-na-c./conf-f./netcat.conf-Dflume.root.logger=DEBUG,console进入到 /usr/local/soft/flume/conf复制一个配置文件模板:cp
18、flume-conf.properties.template netcat.confSpooling Directory Source:监听一个指定的目录,即只要应用程序向这个指定的目录中添加新的文件,source组件就可以获取到该信息,并解析该文件的内容,然后写入到channle。写入完成后,标记该文件已完成或者删除该文件。其中Sink:logger Channel:memoryflume案例-案二:Spooling Directory SourceSpooling Directory SourceSpooling Directory Source的两个注意事项:Ifafileiswritt
19、entoafterbeingplacedintothespoolingdirectory,Flumewillprintanerrortoitslogfileandstopprocessing.即:拷贝到spool目录下的文件不可以再打开编辑Ifafilenameisreusedatalatertime,Flumewillprintanerrortoitslogfileandstopprocessing.即:不能将具有相同文件名字的文件拷贝到这个目录下a1.sources=r1a1.sinks=k1a1.channels=c1#Describe/configurethesourcea1.sour
20、ces.r1.type=spooldira1.sources.r1.spoolDir=/usr/local/datainputa1.sources.r1.fileHeader=truea1.sources.r1.interceptors=i1a1.sources.r1.interceptors.i1.type=timestamp#Describethesinka1.sinks.k1.type=logger#Useachannelwhichbufferseventsinmemorya1.channels.c1.type=memorya1.channels.c1.capacity=1000a1.c
21、hannels.c1.transactionCapacity=100#Bindthesourceandsinktothechannela1.sources.r1.channels=c1a1.sinks.k1.channel=c1启动命令:flume-ng agent -c ./conf -f ./spooldir.properties -n a1 -Dflume.root.logger=INFO,console在 conf目录下新建文件:spooldir.properties控制台显示页面案例三 sink-hdfs#具体定义sinka.sinks.k1.type=hdfsa.sinks.k1.
22、hdfs.path=hdfs:/master:9000/flume_hdfsa.sinks.k1.hdfs.filePrefix=events-a.sinks.k1.hdfs.minBlockReplicas=1a.sinks.k1.hdfs.fileType=DataStream#a.sinks.k1.hdfs.fileType=CompressedStream#a.sinks.k1.hdfs.codeC=gzip#不按照条数生成文件a.sinks.k1.hdfs.rollCount=0a.sinks.k1.hdfs.rollSize=33554432#每隔Ns将临时文件滚动成一个目标文件a
23、.sinks.k1.hdfs.rollInterval=0a.sinks.k1.hdfs.idleTimeout=0 j代码中采集日志写入flume-avro_sourceflume的conf配置a.sources=r1a.channels=c1a.sinks=k1a.sources.r1.type=avroa.sources.r1.bind=192.168.73.132a.sources.r1.port=41414a.sources.r1.channels=c1a.channels.c1.type=memorya.channels.c1.capacity=1000a.channels.c1.
24、transactionCapacity=1000a.sinks.k1.channel=c1a.sinks.k1.type=logger程序日志采集-log4j-appenderLog4JAppender:通过log4j直接将web的日志达到flume中的avro_source中。实现步骤:1、pom中添加依赖org.apache.flumeflume-ng-core1.6.0org.apache.flume.flume-ng-clientsflume-ng-log4jappender1.6.02、在log4j.properties配置信息。3、程序打印日志。flume采集到HDFS部分#具体定
25、义sinka.sinks.k1.type=hdfsa.sinks.k1.hdfs.path=hdfs:/master:9000/flume_hdfsa.sinks.k1.hdfs.filePrefix=events-a.sinks.k1.hdfs.minBlockReplicas=1a.sinks.k1.hdfs.fileType=DataStream#a.sinks.k1.hdfs.fileType=CompressedStream#a.sinks.k1.hdfs.codeC=gzip#不按照条数生成文件a.sinks.k1.hdfs.rollCount=0a.sinks.k1.hdfs.r
26、ollSize=33554432#每隔Ns将临时文件滚动成一个目标文件a.sinks.k1.hdfs.rollInterval=0a.sinks.k1.hdfs.idleTimeout=0 log4j配置项log4j.rootLogger=INFO,stdout,flumelog4j.appender.stdout = org.apache.log4j.ConsoleAppenderlog4j.appender.stdout.Target = System.outlog4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.
27、appender.stdout.layout.ConversionPattern=%dyyyy-MM-dd HH:mm:ss,SSS %t %c %p - %m%nlog4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppenderlog4j.appender.flume.Hostname = 192.168.73.138log4j.appender.flume.Port = 41414log4j.appender.flume.UnsafeMode = truelog4j.appender.flume.layou
28、t=org.apache.log4j.PatternLayout log4j.appender.flume.layout.ConversionPattern=%m%n多个agent顺序连接 可以将多个Agent顺序连接起来,将最初的数据源经过收集,存储到最终的存储系统中。这是最简单的情况,一般情况下,应该控制这种顺序连接的 Agent的数量,因为数据流经的路径变长了,如果不考虑failover的话,出现故障将影响整个Flow上的Agent收集服务。多个Agent的数据汇聚到同一个Agent 这种情况应用的场景比较多,比如要收集Web网站的用户行为日志,Web网站为了可用性使用的负载集群模式,每个节点都产生用户行为日志,可以为每个节点都配置一个Agent来单独收集日志数据,然后多个Agent将数据最终汇聚到一个用来存储数据存储系统,如HDFS上。多级流 Flume还支持多级流,什么多级流?结合在云开发中的应用来举个例子,当syslog,java,nginx、tomcat等混合在一起的日志流开始流入一个agent后,可以agent中将混杂的日志流分开,然后给每种日志建立一个自己的传输通道。负载均衡功能下图Agent1是一个路由节点,负责将Channel暂存的Event均衡到对应的多个Sink组件上,而每个Sink组件分别连接到一个独立的Agent上PAGE43THANKS