耘朵
一只不务正业的程序媛
Toggle navigation
耘朵
主页
归档
标签
JUC中AQS-AbstractQueuedSynchronizer并发中的同步器
java
线程
2021-10-25 13:03:00
116
0
0
admin
java
线程
## 组件: ### CountDownLatch 可以阻塞当前线程,同时只能有一个线程操作该计数器 代码用例 ``` package com.csyd.concurrency.example.aqs; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @Slf4j public class CountDownLatchExample1 { private final static int threadCount = 200; public static void main(String[] args) throws Exception{ ExecutorService exec = Executors.newCachedThreadPool(); final CountDownLatch countDownLatch = new CountDownLatch(threadCount); for (int i = 0; i < threadCount; i++){ final int threadNum = i; exec.execute(() -> { try { test(threadNum); } catch (Exception e) { log.error("exception", e); } finally { countDownLatch.countDown(); } }); } countDownLatch.await(); log.info("finish"); exec.shutdown(); } private static void test(int threadNum) throws Exception{ Thread.sleep(100); log.info("{}", threadNum); Thread.sleep(100); } } ``` 上例代码中,countDownLatch.await可以加等待时间,超出时间后不等待继续执行,如: countDownLatch.await(10, TimeUnit.MILLISECONDS);//等待10毫秒 ### Semaphore 信号量 控制某个资源可被同时访问的个数 常用于仅能提供有限访问的资源,比如数据库的连接数 常用方法 ``` //拿到许可 semaphore.acquire(); //释放许可 semaphore.release(); //拿到多个许可 semaphore.acquire(3); //释放多个许可 semaphore.release(3); //尝试获取许可 semaphore.tryAcquire() //尝试获取许可,超时的不再获取 semaphore.tryAcquire(5000, TimeUnit.MILLISECONDS) ``` 代码用例 ``` package com.csyd.concurrency.example.aqs; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; @Slf4j public class SemaphoreExample1 { private final static int threadCount = 20; public static void main(String[] args) throws Exception{ ExecutorService exec = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(3); for (int i = 0; i < threadCount; i++){ final int threadNum = i; exec.execute(() -> { try { //拿到许可 semaphore.acquire(); test(threadNum); //释放许可 semaphore.release(); //另一种,尝试获取许可 //if (semaphore.tryAcquire()){ // test(threadNum); // semaphore.release(); //} } catch (Exception e) { log.error("exception", e); } }); } log.info("finish"); exec.shutdown(); } private static void test(int threadNum) throws Exception{ log.info("{}", threadNum); Thread.sleep(1000); } } ``` ### CyclicBarrier 可以多个线程同步等待(await),在所有线程准备完毕后,继续往下进行,也是通过计数器实现的,它的计数器可以执行reset方法重新执行 常用方法 ``` barrier.await(); ``` 代码用例 ``` package com.csyd.concurrency.example.aqs; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @Slf4j public class CyclicBarrierExample1 { //5个准备好后开始执行 private static CyclicBarrier barrier = new CyclicBarrier(5); //另一种方式,准备好后首先进入代码中 //private static CyclicBarrier barrier = new CyclicBarrier(5, () ->{ // log.info("callback is running"); //}); public static void main(String[] args) throws Exception{ ExecutorService executor = Executors.newCachedThreadPool(); for (int i = 0; i < 10; i++){ final int threadNum = i; Thread.sleep(1000); executor.execute(() -> { try { race(threadNum); }catch (Exception e){ log.error("exception", e); } }); } } private static void race(int threadNum) throws Exception { Thread.sleep(1000); log.info("{} is ready", threadNum); barrier.await(); //或者await可以加等待时间,但是有异常,需要捕捉才能继续执行 //try { // barrier.await(2000, TimeUnit.MILLISECONDS); //}catch (BrokenBarrierException |TimeoutException e){ // log.warn("BrokenBarrierException"); //} log.info("{} continue", threadNum); } } ``` ### ReentrantLock 与锁(可重入锁) **ReentranLock与synchronized区别** >- 可重入性 >- 锁的实现 synchronized基于JVM实现,ReentranLock基于JDK实现 >- 性能的区别 性能差不多,官方推荐synchronized,写法容易 >- 功能区别 ReentranLock需要手工声明加锁和释放锁 **ReentrantLock独有功能** >- 可指定是公平锁还是非公平锁,synchronized只能指定非公平锁,公平锁就是先等待的线程先获得锁 >- 提供了一个Condition类,可以分组唤醒需要唤醒的线程 >- 提供能够中断等待锁的线程的机制,lock.lockInterruptibly() 代码用例 ``` package com.csyd.concurrency.example.lock; import com.csyd.concurrency.annoations.ThreadSafe; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @Slf4j @ThreadSafe public class LockExample2 { // 请求总数 public static final int clientTotal = 5000; // 同时并发执行的线程数 public static final int threadTotal = 200; public static int count = 0; private final static Lock lock = new ReentrantLock(); public static void main(String[] args) throws Exception{ //定义一个线程池 ExecutorService executorService = Executors.newCachedThreadPool(); //定义信号量,Semaphore是常用的并发测试类 final Semaphore semaphore = new Semaphore(threadTotal); //定义计数器递锁, CountDownLatch并发常用 final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); for (int i = 0; i < clientTotal; i++){ //for循环请求放入线程池中 executorService.execute(() -> { try { // 线程池执行时引入信号量,acquire判断当前进程是否允许被执行,如果达到了一定并发数,可能会临时被阻塞掉, // acquire返回值后,下面的add才会执行 semaphore.acquire(); add(); //释放 semaphore.release(); } catch (Exception e) { log.error("exception", e); } // 计锁,每执行完一次后, 就会减一个 countDownLatch.countDown(); }); } //当countDown减为0,就会执行,也就是所有的线程执行完 countDownLatch.await(); //关闭线程池 executorService.shutdown(); log.info("count:{}", count); } private static void add(){ //加锁 lock.lock(); try { count++; } finally { //解锁 lock.unlock(); } } } ``` **ReentrantReadWriteLock** 没有任何读写锁的时候,才能获取写入锁,用于实现悲观读取,容易造成饥饿,不常用 代码用例 ``` package com.csyd.concurrency.example.lock; import lombok.extern.slf4j.Slf4j; import java.util.Map; import java.util.Set; import java.util.TreeMap; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantReadWriteLock; @Slf4j public class LockExample3 { private final Map<String, Data> map = new TreeMap<>(); private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); private final Lock readLock = lock.readLock(); private final Lock writeLock = lock.writeLock(); public Data get(String key){ readLock.lock(); try { return map.get(key); }finally { readLock.unlock(); } } public Set<String> getAllKeys(){ readLock.lock(); try { return map.keySet(); }finally { readLock.unlock(); } } public Data put(String key, Data value){ writeLock.lock(); try { return map.put(key, value); }finally { readLock.unlock(); } } class Data{ } } ``` **StampedLock** 控制锁有3中模式,写、读、乐观读 代码用例 ``` package com.csyd.concurrency.example.lock; import com.csyd.concurrency.annoations.ThreadSafe; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.StampedLock; @Slf4j @ThreadSafe public class LockExample5 { // 请求总数 public static final int clientTotal = 5000; // 同时并发执行的线程数 public static final int threadTotal = 200; public static int count = 0; private final static StampedLock lock = new StampedLock(); public static void main(String[] args) throws Exception{ //定义一个线程池 ExecutorService executorService = Executors.newCachedThreadPool(); //定义信号量,Semaphore是常用的并发测试类 final Semaphore semaphore = new Semaphore(threadTotal); //定义计数器递锁, CountDownLatch并发常用 final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); for (int i = 0; i < clientTotal; i++){ //for循环请求放入线程池中 executorService.execute(() -> { try { // 线程池执行时引入信号量,acquire判断当前进程是否允许被执行,如果达到了一定并发数,可能会临时被阻塞掉, // acquire返回值后,下面的add才会执行 semaphore.acquire(); add(); //释放 semaphore.release(); } catch (Exception e) { log.error("exception", e); } // 计锁,每执行完一次后, 就会减一个 countDownLatch.countDown(); }); } //当countDown减为0,就会执行,也就是所有的线程执行完 countDownLatch.await(); //关闭线程池 executorService.shutdown(); log.info("count:{}", count); } private static void add(){ //加写锁,会返回一个stamp值 long stamp = lock.writeLock(); try { count++; } finally { //解锁需要携带stamp值 lock.unlock(stamp); } } } ``` 当只有少量竞争者的时候,synchronized是很好的通用锁的实现 竞争者不少,但是线程增长的趋势能够预估的,ReentrantLock是很好的通用的锁实现 synchronized不会产生死锁,JVM会自动解锁 其他的锁都需要unlock ### Condition 协调各个线程,释放信号,利用condition队列 代码用例 ``` package com.csyd.concurrency.example.lock; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; @Slf4j public class LockExample6 { public static void main(String[] args) { ReentrantLock reentrantLock = new ReentrantLock(); Condition condition = reentrantLock.newCondition(); new Thread(() -> { try { //线程1加入到了aps的等待队列中 reentrantLock.lock(); log.info("wait signal"); //1 //调用了condition的方法,也就释放了锁,就从aqs的等待队列里移除了,加入到了condition的等待队列里(aqs有两个队列) condition.await(); //等待着该线程需要一个信号 } catch (InterruptedException e){ e.printStackTrace(); } //线程2释放了,线程1继续执行 log.info("get signal"); //4 reentrantLock.unlock(); }).start(); new Thread(() -> { //线程2因为线程1释放了锁,被唤醒,获取锁,加入到了aqs等待队列中 reentrantLock.lock(); log.info("get lock"); //2 try { Thread.sleep(3000); }catch (InterruptedException e){ e.printStackTrace(); } //发送信号,线程1就从condition队列中拿出来到了aqs的等待队列,此时还没被唤醒 condition.signalAll(); log.info("send signal ~"); //3 //释放锁之后,aqs中只剩线程1,线程1就可以继续执行 reentrantLock.unlock(); }).start(); } } ``` ## 扩展组件 ### FutureTask Callable:一个泛型接口,有个call函数,于Runnable大致相似,但是功能更强大,被线程执行后可以有返回值并抛出异常 Runnable:一个接口,只有一个run方法,创建个类实现它,使用某个线程执行该Runable实现类,就可以实现多线程 Future:一个接口,可以将任务取消,查询任务是否被取消,获取结果等等,可以监视目标线程调用call的情况,调用get方法 FutrueTask:父类是RunnableFuture,父类继承了Runnable和Future,执行了Callable任务 **Future代码用例** ``` package com.csyd.concurrency.example.aqs; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @Slf4j public class FutureExample { static class MyCallable implements Callable<String> { @Override public String call() throws Exception { log.info("do something in callable"); Thread.sleep(5000); return "Done"; } } public static void main(String[] args) throws Exception{ ExecutorService executorService = Executors.newCachedThreadPool(); Future<String> future = executorService.submit(new MyCallable()); log.info("do something in main"); Thread.sleep(1000); String result = future.get(); log.info("result:{}", result); } } ``` **FutureTask代码用例** ``` package com.csyd.concurrency.example.aqs; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.Callable; import java.util.concurrent.FutureTask; @Slf4j public class FutureTaskExample { public static void main(String[] args) throws Exception{ FutureTask<String> futureTask = new FutureTask<String>(new Callable<String>() { @Override public String call() throws Exception { log.info("do something in callable"); Thread.sleep(5000); return "Done"; } }); new Thread(futureTask).start(); log.info("do something in main"); Thread.sleep(1000); String result = futureTask.get(); log.info("result:{}", result); } } ``` ### Fork/Join框架 把大任务分割成若干个小任务,最终汇总每个小任务结果,返回大任务的框架 采用工作窃取算法,比如线程1和线程2,如果线程1执行完了所有任务,线程2还没执行完,线程1就会从尾部窃取线程2的任务执行 **代码用例** ``` package com.csyd.concurrency.example.aqs; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.Future; import java.util.concurrent.RecursiveTask; @Slf4j public class ForkJoinTaskExample extends RecursiveTask<Integer> { public static final int threshold = 2; private int start; private int end; public ForkJoinTaskExample(int start, int end){ this.start = start; this.end = end; } @Override protected Integer compute() { int sum = 0; boolean canCompute = (end - start) <= threshold; if (canCompute) { for (int i = start; i <= end; i++){ sum += i; } }else { // 拆分子任务 int middle = (start + end) / 2; ForkJoinTaskExample leftTask = new ForkJoinTaskExample(start, middle); ForkJoinTaskExample rightTask = new ForkJoinTaskExample(middle + 1, end); //执行任务 leftTask.fork(); rightTask.fork(); //合并任务 int leftResult = leftTask.join(); int rightResult = rightTask.join(); sum = leftResult + rightResult; } return sum; } public static void main(String[] args) { ForkJoinPool forkJoinPool = new ForkJoinPool(); //生成一个任务1+2+3+4... ForkJoinTaskExample task = new ForkJoinTaskExample(1, 100); Future<Integer> result = forkJoinPool.submit(task); try { log.info("result:{}", result.get()); } catch (Exception e){ log.error("exception", e); } } } ``` ### BlockingQueue 阻塞队列,当一个线程要入满的队列时,会阻塞,当一个空队列中有线程要出的时候,会阻塞 **不能马上执行时,对应的反应** |操作|Throws Exception|SpecialValue(返回值,一般ture和false)|Blocks(阻塞)|Times Out(阻塞指定的时间,超时返回特殊值)| |----|----|----|----|----|----| |Insert(插入)|add(o)|offer(o)|put(o)|offer(o, timeout, timeunit)| |Remove(移除)|remove(o)|poll()|take()|poll(timeout, timeunit)| |Examine(检查)|element()|peek()| **实现类** - ArrayBlockingQueue:有界的阻塞队列,内部数组,必须初始化时候指定容量大小,先进先出方法存储 - DelayQueue:阻塞内部元素,元素必须实现接口Delay - LinkedBlockingQueue:大小配置可选,内部链表,先进先出 - PriorityBlockingQueue:带优先级的阻塞队列,没有边界,有排序规则,允许null,插入的对象必须实现Comparable - SynchronousQueue:只允许容纳一个元素
上一篇:
线程池
下一篇:
安全发布对象
0
赞
116 人读过
新浪微博
微信
腾讯微博
QQ空间
人人网
文档导航