Looking through the HiveServer2 log turns up some useful information:
2024-04-03 08:25:24,858 INFO org.apache.hive.service.cli.operation.Operation: [b01735e8-83ad-4696-9002-10aa1775842f HiveServer2-Handler-Pool: Thread-1206014]: [opType=EXECUTE_STATEMENT, queryId=hive_20240403082524_4d7bbb9a-fc66-41f0-beaf-69256282a6d6, startTime=1712103924857, sessionId=b01735e8-83ad-4696-9002-10aa1775842f, createTime=1712103924220, userName=srvtetl, ipAddress=101.110.10.01]
2024-04-03 08:25:24,859 INFO org.apache.hadoop.hive.ql.Driver: [b01735e8-83ad-4696-9002-10aa1775842f HiveServer2-Handler-Pool: Thread-1206014]: Compiling command(queryId=hive_20240403082524_4d7bbb9a-fc66-41f0-beaf-69256282a6d6): insert overwrite into table XXXX select * from YYYYY
2024-04-03 08:26:06,976 INFO org.apache.hadoop.hive.ql.Driver: [HiveServer2-Background-Pool: Thread-1210942]: Completed executing command(queryId=hive_20240403082524_4d7bbb9a-fc66-41f0-beaf-69256282a6d6); Time taken: 36.834 seconds
The log contains the user who submitted the SQL (userName), the host it was submitted from (ipAddress), the submitted SQL itself (insert overwrite into table XXXX select * from YYYYY), the compilation time, and other details.
2024-04-03 08:25:30,129 INFO org.apache.hadoop.hive.ql.lockmgr.DbTxnManager: [HiveServer2-Background-Pool: Thread-1210942]: Setting lock request transaction to txnid:20906075 for queryId=hive_20240403082524_4d7bbb9a-fc66-41f0-beaf-69256282a6d6
2024-04-03 08:25:30,130 INFO org.apache.hadoop.hive.ql.lockmgr.DbLockManager: [HiveServer2-Background-Pool: Thread-1210942]: Requesting: queryId=hive_20240403082524_4d7bbb9a-fc66-41f0-beaf-69256282a6d6 LockRequest(component:[LockComponent(type:SHARED_READ, level:DB, dbname:dws, operationType:NO_TXN, isDynamicPartitionWrite:false)], txnid:20906075
2024-04-03 08:25:30,142 INFO org.apache.hadoop.hive.ql.lockmgr.DbLockManager: [HiveServer2-Background-Pool: Thread-1210942]: Response to queryId=hive_20240403082524_4d7bbb9a-fc66-41f0-beaf-69256282a6d6 LockResponse(lockid:4427837, state:ACQUIRED)
2024-04-03 08:25:30,142 INFO org.apache.hadoop.hive.ql.Driver: [HiveServer2-Background-Pool: Thread-1210942]: Executing command(queryId=hive_20240403082524_4d7bbb9a-fc66-41f0-beaf-69256282a6d6): insert overwrite into table XXXX select * from YYYYY
2024-04-03 08:26:06,855 INFO org.apache.hadoop.hive.ql.exec.tez.TezTask: [HiveServer2-Background-Pool: Thread-1210942]: HIVE:
2024-04-03 08:26:06,855 INFO org.apache.hadoop.hive.ql.exec.tez.TezTask: [HiveServer2-Background-Pool: Thread-1210942]: CREATED_FILES: 1
2024-04-03 08:26:06,855 INFO org.apache.hadoop.hive.ql.exec.tez.TezTask: [HiveServer2-Background-Pool: Thread-1210942]: DESERIALIZE_ERRORS: 0
2024-04-03 08:26:06,855 INFO org.apache.hadoop.hive.ql.exec.tez.TezTask: [HiveServer2-Background-Pool: Thread-1210942]: RECORDS_IN_Map_1: 1323812
2024-04-03 08:26:06,855 INFO org.apache.hadoop.hive.ql.exec.tez.TezTask: [HiveServer2-Background-Pool: Thread-1210942]: RECORDS_IN_Map_5: 64981
2024-04-03 08:26:06,855 INFO org.apache.hadoop.hive.ql.exec.tez.TezTask: [HiveServer2-Background-Pool: Thread-1210942]: RECORDS_OUT_1_dws.XXXX : 1
2024-04-03 08:26:06,976 INFO org.apache.hadoop.hive.ql.Driver: [HiveServer2-Background-Pool: Thread-1210942]: Completed executing command(queryId=hive_20240403082524_4d7bbb9a-fc66-41f0-beaf-69256282a6d6); Time taken: 36.834 seconds
2024-04-03 08:26:06,977 INFO org.apache.hadoop.hive.ql.Driver: [HiveServer2-Background-Pool: Thread-1210942]: OK
2024-04-03 08:26:06,977 INFO org.apache.hadoop.hive.ql.lockmgr.DbTxnManager: [HiveServer2-Background-Pool: Thread-1210942]: Stopped heartbeat for query: hive_20240403082524_4d7bbb9a-fc66-41f0-beaf-69256282a6d6
From the log we can clearly see the query start time (either the Compiling time or the Executing time can be used), the number of rows written (RECORDS_OUT_1_dws.XXXX), and the end time (Completed executing or Stopped heartbeat for query).
Of course, stitching the whole flow together needs one key piece of information: the thread id is what links the start of a query to its end. Compiling and the subsequent Executing, however, do not run on the same thread, so we take the Executing time as the start time.
What remains is to write code that implements this, which is straightforward in Java or any other language; at its core it is just a few regular expressions, as the small sketch below shows.
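As a minimal standalone sketch (the class name and printed labels are purely illustrative), the following parses one "Executing command" line from the excerpt above and pulls out the start time, thread id, queryId and SQL:

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class HiveLogLineDemo {
    public static void main(String[] args) {
        String line = "2024-04-03 08:25:30,142 INFO org.apache.hadoop.hive.ql.Driver: "
                + "[HiveServer2-Background-Pool: Thread-1210942]: "
                + "Executing command(queryId=hive_20240403082524_4d7bbb9a-fc66-41f0-beaf-69256282a6d6): "
                + "insert overwrite into table XXXX select * from YYYYY";
        // groups: 1 = timestamp, 2 = log level, 3 = thread id, 4 = queryId, 5 = sql
        Pattern p = Pattern.compile(
                "(\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2},\\d{3}) (\\w+) .*?(Thread-\\d+)\\]: "
                + "Executing command\\(queryId=(.*?)\\):\\s*(.*)", Pattern.DOTALL);
        Matcher m = p.matcher(line);
        if (m.find()) {
            System.out.println("startTime = " + m.group(1)); // 2024-04-03 08:25:30,142
            System.out.println("threadId  = " + m.group(3)); // Thread-1210942
            System.out.println("queryId   = " + m.group(4));
            System.out.println("sql       = " + m.group(5));
        }
    }
}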
Because our application logs are hooked into auditing, this part of the logs already lands in Kafka, so I tried using Flink for the processing.
@Slf4j
public class LogParse4Hive {
public static void main(String[] args) throws Exception {
ParameterTool parameterTool = ParameterTool.fromPropertiesFile("app/conf/application.properties");
String elkBootstrapservers = parameterTool.get("elk.bootstrap.servers");
String bdpLogTopic = parameterTool.get("bdp.log.topic");
String zkServer = parameterTool.get("zookeeper.quorum");
String hbaseClientJaasFile = parameterTool.get("hbase.client.jaas.conf");
String hbaseRegionKeytab = parameterTool.get("hbase.region.keytab.path");
String hbaseRegionPrincipal = parameterTool.get("hbase.region.principal");
String krb5Conf = parameterTool.get("krb5.conf.path");
System.setProperty("java.security.auth.login.config", hbaseClientJaasFile);
Configuration conf = new Configuration();
conf.set("hadoop.security.authentication", "kerberos");
UserGroupInformation.setConfiguration(conf);
try {
UserGroupInformation.loginUserFromKeytab(hbaseRegionPrincipal, hbaseRegionKeytab);
} catch (IOException e) {
throw new RuntimeException(e);
}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EmbeddedRocksDBStateBackend embeddedRocksDBStateBackend = new EmbeddedRocksDBStateBackend();
env.setStateBackend(embeddedRocksDBStateBackend); // use EmbeddedRocksDBStateBackend as the state backend
env.enableCheckpointing(600000, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointStorage("hdfs://nameservice1/tianyan/ckpt");
// env.getCheckpointConfig().setCheckpointStorage("file:/c:/ckpt");
// enable automatic task-level failure recovery
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000));
StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
.setBootstrapServers(elkBootstrapservers)
.setTopics(bdpLogTopic)
.setGroupId("my-test-group")
.setValueOnlyDeserializer(new SimpleStringSchema())
// .setStartingOffsets(OffsetsInitializer.latest()) // start reading from the latest offsets
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
.build();
DataStreamSource<String> ds = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "app-bdp-log");
/**
* {"@timestamp":"2024-03-14T03:29:35.742Z",
* "@metadata":{"beat":"filebeat","type":"_doc","version":"7.17.0"},
* "ecs":{"version":"1.12.0"},
* "host":{"name":"bdsp001"},
* "log":{"offset":193142526,"file":{"path":"/var/log/hive/hadoop-cmf-hive_on_tez3-HIVESERVER2-bdsp001.log.out"}},
* "log_topic":"app-bdsp",
* "type":"java",
* "app":"bdsp",
* "input":{"type":"log"},
* "agent":{"type":"filebeat","version":"7.17.0","hostname":"bdsp001","ephemeral_id":"d921126a-b7c8-41bd-947b-731b76a5dd2b","id":"e9881d5c-7765-4704-8945-61ebc8b06e12","name":"bdsp001"},
* "message":"2024-03-14 11:29:27,036 INFO org.apache.hadoop.hive.ql.txn.compactor.Worker: [bdsp001-59]: Worker thread finished one loop.","filebeat_tag":"custom","env":"uat"
* }
*/
String regex = "(\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2},\\d{3}) (\\w+) (.+?).*?(Thread-\\d+)\\]: (.+)";
Pattern pattern = Pattern.compile(regex, Pattern.DOTALL | Pattern.CASE_INSENSITIVE);
List<String> keywords = Arrays.asList("create", "insert", "records_out_", "stopped heartbeat for query");
SingleOutputStreamOperator<String> neededLogDS = ds.filter(msg -> {
boolean isHS2log = msg.contains("HIVESERVER2");
String lowerCaseMsg = msg.toLowerCase();
boolean isNeededMsg = keywords.stream().anyMatch(keyword -> lowerCaseMsg.contains(keyword));
return isHS2log && isNeededMsg;
});
SingleOutputStreamOperator<Tuple4<String, String, String, String>> hiveserver2Log = neededLogDS.map(json -> {
KafkaLog kafkaLog = JSON.parseObject(json, KafkaLog.class);
String logHost = kafkaLog.getHost().getName();
String logPath = kafkaLog.getLog().getFile().getPath();
String logMessage = kafkaLog.getMessage();
String logMessageLower = logMessage.toLowerCase();
Matcher matcher = pattern.matcher(logMessage);
if (matcher.find()) {
String eventTime = matcher.group(1).trim();
String logLevel = matcher.group(2).trim();
String threadId = matcher.group(4).trim();
String message = matcher.group(5).trim();
return Tuple4.of(eventTime, logHost, threadId, message);
}
return null;
})
// .returns(new TypeHint<Tuple4<String, String, String, String>>() {}); // declare the return type via a TypeHint
// .returns(TypeInformation.of(new TypeHint<Tuple4<String, String, String, String>>() {})); // more general: pass a TypeInformation (TypeHint is just a wrapper around it)
.returns(Types.TUPLE(Types.STRING, Types.STRING, Types.STRING, Types.STRING)) // use the static helpers on Types to build the TypeInformation
.filter(Objects::nonNull);
// log collection can deliver records out of order
SingleOutputStreamOperator<Tuple4<String, String, String, String>> logsWithWaterMark = hiveserver2Log.assignTimestampsAndWatermarks(
WatermarkStrategy.<Tuple4<String, String, String, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
.withTimestampAssigner((SerializableTimestampAssigner<Tuple4<String, String, String, String>>) (element, recordTimestamp) -> {
String eventTime = element.f0;
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss,SSS");
LocalDateTime timestamp = LocalDateTime.parse(eventTime, formatter);
// convert to an epoch-millisecond timestamp in the default time zone
return timestamp.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
})
);
SingleOutputStreamOperator<HiveLog> processedHiveLog = logsWithWaterMark
.keyBy(tp -> tp.f2) // key by thread id
.window(TumblingEventTimeWindows.of(Time.seconds(15))) // event-time tumbling window, 15 seconds long
.allowedLateness(Time.seconds(10))
.process(new MessageProcessFunction());
// processedHiveLog.print();
tenv.createTemporaryView("hive_log", processedHiveLog);
// tenv.executeSql("select * from hive_log").print();
tenv.executeSql(
" CREATE TABLE flink_hive_log ( " +
" rk string, " +
" cf ROW<startTime string,endTime string,threadId string,queryId string,readTable string,writeTable string,writeRows string,`sql` string>, " +
" PRIMARY KEY (rk) NOT ENFORCED " +
" ) WITH ( " +
" 'connector' = 'hbase-2.2', " +
" 'table-name' = 'hive_log_monitor', " +
" 'zookeeper.quorum' = '" + zkServer + "', " +
" 'properties.hbase.security.authentication' = 'kerberos', " +
" 'zookeeper.znode.parent' = '/hbase', " +
" 'properties.hbase.regionserver.kerberos.principal' = 'hbase/_HOST@bigdata.COM' " +
" ) "
);
tenv.createTemporarySystemFunction("tm_proc", TimeStr.class);
tenv.executeSql(
" insert into flink_hive_log " +
" select " +
" tm_proc(writeTable,endTime), " +
" ROW(startTime,endTime,threadId,queryId,readTable,writeTable,writeRows,`sql`) " +
" from hive_log "
);
env.execute();
}
public static class TimeStr extends ScalarFunction {
public String eval(String table, String timeStr) {
if (StringUtils.isEmpty(timeStr)) {
timeStr = getCurrentTime();
}
if (StringUtils.isEmpty(table)) {
table = "NoneTable";
}
String newTimeStr = timeStr.replace("-", "").replace(":", "").replace(" ", "");
return table + newTimeStr;
}
public String getCurrentTime() {
// current date and time
LocalDateTime now = LocalDateTime.now();
// date-time formatter
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
// format the current date and time
return now.format(formatter);
}
}
}
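Because tm_proc builds the HBase rowkey as writeTable + endTime, all runs that write to the same target table sort next to each other, so the recent executions of a table can be read back with a simple prefix scan. Below is a rough sketch of such a lookup with the plain HBase Java client; the class name and ZooKeeper quorum are placeholders, and the Kerberos login (same as in the job above) is omitted for brevity:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class HiveLogLookupDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "zk01,zk02,zk03"); // placeholder quorum
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table table = conn.getTable(TableName.valueOf("hive_log_monitor"))) {
            Scan scan = new Scan();
            // rowkey = writeTable + endTime, so a prefix scan returns all runs writing to dws.xxxx
            scan.setRowPrefixFilter(Bytes.toBytes("dws.xxxx"));
            try (ResultScanner scanner = table.getScanner(scan)) {
                for (Result r : scanner) {
                    String start = Bytes.toString(r.getValue(Bytes.toBytes("cf"), Bytes.toBytes("startTime")));
                    String rows = Bytes.toString(r.getValue(Bytes.toBytes("cf"), Bytes.toBytes("writeRows")));
                    System.out.println(Bytes.toString(r.getRow()) + " start=" + start + " rows=" + rows);
                }
            }
        }
    }
}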
The processing logic:
@Slf4j
public class MessageProcessFunction extends ProcessWindowFunction<Tuple4<String, String, String, String>, HiveLog, String, TimeWindow> {
MapState<String, HiveLog> queryStates;
LongCounter queryCounter;
@Override
public void open(Configuration parameters) throws Exception {
// set a TTL on the state so that abnormal logs cannot make the state grow without bound
MapStateDescriptor<String, HiveLog> mapStateDescriptor = new MapStateDescriptor<>("queryinfo", String.class, HiveLog.class);
StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.of(60, TimeUnit.MINUTES))
.updateTtlOnReadAndWrite() // both reads and writes reset the TTL timer of an entry
// .setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp) // allow returning entries that have expired but not yet been cleaned up
// .useProcessingTime() // time semantics for the TTL clock: processing time
// .cleanupIncrementally(1, false) // incremental cleanup (whenever an entry is accessed, check its TTL and delete it if expired)
// .cleanupInRocksdbCompactFilter(1000) // add an expired-entry filter to RocksDB compaction so expired state is cleaned up during compaction
.build();
mapStateDescriptor.enableTimeToLive(ttlConfig); // actually apply the TTL config to the state descriptor
queryStates = getRuntimeContext().getMapState(mapStateDescriptor);
queryCounter = getRuntimeContext().getLongCounter("ec.query.cnt");
}
@Override
public void process(String key, Context context, Iterable<Tuple4<String, String, String, String>> elements, Collector<HiveLog> collector) throws Exception {
for (Tuple4<String, String, String, String> element : elements) {
String eventTime = element.f0;
String hostName = element.f1;
String threadId = element.f2;
String message = element.f3;
// a query start: queryStart() extracts the query time, executing thread, queryId and SQL with a regex
String upperCaseMessage = message.toUpperCase();
if (message.contains("Executing command") && (upperCaseMessage.contains("INSERT") || upperCaseMessage.contains("CREATE"))) {
log.debug("---------> queryStart--->{}", element);
queryStart(eventTime, threadId, hostName, message);
}
// the counter line that carries the number of rows this thread wrote out
if (upperCaseMessage.contains("RECORDS_OUT_")) {
log.debug("---------> queryCount--->{}", element);
queryCnt(threadId, message);
}
// the end-of-query marker; once the record is complete, the state for this thread is removed below
if (message.contains("Stopped heartbeat for query")) {
log.debug("---------> queryEnd--->{}", element);
queryEnd(eventTime, threadId, message);
}
HiveLog hiveLog = queryStates.get(threadId);
if (hiveLog != null) {
if (hiveLog.getStartTime() != null && hiveLog.getWriteRows() != null && hiveLog.getEndTime() != null) {
// query finished: remove this threadId's entry from state
queryStates.remove(threadId);
queryCounter.add(-1);
log.info("delete hivelog {}", threadId);
collector.collect(hiveLog);
}
}
}
}
private void queryEnd(String eventTime, String threadId, String message) throws Exception {
String regex = ".*Stopped heartbeat for query: ([a-z0-9_-]+)";
Pattern pattern = Pattern.compile(regex, Pattern.DOTALL | Pattern.CASE_INSENSITIVE);
Matcher matcher = pattern.matcher(message);
if (matcher.find()) {
String queryId = matcher.group(1).trim(); // queryId
HiveLog hiveLog = queryStates.get(threadId);
if (hiveLog == null) {
hiveLog = new HiveLog();
queryCounter.add(1L);
} else {
String queryIdStart = hiveLog.getQueryId();
if (!queryId.equals(queryIdStart)) {
log.error("ThreadId:{},QueryId from start log is:{},but from stop log:{}", hiveLog.getThreadId(), queryIdStart, queryId);
}
}
hiveLog.setEndTime(eventTime);
queryStates.put(threadId, hiveLog);
log.debug("queryEnd2: hiveLog {},--->{}", hiveLog.getThreadId(), hiveLog);
}
}
private void queryCnt(String threadId, String message) throws Exception {
String regex = ".*RECORDS_OUT_\\d_([\\w.]+):\\s*(\\d+)";
Pattern pattern = Pattern.compile(regex, Pattern.DOTALL | Pattern.CASE_INSENSITIVE);
Matcher matcher = pattern.matcher(message);
if (matcher.find()) {
String outputTableName = matcher.group(1).trim().toLowerCase(); // output table name
String writeRows = matcher.group(2).trim(); // row count
HiveLog hiveLog = queryStates.get(threadId);
if (hiveLog == null) {
hiveLog = new HiveLog();
queryCounter.add(1L);
} else {
String writeTable = hiveLog.getWriteTable();
if (!outputTableName.equals(writeTable)) {
log.error("ThreadId:{},Output table from sql parse is:{},but from parse log:{}", hiveLog.getThreadId(), writeTable, outputTableName);
}
}
hiveLog.setWriteTable(outputTableName);
hiveLog.setWriteRows(writeRows);
queryStates.put(threadId, hiveLog);
log.debug("queryCnt2: hiveLog {}", hiveLog);
} else {
log.debug("No RECORDS_OUT Info Found");
}
}
private void queryStart(String eventTime, String threadId, String hostName, String message) throws Exception {
String regex = ".*\\(queryId=(.*?)\\):\\s*(.*)";
Pattern pattern = Pattern.compile(regex, Pattern.DOTALL | Pattern.CASE_INSENSITIVE);
Matcher matcher = pattern.matcher(message);
if (matcher.find()) {
String queryId = matcher.group(1).trim(); // queryId
String sql = matcher.group(2).trim(); // the SQL text
RWTable readAndWriteTable = HiveSQLParseUtil.getReadAndWriteTable(sql); // parse out the tables read and written
String readTables = String.join(",", readAndWriteTable.getReadTables()).trim().toLowerCase();
String writeTable = String.join(",", readAndWriteTable.getWriteTables()).trim().toLowerCase();
HiveLog hiveLog = queryStates.get(threadId);
if (hiveLog == null) {
hiveLog = new HiveLog();
queryCounter.add(1L);
}
hiveLog.setStartTime(eventTime);
hiveLog.setHost(hostName);
hiveLog.setThreadId(threadId);
hiveLog.setQueryId(queryId);
hiveLog.setReadTable(readTables);
hiveLog.setWriteTable(writeTable);
hiveLog.setSql(sql);
queryStates.put(threadId, hiveLog);
log.debug("queryStart2: hiveLog {}", hiveLog);
} else {
log.debug("No Insert or Create Query Info found");
}
}
}
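As a quick sanity check of the three extraction regexes used above, the small snippet below (not part of the job, class name is illustrative) runs them against message bodies taken from the log excerpt at the top of the article; the queryId is shortened only for readability:

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class RegexSanityCheck {
    public static void main(String[] args) {
        Pattern start = Pattern.compile(".*\\(queryId=(.*?)\\):\\s*(.*)", Pattern.DOTALL | Pattern.CASE_INSENSITIVE);
        Pattern cnt = Pattern.compile(".*RECORDS_OUT_\\d+_([\\w.]+)\\s*:\\s*(\\d+)", Pattern.DOTALL | Pattern.CASE_INSENSITIVE);
        Pattern end = Pattern.compile(".*Stopped heartbeat for query: ([a-z0-9_-]+)", Pattern.DOTALL | Pattern.CASE_INSENSITIVE);

        Matcher m1 = start.matcher("Executing command(queryId=hive_20240403082524_4d7bbb9a): insert overwrite into table XXXX select * from YYYYY");
        Matcher m2 = cnt.matcher("RECORDS_OUT_1_dws.XXXX : 1");
        Matcher m3 = end.matcher("Stopped heartbeat for query: hive_20240403082524_4d7bbb9a");

        if (m1.find()) System.out.println("queryId=" + m1.group(1) + ", sql=" + m1.group(2));
        if (m2.find()) System.out.println("writeTable=" + m2.group(1) + ", writeRows=" + m2.group(2));
        if (m3.find()) System.out.println("endQueryId=" + m3.group(1));
    }
}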
The beans used:
@Data
@Setter
@Getter
public class KafkaLog {
@JSONField(name = "@timestamp")
private String timestamp;
@JSONField(name = "@metadata")
private Metadata metadata;
private Ecs ecs;
private Host host;
private Log log;
private String logTopic;
private String type;
private String app;
private Input input;
private Agent agent;
private String message;
private String filebeatTag;
private String env;
@Data
public static class Metadata {
private String beat;
private String type;
private String version;
}
@Data
public static class Ecs {
private String version;
}
@Data
public static class Host {
private String name;
}
@Data
public static class Log {
private Long offset;
private File file;
@Data
public static class File {
private String path;
}
}
// Input class
@Data
public static class Input {
private String type;
}
// Agent class
@Data
public static class Agent {
private String type;
private String version;
private String hostname;
private String ephemeralId;
private String id;
private String name;
}
}
@Data
@Setter
@Getter
@NoArgsConstructor
@AllArgsConstructor
public class HiveLog {
private volatile String host;
private volatile String startTime;
private volatile String endTime;
private volatile String threadId;
private volatile String queryId;
// private volatile String txnid;
private volatile String readTable;
private volatile String writeTable;
private volatile String writeRows;
private volatile String sql;
}
@Data
@Setter
@Getter
@NoArgsConstructor
@AllArgsConstructor
public class RWTable {
private Set<String> readTables;
private Set<String> writeTables;
}
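A quick check that the KafkaLog bean really maps the filebeat JSON shown earlier. Fastjson is assumed here, matching the JSON.parseObject call in the job; the class name is illustrative and the JSON is a trimmed version of the sample record:

import com.alibaba.fastjson.JSON;

public class KafkaLogParseDemo {
    public static void main(String[] args) {
        String json = "{\"@timestamp\":\"2024-03-14T03:29:35.742Z\","
                + "\"host\":{\"name\":\"bdsp001\"},"
                + "\"log\":{\"offset\":193142526,\"file\":{\"path\":\"/var/log/hive/hadoop-cmf-hive_on_tez3-HIVESERVER2-bdsp001.log.out\"}},"
                + "\"message\":\"2024-03-14 11:29:27,036 INFO org.apache.hadoop.hive.ql.txn.compactor.Worker: [bdsp001-59]: Worker thread finished one loop.\"}";
        KafkaLog kafkaLog = JSON.parseObject(json, KafkaLog.class);
        System.out.println(kafkaLog.getHost().getName());          // bdsp001
        System.out.println(kafkaLog.getLog().getFile().getPath()); // .../HIVESERVER2-bdsp001.log.out
        System.out.println(kafkaLog.getMessage());
    }
}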
The utility classes used:
@Slf4j
public class HiveSQLParseUtil {
public static RWTable getReadAndWriteTable(String sql) {
Set<String> readTables = new HashSet<>();
Set<String> writeTables = new HashSet<>();
// parse the SQL into an ASTNode
HiveConf conf = new HiveConf();
conf.set("_hive.hdfs.session.path", "/tmp/hive");
conf.set("_hive.local.session.path", "/tmp/hive");
try {
Context ctx = new Context(conf);
ASTNode astNode = ParseUtils.parse(sql, ctx);
// walk the AST looking for input and output tables
TableParserNodeProcessor nodeProcessor = new TableParserNodeProcessor(readTables, writeTables);
GraphWalker ogw = new DefaultGraphWalker(new DefaultRuleDispatcher(nodeProcessor, Maps.newLinkedHashMap(), null));
ogw.startWalking(Lists.newArrayList(astNode), new HashMap<>());
} catch (Exception e) {
log.error("=================>SQL 解析异常:{}", sql);
}
return new RWTable(readTables, writeTables);
}
}
@AllArgsConstructor
@NoArgsConstructor
public class TableParserNodeProcessor implements NodeProcessor {
private Set<String> readTables;
private Set<String> writeTables;
/**
* Walk the ASTNode tree looking for input and output tables.
*
* @param node the current ASTNode
*/
@Override
public Object process(Node node, Stack<Node> stack, NodeProcessorCtx nodeProcessorCtx, Object... objects) throws SemanticException {
ASTNode astNode = (ASTNode) node;
switch (astNode.getToken().getType()) {
case HiveParser.TOK_CREATETABLE: // table being created (create)
case HiveParser.TOK_TAB: // output table of an insert
String outputTableName = BaseSemanticAnalyzer.getUnescapedName((ASTNode) astNode.getChild(0)).toLowerCase();
writeTables.add(outputTableName);
break;
case HiveParser.TOK_TABREF: // from
String inputTableName = BaseSemanticAnalyzer.getUnescapedName((ASTNode) astNode.getChild(0)).toLowerCase();
readTables.add(inputTableName);
break;
}
return null;
}
}
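A small usage sketch of the parser above; the class name and table names are placeholders, and running it requires Hive's ql/parse classes on the classpath, as in the job:

public class SqlParseDemo {
    public static void main(String[] args) {
        String sql = "insert into table dws.xxxx select * from dws.yyyyy";
        RWTable rw = HiveSQLParseUtil.getReadAndWriteTable(sql);
        System.out.println("read tables:  " + rw.getReadTables());  // expected to contain dws.yyyyy
        System.out.println("write tables: " + rw.getWriteTables()); // expected to contain dws.xxxx
    }
}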
To be continued.