WebSocket和kafka实现数据实时推送到前端

摘要:
1、 需求后台最近接触到一个需要将kafka中的数据推送到前端以实时显示的需求。最后,确定使用WebSocket实现实时数据推送。2、 websocket简介互联网上有很多关于websocket的文章,所以我不会详细介绍它。这里只是一个简短的介绍。WebSocket协议是一种基于TCP的新型网络协议。第一个是WebSocket所需的依赖项,另外两个是kafka的依赖项<dependencies><!27*虽然在push逻辑的实现中不需要接收前端数据,但作为webSocket教程或注释,它仍然添加了接收数据的逻辑。
一. 需求背景
     最近新接触一个需求,需要将kafka中的数据实时推送到前端展示。最开始想到的是前端轮询接口数据,但是无法保证轮询的频率和消费的频率完全一致,或造成数据缺失等问题。最终确定用利用WebSocket实现数据的实时推送。
 
二. websocket简介
     网上已经有好多介绍WebSocket的文章了,就不详细介绍了,这里只做简单介绍。 WebSocket协议是基于TCP的一种新的网络协议。它实现了浏览器与服务器全双工(full-duplex)通信——允许服务器主动发送信息给客户端。
 
三. 服务端实现
  1. pom文件
  这里需要引用三个依赖。第一个为WebSocket需要的依赖,另外两个为kafka的依赖
 

<dependencies>
<!-- webSocket所需依赖 -->
<dependency>
<groupId>javax</groupId>
<artifactId>javaee-api</artifactId>
<version>7.0</version>
</dependency>
<!-- kafka 所需依赖 -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.9.2</artifactId>
<version>0.8.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>RELEASE</version>
</dependency>
</dependencies>

2. webSocket服务端实现

 1 //此处定义接口的uri
 2 @ServerEndpoint("/wbSocket")
 3 public class WebSocket {
 4     private Session session;
 5     public static CopyOnWriteArraySet<WebSocket> wbSockets = new CopyOnWriteArraySet<WebSocket>(); //此处定义静态变量,以在其他方法中获取到所有连接
 6     
 7     /**
 8      * 建立连接。
 9      * 建立连接时入参为session
10      */
11     @OnOpen
12     public void onOpen(Session session){
13         this.session = session;
14         wbSockets.add(this); //将此对象存入集合中以在之后广播用,如果要实现一对一订阅,则类型对应为Map。由于这里广播就可以了随意用Set
15         System.out.println("New session insert,sessionId is "+ session.getId());
16     }
17     /**
18      * 关闭连接
19      */
20     @OnClose
21     public void onClose(){
22         wbSockets.remove(this);//将socket对象从集合中移除,以便广播时不发送次连接。如果不移除会报错(需要测试)
23         System.out.println("A session insert,sessionId is "+ session.getId());
24     }
25     /**
26      * 接收前端传过来的数据。
27      * 虽然在实现推送逻辑中并不需要接收前端数据,但是作为一个webSocket的教程或叫备忘,还是将接收数据的逻辑加上了。
28      */
29     @OnMessage
30     public void onMessage(String message ,Session session){
31         System.out.println(message + "from " + session.getId());
32     }
33 
34     public void sendMessage(String message) throws IOException {
35         this.session.getBasicRemote().sendText(message);
36     }
37 }

3. kafka消费者实现

复制代码
 1 public class ConsumerKafka extends Thread {
 2 
 3     private KafkaConsumer<String,String> consumer;
 4     private String topic = "kafkaTopic";
 5 
 6     public ConsumerKafka(){
 7 
 8     }
 9 
10     @Override
11     public void run(){
12         //加载kafka消费者参数
13         Properties props = new Properties();
14         props.put("bootstrap.servers", "localhost:9092");
15         props.put("group.id", "ytna");
16         props.put("enable.auto.commit", "true");
17         props.put("auto.commit.interval.ms", "1000");
18         props.put("session.timeout.ms", "15000");
19         props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
20         props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
21         //创建消费者对象
22         consumer = new KafkaConsumer<String,String>(props);
23         consumer.subscribe(Arrays.asList(this.topic));
24         //死循环,持续消费kafka
25         while (true){
26             try {
27                //消费数据,并设置超时时间
28                 ConsumerRecords<String, String> records = consumer.poll(100);
29                 //Consumer message
30                 for (ConsumerRecord<String, String> record : records) {
31                     //Send message to every client
32                     for (WebSocket webSocket :wbSockets){
33                         webSocket.sendMessage(record.value());
34                     }
35                 }
36             }catch (IOException e){
37                 System.out.println(e.getMessage());
38                 continue;
39             }
40         }
41     }
42 
43     public void close() {
44         try {
45             consumer.close();
46         } catch (Exception e) {
47             System.out.println(e.getMessage());
48         }
49     }
50 
51     //供测试用,若通过tomcat启动需通过其他方法启动线程
52     public static void main(String[] args){
53         ConsumerKafka consumerKafka = new ConsumerKafka();
54         consumerKafka.start();
55     }
56 }
复制代码
 
P.S. 需要注意的是WebSocket对tomcat版本是有要求的,笔者使用的是7.0.7.8。
 
四. 前端简单实现
复制代码
 1 <!DOCTYPE html>
 2 <html lang="en">
 3 <head>
 4     <meta charset="UTF-8">
 5     <title>WebSocket client</title>
 6     <script type="text/javascript">
 7         var socket;
 8         if (typeof (WebSocket) == "undefined"){
 9             alert("This explorer don't support WebSocket")
10         }
11 
12         function connect() {
13             //Connect WebSocket server
14             socket =new WebSocket("ws://127.0.0.1:8080/wbSocket");
15             //open
16             socket.onopen = function () {
17                 alert("WebSocket is open");
18             }
19             //Get message
20             socket.onmessage = function (msg) {
21                 alert("Message is " + msg);
22             }
23             //close
24             socket.onclose = function () {
25                 alert("WebSocket is closed");
26             }
27             //error
28             socket.onerror = function (e) {
29                 alert("Error is " + e);
30             }
31         }
32 
33         function close() {
34             socket.close();
35         }
36 
37         function sendMsg() {
38             socket.send("This is a client message ");
39         }
40     </script>
41 </head>
42 <body>
43     <button onclick="connect()">connect</button>
44     <button onclick="close()">close</button>
45     <button onclick="sendMsg()">sendMsg</button>
46 </body>
47 </html>
复制代码
 
五. 结语
     以上基本可以实现将kafka数据实时推送到前端。这是笔者第一篇笔记,不足之处请指出、谅解。
     源码:https://github.com/youtNa/webSocketkafka
   引用:1. webSocket百度百科

免责声明:文章转载自《WebSocket和kafka实现数据实时推送到前端》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇视频监控存储空间大小与传输带宽计算方法百度地图三种定位方式测试(高精度、低功耗、仅用设备)下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

会话管理(Cookie/Session技术)

什么是会话:用户打开浏览器,点击多个超链接,访问服务器的多个web资源,然后关闭浏览器,整个过程就称为一个会话; 会话过程需要解决的问题:每个用户在使用浏览器与服务器进行会话的过程中,都可能会产生一些数据,这些输入如何来进行保存?比如用户在购物网站浏览的商品记录,用户添加购物车的记录等等这些信息如何进行存储?在程序中会话跟踪是一件非常重要的事情,一个用户的...

三、搞定Service接口和实现类

1.建包com.myz.service.interfaces,用于存放接口,包com.myz.service.imps,用于存放实现类 2.包com.myz.service.interfaces下新建接口EmployeeServiceInterface packagecom.myz.service.interfaces; importjava.io.Se...

Kafka无法消费!?究竟是bug的“沦陷”还是配置的“扭曲”?

在一个月黑风高的夜晚,突然收到现网生产环境Kafka消息积压的告警,梦中惊醒啊,马上起来排查日志。 问题现象 消费请求卡死在查找Coordinator Coordinator为何物?Coordinator用于管理Consumer Group中各个成员,负责消费offset位移管理和Consumer Rebalance。Consumer在消费时必须先确认Co...

kafka集群搭建(windows环境下)

一、简介 Kafka 是一个实现了分布式的、具有分区、以及复制的日志的一个服务。它通过一套独特的设计提供了消息系统中间件的功能。它是一种发布订阅功能的消息系统。 1、名词介绍 Message 消息,就是要发送的内容,一般包装成一个消息对象。   Topic 通俗来讲的话,就是放置“消息”的地方,也就是说消息投递的一个容器。假如把消息看作是信封的话,那么 T...

基于 Cookie 的 SSO 中间件 kisso

kisso  =  cookie sso 基于 Cookie 的 SSO 中间件,它是一把快速开发 java Web 登录系统(SSO)的瑞士军刀。欢迎大家使用 kisso !!  kisso 帮助文档下载 1、支持单点登录 2、支持登录Cookie缓存 3、支持防止 xss攻击, SQL注入,脚本注入 4、支持 Base64 / MD5 / A...

前端下载文件(GET、POST方法)

GET location.href = 'http://t.zoukankan.com/path/to/download/url' POST axios.post('/path/to/download/url', this.searchParams, { responseType: 'blob'}).then(res => { let blob =...