前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >datax(19):源码解读内置Transformer「建议收藏」

datax(19):源码解读内置Transformer「建议收藏」

作者头像
全栈程序员站长
发布2022-08-30 08:58:54
1.4K0
发布2022-08-30 08:58:54
举报
文章被收录于专栏:全栈程序员必看

大家好,又见面了,我是你们的朋友全栈君。

通过datax(18)已经对transformer有了初步了解,继续撸代码,看datax已经内置的5种简单类型transformer;

一、概述

目前datax内置了5种常用的transformer,分别如下

  1. 截取SubstrTransformer
  2. 填充PadTransformer
  3. 替换ReplaceTransformer
  4. 过滤FilterTransformer
  5. Groovy类型GroovyTransformer

二、SubstrTransformer

主要是对record中的column的值进行按照长度截取;

  • 参数:3个
    • 第一个参数:字段编号,对应record中第几个字段。
    • 第二个参数:字段值的开始位置。
    • 第三个参数:目标字段长度。
  • 返回: 从字符串的指定位置(包含)截取指定长度的字符串。如果开始位置非法抛出异常。如果字段为空值,直接返回(即不参与本transformer)
  • 举例:
代码语言:javascript
复制
dx_substr(1,"2","5")  column 1的value为“dataxTest”=>"taxTe"
dx_substr(1,"5","10")  column 1的value为“dataxTest”=>"Test"

里面第一个方法是给该transformer一个唯一的标识符name(后续几个transformer的子类类似,不在赘述)

代码语言:javascript
复制
 /** * 给该transform起一个唯一标识符 */
  public SubstrTransformer() { 
   
    setTransformerName("dx_substr");
  }

第二个方法是实现父类的evaluate

代码语言:javascript
复制
  /** * 参数:3个 <br> * 第一个参数:字段编号,对应record中第几个字段。 <br> * 第二个参数:字段值的开始位置。 <br> * 第三个参数:目标字段长度。 <br> * 举例: <br> * dx_substr(1,"2","5") column 1的value为“dataxTest”=>"taxTe" * dx_substr(1,"5","10") column 1的value为“dataxTest”=>"Test" * * @param record Record 行记录,UDF进行record的处理后,更新相应的record * @param paras Object transformer函数参数 * @return Record从字符串的指定位置(包含)截取指定长度的字符串。如果开始位置非法抛出异常。如果字段为空值,直接返回 * (即不参与本transformer) */
  @Override
  public Record evaluate(Record record, Object... paras) { 
   

    int columnIndex;
    int startIndex;
    int targetLen;
    try { 
   
      // 参数异常检测
      if (paras.length != 3) { 
   
        throw new RuntimeException("dx_substr paras must be 3");
      }
      //获取对应参数值
      columnIndex = (Integer) paras[0];
      startIndex = Integer.valueOf((String) paras[1]);
      targetLen = Integer.valueOf((String) paras[2]);

    } catch (Exception e) { 
   
      throw DataXException.asDataXException(TransformerErrorCode.TRANSFORMER_ILLEGAL_PARAMETER,
          "paras:" + Arrays.asList(paras).toString() + " => " + e.getMessage());
    }
    // 根据index从record中获取 column
    Column column = record.getColumn(columnIndex);
    try { 
   
      String oriValue = column.asString();
      //如果字段为空,跳过subStr处理
      if (oriValue == null) { 
   
        return record;
      }
      String newValue;
      int oriLen = oriValue.length();
      if (startIndex > oriLen) { 
   
        throw new RuntimeException(String
            .format("dx_substr startIndex(%s) out of range(%s)", startIndex, oriLen));
      }
      if (startIndex + targetLen >= oriLen) { 
   
        newValue = oriValue.substring(startIndex);
      } else { 
   
        newValue = oriValue.substring(startIndex, startIndex + targetLen);
      }
      record.setColumn(columnIndex, new StringColumn(newValue));
    } catch (Exception e) { 
   
      throw DataXException.asDataXException(TRANSFORMER_RUN_EXCEPTION, e.getMessage(), e);
    }
    return record;
  }

三、PadTransformer

  • 参数:4个
    • 第一个参数:字段编号,对应record中第几个字段。
    • 第二个参数:“l”,“r”, 指示是在头进行pad,还是尾进行pad。
    • 第三个参数:目标字段长度。
    • 第四个参数:需要pad的字符。
  • 返回: 如果源字符串长度小于目标字段长度,按照位置添加pad字符后返回。如果长于,直接截断(都截右边)。如果字段为空值,转换为空字符串进行pad,即最后的字符串全是需要pad的字符
  • 举例:
代码语言:javascript
复制
         dx_pad(1,"l","4","A"), 如果column 1 的值为 xyz=> Axyz, 值为 xyzzzzz => xyzz
         dx_pad(1,"r","4","A"), 如果column 1 的值为 xyz=> xyzA, 值为 xyzzzzz => xyzz
代码语言:javascript
复制
public PadTransformer() { 
   
    setTransformerName("dx_pad");
  }

  /** * 参数:4个 <br/> * 第一个参数:字段编号,对应record中第几个字段。 <br/> * 第二个参数:"l","r", 指示是在头进行pad,还是尾进行pad。 <br/> * 第三个参数:目标字段长度。 <br/> * 第四个参数:需要pad的字符。 <br/> * 举例: <br/> * dx_pad(1,"l","4","A"), 如果column 1 的值为 xyz=> Axyz, 值为 xyzzzzz => xyzz <br/> * dx_pad(1,"r","4","A"), 如果column 1 的值为 xyz=> xyzA, 值为 xyzzzzz => xyzz <br/> * * @param record Record 行记录,UDF进行record的处理后,更新相应的record * @param paras Object transformer函数参数 * @return Record 如果源字符串长度小于目标字段长度,按照位置添加pad字符后返回。如果长于,直接截断(都截右边)。 * 如果字段为空值,转换为空字符串进行pad,即最后的字符串全是需要pad的字符 */
  @Override
  public Record evaluate(Record record, Object... paras) { 
   

    int columnIndex;
    String padType;
    int length;
    String padString;
    try { 
   
      if (paras.length != 4) { 
   
        throw new RuntimeException("dx_pad paras must be 4");
      }
      columnIndex = (Integer) paras[0];
      padType = (String) paras[1];
      length = Integer.valueOf((String) paras[2]);
      padString = (String) paras[3];
    } catch (Exception e) { 
   
      throw DataXException.asDataXException(TransformerErrorCode.TRANSFORMER_ILLEGAL_PARAMETER,
          "paras:" + Arrays.asList(paras).toString() + " => " + e.getMessage());
    }

    Column column = record.getColumn(columnIndex);
    try { 
   
      String oriValue = column.asString();
      //如果字段为空,作为空字符串处理
      if (oriValue == null) { 
   
        oriValue = "";
      }
      String newValue;
      //"l","r", 指示是在头进行pad,还是尾进行pad
      if (!padType.equalsIgnoreCase("r") && !padType.equalsIgnoreCase("l")) { 
   
        throw new RuntimeException(String.format("dx_pad first para(%s) support l or r", padType));
      }

      if (length <= oriValue.length()) { 
   
        // 如果目标长度len 小于 真实原数数据长度oriValue.length,则使用截取,截取原来数据0到len
        newValue = oriValue.substring(0, length);
      } else { 
   
        newValue = doPad(padType, oriValue, length, padString);
      }
      record.setColumn(columnIndex, new StringColumn(newValue));
    } catch (Exception e) { 
   
      throw DataXException.asDataXException(TRANSFORMER_RUN_EXCEPTION, e.getMessage(), e);
    }
    return record;
  }

  /** * 真正实现填充的逻辑 * * @param padType String 需要填充的类型 * @param oriValue String 原始数据 * @param length int 需要填充后的长度 * @param padString String 需要填充的字符串 * @return String */
  private String doPad(String padType, String oriValue, int length, String padString) { 
   

    String finalPad = "";
    int needLen = length - oriValue.length();
    while (needLen > 0) { 
   

      if (needLen >= padString.length()) { 
   
        finalPad += padString;
        needLen -= padString.length();
      } else { 
   
        finalPad += padString.substring(0, needLen);
        needLen = 0;
      }
    }
    //"l","r", 指示是在头进行pad,还是尾进行pad
    if (padType.equalsIgnoreCase("l")) { 
   
      return finalPad + oriValue;
    } else { 
   
      return oriValue + finalPad;
    }
  }

四、ReplaceTransformer

  • 参数:4个
    • 第一个参数:字段编号,对应record中第几个字段。
    • 第二个参数:字段值的开始位置。
    • 第三个参数:需要替换的字段长度。
    • 第四个参数:需要替换的字符串。
  • 返回: 从字符串的指定位置(包含)替换指定长度的字符串。如果开始位置非法抛出异常。如果字段为空值,直接返回(即不参与本transformer)
  • 举例:
代码语言:javascript
复制
dx_replace(1,"2","4","****")  column 1的value为“dataxTest”=>"da****est"
dx_replace(1,"5","10","****")  column 1的value为“dataxTest”=>"data****"
代码语言:javascript
复制
/** * 参数:4个 * 第一个参数:字段编号,对应record中第几个字段。 * 第二个参数:字段值的开始位置。 * 第三个参数:需要替换的字段长度。 * 第四个参数:需要替换的字符串。 * 举例: * dx_replace(1,"2","4","****") column 1的value为“dataxTest”=>"da****est" * dx_replace(1,"5","10","****") column 1的value为“dataxTest”=>"data****" * * @param record Record 行记录,UDF进行record的处理后,更新相应的record * @param paras Object transformer函数参数 * @return 从字符串的指定位置(包含)替换指定长度的字符串。如果开始位置非法抛出异常。如果字段为空值,直接返回(即不参与本transformer) */
  @Override
  public Record evaluate(Record record, Object... paras) { 
   

    int columnIndex;
    int startIndex;
    int targetLen;
    String replaceStr;
    try { 
   
      if (paras.length != 4) { 
   
        throw new RuntimeException("dx_replace paras must be 4");
      }

      columnIndex = (Integer) paras[0];
      startIndex = Integer.valueOf((String) paras[1]);
      targetLen = Integer.valueOf((String) paras[2]);
      replaceStr = (String) paras[3];
    } catch (Exception e) { 
   
      throw DataXException.asDataXException(TransformerErrorCode.TRANSFORMER_ILLEGAL_PARAMETER,
          "paras:" + Arrays.asList(paras).toString() + " => " + e.getMessage());
    }
    Column column = record.getColumn(columnIndex);
    try { 
   
      String oriValue = column.asString();
      //如果字段为空,跳过replace处理
      if (oriValue == null) { 
   
        return record;
      }
      String newValue;
      int oriLen = oriValue.length();
      if (startIndex > oriLen) { 
   
        throw new RuntimeException(String
            .format("dx_replace startIndex(%s) out of range(%s)", startIndex, oriLen));
      }
      newValue = oriValue.substring(0, startIndex) + replaceStr;
      if (startIndex + targetLen < oriLen) { 
   
        newValue = newValue + oriValue.substring(startIndex + targetLen);
      }
      record.setColumn(columnIndex, new StringColumn(newValue));
    } catch (Exception e) { 
   
      throw DataXException
          .asDataXException(TransformerErrorCode.TRANSFORMER_RUN_EXCEPTION, e.getMessage(), e);
    }
    return record;
  }

五、FilterTransformer

  • 参数:
    • 第一个参数:字段编号,对应record中第几个字段。
    • 第二个参数:运算符,支持一下运算符:like, not like, >, =, <, >=, !=, <=
    • 第三个参数:正则表达式(java正则表达式)、值。
  • 返回:
    • 如果匹配正则表达式,返回Null,表示过滤该行。不匹配表达式时,表示保留该行。(注意是该行)。对于>=<都是对字段直接compare的结果.
    • like , not like是将字段转换成String,然后和目标正则表达式进行全匹配。
    • , =, <, >=, !=, <= 对于DoubleColumn比较double值,对于LongColumn和DateColumn比较long值,其他StringColumn,BooleanColumn以及ByteColumn均比较的是StringColumn值。
    • 如果目标column为空(null),对于 = null的过滤条件,将满足条件,被过滤。!=null的过滤条件,null不满足过滤条件,不被过滤。 like,字段为null不满足条件,不被过滤,和not like,字段为null满足条件,被过滤。
  • 举例:
代码语言:javascript
复制
dx_filter(1,"like","dataTest")  
dx_filter(1,">=","10")  
代码语言:javascript
复制
/** * 过滤transformer类 * Created by liqiang on 16/3/4. */
public class FilterTransformer extends Transformer { 
   

  public FilterTransformer() { 
   
    setTransformerName("dx_filter");
  }

  /** * 参数: <br> * 第一个参数:字段编号,对应record中第几个字段。 <br> * 第二个参数:运算符,支持一下运算符:like, not like, >, =, <, >=, !=, <= <br> * 第三个参数:正则表达式(java正则表达式)、值。 <br> * 返回: * 如果匹配正则表达式,返回Null,表示过滤该行。不匹配表达式时,表示保留该行。(注意是该行)。对于>=<都是对字段直接compare的结果. <br> * like , not like是将字段转换成String,然后和目标正则表达式进行全匹配。 <br> * , =, <, >=, !=, <= 对于DoubleColumn比较double值,对于LongColumn和DateColumn比较long值, <br> * 其他StringColumn,BooleanColumn以及ByteColumn均比较的是StringColumn值。 <br> * <p> * 如果目标column为空(null),对于 = null的过滤条件,将满足条件,被过滤。!=null的过滤条件,null不满足过滤条件,不被过滤。 <br> * like,字段为null不满足条件,不被过滤,和not like,字段为null满足条件,被过滤。 <br> * 举例: <br> * dx_filter(1,"like","dataTest") <br> * dx_filter(1,">=","10") <br> * * @param record Record 行记录,UDF进行record的处理后,更新相应的record * @param paras Object transformer函数参数 * @return Record */
  @Override
  public Record evaluate(Record record, Object... paras) { 
   
    int columnIndex;
    String code;
    String value;
    try { 
   
      if (paras.length != 3) { 
   
        throw new RuntimeException("dx_filter paras must be 3");
      }
      columnIndex = (Integer) paras[0];
      code = (String) paras[1];
      value = (String) paras[2];
      if (StringUtils.isEmpty(value)) { 
   
        throw new RuntimeException("dx_filter para 2 can't be null");
      }
    } catch (Exception e) { 
   
      throw DataXException.asDataXException(TransformerErrorCode.TRANSFORMER_ILLEGAL_PARAMETER,
          "paras:" + Arrays.asList(paras).toString() + " => " + e.getMessage());
    }
    Column column = record.getColumn(columnIndex);
    try { 
   
      if ("like".equalsIgnoreCase(code)) { 
   
        return doLike(record, value, column);
      } else if ("not like".equalsIgnoreCase(code)) { 
   
        return doNotLike(record, value, column);
      } else if (">".equalsIgnoreCase(code)) { 
   
        return doGreat(record, value, column, false);
      } else if ("<".equalsIgnoreCase(code)) { 
   
        return doLess(record, value, column, false);
      } else if ("=".equalsIgnoreCase(code) || "==".equalsIgnoreCase(code)) { 
   
        return doEqual(record, value, column);
      } else if ("!=".equalsIgnoreCase(code)) { 
   
        return doNotEqual(record, value, column);
      } else if (">=".equalsIgnoreCase(code)) { 
   
        return doGreat(record, value, column, true);
      } else if ("<=".equalsIgnoreCase(code)) { 
   
        return doLess(record, value, column, true);
      } else { 
   
        throw new RuntimeException("dx_filter can't support code:" + code);
      }
    } catch (Exception e) { 
   
      throw DataXException.asDataXException(TRANSFORMER_RUN_EXCEPTION, e.getMessage(), e);
    }
  }


  private Record doGreat(Record record, String value, Column column, boolean hasEqual) { 
   

    //如果字段为空,直接不参与比较。即空也属于无穷小
    if (column.getRawData() == null) { 
   
      return record;
    }
    if (column instanceof DoubleColumn) { 
   
      Double ori = column.asDouble();
      double val = Double.parseDouble(value);
      if (hasEqual) { 
   
        return ori >= val ? null : record;
      } else { 
   
        return ori > val ? null : record;
      }
    } else if (column instanceof LongColumn || column instanceof DateColumn) { 
   
      Long ori = column.asLong();
      long val = Long.parseLong(value);
      if (hasEqual) { 
   
        return ori >= val ? null : record;
      } else { 
   
        return ori > val ? null : record;
      }
    } else if (column instanceof StringColumn || column instanceof BytesColumn
        || column instanceof BoolColumn) { 
   
      String ori = column.asString();
      if (hasEqual) { 
   
        return ori.compareTo(value) >= 0 ? null : record;
      } else { 
   
        return ori.compareTo(value) > 0 ? null : record;
      }
    } else { 
   
      throw new RuntimeException(
          ">=,> can't support this columnType:" + column.getClass().getSimpleName());
    }
  }

  private Record doLess(Record record, String value, Column col, boolean hasEqual) { 
   

    //如果字段为空,直接不参与比较。即空也属于无穷大
    if (col.getRawData() == null) { 
   
      return record;
    }
    if (col instanceof DoubleColumn) { 
   
      Double ori = col.asDouble();
      double val = Double.parseDouble(value);
      if (hasEqual) { 
   
        return ori <= val ? null : record;
      } else { 
   
        return ori < val ? null : record;
      }
    } else if (col instanceof LongColumn || col instanceof DateColumn) { 
   
      Long ori = col.asLong();
      long val = Long.parseLong(value);
      if (hasEqual) { 
   
        return ori <= val ? null : record;
      } else { 
   
        return ori < val ? null : record;
      }
    } else if (col instanceof StringColumn || col instanceof BytesColumn
        || col instanceof BoolColumn) { 
   
      String ori = col.asString();
      if (hasEqual) { 
   
        return ori.compareTo(value) <= 0 ? null : record;
      } else { 
   
        return ori.compareTo(value) < 0 ? null : record;
      }
    } else { 
   
      throw new RuntimeException(
          "<=,< can't support this columnType:" + col.getClass().getSimpleName());
    }
  }

  /** * DateColumn将比较long值,StringColumn,ByteColumn以及BooleanColumn比较其String值 * * @param record Record * @param value String * @param col Column * @return Record 如果相等,则过滤。 */
  private Record doEqual(Record record, String value, Column col) { 
   
    //如果字段为空,只比较目标字段为"null",否则null字段均不过滤
    if (col.getRawData() == null) { 
   
      return "null".equalsIgnoreCase(value) ? null : record;
    }
    if (col instanceof DoubleColumn) { 
   
      Double ori = col.asDouble();
      double val = Double.parseDouble(value);
      return ori == val ? null : record;
    } else if (col instanceof LongColumn || col instanceof DateColumn) { 
   
      Long ori = col.asLong();
      long val = Long.parseLong(value);
      return ori == val ? null : record;
    } else if (col instanceof StringColumn || col instanceof BytesColumn
        || col instanceof BoolColumn) { 
   
      String ori = col.asString();
      return ori.compareTo(value) == 0 ? null : record;
    } else { 
   
      throw new RuntimeException("can't support this columnType:" + col.getClass().getSimpleName());
    }
  }

  /** * DateColumn将比较long值,StringColumn,ByteColumn以及BooleanColumn比较其String值 * * @param record Record * @param value String * @param col Column * @return Record 如果不相等,则过滤。 */
  private Record doNotEqual(Record record, String value, Column col) { 
   
    //如果字段为空,只比较目标字段为"null", 否则null字段均过滤。
    if (col.getRawData() == null) { 
   
      return "null".equalsIgnoreCase(value) ? record : null;
    }
    if (col instanceof DoubleColumn) { 
   
      Double ori = col.asDouble();
      double val = Double.parseDouble(value);
      return ori != val ? null : record;
    } else if (col instanceof LongColumn || col instanceof DateColumn) { 
   
      Long ori = col.asLong();
      long val = Long.parseLong(value);
      return ori != val ? null : record;
    } else if (col instanceof StringColumn || col instanceof BytesColumn
        || col instanceof BoolColumn) { 
   
      String ori = col.asString();
      return ori.compareTo(value) != 0 ? null : record;
    } else { 
   
      throw new RuntimeException("can't support this columnType:" + col.getClass().getSimpleName());
    }
  }

  private Record doLike(Record record, String value, Column column) { 
   
    String oriValue = column.asString();
    return oriValue != null && oriValue.matches(value) ? null : record;
  }

  private Record doNotLike(Record record, String value, Column column) { 
   
    String oriValue = column.asString();
    return oriValue != null && oriValue.matches(value) ? record : null;
  }
}

六、GroovyTransformer

首先需要知道groovy是什么:运行在jvm上,吸收Python、Ruby和Smalltalk等特性的一种脚本语言!!!可以和java代码库相互操作; 一句话概括就是:用户可以写一些groovy代码,使用GroovyTransformer加载运行实现transform的作用!!!

  • 参数。
    • 第一个参数: groovy code
    • 第二个参数(列表或者为空):extraPackage
  • 备注:
    • dx_groovy只能调用一次。不能多次调用。
    • groovy code中支持java.lang, java.util的包,可直接引用的对象有record,以及element下的各种column(BoolColumn.class,BytesColumn.class,DateColumn.class,DoubleColumn.class,LongColumn.class,StringColumn.class)。不支持其他包,如果用户有需要用到其他包,可设置extraPackage,注意extraPackage不支持第三方jar包。
    • groovy code中,返回更新过的Record(比如record.setColumn(columnIndex, new StringColumn(newValue));),或者null。返回null表示过滤此行。
    • 用户可以直接调用静态的Util方式(GroovyTransformerStaticUtil),目前GroovyTransformerStaticUtil的方法列表 (按需补充):
  • 举例:
代码语言:javascript
复制
groovy 实现的subStr:
        String code = "Column column = record.getColumn(1);\n" +
                " String oriValue = column.asString();\n" +
                " String newValue = oriValue.substring(0, 3);\n" +
                " record.setColumn(1, new StringColumn(newValue));\n" +
                " return record;";
        dx_groovy(record);
代码语言:javascript
复制
groovy 实现的Replace
String code2 = "Column column = record.getColumn(1);\n" +
                " String oriValue = column.asString();\n" +
                " String newValue = \"****\" + oriValue.substring(3, oriValue.length());\n" +
                " record.setColumn(1, new StringColumn(newValue));\n" +
                " return record;";
代码语言:javascript
复制
groovy 实现的Pad
String code3 = "Column column = record.getColumn(1);\n" +
                " String oriValue = column.asString();\n" +
                " String padString = \"12345\";\n" +
                " String finalPad = \"\";\n" +
                " int NeedLength = 8 - oriValue.length();\n" +
                "        while (NeedLength > 0) {\n" +
                "\n" +
                "            if (NeedLength >= padString.length()) {\n" +
                "                finalPad += padString;\n" +
                "                NeedLength -= padString.length();\n" +
                "            } else {\n" +
                "                finalPad += padString.substring(0, NeedLength);\n" +
                "                NeedLength = 0;\n" +
                "            }\n" +
                "        }\n" +
                " String newValue= finalPad + oriValue;\n" +
                " record.setColumn(1, new StringColumn(newValue));\n" +
                " return record;";
代码语言:javascript
复制
/** * Groovy类的transformer * Created by liqiang on 16/3/4. */
public class GroovyTransformer extends Transformer { 
   

  public GroovyTransformer() { 
   
    setTransformerName("dx_groovy");
  }

  private Transformer groovyTransformer;

  /** * 参数 <br> * 第一个参数: groovy code <br> * 第二个参数(列表或者为空):extraPackage <br> * 备注: <br> * dx_groovy只能调用一次。不能多次调用。 <br> * groovy code中支持java.lang, java.util的包,可直接引用的对象有record,以及element下的各种 <br> * column(BoolColumn.class,BytesColumn.class,DateColumn.class,DoubleColumn.class,LongColumn.class, * <br> * StringColumn.class)。不支持其他包,如果用户有需要用到其他包,可设置extraPackage,注意extraPackage不支持第三方jar包。 <br> * groovy code中,返回更新过的Record(比如record.setColumn(columnIndex, new StringColumn(newValue));), <br> * 或者null。返回null表示过滤此行。 <br> * 用户可以直接调用静态的Util方式(GroovyTransformerStaticUtil),目前GroovyTransformerStaticUtil的方法列表 (按需补充): <br> * * @param record Record 行记录,UDF进行record的处理后,更新相应的record * @param paras Object transformer函数参数 * @return Record */
  @Override
  public Record evaluate(Record record, Object... paras) { 
   
    if (groovyTransformer == null) { 
   
      //全局唯一
      if (paras.length < 1 || paras.length > 2) { 
   
        throw DataXException.asDataXException(TransformerErrorCode.TRANSFORMER_ILLEGAL_PARAMETER,
            "dx_groovy paras must be 1 or 2. now paras is: " + Arrays.asList(paras).toString());
      }
      synchronized (this) { 
   
        if (groovyTransformer == null) { 
   
          String code = (String) paras[0];
          @SuppressWarnings("unchecked")
          List<String> extraPackage = paras.length == 2 ? (List<String>) paras[1] : null;
          initGroovyTransformer(code, extraPackage);
        }
      }
    }
    return this.groovyTransformer.evaluate(record);
  }

  /** * 初始化 GroovyTransformer。<br> * 1 根据code和包列表,构造出完整的groovy代码段。<br> * 2 反射加载该groovy。<br> * 3 将2反射构造出的groovy对象强制类型转为,最后赋给groovyTransformer(Transformer类型)。<br> * * @param code String Groovy代码片段 * @param extraPackage List<String> 额外的import的包 */
  private void initGroovyTransformer(String code, List<String> extraPackage) { 
   
    GroovyClassLoader loader = new GroovyClassLoader(GroovyTransformer.class.getClassLoader());
    String groovyRule = getGroovyRule(code, extraPackage);
    Class groovyClass;
    try { 
   
      groovyClass = loader.parseClass(groovyRule);
    } catch (CompilationFailedException cfe) { 
   
      throw DataXException.asDataXException(TRANSFORMER_GROOVY_INIT_EXCEPTION, cfe);
    }
    try { 
   
      Object t = groovyClass.newInstance();
      if (!(t instanceof Transformer)) { 
   
        throw DataXException
            .asDataXException(TRANSFORMER_GROOVY_INIT_EXCEPTION, "datax bug! contact askdatax");
      }
      this.groovyTransformer = (Transformer) t;
    } catch (Throwable ex) { 
   
      throw DataXException.asDataXException(TRANSFORMER_GROOVY_INIT_EXCEPTION, ex);
    }
  }


  /** * 根据code 和 引用的包,构建出groovy代码片段 * * @param code * @param extraPackagesStrList * @return */
  private String getGroovyRule(String code, List<String> extraPackagesStrList) { 
   
    StringBuffer sb = new StringBuffer();
    if (extraPackagesStrList != null) { 
   
      for (String extraPackagesStr : extraPackagesStrList) { 
   
        if (StringUtils.isNotEmpty(extraPackagesStr)) { 
   
          sb.append(extraPackagesStr);
        }
      }
    }
    sb.append(
        "import static com.alibaba.datax.core.transport.transformer.GroovyTransformerStaticUtil.*;");
    sb.append("import com.alibaba.datax.common.element.*;");
    sb.append("import com.alibaba.datax.common.exception.DataXException;");
    sb.append("import com.alibaba.datax.transformer.Transformer;");
    sb.append("import java.util.*;");
    sb.append("public class RULE extends Transformer").append("{");
    sb.append("public Record evaluate(Record record, Object... paras) {");
    sb.append(code);
    sb.append("}}");
    return sb.toString();
  }
}

注:

  1. 对源码进行略微改动,主要修改为 1 阿里代码规约扫描出来的,2 clean code;
  2. 所有代码都已经上传到github(master分支和dev),可以免费白嫖

发布者:全栈程序员栈长,转载请注明出处:https://javaforall.cn/145297.html原文链接:https://javaforall.cn

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2022年5月1,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、概述
  • 二、SubstrTransformer
  • 三、PadTransformer
  • 四、ReplaceTransformer
  • 五、FilterTransformer
  • 六、GroovyTransformer
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档