Master-Worker模式是常用的并发模式,核心是Master和Worker两个进程,Master接收和分配任务,各个Worker执行任务并返回结果,由Master来归纳和总结。好处是能将大任务分解为若干个小任务,提高系统的吞吐量。
代码示例:
public class Master {
// 1.接收任务的queue
private ConcurrentLinkedQueue<Task> taskQueue;
// 2.管理worker的map
private HashMap<String, Thread> workers;
// 3.存储worker处理完的结果的map
private ConcurrentHashMap<String, Object> resultSet;
// 4.构造函数,构造workers
public Master(Worker worker, int workCount) {
taskQueue = new ConcurrentLinkedQueue<Task>();
workers = new HashMap<String, Thread>();
resultSet = new ConcurrentHashMap<String, Object>();
worker.setTaskQueue(taskQueue);
worker.setResultSet(resultSet);
for (int i = 0; i < workCount; i++) {
workers.put("worker"+i, new Thread(worker));
}
}
// 5.接收任务
public void submit(Task task) {
taskQueue.add(task);
}
// 6.执行任务
public void execute() {
for (Entry<String, Thread> work : workers.entrySet()) {
work.getValue().start();
}
}
// 7.判断所有任务是否执行完成
public boolean isCompleted() {
for (Entry<String, Thread> work : workers.entrySet()) {
if(work.getValue().getState() != Thread.State.TERMINATED) {
return false;
}
}
return true;
}
// 8.返回结果
public long getResult() {
long result = 0;
for (Entry<String, Object> task : resultSet.entrySet()) {
result += (int)task.getValue();
}
return result;
}
}
public class Worker implements Runnable {
private ConcurrentLinkedQueue<Task> taskQueue;
private ConcurrentHashMap<String, Object> resultSet;
public void setTaskQueue(ConcurrentLinkedQueue<Task> taskQueue) {
this.taskQueue = taskQueue;
}
public void setResultSet(ConcurrentHashMap<String, Object> resultSet) {
this.resultSet = resultSet;
}
@Override
public void run() {
// 循环地从taskQueue中取出任务,并执行,直到任务队列为空
while(true) {
// 取出任务并从队列移除该任务
Task task = taskQueue.poll();
if(task == null) break;
handle(task);
}
}
public void handle(Task task) {
int result = task.getId();
try {
// 模拟执行任务消耗的时间
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
resultSet.put(String.valueOf(task.getId()), result);
}
}
public class Test {
public static void main(String[] args) {
Master master = new Master(new Worker(), 20);
for (int i = 0; i < 100; i++) {
Task task = new Task();
task.setId(i+1);
task.setName("任务"+i+1);
// 提交任务
master.submit(task);
}
long start = System.currentTimeMillis();
// 执行任务
master.execute();
while(true) {
if(master.isCompleted()) {
long end = System.currentTimeMillis();
long result = master.getResult();
System.out.println("结果为:" + result + ",所用时间:" + (end-start));
break;
}
}
}
}
1、Master拥有一个任务队列,一个Worker组和一个结果集
2、每个Worker能操作任务队列和结果集,因为需要从任务队列中取任务,以及结果存放到结果集中
总结:
Master-Worker模式是一种将串行任务并行化的方案,被分解的子任务在系统中可以被并行处理,同时,如果有需要,Master进程不需要等待所有子任务都完成计算,就可以根据已有的部分结果集计算最终结果集。