Hive Log Parsing


Purpose

  • Monitor changes in Hive table data volume, track when table data is written, audit the Hive SQL, and so on.

Options

  1. Use a Hive Hook to intercept Hive query execution and collect the related information (a sketch follows this list); however, the number of rows written cannot be obtained from that information.
  2. Use Hive COUNT queries to check the tables; this still needs option 1 to obtain the SQL text, the query start time, end time, etc.
  3. Read the figures from Hive table stats; this requires stats to be enabled on the tables and does not provide the executed SQL, so it also needs option 1.
  4. Parse the Hive logs, which contain the query start time, queryId, txnid, number of rows written, completion time, and more.
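
For reference, here is a minimal sketch of option 1, assuming a post-execution hook registered through hive.exec.post.hooks; the class name QueryAuditHook and the logging are illustrative only. It shows why the hook alone is not enough: it exposes the queryId, SQL text, user, and client address, but not the number of rows written.

Code language: java

import org.apache.hadoop.hive.ql.QueryPlan;
import org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext;
import org.apache.hadoop.hive.ql.hooks.HookContext;

// Hypothetical post-execution hook (option 1); register via hive.exec.post.hooks.
public class QueryAuditHook implements ExecuteWithHookContext {
    @Override
    public void run(HookContext hookContext) throws Exception {
        QueryPlan plan = hookContext.getQueryPlan();
        // queryId, SQL text, submitting user and client address are all available here,
        // but the number of rows written to the target table is not.
        System.out.println("queryId=" + plan.getQueryId()
                + ", user=" + hookContext.getUserName()
                + ", ip=" + hookContext.getIpAddress()
                + ", sql=" + plan.getQueryStr());
    }
}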

Implementation

Looking through the HiveServer2 log turns up some useful information:

  • After a SQL statement is submitted through HiveServer2, it is first compiled:
Code language: bash
2024-04-03 08:25:24,858 INFO  org.apache.hive.service.cli.operation.Operation: [b01735e8-83ad-4696-9002-10aa1775842f HiveServer2-Handler-Pool: Thread-1206014]: [opType=EXECUTE_STATEMENT, queryId=hive_20240403082524_4d7bbb9a-fc66-41f0-beaf-69256282a6d6, startTime=1712103924857, sessionId=b01735e8-83ad-4696-9002-10aa1775842f, createTime=1712103924220, userName=srvtetl, ipAddress=101.110.10.01]
2024-04-03 08:25:24,859 INFO  org.apache.hadoop.hive.ql.Driver: [b01735e8-83ad-4696-9002-10aa1775842f HiveServer2-Handler-Pool: Thread-1206014]: Compiling command(queryId=hive_20240403082524_4d7bbb9a-fc66-41f0-beaf-69256282a6d6): insert overwrite into table XXXX select * from YYYYY
2024-04-03 08:26:06,976 INFO  org.apache.hadoop.hive.ql.Driver: [HiveServer2-Background-Pool: Thread-1210942]: Completed executing command(queryId=hive_20240403082524_4d7bbb9a-fc66-41f0-beaf-69256282a6d6); Time taken: 36.834 seconds

The log contains the submitting user (userName), the submitting host (ipAddress), the submitted SQL (insert overwrite into table XXXX select * from YYYYY), the compilation time, and other information.
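
As an illustration, a small sketch of pulling those fields out of the Operation log line with a regular expression; the pattern simply follows the key=value layout shown above and is not taken from any Hive utility.

Code language: java

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class OperationLogParser {
    // Matches the key=value pairs in the EXECUTE_STATEMENT Operation line shown above.
    private static final Pattern OP_PATTERN = Pattern.compile(
            "queryId=(\\S+?), startTime=(\\d+), .*userName=(\\S+?), ipAddress=(\\S+?)\\]");

    public static void main(String[] args) {
        String line = "[opType=EXECUTE_STATEMENT, queryId=hive_20240403082524_4d7bbb9a-fc66-41f0-beaf-69256282a6d6, "
                + "startTime=1712103924857, sessionId=b01735e8-83ad-4696-9002-10aa1775842f, "
                + "createTime=1712103924220, userName=srvtetl, ipAddress=101.110.10.01]";
        Matcher m = OP_PATTERN.matcher(line);
        if (m.find()) {
            System.out.println("queryId=" + m.group(1) + ", startTime=" + m.group(2)
                    + ", userName=" + m.group(3) + ", ipAddress=" + m.group(4));
        }
    }
}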

  • Execution then begins:
Code language: bash
2024-04-03 08:25:30,129 INFO  org.apache.hadoop.hive.ql.lockmgr.DbTxnManager: [HiveServer2-Background-Pool: Thread-1210942]: Setting lock request transaction to txnid:20906075 for queryId=hive_20240403082524_4d7bbb9a-fc66-41f0-beaf-69256282a6d6
2024-04-03 08:25:30,130 INFO  org.apache.hadoop.hive.ql.lockmgr.DbLockManager: [HiveServer2-Background-Pool: Thread-1210942]: Requesting: queryId=hive_20240403082524_4d7bbb9a-fc66-41f0-beaf-69256282a6d6 LockRequest(component:[LockComponent(type:SHARED_READ, level:DB, dbname:dws, operationType:NO_TXN, isDynamicPartitionWrite:false)], txnid:20906075
2024-04-03 08:25:30,142 INFO  org.apache.hadoop.hive.ql.lockmgr.DbLockManager: [HiveServer2-Background-Pool: Thread-1210942]: Response to queryId=hive_20240403082524_4d7bbb9a-fc66-41f0-beaf-69256282a6d6 LockResponse(lockid:4427837, state:ACQUIRED)
2024-04-03 08:25:30,142 INFO  org.apache.hadoop.hive.ql.Driver: [HiveServer2-Background-Pool: Thread-1210942]: Executing command(queryId=hive_20240403082524_4d7bbb9a-fc66-41f0-beaf-69256282a6d6): insert overwrite into table XXXX select * from YYYYY
  • The number of rows written is reported directly:
Code language: bash
2024-04-03 08:26:06,855 INFO  org.apache.hadoop.hive.ql.exec.tez.TezTask: [HiveServer2-Background-Pool: Thread-1210942]: HIVE:
2024-04-03 08:26:06,855 INFO  org.apache.hadoop.hive.ql.exec.tez.TezTask: [HiveServer2-Background-Pool: Thread-1210942]:    CREATED_FILES: 1
2024-04-03 08:26:06,855 INFO  org.apache.hadoop.hive.ql.exec.tez.TezTask: [HiveServer2-Background-Pool: Thread-1210942]:    DESERIALIZE_ERRORS: 0
2024-04-03 08:26:06,855 INFO  org.apache.hadoop.hive.ql.exec.tez.TezTask: [HiveServer2-Background-Pool: Thread-1210942]:    RECORDS_IN_Map_1: 1323812
2024-04-03 08:26:06,855 INFO  org.apache.hadoop.hive.ql.exec.tez.TezTask: [HiveServer2-Background-Pool: Thread-1210942]:    RECORDS_IN_Map_5: 64981
2024-04-03 08:26:06,855 INFO  org.apache.hadoop.hive.ql.exec.tez.TezTask: [HiveServer2-Background-Pool: Thread-1210942]:    RECORDS_OUT_1_dws.XXXX : 1
  • The query finishes:
Code language: bash
2024-04-03 08:26:06,976 INFO  org.apache.hadoop.hive.ql.Driver: [HiveServer2-Background-Pool: Thread-1210942]: Completed executing command(queryId=hive_20240403082524_4d7bbb9a-fc66-41f0-beaf-69256282a6d6); Time taken: 36.834 seconds
2024-04-03 08:26:06,977 INFO  org.apache.hadoop.hive.ql.Driver: [HiveServer2-Background-Pool: Thread-1210942]: OK
2024-04-03 08:26:06,977 INFO  org.apache.hadoop.hive.ql.lockmgr.DbTxnManager: [HiveServer2-Background-Pool: Thread-1210942]: Stopped heartbeat for query: hive_20240403082524_4d7bbb9a-fc66-41f0-beaf-69256282a6d6

From the log we can clearly see the query start time (either the Compiling time or the Executing time), the number of rows written (RECORDS_OUT_1_dws.XXXX), and the end time (Completed executing, or Stopped heartbeat for query).
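
As a small sketch (not part of the final job, which keys on the Stopped heartbeat line instead), the completion line can be matched directly to get the queryId and the elapsed time:

Code language: java

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class CompletionLineParser {
    private static final Pattern DONE_PATTERN = Pattern.compile(
            "Completed executing command\\(queryId=([^)]+)\\); Time taken: ([\\d.]+) seconds");

    public static void main(String[] args) {
        String line = "2024-04-03 08:26:06,976 INFO  org.apache.hadoop.hive.ql.Driver: "
                + "[HiveServer2-Background-Pool: Thread-1210942]: Completed executing "
                + "command(queryId=hive_20240403082524_4d7bbb9a-fc66-41f0-beaf-69256282a6d6); Time taken: 36.834 seconds";
        Matcher m = DONE_PATTERN.matcher(line);
        if (m.find()) {
            // the end time is the log line's own timestamp; the elapsed seconds come from "Time taken"
            System.out.println("queryId=" + m.group(1) + ", elapsedSeconds=" + m.group(2));
        }
    }
}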

Of course, stitching the whole flow together needs one key piece of information: the thread id is what links a query from start to finish. Note that compiling and the subsequent executing do not run on the same thread, which is why we use the Executing time as the start time.

What remains is writing the code, which is straightforward in Java or any other language.

Because our application logs are already hooked into auditing, these log lines end up in Kafka, so I tried processing them with Flink.

Code language: java

@Slf4j
public class LogParse4Hive {
    public static void main(String[] args) throws Exception {

        ParameterTool parameterTool = ParameterTool.fromPropertiesFile("app/conf/application.properties");
        String elkBootstrapservers = parameterTool.get("elk.bootstrap.servers");

        String bdpLogTopic = parameterTool.get("bdp.log.topic");


        String zkServer = parameterTool.get("zookeeper.quorum");
        String hbaseClientJaasFile = parameterTool.get("hbase.client.jaas.conf");

        String hbaseRegionKeytab = parameterTool.get("hbase.region.keytab.path");
        String hbaseRegionPrincipal = parameterTool.get("hbase.region.principal");

        String krb5Conf = parameterTool.get("krb5.conf.path");

        System.setProperty("java.security.auth.login.config", hbaseClientJaasFile);

        Configuration conf = new Configuration();
        conf.set("hadoop.security.authentication", "kerberos");
        UserGroupInformation.setConfiguration(conf);
        try {
            UserGroupInformation.loginUserFromKeytab(hbaseRegionPrincipal, hbaseRegionKeytab);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }


        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        EmbeddedRocksDBStateBackend embeddedRocksDBStateBackend = new EmbeddedRocksDBStateBackend();
        env.setStateBackend(embeddedRocksDBStateBackend);  // use EmbeddedRocksDBStateBackend as the state backend

        env.enableCheckpointing(600000, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointStorage("hdfs://nameservice1/tianyan/ckpt");

//        env.getCheckpointConfig().setCheckpointStorage("file:/c:/ckpt");

        // enable automatic task-level failover
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000));

        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);


        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers(elkBootstrapservers)
                .setTopics(bdpLogTopic)
                .setGroupId("my-test-group")
                .setValueOnlyDeserializer(new SimpleStringSchema())
//                .setStartingOffsets(OffsetsInitializer.latest()) // alternatively, start reading from the latest offsets
                .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
                .build();


        DataStreamSource<String> ds = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "app-bdp-log");
        /**
         * {"@timestamp":"2024-03-14T03:29:35.742Z",
         * "@metadata":{"beat":"filebeat","type":"_doc","version":"7.17.0"},
         * "ecs":{"version":"1.12.0"},
         * "host":{"name":"bdsp001"},
         * "log":{"offset":193142526,"file":{"path":"/var/log/hive/hadoop-cmf-hive_on_tez3-HIVESERVER2-bdsp001.log.out"}},
         * "log_topic":"app-bdsp",
         * "type":"java",
         * "app":"bdsp",
         * "input":{"type":"log"},
         * "agent":{"type":"filebeat","version":"7.17.0","hostname":"bdsp001","ephemeral_id":"d921126a-b7c8-41bd-947b-731b76a5dd2b","id":"e9881d5c-7765-4704-8945-61ebc8b06e12","name":"bdsp001"},
         * "message":"2024-03-14 11:29:27,036 INFO  org.apache.hadoop.hive.ql.txn.compactor.Worker: [bdsp001-59]: Worker thread finished one loop.","filebeat_tag":"custom","env":"uat"
         * }
         */
        String regex = "(\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2},\\d{3}) (\\w+)  (.+?).*?(Thread-\\d+)\\]: (.+)";
        Pattern pattern = Pattern.compile(regex, Pattern.DOTALL | Pattern.CASE_INSENSITIVE);

        List<String> keywords = Arrays.asList("create", "insert", "records_out_", "stopped heartbeat for query");
        SingleOutputStreamOperator<String> neededLogDS = ds.filter(msg -> {
            boolean isHS2log = msg.contains("HIVESERVER2");

            String lowerCaseMsg = msg.toLowerCase();
            boolean isNeededMsg = keywords.stream().anyMatch(keyword -> lowerCaseMsg.contains(keyword));

            return isHS2log && isNeededMsg;
        });

        SingleOutputStreamOperator<Tuple4<String, String, String, String>> hiveserver2Log = neededLogDS.map(json -> {
            KafkaLog kafkaLog = JSON.parseObject(json, KafkaLog.class);
            String logHost = kafkaLog.getHost().getName();
            String logPath = kafkaLog.getLog().getFile().getPath();
            String logMessage = kafkaLog.getMessage();
            String logMessageLower = logMessage.toLowerCase();
            Matcher matcher = pattern.matcher(logMessage);
            if (matcher.find()) {
                String eventTime = matcher.group(1).trim();
                String logLevel = matcher.group(2).trim();
                String threadId = matcher.group(4).trim();
                String message = matcher.group(5).trim();
                return Tuple4.of(eventTime, logHost, threadId, message);
            }
            return null;
        })
                // .returns(new TypeHint<Tuple4<String, String, String, String>>() {});   // convey the return type with a TypeHint
                // .returns(TypeInformation.of(new TypeHint<Tuple4<String, String, String, String>>() {}));  // more generally, pass a TypeInformation; a TypeHint also just wraps one
                .returns(Types.TUPLE(Types.STRING, Types.STRING, Types.STRING, Types.STRING))  // use the static helpers on Types to build the TypeInformation
                .filter(Objects::nonNull);

        // log collection may deliver records out of order

        SingleOutputStreamOperator<Tuple4<String, String, String, String>> logsWithWaterMark = hiveserver2Log.assignTimestampsAndWatermarks(
                WatermarkStrategy.<Tuple4<String, String, String, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
                        .withTimestampAssigner((SerializableTimestampAssigner<Tuple4<String, String, String, String>>) (element, recordTimestamp) -> {
                            String eventTime = element.f0;
                            DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss,SSS");
                            LocalDateTime timestamp = LocalDateTime.parse(eventTime, formatter);

                            // convert to an epoch-millis timestamp in the default time zone
                            return timestamp.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();

                        })

        );


        SingleOutputStreamOperator<HiveLog> processedHiveLog = logsWithWaterMark
                .keyBy(tp -> tp.f2) // key by thread id
                .window(TumblingEventTimeWindows.of(Time.seconds(15))) // event-time tumbling window, 15 seconds long
                .allowedLateness(Time.seconds(10))
                .process(new MessageProcessFunction());

//        processedHiveLog.print();

        tenv.createTemporaryView("hive_log", processedHiveLog);
//        tenv.executeSql("select * from hive_log").print();

        tenv.executeSql(
                " CREATE TABLE flink_hive_log (                                                                                                                 " +
                        "  rk string,                                                                                                                              " +
                        "  cf ROW<startTime string,endTime string,threadId string,queryId string,readTable string,writeTable string,writeRows string,`sql` string>, " +
                        "  PRIMARY KEY (rk) NOT ENFORCED                                                                                                           " +
                        " ) WITH (                                                                                                                                 " +
                        "  'connector' = 'hbase-2.2',                                                                                                              " +
                        "  'table-name' = 'hive_log_monitor',                                                                                                      " +
                        "  'zookeeper.quorum' = '" + zkServer + "',                                                                                                " +
                        "  'properties.hbase.security.authentication' = 'kerberos',                                                                                " +
                        "  'zookeeper.znode.parent' = '/hbase',                                                                                                    " +
                        "  'properties.hbase.regionserver.kerberos.principal' = 'hbase/_HOST@bigdata.COM'                                                           " +
                        " )                                                                                                                                        "
        );

        tenv.createTemporarySystemFunction("tm_proc", TimeStr.class);

        tenv.executeSql(
                " insert into flink_hive_log                                                       " +
                        " select                                                                        " +
                        " tm_proc(writeTable,endTime),                                                  " +
                        " ROW(startTime,endTime,threadId,queryId,readTable,writeTable,writeRows,`sql`)  " +
                        " from hive_log                                                                 "
        );

        env.execute();


    }

    public static class TimeStr extends ScalarFunction {

        public String eval(String table, String timeStr) {
            if (StringUtils.isEmpty(timeStr)) {
                timeStr = getCurrentTime();
            }
            if (StringUtils.isEmpty(table)) {
                table = "NoneTable";
            }
            String newTimeStr = timeStr.replace("-", "").replace(":", "").replace(" ", "");

            return table + newTimeStr;

        }

        public String getCurrentTime() {
            // get the current date and time
            LocalDateTime now = LocalDateTime.now();

            // date-time formatter
            DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

            // format the current date and time
            return now.format(formatter);
        }
    }
}

The window processing function:

Code language: java
@Slf4j
public class MessageProcessFunction extends ProcessWindowFunction<Tuple4<String, String, String, String>, HiveLog, String, TimeWindow> {

    MapState<String, HiveLog> queryStates;
    LongCounter queryCounter;

    @Override
    public void open(Configuration parameters) throws Exception {

        // set a TTL on the state so that abnormal logs cannot make it grow without bound
        MapStateDescriptor<String, HiveLog> mapStateDescriptor = new MapStateDescriptor<>("queryinfo", String.class, HiveLog.class);
        StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.of(60, TimeUnit.MINUTES))
                .updateTtlOnReadAndWrite() // both reads and writes reset an entry's TTL timer
//                .setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp) // allow returning data that has expired but has not been cleaned up yet
//                .useProcessingTime()  // TTL time semantics: processing time
//                .cleanupIncrementally(1, false)  // incremental cleanup (each state access checks whether that entry's TTL has expired and removes it if so)
//                .cleanupInRocksdbCompactFilter(1000) // add an expired-entry filter to RocksDB compaction so expired state is dropped during compaction
                .build();


//        mapStateDescriptor.enableTimeToLive(ttlConfig);

        queryStates = getRuntimeContext().getMapState(mapStateDescriptor);
        queryCounter = getRuntimeContext().getLongCounter("ec.query.cnt");

    }

    @Override
    public void process(String key, Context context, Iterable<Tuple4<String, String, String, String>> elements, Collector<HiveLog> collector) throws Exception {

        for (Tuple4<String, String, String, String> element : elements) {
            String eventTime = element.f0;
            String hostName = element.f1;
            String threadId = element.f2;
            String message = element.f3;

            // query start: "Executing command" lines for INSERT/CREATE statements;
            // queryStart then extracts the queryId and SQL text with a regex
            String upperCaseMessage = message.toUpperCase();
            if (message.contains("Executing command") && (upperCaseMessage.contains("INSERT") || upperCaseMessage.contains("CREATE"))) {
                log.debug("---------> queryStart--->{}", element);
                queryStart(eventTime, threadId, hostName, message);
            }


            // RECORDS_OUT lines: the number of rows this thread wrote out;
            // queryCnt extracts the table name and row count with a regex
            if (upperCaseMessage.contains("RECORDS_OUT_")) {
                log.debug("---------> queryCount--->{}", element);
                queryCnt(threadId, message);
            }


            // "Stopped heartbeat for query" marks the end of the query; the state is removed below once complete
            if (message.contains("Stopped heartbeat for query")) {
                log.debug("---------> queryEnd--->{}", element);
                queryEnd(eventTime, threadId, message);
            }
            HiveLog hiveLog = queryStates.get(threadId);
            if (hiveLog != null) {
                if (hiveLog.getStartTime() != null && hiveLog.getWriteRows() != null && hiveLog.getEndTime() != null) {
                    // the query has finished: remove this threadId's entry from state
                    queryStates.remove(threadId);
                    queryCounter.add(-1);
                    log.info("delete hivelog {}", threadId);
                    collector.collect(hiveLog);
                }
            }

        }
    }


    private void queryEnd(String eventTime, String threadId, String message) throws Exception {
        String regex = ".*Stopped heartbeat for query: ([a-z0-9_-]+)";
        Pattern pattern = Pattern.compile(regex, Pattern.DOTALL | Pattern.CASE_INSENSITIVE);
        Matcher matcher = pattern.matcher(message);

        if (matcher.find()) {
            String queryId = matcher.group(1).trim(); // queryId

            HiveLog hiveLog = queryStates.get(threadId);
            if (hiveLog == null) {
                hiveLog = new HiveLog();
                queryCounter.add(1L);
            } else {
                String queryIdStart = hiveLog.getQueryId();
                if (!queryId.equals(queryIdStart)) {
                    log.error("ThreadId:{},QueryId from start log is:{},but from stop log:{}", hiveLog.getThreadId(), queryIdStart, queryId);
                }
            }

            hiveLog.setEndTime(eventTime);

            queryStates.put(threadId, hiveLog);
            log.debug("queryEnd2: hiveLog {},--->{}", hiveLog.getThreadId(), hiveLog);

        }
    }

    private void queryCnt(String threadId, String message) throws Exception {

        String regex = ".*RECORDS_OUT_\\d_([\\w.]+):\\s*(\\d+)";
        Pattern pattern = Pattern.compile(regex, Pattern.DOTALL | Pattern.CASE_INSENSITIVE);
        Matcher matcher = pattern.matcher(message);

        if (matcher.find()) {
            String outputTableName = matcher.group(1).trim().toLowerCase(); // table name
            String writeRows = matcher.group(2).trim(); // row count

            HiveLog hiveLog = queryStates.get(threadId);
            if (hiveLog == null) {
                hiveLog = new HiveLog();
                queryCounter.add(1L);
            } else {
                String writeTable = hiveLog.getWriteTable();
                if (!outputTableName.equals(writeTable)) {
                    log.error("ThreadId:{},Output table from sql parse is:{},but from parse log:{}", hiveLog.getThreadId(), writeTable, outputTableName);
                }
            }

            hiveLog.setWriteTable(outputTableName);
            hiveLog.setWriteRows(writeRows);

            queryStates.put(threadId, hiveLog);

            log.debug("queryCnt2: hiveLog {}", hiveLog);

        } else {
            log.debug("No RECORDS_OUT Info Found");
        }
    }


    private void queryStart(String eventTime, String threadId, String hostName, String message) throws Exception {
        String regex = ".*\\(queryId=(.*?)\\):\\s*(.*)";

        Pattern pattern = Pattern.compile(regex, Pattern.DOTALL | Pattern.CASE_INSENSITIVE);
        Matcher matcher = pattern.matcher(message);

        if (matcher.find()) {
            String queryId = matcher.group(1).trim();     // queryId
            String sql = matcher.group(2).trim();  // the submitted SQL
            RWTable readAndWriteTable = HiveSQLParseUtil.getReadAndWriteTable(sql); // parse out the read and write tables

            String readTables = String.join(",", readAndWriteTable.getReadTables()).trim().toLowerCase();
            String writeTable = String.join(",", readAndWriteTable.getWriteTables()).trim().toLowerCase();

            HiveLog hiveLog = queryStates.get(threadId);
            if (hiveLog == null) {
                hiveLog = new HiveLog();
                queryCounter.add(1L);
            }
            hiveLog.setStartTime(eventTime);
            hiveLog.setHost(hostName);
            hiveLog.setThreadId(threadId);
            hiveLog.setQueryId(queryId);
            hiveLog.setReadTable(readTables);
            hiveLog.setWriteTable(writeTable);
            hiveLog.setSql(sql);

            queryStates.put(threadId, hiveLog);
            log.debug("queryStart2: hiveLog {}", hiveLog);

        } else {
            log.debug("No Insert or Create Query Info found");
        }
    }


}

The beans used:

Code language: java
@Data
@Setter
@Getter
public class KafkaLog {
    @JSONField(name = "@timestamp")
    private String timestamp;

    @JSONField(name = "@metadata")
    private Metadata metadata;

    private Ecs ecs;
    private Host host;
    private Log log;
    private String logTopic;
    private String type;
    private String app;
    private Input input;
    private Agent agent;
    private String message;
    private String filebeatTag;
    private String env;

    @Data
    public static class Metadata {
        private String beat;
        private String type;
        private String version;
    }

    @Data
    public static class Ecs {
        private String version;
    }

    @Data
    public static class Host {
        private String name;
    }

    @Data
    public static class Log {
        private Long offset;
        private File file;

        @Data
        public static class File {
            private String path;
        }
    }

    // Input section of the message
    @Data
    public static class Input {
        private String type;
    }

    // Agent section of the message
    @Data
    public static class Agent {
        private String type;
        private String version;
        private String hostname;
        private String ephemeralId;
        private String id;
        private String name;
    }


}

@Data
@Setter
@Getter
@NoArgsConstructor
@AllArgsConstructor
public class HiveLog {

   private volatile String host;
   private volatile String startTime;
   private volatile String endTime;
   private volatile String threadId;
   private volatile String queryId;
//   private volatile String txnid;
   private volatile String readTable;
   private volatile String writeTable;
   private volatile String writeRows;
   private volatile String sql;
}
@Data
@Setter
@Getter
@NoArgsConstructor
@AllArgsConstructor
public class RWTable {

    private Set<String> readTables;
    private Set<String> writeTables;

}

Utility classes:

Code language: java

@Slf4j
public class HiveSQLParseUtil {

    public static RWTable getReadAndWriteTable(String sql) {

        Set<String> readTables = new HashSet<>();
        Set<String> writeTables = new HashSet<>();

        // parse the SQL into an ASTNode
        HiveConf conf = new HiveConf();
        conf.set("_hive.hdfs.session.path", "/tmp/hive");
        conf.set("_hive.local.session.path", "/tmp/hive");
        try {
            Context ctx = new Context(conf);
            ASTNode astNode = ParseUtils.parse(sql, ctx);
            // walk the ASTNode to find the input and output tables

            TableParserNodeProcessor nodeProcessor = new TableParserNodeProcessor(readTables, writeTables);
            GraphWalker ogw = new DefaultGraphWalker(new DefaultRuleDispatcher(nodeProcessor, Maps.newLinkedHashMap(), null));
            ogw.startWalking(Lists.newArrayList(astNode), new HashMap<>());
        } catch (Exception e) {
            log.error("=================>SQL 解析异常:{}", sql);
        }

        return new RWTable(readTables, writeTables);
    }


}
@AllArgsConstructor
@NoArgsConstructor
public class TableParserNodeProcessor implements NodeProcessor {
    private Set<String> readTables;
    private Set<String> writeTables;


    /**
     * Walk the ASTNode and collect the input and output tables.
     *
     * @param node the current ASTNode
     */
    @Override
    public Object process(Node node, Stack<Node> stack, NodeProcessorCtx nodeProcessorCtx, Object... objects) throws SemanticException {
        ASTNode astNode = (ASTNode) node;
        switch (astNode.getToken().getType()) {
            case HiveParser.TOK_CREATETABLE: // create table
            case HiveParser.TOK_TAB: // output table of an insert
                String outputTableName = BaseSemanticAnalyzer.getUnescapedName((ASTNode) astNode.getChild(0)).toLowerCase();
                writeTables.add(outputTableName);
                break;
            case HiveParser.TOK_TABREF: // from
                String inputTableName = BaseSemanticAnalyzer.getUnescapedName((ASTNode) astNode.getChild(0)).toLowerCase();
                readTables.add(inputTableName);
                break;
        }

        return null;
    }
}
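
A quick usage sketch of the parser; the SQL statement and table names here are hypothetical and only illustrate the expected read/write split.

Code language: java

public class HiveSQLParseUtilExample {
    public static void main(String[] args) {
        String sql = "insert overwrite table dws.target_table select * from dws.source_table";
        RWTable rw = HiveSQLParseUtil.getReadAndWriteTable(sql);
        // expected: readTables contains dws.source_table, writeTables contains dws.target_table
        System.out.println("read=" + rw.getReadTables() + ", write=" + rw.getWriteTables());
    }
}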

To be continued.

Original statement: This article was published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.

For infringement concerns, please contact cloudcommunity@tencent.com for removal.
