之前我们说了观察者模式的原理,实现,应用场景,有着异步非阻塞,同步阻塞等实现方式

今天我们去实现给一个简单基于观察者模式的通用框架

首先说一下异步非阻塞观察者模式的实现,可以有两种,一种是每次都创建一个新的线程来执行代码逻辑,另外一种则是,在UserController中,使用线程池来多次执行异步任务

对于第一种实现,是非常不支持的,因为线程对于Java来说,是一个大对象,频繁的创建和销毁是正确,于是可以考虑对于第二种的实现方式,利用一个线程池来进行异步执行,但是,我们还是对于业务代码的耦合太深,于是我们需要进行了解耦,来设计一个框架,从而做到解耦,这时候,我们可以模仿Google EventBus来实现一个框架

EventBus框架功能需求的介绍

我们要求在这个框架中,既能支持同步阻塞的实现,也能支持异步非阻塞的实现

直接使用Guava EventBus的话,我们的实现可以如下

public class UserController {

private UserService userService; // 依赖注入

private EventBus eventBus;

private static final int DEFAULT_EVENTBUS_THREAD_POOL_SIZE = 20;

public UserController() {

//eventBus = new EventBus(); // 同步阻塞模式

eventBus = new AsyncEventBus(Executors.newFixedThreadPool(DEFAULT_EVENTBUS_THREAD_POOL_SIZE)); // 异步非阻塞模式

}

public void setRegObservers(List<Object> observers) {

for (Object observer : observers) {

eventBus.register(observer);

}

}

public Long register(String telephone, String password) {

//省略输入参数的校验代码

//省略userService.register()异常的try-catch代码

long userId = userService.register(telephone, password);

eventBus.post(userId);

return userId;

}

}

public class RegPromotionObserver {

private PromotionService promotionService; // 依赖注入

@Subscribe

public void handleRegSuccess(long userId) {

promotionService.issueNewUserExperienceCash(userId);

}

}

public class RegNotificationObserver {

private NotificationService notificationService;

@Subscribe

public void handleRegSuccess(long userId) {

notificationService.sendInboxMessage(userId, “…”);

}

}

其实现思路就是,在eventBus中,传入一个线程池,并且在原有的观察者上,加上@Subscribe来说明是调用的接口,然后通过register来注册,利用post来给Observer发送消息

那么我们只需要关注@Subscribe这个注解,EventBus本身即可,需要关注的函数就是register() post()

我们具体讲一下这几个类和函数

EventBus,AsyncEventBus

EventBus实现了同步阻塞的观察者模式,AsyncEventBus则是一种装饰器,提供了异步非阻塞的观察者模式

EventBus eventBus = new EventBus(); // 同步阻塞模式

EventBus eventBus = new AsyncEventBus(Executors.newFixedThreadPool(8));// 异步阻塞模式

register()函数

注册观察者的

public void register(Object object);

unregister()函数

删除某个观察者的

public void unregister(Object object);

post()函数

用于给观察者发消息的

public void post(Object event);

不过于经典的观察者不同的是,在调用post函数发送消息的时候,是发送给了所有的观察者,然后进行匹配,匹配到的消息类型才会接收到

比如 AObserver能接受XMsg,BObserver能接收YMsg,Cobserver能接收到ZMsg,其中X是Y的父类,那么接受请求的时候,能够收到的消息的可匹配的观察者如下

XMsg xMsg = new XMsg();

YMsg yMsg = new YMsg();

ZMsg zMsg = new ZMsg();

post(xMsg); => AObserver、BObserver接收到消息

post(yMsg); => BObserver接收到消息

post(zMsg); => CObserver接收到消息

然后就是@Subscribe注解,利用这个注解来表示能够接受消息的函数

那么,了解了这些,就可以手动的实现一个EventBus框架了

其中EventBus的核心函数是register和post

主要的类是EventBus和@Subscribe注解

那么其实register就是在EventBus这个类中维护的一个map中添加了一个对象,对于Post(),则是找到这个map中对应的方法,利用反射,发送过去

那么,在我们的具体实现中,整个框架被拆分为了5个类 EventBus,AsyncEventBus,Subscibe,ObserverAction,ObserverRegistry

首先是Subscuve

这是一个注解,用于表名观察者中哪个函数还可以接受消息,用于在方法之上

@Retention(RetentionPolicy.RUNTIME)

@Target(ElementType.METHOD)

@Beta

public @interface Subscribe {}

然后是ObserverAction

用于表示被@Subscibe注解的方法,其中targer是被观察者的类,method是方法

public class ObserverAction {

private Object target;

private Method method;

public ObserverAction(Object target, Method method) {

this.target = Preconditions.checkNotNull(target);

this.method = method;

this.method.setAccessible(true);

}

public void execute(Object event) { // event是method方法的参数

try {

method.invoke(target, event);

} catch (InvocationTargetException | IllegalAccessException e) {

e.printStackTrace();

}

}

}

然后是ObserverRegistry

前面讲到的Observer注册表,一个复杂的类,是保存了类和方法的对应关系,利用了一个map,这个map中,维护了多个class对象和CopyOnWriteArraySet的对应关系

public class ObserverRegistry {

private ConcurrentMap<Class<?>, CopyOnWriteArraySet<ObserverAction>> registry = new ConcurrentHashMap<>();

public void register(Object observer) {

Map<Class<?>, Collection<ObserverAction>> observerActions = findAllObserverActions(observer);

for (Map.Entry<Class<?>, Collection<ObserverAction>> entry : observerActions.entrySet()) {

Class<?> eventType = entry.getKey();

Collection<ObserverAction> eventActions = entry.getValue();

CopyOnWriteArraySet<ObserverAction> registeredEventActions = registry.get(eventType);

if (registeredEventActions == null) {

registry.putIfAbsent(eventType, new CopyOnWriteArraySet<>());

registeredEventActions = registry.get(eventType);

}

registeredEventActions.addAll(eventActions);

}

}

public List<ObserverAction> getMatchedObserverActions(Object event) {

List<ObserverAction> matchedObservers = new ArrayList<>();

Class<?> postedEventType = event.getClass();

for (Map.Entry<Class<?>, CopyOnWriteArraySet<ObserverAction>> entry : registry.entrySet()) {

Class<?> eventType = entry.getKey();

Collection<ObserverAction> eventActions = entry.getValue();

if (eventType.isAssignableFrom(postedEventType)) {

matchedObservers.addAll(eventActions);

}

}

return matchedObservers;

}

private Map<Class<?>, Collection<ObserverAction>> findAllObserverActions(Object observer) {

Map<Class<?>, Collection<ObserverAction>> observerActions = new HashMap<>();

Class<?> clazz = observer.getClass();

for (Method method : getAnnotatedMethods(clazz)) {

Class<?>[] parameterTypes = method.getParameterTypes();

Class<?> eventType = parameterTypes[0];

if (!observerActions.containsKey(eventType)) {

observerActions.put(eventType, new ArrayList<>());

}

observerActions.get(eventType).add(new ObserverAction(observer, method));

}

return observerActions;

}

private List<Method> getAnnotatedMethods(Class<?> clazz) {

List<Method> annotatedMethods = new ArrayList<>();

for (Method method : clazz.getDeclaredMethods()) {

if (method.isAnnotationPresent(Subscribe.class)) {

Class<?>[] parameterTypes = method.getParameterTypes();

Preconditions.checkArgument(parameterTypes.length == 1,

“Method %s has @Subscribe annotation but has %s parameters.”

+ “Subscriber methods must have exactly 1 parameter.”,

method, parameterTypes.length);

annotatedMethods.add(method);

}

}

return annotatedMethods;

}

}

EventBus

这就很简单了,只是调用了ObserverRegistry的对应方法

public class EventBus {

private Executor executor;

private ObserverRegistry registry = new ObserverRegistry();

public EventBus() {

this(MoreExecutors.directExecutor());

}

protected EventBus(Executor executor) {

this.executor = executor;

}

public void register(Object object) {

registry.register(object);

}

public void post(Object event) {

List<ObserverAction> observerActions = registry.getMatchedObserverActions(event);

for (ObserverAction observerAction : observerActions) {

executor.execute(new Runnable() {

@Override

public void run() {

observerAction.execute(event);

}

});

}

}

}

AsyncEventBus

是一种对于EventBus的装饰器设计,利用EventBus原有的构造函数来创建了一个新的EventBus

本章重点

我们实现了一个EventBus框架,这个框架可以有效的让程序员只聚焦于业务开发,实现了同步阻塞和异步非阻塞的消息发送

发表评论

邮箱地址不会被公开。 必填项已用*标注