Java并发19-CountDownLatch&CyclicBarrier
对账系统流程
抽象后的代码:
while(存在未对账订单){
// 查询未对账订单
pos = getPOrders();
// 查询派送单
dos = getDOrders();
// 执行对账操作
diff = check(pos, dos);
// 差异写入差异库
save(diff);
}
订单存到订单库,根据订单产生派送单;派送单存在派送单库,根据派送单执行派送。
对账系统每天根据订单进行对账,比对订单和派送单。
利用并行优化
单线程下getPOrders()
&getDOrders()
执行耗时长,可以采用并行,让这两个方法同时执行。
while(存在未对账订单){
// 查询未对账订单
Thread T1 = new Thread(()->{
pos = getPOrders();
});
T1.start();
// 查询派送单
Thread T2 = new Thread(()->{
dos = getDOrders();
});
T2.start();
// 等待T1、T2结束
T1.join();
T2.join();
// 执行对账操作
diff = check(pos, dos);
// 差异写入差异库
save(diff);
}
但上面代码,会导致每次循环都产生新线程,可以用线程池。
CountDownLatch实现线程等待
// 创建2个线程的线程池
Executor executor = Executors.newFixedThreadPool(2);
while(存在未对账订单){
// 查询未对账订单
executor.execute(()-> {
pos = getPOrders();
});
// 查询派送单
executor.execute(()-> {
dos = getDOrders();
});
/* ??如何实现等待??*/
// 执行对账操作
diff = check(pos, dos);
// 差异写入差异库
save(diff);
}
思路:最直接的办法是弄一个计数器,初始值设置成 2,当执行完pos = getPOrders();这个操作之后将计数器减 1,执行完dos = getDOrders();之后也将计数器减 1,在主线程里,等待计数器等于 0;当计数器等于 0 时,说明这两个查询操作执行完了。等待计数器等于 0 其实就是一个条件变量,用管程实现起来也很简单。
// 创建2个线程的线程池
Executor executor = Executors.newFixedThreadPool(2);
while(存在未对账订单){
// 计数器初始化为2
CountDownLatch latch = new CountDownLatch(2);
// 查询未对账订单
executor.execute(()-> {
pos = getPOrders();
latch.countDown();
});
// 查询派送单
executor.execute(()-> {
dos = getDOrders();
latch.countDown();
});
// 等待两个查询操作结束
latch.await();
// 执行对账操作
diff = check(pos, dos);
// 差异写入差异库
save(diff);
}
用CyclicBarrier实现线程同步
实际上可以让查询在操作的同时,check()&save()方法也同时执行。
这样可以设计两个队列,一个订单队列,一个派送单队列。
订单队列进行查询订单,每查询到一个即生产了一个订单数据。
派送单队列查询派送单,每查询到一个即生产了一个派送单数据。
生产出的订单和派送单数据要一一对应,都生产出来后,再通知管理check()&save()的线程进行操作。
因此,图中线程T1和线程T2要互相等待,生产出对应数据后再通知T3,即T3等待T1&T2的完成的通知。
思路:利用一个计数器来解决,计数器初始化为 2,线程 T1 和 T2 生产完一条数据都将计数器减 1,如果计数器大于 0 则线程 T1 或者 T2 等待。如果计数器等于 0,则通知线程 T3,并唤醒等待的线程 T1 或者 T2,与此同时,将计数器重置为 2,这样线程 T1 和线程 T2 生产下一条数据的时候就可以继续使用这个计数器。
CyclicBarrier
的计数器有自动重置的功能,当减到 0 的时候,会自动重置你设置的初始值。
// 订单队列
Vector<P> pos;
// 派送单队列
Vector<D> dos;
// 执行回调的线程池
Executor executor = Executors.newFixedThreadPool(1);
final CyclicBarrier barrier =
new CyclicBarrier(2, ()->{
executor.execute(()->check());
});
void check(){
P p = pos.remove(0);
D d = dos.remove(0);
// 执行对账操作
diff = check(p, d);
// 差异写入差异库
save(diff);
}
void checkAll(){
// 循环查询订单库
Thread T1 = new Thread(()->{
while(存在未对账订单){
// 查询订单库
pos.add(getPOrders());
// 等待
barrier.await();
}
});
T1.start();
// 循环查询运单库
Thread T2 = new Thread(()->{
while(存在未对账订单){
// 查询运单库
dos.add(getDOrders());
// 等待
barrier.await();
}
});
T2.start();
}
总结
CountDownLatch
主要用来解决一个线程等待多个线程的场景;CyclicBarrier
是一组线程之间互相等待。
除此之外CountDownLatch
的计数器不能循环利用,也就是说一旦计数器减到 0,再有线程调用await()
,该线程会直接通过。
CyclicBarrier
的计数器可以循环利用,具备自动重置的功能,一旦计数器减到 0 会自动重置设置的初始值。
除此之外,CyclicBarrier
还可以设置回调函数。
思考
CyclicBarrier
的回调函数使用了一个固定大小的线程池,是否有必要?
我问题其实是两个:
1.为啥要用线程池,而不是在回调函数中直接调用?
2.线程池为啥使用单线程的?1.使用线程池是为了异步操作,否则回调函数是同步调用,即本次对账操作执行完才能进行下一轮的检查。
2.线程数量固定为1,防止了多线程并发导致的数据不一致,因为订单和派送单是两个队列,只有单线程去两个队列中取消息才不会出现消息不匹配的问题。——傲
推荐使用ThreadPoolExecutor去实现线程池,并且实现里面的RejectedExecutionHandler和ThreadFactory,这样可以方便当调用订单查询和派送单查询的时候出现full gc的时候 dump文件 可以快速定位出现问题的线程是哪个业务线程,如果是CountDownLatch,建议设置超时时间,避免由于业务死锁没有调用countDown()导致现线程睡死的情况。
——曾轼麟