A Look at PowerJob Log Reporting and Storage

This article takes a look at how PowerJob reports job instance logs from the worker to the server and how they are stored.

OmsLoggerFactory.build

tech/powerjob/worker/log/OmsLoggerFactory.java

public class OmsLoggerFactory {

    public static OmsLogger build(Long instanceId, String logConfig, WorkerRuntime workerRuntime) {
        LogConfig cfg;
        if (StringUtils.isEmpty(logConfig)) {
            cfg = new LogConfig();
        } else {
            try {
                cfg = JsonUtils.parseObject(logConfig, LogConfig.class);
            } catch (Exception ignore) {
                cfg = new LogConfig();
            }
        }

        switch (LogType.of(cfg.getType())) {
            case LOCAL:
                return new OmsLocalLogger(cfg);
            case STDOUT:
                return new OmsStdOutLogger(cfg);
            case NULL:
                return new OmsNullLogger();
            case LOCAL_AND_ONLINE:
                return new OmsServerAndLocalLogger(cfg, instanceId, workerRuntime.getOmsLogHandler());
            default:
                return new OmsServerLogger(cfg, instanceId, workerRuntime.getOmsLogHandler());
        }
    }
}

By default logConfig is null, so cfg is a plain new LogConfig(), and the logger that gets built is an OmsServerLogger.
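
For context, inside a job processor this logger is normally obtained from the TaskContext; a minimal usage sketch (the processor class and its job logic below are made up purely for illustration):

import tech.powerjob.worker.core.processor.ProcessResult;
import tech.powerjob.worker.core.processor.TaskContext;
import tech.powerjob.worker.core.processor.sdk.BasicProcessor;
import tech.powerjob.worker.log.OmsLogger;

// hypothetical processor, only to show where the OmsLogger comes from
public class DemoLogProcessor implements BasicProcessor {

    @Override
    public ProcessResult process(TaskContext context) throws Exception {
        // with the default logConfig this is an OmsServerLogger, so everything
        // logged here is reported to the server and shows up in the web console
        OmsLogger omsLogger = context.getOmsLogger();
        omsLogger.info("start to process, instanceParams: {}", context.getInstanceParams());
        return new ProcessResult(true, "ok");
    }
}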

OmsServerLogger

tech/powerjob/worker/log/impl/OmsServerLogger.java

public class OmsServerLogger extends AbstractOmsLogger {

    private final long instanceId;
    private final OmsLogHandler omsLogHandler;

    public OmsServerLogger(LogConfig logConfig, long instanceId, OmsLogHandler omsLogHandler) {
        super(logConfig);
        this.instanceId = instanceId;
        this.omsLogHandler = omsLogHandler;
    }

    @Override
    public void debug0(String messagePattern, Object... args) {
        process(LogLevel.DEBUG, messagePattern, args);
    }

    @Override
    public void info0(String messagePattern, Object... args) {
        process(LogLevel.INFO, messagePattern, args);
    }

    @Override
    public void warn0(String messagePattern, Object... args) {
        process(LogLevel.WARN, messagePattern, args);
    }

    @Override
    public void error0(String messagePattern, Object... args) {
        process(LogLevel.ERROR, messagePattern, args);
    }

    private void process(LogLevel level, String messagePattern, Object... args) {
        String logContent = genLogContent(messagePattern, args);
        omsLogHandler.submitLog(instanceId, level, logContent);
    }

}

OmsServerLogger's process method delegates to OmsLogHandler's submitLog method.
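
genLogContent comes from AbstractOmsLogger and expands the {} placeholders in the message pattern before the content is handed to submitLog; the exact PowerJob implementation is not shown here, but the effect is essentially SLF4J-style formatting, roughly like this sketch (an assumption, not the actual source):

import org.apache.commons.lang3.exception.ExceptionUtils;
import org.slf4j.helpers.FormattingTuple;
import org.slf4j.helpers.MessageFormatter;

// sketch only: expand "{}" placeholders the way SLF4J does, and append the
// stack trace if a Throwable is passed as the last argument
static String formatLikeGenLogContent(String messagePattern, Object... args) {
    FormattingTuple tuple = MessageFormatter.arrayFormat(messagePattern, args);
    String message = tuple.getMessage();
    if (tuple.getThrowable() != null) {
        message = message + System.lineSeparator() + ExceptionUtils.getStackTrace(tuple.getThrowable());
    }
    return message;
}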

submitLog

tech/powerjob/worker/background/OmsLogHandler.java

@Slf4j
public class OmsLogHandler {

    private final String workerAddress;
    private final Transporter transporter;
    private final ServerDiscoveryService serverDiscoveryService;

    // submitter task, expected to be started via a thread (pool)
    public final Runnable logSubmitter = new LogSubmitter();
    // report lock: only one thread needs to do the reporting
    private final Lock reportLock = new ReentrantLock();
    // producer-consumer pattern, logs are uploaded asynchronously
    private final BlockingQueue<InstanceLogContent> logQueue = Queues.newLinkedBlockingQueue(10240);

    // number of log entries carried in each report
    private static final int BATCH_SIZE = 20;
    // local accumulation threshold
    private static final int REPORT_SIZE = 1024;

    public OmsLogHandler(String workerAddress, Transporter transporter, ServerDiscoveryService serverDiscoveryService) {
        this.workerAddress = workerAddress;
        this.transporter = transporter;
        this.serverDiscoveryService = serverDiscoveryService;
    }

    /**
     * Submit a log entry
     * @param instanceId instance ID of the job
     * @param logLevel log level
     * @param logContent log content
     */
    public void submitLog(long instanceId, LogLevel logLevel, String logContent) {

        if (logQueue.size() > REPORT_SIZE) {
            // a thread's lifecycle cannot be reused: once a Thread has finished it cannot be started again, so a new one is created (and destroyed) each time
            new Thread(logSubmitter).start();
        }

        InstanceLogContent tuple = new InstanceLogContent(instanceId, System.currentTimeMillis(), logLevel.getV(), logContent);
        boolean offerRet = logQueue.offer(tuple);
        if (!offerRet) {
            log.warn("[OmsLogHandler] [{}] submit log failed, maybe your log speed is too fast!", instanceId);
        }
    }

    //......
}    

OmsLogHandler's submitLog first checks whether logQueue already holds more than REPORT_SIZE (1024) entries and, if so, starts an extra LogSubmitter thread to drain the backlog; it then offers the new entry to logQueue, logging a warning and dropping the entry if the queue is full.

LogSubmitter

tech/powerjob/worker/background/OmsLogHandler.java

    private class LogSubmitter implements Runnable {

        @Override
        public void run() {

            boolean lockResult = reportLock.tryLock();
            if (!lockResult) {
                return;
            }

            try {

                final String currentServerAddress = serverDiscoveryService.getCurrentServerAddress();
                // no server currently available
                if (StringUtils.isEmpty(currentServerAddress)) {
                    if (!logQueue.isEmpty()) {
                        logQueue.clear();
                        log.warn("[OmsLogHandler] because there is no available server to report logs which leads to queue accumulation, oms discarded all logs.");
                    }
                    return;
                }

                List<InstanceLogContent> logs = Lists.newLinkedList();

                while (!logQueue.isEmpty()) {
                    try {
                        InstanceLogContent logContent = logQueue.poll(100, TimeUnit.MILLISECONDS);
                        logs.add(logContent);

                        if (logs.size() >= BATCH_SIZE) {
                            WorkerLogReportReq req = new WorkerLogReportReq(workerAddress, Lists.newLinkedList(logs));
                            // unreliable request: web logs do not need to be delivered perfectly
                            TransportUtils.reportLogs(req, currentServerAddress, transporter);
                            logs.clear();
                        }

                    }catch (Exception ignore) {
                        break;
                    }
                }

                if (!logs.isEmpty()) {
                    WorkerLogReportReq req = new WorkerLogReportReq(workerAddress, logs);
                    TransportUtils.reportLogs(req, currentServerAddress, transporter);
                }

            }finally {
                reportLock.unlock();
            }
        }
    }

LogSubmitter keeps polling entries off logQueue and, whenever the accumulated logs reach BATCH_SIZE (20), reports them to the server via TransportUtils.reportLogs; whatever is left when the queue drains is flushed in one final report.

reportLogs

tech/powerjob/worker/common/utils/TransportUtils.java

    public static void reportLogs(WorkerLogReportReq req, String address, Transporter transporter) {
        final URL url = easyBuildUrl(ServerType.SERVER, S4W_PATH, S4W_HANDLER_REPORT_LOG, address);
        transporter.tell(url, req);
    }

reportLogs sends the request to the server's S4W_HANDLER_REPORT_LOG handler.

processWorkerLogReport

tech/powerjob/server/core/handler/AbWorkerRequestHandler.java

    @Override
    @Handler(path = S4W_HANDLER_REPORT_LOG, processType = ProcessType.NO_BLOCKING)
    public void processWorkerLogReport(WorkerLogReportReq req) {

        WorkerLogReportEvent event = new WorkerLogReportEvent()
                .setWorkerAddress(req.getWorkerAddress())
                .setLogNum(req.getInstanceLogContents().size());
        try {
            processWorkerLogReport0(req, event);
            event.setStatus(WorkerLogReportEvent.Status.SUCCESS);
        } catch (RejectedExecutionException re) {
            event.setStatus(WorkerLogReportEvent.Status.REJECTED);
        } catch (Throwable t) {
            event.setStatus(WorkerLogReportEvent.Status.EXCEPTION);
            log.warn("[WorkerRequestHandler] process worker report failed!", t);
        } finally {
            monitorService.monitor(event);
        }
    }

On the server side, processWorkerLogReport receives the WorkerLogReportReq and hands it to processWorkerLogReport0.

WorkerRequestHandlerImpl

tech/powerjob/server/core/handler/WorkerRequestHandlerImpl.java

    protected void processWorkerLogReport0(WorkerLogReportReq req, WorkerLogReportEvent event) {
        // this should not be a performance problem... it is just a few checks plus a Map#get
        instanceLogService.submitLogs(req.getWorkerAddress(), req.getInstanceLogContents());
    }

WorkerRequestHandlerImpl's processWorkerLogReport0 simply calls instanceLogService.submitLogs.

submitLogs

tech/powerjob/server/core/instance/InstanceLogService.java

    @Async(value = PJThreadPool.LOCAL_DB_POOL)
    public void submitLogs(String workerAddress, List<InstanceLogContent> logs) {

        List<LocalInstanceLogDO> logList = logs.stream().map(x -> {
            instanceId2LastReportTime.put(x.getInstanceId(), System.currentTimeMillis());

            LocalInstanceLogDO y = new LocalInstanceLogDO();
            BeanUtils.copyProperties(x, y);
            y.setWorkerAddress(workerAddress);
            return y;
        }).collect(Collectors.toList());

        try {
            CommonUtils.executeWithRetry0(() -> localInstanceLogRepository.saveAll(logList));
        }catch (Exception e) {
            log.warn("[InstanceLogService] persistent instance logs failed, these logs will be dropped: {}.", logs, e);
        }
    }

InstanceLogService's submitLogs is an asynchronous method: it converts each InstanceLogContent into a LocalInstanceLogDO and then persists the batch via localInstanceLogRepository.saveAll.

LocalJpaConfig

tech/powerjob/server/persistence/config/LocalJpaConfig.java

@Configuration
@EnableTransactionManagement
@EnableJpaRepositories(
        // repository base package
        basePackages = LocalJpaConfig.LOCAL_PACKAGES,
        // entity manager factory bean name
        entityManagerFactoryRef = "localEntityManagerFactory",
        // transaction manager bean name
        transactionManagerRef = "localTransactionManager"
)
public class LocalJpaConfig {

    public static final String LOCAL_PACKAGES = "tech.powerjob.server.persistence.local";

    private static Map<String, Object> genDatasourceProperties() {

        JpaProperties jpaProperties = new JpaProperties();
        jpaProperties.setOpenInView(false);
        jpaProperties.setShowSql(false);

        HibernateProperties hibernateProperties = new HibernateProperties();
        // drop and recreate the data on every startup (after a restart the original instances have been failed over to another server, so the old log data is of no use)
        hibernateProperties.setDdlAuto("create");
        return hibernateProperties.determineHibernateProperties(jpaProperties.getProperties(), new HibernateSettings());
    }

    @Bean(name = "localEntityManagerFactory")
    public LocalContainerEntityManagerFactoryBean initLocalEntityManagerFactory(@Qualifier("omsLocalDatasource") DataSource omsLocalDatasource,EntityManagerFactoryBuilder builder) {
        return builder
                .dataSource(omsLocalDatasource)
                .properties(genDatasourceProperties())
                .packages(LOCAL_PACKAGES)
                .persistenceUnit("localPersistenceUnit")
                .build();
    }

    @Bean(name = "localTransactionManager")
    public PlatformTransactionManager initLocalTransactionManager(@Qualifier("localEntityManagerFactory") LocalContainerEntityManagerFactoryBean localContainerEntityManagerFactoryBean) {
        return new JpaTransactionManager(Objects.requireNonNull(localContainerEntityManagerFactoryBean.getObject()));
    }

    @Bean(name = "localTransactionTemplate")
    public TransactionTemplate initTransactionTemplate(@Qualifier("localTransactionManager") PlatformTransactionManager ptm) {
        TransactionTemplate tt =  new TransactionTemplate(ptm);
        // set the isolation level
        tt.setIsolationLevel(TransactionDefinition.ISOLATION_DEFAULT);
        return tt;
    }
}

LocalJpaConfig binds the DAOs under tech.powerjob.server.persistence.local to the omsLocalDatasource data source.
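
The localInstanceLogRepository used by InstanceLogService is one of these local DAOs; judging only from the calls that appear in this article, its interface would look roughly like the sketch below (pieced together for illustration, not a verbatim copy of the PowerJob source):

import java.util.stream.Stream;
import org.springframework.data.jpa.repository.JpaRepository;

// sketch of a Spring Data repository living under tech.powerjob.server.persistence.local,
// therefore backed by the local H2 datasource configured in LocalJpaConfig
public interface LocalInstanceLogRepository extends JpaRepository<LocalInstanceLogDO, Long> {

    // used by genStableLogFile to dump one instance's logs ordered by log time
    Stream<LocalInstanceLogDO> findByInstanceIdOrderByLogTime(long instanceId);

    // used by sync to clean up after the logs have been pushed to remote storage
    long deleteByInstanceId(long instanceId);
}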

MultiDatasourceConfig

tech/powerjob/server/persistence/config/MultiDatasourceConfig.java

@Configuration
public class MultiDatasourceConfig {

    private static final String H2_DRIVER_CLASS_NAME = "org.h2.Driver";
    private static final String H2_JDBC_URL_PATTERN = "jdbc:h2:file:%spowerjob_server_db";
    private static final int H2_MIN_SIZE = 4;
    private static final int H2_MAX_ACTIVE_SIZE = 10;

    @Primary
    @Bean("omsRemoteDatasource")
    @ConfigurationProperties(prefix = "spring.datasource.core")
    public DataSource initOmsCoreDatasource() {
        return DataSourceBuilder.create().build();
    }

    @Bean("omsLocalDatasource")
    public DataSource initOmsLocalDatasource() {
        String h2Path = OmsFileUtils.genH2WorkPath();
        HikariConfig config = new HikariConfig();
        config.setDriverClassName(H2_DRIVER_CLASS_NAME);
        config.setJdbcUrl(String.format(H2_JDBC_URL_PATTERN, h2Path));
        config.setAutoCommit(true);
        // minimum number of idle connections in the pool
        config.setMinimumIdle(H2_MIN_SIZE);
        // maximum number of connections in the pool
        config.setMaximumPoolSize(H2_MAX_ACTIVE_SIZE);

        // delete the H2 file when the JVM shuts down
        try {
            FileUtils.forceDeleteOnExit(new File(h2Path));
        }catch (Exception ignore) {
        }
        return new HikariDataSource(config);
    }
}

MultiDatasourceConfig defines two data sources: a remote one (e.g. MySQL) and a local embedded H2 one.
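
The remote data source is populated from configuration through the spring.datasource.core prefix; a hypothetical application.properties fragment for a MySQL setup could look like the following (the values are placeholders, and the exact keys shipped with the PowerJob distribution may differ slightly):

# remote (core) datasource, bound via @ConfigurationProperties(prefix = "spring.datasource.core")
spring.datasource.core.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.core.jdbc-url=jdbc:mysql://127.0.0.1:3306/powerjob-product?useUnicode=true&characterEncoding=UTF-8
spring.datasource.core.username=root
spring.datasource.core.password=root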

processFinishedInstance

tech/powerjob/server/core/instance/InstanceManager.java

    public void processFinishedInstance(Long instanceId, Long wfInstanceId, InstanceStatus status, String result) {

        log.info("[Instance-{}] process finished, final status is {}.", instanceId, status.name());

        // sync the reported log data
        HashedWheelTimerHolder.INACCURATE_TIMER.schedule(() -> instanceLogService.sync(instanceId), 60, TimeUnit.SECONDS);

        // special handling for workflows
        if (wfInstanceId != null) {
            // within a workflow, a manual stop is also treated as a failure (in theory this should not happen)
            workflowInstanceManager.move(wfInstanceId, instanceId, status, result);
        }

        // alerting
        if (status == InstanceStatus.FAILED) {
            alert(instanceId, result);
        }
        // proactively evict the cache to reduce memory usage
        instanceMetadataService.invalidateJobInfo(instanceId);
    }

InstanceManager's processFinishedInstance schedules instanceLogService.sync(instanceId) to run with a 60-second delay.

sync

tech/powerjob/server/core/instance/InstanceLogService.java

    @Async(PJThreadPool.BACKGROUND_POOL)
    public void sync(Long instanceId) {

        Stopwatch sw = Stopwatch.createStarted();
        try {
            // persist to a local file first
            File stableLogFile = genStableLogFile(instanceId);
            // push the file to MongoDB (i.e. the configured remote storage)

            FileLocation dfsFL = new FileLocation().setBucket(Constants.LOG_BUCKET).setName(genMongoFileName(instanceId));

            try {
                dFsService.store(new StoreRequest().setLocalFile(stableLogFile).setFileLocation(dfsFL));
                log.info("[InstanceLog-{}] push local instanceLogs to mongoDB succeed, using: {}.", instanceId, sw.stop());
            }catch (Exception e) {
                log.warn("[InstanceLog-{}] push local instanceLogs to mongoDB failed.", instanceId, e);
            }

        }catch (Exception e) {
            log.warn("[InstanceLog-{}] sync local instanceLogs failed.", instanceId, e);
        }
        // delete the data from the local database
        try {
            instanceId2LastReportTime.remove(instanceId);
            CommonUtils.executeWithRetry0(() -> localInstanceLogRepository.deleteByInstanceId(instanceId));
            log.info("[InstanceLog-{}] delete local instanceLog successfully.", instanceId);
        }catch (Exception e) {
            log.warn("[InstanceLog-{}] delete local instanceLog failed.", instanceId, e);
        }
    }

InstanceLogService's sync method first persists the instance's logs into a local file on the server via genStableLogFile, then pushes that file to dFsService (which has four implementations: OSS, GridFS, MinIO and MySQL, chosen via the server configuration; with the MySQL implementation the file ends up in the powerjob_files table), and finally clears the instance's records from the LOCAL_INSTANCE_LOG table in H2 via localInstanceLogRepository.deleteByInstanceId.

genStableLogFile

    private File genStableLogFile(long instanceId) {
        String path = genLogFilePath(instanceId, true);
        int lockId = ("stFileLock-" + instanceId).hashCode();
        try {
            segmentLock.lockInterruptibleSafe(lockId);

            return localTransactionTemplate.execute(status -> {

                File f = new File(path);
                if (f.exists()) {
                    return f;
                }

                try {
                    // create the parent directories (the file itself is created automatically when the stream is opened)
                    FileUtils.forceMkdirParent(f);

                    // data exists locally, persist from the local database (the SYNC case)
                    if (instanceId2LastReportTime.containsKey(instanceId)) {
                        try (Stream<LocalInstanceLogDO> allLogStream = localInstanceLogRepository.findByInstanceIdOrderByLogTime(instanceId)) {
                            stream2File(allLogStream, f);
                        }
                    }else {

                        FileLocation dfl = new FileLocation().setBucket(Constants.LOG_BUCKET).setName(genMongoFileName(instanceId));
                        Optional<FileMeta> dflMetaOpt = dFsService.fetchFileMeta(dfl);
                        if (!dflMetaOpt.isPresent()) {
                            OmsFileUtils.string2File("SYSTEM: There is no online log for this job instance.", f);
                            return f;
                        }

                        dFsService.download(new DownloadRequest().setTarget(f).setFileLocation(dfl));
                    }
                    return f;
                }catch (Exception e) {
                    CommonUtils.executeIgnoreException(() -> FileUtils.forceDelete(f));
                    throw new RuntimeException(e);
                }
            });
        }finally {
            segmentLock.unlock(lockId);
        }
    }

    private static String genLogFilePath(long instanceId, boolean stable) {
        if (stable) {
            return OmsFileUtils.genLogDirPath() + String.format("%d-stable.log", instanceId);
        }else {
            return OmsFileUtils.genLogDirPath() + String.format("%d-temporary.log", instanceId);
        }
    }    

genStableLogFile first checks whether this server already has the stable log file for the instance (~/powerjob/server/online_log/%d-stable.log) and returns it directly if it exists. Otherwise, if instanceId2LastReportTime contains the instance, it pulls the logs from localInstanceLogRepository and writes them to the file; if not, it fetches the file meta via dFsService.fetchFileMeta and, if found, downloads the remote file to the local path before returning it.
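
stream2File itself is not listed above; conceptually it just renders each LocalInstanceLogDO as one log line and streams it into the stable file, something along these lines (a rough sketch under assumed getter names; the real formatting in InstanceLogService is more elaborate):

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.stream.Stream;

// sketch: write the queried log stream into the stable log file line by line
static void stream2FileSketch(Stream<LocalInstanceLogDO> logStream, File target) throws IOException {
    try (BufferedWriter writer = new BufferedWriter(new FileWriter(target))) {
        logStream.forEach(logDO -> {
            try {
                // the real code formats logTime/logLevel/workerAddress much more carefully
                writer.write(logDO.getLogTime() + " " + logDO.getWorkerAddress() + " " + logDO.getLogContent());
                writer.newLine();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
    }
}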

Related table structures

LOCAL_INSTANCE_LOG

CREATE TABLE PUBLIC.LOCAL_INSTANCE_LOG (
	ID BIGINT NOT NULL AUTO_INCREMENT,
	INSTANCE_ID BIGINT,
	LOG_CONTENT CHARACTER VARYING,
	LOG_LEVEL INTEGER,
	LOG_TIME BIGINT,
	WORKER_ADDRESS CHARACTER VARYING(255),
	CONSTRAINT CONSTRAINT_8 PRIMARY KEY (ID)
);
CREATE INDEX IDXPJ6CD8W5EAW8QBKMD84I8KYS7 ON PUBLIC.LOCAL_INSTANCE_LOG (INSTANCE_ID);
CREATE UNIQUE INDEX PRIMARY_KEY_8 ON PUBLIC.LOCAL_INSTANCE_LOG (ID);
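
The LocalInstanceLogDO entity persisted by submitLogs maps directly onto this table; a simplified sketch of that mapping (field names inferred from the DDL and the BeanUtils.copyProperties call above, accessors omitted):

import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import javax.persistence.Index;
import javax.persistence.Table;

// simplified sketch of tech.powerjob.server.persistence.local.LocalInstanceLogDO
@Entity
@Table(indexes = {@Index(columnList = "instanceId")})
public class LocalInstanceLogDO {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    private Long instanceId;

    private Long logTime;

    private Integer logLevel;

    // LOG_CONTENT is an unbounded CHARACTER VARYING in the DDL above
    private String logContent;

    private String workerAddress;

    // getters and setters omitted
}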

powerjob_files

CREATE TABLE `powerjob_files` (
  `id` bigint NOT NULL AUTO_INCREMENT COMMENT 'ID',
  `bucket` varchar(255) COLLATE utf8mb4_general_ci NOT NULL COMMENT 'bucket',
  `name` varchar(255) COLLATE utf8mb4_general_ci NOT NULL COMMENT 'file name',
  `version` varchar(255) COLLATE utf8mb4_general_ci NOT NULL COMMENT 'version',
  `meta` varchar(255) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT 'metadata',
  `length` bigint NOT NULL COMMENT 'length',
  `status` int NOT NULL COMMENT 'status',
  `data` longblob NOT NULL COMMENT 'file content',
  `extra` varchar(255) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT 'extra info',
  `gmt_create` datetime NOT NULL COMMENT 'create time',
  `gmt_modified` datetime DEFAULT NULL COMMENT 'last modified time',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;

Summary

  • On the worker side, OmsServerLogger's process method calls OmsLogHandler's submitLog, which first checks whether logQueue already holds more than REPORT_SIZE (1024) entries and starts an extra LogSubmitter thread if so, then offers the entry to logQueue. LogSubmitter keeps polling entries off logQueue and reports them to the server via TransportUtils.reportLogs once a batch reaches BATCH_SIZE (20).
  • On the server side, AbWorkerRequestHandler's processWorkerLogReport receives the WorkerLogReportReq and calls processWorkerLogReport0, which delegates to instanceLogService.submitLogs. InstanceLogService's submitLogs is asynchronous: it converts each InstanceLogContent into a LocalInstanceLogDO and persists the batch via localInstanceLogRepository.saveAll. The server has two data sources, a remote one (e.g. MySQL) and a local H2 one; these local instance logs go into H2's LOCAL_INSTANCE_LOG table.
  • In addition, when a job instance finishes, the server's InstanceManager.processFinishedInstance schedules instanceLogService.sync(instanceId) with a 60-second delay. sync first persists the logs into a local file on the server via genStableLogFile, then pushes that file to dFsService (OSS, GridFS, MinIO or MySQL, depending on the server configuration; with the MySQL implementation the file is stored in the powerjob_files table), and finally clears the instance's records from H2's LOCAL_INSTANCE_LOG table via localInstanceLogRepository.deleteByInstanceId.

Original statement: this article was published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.
