websocket长连接压力测试踩过的坑

摘要:
Websocket协议压测记录背景:公司的行情系统是采用的websocket协议,有请求和订阅两种方式向服务器申请最新行情信息。

Websocket协议压测记录

背景:

公司的行情系统是采用的websocket协议,有请求和订阅两种方式向服务器申请最新行情信息。请求方式是一次的,订阅方式是建立连接后,服务器定时向客户端推送行情信息。

初步测试方案:

因考虑到websocket是双工通讯,是长连接,并且本次压测的性能指标是系统能建立的最大连接数,并且是建立连接后服务器能持续向客户端推送行情信息。

基于以上原因考虑用python采用多线程建立连接,为了验证能否收到推送的信息,把返回的行情信息保存到文本文件中。Python脚本如下:

import websocket

import time

import threading

import gzip

#import json

#from threadpool import ThreadPool, makeRequests

#from websocket import create_connection

SERVER_URL = "ws://pro-web-new.devtest.exshell-dev.com/r1/main/ws"

#SERVER_URL = "wss://i.cg.net/wi/ws"

#SERVER_URL = "wss://www.exshell.com/r1/main/ws"

def on_message(ws, message):

print(message)

def on_error(ws, error):

print(error)

def on_close(ws):

print("### closed ###")

def on_open(ws):

def send_trhead():

send_info = '{"sub": "market.ethusdt.kline.1min","id": "id10"}'

#send_info = '{"event":"subscribe", "channel":"btc_usdt.deep"}'

while True:

#time.sleep(5)

#ws.send(json.dumps(send_info))

ws.send(send_info)

while (1):

compressData = ws.recv()

result = gzip.decompress(compressData).decode('utf-8')

if result[:7] == '{"ping"':

ts = result[8:21]

pong = '{"pong":' + ts + '}'

ws.send(pong)

ws.send(send_info)

else:

#print(result)

with open('./test_result.txt', 'a') as f:

f.write(threading.currentThread().name+' ')

f.write(result+' ')

t = threading.Thread(target=send_trhead)

t.start()

print(threading.currentThread().name)

def on_start(a):

# time.sleep(2)

# websocket.enableTrace(True)

# ws = websocket.WebSocketApp(SERVER_URL,

# on_message=on_message,

# on_error=on_error,

# on_close=on_close)

# ws.on_open = on_open

# ws.run_forever()

#print(a[2])

try:

ws = websocket.create_connection(SERVER_URL)

on_open(ws)

except Exception as e:

print('error is :',e)

print('connect ws error,retry...')

time.sleep(5)

if __name__ == "__main__":

# pool = ThreadPool(3)

# test = list()

# for ir in range(3):

# test.append(ir)

#

# requests = makeRequests(on_start, test)

# [pool.putRequest(req) for req in requests]

# pool.wait()

# # #on_start(1)

for ir in range(20):

on_start(1)

time.sleep(0.1)

初步测试结果:

在压测的过程中,发现连接数达到一定程度(单机1400连接),连接就断掉了,监控发现压力机内存基本消耗光了,因建立连接,并接收返回的信息,随着连接数增加,内存消耗大,只能断开连接,释放内存。

调整测试方案:

和架构、开发讨论后,准备在websocket客户端采用AIO异步通讯方式增大压力,因当时是考虑到长连接未考虑这种方式,查询资料,发现websocket服务端可以采用AIO异步通讯方式,在websocket客户端尝试一下,采用locust + python的方式,也查找了一些资料,发现方案可行。

Locust是一款可扩展的,分布式的,性能测试的,开源的,用Python编写的性能测试工具。对于测试HTTP协议的接口是比较方便的,但是它也支持测试别的协议的接口,不过需要重写Locust类。脚本如下:

from locust import Locust, events, task, TaskSet

import websocket

import time

import gzip

class WebSocketClient():

def __init__(self, host, port):

self.host = host

self.port = port

class WebSocketLocust(Locust):

def __init__(self, *args, **kwargs):

self.client = WebSocketClient("172.31.15.85", 9503)

class UserBehavior(TaskSet):

ws = websocket.WebSocket()

#self.ws.connect("ws://10.98.64.103:8807")

ws.connect("ws://pro-web-new.devtest.exshell-dev.com/r1/main/ws")

@task(1)

def buy(self):

try:

start_time = time.time()

#self.ws.send('{"url":"/buy","data":{"id":"123","issue":"20170822","doubled_num":2}}')

#result = self.ws.recv()

send_info = '{"sub": "market.ethusdt.kline.1min","id": "id10"}'

# send_info = '{"event":"subscribe", "channel":"btc_usdt.deep"}'

while True:

# time.sleep(5)

# ws.send(json.dumps(send_info))

ws.send(send_info)

while (1):

compressData = ws.recv()

result = gzip.decompress(compressData).decode('utf-8')

if result[:7] == '{"ping"':

ts = result[8:21]

pong = '{"pong":' + ts + '}'

ws.send(pong)

ws.send(send_info)

else:

# print(result)

with open('./test_result.txt', 'a') as f:

#f.write(threading.currentThread().name + ' ')

f.write(result + ' ')

except Exception as e:

print("error is:",e)

class ApiUser(WebSocketLocust):

task_set = UserBehavior

min_wait = 100

max_wait = 200

用命令执行脚本:

Locust -f websocket_client_locust.py –-no-web -c 1 -r 1 -t 60s

单个用户执行成功,并能生成文件。但多个用户执行的时候就报错,报错信息如下:This socket is already used by another greenlet: <bound method Waiter.switch of <gevent.hub.Waiter object at 0x7f01f0594900>>

错误原因说的是socke正在被使用,但是我的代码中是新的socket,简单分析了一下,应该不会出现问题,但是我的socek的使用部分是一个全局的client,然后程序运行的时候出现了上述错误.仔细推测我找出了原因:

geven是个协程库,那么多个协程共用一个socek的时候就会出现上述错误了,于是我把socket改成了局部的,问题解决.

修改前:

websocket长连接压力测试踩过的坑第1张

修改后:

websocket长连接压力测试踩过的坑第2张

修改后代码:

from locust import Locust, events, task, TaskSet

import websocket

import time

import gzip

class WebSocketClient():

def __init__(self, host):

self.host = host

#self.port = port

class WebSocketLocust(Locust):

def __init__(self, *args, **kwargs):

self.client = WebSocketClient("172.31.15.85")

class UserBehavior(TaskSet):

# ws = websocket.WebSocket()

# #self.ws.connect("ws://10.98.64.103:8807")

# ws.connect("ws://pro-web-new.devtest.exshell-dev.com/r1/main/ws")

@task(1)

def buy(self):

try:

ws = websocket.WebSocket()

# self.ws.connect("ws://10.98.64.103:8807")

ws.connect("ws://pro-web-new.devtest.exshell-dev.com/r1/main/ws")

start_time = time.time()

#self.ws.send('{"url":"/buy","data":{"id":"123","issue":"20170822","doubled_num":2}}')

#result = self.ws.recv()

send_info = '{"sub": "market.ethusdt.kline.1min","id": "id10"}'

# send_info = '{"event":"subscribe", "channel":"btc_usdt.deep"}'

while True:

# time.sleep(5)

# ws.send(json.dumps(send_info))

ws.send(send_info)

while (1):

compressData = ws.recv()

result = gzip.decompress(compressData).decode('utf-8')

if result[:7] == '{"ping"':

ts = result[8:21]

pong = '{"pong":' + ts + '}'

ws.send(pong)

ws.send(send_info)

# else:

# # print(result)

# with open('./test_result.txt', 'a') as f:

# #f.write(threading.currentThread().name + ' ')

# f.write(result + ' ')

except Exception as e:

print("error is:",e)

class ApiUser(WebSocketLocust):

task_set = UserBehavior

min_wait = 100

max_wait = 200

压测开始,随着用户数上升,压力机端发生如下错误:500和502错误

websocket长连接压力测试踩过的坑第3张

这是协议进行握手时失败,查询后端行情应用服务器,也有大量报错。

查看服务器发现打开最大文件数是1024,调整到65535,用如下命令调整:

第一步,修改/etc/sysctl.conf文件,在文件中添加如下行:

net.ipv4.ip_local_port_range = 1024 65000

这表明将系统对本地端口范围限制设置为1024~65000之间。请注意,本地端口范围的最小值必须大于或等于1024;而端口范围的最大值则应小于或等于65535.修改完后保存此文件。

第二步,执行sysctl命令:

[speng@as4 ~]$ sysctl -p

如果系统没有错误提示,就表明新的本地端口范围设置成功。如果按上述端口范围进行设置,则理论上单独一个进程最多可以同时建立60000多个TCP客户端连接。

调整完成后复测,发现还是报这个错误,请开发进行定位分析应用程序。

免责声明:文章转载自《websocket长连接压力测试踩过的坑》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇推荐一款DataGridView的打印解决方案分布式-JOB(XXL-Job)下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

centos环境下安装FastDFS配置详解(包含配置nginx)

项目中使用了FastDFS作为文件系统,这里记录一下安装和配置过程,个人使用部署过程中耗费了好长时间和精力,遇到了很多的坑,总结成了一篇详细的部署文档,以备下次安装使用。 项目场景 由于是测试环境,所以只提供了一台服务器,后续软件的安装和配置都在这台服务器上完成。(IP:10.129.44.128) 1、安装gcc(编译时需要) FastDFS是C语言开发...

Java日志框架logback剖析

Logback和log4j非常相似,优点如下: 1、更快的实现 Logback的内核重写,在一些关键执行路径上性能提升10倍以上。同时,初始化内存加载也更小。 2、非常充分的测试 Logback经过数年的测试,这是简单重要的原因选择logback而不是log4j。 3、Logback-classic非常自然实现了SLF4j 因为logback-classi...

SQL中连接(JOIN)子句介绍

本文主要介绍 SQL(Structured Query Language)中连接(JOIN)子句的相关知识,同时通过用法示例介绍连接的常见用法。 说明:本文的用法示例是面向 MySQL 数据库的。 1 概述SQL 中 JOIN 子句用于把来自两个或多个表的行结合起来。 在实际的数据库应用中,经常需要从多个数据表中读取数据,这时就可以使用 SQL 语句中的连...

移动端屏幕适配+事件+常见问题解决

移动端屏幕适配 <meta name="viewport" content="width=device-width, initial-scale=1, maximum-scale=1, minimum-scale=1, user-scalable=no"> 移动端屏幕适配与响应式的区别移动端屏幕适配: 移动端 宽高% / rem 字体px 宽...

ES数据库安装6.6

ES数据库安装 elastica searchelasticsearch的概念:是一个实时的分布式搜索和分析引擎,它可以用于全文搜索,结构化搜索以及分析。它是一个建立在全文搜索引擎 Apache Lucene 基础上的搜索引擎,使用 Java 语言编写。 1、elasticsearch和MongoDB/redis/memcache一样,是非关系性数据库是一...

opensips搭建问题解决笔记

只是笔记而已,不是很详细,莫怪 opensips搭建问题解决笔记:# opensipsctl startINFO: Starting OpenSIPS :ERROR: PID file /var/run/opensips.pid does not exist -- OpenSIPS start failed Nov 6 15:57:53 webcon201...