专栏首页中小公司数据治理中小公司数据治理最佳实践-数据接入
原创

中小公司数据治理最佳实践-数据接入

数据接入准则:

意义:规范的数据接入能大大减少后续的维护及使用代价

规定:

  1. 意义明确:(有实时更新的wiki说明)(数据中台-元数据负责)
    1. 日志级别:明确说明在整个公司业务流程中的位置
    2. 记录级别:每条日志的打印时机和对应业务操作对应关系
    3. 字段级别:每个字段的具体意义,比如:枚举和业务的对应关系;
  2. 格式规范:(流程规范性负责)
    1. 最佳实践要求:
      1. 扩展性
      2. 易读性
      3. 后续解析代价
      4. 压缩
    2. 范例:可以考虑无格式,tag分割,json,protobuf (越来越严格,接入代价大,但是容易自动化,节省解析/开发资源)
  3. 责任人明确:数据后续有效性维护问题(数据中台-元数据负责)
  4. 使用方明确:后续影响面评估,数据意义/格式变更通知机制(数据中台-元数据负责)

数据接入实现

公司的一般数据源包括:日志文件,业务mysql,kafka中数据

接入的数据分为实时接入和天级接入:

log实时接入:

flume->kafka->spark->hdfs file

log天级接入:

  1. 用sh copy,然后hive load
  2. flume收集落盘,hive load
  3. 如果有实时写入:
    1. 可以采用kafka->flume
    2. 也可以spark实时解析日志(在白天预解析,减少夜间计算时间)

注意事项:日志非准确跨天问题。(我们采用扫描最新一个日志文件没前一天的数据就开始计算)

mysql实时接入:

maxwell->kafka->spark->hbase

hbase只提供简单rowkey 点查询,后续可能会考虑clickhouse

mysql天级接入:

  1. sqoop/mysql client
  2. kafka->spark 实时落盘,夜间合并快照表

最佳实践

  1. 在数仓接入初期,强力推行可扩展的json/protobuf格式,将从log到hive 建表过程自动化,可以大大减少数据接入工作。
  2. 要推行一个规范,给出方便的工具;最优情况:为规范遵守方带来益处大于带来麻烦;次优:给出方便工具;否则强制推行会阻力很大(就是以权压人)
  3. 公司数据规范举例:
1.现状:

    线上日志保留时间不统一。有的只保留当天,第二天出现问题无从追查。
    日志格式不统一,有的用tab分隔的,有的json。

 2.目标:

    便于程序调试
    线上问题追查
    方便后续解析

计划统一后续日志的打印规范,现征求意见稿如下:

有任何建议及时邮件或者在此留言。


3.应用日志
3.1.日志打印规范:

    日志文件:
        一小时一个(单个文件最好不要超过1G,否则在线问题追查时,grep太浪费cpu)
        保留至少3天(第二天发现第一天的统计报表问题可以返回现场追查)
    日志格式(需要入hive和进行spike处理的日志都必须为json格式):
        日志时间 日志级别 进程名称 行号 json 的dict结构。(方便进行问题追查)

dict允许嵌套,dict的key的命名方式只能包含【大小写26个英文字符、数字、下划线】。(其它字符保留给配置文件的元字符)

遇到list时不再往里解析作为一个整体处理
3.2.示例

2018-09-10 20:57:16INFO main com.soul.dw.syncmysqlhbase.Utils.main(Utils.java:43){"userID":"123344", "detail":"im a good boy!!!!", "dict":{"dict_content1":"content1","dict_content1":"content"}, "list":["fdasdfs", }

可能的问题:如果json的内容中有换行符号,会导致被当做多行日志处理,格式不符合,解析失败。(现在也有这个问题,非此次引入。要解决只能用二进制编码流方式收集日志,比如用kafkaclient直接写入kafka)


3.3.实现

    Log4j java文件和配置()
        Log4j不支持按时间切割的,保留固定文件数据自动删除,需要自己实现。抄袭的网上解决方案,已通过测试。
        对应的log4j配置:(MyDailyRollingFileAppender,MaxFileSize为保存日志的份数)
    logback配置(待补充,谁熟悉可以帮忙解决一下)

4.Nginx日志打印规范

    log_format  main  '$remote_addr - $remote_user [$time_local] "$request" '

                          '$status $body_bytes_sent "$http_referer" '

                          '"$http_user_agent" "$http_x_forwarded_for"';
                          
log4j 按时间分割,自动删除类&及配置
注意事项:必须建立包 org.apache.log4j,在其目录下实现此类
MyDailyRollingFileAppender.java

package org.apache.log4j;
/**
 * copy form https://www.cnblogs.com/rembau/p/5201001.html
 *@ClassName MyDailyRollingFileAppender
 *@Description TODO
 **/
 
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 
import java.io.File;
import java.io.FileFilter;
import java.io.IOException;
import java.text.ParseException;
import java.util.*;
 
public class MyDailyRollingFileAppender extends DailyRollingFileAppender {
    private static Logger logger = LoggerFactory.getLogger(MyDailyRollingFileAppender.class);
    private int maxFileSize = 60;
 
 
    void rollOver() throws IOException {
        super.rollOver();
 
        logger.debug("保留文件数量" + maxFileSize + ",日志文件名称为:" + fileName);
        List<File> fileList = getAllLogs();
        sortFiles(fileList);
        logger.debug(fileList.toString());
        deleteOvermuch(fileList);
    }
 
    /**
     * 删除过多的文件
     * @param fileList 所有日志文件
     */
    private void deleteOvermuch(List<File> fileList) {
        if (fileList.size() > maxFileSize) {
            for (int i = 0;i < fileList.size() - maxFileSize;i++) {
                fileList.get(i).delete();
                logger.debug("删除日志" + fileList.get(i));
            }
        }
    }
 
    /**
     * 根据文件名称上的特定格式的时间排序日志文件
     * @param fileList
     */
    private void sortFiles(List<File> fileList) {
        Collections.sort(fileList, new Comparator<File>() {
            public int compare(File o1, File o2) {
                try {
                    if (getDateStr(o1).isEmpty()) {
                        return 1;
                    }
                    Date date1 = sdf.parse(getDateStr(o1));
 
                    if (getDateStr(o2).isEmpty()) {
                        return -1;
                    }
                    Date date2 = sdf.parse(getDateStr(o2));
 
                    if (date1.getTime() > date2.getTime()) {
                        return 1;
                    } else if (date1.getTime() < date2.getTime()) {
                        return -1;
                    }
                } catch (ParseException e) {
                    logger.error("", e);
                }
                return 0;
            }
        });
    }
 
    private String getDateStr(File file) {
        if (file == null) {
            return "null";
        }
        return file.getName().replaceAll(new File(fileName).getName(), "");
    }
 
    /**
     *  获取所有日志文件,只有文件名符合DatePattern格式的才为日志文件
     * @return
     */
    private List<File> getAllLogs() {
        final File file = new File(fileName);
        File logPath = file.getParentFile();
        if (logPath == null) {
            logPath = new File(".");
        }
 
        File files[] = logPath.listFiles(new FileFilter() {
            public boolean accept(File pathname) {
                try {
                    if (getDateStr(pathname).isEmpty()) {
                        return true;
                    }
                    sdf.parse(getDateStr(pathname));
                    return true;
                } catch (ParseException e) {
                    logger.error("", e);
                    return false;
                }
            }
        });
        return Arrays.asList(files);
    }
    public int getMaxFileSize() {
        return maxFileSize;
    }
 
    public void setMaxFileSize(int maxFileSize) {
        this.maxFileSize = maxFileSize;
    }
}
log4j.properties

# https://www.cnblogs.com/rembau/p/5201001.html
### set log levels ###
log4j.rootLogger = debug, stdout, file1, file2
 
### 输出到控制台 ###
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target = System.out
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern = [%p]%-d{yyyy-MM-dd HH:mm:ss} %t %l %m%n
 
### 输出到日志文件 ###
log4j.appender.file2 = org.apache.log4j.MyDailyRollingFileAppender
log4j.appender.file2.File = /tmp/test.log
log4j.appender.file2.Append = true
log4j.appender.file2.Threshold = DEBUG
log4j.appender.file2.MaxFileSize=72
log4j.appender.file2.DatePattern = '.'yyyy-MM-dd-HH
log4j.appender.file2.layout = org.apache.log4j.PatternLayout
log4j.appender.file2.layout.ConversionPattern = %-d{yyyy-MM-dd HH:mm:ss} %p %t %l %m%n

logback配置(logback日志支持小时级、天级的日志文件切分和按策略自动删除,但是目前小时级的历史日志自动删除有问题,建议使用天级别的日志切分和删除历史日志文件)

logback.xml
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <property resource="log.properties" />
    <!-- console -->
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>[%d{yyyy-MM-dd HH:mm:ss.SSS}][%level]%class %m%n</pattern>
        </encoder>
    </appender>
 
    <!-- info 只打印info日志-->
    <appender name="systemFile"
        class="ch.qos.logback.core.rolling.RollingFileAppender">
        <filter class="ch.qos.logback.classic.filter.LevelFilter"> 
            <level>INFO</level> 
            <onMatch>ACCEPT</onMatch> 
            <onMismatch>DENY</onMismatch> 
        </filter> 
 
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <fileNamePattern>${log.dir}/tagging-info.%d{yyyy-MM-dd}.log</fileNamePattern>
            <maxHistory>N</maxHistory>   <!-- 保存N+1天日志,大于N+1前的日志文件会被自动删除 -->
            <cleanHistoryOnStart>true</cleanHistoryOnStart>
        </rollingPolicy>
 
        <encoder>
            <pattern>[%d{yyyy-MM-dd HH:mm:ss.SSS}][%level]%class %m%n</pattern>
            <charset>UTF-8</charset>
        </encoder>
    </appender>
 
     
 
    <!-- error -->
    <appender name="errorFile" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <filter class="ch.qos.logback.classic.filter.LevelFilter"> 
            <level>ERROR</level> 
            <onMatch>ACCEPT</onMatch> 
            <onMismatch>DENY</onMismatch> 
        </filter> 
 
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <fileNamePattern>${log.dir}/tagging-error.%d{yyyy-MM-dd-HH}.log</fileNamePattern>
            <maxHistory>1</maxHistory>
            <cleanHistoryOnStart>true</cleanHistoryOnStart>
        </rollingPolicy>
 
        <encoder>
            <pattern>[%d{yyyy-MM-dd HH:mm:ss.SSS}][%level]%class %m%n</pattern>
            <charset>UTF-8</charset>
        </encoder>
    </appender>    
 
    <root level="INFO">
        <appender-ref ref="STDOUT" />
        <appender-ref ref="systemFile" />
        <appender-ref ref="errorFile" />
    </root>
</configuration>

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 理想开发流程

    1.产品经理(数据分析师)一起给出评估产品功能效果的数据指标,及预期的指标数值范围

    世界改造者
  • 中小公司数据治理最佳实践-总则

    世界改造者
  • Log4j自定义Appender介绍

    转自:http://gemantic.iteye.com/blog/1234996

    sanmutongzi
  • Log4j官方文档翻译(三、配置)

      之前的章节介绍了log4j的核心组件,本章将会通过配置文件介绍一下核心组建的配置。   主要在配置文件中配置log4j的日志级别,定义appender、...

    用户1154259
  • log4j介绍以及使用教程

      Log4j是Apache的一个开放源代码项目,通过使用Log4j,我们可以控制日志信息输送的目的地是控制台、文件、GUI组件、甚至是套接 口服务 器、NT的...

    阿豪聊干货
  • 面试官:SpringBoot中关于日志工具的使用,我想问你几个常见问题

    公众号[JavaQ]原创,专注分享Java基础原理分析、实战技术、微服务架构、分布式系统构建,诚邀点赞关注!

    JavaQ
  • 日志级别记录规范

    之前看了网上发的日志级别的使用规范和「日志管理与分析权威指南」里面的日志级别规范和说明,具体内容如下。

    后场技术
  • 干货分享 | 腾讯自研数据库CynosDB分布式存储的核心原理

    点击上方蓝字关注每天学习数据库 作者简介:许中清,腾讯云自研云原生数据库CynosDB的分布式存储CynosStore负责人,负责数据库内核开发、数据库产品架...

    腾讯云数据库 TencentDB
  • Web攻击日志初探

    前段时间偶然间在一朋友处获得了多个系统的web日志,并被被要求针对这些日志进行分析。一时兴起,随便打开了一个,打开后发现日志数量极大,接着又打开了好几个,发现每...

    FB客服
  • 孙旭:CynosDB for PostgreSQL一主多读架构

    3月16日在北京举行的腾讯云自研数据库CynosDB交流会圆满落下帷幕。现将技术团队分享的内容整理如下。

    云加社区技术沙龙

扫码关注云+社区

领取腾讯云代金券