package czp.opensource.edf;
public class EDFEvent {
/**
* 事件源
*/
private Object from;
/**
* 异常
*/
private Throwable error;
/**
* 事件的具体内容
*/
private Object body;
/**
* 事件类型
*/
private EDFEventType type;
public EDFEvent(Object from, Throwable error, Object body, EDFEventType type) {
this.from = from;
this.error = error;
this.body = body;
this.type = type;
}
public EDFEvent(Object body, EDFEventType type) {
this.body = body;
this.type = type;
}
public EDFEvent() {
}
/**
*
* @return 事件的内容
*/
public Object getBody() {
return body;
}
/**
*
* @param body 事件的内容
*/
public void setBody(Object body) {
this.body = body;
}
/**
*
* @param from 事件的发送者
*/
public void setFrom(Object from) {
this.from = from;
}
/**
*
* @param error 事件的异常
*/
public void setError(Throwable error) {
this.error = error;
}
/**
*
* @return 事件的发送者
*/
public Object getFrom() {
return from;
}
/**
*
* @return 事件异常
*/
public Throwable getError() {
return error;
}
/**
*
* @return 事件类型
*/
public EDFEventType getType() {
return this.type;
}
/**
*
* @param type 事件类型
*/
public void setType(EDFEventType type) {
this.type = type;
}
@Override
public String toString() {
StringBuilder str = new StringBuilder();
str.append("EDFEvent from:").append(from).append(",error:").append(error)
.append(",type:").append(type).append(",body:").append(body);
return str.toString();
}
}
//---------
package czp.opensource.edf;
/**
* 事件的处理器
*
* @author CaoZhongping
*
*/
public interface EDFEventHandler {
/**
* 处理具体的事件
* @param event 容器分配的事件
* @return Object 返回值
*/
Object handle(EDFEvent event);
/**
* 处理异常事件
* @param event 容器分配的异常事件
*/
void handdleError(EDFEvent event);
/**
* @return EDFEventType 关注的事件类型
*/
EDFEventType getMatchType();
}
//------
package czp.opensource.edf;
/**
* 事件类型接口默认提供两个实现:
* <pre>
* 1 错误类型ERROR_TYPE,当处理事件发生
* 异常时会发送该类型的事件并调用handle的handleError
* 调用者一般不需要发送异常类型的事件
* 2 所有类型ALL_TYPE,如果一个handle注册了ERROR_TYPE
* 则表示该关注所有事件,因为isMatch恒返回true
* 使用者需要实现 getType()和isMatch(),例如:
*
* public class SimpleEDFEventType implements EDFEventType {
private String myType;
public SimpleEDFEventType(String myType) {
this.myType = myType;
}
@Override
public String getType() {
return myType;
}
@Override
public boolean isMatch(EDFEventType type) {
return type.getType().equalsIgnoreCase(getType());
}
}
*
* </pre>
*
* @author CaoZhongping
*
*/
public interface EDFEventType {
/**
* @return 事件类型的名称
*/
String getType();
/**
* 反应器在处理事件时会调用
* 该方法找到该事件的handler
* 这个方法应该尽可能的简单
*
* @param type 事件类型
* @return boolean 是否匹配
*/
boolean isMatch(EDFEventType type);
/**
* 错误事件类型
*/
EDFEventType ERROR_TYPE = new EDFEventType(){
private String type="EDFEventType_ERROR";
@Override
public String getType() {
return type;
}
@Override
public boolean isMatch(EDFEventType eventType) {
return eventType.getType().equals(getType());
}
};
/**
* 所有的事件类型isMatch恒为true
* 也就是关注所有的事件
*/
EDFEventType ALL_TYPE = new EDFEventType() {
private String type="EDFEventType_ALL";
@Override
public boolean isMatch(EDFEventType type) {
return true;
}
@Override
public String getType() {
return type;
}
};
}
//-----------
package czp.opensource.edf;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
/**
* 事件驱动框架的事件反应器
*
* @author CaoZhongping
*
*/
public class EDFReactor implements Runnable{
/**
* 每个处理器默认的处理超时时间
*/
private static final long DEFAULT_TIME_OUT_SEC = 15;
/**
* 默认的队列大小
*/
private final static int DEFAULT_QUEUE_CAPCITY = 50;
/**
* 事件队列
*/
private BlockingQueue<EDFEvent> queue;
/**
* 分派线程池
*/
private ExecutorService dispatchService;
/**
* 工作线程池
*/
private ExecutorService workService;
/**
* 队列容量
*/
private int eventQueueCapability;
/**
* 事件处理器
*/
private Map<String, List<EDFEventHandler>> handlers;
/**
* 创建反应器的线程
*/
private Thread createThread;
private volatile boolean isStop;
public EDFReactor(ExecutorService dispatchService,
ExecutorService workService,int eventQueueMaxSize) {
this.dispatchService = dispatchService;
this.workService = workService;
this.queue = new ArrayBlockingQueue<EDFEvent>(eventQueueMaxSize);
this.handlers = new ConcurrentHashMap<String, List<EDFEventHandler>>();
this.createThread = Thread.currentThread();
}
public EDFReactor()
{
this(Executors.newSingleThreadExecutor(), Executors.newCachedThreadPool(), DEFAULT_QUEUE_CAPCITY);
}
/**
* 添加处理器
* @param eventHandler
*/
public void addEventHandler(EDFEventHandler eventHandler)
{
String type = eventHandler.getMatchType().getType();
List<EDFEventHandler> ls = handlers.get(type);
if(ls==null)
{
ls = new CopyOnWriteArrayList<EDFEventHandler>();
ls.add(eventHandler);
handlers.put(type, ls);
}else{
ls.add(eventHandler);
}
}
/**
* 启动反应器
*/
public void start(){
if(dispatchService.isShutdown())
{
throw new RuntimeException("the EDFReactor has Shutdown ,Please create a new EDFReactor");
}
this.dispatchService.execute(this);
}
/**
* 停止反应器
* 只有反应器的创建线程可以终止它
*/
public void stop()
{
if(Thread.currentThread().equals(createThread))
{
this.isStop = true;
this.queue.clear();
this.handlers.clear();
this.dispatchService.shutdownNow();
this.workService.shutdownNow();
}else{
throw new RuntimeException("EDFReactor stop() only be called by create thread");
}
}
/**
* @return 队列的容量
*/
public int getEventQueueCapability() {
return eventQueueCapability;
}
/**
*添加事件到队列
* @param event
*/
public void sendEvent(EDFEvent event)
{
this.queue.offer(event);
}
/**
* 分派线程开始
*/
@Override
public void run() {
while(!isStop)
{
try {
EDFEvent ev = queue.take();
handleEvent(ev);
} catch (Exception e) {
sendExceptionEvent(e);
}
}
}
/**
* 在工作线程里调用具体的事件处理器
*
* @param ev
*/
private void handleEvent(EDFEvent ev) {
List<EDFEventHandler> ls = handlers.get(ev.getType().getType());
if(ls!=null)
{
for(EDFEventHandler hdl:ls)
{
Future<?> future = null;
try {
final EDFEventTask task = new EDFEventTask(ev,hdl);
future = workService.submit(task);
if(!future.isDone())
{
future.get(DEFAULT_TIME_OUT_SEC, TimeUnit.SECONDS);
}
}catch (Exception e) {
sendExceptionEvent(e);
}
}
}
}
/**
* 发送异常事件
* @param throwable
*/
public void sendExceptionEvent(Throwable throwable) {
EDFEvent event = new EDFEvent();
event.setFrom(this);
event.setError(throwable);
event.setType(EDFEventType.ERROR_TYPE);
sendEvent(event);
}
/**
* 具体的事件
*
* @author CaoZhongping
*
*/
private static class EDFEventTask implements Callable<Object> {
private EDFEvent event;
private EDFEventHandler handle;
public EDFEventTask(EDFEvent event, EDFEventHandler handle) {
this.event = event;
this.handle = handle;
}
@Override
public Object call() throws Exception {
if(event.getType().isMatch(EDFEventType.ERROR_TYPE))
{
handle.handdleError(event);
}else if(handle.getMatchType().isMatch(event.getType()))
{
return handle.handle(event);
}
return null;
}
}
}
//---------
package czp.opensource.edf.demo;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import czp.opensource.edf.EDFEvent;
import czp.opensource.edf.EDFEventHandler;
import czp.opensource.edf.EDFEventType;
import czp.opensource.edf.EDFReactor;
/**
* <pre>
* 参见:czp.opensource.edf.demo.SimpleDemo
* </pre>
* @author CaoZhongping
*
*/
public class SimpleDemo {
public static void main(String[] args) throws InterruptedException {
//创建一个反应器通常一个系统创建一个reactor
final EDFReactor reactor = new EDFReactor();
//添加一个处理器
reactor.addEventHandler(new SimpleEDFEventHandle());
//启动反应器
reactor.start();
final CountDownLatch sd = new CountDownLatch(1);
ExecutorService service = Executors.newFixedThreadPool(5);
service.execute(new Runnable() {
@Override
public void run() {
int i = 0;
while(i++<20)
{
try {
//创建事件
EDFEvent event = new EDFEvent();
event.setType(new SimpleEDFEventType("test"));
event.setFrom(this);
//发送事件
reactor.sendEvent(event );
Thread.sleep(10);
} catch (Exception e) {
e.printStackTrace();
}
}
sd.countDown();
}
});
sd.await();
service.shutdownNow();
//系统注销时stop反应器
reactor.stop();
}
}
/**
* 简单的处理器
*
* @author CaoZhongping
*
*/
class SimpleEDFEventHandle implements EDFEventHandler {
private EDFEventType myType = new SimpleEDFEventType("test");
@Override
public Object handle(EDFEvent event) {
System.out.println("get msg "+event);
return null;
}
@Override
public void handdleError(EDFEvent event) {
System.out.println(event.getError());
}
@Override
public EDFEventType getMatchType() {
return myType;
}
}
/**
* 简单的事件类型
*
* @author CaoZhongping
*/
class SimpleEDFEventType implements EDFEventType {
private String myType;
public SimpleEDFEventType(String myType) {
this.myType = myType;
}
@Override
public String getType() {
return myType;
}
@Override
public boolean isMatch(EDFEventType type) {
return type.getType().equalsIgnoreCase(getType());
}
@Override
public String toString() {
return "SimpleEDFEventType [myType=" + myType + "]";
}
}
EDF
最新推荐文章于 2023-04-21 11:19:24 发布