kafka 消息队列

摘要:
Kafka是一个用Java和Scala编写的快速、可扩展、高吞吐量的分布式消息队列系统。卡夫卡不会主动向消费者推送信息。消费者需要主动从代理读取数据。Kafka没有消息确认机制,消费者控制自己使用的消息。当消费者订阅主题时,kafka将通知消费者最新的偏移量。因此,kafkabroker不需要维护消息状态,这有利于提高吞吐量。与许多消息队列系统不同,kafka不删除消耗的信息,而是根据配置的超时或文件大小限制删除较早发送的消息或过大的分区文件。

kafka是使用Java和Scala编写的一个快速可扩展的高吞吐量的分布式消息队列系统。

kafka将数据持久化存储到磁盘上,自带分区和副本机制,因而具有较好的持久化保证。

但是kafka的消息消费没有确认机制,可能因为consumer崩溃导致消息没有完成处理。因此不建议将kafka用于一致性较高的业务场景,kafka经常被用做日志收集和数据仓库之间的缓存。

比如将网站的浏览日志缓存到kafka,然后从kafka中取出批量写入ElasticSearch, Hive或者HBase等数据仓库中。这样做可以极大的减轻离线分析系统的负载。

架构简介

kafka架构中有下列角色参与:

  • broker: kafka 集群中的服务器实例称为broker
  • producer: 向broker发送消息的客户端
  • consumer: 向从borker中读取消息的客户端
  • zookeeper: 存储集群状态的注册中心,不处理具体消息。在负载均衡和集群扩展等功能中有重要作用。

接下来介绍kafka的逻辑模型:

  • message: 消息是kafka通信的基本单元
  • topic: topic 在逻辑结构上类似于队列, 每条消息都属于一个 topic。
  • consumer group: 每个group中可以包含若干 consumer 实例,每个topic可以被多个consumer group 订阅。
    消费者组拥有唯一的 GroupID 进行标识, 每个 consumer 实例有且只有一个 GroupID。
  • partition: topic 被分为若干个 partition 进行存储,每条消息都属于一个 partition。
  • offset: 每条消息在 partition 中使用 offset (偏移量)作为唯一标识。

kafka 保证订阅某个 topic 的所有 consumer group 都会收到该 topic 中所有消息。

topic 中的一条消息在一个 consumer group 中都会被一个 consumer 读取,且仅会被该 consumer 读取。

若每个 consumer 都属于一个独立的 consumer group 那么消息会被所有 consumer 读取,即实现了消息广播。 若所有 consumer 属于同一个 consumer group, 那么消息只会被一个 consumer 读取,即实现消息单播。

kafka 不会主动将消息推送给消费者, 消费者需要主动从broker中读取数据。

kafka 没有消息确认机制,由 consumer 自行控制消费的消息。

partition与消息传递的实现

kafka 将一个 topic 中的数据存存储到多个 partition 中,每个 partition 分为多个段文件存储在 broker 节点上。

producer 会与 topic 下所有 partition 保持通信,并根据配置的算法(key-hash 或 round robin等)决定将消息写入哪个 partition 中。

partition 内部是有序的,但是同一个 topic 的多个 partition 之间不保证有序, 即 topic 不是整体有序的。

kafka 会为监听 topic 的 consumer 分配一个 partition。 在一个消费者组内,一个 partition 最多分配给一个 consumer。

当组内 consumer 数量大于 partition 数量时,可能有 consumer 分配不到数据。

一个 partition 可以被属于不同 group 的多个 consumer 监听。

consumer 监听不同 partition 的机制实现了消息只能被组内一个 consumer 消费的特性,避免使用锁机制极大提高了吞吐率简化了 broker 实现。

消费者通过 offset 标记自己读取的位置,主动读取 parttion 中的数据

消费者向 broker 发送包含 offset 和 max 参数的 fetch 请求来读取 partition中的数据。 因此,消费者可以自由设置 offset 来控制读取的位置,从而实现增量读取或从头读取等功能。

当消费者订阅某个 topic 时,kafka 会将最新的offset告知消费者。

消费者可以将自己当前的 offset 反馈给 kafka, kafka 会将状态保存到 zookeeper,使得消费者可以自由退出或者重新加入继续消费。

kafka 没有消息确认机制,完全由 consumer 设置 offset 来进行消费。因此,kafka broker 不需要维护消息状态,有利于提高吞吐率。

与很多消息队列系统不同的是, kafka 不会删除已消费的信息, 而是根据配置的超时时间或者文件大小限制,删除较早发送的消息或过大的partition文件。

replica

kafka 在0.8之后版本中支持了副本机制, 每个 topic 分为多个 partition, 每个 partition 存在多个 replica。

这些 replica 分布于不同的 broker 节点上, 降低单个 broker 宕机对系统可用性的影响。

kafka 的副本分布策略是: 在拥有 n 个 broker 节点的集群中, 将第 i 个 partition 的第 j 个 replica 存储在第 (i + j) % n 个 broker 上。

同一个 partition 的 replica 中存在一个 leader,生产者消费者只与 leader replica 进行交互, 其它 replica 从leader中同步数据。

kafka提供了两种主从复制机制:

  • 同步复制:消息被 partition 的所有 alive 状态 replica 复制消息才会成功提交,这种方式保证一致性却极大影响吞吐率。
  • 异步提交:消息被 partition 的 leader replica 写入即提交成功, 其它 replica 会异步同步数据。这种方式吞吐率较高但一致性较低,leader 崩溃可能导致消息丢失。

kafka通过两种机制判断alive状态:

  • zookeeper的心跳机制:broker必须维护zookeeper的session
  • slave 副本从 leader 复制数据的延迟不能超过阈值。
体验kafka

安装kafka

这里作者选择用homebrew进行安装.

brew install kafka

配置文件在/usr/local/etc/kafka/server.properties/usr/local/etc/kafka/zookeeper.properties

启动zookeeper:

zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties &

启动kafka:

kafka-server-start /usr/local/etc/kafka/server.properties &

命令行工具

创建topic:

kafka-topics --zookeeper localhost:2181 --create --topic test --partitions 30  --replication-factor 2 
  • zookeeper: 集群依赖的zookeeper服务地址
  • topic: topic 名称
  • partitions: topic 的 partition 数
  • replication-factor: 每个 partition 的副本数

查看 topic 信息:

kafka-topics --zookeeper localhost:2181 --describe --topic test

删除 topic:

kafka-topics --zookeeper localhost:2181 --delete --topic test

查看所有 topic:

kafka-topics --zookeeper localhost:2181 --list

修改Topic设置:

kafka-topics --zookeeper localhost:2181 --alter --topic test --partitions 4

该命令可以修改 partition 和 replica 配置。

发送消息:

kafka-console-producer --broker-list localhost:9092 --topic test

接收新消息:

kafka-console-consumer --zookeeper localhost:2181 --topic test

从头读取消息:

kafka-console-consumer --zookeeper localhost:2181 --topic test --from-beginning

读取 consumer group 状态:

kafka-consumer-groups --describe --group test --bootstrap-server localhost:9092

结果:

TOPICPARTITIONCURRENT-OFFSETLOG-END-OFFSETLAGCONSUMER-IDHOSTCLIENT-ID
test02752761consumer-1-5230c9/192.168.1.1consumer-1
test12742751consumer-2-c9e79/192.168.1.1consumer-2

其中:

  • CURRENT-OFFSET: consumer 当前消费进度(OFFSET)
  • LOG-END-OFFSET: partition 最大消费进度(OFFSET)
  • LAG: 未处理的消息数量,即 LOG-END-OFFSET - CURRENT-OFFSET

免责声明:文章转载自《kafka 消息队列》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇torch 深度学习(5)高精地图:激光雷达点云与高精地图融合下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

vue props 用法(转载)

前面的话   组件接受的选项大部分与Vue实例一样,而选项props是组件中非常重要的一个选项。在 Vue 中,父子组件的关系可以总结为 props down, events up。父组件通过 props 向下传递数据给子组件,子组件通过 events 给父组件发送消息。本文将详细介绍Vue组件选项props 父子级组件   在介绍props之前,先介绍...

迷你MVVM框架 avalonjs 入门教程

新官网请不要无视这里,这里都是链接,可以点的 OniUI组件库 学习教程 视频教程: 地址1 地址2 关于AvalonJs 开始的例子 扫描 视图模型 数据模型 绑定 作用域绑定(ms-controller, ms-important) 忽略扫描绑定(ms-skip) 模板绑定(ms-include) 数据填充(ms-text, ms-htm...

消息中间件(一)MQ详解及四大MQ比较

一、消息中间件相关知识 1、概述 消息队列已经逐渐成为企业IT系统内部通信的核心手段。它具有低耦合、可靠投递、广播、流量控制、最终一致性等一系列功能,成为异步RPC的主要手段之一。当今市面上有很多主流的消息中间件,如老牌的ActiveMQ、RabbitMQ,炙手可热的Kafka,阿里巴巴自主开发RocketMQ等。 2、消息中间件的组成       2....

安装 Accumulo——突破自己,就是成长

前言 在我刚开始接触分布式集群的时候,是自己在几台虚拟机中手动安装的 Hadoop 和 Spark ,所以当时对 Hadoop 的配置有个简单的印象 ,但是后面发现了 Cloudera 和 Ambari 之后(两个分布式集群自动管理工具),就再没有手动安装过。这就导致我用了很久的 Accumulo 却从未手动安装过,使用 Cloudera 安装导致我根本没...

delphi中时间控制

用TTimer的思路有点问题。 请参考以下思路:   窗体建立时,记录GetTickCount值(关于GetTickCount,请Google),然后,捕捉鼠标键盘消息,如有发送到本窗体的鼠标键盘消息,则重 新记录GetTickCount值,如无,则计算当前GetTickCount值减去原值是否大于规定时间,如大于则Close。 例子如下: //思路是这样...

Kafka消费者-从Kafka读取数据

(1)Customer和Customer Group (1)两种常用的消息模型 队列模型(queuing)和发布-订阅模型(publish-subscribe)。 队列的处理方式是一组消费者从服务器读取消息,一条消息只由其中的一个消费者来处理。 发布-订阅模型中,消息被广播给所有的消费者,接收到消息的消费者都可以处理此消息。 (2)Kafka的消费者和消...