首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

Java 线程锁技术

笔记摘要

这里介绍了java5中的线程锁技术:Lock和Condition,实现线程间的通信,其中的读锁和写锁的使用通过一个缓存系统进行了演示,对于Condition的应用通过一个阻塞队列进行演示。

线程锁技术:Lock & Condition 实现线程同步通信所属包:java.util.concurrent.locks

1. Lock

Lock比传统线程模型中的synchronized方式更加面向对象,相对于synchronized 方法和语句它具有更广泛的锁定操作,此实现允许更灵活的结构,可以具有差别很大的属性,可以支持多个相关的 Condition 对象。

于现实生活中类似,锁本身也是一个对象。两个线程执行的代码片段要实现同步互斥的结果,它们必须用同一个Lock对象,锁是上在代表要操作的资源的类的内部方法中,而不是线程代码中。

ReentrantLock

常用形式如下

Lock lock = new ReentrantLock();

public void doSth(){

lock.lock();

try {

// 执行某些操作

}finally {

lock.unlock();

}

}读写锁

分为读锁和写锁,多个读锁不互斥,读锁与写锁互斥,写锁与写锁互斥,这是由JVM自己控制的。你只要上好相应的锁即可。如果你的代码只读数据,可以很多人同时读,但不能同时写,那就上读锁;如果你的代码修改数据,只能有一个人在写,且不能同时读取,那就上写锁。总之,读的时候上读锁,写的时候上写锁!

读写锁的使用情景:

如果代码只读数据,就可以很多人共同读取,但不能同时写。

如果代码修改数据,只能有一个人在写,且不能同时读数据。

API中ReentrantReadWriteLock类提供的一个读写锁缓存示例:

class CachedData {

Object data;

volatile boolean cacheValid;

ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();

void processCachedData() {

rwl.readLock().lock();

if (!cacheValid) {

// Must release read lock before acquiring write lock

rwl.readLock().unlock();

rwl.writeLock().lock();

// Recheck state because another thread might have acquired

  // write lock and changed state before we did.

if (!cacheValid) {

data = ...

cacheValid = true;

}

// Downgrade by acquiring read lock before releasing write lock

rwl.readLock().lock();

rwl.writeLock().unlock(); // Unlock write, still hold read

}

use(data);

rwl.readLock().unlock();

}

}

读写锁的应用:编写一个缓存系统

注解:为了避免线程的安全问题,synchronized和ReadWriteLock都可以,synchronized也防止了并发读取,性能较低有一个线程先进去,开始读取数据,进行判断,发现没有数据,其他线程就没有必要进去了,就释放读锁,加上写锁,去查找数据写入,为了避免写入的其他对象等待,再做一次判断,数据写入完成后,释放写锁,上读锁,防止写入,还原原来的状态。

两次判断:第一次为了写入数据,所以释放读锁,上写锁。第二次为了防止阻塞的线程重复写入

import java.util.HashMap;

import java.util.Map;

import java.util.concurrent.locks.ReadWriteLock;

import java.util.concurrent.locks.ReentrantReadWriteLock;

public class CacheDemo {

//定义一个map用于缓存对象

private Map cache = new HashMap();

//获取一个读写锁对象

private ReadWriteLock rwl = new ReentrantReadWriteLock();

//带有缓存的获取指定值的方法

public Object getData(String key){

rwl.readLock().lock(); //上读锁

Object value = null;

try{

value = cache.get(key); //获取要查询的值

if(value == null){ //线程出现安全问题的地方

rwl.readLock().unlock(); //没有数据,释放读锁,上写锁

// 多个线程去上写锁,第一个上成功后,其他线程阻塞,第一个线程开始执行下面的代码,最后

// 释放写锁后,后面的线程继续上写锁,为了避免后面的线程重复写入,进行二次判断

rwl.writeLock().lock();

try{

if(value==null){ //二次判断,防止其他线程重复写数据

value = "aaaa"; //实际是去查询数据库

}

}finally{

rwl.writeLock().unlock(); //写完数据,释放写锁

}

rwl.readLock().lock(); //恢复读锁

}

}finally{

rwl.readLock().unlock(); //最终释放读锁

}

return value; //返回获取到的值

}

}

虚假唤醒:用while代替if

Lock lock = new ReentrantLock();

try {

lock.lock();

//需要加锁的代码

}finally {

lock.unlock();

}

读写锁测试

public class ReadWriteLockTest {

public static void main(String[] args) {

final Queue3 q3 = new Queue3();

for(int i=0;i

{

new Thread(){

public void run(){

while(true){

q3.get();

}

}

}.start();

new Thread(){

public void run(){

while(true){

q3.put(new Random().nextInt(10000));

}

}

}.start();

}

}

}

class Queue3{

private Object data = null;

ReadWriteLock rwl = new ReentrantReadWriteLock

();

public void get(){

rwl.readLock().lock();

try {

System.out.println(Thread.currentThread().getName() + " be ready to read data!");

Thread.sleep((long)(Math.random()*1000));

System.out.println(Thread.currentThread().getName() + "have read data :" + data);

} catch (InterruptedException e) {

e.printStackTrace();

}finally{

rwl.readLock().unlock();

}

}

public void put(Object data){

rwl.writeLock().lock();

try {

System.out.println(Thread.currentThread().getName() + " be ready to write data!");

Thread.sleep((long)(Math.random()*1000));

this.data = data;

System.out.println(Thread.currentThread().getName() + " have write data: " + data);

} catch (InterruptedException e) {

e.printStackTrace();

}finally{

rwl.writeLock().unlock();

}

}

}

Thread-0 be ready to read data!

Thread-2 be ready to read data!

Thread-4 be ready to read data!

Thread-0have read data :null

Thread-2have read data :null

Thread-4have read data :null

Thread-5 be ready to write data!

Thread-5 have write data: 7975

Thread-5 be ready to write data!

Thread-5 have write data: 9832

Thread-3 be ready to write data!

Thread-3 have write data: 2813

Thread-3 be ready to write data!

Thread-3 have write data: 7998

Thread-1 be ready to write data!

Thread-1 have write data: 6737

Thread-1 be ready to write data!

...2. Condition

用于实现线程间的通信,是为了解决Object.wait()、nitify()、notifyAll()难以使用的问题

Condition 将 Object 监视器方法(wait、notify 和 notifyAll)分解成截然不同的对象,以便通过将这些对象与任意 Lock 实现组合使用,为每个对象提供多个等待 set(wait-set)。其中,Lock 替代了 synchronized 方法和语句的使用,Condition 替代了 Object 监视器方法wait和notify的使用

一个锁内部可以有多个Condition,即有多路等待通知,传统的线程机制中一个监视器对象上只能有一路等待和通知,要想实现多路等待和通知,必须嵌套使用多个同步监视器对象。使用一个监视器往往会产生顾此失彼的情况。

在等待 Condition 时,允许发生“虚假唤醒”,这通常作为对基础平台语义的让步。对于大多数应用程序,这带来的实际影响很小,因为 Condition 应该总是在一个循环中被等待,并测试正被等待的状态声明。某个实现可以随意移除可能的虚假唤醒,但建议应用程序程序员总是假定这些虚假唤醒可能发生,因此总是在一个循环中等待。

Condition的应用:阻塞队列(使用了两个监视器)

说明:该应用是 java.util.concurrent.locks包中Condition接口中的示例代码。使用了两个Condition分别用于管理取数据的线程,和存数据的线程,这样就可以明确的唤醒需要的一类线程,如果使用一个Condition,当队列满了之后,唤醒的并不一定就是取数据的线程

class BoundedBuffer {

final Lock lock = new ReentrantLock();

final Condition notFull = lock.newCondition();

final Condition notEmpty = lock.newCondition();

final Object[] items = new Object[100];

int putptr, takeptr, count;

public void put(Object x) throws InterruptedException {

lock.lock();

try {

while (count == items.length) //循环判断队列是否已存满

notFull.await(); //如果队列存满了,则要存入数据的线程等待

items[putptr] = x;

if (++putptr == items.length) putptr = 0;//当队列放满,指针回到0

++count; //添加了一个数据

notEmpty.signal(); //队列中有数据了,所以就唤醒取数据的线程

} finally {

lock.unlock();

}

}

public Object take() throws InterruptedException {

lock.lock();

try {

while (count == 0) //循环判断,队列是否有空位

notEmpty.await(); //要取的线程等待

Object x = items[takeptr];

if (++takeptr == items.length) takeptr = 0;

--count; //取走一个,说明队列有空闲的位置,

notFull.signal(); //所以通知存入的线程

return x;

} finally {

lock.unlock();

}

}

}

Condition测试

public class ConditionCommunication {

public static void main(String[] args) {

final Business business = new Business();

new Thread(

new Runnable() {

@Override

public void run() {

for(int i=1;i

business.sub(i);

}

}

}

).start();

for(int i=1;i

business.main(i);

}

}

class Business {

Lock lock = new ReentrantLock();

Condition condition = lock.newCondition();

private boolean bShouldSub = true;

public void sub(int i){

lock.lock();

try{

while(!bShouldSub){

try {

condition.await();

} catch (Exception e) {

e.printStackTrace();

}

}

for(int j=1;j

System.out.println("sub thread sequence of " + j + ",loop of " + i);

}

bShouldSub = false;

condition.signal();

}finally{

lock.unlock();

}

}

public void main(int i){

lock.lock();

try{

while(bShouldSub){

try {

condition.await();

} catch (Exception e) {

e.printStackTrace();

}

}

for(int j=1;j

System.out.println("main thread sequence of " + j + ",loop of " + i);

}

bShouldSub = true;

condition.signal();

}finally{

lock.unlock();

}

}

}

}

输出结果

sub thread sequence of 1,loop of 1

sub thread sequence of 2,loop of 1

main thread sequence of 1,loop of 1

main thread sequence of 2,loop of 1

main thread sequence of 3,loop of 1

main thread sequence of 4,loop of 1

sub thread sequence of 1,loop of 2

sub thread sequence of 2,loop of 2

main thread sequence of 1,loop of 2

main thread sequence of 2,loop of 2

main thread sequence of 3,loop of 2

main thread sequence of 4,loop of 2

sub thread sequence of 1,loop of 3

sub thread sequence of 2,loop of 3

main thread sequence of 1,loop of 3

main thread sequence of 2,loop of 3

main thread sequence of 3,loop of 3

main thread sequence of 4,loop of 3

sub thread sequence of 1,loop of 4

sub thread sequence of 2,loop of 4

main thread sequence of 1,loop of 4

main thread sequence of 2,loop of 4

main thread sequence of 3,loop of 4

main thread sequence of 4,loop of 4

sub thread sequence of 1,loop of 5

sub thread sequence of 2,loop of 5

main thread sequence of 1,loop of 5

main thread sequence of 2,loop of 5

main thread sequence of 3,loop of 5

main thread sequence of 4,loop of 5

使用ReentrantLock和Condition实现一个简单的阻塞队列MyArrayBlockingQueue,如果调用take方法时集合中没有数据,那么调用线程就阻塞;如果调用put方法时,集合数据已满,那么也会引起调用线程阻塞。但是,这两个阻塞的条件时不同的,分别为为notFull和notEmpty

import java.util.concurrent.locks.Condition;

import java.util.concurrent.locks.Lock;

import java.util.concurrent.locks.ReentrantLock;

public class MyArrayBlockingQueue {

// 数据数组

private final T[] items;

// 锁

private final Lock lock = new ReentrantLock();

// 队满的条件

private Condition notFull = lock.newCondition();

// 队空条件

private Condition notEmpty = lock.newCondition();

// 头部索引

private int head;

// 尾部索引

private int tail;

// 数据的个数

private int count;

public MyArrayBlockingQueue(int maxSize) {

items = (T[]) new Object[maxSize];

}

public MyArrayBlockingQueue() {

this(10);

}

public void put(T t) {

lock.lock();

try {

while (count == getCapacity()) {

System.out.println("数据已满,等待");

notFull.await();

}

items[tail] = t;

if (++tail == getCapacity()) {

tail = 0;

}

++count;

notEmpty.signalAll(); // 唤醒等待数据的线程

} catch (InterruptedException e) {

e.printStackTrace();

} finally {

lock.unlock();

}

}

public T take() {

lock.lock();

try {

while (count == 0) {

System.out.println("还没有数据,请等待");

notEmpty.await();

}

T ret = items[head];

items[head] = null;

if (++head == getCapacity()) {

head = 0;

}

--count;

notFull.signalAll(); // 唤醒添加数据的线程

return ret;

} catch (InterruptedException e) {

e.printStackTrace();

} finally {

lock.unlock();

}

return null;

}

public int getCapacity() {

return items.length;

}

public int size() {

lock.lock();

try {

return count;

} finally {

lock.unlock();

}

}

public static void main(String[] args) {

MyArrayBlockingQueue aQueue = new MyArrayBlockingQueue();

aQueue.put(3);

aQueue.put(24);

for (int i = 0; i < 5; i++) {

System.out.println(aQueue.take());

}

}

}

输出结果

3

24

还没有数据,请等待3. Condition练习

一共有3个线程,两个子线程先后循环2次,接着主线程循环3次,接着又回到两 个子线程先后循环2次,再回到主线程又循环3次,如此循环5次。

思路:老二先执行,执行完唤醒老三,老三执行完唤醒老大,老大执行完唤醒老二,以此循环,所以定义3个Condition对象和一个执行标识即可

示例出现的问题:两个文件中有同名类的情况

解决方案:可以将一个文件中的那个同名外部类放进类中,但是静态不能创建内部类的实例对象,所以需要加上static,这样两个类的名称就不一样了。一个是原来的类名,一个是在自己类名前面加上外部类的类名。

import java.util.concurrent.locks.Condition;

import java.util.concurrent.locks.Lock;

import java.util.concurrent.locks.ReentrantLock;

public class ThreeConditionCommunication {

public static void main(String[] args){

final Business business = new Business();

//创建并启动子线程老二

new Thread(new Runnable(){

@Override

public void run() {

for(int i=1;i

business.sub2(i);

}

}

}).start();

//创建并启动子线程老三

new Thread(new Runnable(){

@Override

public void run() {

for(int i=1;i

business.sub3(i);

}

}

}).start();

//主线程

for(int i=1;i

business.main(i);

}

}

static class Business{

Lock lock = new ReentrantLock();

Condition condition1 = lock.newCondition();

Condition condition2 = lock.newCondition();

Condition condition3 = lock.newCondition();

//定义一个变量来决定线程的执行权

private int ShouldSub = 1;

public void sub2(int i){

//上锁,不让其他线程执行

lock.lock();

try{

if(ShouldSub != 2){ //如果不该老二执行,就等待

try {

condition2.await();

} catch (InterruptedException e) {

e.printStackTrace();

}

}

for(int j=1;j

System.out.println("sub thread sequence of"+i+",loop of "+j);

}

ShouldSub = 3; //准备让老三执行

condition3.signal(); //唤醒老三

}finally{

lock.unlock();

}

}

public void sub3(int i){

lock.lock();

try{

if(ShouldSub != 3){

try {

condition3.await();

} catch (InterruptedException e) {

e.printStackTrace();

}

}

for(int j=1;j

System.out.println("sub2 thread sequence of"+i+",loop of "+j);

}

ShouldSub = 1; //准备让老大执行

condition1.signal(); //唤醒老大

}finally{

lock.unlock();

}

}

//主线程

public void main(int i){

lock.lock();

try{

if(ShouldSub!=1){

try {

condition1.await();

} catch (InterruptedException e) {

e.printStackTrace();

}

}

for(int j=1;j

System.out.println("main thread sequence of"+i+", loop of "+j);

}

ShouldSub = 2; //准备让老二执行

condition2.signal(); //唤醒老二

}finally{

lock.unlock();

}

}

}

}

输出结果

main thread sequence of1, loop of 1

main thread sequence of1, loop of 2

main thread sequence of1, loop of 3

sub thread sequence of1,loop of 1

sub thread sequence of1,loop of 2

sub2 thread sequence of1,loop of 1

sub2 thread sequence of1,loop of 2

main thread sequence of2, loop of 1

main thread sequence of2, loop of 2

main thread sequence of2, loop of 3

sub thread sequence of2,loop of 1

sub thread sequence of2,loop of 2

sub2 thread sequence of2,loop of 1

sub2 thread sequence of2,loop of 2

main thread sequence of3, loop of 1

main thread sequence of3, loop of 2

main thread sequence of3, loop of 3

sub thread sequence of3,loop of 1

sub thread sequence of3,loop of 2

sub2 thread sequence of3,loop of 1

sub2 thread sequence of3,loop of 2

main thread sequence of4, loop of 1

main thread sequence of4, loop of 2

main thread sequence of4, loop of 3

sub thread sequence of4,loop of 1

sub thread sequence of4,loop of 2

sub2 thread sequence of4,loop of 1

sub2 thread sequence of4,loop of 2

main thread sequence of5, loop of 1

main thread sequence of5, loop of 2

main thread sequence of5, loop of 3

sub thread sequence of5,loop of 1

sub thread sequence of5,loop of 2

sub2 thread sequence of5,loop of 1

sub2 thread sequence of5,loop of 24. 多路等待和通知

class BoundedBuffer {

final Lock lock = new ReentrantLock();

final Condition notFull = lock.newCondition();

final Condition notEmpty = lock.newCondition();

final Object[] items = new Object[100];

int putptr, takeptr, count;

public void put(Object x) throws InterruptedException {

lock.lock();

try {

while (count == items.length)

notFull.await();

items[putptr] = x;

if (++putptr == items.length) putptr = 0;

++count;

notEmpty.signal();

} finally {

lock.unlock();

}

}

public Object take() throws InterruptedException {

lock.lock();

try {

while (count == 0)

notEmpty.await();

Object x = items[takeptr];

if (++takeptr == items.length) takeptr = 0;

--count;

notFull.signal();

return x;

} finally {

lock.unlock();

}

}

}

  • 发表于:
  • 原文链接https://kuaibao.qq.com/s/20200411A02VSO00?refer=cp_1026
  • 腾讯「腾讯云开发者社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券