前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >深析Pipeline设计模式

深析Pipeline设计模式

作者头像
IT大咖说
发布2021-03-17 17:04:45
4.1K0
发布2021-03-17 17:04:45
举报
文章被收录于专栏:IT大咖说

目标

通过提供初始输入并传递处理后的输出以供下一阶段使用,从而允许在一系列阶段中进行数据处理。

解释

Pipeline模式为管道模式,也称为流水线模式。通过预先设定好的一系列的阶段来处理输入的数据,每个阶段的输出即是下一个阶段的输入。

模型图如下:

pipeline模式

从图中可以看出,整个流水线内数据流转是从上游到下游,上游的输出是下游的输入,按阶段依次执行。

Source: 表示数据来源,比如:KafkaSource。

Channel:表示对数据进行处理的组件,比如:JsonChannel,对数据进行json转换和处理。

Sink:表示数据落地或下沉的地方,比如:KafkaSink,表示数据发送到指定的kafka;DbSInk表示数据落地到DB。

可以看出,Pipeline是由Source(必须有),Channel(不一定需要),Sink(必须有)三种类型的组件自由组合而成的。

代码示例

代码语言:javascript
复制
/**
 *  生命周期
 */
public interface LifeCycle {
    /**
     *  初始化
     * @param config
     */
    void init(String config);

    /**
     *  启动
     */
    void startup();

    /**
     *  结束
     */
    void shutdown();
}
代码语言:javascript
复制
/**
 *  组件
 */
public interface Component<T> extends LifeCycle {
    /**
     *  组件名称
     * @return
     */
    String getName();

    /**
     *  获取下游组件
     * @return
     */
    Collection<Component> getDownStrems();

    /**
     *  执行
     */
    void execute(T o);
}
代码语言:javascript
复制
/**
 *  组件抽象实现
 * @param <T>   输入
 * @param <R>   输出
 */
public abstract class AbstractComponent<T, R> implements Component<T>{

    @Override
    public void execute(T o) {
        // 当前组件执行
        R r = doExecute(o);
        System.out.println(getName() + " receive " + o + " return " + r);
        // 获取下游组件,并执行
        Collection<Component> downStreams = getDownStrems();
        if (!CollectionUtils.isEmpty(downStreams)) {
            downStreams.forEach(c -> c.execute(r));
        }
    }

    /**
     *  具体组件执行处理
     * @param o 传入的数据
     * @return
     */
    protected abstract R doExecute(T o);

    @Override
    public void startup() {
        // 下游 -> 上游 依次启动
        Collection<Component> downStreams = getDownStrems();
        if (!CollectionUtils.isEmpty(downStreams)) {
            downStreams.forEach(Component::startup);
        }
        // do startup
        System.out.println("--------- " + getName() + " is start --------- ");
    }

    @Override
    public void shutdown() {
        // 上游 -> 下游 依次关闭
        // do shutdown
        System.out.println("--------- " + getName() + " is shutdown --------- ");

        Collection<Component> downStreams = getDownStrems();
        if (!CollectionUtils.isEmpty(downStreams)) {
            downStreams.forEach(Component::shutdown);
        }
    }
}
代码语言:javascript
复制
/**
 *  数据来源
 */
public abstract class Source<T, R> extends AbstractComponent<T, R>{

}
代码语言:javascript
复制
/**
 *  数据处理
 */
public abstract class Channel<T, R> extends AbstractComponent<T, R> {

}
代码语言:javascript
复制
/**
 *  数据落地/下沉
 */
public abstract class Sink<T, R> extends AbstractComponent<T, R> {

}

上面我们封装了基本的组件实现,下面扩展一下具体的实现,用一个简单的例子说明:

IntegerSource -> IncrChannel -> StringChannel -> ConsoleSink

从上面组件名称和方向可以判断出来我们要做的流水线是什么,大概过程如:

输入一个数字 -> 数字+1 -> 转为字符串 -> 控制台输出

那么我们开始来实现这个过程吧。

代码语言:javascript
复制
/**
 *  来源
 */
public class IntegerSource extends Source<Integer,  Integer> {

    private int val = 0;

    @Override
    protected Integer doExecute(Integer o) {
        return o;
    }

    @Override
    public void init(String config) {
        System.out.println("--------- " + getName() + " init --------- ");
        val = 1;
    }

    @Override
    public void startup() {
        super.startup();
        execute(val);
    }

    @Override
    public String getName() {
        return "Integer-Source";
    }

    @Override
    public Collection<Component> getDownStrems() {
        return Collections.singletonList(new IncrChannel());
    }
}
代码语言:javascript
复制
/**
 *  处理:数字+1
 */
public class IncrChannel extends Channel<Integer, Integer> {

    @Override
    protected Integer doExecute(Integer o) {
        return o + 1;
    }

    @Override
    public String getName() {
        return "Incr-Channel";
    }

    @Override
    public Collection<Component> getDownStrems() {
        return Collections.singletonList(new StringChannel());
    }

    @Override
    public void init(String config) {

    }
}
代码语言:javascript
复制
/**
 *  处理:转为字符串
 */
public class StringChannel extends Channel<Integer, String> {

    @Override
    protected String doExecute(Integer o) {
        return "str" + o;
    }

    @Override
    public String getName() {
        return "String-Channel";
    }

    @Override
    public Collection<Component> getDownStrems() {
        return Collections.singletonList(new ConsoleSink());
    }

    @Override
    public void init(String config) {

    }
}
代码语言:javascript
复制
/**
 *  控制台
 */
public class ConsoleSink extends Sink<String, Void> {

    @Override
    protected Void doExecute(String o) {
        return null;
    }

    @Override
    public String getName() {
        return "Console-Sink";
    }

    @Override
    public Collection<Component> getDownStrems() {
        return null;
    }

    @Override
    public void init(String config) {

    }
}

好了,扩展实现已完成,整个流水线基本已设置好,我们来测试一下吧

代码语言:javascript
复制
/**
 *  流水线
 */
public class Pipeline implements LifeCycle{
    /**
     *  数据源
     */
    private Source source;

    public Pipeline(Source source) {
        this.source = source;
    }

    @Override
    public void init(String config) {
        // 初始化
        System.out.println("--------- Pipeline init --------- ");
        source.init(null);
    }

    @Override
    public void startup() {
        // 启动
        System.out.println("--------- Pipeline startup --------- ");
        source.startup();
    }

    @Override
    public void shutdown() {
        // 结束
        source.shutdown();
        System.out.println("--------- Pipeline shutdown --------- ");
    }
}
代码语言:javascript
复制
Pipeline pipeline = new Pipeline(new IntegerSource());
pipeline.init(null);
pipeline.startup();
pipeline.shutdown();

执行后结果如下:

--------- Pipeline init --------- --------- Integer-Source init ---------

--------- Pipeline startup ---------

--------- Console-Sink is start ---------

--------- String-Channel is start ---------

--------- Incr-Channel is start ---------

--------- Integer-Source is start ---------

Integer-Source receive 1 return 1

Incr-Channel receive 1 return 2

String-Channel receive 2 return str2

Console-Sink receive str2 return null

--------- Integer-Source is shutdown ---------

--------- Incr-Channel is shutdown ---------

--------- String-Channel is shutdown ---------

--------- Console-Sink is shutdown ---------

--------- Pipeline shutdown ---------

总结

本文我们介绍了常见的设计模式之Pipeline模式,并通过简单的代码示例说明了这种模式的实现及目标。

来源:

https://www.toutiao.com/i6936390184397308447/

“IT大咖说”欢迎广大技术人员投稿,投稿邮箱:aliang@itdks.com

来都来了,走啥走,留个言呗~

IT大咖说 | 关于版权

由“IT大咖说(ID:itdakashuo)”原创的文章,转载时请注明作者、出处及微信公众号。投稿、约稿、转载请加微信:ITDKS10(备注:投稿),茉莉小姐姐会及时与您联系!

感谢您对IT大咖说的热心支持!

相关推荐 推荐文章

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-03-15,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 IT大咖说 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 目标
  • 解释
  • 代码示例
  • 总结
相关产品与服务
云数据库 MySQL
腾讯云数据库 MySQL(TencentDB for MySQL)为用户提供安全可靠,性能卓越、易于维护的企业级云数据库服务。其具备6大企业级特性,包括企业级定制内核、企业级高可用、企业级高可靠、企业级安全、企业级扩展以及企业级智能运维。通过使用腾讯云数据库 MySQL,可实现分钟级别的数据库部署、弹性扩展以及全自动化的运维管理,不仅经济实惠,而且稳定可靠,易于运维。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档