jdk怎么设置三个环境变量(JDK源码详解Semaphore)
jdk怎么设置三个环境变量(JDK源码详解Semaphore)内部嵌套类 Sync:private final Sync sync; // 初始化 Semaphore,传入指定的许可数量,非公平 public Semaphore(int permits) { sync = new NonfairSync(permits); } // 初始化 Semaphore,传入指定的许可数量,指定是否公平 public Semaphore(int permits boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); }构造器初始化了 Sync 变量,根据传入的 fair 值指定为 FairSync 或 NonFairSync,下面分析这三个类。Semaphore 的方法如下:其中主要方法是 acquire() 和 release()
SemaphoreSemaphore 是并发包中的一个工具类,可理解为信号量。通常可以作为限流器使用,即限制访问某个资源的线程个数,比如用于限制连接池的连接数。
打个通俗的比方,可以把 Semaphore 理解为一辆公交车:车上的座位数(初始的“许可” permits 数量)是固定的,行驶期间如果有人上车(获取许可),座位数(许可数量)就会减少,当人满的时候不能再继续上车了(获取许可失败);而有人下车(释放许可)后就空出了一些座位,其他人就可以继续上车了。
下面具体分析其代码实现。
代码分析
Semaphore 的方法如下:
其中主要方法是 acquire() 和 release() 相关的一系列方法,它们的作用类似。我们先从构造器开始分析。
构造器
private final Sync sync;
// 初始化 Semaphore,传入指定的许可数量,非公平
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
// 初始化 Semaphore,传入指定的许可数量,指定是否公平
public Semaphore(int permits boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
构造器初始化了 Sync 变量,根据传入的 fair 值指定为 FairSync 或 NonFairSync,下面分析这三个类。
内部嵌套类 Sync:
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;
// 构造器,将父类 AQS 的 state 变量初始化为给定的 permits
Sync(int permits) {
setState(permits);
}
// 非公平方式尝试获取许可(减少 state 的值)
final int nonfairTryAcquireShared(int acquires) {
// 自旋操作
for (;;) {
// 获取许可值(state),并尝试 CAS 修改为减去后的结果
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available remaining))
return remaining;
}
}
// 释放许可(增加 state 的值)
protected final boolean tryReleaseShared(int releases) {
for (;;) {
// 操作与获取类似,不同的在于此处是增加 state 值
int current = getState();
int next = current releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current next))
return true;
}
}
// 一些方法未给出...
}
可以看到 Sync 类继承自 AQS,并重写了 AQS 的 tryReleaseShared 方法,其中获取和释放许可分别对应的是对 AQS 中 state 值的减法和加法操作。具体可参考前文对 AQS 共享模式的分析「万字长文详解!JDK源码-AbstractQueuedSynchronizer(3)」。
NonFairSync (非公平版本实现):
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
// 调用父类 Sync 的构造器来实现
NonfairSync(int permits) {
super(permits);
}
// 重写 AQS 的 tryAcquireShared 方法,代码实现在父类 Sync 中
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
FairSync (公平版本实现):
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;
// 构造器调用父类 Sync 的构造器来实现
FairSync(int permits) {
super(permits);
}
// 重写 AQS 的 tryAcquireShared 方法,尝试获取许可(permit)
protected int tryAcquireShared(int acquires) {
for (;;) {
// 若队列中有其他线程等待,则获取失败(这就是体现“公平”的地方)
if (hasQueuedPredecessors())
return -1;
// 获取当前的许可值
int available = getState();
// 计算剩余值
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available remaining))
return remaining;
}
}
}
PS: 体现“公平”的地方在于 tryAcquireShared 方法中,公平的版本会先判断队列中是否有其它线程在等待(hasQueuedPredecessors 方法)。
主要方法的代码实现:
// 获取一个许可(可中断)
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
// 获取一个许可(不响应中断)
public void acquireUninterruptibly() {
sync.acquireShared(1);
}
// 尝试获取一个许可
public boolean tryAcquire() {
return sync.nonfairTryAcquireShared(1) >= 0;
}
// 尝试获取一个许可(有超时等待)
public boolean tryAcquire(long timeout TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1 unit.toNanos(timeout));
}
// 释放一个许可
public void release() {
sync.releaseShared(1);
}
还有一系列类似的操作,只不过获取/释放许可的数量可以指定:
// 获取指定数量的许可(可中断)
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}
// 获取指定数量的许可(不可中断)
public void acquireUninterruptibly(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireShared(permits);
}
// 尝试获取指定数量的许可
public boolean tryAcquire(int permits) {
if (permits < 0) throw new IllegalArgumentException();
return sync.nonfairTryAcquireShared(permits) >= 0;
}
// 尝试获取指定数量的许可(有超时等待)
public boolean tryAcquire(int permits long timeout TimeUnit unit)
throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
return sync.tryAcquireSharedNanos(permits unit.toNanos(timeout));
}
// 释放指定数量的许可
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
}
可以看到,Semaphore 的主要方法都是在嵌套类 FairSync 和 NonFairSync 及其父类 Sync 中实现的,内部嵌套类也是 AQS 的典型用法。
场景举例
为了便于理解 Semaphore 的用法,下面简单举例分析(仅供参考):
public class SemaphoreTest {
public static void main(String[] args) {
// 初始化 Semaphore
// 这里的许可数为 2,即同时最多有 2 个线程可以获取到
Semaphore semaphore = new Semaphore(2);
for (int i = 0; i < 50; i ) {
new Thread(() -> {
try {
// 获取许可
semaphore.acquire();
System.out.println(Thread.currentThread().getName() " 正在执行..");
// 模拟操作
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 释放许可
semaphore.release();
}
}).start();
}
}
}
/* 执行结果(仅供参考):
Thread-0 正在执行..
Thread-1 正在执行..
Thread-2 正在执行..
Thread-3 正在执行..
...
*/
这里把 Semaphore 的初始许可值设为 2,表示最多有两个线程可同时获取到许可(运行程序可发现线程是两两一起执行的)。设置为其他值也是类似的。
比较特殊的是,如果把 Semaphore 的初始许可值设为 1,可以当做“互斥锁”来使用。
小结
Semaphore 是并发包中的一个工具类,其内部是基于 AQS 共享模式实现的。通常可以作为限流器使用,比如限定连接池等的大小。
BlockingQueueBlockingQueue 意为“阻塞队列”,它在 JDK 中是一个接口。
所谓阻塞,简单来说就是当某些条件不满足时,让线程处于等待状态。例如经典的“生产者-消费者”模型,当存放产品的容器满的时候,生产者处于等待状态;而当容器为空的时候,消费者处于等待状态。阻塞队列的概念与该场景类似。
BlockingQueue 的继承关系如下:
可以看到 BlockingQueue 继承自 Queue 接口,它的常用实现类有 ArrayBlockingQueue、LinkedBlockingQueue、DelayQueue 等。同前面一样,本文先分析 BlockingQueue 接口的方法定义,后文再分析其实现类的代码。
PS: 从这个继承体系也可以看出来,直接实现接口的是抽象类,而实现类则通常继承自抽象类。为什么要这样设计呢?因为有些接口的实现类会有多个,而这些类之间有一部分逻辑是相似或者相同的,因此就把这部分逻辑提取到抽象类中,避免代码冗余。
代码分析
BlockingQueue 的方法定义如下:
其方法简单分析如下:
// 将指定元素插入到队列,若成功返回 true,否则抛出异常
boolean add(E e);
// 将指定元素插入到队列,若成功返回 true,否则返回 false
boolean offer(E e);
// 将指定元素插入到队列,若队列已满则等待
void put(E e) throws InterruptedException;
// 将指定元素插入到队列,若成功返回 true,否则返回 false,有超时等待
boolean offer(E e long timeout TimeUnit unit)
throws InterruptedException
// 获取并移除队列的头部,若为空则等待
E take() throws InterruptedException;
// 获取并移除队列的头部,有超时等待(若超时返回 null)
E poll(long timeout TimeUnit unit)
throws InterruptedException;
// 返回队列可以接收的容量,若无限制则返回 Integer.MAX_VALUE
int remainingCapacity();
// 删除指定的元素(如果存在),返回是否删除成功
boolean remove(Object o);
// 是否包含指定元素
public boolean contains(Object o);
// 从此队列中删除所有可用元素,并将它们添加到给定集合中
int drainTo(Collection<? super E> c);
// 从此队列中删除所有可用元素,并将它们添加到给定集合中(指定大小)
int drainTo(Collection<? super E> c int maxElements);
主要方法小结如下:
Throws exceptions |
Special value |
Blocks |
Time out | |
Insert |
add(e) |
offer(e) |
put(e) |
offer(e time unit) |
Remove |
remove() |
poll() |
take() |
poll(time unit) |
Examine |
element() |
peek() |
- |
- |
Queue 接口前文「Queue Deque」已进行分析,这里不再赘述。
典型用法
生产者:
class Producer implements Runnable {
private final BlockingQueue queue;
Producer(BlockingQueue q) { queue = q; }
public void run() {
try {
// 将产品放入队列
while (true) { queue.put(produce()); }
} catch (InterruptedException ex) { ... handle ...}
}
Object produce() { ... }
}
消费者:
class Consumer implements Runnable {
private final BlockingQueue queue;
Consumer(BlockingQueue q) { queue = q; }
public void run() {
try {
// 从队列中消费产品
while (true) { consume(queue.take()); }
} catch (InterruptedException ex) { ... handle ...}
}
void consume(Object x) { ... }
}
测试类:
class Setup {
void main() {
BlockingQueue q = new SomeQueueImplementation();
// 创建并启动一个生产者和两个消费者
Producer p = new Producer(q);
Consumer c1 = new Consumer(q);
Consumer c2 = new Consumer(q);
new Thread(p).start();
new Thread(c1).start();
new Thread(c2).start();
}
}
PS: 上述代码是 BlockingQueue 的文档提供的,仅供参考。
小结
BlockingQueue 是一个接口,它主要定义了阻塞队列的一些方法。阻塞队列在并发编程中使用较多,比如线程池。
ArrayBlockingQueue上面「BlockingQueue」简要分析了 BlockingQueue 接口的主要方法,ArrayBlockingQueue 就是该接口的一个主要实现类,本文分析该类的常用方法实现。
ArrayBlockingQueue 的类继承结构如下:
从 ArrayBlockingQueue 的名字大概可以猜出来,它的内部是由数组实现的,下面分析其代码实现。
代码分析
构造器
构造器 1:
// 构造器 1:初始化 ArrayBlockingQueue 对象,使用给定的容量
public ArrayBlockingQueue(int capacity) {
// 调用构造器 2 进行初始化,默认使用非公平锁
this(capacity false);
}
构造器 2:
// 构造器 2:使用给定容量及是否公平初始化 ArrayBlockingQueue 对象
public ArrayBlockingQueue(int capacity boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
// 用给定的容量初始化内部数组
this.items = new Object[capacity];
// 创建锁对象(根据 fair 参数确定是否公平锁)
lock = new ReentrantLock(fair);
// lock 绑定两个 Condition 条件
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
构造器 3:
// 构造器 3:使用给定的容量、是否公平,及给定的集合初始化 ArrayBlockingQueue
public ArrayBlockingQueue(int capacity boolean fair
Collection<? extends E> c) {
// 使用构造器 2 初始化 ArrayBlockingQueue 对象
this(capacity fair);
final ReentrantLock lock = this.lock;
lock.lock(); // Lock only for visibility not mutual exclusion
try {
int i = 0;
try {
// 遍历给定集合的元素,将其插入数组
for (E e : c) {
checkNotNull(e);
items[i ] = e;
}
// 注意可能会发生数组越界
} catch (ArrayIndexOutOfBoundsException ex) {
throw new IllegalArgumentException();
}
// 数组中元素的数量
count = i;
// 入队操作(put、offer 等方法)的数组下标,若数组已满则为 0
putIndex = (i == capacity) ? 0 : i;
} finally {
// 注意释放锁
lock.unlock();
}
}
主要成员变量
/** The queued items */
// 内部保存元素的数组
final Object[] items;
/** items index for next take poll peek or remove */
// 出队操作索引
int takeIndex;
/** items index for next put offer or add */
// 入队操作索引
int putIndex;
/** Number of elements in the queue */
int count;
/*
* Concurrency control uses the classic two-condition algorithm
* found in any textbook.
* 双条件(notEmpty、notFull)算法用于并发控制
*/
/** Main lock guarding all access */
// 使用 ReentrantLock 保证线程安全
final ReentrantLock lock;
/** Condition for waiting takes */
// 等待 take 操作(消费)的条件
private final Condition notEmpty;
/** Condition for waiting puts */
// 等待 put 操作(生产)的条件
private final Condition notFull;
主要入队方法:add(E) offer(E) offer(E timeout Unit) put(E)
1. add(E) 方法
public boolean add(E e) {
// 调用父类 AbstractQueue 的 add 方法
return super.add(e);
}
// AbstractQueue 的 add 方法
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
add(E) 方法调用了父类 AbstractQueue 的 add(E) 方法,可以看到,实际上还是调用了 offer(E) 方法。因此 add(E) 和 offer(E) 实现基本是一致的,下面分析 offer(E) 方法。
2. offer(E) offer(E timeout Unit) 方法
public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 若队列已满,立即返回 false
if (count == items.length)
return false;
else {
// 入队
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}
enqueue 方法:
// 入队操作
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
// 若队列已满,则下标置为 0
if ( putIndex == items.length)
putIndex = 0;
count ;
// 唤醒 notEmpty 条件下等待的线程
notEmpty.signal();
}
offer(E) 方法是将一个元素入队:若队列已满直接返回 false,否则执行入队操作,并唤醒 notEmpty 条件下等待的线程。
以“生产者-消费者”模型类比,执行 offer(E) 操作后表示队列已经有产品了(不为空,即 notEmpty),消费者可以消费了。
offer(E timeout Unit) 方法操作与 offer(E) 类似,只是多了超时等待,如下:
public boolean offer(E e long timeout TimeUnit unit)
throws InterruptedException {
checkNotNull(e);
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length) {
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
enqueue(e);
return true;
} finally {
lock.unlock();
}
}
3. put(E) 方法
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 队列满的时候,notFull 条件等待
while (count == items.length)
notFull.await();
// 入队
enqueue(e);
} finally {
lock.unlock();
}
}
put(E) 也是将一个元素入队:若队列已满,则 notFull 条件下的线程等待。
以“生产者-消费者”模型类比,就是容器已满,生产者等待;否则执行入队,并唤醒消费者。
入队方法小结
1. add(E): 入队成功返回 true,否则抛出 IllegalStateException 异常;
2. offer(E): 入队成功返回 true,失败返回 false;
3. offer(E timeout Unit): 同 offer(E),加了超时等待;
4. put(E): 无返回值,队列满的时候等待。
主要出队方法:poll() poll(long unit) take() peek()
1. poll() 方法
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 队列为空时返回 null,否则将 takeIndex 位置元素出队
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
2. take(E) 方法
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 队列为空时等待
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
3. poll(E unit) 方法
public E poll(long timeout TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
return dequeue();
} finally {
lock.unlock();
}
}
与 poll() 方法操作类似,只是多了超时等待。
上述三个方法都使用 dequeue 方法进行出队,如下:
// 出队操作
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
// 获取 takeIndex 位置的元素
E x = (E) items[takeIndex];
// 将 该位置清空
items[takeIndex] = null;
// 队列已经空了
if ( takeIndex == items.length)
takeIndex = 0;
count--;
// 迭代器操作用到,本文暂不深入分析
if (itrs != null)
itrs.elementDequeued();
// 队列已经不满(not full)了,可以继续生产
notFull.signal();
return x;
}
4. peek() 方法
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return itemAt(takeIndex); // null when queue is empty
} finally {
lock.unlock();
}
}
// 返回数组中指定位置的元素
final E itemAt(int i) {
return (E) items[i];
}
peek() 方法与前面几个出队操作不同,peek 方法只会获取队列的头元素,而不会将其删除。
出队方法小结
1. poll(): 获取队列头部元素,并将其移除,队列为空时返回 null;
2. take(): 获取队列头部元素,并将其移除,队列为空时阻塞等待;
3. poll(long unit): 获取队列头部元素,并将其移除,队列为空时等待一段时间,若超时返回 null;
4. peek(): 获取队列头部元素,但不移除该元素。
小结
1. ArrayBlockingQueue 是基于数组的阻塞队列实现,它在初始化时需要指定容量;
2. 内部使用了 ReentrantLock 保证线程安全;
3. 常用方法:
入队:add offer put
出队:poll take peek
本文分析了其常用的方法,此外,还有一些方法使用频率没那么高且稍微复杂,例如 iterator() 和 drainTo(),后文再进行分析。