Java5 多线程实践

摘要:
本文使用网络服务器模型来实践Java5的多线程编程。该模型使用Java5中的线程池、阻塞队列和可重入锁。它还实践了Callable、Future和其他接口,并使用了Java5的另一个新特性泛型。Java5的线程池克服了这些缺点。下面详细介绍了如何使用Java5并发包提供的API来实现服务器。任务完成后,将线程释放到线程池。因为多个线程可能同时竞争和访问计数,所以需要锁定机制。在Java5之前,我们只能使用synchronized来锁定。ReentrantLock是Java5中引入的一种更细粒度的可重入锁。

2006 年 1 月 18 日

Java5增加了新的类库并发集java.util.concurrent,该类库为并发程序提供了丰富的API多线程编程在Java 5中更加容易,灵活。本文通过一个网络服务器模型,来实践Java5的多线程编程,该模型中使用了Java5中的线程池,阻塞队列,可重入锁等,还实践了Callable, Future等接口,并使用了Java 5的另外一个新特性泛型。

简介

本文将实现一个网络服务器模型,一旦有客户端连接到该服务器,则启动一个新线程为该连接服务,服务内容为往客户端输送一些字符信息。一个典型的网络服务器模型如下:

1. 建立监听端口。

2. 发现有新连接,接受连接,启动线程,执行服务线程。

3. 服务完毕,关闭线程。

这个模型在大部分情况下运行良好,但是需要频繁的处理用户请求而每次请求需要的服务又是简短的时候,系统会将大量的时间花费在线程的创建销毁。Java 5的线程池克服了这些缺点。通过对重用线程来执行多个任务,避免了频繁线程的创建与销毁开销,使得服务器的性能方面得到很大提高。因此,本文的网络服务器模型将如下:

1. 建立监听端口,创建线程池。

2. 发现有新连接,使用线程池来执行服务任务。

3. 服务完毕,释放线程到线程池。

下面详细介绍如何使用Java 5的concurrent包提供的API来实现该服务器。


Java5 多线程实践第1张
Java5 多线程实践第2张
Java5 多线程实践第3张
Java5 多线程实践第4张
回页首


初始化

初始化包括创建线程池以及初始化监听端口。创建线程池可以通过调用java.util.concurrent.Executors类里的静态方法newChahedThreadPool或是newFixedThreadPool来创建,也可以通过新建一个java.util.concurrent.ThreadPoolExecutor实例来执行任务。这里我们采用newFixedThreadPool方法来建立线程池。

ExecutorService pool = Executors.newFixedThreadPool(10);
表示新建了一个线程池,线程池里面有10个线程为任务队列服务。

使用ServerSocket对象来初始化监听端口。


private static final int PORT = 19527;
serverListenSocket = new ServerSocket(PORT);
serverListenSocket.setReuseAddress(true);
serverListenSocket.setReuseAddress(true);


Java5 多线程实践第1张
Java5 多线程实践第2张
Java5 多线程实践第3张
Java5 多线程实践第4张
回页首


服务新连接

当有新连接建立时,accept返回时,将服务任务提交给线程池执行。


while(true){
Socket socket = serverListenSocket.accept();
pool.execute(new ServiceThread(socket));
}

这里使用线程池对象来执行线程,减少了每次线程创建和销毁的开销。任务执行完毕,线程释放到线程池。


Java5 多线程实践第1张
Java5 多线程实践第2张
Java5 多线程实践第3张
Java5 多线程实践第4张
回页首


服务任务

服务线程ServiceThread维护一个count来记录服务线程被调用的次数。每当服务任务被调用一次时,count的值自增1,因此ServiceThread提供一个increaseCount和getCount的方法,分别将count值自增1和取得该count值。由于可能多个线程存在竞争,同时访问count,因此需要加锁机制,在Java 5之前,我们只能使用synchronized来锁定。Java 5中引入了性能更加粒度更细的重入锁ReentrantLock。我们使用ReentrantLock保证代码线程安全。下面是具体代码:


private static ReentrantLock lock = new ReentrantLock ();
private static int count = 0;
private int getCount(){
	int ret = 0;
	try{
		lock.lock();
		ret = count;
	}finally{
		lock.unlock();
	}
	return ret;
}	
private void increaseCount(){
	try{
		lock.lock();
		++count;
	}finally{
		lock.unlock();
	}
}

服务线程在开始给客户端打印一个欢迎信息,


increaseCount();
int curCount = getCount();
helloString = "hello, id = " + curCount+"\r\n";
dos = new DataOutputStream(connectedSocket.getOutputStream());
dos.write(helloString.getBytes());

然后使用ExecutorService的submit方法提交一个Callable的任务,返回一个Future接口的引用。这种做法对费时的任务非常有效,submit任务之后可以继续执行下面的代码,然后在适当的位置可以使用Future的get方法来获取结果,如果这时候该方法已经执行完毕,则无需等待即可获得结果,如果还在执行,则等待到运行完毕。


ExecutorService executor = Executors.newSingleThreadExecutor();
Future <String> future = executor.submit(new TimeConsumingTask());
dos.write("let's do soemthing other".getBytes());
String result = future.get();
dos.write(result.getBytes());
其中TimeConsumingTask实现了Callable接口
class TimeConsumingTask implements Callable <String>{
	public String call() throws Exception {
		System.out.println
		("It's a time-consuming task, 
		you'd better retrieve your result in the furture");
		return "ok, here's the result: It takes me lots of time to produce this result";
	}
}

这里使用了Java 5的另外一个新特性泛型,声明TimeConsumingTask的时候使用了String做为类型参数。必须实现Callable接口的call函数,其作用类似与Runnable中的run函数,在call函数里写入要执行的代码,其返回值类型等同于在类声明中传入的类型值。在这段程序中,我们提交了一个Callable的任务,然后程序不会堵塞,而是继续执行dos.write("let's do soemthing other".getBytes());当程序执行到String result = future.get()时如果call函数已经执行完毕,则取得返回值,如果还在执行,则等待其执行完毕。


Java5 多线程实践第1张
Java5 多线程实践第2张
Java5 多线程实践第3张
Java5 多线程实践第4张
回页首


服务器端的完整实现

服务器端的完整实现代码如下:


package com.andrew;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
public class Server {
	private static int produceTaskSleepTime = 100;
	private static int consumeTaskSleepTime = 1200;
	private static int produceTaskMaxNumber = 100;
	private static final int CORE_POOL_SIZE = 2;
	private static final int MAX_POOL_SIZE = 100;
	private static final int KEEPALIVE_TIME = 3;
	private static final int QUEUE_CAPACITY = (CORE_POOL_SIZE + MAX_POOL_SIZE) / 2;
	private static final TimeUnit TIME_UNIT = TimeUnit.SECONDS;
	private static final String HOST = "127.0.0.1";
	private static final int PORT = 19527;
	private BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<Runnable>(
			QUEUE_CAPACITY);
	//private ThreadPoolExecutor serverThreadPool = null;
	
	private ExecutorService pool = null;
	private RejectedExecutionHandler rejectedExecutionHandler = new
	ThreadPoolExecutor.DiscardOldestPolicy();
	private ServerSocket serverListenSocket = null;
	private int times = 5;
	public void start() {
		// You can also init thread pool in this way.
		/*serverThreadPool = new ThreadPoolExecutor(CORE_POOL_SIZE,
				MAX_POOL_SIZE, KEEPALIVE_TIME, TIME_UNIT, workQueue,
				rejectedExecutionHandler);*/
		pool = Executors.newFixedThreadPool(10);
		try {
			serverListenSocket = new ServerSocket(PORT);
			serverListenSocket.setReuseAddress(true);
			System.out.println("I'm listening");
			while (times-- > 0) {
				Socket socket = serverListenSocket.accept();
				String welcomeString = "hello";
	//serverThreadPool.execute(new ServiceThread(socket, welcomeString));
				pool.execute(new ServiceThread(socket));
			}
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		cleanup();
	}
	public void cleanup() {
		if (null != serverListenSocket) {
			try {
				serverListenSocket.close();
			} catch (IOException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
		//serverThreadPool.shutdown();
		pool.shutdown();
	}
	public static void main(String args[]) {
		Server server = new Server();
		server.start();
	}
}
class ServiceThread implements Runnable, Serializable {
	private static final long serialVersionUID = 0;
	private Socket connectedSocket = null;
	private String helloString = null;
	private static int count = 0;
	private static ReentrantLock lock = new ReentrantLock();
	ServiceThread(Socket socket) {
		connectedSocket = socket;
	}
	public void run() {
		increaseCount();
		int curCount = getCount();
		helloString = "hello, id = " + curCount + "\r\n";
		ExecutorService executor = Executors.newSingleThreadExecutor();
		Future<String> future = executor.submit(new TimeConsumingTask());
		DataOutputStream dos = null;
		try {
			dos = new DataOutputStream(connectedSocket.getOutputStream());
			dos.write(helloString.getBytes());
			try {
				dos.write("let's do soemthing other.\r\n".getBytes());
				String result = future.get();
				dos.write(result.getBytes());
			} catch (InterruptedException e) {
				e.printStackTrace();
			} catch (ExecutionException e) {
				e.printStackTrace();
			}
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} finally {
			if (null != connectedSocket) {
				try {
					connectedSocket.close();
				} catch (IOException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}
			if (null != dos) {
				try {
					dos.close();
				} catch (IOException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}
			executor.shutdown();
		}
	}
	private int getCount() {
		int ret = 0;
		try {
			lock.lock();
			ret = count;
		} finally {
			lock.unlock();
		}
		return ret;
	}
	private void increaseCount() {
		try {
			lock.lock();
			++count;
		} finally {
			lock.unlock();
		}
	}
}
class TimeConsumingTask implements Callable<String> {
	public String call() throws Exception {
		System.out
				.println("It's a 	time-consuming task, 
				you'd better retrieve your result in the furture");
		return "ok, here's the result: It takes me lots of time to produce this result";
	}
}


Java5 多线程实践第1张
Java5 多线程实践第2张
Java5 多线程实践第3张
Java5 多线程实践第4张
回页首


运行程序

运行服务端,客户端只需使用telnet 127.0.0.1 19527 即可看到信息如下:

免责声明:文章转载自《Java5 多线程实践》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇FreeMarker 基础autossh使用(本机记住ssh密码)下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

菜鸟学JDBC(二)

  上一篇文章( http://blog.csdn.net/rowandjj/article/details/8883383)我们了解了如何通过JDBC连接mysql数据库,并通过一个简单的代码示例演示了具体的操作,这里简单回顾一下流程: 1.装载驱动(Class.forName()....); 2.通过DriverManager类与数据库建立连接(Con...

ios 多线程

转自:http://www.maxiaoguo.com/clothes/254.html 多线程包含:GCD  NSOperation   NSOperation是在GCD语言的基础上开发的,GCD类C语言, NSOperation OC语法 GCD: 名词解释  并行 dispatch_queue_t q = dispatch_queue_crea...

mysql半同步(semi-sync)源码实现

      mysql复制简单介绍了mysql semi-sync的出现的原因,并说明了semi-sync如何保证不丢数据。这篇文章主要侧重于semi-sync的实现,结合源码将semi-sync的实现过程展现给大家。最新的semi-sync源码可以参考官方5.7版本的实现,https://github.com/mysql/mysql-server。 打开...

xml解析兼容性问题的避免

1、引言 js在处理xml过程中,由于浏览器兼容性问题,要做许多的兼容处理,随着浏览器是升级,大多数浏览器都已经默认保持统一标准,以便于我们日常的开发,然而有了微软的IE这个奇葩的存在,形成了大多数前端开发者的噩梦,最近做的一个项目就踩中一个大坑,IE11和以往的IE6、7、8等既然都不一样,并且没和其他浏览器保持统一标准 2、问题描述 前端开发过程中一...

笔试题多线程

1. 描述线程与进程的区别? 一个应用程序实例是一个进程,一个进程内包含一个或多个线程,线程是进程的一部分; 进程之间是相互独立的,他们有各自的私有内存空间和资源,进程内的线程可以共享其所属进程的所有资源; 2. 为什么GUI不支持跨线程访问控件?一般如何解决这个问题? 因为GUI应用程序引入了一个特殊的线程处理模型,为了保证UI控件的线程安全,这个线...

【Chromium中文文档】跨进程通信 (IPC)

跨进程通信 (IPC) 转载请注明出处:https://ahangchen.gitbooks.io/chromium_doc_zh/content/zh//General_Architecture/Inter-process_Communication.html 全书地址 Chromium中文文档 for https://www.chromium.org/...