之前我们说了观察者模式的原理,实现,应用场景,有着异步非阻塞,同步阻塞等实现方式
今天我们去实现给一个简单基于观察者模式的通用框架
首先说一下异步非阻塞观察者模式的实现,可以有两种,一种是每次都创建一个新的线程来执行代码逻辑,另外一种则是,在UserController中,使用线程池来多次执行异步任务
对于第一种实现,是非常不支持的,因为线程对于Java来说,是一个大对象,频繁的创建和销毁是正确,于是可以考虑对于第二种的实现方式,利用一个线程池来进行异步执行,但是,我们还是对于业务代码的耦合太深,于是我们需要进行了解耦,来设计一个框架,从而做到解耦,这时候,我们可以模仿Google EventBus来实现一个框架
EventBus框架功能需求的介绍
我们要求在这个框架中,既能支持同步阻塞的实现,也能支持异步非阻塞的实现
直接使用Guava EventBus的话,我们的实现可以如下
public class UserController {
private UserService userService; // 依赖注入 private EventBus eventBus; private static final int DEFAULT_EVENTBUS_THREAD_POOL_SIZE = 20; public UserController() { //eventBus = new EventBus(); // 同步阻塞模式 eventBus = new AsyncEventBus(Executors.newFixedThreadPool(DEFAULT_EVENTBUS_THREAD_POOL_SIZE)); // 异步非阻塞模式 } public void setRegObservers(List<Object> observers) { for (Object observer : observers) { eventBus.register(observer); } } public Long register(String telephone, String password) { //省略输入参数的校验代码 //省略userService.register()异常的try-catch代码 long userId = userService.register(telephone, password); eventBus.post(userId); return userId; } } public class RegPromotionObserver { private PromotionService promotionService; // 依赖注入 @Subscribe public void handleRegSuccess(long userId) { promotionService.issueNewUserExperienceCash(userId); } } public class RegNotificationObserver { private NotificationService notificationService; @Subscribe public void handleRegSuccess(long userId) { notificationService.sendInboxMessage(userId, “…”); } } |
其实现思路就是,在eventBus中,传入一个线程池,并且在原有的观察者上,加上@Subscribe来说明是调用的接口,然后通过register来注册,利用post来给Observer发送消息
那么我们只需要关注@Subscribe这个注解,EventBus本身即可,需要关注的函数就是register() post()
我们具体讲一下这几个类和函数
EventBus,AsyncEventBus
EventBus实现了同步阻塞的观察者模式,AsyncEventBus则是一种装饰器,提供了异步非阻塞的观察者模式
EventBus eventBus = new EventBus(); // 同步阻塞模式
EventBus eventBus = new AsyncEventBus(Executors.newFixedThreadPool(8));// 异步阻塞模式
register()函数
注册观察者的
public void register(Object object);
unregister()函数
删除某个观察者的
public void unregister(Object object);
post()函数
用于给观察者发消息的
public void post(Object event);
不过于经典的观察者不同的是,在调用post函数发送消息的时候,是发送给了所有的观察者,然后进行匹配,匹配到的消息类型才会接收到
比如 AObserver能接受XMsg,BObserver能接收YMsg,Cobserver能接收到ZMsg,其中X是Y的父类,那么接受请求的时候,能够收到的消息的可匹配的观察者如下
XMsg xMsg = new XMsg();
YMsg yMsg = new YMsg();
ZMsg zMsg = new ZMsg();
post(xMsg); => AObserver、BObserver接收到消息
post(yMsg); => BObserver接收到消息
post(zMsg); => CObserver接收到消息
然后就是@Subscribe注解,利用这个注解来表示能够接受消息的函数
那么,了解了这些,就可以手动的实现一个EventBus框架了
其中EventBus的核心函数是register和post
主要的类是EventBus和@Subscribe注解
那么其实register就是在EventBus这个类中维护的一个map中添加了一个对象,对于Post(),则是找到这个map中对应的方法,利用反射,发送过去
那么,在我们的具体实现中,整个框架被拆分为了5个类 EventBus,AsyncEventBus,Subscibe,ObserverAction,ObserverRegistry
首先是Subscuve
这是一个注解,用于表名观察者中哪个函数还可以接受消息,用于在方法之上
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@Beta
public @interface Subscribe {}
然后是ObserverAction
用于表示被@Subscibe注解的方法,其中targer是被观察者的类,method是方法
public class ObserverAction {
private Object target; private Method method; public ObserverAction(Object target, Method method) { this.target = Preconditions.checkNotNull(target); this.method = method; this.method.setAccessible(true); } public void execute(Object event) { // event是method方法的参数 try { method.invoke(target, event); } catch (InvocationTargetException | IllegalAccessException e) { e.printStackTrace(); } } } |
然后是ObserverRegistry
前面讲到的Observer注册表,一个复杂的类,是保存了类和方法的对应关系,利用了一个map,这个map中,维护了多个class对象和CopyOnWriteArraySet的对应关系
public class ObserverRegistry {
private ConcurrentMap<Class<?>, CopyOnWriteArraySet<ObserverAction>> registry = new ConcurrentHashMap<>(); public void register(Object observer) { Map<Class<?>, Collection<ObserverAction>> observerActions = findAllObserverActions(observer); for (Map.Entry<Class<?>, Collection<ObserverAction>> entry : observerActions.entrySet()) { Class<?> eventType = entry.getKey(); Collection<ObserverAction> eventActions = entry.getValue(); CopyOnWriteArraySet<ObserverAction> registeredEventActions = registry.get(eventType); if (registeredEventActions == null) { registry.putIfAbsent(eventType, new CopyOnWriteArraySet<>()); registeredEventActions = registry.get(eventType); } registeredEventActions.addAll(eventActions); } } public List<ObserverAction> getMatchedObserverActions(Object event) { List<ObserverAction> matchedObservers = new ArrayList<>(); Class<?> postedEventType = event.getClass(); for (Map.Entry<Class<?>, CopyOnWriteArraySet<ObserverAction>> entry : registry.entrySet()) { Class<?> eventType = entry.getKey(); Collection<ObserverAction> eventActions = entry.getValue(); if (eventType.isAssignableFrom(postedEventType)) { matchedObservers.addAll(eventActions); } } return matchedObservers; } private Map<Class<?>, Collection<ObserverAction>> findAllObserverActions(Object observer) { Map<Class<?>, Collection<ObserverAction>> observerActions = new HashMap<>(); Class<?> clazz = observer.getClass(); for (Method method : getAnnotatedMethods(clazz)) { Class<?>[] parameterTypes = method.getParameterTypes(); Class<?> eventType = parameterTypes[0]; if (!observerActions.containsKey(eventType)) { observerActions.put(eventType, new ArrayList<>()); } observerActions.get(eventType).add(new ObserverAction(observer, method)); } return observerActions; } private List<Method> getAnnotatedMethods(Class<?> clazz) { List<Method> annotatedMethods = new ArrayList<>(); for (Method method : clazz.getDeclaredMethods()) { if (method.isAnnotationPresent(Subscribe.class)) { Class<?>[] parameterTypes = method.getParameterTypes(); Preconditions.checkArgument(parameterTypes.length == 1, “Method %s has @Subscribe annotation but has %s parameters.” + “Subscriber methods must have exactly 1 parameter.”, method, parameterTypes.length); annotatedMethods.add(method); } } return annotatedMethods; } } |
EventBus
这就很简单了,只是调用了ObserverRegistry的对应方法
public class EventBus {
private Executor executor; private ObserverRegistry registry = new ObserverRegistry(); public EventBus() { this(MoreExecutors.directExecutor()); } protected EventBus(Executor executor) { this.executor = executor; } public void register(Object object) { registry.register(object); } public void post(Object event) { List<ObserverAction> observerActions = registry.getMatchedObserverActions(event); for (ObserverAction observerAction : observerActions) { executor.execute(new Runnable() { @Override public void run() { observerAction.execute(event); } }); } } } |
AsyncEventBus
是一种对于EventBus的装饰器设计,利用EventBus原有的构造函数来创建了一个新的EventBus
本章重点
我们实现了一个EventBus框架,这个框架可以有效的让程序员只聚焦于业务开发,实现了同步阻塞和异步非阻塞的消息发送