前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >TP5系列 | Queue消息队列

TP5系列 | Queue消息队列

作者头像
Tinywan
发布2019-08-06 14:33:59
4.1K0
发布2019-08-06 14:33:59
举报
文章被收录于专栏:开源技术小栈开源技术小栈

ThinkPHP5 Queue消息队列

优点

1、Queue内置了 Redis,Database,Topthink ,Sync这四种驱动,本文使用Redis驱动

2、Queue消息队列适用于大并发或者返回结果 时间有点长并需要批量操作的第三方接口,可用于短信发送、邮件发送、APP推送

3、Queue消息消息可进行发布,获取,执行,删除,重发,失败处理,延迟执行,超时控制等操作

流程图

创建队列

文件路径:application\common\queue\TestQueue.php

TestQueue.php 参考代码

代码语言:javascript
复制
<?php
namespace app\common\queue;

use think\facade\Log;
use think\queue\Job;

class TestQueue
{
    public function fire(Job $job, $data)
    {
        $isJobDone = $this->testJob($data);
        // 如果任务执行成功后 记得删除任务,不然这个任务会重复执行,直到达到最大重试次数后失败后,执行failed方法
        if ($isJobDone) {
            $job->delete();
        } else {
            //通过这个方法可以检查这个任务已经重试了几次了
            $attempts = $job->attempts();
            echo $attempts;
            if ($attempts == 0 || $attempts == 1) {
                // 重新发布这个任务
                $job->release(2); //$delay为延迟时间,延迟2S后继续执行
            } elseif ($attempts == 2) {
                $job->release(5); // 延迟5S后继续执行
            }
        }
    }

    /**
     * @Desc: 任务执行失败后自动执行方法
     * @param $data
     */
    public function failed($data)
    {
        // ...任务达到最大重试次数后,失败了
        Log::error('任务达到最大重试次数后,失败了 '.json_encode($data));
    }

    /**
     * @Desc: 自定义需要加入的队列任务
     */
    private function testJob($data)
    {
        $jsonData = json_encode($data);
        echo "1、具体执行任务接受到的参数:{$jsonData} \r\n";
        if($data){
            echo "2、恭喜你!{$data['email']} 邮件发送成功了 \r\n";
            return true;
        }else{
            echo "2、很遗憾,{$data['email']} 邮件发送失败了 \r\n";
            return false;
        }
    }
}

配置队列

1、这里使用Redis驱动来存储队列消息

2、队列配置文件路径:application\config\queue

配置参考代码

代码语言:javascript
复制
return [
    'connector'  => 'Redis',
    'expire'     => 3600,
    'default'    => 'REDIS_QUEUE',
    'host'       => 'dnmp-redis',
    'port'       => 6379,
    'password'   => '',
    'select'     => 0,
    'timeout'    => 0,
    'persistent' => false,
];

生产者参考代码

代码语言:javascript
复制
/**
* @Desc: 生产者生产消息
*/
public function productMsg()
{
    // 当前任务所需的业务数据,不能为 resource 类型,其他类型最终将转化为json形式的字符串
    $data = [
        'email' => rand(11,99).'@qq.com',
        'username' => 'Tinywan'
    ];

    // 当前任务归属的队列名称,如果为新队列,会自动创建
    $queueName = 'testQueue';

    // 将该任务推送到消息队列,等待对应的消费者去执行
    $isPushed = Queue::push(TestQueue::class, $data, $queueName);

    // database 驱动时,返回值为 1|false; redis驱动时,返回值为 随机字符串|false
    if ($isPushed !== false) {
        echo '['.$data['email'].']'." 队列加入成功 \r\n";
    } else {
        echo "队列加入失败 \r\n";
    }
}

为了方便演示,这里使用cli模式。

执行生产者php product_msg.php

代码语言:javascript
复制
# php product_msg.php
[27@qq.com] 队列加入成功
# php product_msg.php
[77@qq.com] 队列加入成功

1、这时候消息已经被持久化到Redis中去了(通过列表去存储)

2、推送成功,虽然我们这时候已经写好了消费者,但是我们并没有开始消费。但是推送消息依然是成功的。这个就是中间件的优势。他连接两个系统,但是又不会互相耦合,生产者并不会因为消费者的异常而影响到自己。

3、消息推送成功之后,如果没有消费者,消息会堆积在队列中。不过别怕,消息堆积很正常,并且一般的中间件堆积能力是非常强的。比如阿里就宣传自己mq可以堆积上亿条数据。

查看Redis消息与队列

代码语言:javascript
复制
> docker exec -it dnmp-redis redis-cli
127.0.0.1:6379> keys *
127.0.0.1:6379> keys *
1) "queues:testQueue"
127.0.0.1:6379> TYPE queues:testQueue
list
127.0.0.1:6379> LRANGE queues:testQueue 0 -1
1) "{\"job\":\"app\\\\common\\\\queue\\\\TestQueue\",\"data\":{\"email\":\"27@qq.com\",\"username\":\"Tinywan\"},\"id\":\"MLgNb4LFALhtmp7HZtfXMFPRUT0r94Bi\",\"attempts\":1}"
2) "{\"job\":\"app\\\\common\\\\queue\\\\TestQueue\",\"data\":{\"email\":\"77@qq.com\",\"username\":\"Tinywan\"},\"id\":\"JM16vvjMylfJDnOpldJaHda8xMwuYYzP\",\"attempts\":1}"
127.0.0.1:6379>

目前该队列中有两条消息存在

消费者

开始消费消息。执行cli 命令 php think queue:work--queue队列名称

代码语言:javascript
复制
# php think queue:work --queue testQueue
1、具体执行任务接受到的参数: {"email":"27@qq.com","username":"Tinywan"}
2、恭喜你!27@qq.com 邮件发送成功了
Processed: app\common\queue\TestQueue

这里每消费掉一条消息,Redis数据库中将会减少一条消息

查看Redis队列消息

代码语言:javascript
复制
127.0.0.1:6379> LRANGE queues:testQueue 0 -1
1) "{\"job\":\"app\\\\common\\\\queue\\\\TestQueue\",\"data\":{\"email\":\"77@qq.com\",\"username\":\"Tinywan\"},\"id\":\"JM16vvjMylfJDnOpldJaHda8xMwuYYzP\",\"attempts\":1}"
127.0.0.1:6379>

命令行挂起守护进程执行

代码语言:javascript
复制
/usr/bin/php /var/www/tp5/think queue:work --daemon --queue testQueue --memory 256

--daemon 是否循环执行,如果不加该参数则该命令处理完下一个消息就退出 --queue 要处理的队列的名称 --delay 0 如果本次任务执行抛出异常且任务未被删除时,设置其下次执行前延迟多少秒,默认为0。 --memory 该进程允许使用的内存上限,以M为单位。

流程图

消费信息如下

代码语言:javascript
复制
# php think queue:work --daemon --queue testQueue
1、具体执行任务接受到的参数: {"email":"77@qq.com","username":"Tinywan"}
2、恭喜你!77@qq.com 邮件发送成功了
Processed: app\common\queue\TestQueue
1、具体执行任务接受到的参数: {"email":"80@qq.com","username":"Tinywan"}
2、恭喜你!80@qq.com 邮件发送成功了
Processed: app\common\queue\TestQueue
1、具体执行任务接受到的参数: {"email":"34@qq.com","username":"Tinywan"}
2、恭喜你!34@qq.com 邮件发送成功了
Processed: app\common\queue\TestQueue

1、命令行模式可以常驻内存不停的执行php代码。这样就可以达到类似于静态语言的java的效果。

2、一开始监听队列。刚刚在队列中堆积的消息立刻就被获取到,开始执行了代码。最后执行完成,删除了消息。

3、在 queue:work--daemon 单进程循环消费的时候,改了代码是不会生效的。这时脚本语言有点类似于静态语言在执行。所以需要我们用 queue:restart 重启 work 进程

命令行挂起守护进程执行

代码语言:javascript
复制
nohup /usr/bin/php /var/www/tp5/think queue:work --daemon --queue testQueue --memory 256 > /dev/null 2> /dev/null &

查看进程是否在运行

代码语言:javascript
复制
# ps
PID   USER     TIME  COMMAND
    1 root      0:00 php-fpm: master process (/usr/local/etc/php-fpm.conf)
    6 www-data  0:00 php-fpm: pool www
    7 www-data  0:00 php-fpm: pool www
   16 root      0:00 sh
   56 root      0:00 sh
  113 root      0:00 php think queue:work --daemon --queue testQueue

你再也不用守在终端了,以后只负责生产消息就可以了。Redis队列也不会积累消息了

PS :这里使用的是Docker环境测试,如果是正式的Linux环境,请使用 ps-axu|grep queue查看守护进程

代码语言:javascript
复制
127.0.0.1:6379> LRANGE queues:testQueue 0 -1
(empty list or set)
127.0.0.1:6379> LRANGE queues:testQueue 0 -1
(empty list or set)
127.0.0.1:6379> LRANGE queues:testQueue 0 -1
(empty list or set)
127.0.0.1:6379> LRANGE queues:testQueue 0 -1
(empty list or set)
127.0.0.1:6379> LRANGE queues:testQueue 0 -1
(empty list or se

其他(中间件)

中间件系统的定义是两个独立的不同的系统在中间构建起传递消息的工具。但是同一个系统也可以通过中间件来榨取性能,大家肯定项目中遇到过性能瓶颈。

比如发送邮件,发送短信,转换视频格式等等。这些业务都是比较耗性能,又对先后顺序不敏感的业务。这种业务就非常适合使用消息队列系统来异步处理,使性能提升。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-08-05,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Tinywan的杂货摊 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • ThinkPHP5 Queue消息队列
相关产品与服务
云数据库 Redis
腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档