使用场景

场景一

工作中需要在web服务对外之前,spring的bean初始化的时候加载数据到缓存中。
但是由于数据量过大,需要多线程加载。要求所有的缓存加载成功之后,这个bean才初始化成功,程序继续往下走。

场景二

商务合作的需求,需要显示的数据来自合作方和我们自己的数据库中。
之前是只显示我们数据库点数据,现在合作方提供了一个接口让我实时调用对方的接口,并把两部分的数据合并后返回给前端。
但是由于合作方的接口不是特别稳定,而且也不能保证高可用,所以可以考虑同时从我们和合作方取数据,设置超时时间,如果都返回了数据就合并给前端,如果对方未能返回数据,还是有我们自己的数据能显示给用户的。

CountDownLatch和CyclicBarrier

综合上述两个场景,我看了CountDownLatch和CyclicBarrier,看说明觉得比较适合我的使用。

CountDownLatch

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package countdownlatchtest;

import com.google.common.collect.Maps;

import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;

public class CountDownLatchService {

private CountDownLatch countDownLatch = new CountDownLatch(4);

/**
* 用来存储所有线程的运行结果
*/
private ConcurrentMap<String, String> resultMap = Maps.newConcurrentMap();



public CountDownLatch getCountDownLatch() {
return countDownLatch;
}

public ConcurrentMap<String, String> getResultMap() {
return resultMap;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package countdownlatchtest;

import org.apache.commons.lang3.RandomUtils;

import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;

public class Worker implements Runnable {

private ConcurrentMap<String, String> map;
private CountDownLatch countDownLatch;

public Worker(ConcurrentMap<String, String> map, CountDownLatch countDownLatch) {
this.map = map;
this.countDownLatch = countDownLatch;
}

@Override
public void run() {

// 这里写代码做某些事
System.out.println(Thread.currentThread().getName() + "\t开始了...");
final int sleep = RandomUtils.nextInt(5, 20);
try {
Thread.sleep(sleep * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}

// 添加结果到map中
map.putIfAbsent(Thread.currentThread().getName(), String.valueOf(sleep));

// 告诉 countDownLatch ,当前线程完成了
System.out.println(Thread.currentThread().getName() + "\t结束了...");
countDownLatch.countDown();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package countdownlatchtest;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Main {

public static void main(String[] args) {
final ExecutorService executorService = Executors.newFixedThreadPool(6);
for (int i = 0; i < 3; i++) {
// 模拟多次执行任务
doTask(executorService);
}
}

private static void doTask(ExecutorService executorService) {
CountDownLatchService countDownLatchService = new CountDownLatchService();
for (int i = 0; i < 4; i++) {
Worker worker = new Worker(countDownLatchService.getResultMap(), countDownLatchService.getCountDownLatch());
executorService.submit(worker);
}
try {
// 阻塞在这里,等待其他线程执行完成
countDownLatchService.getCountDownLatch().await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("所有线程执行成功");
System.out.println("打印结果如下:\n" + countDownLatchService.getResultMap());
}

}

CyclicBarrier

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package cyclicbarriertest;

import com.google.common.collect.Maps;

import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierService implements Runnable {

/**
* 表示当有4个线程执行完的时候,会调用第二个参数 barrierAction 的start()方法
*
* 所以本类实现 Runnable 接口,传入this
*/
private CyclicBarrier cyclicBarrier = new CyclicBarrier(4, this);

/**
* 用来存储所有线程的运行结果
*/
private ConcurrentMap<String, String> resultMap = Maps.newConcurrentMap();

@Override
public void run() {
System.out.println("所有线程执行成功");
System.out.println("打印结果如下:\n" + resultMap);
}

public ConcurrentMap<String, String> getResultMap() {
return resultMap;
}

public CyclicBarrier getCyclicBarrier() {
return cyclicBarrier;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package cyclicbarriertest;

import org.apache.commons.lang3.RandomUtils;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CyclicBarrier;

public class Worker implements Runnable {

private ConcurrentMap<String, String> map;
private CyclicBarrier cyclicBarrier;

public Worker(ConcurrentMap<String, String> map, CyclicBarrier cyclicBarrier) {
this.map = map;
this.cyclicBarrier = cyclicBarrier;
}

@Override
public void run() {

// 这里写代码做某些事
System.out.println(Thread.currentThread().getName() + "\t开始了...");
final int sleep = RandomUtils.nextInt(5, 20);
try {
Thread.sleep(sleep * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}

// 添加结果到map中
map.putIfAbsent(Thread.currentThread().getName(), String.valueOf(sleep));

try {
// 告诉 cyclicBarrier ,当前线程完成了
System.out.println(Thread.currentThread().getName() + "\t结束了...");
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package cyclicbarriertest;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Main {

public static void main(String[] args) {
final ExecutorService executorService = Executors.newFixedThreadPool(6);
for (int j = 0; j < 3; j++) {
// 模拟多次任务执行
doTask(executorService);
}
}

private static void doTask(ExecutorService executorService) {
CyclicBarrierService cyclicBarrierService = new CyclicBarrierService();
for (int i = 0; i < 4; i++) {
Worker worker = new Worker(cyclicBarrierService.getResultMap(), cyclicBarrierService.getCyclicBarrier());
executorService.submit(worker);
}
}
}

总结

写了上面两个例子,后面发现场景一适合用CountDownLatch,场景二适合用CyclicBarrier。场景一我们基本上不存在有任务不能执行完的情况,基本上做到计数器不归0,即使服务启动了也没办法正常使用,场景二很多情况都是任务不能正常的执行完成。