java并发技术讲解(Java并发编程笔记-JDK里非常实用的几个并发工具类)
java并发技术讲解(Java并发编程笔记-JDK里非常实用的几个并发工具类)还有一个构造器Semaphore(int permits boolean fair),手动控制使用公平锁(也就是先排队的先获取许可证)或是非公平锁。public class CyclicBarrierDemo { private static class CyclicBarrierThread implements Runnable { private final CyclicBarrier cyclicBarrier; public CyclicBarrierThread(CyclicBarrier cyclicBarrier) { this.cyclicBarrier = cyclicBarrier; } void queryInterface() throws InterruptedException { //模拟接口调用 Thread.sleep(1000);
简介在日常工作过程中,并发编程的知识我们或多或少都会用到一些,所以今天就来简单认识一下java.util.cucurrent包给我们提供的三个非常实用的并发工具类,CountDownLatch、CyclicBarrier、semaphore。
CountDownLatchCountDownLatch允许一个或多个线程等待在其他线程中执行的一组操作完成。
通过构造器传入初始计数器的值,计数器的值和线程数可以不一致。每当线程完成了一个任务后,通过调用实例的countDown()方法,计数器的值就会减1,计数器无法重置计数。当计数器值到达0时,它表示所有的已经完成了任务,然后在闭锁上等待CountDownLatch.await()方法的线程就可以恢复执行任务。countDown()方法允许在同一线程里多次调用。一个简单使用场景,比如我们在做单元测试的时候可以使用CountDownLatch来简单进行接口的压力测试,demo如下:
public class CountDownLatchDemo { private static class CountDownLatchThread implements Runnable { private final CountDownLatch countDownLatch1; private final CountDownLatch countDownLatch2; public CountDownLatchThread(CountDownLatch countDownLatch1 CountDownLatch countDownLatch2) { this.countDownLatch1 = countDownLatch1; this.countDownLatch2 = countDownLatch2; } void queryInterface() throws InterruptedException { //模拟接口调用 Thread.sleep(1000); } @Override public void run() { try { countDownLatch1.await(); try { queryInterface(); } catch (InterruptedException e) { e.printStackTrace(); } countDownLatch2.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) throws InterruptedException { int count = 10; CountDownLatch countDownLatch1 = new CountDownLatch(2); CountDownLatch countDownLatch2 = new CountDownLatch(count); for (int i = 0; i < count; i ) { new Thread(new CountDownLatchThread(countDownLatch1 countDownLatch2)).start(); } long start = System.currentTimeMillis(); countDownLatch1.countDown(); countDownLatch1.countDown(); countDownLatch2.await(); long end = System.currentTimeMillis(); System.out.println("10线程并发接口耗时:" (end - start) "(ms)"); } }
CyclicBarrierCyclicBarrier允许一组线程到达一个屏障时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行,屏障可循环利用。比如我们跟团旅游,导游都是要等所有人到齐了之后才会动身去景点。
CyclicBarrier的构造器有2个,一个是CyclicBarrier(int parties),parties表示屏障拦截的线程数量,每个线程调用await方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞;
CyclicBarrier还提供一个更高级的构造函数CyclicBarrier(int parties,Runnable barrierAction),用于在线程到达屏障时,优先执行barrierAction,方便处理更复杂的业务场景。
demo如下:
public class CyclicBarrierDemo { private static class CyclicBarrierThread implements Runnable { private final CyclicBarrier cyclicBarrier; public CyclicBarrierThread(CyclicBarrier cyclicBarrier) { this.cyclicBarrier = cyclicBarrier; } void queryInterface() throws InterruptedException { //模拟接口调用 Thread.sleep(1000); } @Override public void run() { try { System.out.println(Thread.currentThread().getName() "等待屏障"); cyclicBarrier.await(); queryInterface(); System.out.println(Thread.currentThread().getName() "再次等待屏障"); cyclicBarrier.await(); queryInterface(); System.out.println(Thread.currentThread().getName() "所有工作执行完成"); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } } private static class BarrierAction implements Runnable { @Override public void run() { System.out.println(Thread.currentThread().getName() ">>>BarrierAction执行"); } } public static void main(String[] args) { int count = 3; CyclicBarrier cyclicBarrier = new CyclicBarrier(count new BarrierAction()); for (int i = 0; i < count; i ) { new Thread(new CyclicBarrierThread(cyclicBarrier)) .start(); } } }
SemaphoreSemaphore(信号量),通常用于限制并发访问某些资源的线程数量。Semaphore通过构造器Semaphore(int permits)接受一个整型的数字,初始化一组可用的许可证,默认使用非公平锁。
还有一个构造器Semaphore(int permits boolean fair),手动控制使用公平锁(也就是先排队的先获取许可证)或是非公平锁。
线程使用Semaphore的acquire/tryAcquire方法获取一个许可证,没有就阻塞,使用完之后调用release方法归还许可证。Semaphore典型应用场景可以用于做流量控制。
public class SemaphoreDemo { private static long count = 0; private static class SemaphoreThread implements Runnable { private final Semaphore semaphore; public SemaphoreThread(Semaphore semaphore) { this.semaphore = semaphore; } @Override public void run() { try { semaphore.acquire(); System.out.println(Thread.currentThread().getName() ">>>count=" count); count ; semaphore.release(); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) { int count = 10; //设置最多一个线程获取许可证 相当于synchronizeed final Semaphore semaphore = new Semaphore(1); for (int i = 0; i < count; i ) { new Thread(new SemaphoreThread(semaphore)).start(); } } }
Semaphore还提供一些其他方法:
- int availablePermits() :返回此信号量中当前可用的许可证数。
- int getQueueLength():返回正在等待获取许可证的线程数。
- boolean hasQueuedThreads() :是否有线程正在等待获取许可证。