webSocket+jwt实现方式

摘要:
最初的项目是在头中添加一个jwt令牌以实现身份验证。因为websocket不支持在头中添加信息(也许我打开它的方式是错误的?1.websocket的核心依赖项是<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter websocket</artifactId><dependencie>2config@ConfigurationpublicclassWebSocketConfig{@BeanpublicServerEndpointExporterserverEndpointExporter(){returnnewServerEndpointExporter;}}3.WebSocketServer@Slf4j @ ServerEndpoint@ComponentpublicclassWebSocketServer{/***并发包的线程安全集用于存储与每个客户端对应的WebSocket对象。

背景:

原项目是通过前端定时器获取消息,存在消息滞后、空刷服务器、浪费带宽和资源的问题,在springboot项目集成websocket可以实现实时点对点消息推送。

原项目是在header添加jwt令牌实现认证,由于websocket不支持在头部添加信息(或许是我打开的方式不对?),最终只能采用在url添加令牌参数实现认证,感觉不够优雅,后续再想办法重构改进。

ps:至于放行websocket相关url,完全不要去考虑,危害巨大。

1、websocket核心依赖

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>

2、config

@Configuration
public class WebSocketConfig {
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}

3、WebSocketServer

@Slf4j
@ServerEndpoint("/webSocket/{code}")
@Component
public class WebSocketServer {
    /**
     * concurrent包的线程安全Set,用来存放每个客户端对应的WebSocket对象。
     */
    private static CopyOnWriteArraySet<WebSocketServer> webSocketSet = new CopyOnWriteArraySet<>();

    /**
     * 与客户端的连接会话,需要通过它来给客户端发送数据
     */
    private Session session;

    /**
     * 接收识别码
     */
    private String code = "";

    /**
     * 连接建立成功调用的方法
     */
    @OnOpen
    public void onOpen(Session session, @PathParam("code") String code) {
        this.session = session;
        //如果存在就先删除一个,防止重复推送消息,实际这里实现了set,不删除问题也不大
        webSocketSet.removeIf(webSocket -> webSocket.code.equals(code));
        webSocketSet.add(this);
        this.code = code;
        log.info("建立WebSocket连接,code:" + code+",当前连接数:"+webSocketSet.size());
    }

    /**
     * 连接关闭调用的方法
     */
    @OnClose
    public void onClose() {
        webSocketSet.remove(this);
        log.info("关闭WebSocket连接,code:" + this.code+",当前连接数:"+webSocketSet.size());
    }

    /**
     * 收到客户端消息后调用的方法
     *
     * @param message 客户端发送过来的消息
     */
    @OnMessage
    public void onMessage(String message, Session session) {
        log.info("收到来[" + code + "]的信息:" + message);

    }

    @OnError
    public void onError(Session session, Throwable error) {
        log.error("websocket发生错误");
        error.printStackTrace();
    }

    /**
     * 实现服务器主动推送
     */
    private void sendMessage(String message) throws IOException {
        this.session.getBasicRemote().sendText(message);
    }


    /**
     * 群发自定义消息
     */
    public void sendAll(String message) {
        log.info("推送消息到" + code + ",推送内容:" + message);
        for (WebSocketServer item : webSocketSet) {
            try {
                item.sendMessage(message);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 定点推送
     */
    public void sendTo(String message, @PathParam("code") String code) {
        for (WebSocketServer item : webSocketSet) {
            try {
                if (item.code.equals(code)) {
                    log.info("推送消息到[" + code + "],推送内容:" + message);
                    item.sendMessage(message);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        WebSocketServer that = (WebSocketServer) o;
        return Objects.equals(session, that.session) &&
                Objects.equals(code, that.code);
    }

    @Override
    public int hashCode() {
        return Objects.hash(session, code);
    }
}

4、令牌过滤器

@Slf4j
@Component
public class JwtTokenFilter extends OncePerRequestFilter {

    @Resource
    JwtProperties jwtProperties;

    @Resource
    TokenProvider tokenProvider;

    @Resource
    OnlineUserService onlineUserService;

    @Override
    protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain filterChain) throws ServletException, IOException {
        // http连接时,客户端应该是在头信息中携带令牌
        String authorizationHeader = request.getHeader(jwtProperties.getHeader());
        if(StringUtils.isBlank(authorizationHeader)) {
            // websocket连接时,令牌放在url参数上,以后重构
            authorizationHeader = request.getParameter(jwtProperties.getHeader());
        }

        String token = null;
        if(!StringUtils.isEmpty(authorizationHeader) && authorizationHeader.startsWith(jwtProperties.getTokenStartWith())){
            token = authorizationHeader.replace(jwtProperties.getTokenStartWith(),"");
        }
        //验证token
        if(StringUtils.isNotBlank(token) && tokenProvider.validateToken(token)){
            //验证token是否在缓存中
            OnlineUserDto onlineUserDto = onlineUserService.getOne(jwtProperties.getOnlineKey() + token);
            if(onlineUserDto!=null){
                Authentication authentication = tokenProvider.getAuthentication(token, request);
                SecurityContextHolder.getContext().setAuthentication(authentication);
                log.debug("set Authentication to security context for '{}', uri: {}", authentication.getName(), request.getRequestURI());
            }
        }

        filterChain.doFilter(request, response);
    }
}

5、在业务中调用方式(伪代码)

    @Resource
    private WebSocketServer webSocketServer;

    // 向客户端推送实时消息
    webSocketServer.sendTo(content, sysUser.getId());

6、前端,伪代码

    getMessageCount() {
      getMyMessageCount().then(res => {
        const count = res
        this.messageCount = count > 0 ? count : null
      })
    },
    initWebSocket() {
      const wsUri = process.env.VUE_APP_WS_API + '/webSocket/' + this.user.id + '?Authorization=' + getToken()
      this.websock = new WebSocket(wsUri)
      this.websock.onopen = this.webSocketOnOpen
      this.websock.onerror = this.webSocketOnError
      this.websock.onmessage = this.webSocketOnMessage
    },
    webSocketOnOpen(e) {
      console.log('websocket 已经连接', e)
    },
    webSocketOnError(e) {
      this.$notify({
        title: 'WebSocket连接发生错误',
        type: 'error',
        duration: 0
      })
    },
    webSocketOnMessage(e) {
      const data = e.data
      this.$notify({
        title: '',
        message: data,
        type: 'success',
        dangerouslyUseHTMLString: true,
        duration: 5500
      })
      this.getMessageCount()
    }

免责声明:文章转载自《webSocket+jwt实现方式》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇扫雷思想及主函数异步通知机制的总结下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

taro 消息机制

Taro 提供了 Taro.Events 来实现消息机制,使用时需要实例化它 同时 Taro 还提供了一个全局消息中心 Taro.eventCenter 以供使用,它是 Taro.Events 的实例 https://nervjs.github.io/taro/docs/events.html...

MySQL8.0新增配置参数

activate_all_roles_on_login 此参数在版本8.0.2引入,是一个可以动态调整的global级参数,默认值为OFF。此参数用于控制在账户登录时是否激活已经授予的角色,如果为ON则授予的角色会被激活,设置为OFF时只能通过SET DEFAULT ROLE显式激活用户角色。activate_all_roles_on_login设置只在账...

Kafka消费者-从Kafka读取数据

(1)Customer和Customer Group (1)两种常用的消息模型 队列模型(queuing)和发布-订阅模型(publish-subscribe)。 队列的处理方式是一组消费者从服务器读取消息,一条消息只由其中的一个消费者来处理。 发布-订阅模型中,消息被广播给所有的消费者,接收到消息的消费者都可以处理此消息。 (2)Kafka的消费者和消...

关于系统用户数,并发用户数,在线用户数,吞吐量

1、  关于系统用户数,并发用户数和在线用户数 系统用户数 侠义上来说,可以理解为系统注册用户数;广义上来说,可以理解为所有访问过系统的用户数 在线用户数 侠义上来说,可以理解为已登录系统的用户数;广义来说,可以理解为当前时间访问系统的用户数。 并发用户数 可以分两种: 1)同一时间点,执行同一(业务)操作的用户数 2)同一时间点,执行不同(业务)操作的...

Horizon Web管理界面

一、horizon 介绍: Horizon 为 Openstack 提供一个 WEB 前端的管理界面 (UI 服务 )通过 Horizone 所提供的 DashBoard 服务 , 管理员可以使用通过 WEB UI 对 Openstack 整体云环境进行管理 , 并可直观看到各种操作结果与运行状态。 DashBoard 与其他组件的关系 登陆 Dashb...

PHP实现IP访问限制及提交次数的方法详解

一、原理 提交次数是肯定要往数据库里写次数这个数据的,比如用户登陆,当用户出错时就忘数据库写入出错次数1,并且出错时间,再出错写2,当满比如5次时提示不允许再登陆,请明天再试,然后用DateDiff计算出错时和now()的时间,如果大于24就再开放让他试。 封IP的话特别是给IP断就比较简单了, 先说给IP段开放的情况:先取出客户访问的IP,为了解释方便,...