案例:对账系统的业务是这样的,用户通过在线商城下单,会生成电子订单,保存在订单库;之后物流会生成派送单给用户发货,派送单保存在派送单库。为了防止漏派送或者重复派送,对账系统每天还会校验是否存在异常订单。对账系统的处理逻辑很简单,系统流程图如下。目前对账系统的处理逻辑是首先查询订单,然后查询派送单,之后对比订单和派送单,将差异写入差异库。
对上面的代码抽象就是这样的,就是在一个单线程里面循环查询订单、派送单,然后执行对账,最后将写入差异库。
while(存在未对账订单){
// 查询未对账订单
pos = getPOrders();
// 查询派送单
dos = getDOrders();
// 执行对账操作
diff = check(pos, dos);
// 差异写入差异库
save(diff);
}
1)上面的系统现在执行很慢,该怎样优化来执行速度呢?
2)实现查询对账订单和查询派送单并行执行的代码应该是怎样的?
while(存在未对账订单){
// 查询未对账订单
Thread T1 = new Thread(()->{
pos = getPOrders();
});
T1.start();
// 查询派送单
Thread T2 = new Thread(()->{
dos = getDOrders();
});
T2.start();
// 等待T1、T2结束
T1.join();
T2.join();
// 执行对账操作
diff = check(pos, dos);
// 差异写入差异库
save(diff);
}
3)思考一下,我们上面的代码还有没有优化的空间呢?
4)使用上面线程池的代码的话,我的join就不能调用了,那我的主线程就不知道什么时候两个查询操作执行完了,这个时候该怎么办?
5)上面的方案是我们自己想出来的,那java其实提供了一个非常方便的实现我们上面方案的工具类CountDownLatch,那使用CountDownLatch怎样优化我们的代码呢?
// 创建2个线程的线程池
Executor executor =
Executors.newFixedThreadPool(2);
while(存在未对账订单){
// 计数器初始化为2
CountDownLatch latch =
new CountDownLatch(2);
// 查询未对账订单
executor.execute(()-> {
pos = getPOrders();
latch.countDown();
});
// 查询派送单
executor.execute(()-> {
dos = getDOrders();
latch.countDown();
});
// 等待两个查询操作结束
latch.await();
// 执行对账操作
diff = check(pos, dos);
// 差异写入差异库
save(diff);
}
6)上面使用CountDownLatch和线程池的方案已经很不错了,在思考一下,我们的这个程序还能不能优化一下?
7)对账需要查询出数据来才可以执行,这种的话对应什么模型?
8)既然看出来了是生产者消费者模型,那就需要一个队列,生产者生产出来东西放到队列,消费者去队列当中取。但是针对我们上面的案例,一个队列的话肯会造成数据混乱,我们应该怎样设计?
9)我们如何用代码来实现查询和对账之间的并行呢?
10)我们上面的方案有哪些要解决的问题?怎样解决?
11)实际项目中java其实给了我们现成的实现上面方案的工具类CyclicBarrier,代码实现的怎样的?
// 订单队列
Vector<P> pos;
// 派送单队列
Vector<D> dos;
// 执行回调的线程池
Executor executor =
Executors.newFixedThreadPool(1);
final CyclicBarrier barrier =
new CyclicBarrier(2, ()->{
executor.execute(()->check());
});
void check(){
P p = pos.remove(0);
D d = dos.remove(0);
// 执行对账操作
diff = check(p, d);
// 差异写入差异库
save(diff);
}
void checkAll(){
// 循环查询订单库
Thread T1 = new Thread(()->{
while(存在未对账订单){
// 查询订单库
pos.add(getPOrders());
// 等待
barrier.await();
}
});
T1.start();
// 循环查询运单库
Thread T2 = new Thread(()->{
while(存在未对账订单){
// 查询运单库
dos.add(getDOrders());
// 等待
barrier.await();
}
});
T2.start();
}