点击上方“Java团长”,选择“置顶公众号”
干货文章第一时间送达!
面试互联网公司不得不说的高并发!
在这里想写写自己在学习并发处理的学习思路,也会聊聊自己遇到的那些坑,以此为记,希望鞭策自己不断学习、永不放弃!
具体笔者认为大体可分为三部分: 第一部分:Java多线程编程。 第二部分:高并发的解决思路。 第三部分:分布式架构中redis、zookeeper分布式锁的应用。
本文着重讲解第一块。
1、Java内存模型与线程。
并发编程主要讨论以下几点:多个线程操作相同资源,保证线程安全,合理使用资源。
通常我们可以将物理计算机中出现的并发问题类比到JVM中的并发。
物理计算机处理器、高速缓存、主内存间交互关系如图:
处理器和内存的运行速度存在几个数量级别的差距,因此为解决此矛盾引入了高速缓存这一概念。当多个处理器的运行任务都涉及到同一块主内存区域时,将可能导致各自缓存数据的不一致问题,为解决一致性问题,需要各个处理器访问缓存时都遵循一些协议,在读写时要根据协议来进行操作。(MSI、MESI、MOSI、Synapse、Firefly及Dragon Protocol等)
处理器为提高性能,会对输入代码乱序执行(Out-Of-Order Execution) 优化。
类比Java内存模型,线程、主内存、工作内存交互关系如图:
JMM定义了程序中各个变量访问规则,即在虚拟机中将内存取出和存储的底层细节。
线程A如果要跟线程B要通信的话,必须经历以下两个步骤: 1)线程A把本地内存A中更新过的共享变量的值刷新到主内存中。 2)线程B去主内存中读取A更新过的共享变量的值。
线程的工作内存中保存了该线程使用到变量的主内存副本拷贝(也可理解为此线程的私有拷贝),线程对变量的操作(读取、赋值等)都在工作内存中进行,而不能直接读写主内存中变量。不同线程之间的通信业需要通过主内存来完成。 主内存对应Java堆中对象实例数据部分,而工作内存则对应虚拟机栈中部分区域。
在此还有非常重要的点需要提及!
指令重排序
执行程序时,为提高性能,编译器和处理器常常会对指令做出重排序。分三种:
1)编译器优化的重排序。 2)指令并行重排序。 3)内存系统重排序。
JMM的编译器会禁止特定类型的编译器重排序,对于处理器重排序(后两者),则要求Java编译器在生成指令序列时,插入特定类型的内存屏障指令,通过内存屏障指令来禁止特定类型的处理器重排序。
内存之间的交互操作
JMM中定义了8种操作来来描述工作内存与主内存之间的实现细节。
JMM规定了执行上述八种操作时必须满足的规则(与happens-before原则是等效的,即先行发生原则):
2、测试工具
PostMan、Apache Bench、JMeter、LoadRunner
3、线程安全性
原子性:提供了互斥访问,同一时刻只能由一个线程来对它进行操作。 可见性:一个线程对主内存的修改可以及时被其他线程观察到。 有序性:一个线程观察其它线程中指令执行顺序,由于指令重排序的存在,观察的结果一般为杂乱无章的。 Java程序的天然有序性可以总结为:如果本线程内观察,所有的操作都是有序的;如果在一个线程观察另一个线程,所有的操作都是无须的。前者指的是线程内的串行语义,后者指的是指令重排序和工作内存和主内存同步延迟现象。
原子性-Atomic包
通过CAS来保证原子性,即Compare And Swap 比较交换: CAS利用处理器提供的CMPXCHG指令实现,自旋CAS实现的基本思路就是循环进行CAS直到成功为止。 比较内存的值与预期的值,若相同则修改预期的值。
CAS虽然可以进行高效的进行原子操作,但是CAS仍在存在三大问题。
测试:
public class AtomicExample1 {
// 请求总数
public static int clientTotal = 5000;
// 同时并发执行的线程数
public static int threadTotal = 200;
public static AtomicInteger count = new AtomicInteger(0);
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal ; i++) {
executorService.execute(() -> {
try {
semaphore.acquire();
add();
semaphore.release();
} catch (Exception e) {
log.error("exception", e);
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
log.info("count:{}", count.get());
}
private static void add() {
count.incrementAndGet();
// count.getAndIncrement();
}
}
AtomicInteger 源码实现
public final int incrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}
//当前的指为var2,底层穿过来的值var5 如果当前的值与底层传过来的值一样的话,则将其更新问var5+var4
AtomicLong与LongAdder
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
if ((as = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
longAccumulate(x, null, uncontended);
}
}
/**
* Equivalent to {@code add(1)}.
*/
public void increment() {
add(1L);
}
AtomicBoolean
public final boolean compareAndSet(boolean expect, boolean update) {
int e = expect ? 1 : 0;
int u = update ? 1 : 0;
return unsafe.compareAndSwapInt(this, valueOffset, e, u);
}
AtomicReference
public final V getAndSet(V newValue) {
return (V)unsafe.getAndSetObject(this, valueOffset, newValue);
}
public final Object getAndSetObject(Object var1, long var2, Object var4) {
Object var5;
do {
var5 = this.getObjectVolatile(var1, var2);
} while(!this.compareAndSwapObject(var1, var2, var5, var4));
return var5;
}
AtomicIntegerFieldUpdater
public class AtomicExample5 {
private static AtomicIntegerFieldUpdater<AtomicExample5> updater =
AtomicIntegerFieldUpdater.newUpdater(AtomicExample5.class, "count");
@Getter
public volatile int count = 100;
public static void main(String[] args) {
AtomicExample5 example5 = new AtomicExample5();
if (updater.compareAndSet(example5, 100, 120)) {
log.info("update success 1, {}", example5.getCount());
}
if (updater.compareAndSet(example5, 100, 120)) {
log.info("update success 2, {}", example5.getCount());
} else {
log.info("update failed, {}", example5.getCount());
}
}
}
AtomicStampedReference
public boolean compareAndSet(V expectedReference,
V newReference,
int expectedStamp,
int newStamp) {
Pair<V> current = pair;
return
expectedReference == current.reference &&
expectedStamp == current.stamp &&
((newReference == current.reference &&
newStamp == current.stamp) ||
casPair(current, Pair.of(newReference, newStamp)));
}
AtomicLongArray 维护数组
原子性-锁及对比
线程安全-可见性
导致共享变量在线程间不可见的原因
1)线程交叉执行。 2)重排序结合线程交叉执行。 3)共享变量更新后的值没有在工作内存与主内存及时更新。
JMM关于synchronizd的两条规定:
volatile-可见性 通过加入内存屏障和禁止重排序优化实现。
必须符合以下场景才可使用:
private static void add() {
count++;
// 1、count 取出当前内存中的值
// 2、+1
// 3、count 写回主存
//即:两个线程同时执行+1写回主存就出现问题。
}
volatile通常用来作为状态标记量
volatile boolean inited = false;
//线程1:
context = loadContext();
inited = true;
//线程2;
while (!inited){
sleep();
}
doSomethingWithConfig(context);
4、安全发布对象
发布对象:使一个对象能够被当前范围之外代码所使用。
对象逸出:一种错误的发布。当一个对象还没有构造完成,就能被其它线程所见。
安全发布对象
对此单例模式是个很好的学习例子:
public class SingletonExample4 {
// 私有构造函数
private SingletonExample4() {
}
// 1、memory = allocate() 分配对象的内存空间
// 2、ctorInstance() 初始化对象
// 3、instance = memory 设置instance指向刚分配的内存
// JVM和cpu优化,发生了指令重排
// 1、memory = allocate() 分配对象的内存空间
// 3、instance = memory 设置instance指向刚分配的内存
// 2、ctorInstance() 初始化对象
// 单例对象
private volatile static SingletonExample4 instance = null;
// 静态的工厂方法
public static SingletonExample4 getInstance() {
if (instance == null) { // 双重检测机制 // B
synchronized (SingletonExample4.class) { // 同步锁
if (instance == null) {
instance = new SingletonExample4(); // A - 3
}
}
}
return instance;
}
}
通过枚举实现单例模式
/**
* 枚举模式:最安全
*/
@ThreadSafe
@Recommend
public class SingletonExample7 {
// 私有构造函数
private SingletonExample7() {
}
public static SingletonExample7 getInstance() {
return Singleton.INSTANCE.getInstance();
}
private enum Singleton {
INSTANCE;
private SingletonExample7 singleton;
// JVM保证这个方法绝对只调用一次
Singleton() {
singleton = new SingletonExample7();
}
public SingletonExample7 getInstance() {
return singleton;
}
}
}
5、线程安全策略
1) 不可变对象
满足条件:
2) 线程封闭
3) 线程不安全写法
4) 同步容器
5) 线程安全-并发容器-J.U.C
ArrayList -> CopyOnWriteArrayList
public void add(int index, E element) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
if (index > len || index < 0)
throw new IndexOutOfBoundsException("Index: "+index+
", Size: "+len);
Object[] newElements;
int numMoved = len - index;
if (numMoved == 0)
newElements = Arrays.copyOf(elements, len + 1);
else {
newElements = new Object[len + 1];
System.arraycopy(elements, 0, newElements, 0, index);
System.arraycopy(elements, index, newElements, index + 1,
numMoved);
}
newElements[index] = element;
setArray(newElements);
} finally {
lock.unlock();
}
}
public E get(int index) {
return get(getArray(), index);
}
HashSet、TreeSet --> CopyOnWriteArraySet、ConcurrentSkipListSet ConcurrentSkipListSet对批量操作不能保证原子性。
参考:JDK1.8源码分析之ConcurrentSkipListSet(八)
https://www.cnblogs.com/leesf456/p/5549820.html
HashMap、TreeMap --> ConcurrentHashMap、ConcurrentSkipListMap ConcurrentHashMap效率相对比ConcurrentSkipListMap高,ConcurrentSkipListMap有些其不具有的特性:
6、J.U.C之AQSAbstractQueuedSynchronizer-AQS
AQS同步组件
1)等待多线程完成的CountDownLatch(JDK1.5)
允许一个或多个线程等待其他线程完成操作。 其构造函数接收一个int类型的参数作为计数器,调用countDown方法的时候,计数器的值会减1,CountDownLatch的await方法会阻塞当前线程,直到N变为零。 应用:并行计算,解析Excel中多个sheet的数据。
2)控制并发线程数的 Semaphore 用来控制同时访问特定资源线程的数量。 应用:流量控制,特别是公共资源有限的场景,如数据库连接。
//可用的许可的数量
Semaphore(int permits)
//获取一个许可
aquire()
//使用完成后归还许可
release()
//尝试获取许可证
tryAcquire()
3)同步屏障 CyclicBarrier
让一组线程达到一个屏障(同步点)时被阻塞,直到最后一个线程到达屏障时,才会开门,所有被屏障拦截的线程才会继续执行。 应用:多线程计算数据,最后合并计算结果的场景。
CyclicBarrier和CountDownLatch的区别
//屏障拦截的线程数量
CyclicBarrier(int permits)
//已经到达屏障
await()
//CyclicBarrier阻塞线程的数量
getNumberWaiting()
4)重入锁 ReentrantLock (排他锁:同时允许单个线程访问。) 支持重进入的锁,表示该锁能够支持一个线程对资源的重复加锁。即实现重进入:任意线程获取到锁之后能够再次获取该锁而不会被锁阻塞。
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
公平锁就是等待时间最长的线程最优先获取锁,也就是说获取锁的是顺序的(FIFO)。而非公平则允许插队。
非公平因为不保障顺序,则效率相对较高,而公平锁则可以减少饥饿发生的概率。
ReentrantReadWriteLock (读写锁,实现悲观读取,同时允许多个线程访问) 在写线程访问时,所有的读线程和其他写线程均被堵塞。其维护了一对锁,通过分离读锁、写锁,使得并发性比排他锁有很大提升。
适用于读多写少的环境,能够提供比排他锁更好的并发与吞吐量。 不足:ReentrantReadWriteLock是读写锁,在多线程环境下,大多数情况是读的情况远远大于写的操作,因此可能导致写的饥饿问题。
StampedLock 是ReentrantReadWriteLock 的增强版,是为了解决ReentrantReadWriteLock的一些不足。 StampedLock读锁并不会阻塞写锁,设计思路也比较简单,就是在读的时候发现有写操作,再去读多一次。。 StampedLock有两种锁,一种是悲观锁,另外一种是乐观锁。如果线程拿到乐观锁就读和写不互斥,如果拿到悲观锁就读和写互斥。
参考:Java8对读写锁的改进:StampedLock
https://blog.csdn.net/sunfeizhi/article/details/52135136
5)Condition 参考:Java线程(九):Condition-线程通信更高效的方式
https://blog.csdn.net/ghsau/article/details/7481142
6)FutureTask 参考:Java并发编程:Callable、Future和FutureTask
https://www.cnblogs.com/dolphin0520/p/3949310.html
7)Fork/Join 参考:Fork/Join 模式高级特性
https://www.ibm.com/developerworks/cn/java/j-lo-forkjoin/index.html
8)BlocklingQueue
参考:Java中的阻塞队列
http://www.infoq.com/cn/articles/java-blocking-queue
7、线程池
参考:Java 四种线程池的用法分析
https://blog.csdn.net/u011974987/article/details/51027795
越写越多,受不了了,参考项以后再补,先休息!
参考资料: