AtomicXXXFieldUpdater
算是一个小补充
简介 public class AtomicIntegerFieldUpdaterTest { public static void main (String[] args) { AtomicIntegerFieldUpdater<Test> updater = AtomicIntegerFieldUpdater.newUpdater(Test.class, "value" ); Test ts = new Test(); IntStream.rangeClosed(0 , 2 ).forEach(item -> { new Thread(() -> { int value = updater.getAndIncrement(ts); System.out.println("oldV: " + value); }).start(); }); } } class Test { volatile int value; }
1、以AtomicIntegerFieldUpdater
为例,看上面代码。Test类的value属性被volatile修饰了,但是volatile只能保证可见性和有序性。在以往的文章里我们讲过是可以通过volatile+CAS同时解决可见性,有序性,原子性。
2、JUC提供了一种新的功能来保证原子性,AtomicXXXFieldUpdater修饰的类对应的字段,在进行更新时同样可以保证原子性。
使用场景
想让类的属性操作具备原子性,
但是不想使用锁。
大量需要原子类型修饰的对象,相比较比较耗费内存
举个例子:
如果你想要保证原子性,一般是使用AtomicStampedReference<Node>
来包装Node对象。
import org.apache.lucene.util.RamUsageEstimator;import java.util.concurrent.atomic.AtomicStampedReference;public class Calculate_Java_Object_Size { public static void main (String[] args) { Node node = new Node(); long b = RamUsageEstimator.shallowSizeOf(node); System.out.println(b); AtomicStampedReference<Node> nodeAtomicStampedReference = new AtomicStampedReference<Node>(node,0 ); long c = RamUsageEstimator.shallowSizeOf(nodeAtomicStampedReference); System.out.println(c + b); } public static void Api (Object o) { long a = RamUsageEstimator.sizeOf(o); System.out.println(a); long b = RamUsageEstimator.shallowSizeOf(o); System.out.println(b); String c = RamUsageEstimator.humanSizeOf(o); System.out.println(c); } } class Node { Node pre; Node next; Integer value; }
输出结果:
24
40
可以看出如果用了AtomicStampedReference<Node>
,会多出16个字节。如果对象有10000个,那么会多出很多字节。生产过程中的内存都是很贵的。为了减少内存消耗,同时可以保证原子性,就可以使用AtomicXXXFieldUpdater
。
CountDownLatch 简介
countDownLatch这个类使一个线程等待其他线程各自执行完毕后再执行。
是通过一个 state(相当于计数器)的东西来实现的,计数器的初始值是 线程的数量或者任务的数量 。
每当一个线程执行完毕后,计数器的值就-1,当计数器的值为0时,表示所有线程都执行完毕,然后在闭锁上等待的线程就可以恢复工作了。
CountDownLatch的方便之处在于,你可以在一个线程中使用, 也可以在多个线程上使用,一切只依据状态值 ,这样便不会受限于任何的场景。
使用场景一 需求
可能刚从数据库读取了一批数据
利用并发处理这批数据
当所有的数据处理完成后,再去执行后面的操作
解决方案
第一种 :可以利用 join 的方法,但是在线程池中,比较麻烦。
第二种 :利用线程池的awaitTermination,阻塞一段时间。
当使用awaitTermination时,主线程会处于一种等待的状态,等待线程池中所有的线程都运行完毕后才继续运行。
第三种 :利用CountDownLatch,每当任务完成一个,就计数器减一。
package _05_AQS;import java.util.concurrent.CountDownLatch;public class Video32 { public static void main (String[] args) throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(6 ); for (int i = 0 ; i < 6 ; i++) { new Thread(() ->{ System.out.println("\t\t" + Thread.currentThread().getName() + "处理完毕~~~" ); countDownLatch.countDown(); System.out.println("非调用者线程-" + Thread.currentThread().getName() + "-还可以干点其他事" ); }, Country.forEach_Country(i + 1 ).getCountryName()).start(); } countDownLatch.await(); System.out.println("-----------------------------" ); System.out.println("\t 所有任务都已经处理完毕,可以往后执行了!" ); } } enum Country { ONE(1 ,"1号任务" ), TWO(2 ,"2号任务" ), THREE(3 ,"3号任务" ), FOUR(4 ,"4号任务" ), FIVE(5 ,"5号任务" ), SIX(6 ,"6号任务" ); private Integer index; private String countryName; public static Country forEach_Country (Integer index) { Country[] values = Country.values(); for (Country c: values) { if (c.getIndex() == index){ return c; } } return null ; } Country(Integer index, String countryName) { this .index = index; this .countryName = countryName; } public Integer getIndex () { return index; } public String getCountryName () { return countryName; } }
结果
1 号任务处理完毕~~~ 5 号任务处理完毕~~~ 非调用者线程-5 号任务-还可以干点其他事 6 号任务处理完毕~~~ 3 号任务处理完毕~~~ 非调用者线程-3 号任务-还可以干点其他事 4 号任务处理完毕~~~ 2 号任务处理完毕~~~ 非调用者线程-4 号任务-还可以干点其他事 非调用者线程-6 号任务-还可以干点其他事 非调用者线程-1 号任务-还可以干点其他事 ----------------------------- 所有任务都已经处理完毕,可以往后执行了! 非调用者线程-2 号任务-还可以干点其他事
因为countdown只会阻塞调用者,其它线程干完任务就可以干其他事。这里的调用者线程就是main线程。
使用场景二 需求
多个线程协同工作
多个线程需要等待其他线程的工作之后,再进行其后续工作。
被唤醒后继续执行其他操作
public class CountDownLatchExample { public static void main (String[] args) throws InterruptedException { final CountDownLatch latch = new CountDownLatch(1 ); new Thread(() -> { System.out.println(Thread.currentThread().getName() + " Do some initial working." ); try { Thread.sleep(1000 ); latch.await(); System.out.println(Thread.currentThread().getName() + " Do other working." ); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); new Thread(() -> { System.out.println(Thread.currentThread().getName() + " Do some initial working." ); try { Thread.sleep(1000 ); latch.await(); System.out.println(Thread.currentThread().getName() + " Do other working." ); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); new Thread(() -> { System.out.println("asyn prepare for some data." ); try { Thread.sleep(2000 ); System.out.println("Data prepare for done." ); } catch (InterruptedException e) { e.printStackTrace(); }finally { latch.countDown(); } }).start(); } }
结果
Thread-0 Do some initial working. Thread-1 Do some initial working. asyn prepare for some data. Data prepare for done. Thread-0 Do other working. Thread-1 Do other working.
总体来说意思都差不多
常用API 构造方法只有一个
CountDownLatch(int count)
:构造一个以给定计数
实例方法
public void await()
public boolean await(long timeout,TimeUnit unit)
等待一段时间
timeout - 等待的最长时间 , unit - timeout参数的时间单位
如果 指定的等待时间过去 ,则返回值false
如果 计数达到零 ,则方法返回值为true
public void countDown()
减少锁存器的计数, 如果计数达到零,释放所有等待的线程 。
public long getCount()
给离散的平行任务增加逻辑层次关系 需求
并发的从很多的数据库读取大量数据
在读取数据的过程中,某个表可能会出现: 数据丢失、数据精度丢失、数据大小不匹配。
需要进行对数据的各个情况进行检测,这个检测是并发的完成的
所以需要控制如果一个表所有的情况检测完成,再进行后续的操作
解决
利用 CountDownLatch
的计数器
每当一个检测完成,计数器减一
如果计数为0,执行后面操作
import java.util.ArrayList;import java.util.List;import java.util.Random;import java.util.concurrent.CountDownLatch;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;public class test { private static final Random RANDOM = new Random(); public static void main (String[] args) throws Exception { Event[] events = {new Event(1 ), new Event(2 )}; ExecutorService service = Executors.newFixedThreadPool(5 ); for (Event event : events) { List<Table> tables = capture(event); for (Table table : tables) { TaskBatch taskBatch = new TaskBatch(2 ); TrustSourceColumns sourceColumns = new TrustSourceColumns(table, taskBatch); TrustSourceRecordCount recordCount = new TrustSourceRecordCount(table, taskBatch); service.submit(sourceColumns); service.submit(recordCount); } } } static class Event { private int id; Event(int id) { this .id = id; } } interface Watcher { void done (Table table) ; } static class TaskBatch implements Watcher { private final CountDownLatch latch; TaskBatch(int size) { this .latch = new CountDownLatch(size); } @Override public void done (Table table) { latch.countDown(); if (latch.getCount() == 0 ) { System.out.println("The table " + table.tableName + " finished work , " + table.toString()); } } } static class Table { String tableName; long sourceRecordCount; long targetCount; String columnSchema = "columnXXXType = varchar" ; String targetColumnSchema = "" ; public Table (String tableName, long sourceRecordCount) { this .tableName = tableName; this .sourceRecordCount = sourceRecordCount; } @Override public String toString () { return "Table{" + "tableName='" + tableName + '\'' + ", sourceRecordCount=" + sourceRecordCount + ", targetCount=" + targetCount + ", columnSchema='" + columnSchema + '\'' + ", targetColumnSchema='" + targetColumnSchema + '\'' + '}' ; } } private static List<Table> capture (Event event) { List<Table> list = new ArrayList<>(); for (int i = 0 ; i < 4 ; i++) { list.add(new Table("table-" + event.id + "-" + i, i * 1000 )); } return list; } static class TrustSourceRecordCount implements Runnable { private final Table table; private final TaskBatch taskBatch; TrustSourceRecordCount(Table table, TaskBatch taskBatch) { this .table = table; this .taskBatch = taskBatch; } @Override public void run () { try { Thread.sleep(RANDOM.nextInt(100 )); } catch (InterruptedException e) { e.printStackTrace(); } table.targetCount = table.sourceRecordCount; taskBatch.done(table); } } static class TrustSourceColumns implements Runnable { private final Table table; private final TaskBatch taskBatch; TrustSourceColumns(Table table, TaskBatch taskBatch) { this .table = table; this .taskBatch = taskBatch; } @Override public void run () { try { Thread.sleep(RANDOM.nextInt(100 )); } catch (InterruptedException e) { e.printStackTrace(); } table.targetColumnSchema = table.columnSchema; taskBatch.done(table); } } }
结果
The table table-1-1 finished work , Table{tableName='table-1-1', sourceRecordCount=1000, targetCount=1000, columnSchema='columnXXXType = varchar', targetColumnSchema='columnXXXType = varchar'} The table table-1-0 finished work , Table{tableName='table-1-0', sourceRecordCount=0, targetCount=0, columnSchema='columnXXXType = varchar', targetColumnSchema='columnXXXType = varchar'} The table table-2-0 finished work , Table{tableName='table-2-0', sourceRecordCount=0, targetCount=0, columnSchema='columnXXXType = varchar', targetColumnSchema='columnXXXType = varchar'} The table table-1-2 finished work , Table{tableName='table-1-2', sourceRecordCount=2000, targetCount=2000, columnSchema='columnXXXType = varchar', targetColumnSchema='columnXXXType = varchar'} The table table-1-3 finished work , Table{tableName='table-1-3', sourceRecordCount=3000, targetCount=3000, columnSchema='columnXXXType = varchar', targetColumnSchema='columnXXXType = varchar'} The table table-2-1 finished work , Table{tableName='table-2-1', sourceRecordCount=1000, targetCount=1000, columnSchema='columnXXXType = varchar', targetColumnSchema='columnXXXType = varchar'} The table table-2-2 finished work , Table{tableName='table-2-2', sourceRecordCount=2000, targetCount=2000, columnSchema='columnXXXType = varchar', targetColumnSchema='columnXXXType = varchar'} The table table-2-3 finished work , Table{tableName='table-2-3', sourceRecordCount=3000, targetCount=3000, columnSchema='columnXXXType = varchar', targetColumnSchema='columnXXXType = varchar'}
利用CountDownLatch实现回调函数 实现:
在每个线程使计数器减一的时候,利用getCount判断,当前是否所有线程任务执行完成
public class CyclicBarrierTest { static class MyCountDownLatch extends CountDownLatch { private final Runnable runnable; public MyCountDownLatch (int count,Runnable runnable) { super (count); this .runnable = runnable; } @Override public void countDown () { super .countDown(); if (getCount()==0 ){ this .runnable.run(); } } } public static void main (String[] args) { final MyCountDownLatch latch = new MyCountDownLatch(2 , ()->{ System.out.println("All of work finish done. This is call back." ); }); new Thread(){ @Override public void run () { try { Thread.sleep(1000 ); latch.countDown(); System.out.println(getName() + " finished." ); } catch (InterruptedException e) { e.printStackTrace(); } } }.start(); new Thread(){ @Override public void run () { try { Thread.sleep(1000 ); latch.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(getName() + " finished." ); } }.start(); } }
CyclicBarrier 引出
栅栏类似于闭锁(CountDownLatch),它能阻塞一组线程直到某个事件的发生。栅栏与闭锁的关键区别在于, 所有的线程必须同时到达栅栏位置,才能继续执行 。闭锁用于等待事件,而 栅栏用于等待其他线程 。
CyclicBarrier可以使一定数量的线程反复地在栅栏位置处汇集。 当线程到达栅栏位置时将调用await方法,这个方法将阻塞直到所有线程都到达栅栏位置。 如果所有线程都到达栅栏位置,那么栅栏将打开,此时所有的线程都将被释放,而栅栏将被重置以便下次使用。
package _05_AQS;import java.util.concurrent.BrokenBarrierException;import java.util.concurrent.CyclicBarrier;public class Video33 { public static void main (String[] args) { CyclicBarrier cyclicBarrier = new CyclicBarrier(7 ,() -> System.out.println("收集到7颗龙珠,召唤神龙" )); for (int i = 0 ; i < 7 ; i++) { final int temp = i + 1 ; new Thread(() ->{ System.out.println(Thread.currentThread().getName() + "\t收集到第" + temp + "颗龙珠" ); try { int await = cyclicBarrier.await(); System.out.println("还剩几个:" + await); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } },"线程" + String.valueOf(i)).start(); } } }
结果:
线程0 收集到第1颗龙珠 线程2 收集到第3颗龙珠 线程3 收集到第4颗龙珠 线程1 收集到第2颗龙珠 线程4 收集到第5颗龙珠 线程5 收集到第6颗龙珠 线程6 收集到第7颗龙珠 收集到7颗龙珠,召唤神龙 还剩几个:0 还剩几个:6 还剩几个:3 还剩几个:4 还剩几个:5 还剩几个:1 还剩几个:2
可以明显的看到cyclicBarrier会阻塞所有线程,和countdownlatch不一样。
API使用 构造方法 public CyclicBarrier (int parties) public CyclicBarrier (int parties, Runnable barrierAction)
parties
是参与线程的个数
第二个构造方法有一个 Runnable
参数,这个参数的意思是 *最后一个到达线程要做的任务
重要方法 public int await () throws InterruptedException, BrokenBarrierExceptionpublic int await (long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException
线程调用 await() 表示自己已经到达栅栏
BrokenBarrierException 表示栅栏已经被破坏,破坏的原因可能是其中一个线程 await() 时被中断或者超时
其他方法
将屏障重置为初始状态。 如果任何一方正在等待屏障,他们将返回 BrokenBarrierException 。
这样就可以重复利用这个屏障
CyclicBarrier 与 CountDownLatch 区别
CountDownLatch
是一次性的。 CyclicBarrier
是可循环利用的
CountDownLatch
参与的线程的职责是不一样的,有的在倒计时,有的在等待倒计时结束。 CyclicBarrier
参与的线程职责是一样的
牛客-京东2019春招Java开发笔试卷-T27
https://blog.csdn.net/liangyihuai/article/details/83106584
从jdk作者设计的目的来看,javadoc是这么描述它们的:
CountDownLatch: A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.
CyclicBarrier : A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point.
从javadoc的描述可以得出: CountDownLatch:一个或者多个线程,等待其他多个线程完成某件事情之后才能执行; CyclicBarrier:多个线程互相等待,直到到达同一个同步点,再继续一起执行。
对于CountDownLatch来说,重点是“一个线程(多个线程)等待”,而其他的N个线程在完成“某件事情”之后,可以终止,也可以等待。 而对于CyclicBarrier,重点是多个线程,在任意一个线程没有完成,所有的线程都必须互相等待,然后继续一起执行。 CountDownLatch是计数器,线程完成一个记录一个,只不过计数不是递增而是递减,而CyclicBarrier更像是一个阀门,需要所有线程都到达,阀门才能打开,然后继续执行。 按照这个题目的描述等所有线程都到达了这一个阀门处,再一起执行,此题强调的是,一起继续执行,我认为 选B 比较合理!
Exchanger 简介
用于两个工作线程之间交换数据的封装工具类
简单说就是一个线程在完成一定的事务后想与另一个线程交换数据,则 *第一个先拿出数据的线程会一直等待第二个线程,直到第二个线程拿着数据到来时才能彼此交换对应数据
Exchanger<v></v>
泛型类型,其中 V 表示可交换的数据类型
简单的应用 import java.util.concurrent.*;public class ExchangerTest { public static void main (String[] args) { final Exchanger<String> exchanger = new Exchanger<>(); new Thread(()->{ System.out.println(Thread.currentThread().getName() + " start . " ); try { TimeUnit.MILLISECONDS.sleep(200 ); String exchange = exchanger.exchange("I am come from T-A" ); System.out.println(Thread.currentThread().getName() + " get value : " + exchange); System.out.println(Thread.currentThread().getName() + " end . " ); } catch (InterruptedException e) { e.printStackTrace(); } },"A" ).start(); new Thread(()->{ System.out.println(Thread.currentThread().getName() + " start . " ); try { String exchange = exchanger.exchange("I am come from T-B" ); System.out.println(Thread.currentThread().getName() + " get value : " + exchange); System.out.println(Thread.currentThread().getName() + " end . " ); } catch (InterruptedException e) { e.printStackTrace(); } },"B" ).start(); } }
结果:
A start . B start . A get value : I am come from T-B B get value : I am come from T-A B end . A end .
方法介绍 public V exchange (V x) throws InterruptedException
等待另一个线程到达此交换点,然后将给定对象传输给它,接收其对象作为回报。
可以被打断
如果已经有个线程正在等待了,则直接交换数据
数据的分析 import java.util.concurrent.*;public class test { public static void main (String[] args) { final Exchanger<Object> exchanger = new Exchanger<>(); new Thread(() -> { Object Aobj = new Object(); System.out.println("A将会发送:" + Aobj); try { Object Robj = exchanger.exchange(Aobj); System.out.println("A接收的:" + Robj); } catch (InterruptedException e) { e.printStackTrace(); } }, "A" ).start(); new Thread(() -> { Object Bobj = new Object(); System.out.println("B将会发送:" + Bobj); try { Object Robj = exchanger.exchange(Bobj); System.out.println("B接收的:" + Robj); } catch (InterruptedException e) { e.printStackTrace(); } }, "B" ).start(); } }
结果
A将会发送:java.lang.Object@7be2d776 B将会发送:java.lang.Object@5eca8d3a B接收的:java.lang.Object@7be2d776 A接收的:java.lang.Object@5eca8d3a
从这个例子可以看出一个很严重的问题 :发送的对象和接收的对象是同一个对象,可能会用严重的线程安全问题。
ReentrantLock 简介 当多个线程需要访问某个公共资源的时候,我们知道需要通过加锁来保证资源的访问不会出问题。java提供了两种方式来加锁:
一种是关键字: synchronized
,一种是concurrent包下的基于API实现的。
synchronized是 JVM底层支持的,而concurrent包则是 jdk
实现。
公平锁和非公平锁
当有线程竞争锁时,当前线程会首先尝试获得锁而不是在队列中进行排队等候,这对于那些已经在队列中排队的线程来说显得不公平,这也是非公平锁的由来
默认情况下为非公平锁。
锁的存储结构就两个东西:”双向链表” + “int类型状态”。ReenTrantLock的实现是一种自旋锁, 通过循环调用CAS操作来实现加锁。它的性能比较好也是因为避免了使线程进入内核态的阻塞状态。想尽办法避免线程进入内核的阻塞状态是我们去分析和理解锁设计的关键钥匙。
构造方法 public ReentrantLock () public ReentrantLock (boolean fair)
获得锁 public void lock () public void lockInterruptibly () throws InterruptedExceptionpublic boolean tryLock () public boolean tryLock (long timeout, TimeUnit unit) throws InterruptedException
lock
lockInterruptibly
tryLock
如果获得到锁,返回true
如果没有获得到锁,返回false
timeout
:表示等待的时间
tryLock()
在获取的锁的时候,不会考虑此时是否有其他线程在等待,会破坏公平。
如果你希望遵守公平设置此锁,然后用 tryLock(0, TimeUnit.SECONDS)
这几乎是等效的(它也检测中断)。
释放锁
锁的调试 protected Thread getOwner () public final boolean hasQueuedThreads () public final int getQueueLength () protected Collection<Thread> getQueuedThreads ()
getOwner
hasQueuedThreads
getQueueLength
getQueuedThreads
Lock和synchronized的区别 底层实现 :
Lock基于 AQS
实现,通过state和一个CLH队列来维护锁的获取与释放
synchronized需要通过 monitor
,经历一个从用户态到内核态的转变过程,更加耗时
其他区别
synchronized
Lock
是java内置关键字,在jvm层面
是个java类
无法判断是否获取锁的状态
可以判断是否获取到锁
会自动释放锁
需在finally中手工释放锁(unlock()方法释放锁),否则容易造成线程死锁
线程会一直等待下去
如果尝试获取不到锁,线程可以不用一直等待就结束
总结来说 synchronized的锁可重入、不可中断、非公平。而Lock锁可重入、可判断、可公平(两者皆可)。
Semaphore 简介
Semaphore是一种在多线程环境下使用的设施,该设施负责协调各个线程,以保证它们能够正确、合理的使用公共资源的设施 ,也是操作系统中用于控制进程同步互斥的量。
Semaphore是一种计数信号量,用于管理一组资源,内部是基于AQS的共享模式。它相当于给线程规定一个量从而控制允许活动的线程数 。
Semaphore用于限制可以访问某些资源(物理或逻辑的)的线程数目,他维护了一个许可证集合,有多少资源需要限制就维护多少许可证集合,假如这里有N个资源,那就对应于N个许可证,同一时刻也只能有N个线程访问。
一个线程获取许可证就调用acquire方法,用完了释放资源就调用release方法。
除了JDK定义的哪些锁,Semaphore也可以定义锁。Semaphore可以做的功能相当的多,比如秒杀限流
用Semaphore自定义Lock public class SemaphoreTest { public static void main (String[] args) { final SemaphoreLock lock = new SemaphoreLock(); for (int i = 0 ; i < 2 ; i++) { new Thread(()->{ try { lock.lock(); System.out.println(Thread.currentThread().getName() + " get the lock." ); Thread.sleep(3000 ); }catch (Exception e){ e.printStackTrace(); }finally { System.out.println(Thread.currentThread().getName() + " release the lock." ); lock.unlock(); } }).start(); } } static class SemaphoreLock { private final Semaphore semaphore = new Semaphore(1 ); public void lock () throws InterruptedException { semaphore.acquire(); } public void unlock () { semaphore.release(); } } }
结果
Thread-0 get the lock. Thread-0 release the lock. Thread-1 get the lock. Thread-1 release the lock.
跟synchronized的区别
可以看出最大的区别就是可以控制多个线程访问多份资源,而不是只用一个线程访问一份资源
用Semaphore做限流 Semaphore可以控制多个线程访问多份资源,而不是只用一个线程访问一份资源,所以在限流方面也有应用。
package _05_AQS;import java.util.concurrent.Semaphore;import java.util.concurrent.TimeUnit;public class Video34 { public static void main (String[] args) { Semaphore semaphore = new Semaphore(3 ); for (int i = 0 ; i < 10 ; i++) { new Thread(() ->{ try { semaphore.acquire(); System.out.println(Thread.currentThread().getName() + "\t 进入抢购秒杀页面,准备抢小米9" ); try { TimeUnit.SECONDS.sleep(3 ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + "\t 离开抢购秒杀页面,成功抢到小米9" ); } catch (InterruptedException e) { e.printStackTrace(); }finally { semaphore.release(); } },"用户" + String.valueOf(i)).start(); } } }
结果:
用户0 进入抢购秒杀页面,准备抢小米9 用户2 进入抢购秒杀页面,准备抢小米9 用户1 进入抢购秒杀页面,准备抢小米9 用户1 离开抢购秒杀页面,成功抢到小米9 用户2 离开抢购秒杀页面,成功抢到小米9 用户0 离开抢购秒杀页面,成功抢到小米9 用户4 进入抢购秒杀页面,准备抢小米9 用户3 进入抢购秒杀页面,准备抢小米9 用户5 进入抢购秒杀页面,准备抢小米9 用户3 离开抢购秒杀页面,成功抢到小米9 用户4 离开抢购秒杀页面,成功抢到小米9 用户6 进入抢购秒杀页面,准备抢小米9 用户8 进入抢购秒杀页面,准备抢小米9 用户5 离开抢购秒杀页面,成功抢到小米9 用户7 进入抢购秒杀页面,准备抢小米9
代码里限制了资源为3个,所以同一时间段只能有3个线程进来抢购小米9手机。
常用API 构造方法 public Semaphore (int permits) public Semaphore (int permits , boolean fair)
permits
:同一时间可以访问资源的线程数目
fair
:尽可能的保证公平
重要方法 public void acquire () throws InterruptedExceptionpublic void release ()
acquire多个时,如果没有足够的许可证可用,那么当前线程将被禁用以进行线程调度 ,并且处于休眠状态,直到发生两件事情之一:
一些其他线程调用此信号量的一个release方法,当前线程旁边将分配许可证,并且可用许可证的数量满足此请求;
要么一些其他线程interrupts当前线程。
release多个时,会使许可证增多,最终可能超过初始值
public boolean tryAcquire (int permits) public boolean tryAcquire (int permits, long timeout, TimeUnit unit) throws InterruptedException
其他方法 public int availablePermits ()
返回此信号量中当前可用的许可数。
protected Collection<Thread> getQueuedThreads()
public final int getQueueLength()
getQueuedThreads
返回正在阻塞的线程集合
getQueueLength
返回阻塞获取的线程数
public void acquireUninterruptibly()
public void acquireUninterruptibly(int permits)
可以不被打断
获取许可证
public int drainPermits()
获取当前全部的许可证目标
Condition 简介
Condition主要是用于线程通信的,也就是和Object类的wait,notify有同样的功能。不过Condition的功能更加多样,Conditon可以绑定锁,实现选择性唤醒。
Condition和Lock搭配使用; condition必须使用lock.newCondition();
来创建condition。必须存放在Lock中。否则抛出异常。
package _07_LockAndCondition;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReentrantLock;public class ConditionDemo { public static void main (String[] args) { ShareResource shareResource = new ShareResource(); new Thread(() ->{ for (int i = 0 ; i < 2 ; i++) { shareResource.print3(); } },"A" ).start(); new Thread(() ->{ for (int i = 0 ; i < 2 ; i++) { shareResource.print6(); } },"B" ).start(); new Thread(() ->{ for (int i = 0 ; i < 2 ; i++) { shareResource.print9(); } },"C" ).start(); } } class ShareResource { private volatile Integer flag = 1 ; private Lock lock = new ReentrantLock(); private Condition c1 = lock.newCondition(); private Condition c2 = lock.newCondition(); private Condition c3 = lock.newCondition(); public void print3 () { lock.lock(); try { while (flag != 1 ){ c1.await(); } for (int i = 0 ; i < 3 ; i++) { System.out.println(Thread.currentThread().getName() + "\t" + (i+1 )); } flag = 2 ; c2.signal(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } public void print6 () { lock.lock(); try { while (flag != 2 ){ c2.await(); } for (int i = 0 ; i < 6 ; i++) { System.out.println(Thread.currentThread().getName() + "\t" + (i+1 )); } flag = 3 ; c3.signal(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } public void print9 () { lock.lock(); try { while (flag != 3 ){ c3.await(); } for (int i = 0 ; i < 9 ; i++) { System.out.println(Thread.currentThread().getName() + "\t" + (i+1 )); } flag = 1 ; c1.signal(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } }
结果:
A 1 A 2 A 3 B 1 B 2 B 3 B 4 B 5 B 6 C 1 C 2 C 3 C 4 C 5 C 6 C 7 C 8 C 9 A 1 A 2 A 3 B 1 B 2 B 3 B 4 B 5 B 6 C 1 C 2 C 3 C 4 C 5 C 6 C 7 C 8 C 9
Condition版生产者消费者 public class Producer_04 { public static void main (String[] args) { Consumer4 consumer = new Consumer4(); new Thread(() ->{ for (int i = 0 ; i < 10 ; i++) { try { consumer.increment(); } catch (InterruptedException e) { e.printStackTrace(); } } },"生产者A" ).start(); new Thread(() ->{ for (int i = 0 ; i < 10 ; i++) { try { consumer.decrement(); } catch (InterruptedException e) { e.printStackTrace(); } } },"消费者B" ).start(); new Thread(() ->{ for (int i = 0 ; i < 10 ; i++) { try { consumer.increment(); } catch (InterruptedException e) { e.printStackTrace(); } } },"生产者C" ).start(); new Thread(() ->{ for (int i = 0 ; i < 10 ; i++) { try { consumer.decrement(); } catch (InterruptedException e) { e.printStackTrace(); } } },"消费者D" ).start(); } } class Consumer4 { private volatile Integer num = 0 ; private Lock lock = new ReentrantLock(); private Condition condition = lock.newCondition(); public void increment () throws InterruptedException { lock.lock(); try { while (num != 0 ){ condition.await(); } num++; System.out.println(Thread.currentThread().getName() + "\t" + num); condition.signalAll(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } public void decrement () throws InterruptedException { lock.lock(); try { while (num == 0 ){ condition.await(); } num--; System.out.println(Thread.currentThread().getName() + "\t" + num); condition.signalAll(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } }
ReentrantReadWriteLock读写锁 读写锁的目的是为了让读 — 读之间不加锁
冲突
策略
读 — 读
并行化
读 — 写
串行化
写 — 读
串行化
写 — 写
串行化
package _05_AQS;import java.util.*;public class Video34 { private static final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true ); private static final Lock readLock = lock.readLock(); private static final Lock writeLock = lock.writeLock(); private static final List<String> data = new ArrayList<>(); public static void main (String[] args) throws InterruptedException { new Thread(()->write(),"write-1" ).start(); Thread.sleep(1000 ); new Thread(()->read(),"read-1" ).start(); new Thread(()->read(),"read-2" ).start(); } public static void write () { try { writeLock.lock(); data.add("数据XXX" ); Thread.sleep(3000 ); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println("写锁释放" ); writeLock.unlock(); } } public static void read () { try { readLock.lock(); for (String datum : data) { System.out.println(Thread.currentThread().getName() + " 读入:" +datum); } Thread.sleep(3000 ); } catch (Exception e) { e.printStackTrace(); } finally { System.out.println("读锁释放" ); readLock.unlock(); } } }
结果:
写锁释放 read-1 读入:数据XXX read-2 读入:数据XXX 读锁释放 读锁释放
可以看到读写-串行化,读读可以并发。
StampedLock读写锁 ReadWriteLock出现的问题 1、深入分析ReadWriteLock,会发现它有个潜在的问题:如果有线程正在读,写线程需要等待读线程释放锁后才能获取写锁,即读的过程中不允许写线程去抢锁,这是一种悲观的读锁,会出现写饥饿。
2、有100个线程访问某个资源,如果有99线程个需要读锁,1个线程需要写锁,此时,写的线程很难得到执行。
StampedLock改进 3、StampedLock和ReadWriteLock相比,改进之处在于:读的过程中也允许获取写锁后写入
。这样一来,我们读的数据就可能不一致,所以,需要一点额外的代码来判断读的过程中是否有写入,这种读锁是一种乐观锁 。
4、乐观锁的意思就是乐观地估计读的过程中大概率不会有写入,因此被称为乐观锁。反过来,悲观锁则是读的过程中拒绝有写入,也就是写入必须等待。显然乐观锁的并发效率更高,但一旦有小概率的写入导致读取的数据不一致,需要能检测出来,再读一遍就行。
用StampedLock去悲观的读 StampedLock可以完全实现ReadWriteLock的功能。
public class StampedLockDemo1 { private static final StampedLock stampedLock = new StampedLock(); private static Integer DATA = 0 ; public static void write () { long stamp = -1 ; try { stamp = stampedLock.writeLock(); DATA++; System.out.println("写-->" + DATA); } finally { stampedLock.unlockWrite(stamp); } } public static void read () { long stamp = -1 ; try { stamp = stampedLock.readLock(); System.out.println("读-->" + DATA); } finally { stampedLock.unlockRead(stamp); } } public static void main (String[] args) { ExecutorService executor = Executors.newFixedThreadPool(10 ); Runnable writeTask = () -> { for (; ; ) { write(); } }; Runnable readTask = () -> { for (; ; ) { read(); } }; executor.submit(readTask); executor.submit(readTask); executor.submit(readTask); executor.submit(readTask); executor.submit(readTask); executor.submit(readTask); executor.submit(readTask); executor.submit(readTask); executor.submit(readTask); executor.submit(writeTask); } }
独线程一旦先执行,写线程很难再获得锁
输出结果:
读–>0 读–>0 读–>0 读–>0 读–>0 读–>0 读–>0 读–>0 读–>0 读–>0 读–>0 读–>0 写–>1
用StampedLock去乐观的读 只需要改一下read()
public class StampedLockDemo2 { private static final StampedLock stampedLock = new StampedLock(); private static Integer DATA = 0 ; public static void write () { long stamp = -1 ; try { stamp = stampedLock.writeLock(); DATA++; System.out.println("写-->" + DATA); } finally { stampedLock.unlockWrite(stamp); } } public static void read () { long stamp = stampedLock.tryOptimisticRead(); if (!stampedLock.validate(stamp)) { try { stamp = stampedLock.readLock(); System.out.println("悲观读-->" + DATA); return ; } finally { stampedLock.unlockRead(stamp); } } System.out.println("乐观读-->" + DATA); } public static void main (String[] args) { ExecutorService executor = Executors.newFixedThreadPool(10 ); Runnable writeTask = () -> { for (; ; ) { write(); } }; Runnable readTask = () -> { for (; ; ) { read(); } }; executor.submit(readTask); executor.submit(readTask); executor.submit(readTask); executor.submit(readTask); executor.submit(readTask); executor.submit(readTask); executor.submit(readTask); executor.submit(readTask); executor.submit(readTask); executor.submit(writeTask); } }
悲观读–>704 写–>705 写–>706 写–>707 悲观读–>707 写–>708 悲观读–>708 写–>709 写–>710 悲观读–>710 乐观读–>710 乐观读–>690 乐观读–>690 乐
ForkJoin框架 思想:分而治之。将一个大任务分割成若干小任务,最终汇总每个小任务的结果得到这个大任务的结果。
举例说明
1、我们举个例子:如果要计算一个超大数组的和,最简单的做法是用一个循环在一个线程内完成:
2、还有一种方法,可以把数组拆成两部分,分别计算,最后加起来就是最终结果,这样可以用两个线程并行执行:
3、如果拆成两部分还是很大,我们还可以继续拆,用4个线程并行执行:
这就是Fork/Join任务的原理:判断一个任务是否足够小。如果是,直接计算,否则,就分拆成几个小任务分别计算。这个过程可以反复“裂变”成一系列小任务。
编码实现
整个任务流程如下所示 :
首先继承任务,覆写任务的执行方法
通过判断阈值,判断该线程是否可以执行
如果不能执行,则将任务继续递归分配,利用fork方法,并行执行
如果是有返回值的,才需要调用join方法,汇集数据。
主要的两个类:
RecursiveAction
一个递归无结果的ForkJoinTask(没有返回值)
RecursiveTask
一个递归有结果的ForkJoinTask(有返回值)
RecursiveTask public class ForkJoinRecursiveTask { private final static int MAX_THRESHOLD = 3 ; public static void main (String[] args) { final ForkJoinPool joinPool = new ForkJoinPool(); ForkJoinTask<Integer> future = joinPool.submit(new CalculatedRecursiveTask(0 , 1000 )); try { Integer integer = future.get(); System.out.println("执行结果:" + integer); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } private static class CalculatedRecursiveTask extends RecursiveTask <Integer > { private final int start; private final int end; private CalculatedRecursiveTask (int start, int end) { this .start = start; this .end = end; } @Override protected Integer compute () { if (end - start <= MAX_THRESHOLD) { return IntStream.rangeClosed(start, end).sum(); } else { int middle = (end + start) / 2 ; CalculatedRecursiveTask leftTask = new CalculatedRecursiveTask(start, middle); CalculatedRecursiveTask rightTask = new CalculatedRecursiveTask(middle + 1 , end); leftTask.fork(); rightTask.fork(); return leftTask.join() + rightTask.join(); } } } }
这个是有返回值的
RecursiveAction 这个是没有返回值的
public class ForkJoinRecursiveAction { private final static int MAX_THRESHOLD = 3 ; private final static AtomicInteger SUM = new AtomicInteger(); public static void main (String[] args) throws InterruptedException { ForkJoinPool forkJoinPool = new ForkJoinPool(); forkJoinPool.submit(new CalculateRecursiveAction(0 ,1000 )); forkJoinPool.awaitTermination(10 , TimeUnit.SECONDS); System.out.println("执行结果为:" + SUM); } private static class CalculateRecursiveAction extends RecursiveAction { private final int start; private final int end; private CalculateRecursiveAction (int start, int end) { this .start = start; this .end = end; } @Override protected void compute () { if ((end-start)<=MAX_THRESHOLD){ SUM.addAndGet(IntStream.rangeClosed(start,end).sum()); }else { int middle = (start+end)/2 ; CalculateRecursiveAction leftAction = new CalculateRecursiveAction(start,middle); CalculateRecursiveAction rightAction = new CalculateRecursiveAction(middle+1 ,end); leftAction.fork(); rightAction.fork(); } } } }
原理 整个流程需要三个类完成 :
1、ForkJoinPool
既然任务是被逐渐的细化的,那就需要把这些任务存在一个池子里面,这个池子就是ForkJoinPool。
它与其它的ExecutorService区别主要在于它使用**”工作窃取”**,那什么是工作窃取呢?
工作窃取:一个大任务会被划分成无数个小任务,这些任务被分配到不同的队列,这些队列有些干活干的块,有些干得慢。于是干得快的,一看自己没任务需要执行了,就去隔壁的队列里面拿去任务执行。
2、ForkJoinTask
ForkJoinTask就是ForkJoinPool里面的每一个任务。他主要有两个子类:RecursiveAction
和RecursiveTask
。然后通过fork()方法去分配任务执行任务,通过join()方法汇总任务结果。
小总结
Fork/Join是一种基于“分治”的算法:通过分解任务,并行执行,最后合并结果得到最终结果。
ForkJoinPool线程池可以把一个大任务分拆成小任务并行执行,任务类必须继承自RecursiveTask或RecursiveAction。
使用Fork/Join模式可以进行并行计算以提高效率。