Kafka-如何保证生产者的可靠性

摘要:
Kafka-如何保证生产者的可靠性即使我们尽可能把broker配置的很可靠,但如果没有对生产者进行可靠性方面的配置,整个系统仍然有可能出现突发性的数据丢失。假设现在往kafka发送消息,分区的首领刚好崩溃,新的首领正在选举中,kafka会向生产者返回“首领不可用”的响应。这算不上是broker的可靠性问题,因为broker并没有收到这个消息。如果broker返回的错误可以通过重试来解决,那么生产者会自动处理这些错误。在这种情况下,broker会收到两个相同的消息。

Kafka-如何保证生产者的可靠性

即使我们尽可能把broker配置的很可靠,但如果没有对生产者进行可靠性方面的配置,整个系统仍然有可能出现突发性的数据丢失。

举例:

  • broker配置了3个副本,并且禁用了不完全首领选举,这样应该可以保证万无一失。我们把生产者发送消息的acks设为1(只要首领接收到消息就可以认为消息写入成功)。生产者发送一个消息给首领,首领成功写入,但跟随者副本还没有接收到这个消息。首领向生产者发送了一个响应,告诉它消息写入成功,然后它崩溃了,而此时消息还没有被其它副本复制过去。另外两个副本此时仍然被认为是同步的(毕竟判定一个副本不同步需要一小段时间),而且其中的一个副本成了新的首领。因为消息还没有被写入这个副本,所以就丢失了,但发送消息的客户端却认为消息已成功写入。因为消费者看不到丢失的消息,所以此时的系统仍然是一致的(因为副本没有收到这个消息,所以消息不算已提交),但从生产者角度来看,它丢失了一个消息。
  • broker配置了3个副本,并且禁用了不完全首领选举。我们接受了之前的教训,把生产者的acks设为all。假设现在往kafka发送消息,分区的首领刚好崩溃,新的首领正在选举中,kafka会向生产者返回首领不可用的响应。在这个时候,如果生产者没能正确处理这个错误,也没有重试发送消息直到发送成功,那么消息也有可能丢失。这算不上是broker的可靠性问题,因为broker并没有收到这个消息。这也是不一致性问题,因为消费者并没有读到这个消息。问题在于如果生产者没能正确处理这些错误,弄丢消息的是它们自己。

从上面的例子可以看出,开发人员需要注意两件事情:

  • 根据可靠性需求配置恰当的acks
  • 在参数配置和代码里正确处理错误。

生产者发送确认

生产者可以选择一下三种不同的确认模式:

  • acks=0意味着如果生产者能够通过网络把消息发送出去,那么就认为消息已成功写入kafka。这种情况下还是有可能发生错误,比如发送的对象无法被序列化或者网卡发生故障,但如果是分区离线或整个集群长时间不可用,那就不会收到任何错误。即使是在发生完全首领选举的情况下,这种模式仍然会丢失消息,因为在新首领选举过程中它并不知道首领已经不可用了。在acks=0模式下的运行速度是非常快的(这就是为什么很多基准测试都是基于这个模式),可以得到惊人的吞吐量和带宽利用率,不过如果选择了这种模式,一定会丢失一些消息。
  • acks=1意味着首领在收到消息并把它写入到分区数据文件(不一定同步到磁盘上)时会返回确认或错误响应。在这个模式下,如果发生正常的首领选举,生产者会在选举时收到一个LeaderNotAvailableException异常,如果生产者能恰当地处理这个错误,它会重试发送消息,最终消息会安全到达新的首领那里。不过在这个模式下仍然有可能丢失数据,比如消息已经成功写入首领,但在消息被复制到跟随者副本之前首领发生崩溃。
  • acks=all意味着首领在返回确认或错误响应之前,会等待所有同步副本都收到消息。如果和min.insync.replicas参数结合起来,就可以决定在返回确认前至少有多少个副本能够收到消息。这是最保险的做法--生产者会一直重试直到消息被成功提交。不过这也是最慢的做法,生产者在继续发送其他消息之前需要等待所有副本都收到当前的消息。可以通过使用异步模式和更大的批次来加快速度,但这样做通常会降低吞吐量。

配置生产者的重试参数

生产者需要处理的错误包括两部分:一部分是生产者可以自动处理的错误,还有一部分是需要开发者手动处理的错误。

如果broker返回的错误可以通过重试来解决,那么生产者会自动处理这些错误。生产者向broker发送消息时,broker可以返回一个成功响应码或一个失败响应码。

错误响应码可以分为两种:

  • 可重试错误:重试之后可以解决的。如果broker返回的是LEADER_NOT_AVAILABLE错误,生产者可以尝试重新发送消息,也许这个时候一个新的首领被选举出来了,那么这次发送就成功了。
  • 不可重试错误:无法通过重试解决的。如果broker返回的是INVALID_CONFIG错误,即使通过重试也无法改变配置选项,所以这样的重试是没有意义的。

一般情况下,如果目标是不丢失任何消息,那么最好让生产者在遇到可重试错误时能够保持重试。因为像首领选举或网络连接这类问题都可以在几秒之内得到解决,如果生产者保持重试,开发者就不需要额外去处理这些问题了。

重试发送一个已经失败的消息会带来一些风险,如果两个消息都写入成功,会导致消息重复。例如,生产者因为网络问题没有收到broker的确认,但实际上消息已经写入成功,生产者会认为网络出现了临时故障,就重试发送该消息(因为它不知道消息已经写入乘公共)。在这种情况下,broker会收到两个相同的消息。重试和恰当地错误处理可以保证每个消息至少被保存一次。现实中的很多应用程序在消息里加入唯一标识符,用于检测重复消息,消费者在读取消息时可以对它们进行清理。还要一些应用程序可以做到消息的幂等,也就是说,即使出现了重复消息,也不会对处理结果的正确性造成负面影响。

免责声明:文章转载自《Kafka-如何保证生产者的可靠性》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇免费实用的录屏工具!支持全屏、特定窗口、选定区域录制,支持添加水印、嵌入摄像头!TypeScript 中slice(-1)是什么意思?下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

RabbitMQ 延迟队列实现订单支付结果异步阶梯性通知

在第三方支付中,例如支付宝、或者微信,对于订单请求,第三方支付系统采用的是消息同步返回、异步通知+主动补偿查询的补偿机制。 由于互联网通信的不可靠性,例如双方网络、服务器、应用等因素的影响,不管是同步返回、异步通知、主动查询报文都可能出现超时无响应、报文丢失等情况,所以像支付业务,对结果的通知一般采用几种方案结合的补偿机制,不能完全依赖某一种机制。...

使用云开发数据库构建更生动的小程序

长连接服务被广泛应用在消息提醒、即时通讯、推送、直播弹幕、游戏等场景。本篇文章将介绍云开发数据库的长连接服务 - 实时数据推送,使用它来构建更生动的小程序。 什么是实时数据推送 通过云开发数据库的实时数据推送能力,小程序端可实时监听数据库变更,即它支持根据开发者给定的查询语句进行监听,每当查询语句的结果发生变化时,小程序端就会收到包含更新内容的推送,并对实...

Linux 进程间通信(一)

Linux 进程间通信   进程是一个独立的资源分配单位,不同进程之间的资源是相互独立的,没有关联,不能在一个进程中直接访问另一个进程中的资源。但是,进程不是孤立的,不同的进程之间需要信息的交换以及状态的传递,因此需要进程间数据传递、同步与异步的机制。 分类 统一主机间进程通信 Unix进程间通信方式 无名通道 有名通道 信号 System V进...

【--RocketMQ--】RocketMQ实现事务消息

在RocketMQ4.3.0版本后,开放了事务消息这一特性,对于分布式事务而言,最常说的还是二阶段提交协议,那么RocketMQ的事务消息又是怎么一回事呢,这里主要带着以下几个问题来探究一下RocketMQ的事务消息:   事务消息是如何实现的  我们有哪些手段来监控事务消息的状态  事务消息的异常恢复机制  RocketMQ的事务消息是如何实现的 Roc...

1-rocketmq简介-部署

简介 基于java开发,高可用 应用场景 1、应用解耦 2、流量销峰 3、异步处理 4、消息分发(邮件、短信、日志、数据处理) 延时队列场景:需要延时单次延迟执行的场景,比如订单取消 常见问题 1、如何保证高可用 集群部署 2、如何保证消息不丢失(消息的可靠性传输) 生产者丢失数据 开启生产者确认模式,确认发送成功了才对消费者可见 消息队列丢失数据...

Jenkins Generic Webhook Trigger+gitlab设置触发器

在生产环境中因为代码仓库迁移导致Jenkins设置的触发器失效,在调试的过程gitlab触发事件响应状态码为200,但是响应消息一直为{"status":"ok","data":{..."triggered":false,"url":""}}}} 。 此篇文章的描述主要针对该问题,且面向对Jenkins和Gitlab有一定经验的小伙伴。 文章标签: Je...