1、Kafka介绍研发二部内容kafka是什么kafka体系结构kafka设计理念简介kafka安装部署kafka producer和consumer开发Kafka关键词分布式发布-订阅消息系统LinkedIn 公司开发Scala语言分布式的,可划分的,多订阅者冗余备份持久性重复消费Kafka关键特性同时为发布和订阅提供高吞吐量。据了解,Kafka 每秒可以生产约 25 万消息(50 MB),每秒处理 55 万消息(110 MB)。可进行持久化操作。将消息持久化到磁盘,因此可用于批量消费,例如 ETL,以及实时应用程序。通过将数据持久化到硬盘以及 replication 防止数据丢失。分布式系统,
2、易于向外扩展。所有的 producer、broker 和 consumer 都会有多个,均为分布式的。无需停机即可扩展机器。消息被处理的状态是在 consumer 端维护,而不是由 server 端维护。当失败时能自动平衡。支持 online 和 offline 的场景。Kafka的两大法宝数据文件的分段:Kafka解决查询效率的手段之一是将数据文件分段;为数据文件建索引:为了进一步提高查找的效率,为了进一步提高查找的效率,Kafka为每个分段后的数据文件建立了索引文件,文为每个分段后的数据文件建立了索引文件,文件名与数据文件的名字是一样的,只是文件扩展名为件名与数据文件的名字是一样的,只是文
3、件扩展名为.index。索引文件中包含若干个索引条目,每个条目表示数据文件中一条Message的索引。索引包含两个部分(均为4个字节的数字),分别为相对offset和position。索引优化:稀疏存储,每隔一定字节的数据建立一条索引。消息队列分类点对点:消息生产者生产消息发送到queue中,然后消息消费者从queue中取出并且消费消息。注意:消息被消费以后,queue中不再有存储,所以消息消费者不可能消费到已经被消费的消息。Queue支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费。发布/订阅:消息生产者(发布)将消息发布到topic中,同时有多个消息消费者(订阅)消费该消息
4、。和点对点方式不同,发布到topic的消息会被所有订阅者消费。消息队列MQ对比RabbitMQ:支持的协议多,非常重量级消息队列,对路由(Routing),负载均衡(Load balance)或者数据持久化都有很好的支持。ZeroMQ:号称最快的消息队列系统,尤其针对大吞吐量的需求场景,擅长的高级/复杂的队列,但是技术也复杂,并且只提供非持久性的队列。ActiveMQ:Apache下的一个子项,类似ZeroMQ,能够以代理人和点对点的技术实现队列。Redis:是一个key-Value的NOSql数据库,但也支持MQ功能,数据量较小,性能优于RabbitMQ,数据超过10K就慢的无法忍受Jafk
5、a,基于Kafka孵化,非Apache官方孵化,活跃度也不是很高Kafka架构Kafka的基本概念Producers:消息和数据生产者,向 Kafka 的一个 topic 发布消息的过程叫做 producers。Consumers:消息和数据消费者,订阅 topics 并处理其发布的消息的过程叫做 consumers。Broker:缓存代理,Kafka 集群中的一台或多台服务器统称为 broker。Topic:特指 Kafka 处理的消息源(feeds of messages)的不同分类。Partition:Topic 物理上的分组,一个 topic 可以分为多个 partition,每个 p
6、artition 是一个有序的队列。partition 中的每条消息都会被分配一个有序的 id(offset)。Message:消息,是通信的基本单位,每个 producer 可以向一个 topic(主题)发布一些消息。Kafka的ProducersProducer将消息发布到指定的Topic中,同时Producer也能决定将此消息归属于哪个partition;比如基于round-robin方式或者通过其他的一些算法等.消息和数据生产者,向 Kafka 的一个 topic 发布消息的过程叫做 producers。异步发送批量发送可以很有效的提高发送效率。Kafka producer的异步发送模
7、式允许进行批量发送,先将消息缓存在内存中,然后一次请求批量发送出去。Kafka的broker1. Broker没有副本机制,一旦broker宕机,该broker的消息将都不可用。2. Broker:缓存代理,Kafka 集群中的一台或多台服务器统称为 broker。 broker会将消息暂时buffer起来,当消息的个数(或尺寸)达到一定阀值时,再flush到磁盘3. Broker不保存订阅者的状态,由订阅者自己保存。4. 无状态导致消息的删除成为难题(可能删除的消息正在被订阅),kafka采用基于时间的SLA(服务水平保证),消息保存一定时间(通常为7天)后会被删除。5. 消息订阅者可以re
8、wind back到任意位置重新进行消费,当订阅者故障时,可以选择最小的offset(id)进行重新读取消费消息。Kafka的Consumers消息和数据消费者,订阅 topics 并处理其发布的消息的过程叫做 consumers。本质上kafka只支持Topic.每个consumer属于一个consumer group;反过来说,每个group中可以有多个consumer.发送到Topic的消息,只会被订阅此Topic的每个group中的一个consumer消费.可以认为一个group是一个订阅者,一个Topic中的每个partions,只会被一个订阅者中的一个consumer消费,不过一个
9、 consumer可以消费多个partitions中的消息.kafka只能保证一个partition中的消息被某个consumer消费时,消息是顺 序的.事实上,从Topic角度来说,消息仍不是有序的.注: kafka的设计原理决定,对于一个topic,同一个group中不能有多于partitions个数的consumer同时消费,否则将意味着某些consumer将无法得到消息.一个partition中的消息只会被group中的一个consumer消费;每个group中consumer消息消费互相独立;Kafka的Topics/Log一个Topic可以认为是一类消息,每个topic将被分成多p
10、artition(区),每个partition在存储层面是append log文件。任何发布到此partition的消息都会被直接追加到log文件的尾部,每条消息在文件中的位置称为offset(偏移量),partition是以文件的形式存储在文件系统中。Logs文件根据broker中的配置要求,保留一定时间后删除来释放磁盘空间。Partition:Topic 物理上的分组,一个 topic 可以分为多个 partition,每个 partition 是一个有序的队列。 partition 中的每条消息都会被分配一个有序的 id(offset)。Kafka的partitions 设计目的:kaf
11、ka基于文件存储.通过分区,可以将日志内容分散到多个server上,来避免文件尺寸达到单机磁盘的上限,每个partiton都会被当前server(kafka实例)保存;可以将一个topic切分多任意多个partitions,来消息保存/消费的效率.越多的partitions意味着可以容纳更多的consumer,有效提升并发消费的能力.Kafka的MessageMessage消息:是通信的基本单位,每个 producer 可以向一个 topic(主题)发布一些消息。Kafka中的Message是以topic为基本单位组织的,不同的topic之间是相互独立的。每个topic又可以分成几个不同的pa
12、rtition(每个topic有几个partition是在创建topic时指定的),每个partition存储一部分Message。partition中的每条Message包含了以下三个属性:offset对应类型:longMessageSize对应类型:int32data是message的具体内容Kafka的MessageKafka的 offset每条消息在文件中的位置称为offset(偏移量)。offset 为一个long型数字,它是唯一标记一条消息。它唯一的标记一条消息。kafka并没有提供其他额外的索引机制来存储offset,因为在kafka中几 乎不允许对消息进行“随机读写”。Part
13、ition中的每条Message由offset来表示它在这个partition中的偏移量,这个offset不是该Message在partition数据文件中的实际存储位置,而是逻辑上一个值,它唯一确定了partition中的一条Message。因此,可以认为offset是partition中Message的id。怎样记录每个consumer处理的信息的状态?在Kafka中仅保存了每个consumer已经处理数据的offset。这样有两个好处:1)保 存的数据量少 2)当consumer出错时,重新启动consumer处理数据时,只需从最近的offset开始处理数据即可。Kafka的消息处理机制
14、 1. 发送到partitions中的消息将会按照它接收的顺序追加到日志中 2. 对于消费者而言,它们消费消息的顺序和日志中消息顺序一致. 3. 如果Topic的replication factor为N,那么允许N-1个kafka实例失效.4. kafka对消息的重复、丢失、错误以及顺序型没有严格的要求。5. kafka提供at-least-once delivery,即当consumer宕机后,有些消息可能会被重复delivery。6. 因每个partition只会被consumergroup内的一个consumer消费,故kafka保证每个partition内的消息会被顺序的订阅。7. K
15、afka为每条消息为每条消息计算CRC校验,用于错误检测,crc校验不通过的消息会直接被丢弃掉。ack校验,当消费者消费成功,返回ack信息!数据传输的事务定义at most once: 最多一次,这个和JMS中非持久化消息类似.发送一次,无论成败,将不会重发.at least once: 消息至少发送一次,如果消息未能接受成功,可能会重发,直到接收成功.exactly once: 消息只会发送一次.at most once: 消费者fetch消息,然后保存offset,然后处理消息;当client保存offset之后,但是在消息处理过程中出现了异常,导致部分消息未能继续处理.那么此后未处理的
16、消息将不能被fetch到,这就是at most once.at least once: 消费者fetch消息,然后处理消息,然后保存offset.如果消息处理成功之后,但是在保存offset阶段zookeeper异常导致保存操作未能执行成功,这就导致接下来再次fetch时可能获得上次已经处理过的消息,这就是at least once,原因offset没有及时的提交给zookeeper,zookeeper恢复正常还是之前offset状态.exactly once: kafka中并没有严格的去实现(基于2阶段提交,事务),我们认为这种策略在kafka中是没有必要的.注:通常情况下at-least-
17、once是我们首选.(相比at most once而言,重复接收数据总比丢失数据要好).Kafka的储存策略1. kafka以topic来进行消息管理,每个topic包含多个part(ition),每个part对应一个逻辑log,有多个segment组成。2. 每个segment中存储多条消息(见下图),消息id由其逻辑位置决定,即从消息id可直接定位到消息的存储位置,避免id到位置的额外映射。3.broker 收到发布消息往对应 partition 的最后一个 segment 上添加该消息Kafka的储存策略4. 每个part在内存中对应一个index,记录每个segment中的第一条消息偏
18、移。5. 发布者发到某个topic的 消息会被均匀的分布到多个part上(随机或根据用户指定的回调函数进行分布),broker收到发布消息往对应part的最后一个segment上添加 该消息,当某个segment上的消息条数达到配置值或消息发布时间超过阈值时,segment上的消息会被flush到磁盘,只有flush到磁盘上的 消息订阅者才能订阅到,segment达到一定的大小后将不会再往该segment写数据,broker会创建新的segment。Kafka的消息发送的流程由于 kafka broker 会持久化数据,broker 没有内存压力,因此,consumer 非常适合采取 pull
19、 的方式消费数据Producer 向Kafka(push)推数据consumer 从kafka 拉(pull)数据。Kafka设计原理实现kafka 以 topic 来进行消息管理,发布者发到某个 topic 的消息会被均匀的分布到多个 partition上每个 topic 包含多个 partition,每个 part 对应一个逻辑 log,有多个 segment 组成。每个 segment 中存储多条消息,消息 id 由其逻辑位置决定,即从消息 id 可直接定位到消息的存储位置,避免 id 到位置的额外映射。每个 part 在内存中对应一个 index,记录每个 segment 中的第一条消
20、息偏移。当某个 segment 上的消息条数达到配置值或消息发布时间超过阈值时,segment 上的消息会被 flush 到磁盘,只有 flush 到磁盘上的消息订阅者才能订阅到,segment 达到一定的大小后将不会再往该 segment 写数据,broker 会创建新的 segment。 显式分布式,即所有的 producer、broker 和 consumer 都会有多个,均为分布式的。Producer 和 broker 之间没有负载均衡机制。broker 和 consumer 之间利用 zookeeper 进行负载均衡。所有 broker 和 consumer 都会在 zookeepe
21、r 中进行注册,且 zookeeper 会保存他们的一些元数据信息。如果某个 broker 和 consumer 发生了变化,所有其他的 broker 和 consumer 都会得到通知。Kafka的分布式实现 一个Topic的多个partitions,被分布在kafka集群中的多个server上;每个server(kafka实例)负责partitions中消息的读写操作; 此外kafka还可以配置partitions需要备份的个数(replicas),每个partition将会被备份到多台机器上,以提高可用性; 基于replicated方案,那么就意味着需要对多个备份进行调度; 每个part
22、ition都有一个server为leader;leader负责所有的读写操作,如果leader失效,那么将会有其他follower来接管(成为新的leader); follower只是单调的和leader跟进,同步消息即可.由此可见作为leader的server承载了全部的请求压力,因此从集群的整体考虑,有多少个partitions就意味着有多少个leader; kafka会将leader均衡的分散在每个实例上,来确保整体的性能稳定.Kafka数据持久化数据持久化:发现线性的访问磁盘,很多时候比随机的内存访问快得多传统的使用内存做为磁盘的缓存Kafka直接将数据写入到日志文件中日志数据持久化特
23、性:写操作:通过将数据追加到文件中实现读操作:读的时候从文件中读就好了对比JVM特性:Java对象占用空间是非常大的,差不多是要存储的数据的两倍甚至更高随着堆中数据量的增加,垃圾回收回变的越来越困难优势:读操作不会阻塞写操作和其他操作,数据大小不对性能产生影响; 没有容量限制(相对于内存来说)的硬盘空间建立消息系统; 线性访问磁盘,速度快,可以保存任意一段时间!Kafka安装下载解压tar -xzf kafka_2.11-0.10.0.0.tgz启动服务首先启动zookeeper服务bin/zookeeper-server-start.sh config/zookeeper.propertie
24、s启动Kafkabin/kafka-server-start.sh config/server.properties创建topic创建一个test的topic,一个分区一个副本bin/kafka-topics.sh -create -zookeeper localhost:2181 -replication-factor 1 -partitions 1 -topic test查看主题bin/kafka-topics.sh -list -zookeeper localhost:2181查看主题详情bin/kafka-topics.sh -describe -zookeeper localhost
25、:2181 -topic test删除主题bin/kafka-topics.sh -zookeeper localhost:2181 -delete -topic testKafka客户端操作创建生产者 producerbin/kafka-console-producer.sh -broker-list localhost:9092 -topic test 创建消费者 consumerbin/kafka-console-consumer.sh -zookeeper localhost:2181 -topic test -from-beginning参数使用帮组信息查看:生产者参数查看:bin/
26、kafka-console-producer.sh消费者参数查看:bin/kafka-console-consumer.shKafka多broker部署修改config/service.propertiesbroker.id=0port=9020log.dirs=/tmp/kafka0-logs复制service.properties生成service1.propertiesbroker.id=1#id不能一样port=9040#port不能一样log.dirs=/tmp/kafka1-logs启动多个brokerbin/kafka-server-start.sh config/service
27、.properties &bin/kafka-server-start.sh config/service1.properties &创建主题bin/kafka-topics.sh -create -zookeeper localhost:2181 -replication-factor 3 -partitions 1 -topic testkafka集群安装安装zk集群修改配置文件broker.id: 唯一,填数字host.name:唯一,填服务器zookeeper.connect=192.168.40.134:2181,192.168.40.132:2181,192.168.40.133:
28、2181Kafka的核心配置server.properties配置详情见注释broker.id=work.threads=2num.io.threads=8socket.send.buffer.bytes=1048576socket.receive.buffer.bytes=1048576socket.request.max.bytes=104857600log.dirs=/tmp/kafka-logsnum.partitions=2log.retention.hours=168log.segment.bytes=536870912log.retention.check.interval.ms
29、=60000log.cleaner.enable=falsezookeeper.connect=localhost:2181zookeeper.connection.timeout.ms=1000000Kafka的一致性MQ要实现从producer到consumer之间的可靠的消息传送和分发。传统的MQ系统通常都是通过broker和consumer间的确认 (ack)机制实现的,并在broker保存消息分发的状态。即使这样一致性也是很难保证的(当然kafka也支持ack)。kafka保证一致性的做法是由 consumer自己保存状态,也不要任何确认。这样虽然consumer负担更重,但其实更灵
30、活了。因为不管consumer上任何原因导致需要重新处 理消息,都可以再次从broker获得。 Kafka的高可用性Kafaka可以将log文件复制到其他topic的分隔点(可以看成是server)。当一个server在集群中fails,可以允许自动的failover到其他的复制的server,所以消息可以继续存在在这种情况下。Kafka的负载均衡Producer和broker之间没有负载均衡机制。 负载均衡可以分为两个部分:producer发消息的负载均衡和consumer读消息的负载均衡。producer有一个到当前所有broker的连接池,当一个消息需要发送时,需要决定发到哪个broke
31、r(即partition)。consumer读取消息时,除了考虑当前的broker情况外,还要考虑其他consumer的情况,才能决定从哪个partition读取消息。多个 partition 需要选取出 lead partition,lead partition 负责读写,broker和consumer之间利用zookeeper进行负载均衡。所有broker和consumer都会在zookeeper中进行注册,且 zookeeper会保存他们的一些元数据信息。如果某个broker和consumer发生了变化,所有其他的broker和consumer都会得到 通知。Kafka 可扩展性当需要增
32、加 broker 结点时,新增的 broker 会向 zookeeper 注册,而 producer 及 consumer 会根据注册在 zookeeper 上的 watcher 感知这些变化,并及时作出调整,这样就保证了添加或去除broker时,各broker间仍能自动实现负载均衡。Kafka的Zookeeper协调控制1. 管理broker与consumer的动态加入与离开。2. 触发负载均衡,当broker或consumer加入或离开时会触发负载均衡算法,使得一个consumer group内的多个consumer的订阅负载平衡。3. 维护消费关系及每个partion的消费信息。Zook
33、eeper上的细节:1. 每个broker启动后会在zookeeper上注册一个临时的broker registry,包含broker的ip地址和端口号,所存储的topics和partitions信息。2. 每个consumer启动后会在zookeeper上注册一个临时的consumer registry:包含consumer所属的consumer group以及订阅的topics。3. 每个consumer group关 联一个临时的owner registry和一个持久的offset registry。对于被订阅的每个partition包含一个owner registry,内容为订阅这个p
34、artition的consumer id;同时包含一个offset registry,内容为上一次订阅的offset。写在最后写在最后成功的基础在于好的学习习惯成功的基础在于好的学习习惯The foundation of success lies in good habits谢谢聆听 学习就是为了达到一定目的而努力去干, 是为一个目标去战胜各种困难的过程,这个过程会充满压力、痛苦和挫折Learning Is To Achieve A Certain Goal And Work Hard, Is A Process To Overcome Various Difficulties For A Goal