推荐一个 PHP 管道插件 League\\Pipeline

本文 2924.5字,需要 7.31 分钟

Pipeline 设计模式

水管太长,只要有一处破了,就会漏水了,而且不利于复杂环境弯曲转折使用。所以我们都会把水管分成很短的一节一节管道,然后最大化的让管道大小作用不同,因地制宜,组装在一起,满足各种各样的不同需求。

由此得出 Pipeline 的设计模式,就是将复杂冗长的流程 (processes) 截成各个小流程,小任务。每个最小量化的任务就可以复用,通过组装不同的小任务,构成复杂多样的流程 (processes)。

最后将「输入」引入管道,根据每个小任务对输入进行操作 (加工、过滤),最后输出满足需要的结果。

今天主要学习学习「Pipeline」,顺便推荐一个 PHP 插件:league/pipeline

gulp

第一次知道「pipe」的概念,来自 gulp 的使用。

gulp 是基于 NodeJS 的自动任务运行器,她能自动化地完成Javascriptsassless 等文件的测试、检查、合并、压缩、格式化、浏览器自动刷新、部署文件生成,并监听文件在改动后重复指定的这些步骤。在实现上,她借鉴了 Unix 操作系统的管道 (pipe) 思想,前一级的输出,直接变成后一级的输入,使得在操作上非常简单。

var gulp = require('gulp');
var less = require('gulp-less');
var minifyCSS = require('gulp-csso');
var concat = require('gulp-concat');
var sourcemaps = require('gulp-sourcemaps');

gulp.task('css', function(){
  return gulp.src('client/templates/*.less')
    .pipe(less())
    .pipe(minifyCSS())
    .pipe(gulp.dest('build/css'))
});

gulp.task('js', function(){
  return gulp.src('client/javascript/*.js')
    .pipe(sourcemaps.init())
    .pipe(concat('app.min.js'))
    .pipe(sourcemaps.write())
    .pipe(gulp.dest('build/js'))
});

gulp.task('default', [ 'html', 'css', 'js' ]);

上面的两个 task 主要是将 less、所有 js 文件进行解析、压缩、输出等流程操作,然后存到对应的文件夹下;每一步操作的输出就是下一步操作的输入,犹如管道的流水一般。

Illuminate\Pipeline

Laravel 框架中的中间件,就是利用 Illuminate\Pipeline 来实现的,本来想写写我对 「Laravel 中间件」源码的解读,但发现网上已经有很多帖子都有表述了,所以本文就简单说说如何使用 Illuminate\Pipeline

写个 demo

public function demo(Request $request)
{
    $pipe1 = function ($payload, Closure $next) {
        $payload = $payload + 1;
        return $next($payload);
    };

    $pipe2 = function ($payload, Closure $next) {
        $payload = $payload * 3;
        return $next($payload);
    };

    $data = $request->input('data', 0);

    $pipeline = new Pipeline();

    return $pipeline
        ->send($data)
        ->through([$pipe1, $pipe2])
        ->then(function ($data) {
            return $data;
        });
}

对于该源码的分析,可以推荐看这篇文章,分析的挺透彻了:

Laravel Pipeline 组件的实现 https://www.insp.top/article/realization-of-pipeline-component-for-laravel

League\Pipeline

上面对 gulpIlluminate\Pipeline 的简单使用,只是告诉我们「Pipeline」应用比较广泛。如果让我们自己也写一个类似的插件出来呢,我想应该也不是很难。

下面我拿 League\Pipeline 插件来扒一扒它的源代码,看如何实现的。

简述

This package provides a plug and play implementation of the Pipeline Pattern. It’s an architectural pattern which encapsulates sequential processes. When used, it allows you to mix and match operation, and pipelines, to create new execution chains. The pipeline pattern is often compared to a production line, where each stage performs a certain operation on a given payload/subject. Stages can act on, manipulate, decorate, or even replace the payload. If you find yourself passing results from one function to another to complete a series of tasks on a given subject, you might want to convert it into a pipeline. https://pipeline.thephpleague.com/

安装插件

composer require league/pipeline

写个 demo

use League\Pipeline\Pipeline;

// 创建两个闭包函数
$pipe1 = function ($payload) {
    return $payload + 1;
};

$pipe2 = function ($payload) {
    return $payload * 3;
};

$route->map(
    'GET',
    '/demo',
    function (ServerRequestInterface $request, ResponseInterface $response
    ) use ($service, $pipe1, $pipe2) {
        $params = $request->getQueryParams();

        // 正常使用
        $pipeline1 = (new Pipeline)
            ->pipe($pipe1)
            ->pipe($pipe2);

        $callback1 = $pipeline1->process($params['data']);

        $response->getBody()->write("<h1>正常使用</h1>");
        $response->getBody()->write("<p>结果:$callback1</p>");

        // 使用魔术方法
        $pipeline2 = (new Pipeline())
            ->pipe($pipe1)
            ->pipe($pipe2);

        $callback2 = $pipeline2($params['data']);

        $response->getBody()->write("<h1>使用魔术方法</h1>");
        $response->getBody()->write("<p>结果:$callback2</p>");

        // 使用 Builder
        $builder = new PipelineBuilder();
        $pipeline3 = $builder
            ->add($pipe1)
            ->add($pipe2)
            ->build();

        $callback3 = $pipeline3($params['data']);

        $response->getBody()->write("<h1>使用 Builder</h1>");
        $response->getBody()->write("<p>结果:$callback3</p>");
        return $response;
    }
);

运行结果

解读源代码

整个插件就这几个文件:

PipelineInterface

<?php
declare(strict_types=1);

namespace League\Pipeline;

interface PipelineInterface extends StageInterface
{
    /**
     * Create a new pipeline with an appended stage.
     *
     * @return static
     */
    public function pipe(callable $operation): PipelineInterface;
}

interface StageInterface
{
    /**
     * Process the payload.
     *
     * @param mixed $payload
     *
     * @return mixed
     */
    public function __invoke($payload);
}

该接口主要是利用链式编程的思想,不断添加管道「pipe」,然后增加一个魔术方法,来让传入的参数运转起来。

先看看这个魔术方法的作用:

mixed __invoke ([ $... ] ) 当尝试以调用函数的方式调用一个对象时,__invoke() 方法会被自动调用。 参考来自:http://php.net/manual/zh/language.oop5.magic.php 如:

<?php
class CallableClass 
{
    function __invoke($x) {
        var_dump($x);
    }
}
$obj = new CallableClass;
$obj(5);
var_dump(is_callable($obj));
?>

返回结果:

int(5)
bool(true)

Pipeline

<?php
declare(strict_types=1);

namespace League\Pipeline;

class Pipeline implements PipelineInterface
{
    /**
     * @var callable[]
     */
    private $stages = [];

    /**
     * @var ProcessorInterface
     */
    private $processor;

    public function __construct(ProcessorInterface $processor = null, callable ...$stages)
    {
        $this->processor = $processor ?? new FingersCrossedProcessor;
        $this->stages = $stages;
    }

    public function pipe(callable $stage): PipelineInterface
    {
        $pipeline = clone $this;
        $pipeline->stages[] = $stage;

        return $pipeline;
    }

    public function process($payload)
    {
        return $this->processor->process($payload, ...$this->stages);
    }

    public function __invoke($payload)
    {
        return $this->process($payload);
    }
}

其中核心类 Pipeline 的作用主要就是两个:

  1. 添加组装各个管道「pipe」;
  2. 组装后,引水流动,执行 process($payload),输出结果。

Processor

接好各种管道后,那就要「引水入渠」了。该插件提供了两个基础执行类,比较简单,直接看代码就能懂。

// 按照 $stages 数组顺利,遍历执行管道方法,再将结果传入下一个管道,让「水」一层层「流动」
class FingersCrossedProcessor implements ProcessorInterface
{
    public function process($payload, callable ...$stages)
    {
        foreach ($stages as $stage) {
            $payload = $stage($payload);
        }

        return $payload;
    }
}

// 增加一个额外的「过滤网」,经过每个管道后的结果,都需要 check,一旦不满足则终止,直接输出结果。
class InterruptibleProcessor implements ProcessorInterface
{
    /**
     * @var callable
     */
    private $check;

    public function __construct(callable $check)
    {
        $this->check = $check;
    }

    public function process($payload, callable ...$stages)
    {
        $check = $this->check;

        foreach ($stages as $stage) {
            $payload = $stage($payload);

            if (true !== $check($payload)) {
                return $payload;
            }
        }

        return $payload;
    }
}

interface ProcessorInterface
{
    /**
     * Process the payload using multiple stages.
     *
     * @param mixed $payload
     *
     * @return mixed
     */
    public function process($payload, callable ...$stages);
}

我们完全也可以利用该接口,实现我们的方法来组装管道和「过滤网」。

PipelineBuilder

最后提供了一个 Builder,这个也很好理解:

class PipelineBuilder implements PipelineBuilderInterface
{
    /**
     * @var callable[]
     */
    private $stages = [];

    /**
     * @return self
     */
    public function add(callable $stage): PipelineBuilderInterface
    {
        $this->stages[] = $stage;

        return $this;
    }

    public function build(ProcessorInterface $processor = null): PipelineInterface
    {
        return new Pipeline($processor, ...$this->stages);
    }
}

interface PipelineBuilderInterface
{
    /**
     * Add an stage.
     *
     * @return self
     */
    public function add(callable $stage): PipelineBuilderInterface;

    /**
     * Build a new Pipeline object.
     */
    public function build(ProcessorInterface $processor = null): PipelineInterface;
}

总结

无论是对不同技术的横向理解,还是基于 Laravel 或者某些开源插件,我们都能学习到技术之上的通用原理和方法。再将这些原理和方法反作用于我们的实际代码开发中。

最近闲来没事,自己参考 Laravel 去写个简易框架,也将League\Pipeline 引入到框架中使用。

「未完待续」

原文发布于微信公众号 - coding01(coding01)

原文发表时间:2018-07-08

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏腾讯云中间件团队的专栏

Raft 算法原理及其在 CMQ 中的应用(上)

Raft 算法是一种分布式一致性算法。与 paxos 相比,它更易理解和工程化。我们完整实现了该算法并将其应用在自研的高可靠消息中间件 CMQ 中,同时沉淀出对...

2K0
来自专栏针针小站

【Skill】斐讯K2T – 从Telnet到退货

首先先确认一下路由器的系统版本 确保系统在140版本以下 因为140以上的系统修复了这个漏洞 无法开启Telnet

3611
来自专栏北京马哥教育

从零开始的 Python 爬虫速成指南

2014
来自专栏极乐技术社区

『教程』如何突破微信小程序模板消息限制实现无限制主动推送

基于微信的通知渠道,微信小程序为开发者提供了可以高效触达用户的模板消息能力,在用户本人与小程序页面有交互行为后触发,通过微信聊天列表中的服务通知可快捷进入查看消...

5180
来自专栏Coding01

推荐一个 PHP 管道插件 League\Pipeline

水管太长,只要有一处破了,就会漏水了,而且不利于复杂环境弯曲转折使用。所以我们都会把水管分成很短的一节一节管道,然后最大化的让管道大小作用不同,因地制宜,组装在...

893
来自专栏SeanCheney的专栏

Scrapy1.4最新官方文档总结 1 介绍·安装安装

《Learning Scrapy》这本书是2016年1月出版的,作者使用的版本是Scrapy 1.0.3。 现在,Scrapy的最新版本是1.4。 ? Scra...

2728
来自专栏北京马哥教育

Python分布式爬虫详解(一)

当项目上升到一定境界时候,需要同时抓取几百个甚至上千个网站,这个时候,单个的爬虫已经满足不了需求。比如我们日常用的百度,它每天都会爬取大量的网站,一台服务器肯定...

1543
来自专栏张戈的专栏

HTTP加速器varnish安装小记(1)

上午共享的那个varnish 安装手册,个人看了下,有点不知所云,好吧~看来还是先安装玩玩! 苦逼公司服务器没法连外网,不能用什么 wget 或 yum 命令直...

3978
来自专栏Python攻城狮

UDP-用户数据报协议1.介绍2.udp网络程序-发送数据udp网络程序-发送、接收数据echo服务器广播用代码给飞秋发信息收消息_没绑定端口号收消息_绑定端口多线程聊天

UDP --- 用户数据报协议,是一个无连接的简单的面向数据报的运输层协议。UDP不提供可靠性,它只是把应用程序传给IP层的数据报发送出去,但是并不能保证它们能...

1524
来自专栏Java与Android技术栈

基于Vert.x和RxJava 2构建通用的爬虫框架

最近由于业务需要监控一些数据,虽然市面上有很多优秀的爬虫框架,但是我仍然打算从头开始实现一套完整的爬虫框架。

1273

扫码关注云+社区

领取腾讯云代金券