本文共 5788 字,大约阅读时间需要 19 分钟。
Semaphore是一个线程控制器,初始化时,给定一个线程数量,用来控制同一时刻,只能有指定数量的线程在执行,如果有其他线程,只能等待其中的线程释放掉之后,才可以继续执行。
给出一个简单的使用例子
public class SemaphoreDemo { public final static int SEM_SIZE = 10; public static void main(String[] args) { Semaphore semaphore = new Semaphore(SEM_SIZE); MyThread t1 = new MyThread("t1", semaphore); MyThread t2 = new MyThread("t2", semaphore); t1.start(); t2.start(); int permits = 5; System.out.println(Thread.currentThread().getName() + " trying to acquire"); try { semaphore.acquire(permits); System.out.println(Thread.currentThread().getName() + " acquire successfully"); Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } finally { semaphore.release(); System.out.println(Thread.currentThread().getName() + " release successfully"); } }}class MyThread extends Thread { private Semaphore semaphore; public MyThread(String name, Semaphore semaphore) { super(name); this.semaphore = semaphore; } public void run() { int count = 3; System.out.println(Thread.currentThread().getName() + " trying to acquire"); try { semaphore.acquire(count); System.out.println(Thread.currentThread().getName() + " acquire successfully"); Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } finally { semaphore.release(count); System.out.println(Thread.currentThread().getName() + " release successfully"); } }}
执行结果
main trying to acquiremain acquire successfullyt2 trying to acquiret2 acquire successfullyt1 trying to acquiremain release successfullyt1 acquire successfullyt2 release successfullyt1 release successfully
main线程先去尝试获取,传入进去的参数是5,这时候,state的值由最初设定的10,变为5,接下来,t2线程尝试去获取,将state的值降低减到2,当t1线程尝试去获取时,发现state的值,是小于acquire的值,获取失败,进行挂起,当main线程释放之后,t1线程就可以继续执行。
Semaphore的源码分析
Semaphore的内部结构是
Sync, 分别由NonfairSync 和 FairSync实现,整体来说和ReentrantLock比较类似,不同的是,sync方法当中实现的是AQS中的tryAcquireShared,对共享锁的获取方式和tryReleaseShared,
当线程中调用acquire方法的执行代码逻辑
public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
在AQS类中
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }
默认的非公平的情况下,会判断当前state值和acquire,返回剩余的state值
final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }
如果小于0,doAcquireSharedInterruptibly方法
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); //将当前线程添加到链表最后 boolean failed = true; try { for (;;) { final Node p = node.predecessor(); //找到当前节点的前驱节点 if (p == head) { int r = tryAcquireShared(arg); // 如果是头节点,判断state的值是否大于0 if (r >= 0) { setHeadAndPropagate(node, r); //如果state值大于0,将当前节点设为head节点 p.next = null; // help GC failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && //如果state的值小于0,挂起节点状态位SIGNAL的后一个节点 parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
关于release方法
public void release() { sync.releaseShared(1); }
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); int next = current + releases; if (next < current) // overflow throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next)) return true; }}
释放时,将state的值,在原来基础上加1,如果更新成功返回true,接下来执行释放操作
private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; //获取到头节点的状态 if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) //将头结点状态更新为0 continue; // loop to recheck cases unparkSuccessor(h); //唤醒头结点的下个节点 } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }
唤醒头结点的后一个节点
以上分析,可以看出Semaphore和CountDownLatch的实现方式是很类似的。两者都是用的是同步队列,而CyclicBarrier使用的是AQS中的条件队列。
如有问题欢迎指正~
转载地址:http://zlvti.baihongyu.com/