下图来自pmq的官方文档
如果提交线程数大小不等于在soa配置中获取的提交线程大小,则修改成配置中的提交线程数大小,
同时设置线程池的核心线程池大小,执行提交偏移量操作(重点),提交偏移的过程也会涉及到元数据更新操作
提交的过程中,通过随机的方式来避免数据库的洪峰压力
提交提交分为单条和批量,而批量采用同步并发容器CountDownLatch提交
此时会进行查询,获取标识flag,查询提交偏移量如果>0,则执行后续操作,此时必须满足有偏移的消息和偏移信息
里面有两个概念:偏移量版本和偏移量的概念,偏移量我们好理解,而偏移量版本是针对每次修改版本都会+1
在偏移量或者版本号修改的过程中,此时的元数据必然需要进行改变,此时会调用到更新元数据的操作
此时元数据更新:
/**
* 如果提交线程数大小不等于在soa配置中获取的提交线程大小,则修改成配置中的提交线程数大小,
* 同时设置线程池的核心线程池大小,执行提交偏移量操作(重点)
*/
@Override
public void startBroker() {
commitThreadSize = soaConfig.getCommitThreadSize();
executorRun = new ThreadPoolExecutor(commitThreadSize + 1, commitThreadSize + 1, 10L, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(50), SoaThreadFactory.create("commit-run", Thread.MAX_PRIORITY - 1, true),
new ThreadPoolExecutor.CallerRunsPolicy());
soaConfig.registerChanged(new Runnable() {
@Override
public void run() {
if (commitThreadSize != soaConfig.getCommitThreadSize()) {
commitThreadSize = soaConfig.getCommitThreadSize();
executorRun.setCorePoolSize(commitThreadSize + 1);
executorRun.setMaximumPoolSize(commitThreadSize + 1);
}
}
});
executorRun.execute(() -> {
//提交消息偏移量
commitOffset();
});
}
为了缓解提交过快,在kafka中可以设置一扰动值,而为了避免压力会执行sleep操作
/**
* 提交偏移量
*/
protected void commitOffset() {
log.info("doSubmitOffset");
while (isRunning) {
//执行提交
doCommit();
// 通过随机的方式来避免数据库的洪峰压力
Util.sleep(soaConfig.getCommitSleepTime());
}
}
执行提交:提交消息分为单条和批量,而单条则直接执行,而批量则采用同步并发容器CountDownLatch提交。
此时会涉及到单条和批量提交
//执行提交操作
protected void doCommit() {
//获取偏移量版本
Map<Long, OffsetVersionEntity> offsetVersionMap = queueOffsetService.getOffsetVersion();
Map<Long, ConsumerQueueVersionDto> map = new HashMap<>(mapAppPolling.get());
if (map.size() > 0) {
final int size = map.size();
Transaction transaction = Tracer.newTransaction("Timer-service", "commit");
try {
int countSize = map.size() < commitThreadSize ? map.size() : commitThreadSize;
//单条消息
if (countSize == 1) {
for (Map.Entry<Long, ConsumerQueueVersionDto> entry : map.entrySet()) {
doCommitOffset(entry.getValue(), 0, offsetVersionMap, size);
}
} else {
//多条消息,同步并发容器CountDownLatch提交
CountDownLatch countDownLatch = new CountDownLatch(countSize);
for (Map.Entry<Long, ConsumerQueueVersionDto> entry : map.entrySet()) {
executorRun.execute(new Runnable() {
@Override
public void run() {
//提交消息
doCommitOffset(entry.getValue(), 0, offsetVersionMap, size);
countDownLatch.countDown();
}
});
}
countDownLatch.await();
}
transaction.setStatus(Transaction.SUCCESS);
} catch (Exception e) {
transaction.setStatus(e);
} finally {
transaction.complete();
}
}
}
同时:此时会进行查询,获取标识flag,查询提交偏移量如果>0,则执行后续操作,此时必须满足有偏移的消息和偏移信息,这里我觉得这里的bean操作可以使用bean拷贝,使用cglib的bean拷贝工具可以保证代码的简洁性。
//提交偏移量
protected boolean doCommitOffset(ConsumerQueueVersionDto request, int flag,
Map<Long, OffsetVersionEntity> offsetVersionMap, int count) {
Transaction catTransaction = null;
OffsetVersionEntity offsetVersionEntity = offsetVersionMap.get(request.getQueueOffsetId());
if (checkOffsetAndVersion(request, offsetVersionEntity)) {
try {
catTransaction = Tracer.newTransaction("Timer-service",
MqConstanst.CONSUMERPRE + "/commitOffset-" + flag);
QueueOffsetEntity queueOffsetEntity = new QueueOffsetEntity();
queueOffsetEntity.setId(request.getQueueOffsetId());
queueOffsetEntity.setOffsetVersion(request.getOffsetVersion());
queueOffsetEntity.setOffset(request.getOffset());
queueOffsetEntity.setConsumerGroupName(request.getConsumerGroupName());
queueOffsetEntity.setTopicName(request.getTopicName());
//此时会进行查询,获取标识flag,查询提交偏移量如果>0,则执行后续操作,此时必须满足有偏移的消息和偏移信息
if (queueOffsetService.commitOffset(queueOffsetEntity) > 0 && offsetVersionEntity != null) {
reentrantLock.lock();
if (request.getOffsetVersion() == offsetVersionEntity.getOffsetVersion()
&& request.getOffset() > offsetVersionEntity.getOffset()) {
offsetVersionEntity.setOffset(request.getOffset());
//这里使用if是不是更好一些呢
} else if (request.getOffsetVersion() > offsetVersionEntity.getOffsetVersion()) {
offsetVersionEntity.setOffsetVersion(request.getOffsetVersion());
offsetVersionEntity.setOffset(request.getOffset());
}
reentrantLock.unlock();
}
catTransaction.setStatus(Transaction.SUCCESS);
return true;
} catch (Exception e) {
failMapAppPolling.put(request.getQueueOffsetId(), request);
log.error("doSubmitOffset失败", e);
catTransaction.setStatus(e);
return false;
} finally {
catTransaction.complete();
}
}
return true;
}
这就是关于偏移量的代码。
这里涉及到偏移量版本的概念:OffsetVersionEntity
/**
* 偏移量版本实体类
*/
public class OffsetVersionEntity {
//id
private long id;
//偏移量
private long offset;
//偏移量版本
private long offsetVersion;
}
同时查询消息偏移量实体:
/**
* @author dal-generator
*/
public class QueueOffsetEntity {
/**
*
*/
private long id;
/**
* 订阅者组id
*/
private long consumerGroupId;
/**
* 订阅者组名称
*/
private String consumerGroupName;
/**
* 客户端消费者name
*/
private String consumerName;
/**
* 生产者id或者消费者id
*/
private long consumerId;
/**
* 主题id
*/
private long topicId;
/**
* 主题名称,如果
*/
private String topicName;
/**
* 如果是失败队列此字段名称表示原始的topic名称,topic_name为consumer_group_name+原始的topic_name+"_fail",否则topic_name和origin_topic_name一致
*/
private String originTopicName;
/**
* 1,表示正常队列,2,表示失败队列
*/
private int topicType;
/**
* 分区id
*/
private long queueId;
/**
* 消费者提交的偏移量
*/
private long offset;
/**
* 订阅时的起始偏移量
*/
private long startOffset;
/**
* 偏移版本号,当手动修改偏移时,会升级版本号,如果客户端提交更新偏移量的时候,只能按照版本号相同,偏移量大的值更新
*/
private long offsetVersion;
/**
* 1,表示客户端此queue停止消费,0,表示正常消费
*/
private int stopFlag;
/**
* ip+db_name +tb_name
*/
private String dbInfo;
/**
* 操作人
*/
private String insertBy;
/**
* 创建时间
*/
private Date insertTime;
/**
* 操作人
*/
private String updateBy;
/**
* 更新时间
*/
private Date updateTime;
/**
* 逻辑删除
*/
private int isActive;
/**
* 元数据更新时间
*/
private Date metaUpdateTime;
/**
* 原始的消费者组名
*/
private String originConsumerGroupName;
/**
* 1,为集群模式,2,为广播模式,3,为代理模式
*/
private int consumerGroupMode;
/**
* 环境
*/
private String subEnv;
上面是偏移量的基本代码。当然会涉及到元数据更新的问题。下面是元数据更新的操作。
考虑元数据更新的多种请求,同时对停止当前的偏移量进行判断,如果等于消费偏移量,则重新执行提交操作,否者执行删除偏移操作。
在偏移量或者版本号修改的过程中,此时的元数据必然需要进行改变,此时会调用到更新元数据的操作updateQueueMeta:
//更新元数据
@Override
public void updateQueueMeta(ConsumerQueueDto consumerQueue) {
synchronized (lockMetaObj) {
doUpdateQueueMeta(consumerQueue);
}
}
执行更新元数据操作:
/**
* 消费队列版本dto
*/
public class ConsumerQueueVersionDto {
//队列偏移id
private long queueOffsetId;
//偏移量版本
private volatile long offsetVersion;
//偏移量
private volatile long offset;
//消费者组名称
private String consumerGroupName;
//主题名称
private String topicName;
}
执行元数据更新操作思路:
1.查看消息队列中线程大小和当前的线程数是否相等,如果不等,则更新元数据线程信息
2.获取队列偏移量版本,比较版本,如果相同,则此时偏移量满了,需要执行扩容操作,然后获取,此时是元数据发生更新
3.发生队列重新消费,偏移量更新,为了防止拉取到数据后,会有阻塞,清除掉后,会消除阻塞
4.停止消费,检查偏移量版本和消费拿到的版本是否相同,如果相同,则执行再一次提交操作,否者则说明版本不同,此时需要执行删除操作
5.异常
代码:
//执行更新队列元数据操作
protected void doUpdateQueueMeta(ConsumerQueueDto consumerQueue) {
Transaction transaction = Tracer.newTransaction("mq-group", "updateQueueMeta-" + consumerQueue.getTopicName());
try {
if (consumerQueue.getTimeout() == 0) {
timeOutCount.set(0);
}
//获取当前的消费队列dto
ConsumerQueueDto temp = consumerQueueRef.get();
//获取flag,如果不同,则更新线程池线程数大小
boolean flag = consumerQueue.getThreadSize() != temp.getThreadSize();
if (flag) {
log.info("update_thread_size,更新线程数" + consumerQueue.getTopicName());
executor.setCorePoolSize(consumerQueue.getThreadSize() + 2);
executor.setMaximumPoolSize(consumerQueue.getThreadSize() + 2);
}
// 此时是元数据发生更新,比较版本,版本相同,则说明需要扩容
if (consumerQueue.getOffsetVersion() == temp.getOffsetVersion()) {
log.info("update meta with topic:" + consumerQueue.getTopicName());
// 更新元数据
updateQueueMetaWithOutOffset(consumerQueue);
} else {
// 此时需要进行偏移更新
log.info("queue offset changed,发生队列重新消费" + consumerQueue.getTopicName());
consumerQueueRef.set(consumerQueue);
// 此时是为了防止拉取到数据后,会有阻塞,清除掉后,会消除阻塞
messages.clear();
// 防止清除后,messages 里面有数据
Util.sleep(100);
// 再次清理,确保message 里面为空,同时使得拉取消息释放锁。
messages.clear();
// 确保更新拉取消息的起始值,为偏移重置的值,加锁是防止拉取与重置同时操作
consumerQueue.setLastId(consumerQueue.getOffset());
// 说明修改偏移了需要重新,拉取
this.lastId = consumerQueue.getOffset();
}
if (isRunning && consumerQueue.getStopFlag() == 1) {
log.info("stop deal,停止消费" + consumerQueue.getTopicName());
TraceMessageItem item = new TraceMessageItem();
doCommit(temp);
item.status = "停止消费";
item.msg = temp.getOffset() + "-" + temp.getOffsetVersion();
traceMsgCommit.add(item);
}
isRunning = consumerQueue.getStopFlag() == 0;
transaction.setStatus(Transaction.SUCCESS);
} catch (Exception e) {
transaction.setStatus(e);
} finally {
transaction.complete();
}
}
执行doCommit操作:
/**
* 执行提交消息
* @param temp
*/
protected void doCommit(ConsumerQueueDto temp) {
BatchRecorderItem item = batchRecorder.getLastestItem();
if (item != null) {
doCommit(temp, item);
}
}
//执行提交操作
private void doCommit(ConsumerQueueDto temp, BatchRecorderItem batchRecorderItem) {
if (batchRecorderItem == null)
return;
CommitOffsetRequest request = new CommitOffsetRequest();
//检查偏移量版本和消费拿到的版本是否相同,如果相同,则执行再一次提交操作,否者则说明版本不同,此时需要执行删除操作
if (checkOffsetVersion(temp)) {
List<ConsumerQueueVersionDto> queueVersionDtos = new ArrayList<>();
request.setQueueOffsets(queueVersionDtos);
ConsumerQueueVersionDto consumerQueueVersionDto = new ConsumerQueueVersionDto();
// consumerQueueVersionDto.setOffset(temp.getOffset());
consumerQueueVersionDto.setOffset(batchRecorderItem.maxId);
consumerQueueVersionDto.setOffsetVersion(temp.getOffsetVersion());
consumerQueueVersionDto.setQueueOffsetId(temp.getQueueOffsetId());
consumerQueueVersionDto.setConsumerGroupName(temp.getConsumerGroupName());
consumerQueueVersionDto.setTopicName(temp.getTopicName());
// request.setFailIds(preFailIds);
queueVersionDtos.add(consumerQueueVersionDto);
mqResource.commitOffset(request);
}
batchRecorder.delete(batchRecorderItem.batchReacorderId);
}
mqResource.commitOffset(request):
//提交偏移量
public void commitOffset(CommitOffsetRequest request) {
if (request == null) {
return;
}
//提交偏移量操作
String url = MqConstanst.CONSUMERPRE + "/commitOffset";
try {
post(request, url, 10, CommitOffsetResponse.class, false);
} catch (Exception e) {
CatRequest request2 = new CatRequest();
request2.setMethod("commitOffset");
request2.setJson(JsonUtil.toJson(request));
request2.setMsg(e.getMessage());
addCat(request2);
}
}
执行提交偏移量操作:
// 发送心跳,直接返回
@PostMapping("/commitOffset")
public CommitOffsetResponse commitOffset(@RequestBody CommitOffsetRequest request) {
return consumerCommitService.commitOffset(request);
}
最终会调用doCommitOffset操作:
/**
* 提交偏移量
* @param request
* @return
*/
@Override
public CommitOffsetResponse commitOffset(CommitOffsetRequest request) {
// Transaction catTransaction = Tracer.newTransaction("Timer-service",
// "commitOffset");
CommitOffsetResponse response = new CommitOffsetResponse();
response.setSuc(true);
Map<Long, ConsumerQueueVersionDto> map = mapAppPolling.get();
try {
//如果请求不为空,同时请求的队列偏移量不为空,则执行提交操作
if (request != null && !CollectionUtils.isEmpty(request.getQueueOffsets())) {
request.getQueueOffsets().forEach(t1 -> {
ConsumerQueueVersionDto temp = map.get(t1.getQueueOffsetId());
boolean flag1 = true;
if (temp == null) {
synchronized (lockObj1) {
temp = map.get(t1.getQueueOffsetId());
if (temp == null) {
map.put(t1.getQueueOffsetId(), t1);
flag1 = false;
}
}
}
if (flag1) {
if (temp.getOffsetVersion() < t1.getOffsetVersion()) {
clearOldData();
map.put(t1.getQueueOffsetId(), t1);
} else if (temp.getOffsetVersion() == t1.getOffsetVersion()
&& temp.getOffset() < t1.getOffset()) {
clearOldData();
map.put(t1.getQueueOffsetId(), t1);
}
}
});
if (request.getFlag() == 1) {
Map<Long, OffsetVersionEntity> offsetVersionMap = queueOffsetService.getOffsetVersion();
request.getQueueOffsets().forEach(t1 -> {
//执行提交偏移量
doCommitOffset(t1, 1, offsetVersionMap, 0);
});
}
}
} catch (Exception e) {
}
// catTransaction.setStatus(Transaction.SUCCESS);
// catTransaction.complete();
return response;
}
执行提交偏移操作:
//提交偏移量
protected boolean doCommitOffset(ConsumerQueueVersionDto request, int flag,
Map<Long, OffsetVersionEntity> offsetVersionMap, int count) {
Transaction catTransaction = null;
OffsetVersionEntity offsetVersionEntity = offsetVersionMap.get(request.getQueueOffsetId());
if (checkOffsetAndVersion(request, offsetVersionEntity)) {
try {
//flag是标识
catTransaction = Tracer.newTransaction("Timer-service",
MqConstanst.CONSUMERPRE + "/commitOffset-" + flag);
QueueOffsetEntity queueOffsetEntity = new QueueOffsetEntity();
queueOffsetEntity.setId(request.getQueueOffsetId());
queueOffsetEntity.setOffsetVersion(request.getOffsetVersion());
queueOffsetEntity.setOffset(request.getOffset());
queueOffsetEntity.setConsumerGroupName(request.getConsumerGroupName());
queueOffsetEntity.setTopicName(request.getTopicName());
//此时会进行查询,获取标识flag,查询提交偏移量如果>0,则执行后续操作,此时必须满足有偏移的消息和偏移信息
if (queueOffsetService.commitOffset(queueOffsetEntity) > 0 && offsetVersionEntity != null) {
reentrantLock.lock();
if (request.getOffsetVersion() == offsetVersionEntity.getOffsetVersion()
&& request.getOffset() > offsetVersionEntity.getOffset()) {
offsetVersionEntity.setOffset(request.getOffset());
} else if (request.getOffsetVersion() > offsetVersionEntity.getOffsetVersion()) {
offsetVersionEntity.setOffsetVersion(request.getOffsetVersion());
offsetVersionEntity.setOffset(request.getOffset());
}
reentrantLock.unlock();
}
catTransaction.setStatus(Transaction.SUCCESS);
return true;
} catch (Exception e) {
failMapAppPolling.put(request.getQueueOffsetId(), request);
log.error("doSubmitOffset失败", e);
catTransaction.setStatus(e);
return false;
} finally {
catTransaction.complete();
}
}
return true;
}