pymysql 线程安全pymysqlpool

摘要:
simpletest.importloggingimportstringmportthreadingimportpandasaspdimportrandomfrompymysqlpoolimportConnectionPoolconfig={'pool_name':'password':'数据库':'pool_size_boundary':#'max_pool_size':
# -*-coding: utf-8-*-
# Author : Christopher Lee
# License: Apache License
# File   : test_example.py
# Date   : 2017-06-18 01-23
# Version: 0.0.1
# Description: simple test.


import logging
import string
import threading

import pandas as pd
import random

from pymysqlpool import ConnectionPool

config = {
    'pool_name': 'test',
    'host': 'localhost',
    'port': 3306,
    'user': 'root',
    'password': 'chris',
    'database': 'test',
    'pool_resize_boundary': 50,
    'enable_auto_resize': True,
    # 'max_pool_size': 10
}

logging.basicConfig(format='[%(asctime)s][%(name)s][%(module)s.%(lineno)d][%(levelname)s] %(message)s',
                    datefmt='%Y-%m-%d %H:%M:%S',
                    level=logging.DEBUG)


def connection_pool():
    # Return a connection pool instance
    pool = ConnectionPool(**config)
    # pool.connect()
    return pool


def test_pool_cursor(cursor_obj=None):
    cursor_obj = cursor_obj or connection_pool().cursor()
    with cursor_obj as cursor:
        print('Truncate table user')
        cursor.execute('TRUNCATE user')

        print('Insert one record')
        result = cursor.execute('INSERT INTO user (name, age) VALUES (%s, %s)', ('Jerry', 20))
        print(result, cursor.lastrowid)

        print('Insert multiple records')
        users = [(name, age) for name in ['Jacky', 'Mary', 'Micheal'] for age in range(10, 15)]
        result = cursor.executemany('INSERT INTO user (name, age) VALUES (%s, %s)', users)
        print(result)

        print('View items in table user')
        cursor.execute('SELECT * FROM user')
        for user in cursor:
            print(user)

        print('Update the name of one user in the table')
        cursor.execute('UPDATE user SET name="Chris", age=29 WHERE id = 16')
        cursor.execute('SELECT * FROM user ORDER BY id DESC LIMIT 1')
        print(cursor.fetchone())

        print('Delete the last record')
        cursor.execute('DELETE FROM user WHERE id = 16')


def test_pool_connection():
    with connection_pool().connection(autocommit=True) as conn:
        test_pool_cursor(conn.cursor())


def test_with_pandas():
    with connection_pool().connection() as conn:
        df = pd.read_sql('SELECT * FROM user', conn)
        print(df)


def delete_users():
    with connection_pool().cursor() as cursor:
        cursor.execute('TRUNCATE user')


def add_users(users, conn):
    def execute(c):
        c.cursor().executemany('INSERT INTO user (name, age) VALUES (%s, %s)', users)
        c.commit()

    if conn:
        execute(conn)
        return
    with connection_pool().connection() as conn:
        execute(conn)


def add_user(user, conn=None):
    def execute(c):
        c.cursor().execute('INSERT INTO user (name, age) VALUES (%s, %s)', user)
        c.commit()

    if conn:
        execute(conn)
        return
    with connection_pool().connection() as conn:
        execute(conn)


def list_users():
    with connection_pool().cursor() as cursor:
        cursor.execute('SELECT * FROM user ORDER BY id DESC LIMIT 5')
        print('...')
        for x in sorted(cursor, key=lambda d: d['id']):
            print(x)


def random_user():
    name = "".join(random.sample(string.ascii_lowercase, random.randint(4, 10))).capitalize()
    age = random.randint(10, 40)
    return name, age


def worker(id_, batch_size=1, explicit_conn=True):
    print('[{}] Worker started...'.format(id_))

    def do(conn=None):
        for _ in range(batch_size):
            add_user(random_user(), conn)

    if not explicit_conn:
        do()
        return

    with connection_pool().connection() as c:
        do(c)

    print('[{}] Worker finished...'.format(id_))


def bulk_worker(id_, batch_size=1, explicit_conn=True):
    print('[{}] Bulk worker started...'.format(id_))

    def do(conn=None):
        add_users([random_user() for _ in range(batch_size)], conn)
        time.sleep(3)

    if not explicit_conn:
        do()
        return

    with connection_pool().connection() as c:
        do(c)

    print('[{}] Worker finished...'.format(id_))


def test_with_single_thread(batch_number, batch_size, explicit_conn=False, bulk_insert=False):
    delete_users()
    wk = worker if not bulk_insert else bulk_worker
    for i in range(batch_number):
        wk(i, batch_size, explicit_conn)
    list_users()


def test_with_multi_threads(batch_number=1, batch_size=1000, explicit_conn=False, bulk_insert=False):
    delete_users()

    wk = worker if not bulk_insert else bulk_worker

    threads = []
    for i in range(batch_number):
        t = threading.Thread(target=wk, args=(i, batch_size, explicit_conn))
        threads.append(t)
        t.start()

    [t.join() for t in threads]
    list_users()


if __name__ == '__main__':
    import time

    start = time.perf_counter()
    test_pool_cursor()
    test_pool_connection()

    test_with_pandas()
    test_with_multi_threads(20, 10, True, bulk_insert=True)
    test_with_single_thread(1, 10, True, bulk_insert=True)
    elapsed = time.perf_counter() - start
    print('Elapsed time is: "{}"'.format(elapsed))

  

免责声明:文章转载自《pymysql 线程安全pymysqlpool》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇Alpha冲刺——总结随笔GeoServer源码解析和扩展 (四)文件系统下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

hdu 3579 Hello Kiki (中国剩余定理)

Hello Kiki Time Limit: 2000/1000 MS (Java/Others)    Memory Limit: 32768/32768 K (Java/Others)Total Submission(s): 1943    Accepted Submission(s): 693 Problem Description One day I...

python 绘图 异常点绘制使用 ax.plot(abnormal_points['ds'], abnormal_points['y'], "rX", label='abnormal points')

from matplotlib import pyplot as plt def my_plot(title, m, fcst, ax=None, uncertainty=True, plot_cap=True, xlabel='ds', ylabel='y', abnormal_points=None ): """Plo...

第一个极小的机器学习的应用

  现在给出一个Web统计信息,他们存储着每小时的访问次数。每一行包含连续的小时和信息,以及该小时Web的访问次数。现在要解决的问题是,估计在何时访问量达到基础设施的极限。极限数据是每小时100000次访问。 1.读取数据: # 获取数据 filepath = r'C:UsersTDDesktopdataMachine Learning1400OS_01_...

SQL试题集(三)

1.用一条SQL语句 查询出每门课都大于80分的学生姓名  name  kecheng  fenshu 张三   语文    81 张三   数学    75 李四   语文    76 李四   数学    90 王五   语文    81 王五   数学    100 王五   英语    90 A: select distinct name f...

python 调用百度ORC进行文字识别

最近大神推荐一个新的东西,orc文字识别,是免费的,感觉特别不错,所以打算自己弄来玩玩。 首先要自己上百度申请一个账号https://cloud.baidu.com/product/ocr.html,登陆百度云,然后添加一个应用。创建好应用以后,就跟图片中一样,其中的API KEY 和 Secret Key 要记住,等会调用的时候要用到。 创建好应用以后,首...

学习Python:StringIO与cStringIO

StringIO的行为与file对象非常像,但它不是磁盘上文件,而是一个内存里的“文件”,我们可以将操作磁盘文件那样来操作StringIO。一个简单的例子,让你对StringIO有一个感性的认识: from StringIO import StringIO # 生成一个StringIO对象,当前缓冲区内容为ABCDEF s = Strin...