专栏首页前端,Java专栏php实现redis消息发布订阅
原创

php实现redis消息发布订阅

基础介绍

Pub/Sub功能(means Publish, Subscribe)即发布及订阅功能

  • 基于事件的系统中,Pub/Sub是目前广泛使用的通信模型,它采用事件作为基本的通信机制,提供大规模系统所要求的松散耦合的交互模式:订阅者(如客户端)以事件订阅的方式表达出它有兴趣接收的一个事件或一类事件;发布者(如服务器)可将订阅者感兴趣的事件随时通知相关订阅者。
  • 消息发布者,即publish客户端,无需独占链接,你可以在publish消息的同时,使用同一个redis-client链接进行其他操作(例如:INCR等)
  • 消息订阅者,即subscribe客户端,需要独占链接,即进行subscribe期间,redis-client无法穿插其他操作,此时client以阻塞的方式等待“publish端”的消息;这一点很好理解,因此subscribe端需要使用单独的链接,甚至需要在额外的线程中使用。

当使用银行卡消费的时候,银行往往会通过微信、短信或邮件通知用户这笔交易的信息,这便是一种发布订阅模式,这里的发布是交易信息的发布,订阅则是各个渠道。这在实际工作中十分常用,Redis 支持这样的一个模式。

发布订阅模式首先需要消息源,也就是要有消息发布出来,比如例子中的银行通知。首先是银行的记账系统,收到了交易的命令,成功记账后,它就会把消息发送出来,这个时候,订阅者就可以收到这个消息进行处理了,观察者模式就是这个模式的典型应用了。

终端实现

订阅,频道为'chat'

发布消息

代码实现

subscribe.php

<?php
ini_set('default_socket_timeout', -1);  //php配置设置不超时
$redis = new Redis();
$redis->connect("127.0.0.1",6379);
//$redis->setOption(Redis::OPT_READ_TIMEOUT, -1);  //redis方式设置不超时,推荐

$redis->subscribe(['chan'],'callback');     //callback为回调函数名称
//$redis->subscribe(['chan'],array(new TestCall(),'callback') ); //如果回调函数是类中的方法名,这样写

// 回调函数,这里写处理逻辑
function callback($instance, $channelName, $message)
{
         echo $channelName, "==>", $message, PHP_EOL;
         
         //$instance,即为上面创建的redis实例对象,在回调函数中,默认的这个参数就是,因此不需专门传参。 这里除了SUBSCRIBE、PSUBSCRIBE、UNSUBSCRIBE、PUNSUBSCRIBE这4条命令之外其它命令都不能使用
         //如果要使用redis中的其他命令,这样实现
         $newredis = new Redis();
        $newredis->connect("127.0.0.1", 6379);
        echo $newredis->get('test') . PHP_EOL;
        $newredis->close();
        
          //可以根据$channelName, $message,处理不同的业务逻辑
          switch($chan) {
               case 'chan-1':
                  ...
                  break;
         
               case 'chan-2':
                              ...
                   break;
           }
           
           switch($message) {
               case 'msg1':
                  ...
                  break;
         
               case 'msg2':
                              ...
                   break;
           }
    
}

publish.php

<?php

$redis = new Redis();
$redis->connect("127.0.0.1",6379);

$redis->publish('chan','this is a message');

代码介绍

  1. subscribe.php中设置不超时

方法1:ini_set('default_socket_timeout', -1);方法2: $redis->setOption(Redis::OPT_READ_TIMEOUT, -1);

如果不设置不超时,60s后会报一个错误

PHP Fatal error: Uncaught RedisException: read error on connection to 127.0.0.1:6379 in subscribe.php:6

方式一的实现,是通过临时修改ini的配置值,default_socket_timeout默认为60s,default_socket_timeout是socket流的超时参数,即socket流从建立到传输再到关闭整个过程必须要在这个参数设置的时间以内完成,如果不能完成,那么PHP将自动结束这个socket并返回一个警告。

方式二是通过修改redis的配置项,因此仅对redis连接生效,相对于方式1,不会产生意外的对其他方法的影响。

批量订阅

redis的psubscribe支持通过模式匹配的方式实现批量订阅,订阅方式

回调函数写函数名或者redis->psubscribe(['my*'],array(new TestCall(),'psubscribe')); //回调函数为类中的方法,类名写你自己定义的类

subscribe.php

<?php
//ini_set('default_socket_timeout', -1);  //不超时
$redis = new Redis();
$redis->connect("127.0.0.1",6379);
$redis->setOption(Redis::OPT_READ_TIMEOUT, -1);

//匹配方式1:发布可用$redis->publish('mymest','this is a message');
//$redis->psubscribe(['my*'],'psubscribe');    

//匹配方式2:发布可用$redis->publish('mydest','this is a message');
//$redis->psubscribe(['my?est'],'psubscribe');

//匹配方式3:发布可用$redis->publish('myaest','this is a message');或$redis->publish('myeest','this is a message');
$redis->psubscribe(['my[ae]est'],'psubscribe');

function psubscribe($redis, $pattern, $chan, $msg) {
      echo "Pattern: $pattern\n";
      echo "Channel: $chan\n";
      echo "Payload: $msg\n";
}

模式匹配规则

支持以下几种,以hello举例:

h?llo subscribes to hello, hallo and hxllo h*llo subscribes to hllo and heeeello h[ae]llo subscribes to hello and hallo, but not hillo 特殊字符用\转义

pubsub方法介绍

public function pubsub( argument )

pubsub获取pub/sub系统的信息,$keyword可用为"channels", "numsub", 或者"numpat",三种,传入不同的keyword返回的数据不同

     * $redis->pubsub('channels'); // All channels 获取所有的频道,返回数组
     * $redis->pubsub('channels', '*pattern*'); // Just channels matching your pattern,返回符合匹配模式的频道
     * $redis->pubsub('numsub', array('chan1', 'chan2')); // Get subscriber counts for 'chan1' and 'chan2'    //返回每个订阅频道的数量,返回数组
     * $redis->pubsub('numpat'); // Get the number of pattern subscribers 获取模式匹配方式的订阅的数量,即$redis->psubscribe(['my[ae]est'],'psubscribe');返回数量为1,$redis->subscribe(['chan'],'callback');    这种方式获取不到,因此返回数量为0

参考:

Redis发布订阅模式

希望说的对大家有所帮助,不对的地方欢迎大家留言

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Laravel集合的简单理解

    本篇文章给大家带来的内容是关于Laravel集合的简单理解,有一定的参考价值,有需要的朋友可以参考一下,希望对你有所帮助。

    叫我可儿呀
  • 初学Swoole:PHP7安装Swoole的步骤

    本篇文章给大家带来的内容是关于初学Swoole:PHP7安装Swoole的步骤,有一定的参考价值,有需要的朋友可以参考一下,希望对你有所帮助。

    叫我可儿呀
  • php7安装swoole扩展的步骤详解

    里面讲到了swoole,对于这个东西我相信大家(接近1年phper)都是听过它,但没有真正去用它,当然也是不知道如何使用(me too)。

    叫我可儿呀
  • ubuntu安装redis以及phpredis扩展

    安装步骤: 1.执行:git clone https://github.com/nicolasff/phpredis.git 2.执行:cd phpredi...

    苦咖啡
  • redis架构演变与redis-cluster群集读写方案

    redis-cluster是近年来redis架构不断改进中的相对较好的redis高可用方案。本文涉及到近年来redis多实例架构的演变过程,包括普通主从架构(M...

    用户1263954
  • Redis 从入门到放飞(下)

    Redis在实际使用过程中更多的用作缓存,然而缓存的数据一般都是需要设置生存时间的,即:到期后数据销毁。

    芋道源码
  • centos7编译安装Redis

    redis能够兼容绝大部分的POSIX系统,比如Linux、OS X、OpenBSD、NetBSD、FreeBSD,其中比较典型的是Linux操作系统(如Cen...

    行 者
  • redis-基本介绍与linux安装

    HyperLogLog:超小内存唯一值计数,12kb HyperLogLog,本质是字符串

    suwanbin
  • 05. SpringCloud实战项目-Docker安装mysql

    Jackson0714
  • docker安装redis

    Jackson0714

扫码关注云+社区

领取腾讯云代金券