基于PySpark的网络服务异常检测系统 (四) Mysql与SparkSQL对接同步数据 kmeans算法计算预测异常

摘要:
基于DjangRestframework和Spark的异常检测系统,使用MySQL和Redis数据库、Celery消息队列、SparkSQL和SparkMlib分析服务,使用kmeans和随机森林算法分析网络服务数据;数据分为完整数据和正常数据。每天通过自动运行计时作业从完整数据中导入正常数据,用于算法的模型训练。使用芹菜将(指定的时间段)正常样本批量导入数据库defadd_normal

基于Django Restframework和Spark的异常检测系统,数据库为MySQL、Redis, 消息队列为Celery,分析服务为Spark SQL和Spark Mllib,使用kmeans和随机森林算法对网络服务数据进行分析;数据分为全量数据和正常数据,每天通过自动跑定时job从全量数据中导入正常数据供算法做模型训练。

使用celery批量导入(指定时间段)正常样本到数据库

def add_normal_cat_data(data):
    """
    构建数据model  用yield每次返回1000条数据
    :param data
    :return:
    """
    tmp_cat_normal_models = []

    for cat_data in data:
        response_time = cat_data.get('response_time')
        request_count = cat_data.get('request_count') or 1
        fail_count = cat_data.get('fail_count') or 1
        cat_data['id'] = str(uuid4())
        if response_time < 1.2 and (fail_count / request_count) < 0.2:
            cat_obj = CatNormalResource(
                **cat_data
            )
            tmp_cat_normal_models.append(cat_obj)

        if len(tmp_cat_normal_models) >= 1000:
            yield tmp_cat_normal_models
            tmp_cat_normal_models = []

    yield tmp_cat_normal_models


@celery_app.task
def insert_normal_cat_data(data):
    """
    使用异步,每次用bulk 批量插入 1000条数据
    :param data:
    :return:
    """
    try:
        for i in add_normal_cat_data(data):
            CatNormalResource.objects.bulk_create(i)
    except Exception as e:
        print(e)
        raise RsError('插入数据库失败')

通过contab定时job,每天自动导入正常样本

 1 def get_current_timestamp():
 2     """
 3     获取当前时间戳
 4     :return:
 5     """
 6     return int(time.time()) * 1000
 7 
 8 
 9 def convert_datetime_to_timestamp(dtime):
10     """
11     把datetime转换为时间戳
12     :param datetime:
13     :return:
14     """
15     timestamp = time.mktime(dtime.timetuple())
16     return int(timestamp) * 1000
17 
18 
19 def get_cache_cat_data(start_time, end_time, force=False):
20     """
21     获取指定时间段的cat数据
22     :param start_time:
23     :param end_time:
24     :return:
25     """
26     key = 'GET_CAT_RES_DATA_{0}_TO_{1}'.format(
27         start_time, end_time
28     )
29     content = cache.get(key)
30     if force or not content:
31         content = get_cat_res_data(start_time, end_time)
32         if content:
33             cache.set(key, content, timeout=CACHE_TIMEOUT_DEFAULT)
34 
35     return content
36 
37 
38 def add_normal_cat_data(data):
39     """
40     构建数据model  用yield每次返回1000条数据
41     :param data
42     :return:
43     """
44     tmp_cat_normal_models = []
45 
46     for cat_data in data:
47         response_time = cat_data.get('response_time')
48         request_count = cat_data.get('request_count') or 1
49         fail_count = cat_data.get('fail_count') or 1
50         cat_data['id'] = str(uuid4())
51         if response_time < 1.2 and (fail_count / request_count) < 0.2:
52             cat_obj = CatNormalResource(
53                 **cat_data
54             )
55             tmp_cat_normal_models.append(cat_obj)
56 
57         if len(tmp_cat_normal_models) >= 1000:
58             yield tmp_cat_normal_models
59             tmp_cat_normal_models = []
60 
61     yield tmp_cat_normal_models
62 
63 
64 @celery_app.task
65 def insert_normal_cat_data(data):
66     """
67     使用异步,每次用bulk 批量插入 1000条数据
68     :param data:
69     :return:
70     """
71     try:
72         for i in add_normal_cat_data(data):
73             CatNormalResource.objects.bulk_create(i)
74     except Exception as e:
75         print(e)
76         raise RsError('插入数据库失败')
77 
78 
79 def insert_normal_cat_job():
80     """
81     定时导入前一天的正常数据
82     :return:
83     """
84     logger.info('insert_normal_cat_job  ....')
85     dt_time = datetime.datetime.now() + datetime.timedelta(days=-1)
86     start_time = convert_datetime_to_timestamp(dt_time)
87     end_time = get_current_timestamp()
88     data = get_cache_cat_data(start_time, end_time)
89     insert_normal_cat_data.delay(data)

SparkSQL读取指定时间段数据,使用Kmeans预测新数据异常

 1 class SparkAnomaly(object):
 2     def __init__(self, appid, start_time, end_time):
 3         self.appid = appid
 4         self.start_time = start_time
 5         self.end_time = end_time
 6         self.spark_sql = SparkSql()
 7         self.cat_res = self.spark_sql.load_table_dataframe('cat_resource')
 8         self.cat_normal_res = self.spark_sql.load_table_dataframe(
 9             'cat_normal_resource'
10         )
11         self.filter_str = "appid = {0} " 
12                           "and create_time >= {1} " 
13                           "and update_time <= {2}".format(
14             self.appid, self.start_time, self.end_time,
15         )
16         self.model_filter_str = "appid = {0}".format(self.appid)
17 
18     def get_kmeans_model(self):
19         """
20         得到kmeans聚类模型
21         :return:
22         """
23         df = self.cat_normal_res.filter(self.model_filter_str)
24         parsed_data_rdd = df.rdd.map(lambda x: array([x[4], x[5], x[6]]))
25 
26         # 建立聚类模型
27         clusters = KMeans.train(
28             parsed_data_rdd, 3,
29             maxIterations=10,
30             initializationMode="random"
31         )
32 
33         return clusters
34 
35     def get_kmeans_predict(self):
36         """
37         获取appid指定时间段的预测结果
38         :return:
39         """
40         df = self.cat_res.filter(self.filter_str)
41         parsed_data_rdd = df.rdd.map(lambda x: array([x[4], x[5], x[6]]))
42         clusters = self.get_kmeans_model()
43         predict_result = clusters.predict(parsed_data_rdd)
44         return predict_result.collect()
45 
46 
47 def get_kmeans_result(appid, start_time, end_time):
48     """
49     获取appid指定时间段的cat数据
50     :param appid:
51     :param start_time:
52     :param end_time:
53     :return:
54     """
55     cat_result_obj = CatResultData.objects.filter(
56         appid=appid,
57         start_time=start_time,
58         end_time=end_time,
59         algorithm_name="kmeans"
60     ).first()
61     if not cat_result_obj:
62         arg_result = SparkAnomaly(appid, start_time, end_time)
63         content = arg_result.get_kmeans_predict()
64         cat_result_obj = CatResultData.objects.create(
65             appid=appid,
66             start_time=start_time,
67             end_time=end_time,
68             algorithm_name="kmeans",
69             result_data=content
70         )
71     ser_data = CatResultDataSerializer(cat_result_obj).data
72     ser_data['result_data'] = json.loads(ser_data['result_data'])
73     return ser_data

以上代码为系统的部分代码,详细代码请见我的github  https://github.com/a342058040/network_anomaly_detection

免责声明:文章转载自《基于PySpark的网络服务异常检测系统 (四) Mysql与SparkSQL对接同步数据 kmeans算法计算预测异常》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇Linux 压缩与解压命令Djiango 中间件下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

如何解决在Linux CLI终端界面中汉字方块乱码

解决Linux 纯命令界面下中文是方块乱码的问题  最近在学习Linux安全并给磁盘LUKS加密时,发现在telinit 1模式下,原本正常的中文字符均变成了方块乱码(如下图),这使得我们很难晓得命令的执行结果究竟是对还是错,为后续工作增加了一定的困难,磨刀不误砍柴工,那就先解决这个小问题。 ( 在纯命令界面下,中文变成方块乱码:) ( 在图像界面中却)...

达梦数据库:第一章:MySQL数据库与达梦数据库的区别

达梦数据库管理系统是达梦公司推出的具有完全自主知识产权的高性能数据库管理系统,简称DM,它具有如下特点: 1、通用性达梦数据库管理系统兼容多种硬件体系,可运行于X86、X64、SPARC、POWER等硬件体系之上。DM各种平台上的数据存储结构和消息通信结构完全一致,使得DM各种组件在不同的硬件平台上具有一致的使用特性。达梦数据库管理系统产品实现了平台无关性...

SQL语法

1、什么是 SQL? SQL 指结构化查询语言 SQL 使我们有能力访问数据库 SQL 是一种 ANSI 的标准计算机语言 编者注:ANSI,美国国家标准化组织 2、RDBMS RDBMS 指的是关系型数据库管理系统。 RDBMS 是 SQL 的基础,同样也是所有现代数据库系统的基础,比如 MS SQL Server, IBM DB2, Oracle,...

常用MySQL操作

常用MySQL操作 更改MySQL数据库root的密码 将绝对路径加入环境变量并设置开机启动 # PATH=$PATH:/usr/local/mysql/bin # echo "PATH=$PATH:/usr/local/mysql/bin" >> /etc/profile # source /etc/profile 给root用户设定密码 #...

Navicat for MySQL下载、安装与破解

一:下载Navicat for MySQL   进入 Navicat for MySQL下载 ,根据需要选择下载的版本,我选择的是Windows 64bit,任意选择一个镜像地址下载。          二:安装Navicat for MySQL   运行 → 下一步 → 点击“我同意” → 选择安装路径 → 保留默认,下一步 → 选择是否创建桌面图标,建...

MySql/Oracle树形结构查询

Oracle树形结构递归查询 在Oracle中,对于树形查询可以使用start with ... connect by  select * from treeTable start with connect by id = prior parent_id; 若将一个树状结构存储在一张表里,需要在表中存入两个字段ID和PARENTID,表示每一条记录的p...