Python-Dataframe数据清洗之0值、缺失值、重复数据(以多列去重)、不符合累计递增趋势的异常数据(跳大值和跳小值,兼噪声值)清洗

摘要:
=无均值_数据1[i-5]!=无):m=min((value_data1[i-1]-value_data1[i-2]),(value_data 1[i-2]-value_data1[i-3])(value_dta1[i-3]-value_data1-[i-4]),值_data1[i-4]-value_data 1[i-5]))value_data1]=value_data1]+mif˃value_data1[i+2]和value_data1[i-4]!=非平均值_data1[i-5]!

 起步者的苦苦挣扎......

方法一(单个ID去清洗):这个代码和上面的差不多,只是它进行的是单个递增趋势逐个进行清洗,,总的来说对于常见的异常情况有不错效果

    缺点:效率比较低,半自动化,需要清洗多个ID的异常数据时,手动重复的动作比较多

Python-Dataframe数据清洗之0值、缺失值、重复数据(以多列去重)、不符合累计递增趋势的异常数据(跳大值和跳小值,兼噪声值)清洗第1张Python-Dataframe数据清洗之0值、缺失值、重复数据(以多列去重)、不符合累计递增趋势的异常数据(跳大值和跳小值,兼噪声值)清洗第2张
import cx_Oracle
import pandas as pd
import numpy as np
import time
from sqlalchemy import create_engine
from sqlalchemy.dialects.oracle import 
    BFILE, BLOB, CHAR, CLOB, DATE, 
    DOUBLE_PRECISION, FLOAT, INTERVAL, LONG, NCLOB, 
    NUMBER, NVARCHAR, NVARCHAR2, RAW, TIMESTAMP, VARCHAR, 
    VARCHAR2
import matplotlib.pyplot as plt
import os


os.environ['NLS_LANG']='SIMPLIFIED CHINESE_CHINA.UTF8'

def connectToOracle():
    conn=create_engine('oracle+cx_oracle://用户名:密码@IP:端口/实例名')      #conn连接器
    sql="select MONITOR_ID,COLLECT_DATE,COLLECT_TIME,VALUE_DATA,UPLOAD,LESSEE_ID " 
        "from 表名 " 
        "where MONITOR_ID='锦绣家园LJLL1' and to_char(COLLECT_DATE,'yyyy-mm')='2020-04'" 
        "order by COLLECT_TIME"
    try:
        data=pd.read_sql(sql,conn)
        print("连接成功")
    except:
        print("connecterror")
    conn.dispose()      #关闭连接器

    MDH_Data_Cleansing(data)


def MDH_Data_Cleansing(data):
    print(data.isnull())
    data.info()     #查看数据基本信息
    value_data = np.array(data['value_data'])
    print(data)
    # print(value_data)

    # 对data数组中不符合单调递增趋势的异常数据进行清洗
    n = len(value_data)
    print(n)
    for i in range(1, n - 2):
        if (value_data1[i] < value_data1[i - 1] and value_data1[i - 4] != None and value_data1[i - 5] != None):
            m = min((value_data1[i - 1] - value_data1[i - 2]), (value_data1[i - 2] - value_data1[i - 3]), 
                    (value_data1[i - 3] - value_data1[i - 4]), (value_data1[i - 4] - value_data1[i - 5]))
            value_data1[i] = value_data1[i - 1] + m
        if (value_data1[i] > value_data1[i - 1] and value_data1[i] > value_data1[i + 1] and value_data1[i] > value_data1[i + 2] 
                and value_data1[i - 4] != None and value_data1[i - 5] != None):
            m = min((value_data1[i - 1] - value_data1[i - 2]), (value_data1[i - 2] - value_data1[i - 3]), 
                    (value_data1[i - 3] - value_data1[i - 4]), (value_data1[i - 4] - value_data1[i - 5]))
            value_data1[i] = value_data1[i - 1] + m
        i += 1
    print(1)
    data.drop('value_data', axis=1, inplace=True)  # 删除列value_data
    data['value_data']=value_data       #清洗后列值替换
    print(data)
    data.dropna(subset=['value_data'], axis=0, inplace=True)  # 删除列VALUE_DATA存在缺失值的所在行

    #箱线图分析法检测噪声值
    print(data['value_data'].describe(percentiles=[.25, .75], include=['object', 'float64']))  # describe
    distance_data=data['value_data'].quantile(0.75)-data['value_data'].quantile(0.25)       #四分位距,即箱
    top_data=data['value_data'].quantile(0.75)+1.5*distance_data        #箱线的上限
    bottom_data=data['value_data'].quantile(0.25)-1.5*distance_data     #箱线的下限
    count_data=(data['value_data']>top_data) | (data['value_data']<bottom_data)     #噪声值

    index_toarray = np.array(data[count_data == True].index)    # 取出异常值索引
    print("正常值 vs 噪声值个数:
",count_data.value_counts(),"
噪声值的行索引:",index_toarray)        #打印噪声值数
    #噪声值处理
    data.loc[index_toarray,'value_data']=data['value_data'].median().round(3)      #中位数替换
    print(data)
    '''
    data.drop_duplicates(subset=['new_column'],keep='first',inplace=True)       #根据新列去进行去重
    index_toarray = np.array(data_demo[count_data == False].index)  # 取出异常值 索引
    print("正常值(True) vs 噪声值个数(False):", count_data.value_counts(), "噪声值的行索引:", index_toarray)  # 打印噪声值数和索引
    data.dropna(subset=['value_data'], axis=0, inplace=True)        # 删除列Dataframe数据类型中value_data存在缺失值的所在行,以保证数据的可靠性
    
    '''
    data.drop_duplicates(subset=['monitor_id','collect_date','value_data'],keep='first',inplace=True)      #根据多列进行去重
    MDH_Dataframe_toOracle(data)


def mapping_data_types(data):       #实现Dataframe字段的类型转换(必转,否则就是给自己挖坑,不要问我是怎么知道的)
    dtypedict = {}
    for i, j in zip(data.columns, data.dtypes):
        if "object" in str(j):
            dtypedict.update({i: VARCHAR(256)})
        if "int" in str(j):
            dtypedict.update({i: NUMBER(12,2)})
        if "date" in str(j):
            dtypedict.update({i: DATE(19)})
    return dtypedict


def MDH_Dataframe_toOracle(data):       #将Dataframe数据写入ORACLE数据库
    from sqlalchemy import types, create_engine
    conn=create_engine('oracle+cx_oracle://用户名:密码@IP:端口/实例名',encoding='utf-8',echo=True)    #连接器
    from sqlalchemy.dialects.oracle import 
        BFILE, BLOB, CHAR, CLOB, DATE, 
        DOUBLE_PRECISION, FLOAT, INTERVAL, LONG, NCLOB, 
        NUMBER, NVARCHAR, NVARCHAR2, RAW, TIMESTAMP, VARCHAR, 
        VARCHAR2
    #print(conn)
    dtypedict = mapping_data_types(data)    #映射数据类型

    tableName='monitor_data_his_cleanaftertb'
    data.to_sql(tableName,con=conn,if_exists='append',dtype=dtypedict,chunksize=None,index=False)
    conn.dispose()



if __name__ == '__main__':
    pd.set_option('display.max_columns', None)  # 控制台完整显示列
    pd.set_option('display.max_rows', 1000)  # 行数
    pd.set_option('display.width',500)  # 列数
    pd.set_option('max.colwidth',100)   #列宽

    connectToOracle()
    MDH_Dataframe_toOracle(data)
View Code

方法二(多个ID去清洗):该算法存在缺陷,仅适用于常见的数据异常情况,对于多个连续并且任意的异常数据或噪声值无法处理。利用了普通循环判断+分箱法进行数据检测和平滑,数据量越大,预期的清洗效果越好。。。

Python-Dataframe数据清洗之0值、缺失值、重复数据(以多列去重)、不符合累计递增趋势的异常数据(跳大值和跳小值,兼噪声值)清洗第3张Python-Dataframe数据清洗之0值、缺失值、重复数据(以多列去重)、不符合累计递增趋势的异常数据(跳大值和跳小值,兼噪声值)清洗第4张
import cx_Oracle
import pandas as pd
import numpy as np
from sqlalchemy import create_engine
from sqlalchemy.dialects.oracle import 
    BFILE, BLOB, CHAR, CLOB, DATE, 
    DOUBLE_PRECISION, FLOAT, INTERVAL, LONG, NCLOB, 
    NUMBER, NVARCHAR, NVARCHAR2, RAW, TIMESTAMP, VARCHAR, 
    VARCHAR2
import matplotlib.pyplot as plt

import time
import os

os.environ['NLS_LANG']='SIMPLIFIED CHINESE_CHINA.UTF8'

#连接数据库、获取数据
def connectToOracle():
    conn=create_engine('oracle+cx_oracle://用户名:密码@IP:端口/实例名')      #conn连接器

    # #测试sql
    # sql="select MONITOR_ID,COLLECT_DATE,COLLECT_TIME,VALUE_DATA,UPLOAD,LESSEE_ID,rk " 
    #     "from (" 
    #         "select a.*,row_number() over(partition by MONITOR_ID order by COLLECT_TIME) rk " 
    #         "from MONITOR_DATA_HIS_clearn_test  a " 
    #     ")" 
    #     "where rk>=0"

    sql="select " 
            "datata.MONITOR_ID," 
            "datata.COLLECT_DATE," 
            "datata.COLLECT_TIME," 
            "datata.VALUE_DATA," 
            "datata.UPLOAD," 
            "datata.LESSEE_ID," 
            "datata.rk" 
        " from (" 
        "select MONITOR_ID, " 
            "to_date(substr(to_char(COLLECT_DATE,'yyyy-mm-dd hh:mi:ss'),0,10),'yyyy-mm-dd') COLLECT_DATE," 
            "COLLECT_DATE COLLECT_TIME," 
            "round(VALUE_DATA,2) VALUE_DATA," 
            "UPLOAD," 
            "LESSEE_ID," 
            "row_number() over(partition by MONITOR_ID order by COLLECT_DATE) rk " 
        "from 表名" 
        "where MONITOR_ID in(" 
            "select MONITOR_ID from (" 
                "select b.MONITOR_ID MONITOR_ID " 
                "from (" 
                    "select distinct MONITOR_ID from monitor_data_relation t1 join monitor_item t2 " 
                    "on t1.item_id=t2.item_id " 
                    "where item_name!='负累计流量' and item_name!='反向累计流量' and item_name like '%累计流量%'" 
                ") a " 
            "left join" 
                "(SELECT distinct MONITOR_ID FROM 表名) b " 
            "on a.MONITOR_ID=b.MONITOR_ID" 
            ") where MONITOR_ID is not null " 
        ") " 
    ") datata " 
    "where rk>=1 and VALUE_DATA!=0"

    data=pd.read_sql(sql,conn)
    print("连接成功")
    conn.dispose()      #关闭连接器

    MDH_Data_Cleansing(data)

#数据清洗
def MDH_Data_Cleansing(data):
    print("待清洗数据:
", data)
    #print(data.isnull())
    rk=np.array(data['rk'])     #声明所有递增趋势序号的数组,rk数据模板:[1,2,3,1,2,1,2,3,4···]
    #print(rk)
    m=len(rk)

    #获取每个递增趋势数据的头节点的行索引,存进节点数组arraySwitch
    arraySwitch = []
    for j in range(1,m-1):
        if(rk[j] == 1):
            arraySwitch.append(j-1)
            arraySwitch.append(j)
        j += 1

    arraySwitch.insert(0,0)
    arraySwitch.append(m-1)       #追加最后一个递增趋势数据的尾节点的行索引(即所有递增趋势的数据的总数-1)
    num=len(arraySwitch)
    print("节点数组个数num=",num)
    print("节点数组arraySwitch=",arraySwitch)

    # 对data数组中不符合单调递增趋势的异常数据进行清洗
    dataResult = pd.DataFrame(columns=['value_data'])  # 用于存储最终清洗和降噪完成,合并后的Dframe数据
    s = 0
    while s < num - 1:
        value_data1 = np.array(data.loc[arraySwitch[s]:arraySwitch[s + 1], 'value_data'])  # 循环逐一取出递增趋势数据
        # value_data2=value_data1[0:-1]       #删除每一个取出的递增趋势数据的脏数据(每个递增趋势数据的最后一个值,但最后一个递增趋势数据是没有脏数据的)
        # print(value_data1)
        n = len(value_data1)  # 获取单个递增趋势数据的个数

        # 清洗
        for i in range(1, n - 2):
            if (value_data1[i] < value_data1[i - 1] and value_data1[i - 4] != None and value_data1[i - 5] != None):
                m = min((value_data1[i - 1] - value_data1[i - 2]),(value_data1[i - 2] - value_data1[i - 3]), 
                        (value_data1[i - 3] - value_data1[i - 4]),(value_data1[i - 4] - value_data1[i - 5]))
                value_data1[i] = value_data1[i - 1] + m
            if (value_data1[i] > value_data1[i - 1] and value_data1[i] > value_data1[i + 1] and value_data1[i] > value_data1[i + 2] 
                    and value_data1[i - 4] != None and value_data1[i - 5] != None):
                m = min((value_data1[i - 1] - value_data1[i - 2]), (value_data1[i - 2] - value_data1[i - 3]), 
                        (value_data1[i - 3] - value_data1[i - 4]),(value_data1[i - 4] - value_data1[i - 5]))
                value_data1[i] = value_data1[i - 1] + m
            i += 1
        data_demo = pd.DataFrame(value_data1, columns=['value_data'])
        print("

该递增趋势数据,清洗后/降噪前:
", data_demo)

        # 箱线图分析法检测噪声值
        print("递增趋势数据数据量为", n, ",描述信息:", "
",
              data_demo['value_data'].describe(percentiles=[.25, .75], include=['object', 'float64']))  # describe
        distance_data = data_demo['value_data'].quantile(0.75) - data_demo['value_data'].quantile(0.25)  # 四分位距,即箱
        top_data = data_demo['value_data'].quantile(0.75) + 1.5 * distance_data  # 箱线的上限
        bottom_data = data_demo['value_data'].quantile(0.25) - 1.5 * distance_data  # 箱线的下限
        count_data = ((data_demo['value_data'] >= bottom_data) | (data_demo['value_data'] <= top_data))  # 噪声值

        index_toarray = np.array(data_demo[count_data == False].index)  # 取出异常值索引
        print("正常值(True) vs 噪声值个数(False):
", count_data.value_counts(), "噪声值的行索引:", index_toarray)  # 打印噪声值数和索引
        # 噪声值处理
        data_demo.loc[index_toarray, 'value_data'] = data_demo['value_data'].median().round(3)  # 中位数替换
        print("降噪后:
", data_demo)
        dataResult = dataResult.append(data_demo)  # 循环逐一合并递增趋势数组,存储于Dataframe表dataResult
        s += 2
    '''
    data.drop_duplicates(subset=['new_column'],keep='first',inplace=True)       #根据新列去进行去重
    data.dropna(subset=['value_data'], axis=0, inplace=True)        # 删除列Dataframe数据类型中value_data存在缺失值的所在行,以保证数据的可靠性
    
    '''
    dataResult.index=range(len(dataResult))     #重建Dataframe索引
    data.drop('rk',axis=1,inplace=True)     #删除列rk
    #print(data)
    data.drop('value_data', axis=1, inplace=True)  # 删除列value_data
    #print(data)
    data=pd.concat([data,dataResult],axis=1)     #合并两个Dataframe
    #print(data)
    data.dropna(subset=['value_data'], axis=0, inplace=True)  # 删除列value_data存在缺失值的所在行
    #print(data)
    data.drop_duplicates(subset=['monitor_id', 'collect_date', 'value_data'], keep='first', inplace=True)  # 根据多列进行去重
    print(data)

    MDH_Dataframe_toOracle(data)

#转类型
def mapping_data_types(data):       #实现Dataframe字段的类型转换(必转,否则就是给自己挖坑,不要问我是怎么知道的)
    dtypedict = {}
    for i, j in zip(data.columns, data.dtypes):
        if "object" in str(j):
            dtypedict.update({i: VARCHAR(256)})
        if "int" in str(j):
            dtypedict.update({i: NUMBER(12,2)})
        if "date" in str(j):
            dtypedict.update({i: DATE(19)})
    return dtypedict

#写入数据库
def MDH_Dataframe_toOracle(data):       #将Dataframe数据写入ORACLE数据库
    from sqlalchemy import types, create_engine
    from sqlalchemy.dialects.oracle import 
        BFILE, BLOB, CHAR, CLOB, DATE, 
        DOUBLE_PRECISION, FLOAT, INTERVAL, LONG, NCLOB, 
        NUMBER, NVARCHAR, NVARCHAR2, RAW, TIMESTAMP, VARCHAR, 
        VARCHAR2
    conn = create_engine('oracle+cx_oracle://用户名:密码@IP:端口/实例名', encoding='utf-8',echo=True)  # 连接器
    #print(conn)
    dtypedict = mapping_data_types(data)    #调用转类型方法mapping_data_types,映射数据类型

    tableName='monitor_data_his_cleanaftertb'
    data.to_sql(tableName,con=conn,if_exists='append',dtype=dtypedict,chunksize=None,index=False)
    conn.dispose()



if __name__ == '__main__':
    pd.set_option('display.max_columns', None)  # 控制台完整显示列
    pd.set_option('display.max_rows', 100)  # 行数
    pd.set_option('display.width',500)  # 列数
    pd.set_option('max.colwidth',100)   #列宽

    time_start = time.time()
    connectToOracle()
    time_end = time.time()
    print("执行时间(分钟):", (time_end - time_start) / 60)
View Code

免责声明:文章转载自《Python-Dataframe数据清洗之0值、缺失值、重复数据(以多列去重)、不符合累计递增趋势的异常数据(跳大值和跳小值,兼噪声值)清洗》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇iOS 杂笔-如何解决tableview显示错乱问题解决Flutter boost模块化加入到原有android项目后,首次加载过慢的问题下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

ExtJs之Ext.data.Store

因为上次用过Ext.data.Store,觉得挺重要的,  故转载了一篇http://blog.csdn.net/davidxj/archive/2009/04/23/4103647.aspx Ext.data.Store的基本用法在使用之前,首先要创建一个Ext.data.Store的实例,如下面的代码所示。 var data = [      ...

gitlab 笔记

#http://www.jianshu.com/p/060e7223e211?open_source=weibo_search docker stop gitlabdocker stop redisdocker stop postgresqldocker rm gitlabdocker rm redisdocker rm postgresql docker...

Mysql主从同步在线实施步骤【适合大数据库从库配置】

Mysql主从同步在线实施步骤【适合大数据库从库配置】     MySQL的主从搭建大家有很多种方式,传统的mysqldump方式是很多人的选择之一,但比较适合在新实例中实施,对于较大的数据库则存在停机等不可接受的问题,所以该方式并非理想的选择。使用innobackupex 则可以快速轻松的构建或修复mysql主从架构,该方式的好处是对主库无需备份期间导致...

Java NIO学习系列七:Path、Files、AsynchronousFileChannel

相对于标准Java IO中通过File来指向文件和目录,Java NIO中提供了更丰富的类来支持对文件和目录的操作,不仅仅支持更多操作,还支持诸如异步读写等特性,本文我们就来学习一些Java NIO提供的和文件相关的类: Java NIO Path Java NIO Files Java NIO AsynchronousFileChannel 总结 1....

Django项目:CMDB(服务器硬件资产自动采集系统)--11--07CMDB文件模式测试采集硬件数据

1 #settings.py 2 # ————————01CMDB获取服务器基本信息———————— 3 import os 4 5 BASEDIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))##当前路径 6 7 # 采集资产的方式,选项有:a...

Chrome浏览器自定义设置个人信息存储路径

序言 Chrome浏览器很好用,感觉也很快,但是,也是有那么几个小瑕疵的。例如,Chrome浏览器无法设置安装路径,只能安装在默认的C盘,个人信息默认放在C盘,详细路径如下: C:\Users\XXXX\AppData\Local\Google\Chrome\User Data\Default Copy 对于我这种对C盘有洁癖的人来说,不能忍受啊。 之前我...