springboot整合kafka实现消息推送

摘要:
前言本文主要介绍kafka在弹簧靴中的集成。˃˃项目xmlns=“http://maven.apache.org/POM/4.0.0“xmlns:xsi=”http://www.w3.org/2001/XMLSchema-instance“xsi:schemaLocation=”http://maven.apache.org/POM/4.0.0http://maven.apache.org/xsd/maven-4.0.0.xsd“˃4.0.0com.lxgspringboot kafka1.0-SNAPSHOTpomspringbootkafkaspringbbootkafka commonspringbookkafka consumerspringbootka product˃org.springboot.bootspri” ng启动启动程序父级2.1.3.RELEASEorg.springframework.boot弹簧启动启动程序weborg.springframework.kafka弹簧kafka˃org.projectlombok˃1.18.12?

前言

本篇文章主要介绍的是springboot整合kafka。

安装kafka

1.使用docker安装kafka,移步 https://www.cnblogs.com/lixianguo/p/13254950.html

创建工程

1.创建一个名为springboot-kafka的pom项目作为父工程,将main和resource文件夹都删除,pom文件添加配置
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.lxg</groupId>
    <artifactId>springboot-kafka</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>pom</packaging>

    <name>springboot-kafka</name>
    <modules>
        <module>springboot-kafka-common</module>
        <module>springboot-kafka-consumer</module>
        <module>springboot-kafka-producer</module>
    </modules>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.3.RELEASE</version>
    </parent>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.12</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.67</version>
        </dependency>
    </dependencies>

</project>
3.创建公共服务模块

创建一个名为springboot-kafka-common的微服务,打包方式为jar,存放一些公共配置和公共类,如util等
1.配置pom文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <groupId>com.lxg</groupId>
        <artifactId>springboot-kafka</artifactId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>springboot-kafka-common</artifactId>
</project>

pom文件中以父工程作为父依赖,就不需要额外引入依赖了
2.新建一个user实体类

@Data
public class User implements Serializable {
    /**
     * id
     */
    private Integer id;

    /**
     * 用户名字
     */
    private String username;

    /**
     * 密码
     */
    private String password;
}

3.创建application-common.yml配置文件,主要添加kafka的公共配置

spring:
  kafka:
    #kafka配置
    bootstrap-servers: 192.168.56.102:9092
    producer:
      retries: 0
      # 每次批量发送消息的数量
      batch-size: 16384
      buffer-memory: 33554432
      # 指定消息key和消息体的编解码方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      # 指定默认消费者group id
      group-id: test-consumer-group
      auto-offset-reset: earliest
      enable-auto-commit: true
      auto-commit-interval: 5000
      # 指定消息key和消息体的编解码方式
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    #自己定义的主题名称,在微服务中使用Value注解注入调用,如果kafka中没有该主题,则会自动创建
    topic:
      userTopic: userInfo
4.创建消息生产者,即创建一个名为springboot-kafka-producer的普通springboot项目

1.pom文件配置

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <groupId>com.lxg</groupId>
    <artifactId>springboot-kafka-producer</artifactId>
    <version>1.0-SNAPSHOT</version>

    <modelVersion>4.0.0</modelVersion>

    <dependencies>
        <dependency>
            <groupId>com.lxg</groupId>
            <artifactId>springboot-kafka-common</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
    </dependencies>

</project>

2.application.yml配置文件,配置端口,设置微服务名称,引入公共服务模块中的application-common.yml

server:
  port: 8081
spring:
  application:
    name: kafka-producer
  profiles:
    active: common

3.controller层
创建UserController

@Slf4j
@Controller
@RequestMapping("/api/user")
public class UserController {
    @Autowired
    private UserService userService;

    @ResponseBody
    @GetMapping("/getUser")
    public void getUser() {
        userService.sendUserMsg();
        log.info("getUser");
    }
}

4.service层
创建UserService

public interface UserService {
    /**
     * 发送用户信息
     *
     * @return
     */
    Boolean sendUserMsg();
}

创建UserServiceImpl

@Slf4j
@Service
public class UserServiceImpl implements UserService {
    @Value("${spring.kafka.topic.userTopic}")
    private String userTopic;

    @Autowired
    KafkaTemplate kafkaTemplate;


    @Override
    public Boolean sendUserMsg() {
        User user = new User();
        user.setId(1);
        user.setUsername("lxg");
        user.setPassword("6767167");
        kafkaTemplate.send(userTopic, JSONObject.toJSONString(user));
        log.info("lxg");
        return Boolean.TRUE;
    }
}

5.创建启动类

@SpringBootApplication
public class WebApplication {
    public static void main(String[] args) {
        SpringApplication.run(WebApplication.class, args);
    }
}
5.创建消息消费者

1.pom文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <groupId>com.lxg</groupId>
    <artifactId>springboot-kafka-consumer</artifactId>
    <version>1.0-SNAPSHOT</version>

    <modelVersion>4.0.0</modelVersion>

    <dependencies>
        <dependency>
            <groupId>com.lxg</groupId>
            <artifactId>springboot-kafka-common</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
    </dependencies>
</project>

2.创建yml配置文件

server:
  port: 8082
spring:
  application:
    name: kafka-consumer
  profiles:
    active: common

3.创建consumer消费者类

@Slf4j
@Component
public class UserConsumer {

    @KafkaListener(topics = {"${spring.kafka.topic.userTopic}"})
    public void userConsumer(String message) {
        log.info("receive msg " + message);
    }
}

4.启动类

@SpringBootApplication
public class ConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }
}

测试

启动producer和consumer两个服务模块
访问producer微服务中的接口 http://localhost:8081/api/user/getUser
会发现consumer微服务中的控制台打印了producer中创建并推送过来的的user实体

本文GitHub源码:https://github.com/lixianguo5097/springboot/tree/master/springboot-kafka

CSDN:https://blog.csdn.net/qq_27682773
简书:https://www.jianshu.com/u/e99381e6886e
博客园:https://www.cnblogs.com/lixianguo
个人博客:https://www.lxgblog.com

免责声明:文章转载自《springboot整合kafka实现消息推送》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇01 Vue基础简便删除已经存在的oracle数据库用户UPAY3LINGXI_YS下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

Kafka(四) —— KafkaProducer源码阅读

一、doSend()方法 Kafka中的每一条消息都对应一个ProducerRecord对象。 public class ProducerRecord<K, V> { private final String topic; private final Integer partition; private final...

RabbitMQ、Kafka、RocketMQ的优劣势

今天我们一起来探讨:  全量的消息队列究竟有哪些?  Kafka、RocketMQ、RabbitMQ的优劣势比较  以及消息队列的选型 最全MQ消息队列有哪些 那么目前在业界有哪些比较知名的消息引擎呢?如下图所示 这里面几乎完全列举了当下比较知名的消息引擎,包括:  ZeroMQ  推特的Distributedlog  ActiveMQ:Apach...

Kafka系列二之部署与使用

Kafka部署与使用 写在前面从上一篇Kafka的架构介绍和安装中,可能,你还一直很蒙,kafka到底该怎么使用呢?接下来,我们就来介绍Kafka的部署与使用。上篇文章中我们说到,Kafka的几个重要组成是:1.producer 2.consumer 3.broker 4.topic .因此我们就是围绕这几个组件来使用我们的Kafka. 如何开始呢?...

springboot依赖管理

问题引入: springboot工程A依赖工程B, B中定义 jackson 的依赖版本为 2.11.2, 在A中除去[spring-boot-starter-web:2.1.3.RELEASE]对 jackson 的依赖. 查看工程A依赖的 jsckson 版本, 结果是:2.9.8 这是为什么呢? 答案要从 springboot 依赖管理探寻. 我们知...

.Net Core 商城微服务项目系列(十三):搭建Log4net+ELK+Kafka日志框架

之前是使用NLog直接将日志发送到了ELK,本篇将会使用Docker搭建ELK和kafka,同时替换NLog为Log4net。 一.搭建kafka 1.拉取镜像 //下载zookeeper docker pull wurstmeister/zookeeper //下载kafka docker pull wurstmeister/kafka:2.11-...

flume kafka 配置指南

1、官方网站也有配置: https://flume.apache.org/FlumeUserGuide.html#kafka-source 2、clodera 官方配置 https://www.cloudera.com/documentation/kafka/2-0-x/topics/kafka_flume.html 1 tier1.sources =...