kafka之生产者

摘要:
如果消息成功写入kafka,返回的对象将包含主题、分区信息和分区中记录的偏移量。要创建生产者并将消息写入kafka,首先创建生产者对象并设置一些属性。但是,建议提供至少两个代理的信息,以防止一个代理宕机,并且生产者仍然可以连接到集群。超时。毫秒,请求超时。ms和元数据。取来超时。mstimeout。ms指定代理等待来自同步副本的消息确认的时间,这与任务的配置相匹配。最大要求。size此参数用于控制生产者发送的请求大小。
生产者生产消息概述
  • 从创建一个ProducerRecord对象开始,此对象包含目标主题和要发送的内容,还可以指定键和分区。
  • 在发送ProducerRecord对象时,生产者首先要把键和值对象序列化成字节数组,这样才可以在网络上传输。
  • 数据传送给分区器,如果数据指定了分区则使用此分区;如果没有指定分区,则通过ProducerRecord的键通过一定的算法来选择一个分区。这样生产者就知道将此消息往哪个主题的哪个分区发送了。
  • 这条消息会被添加到一个记录批次里,这个批次里的所有消息会被发送到相同主题的同一分区。此时的消息是被放到一个缓冲区,然后有一个独立的线程负责把这些记录发送到相应的broker上。
  • broker在收到这些消息时会返回一个响应。如果消息成功写入kafka,在返回的对象中会包含主题,分区信息以及记录在分区里的偏移量。如果写入消息失败,这返回错误,会根据错误类型和我们的配置,自动选择重试还是直接抛出异常。
创建生产者

要往kafka写入消息,首先要创建一个生产者对象,并设置一些属性。其中有三个属性是必选的:

bootstrap.servers:该属性指定broker的地址清单,地址格式:host:port,如果有多个地址用逗号隔开。清单里不需要包含所有的broker地址,生产者会从给定的broker里查找到其他的broker信息。不过建议至少要提供两个broker的信息,防止一个宕机了,生产者仍然可以连接到集群。

key.serializer:broker希望接收到的消息的键和值都是字节数组。生产者接口允许使用参数化类型,因此可以把java对象作为键和值发送给broker。不过生产者需要知道如何把这些java对象转换成字节数组。生产者就是根据此参数配置额值来把键转换成字节数组的。

value.serializer:与key.serializer类似,value.serializer指定的类会将值序列化。

发送消息方式

发送并忘记

意义:把消息发送给服务器,但是并不关心它是否正常到达。

同步发送

意义:我们使用send()方法发送消息,它会返回一个future对象,调用get()方法进行等待,就可以知道消息是否发送成功。

如果服务器返回错误,get()方法会抛出异常。

如果没有发生错误,我们会得到一个RecordMetadata对象,可以用它获取消息的偏移量。

如果发送数据之前或者发送过程中发生了任何错误,比如broker返回了一个不允许重试的异常或者已经超过了重试次数,那么就会抛出异常。我们只是简单的把异常信息打印出来。

异步发送

意义:我们使用send()方法发送消息,并指定一个回调函数,服务器在返回响应时调用该函数。

生产者配置

acks

执行必须要有多少个分区副本收到消息,生产者才会认为消息写入是成功的。这个参数对于消息丢失的可能性有重要影响

0:生产者在成功写入消息之前不会等待任何来自服务器的响应

1:只要集群的首领节点收到消息,生产者就会收到一个来自服务器的成功响应。这种方式有风险,如果首领节点服务器崩溃,一个没有收到消息的节点被选举为了新首领,消息就会丢失。

all:只有当所有的同步副本全部接收到消息时,生产者才会收到一个来自服务器的成功响应。这种模式是最安全的,它可以保证不止一个服务器收到消息,就算有服务器崩溃,整个集群仍然可以运行。不过它的延迟也最高,因为要等待不只一个服务器节点接收到消息。

buffer.memory

该参数用来设置生产者内存缓冲区的大小,生产者用它缓冲将要发送到服务器的消息。

如果应用程序发送消息的速度超过了发送到服务器的速度,就会导致生产者空间不足。这个时候send()方法调用要么阻塞,要么抛出异常,取决于如何设置max.block.ms参数,标识在抛出异常之前可以阻塞一段时间。

compression.type

它执行了消息被发送服务器之前使用何种压缩方式。默认情况下,消息发送是不压缩的,该参数可以设置为:snappy,gzip和lz4,

retries

此参数的值决定了生产者可以重发消息的次数,如果达到了这个次数,生产者会放弃重试并返回错误。默认情况下,生产者会在每次重试之间等待100ms,不过可以通过retry.backoff.ms来设置。

batch.size

该参数正定了一个批次可以使用的内存的大小,按照字节数计算(而不是消息个数)。当批次被填满,批次里的所有消息会被发送出去。

linger.ms

该参数制定了生产者在发送批次之前等待更多消息加入批次的时间

client.id

该参数可以是任意字符窜,服务器会用它来识别消息的来源,还可以用在日志和配额指标里。

max.in.flight.requests.per.connection

该参数指定了生产者在收到服务器响应之前可以发送多个消息。它的值越高,就会占用越多的内存,不过也会提升吞吐量。把它设置成1可以保证消息是按照发送的顺序写入服务器的,即使发生了重试。

timeout.ms,request.timeout.ms和metadata.fetch.timeout.ms

timeout.ms指定了broker等待同步副本返回消息确认的时间,与asks的配置相匹配。如果在指定的时间内没有收到同步副本的确认,这broker就会返回一个错误。

request.timeout.ms指定了生产者在 发送数据时等待服务器返回响应时间

metadata.fetch.timeout.ms指定了生产者在获取元数据(比如目标分区的首领是谁)时等待服务器返回响应的时间。

max.block.ms

该参数指定了再调用seed()方法或者使用partitionsFor()方法获取元数据时生产者的阻塞时间。

max.request.size

该参数用于控制生产者发送的请求大小。他可以指能发送的单个消息的最大值,也可以指单个请求里所有消息总的大小。注意此值与broker配置中可接收消息的最大值(message.max.bytes)相匹配,避免被broker拒绝。

例如:假设此值为1MB,那么可以发送的单个最大消息为1MB,或者生产者可以在单个请求里发送一个批次,该批次包含了1000个消息,每个消息的大小为1kb。

免责声明:文章转载自《kafka之生产者》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇普通Java类获取spring 容器的bean的5种方法财务人员如何学习SAP下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

RPC

背景:公司提供给第三方的数据传输接口一直是以Hessian的协议进行发布的,但是由于交通车辆通行数据量较大,导致第三方反应出现数据延迟的情况或者连接超时的情况,所以需要更换Hessian,换成性能更高的Thrift协议 区别: Hessian  Thrift 优点 1、简单易用,面向接口,通过接口暴露服务,jar包只有200、300k,不需要配置...

消息中间件(一)MQ详解及四大MQ比较

一、消息中间件相关知识 1、概述 消息队列已经逐渐成为企业IT系统内部通信的核心手段。它具有低耦合、可靠投递、广播、流量控制、最终一致性等一系列功能,成为异步RPC的主要手段之一。当今市面上有很多主流的消息中间件,如老牌的ActiveMQ、RabbitMQ,炙手可热的Kafka,阿里巴巴自主开发RocketMQ等。 2、消息中间件的组成       2....

netty作为基础通信组件

阿里分布式服务框架 Dubbo 的 RPC 框架使用 Dubbo 协议进行节点间通信,Dubbo 协议默认使用 Netty 作为基础通信组件,用于实现各进程节点之间的内部通信。其中,服务提供者和服务消费者之间,服务提供者、服务消费者和性能统计节点之间使用 Netty 进行异步/同步通信。     除了 Dubbo 之外,淘宝的消息中间件 RocketMQ...

wcf通道Channel

正文       客户端与服务进行交互的过程是通过通道进行交互的。客户端通过调用代理类执行相应的方法,通过通道编码,调用上下文,传输客户端的事务,管理可靠会话,对消息正文的加密,最后要执行的通道是传输通道就像我们七层的最后一层是物理传输层与服务端的那一头的传输通道交接。服务端拿到以后会逐个拆包,然后交给分发器,分发器交给对应的服务处理。         ...

一、在 ASP.NET Core 中使用 SignalR

一、介绍 SignalR是一个用于实现实时网站的 Microsoft .NET 库。它使用多种技术来实现服务器与客户端间的双向通信,服务器可以随时将消息推送到连接的客户端。 https://docs.microsoft.com/zh-cn/aspnet/core/tutorials/signalr?tabs=visual-studio&view=a...

人工智能-有限状态机(FSM)的学习

首先声明:此文源于本人最近学习的一本书 《游戏人工智能编程案例精粹》 FSM的定义: 一个有限状态机是一个设备,或是一个设备模型,具有有限数量的状态,它可以在任何给定的时间根据输入进行操作,使得从一个状态变换到另一个状态,或者是促使一个输出或者一种行为的发生。一个有限状态机在任何瞬间只能处在一种状态。 FSM的实现: 不要用if else语句或者switc...