前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >使用JDK的观察者接口进行消息推送 顶

使用JDK的观察者接口进行消息推送 顶

作者头像
算法之名
发布2019-08-20 10:46:02
4720
发布2019-08-20 10:46:02
举报
文章被收录于专栏:算法之名算法之名

观察者模式就是对对象内部的变化进行观察,当发生改变时做出相应的响应。代码样例见 设计模式整理 !

因为观察者模式较为重要,使用频率较高,JDK早已经提供了内置的观察者接口以及被观察者父类。

JDK中的观察者接口源码如下

代码语言:javascript
复制
public interface Observer {
    /**
     * 当被观察者发生变化时,执行的方法
     *
     * @param   o     被观察者父类,这里一般要强制转化成被观察者子类
     * @param   arg   an argument passed to the <code>notifyObservers</code>
     *                 method.
     */
    void update(Observable o, Object arg);
}

被观察者父类源码,我们可以看到它使用了Vector的List列表来保存观察者接口对象,Vector本身是线程安全的,虽然现在已经用的并不多。

代码语言:javascript
复制
public class Observable {
    //被观察者是否发生了改变
    private boolean changed = false;
    //保存观察者接口对象的列表
    private Vector<Observer> obs;

    /** Construct an Observable with zero Observers. */

    public Observable() {
        obs = new Vector<>();
    }

    /**
     * 注册观察者接口对象,加了显示锁来保证线程安全
     *
     * @param   o   an observer to be added.
     * @throws NullPointerException   if the parameter o is null.
     */
    public synchronized void addObserver(Observer o) {
        if (o == null)
            throw new NullPointerException();
        if (!obs.contains(o)) {
            obs.addElement(o);
        }
    }

    /**
     * 移除观察者接口对象
     * @param   o   the observer to be deleted.
     */
    public synchronized void deleteObserver(Observer o) {
        obs.removeElement(o);
    }

    /**
     * 当被观察者发生改变时,通知观察者
     *
     * @see     java.util.Observable#clearChanged()
     * @see     java.util.Observable#hasChanged()
     * @see     java.util.Observer#update(java.util.Observable, java.lang.Object)
     */
    public void notifyObservers() {
        notifyObservers(null);
    }

    /**
     * 调用观察者对象完成响应操作
     *
     * @param   arg   any object.
     * @see     java.util.Observable#clearChanged()
     * @see     java.util.Observable#hasChanged()
     * @see     java.util.Observer#update(java.util.Observable, java.lang.Object)
     */
    public void notifyObservers(Object arg) {
        /*
         * a temporary array buffer, used as a snapshot of the state of
         * current Observers.
         */
        Object[] arrLocal;

        synchronized (this) {
            if (!changed)
                return;
            //将观察者列表转化成数组
            arrLocal = obs.toArray();
            clearChanged();
        }
        //调用逐个观察者进行响应
        for (int i = arrLocal.length-1; i>=0; i--)
            ((Observer)arrLocal[i]).update(this, arg);
    }

    /**
     * 清除所有的观察者接口对象
     */
    public synchronized void deleteObservers() {
        obs.removeAllElements();
    }

    /**
     * 告知被观察者已经发生了改变,这是一个线程安全的
     */
    protected synchronized void setChanged() {
        changed = true;
    }

    /**
     * 观察者已经做出反应后恢复被观察者的改变状态为未改变
     *
     * @see     java.util.Observable#notifyObservers()
     * @see     java.util.Observable#notifyObservers(java.lang.Object)
     */
    protected synchronized void clearChanged() {
        changed = false;
    }

    /**
     * 测试被观察者是否发生了改变
     *
     * @return  <code>true</code> if and only if the <code>setChanged</code>
     *          method has been called more recently than the
     *          <code>clearChanged</code> method on this object;
     *          <code>false</code> otherwise.
     * @see     java.util.Observable#clearChanged()
     * @see     java.util.Observable#setChanged()
     */
    public synchronized boolean hasChanged() {
        return changed;
    }

    /**
     * 获取观察者的数量
     *
     * @return  the number of observers of this object.
     */
    public synchronized int countObservers() {
        return obs.size();
    }
}

我们先用一个简单的样例来说明如何使用JDK的这一套观察者模式

首先我们需要一个实际的观察者来实现观察者接口

代码语言:javascript
复制
public class Subscribe implements Observer {
    /**
     * 构造函数,让被观察者注册自己,便于自己对被观察者进行观察
     * @param o
     */
    public Subscribe(Observable o) {
        o.addObserver(this);
    }

    /**
     * 被观察者发生变化时,做出响应
     * @param o
     * @param arg
     */
    @Override
    public void update(Observable o, Object arg) {
        System.out.println("收到通知" + ((Publish)o).getData());
    }
}

被观察者

代码语言:javascript
复制
/**
 * 被观察者子类
 */
public class Publish extends Observable {
    @Getter
    private String data = "";

    /**
     * 当data属性发生变化时,通知观察者做出响应
     * @param data
     */
    public void setData(String data) {
        if (this.data !=null && !this.data.equals(data)) {
            this.data = data;
            setChanged();
        }
        notifyObservers();
    }
}

main方法

代码语言:javascript
复制
public class ObserverMain {
    public static void main(String[] args) {
        //创建被观察者子类对象
        Publish publish = new Publish();
        //创建观察者对象,并注册进被观察者子类中
        new Subscribe(publish);
        //被观察者发生变化
        publish.setData("开始");
    }
}

运行结果

收到通知开始

这是一个相对简单的样例,一般我们会使用观察者模式来进行MQ消息队列的发送。

以RabbitMQ为例,有一个门店的队列,交换机

代码语言:javascript
复制
/**
 * rabbitmq配置
 *
 */
@Configuration
public class RabbitmqConfig {
   public static final String STORE_QUEUE = "store";
   @Bean
   public Queue storeQueue() {
      return new Queue(STORE_QUEUE);
   }


   @Bean
   public TopicExchange providerExchange() {
      return new TopicExchange(ServiceProviderCenterMq.MQ_EXCHANGE_PROVIDER);
   }

   @Bean
   public Binding bingingStoreToProvider(){
      return BindingBuilder.bind(storeQueue()).to(providerExchange())
            .with(ServiceProviderCenterMq.ROUTING_KEY_ROLE_ADD);
   }
}

重写消息生产者

代码语言:javascript
复制
@Slf4j
@Component
public class MessageSender implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void send(String exchange,String routingKey,Object content) {
        log.info("send content=" + content);
        this.rabbitTemplate.setMandatory(true);
        this.rabbitTemplate.setConfirmCallback(this);
        this.rabbitTemplate.setReturnCallback(this);
        this.rabbitTemplate.convertAndSend(exchange,routingKey,serialize(content));
    }

    /**
     * 确认后回调:
     * @param correlationData
     * @param ack
     * @param cause
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (!ack) {
            log.info("send ack fail, cause = " + cause);
        } else {
            log.info("send ack success");
        }
    }

    /**
     * 失败后return回调:
     *
     * @param message
     * @param replyCode
     * @param replyText
     * @param exchange
     * @param routingKey
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.info("send fail return-message = " + new String(message.getBody()) + ", replyCode: " + replyCode + ", replyText: " + replyText + ", exchange: " + exchange + ", routingKey: " + routingKey);
    }

    /**
     * 对消息对象进行二进制序列化
     * @param o
     * @return
     */
    private byte[] serialize(Object o) {
        Kryo kryo = new Kryo();
        ByteArrayOutputStream stream = new ByteArrayOutputStream();
        Output output = new Output(stream);
        kryo.writeObject(output, o);
        output.close();
        return stream.toByteArray();
    }
}

我们的观察者类,对门店服务的服务集合新增服务对象进行观察

代码语言:javascript
复制
/**
 * 服务新增观察者
 */
public class ServiceObserver implements Observer {
    private MessageSender sender = SpringBootUtil.getBean(MessageSender.class);
    public ServiceObserver(Observable o) {
        o.addObserver(this);
    }
    @Override
    public void update(Observable o, Object arg) {
        CompletableFuture.runAsync(() ->
            this.sender.send(ServiceProviderCenterMq.MQ_EXCHANGE_PROVIDER,
                ServiceProviderCenterMq.ROUTING_KEY_ROLE_ADD,
                ((ProviderServiceLevel) o).getServiceProviders().poll())
        );
    }
}

具体的被观察者类为门店服务的分类

代码语言:javascript
复制
public class ProviderServiceLevel extends Observable implements Provider

它有一个服务的队列

代码语言:javascript
复制
@Getter
private Queue<Provider> serviceProviders = new ConcurrentLinkedQueue<>();

也有一个服务的列表

代码语言:javascript
复制
@Getter
private List<Provider> serviceListProviders = new CopyOnWriteArrayList<>();

服务分类添加服务对象的方法,大家可以思考一下为什么使用队列,而不是直接使用列表在观察者中取出服务对象

代码语言:javascript
复制
@Override
public boolean addProvider(Provider provider) {
    ServiceDao serviceDao = SpringBootUtil.getBean(ServiceDao.class);
    //如果添加的是服务分类
    if (provider instanceof ProviderServiceLevel) {
        ParamLevel paramLevel = new ParamLevel(this.id, ((ProviderServiceLevel) provider).getId());
        ((ProviderServiceLevel) provider).setLevel(2);
        serviceDao.addLevelToLevel(paramLevel);
    //如果添加的是服务    
    }else if (provider instanceof ProviderService) {
        ParamLevel paramLevel = new ParamLevel(this.id, ((ProviderService) provider).getService().getId());
        serviceDao.addServiceToLevel(paramLevel);
        this.serviceListProviders.add(provider);
        //将添加的服务入队列
        this.serviceProviders.add(provider);
        setChanged();
        //通知观察者取出队列服务对象,进行MQ的发送
        notifyObservers();
        return true;
    }
    return this.serviceListProviders.add(provider);
}

最后在访问的controller中会放三个缓存

代码语言:javascript
复制
//服务分类缓存
private Map<Long,Provider> cacheLevelProvider = new ConcurrentHashMap<>();
//门店缓存
private Map<Long,Provider> cacheStoreProvider = new ConcurrentHashMap<>();
//观察者缓存
private Map<Long,ServiceObserver> observerMap = new ConcurrentHashMap<>();

具体的添加服务方法如下

代码语言:javascript
复制
/**
 * 新增商家服务
 * @param providerService
 * @return
 */
@Transactional
@SuppressWarnings("uncheck")
@PostMapping("/serviceprovider-anon/newproviderservice")
public Result<String> newProviderService(@RequestParam("storeid") Long storeId,
                                         @RequestParam("servicelevelid") Long serviceLevelId,
                                         @RequestBody ProviderService providerService) {
    Provider serviceLevelProvider;
    Provider service = ProviderFactory.createProviderService(providerService, true);
    if (cacheLevelProvider.containsKey(serviceLevelId)) {
        serviceLevelProvider = this.cacheLevelProvider.get(serviceLevelId);
    }else {
        serviceLevelProvider = this.levelProvider.findProvider(serviceLevelId);
        this.cacheLevelProvider.put(serviceLevelId,serviceLevelProvider);
    }
    //此处如果不将观察者对象加入缓存,就会不断创建观察者,其实我们只需要一个观察者
    if (!observerMap.containsKey(serviceLevelId)) {
        observerMap.put(serviceLevelId,new ServiceObserver((ProviderServiceLevel) serviceLevelProvider));
    }
    //服务分类添加了服务对象,就会发生被观察者状态的改变,使观察者做出响应
    serviceLevelProvider.addProvider(service);
    Provider storeProvider;
    if (cacheStoreProvider.containsKey(storeId)) {
        storeProvider = cacheStoreProvider.get(storeId);
    }else {
        storeProvider = this.storeProvider.findProvider(storeId);
        cacheStoreProvider.put(storeId,storeProvider);
    }
    storeProvider.addProvider(service);
    return Result.success("添加商家服务成功");
}
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档