允许一个或者多个线程等待操作完成
假设有10个人排队,我们将其分成5个人一批,使用CountDownLatc 来协调。
public class LatchSample {
/**
* @param args
* @throws InterruptedException
*/
public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(6);
for(int i = 0; i < 5 ; i++){
Thread t = new Thread(new FirstBatchWorker(latch));
t.start();
}
for(int i = 0 ; i < 5;i++){
Thread t = new Thread(new SecondBatchWorker(latch));
t.start();
}
while(latch.getCount() != 1){
Thread.sleep(100L);
}
System.out.println(" wait gor first batch finish");
latch.countDown();
}
}
class FirstBatchWorker implements Runnable {
private CountDownLatch latch;
public FirstBatchWorker(CountDownLatch latch) {
this.latch = latch;
}
/**
* @see java.lang.Runnable#run()
*/
@Override
public void run() {
System.out.println("First batch executed!");
latch.countDown();
}
}
class SecondBatchWorker implements Runnable {
private CountDownLatch latch;
public SecondBatchWorker(CountDownLatch latch) {
this.latch = latch;
}
@Override
public void run() {
try {
latch.await();
System.out.println("Second batch Executed!");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
CountDownLatch调度也较为简单,后一批的线程进行 await 等待前一批 countdown 足够多次。局限性是不能重用
允许多个线程瞪大到达某个屏障
public class CyclicBarrierSample {
/**
* @param args
*/
public static void main(String[] args) {
CyclicBarrier barrier = new CyclicBarrier(5, new Runnable() {
@Override
public void run() {
System.out.println("Action.... go again!");
}
});
for (int i = 0; i < 5; i++) {
Thread t = new Thread(new CyclicWorker(barrier));
t.start();
}
}
}
class CyclicWorker implements Runnable {
private CyclicBarrier barrier;
public CyclicWorker(CyclicBarrier barrier) {
this.barrier = barrier;
}
/**
* @see java.lang.Runnable#run()
*/
@Override
public void run() {
try {
for (int i = 0; i < 3; i++) {
System.out.println("Executed!");
barrier.await();
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}
Java 版本信号量的实现,通过允许一定数量的允许(Permit)的方式,来表达限制通用资源访问的目的。租车时,当很多空出租车就位时,为防止过度拥挤,调度员指挥排队等待坐车的队伍一次进来5个人上车,等这5个人坐车出发,再放进去下一批,这和 Semaphore的工作原理类似
线程尝试获得许可,获得许可则进入任务,任务执行完,然后释放许可。这时等待许可的其他线程,可以获得许可进入工作状态,知道全部处理结束。
public class SemaphoreSample {
/**
* @param args
*/
public static void main(String[] args) {
System.out.println("Action ...Go!");
Semaphore semaphore = new Semaphore(5);
for(int i = 0; i < 10;i++){
Thread t = new Thread(new SemaphoreWorker(semaphore));
t.start();
}
}
}
class SemaphoreWorker implements Runnable{
private String name;
private Semaphore semaphore;
public SemaphoreWorker(Semaphore semaphore){
this.semaphore = semaphore;
}
@Override
public void run() {
try {
log("is waitting for permit");
semaphore.acquire();
log("acquired a permit");
log("excuted");
} catch (InterruptedException e) {
e.printStackTrace();
} finally{
log("releae a permit");
semaphore.release();
}
}
private void log(String msg){
if(name == null){
name = Thread.currentThread().getName();
}
System.out.println(name+", "+ msg);
}
}
Semapore 类似计数器,实现逻辑基于 Acquire/release,如果 semaphore 的数值初始化成1 相当于互斥锁只有一个资源供竞争。
线程安全Map ,List 和 Set。当应用侧重 Map 存放的速度,推荐使用 ConcurrentHashMap, 如果需要使用大量数据进行频繁的修改,推荐使用 ConcurrentSkipListMap
image
image
CopyOnWriteArraySet 包装了 CopyOnWriteArrayList 来实现 CopyOnWrite 原理是任何修改操作,add,set remove 都会拷贝原数组,修改后替换原理的数组,通过这种防御方式,实现另类的线程安全。这种数据结构,适合读多写少的操作。
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}