快捷搜索:  汽车  科技

jdk怎么设置三个环境变量(JDK源码详解Semaphore)

jdk怎么设置三个环境变量(JDK源码详解Semaphore)内部嵌套类 Sync:private final Sync sync; // 初始化 Semaphore,传入指定的许可数量,非公平 public Semaphore(int permits) { sync = new NonfairSync(permits); } // 初始化 Semaphore,传入指定的许可数量,指定是否公平 public Semaphore(int permits boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); }构造器初始化了 Sync 变量,根据传入的 fair 值指定为 FairSync 或 NonFairSync,下面分析这三个类。Semaphore 的方法如下:其中主要方法是 acquire() 和 release()

Semaphore

Semaphore 是并发包中的一个工具类,可理解为信号量。通常可以作为限流器使用,即限制访问某个资源的线程个数,比如用于限制连接池的连接数。

打个通俗的比方,可以把 Semaphore 理解为一辆公交车:车上的座位数(初始的“许可” permits 数量)是固定的,行驶期间如果有人上车(获取许可),座位数(许可数量)就会减少,当人满的时候不能再继续上车了(获取许可失败);而有人下车(释放许可)后就空出了一些座位,其他人就可以继续上车了。

下面具体分析其代码实现。

代码分析

Semaphore 的方法如下:

jdk怎么设置三个环境变量(JDK源码详解Semaphore)(1)

其中主要方法是 acquire() 和 release() 相关的一系列方法,它们的作用类似。我们先从构造器开始分析。

构造器

private final Sync sync; // 初始化 Semaphore,传入指定的许可数量,非公平 public Semaphore(int permits) { sync = new NonfairSync(permits); } // 初始化 Semaphore,传入指定的许可数量,指定是否公平 public Semaphore(int permits boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); }

构造器初始化了 Sync 变量,根据传入的 fair 值指定为 FairSync 或 NonFairSync,下面分析这三个类。

内部嵌套类 Sync:

abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 1192457210091910933L; // 构造器,将父类 AQS 的 state 变量初始化为给定的 permits Sync(int permits) { setState(permits); } // 非公平方式尝试获取许可(减少 state 的值) final int nonfairTryAcquireShared(int acquires) { // 自旋操作 for (;;) { // 获取许可值(state),并尝试 CAS 修改为减去后的结果 int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available remaining)) return remaining; } } // 释放许可(增加 state 的值) protected final boolean tryReleaseShared(int releases) { for (;;) { // 操作与获取类似,不同的在于此处是增加 state 值 int current = getState(); int next = current releases; if (next < current) // overflow throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current next)) return true; } } // 一些方法未给出... }

可以看到 Sync 类继承自 AQS,并重写了 AQS 的 tryReleaseShared 方法,其中获取和释放许可分别对应的是对 AQS 中 state 值的减法和加法操作。具体可参考前文对 AQS 共享模式的分析「万字长文详解!JDK源码-AbstractQueuedSynchronizer(3)」。

NonFairSync (非公平版本实现):

static final class NonfairSync extends Sync { private static final long serialVersionUID = -2694183684443567898L; // 调用父类 Sync 的构造器来实现 NonfairSync(int permits) { super(permits); } // 重写 AQS 的 tryAcquireShared 方法,代码实现在父类 Sync 中 protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); } }

FairSync (公平版本实现):

static final class FairSync extends Sync { private static final long serialVersionUID = 2014338818796000944L; // 构造器调用父类 Sync 的构造器来实现 FairSync(int permits) { super(permits); } // 重写 AQS 的 tryAcquireShared 方法,尝试获取许可(permit) protected int tryAcquireShared(int acquires) { for (;;) { // 若队列中有其他线程等待,则获取失败(这就是体现“公平”的地方) if (hasQueuedPredecessors()) return -1; // 获取当前的许可值 int available = getState(); // 计算剩余值 int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available remaining)) return remaining; } } }

PS: 体现“公平”的地方在于 tryAcquireShared 方法中,公平的版本会先判断队列中是否有其它线程在等待(hasQueuedPredecessors 方法)。

主要方法的代码实现:

// 获取一个许可(可中断) public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); } // 获取一个许可(不响应中断) public void acquireUninterruptibly() { sync.acquireShared(1); } // 尝试获取一个许可 public boolean tryAcquire() { return sync.nonfairTryAcquireShared(1) >= 0; } // 尝试获取一个许可(有超时等待) public boolean tryAcquire(long timeout TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1 unit.toNanos(timeout)); } // 释放一个许可 public void release() { sync.releaseShared(1); }

还有一系列类似的操作,只不过获取/释放许可的数量可以指定:

// 获取指定数量的许可(可中断) public void acquire(int permits) throws InterruptedException { if (permits < 0) throw new IllegalArgumentException(); sync.acquireSharedInterruptibly(permits); } // 获取指定数量的许可(不可中断) public void acquireUninterruptibly(int permits) { if (permits < 0) throw new IllegalArgumentException(); sync.acquireShared(permits); } // 尝试获取指定数量的许可 public boolean tryAcquire(int permits) { if (permits < 0) throw new IllegalArgumentException(); return sync.nonfairTryAcquireShared(permits) >= 0; } // 尝试获取指定数量的许可(有超时等待) public boolean tryAcquire(int permits long timeout TimeUnit unit) throws InterruptedException { if (permits < 0) throw new IllegalArgumentException(); return sync.tryAcquireSharedNanos(permits unit.toNanos(timeout)); } // 释放指定数量的许可 public void release(int permits) { if (permits < 0) throw new IllegalArgumentException(); sync.releaseShared(permits); }

可以看到,Semaphore 的主要方法都是在嵌套类 FairSync 和 NonFairSync 及其父类 Sync 中实现的,内部嵌套类也是 AQS 的典型用法。

场景举例

为了便于理解 Semaphore 的用法,下面简单举例分析(仅供参考):

public class SemaphoreTest { public static void main(String[] args) { // 初始化 Semaphore // 这里的许可数为 2,即同时最多有 2 个线程可以获取到 Semaphore semaphore = new Semaphore(2); for (int i = 0; i < 50; i ) { new Thread(() -> { try { // 获取许可 semaphore.acquire(); System.out.println(Thread.currentThread().getName() " 正在执行.."); // 模拟操作 TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } finally { // 释放许可 semaphore.release(); } }).start(); } } } /* 执行结果(仅供参考): Thread-0 正在执行.. Thread-1 正在执行.. Thread-2 正在执行.. Thread-3 正在执行.. ... */

这里把 Semaphore 的初始许可值设为 2,表示最多有两个线程可同时获取到许可(运行程序可发现线程是两两一起执行的)。设置为其他值也是类似的。

比较特殊的是,如果把 Semaphore 的初始许可值设为 1,可以当做“互斥锁”来使用。

小结

Semaphore 是并发包中的一个工具类,其内部是基于 AQS 共享模式实现的。通常可以作为限流器使用,比如限定连接池等的大小。

BlockingQueue

BlockingQueue 意为“阻塞队列”,它在 JDK 中是一个接口。

所谓阻塞,简单来说就是当某些条件不满足时,让线程处于等待状态。例如经典的“生产者-消费者”模型,当存放产品的容器满的时候,生产者处于等待状态;而当容器为空的时候,消费者处于等待状态。阻塞队列的概念与该场景类似。

BlockingQueue 的继承关系如下:

jdk怎么设置三个环境变量(JDK源码详解Semaphore)(2)

可以看到 BlockingQueue 继承自 Queue 接口,它的常用实现类有 ArrayBlockingQueue、LinkedBlockingQueue、DelayQueue 等。同前面一样,本文先分析 BlockingQueue 接口的方法定义,后文再分析其实现类的代码。

PS: 从这个继承体系也可以看出来,直接实现接口的是抽象类,而实现类则通常继承自抽象类。为什么要这样设计呢?因为有些接口的实现类会有多个,而这些类之间有一部分逻辑是相似或者相同的,因此就把这部分逻辑提取到抽象类中,避免代码冗余。

代码分析

BlockingQueue 的方法定义如下:

jdk怎么设置三个环境变量(JDK源码详解Semaphore)(3)

其方法简单分析如下:

// 将指定元素插入到队列,若成功返回 true,否则抛出异常 boolean add(E e); // 将指定元素插入到队列,若成功返回 true,否则返回 false boolean offer(E e); // 将指定元素插入到队列,若队列已满则等待 void put(E e) throws InterruptedException; // 将指定元素插入到队列,若成功返回 true,否则返回 false,有超时等待 boolean offer(E e long timeout TimeUnit unit) throws InterruptedException // 获取并移除队列的头部,若为空则等待 E take() throws InterruptedException; // 获取并移除队列的头部,有超时等待(若超时返回 null) E poll(long timeout TimeUnit unit) throws InterruptedException; // 返回队列可以接收的容量,若无限制则返回 Integer.MAX_VALUE int remainingCapacity(); // 删除指定的元素(如果存在),返回是否删除成功 boolean remove(Object o); // 是否包含指定元素 public boolean contains(Object o); // 从此队列中删除所有可用元素,并将它们添加到给定集合中 int drainTo(Collection<? super E> c); // 从此队列中删除所有可用元素,并将它们添加到给定集合中(指定大小) int drainTo(Collection<? super E> c int maxElements);

主要方法小结如下:

Throws exceptions

Special value

Blocks

Time out

Insert

add(e)

offer(e)

put(e)

offer(e time unit)

Remove

remove()

poll()

take()

poll(time unit)

Examine

element()

peek()

-

-

Queue 接口前文「Queue Deque」已进行分析,这里不再赘述。

典型用法

生产者:

class Producer implements Runnable { private final BlockingQueue queue; Producer(BlockingQueue q) { queue = q; } public void run() { try { // 将产品放入队列 while (true) { queue.put(produce()); } } catch (InterruptedException ex) { ... handle ...} } Object produce() { ... } }

消费者:

class Consumer implements Runnable { private final BlockingQueue queue; Consumer(BlockingQueue q) { queue = q; } public void run() { try { // 从队列中消费产品 while (true) { consume(queue.take()); } } catch (InterruptedException ex) { ... handle ...} } void consume(Object x) { ... } }

测试类:

class Setup { void main() { BlockingQueue q = new SomeQueueImplementation(); // 创建并启动一个生产者和两个消费者 Producer p = new Producer(q); Consumer c1 = new Consumer(q); Consumer c2 = new Consumer(q); new Thread(p).start(); new Thread(c1).start(); new Thread(c2).start(); } }

PS: 上述代码是 BlockingQueue 的文档提供的,仅供参考。

小结

BlockingQueue 是一个接口,它主要定义了阻塞队列的一些方法。阻塞队列在并发编程中使用较多,比如线程池。

ArrayBlockingQueue

上面「BlockingQueue」简要分析了 BlockingQueue 接口的主要方法,ArrayBlockingQueue 就是该接口的一个主要实现类,本文分析该类的常用方法实现。

ArrayBlockingQueue 的类继承结构如下:

jdk怎么设置三个环境变量(JDK源码详解Semaphore)(4)

从 ArrayBlockingQueue 的名字大概可以猜出来,它的内部是由数组实现的,下面分析其代码实现。

代码分析

构造器

构造器 1:

// 构造器 1:初始化 ArrayBlockingQueue 对象,使用给定的容量 public ArrayBlockingQueue(int capacity) { // 调用构造器 2 进行初始化,默认使用非公平锁 this(capacity false); }

构造器 2:

// 构造器 2:使用给定容量及是否公平初始化 ArrayBlockingQueue 对象 public ArrayBlockingQueue(int capacity boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); // 用给定的容量初始化内部数组 this.items = new Object[capacity]; // 创建锁对象(根据 fair 参数确定是否公平锁) lock = new ReentrantLock(fair); // lock 绑定两个 Condition 条件 notEmpty = lock.newCondition(); notFull = lock.newCondition(); }

构造器 3:

// 构造器 3:使用给定的容量、是否公平,及给定的集合初始化 ArrayBlockingQueue public ArrayBlockingQueue(int capacity boolean fair Collection<? extends E> c) { // 使用构造器 2 初始化 ArrayBlockingQueue 对象 this(capacity fair); final ReentrantLock lock = this.lock; lock.lock(); // Lock only for visibility not mutual exclusion try { int i = 0; try { // 遍历给定集合的元素,将其插入数组 for (E e : c) { checkNotNull(e); items[i ] = e; } // 注意可能会发生数组越界 } catch (ArrayIndexOutOfBoundsException ex) { throw new IllegalArgumentException(); } // 数组中元素的数量 count = i; // 入队操作(put、offer 等方法)的数组下标,若数组已满则为 0 putIndex = (i == capacity) ? 0 : i; } finally { // 注意释放锁 lock.unlock(); } }

主要成员变量

/** The queued items */ // 内部保存元素的数组 final Object[] items; /** items index for next take poll peek or remove */ // 出队操作索引 int takeIndex; /** items index for next put offer or add */ // 入队操作索引 int putIndex; /** Number of elements in the queue */ int count; /* * Concurrency control uses the classic two-condition algorithm * found in any textbook. * 双条件(notEmpty、notFull)算法用于并发控制 */ /** Main lock guarding all access */ // 使用 ReentrantLock 保证线程安全 final ReentrantLock lock; /** Condition for waiting takes */ // 等待 take 操作(消费)的条件 private final Condition notEmpty; /** Condition for waiting puts */ // 等待 put 操作(生产)的条件 private final Condition notFull;

主要入队方法:add(E) offer(E) offer(E timeout Unit) put(E)

1. add(E) 方法

public boolean add(E e) { // 调用父类 AbstractQueue 的 add 方法 return super.add(e); } // AbstractQueue 的 add 方法 public boolean add(E e) { if (offer(e)) return true; else throw new IllegalStateException("Queue full"); }

add(E) 方法调用了父类 AbstractQueue 的 add(E) 方法,可以看到,实际上还是调用了 offer(E) 方法。因此 add(E) 和 offer(E) 实现基本是一致的,下面分析 offer(E) 方法。

2. offer(E) offer(E timeout Unit) 方法

public boolean offer(E e) { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lock(); try { // 若队列已满,立即返回 false if (count == items.length) return false; else { // 入队 enqueue(e); return true; } } finally { lock.unlock(); } }

enqueue 方法:

// 入队操作 private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; items[putIndex] = x; // 若队列已满,则下标置为 0 if ( putIndex == items.length) putIndex = 0; count ; // 唤醒 notEmpty 条件下等待的线程 notEmpty.signal(); }

offer(E) 方法是将一个元素入队:若队列已满直接返回 false,否则执行入队操作,并唤醒 notEmpty 条件下等待的线程。

以“生产者-消费者”模型类比,执行 offer(E) 操作后表示队列已经有产品了(不为空,即 notEmpty),消费者可以消费了。

offer(E timeout Unit) 方法操作与 offer(E) 类似,只是多了超时等待,如下:

public boolean offer(E e long timeout TimeUnit unit) throws InterruptedException { checkNotNull(e); long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) { if (nanos <= 0) return false; nanos = notFull.awaitNanos(nanos); } enqueue(e); return true; } finally { lock.unlock(); } }

3. put(E) 方法

public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { // 队列满的时候,notFull 条件等待 while (count == items.length) notFull.await(); // 入队 enqueue(e); } finally { lock.unlock(); } }

put(E) 也是将一个元素入队:若队列已满,则 notFull 条件下的线程等待。

以“生产者-消费者”模型类比,就是容器已满,生产者等待;否则执行入队,并唤醒消费者。

入队方法小结

1. add(E): 入队成功返回 true,否则抛出 IllegalStateException 异常;

2. offer(E): 入队成功返回 true,失败返回 false;

3. offer(E timeout Unit): 同 offer(E),加了超时等待;

4. put(E): 无返回值,队列满的时候等待。

主要出队方法:poll() poll(long unit) take() peek()

1. poll() 方法

public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { // 队列为空时返回 null,否则将 takeIndex 位置元素出队 return (count == 0) ? null : dequeue(); } finally { lock.unlock(); } }

2. take(E) 方法

public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { // 队列为空时等待 while (count == 0) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } }

3. poll(E unit) 方法

public E poll(long timeout TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) { if (nanos <= 0) return null; nanos = notEmpty.awaitNanos(nanos); } return dequeue(); } finally { lock.unlock(); } }

与 poll() 方法操作类似,只是多了超时等待。

上述三个方法都使用 dequeue 方法进行出队,如下:

// 出队操作 private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings("unchecked") // 获取 takeIndex 位置的元素 E x = (E) items[takeIndex]; // 将 该位置清空 items[takeIndex] = null; // 队列已经空了 if ( takeIndex == items.length) takeIndex = 0; count--; // 迭代器操作用到,本文暂不深入分析 if (itrs != null) itrs.elementDequeued(); // 队列已经不满(not full)了,可以继续生产 notFull.signal(); return x; }

4. peek() 方法

public E peek() { final ReentrantLock lock = this.lock; lock.lock(); try { return itemAt(takeIndex); // null when queue is empty } finally { lock.unlock(); } } // 返回数组中指定位置的元素 final E itemAt(int i) { return (E) items[i]; }

peek() 方法与前面几个出队操作不同,peek 方法只会获取队列的头元素,而不会将其删除。

出队方法小结

1. poll(): 获取队列头部元素,并将其移除,队列为空时返回 null;

2. take(): 获取队列头部元素,并将其移除,队列为空时阻塞等待;

3. poll(long unit): 获取队列头部元素,并将其移除,队列为空时等待一段时间,若超时返回 null;

4. peek(): 获取队列头部元素,但不移除该元素。

小结

1. ArrayBlockingQueue 是基于数组的阻塞队列实现,它在初始化时需要指定容量;

2. 内部使用了 ReentrantLock 保证线程安全;

3. 常用方法:

入队:add offer put

出队:poll take peek

本文分析了其常用的方法,此外,还有一些方法使用频率没那么高且稍微复杂,例如 iterator() 和 drainTo(),后文再进行分析。

猜您喜欢: