前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >swoft使用rabbitmq消息队列

swoft使用rabbitmq消息队列

作者头像
槽痞
发布2021-01-12 14:36:58
1.3K0
发布2021-01-12 14:36:58
举报
文章被收录于专栏:PHP开发者那些事

官方其实是实现了swoft/amqp组件,但是你会在sowft的官方文档里发现,根本找不到有任何关于它的使用说明。而且当使用composer require sowft/amqp你会发现无法安装成功,还会颇有嘲讽的提示你composer里没有找到这货。

需要手动增加如下配置到composer.json中。解决来源:https://github.com/swoft-cloud/swoft/issues/1376

代码语言:javascript
复制
{
    "repositories": {
        "swoft-amqp": {
            "type": "git",
              "url": "https://github.com/swoft-cloud/swoft-amqp.git"
        }
    }
}

在安装过程中,本地cygwin测试环境即使配了上面的地址,能下载README.md啥的,唯独无法下载最关键的swoft-amqpsrc的文件夹,最后没办法只能直接从git下载将src文件夹放到vendor中。

嘲讽+1

没有找到官方关于swoft/amqp的文档,因此只能从看源码摸索配置。 嘲讽再+1

安装好后,在bean.php中增加如下配置:

代码语言:javascript
复制
    'amqp'              => [
        'class'    => Swoft\Amqp\Client::class,
        'host'     => '127.0.0.1',
        'port'     => 5672,
        'user' => 'guest',
        'password' => 'guest',
        'vhost' => '/',
        'queue' => 'test_queue',
        'exchange' => 'exchange_test',
        'route' => 'example-test-routing-key',
        'type' => AMQPExchangeType::DIRECT,
//        'channels' => [],
    ],
    'amqp.pool' => [
        'class' => \Swoft\Amqp\Pool::class,
        'client' => bean('amqp'),
    ],

主要的几个方法: 在sowft\Amqp\Amqp中可以看到。

代码语言:javascript
复制
 * @method static Connection channel(string $channel = null)
 * @method static void push(string $message, array $prop = [], string $route = '')
 * @method static string|null pop()
 * @method static void listen(Closure $callback = null)
 */

swoft框架中如何将数据路由到指定的队列? 在原有的bean.phpamqp配置项中增加配置channels:

代码语言:javascript
复制
 'amqp'              => [
      'class'    => Swoft\Amqp\Client::class,
      'host'     => '127.0.0.1',
      'port'     => 5672,
      'user' => 'guest',
      'password' => 'guest',
      'vhost' => '/',
      'queue' => 'test_queue',
      'exchange' => 'exchange_test',
      'route' => 'example-test-routing-key',
      'type' => AMQPExchangeType::DIRECT,
      'channels' => [ 
          'channel_1'=>[  //对应的key即为channel_id
              'exchange' => 'exchange_test_02',
              'queue' => 'subway',
          ],
      ],
  ],

在具体的逻辑代码中在获取$channel时指定需要获取的channels里的配置channle_id

代码语言:javascript
复制
    public function index(Response $response): Response
    {
        $channel = Amqp::channel('channel_1');
        CLog::info('run method: '.__METHOD__);
        for($i=0; $i<10; $i++)
        {
            $channel->push('hey!-----'.$i.date('Y-m-d h:i:s'),[],'example-test-routing-key' );
        }

        $name = 'steve';
        return context()->getResponse()->withContent('Hello' . ($name === '' ? '' : ", {$name}"));
    }
    /**
     * @RequestMapping("listen")
     *
     * @param Response $response
     *
     * @return Response
     */
    public function listen(Response $response): Response
    {
        CLog::info('run method: '.__METHOD__);
        $channel = Amqp::channel('channel_1');
        $channel->listen(function ($message){
            //$message:数据结构(json_encode)之后
            //{"body":"hey!-----9","body_size":10,"is_truncated":false,"content_encoding":null,"delivery_info":{"channel":{"callbacks":{"amq.ctag-epGZgfHej3YrjZk2FBvp0A":{}}},"delivery_tag":99,"redelivered":false,"exchange":"exchange_test","routing_key":"example-test-routing-key","consumer_tag":"amq.ctag-epGZgfHej3YrjZk2FBvp0A"}}
            CLog::info('message:'. json_encode($message));
        });

        $name = 'steve';
        return context()->getResponse()->withContent('Hello' . ($name === '' ? '' : ", {$name}"));
    }

rabbitMQ的pubsub(发布订阅)模式:

  • 类型设置typefanout。具体配置可参考bean.php相关部分。
  • 消息发布者。参考代码App\Http\Controller\Test\Amqp中的pub()方法的实现。
代码语言:javascript
复制
 public function pub(Request $request): Response
    {
        $content = $request->get('content', 'this-is-a-test');
        $channel = Amqp::connection('amqp.pubsub')->channel();
        for($i=0; $i<1; $i++)
        {
            $channel->push($content);
        }
        $name = __METHOD__;
        return context()->getResponse()->withContent('Hello' . ($name === '' ? '' : ", {$name}"));
    }
  • 消息订阅者。参考代码App\Process\Sub1ProcressApp\Process\Sub2Procress
代码语言:javascript
复制
    public function run(Process $process): void
    {
        $channel = Amqp::connection('amqp.pubsub')->channel('channel_pubsub_02');
        $channel->listen(function ($message){
            //$message:数据结构(json_encode)之后
            //{"body":"hey!-----9","body_size":10,"is_truncated":false,"content_encoding":null,"delivery_info":{"channel":{"callbacks":{"amq.ctag-epGZgfHej3YrjZk2FBvp0A":{}}},"delivery_tag":99,"redelivered":false,"exchange":"exchange_test","routing_key":"example-test-routing-key","consumer_tag":"amq.ctag-epGZgfHej3YrjZk2FBvp0A"}}
//            CLog::info('sub2 message:'. json_encode($message));
            CLog::info('sub2 message:'. $message->body);
        });
    }
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2021-01-08,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档