前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >thinkPhp使用框架自带队列think-queue

thinkPhp使用框架自带队列think-queue

作者头像
超级小可爱
发布2023-02-22 15:41:14
1.3K0
发布2023-02-22 15:41:14
举报
文章被收录于专栏:小孟开发笔记

首先讲解一下何为异步消息队列: 所谓消息队列,就是一个以队列数据结构为基础的一个实体,这个实体是真实存在的,比如程序中的数组,数据库中的表,或者redis等等,都可以。 异步队列的作用: 个人认为消息队列的主要特点是异步处理,主要目的是减少请求响应时间和解耦。所以主要的使用场景就是将比较耗时而且不需要即时(同步)返回结果的操作作为消息放入消息队列 转载:https://zhuanlan.zhihu.com/p/129383173

————————————————————————————————————–

队列流程:一个消息的 创建 -> 推送 -> 消费 -> 删除

转载:https://github.com/coolseven/notes/blob/master/thinkphp-queue/README.md

1.安装队列依赖

由于框架版本原因可以选择适合的版本

代码语言:javascript
复制
composer require topthink/think-queue

由于我是tp框架5.1的,所以选择了think-queue 2.*

代码语言:javascript
复制
# Thinkphp5.1
composer require topthink/think-queue:2.*
# Thinkphp6
composer require topthink/think-queue:3.*

判断安装成功

代码语言:javascript
复制
php think queue:work -h

2.配置文件

看了网上其他的一些帖子说配置文件在统一目录下/config/queue.php 但是,我这边没有生成,但是根据Queue.php源码可以看出,配置是在config.php文件中的一个键值对

代码语言:javascript
复制
  //  文件路径 App/config/queue.php
  //  队列设置
    'queue' => [
        'connector' => 'Redis', // 驱动方式
        'expire' => 60,         // 缓存有效期
        'default' => "queue",   // 如果未设置队列名称,默认队列名称
        'host' => '127.0.0.1',  // 主机地址
        'port' => 6379,         // 端口
        'password' => '',       // 密码
        'select' => 0,          // 默认选择第一个库
        'timeout' => 0,         // 超时
        'persistent' => false, // 是否长连接
    ],

3.在项目下新建一个Job目录,用来存放处理消息的类

4.控制器编写测试代码

代码语言:javascript
复制
<?php

namespace app\http\controller;

use app\http\Job\MsgPushJob;
use think\Queue;


class Index {
    /**
     * 投递消息(生产者)
     * @return string
     */
    public function push(): string
    {
        //  queue的 push方法 第一个参数可以接收字符或者对象字符串
        $job = 'app\http\Job\MsgPushJob';
        $queueName = 'test';
        $data['msg'] = 'Test queue msg,time:' . date('Y-m-d H:i:s', time());
        $data['user_id'] = 1;
//        $res = Queue::push(MsgPushJob::class, $data, $queueName);  // 可以自动获取
        $res = Queue::push($job, $data, $queueName);   // 可以手动指定
        if ($res == false)
        {
            return '消息投递失败';
        } else {
            return '消息投递成功';
        }
    }
}

5.编写对应的消费者类

代码语言:javascript
复制
<?php

namespace app\http\Job;
use think\Db;
use think\queue\Job;

class MsgPushJob {

    /**
     * php think queue:listen --queue test
     * @param Job $job
     * @param $data
     * @return bool
     * @throws \think\db\exception\DataNotFoundException
     * @throws \think\db\exception\ModelNotFoundException
     * @throws \think\exception\DbException
     */
    public function fire(Job $job, $data): bool
    {
        //  验证消息 只处理user_id = 1的值
        $is_exit = $this->checkMsg($data);
        if ($is_exit)
        {
            try {
                //  这里是处理消息的逻辑
                $res = Db::name('test')->where('id', $data['user_id'])->update(['age' => 10]);
                if (!$res)  return false;
                $job->delete();
            } catch (\Exception $exception) {
                if ($job->attempts() > 3)
                {
                    //  如果消息处理失败次数大于 3 次
                    //  1.可以把失败的消息放入队列重新消费
                    //  2.延迟消息执行
                    //  3.删除消息
                }
            }
        }
        $job->delete();
        return false;
    }

    /**
     * 是否需要消费
     * @param $data
     * @return bool
     */
    public function checkMsg($data): bool
    {
        //  判断消息到达这里的时候,是否还需要继续处理
        if ($data['user_id'] == 1) return true;
        return false;
    }
}

6.测试消息投递

数据表默认数据

启动队列监听,对应的参数可以查阅相关文档

代码语言:javascript
复制
php think queue:listen --queue test

访问控制器接口的时候回来窗口下打印出对应消息者的地址

消息投递成功后,会在redis中生成一条数据(list数据类型),可以在redis中查看

成功消费后数据库的数据

7消息在linux上以守护进程方式运行

代码语言:javascript
复制
生成 test 文件
mknod test c 1 3

nohup php think queue:work --daemon --queue test--tries 2 >  /dev/test 2>&1  &
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1.安装队列依赖
  • 2.配置文件
  • 3.在项目下新建一个Job目录,用来存放处理消息的类
  • 4.控制器编写测试代码
  • 5.编写对应的消费者类
  • 6.测试消息投递
  • 7消息在linux上以守护进程方式运行
相关产品与服务
消息队列 CMQ 版
消息队列 CMQ 版(TDMQ for CMQ,简称 TDMQ CMQ 版)是一款分布式高可用的消息队列服务,它能够提供可靠的,基于消息的异步通信机制,能够将分布式部署的不同应用(或同一应用的不同组件)中的信息传递,存储在可靠有效的 CMQ 队列中,防止消息丢失。TDMQ CMQ 版支持多进程同时读写,收发互不干扰,无需各应用或组件始终处于运行状态。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档