前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊PowerJob的IdGenerateService

聊聊PowerJob的IdGenerateService

作者头像
code4it
发布2024-01-19 16:37:15
990
发布2024-01-19 16:37:15
举报
文章被收录于专栏:码匠的流水账码匠的流水账

本文主要研究一下PowerJob的IdGenerateService

IdGenerateService

tech/powerjob/server/core/uid/IdGenerateService.java

代码语言:javascript
复制
@Slf4j
@Service
public class IdGenerateService {

    private final SnowFlakeIdGenerator snowFlakeIdGenerator;

    private static final int DATA_CENTER_ID = 0;

    public IdGenerateService(ServerInfoService serverInfoService) {
        long id = serverInfoService.fetchServiceInfo().getId();
        snowFlakeIdGenerator = new SnowFlakeIdGenerator(DATA_CENTER_ID, id);
        log.info("[IdGenerateService] initialize IdGenerateService successfully, ID:{}", id);
    }

    /**
     * 分配分布式唯一ID
     * @return 分布式唯一ID
     */
    public long allocate() {
        return snowFlakeIdGenerator.nextId();
    }

}

IdGenerateService的构造器接收ServerInfoService,然后通过serverInfoService.fetchServiceInfo().getId()获取machineId,最后创建SnowFlakeIdGenerator,其DATA_CENTER_ID为0;其allocate返回的是snowFlakeIdGenerator.nextId()

ServerInfoService

tech/powerjob/server/remote/server/self/ServerInfoService.java

代码语言:javascript
复制
public interface ServerInfoService {

    /**
     * fetch current server info
     * @return ServerInfo
     */
    ServerInfo fetchServiceInfo();

}

ServerInfoService定义了fetchServiceInfo方法,返回ServerInfo

ServerInfoServiceImpl

tech/powerjob/server/remote/server/self/ServerInfoServiceImpl.java

代码语言:javascript
复制
@Slf4j
@Service
public class ServerInfoServiceImpl implements ServerInfoService {

    private final ServerInfo serverInfo;

    private final ServerInfoRepository serverInfoRepository;

    private static final long MAX_SERVER_CLUSTER_SIZE = 10000;

    private static final String SERVER_INIT_LOCK = "server_init_lock";
    private static final int SERVER_INIT_LOCK_MAX_TIME = 15000;


    @Autowired
    public ServerInfoServiceImpl(LockService lockService, ServerInfoRepository serverInfoRepository) {

        this.serverInfo = new ServerInfo();

        String ip = NetUtils.getLocalHost();
        serverInfo.setIp(ip);
        serverInfo.setBornTime(System.currentTimeMillis());
        this.serverInfoRepository = serverInfoRepository;

        Stopwatch sw = Stopwatch.createStarted();

        while (!lockService.tryLock(SERVER_INIT_LOCK, SERVER_INIT_LOCK_MAX_TIME)) {
            log.info("[ServerInfoService] waiting for lock: {}", SERVER_INIT_LOCK);
            CommonUtils.easySleep(100);
        }

        try {

            // register server then get server_id
            ServerInfoDO server = serverInfoRepository.findByIp(ip);
            if (server == null) {
                ServerInfoDO newServerInfo = new ServerInfoDO(ip);
                server = serverInfoRepository.saveAndFlush(newServerInfo);
            } else {
                serverInfoRepository.updateGmtModifiedByIp(ip, new Date());
            }

            if (server.getId() < MAX_SERVER_CLUSTER_SIZE) {
                serverInfo.setId(server.getId());
            } else {
                long retryServerId = retryServerId();
                serverInfo.setId(retryServerId);
                serverInfoRepository.updateIdByIp(retryServerId, ip);
            }

        } catch (Exception e) {
            log.error("[ServerInfoService] init server failed", e);
            throw e;
        } finally {
            lockService.unlock(SERVER_INIT_LOCK);
        }

        log.info("[ServerInfoService] ip:{}, id:{}, cost:{}", ip, serverInfo.getId(), sw);
    }

    @Scheduled(fixedRate = 15000, initialDelay = 15000)
    public void heartbeat() {
        serverInfoRepository.updateGmtModifiedByIp(serverInfo.getIp(), new Date());
    }


    private long retryServerId() {

        List<ServerInfoDO> serverInfoList = serverInfoRepository.findAll();

        log.info("[ServerInfoService] current server record num in database: {}", serverInfoList.size());

        // clean inactive server record first
        if (serverInfoList.size() > MAX_SERVER_CLUSTER_SIZE) {

            // use a large time interval to prevent valid records from being deleted when the local time is inaccurate
            Date oneDayAgo = DateUtils.addDays(new Date(), -1);
            int delNum =serverInfoRepository.deleteByGmtModifiedBefore(oneDayAgo);
            log.warn("[ServerInfoService] delete invalid {} server info record before {}", delNum, oneDayAgo);

            serverInfoList = serverInfoRepository.findAll();
        }

        if (serverInfoList.size() > MAX_SERVER_CLUSTER_SIZE) {
            throw new PowerJobException(String.format("The powerjob-server cluster cannot accommodate %d machines, please rebuild another cluster", serverInfoList.size()));
        }

        Set<Long> uedServerIds = serverInfoList.stream().map(ServerInfoDO::getId).collect(Collectors.toSet());
        for (long i = 1; i <= MAX_SERVER_CLUSTER_SIZE; i++) {
            if (uedServerIds.contains(i)) {
                continue;
            }

            log.info("[ServerInfoService] ID[{}] is not used yet, try as new server id", i);
            return i;
        }
        throw new PowerJobException("impossible");
    }

    @Autowired(required = false)
    public void setBuildProperties(BuildProperties buildProperties) {
        if (buildProperties == null) {
            return;
        }
        String pomVersion = buildProperties.getVersion();
        if (StringUtils.isNotBlank(pomVersion)) {
            serverInfo.setVersion(pomVersion);
        }
    }

    @Override
    public ServerInfo fetchServiceInfo() {
        return serverInfo;
    }
}

ServerInfoServiceImpl实现了ServerInfoService接口,其构造器注入lockService和serverInfoRepository,先通过lockService.tryLock抢到server_init_lock,然后serverInfoRepository.findByIp找到ServerInfoDO执行saveAndFlush或者updateGmtModifiedByIp;其fetchServiceInfo返回的是serverInfo信息;它还以fixedRate为15s调度了heartbeat,主要是更新gmtModifed

SnowFlakeIdGenerator

tech/powerjob/server/core/uid/SnowFlakeIdGenerator.java

代码语言:javascript
复制
public class SnowFlakeIdGenerator {
    /**
     * 起始的时间戳(a special day for me)
     */
    private final static long START_STAMP = 1555776000000L;
    /**
     * 序列号占用的位数
     */
    private final static long SEQUENCE_BIT = 6;
    /**
     * 机器标识占用的位数
     */
    private final static long MACHINE_BIT = 14;
    /**
     * 数据中心占用的位数
     */
    private final static long DATA_CENTER_BIT = 2;
    /**
     * 每一部分的最大值
     */
    private final static long MAX_DATA_CENTER_NUM = ~(-1L << DATA_CENTER_BIT);
    private final static long MAX_MACHINE_NUM = ~(-1L << MACHINE_BIT);
    private final static long MAX_SEQUENCE = ~(-1L << SEQUENCE_BIT);
    /**
     * 每一部分向左的位移
     */
    private final static long MACHINE_LEFT = SEQUENCE_BIT;
    private final static long DATA_CENTER_LEFT = SEQUENCE_BIT + MACHINE_BIT;
    private final static long TIMESTAMP_LEFT = DATA_CENTER_LEFT + DATA_CENTER_BIT;
    /**
     * 数据中心
     */
    private final long dataCenterId;
    /**
     * 机器标识
     */
    private final long machineId;
    /**
     * 序列号
     */
    private long sequence = 0L;
    /**
     * 上一次时间戳
     */
    private long lastTimestamp = -1L;

    public SnowFlakeIdGenerator(long dataCenterId, long machineId) {
        if (dataCenterId > MAX_DATA_CENTER_NUM || dataCenterId < 0) {
            throw new IllegalArgumentException("dataCenterId can't be greater than MAX_DATA_CENTER_NUM or less than 0");
        }
        if (machineId > MAX_MACHINE_NUM || machineId < 0) {
            throw new IllegalArgumentException("machineId can't be greater than MAX_MACHINE_NUM or less than 0");
        }
        this.dataCenterId = dataCenterId;
        this.machineId = machineId;
    }

    /**
     * 产生下一个ID
     */
    public synchronized long nextId() {
        long currStamp = getNewStamp();
        if (currStamp < lastTimestamp) {
            return futureId();
        }

        if (currStamp == lastTimestamp) {
            //相同毫秒内,序列号自增
            sequence = (sequence + 1) & MAX_SEQUENCE;
            //同一毫秒的序列数已经达到最大
            if (sequence == 0L) {
                currStamp = getNextMill();
            }
        } else {
            //不同毫秒内,序列号置为0
            sequence = 0L;
        }

        lastTimestamp = currStamp;

        return (currStamp - START_STAMP) << TIMESTAMP_LEFT //时间戳部分
                | dataCenterId << DATA_CENTER_LEFT       //数据中心部分
                | machineId << MACHINE_LEFT             //机器标识部分
                | sequence;                             //序列号部分
    }

    /**
     * 发生时钟回拨时借用未来时间生成Id,避免运行过程中任务调度和工作流直接进入不可用状态
     * 注:该方式不可解决原算法中停服状态下时钟回拨导致的重复id问题
     */
    private long futureId() {
        sequence = (sequence + 1) & MAX_SEQUENCE;
        if (sequence == 0L) {
            lastTimestamp = lastTimestamp + 1;
        }

        return (lastTimestamp - START_STAMP) << TIMESTAMP_LEFT //时间戳部分
                | dataCenterId << DATA_CENTER_LEFT       //数据中心部分
                | machineId << MACHINE_LEFT             //机器标识部分
                | sequence;                             //序列号部分
    }

    private long getNextMill() {
        long mill = getNewStamp();
        while (mill <= lastTimestamp) {
            mill = getNewStamp();
        }
        return mill;
    }

    private long getNewStamp() {
        return System.currentTimeMillis();
    }
}

SnowFlakeIdGenerator的dataCenterId(最大值为3)和machineId(最大值为16383),sequence最大值为63。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2024-01-10,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 码匠的流水账 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 本文主要研究一下PowerJob的IdGenerateService
  • IdGenerateService
  • ServerInfoService
    • ServerInfoServiceImpl
    • SnowFlakeIdGenerator
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档