多线程实现数据库的并发操作

摘要:
在Java中,程序需要操作数据库。操作数据的第一件事是获取数据库的连接对象。使用多个线程将数据导入数据库将加快操作进度。但是,多个线程共享连接对象是不安全的,因为您可以在Java中使用ThreadLocal为每个线程保存一个连接对象。代码如下:packagecom.quar.innovation。数据库;导入java.sql.连接

在Java中,程序需要操作数据库,操作数据首要事就是要获得数据库的Connection对象,利用多线程对数据导入数据库中将会加快操作进度,但是多个线程共享Connection对象,是不安全的,因为可以利用Java中的ThreadLocal为每个线程保存一个Connection对象,代码如下:

package com.quar.innovation.db;

import java.sql.Connection;
import java.sql.DriverManager;

public class ConnnectionManager {

	private static final ThreadLocal<Connection> connectionHolder = new ThreadLocal<Connection>();
	
	private static final String BETADBURL = "jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=utf8&autoReconnect=true&user=root&password=root";

	
	public static Connection getConnectionFromThreadLocal() {
		Connection conn = connectionHolder.get();
		try {
			if (conn == null || conn.isClosed()) {
				Connection con = ConnnectionManager.getConnection();
				connectionHolder.set(con);
				System.out.println("[Thread]" + Thread.currentThread().getName());
				return con;
			}
			return conn;
		} catch (Exception e) {
			System.out.println("[ThreadLocal Get Connection Error]" + e.getMessage());
		}
		return null;
		
		
	}
	
	public static Connection getConnection() {
		Connection conn = null;
		try {
			Class.forName("com.mysql.jdbc.Driver");
			conn = (Connection) DriverManager.getConnection(BETADBURL);
		} catch (Exception e) {
			System.out.println("[Get Connection Error]" + e.getMessage());
		}
		return conn;
	}
}

通过ThreadLocal就可以为每个线程保留一份Connection对象,利用Java的ThreadPoolExecutor启动线程池,完成数据库操作,完整代码如下:

public class QunarThreadPoolExecutor extends ThreadPoolExecutor {

    // 记录每个线程执行任务开始时间
    private ThreadLocal<Long> start = new ThreadLocal<Long>();
    
    // 记录所有任务完成使用的时间
    private AtomicLong totals = new AtomicLong();
    
    // 记录线程池完成的任务数
    private AtomicInteger tasks = new AtomicInteger();
	
	public QunarThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
			BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
		super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
	}

	public QunarThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
			BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
		super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
	}

	public QunarThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
			BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
		super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
	}

	public QunarThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
			BlockingQueue<Runnable> workQueue) {
		super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
	}
	
	 /**
     * 每个线程在调用run方法之前调用该方法
     * */ 
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        start.set(System.currentTimeMillis());
    }

    /**
     * 每个线程在执行完run方法后调用该方法
     * */
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        tasks.incrementAndGet();
        totals.addAndGet(System.currentTimeMillis() - start.get());
    }

    @Override
    protected void terminated() {
        super.terminated();
        System.out.println("完成"+ tasks.get() +"个任务,平均耗时: [" + totals.get() / tasks.get() + "] ms");
    }


public class DataUpdater implements Runnable {

	private PreparedStatement pst;
	
	private List<UserProfileItem> userProfiles;
	
	private final String SQL = "insert into userprofile (`uid` ,`profile` , `logday`) VALUES (?, ? ,?) ON DUPLICATE KEY UPDATE `profile`= ? ";
	
	public DataUpdater(List<UserProfileItem> userProfiles) {
		this.userProfiles = userProfiles;
	}
	
	public void run() {
		try {
			pst = ConnnectionManager.getConnectionFromThreadLocal().prepareStatement(SQL);
			for (UserProfileItem userProfile : userProfiles) {
				if(userProfile.getUid() != null && !userProfile.getUid().isEmpty() && 
						userProfile.getProfile() != null && !userProfile.getProfile().isEmpty()) {
					pst.setString(1, userProfile.getUid());
					pst.setString(2, userProfile.getProfile());
					pst.setInt(3, userProfile.getLogday());
					pst.setString(4, userProfile.getProfile());
					pst.addBatch();
				}
			}
			pst.executeBatch();
		} catch (Exception e) {
			System.err.println("[SQL ERROR MESSAGE]" + e.getMessage());
		} finally {
			 close(pst);
		}
		
	}

	public void close(PreparedStatement pst) {
		if (pst != null) {
			try {
				pst.close();
			} catch (SQLException e) {
				System.err.println("[Close Statement Error]" + e.getMessage());
			}
		}
	}
}


public class UserProfileItem {

	private String uid;
	
	private String profile;
	
	private int logday;
	
	public UserProfileItem(String uid, String profile , int logday) {
		this.logday = logday;
		this.profile = profile;
		this.uid = uid;
	}

	public String getUid() {
		return uid;
	}

	public String getProfile() {
		return profile;
	}

	public int getLogday() {
		return logday;
	}
	
}

public class DataUpdaterMain {
	
	private LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
	
	private QunarThreadPoolExecutor qunarThreadPoolExecutor = new QunarThreadPoolExecutor(5, 8, 5, TimeUnit.MINUTES, queue);
	
	
	public void shutThreadPool(ThreadPoolExecutor executor) {
		if (executor != null) {
			executor.shutdown();
			try {
				if (!executor.awaitTermination(20 , TimeUnit.MINUTES)) {
					executor.shutdownNow();
				} 
			} catch (Exception e) {
				System.err.println("[ThreadPool Close Error]" + e.getMessage());
			}
			
		}
	}
	
	public void close(Reader reader) {
		if (reader != null) {
			try {
				reader.close();
			} catch (IOException e) {
				System.err.println("[Close Io Error]" + e.getMessage());
			}
		}
	}
	
	public void closeConnection(Connection conn , Statement st) {
		try {
			if (conn != null) {
				conn.close();
			}
			if (st != null) {
				conn.close();
			}
		} catch (Exception e) {
			System.err.println("[Close MySQL Error]" + e.getMessage());
		}
	}
	
	
	public boolean update(String file ,int logday) {
		long start = System.currentTimeMillis();
		BufferedReader br = null;
		int num = 0;
		try {
			br = new BufferedReader(new InputStreamReader(new FileInputStream(file)));
			String line = null;
			List<UserProfileItem> userProfiles = new LinkedList<UserProfileItem>();
			while ((line = br.readLine()) != null) {
				++num;
				String []items = line.split("	");
				if (items.length == 2) {
					String uid = items[0];
					String profile = items[1];
					userProfiles.add(new UserProfileItem(uid, profile, logday));
					if (userProfiles.size() >= 100) {
						qunarThreadPoolExecutor.execute(new DataUpdater(userProfiles));
						userProfiles = new LinkedList<UserProfileItem>();
					}
				} else {
					System.err.println("[Data Error]" + line);
				}
			}
			qunarThreadPoolExecutor.execute(new DataUpdater(userProfiles));;
		} catch (Exception e) {
			e.printStackTrace();
			System.err.println("[Read File Error]" + e.getMessage());
			return false;
		}  finally {
			System.err.println("[Update] take time " + (System.currentTimeMillis() - start) + ".ms");
			System.err.println("[Update] update item " + num);
			shutThreadPool(qunarThreadPoolExecutor);;
			close(br);
		}
		return true;
	}
	
	public static void main(String []args) {
		String file = "D:\workspaces\promotionwordData.log";
		int logday = Integer.parseInt("20150606");
		DataUpdaterMain dataUpdaterMain = new DataUpdaterMain();
		dataUpdaterMain.update(file, logday);
	}
}

免责声明:文章转载自《多线程实现数据库的并发操作》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇Jenkins使用手册及总结WPF/Silverlight HierarchicalDataTemplate 模版的使用下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

Mybatis操作Mysql批量更新的一个坑-&amp;amp;allowMultiQueries=true允许批量更新

前言           利用Mybatis批量更新或者批量插入,实际上即使Mybatis完美支持你的sql,你也得看看你操作的数据库是否完全支持,而同事,最近就遇到这样的一个坑! 问题         先带大家来看一段sql的配置:      1 <update id="updateAllAvailable"> 2 <foreach...

关系数据库和NoSQL结合使用:MySQL + MongoDB

Home Page作者使用一个案例来说明MySQL+MongoDB结合使用,发挥各自所长,并且认为他们互补性很强。当然,这其中不可避免引入DDD中的编程设计模式 Repository仓储模式,通过它能够将数据存储方式和应用分离开来,这样,我们的程序就不受限于任何存储方式,无论是NoSQL或关系数据库。这个案例是一个按效果付费Pay-for-use的分析案例...

MySQL-快速入门(11)用户管理

1、权限表 存储用户权限信息表主要有:user、db、host、tables_priv、columns_priv、procs_priv。 1》user表: 记录允许连接到服务器的账号信息,里面的权限是全局级别的。user表有42个字段,这些字段可以分为4类,分别是用户列、权限列、安全列、资源控制列。 2》db和host表 3》tables_priv和col...

Redis实现之数据库(一)

服务器中的数据库 Redis服务器将所有数据库都保存在服务器状态redis.h/redisServer结构体的db数组中,db数组的每个项都是一个redis.h/redisDb结构体,每个redisDb结构体代表一个数据库 redis.h struct redisServer { …… //一个数组,保存着服务器中所有数据库 redisD...

产品经理 数据分析

如果要解决一个问题,首先我们要准确地定义这个问题(按照上期所讲,这个需要有深刻的业务洞察能力),然后通过一系列的数据分析,定位原因,最后讨论并实施对策(即实现项目影响)。左右两个图对比,我们可以看到,数据分析的能力框架并不是天马行空想出来的,它是从解决问题的流程中提炼出来的。 无论是业务洞察还是数据分析,都是服务于原因定位。而当原因定位好之后,如果和你配合...

laravel中migration 数据迁移

简介 数据库迁移就像是数据库的版本控制,可以让你的团队轻松修改并共享应用程序的数据库结构。迁移通常与 Laravel 的数据库结构生成器配合使用,让你轻松地构建数据库结构。如果你曾经试过让同事手动在数据库结构中添加字段,那么数据库迁移可以让你不再需要做这样的事情。 Laravel Schema facade 对所有 Laravel 支持的数据库系统提供了创...