前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >使用swoole的Process实现生产者消费者模型

使用swoole的Process实现生产者消费者模型

作者头像
跑马溜溜的球
发布2020-12-07 15:32:04
6880
发布2020-12-07 15:32:04
举报
文章被收录于专栏:日积月累1024

零.源码

代码语言:javascript
复制
<?php
abstract class Schedule{
    protected $_consumerList = array();
    protected $_msgqkey = null;

    protected $_consumerNum = 2;
    protected $_finishFlag = 'ALLDONE';

    public function __construct($cNum = 0){
        if ($cNum){
            $this->_consumerNum = $cNum;
        }
    }

    public function setConsumerNum($num = 0){
        if ($num){
            $this->_consumerNum = $num;
            return true;
        }

        return false;
    }

    public function setFinishFlag($flag = null){
        if ($flag){
            $this->_finishFlag = $flag;
            return true;
        }

        return false;
    }

    public function run(){
        $this->_consumerList = array();
        for($i=0; $i<$this->_consumerNum; $i++){
            $consumer = new swoole_process(function($worker){
                $this->_consumerFunc($worker);
            });

            if ($this->_msgqkey){
                $consumer->useQueue($this->_msgqkey);
            }
            else{
                $consumer->useQueue();
            }
            $pid = $consumer->start();

            $this->_consumerList[$pid] = $consumer;
        }

        $producer = new swoole_process(function($worker){
            //echo "i'm passer\n";
            exit(0);
        });

        if ($this->_msgqkey){
            $producer->useQueue($this->_msgqkey);
        }
        else{
            $producer->useQueue();
        }

        $pid = $producer->start();
        echo "begin:\n";
        echo sprintf("msgqkey:%s\n", $producer->msgQueueKey);

        $this->_producerFunc($producer);
    }

    protected function _producerFunc($worker){
        if ($this->_onlyConsume()){
            return;
        }

        foreach ($this->doProduce($worker) as $data){
            $worker->push($data);
        }

        //任务数据被取完
        while(true){
            $c = $worker->statQueue();
            $n = $c['queue_num'];
            if ($n === 0){
                break;  
            }
        }

        //放入consumer进程程结束标识
        foreach($this->_consumerList as $pid => $w){
            $w->push($this->_finishFlag);
        }

        //确认结束
        while(true){
            $c = $worker->statQueue();
            $n = $c['queue_num'];
            if ($n === 0){
                break;  
            }
        }

        $worker->freeQueue();
    }

    protected function _consumerFunc($worker){
        while(1){
            $data = $worker->pop();
            $pid = $worker->pid;
            if ($data == $this->_finishFlag){
                echo "consumer $pid exit\n";
                $worker->exit(0);
            }
            else{
                $this->doConsume($data, $worker);
            }
        }
    }

    protected function _onlyConsume(){
        return !! $this->_msgqkey;
    }

    abstract protected function doProduce($worker);

    abstract protected function doConsume($data, $worker);
}

一.功能说明

  1. 实现了生产者消费者模型,一个生产者向任务队列写数据,N个消费者取数据做处理。
  2. 数据处理完后生产者与消费者自动退出
  3. 在消费者意外挂掉的情况下,允许单独运行消费者继续处理之前队列中的任务

二.使用说明

1. 生产者消费者demo

代码语言:javascript
复制
<?php
class Taskdemo extends Schedule{
    protected $_consumerNum = 5;

    protected function doProduce($worker){
        $all = 100;
        for($i=0; $i<$all; $i+=4){
            yield json_encode(array('data'=>$i));
        }
    }

    protected function doConsume($data, $worker){
        //your process
        sleep(1);
        echo "consumer:{$worker->pid} redv {$data}\n";
    }
}

说明 1. 要继承Schedule 2. _consumerNum为消费者个数,不设置,默认2个。 3. doProduce(worker)用于产生任务数据的函数,要求返回值必须是数组或迭代器,每一项为一条任务数据。worker为swoole进程句柄。 4. doConsume(data,worker)用于消费者处理数据的函数。data为单条消息,worker为swoole进程句柄。 5. 一般情况进程句柄

2. 处理程序中途挂掉的情况

步骤:

1.确认当前队列的key 程序运行时,会打出

代码语言:javascript
复制
msgqkey:1078263

也可以使用命令行

代码语言:javascript
复制
ipcs -q
------ Message Queues --------
key        msqid      owner      perms      used-bytes   messages    
0x001073f7 2359298    ballqiu    666        165          15

key值即是所需要的

  1. 修改Taskdemo,加入一行代码
代码语言:javascript
复制
protected $_msgqkey = 0x001073f7;
  1. 重新运行程序
  2. 手动删除队列
代码语言:javascript
复制
ipcrm -q $msgqkey

三.实现原理

  • 使用swoole的Process,主进程调用doProduce()向消息队列写任务数据,fork出的n个子进程从队列取数据。队列就是linux用于进程间通信的消息队列。
  • 子进程从队列里不停取任务处理,如果拿到“完成标识串”(一个特定字符串),就退出。
  • 主进程发现队列数据被处理完后,如果有n个子进程,就向队列发n个到“完成标识串”。然后再次检查队列,队列空时删除队列,自身退出。

四.注意事项

  • 消息队列的一些使用上的限制,可以参见这里
  • 默认的”完成标识串”是ALLDONE,如需修改,可在Taskdemo中增加
代码语言:javascript
复制
protected $_finishFlag = 'youflag';
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2016/12/30 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 零.源码
  • 一.功能说明
  • 二.使用说明
    • 1. 生产者消费者demo
      • 2. 处理程序中途挂掉的情况
      • 三.实现原理
      • 四.注意事项
      相关产品与服务
      消息队列 CMQ 版
      消息队列 CMQ 版(TDMQ for CMQ,简称 TDMQ CMQ 版)是一款分布式高可用的消息队列服务,它能够提供可靠的,基于消息的异步通信机制,能够将分布式部署的不同应用(或同一应用的不同组件)中的信息传递,存储在可靠有效的 CMQ 队列中,防止消息丢失。TDMQ CMQ 版支持多进程同时读写,收发互不干扰,无需各应用或组件始终处于运行状态。
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档