1、Kafka 深度分析架构kafka 是显式分布式架构,producer、broker(Kafka)和 consumer 都可以有多个。Kafka 的运行依赖于 ZooKeeper,Producer 推送消息给 kafka,Consumer 从 kafka 拉消息。kafka 关键技术点(1 ) zero-copy在 Kafka 上,有两个原因可能导致低效:1 )太多的网络请求 2)过多的字节拷贝。为了提高效率,Kafka 把 message 分成一组一组的,每次请求会把一组 message 发给相应的consumer。 此外, 为了减少字节拷贝,采用了 sendfile 系统调用。为了理解 s
2、endfile 原理,先说一下传统的利用 socket 发送文件要进行拷贝:Sendfile 系统调用:(2 ) Exactly once message transfer怎样记录每个 consumer 处理的信息的状态?在 Kafka 中仅保存了每个 consumer 已经处理数据的 offset。这样有两个好处:1 )保存的数据量少 2)当 consumer 出错时,重新启动consumer 处理数据时,只需从最近的 offset 开始处理数据即可。(3 ) Push/pullProducer 向 Kafka(push)推数据, consumer 从 kafka 拉(pull)数据。(4
3、)负载均衡和容错Producer 和 broker 之间没有负载均衡机制。broker 和 consumer 之间利用 zookeeper 进行负载均衡。所有 broker 和 consumer 都会在zookeeper 中进行注册,且 zookeeper 会保存他们的一些元数据信息。如果某个 broker 和consumer 发生了变化,所有其他的 broker 和 consumer 都会得到通知。kafka 术语TopicTopic,是 KAFKA 对 消 息 分 类 的 依 据 ;一 条 消 息 ,必 须 有 一 个 与 之 对 应 的 Topic;比 如 现 在 又 两 个 Topic
4、,分 别 是 TopicA 和 TopicB,Producer 向 TopicA 发 送 一 个 消 息messageA,然 后 向 TopicB 发 送 一 个 消 息 messaeB;那 么 ,订 阅 TopicA 的 Consumer 就 会收 到 消 息 messageA,订 阅 TopicB 的 Consumer 就 会 收 到 消 息 messaeB;(每 个 Consumer可 以 同 时 订 阅 多 个 Topic,也 即 是 说 ,同 时 订 阅 TopicA 和 TopicB 的 Consumer 可 以 收 到messageA 和 messaeB)。同 一 个 Group
5、 id 的 consumers 在 同 一 个 Topic 的 同 一 条 消 息 只 能 被 一 个consumer 消 费 , 实 现 了 点 对 点 模 式 , 不 同 Group id 的 Consumers 在 同 一 个 Topic 上的 同 一 条 消 息 可 以 同 时 消 费 到 , 则 实 现 了 发 布 订 阅 模 式 。 通 过 Consumer 的 Group id实 现 了 JMS 的 消 息 模 式MessageMessage 就 是 消 息 ,是 KAfKA 操 作 的 对 象 ,消 息 是 按 照 Topic 存 储 的 ;KAFKA 中 按 照 一 定 的
6、期 限 保 存 着 所 有 发 布 过 的 Message,不 管 这 些 Message是 否 被 消 费 过 ;例 如 这 些 Message 的 保 存 期 限 被 这 只 为 两 天 ,那 么 一 条Message 从 发 布 开 始 的 两 天 时 间 内 是 可 用 的 ,超 过 保 存 期 限 的 消 息 会 被 清 空以 释 放 存 储 空 间 。消 息 都 是 以 字 节 数 组 进 行 网 络 传 递 。Partition每 一 个 Topic 可 以 有 多 个 Partition,这 样 做 是 为 了 提 高 KAFKA 系 统 的 并 发能 力 , 每 个 Part
7、ition 中 按 照 消 息 发 送 的 顺 序 保 存 着 Producer 发 来 的 消 息 ,每 个 消 息 用 ID 标 识 ,代 表 这 个 消 息 在 改 Partition 中 的 偏 移 量 ,这 样 ,知 道 了 ID,就可 以 方 便 的 定 位 一 个 消 息 了 ;每 个 新 提 交 过 来 的 消 息 ,被 追 加 到 Partition 的尾 部 ;如 果 一 个 Partition 被 写 满 了 ,就 不 再 追 加 ;(注 意 ,KAFKA 不 保 证 不 同Partition 之 间 的 消 息 有 序 保 存 )LeaderPartition 中 负
8、责 消 息 读 写 的 节 点 ;Leader 是 从 Partition 的 节 点 中 随 机选 取 的 。 每 个 Partition 都 会 在 集 中 的 其 中 一 台 服 务 器 存 在 Leader。 一 个Topic 如 果 有 多 个 Partition, 则 会 有 多 个 Leader。ReplicationFactor一 个 Partition 中 复 制 数 据 的 所 有 节 点 ,包 括 已 经 挂 了 的 ;数 量 不 会 超 过 集群 中 broker 的 数 量isrReplicationFactor 的 子 集 ,存 活 的 且 和 Leader 保 持
9、 同 步 的 节 点 ;Consumer Group传 统 的 消 息 系 统 提 供 两 种 使 用 方 式 :队 列 和 发 布 -订 阅 ;队 列 : 是 一 个 池 中 有 若 干 个 Consumer,一 条 消 息 发 出 来 以 后 ,被 其 中 的 一个 Consumer 消 费 ;发 布 -订 阅 : 是 一 个 消 息 被 广 播 出 去 ,之 后 被 所 有 订 阅 该 主 题 的 Consumer 消 费 ;KAFKA 提 供 的 使 用 方 式 可 以 达 到 以 上 两 种 方 式 的 效 果 :Consumer Group;每 一 个 Consumer 用 Con
10、sumer Group Name 标 识 自 己 ,当 一 条 消 息 产 生 后 ,改消 息 被 订 阅 了 其 Topic 的 Consumer Group 收 到 ,之 后 被 这 个 Consumer Group 中 的 一 个 Consumer 消 费 ;如 果 所 有 的 Consumer 都 在 同 一 个 Consumer Group 中 ,那 么 这 就 和 传 统的 队 列 形 式 的 消 息 系 统 一 样 了 ;如 果 每 一 个 Consumer 都 在 一 个 不 同 的 Consumer Group 中 ,那 么 就 和 传统 的 发 布 -订 阅 的 形 式 一
11、 样 了 ;Offset消费者自己维护当前读取数据的 offser,或者同步到 zookeeper。mit.interval.ms 是 consumer 同步 offset 到 zookeeper 的时间间隔。这个值设置问题会影响到多线程 consumer,重复读取的问题。安装启动配置环境安装下载 kafka_2.11-0.8.2.1,并在 linux 上解压 tar -xzf kafka_2.11-0.8.2.1.tgz cd kafka_2.11-0.8.2.1/bin可用的命令如下:启动命令Kafka 需要用到 zookeeper,所有首先需要启动 zookeeper。 ./zookee
12、per-server-start.sh ./config/zookeeper.properties &然后启动 kafka 服务 ./kafka-server-start.sh ./config/server.properties &创建 Topic创建一个名字是”p2p”的 topic,使用一个单独的 partition 和和一个 replica ./kafka-topics.sh -create -zookeeper localhost:2181 -replication-factor 1 -partitions 1 -topic p2p使用命令查看 topic ./kafka-topic
13、s.sh -list -zookeeper localhost:2181p2p除了使用命令创建 Topic 外,可以让 kafka 自动创建,在客户端使用的时候,指定一个不存在的 topic,kafka 会自动给创建 topic,自动创建将不能自定义 partition 和 relica。集群多 broker将上述的单节点 kafka 扩展为 3 个节点的集群。从原始配置文件拷贝配置文件。 cp ./config/server.properties ./config/server-1.properties cp ./config/server.properties ./config/serve
14、r-2.properties修改配置文件。config/server-1.properties:broker.id=1port=9093log.dir=/tmp/kafka-logs-1config/server-2.properties:broker.id=2port=9094log.dir=/tmp/kafka-logs-2注意在集群中 broker.id 是唯一的。现在在前面单一节点和 zookeeper 的基础上,再启动两个 kafka节点。 ./kafka-server-start.sh ./config/server-1.properties & ./kafka-server-st
15、art.sh ./config/server-2.properties &创建一个新的 topic,带三个 ReplicationFactor ./kafka-topics.sh -create -zookeeper localhost:2181 -replication-factor 3 -partitions 1 -topic p2p-replicated-topic查看刚刚创建的 topic。 ./kafka-topics.sh -describe -zookeeper localhost:2181 -topic p2p-replicated-topicpartiton: partion
16、 id,由于此处只有一个 partition,因此 partition id 为 0leader:当前负责读写的 lead broker idrelicas:当前 partition 的所有 replication broker listisr:relicas 的子集,只包含出于活动状态的 brokerTopic-Partition-Leader-ReplicationFactor 之间的关系样图以上创建了三个节点的 kafka 集群,在集群上又用命令创建三个 topic,分别是: replicated3-partitions3-topic:三份复制三个 partition的 topic re
17、plicated2-partitions3-topic:二份复制三个 partition的 topic test:1 份复制,一个 partition 的 topic以我做测试创建的三个 topic 说明他们之间的关系。./kafka-topics.sh -describe -zookeeper localhost:2181 -topic replicated3-partitions3-topic./kafka-topics.sh -describe -zookeeper localhost:2181 -topic replicated2-partitions3-topic./kafka-to
18、pics.sh -describe -zookeeper localhost:2181 -topic test以 kafka 当前的描述画出以下关系图:Partition 0LeaderBroker Id 0Topic: testBroker Id 1Broker Id 2Partition 0LeaderTopic:replicated3-partitions3-topicPartition 0ReplicasTopic:replicated3-partitions3-topicPartition 0ReplicasTopic:replicated3-partitions3-topicPar
19、tition 1ReplicasTopic:replicated3-partitions3-topicPartition 1ReplicasTopic:replicated3-partitions3-topicPartition 1LeaderTopic:replicated3-partitions3-topicPartition 2LeaderTopic:replicated3-partitions3-topicPartition 2ReplicasTopic:replicated3-partitions3-topicPartition 2ReplicasTopic:replicated3-
20、partitions3-topicPartition 0LeaderTopic:replicated2-partitions2-topicPartition 0ReplicasTopic:replicated2-partitions2-topicPartition 1LeaderTopic:replicated2-partitions2-topicPartition 1ReplicasTopic:replicated2-partitions2-topic从图上可以看到 test 没有备份,当 broke Id 0 宕机后,虽然集群还有两个节点可以使用,但 test 这个 topic 却不能正常
21、转发消息了。所以为了系统的可靠性,创建的 replicas 尽量的多,但却不能超过 broker 的数量。客户端使用 APIProducer API从 0.8.2 版本开始,apache 提供了新的 java 版本的 Producer 的 API。这个 java 版本在测试中表现比之前的 scala 客户端性能要好。Pom 获取 java 客户端:org.apache.kafkakafka-clients0.8.2.1ExampleConsumer APIKafka 0.8.2.1 版本已经放出了 java 版的 consumer,看 javadoc 文档和代码不太匹配,也没有样例来说明 ja
22、va 版的 consumer 的使用样例,这里还是用 scala 版的 consumer API 来使用。Kafka 提供了两套 API 给 Consumer:The high-level Consumer API:高度抽象的 Consumer API,封装了很多 consumer需要的高级功能,使用起来简单、方便The SimpleConsumer API:只有最基本的链接、读取功能,可以自己去读 offset,并指定 offset 的读取方式。适合于各种自定义High Levelclass Consumer/* Create a ConsumerConnector:创建 consumer connector* param config at the minimum, need to specify the groupid of the consumer and the zookeeper connection string zookeeper.connect.config 参数作用:需要置顶consumer 的 groupid 以及 zookeeper 连接字符串 zookeeper.connect