Leaf——美团点评分布式ID生成系统 https://github.com/Meituan-Dianping/Leaf
/**
 * Thread factory for the segment-update workers.
 *
 * <p>Each thread it creates is named {@code Thread-Segment-Update-<n>}, where
 * {@code n} comes from a counter shared by every instance of this factory and
 * starts at 0.
 */
public static class UpdateThreadFactory implements ThreadFactory {
    /** Common prefix of every thread name produced by this factory. */
    private static final String NAME_PREFIX = "Thread-Segment-Update-";

    /** Next sequence number to hand out; guarded by the class lock. */
    private static int sequence = 0;

    /**
     * Hands out the current sequence number and advances the counter.
     *
     * @return the number to append to the next thread's name
     */
    private static int takeSequence() {
        synchronized (UpdateThreadFactory.class) {
            final int assigned = sequence;
            sequence = assigned + 1;
            return assigned;
        }
    }

    /**
     * Creates a new thread for the given task with a sequentially numbered name.
     *
     * @param r the task the new thread will run
     * @return the newly constructed (not yet started) thread
     */
    @Override
    public Thread newThread(Runnable r) {
        final String threadName = NAME_PREFIX + takeSequence();
        return new Thread(r, threadName);
    }
}
// Initialize the buffer on first use: nothing happens when isInitOk() already reports true.
if (!buffer.isInitOk()) {
// Only one thread at a time may run the initialization for this particular buffer.
synchronized (buffer) {
// Check again while holding the monitor: another thread may have completed the
// initialization while this one was waiting to enter the synchronized block.
if (!buffer.isInitOk()) {
try {
// Presumably fills the buffer's current segment from the database for this key
// (inferred from the helper's name and the log message below) - confirm against the helper.
updateSegmentFromDb(key, buffer.getCurrent());
logger.info("Init buffer. Update leafkey {} {} from db", key, buffer.getCurrent());
buffer.setInitOk(true);
} catch (Exception e) {
// A failure is only logged; initOk is left unset, so this block runs again the
// next time it is reached.
logger.warn("Init buffer {} exception", buffer.getCurrent(), e);
}
}
}
}
// Time elapsed since the buffer's recorded update timestamp.
long duration = System.currentTimeMillis() - buffer.getUpdateTimestamp();
int nextStep = buffer.getStep();
if (duration < SEGMENT_DURATION) {
    // The segment was refreshed again within one SEGMENT_DURATION (the original note
    // calls this "within 15 minutes"): double the step, unless doubling would exceed MAX_STEP.
    if (nextStep * 2 <= MAX_STEP) {
        nextStep *= 2;
    }
} else if (duration >= SEGMENT_DURATION * 2) {
    // Two SEGMENT_DURATIONs or more passed (the original note says "30 minutes or more"):
    // halve the step so that a shutdown wastes fewer reserved IDs, but never go below the
    // buffer's minimum step (per the original note, the value configured in the database).
    if (nextStep / 2 >= buffer.getMinStep()) {
        nextStep /= 2;
    }
}
// Between one and two SEGMENT_DURATIONs the step is left unchanged.
// NOTE: abridged excerpt - lines of the original method are elided here, so the
// braces below are not balanced within this listing.
// Take the buffer's read lock before looking at the current segment.
buffer.rLock().lock();
try {
final Segment segment = buffer.getCurrent();
// Hand a task to the executor only when all three hold:
//  - the buffer does not yet report its next segment as ready,
//  - segment.getIdle() is below 0.9 * segment.getStep()
//    (presumably getIdle() is the number of IDs still unused - TODO confirm),
//  - this thread wins the compareAndSet(false, true) on the thread-running flag,
//    so at most one such task is submitted at a time.
if (!buffer.isNextReady() && (segment.getIdle() < 0.9 * segment.getStep()) && buffer.getThreadRunning().compareAndSet(false, true)) {
service.execute(new Runnable() {
// (task body elided) Under the buffer's write lock: flag the next segment as ready
// and clear the thread-running flag.
buffer.wLock().lock();
buffer.setNextReady(true);
buffer.getThreadRunning().set(false);
buffer.wLock().unlock();
// NOTE: abridged excerpt - the remainder of the try block (and its closing) is not shown.
// Take the buffer's write lock before drawing a value from the current segment.
buffer.wLock().lock();
try {
final Segment segment = buffer.getCurrent();
// Draw a value and advance the segment's counter in one step; assuming AtomicLong-style
// semantics, getAndIncrement() hands back the value from before the increment.
long value = segment.getValue().getAndIncrement();
// A value below the segment's max is returned as a successful result.
if (value < segment.getMax()) {
return new Result(value, Status.SUCCESS);
}
/**
 * Waits briefly while the buffer's thread-running flag is set.
 *
 * <p>The method spins on the flag and returns as soon as it is cleared. If the
 * flag is still set after more than 10000 spins, the method sleeps once for
 * 10 ms and then returns regardless of the flag's state.
 *
 * <p>If the sleep is interrupted, the thread's interrupt status is restored so
 * that callers further up the stack can still observe the interruption.
 *
 * @param buffer the segment buffer whose thread-running flag is polled
 */
private void waitAndSleep(SegmentBuffer buffer) {
    int roll = 0;
    while (buffer.getThreadRunning().get()) {
        roll += 1;
        if (roll > 10000) {
            try {
                TimeUnit.MILLISECONDS.sleep(10);
            } catch (InterruptedException e) {
                // sleep() clears the interrupt flag when it throws; set it again
                // instead of silently swallowing the interruption.
                Thread.currentThread().interrupt();
                logger.warn("Thread {} Interrupted", Thread.currentThread().getName());
            }
            // Give up waiting after the single sleep, interrupted or not.
            break;
        }
    }
}
// Number of bits used for the worker ID.
private final long workerIdBits = 10L;
// Largest assignable worker ID. -1L has every bit set; shifting it left by workerIdBits
// clears the low 10 bits, and ~ inverts the result so that only those 10 bits remain set,
// i.e. 1023.
private final long maxWorkerId = ~(-1L << workerIdBits);// max assignable workerId = 1023
使用位运算提高效率:
先将 -1L 左移 workerIdBits 位,再按位取反(~),得到低 workerIdBits 位全为 1 的值,
其结果等价于 (1L << workerIdBits) - 1,从而用"负数 + 取反"代替了减 1 操作。
/**
 * Computes the position that follows {@code currentPos}, using the remainder
 * of a division by 2.
 *
 * @return {@code (currentPos + 1) % 2}
 */
public int nextPos() {
    final int advanced = currentPos + 1;
    return advanced % 2;
}
/**
 * Computes the position that follows {@code currentPos}, keeping only the
 * lowest bit of the incremented value.
 *
 * @return {@code (currentPos + 1) & 1}
 */
public int nextPos() {
    final int advanced = currentPos + 1;
    return advanced & 1;
}
/**
 * {@link IDGen} implementation that does no real ID generation: every request
 * is answered with the ID 0 and a SUCCESS status.
 */
public class ZeroIDGen implements IDGen {

    /**
     * Nothing needs to be set up, so initialization always succeeds.
     *
     * @return always {@code true}
     */
    @Override
    public boolean init() {
        return true;
    }

    /**
     * Produces a new result carrying the ID 0 and {@code Status.SUCCESS}.
     *
     * @param key the business key; not used by this implementation
     * @return a freshly created result with ID 0 and SUCCESS status
     */
    @Override
    public Result get(String key) {
        final Result zeroResult = new Result(0, Status.SUCCESS);
        return zeroResult;
    }
}
/**
 * Builds the service according to the {@code Constants.LEAF_SNOWFLAKE_ENABLE}
 * property, which is treated as "true" when absent.
 *
 * <p>When the property parses to {@code false}, a {@link ZeroIDGen} is installed
 * as the generator. Otherwise the enabled path runs; its body is omitted from
 * this excerpt.
 *
 * @throws InitException declared by the constructor; the code that may raise it
 *                       is not part of this excerpt
 */
public SnowflakeService() throws InitException {
    final Properties properties = PropertyFactory.getProperties();
    final String enableSetting = properties.getProperty(Constants.LEAF_SNOWFLAKE_ENABLE, "true");
    if (!Boolean.parseBoolean(enableSetting)) {
        idGen = new ZeroIDGen();
        logger.info("Zero ID Gen Service Init Successfully");
    } else {
        // init (the enabled path's set-up is omitted from this excerpt)
    }
}
/**
 * Starts a single-threaded scheduler that repeatedly calls
 * {@code updateNewData(curator, zk_AddressNode)}.
 *
 * <p>The first call happens after 1 second; each later call starts 3 seconds
 * after the previous one finished. The worker thread is a daemon thread named
 * {@code schedule-upload-time}.
 *
 * @param curator        client passed through to {@code updateNewData}
 * @param zk_AddressNode node path passed through to {@code updateNewData}
 */
private void ScheduledUploadData(final CuratorFramework curator, final String zk_AddressNode) {
    // Factory producing the single daemon worker thread.
    final ThreadFactory daemonFactory = new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread uploader = new Thread(r, "schedule-upload-time");
            uploader.setDaemon(true);
            return uploader;
        }
    };
    // The periodic job itself.
    final Runnable uploadTask = new Runnable() {
        @Override
        public void run() {
            updateNewData(curator, zk_AddressNode);
        }
    };
    // Report data every 3 s (fixed delay), after an initial delay of 1 s.
    Executors.newSingleThreadScheduledExecutor(daemonFactory)
            .scheduleWithFixedDelay(uploadTask, 1L, 3L, TimeUnit.SECONDS);
}
// Path template of the local workerID properties file:
//   <java.io.tmpdir> + File.separator + <value of the "leaf.name" property> + "/leafconf/{port}/workerID.properties"
// "{port}" is a literal placeholder; init() substitutes the actual port before opening the file.
private static final String PROP_PATH = System.getProperty("java.io.tmpdir") + File.separator + PropertyFactory.getProperties().getProperty("leaf.name") + "/leafconf/{port}/workerID.properties";
/**
 * Initializes the worker ID.
 *
 * <p>The primary path obtains the worker ID from ZooKeeper (that code is
 * omitted from this excerpt). If it throws, the method falls back to the
 * {@code workerID} entry of the local properties file located at
 * {@code PROP_PATH} with {@code {port}} replaced by the actual port.
 *
 * @return {@code true} when the primary path succeeded or the fallback file was
 *         read and parsed; {@code false} when the fallback failed as well
 */
public boolean init() {
    try {
        // Fetch the workerID from ZooKeeper (body omitted from this excerpt).
    } catch (Exception e) {
        // Assuming an SLF4J-style logger: a trailing Throwable is logged with its stack
        // trace and never fills a "{}" placeholder, so the message carries none.
        LOGGER.error("Start node ERROR", e);
        try {
            Properties properties = new Properties();
            File cacheFile = new File(PROP_PATH.replace("{port}", port + ""));
            // Close the stream even when load() throws, so the file handle is not leaked.
            try (FileInputStream in = new FileInputStream(cacheFile)) {
                properties.load(in);
            }
            // A missing or non-numeric entry raises an exception that is handled below.
            workerID = Integer.valueOf(properties.getProperty("workerID"));
            LOGGER.warn("START FAILED ,use local node file properties workerID-{}", workerID);
        } catch (Exception e1) {
            LOGGER.error("Read file error ", e1);
            return false;
        }
    }
    return true;
}