Guava中提供了一种事件总线(EventBus)的技术,是观察者模式的一种实现。jdk中也提供了Observer的接口,Guava的优势是更易于使用,并且提供了同步和异步的通知机制。

简介

Guava中提供了一种事件总线(EventBus)的技术,是观察者模式的一种实现。jdk中也提供了Observer的接口,Guava的优势是更易于使用,并且提供了同步和异步的通知机制。

利用Spring的特性,实现对事件总线的使用。

使用

定义一个注解,用来标识哪些类需要被注册到EventBus中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package com.abumaster.example.eventbus.grace;  

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
* 用来标识一个类是事件监听器
* 有此标记的都会被注册到数据总线中
*
* @author zhangguofeng
* @version 1.0
* @date 2021/11/28
*/@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface EventBusListener {
String name() default "";
/** true为异步 ; false为同步*/
boolean async() default true ;
}

async表示,该监听器注册到同步EventBus中还是异步EventBus中。

定义一个component,用来创建EventBus,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
package com.abumaster.example.eventbus.grace;  

import com.google.common.eventbus.AsyncEventBus;
import com.google.common.eventbus.EventBus;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
* 事件总线组件
*
* @author zhangguofeng
* @version 1.0
* @date 2021/11/28
*/@Component
@Slf4j
public class EventBusCenter {

@Autowired
private ApplicationContext applicationContext;

/** 同步*/
private final EventBus syncEventBus = new EventBus();

/** 异步*/
private final AsyncEventBus asyncEventBus = new AsyncEventBus(new ThreadPoolExecutor(4,8,
100, TimeUnit.SECONDS,new ArrayBlockingQueue<>(100),
new ThreadPoolExecutor.CallerRunsPolicy()));

/** 同步发布*/
public void postSync(Object event) {
syncEventBus.post(event);
}
/** 异步发布*/
public void postAsync(Object event) {
asyncEventBus.post(event);
}


/**
* 容器初始化后,自动注册所有的监听器
*
* @see EventBusListener
*/
@PostConstruct
public void registerAllListener() {
Map<String, Object> beansWithAnnotation = applicationContext.getBeansWithAnnotation(EventBusListener.class);
beansWithAnnotation.forEach((k,v)->{
EventBusListener anno = v.getClass().getAnnotation(EventBusListener.class);
String dsc = anno.name();
log.info("注入:{} 到{}事件总线中", StringUtils.isEmpty(dsc)?k:dsc, anno.async()?"异步":"同步");
if (anno.async()) {
asyncEventBus.register(v);
} else {
syncEventBus.register(v);
}
});
}

}

定义事件监听器: 针对同一个事件,最好写到一个类中,同时,只能注册到一个事件总线中,因此要记得发布的时候调用对应的同步或异步方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package com.abumaster.example.eventbus.grace;  

import com.abumaster.example.eventbus.simple.event.OrderEvent;
import com.google.common.eventbus.Subscribe;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

/**
* 订单监听器
*
* @author zhangguofeng
* @version 1.0
* @date 2021/11/28
*/@Component
@EventBusListener(name = "订单监听", async = false)
@Slf4j
public class MyOrderListener {
@Subscribe
private void processOne(OrderEvent orderEvent) {
log.info("为订单:{},发送消息!", orderEvent.getId());
}
@Subscribe
private void processTwo(OrderEvent orderEvent) {
log.info("我处理了订单【{}】", orderEvent.getId());
}
}

发布事件,使用component进行发送事件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package com.abumaster.example.eventbus.grace.service;  

import com.abumaster.example.eventbus.grace.EventBusCenter;
import com.abumaster.example.eventbus.simple.event.OrderEvent;
import com.abumaster.example.eventbus.simple.publisher.OrderGenService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;

/**
* 向事件总线中发布消息
*
* @author zhangguofeng
* @version 1.0
* @date 2021/11/28
*/@Service
@Slf4j
public class OrderService {

/** 注入事件中心服务*/
@Resource
private EventBusCenter eventBusCenter;


public void generateOrder() {
OrderEvent orderEvent = OrderGenService.generateOrder();
log.info("生成一个订单:{}", orderEvent.getId());
eventBusCenter.postAsync(orderEvent);
eventBusCenter.postSync(orderEvent);
}

}

结果如下:

1
2
3
4
5
6
7
8
9
异步:
18:34:26.632[main] INFO - 生成一个订单:36330
18:34:26.637[pool-2-thread-2] INFO - 为订单:36330,发送消息!
18:34:26.637[pool-2-thread-1] INFO - 我处理了订单【36330】

同步:
18:35:14.724[main] INFO - 生成一个订单:63091
18:35:14.727[main] INFO - 我处理了订单【63091】
18:35:14.727[main] INFO - 为订单:63091,发送消息!