Java并发19-CountDownLatch&CyclicBarrier


Java并发19-CountDownLatch&CyclicBarrier

对账系统流程

某对账系统流程图

抽象后的代码:

while(存在未对账订单){
  // 查询未对账订单
  pos = getPOrders();
  // 查询派送单
  dos = getDOrders();
  // 执行对账操作
  diff = check(pos, dos);
  // 差异写入差异库
  save(diff);
} 

订单存到订单库,根据订单产生派送单;派送单存在派送单库,根据派送单执行派送。

对账系统每天根据订单进行对账,比对订单和派送单。

利用并行优化

对账系统单线程执行示意图

单线程下getPOrders()&getDOrders()执行耗时长,可以采用并行,让这两个方法同时执行。

对账系统并行执行示意图

while(存在未对账订单){
  // 查询未对账订单
  Thread T1 = new Thread(()->{
    pos = getPOrders();
  });
  T1.start();
  // 查询派送单
  Thread T2 = new Thread(()->{
    dos = getDOrders();
  });
  T2.start();
  // 等待T1、T2结束
  T1.join();
  T2.join();
  // 执行对账操作
  diff = check(pos, dos);
  // 差异写入差异库
  save(diff);
} 

但上面代码,会导致每次循环都产生新线程,可以用线程池。

CountDownLatch实现线程等待

// 创建2个线程的线程池
Executor executor = Executors.newFixedThreadPool(2);
while(存在未对账订单){
  // 查询未对账订单
  executor.execute(()-> {
    pos = getPOrders();
  });
  // 查询派送单
  executor.execute(()-> {
    dos = getDOrders();
  });
  
  /* ??如何实现等待??*/
  
  // 执行对账操作
  diff = check(pos, dos);
  // 差异写入差异库
  save(diff);
}   

思路:最直接的办法是弄一个计数器,初始值设置成 2,当执行完pos = getPOrders();这个操作之后将计数器减 1,执行完dos = getDOrders();之后也将计数器减 1,在主线程里,等待计数器等于 0;当计数器等于 0 时,说明这两个查询操作执行完了。等待计数器等于 0 其实就是一个条件变量,用管程实现起来也很简单。

// 创建2个线程的线程池
Executor executor = Executors.newFixedThreadPool(2);
while(存在未对账订单){
  // 计数器初始化为2
  CountDownLatch latch = new CountDownLatch(2);
  // 查询未对账订单
  executor.execute(()-> {
    pos = getPOrders();
    latch.countDown();
  });
  // 查询派送单
  executor.execute(()-> {
    dos = getDOrders();
    latch.countDown();
  });
  
  // 等待两个查询操作结束
  latch.await();
  
  // 执行对账操作
  diff = check(pos, dos);
  // 差异写入差异库
  save(diff);
}

用CyclicBarrier实现线程同步

实际上可以让查询在操作的同时,check()&save()方法也同时执行。

完全并行示意图

这样可以设计两个队列,一个订单队列,一个派送单队列。

订单队列进行查询订单,每查询到一个即生产了一个订单数据。

派送单队列查询派送单,每查询到一个即生产了一个派送单数据。

生产出的订单和派送单数据要一一对应,都生产出来后,再通知管理check()&save()的线程进行操作。

因此,图中线程T1和线程T2要互相等待,生产出对应数据后再通知T3,即T3等待T1&T2的完成的通知。

双队列示意图

思路:利用一个计数器来解决,计数器初始化为 2,线程 T1 和 T2 生产完一条数据都将计数器减 1,如果计数器大于 0 则线程 T1 或者 T2 等待。如果计数器等于 0,则通知线程 T3,并唤醒等待的线程 T1 或者 T2,与此同时,将计数器重置为 2,这样线程 T1 和线程 T2 生产下一条数据的时候就可以继续使用这个计数器。

CyclicBarrier的计数器有自动重置的功能,当减到 0 的时候,会自动重置你设置的初始值。

// 订单队列
Vector<P> pos;
// 派送单队列
Vector<D> dos;
// 执行回调的线程池 
Executor executor = Executors.newFixedThreadPool(1);
final CyclicBarrier barrier =
  new CyclicBarrier(2, ()->{
    executor.execute(()->check());
  });
  
void check(){
  P p = pos.remove(0);
  D d = dos.remove(0);
  // 执行对账操作
  diff = check(p, d);
  // 差异写入差异库
  save(diff);
}
  
void checkAll(){
  // 循环查询订单库
  Thread T1 = new Thread(()->{
    while(存在未对账订单){
      // 查询订单库
      pos.add(getPOrders());
      // 等待
      barrier.await();
    }
  });
  T1.start();  
  // 循环查询运单库
  Thread T2 = new Thread(()->{
    while(存在未对账订单){
      // 查询运单库
      dos.add(getDOrders());
      // 等待
      barrier.await();
    }
  });
  T2.start();
}

总结

CountDownLatch主要用来解决一个线程等待多个线程的场景;CyclicBarrier是一组线程之间互相等待。

除此之外CountDownLatch的计数器不能循环利用,也就是说一旦计数器减到 0,再有线程调用await(),该线程会直接通过。

CyclicBarrier的计数器可以循环利用,具备自动重置的功能,一旦计数器减到 0 会自动重置设置的初始值。

除此之外,CyclicBarrier还可以设置回调函数。

思考

CyclicBarrier的回调函数使用了一个固定大小的线程池,是否有必要?

我问题其实是两个:
1.为啥要用线程池,而不是在回调函数中直接调用?
2.线程池为啥使用单线程的?

1.使用线程池是为了异步操作,否则回调函数是同步调用,即本次对账操作执行完才能进行下一轮的检查。
2.线程数量固定为1,防止了多线程并发导致的数据不一致,因为订单和派送单是两个队列,只有单线程去两个队列中取消息才不会出现消息不匹配的问题。

——

推荐使用ThreadPoolExecutor去实现线程池,并且实现里面的RejectedExecutionHandler和ThreadFactory,这样可以方便当调用订单查询和派送单查询的时候出现full gc的时候 dump文件 可以快速定位出现问题的线程是哪个业务线程,如果是CountDownLatch,建议设置超时时间,避免由于业务死锁没有调用countDown()导致现线程睡死的情况。

——曾轼麟


文章作者: Wendell
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Wendell !
评论
  目录