观察者模式就是对对象内部的变化进行观察,当发生改变时做出相应的响应。代码样例见 设计模式整理 !
因为观察者模式较为重要,使用频率较高,JDK早已经提供了内置的观察者接口以及被观察者父类。
JDK中的观察者接口源码如下
public interface Observer {
/**
* 当被观察者发生变化时,执行的方法
*
* @param o 被观察者父类,这里一般要强制转化成被观察者子类
* @param arg an argument passed to the <code>notifyObservers</code>
* method.
*/
void update(Observable o, Object arg);
}
被观察者父类源码,我们可以看到它使用了Vector的List列表来保存观察者接口对象,Vector本身是线程安全的,虽然现在已经用的并不多。
public class Observable {
//被观察者是否发生了改变
private boolean changed = false;
//保存观察者接口对象的列表
private Vector<Observer> obs;
/** Construct an Observable with zero Observers. */
public Observable() {
obs = new Vector<>();
}
/**
* 注册观察者接口对象,加了显示锁来保证线程安全
*
* @param o an observer to be added.
* @throws NullPointerException if the parameter o is null.
*/
public synchronized void addObserver(Observer o) {
if (o == null)
throw new NullPointerException();
if (!obs.contains(o)) {
obs.addElement(o);
}
}
/**
* 移除观察者接口对象
* @param o the observer to be deleted.
*/
public synchronized void deleteObserver(Observer o) {
obs.removeElement(o);
}
/**
* 当被观察者发生改变时,通知观察者
*
* @see java.util.Observable#clearChanged()
* @see java.util.Observable#hasChanged()
* @see java.util.Observer#update(java.util.Observable, java.lang.Object)
*/
public void notifyObservers() {
notifyObservers(null);
}
/**
* 调用观察者对象完成响应操作
*
* @param arg any object.
* @see java.util.Observable#clearChanged()
* @see java.util.Observable#hasChanged()
* @see java.util.Observer#update(java.util.Observable, java.lang.Object)
*/
public void notifyObservers(Object arg) {
/*
* a temporary array buffer, used as a snapshot of the state of
* current Observers.
*/
Object[] arrLocal;
synchronized (this) {
if (!changed)
return;
//将观察者列表转化成数组
arrLocal = obs.toArray();
clearChanged();
}
//调用逐个观察者进行响应
for (int i = arrLocal.length-1; i>=0; i--)
((Observer)arrLocal[i]).update(this, arg);
}
/**
* 清除所有的观察者接口对象
*/
public synchronized void deleteObservers() {
obs.removeAllElements();
}
/**
* 告知被观察者已经发生了改变,这是一个线程安全的
*/
protected synchronized void setChanged() {
changed = true;
}
/**
* 观察者已经做出反应后恢复被观察者的改变状态为未改变
*
* @see java.util.Observable#notifyObservers()
* @see java.util.Observable#notifyObservers(java.lang.Object)
*/
protected synchronized void clearChanged() {
changed = false;
}
/**
* 测试被观察者是否发生了改变
*
* @return <code>true</code> if and only if the <code>setChanged</code>
* method has been called more recently than the
* <code>clearChanged</code> method on this object;
* <code>false</code> otherwise.
* @see java.util.Observable#clearChanged()
* @see java.util.Observable#setChanged()
*/
public synchronized boolean hasChanged() {
return changed;
}
/**
* 获取观察者的数量
*
* @return the number of observers of this object.
*/
public synchronized int countObservers() {
return obs.size();
}
}
我们先用一个简单的样例来说明如何使用JDK的这一套观察者模式
首先我们需要一个实际的观察者来实现观察者接口
public class Subscribe implements Observer {
/**
* 构造函数,让被观察者注册自己,便于自己对被观察者进行观察
* @param o
*/
public Subscribe(Observable o) {
o.addObserver(this);
}
/**
* 被观察者发生变化时,做出响应
* @param o
* @param arg
*/
@Override
public void update(Observable o, Object arg) {
System.out.println("收到通知" + ((Publish)o).getData());
}
}
被观察者
/**
* 被观察者子类
*/
public class Publish extends Observable {
@Getter
private String data = "";
/**
* 当data属性发生变化时,通知观察者做出响应
* @param data
*/
public void setData(String data) {
if (this.data !=null && !this.data.equals(data)) {
this.data = data;
setChanged();
}
notifyObservers();
}
}
main方法
public class ObserverMain {
public static void main(String[] args) {
//创建被观察者子类对象
Publish publish = new Publish();
//创建观察者对象,并注册进被观察者子类中
new Subscribe(publish);
//被观察者发生变化
publish.setData("开始");
}
}
运行结果
收到通知开始
这是一个相对简单的样例,一般我们会使用观察者模式来进行MQ消息队列的发送。
以RabbitMQ为例,有一个门店的队列,交换机
/**
* rabbitmq配置
*
*/
@Configuration
public class RabbitmqConfig {
public static final String STORE_QUEUE = "store";
@Bean
public Queue storeQueue() {
return new Queue(STORE_QUEUE);
}
@Bean
public TopicExchange providerExchange() {
return new TopicExchange(ServiceProviderCenterMq.MQ_EXCHANGE_PROVIDER);
}
@Bean
public Binding bingingStoreToProvider(){
return BindingBuilder.bind(storeQueue()).to(providerExchange())
.with(ServiceProviderCenterMq.ROUTING_KEY_ROLE_ADD);
}
}
重写消息生产者
@Slf4j
@Component
public class MessageSender implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
public void send(String exchange,String routingKey,Object content) {
log.info("send content=" + content);
this.rabbitTemplate.setMandatory(true);
this.rabbitTemplate.setConfirmCallback(this);
this.rabbitTemplate.setReturnCallback(this);
this.rabbitTemplate.convertAndSend(exchange,routingKey,serialize(content));
}
/**
* 确认后回调:
* @param correlationData
* @param ack
* @param cause
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (!ack) {
log.info("send ack fail, cause = " + cause);
} else {
log.info("send ack success");
}
}
/**
* 失败后return回调:
*
* @param message
* @param replyCode
* @param replyText
* @param exchange
* @param routingKey
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.info("send fail return-message = " + new String(message.getBody()) + ", replyCode: " + replyCode + ", replyText: " + replyText + ", exchange: " + exchange + ", routingKey: " + routingKey);
}
/**
* 对消息对象进行二进制序列化
* @param o
* @return
*/
private byte[] serialize(Object o) {
Kryo kryo = new Kryo();
ByteArrayOutputStream stream = new ByteArrayOutputStream();
Output output = new Output(stream);
kryo.writeObject(output, o);
output.close();
return stream.toByteArray();
}
}
我们的观察者类,对门店服务的服务集合新增服务对象进行观察
/**
* 服务新增观察者
*/
public class ServiceObserver implements Observer {
private MessageSender sender = SpringBootUtil.getBean(MessageSender.class);
public ServiceObserver(Observable o) {
o.addObserver(this);
}
@Override
public void update(Observable o, Object arg) {
CompletableFuture.runAsync(() ->
this.sender.send(ServiceProviderCenterMq.MQ_EXCHANGE_PROVIDER,
ServiceProviderCenterMq.ROUTING_KEY_ROLE_ADD,
((ProviderServiceLevel) o).getServiceProviders().poll())
);
}
}
具体的被观察者类为门店服务的分类
public class ProviderServiceLevel extends Observable implements Provider
它有一个服务的队列
@Getter
private Queue<Provider> serviceProviders = new ConcurrentLinkedQueue<>();
也有一个服务的列表
@Getter
private List<Provider> serviceListProviders = new CopyOnWriteArrayList<>();
服务分类添加服务对象的方法,大家可以思考一下为什么使用队列,而不是直接使用列表在观察者中取出服务对象
@Override
public boolean addProvider(Provider provider) {
ServiceDao serviceDao = SpringBootUtil.getBean(ServiceDao.class);
//如果添加的是服务分类
if (provider instanceof ProviderServiceLevel) {
ParamLevel paramLevel = new ParamLevel(this.id, ((ProviderServiceLevel) provider).getId());
((ProviderServiceLevel) provider).setLevel(2);
serviceDao.addLevelToLevel(paramLevel);
//如果添加的是服务
}else if (provider instanceof ProviderService) {
ParamLevel paramLevel = new ParamLevel(this.id, ((ProviderService) provider).getService().getId());
serviceDao.addServiceToLevel(paramLevel);
this.serviceListProviders.add(provider);
//将添加的服务入队列
this.serviceProviders.add(provider);
setChanged();
//通知观察者取出队列服务对象,进行MQ的发送
notifyObservers();
return true;
}
return this.serviceListProviders.add(provider);
}
最后在访问的controller中会放三个缓存
//服务分类缓存
private Map<Long,Provider> cacheLevelProvider = new ConcurrentHashMap<>();
//门店缓存
private Map<Long,Provider> cacheStoreProvider = new ConcurrentHashMap<>();
//观察者缓存
private Map<Long,ServiceObserver> observerMap = new ConcurrentHashMap<>();
具体的添加服务方法如下
/**
* 新增商家服务
* @param providerService
* @return
*/
@Transactional
@SuppressWarnings("uncheck")
@PostMapping("/serviceprovider-anon/newproviderservice")
public Result<String> newProviderService(@RequestParam("storeid") Long storeId,
@RequestParam("servicelevelid") Long serviceLevelId,
@RequestBody ProviderService providerService) {
Provider serviceLevelProvider;
Provider service = ProviderFactory.createProviderService(providerService, true);
if (cacheLevelProvider.containsKey(serviceLevelId)) {
serviceLevelProvider = this.cacheLevelProvider.get(serviceLevelId);
}else {
serviceLevelProvider = this.levelProvider.findProvider(serviceLevelId);
this.cacheLevelProvider.put(serviceLevelId,serviceLevelProvider);
}
//此处如果不将观察者对象加入缓存,就会不断创建观察者,其实我们只需要一个观察者
if (!observerMap.containsKey(serviceLevelId)) {
observerMap.put(serviceLevelId,new ServiceObserver((ProviderServiceLevel) serviceLevelProvider));
}
//服务分类添加了服务对象,就会发生被观察者状态的改变,使观察者做出响应
serviceLevelProvider.addProvider(service);
Provider storeProvider;
if (cacheStoreProvider.containsKey(storeId)) {
storeProvider = cacheStoreProvider.get(storeId);
}else {
storeProvider = this.storeProvider.findProvider(storeId);
cacheStoreProvider.put(storeId,storeProvider);
}
storeProvider.addProvider(service);
return Result.success("添加商家服务成功");
}