
Using the Pipeline Pattern for Business Orchestration (Part 1)

Original
lyb-geek
Published 2022-08-30 10:00:43
Included in the column: Linyb极客之路

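Preface

1. What is the pipeline pattern

The pipeline pattern is not one of the 23 classic design patterns; it can be seen as a variant of the chain of responsibility pattern. In technical terms, data is handed to a queue of tasks, and the tasks process the data one after another in a fixed order.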
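The definition above can be sketched in a few lines of plain Java. This is a minimal illustration, not the implementation built later in the article: `SimplePipeline` and `SimplePipelineDemo` are hypothetical names, and each task is modelled as a `UnaryOperator` that receives the previous task's result.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical minimal pipeline: stages run in the order in which they were added.
class SimplePipeline<T> {

    private final List<UnaryOperator<T>> stages = new ArrayList<>();

    SimplePipeline<T> addLast(UnaryOperator<T> stage) {
        stages.add(stage);
        return this;
    }

    // Pass the data through every stage, feeding each result into the next stage.
    T process(T input) {
        T current = input;
        for (UnaryOperator<T> stage : stages) {
            current = stage.apply(current);
        }
        return current;
    }
}

class SimplePipelineDemo {

    static String run(String raw) {
        return new SimplePipeline<String>()
                .addLast(String::trim)
                .addLast(String::toLowerCase)
                .addLast(s -> s.replace(' ', '_'))
                .process(raw);
    }

    public static void main(String[] args) {
        System.out.println(run("  Hello Pipeline  ")); // prints "hello_pipeline"
    }
}
```

Unlike this sketch, the article's handlers share a mutable context and return a boolean so that a failing step can stop the run.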

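2. Which scenarios suit the pipeline pattern

It fits business flows that are complex enough to be split into several sub-steps, where each sub-step can be freely combined, replaced, added, or removed.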
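To make "freely combined, replaced, added, or removed" concrete, here is a small sketch under the assumption that every step is registered under a name. `ComposableSteps` and the step names are hypothetical and only serve the illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical sketch: steps are keyed by name so they can be replaced or removed.
class ComposableSteps {

    // LinkedHashMap keeps insertion order, which is the execution order here.
    private final Map<String, UnaryOperator<Integer>> steps = new LinkedHashMap<>();

    ComposableSteps put(String name, UnaryOperator<Integer> step) {
        steps.put(name, step); // re-using a name replaces the step and keeps its position
        return this;
    }

    ComposableSteps remove(String name) {
        steps.remove(name);
        return this;
    }

    int run(int input) {
        int value = input;
        for (UnaryOperator<Integer> step : steps.values()) {
            value = step.apply(value);
        }
        return value;
    }
}

class ComposableStepsDemo {

    static ComposableSteps base() {
        return new ComposableSteps()
                .put("double", x -> x * 2)
                .put("addTen", x -> x + 10);
    }

    public static void main(String[] args) {
        System.out.println(base().run(5));                            // 5 * 2 + 10 = 20
        System.out.println(base().put("double", x -> x * 3).run(5));  // replaced: 5 * 3 + 10 = 25
        System.out.println(base().remove("addTen").run(5));           // removed: 5 * 2 = 10
        System.out.println(base().put("square", x -> x * x).run(5));  // added: 20 * 20 = 400
    }
}
```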

A general recipe for implementing a pipeline

1. Wrap a context that carries data through the pipeline

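import com.alibaba.ttl.TransmittableThreadLocal;

import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

public class ChannelHandlerContext extends ConcurrentHashMap<String,Object> {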

    protected static Class<? extends ChannelHandlerContext> contextClass = ChannelHandlerContext.class;

    protected static final TransmittableThreadLocal<? extends ChannelHandlerContext> CHAIN_CONTEXT = new TransmittableThreadLocal<ChannelHandlerContext>() {
        @Override
        protected ChannelHandlerContext initialValue() {
            try {
                return contextClass.getDeclaredConstructor().newInstance();
            } catch (Throwable e) {
                throw new RuntimeException(e);
            }
        }
    };

    /**
     * Override the default pipeline context class
     *
     * @param clazz the context class to instantiate for each thread
     */
    public static void setContextClass(Class<? extends ChannelHandlerContext> clazz) {
        contextClass = clazz;
    }

    /**
     * Get the pipeline context bound to the current thread
     */
    public static final ChannelHandlerContext getCurrentContext() {
        return CHAIN_CONTEXT.get();
    }

    /**
     * Release the context: clear its entries and unbind it from the current thread
     */
    public void release() {
        this.clear();
        CHAIN_CONTEXT.remove();
    }

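    /**
     * Get a value from the context, falling back to a default when the key is absent
     *
     * @param key          the context key
     * @param defaultValue the value returned when the key has no entry
     * @return the stored value, or defaultValue
     */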
    public Object getDefault(String key, Object defaultValue) {
        return Optional.ofNullable(get(key)).orElse(defaultValue);
    }

    public static final String CHANNEL_HANDLER_REQUEST_KEY = "channelHandlerRequest";

    public ChannelHandlerRequest getChannelHandlerRequest() {
        return (ChannelHandlerRequest) this.getDefault(CHANNEL_HANDLER_REQUEST_KEY,ChannelHandlerRequest.builder().build());
    }


}
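The life cycle of such a thread-bound context (create lazily, share within a thread, release at the end) can be sketched with the JDK alone. This is an illustration under assumptions: `DemoContext` is a hypothetical stand-in for `ChannelHandlerContext`, and it uses plain `ThreadLocal` instead of `TransmittableThreadLocal`, which is meant to also carry the value into pooled threads.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for ChannelHandlerContext; plain ThreadLocal is an assumption
// made to avoid the TransmittableThreadLocal dependency.
class DemoContext extends ConcurrentHashMap<String, Object> {

    private static final ThreadLocal<DemoContext> CURRENT =
            ThreadLocal.withInitial(DemoContext::new);

    static DemoContext current() {
        return CURRENT.get();
    }

    // Clear the entries and unbind the context from the current thread.
    void release() {
        clear();
        CURRENT.remove();
    }
}

class DemoContextUsage {

    // Two lookups on the same thread return the same context instance.
    static boolean sharedWithinThread() {
        DemoContext.current().put("requestId", "r-1");
        boolean shared = "r-1".equals(DemoContext.current().get("requestId"));
        DemoContext.current().release();
        return shared;
    }

    // With plain ThreadLocal, a different thread gets its own, empty context.
    static Object seenFromOtherThread() {
        DemoContext.current().put("requestId", "r-1");
        Object[] seen = new Object[1];
        Thread worker = new Thread(() -> seen[0] = DemoContext.current().get("requestId"));
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            DemoContext.current().release();
        }
        return seen[0];
    }

    // After release(), the next lookup creates a fresh, empty context.
    static boolean emptyAfterRelease() {
        DemoContext.current().put("k", "v");
        DemoContext.current().release();
        boolean empty = DemoContext.current().isEmpty();
        DemoContext.current().release();
        return empty;
    }
}
```

Releasing in a `finally` block matters on pooled threads, where a context that is left bound would otherwise leak into the next request handled by the same thread.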

2. Define the abstract pipeline handler

public abstract class AbstactChannelHandler {

    private String channelHandlerName;

    public String getChannelHandlerName() {
        return channelHandlerName;
    }

    public void setChannelHandlerName(String channelHandlerName) {
        this.channelHandlerName = channelHandlerName;
    }

    // Process the shared context; returning false stops the pipeline
    public abstract boolean handler(ChannelHandlerContext chx);


}

3. Define the pipeline

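import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;

import java.util.concurrent.LinkedBlockingDeque;

@Slf4j
public class ChannelPipeline {

    private LinkedBlockingDeque<AbstactChannelHandler> channelHandlers = new LinkedBlockingDeque<>();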

    private ChannelHandlerContext handlerContext;


    public ChannelPipeline addFirst(AbstactChannelHandler channelHandler){
       return addFirst(null,channelHandler);
    }

    public ChannelPipeline addLast(AbstactChannelHandler channelHandler){
      return addLast(null,channelHandler);
    }

    public ChannelPipeline addFirst(String channelHandlerName,AbstactChannelHandler channelHandler){
        if(StringUtils.isNotBlank(channelHandlerName)){
            channelHandler.setChannelHandlerName(channelHandlerName);
        }
        channelHandlers.addFirst(channelHandler);
        return this;
    }

    public ChannelPipeline addLast(String channelHandlerName,AbstactChannelHandler channelHandler){
        if(StringUtils.isNotBlank(channelHandlerName)){
            channelHandler.setChannelHandlerName(channelHandlerName);
        }
        channelHandlers.addLast(channelHandler);
        return this;
    }


    public void setChannelHandlers(LinkedBlockingDeque<AbstactChannelHandler> channelHandlers) {
        this.channelHandlers = channelHandlers;
    }

    public ChannelHandlerContext getHandlerContext() {
        return handlerContext;
    }

    public void setHandlerContext(ChannelHandlerContext handlerContext) {
        this.handlerContext = handlerContext;
    }

    public boolean start(ChannelHandlerRequest channelHandlerRequest){
         if(channelHandlers.isEmpty()){
             log.warn("channelHandlers is empty");
             return false;
         }

        return handler(channelHandlerRequest);
    }

    private boolean handler(ChannelHandlerRequest channelHandlerRequest) {
        if(StringUtils.isBlank(channelHandlerRequest.getRequestId())){
            channelHandlerRequest.setRequestId(String.valueOf(SnowflakeUtils.getNextId()));
        }
        handlerContext.put(ChannelHandlerContext.CHANNEL_HANDLER_REQUEST_KEY,channelHandlerRequest);
        boolean isSuccess = true;
        try {
            for (AbstactChannelHandler channelHandler : channelHandlers) {
                  isSuccess = channelHandler.handler(handlerContext);
                if(!isSuccess){
                    break;
                }
            }

            if(!isSuccess){
                channelHandlers.clear();
            }
        } catch (Exception e) {
            // Pass the exception itself so the stack trace is logged, not only the message
            log.error("pipeline execution failed", e);
            isSuccess = false;
        } finally {
            handlerContext.release();
        }
        return isSuccess;
    }

}
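The control flow of `ChannelPipeline` (ordered handlers, `addFirst`/`addLast`, stop at the first handler that returns false) can be reduced to the following sketch. It is a simplification under assumptions: `MiniPipeline` is a hypothetical name, handlers are plain `Predicate`s over a `Map`, and the request id, logging, and context release of the original are left out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical reduction of ChannelPipeline to its control flow.
class MiniPipeline {

    private final Deque<Predicate<Map<String, Object>>> handlers = new ArrayDeque<>();

    MiniPipeline addFirst(Predicate<Map<String, Object>> handler) {
        handlers.addFirst(handler);
        return this;
    }

    MiniPipeline addLast(Predicate<Map<String, Object>> handler) {
        handlers.addLast(handler);
        return this;
    }

    // Run handlers in order; the first one returning false (or throwing) ends the run.
    boolean start(Map<String, Object> context) {
        if (handlers.isEmpty()) {
            return false;
        }
        try {
            for (Predicate<Map<String, Object>> handler : handlers) {
                if (!handler.test(context)) {
                    return false;
                }
            }
            return true;
        } catch (RuntimeException e) {
            return false;
        }
    }
}

class MiniPipelineDemo {

    // Returns the names of the handlers that actually ran.
    static List<String> trace(boolean fillSucceeds) {
        List<String> visited = new ArrayList<>();
        new MiniPipeline()
                .addLast(ctx -> { visited.add("check"); return true; })
                .addLast(ctx -> { visited.add("fill"); return fillSucceeds; })
                .addLast(ctx -> { visited.add("save"); return true; })
                .addFirst(ctx -> { visited.add("audit"); return true; })
                .start(new HashMap<>());
        return visited;
    }

    public static void main(String[] args) {
        System.out.println(trace(true));  // [audit, check, fill, save]
        System.out.println(trace(false)); // [audit, check, fill]
    }
}
```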

4. Split the business logic into sub-task handlers according to its complexity

@Slf4j
public class UserCheckChannelHandler extends AbstactChannelHandler {

    
    @Override
    public boolean handler(ChannelHandlerContext chx) {
        ChannelHandlerRequest channelHandlerRequest = chx.getChannelHandlerRequest();
        System.out.println("------------------------------------Step 1: validate user data [" + channelHandlerRequest.getRequestId() + "]");
        Object params = channelHandlerRequest.getParams();
        if(params instanceof User){
            User user = (User)params;
            if(StringUtils.isBlank(user.getFullname())){
                log.error("The user's full name must not be blank");
                return false;
            }
            return true;
        }


        return false;
    }
}

@Slf4j
public class UserFillUsernameAndEmailChannelHandler extends AbstactChannelHandler {
    @SneakyThrows
    @Override
    public boolean handler(ChannelHandlerContext chx) {
        ChannelHandlerRequest channelHandlerRequest = chx.getChannelHandlerRequest();
        System.out.println("------------------------------------Step 2: fill in username and email [Chinese name converted to pinyin] [" + channelHandlerRequest.getRequestId() + "]");
        Object params = channelHandlerRequest.getParams();
        if(params instanceof User){
            User user = (User)params;
            String fullname = user.getFullname();
            HanyuPinyinOutputFormat hanyuPinyinOutputFormat = new HanyuPinyinOutputFormat();
            hanyuPinyinOutputFormat.setToneType(HanyuPinyinToneType.WITHOUT_TONE);
            String username = PinyinHelper.toHanYuPinyinString(fullname, hanyuPinyinOutputFormat);
            user.setUsername(username);
            user.setEmail(username + "@qq.com");
            return true;
        }


        return false;
    }
}

public class UserPwdEncryptChannelHandler extends AbstactChannelHandler {
    @Override
    public boolean handler(ChannelHandlerContext chx) {
        ChannelHandlerRequest channelHandlerRequest = chx.getChannelHandlerRequest();
        System.out.println("------------------------------------Step 3: convert the plaintext password to ciphertext [" + channelHandlerRequest.getRequestId() + "]");
        Object params = channelHandlerRequest.getParams();
        if(params instanceof User){
            String encryptPwd = DigestUtil.sha256Hex(((User) params).getPassword());
            ((User) params).setPassword(encryptPwd);
            return true;
        }

        return false;
    }
}

public class UserMockSaveChannelHandler extends AbstactChannelHandler {

    @Override
    public boolean handler(ChannelHandlerContext chx) {
        ChannelHandlerRequest channelHandlerRequest = chx.getChannelHandlerRequest();
        System.out.println("------------------------------------Step 4: simulate saving the user to the database [" + channelHandlerRequest.getRequestId() + "]");
        Object params = channelHandlerRequest.getParams();
        if(params instanceof User){
            Map<String, User> userMap = new HashMap<>();
            User user = (User)params;
            userMap.put(user.getUsername(),user);
            chx.put("userMap",userMap);
            return true;
        }


        return false;
    }
}

public class UserPrintChannleHandler extends AbstactChannelHandler {
    @Override
    public boolean handler(ChannelHandlerContext chx) {
        ChannelHandlerRequest channelHandlerRequest = chx.getChannelHandlerRequest();
        System.out.println("------------------------------------Step 5: print the user data [" + channelHandlerRequest.getRequestId() + "]");
        Object params = channelHandlerRequest.getParams();
        if(params instanceof User){
            Object userMap = chx.get("userMap");
            if(userMap instanceof Map){
                Map<?, ?> map = (Map<?, ?>) userMap;
                if(map.containsKey(((User) params).getUsername())){
                    System.out.println(map.get(((User) params).getUsername()));
                    return true;
                }
            }
        }

        return false;
    }
}
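Step 3 above delegates hashing to `DigestUtil.sha256Hex`, a third-party utility. For readers who want to see what such a call boils down to, the following is a JDK-only sketch of a SHA-256 hex digest. `Sha256Sketch` is a hypothetical name, and hashing the UTF-8 bytes of the input is an assumption made here, not something the article states about the utility.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical JDK-only helper producing a lowercase hex SHA-256 digest.
class Sha256Sketch {

    static String sha256Hex(String text) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            // Assumption: the text is hashed as UTF-8 bytes
            byte[] hash = digest.digest(text.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder(hash.length * 2);
            for (byte b : hash) {
                hex.append(String.format("%02x", b)); // two hex digits per byte
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            // SHA-256 must be supported by every Java platform implementation
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(sha256Hex("123456").length()); // 64 hex characters
    }
}
```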

5. Orchestrate the sub-tasks

@Service
public class UserServiceImpl implements UserService {

    @Override
    public boolean save(User user) {
       return ChannelPipelineExecutor.pipeline()
                .addLast(new UserCheckChannelHandler())
                .addLast(new UserFillUsernameAndEmailChannelHandler())
                .addLast(new UserPwdEncryptChannelHandler())
                .addLast(new UserMockSaveChannelHandler())
                .addLast(new UserPrintChannleHandler())
                .start(ChannelHandlerRequest.builder().params(user).build());
    }
}

6. Test

Faker faker = Faker.instance(Locale.CHINA);
User user = User.builder().age(20)
        .fullname(faker.name().fullName())
        .mobile(faker.phoneNumber().phoneNumber())
        .password("123456").build();
userService.save(user);

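Check the console output.

A question to consider: is there room to improve the pipeline implementation above?

In step 5 the sub-tasks are orchestrated by hand. If the business flow has N steps, addLast has to be called N times, which feels hard-coded. The implementation can therefore be reworked as follows.

Refactoring

1. Define the pipeline annotation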

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Documented
@Component
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
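public @interface Pipeline {

    // Service interface whose method this handler takes part in
    Class<?> consumePipelinesService();

    // Name of the service method this handler takes part in
    String consumePipelinesMethod();

    // Parameter types of that service method
    Class<?>[] args() default {};

    // Position of this handler within the pipeline
    int order();
}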
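How an `order` attribute like the one above could drive the execution order is sketched below with plain reflection. This is an illustration only: `@Step`, the three step classes, and `StepSorter` are hypothetical, and the article's actual sorting lives in its proxy and factory bean, which are not shown here.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical, cut-down counterpart of @Pipeline that only carries the order.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface Step {
    int order();
}

@Step(order = 2)
class FillStep {}

@Step(order = 1)
class CheckStep {}

@Step(order = 3)
class SaveStep {}

class StepSorter {

    // Keep only annotated classes and sort them by ascending order value.
    static List<String> sortedNames(Class<?>... candidates) {
        List<Class<?>> annotated = new ArrayList<>();
        for (Class<?> candidate : candidates) {
            if (candidate.isAnnotationPresent(Step.class)) {
                annotated.add(candidate);
            }
        }
        annotated.sort(Comparator.comparingInt(
                (Class<?> c) -> c.getAnnotation(Step.class).order()));
        List<String> names = new ArrayList<>();
        for (Class<?> c : annotated) {
            names.add(c.getSimpleName());
        }
        return names;
    }

    public static void main(String[] args) {
        // Registration order does not matter; the annotation decides.
        System.out.println(sortedNames(FillStep.class, SaveStep.class, CheckStep.class));
        // prints [CheckStep, FillStep, SaveStep]
    }
}
```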

2. Define the pipeline scanner

public class PipelineClassPathBeanDefinitionScanner extends ClassPathBeanDefinitionScanner {

    public PipelineClassPathBeanDefinitionScanner(BeanDefinitionRegistry registry) {
        super(registry);
    }


    @Override
    protected Set<BeanDefinitionHolder> doScan(String... basePackages) {
        Set<BeanDefinitionHolder> beanDefinitionHolders = super.doScan(basePackages);
        for (BeanDefinitionHolder beanDefinitionHolder : beanDefinitionHolders) {
            GenericBeanDefinition beanDefinition = (GenericBeanDefinition) beanDefinitionHolder.getBeanDefinition();
            // Remember the scanned interface name, then swap the bean class for the
            // factory bean so that the bean is created through it
            String className = beanDefinition.getBeanClassName();
            beanDefinition.getPropertyValues().addPropertyValue("pipelineServiceClz",className);
            beanDefinition.setBeanClass(ComsumePipelineFactoryBean.class);
        }

        return beanDefinitionHolders;

    }

    @Override
    protected boolean isCandidateComponent(AnnotatedBeanDefinition beanDefinition) {
        return beanDefinition.getMetadata().isInterface();
    }
}

3. Define the pipeline registrar

public class PipelineImportBeanDefinitionRegistrar implements ImportBeanDefinitionRegistrar {

    @Override
    public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {


        PipelineClassPathBeanDefinitionScanner scanner = new PipelineClassPathBeanDefinitionScanner(registry);
        // Only pick up interfaces annotated with @FunctionalInterface
        scanner.addIncludeFilter(new AnnotationTypeFilter(FunctionalInterface.class));
        Set<String> basePackages = getBasePackages(importingClassMetadata);
        scanner.scan(basePackages.toArray(new String[0]));

    }

    protected Set<String> getBasePackages(AnnotationMetadata importingClassMetadata) {
        Map<String, Object> attributes = importingClassMetadata.getAnnotationAttributes(EnabledPipeline.class.getCanonicalName());

        Set<String> basePackages = new HashSet<>();

        for (String pkg : (String[]) attributes.get("basePackages")) {
            if (StringUtils.hasText(pkg)) {
                basePackages.add(pkg);
            }
        }

        if (basePackages.isEmpty()) {
            basePackages.add(
                    ClassUtils.getPackageName(importingClassMetadata.getClassName()));
        }
        return basePackages;
    }
}

4. Define the EnableXXX annotation

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Documented
@Import(PipelineImportBeanDefinitionRegistrar.class)
public @interface EnabledPipeline {

    String[] basePackages() default {};
}

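Note: a pipeline proxy and a pipeline factoryBean also need to be defined. They are left out here for brevity; interested readers can find them through the demo link at the end of the article.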
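Since the proxy is not shown, here is a rough idea of how an interface without an implementation class could be backed by handlers, using a JDK dynamic proxy. This is not the author's proxy: `DemoUserService`, `PipelineProxySketch`, and the `Predicate`-based handlers are hypothetical, and the sketch assumes every service method returns boolean.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical service interface with no implementation class.
interface DemoUserService {
    boolean save(String fullname);
}

class PipelineProxySketch {

    // Build a proxy whose method calls run the handlers in order.
    static <T> T create(Class<T> serviceInterface, List<Predicate<Object[]>> handlers) {
        InvocationHandler dispatcher = (proxy, method, args) -> {
            // The sketch only routes interface methods, not toString/hashCode/equals
            if (method.getDeclaringClass() == Object.class) {
                throw new UnsupportedOperationException(method.getName());
            }
            Object[] safeArgs = args == null ? new Object[0] : args;
            for (Predicate<Object[]> handler : handlers) {
                if (!handler.test(safeArgs)) {
                    return false; // first failing handler ends the call
                }
            }
            return true;
        };
        Object proxy = Proxy.newProxyInstance(
                serviceInterface.getClassLoader(),
                new Class<?>[]{serviceInterface},
                dispatcher);
        return serviceInterface.cast(proxy);
    }
}

class PipelineProxyDemo {

    static boolean save(String fullname) {
        List<Predicate<Object[]>> handlers = new ArrayList<>();
        // Handler 1: exactly one String argument is expected
        handlers.add(args -> args.length == 1 && args[0] instanceof String);
        // Handler 2: the name must not be blank
        handlers.add(args -> !((String) args[0]).trim().isEmpty());
        DemoUserService service = PipelineProxySketch.create(DemoUserService.class, handlers);
        return service.save(fullname);
    }

    public static void main(String[] args) {
        System.out.println(save("lyb"));  // true
        System.out.println(save("   "));  // false
        System.out.println(save(null));   // false
    }
}
```

In a Spring setup, an object like this would typically be what a factory bean returns for the scanned interface, which is why the caller can inject the interface without writing an implementation.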

5. Rework the original pipeline handlers as follows

@Slf4j
@Pipeline(consumePipelinesService = UserService.class,consumePipelinesMethod = "save",args = {User.class},order = 1)
public class UserCheckChannelHandler extends AbstactChannelHandler {

    
    @Override
    public boolean handler(ChannelHandlerContext chx) {
        ChannelHandlerRequest channelHandlerRequest = chx.getChannelHandlerRequest();
        System.out.println("------------------------------------Step 1: validate user data [" + channelHandlerRequest.getRequestId() + "]");
        // The params are round-tripped through JSON and read back as a list of users
        String json = JSON.toJSONString(channelHandlerRequest.getParams());
        List<User> users = JSON.parseArray(json,User.class);
        if(CollectionUtil.isEmpty(users) || StringUtils.isBlank(users.get(0).getFullname())){
            log.error("The user's full name must not be blank");
            return false;
        }
        return true;


    }
}

@Slf4j
@Pipeline(consumePipelinesService = UserService.class,consumePipelinesMethod = "save",args = {User.class},order = 2)
public class UserFillUsernameAndEmailChannelHandler extends AbstactChannelHandler {
    @SneakyThrows
    @Override
    public boolean handler(ChannelHandlerContext chx) {
        ChannelHandlerRequest channelHandlerRequest = chx.getChannelHandlerRequest();
        System.out.println("------------------------------------Step 2: fill in username and email [Chinese name converted to pinyin] [" + channelHandlerRequest.getRequestId() + "]");
        String json = JSON.toJSONString(channelHandlerRequest.getParams());
        List<User> users = JSON.parseArray(json,User.class);
        if(CollectionUtil.isNotEmpty(users)){
            User user = users.get(0);
            String fullname = user.getFullname();
            HanyuPinyinOutputFormat hanyuPinyinOutputFormat = new HanyuPinyinOutputFormat();
            hanyuPinyinOutputFormat.setToneType(HanyuPinyinToneType.WITHOUT_TONE);
            String username = PinyinHelper.toHanYuPinyinString(fullname, hanyuPinyinOutputFormat);
            user.setUsername(username);
            user.setEmail(username + "@qq.com");
            return true;

        }



        return false;
    }
}

... the remaining pipeline handlers are omitted

6. The orchestration that used to be written by hand now only requires an interface

@FunctionalInterface
public interface UserService {

    boolean save(User user);

}

That is all it takes to set up the orchestration.

7. Test

Add the @EnabledPipeline annotation to the startup class, for example:

@SpringBootApplication
@EnabledPipeline(basePackages = "com.github.lybgeek.pipeline.spring.test")
public class SpringPipelineApplication  {

    public static void main(String[] args) {
        SpringApplication.run(SpringPipelineApplication.class);
    }

}

    @Test
    public void testPipeline(){
        boolean isOk = userService.save(user);
        Assert.assertTrue(isOk);
    }

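The orchestration behaves the same as before.

Summary

This article implements two forms of the pipeline pattern. In the first, the business method composes and invokes the handlers itself. The second is annotation-based: the orchestration step is declared by an annotation on each handler, and the handler uses it to locate the business method it serves. The annotation approach spares the business method from orchestrating handlers, but once there are many handlers you have to open every handler class to see its order value, so a wrong order can leave the handlers combined differently from what was intended. To address this, the next article will introduce two other implementations.

Demo link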

https://github.com/lyb-geek/springboot-learning/tree/master/springboot-pipeline

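Original-content notice: this article is published on the Tencent Cloud Developer Community (腾讯云开发者社区) with the author's authorization and may not be reproduced without permission.

In case of infringement, contact cloudcommunity@tencent.com for removal.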
