Kafka:生产者

摘要:
客户端为我们的开发和使用实现了各种序列化程序。PublicinterfaceSerializer<T>扩展了可关闭{/**配置该类。*@paramconfigsconfigsinkey/valuepairs*@paramisKeywhetheriskeyor value*///用于配置当前类void configure(Map<String,?>configs,boolean isKey);/***将{@codedata}转换为abyterray。**@paramtopictopicassociatedwithdata*@paramdatatypeddata*@returnserializedbytes*//序列化操作byte[]serialize/***关闭此序列化程序生产者和消费者使用的序列化程序需要一一对应。如果未指定,则需要依赖分区根据键字段计算分区值。分区的作用是为消息分配分区。

Kafka java客户端数据生产流程解析

image-20210205141137316

ProducerRecord

ProducerRecord 含义: 发送给Kafka Broker的key/value 值对

//ProducerRecord的成员变量
public class ProducerRecord<K, V> {

    private final String topic;//主题
    private final Integer partition;//分区号
    private final Headers headers;//消息头
    private final K key;//键
    private final V value;//值
    private final Long timestamp;//消息时间戳

headers:可以设定一些与应用相关的信息

KafkaProducer

KafkaProducer是线程安全的,可以在多个线程中共享单个KafkaProducer实例,也可以将它实例进行池化来统一管理。

KafkaProducer的参数众多,bootstrap.servers,retries,key.serializer等,开发人员很难记住所有的配置,也很容易写错,所以可以用org.apache.kafka.clients.producer.ProducerConfig类预防:

        Properties properties = new Properties();
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.RETRIES_CONFIG, "10");
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.1.51:9092");
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

消息的发送

发后即忘

producer.send(record);

上述这种方式就是发后即忘,它只往kafka中发送消息并不关心消息是否正确到达,可能会造成消息丢失,发送性能最高,但是可靠性最差。

同步

    @Override
    public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
        return send(record, null);
    }

因为直接send返回值是Future,我们知道Future.get()会阻塞线程直至线程运行结果返回,通过此方法即可达到同步目的。

            try {
                producer.send(record).get();
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }

Future中可以获取一个RecordMetadata对象,包含了消息大的一些元数据信息,比如主题、分区号、分区中的偏移量(offset)、时间戳等。

此外Future.get(long timeout, TimeUnit unit)可以实现可超时的阻塞,

异步

producer.send(record, (metadata, exception) -> {
    System.out.println(metadata.topic());
});

当kafka有响应时,会触发回调方法

序列化器

生产者需要用序列化器把对象转换成字节数组才能通过网络发送给kafka。而在消费者需要用反序列化器把字节数组转成相应对象。

序列化器需要实现Serializer接口。客户端实现了多种序列化器供我们开发使用。

public interface Serializer<T> extends Closeable {

    /**
     * Configure this class.
     * @param configs configs in key/value pairs
     * @param isKey whether is for key or value
     */
    //用来配置当前类
    void configure(Map<String, ?> configs, boolean isKey);

    /**
     * Convert {@code data} into a byte array.
     *
     * @param topic topic associated with data
     * @param data typed data
     * @return serialized bytes
     */
    //序列化操作
    byte[] serialize(String topic, T data);

    /**
     * Close this serializer.
     *
     * This method must be idempotent as it may be called multiple times.
     */
    @Override
    //关闭当前序列化器
    void close();
}

生产者和消费者使用的序列化器是需要一一对应的。例如生产者shencghan端使用了StringSerializer,那么消费者端需要使用StringDeserializer。

分区器

消息在通过send()方法发往broker过程中,有可能经过拦截器、序列化器、分区器的一系列作用后才能真正发送到broker。拦截器一般不是必需的,而序列化器时必需的。消息经过序列化后就需要确定它发往的分区。

如果ProducerRecord 中指定了partition字段,那么就不需要分区器的作用。如果没有指定,那么就需要依赖分区器,根据key这个字段来计算partition的值。分区器的作用就是为消息分配分区。

kafka默认分区器是DefaultPartitioner,实现了Partitioner接口

public interface Partitioner extends Configurable, Closeable {

    /**
     * Compute the partition for the given record.
     *
     * @param topic The topic name 主题名
     * @param key The key to partition on (or null if no key) 键
     * @param keyBytes The serialized key to partition on( or null if no key) 序列化后的键
     * @param value The value to partition on or null 值
     * @param valueBytes The serialized value to partition on or null 序列化后的值
     * @param cluster The current cluster metadata 集群元数据信息
     */
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);

    /**
     * This is called when partitioner is closed.
     */
    public void close();

}

Partitioner的父接口Configurable

public interface Configurable {

    /**
     * Configure this class with the given key-value pairs
     */
    //该方法主要用来获取配置信息以及配置初始化数据
    void configure(Map<String, ?> configs);

}

在默认DefaultPartitioner中的分区计算方法:

image-20210205160014692

我们也可以自定义分区器,然后配置在properties:

properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"....");

生产者拦截器

实现生产者拦截器只需要实现ProducerInterceptor接口

public interface ProducerInterceptor<K, V> extends Configurable {

    public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);


    public void onAcknowledgement(RecordMetadata metadata, Exception exception);


    public void close();
}

onSend方法调用时机:KafkaProducer在将消息序列化和计算分区之前会调用拦截器的onSend()方法来对消息进行相应的定制化操作,一般来说不要修改ProducerRecord 的topic、key、partition信息。因为可能会影响分区的计算,同样会影响broker端日志压缩。

onAcknowledgement方法调用时机:该方法会在消息被应答之前或消息发送失败时调用,优先于用户设定的Callback之前执行。

close:主要用于在关闭拦截器时执行一些资源的清理工作

自定义生产者拦截器:

public class MyProducerInterceptor implements ProducerInterceptor {
    @Override
    public ProducerRecord onSend(ProducerRecord record) {
        System.out.println("onSend");
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        System.out.println("onAcknowledgement");
    }

    @Override
    public void close() {
        System.out.println("close");
    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}
//设置properties参数
properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, MyProducerInterceptor.class.getName());

image-20210205161251160

此外可以指定多个拦截器,形成拦截器链,拦截器链会按照配置的拦截器顺序来一一执行(各个拦截器之间用逗号隔开)

properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, MyProducerInterceptor.class.getName()+","+MyProducerInterceptor2.class.getName());

原理分析

image-20210205162308652

整个生产者客户端由两个线程协调运行,分别为主线程和Sender线程(发送线程)

RecordAccumulator:消息累加器,主要用来缓存消息以便Sender线程可以批量发送。RecordAccumulator缓存大小可以通过生产者客户端参数buffer.memory配置,默认32MB.如果生产者发送消息速度超过发送到服务器速度,会导致生产者空间不足,这时候,send方法会要么被阻塞要么抛出异常,取决于max.block.ms配置,默认60000,即60s。

主线程发送过来的消息会被追加到RecordAccumulator的某个双端队列中Deque,即Deque<ProducerBatch>。ProducerBatch中包含一个至多个ProducerRecord。将较小的ProducerRecord拼凑成较大的ProducerBatch,可以减少网络请求的次数以提升整体的吞吐量。ProducerBatch的大小跟batch.size有关。

Sender从RecordAccumulator中获取到缓存的消息之后,会将原本<分区,Deque<ProducerBatch>>的保存形式转换为<Node,List<ProducerBatch>>,Node表示Kafka的broker节点。

转换为<Node,List<ProducerBatch>>之后,会进一步<Node,Request>的形式,这样就可以将Request发往各个Node了。Request表示Kafka的各种协议请求。

请求在发往kafka之前还会保存到InFlightRequests中,它的具体形式为Map<NodeId,Deque<Request>>,它的作用是缓存了已经发出去但还没有收到相应的请求。配置参数:max.in.flight.requests.per.connection,默认值是5,即每个连接最多只能缓存5个未响应的请求,超过该数目就不能向这个连接发送更多请求了。

重要的生产者参数

以下仅介绍部分参数,还有一些高级功能(事务、幂等)这里不做介绍。

acks

这个参数用来指定分区中必须要有多少个副本收到这条消息,之后生产者者才会认为这条消息是成功写入的。

acks的值都是字符串类型的。

acks=1,默认值,生产者发送消息之后,只要分区的leader副本成功写入消息,那么它就会收到来自服务端的成功响应。acks设置为1,是消息可靠性和吞吐量之间的这种方案。

acks=0,生产者发送消息之后不需要等待任何服务端的响应。

acks=-1或acks=all,生产者在消息发送之后,需要等待ISR中的所有副本都成功写入消息之后才能够来自服务端的成功响应。acks=-1可以达到最强的可靠性。

max.request.size

这个参数用来限制生产者客户端能发送的消息的最大值,默认1MB。这个参数在对kafka整体脉络没有把控的时候,不建议修改,因为这个参数还涉及到其他参数的修改。比如broker端的message.max.bytes参数。

retries和retry.backoff.ms

retries参数用来配置生产者重试的次数,默认值是0

retry.backoff.ms用来设置两次重试之间的时间间隔,避免无效的频繁重试,默认值是100ms。

connections.max.ide.ms

这个参数用来指定在多久之后关闭闲置的连接,默认是9分钟

linger.mx

指定生产者发送ProducerBatch之前等待更多消息加入ProducerBatch的时间,默认为0.

request.timeout.ms

用来配置Producer等待请求响应的最长时间,默认时间3000ms。请求超时后可以选择重试。

receive.buffer.bytes

设置socket接收消息缓冲区(SO_RECBUF)的大小,默认值32768B,即32KB,如果设置为-1,则为操作系统默认值。

send.buffer.bytes

设置socket发送消息缓冲区(SO_RECBUF)的大小,默认值131072B,即128KB,如果设置为-1,则为操作系统默认值。

免责声明:文章转载自《Kafka:生产者》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇WebApi:使用方法名或者控制器名作为接口地址CentOS7安装openjdk、tomcat和mysql流程介绍下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

Go并发

  进程、线程、协程     进程:进程是操作系统资源分配的最小单位       进程有自己的虚拟地址空间,这个空间包括了各种资源,例如堆、栈,各种段,它们其实都是虚拟地址空间的一块区域。所以说进程是资源分配的最小单位。     线程:线程是操作系统任务调度和执行的最小单位。       线程包含在进程之中,是进程中实际运作单位     协程:线程中协作式...

Jenkins Generic Webhook Trigger+gitlab设置触发器

在生产环境中因为代码仓库迁移导致Jenkins设置的触发器失效,在调试的过程gitlab触发事件响应状态码为200,但是响应消息一直为{"status":"ok","data":{..."triggered":false,"url":""}}}} 。 此篇文章的描述主要针对该问题,且面向对Jenkins和Gitlab有一定经验的小伙伴。 文章标签: Je...

ThreadPool.QueueUserWorkItem的性能问题

在WEB开发中,为了减少页面等待时间提高用户体验,我们往往会把一些浪费时间的操作放到新线程中在后台运行。 简单的实现代码就是: //代码一 new Thread(()=>{ //do something }).Start();   但是对于一个请求量大的网址这样做是很不现实的——每一个操作都要开启一个新线程,最终会因CPU不堪重负而...

【转】dbx用法讲解

http://blog.chinaunix.net/uid-25544300-id-328735.html dbx 命令 用途 提供了一个调试和运行程序的环境。 语法 dbx [ -a ProcessID ] [ -c CommandFile ] [ -d NestingDepth ] [ -I Directory ] [ -E DebugEnvironm...

[转]C# Invoke的使用方法

在多线程编程中,我们经常要在工作线程中去更新界面显示,而在多线程中直接调用界面控件的方法是错误的做法,Invoke 和 BeginInvoke 就是为了解决这个问题而出现的,使你在多线程中安全的更新界面显示。 正确的做法是将工作线程中涉及更新界面的代码封装为一个方法,通过 Invoke 或者 BeginInvoke 去调用,两者的区别就是一个导致工作线程等...

netty5客户端监测服务端断连后重连

  服务端挂了或者主动拒绝客户端的连接后,客户端不死心,每15秒重连试试,3次都不行就算了。修改下之前的客户端引导类(NettyClient,参见netty5心跳与业务消息分发实例),新增两个成员变量,在connect连接方法里的finally加入重连操作: private ScheduledExecutorService executorServ...