说明:Servlet也是多线程结构。servlet类中定义的成员变量,被所有的客户线程共享。当容器同时收到对同一Servlet的多个请求,那这个Servlet的service方法将在多线程中并发的执行。
[b]一、线程池示例[/b]
(1)threadpool.xml
(2)解析XML文件
(3)工作线程
(4)管理线程池
(5)调用
(6)具有线程池的工作队列
我们通常想要的是同一组固定的工作线程相结合的工作队列,它使用 wait() 和 notify() 来通知等待线程新的工作已经到达了。该工作队列通常被实现成具有相关监视器对象的某种链表。
大多数专家建议使用 notifyAll() 而不是 notify() ,而且理由很充分:使用 notify() 具有难以捉摸的风险,只有在某些特定条件下使用该方法才是合适的。另一方面,如果使用得当, notify() 具有比 notifyAll() 更可取的性能特征;特别是, notify() 引起的环境切换要少得多,这一点在服务器应用程序中是很重要的。
[b]二、如何停止线程[/b]
(1)运行状态下,通过置一个volatile变量值来结束一个循环线程。
(2)非运行状态下(调用sleep,wait,或者被I/O阻塞,可能是文件或者网络等等),使用interrupt(),让线程在run方法中停止。
当interrupt()被调用的时候,InterruptedException将被抛出,所以你可以再run方法中捕获这个异常,让线程安全退出:
当线程被I/O阻塞的时候,调用interrupt()的情况是依赖与实际运行的平台的。在Solaris和Linux平台上将会抛出InterruptedIOException的异常,但是Windows上面不会有这种异常。所以,我们处理这种问题不能依靠于平台的实现。
也可以使用InterruptibleChannel接口。 实现了InterruptibleChannel接口的类可以在阻塞的时候抛出ClosedByInterruptException。
需要注意一点,当线程处于写文件的状态时,调用interrupt()不会中断线程。
(3)一个示例
[b]三、关于volatile[/b]
volatile是一个类型修饰符(type specifier)。它是被设计用来修饰被不同线程访问和修改的变量。如果没有volatile,基本上会导致这样的结果:要么无法编写多线程程序,要么编译器失去大量优化的机会。
一般说来,volatile用在如下的几个地方:
1、中断服务程序中修改的供其它程序检测的变量需要加volatile;
2、多任务环境下各任务间共享的标志应该加volatile;
3、存储器映射的硬件寄存器通常也要加volatile说明,因为每次对它的读写都可能由不同意义;
[b]一、线程池示例[/b]
(1)threadpool.xml
<?xml version="1.0" encoding="UTF-8"?>
<config>
<threadPool>
<minPools>10</minPools> <!--线程池最小线程-->
<maxPools>100</maxPools> <!--线程池最大线程-->
<checkThreadPeriod>5</checkThreadPeriod> <!--检查线程池中线程的周期5分钟-->
</threadPool>
</config>
(2)解析XML文件
import java.io.InputStream;
import java.util.Iterator;
import org.dom4j.Document;
import org.dom4j.Element;
import org.dom4j.io.SAXReader;
import org.springframework.core.io.ClassPathResource;
public class XMLReader {
private Document document;
private static final XMLReader instance =
new XMLReader();
/**
* 私有的默认构造子
*/
private XMLReader() {
loadXML();
}
/**
* 静态工厂方法
*/
public static XMLReader getInstance()
{
return instance;
}
private void loadXML(){
InputStream is = null;
SAXReader reader =null;
try {
is = (new ClassPathResource("threadpool.xml")).getInputStream();
reader = new SAXReader();
document = reader.read(is);
is.close();
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 读取指定值
* @param name
* @return
*/
public String getThreadPoolPara(String name){
String str = "";
try {
Element root = document.getRootElement(); // 获得根元素
Iterator lv = root.elementIterator("threadPool");
Element el = null;
while (lv.hasNext()) {
el = (Element) lv.next();
str = el.element(name).getText();
}
} catch (Exception e) {
System.out.println(e.toString());
}
return str;
}
}
(3)工作线程
class WorkThread extends Thread {
private boolean runningFlag;
private String argument;
public boolean isRunning() {
return runningFlag;
}
public synchronized void setRunning(boolean flag) {
runningFlag = flag;
if (flag)
this.notify();
}
public String getArgument() {
return this.argument;
}
public void setArgument(String string) {
argument = string;
}
public WorkThread(int threadNumber) {
runningFlag = false;
System.out.println("thread " + threadNumber + "started.");
}
public synchronized void run() {
try {
while (true) {
if (!runningFlag) {
this.wait();
} else {
System.out.println("processing " + getArgument()
+ "... done.");
sleep(5000);
System.out.println("Thread is sleeping...");
setRunning(false);
}
}
} catch (InterruptedException e) {
System.out.println("Interrupt");
}
}
}
(4)管理线程池
import java.util.*;
class ThreadPoolManager {
private int maxPools;
private int minPools;
private int checkThreadPeriod;
// private java.util.Timer timer = null;
public Vector vector;
@SuppressWarnings("unchecked")
public ThreadPoolManager() {
setMaxPools(Integer.parseInt(XMLReader.getInstance().getThreadPoolPara("maxPools")));
setMinPools(Integer.parseInt(XMLReader.getInstance().getThreadPoolPara("minPools")));
setCheckThreadPeriod(Integer.parseInt(XMLReader.getInstance().getThreadPoolPara("checkThreadPeriod")));
System.out.println("Starting thread pool...");
vector = new Vector();
for (int i = 1; i <= minPools; i++) {
WorkThread thread = new WorkThread(i);
vector.addElement(thread);
thread.start();
}
// timer = new Timer(true);
// timer.schedule(new CheckThreadTask(this),0,checkThreadPeriod);
}
@SuppressWarnings("unchecked")
public void process(String argument) {
int i;
for (i = 0; i < vector.size(); i++) {
WorkThread currentThread = (WorkThread) vector.elementAt(i);
if (!currentThread.isRunning()) {
System.out.println("Thread " + (i + 1) + " is processing:"
+ argument);
currentThread.setArgument(argument);
currentThread.setRunning(true);
return;
}
if(i == vector.size()-1){//没的空闲线程了,就新建一个
if(vector.size() < maxPools){
WorkThread thread = new WorkThread(i);
vector.addElement(thread);
thread.setArgument(argument);
thread.setRunning(true);
thread.start();
}
}
}
if (i == maxPools) {
System.out.println("pool is full, try in another time.");
}
}
public int getCheckThreadPeriod() {
return checkThreadPeriod;
}
public void setCheckThreadPeriod(int checkThreadPeriod) {
this.checkThreadPeriod = checkThreadPeriod;
}
public int getMaxPools() {
return maxPools;
}
public void setMaxPools(int maxPools) {
this.maxPools = maxPools;
}
public int getMinPools() {
return minPools;
}
public void setMinPools(int minPools) {
this.minPools = minPools;
}
}// end of class ThreadPoolManager
(5)调用
public static void main(String[] args) {
try {
BufferedReader br = new BufferedReader(new InputStreamReader(
System.in));
String s;
ThreadPoolManager manager = new ThreadPoolManager();
while ((s = br.readLine()) != null) {
manager.process(s);
}
} catch (IOException e) {
}
}
(6)具有线程池的工作队列
我们通常想要的是同一组固定的工作线程相结合的工作队列,它使用 wait() 和 notify() 来通知等待线程新的工作已经到达了。该工作队列通常被实现成具有相关监视器对象的某种链表。
public class WorkQueue
{
private final int nThreads;
private final PoolWorker[] threads;
private final LinkedList queue;
public WorkQueue(int nThreads)
{
this.nThreads = nThreads;
queue = new LinkedList();
threads = new PoolWorker[nThreads];
for (int i=0; i<nThreads; i++) {
threads[i] = new PoolWorker();
threads[i].start();
}
}
public void execute(Runnable r) {
synchronized(queue) {
queue.addLast(r);
queue.notify();
}
}
private class PoolWorker extends Thread {
public void run() {
Runnable r;
while (true) {
synchronized(queue) {
while (queue.isEmpty()) {
try
{
queue.wait();
}
catch (InterruptedException ignored)
{
}
}
r = (Runnable) queue.removeFirst();
}
// If we don't catch RuntimeException,
// the pool could leak threads
try {
r.run();
}
catch (RuntimeException e) {
// You might want to log something here
}
}
}
}
}
大多数专家建议使用 notifyAll() 而不是 notify() ,而且理由很充分:使用 notify() 具有难以捉摸的风险,只有在某些特定条件下使用该方法才是合适的。另一方面,如果使用得当, notify() 具有比 notifyAll() 更可取的性能特征;特别是, notify() 引起的环境切换要少得多,这一点在服务器应用程序中是很重要的。
[b]二、如何停止线程[/b]
(1)运行状态下,通过置一个volatile变量值来结束一个循环线程。
(2)非运行状态下(调用sleep,wait,或者被I/O阻塞,可能是文件或者网络等等),使用interrupt(),让线程在run方法中停止。
当interrupt()被调用的时候,InterruptedException将被抛出,所以你可以再run方法中捕获这个异常,让线程安全退出:
try {
....
wait();
} catch (InterruptedException iex) {
throw new RuntimeException("Interrupted",iex);
}
当线程被I/O阻塞的时候,调用interrupt()的情况是依赖与实际运行的平台的。在Solaris和Linux平台上将会抛出InterruptedIOException的异常,但是Windows上面不会有这种异常。所以,我们处理这种问题不能依靠于平台的实现。
也可以使用InterruptibleChannel接口。 实现了InterruptibleChannel接口的类可以在阻塞的时候抛出ClosedByInterruptException。
需要注意一点,当线程处于写文件的状态时,调用interrupt()不会中断线程。
(3)一个示例
import java.io.BufferedReader;
import java.io.FileDescriptor;
import java.io.FileInputStream;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.channels.Channels;
public class InterruptInput {
static BufferedReader in = new BufferedReader(
new InputStreamReader(
Channels.newInputStream(
(new FileInputStream(FileDescriptor.in)).getChannel())));
public static void main(String args[]) {
try {
System.out.println("Enter lines of input (user ctrl+Z Enter to terminate):");
System.out.println("(Input thread will be interrupted in 10 sec.)");
// interrupt input in 10 sec
(new TimeOut()).start();
String line = null;
while ((line = in.readLine()) != null) {
System.out.println("Read line:'"+line+"'");
}
} catch (Exception ex) {
System.out.println(ex.toString()); // java.nio.channels.ClosedChannelException
}
}
public static class TimeOut extends Thread {
int sleepTime = 10000;
Thread threadToInterrupt = null;
public TimeOut() {
// interrupt thread that creates this TimeOut.
threadToInterrupt = Thread.currentThread();
setDaemon(true);
}
public void run() {
try {
sleep(10000); // wait 10 sec
} catch(InterruptedException ex) {/*ignore*/}
threadToInterrupt.interrupt();
}
}
}
[b]三、关于volatile[/b]
volatile是一个类型修饰符(type specifier)。它是被设计用来修饰被不同线程访问和修改的变量。如果没有volatile,基本上会导致这样的结果:要么无法编写多线程程序,要么编译器失去大量优化的机会。
一般说来,volatile用在如下的几个地方:
1、中断服务程序中修改的供其它程序检测的变量需要加volatile;
2、多任务环境下各任务间共享的标志应该加volatile;
3、存储器映射的硬件寄存器通常也要加volatile说明,因为每次对它的读写都可能由不同意义;