【JVM并发编程专题】——多线程安全——aqs工具与原理
aqs专题——起源上一章,我们学了java的关键字synchronized的应用与原理;我们知道在虚拟机底层,自jdk1.6以后做了很多优化工作;但是可针对于复杂的业务场景,最佳的优化策略可能会发生变化,要是我们利用java的一些机制实现同样的锁优化,即遵循前期失败自旋重试,后期进入阻塞这种设计,那将利于我们设计出更丰富的锁做了基础准备,然而在jdk并发包中,一种被称之为aqs阻塞同步器的类,成为
aqs专题——起源
上一章,我们学了java的关键字synchronized的应用与原理;我们知道在虚拟机底层,自jdk1.6以后做了很多优化工作;但是可针对于复杂的业务场景,最佳的优化策略可能会发生变化,要是我们利用java的一些机制实现同样的锁优化,即遵循前期失败自旋重试,后期进入阻塞这种设计,那将利于我们设计出更丰富的锁做了基础准备,然而在jdk并发包中,一种被称之为aqs阻塞同步器的类,成为了我们设计出各种各样的锁的基础,同样jdk中利用这个同步器设计出了ReentrantLock、CountLantch等线程安全以及协作的工具;
aqs专题——应用——ReentrantLock
非公平锁——可能后面竞争的线程优先拿到锁
class Bank {
private int account = 100;
private Lock lock = new ReentrantLock();
public int getAccount() {
return account;
}
public void save(int money) {
lock.lock();
try{
account += money;
}finally{
lock.unlock();
}
}
}
公平锁——先到先得,排队进行
class Bank {
private int account = 100;
private Lock lock = new ReentrantLock(true);
public int getAccount() {
return account;
}
public void save(int money) {
lock.lock();
try{
account += money;
}finally{
lock.unlock();
}
}
}
Reentrylock——常用api罗列
public static void main(String[] args) throws Exception {
Lock lock = new ReentrantLock();
/**
* @不可中断锁
* 此方式会始终处于等待中,即使调用B.interrupt()也不能中断,除非线程A调用LOCK.unlock()释放锁
*/
lock.lock();
/**
* @可中断锁
* 此方式会等待,但当调用B.interrupt()会被中断等待,并抛出InterruptedException异常,否则会与lock()一样始终处于等待中,直到线程A释放锁
*/
lock.lockInterruptibly();
/**
* @尝试枷锁
* 1、利用是否成功加锁得返回值进行业务代码得执行
* 2,尝试加锁并轮询等待1000秒,传入时间还有TimeUnit时间单位即可
*/
boolean locksucessStatus = lock.tryLock();
locksucessStatus = lock.tryLock(1000, TimeUnit.SECONDS);
}
Reentrylock——newCondition使用
条件队列,主要是在业务上控制,满足一定条件后阻塞线程,满足一定条件后激活线程
public class DataBase {
private List<String> dataRow;
private ReentrantLock lock;
private Condition conditionWrite;
private Condition conditionRead;
private volatile int writeQueueLength;
public DataBase() {
/**
* @创建锁
* 并利用锁生成条件队列,队列中就是一堆阻塞线程;利用通知——等待模式进行线程阻塞和唤醒类似于wait和notfily
* writeQueueLength是实时得获取最新得写队列得数量,如果这个时候写锁存在,那么读锁就只能进入等待队列
*/
this.lock = new ReentrantLock();
this.conditionRead=lock.newCondition();
this.conditionWrite=lock.newCondition();
this.writeQueueLength=lock.getWaitQueueLength(conditionWrite);
/**
* @创建初始化数据
*/
dataRow=new ArrayList<String>();
dataRow.add("1");
dataRow.add("2");
dataRow.add("3");
dataRow.add("4");
dataRow.add("5");
dataRow.add("6");
dataRow.add("8");
}
/**
* @读操作
*/
public void select() throws Exception{
if(writeQueueLength>0){//写锁存在,读锁进入等待队列
conditionRead.await();
}
System.out.println("开始读取数据");
}
/**
* @写操作
*/
public void update() throws Exception{
boolean lockStatus = lock.tryLock();
if(!lockStatus){
conditionWrite.await();
}
System.out.println("正在插入数据");
Thread.sleep(1000);
System.out.println("插入成功");
conditionRead.signalAll();//所有读得数据唤醒
conditionWrite.signal();//唤醒写锁一个继续插
}
}
aqs专题——应用——ReentrantReadWriteLock
上述得Condition,我们只是简单得举例一下使用,并非ReentrantReadWriteLock
得真正实现;那么我慢看看jdk自带得ReentrantReadWriteLock 是怎么应用得
并发原则:
(1)读写锁,读读并发,读写、写写不可并发
(2)读写锁必须释放读锁才能获取写锁,
(3)读写锁,读锁获取是为了写锁时,获取会阻塞读锁;写锁可以保证一段代码的操作的原子性
注意事项:当然千万不要在读锁添加写的操作 不然没意义
public static void main(String[] args) throws Exception {
ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();
Lock readLock = reentrantReadWriteLock.readLock();//获取读锁
Lock writeLock = reentrantReadWriteLock.readLock();//获取写锁
readLock.lock();//读读可以并发,读写不能并发
writeLock.lock();//写写不能并发
}
大致看一下源码我们发现
public void lock() {
sync.acquireShared(1);//读锁利用了aqs的共享模式
}
public void lock() {
sync.acquire(1);//写锁用了aqs的互斥模式
}
aqs专题——源码分析——Reentrantlock源码
利用alt+7我们发现Reentrantlock的核心就是这三大aqs同步器作为内部类,实现了Reentrantlock的锁模式,所谓的公平和非公平其实就是同步器的实现不一样
同步器一开始是空的
构造方法中完成了初始化
lock方法就是调用同步器的lock方法
NonfairSync lock实现
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
final void lock() {
/**
* @state是一个共享数字
* 0代表无锁
* 1代表有锁,且锁一次
* 以此类推1以上是重入锁
*/
/**
* (1)利用compareAndSetState首先将共享state,期望此时没有其他人竞争然后更新为1
* 将锁的拥有线程更新为自己,便于后续操作
*/
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
/**
* (2)compareAndSetState失败,进入后续操作(这个操作的流程被封装在了AbstractQueuedSynchronizer中,非公平锁只是提供了模板方法)
*/
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
AbstractQueuedSynchronizerd的acquire流程
public class AbstractQueuedSynchronizer{
/**
* (1)tryAcquire再次尝试 cas
* (2)acquireQueued将线程加入进入阻塞队列
* (3)selfInterrupt中断当前线程
* @param arg
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(AbstractQueuedSynchronizer.Node.EXCLUSIVE), arg))
selfInterrupt();
}
}
NonfairSync的tryAcquire模板方法
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
Reentrantlock的nonfairTryAcquire方法
final boolean nonfairTryAcquire(int acquires) {
/**
* 1、先利用0直接volatie判断是不是0,不是0直接下一步就不用cas这个操作耗费性能
* 2、cas再次比较更新,更改就将拥有着标记为自己,以便于整个环境中对一些特殊判断做特殊处理
*/
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
/**
* 特殊判断如果线程是自己的话就进入重入锁机制
*/
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
FairSync的lock实现,看看公平锁如何实现
【尝试加锁实现】
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
/**
* @只是多了一段代码,hasQueuedPredecessors,判断自己是不是头节点
*/
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
/**
* @如果是当线程,根据可重入的概念,state继续+1
*/
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
public final boolean hasQueuedPredecessors() {
AbstractQueuedSynchronizer.Node t = tail; // 尾部节点
AbstractQueuedSynchronizer.Node h = head;// 头部节点
AbstractQueuedSynchronizer.Node s;
/**
* @如果h==t,证明链表中《头等于尾,只有一个节点就是当前》直接返回fasle,,整体取反直接true
* @如果(s = h.next) == null,《头节点没有下一个节点,只有一个节点》,整体返回false,取反变true 证明当前线程就是头节点
* @不然就最后判断 ,s.thread != Thread.currentThread(),《下一个节点是当前线程》,整体返回false,取反变true
*/
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
我们继续回到外部方法;
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
【将线程抽象成节点】——阻塞队列其实就是一个node结构的链表
private Node addWaiter(Node mode) {
/**
*@将当前线程封装成node节点
*/
Node node = new Node(Thread.currentThread(), mode);
/**
* @建一个指针指向尾节点
*/
Node pred = tail;
/**
* @用比较更新安全的替换最后一个节点为当前节点并将之前的最后节点的下一个节点搞成自己
*/
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
/**
* @如果尾节点是空
* 为了优化,懒加载没初始化,此时初始化一个空节点
*/
enq(node);
return node;
}
private Node enq(final Node node) {
/**
* @之所以无限循环是为了不断cas重试,但是成功后只会执行一次
*/
for (;;) {
Node t = tail;
/**
* @初始化一个空节点
*/
if (t == null) {
if (compareAndSetHead(new Node()))
tail = head;
}
/**
* @最后将第一个节点的下个节点设为当前节点
*/
else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
创建完之后,就开始以下逻辑
【aqs自旋逻辑+aqs阻塞逻辑】
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
/**
* @返回尾部节点
*/
final Node p = node.predecessor();
/**
* @再次尝试获取锁
* (1)自己的上一个节点是头节点
* (2)尝试获取锁成功
*/
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
/**
* @对当前点停止尝试获锁
* 如果当前节点之前的节点已经进入阻塞状态了,那么就可以判定当前节点不可能获取到锁,为了防止CPU不停的执行for循环
* 条件(1)前继节点还是空闲状体
* 条件(2)用LockSupport方法阻塞当前线程,并且当亲线程被添加到了wattier中中断了
*/
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
CANCELLED 取消状态
SIGNAL 等待触发状态,前节点可能是head或者前节点为取消状态CANCELLED
CONDITION 等待条件状态,在等待队列中
PROPAGATE 状态需要向后传播
/**
* @param pred 前继节点(当前节点的上一个节点)
* @param node 当前节点
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/**
* @Park的条件,前继节点还在等待触发
* 前继节点还在等待触发,还没当前节点的什么事儿,所以当前节点可以被park
*/
return true;
if (ws > 0) {
/**
* @清除CANCELLED的节点,在链表中清除
* 前继节点是CANCELLED ,则需要充同步队列中删除,
* 并检测前继节点的前继状态,若还是为CANCELLED ,还需要重复上述步
* 这样就清除了所有没用的节点
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/**
* 到这一步,waitstatus只有可能有2种状态,一个是0,一个是PROPAGATE,无论是哪个都需要把当前节点的状态设置为SIGNAL
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
解锁逻辑
ReentrantLock的unlock
public void unlock() {
sync.release(1);
}
AbstractQueuedSynchronizer的Release
public final boolean release(int arg) {
if (tryRelease(arg)) {
/**
* 拿出头节点
* 首先头不是空切等待状态不是0,
* 然后就解锁头节点
* 注意,公平和非公平都是整个解决方法,然而实现上只需要决定放入队列的位置就能实现公平性,解决仍然只解头部
*/
AbstractQueuedSynchronizer.Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
ReentrantLock的Sync的tryRelease
protected final boolean tryRelease(int releases) {
/**
* 每次-1,重入锁问题
*/
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
/**
* 默认一个false,就是避免重入锁还没完
*/
boolean free = false;
/**
* @重入锁全部减完了,置空
*/
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;//如果是重入锁则是false不能进行放行队列的操作
}
aqs专题——源码分析——基本结构
那么我们来整体看看aqs的内部结构吧!以便于系统性的简洁的掌握aqs的内容;
类名:AbstractQueuedSynchronizer
阻塞队列的线程安全:内置了大量的unsafe方法以及volatile保证了比较更新的原子性以及数值修改的共享;
利用unsafe获取字段的偏移量
利用unsafe获取主内存的值
利用unsafe比较更小主内存的值
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long stateOffset;
private static final long headOffset;
private static final long tailOffset;
private static final long waitStatusOffset;
private static final long nextOffset;
static {
try {
stateOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
headOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("head"));
tailOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
waitStatusOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("waitStatus"));
nextOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("next"));
} catch (Exception ex) { throw new Error(ex); }
}
private final boolean compareAndSetHead(Node update) {
return unsafe.compareAndSwapObject(this, headOffset, null, update);
}
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
private static final boolean compareAndSetWaitStatus(Node node,int expect,int update) {
return unsafe.compareAndSwapInt(node, waitStatusOffset,
expect, update);
}
private static final boolean compareAndSetNext(Node node,Node expect,Node update) {
return unsafe.compareAndSwapObject(node, nextOffset, expect, update);
}
}
阻塞队列父类同步器拥有者:
阻塞队列的优化细节:每当一个线程尝试获取锁时,获取失败进入自旋状态;依据周边节点的watiStaus作为判断条件来决定何时停止自旋进入阻塞;
阻塞队列的运作方式:LockSupport对线程进行阻塞和唤醒
阻塞队列的数据结构:Node
static final class Node {
/**
* @节点关键状态
*/
static final Node EXCLUSIVE = null;//标识独占模式的节点,独占模式nextWaiter变量是null
static final int CANCELLED = 1;//取消状态(被中断或者等待超时,该状态的节点不再被使用)
static final int SIGNAL = -1;//唤醒状态
static final int CONDITION = -2; //条件队列中,调用signal()之后,被唤醒的节点将从等待队列中转移到同步队列中继续等待
static final int PROPAGATE = -3; //共享模式中,可以同步唤醒的数字标记
volatile int waitStatus;//保存节点状态,值为 0、CANCELLED、SIGNAL、CONDITION、PROPAGATE
/**
* @链表关键数据结构
*/
volatile Node prev; //链表的上一个节点
volatile Node next;//链表的下一个节点
volatile Thread thread; //保存被阻塞的线程
/**
* 找到前驱节点
* @return
* @throws NullPointerException
*/
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() {}
/**
* 直接指定下一个等待者创建节点
* @param nextWaiter
*/
Node(Node nextWaiter) {
this.nextWaiter = nextWaiter;
U.putObject(this, THREAD, Thread.currentThread());
}
/**
* 直接指定等待状态创建节点
* @param nextWaiter
*/
Node(int waitStatus) {
U.putInt(this, WAITSTATUS, waitStatus);
U.putObject(this, THREAD, Thread.currentThread());
}
/**
* @特殊模式(共享模式)
*/
static final Node SHARED = new Node(); //标识共享模式的节点,共享模式下Node节点的nextWaiter变量设置为这个值
final boolean isShared() { //判断节点是否是共享模式,通过判断nextWaiter==SHARED
return nextWaiter == SHARED;
}
/**
* @特殊模式(条件队列)Condition用于条件队列的实现,非共享模式
* 创建了一个新的链表方向,用nextWaiter作为另一个方向的引用会被外部参数替代,比如newConditon
*/
Node nextWaiter;
/**
* @链表数据的内存地址,cas的时候需要
*/
static {
try {
//分别拿到Node节点的next、prev、thread、waitStatus变量的句柄,CAS通过句柄修改这些变量
NEXT = U.objectFieldOffset
(Node.class.getDeclaredField("next"));
PREV = U.objectFieldOffset
(Node.class.getDeclaredField("prev"));
THREAD = U.objectFieldOffset
(Node.class.getDeclaredField("thread"));
WAITSTATUS = U.objectFieldOffset
(Node.class.getDeclaredField("waitStatus"));
} catch (ReflectiveOperationException e) {
throw new Error(e);
}
}
private static final long NEXT;//下一个
static final long PREV;//上一个
private static final long THREAD;//线程
private static final long WAITSTATUS;//等待状态
private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
/** CAS设置节点的状态 */
final boolean compareAndSetWaitStatus(int expect, int update) {
return U.compareAndSwapInt(this, WAITSTATUS, expect, update);
}
/** CAS设置后继节点 */
final boolean compareAndSetNext(Node expect, Node update) {
return U.compareAndSwapObject(this, NEXT, expect, update);
}
}
aqs专题——源码分析——条件模式(等待通知模型)
ReentrantLock的方法
public Condition newCondition() {
return sync.newCondition();
}
sync的方法
final ConditionObject newCondition() {
return new ConditionObject();
}
条件队列,场景就是用户自定义条件进行线程的阻塞和唤醒,类似于wait,nofily,核心原理主要是在aqs内部实现了ConditionObject的新型链表结构;反正都是链表处理各种特殊情况考虑一波,由于雷同性太大我就部仔细去 研究,大概大家有个思路就行
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.LockSupport;
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
/**
* @维护头和尾部
*/
private transient AbstractQueuedSynchronizer.Node firstWaiter;
private transient AbstractQueuedSynchronizer.Node lastWaiter;
public ConditionObject() { }
/**
* @当前线程进入等待
*/
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
/**
* @添加节点
*/
AbstractQueuedSynchronizer.Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
/**
*
*/
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
/**
* @向添加队列中添加新的节点,为当前线程
*/
private AbstractQueuedSynchronizer.Node addConditionWaiter() {
AbstractQueuedSynchronizer.Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != AbstractQueuedSynchronizer.Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
AbstractQueuedSynchronizer.Node node = new AbstractQueuedSynchronizer.Node(Thread.currentThread(), AbstractQueuedSynchronizer.Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
/**
* @唤醒一个
* @param first
*/
private void doSignal(AbstractQueuedSynchronizer.Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
/**
* @唤醒所有
* @param first
*/
private void doSignalAll(AbstractQueuedSynchronizer.Node first) {
lastWaiter = firstWaiter = null;
do {
AbstractQueuedSynchronizer.Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}
/**
* 将被取消的Node从链表中剔除
*/
private void unlinkCancelledWaiters() {
AbstractQueuedSynchronizer.Node t = firstWaiter;
AbstractQueuedSynchronizer.Node trail = null;
while (t != null) {
AbstractQueuedSynchronizer.Node next = t.nextWaiter;
if (t.waitStatus != AbstractQueuedSynchronizer.Node.CONDITION) {
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
} else
trail = t;
t = next;
}
}
}
aqs专题——源码分析——共享模式原理(待更新)
aqs专题——源码分析——读写锁原理(待更新)
JVM并发编程专题章节:
多线程安全——aqs
多线程协作
多线程管理
多线程框架
多线程测试
GodSchool
致力于简洁的知识工程,输出高质量的知识产出,我们一起努力
博主私人微信:supperlzf
更多推荐
所有评论(0)