Guava中提供了一种事件总线(EventBus)的技术,是观察者模式的一种实现。jdk中也提供了Observer的接口,Guava的优势是更易于使用,并且提供了同步和异步的通知机制。
简介
Guava中提供了一种事件总线(EventBus)的技术,是观察者模式的一种实现。jdk中也提供了Observer的接口,Guava的优势是更易于使用,并且提供了同步和异步的通知机制。
利用Spring的特性,实现对事件总线的使用。
使用
定义一个注解,用来标识哪些类需要被注册到EventBus中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| package com.abumaster.example.eventbus.grace; import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target;
@Target({ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) public @interface EventBusListener { String name() default ""; boolean async() default true ; }
|
async表示,该监听器注册到同步EventBus中还是异步EventBus中。
定义一个component,用来创建EventBus,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68
| package com.abumaster.example.eventbus.grace; import com.google.common.eventbus.AsyncEventBus; import com.google.common.eventbus.EventBus; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationContext; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import java.util.Map; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit;
@Component @Slf4j public class EventBusCenter { @Autowired private ApplicationContext applicationContext; private final EventBus syncEventBus = new EventBus(); private final AsyncEventBus asyncEventBus = new AsyncEventBus(new ThreadPoolExecutor(4,8, 100, TimeUnit.SECONDS,new ArrayBlockingQueue<>(100), new ThreadPoolExecutor.CallerRunsPolicy())); public void postSync(Object event) { syncEventBus.post(event); } public void postAsync(Object event) { asyncEventBus.post(event); }
@PostConstruct public void registerAllListener() { Map<String, Object> beansWithAnnotation = applicationContext.getBeansWithAnnotation(EventBusListener.class); beansWithAnnotation.forEach((k,v)->{ EventBusListener anno = v.getClass().getAnnotation(EventBusListener.class); String dsc = anno.name(); log.info("注入:{} 到{}事件总线中", StringUtils.isEmpty(dsc)?k:dsc, anno.async()?"异步":"同步"); if (anno.async()) { asyncEventBus.register(v); } else { syncEventBus.register(v); } }); } }
|
定义事件监听器: 针对同一个事件,最好写到一个类中,同时,只能注册到一个事件总线中,因此要记得发布的时候调用对应的同步或异步方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| package com.abumaster.example.eventbus.grace; import com.abumaster.example.eventbus.simple.event.OrderEvent; import com.google.common.eventbus.Subscribe; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component;
@Component @EventBusListener(name = "订单监听", async = false) @Slf4j public class MyOrderListener { @Subscribe private void processOne(OrderEvent orderEvent) { log.info("为订单:{},发送消息!", orderEvent.getId()); } @Subscribe private void processTwo(OrderEvent orderEvent) { log.info("我处理了订单【{}】", orderEvent.getId()); } }
|
发布事件,使用component进行发送事件。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| package com.abumaster.example.eventbus.grace.service; import com.abumaster.example.eventbus.grace.EventBusCenter; import com.abumaster.example.eventbus.simple.event.OrderEvent; import com.abumaster.example.eventbus.simple.publisher.OrderGenService; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import javax.annotation.Resource;
@Service @Slf4j public class OrderService { @Resource private EventBusCenter eventBusCenter; public void generateOrder() { OrderEvent orderEvent = OrderGenService.generateOrder(); log.info("生成一个订单:{}", orderEvent.getId()); eventBusCenter.postAsync(orderEvent); eventBusCenter.postSync(orderEvent); } }
|
结果如下:
1 2 3 4 5 6 7 8 9
| 异步: 18:34:26.632[main] INFO - 生成一个订单:36330 18:34:26.637[pool-2-thread-2] INFO - 为订单:36330,发送消息! 18:34:26.637[pool-2-thread-1] INFO - 我处理了订单【36330】
同步: 18:35:14.724[main] INFO - 生成一个订单:63091 18:35:14.727[main] INFO - 我处理了订单【63091】 18:35:14.727[main] INFO - 为订单:63091,发送消息!
|