前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >中小公司数据治理最佳实践-数据接入

中小公司数据治理最佳实践-数据接入

原创
作者头像
世界改造者
修改2020-07-07 10:18:34
9580
修改2020-07-07 10:18:34
举报
文章被收录于专栏:中小公司数据治理

数据接入准则:

意义:规范的数据接入能大大减少后续的维护及使用代价

规定:

  1. 意义明确:(有实时更新的wiki说明)(数据中台-元数据负责)
    1. 日志级别:明确说明在整个公司业务流程中的位置
    2. 记录级别:每条日志的打印时机和对应业务操作对应关系
    3. 字段级别:每个字段的具体意义,比如:枚举和业务的对应关系;
  2. 格式规范:(流程规范性负责)
    1. 最佳实践要求:
      1. 扩展性
      2. 易读性
      3. 后续解析代价
      4. 压缩
    2. 范例:可以考虑无格式,tag分割,json,protobuf (越来越严格,接入代价大,但是容易自动化,节省解析/开发资源)
  3. 责任人明确:数据后续有效性维护问题(数据中台-元数据负责)
  4. 使用方明确:后续影响面评估,数据意义/格式变更通知机制(数据中台-元数据负责)

数据接入实现

公司的一般数据源包括:日志文件,业务mysql,kafka中数据

接入的数据分为实时接入和天级接入:

log实时接入:

flume->kafka->spark->hdfs file

log天级接入:

  1. 用sh copy,然后hive load
  2. flume收集落盘,hive load
  3. 如果有实时写入:
    1. 可以采用kafka->flume
    2. 也可以spark实时解析日志(在白天预解析,减少夜间计算时间)

注意事项:日志非准确跨天问题。(我们采用扫描最新一个日志文件没前一天的数据就开始计算)

mysql实时接入:

maxwell->kafka->spark->hbase

hbase只提供简单rowkey 点查询,后续可能会考虑clickhouse

mysql天级接入:

  1. sqoop/mysql client
  2. kafka->spark 实时落盘,夜间合并快照表

最佳实践

  1. 在数仓接入初期,强力推行可扩展的json/protobuf格式,将从log到hive 建表过程自动化,可以大大减少数据接入工作。
  2. 要推行一个规范,给出方便的工具;最优情况:为规范遵守方带来益处大于带来麻烦;次优:给出方便工具;否则强制推行会阻力很大(就是以权压人)
  3. 公司数据规范举例:
代码语言:javascript
复制
1.现状:

    线上日志保留时间不统一。有的只保留当天,第二天出现问题无从追查。
    日志格式不统一,有的用tab分隔的,有的json。

 2.目标:

    便于程序调试
    线上问题追查
    方便后续解析

计划统一后续日志的打印规范,现征求意见稿如下:

有任何建议及时邮件或者在此留言。


3.应用日志
3.1.日志打印规范:

    日志文件:
        一小时一个(单个文件最好不要超过1G,否则在线问题追查时,grep太浪费cpu)
        保留至少3天(第二天发现第一天的统计报表问题可以返回现场追查)
    日志格式(需要入hive和进行spike处理的日志都必须为json格式):
        日志时间 日志级别 进程名称 行号 json 的dict结构。(方便进行问题追查)

dict允许嵌套,dict的key的命名方式只能包含【大小写26个英文字符、数字、下划线】。(其它字符保留给配置文件的元字符)

遇到list时不再往里解析作为一个整体处理
3.2.示例

2018-09-10 20:57:16INFO main com.soul.dw.syncmysqlhbase.Utils.main(Utils.java:43){"userID":"123344", "detail":"im a good boy!!!!", "dict":{"dict_content1":"content1","dict_content1":"content"}, "list":["fdasdfs", }

可能的问题:如果json的内容中有换行符号,会导致被当做多行日志处理,格式不符合,解析失败。(现在也有这个问题,非此次引入。要解决只能用二进制编码流方式收集日志,比如用kafkaclient直接写入kafka)


3.3.实现

    Log4j java文件和配置()
        Log4j不支持按时间切割的,保留固定文件数据自动删除,需要自己实现。抄袭的网上解决方案,已通过测试。
        对应的log4j配置:(MyDailyRollingFileAppender,MaxFileSize为保存日志的份数)
    logback配置(待补充,谁熟悉可以帮忙解决一下)

4.Nginx日志打印规范

    log_format  main  '$remote_addr - $remote_user [$time_local] "$request" '

                          '$status $body_bytes_sent "$http_referer" '

                          '"$http_user_agent" "$http_x_forwarded_for"';
                          
代码语言:javascript
复制
log4j 按时间分割,自动删除类&及配置
注意事项:必须建立包 org.apache.log4j,在其目录下实现此类
MyDailyRollingFileAppender.java

package org.apache.log4j;
/**
 * copy form https://www.cnblogs.com/rembau/p/5201001.html
 *@ClassName MyDailyRollingFileAppender
 *@Description TODO
 **/
 
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 
import java.io.File;
import java.io.FileFilter;
import java.io.IOException;
import java.text.ParseException;
import java.util.*;
 
public class MyDailyRollingFileAppender extends DailyRollingFileAppender {
    private static Logger logger = LoggerFactory.getLogger(MyDailyRollingFileAppender.class);
    private int maxFileSize = 60;
 
 
    void rollOver() throws IOException {
        super.rollOver();
 
        logger.debug("保留文件数量" + maxFileSize + ",日志文件名称为:" + fileName);
        List<File> fileList = getAllLogs();
        sortFiles(fileList);
        logger.debug(fileList.toString());
        deleteOvermuch(fileList);
    }
 
    /**
     * 删除过多的文件
     * @param fileList 所有日志文件
     */
    private void deleteOvermuch(List<File> fileList) {
        if (fileList.size() > maxFileSize) {
            for (int i = 0;i < fileList.size() - maxFileSize;i++) {
                fileList.get(i).delete();
                logger.debug("删除日志" + fileList.get(i));
            }
        }
    }
 
    /**
     * 根据文件名称上的特定格式的时间排序日志文件
     * @param fileList
     */
    private void sortFiles(List<File> fileList) {
        Collections.sort(fileList, new Comparator<File>() {
            public int compare(File o1, File o2) {
                try {
                    if (getDateStr(o1).isEmpty()) {
                        return 1;
                    }
                    Date date1 = sdf.parse(getDateStr(o1));
 
                    if (getDateStr(o2).isEmpty()) {
                        return -1;
                    }
                    Date date2 = sdf.parse(getDateStr(o2));
 
                    if (date1.getTime() > date2.getTime()) {
                        return 1;
                    } else if (date1.getTime() < date2.getTime()) {
                        return -1;
                    }
                } catch (ParseException e) {
                    logger.error("", e);
                }
                return 0;
            }
        });
    }
 
    private String getDateStr(File file) {
        if (file == null) {
            return "null";
        }
        return file.getName().replaceAll(new File(fileName).getName(), "");
    }
 
    /**
     *  获取所有日志文件,只有文件名符合DatePattern格式的才为日志文件
     * @return
     */
    private List<File> getAllLogs() {
        final File file = new File(fileName);
        File logPath = file.getParentFile();
        if (logPath == null) {
            logPath = new File(".");
        }
 
        File files[] = logPath.listFiles(new FileFilter() {
            public boolean accept(File pathname) {
                try {
                    if (getDateStr(pathname).isEmpty()) {
                        return true;
                    }
                    sdf.parse(getDateStr(pathname));
                    return true;
                } catch (ParseException e) {
                    logger.error("", e);
                    return false;
                }
            }
        });
        return Arrays.asList(files);
    }
    public int getMaxFileSize() {
        return maxFileSize;
    }
 
    public void setMaxFileSize(int maxFileSize) {
        this.maxFileSize = maxFileSize;
    }
}
代码语言:javascript
复制
log4j.properties

# https://www.cnblogs.com/rembau/p/5201001.html
### set log levels ###
log4j.rootLogger = debug, stdout, file1, file2
 
### 输出到控制台 ###
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target = System.out
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern = [%p]%-d{yyyy-MM-dd HH:mm:ss} %t %l %m%n
 
### 输出到日志文件 ###
log4j.appender.file2 = org.apache.log4j.MyDailyRollingFileAppender
log4j.appender.file2.File = /tmp/test.log
log4j.appender.file2.Append = true
log4j.appender.file2.Threshold = DEBUG
log4j.appender.file2.MaxFileSize=72
log4j.appender.file2.DatePattern = '.'yyyy-MM-dd-HH
log4j.appender.file2.layout = org.apache.log4j.PatternLayout
log4j.appender.file2.layout.ConversionPattern = %-d{yyyy-MM-dd HH:mm:ss} %p %t %l %m%n

logback配置(logback日志支持小时级、天级的日志文件切分和按策略自动删除,但是目前小时级的历史日志自动删除有问题,建议使用天级别的日志切分和删除历史日志文件)

代码语言:javascript
复制
logback.xml
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <property resource="log.properties" />
    <!-- console -->
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>[%d{yyyy-MM-dd HH:mm:ss.SSS}][%level]%class %m%n</pattern>
        </encoder>
    </appender>
 
    <!-- info 只打印info日志-->
    <appender name="systemFile"
        class="ch.qos.logback.core.rolling.RollingFileAppender">
        <filter class="ch.qos.logback.classic.filter.LevelFilter"> 
            <level>INFO</level> 
            <onMatch>ACCEPT</onMatch> 
            <onMismatch>DENY</onMismatch> 
        </filter> 
 
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <fileNamePattern>${log.dir}/tagging-info.%d{yyyy-MM-dd}.log</fileNamePattern>
            <maxHistory>N</maxHistory>   <!-- 保存N+1天日志,大于N+1前的日志文件会被自动删除 -->
            <cleanHistoryOnStart>true</cleanHistoryOnStart>
        </rollingPolicy>
 
        <encoder>
            <pattern>[%d{yyyy-MM-dd HH:mm:ss.SSS}][%level]%class %m%n</pattern>
            <charset>UTF-8</charset>
        </encoder>
    </appender>
 
     
 
    <!-- error -->
    <appender name="errorFile" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <filter class="ch.qos.logback.classic.filter.LevelFilter"> 
            <level>ERROR</level> 
            <onMatch>ACCEPT</onMatch> 
            <onMismatch>DENY</onMismatch> 
        </filter> 
 
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <fileNamePattern>${log.dir}/tagging-error.%d{yyyy-MM-dd-HH}.log</fileNamePattern>
            <maxHistory>1</maxHistory>
            <cleanHistoryOnStart>true</cleanHistoryOnStart>
        </rollingPolicy>
 
        <encoder>
            <pattern>[%d{yyyy-MM-dd HH:mm:ss.SSS}][%level]%class %m%n</pattern>
            <charset>UTF-8</charset>
        </encoder>
    </appender>    
 
    <root level="INFO">
        <appender-ref ref="STDOUT" />
        <appender-ref ref="systemFile" />
        <appender-ref ref="errorFile" />
    </root>
</configuration>

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 数据接入准则:
  • 数据接入实现
    • log实时接入:
      • log天级接入:
        • mysql实时接入:
          • mysql天级接入:
            • logback配置(logback日志支持小时级、天级的日志文件切分和按策略自动删除,但是目前小时级的历史日志自动删除有问题,建议使用天级别的日志切分和删除历史日志文件)
        • 最佳实践
        相关产品与服务
        云数据库 SQL Server
        腾讯云数据库 SQL Server (TencentDB for SQL Server)是业界最常用的商用数据库之一,对基于 Windows 架构的应用程序具有完美的支持。TencentDB for SQL Server 拥有微软正版授权,可持续为用户提供最新的功能,避免未授权使用软件的风险。具有即开即用、稳定可靠、安全运行、弹性扩缩等特点。
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档