现在项目(web)中有个需求,每天临晨对一个 WEB 目标进行页面爬取,爬取过程是一个多线程任务,这个任务由 Quartz(Spring2 整合)cronTrigger 来调度。 大概同时会派出5-10个爬虫线程,执行爬虫线程的线程池,也是由 Spring 配置的 SimpleThreadPoolTaskExecutor。
现在的情况:众所周知,Quartz 缺省维持了一组自己的线程池,default pool size = 10。 实际上我全系统只有一个任务,每天运行一次,那么,每次调度触发时,池中始终只有一个线程会被用到。 这个线程开始执行任务后, 单独配置的爬虫线程池再接着开始执行爬取任务。
--------------------
在互联网应用里,去相关的网站自动的获取一些信息,是很常见的一种应用
比如,我这里现在要实现的一个功能,就是明天定时去ebay,amazon两个网站,获取产品信息,
从而,为自动分析的商务引擎提供基础的运算数据。
这里由于需要提取的信息很多,商家太多,所以,需要threadpool来进行多线程的实现,这里
使用httpclient作为http的实现,用自己的组件jager来提供模板解析的功能通过模板来提取
指定页面信息,由于这里ebay和amazon会分析ip,如果一个ip访问过多的话,作为舞弊行为,
而拒绝你继续访问。
这也就是我们用threadpool来进行多线程的原因,这这里在每个线程里,都会去一个指定的网站
获取代理服务器的信息,然后根据提取以后的代理服务器的信息,通过程序的代理上ebay和amazon
这样也就不会发现舞弊的行为了。当一个ip失效,会switch到另一个代理ip继续。
threadpool的代码比较简单,在1.5里提供了一个ThreadPoolExecutor的类,这个类已经基本
实现了线程池的功能。
自己简单封装了一个ThreadPoolManager的类
代码如下
public class ThreadPoolManager
{
private static Log log = LogFactory.getLog(ThreadPoolManager.class);
private ThreadPoolManager()
{}
public static ThreadPoolManager newInstance()
{
return new ThreadPoolManager();
}
private ThreadPoolExecutor taskPool = null; // Thread pool used to hold running threads
private BlockingQueue taskQueue = null; // Blocking queue used to hold waiting threads
private int corePoolSize = 2;
private int maxPoolSize = 2;
private int queueSize = 4;
private long waitTime = 60;
public final static String THREADPOOL_POOLSIZE_KEY = "corePoolSize";
public final static String THREADPOOL_MAXSIZE_KEY = "maxPoolSize";
public final static String THREADPOOL_QUEUESIZE_KEY = "queueSize";
public final static String THREADPOOL_WAITTIME_KEY = "waitTime";
void init(Properties p)
{
corePoolSize = StringUtils.str2Int(p.getProperty(THREADPOOL_POOLSIZE_KEY), 2);
maxPoolSize = StringUtils.str2Int(p.getProperty(THREADPOOL_MAXSIZE_KEY), 2);
queueSize = StringUtils.str2Int(p.getProperty(THREADPOOL_QUEUESIZE_KEY), 2);
waitTime = StringUtils.str2Int(p.getProperty(THREADPOOL_WAITTIME_KEY), 60);
taskQueue = new ArrayBlockingQueue(queueSize);
taskPool = new ThreadPoolExecutor(corePoolSize, maxPoolSize, waitTime,
TimeUnit.SECONDS, taskQueue, new ThreadPoolExecutor.CallerRunsPolicy());
log.info("Initialize thread pool succeed. ThreadPool: corePoolSize = "
+ corePoolSize + ", queueSize = " + queueSize
+ ", maxPoolSize = " + maxPoolSize);
}
public void run(Runnable command)
{
this.taskPool.execute(command);
}
}
调用的方式
Sample 代码
引用:
public class HitJobStart
{
.....
public void start()
{
if(findProxies == null)
return;
ThreadPoolManager pm = ThreadPoolManager.newInstance();
Properties p = new Properties();
p.setProperty(pm.THREADPOOL_MAXSIZE_KEY, ""+concurrent);
p.setProperty(pm.THREADPOOL_POOLSIZE_KEY, ""+concurrent);
p.setProperty(pm.THREADPOOL_QUEUESIZE_KEY, ""+concurrent);
pm.init(p);
for(FindProxy fp:findProxies)
{
hit(fp, urls,
timeoutMillionSecond, pm);
}
}
private void hit(FindProxy fp, final String[] urls, final int timeout, ThreadPoolManager pm)
{
fp.load();
List<Proxy> ps = fp.getValidProxys();
for(final Proxy p : ps)
{
pm.run(new Runnable(){
public void run()
{
hit(p, urls, timeout);
}
});
}
}
....
}
这里调用的地方是
pm.run(new Runnable(){
public void run()
{
hit(p, urls, timeout);
}
});
这样就把一个线程压到线程池里了。