Guava中提供了一种事件总线(EventBus)的技术,是观察者模式的一种实现。jdk中也提供了Observer的接口,Guava的优势是更易于使用,并且提供了同步和异步的通知机制。

简介

Guava中提供了一种事件总线(EventBus)的技术,是观察者模式的一种实现。jdk中也提供了Observer的接口,Guava的优势是更易于使用,并且提供了同步和异步的通知机制。
观察者模式,在软件的开发中有广泛的应用,用于模块之间的解耦。比如,订单系统中,当创建订单后,会向客户、商家发送通知,由他们进行不同的处理。

优点:

  1. 编程简单,异步,同步,异常处理方便;
  2. 单机模式,不需要额外的组件依赖。

缺点:
单机,消息没有持久化。

对消息敏感,或者高吞吐情况下,还是要使用MQ。

使用

引入如下的依赖:

1
2
3
4
5
<dependency>  
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>30.0-jre</version>
</dependency>

名词解释

使用EventBus的几个关键概念如下:

  1. 事件(Event),EventBus中处理的数据单元,可以是任意Object,一般根据业务定义类;
  2. 订阅(Subscribing),将监听者注册到EventBus中的动作,订阅后,就能接收到Event了;
  3. 监听者(Listener),处理Event的对象,暴露出处理方法;
  4. 处理方法(Handler method),加上@Subscribe注解的方法,用来接收事件并处理;
  5. 发布事件(Posting an event),将事件发送到EventBus,所有的监听此事件的监听者都会收到通知。

创建事件

也就是定义一个数据总线中传递的数据单元。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package com.abumaster.example.eventbus.simple.event;  

import lombok.Data;

import java.util.Date;

/**
* 订单事件
*
* @author zhangguofeng
* @version 1.0
* @date 2021/11/28
*/@Data
public class OrderEvent {
/** 订单ID*/
private Long id;
/** 用户id*/
private String userId;
/** 描述*/
private String desc;
/** 订单创建时间*/
private Date date;

}

创建监听者

监听事件发生后的处理函数。
针对OrderEvent事件,有商家和消费者关注,商家监听到后,会针对该订单进行确认和安排发货,消费者监听到订单创建后,则会收到通知。。。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package com.abumaster.example.eventbus.simple.listener;  

import com.abumaster.example.eventbus.simple.event.OrderEvent;
import com.google.common.eventbus.Subscribe;
import lombok.extern.slf4j.Slf4j;

/**
* 简单的事件监听器.
* 使用注解,声明 订阅了什么事件 @Subscribe
* * @author zhangguofeng
* @version 1.0
* @date 2021/11/28
*/@Slf4j
public class SimpleEventListener {

@Subscribe
public void customerListener(OrderEvent orderEvent) {
log.info("[消费者] 订单:{}, 商家处理中!", orderEvent.getId());
}
@Subscribe
public void merchantListener(OrderEvent orderEvent) {
log.info("[商家] 处理订单:{}", orderEvent.getId());
}
}

创建EventBus

提供了两种ventBus,一种是同步的,由发送事件的线程依次调用监听处理函数;
另一种是异步的,则会用其他线程处理(自己创建一个线程池在初始化时作为参数)。

1
2
3
4
/** 声明一个事件总线*/  
private static final EventBus EVENT_BUS = new EventBus();
/** 异步的消息总线*/
private static final AsyncEventBus ASYNC_EVENT_BUS = new AsyncEventBus(Executors.newCachedThreadPool());

将监听器注册到EventBus中

监听器注册到EventBus中,才能够使用。

1
2
EVENT_BUS.register(new SimpleEventListener());  
ASYNC_EVENT_BUS.register(new SimpleEventListener());

发送事件

1
2
3
4
5
// 模拟生成一个订单
OrderEvent orderEvent = new OrderEvent();
// 将订单发送到EventBus
EVENT_BUS.post(orderEvent);

异常的处理

创建EventBus的时候,构造函数中有一个参数,是个接口SubscriberExceptionHandler,我们定义一个处理类,并实现这个接口,来处理异常信息。

1
2
3
4
5
6
7
8
9
10
11
@Slf4j  
public class ErrorHandler implements SubscriberExceptionHandler {

@Override
public void handleException(Throwable exception, SubscriberExceptionContext context) {
log.info(" 事件:{}错误!{}",context.getEvent(), exception.toString());
}
}

/** 声明一个事件总线*/
private static final EventBus EVENT_BUS = new EventBus(new ErrorHandler());