Using the Pipeline Pattern for Business Orchestration (Part 2)

Original
lyb-geek
Published 2022-09-20 09:48:05
Included in the column: Linyb极客之路

Preface

The previous article introduced two ways to implement business orchestration with the pipeline pattern. This article covers two more.

Implementation approaches

Approach 1: Spring Boot auto-configuration

1. Create the pipeline definition classes

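// One pipeline: the name of the consuming service plus its handler class names, in configured order.
@Data
@AllArgsConstructor
@NoArgsConstructor
public class PipelineDefinition {

    // Prefix of the bean name under which each pipeline is registered.
    public static final String PREFIX = "lybgeek_pipeline_";

    private String comsumePipelineName;

    private List<String> pipelineClassNames;
}

// Binds the lybgeek.pipeline.chain list from the application configuration.
@Data
@AllArgsConstructor
@NoArgsConstructor
@ConfigurationProperties(prefix = PipelineDefinitionProperties.PREFIX)
public class PipelineDefinitionProperties {

    public static final String PREFIX = "lybgeek.pipeline";

    private List<PipelineDefinition> chain;
}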

2. Write the auto-configuration class

@Configuration
@EnableConfigurationProperties(PipelineDefinitionProperties.class)
public class PipelineAutoConfiguration implements BeanFactoryAware,InitializingBean, SmartInitializingSingleton {


    @Autowired
    private PipelineDefinitionProperties pipelineDefinitionProperties;

    private DefaultListableBeanFactory defaultListableBeanFactory;


    @Override
    public void setBeanFactory(BeanFactory beanFactory) throws BeansException {

        defaultListableBeanFactory = (DefaultListableBeanFactory)beanFactory;

    }

    // Registers one ChannelPipeline bean definition per configured pipeline.
    private void registerPipeline(DefaultListableBeanFactory defaultListableBeanFactory, PipelineDefinition pipelineDefinition) {
        LinkedBlockingDeque<AbstactChannelHandler> linkedBlockingDeque = buildPipelineQueue(pipelineDefinition);
        GenericBeanDefinition beanDefinition = (GenericBeanDefinition) BeanDefinitionBuilder.genericBeanDefinition(ChannelPipeline.class).getBeanDefinition();
        beanDefinition.getPropertyValues().addPropertyValue("channelHandlers",linkedBlockingDeque);
        defaultListableBeanFactory.registerBeanDefinition(PipelineDefinition.PREFIX + pipelineDefinition.getComsumePipelineName() ,beanDefinition);
    }

    // Instantiates the configured handler classes reflectively, keeping the configured order.
    @SneakyThrows
    private LinkedBlockingDeque<AbstactChannelHandler> buildPipelineQueue(PipelineDefinition pipelineDefinition) {
        List<String> pipelineClassNames = pipelineDefinition.getPipelineClassNames();
        if(CollectionUtil.isEmpty(pipelineClassNames)){
           throw new PipelineException("pipelineClassNames must be configured");
        }

        LinkedBlockingDeque<AbstactChannelHandler> linkedBlockingDeque = new LinkedBlockingDeque<>();
        for (String pipelineClassName : pipelineClassNames) {
            Class<?> pipelineClassClass = Class.forName(pipelineClassName);
            if(!AbstactChannelHandler.class.isAssignableFrom(pipelineClassClass)){
                throw new PipelineException("pipelineClassNames entries must be subclasses of com.github.lybgeek.pipeline.handler.AbstactChannelHandler");
            }
            // Each handler class needs an accessible no-argument constructor.
            AbstactChannelHandler pipeline = (AbstactChannelHandler) pipelineClassClass.getDeclaredConstructor().newInstance();
            linkedBlockingDeque.addLast(pipeline);
        }

        return linkedBlockingDeque;
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        if(CollectionUtil.isNotEmpty(pipelineDefinitionProperties.getChain())){
            for (PipelineDefinition pipelineDefinition : pipelineDefinitionProperties.getChain()) {
                registerPipeline(defaultListableBeanFactory, pipelineDefinition);
            }
        }
    }

    @Override
    public void afterSingletonsInstantiated() {
        Map<String, ChannelPipeline> pipelineBeanMap = defaultListableBeanFactory.getBeansOfType(ChannelPipeline.class);
        pipelineBeanMap.forEach((key,bean)->{
            bean.setHandlerContext(ChannelHandlerContext.getCurrentContext());
        });

    }
}
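
The reflective handler construction in the auto-configuration class above can be tried out without Spring. The following is a minimal sketch under stated assumptions: SketchHandler, SketchCheckHandler, SketchSaveHandler and ReflectiveChainBuilder are hypothetical names standing in for AbstactChannelHandler and its subclasses, and none of them exist in the demo project.

```java
import java.util.Deque;
import java.util.List;
import java.util.concurrent.LinkedBlockingDeque;

// Hypothetical stand-in for the article's AbstactChannelHandler base class.
abstract class SketchHandler {
    abstract boolean handle(StringBuilder trace);
}

// Two illustrative handlers; the names are invented for this sketch.
class SketchCheckHandler extends SketchHandler {
    @Override
    boolean handle(StringBuilder trace) {
        trace.append("check;");
        return true;
    }
}

class SketchSaveHandler extends SketchHandler {
    @Override
    boolean handle(StringBuilder trace) {
        trace.append("save;");
        return true;
    }
}

final class ReflectiveChainBuilder {

    // Turns configured class names into handler instances, preserving the configured order.
    static Deque<SketchHandler> build(List<String> classNames) {
        if (classNames == null || classNames.isEmpty()) {
            throw new IllegalArgumentException("at least one handler class name must be configured");
        }
        Deque<SketchHandler> chain = new LinkedBlockingDeque<>();
        for (String className : classNames) {
            try {
                Class<?> type = Class.forName(className);
                // Reject classes that do not extend the handler base type.
                if (!SketchHandler.class.isAssignableFrom(type)) {
                    throw new IllegalArgumentException(className + " must extend SketchHandler");
                }
                // Requires an accessible no-argument constructor.
                chain.addLast((SketchHandler) type.getDeclaredConstructor().newInstance());
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("cannot instantiate " + className, e);
            }
        }
        return chain;
    }
}
```

Validating the type before instantiation means a misconfigured class name fails at startup with a readable message rather than later with a ClassCastException.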

3. Register the class in spring.factories

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.github.lybgeek.pipeline.spring.autoconfigure.PipelineAutoConfiguration

How a business project uses this approach for orchestration

Example:

1. Create the pipeline handlers

@Slf4j
public class UserCheckChannelHandler extends AbstactChannelHandler {

    @Override
    public boolean handler(ChannelHandlerContext chx) {
        ChannelHandlerRequest channelHandlerRequest = chx.getChannelHandlerRequest();
        System.out.println("yml------------------------------------Step 1: validate user data [" + channelHandlerRequest.getRequestId() + "]");
        Object params = channelHandlerRequest.getParams();
        if(params instanceof User){
            User user = (User)params;
            if(StringUtils.isBlank(user.getFullname())){
                log.error("User name must not be empty");
                return false;
            }
            return true;
        }

        return false;
    }
}

@Slf4j
public class UserFillUsernameAndEmailChannelHandler extends AbstactChannelHandler {
    @SneakyThrows
    @Override
    public boolean handler(ChannelHandlerContext chx) {
        ChannelHandlerRequest channelHandlerRequest = chx.getChannelHandlerRequest();
        System.out.println("yml------------------------------------Step 2: fill in username and email [Chinese name converted to pinyin] [" + channelHandlerRequest.getRequestId() + "]");
        Object params = channelHandlerRequest.getParams();
        if(params instanceof User){
            User user = (User)params;
            String fullname = user.getFullname();
            HanyuPinyinOutputFormat hanyuPinyinOutputFormat = new HanyuPinyinOutputFormat();
            hanyuPinyinOutputFormat.setToneType(HanyuPinyinToneType.WITHOUT_TONE);
            String username = PinyinHelper.toHanYuPinyinString(fullname, hanyuPinyinOutputFormat);
            user.setUsername(username);
            user.setEmail(username + "@qq.com");
            return true;
        }

        return false;
    }
}

... For the remaining handlers, see the linked demo code.
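
Each handler returns a boolean, but the ChannelPipeline class that drives the handlers comes from the previous article and is not reproduced here. The sketch below assumes that a handler returning false stops the chain; that is an assumption about the original implementation, and ShortCircuitPipeline is a hypothetical name used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical, simplified pipeline: each step is a predicate over the request.
final class ShortCircuitPipeline<T> {

    private final List<Predicate<T>> steps = new ArrayList<>();

    // Appends a step; steps run in the order they were added.
    ShortCircuitPipeline<T> add(Predicate<T> step) {
        steps.add(step);
        return this;
    }

    // Runs the steps in order and stops at the first one that returns false.
    boolean start(T request) {
        for (Predicate<T> step : steps) {
            if (!step.test(request)) {
                return false;
            }
        }
        return true;
    }
}
```

Under this assumption, a validation step placed first keeps later steps such as password encryption or saving from running on invalid input.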

2. Configure the YAML file

lybgeek:
  pipeline:
    chain:
      - comsumePipelineName: userYmlService
        pipelineClassNames:
          - com.github.lybgeek.pipeline.spring.test.yml.handler.UserCheckChannelHandler
          - com.github.lybgeek.pipeline.spring.test.yml.handler.UserFillUsernameAndEmailChannelHandler
          - com.github.lybgeek.pipeline.spring.test.yml.handler.UserPwdEncryptChannelHandler
          - com.github.lybgeek.pipeline.spring.test.yml.handler.UserMockSaveChannelHandler
          - com.github.lybgeek.pipeline.spring.test.yml.handler.UserPrintChannleHandler

3. Look up the pipeline bean in the business service

@Service
public class UserYmlServiceImpl implements UserYmlService {


    @Autowired
    private ApplicationContext applicationContext;

    @Override
    public boolean save(User user) {

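        // Look the pipeline up by bean name; getBean(Class, Object...) would treat the name as a constructor argument.
        ChannelPipeline pipeline = applicationContext.getBean(PipelineDefinition.PREFIX + StringUtils.uncapitalize(UserYmlService.class.getSimpleName()), ChannelPipeline.class);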

        return pipeline.start(ChannelHandlerRequest.builder().params(user).build());
    }
}
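
The lookup in the service only works if the name it computes matches the name used at registration, which is the prefix plus the comsumePipelineName from the YAML. The following sketch reproduces that naming convention in plain Java. PipelineBeanNames and its uncapitalize helper are hypothetical, and the UserYmlService interface here is an empty stand-in that only shares its simple name with the real one.

```java
// Empty stand-in that shares its simple name with the article's service interface.
interface UserYmlService {
}

final class PipelineBeanNames {

    // Same value as PipelineDefinition.PREFIX in the article.
    static final String PREFIX = "lybgeek_pipeline_";

    // Lower-cases the first character, leaving the rest untouched.
    static String uncapitalize(String value) {
        if (value == null || value.isEmpty()) {
            return value;
        }
        return Character.toLowerCase(value.charAt(0)) + value.substring(1);
    }

    // Derives the bean name from the service interface's simple name.
    static String beanNameFor(Class<?> serviceType) {
        return PREFIX + uncapitalize(serviceType.getSimpleName());
    }
}
```

For UserYmlService this yields lybgeek_pipeline_userYmlService, so the YAML entry has to use userYmlService as its comsumePipelineName.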

4. Test

    @Test
    public void testPipelineYml(){
        boolean isOk = userYmlService.save(user);
        Assert.assertTrue(isOk);

    }

Approach 2: Spring custom XML tags

1. Define the XSD schema file pipeline.xsd

<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<xsd:schema xmlns:xsd="http://www.w3.org/2001/XMLSchema"
            xmlns:beans="http://www.springframework.org/schema/beans"
            xmlns:tool="http://www.springframework.org/schema/tool"
            xmlns="http://lybgeek.github.com/schema/pipeline"
            targetNamespace="http://lybgeek.github.com/schema/pipeline">

    <xsd:import namespace="http://www.w3.org/XML/1998/namespace"/>
    <xsd:import namespace="http://www.springframework.org/schema/beans"
                schemaLocation="http://www.springframework.org/schema/beans/spring-beans.xsd"/>
    <xsd:import namespace="http://www.springframework.org/schema/tool"/>

    <xsd:annotation>
        <xsd:documentation>
            <![CDATA[ Namespace support for pipeline services ]]></xsd:documentation>
    </xsd:annotation>


    <xsd:complexType name="pipelineType">
        <xsd:choice>
            <xsd:element ref="pipelineHandler" minOccurs="1" maxOccurs="unbounded"/>
        </xsd:choice>
        <xsd:attribute name="id" type="xsd:ID">
            <xsd:annotation>
                <xsd:documentation><![CDATA[ The unique identifier for a bean. ]]></xsd:documentation>
            </xsd:annotation>
        </xsd:attribute>
        <xsd:attribute name="consumePipelinesServiceClassName" type="xsd:string" use="required">
            <xsd:annotation>
                <xsd:documentation><![CDATA[ consumePipelinesService class name  ]]></xsd:documentation>
            </xsd:annotation>
        </xsd:attribute>
        <xsd:attribute name="consumePipelinesMethod" type="xsd:string" use="required">
            <xsd:annotation>
                <xsd:documentation><![CDATA[ consumePipelinesMethod name  ]]></xsd:documentation>
            </xsd:annotation>
        </xsd:attribute>
        <xsd:attribute name="argsType" type="xsd:string" use="required">
            <xsd:annotation>
                <xsd:documentation><![CDATA[ consumePipelinesMethod args type , multiple args types are separated by commas ]]></xsd:documentation>
            </xsd:annotation>
        </xsd:attribute>
    </xsd:complexType>

    <xsd:complexType name="pipelineHandlerType">
        <xsd:attribute name="className" type="xsd:string" use="required">
            <xsd:annotation>
                <xsd:documentation><![CDATA[ pipelineHandler class name ]]></xsd:documentation>
            </xsd:annotation>
        </xsd:attribute>
        <xsd:attribute name="order" type="xsd:string" use="required">
            <xsd:annotation>
                <xsd:documentation><![CDATA[ pipelineHandler order (integer) ]]></xsd:documentation>
            </xsd:annotation>
        </xsd:attribute>
    </xsd:complexType>



    <xsd:element name="pipelineHandler" type="pipelineHandlerType">
        <xsd:annotation>
            <xsd:documentation><![CDATA[ The pipelineHandler config ]]></xsd:documentation>
        </xsd:annotation>
    </xsd:element>

    <xsd:element name="pipeline" type="pipelineType">
        <xsd:annotation>
            <xsd:documentation><![CDATA[ The pipeline config ]]></xsd:documentation>
        </xsd:annotation>
    </xsd:element>

</xsd:schema>

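2. Register the XSD schema file

Under the resources folder on the classpath, create a META-INF folder, then create a file named spring.schemas in it with the following content: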

http\://lybgeek.github.com/schema/pipeline/pipeline.xsd=META-INF/pipeline.xsd

3. Define the classes that parse the custom tag

public class PipelineNamespaceHandler extends NamespaceHandlerSupport {

    @Override
    public void init() {
       registerBeanDefinitionParser("pipeline",new PipelineBeanDefinitionParser());
    }
}

public class PipelineBeanDefinitionParser implements BeanDefinitionParser {

    @Override
    public BeanDefinition parse(Element element, ParserContext parserContext) {
        PipelineConfig pipelineConfig = buildPipelineConfig(element);
        List<HandlerInvotation> handlerInvotations = this.buildHandlerInvotations(pipelineConfig);
        GenericBeanDefinition beanDefinition = getGenericBeanDefinition(element, parserContext, pipelineConfig, handlerInvotations);
        return beanDefinition;
    }

    private GenericBeanDefinition getGenericBeanDefinition(Element element, ParserContext parserContext, PipelineConfig pipelineConfig, List<HandlerInvotation> handlerInvotations) {
        GenericBeanDefinition beanDefinition = (GenericBeanDefinition) BeanDefinitionBuilder.genericBeanDefinition().getBeanDefinition();
        beanDefinition.getPropertyValues().addPropertyValue("pipelineServiceClz",pipelineConfig.getConsumePipelinesService());
        beanDefinition.getPropertyValues().addPropertyValue("handlerInvotations",handlerInvotations);
        beanDefinition.getPropertyValues().addPropertyValue("createByXml",true);
        beanDefinition.setBeanClass(ComsumePipelineFactoryBean.class);
        String beanName = BeanUtils.generateBeanName(element,"id",parserContext,pipelineConfig.getConsumePipelinesService().getSimpleName());
        parserContext.getRegistry().registerBeanDefinition(beanName,beanDefinition);
        return beanDefinition;
    }

    @SneakyThrows
    private List<HandlerInvotation> buildHandlerInvotations(PipelineConfig pipelineConfig){
        List<HandlerInvotation> handlerInvotations = new ArrayList<>();
        for (PipelineHandlerConfig pipelineHandlerConfig : pipelineConfig.getPipelineChain()) {
            if(!AbstactChannelHandler.class.isAssignableFrom(pipelineHandlerConfig.getPipelineClass())){
                throw new PipelineException("pipelineHandler className must be 【com.github.lybgeek.pipeline.handler.AbstactChannelHandler】 subclass");
            }

            AbstactChannelHandler channelHandler = (AbstactChannelHandler) pipelineHandlerConfig.getPipelineClass().getDeclaredConstructor().newInstance();
            HandlerInvotation invotation = HandlerInvotation.builder()
                    .args(pipelineConfig.getArgs())
                    .handler(channelHandler)
                    .order(pipelineHandlerConfig.getOrder())
                    .consumePipelinesMethod(pipelineConfig.getConsumePipelinesMethod())
                    .build();
            handlerInvotations.add(invotation);

        }
        return handlerInvotations;
    }

    @SneakyThrows
    private PipelineConfig buildPipelineConfig(Element element){
        String argsType = element.getAttribute("argsType");
        String[] argsTypeArr = trimArrayElements(commaDelimitedListToStringArray(argsType));
        String consumePipelinesMethod = element.getAttribute("consumePipelinesMethod");
        String consumePipelinesServiceClassName = element.getAttribute("consumePipelinesServiceClassName");


        Class[] args = null;
        if(ArrayUtil.isNotEmpty(argsTypeArr)){
            args = new Class[argsTypeArr.length];
            for (int i = 0; i < argsTypeArr.length; i++) {
                Class argType = Class.forName(argsTypeArr[i]);
                args[i] = argType;
            }
        }

        List<PipelineHandlerConfig> pipelineHandlerConfigs = buildPipelineHandlerConfig(element);

        return PipelineConfig.builder().args(args)
                .consumePipelinesMethod(consumePipelinesMethod)
                .consumePipelinesService(Class.forName(consumePipelinesServiceClassName))
                .pipelineChain(pipelineHandlerConfigs)
                .build();
    }

    @SneakyThrows
    private List<PipelineHandlerConfig> buildPipelineHandlerConfig(Element element){
        NodeList nodeList = element.getChildNodes();
        if (nodeList == null) {
            return Collections.emptyList();
        }

        List<PipelineHandlerConfig> pipelineHandlerConfigs = new ArrayList<>();
        for (int i = 0; i < nodeList.getLength(); i++) {
            if (!(nodeList.item(i) instanceof Element)) {
                continue;
            }
            Element childElement = (Element) nodeList.item(i);
            if ("pipelineHandler".equals(childElement.getNodeName()) || "pipelineHandler".equals(childElement.getLocalName())) {
                String pipelineHanlderClassName = childElement.getAttribute("className");
                String pipelineHanlderOrder = childElement.getAttribute("order");
                Class pipelineHanlderClass = Class.forName(pipelineHanlderClassName);
                PipelineHandlerConfig pipelineHandlerConfig = PipelineHandlerConfig.builder()
                        .PipelineClass(pipelineHanlderClass)
                        .order(Integer.valueOf(pipelineHanlderOrder))
                        .build();
                pipelineHandlerConfigs.add(pipelineHandlerConfig);
            }
        }

        return pipelineHandlerConfigs;
    }
}
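
The child-element loop in the parser can be exercised with the JDK's own DOM parser, without Spring. The sketch below reads className and order from each pipelineHandler child and then sorts by order. PipelineXmlSketch and HandlerEntry are hypothetical names, and the sorting step is an assumption: the article does not show where the real implementation applies the order value.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;

final class PipelineXmlSketch {

    // Hypothetical value object holding what one pipelineHandler element declares.
    static final class HandlerEntry {
        final String className;
        final int order;

        HandlerEntry(String className, int order) {
            this.className = className;
            this.order = order;
        }
    }

    // Reads the pipelineHandler children of the root element and returns them sorted by order.
    static List<HandlerEntry> readHandlers(String xml) {
        try {
            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
            // Namespace awareness is needed for getLocalName() to return the unprefixed tag name.
            factory.setNamespaceAware(true);
            Element root = factory.newDocumentBuilder()
                    .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)))
                    .getDocumentElement();
            List<HandlerEntry> entries = new ArrayList<>();
            NodeList children = root.getChildNodes();
            for (int i = 0; i < children.getLength(); i++) {
                Node node = children.item(i);
                // Skip whitespace text nodes and unrelated elements.
                if (node instanceof Element && "pipelineHandler".equals(node.getLocalName())) {
                    Element child = (Element) node;
                    entries.add(new HandlerEntry(
                            child.getAttribute("className"),
                            Integer.parseInt(child.getAttribute("order"))));
                }
            }
            entries.sort(Comparator.comparingInt((HandlerEntry entry) -> entry.order));
            return entries;
        } catch (Exception e) {
            throw new IllegalArgumentException("invalid pipeline XML", e);
        }
    }
}
```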

4. Register the namespace handler

Create a file named spring.handlers in the META-INF folder with the following content:

http\://lybgeek.github.com/schema/pipeline=com.github.lybgeek.pipeline.spring.shema.PipelineNamespaceHandler
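
The backslash before the colon matters because spring.handlers and spring.schemas use the Java properties format, in which an unescaped colon ends the key. The short sketch below shows the difference using java.util.Properties directly; HandlerMappingSketch is a hypothetical helper name.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

final class HandlerMappingSketch {

    // Parses text in the Java properties format, as used by spring.handlers and spring.schemas.
    static Properties parse(String content) {
        Properties properties = new Properties();
        try {
            properties.load(new StringReader(content));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return properties;
    }
}
```

With the escape, the key is the full namespace URI http://lybgeek.github.com/schema/pipeline. Without it, the key is cut off at http and the rest of the line becomes the value, so the namespace would not be found under its URI.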

How a business project uses this approach for orchestration

Example:

1. Create the pipeline handlers

@Slf4j
public class UserCheckChannelHandler extends AbstactChannelHandler {

    @Override
    public boolean handler(ChannelHandlerContext chx) {
        ChannelHandlerRequest channelHandlerRequest = chx.getChannelHandlerRequest();
        System.out.println("XML------------------------------------Step 1: validate user data [" + channelHandlerRequest.getRequestId() + "]");
        String json = JSON.toJSONString(channelHandlerRequest.getParams());
        List<User> users = JSON.parseArray(json,User.class);
        if(CollectionUtil.isEmpty(users) || StringUtils.isBlank(users.get(0).getFullname())){
            log.error("User name must not be empty");
            return false;
        }
        return true;
    }
}

@Slf4j
public class UserFillUsernameAndEmailChannelHandler extends AbstactChannelHandler {
    @SneakyThrows
    @Override
    public boolean handler(ChannelHandlerContext chx) {
        ChannelHandlerRequest channelHandlerRequest = chx.getChannelHandlerRequest();
        System.out.println("XML------------------------------------Step 2: fill in username and email [Chinese name converted to pinyin] [" + channelHandlerRequest.getRequestId() + "]");
        String json = JSON.toJSONString(channelHandlerRequest.getParams());
        List<User> users = JSON.parseArray(json,User.class);
        if(CollectionUtil.isNotEmpty(users)){
            User user = users.get(0);
            String fullname = user.getFullname();
            HanyuPinyinOutputFormat hanyuPinyinOutputFormat = new HanyuPinyinOutputFormat();
            hanyuPinyinOutputFormat.setToneType(HanyuPinyinToneType.WITHOUT_TONE);
            String username = PinyinHelper.toHanYuPinyinString(fullname, hanyuPinyinOutputFormat);
            user.setUsername(username);
            user.setEmail(username + "@qq.com");
            return true;
        }

        return false;
    }
}

... For the remaining handlers, see the linked demo code.

2. Define the pipeline XML

<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:lybgeek="http://lybgeek.github.com/schema/pipeline"
       xmlns="http://www.springframework.org/schema/beans"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd
       http://lybgeek.github.com/schema/pipeline http://lybgeek.github.com/schema/pipeline/pipeline.xsd">

    
    <lybgeek:pipeline consumePipelinesServiceClassName="com.github.lybgeek.pipeline.spring.test.xml.service.UserXmlService" consumePipelinesMethod="save" argsType="com.github.lybgeek.pipeline.spring.test.model.User">
        <lybgeek:pipelineHandler className="com.github.lybgeek.pipeline.spring.test.xml.handler.UserCheckChannelHandler" order="1"/>
        <lybgeek:pipelineHandler className="com.github.lybgeek.pipeline.spring.test.xml.handler.UserFillUsernameAndEmailChannelHandler" order="2"/>
        <lybgeek:pipelineHandler className="com.github.lybgeek.pipeline.spring.test.xml.handler.UserPwdEncryptChannelHandler" order="3"/>
        <lybgeek:pipelineHandler className="com.github.lybgeek.pipeline.spring.test.xml.handler.UserMockSaveChannelHandler" order="4"/>
        <lybgeek:pipelineHandler className="com.github.lybgeek.pipeline.spring.test.xml.handler.UserPrintChannleHandler" order="5"/>
    </lybgeek:pipeline>

</beans>

3. Create the business pipeline interface

public interface UserXmlService {
    boolean save(User user);
}

Defining the interface is all that is needed.
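
An interface alone can be enough because the parser registers a ComsumePipelineFactoryBean for it, and a factory bean can hand out a generated implementation. That class is not shown in this article, so the following is only a sketch of one plausible mechanism, a JDK dynamic proxy that routes the configured method to a handler chain. InterfaceProxySketch, SketchUserService and the Predicate-based handlers are hypothetical and may differ from the demo's actual implementation.

```java
import java.lang.reflect.Proxy;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical business interface with the same shape as UserXmlService.
interface SketchUserService {
    boolean save(String user);
}

final class InterfaceProxySketch {

    // Creates an implementation of serviceType whose methodName runs the handlers in order.
    // Assumes that method returns boolean; other methods, including toString, are not supported here.
    @SuppressWarnings("unchecked")
    static <T> T createProxy(Class<T> serviceType, String methodName, List<Predicate<Object[]>> handlers) {
        return (T) Proxy.newProxyInstance(
                serviceType.getClassLoader(),
                new Class<?>[] {serviceType},
                (proxy, method, args) -> {
                    if (!method.getName().equals(methodName)) {
                        throw new UnsupportedOperationException(method.getName());
                    }
                    // Stop at the first handler that rejects the call arguments.
                    for (Predicate<Object[]> handler : handlers) {
                        if (!handler.test(args)) {
                            return false;
                        }
                    }
                    return true;
                });
    }
}
```

In this sketch the handlers receive the method arguments as an array, which may be why the XML handlers above read the request parameters as a list rather than as a single User.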

4. Add @ImportResource("classpath:/pipeline.xml") to the application's main class

@SpringBootApplication
@ImportResource("classpath:/pipeline.xml")
public class SpringPipelineApplication  {

    public static void main(String[] args) {
        SpringApplication.run(SpringPipelineApplication.class, args);
    }



}

5. Test

    @Test
    public void testPipelineXml(){
        boolean isOk = userXmlService.save(user);
        Assert.assertTrue(isOk);

    }

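Summary

The core pipeline logic in this article is the same as in the previous one. The difference is that the pipeline handlers are now managed centrally through configuration files, which makes later maintenance less error-prone.

Demo link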

https://github.com/lyb-geek/springboot-learning/tree/master/springboot-pipeline

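Original-content notice: this article is published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.

In case of infringement, contact cloudcommunity@tencent.com for removal.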
