Kafka之SpringBoot集成Kafka实战

摘要:
在spring应用程序中,如果我们需要订阅kafka消息,我们通常不会直接使用kafka客户端,而是使用更方便的层来封装spring-kafka。在spring-kafka的运行时,将启动两种类型的线程,一种是Consumer线程,另一种是Listener线程。前者用于直接调用kafka客户端的poll()方法以获取消息,后者是调用代码中标记有@KafkaListener注释的方法的线程。如果要使用多个使用者,除了启动多个进程之外,还可以在一个进程中使用多个线程来执行while()循环。

  在spring应用中如果需要订阅kafka消息,通常情况下我们不会直接使用kafka-client, 而是使用更方便的一层封装spring-kafka。
  在spring-kafka在运行时会启动两类线程,一类是Consumer线程,另一类是Listener线程。前者用来直接调用kafka-client的poll()方法获取消息,后者才是调用我们代码中标有@KafkaListener注解方法的线程。如果直接使用kafka-client的话,那么正常的写法是一个while循环,在循环里面调用poll(),然后处理消息,这在kafka broker看来就是一个Consumer。如果想用多个Consumer,除了多启动几个进程以外,也可以在一个进程使用多个线程执行此while()循环。spring-kafka就是这么干的。

1.添加依赖

<dependencies>
    <dependency>
	<groupId>org.springframework.kafka</groupId>
	<artifactId>spring-kafka</artifactId>
	<version>2.5.5.RELEASE</version>
    </dependency>
    <dependency>
    	<groupId>org.springframework.boot</groupId>
    	<artifactId>spring-boot-starter-web</artifactId>
    	<version>2.3.8.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.12</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.74</version>
    </dependency>
</dependencies>

2.kafka配置

  springBoot集成kafka,kafka的原生配置可以参考以下源码:

org.apache.kafka.clients.CommonClientConfigs.class
org.apache.kafka.clients.consumer.ConsumerConfig.class
org.apache.kafka.clients.producer.ProducerConfig.class

   application.properties配置如下:

#============== KAFKA START===================
spring.kafka.listener.concurrency=5

spring.kafka.producer.bootstrap.servers=192.168.15.218:9093
spring.kafka.producer.retries= 3
spring.kafka.producer.buffer.memory=33554432
spring.kafka.producer.acks=0
#自定义配置,控制生产者是否发送消息
spring.kafka.producer.enable=false

spring.kafka.consumer.bootstrap.servers=192.168.15.218:9093
spring.kafka.consumer.group.id=kafka-group-ryj
spring.kafka.consumer.enable.auto.commit=true
spring.kafka.consumer.auto.offset.reset=earliest
spring.kafka.consumer.max.poll.records=10
#自定义配置,控制消费者是否监听
spring.kafka.consumer.enable=true
#============== KAFKA END======================

#============== TOPIC START======================
topic.testRecord=topic.testRecord
#============== TOPIC END======================

3.修改启动类,支持kafka注解

@SpringBootApplication()
@EnableKafka
public class KafkaTest {
    public static void main(String[] args) {
        System.out.println("Hello World!");
        SpringApplication.run(KafkaTest.class, args);
    }
}

4.增加kafka配置类,生成生产者、消费者相关信息

@Configuration
public class KafkaConfig {

    @Value("${spring.kafka.producer.bootstrap.servers}")
    private String producerServer;

    @Value("${spring.kafka.producer.retries}")
    private Integer producerRetries;

    @Value("${spring.kafka.producer.buffer.memory}")
    private String producerBufferMemory;

    @Value("${spring.kafka.producer.acks}")
    private String producerAcks;

    @Value("${spring.kafka.consumer.bootstrap.servers}")
    private String consumerServer;

    @Value("${spring.kafka.consumer.enable.auto.commit}")
    private Boolean consumerAutoCommit;

    @Value("${spring.kafka.consumer.group.id}")
    private String consumerGroupId;

    @Value("${spring.kafka.consumer.auto.offset.reset}")
    private String consumerOffsetReset;

    @Value("${spring.kafka.consumer.max.poll.records}")
    private String consumerPollNum;

    /**
     * 生产者配置信息
     */
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<String, Object>();
        props.put(ProducerConfig.ACKS_CONFIG, producerAcks);// 为0时,生产者不会等待返回消息发送结果
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, producerServer);
        props.put(ProducerConfig.RETRIES_CONFIG, producerRetries);// 发送失败时,重新发送消息次数
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);// 批量发送消息的间隔时间
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, producerBufferMemory);// 生产者缓存消息的内存字节数
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    /**
     * 生产者工厂
     */
    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    /**
     * 生产者模板
     */
    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    /**
     * 消费者配置信息
     */
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<String, Object>();
        props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);// 消费者组ID
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, consumerOffsetReset);// offser没有初始化或者不存在时默认的配置
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, consumerServer);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, consumerPollNum);// 每次拉取记录的数量
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000);// 用于检测客户端故障的超时时间
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);// 请求响应的超时时间
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }
    
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<String, String>(consumerConfigs());
    }

    @Bean
    public KafkaListenerContainerFactory<?> containerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> container = new ConcurrentKafkaListenerContainerFactory<>();
        container.setConsumerFactory(consumerFactory());
        container.setBatchListener(true);//批量拉取消息,与消费者的接收参数有关
        return container;
    }

    /**
     * KafkaListener 延迟启动监听工厂
     * @return
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> delayContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> container = new ConcurrentKafkaListenerContainerFactory<String, String>();
        container.setConsumerFactory(new DefaultKafkaConsumerFactory<String, Object>(consumerConfigs()));
        // 禁止自动启动
        container.setAutoStartup(false);
        container.setBatchListener(true);
        return container;
    }
}

5.生产者消息发送测试类

@Component
public class KafkaProducer {

    private static Logger logger = LoggerFactory.getLogger(KafkaProducer.class);

    @Autowired
    KafkaTemplate<String, Object> kafkaTemplate;

    public void sendJsonMessageToKafka(String jsonMessage, String topicName) {
        ListenableFuture<SendResult<String, Object>> listenableFuture = kafkaTemplate.send(topicName, jsonMessage);
        listenableFuture
                .addCallback(
                        o -> logger.info("send message to kafka success !!! topicName={}, partition={}, offset={},msg={}", topicName,
                                o.getRecordMetadata().partition(), o.getRecordMetadata().offset(), o.getProducerRecord().value()),
                        throwable -> this.sendMsgFail(throwable, topicName));
    }

    private void sendMsgFail(Throwable throwable, String topicName) {
        logger.error("send message to kafka fail !!! topicName=" + topicName + " error " + throwable.getMessage());
    }
}
@Component
@Order(100)
public class KafkaTestRunner implements ApplicationRunner {

    @Value("${topic.testRecord}")
    private String topicName;
    
    @Value("${spring.kafka.producer.enable}")
    private Boolean producerEnable;

    @Autowired
    KafkaProducer kafkaProducer;

    @Override
    public void run(ApplicationArguments args) {
        if(producerEnable) {
            new Thread() {
                @Override
                public void run() {
                    sendMessage();
                }
                
            }.start();
        }
    }

    private void sendMessage() {
        for (Integer i = 0; i < 1000; i++) {
            kafkaProducer.sendJsonMessageToKafka(JSON.toJSONString(new Digit(i)), topicName);
        }
    }
}

@Data
class Digit {

    Integer i;

    public Digit(Integer i) {
        super();
        this.i = i;
    }
    
}

6.消费者批量消费测试类

@Component
@SpringBootConfiguration
public class KafkaConsumerTest {

    private static Logger logger = LoggerFactory.getLogger(KafkaConsumerTest.class);
    //如果想立即消费,可以更换containerFactory
    @KafkaListener(id = "delayConsumer",topics = "${topic.testRecord}", containerFactory = "delayContainerFactory", groupId = "${spring.kafka.consumer.group.id}")
    //批量时不能用Object作为参数,否则会报错
    public void delayConsumer(List<ConsumerRecord<String, String>> message) {
        try {
            message.forEach(record -> {
                logger.info("delayConsumer consumer success.partition={}, offset={},msg={}",record.partition(),record.offset(),record.value());
            });
        } catch (Exception e) {
            logger.error("delayConsumer error.",e);
        }
    }
}
@Component
@Order(10)
public class KafkaDelayConsumerRunner implements ApplicationRunner {

    @Autowired
    private KafkaListenerEndpointRegistry registry;
    
    @Value("${spring.kafka.consumer.enable}")
    private Boolean consumerEnable;

    @Override
    public void run(ApplicationArguments args) {
        if (consumerEnable) {
            //唤醒延迟启动的kafka消费者
            registry.getListenerContainer("delayConsumer").start();
        }
    }
}

免责声明:文章转载自《Kafka之SpringBoot集成Kafka实战》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇idea 关于高亮显示与选中字符串相同的内容踩过的坑Nexus Repository Manager 3.0 安装与包上传 Maven、Nuget下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

【失败的尝试】C++中使用string进行switch判断

贴出错误代码: #include <iostream>#include <string>using namespace std;void main(){    string str;    cin>>str;     switch(str)    {    case "ab":        cout<<"o...

java多线程案例

packagecom.alibaba.yuntu.me.biz.district.service.impl.MyUtis; importcom.alibaba.fastjson.JSONObject; importcom.alibaba.yuntu.me.common.base.util.HttpUtil; importlombok.SneakyThro...

Java 性能优化的 50 个细节

在JAVA程序中,性能问题的大部分原因并不在于JAVA语言,而是程序本身。养成良好的编码习惯非常重要,能够显著地提升程序性能。 #尽量在合适的场合使用单例 使用单例可以减轻加载的负担,缩短加载的时间,提高加载的效率,但并不是所有地方都适用于单例 简单来说,单例主要适用于以下三个方面: 控制资源的使用,通过线程同步来控制资源的并发访问; 控制实例的产生,以...

rabbitmq进阶

目录 消息传递 过期时间(TTL) 死信队列 延迟队列 优先级队列 RPC实现 持久化 生产者确认 消费端要点 消息传输保障 消息传递 mandatory mandatory=true,如果交换器无法根据自身的类型和路由键找到一个符合条件的队列,RabbitMQ会调用 Basic.Return 命令将消息返回给生产者,生产者通过调用 chann...

Java AES加密解密工具 -- GUI 、在线传输文件

原理 对于任意长度的明文,AES首先对其进行分组,每组的长度为128位。分组之后将分别对每个128位的明文分组进行加密。 对于每个128位长度的明文分组的加密过程如下: (1)将128位AES明文分组放入状态矩阵中。 (2)AddRoundKey变换:对状态矩阵进行AddRoundKey变换,与膨胀后的密钥进行异或操作(密钥膨胀将在实验原理七中详细讨论)。...

解决来QQ消息后歌曲音量降低问题

今天学了一天,晚上听歌放松一下,谁知碰到了一个很纠结的问题,我正在聊天,每当来QQ消息后,我的歌曲音量自动降低,降到非常小,然后我就调高音量,把音乐的音量调到最大,又把系统音量调到最大,谁知音乐的声音还是很小,鼓捣了一阵,终于找到原因了,在右下角音乐图标点一下,然后点击合成器,就出现下面的界面:看到原因了吧,千千静听的声音比其他三个都低很多,怪不得不管怎么...