前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >PHP使用topthink/think-queue消息队列实例

PHP使用topthink/think-queue消息队列实例

作者头像
OwenZhang
发布2021-12-08 14:22:57
1K0
发布2021-12-08 14:22:57
举报
文章被收录于专栏:Owen's World

常住队列消费命令

sudo nohup php7.2 think queue:work --daemon --queue createAdminLogQueue --tries 2 > out.file 2>&1 &

sudo php7.2 think queue:listen --queue createAdminLogQueue

单次队列消费命令

sudo php7.2 think queue:work --daemon --queue createAdminLogQueue

队列添加php代码快

代码语言:javascript
复制
       // 当前队列归属的队列名称
        $jobHandlerClassName = 'app\hook\adminLog\job\AdminLogCreateQueueJob';
        //队列名称
        $jobQueueName = "createAdminLogQueue";
        // 插入队列
        $isPushed = Queue::push($jobHandlerClassName, $data, $jobQueueName);
        if( $isPushed == false ){
            \Log::error("createAdminLogQueue创建队列失败".$data, []);
        }
复制代码

使用tp5勾子实现think-queue消息队列实例,实现后台操作日志到添加到数据库

前提:thinkphp5框架基础上,已包含topthink/think-queue消息队列依赖包,可以用composer下载,这里不懂可以百度,就不说你。

1、创建admin_op_log数据表(字段不要更改)

代码语言:javascript
复制
CREATE TABLE `admin_op_log` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `op_time` varchar(20) NOT NULL,
  `admin_id` int(11) NOT NULL,
  `employee_no` varchar(30) NOT NULL,
  `op_type` tinyint(4) NOT NULL COMMENT '操作类型:1-新增 2-修改 3-删除',
  `op_object` varchar(50) NOT NULL COMMENT '操作对象',
  `op_object_id` int(11) NOT NULL DEFAULT '0' COMMENT '操作对象id',
  `op_info` varchar(2000) NOT NULL,
  `op_status` tinyint(4) unsigned zerofill NOT NULL DEFAULT '0000' COMMENT '操作状态 0-成功 1-失败',
  `op_ip` varchar(50) NOT NULL,
  `sys` varchar(10) DEFAULT '343' COMMENT '平台判断',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=3981 DEFAULT CHARSET=utf8mb4
复制代码

2、配置(配置文件位于 application/extra/queue.php)

代码语言:javascript
复制
return [
    'connector' => 'database'  //驱动类型,可选择 sync(默认):同步执行,database:数据库驱动,redis:Redis驱动,topthink:Topthink驱动
];
复制代码

3、创建接口。

代码语言:javascript
复制
//添加操作日志
$context = AdminLog::getInstance()->newAdminLogContext(AdminLogContext::ADMIN_OP_TYPE_UPDATE, $this::ADMIN_OP_TABLE_NAME, AdminLogContext::ADMIN_OP_STATUS_SUCCESS, $authorId, $opInfo);
\think\facade\Hook::listen('admin_log', $context);
复制代码

4、增加任务,创建任务类,这里为了区分模块单独创建了一个模块和类。

application/hook/adminLog/controller/AdminLog.php

代码语言:javascript
复制
<?php

namespace app\hook\adminLog\controller;

use app\hook\adminLog\context\AdminLogContext;
use app\lucky\admin\controller\AdminBase;
use app\lucky\common\InstanceTrait;
use think\App;
use think\Queue;
use think\Request;

class AdminLog extends AdminBase
{
    use InstanceTrait;

    public $sys = '';

    public function __construct(App $app = null, Request $request = null)
    {
        parent::__construct($app, $request);
        $this->sys = \session('sys') ? \session('sys') : 343;
    }

    /**
     * 获取newAdminLogContext
     */
    public function newAdminLogContext($operateType, $operateObject, $operateStatus, $operateObjectId, $operateInfo)
    {
        $operateInfo = json_encode($operateInfo);
        return new AdminLogContext([
            'operateUserId' => $this->loginInfo['admin_id'],
            'operateUserEmployeeNo' => $this->loginInfo['employee_no'],
            'operateType' => $operateType,
            'operateObject' => $operateObject,
            'operateObjectId' => $operateObjectId,
            'operateInfo' => $operateInfo,
            'operateStatus' => $operateStatus,
            'operateIp' => $this->requestIp
        ]);
    }


    /**
     * @description 日志添加
     */
    public function run(AdminLogContext $context)
    {
        $data = [
            "admin_id" => $context->operateUserId,
            "employee_no" => $context->operateUserEmployeeNo,
            "op_type" => $context->operateType,
            "op_object" => $context->operateObject,
            "op_object_id" => $context->operateObjectId,
            "op_info" => $context->operateInfo,
            "op_status" => $context->operateStatus,
            "op_ip" => $context->operateIp,
            "op_time" => time(),
            "sys" => $this->sys
        ];

        // 当前队列归属的队列名称
        $jobHandlerClassName = 'app\hook\adminLog\job\AdminLogCreateQueueJob';
        //队列名称
        $jobQueueName = "createAdminLogQueue";
        // 插入队列
        $isPushed = Queue::push($jobHandlerClassName, $data, $jobQueueName);
        if( $isPushed == false ){
            \Log::error("createAdminLogQueue创建队列失败".$data, []);
        }
    }

}
复制代码

application/hook/adminLog/ job/AdminLogCreateQueueJob.php

代码语言:javascript
复制
<?php

namespace app\hook\adminLog\job;

use app\hook\adminLog\service\AdminOpLogService;
use think\queue\Job;
use think\facade\Log;

class AdminLogCreateQueueJob
{
    // php think queue:work --queue BlogViewSyncJob
    //消费队列
    public function perform($data)
    {
        $adminOpLogService = new AdminOpLogService();
        $createFlge =$adminOpLogService->create($data);
        if (!$createFlge){
            \Log::error("createAdminLogQueue消费队列失败", []);
        }
        return $createFlge;
    }

    /**
     * fire是消息队列默认调用的方法
     * @param Job $job 当前的任务对象
     * @param array|mixed $data 发布任务时自定义的数据
     */
    public function fire(Job $job, $data)
    {
        //消费队列
        $isJobDone = $this->perform($data);

        if ($isJobDone) {
            //如果任务执行成功, 记得删除任务
            $job->delete();
        } else {
            //检查任务重试3次数后删除队列
            if ($job->attempts() > 3) {
                $job->delete();
            }
        }
    }
}
复制代码

application/hook/adminLog/context/AdminLogContext.php

代码语言:javascript
复制
<?php

namespace app\hook\adminLog\context;

class AdminLogContext extends \app\common\Context
{
    //操作类型:1-新增 2-修改 3-删除
    const ADMIN_OP_TYPE_CREATE = 1;
    const ADMIN_OP_TYPE_UPDATE = 2;
    const ADMIN_OP_TYPE_DELETE = 3;

    //操作状态 0-成功 1-失败'
    const ADMIN_OP_STATUS_SUCCESS = 0;
    const ADMIN_OP_STATUS_FAIL = 1;

    //操作类型:1-新增 2-修改 3-删除
    public $operateType;

    //操作人ID
    public $operateUserId;

    //操作人EmployeeNo
    public $operateUserEmployeeNo;

    //操作对象 表名
    public $operateObject;

    //操作对象id 表名id
    public $operateObjectId;

    //操作信息
    public $operateInfo;

    //操作状态 0-成功 1-失败'
    public $operateStatus;

    //操作人IP
    public $operateIp;

    //操作时间
    public $operateTime = '';

}
复制代码

application/hook/adminLog/model/AdminOpLogModel.php

代码语言:javascript
复制
<?php

namespace app\hook\adminLog\model;

use think\Db;

class AdminOpLogModel
{

    /**
     * 新增操作记录
     * @param $data
     */
    public function insert($data)
    {
        return DB::table("admin_op_log")->insert($data);
    }

}
复制代码

application/hook/adminLog/service/AdminOpLogService.php

代码语言:javascript
复制
<?php

namespace app\hook\adminLog\service;

use app\hook\adminLog\model\AdminOpLogModel;
use think\Exception;
use think\facade\Log;

class AdminOpLogService
{
    public $model = '';

    public function __construct()
    {
        $this->model = new AdminOpLogModel();
    }

    /**
     * 添加
     * @param array $data 数据数组
     * @return bool 返回类型
     */
    public function create($data)
    {
        try {
            return $this->model->insert($data);
        }catch (Exception $e) {
            \Log::error("添加操作日志失败(admin_op_log表插入失败)" . $e->getMessage(), []);
            return false;
        }
    }


}
复制代码

5、在控制台监听任务并执行消费任务

代码语言:javascript
复制
cd到项目下执行命令 C:\wamp\www\test>php think queue:listen
执行全部 sudo php think queue:listen --queue createAdminLogQueue
执行一次 sudo php think queue:work --queue createAdminLogQueue
复制代码

thinkqueue 后台运行常驻程序

进入项目路径,在目录下执行命令

在后台运行两条进程,常驻内存,不断的处理任务消息队列任务,如果要用指定版本php7.2表示使用7.2版本来执行,默认用php就可以来

代码语言:javascript
复制
sudo nohup php think queue:work --daemon --queue jobQueue --tries 2 >  out.file  2>&1  &
sudo nohup php7.2 think queue:work --daemon --queue jobQueueSlow --tries 2 >  out2.file  2>&1  &
复制代码
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2021年10月25日,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • application/hook/adminLog/controller/AdminLog.php
  • application/hook/adminLog/ job/AdminLogCreateQueueJob.php
  • application/hook/adminLog/context/AdminLogContext.php
  • application/hook/adminLog/model/AdminOpLogModel.php
  • application/hook/adminLog/service/AdminOpLogService.php
    相关产品与服务
    消息队列 CMQ 版
    消息队列 CMQ 版(TDMQ for CMQ,简称 TDMQ CMQ 版)是一款分布式高可用的消息队列服务,它能够提供可靠的,基于消息的异步通信机制,能够将分布式部署的不同应用(或同一应用的不同组件)中的信息传递,存储在可靠有效的 CMQ 队列中,防止消息丢失。TDMQ CMQ 版支持多进程同时读写,收发互不干扰,无需各应用或组件始终处于运行状态。
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档