前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >hyperf框架使用rabbitMQ生产消息至laravel/lumen进行消费

hyperf框架使用rabbitMQ生产消息至laravel/lumen进行消费

作者头像
美团骑手
发布2021-12-24 18:48:39
6450
发布2021-12-24 18:48:39
举报
文章被收录于专栏:技术进阶技术进阶

背景

需要做项目迁移时,例如laravel迁移至hyperf时,因为基本上都是一步一步迁移的,仍有例如支付回调等依旧在laravel框架中进行消费的情况。需要接管处理消息的queue进行数据格式改造,利用构造同样命名空间的job去进行投递,他会序列化数据,可以debug一下内容哦,然后投递至rabbitMQ后,laravel进行消费就好啦。其中hyperf的版本背景为2.1

话不多说开干

  1. 在app下建立Job目录为例,大家可以根据情况来
  2. 在Job目录下建立Job.php,复制以下代码
代码语言:javascript
复制
<?php

declare(strict_types=1);

namespace AppJob;

/**
 * Class Job
 * @package AppJob
 */
class Job
{
    protected $job;
    public $connection;
    public $queue;
    public $delay;

    /**
     * Job constructor.
     */
    public function __invoke()
    {
        $this->job = null;
        $this->connection = null;
        $this->queue = null;
        $this->delay = null;
    }

    /**
     * Set the desired delay for the job.
     *
     * @param  DateTime|int|null  $delay
     * @return $this
     */
    public function delay($delay)
    {
        $this->delay = $delay;

        return $this;
    }

    /**
     * Set the desired queue for the job.
     *
     * @param  string|null  $queue
     * @return $this
     */
    public function onQueue($queue)
    {
        $this->queue = $queue;

        return $this;
    }

    /**
     * Set the desired connection for the job.
     *
     * @param  string|null  $connection
     * @return $this
     */
    public function onConnection($connection)
    {
        $this->connection = $connection;

        return $this;
    }

}

接管Producer.php,继续创建Producer.php,复制以下代码进去

代码语言:javascript
复制
<?php

declare(strict_types=1);

namespace AppJob;

use HyperfAmqpConnection;
use HyperfAmqpMessageProducerMessageInterface;
use PhpAmqpLibMessageAMQPMessage;
use PhpAmqpLibWireAMQPTable;
use HyperfAmqpBuilder;

/**
 * 生产者
 * Class Producer
 * @package AppJob
 */
class Producer extends Builder
{
    public $exchange_type;
    public $exchange_passive;
    public $exchange_durable;
    public $exchange_auto_delete;

    public $queue_passive;
    public $queue_durable;
    public $queue_exclusive;
    public $queue_auto_delete;
    public $queue_nowait;

    public function checkExchange($channel, $producerMessage)
    {
        $exchange   = $producerMessage->getExchange();
        $queue      = $producerMessage->getQueue();
        $routingKey = $producerMessage->getRoutingKey();
        $ttl        = $producerMessage->getTtl();

        $this->exchange_type = env('RABBITMQ_EXCHANGE_TYPE', 'direct');
        $this->exchange_passive = env('RABBITMQ_EXCHANGE_PASSIVE', false);
        $this->exchange_durable = env('RABBITMQ_EXCHANGE_DURABLE', true);
        $this->exchange_auto_delete = env('RABBITMQ_EXCHANGE_PASSIVE', false);

        $this->queue_passive = env('RABBITMQ_QUEUE_PASSIVE', false);
        $this->queue_durable = env('RABBITMQ_QUEUE_DURABLE', true);
        $this->queue_exclusive = env('RABBITMQ_QUEUE_EXCLUSIVE', false);
        $this->queue_auto_delete = env('RABBITMQ_QUEUE_AUTODELETE', false);

        //定义交换器
        $channel->exchange_declare($exchange, $this->exchange_type, $this->exchange_passive, $this->exchange_durable, $this->exchange_auto_delete);

        //定义队列
        $channel->queue_declare($queue, $this->queue_passive, $this->queue_durable, $this->queue_exclusive, $this->queue_auto_delete);
        //绑定队列到交换器上
        $channel->queue_bind($queue, $exchange, $routingKey);

        if ($ttl > 0) {
            // $delayExchange   = 'delayed_exchange_' . $exchange;
            // $delayQueue      = 'delayed_queue_' . $queue . '_' . $ttl;
            // $delayRoutingKey = $routingKey . $ttl;
            $delayExchange   = $exchange;
            $delayQueue      = $queue . '_deferred_' . $ttl;
            $delayRoutingKey = $delayQueue;
            //定义延迟交换器
            $channel->exchange_declare($delayExchange, $this->exchange_type, $this->exchange_passive, $this->exchange_durable, $this->exchange_auto_delete);

            //定义延迟队列
            $channel->queue_declare($delayQueue, $this->queue_passive, $this->queue_durable, $this->queue_exclusive, $this->queue_auto_delete, false, new AMQPTable(array(
                "x-dead-letter-exchange"    => $exchange,
                "x-dead-letter-routing-key" => $routingKey,
                "x-message-ttl"             => $ttl * 1000,
            )));
            //绑定延迟队列到交换器上
            $channel->queue_bind($delayQueue, $delayExchange, $delayRoutingKey);

            $producerMessage->setExchange($delayExchange);
            $producerMessage->setRoutingKey($delayRoutingKey);
        }
    }

    /**
     * @param ProducerMessageInterface $producerMessage
     * @param $routingKey
     * @param $exchange
     * @param bool $confirm
     * @param int $timeout
     * @return bool
     * @throws Exception
     * @throws Throwable
     */
    public function produce(ProducerMessageInterface $producerMessage, $routingKey, $exchange, bool $confirm = false, int $timeout = 5): bool
    {
        return retry(1, function () use ($exchange, $routingKey, $producerMessage, $confirm, $timeout) {
            return $this->produceMessage($producerMessage, $routingKey, $exchange, $confirm, $timeout);
        });
    }

    /**
     * @param ProducerMessageInterface $producerMessage
     * @param $routingKey
     * @param $exchange
     * @param bool $confirm
     * @param int $timeout
     * @return bool
     * @throws Throwable
     */
    private function produceMessage(ProducerMessageInterface $producerMessage, $routingKey, $exchange, bool $confirm = false, int $timeout = 5)
    {
        $result = false;

        $this->injectMessageProperty($producerMessage, $routingKey, $exchange);

        $delay = $producerMessage->getTtl();
        if ($delay > 0) {
            $message = new AMQPMessage($producerMessage->payload(), array_merge($producerMessage->getProperties(), [
                'expiration' => $delay * 1000,
            ]));
        } else {
            $message = new AMQPMessage($producerMessage->payload(), $producerMessage->getProperties());
        }
        // $message = new AMQPMessage($producerMessage->payload(), $producerMessage->getProperties());

        $pool = $this->getConnectionPool($producerMessage->getPoolName());
        /** @var Connection $connection */
        $connection = $pool->get();
        if ($confirm) {
            $channel = $connection->getConfirmChannel();
        } else {
            $channel = $connection->getChannel();
        }
        $channel->set_ack_handler(function () use (&$result) {
            $result = true;
        });

        try {
            // 检测交换机和队列
            $this->checkExchange($channel, $producerMessage);

            $channel->basic_publish($message, $producerMessage->getExchange(), $producerMessage->getRoutingKey());
            $channel->wait_for_pending_acks_returns($timeout);
        } catch (Throwable $exception) {
            // Reconnect the connection before release.
            $connection->reconnect();
            throw $exception;
        } finally {
            $connection->release();
        }

        return $confirm ? $result : true;
    }

    private function injectMessageProperty(ProducerMessageInterface $producerMessage, $routingKey, $exchange)
    {
        $producerMessage->setRoutingKey($routingKey);
        $producerMessage->setExchange($exchange);
    }
}

接管ProducerMessage.php,继续创建ProducerMessage.php,复制以下代码进去

代码语言:javascript
复制
<?php

declare(strict_types=1);

namespace AppJob;

use HyperfAmqpConstants;
use HyperfAmqpMessageMessage;
use HyperfAmqpMessageProducerMessageInterface;

/**
 * 生产消息
 * Class Job
 * @package AppJob
 */
abstract class ProducerMessage extends Message implements ProducerMessageInterface
{
    /**
     * @var string
     */
    protected $payload = '';

    /**
     * @var string
     */
    protected $routingKey = '';

    /**
     * @var array
     */
    protected $properties
        = [
            'content_type' => 'text/plain',
            'delivery_mode' => Constants::DELIVERY_MODE_PERSISTENT,
        ];

    public function getProperties(): array
    {
        return $this->properties;
    }

    public function setPayload($data): self
    {
        $this->payload = $data;
        return $this;
    }

    public function payload(): string
    {
        return $this->serialize();
    }

    public function serialize(): string
    {
        return json_encode($this->payload);
    }

    /**
     * @var integer 延迟时间(秒)
     */
    protected $ttl = 0;

    public function setTtl($ttl)
    {
        $this->ttl = $ttl;
        return $this;
    }

    public function getTtl()
    {
        return $this->ttl;
    }

    protected $queue = 'default';

    public function setQueue($name)
    {
        $this->queue = $name;

        return $this;
    }

    public function getQueue()
    {
        return $this->queue;
    }
}

序列化数据,创建SerializeJobData.php,复制以下代码进去

代码语言:javascript
复制
<?php

declare(strict_types=1);

namespace AppJob;
use HyperfUtilsStr;

/**
 * 序列化队列数据
 * Class SerializeJobData
 * @package AppJob
 */
class SerializeJobData extends ProducerMessage
{
    public function __construct($job)
    {
        // 设置不同 pool
        $this->poolName = 'default';
        /**
         * 当驱动为redis时
         * use IlluminateSupportStr;
         * 'id' => Str::random(32),'attempts' => 0,
         */
        if (env('QUEUE_DRIVER', 'rabbitmq') == 'rabbitmq') {
            $this->payload = [
                'displayName' => get_class($job),
                'job' => 'IlluminateQueueCallQueuedHandler@call',
                'maxTries' => isset($job->tries) ? $job->tries : null,
                'timeout' => isset($job->timeout) ? $job->timeout : null,
                'data' => [
                    'commandName' => get_class($job),
                    'command' => serialize(clone $job)
                ]
            ];
        } else {
            $this->payload = [
                'displayName' => get_class($job),
                'job' => 'IlluminateQueueCallQueuedHandler@call',
                'maxTries' => isset($job->tries) ? $job->tries : null,
                'timeout' => isset($job->timeout) ? $job->timeout : null,
                'data' => [
                    'commandName' => get_class($job),
                    'command' => serialize(clone $job)
                ],
                'id' => Str::random(32),
                'attempts' => 0
            ];
        }

    }
}

创建助手函数 注意我的内容哦 按需修改

代码语言:javascript
复制
if (!function_exists('producerPushData')) {
    /**
     * 投递信息
     * @param ProducerMessageInterface $message 消息
     * @param string $routingKey 默认 default
     * @param string $exchange 所投入的queue
     * @param bool $confirm 是否需要确认
     * @param int $timeout 超时时间
     * @return bool
     * @throws Throwable
     */
    function producerPushData($message, $routingKey = 'default', $exchange = '', bool $confirm = false, int $timeout = 5)
    {
        $exchange = !empty($exchange) ? $exchange : env('RABBITMQ_EXCHANGE_NAME', 'sweetheart');
        return make(Producer::class)->produce($message, $routingKey, $exchange, $confirm, $timeout);
    }
}

使用方式 注意需要和laravel/lumen 保持同样的命名空间哦

创建的job需要继承 ```AppJobJob``

代码语言:javascript
复制
<?php

declare(strict_types=1);

use AppJobJob;

/**
 * Class TestJob
 */
class TestJob extends Job
{
    /**
     * @var
     */
    protected $data;

    /**
     * TestJob constructor.
     * @param $data
     */
    public function __construct($data)
    {
        $this->data = $data;
    }

    /**
     * 处理逻辑
     */
    public function __handle()
    {

    }
}
代码语言:javascript
复制
use AppJobSerializeJobData;
use TestJob;
$data = [];
$job = new TestJob($data);
producerPushData((new SerializeJobData($job)));
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2021-11-05 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 背景
  • 话不多说开干
    • 创建的job需要继承 ```AppJobJob``
    相关产品与服务
    文件存储
    文件存储(Cloud File Storage,CFS)为您提供安全可靠、可扩展的共享文件存储服务。文件存储可与腾讯云服务器、容器服务、批量计算等服务搭配使用,为多个计算节点提供容量和性能可弹性扩展的高性能共享存储。腾讯云文件存储的管理界面简单、易使用,可实现对现有应用的无缝集成;按实际用量付费,为您节约成本,简化 IT 运维工作。
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档