kafka2.5.0生产者与消费者配置详解

摘要:
CONFIG=(newConfigDef())。定义

1)引入maven依赖

我这里使用的是springboot 2.1.3.RELEASE 版本:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

会引入一对的kafka包:

kafka2.5.0生产者与消费者配置详解第1张

 2)生产者配置:

所有配置参考kafka-clients-2.5.0.jar包里的org.apache.kafka.clients.producer.ProducerConfig类,并且在该类中可以查看所有配置项的默认值: CONFIG = (new ConfigDef()).define(  这里的define方法的第三个参数就是默认值

application.properties里可以这样配置:

#####################  重要配置  ######################
spring.kafka.producer.bootstrap.servers=192.168.2.60:9092,192.168.2.62:9092
spring.kafka.producer.key.serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value.serializer=org.apache.kafka.common.serialization.StringSerializer
# acks=0  如果设置为0,生产者将不等待任何来自服务器的确认。每个记录返回的偏移量将始终设置为-1。
# acks=1  这意味着leader确认消息即可,但不等待所有副本的完全确认的情况下进行响应。在这种情况下,如果leader在确认记录后立即失败,但是在副本复制它之前,那么记录将丢失。
# acks=all  不仅需要leader确认收到消息,还将等待全部的副本确认。这保证了只要至少有一个副本保持活动状态,记录就不会丢失。这是最有力的保证。这相当于ack =-1设置。
# acks=-1   跟集群有关
# 默认 1
spring.kafka.producer.acks=1
# 一个批次发送的大小,默认16KB,超过这个大小就会发送数据
spring.kafka.producer.batch.size=16384
# 一个批次最长等待多久就发送数据,默认0,即马上发送
spring.kafka.producer.linger.ms=5000
# 控制生产者最大发送大小,默认 1MB。这个值必须小于kafka服务器server.properties配置文件里的最大可接收数据大小配置:socket.request.max.bytes=104857600 (默认104857600 = 100MB)
spring.kafka.producer.max.request.size=1048576

#####################  非重要配置  ######################
# 生产者内存缓冲区大小。默认33554432bytes=32MB
spring.kafka.producer.buffer.memory=33554432
# 发送重试次数,默认 2147483647,接近无限大
spring.kafka.producer.retries=3
# 请求超时时间,默认30秒
spring.kafka.producer.request.timeout.ms=30000
# 默认值5。并发状态下,kafka生产者允许存在最大的kafka服务端未确认接收的消息个数最大值。
# 注意,如果该值设置为1,并且开启重试机制,则会在允许的重试次数内,阻塞其他消息发送到kafka Server端。并且为1的话,会严重影响生产者的吞吐量。仅适用于对数据有严格顺序要求的场景。
spring.kafka.producer.max.in.flight.requests.per.connection=5
# 最大阻塞时间,超过则抛出异常。默认60秒
spring.kafka.max.block.ms=60000
# 数据压缩类型:none、gzip、snappy、lz4、zstd,默认none什么都不做
spring.kafka.compression.type=none
# 客户端在进行发送和消费的时候,会缓存kafka的元数据。默认30秒
spring.kafka.producer.metadata.max.age.ms=30000

  

在springboot框架里,手动封装kafka生产者对象,并@bean对象注入到SpringBoot容器中去:

@Configuration
@EnableKafka
public class KafkaProducerConfig {
    @Value("${kafka.producer.servers}")
    private String servers;
    @Value("${kafka.producer.retries}")
    private int retries;
    @Value("${kafka.producer.batch.size}")
    private int batchSize;
    @Value("${kafka.producer.linger}")
    private int linger;
    @Value("${kafka.producer.buffer.memory}")
    private int bufferMemory;

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ProducerConfig.RETRIES_CONFIG, retries);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
        props.put(ProducerConfig.LINGER_MS_CONFIG, linger);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); // 
        DefaultKafkaProducerFactory factory = new DefaultKafkaProducerFactory<>(props);
        KafkaTemplate kafkaTemplate
                = new KafkaTemplate<String, String>(factory) ;
        //kafkaTemplate.setProducerListener();
        return kafkaTemplate;
    }
}

key和value可以自定义序列化类,参考《kafka2.5.0自定义数据序列化类

 重要知识:

如果该topic的分区大于1,那么生产者生产的数据存放到哪个分区,完全取决于key值,比如key=A,那么存到分区0,key=B,那么存到分区1,如果key为null,那么负载均衡存储到每个分区!

再均衡监听器:无论分区个数还是消费者个数发生变化,都会触发再均衡,重新分配分区的消费者。如果需要自定义分区,请参考《kafka2.5.0自定义分区器

3)消费者配置:

 所有配置参考kafka-clients-2.5.0.jar包里的org.apache.kafka.clients.consumer.ConsumerConfig类,并且在该类中可以查看所有配置项的默认值: CONFIG = (new ConfigDef()).define(  这里的define方法的第三个参数就是默认值

kafka.consumer.bootstrap-servers=192.168.2.61:9092,192.168.2.61:9093
# 注意:相同的Topic下,相同的群组ID,只有一个消费者能消费到消息
#kafka.consumer.group-id=myGroupId1
# 消费者在读取一个没有偏移量的分区或者偏移量无效的情况下,读取设置。
# latest: (默认)读取最新的,earliest: 读取最早的,none: 如果没有为使用者的组找到偏移量,则consumer抛出异常,anything else: consumer抛出异常
kafka.consumer.auto-offset-reset=latest
# 是否自动提交偏移,默认true。偏移量自己控制,可以有效避免重复读、漏读
kafka.consumer.enable-auto-commit=false
# 自动提交间隔,默认5秒。从开始消费一条数据到业务结束,必须在5秒内完成,否则会造成提前提交偏移量,如果出现事务失败,将会漏掉该条消费
#kafka.consumer.auto.commit.interval.ms=5000

# 把分区分配给消费者的策略。RangeAssignor:默认。采用大部分分区都分配给消费者群组里的群主(即消费者0)的策略。RoundRobinAssignor:采用所有消费者平均分配分区策略
# 注意:无论分区个数变化或者消费者个数变化,都会触发再分配
kafka.consumer.partition-assignment-strategy=org.apache.kafka.clients.consumer.RangeAssignor.class
# 客户端在进行发送和消费的时候,会缓存kafka的元数据。默认30秒
kafka.consumer.metadata-max-age-ms=30000
# consumer最小拉取多大的数据,默认值1,就是立即发送。达不到这个数据就等待。注意:这里不是根据消费数据条数,而是数据大小,这样设计主要避免每个数据之间大小差距过大。
kafka.consumer.fetch.min.bytes=1
# consumer最多等待10秒就消费一次数据,默认500ms
kafka.consumer.fetch.max.wait.ms=10000
# 控制每次poll方法返回的记录数量,默认500。这个配置仅仅作用于手动 poll消费的情况下,在springboot中由于使用 @KafkaListener注解消费所以基本没用
kafka.consumer.max-poll-records=500

在springboot框架里,手动封装kafka生产者对象,并@bean对象注入到SpringBoot容器中去:

 先定义pojo 类:

@Component
@ConfigurationProperties(prefix = "kafka.consumer")
public class KafkaConsumerConfigModel {
    
     // 这里就是一个简单的pojo类,定义application.properties配置文件的kafka.consumer开头的所有字段.
     private String bootstrapServers;
     ......
     
     // getter and setter

}

再定义kafka consumer 工厂类:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.RangeAssignor;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.support.TopicPartitionOffset;
import org.springframework.util.CollectionUtils;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaConsumerConfig {
    private Logger logger = LoggerFactory.getLogger(KafkaProducerConfig.class);

    @Autowired
    KafkaConsumerConfigModel config;

    @Bean("consumerFactory")
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers());
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, config.getEnableAutoCommit());
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, Arrays.asList( RangeAssignor.class));
        propsMap.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, config.getFetchMinBytes());
        propsMap.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, config.getFetchMaxWaitMs());
        propsMap.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, config.getMetadataMaxAgeMs());
        ConsumerFactory<String, String> factory = new DefaultKafkaConsumerFactory<>(propsMap);
        return factory;
    }

    @Bean("kafkaListenerContainerFactory")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>>
    getKafkaListenerContainerFactory(
            @Autowired ConsumerFactory<String, String> consumerFactory
    ) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory
                = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setConcurrency(1);
        factory.getContainerProperties().setPollTimeout(1500);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
//        factory.createContainer( new TopicPartitionOffset(Constant.TOPIC, 0));
        return factory;
    }
}

最后kafka consumer消费者长这样:

import com.joyce.kafka.Constant;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
public class KafkaConsumer {

    private Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);

    // 相同的groupId的消费者只能有一个接收到消息
    @KafkaListener(groupId="mygroup-1",topics = Constant.TOPIC )
    public void listen1(String data) {
        logger.info("消费到消息1: [{}]", data);
    }

    @KafkaListener(groupId="mygroup-2",topics =  Constant.TOPIC)
    public void listen(ConsumerRecord<?, ?> record, Acknowledgment ack) {
        logger.info("消费到消息2: [{}]", record.value());
        logger.info("消费到消息2|"+String.format(
                "主题:%s,分区:%d,偏移量:%d,key:%s,value:%s",
                record.topic(),record.partition(),record.offset(),
                record.key(),record.value()));//提交offset
        ack.acknowledge();
    }

    @KafkaListener(groupId="mygroup-3", topics =  Constant.TOPIC)
    public void test(String data, Acknowledgment ack) { // ConsumerRecord<String, String> record
        logger.info("消费到消息3: [{}]", data);
        //提交offset
        ack.acknowledge();
    }

}

end.

免责声明:文章转载自《kafka2.5.0生产者与消费者配置详解》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇WPF 自定义属性Cisco N7K VDC 配置步骤 (精简版)下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

一、Flume简明笔记

目录 一、Flume概述 1.1 Flume定义 1.2 Flume基础机构 二、Flume快速入门 2.1 安装部署 2.2 入门案例 2.2.1 监控端口数据官方案例:netcat-flume-logger.conf 2.2.2 实时监控单个追加文件案例:exec-flume-hdfs.conf 2.2.3 实时监控目录下多个新文件:s...

mysql状态查看 QPS/TPS/缓存命中率查看

运行中的mysql状态查看   对正在运行的mysql进行监控,其中一个方式就是查看mysql运行状态。    (1)QPS(每秒Query量)  QPS = Questions(or Queries) / seconds  mysql > show  global  status like 'Question%';    (2)TPS(每秒事务量)...

python学习笔记(九)内置函数

1 print(all([1,2,3,4]))#判断可迭代的对象里面的值是否都为真 True 2 print(any([0,1,2,3,4]))#判断可迭代的对象里面的值是否有一个为真 True 3 4 print(bin(10))#十进制转二进制 5 ejz=bin(100) #0b1010 6 print(ejz.replace('0...

在微信小程序中绘制图表(part2)

本期大纲 1、确定纵坐标的范围并绘制 2、根据真实数据绘制折线 相关阅读:在微信小程序中绘制图表(part1)在微信小程序中绘制图表(part3) 关注我的 github 项目 查看完整代码。 确定纵坐标的范围并绘制 为了避免纵坐标的刻度出现小数的情况,我们把纵坐标分为5个区块,我们取最小单位刻度为例如10(能够被5整除),当然真实情况会比这复杂,待...

射频识别技术漫谈(13)——Mifare S50与S70【worldisng笔记】

Mifare S50和Mifare S70又常被称为Mifare Standard、Mifare Classic、MF1,是遵守ISO14443A标准的卡片中应用最为广泛、影响力最大的的一员。而Mifare S70的容量是S50的4倍,S50的容量是1K字节,S70的容量为4K字节。读写器对卡片的操作时序和操作命令,二者完全一致。 Mifare S50和M...

禁止、允许MySQL root用户远程访问权限

禁止:1、进入mysql; 2、 mysql> use mysql; 3、执行修改权限语句(禁用); mysql> update user set host = "localhost" where user = "root" and host = "%"; 4、刷新权限; mysql> plush privileges;   启用: 1、...