前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊carrera的GroovyScriptAction

聊聊carrera的GroovyScriptAction

原创
作者头像
code4it
修改2020-01-13 11:08:41
3320
修改2020-01-13 11:08:41
举报
文章被收录于专栏:码匠的流水账码匠的流水账

本文主要研究一下carrera的GroovyScriptAction

Action

DDMQ/carrera-consumer/src/main/java/com/xiaojukeji/carrera/cproxy/actions/Action.java

代码语言:javascript
复制
public interface Action {
    enum Status {
        FAIL, CONTINUE, FINISH, ASYNCHRONIZED
    }
​
    class UnsupportedDataType extends RuntimeException {
    }
​
    default Status act(UpstreamJob job) {
        Object data = job.getData();
        if (data instanceof byte[]) {
            return act(job, (byte[]) data);
        } else if (data instanceof JSONObject) {
            return act(job, (JSONObject) data);
        } else {
            throw new UnsupportedDataType();
        }
    }
​
    default Status act(UpstreamJob job, byte[] bytes) {
        throw new UnsupportedDataType();
    }
​
    default Status act(UpstreamJob job, JSONObject jsonObject) {
        throw new UnsupportedDataType();
    }
​
    default void shutdown() {
        // DO NOTHING BY DEFAULT
    }
​
    default void logMetrics() {
        // DO NOTHING BY DEFAULT
    }
}
  • Action接口定义了Status枚举,也定义了act、shutdown、logMetrics方法

GroovyScriptAction

DDMQ/carrera-consumer/src/main/java/com/xiaojukeji/carrera/cproxy/actions/GroovyScriptAction.java

代码语言:javascript
复制
public class GroovyScriptAction implements Action {
    private final static String CARRERA_GROOVY_CONTEXT = "carreraContext";
​
    @SuppressWarnings("rawtypes")
    private final static LoadingCache<String, Class> cache = CacheBuilder
        .newBuilder()
        .expireAfterAccess(1, TimeUnit.HOURS)
        .build(new CacheLoader<String, Class>() {
            private final AtomicLong al = new AtomicLong(0);
​
            @Override
            public Class load(String key) throws Exception {
                try (GroovyClassLoader groovyLoader = new GroovyClassLoader()) {
                    GroovyCodeSource gcs = AccessController.doPrivileged((PrivilegedAction<GroovyCodeSource>) () -> new GroovyCodeSource(key, "Script" + al.getAndIncrement() + ".groovy", "/groovy/shell"));
                    Class clazz = groovyLoader.parseClass(gcs, false);
                    return clazz;
                } catch (Throwable e) {
                    LogUtils.logErrorInfo("GroovyScript_error", "[GroovyErr]", e);
                    return null;
                }
            }
​
        });
​
    @Override
    public Status act(UpstreamJob job, JSONObject jsonObject) {
        String groovyText = job.getUpstreamTopic().getGroovyScript();
        if (StringUtils.isBlank(groovyText)) {
            return Status.FINISH;
        }
​
        try {
            @SuppressWarnings("rawtypes")
            Class groovyScript = cache.get(groovyText);
            if (groovyScript == null) {
                MetricUtils.qpsAndFilterMetric(job, MetricUtils.ConsumeResult.INVALID);
                return Status.FINISH;
            }
​
            jsonObject.put(CARRERA_GROOVY_CONTEXT, new GroovyContext(job));
            Script script = InvokerHelper.createScript(groovyScript, new Binding(jsonObject));
            Object scriptRet = script.run();
            if (scriptRet instanceof Boolean) {
                if ((Boolean) scriptRet) {
                    jsonObject.remove(CARRERA_GROOVY_CONTEXT);
                    return Status.CONTINUE;
                }
            }
        } catch (MissingPropertyException e) {
            LogUtils.logErrorInfo("GroovyScript_error", "missing property exception, jsonObject:{}, job:{}, e.msg:{}",
                    JsonUtils.toJsonString(jsonObject), job.info(), e.getMessageWithoutLocationText());
        } catch (Throwable e) {
            LogUtils.logErrorInfo("GroovyScript_error", "error when running groovy script, job={}, e={}", job, e.getMessage());
        }
​
        MetricUtils.qpsAndFilterMetric(job, MetricUtils.ConsumeResult.INVALID);
        return Status.FINISH;
    }
}
  • GroovyScriptAction实现了Action接口,它使用guava的LoadingCache定义了groovy class的缓存,其CacheLoader的load方法会创建GroovyClassLoader,然后解析指定GroovyCodeSource的class;其act方法从job.getUpstreamTopic().getGroovyScript()获取groovyText,然后再根据groovyText从cache获取指定的Class,之后通过InvokerHelper.createScript(groovyScript, new Binding(jsonObject))创建Script,然后执行script.run()获取返回值

小结

GroovyScriptAction实现了Action接口,它使用guava的LoadingCache定义了groovy class的缓存,其CacheLoader的load方法会创建GroovyClassLoader,然后解析指定GroovyCodeSource的class;其act方法从job.getUpstreamTopic().getGroovyScript()获取groovyText,然后再根据groovyText从cache获取指定的Class,之后通过InvokerHelper.createScript(groovyScript, new Binding(jsonObject))创建Script,然后执行script.run()获取返回值

doc

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Action
  • GroovyScriptAction
  • 小结
  • doc
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档