Process进程与Thread线程
注意: 很多场景下多线程是模拟出来的,真正的多线程是指多个CPU,即多核。如果是模拟出来的多线程,即一个CPU下,在同一时间点,CPU只能执行一个代码,因为切换的很快,就有同时执行的错觉。
核心概念:
1.Thread class 继承Thread类
继承Thread类:
实现Runnable接口:
与Callable相比 Runnable效率低且没有返回值
定义:静态代理是在编译期间就已经确定代理类的实现方式,代理类和委托类是一对一的关系。
实现方式:手动编写代理类,在代理类中调用委托类的方法,并可以在方法调用前后添加额外的逻辑。
优点:易于理解和实现,可以直观地看到代理类的实现逻辑。
缺点:每个需要代理的类都需要一个代理类,增加了代码量和维护成本;不够灵活,无法做到通用代理。
// 委托类
interface Subject {
void request();
}
// 委托类的具体实现
class RealSubject implements Subject {
public void request() {
System.out.println("RealSubject: Processing request");
}
}
// 代理类
class Proxy implements Subject {
private RealSubject realSubject = new RealSubject();
public void request() {
// 可以在调用委托类方法前后添加额外逻辑
System.out.println("Proxy: Before request");
realSubject.request();
System.out.println("Proxy: After request");
}
}
定义:动态代理是在运行时动态生成代理类的方式,代理类在程序运行时创建,无需预先定义,可以实现通用的代理逻辑。
实现方式:使用Java的反射机制,在运行时创建代理类,并实现代理逻辑。
优点:可以实现通用的代理逻辑,减少重复代码;更加灵活,可根据需要动态添加代理。
缺点:相比静态代理,动态代理实现更为复杂。
// 委托类
interface Subject {
void request();
}
// 委托类的具体实现
class RealSubject implements Subject {
public void request() {
System.out.println("RealSubject: Processing request");
}
}
// 动态代理处理器
class DynamicProxyHandler implements InvocationHandler {
private Object target;
public DynamicProxyHandler(Object target) {
this.target = target;
}
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
System.out.println("Proxy: Before request");
Object result = method.invoke(target, args);
System.out.println("Proxy: After request");
return result;
}
}
// 创建动态代理
public class Main {
public static void main(String[] args) {
RealSubject realSubject = new RealSubject();
//newProxyInstance是 Java 提供的用于创建代理对象的静态方法
Subject proxy = (Subject) Proxy.newProxyInstance(
//指定类加载器,用于加载代理类
realSubject.getClass().getClassLoader(),
//指定被代理类实现的接口,确保代理对象和被代理对象拥有相同的接口。
realSubject.getClass().getInterfaces(),
//指定一个实现InvocationHandler接口的代理处理器,用于处理代理对象的方法调用。
new DynamicProxyHandler(realSubject)
);
proxy.request();
}
}
代码解释:
当调用 proxy.request() 方法时,实际上会触发代理对象的 invoke 方法。
在 DynamicProxyHandler 的 invoke 方法中,会执行代理逻辑,比如输出 “Proxy: Before request”,然后调用被代理对象的 request 方法。
被代理对象的 request 方法执行完成后,会再次回到 DynamicProxyHandler 的 invoke 方法中,继续执行代理逻辑,比如输出 “Proxy: After request”。
方法 | 说明 |
---|---|
setPriority(int newPriority) | 更改线程的优先级 |
static void sleep(long millis) | 在指定的毫秒数内让当前正在执行的线程休眠 |
void join() | 等待该线程终止 |
static void yield() | 暂停当前正在执行的线程对象,并执行其他线程 |
boolean isAlive() | 测试线程是否处于活动状态 |
1.建议线程正常停止–>利用循环次数
2.建议使用标志位
3.不要使用stop、destroy等jdk不建议使用的方法
四个条件
1.互斥条件:一个资源每次只能被一个进程使用
2.请求与保持条件:一个进程因请求资源而阻塞时,对已获取的资源保持不放
3.不剥夺条件:进程已获得的资源,在未使用完之前,不能强行剥夺
4.循环等待条件:若干线程之间形成一种头尾相接的循环等待资源关系
new ReentrantLock() 获得锁
线程常用的方法:
JUC:java.util.concurrent
判断等待、业务、通知
public class Demo04 {
public static void main(String[] args) {
Data data = new Data();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "A").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "B").start();
}
}
// 判断等待,业务,通知
class Data {
private int i = 0;
// +1
public synchronized void increment() throws InterruptedException {
if (i != 0) {
this.wait();
}
i++;
System.out.println(Thread.currentThread().getName() + "=>" + i);
// 通知其他线程我+1完成
this.notifyAll();
}
// -1
public synchronized void decrement() throws InterruptedException {
if (i==0){
this.wait();
}
i--;
System.out.println(Thread.currentThread().getName() + "=>" + i);
// 通知其他线程,我-1完毕
this.notifyAll();
}
}
问题存在:A、B、C、D四个线程!虚假唤醒问题
if改成while解决虚假唤醒,原因是if只会判断一次
public class Demo04 {
public static void main(String[] args) {
Data data = new Data();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "A").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "B").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "C").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "D").start();
}
}
// 判断等待,业务,通知
class Data {
private int i = 0;
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
// +1
public void increment() throws InterruptedException {
lock.lock();
try {
while (i != 0) {
condition.await();
}
i++;
System.out.println(Thread.currentThread().getName() + "=>" + i);
// 通知其他线程我+1完成
condition.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
// -1
public void decrement() throws InterruptedException {
lock.lock();
try {
while (i==0){
condition.await();
}
i--;
System.out.println(Thread.currentThread().getName() + "=>" + i);
// 通知其他线程,我-1完毕
condition.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
通过signal()指定那个线程执行
public class Demo05 {
public static void main(String[] args) {
Data01 data01 = new Data01();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
data01.A();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "A").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
data01.B();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "B").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
data01.C();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "C").start();
}
}
// 判断等待,业务,通知
//A执行完调用B,B执行完调用C,C执行完调用A
class Data01 {
private int num = 1;// 1A 2B 3C
private Lock lock = new ReentrantLock();
private Condition condition1 = lock.newCondition();
private Condition condition2 = lock.newCondition();
private Condition condition3 = lock.newCondition();
public void A() throws InterruptedException {
lock.lock();
try {
// 业务代码,判断=>执行=>通知!
while (num!=1){
condition1.await();
}
System.out.println(Thread.currentThread().getName()+"=>AAAAA");
num = 2;
// 唤醒指定的线程,B
condition2.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void B() throws InterruptedException {
lock.lock();
try {
while (num!=2){
condition2.await();
}
num = 3;
System.out.println(Thread.currentThread().getName()+"=>BBBBB");
// 唤醒指定的线程,C
condition3.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void C() throws InterruptedException {
lock.lock();
try {
while (num!=3){
condition3.await();
}
num = 1;
System.out.println(Thread.currentThread().getName()+"=>CCCCC");
// 唤醒指定的线程,A
condition1.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
public class Test01 {
public static void main(String[] args) {
Phone phone = new Phone();
new Thread(() -> {
phone.call();
}, "A").start();
new Thread(() -> {
phone.send();
}, "B").start();
}
}
class Phone {
public synchronized void send() {
System.out.println("发短信");
}
public synchronized void call() {
//现象二时添加,让线程先睡4秒种
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("打电话");
}
}
现象一:两个方法都使用synchronized关键字,先执行打电话
现象二:让线程先睡4秒,结果依然是先打电话
原因:synchromized锁的是方法的调用者,并且开启的两个线程方法使用的是同一把锁,那么就会出现谁先拿到谁先执行的现象。及时我们让call方法sleep了4秒,依然是call方法先执行。
public class Test02 {
public static void main(String[] args) {
Phone2 phone1 = new Phone2();
Phone2 phone2 = new Phone2();
new Thread(() -> {
phone1.call();
}, "A").start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(() -> {
// phone1.hello();
phone2.send();
}, "B").start();
}
}
class Phone2 {
public synchronized void send() {
System.out.println("发短信");
}
public synchronized void call() {
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("打电话");
}
public void hello() {
System.out.println("hello");
}
}
现象三:在之前代码的基础上,新添加一个普通的方法,此时先执行hello方法
现象四:新实例化一个phone对象,使用不同的对象去调用方法,此时先执行发短信
原因:普通方法没有锁就不是同步方法不受锁的影响,又由于时间的延迟,所以先打印hello.现象四中,使用了不同的对象,以至于是锁的对象不是同一个,所以先发短信。
public class Test03 {
public static void main(String[] args) {
Phone3 phone1 = new Phone3();
Phone3 phone2 = new Phone3();
new Thread(() -> {
phone1.call();
}, "A").start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(() -> {
//phone1.send();
phone2.send();
}, "B").start();
}
}
class Phone3 {
public static synchronized void send() {
System.out.println("发短信");
}
public static synchronized void call() {
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("打电话");
}
}
现象五:在之前代码的基础上,方法前添加static关键字,先执行打电话
现象六:再添加一个对象,使用不同的对象进行方法的打印,依然是先执行打电话
原因:当我们添加了static关键字以后,此时我们先打印打电话,不仅是因为我们拿到的是同一个对象的锁,还因为我们的锁是直接锁的该类的Class模板。当我们再新添加一个对象时,由于我们使用了static,直接锁在模板上,所以依然是先执行打电话
public class Test04 {
public static void main(String[] args) {
Phone4 phone1 = new Phone4();
Phone4 phone2 = new Phone4();
new Thread(() -> {
phone1.call();
}, "A").start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(() -> {
// phone1.send();
phone2.send();
}, "B").start();
}
}
class Phone4 {
public synchronized void send() {
System.out.println("发短信");
}
public static synchronized void call() {
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("打电话");
}
}
现象七:当去掉一个锁方法的static关键字以后,先打印发短信
现象八:当我们新建一个对象后,是有不同的对象去调用,还是先打印发短信
原因:现象7中,只添加了一个static关键字,模板只有一部分被锁,则先打印发短信。现象8中,又新建了一个对象,因为锁的东西不一样(打电话锁模板,发短信锁对象),所以先执行发短信。
只有当锁的对象或模板时同一个的时候,才能借助调用的顺序来执行。
多线程下不安全;可能会报错:java.util.ConcurrentModificationException (并发修改异常)
// java.util.ConcurrentModificationException:并发修改异常
//解决方案:
//List<String> list = new Vector<>(); 效率低
//List<String> strings = Collections.synchronizedList(new ArrayList<>());
//List<String> strings = new CopyOnWriteArrayList<>(); 推荐
public class Test11 {
public static void main(String[] args) {
List<String> strings = new ArrayList<>();
for (int i = 0; i < 10; i++) {
new Thread(()->{
strings.add(UUID.randomUUID().toString().substring(0,5));
System.out.println(strings);
},String.valueOf(i)).start();
}
}
}
//CopyOnWriteArrayList 写入时复制,是COW思想 是一种优化策略
//多个线程调用时,list 读取数据时候固定的
//写入的为了避免覆盖,在写入的时候复制一份,复制完之后再给调用者
底层new了一个HashMap,在add时,map.put(e,PRESENT) key是不能重复的,PRESENT是一个常量无意义
// java.util.ConcurrentModificationException:并发修改异常
//解决方案:
//Set<String> strings = Collections.synchronizedSet(new HashSet<>());
//Set<String> strings = new CopyOnWriteArraySet<>();
public class Test11 {
public static void main(String[] args) {
// Set<String> strings = Collections.synchronizedSet(new HashSet<>());
HashSet<String> strings = new HashSet<>();
for (int i = 0; i < 10; i++) {
new Thread(()->{
strings.add(UUID.randomUUID().toString().substring(0,5));
System.out.println(strings);
}).start();
}
}
}
工作中不会使用到HashMap
// java.util.ConcurrentModificationException:并发修改异常
//解决方案:
//使用Map<String, String> concurrentHashMap = new ConcurrentHashMap<>();
public class Test11 {
public static void main(String[] args) {
// 默认相当于
Map<String, String> map = new HashMap<>(16, 0.75F);
for (int i = 0; i < 10; i++) {
new Thread(()->{
map.put(Thread.currentThread().getName(),UUID.randomUUID().toString().substring(0,5));
System.out.println(map);
}).start();
}
}
}
Callable接口类似于Runnable接口,线程第三种创建方式。
可以抛出异常。
可以有返回值。
实现方法不同。run/call方法
底层:Runnable有一个实现类FutureTask
new Thread(new Runnable()).start = new Thread(new FutureTask(Callable)).start
public class CallableTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask futureTask = new FutureTask(new MyThread());// 适配类
new Thread(futureTask,"A").start();
new Thread(futureTask,"B").start();// 打印一个Call,结果会被缓存,提高效率
// get方法可能会产生阻塞 原因:要等待结果的返回
Integer s = (Integer) futureTask.get();
System.out.println(s);
}
}
class MyThread implements Callable<Integer>{
@Override
public Integer call(){
System.out.println("Call");
return 1024;
}
}
输出:
call()
1024
原因结果是存入缓存,
1.多线程任务汇总。2.多线程任务阻塞住,等待发令枪响,一起执行。
// 计数器
//每次有线程调用,数量-1,当计数器归零,countDownLatch.await()就会被唤醒向下执行。
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
// 总数是6,必须要是执行任务的时候使用
CountDownLatch countDownLatch = new CountDownLatch(6);
for (int i = 0; i < 6; i++) {
new Thread(()->{
System.out.println(Thread.currentThread().getName()+"=>Go Out");
countDownLatch.countDown();// 数量-1
}).start();
}
countDownLatch.await();// 等待计数器归零,然后再往下执行
System.out.println("关门");
}
}
输出:
Thread-0=>Go Out
Thread-5=>Go Out
Thread-3=>Go Out
Thread-1=>Go Out
Thread-2=>Go Out
Thread-4=>Go Out
关门
// 相当于加法计数器
public class CyclicBarrierDemo {
public static void main(String[] args) {
// 集齐七颗龙珠召唤神龙
CyclicBarrier cyclicBarrier = new CyclicBarrier(7, () -> {// 如果计数器为7,线程只有6个,则会等待,不进行召唤神龙
System.out.println("召唤神龙");
});
for (int i = 0; i < 7; i++) {
final int temp = i;
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "收集" + temp + "个龙珠!");
try {
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}).start();
}
}
}
输出:
Tread-1收集1个龙珠!
Tread-5收集5个龙珠!
Tread-0收集0个龙珠!
Tread-2收集2个龙珠!
Tread-3收集3个龙珠!
Tread-4收集4个龙珠!
召唤神龙
//semaphore.acquire();获得,假设已经满了则等待,等待其他线程释放。
//semaphore.release();释放,会将当前的信号量释放+1,然后唤醒等待的线程。
public class SemaphoreDemo {
public static void main(String[] args) {
// 线程数量:停车位!限流
Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < 6; i++) {
new Thread(()->{
try {
semaphore.acquire();// 得到
System.out.println(Thread.currentThread().getName()+"抢到车位!");
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName()+"离开车位!");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
semaphore.release();// 释放
}
}).start();
}
}
}
ReadWriteLock接口有一个实现类ReentrantReadWriteLock类。
读可以被多个线程同时读,写的时候只能有一个线程去写
创建ReentrantReadWriteLock使用writeLock/readLock获取锁,最后释放锁
/**
* 独占锁(写锁):一次只能被一个线程占有
* 共享锁(读锁):多个线程可以同时占有
* ReentrantLock:
* 读-读:可以共存
* 读-写:不可以共存
* 写-写:不可以共存
*/
public class ReentrantLockDemo {
public static void main(String[] args) {
MyCacheLock myCache = new MyCacheLock();
// 5个线程写
for (int i = 1; i <= 5; i++) {
final int temp = i;
new Thread(() -> {
myCache.put(temp + "", temp + "");
}, String.valueOf(i)).start();
}
// 5个线程读
for (int i = 1; i <= 5; i++) {
final int temp = i;
new Thread(() -> {
myCache.get(temp + "");
}, String.valueOf(i)).start();
}
}
}
class MyCacheLock {
private volatile Map<String, Object> map = new HashMap<>();
// 读写锁,更加细粒度的控制
private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
// 写,同时只有一个线程写
public void put(String key, Object obj) {
readWriteLock.writeLock().lock();
try {
System.out.println(Thread.currentThread().getName() + "写入");
map.put(key, obj);
System.out.println(Thread.currentThread().getName() + "写入OK");
} catch (Exception e) {
e.printStackTrace();
} finally {
readWriteLock.writeLock().unlock();
}
}
// 读,所有线程都可以读
public void get(String key) {
readWriteLock.readLock().lock();
try {
System.out.println(Thread.currentThread().getName() + "读取");
map.get(key);
System.out.println(Thread.currentThread().getName() + "读取OK");
} catch (Exception e) {
e.printStackTrace();
} finally {
readWriteLock.readLock().unlock();
}
}
}
写时队列满时阻塞等待,读时队列空时阻塞等待
使用场景:多线程并发处理,线程池
方式 | 抛出异常 | 有返回值,不抛异常 | 阻塞,一直等待 | 阻塞,超时等待 |
---|---|---|---|---|
添加 | add() | offer() | put() | offer(,) |
移除 | remove() | pull() | take() | pull(,) |
检测队首元素 | element() | peek() | - | - |
public class Test {
public static void main(String[] args) throws InterruptedException {
test4();
}
// 抛出异常:java.lang.IllegalStateException: Queue full
public static void test1(){
// 队列的大小为3
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
// add()方法返回boolean值
boolean flag1 = blockingQueue.add("a");
boolean flag2 = blockingQueue.add("b");
boolean flag3 = blockingQueue.add("c");
boolean flag4 = blockingQueue.add("d");// add添加元素超过队列的长度会抛出异常java.lang.IllegalStateException: Queue full
System.out.println(blockingQueue.element());// 获得队首元素
System.out.println("=========");
// remove()返回本次移除的元素
Object e1 = blockingQueue.remove();
Object e2 = blockingQueue.remove();
Object e3 = blockingQueue.remove();
Object e4 = blockingQueue.remove();// 队列中没有元素仍继续移除元素会抛出异常java.util.NoSuchElementException
}
// 有返回值,不抛出异常
public static void test2(){
// 队列的大小为3
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
// offer返回boolean值
boolean flag1 = blockingQueue.offer("a");
boolean flag2 = blockingQueue.offer("b");
boolean flag3 = blockingQueue.offer("c");
//boolean flag4 = blockingQueue.offer("d");// offer添加元素超过队列的长度会返回false
System.out.println(blockingQueue.peek());// 获得队首元素
System.out.println("=========");
// poll()返回本次移除的元素
Object poll1 = blockingQueue.poll();
Object poll2 = blockingQueue.poll();
Object poll3 = blockingQueue.poll();
Object poll4 = blockingQueue.poll();// 队列中没有元素仍继续移除元素会打印出null
}
// 阻塞,一直等待
public static void test3() throws InterruptedException {
// 队列的大小为3
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
// put没有返回值
blockingQueue.put("a");
blockingQueue.put("b");
blockingQueue.put("c");
//blockingQueue.put("d");// put添加元素超过队列的长度会一直等待
System.out.println("=========");
// take()返回本次移除的元素
Object take1 = blockingQueue.take();
Object take2 = blockingQueue.take();
Object take3 = blockingQueue.take();
Object take4 = blockingQueue.take();// 队列中没有元素仍继续移除元素会一直等待
}
// 阻塞,超时等待
public static void test4() throws InterruptedException {
// 队列的大小为3
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
// offer返回boolean值
boolean flag1 = blockingQueue.offer("a");
boolean flag2 = blockingQueue.offer("b");
boolean flag3 = blockingQueue.offer("c");
// offer添加元素超过队列的长度会返回false;并且等待指定时间后推出,向下执行
boolean flag4 = blockingQueue.offer("d", 2, TimeUnit.SECONDS);
System.out.println("=========");
// poll()返回本次移除的元素
Object poll1 = blockingQueue.poll();
Object poll2 = blockingQueue.poll();
Object poll3 = blockingQueue.poll();
// 队列中没有元素仍继续移除元素会打印出null,等待指定之间后退出。
Object poll4 = blockingQueue.poll(2,TimeUnit.SECONDS);
}
}
同步队列与其他的BlockingQueue不一样,SynchronousQueue不存储元素,put了一个元素,必须从里面先take取处理,否则不能再put进去值;
优点:降低资源的消耗,提高响应速度,方便管理
3大方法,7大参数,4种拒绝策略
在线程池创建中不允许使用Executors去创建,而是通过ThreadPoolExecutor创建,避免资源耗尽OOM的风险
3大方法
Executors.newSingleThreadExecutor() //单个线程
Executors.newFixedThreadExecutor(5) //创建一个固定的线程池大小
Executors.newCachedThreadExecutor() //可伸缩的线程池
7大参数
new ThreadPoolExecutor(
int corePoolSize, //核心线程数
int maximumPoolSize, //最大线程数
long keepAliveTime,//等待时长
TimeUnit unit,//时间单位
BlockingQueue workQueue,//阻塞队列
ThreadFactory threadFactory,//线程工程(不用动 默认即可)
RejecteadExecutionHandler handle//拒绝策略)
ps:
如何去定义最大线程数:
- IO密集型:判断程序中十分消耗IO的线程,线程数大于他们数量
- CPU密集型:处理器几核就设置几核,保存CPU效率最高
获取CPU核数 Runtime.getRuntime().availableProcessors()
4种拒绝策略
new ThreadPoolExecutor.AbortPolicy(); // 抛出异常
new ThreadPoolExecutor.CallerRunsPolicy();// 哪来的去哪(主线程来的,就回去让主线程执行)
new ThreadPoolExecutor.DiscardPolicy();// 丢掉任务,不抛出异常
new ThreadPoolExecutor.DiscardOldestPolicy();// 尝试和最早的竞争,竞争失败了也丢掉任务,也不抛出异常
lambda表达式、链式编程、函数式接口、Stream流式计算
函数式接口就是接口中只有一个方法
接收一个输入参数,并返回一个结果。通常用于对输入进行转换或映射。
@FunctionlInterface
public interface Function<T,R>{ //T传入参数,R返回参数
R apply(T t);
}
示例:
public class FunctionDemo {
public static void main(String[] args) {
Function<String, String> function = (str) -> {return str;};
System.out.println(function.apply("aaaaaaaaaa"));
}
}
接收一个输入参数,返回一个布尔值结果。通常用于对输入进行条件判断。
@FunctionalInterface
public interface Predicate<T>{ //传入参数T,只能返回Boolean值
boolean test(T t);
}
示例:
public class PredicateDemo {
public static void main(String[] args) {
Predicate<String> predicate = (str) -> {return str.isEmpty();};
// false
System.out.println(predicate.test("aaa"));
// true
System.out.println(predicate.test(""));
}
}
不接受任何输入参数,但返回一个结果。通常用于提供数据的场景,比如工厂方法等。
@FunctionalInterface
public interface Suppier<T>{ //无参,有返回值
T get( );
}
示例:
public class Demo4 {
public static void main(String[] args) {
Supplier<String> supplier = ()->{return "1024";};
System.out.println(supplier.get());
}
}
接收一个输入参数,但不返回任何结果。通常用于对参数进行一些操作,比如打印、修改状态等。
@FunctionalInterface
public interface Consummer <T>{ //有参,无返回值
void accept(T t);
}
示例:
public class Demo3 {
public static void main(String[] args) {
Consumer<String> consumer = (str)->{
System.out.println(str);
};
consumer.accept("abc");
}
}
ForkJoin 特点: 工作窃取!采用双端队列
MapReduce 核心思想->把大任务拆分为小任务
如何使用:
ForkJoin 的计算类
public class ForkJoinDemo extends RecursiveTask<Long> {
private long star;
private long end;
/** 临界值 */
private long temp = 1000000L;
public ForkJoinDemo(long star, long end) {
this.star = star;
this.end = end;
}
/**
* 计算方法
* @return
*/
@Override
protected Long compute() {
if ((end - star) < temp) {
Long sum = 0L;
for (Long i = star; i < end; i++) {
sum += i;
}
return sum;
}else {
// 使用ForkJoin 分而治之 计算
//1 . 计算平均值
long middle = (star + end) / 2;
ForkJoinDemo forkJoinDemo1 = new ForkJoinDemo(star, middle);
// 拆分任务,把线程压入线程队列
forkJoinDemo1.fork();
ForkJoinDemo forkJoinDemo2 = new ForkJoinDemo(middle, end);
forkJoinDemo2.fork();
long taskSum = forkJoinDemo1.join() + forkJoinDemo2.join();
return taskSum;
}
}
}
测试类
public class ForkJoinTest {
private static final long SUM = 20_0000_0000;
public static void main(String[] args) throws ExecutionException, InterruptedException {
test2();
test3();
}
/**
* 使用ForkJoin 方法
*/
public static void test2() throws ExecutionException, InterruptedException {
long star = System.currentTimeMillis();
ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinTask<Long> task = new ForkJoinDemo(0L, SUM);
ForkJoinTask<Long> submit = forkJoinPool.submit(task);
Long along = submit.get();
System.out.println(along);
long end = System.currentTimeMillis();
System.out.println("时间:" + (end - star));
System.out.println("-----------");
}
/**
* 使用 Stream 并行流 计算
*/
public static void test3() {
long star = System.currentTimeMillis();
long sum = LongStream.range(0L, 20_0000_0000L).parallel().reduce(0, Long::sum);
System.out.println(sum);
long end = System.currentTimeMillis();
System.out.println("时间:" + (end - star));
System.out.println("-----------");
}
}
Future 设计的初衷:对将来的某个事件结果进行建模
Future接口有一个实现类CompletableFuture
(1)没有返回值的runAsync异步回调
public static void main(String[] args) throws ExecutionException, InterruptedException
{
// 发起 一个 请求
System.out.println(System.currentTimeMillis());
System.out.println("---------------------");
CompletableFuture<Void> future = CompletableFuture.runAsync(()->{
//发起一个异步任务
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+".....");
});
System.out.println(System.currentTimeMillis());
System.out.println("------------------------------");
//输出执行结果
System.out.println(future.get()); //获取执行结果
}
(2)有返回值的异步回调supplyAsync
//有返回值的异步回调
CompletableFuture<Integer> completableFuture=CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread().getName());
try {
TimeUnit.SECONDS.sleep(2);
int i=1/0;
} catch (InterruptedException e) {
e.printStackTrace();
}
return 1024;
});
System.out.println(completableFuture.whenComplete((t, u) -> {
//success 回调
System.out.println("t=>" + t); //正常的返回结果
System.out.println("u=>" + u); //抛出异常的 错误信息
}).exceptionally((e) -> {
//error回调
System.out.println(e.getMessage());
return 404;
}).get());
whenComplete: 有两个参数,一个是t 一个是u
T:是代表的 正常返回的结果;
U:是代表的 抛出异常的错误信息;
如果发生了异常,get可以获取到exceptionally返回的值;
JMM:JAVA内存模型,不存在的东西,是一个概念,也是一个约定!
关于JMM的一些同步的约定:
1、线程解锁前,必须把共享变量立刻刷回主存;
2、线程加锁前,必须读取主存中的最新值到工作内存中;
3、加锁和解锁是同一把锁;
线程中分为 工作内存、主内存
JMM8种操作:
Read(读取): 作用于主内存变量,它把一个变量的值从主内存传输到线程的工作内存中,以便随后的load动作使用;
load(载入):作用于工作内存的变量,它把read操作从主存中变量放入工作内存中;
Use(使用): 作用于工作内存中的变量,它把工作内存中的变量传输给执行引擎,每当虚拟机遇到一个需要使用到变量的值,就会使用到这个指令;
assign(赋值): 作用于工作内存中的变量,它把一个从执行引擎中接受到的值放入工作内存的变量副本中;
store(存储): 作用于主内存中的变量,它把一个从工作内存中一个变量的值传送到主内存中,以便后续的write使用;
write(写入): 作用于主内存中的变量,它把store操作从工作内存中得到的变量的值放入主内存的变量中;
lock(锁定): 作用于主内存的变量,把一个变量标识为线程独占状态;
unlock(解锁): 作用于主内存的变量,它把一个处于锁定状态的变量释放出来,释放后的变量才可以被其他线程锁定;
问题:
线程B修改了值,但线程A不能及时可见怎么办?
解决办法采用:Volatile
Volatile 是 Java 虚拟机提供 轻量级的同步机制
1、保证可见性
2、不保证原子性
3、禁止指令重排
public class JMMDemo01 {
// 如果不加volatile 程序会死循环
// 加了volatile是可以保证可见性的
private volatile static Integer number = 0;
public static void main(String[] args) {
//main线程
//子线程1
new Thread(()->{
while (number==0){
}
}).start();
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
//子线程2
new Thread(()->{
while (number==0){
}
}).start();
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
number=1; //主线程对其修改为1
System.out.println(number);
}
}
原子性:不可分割;
线程A在执行任务的时候,不能被打扰的,也不能被分割的,要么同时成功,要么同时失败。
public class VDemo02 {
private static volatile int number = 0;
public static void add(){
//++ 不是一个原子性操作,是两个~3个操作
number++;
}
public static void main(String[] args) {
//理论上number === 20000
for (int i = 1; i <= 20; i++) {
new Thread(()->{
for (int j = 1; j <= 1000 ; j++) {
add();
}
}).start();
}
while (Thread.activeCount()>2){
//main gc
Thread.yield();
}
System.out.println(Thread.currentThread().getName()+",num="+number);
}
}
如果不加lock和synchronized ,怎么样保证原子性?
采用原子类
java.util.concurrent.atomic–AtomicBoolean/AtomicInteger/AtomicLong
原子类的底层是Unsafe这个类,可以直接操作内存修改值
public class VDemo02 {
private static volatile AtomicInteger number = new AtomicInteger();
public static void add(){
// number++;
number.incrementAndGet(); //底层是CAS保证的原子性
}
public static void main(String[] args) {
//理论上number === 20000
for (int i = 1; i <= 20; i++) {
new Thread(()->{
for (int j = 1; j <= 1000 ; j++) {
add();
}
}).start();
}
while (Thread.activeCount()>2){
//当线程数大于2时,让他们一直处理 礼让 当等于2时表示执行完毕 main gc
Thread.yield();
}
System.out.println(Thread.currentThread().getName()+",num="+number);
}
}
处理器在进行指令重排的时候,会考虑数据之间的依赖性!
volatile可以避免指令重排:volatile中会加一道内存的屏障,这个内存屏障可以保证在这个屏障中的指令顺序。
哪里使用内存屏障最多: 单例模式
单例思想 构造器私有化
1)饿汉式
不管三七二十一,上来就new一个对象
public class Hungry {
/**
* 可能会浪费空间
*/
private byte[] data1=new byte[1024*1024];
private byte[] data2=new byte[1024*1024];
private byte[] data3=new byte[1024*1024];
private byte[] data4=new byte[1024*1024];
//构造器私有化
private Hungry(){
}
private final static Hungry hungry = new Hungry();
//static 共享数据 数据可见
public static Hungry getInstance(){
return hungry;
}
}
2)DCL(双重检锁)懒汉式
//懒汉式单例模式
public class LazyMan {
private static boolean key = false;
private LazyMan(){
//避免因为反射 通过空参获取构造器 来创建对象 就在里面加锁
synchronized (LazyMan.class){
//若通过反射获取两个对象,此时只上锁是不行的 还需要标志位来判断
if (key==false){
key=true;
}
else{
throw new RuntimeException("不要试图使用反射破坏异常");
}
}
System.out.println(Thread.currentThread().getName()+" ok");
}
private volatile static LazyMan lazyMan;
//双重检测锁模式 简称DCL懒汉式
public static LazyMan getInstance(){
//需要加锁
if(lazyMan==null){
synchronized (LazyMan.class){
if(lazyMan==null){
lazyMan=new LazyMan();
/** 底层是以下三步
* 1、分配内存空间
* 2、执行构造方法,初始化对象
* 3、把这个对象指向这个空间
*
* 就有可能出现指令重排问题
* 比如执行的顺序是1 3 2 等
* 我们就可以添加volatile保证指令重排问题
*/
}
}
}
return lazyMan;
}
//单线程下 是ok的
//但是如果是并发的
public static void main(String[] args) throws NoSuchMethodException, IllegalAccessException, InvocationTargetException, InstantiationException, NoSuchFieldException {
//Java中有反射
// LazyMan instance = LazyMan.getInstance();
//从反射中获取对象,并篡改
//篡改标志位
Field key = LazyMan.class.getDeclaredField("key");
key.setAccessible(true);
//null 获取空参构造器
Constructor<LazyMan> declaredConstructor = LazyMan.class.getDeclaredConstructor(null);
declaredConstructor.setAccessible(true); //无视了私有的构造器
//通过反射获得对象 newInstance
LazyMan lazyMan1 = declaredConstructor.newInstance();
key.set(lazyMan1,false);
LazyMan instance = declaredConstructor.newInstance();
System.out.println(instance);
System.out.println(lazyMan1);
System.out.println(instance == lazyMan1);
}
}
3)静态内部类
//静态内部类
public class Holder {
private Holder(){
}
public static Holder getInstance(){
return InnerClass.holder;
}
public static class InnerClass{
private static final Holder holder = new Holder();
}
}
在单例模式下,都是不安全的,因为存在反射
反射破坏不了枚举
此时解决办法上 枚举 enum本身也是一个Class类
enum没有 无参构造器 有一个有参构造(String s,int i)
//enum 是什么? enum本身就是一个Class 类
public enum EnumSingle {
INSTANCE;
public EnumSingle getInstance(){
return INSTANCE;
}
}
class Test{
public static void main(String[] args) throws NoSuchMethodException, IllegalAccessException, InvocationTargetException, InstantiationException {
EnumSingle instance1 = EnumSingle.INSTANCE;
Constructor<EnumSingle> declaredConstructor = EnumSingle.class.getDeclaredConstructor(String.class,int.class);
declaredConstructor.setAccessible(true);
//EnumSingle.class.getDeclaredConstructor()此时 报NoSuchMethodException 没有空参构造
EnumSingle instance2 = declaredConstructor.newInstance();
System.out.println(instance1);
System.out.println(instance2);
//EnumSingle.class.getDeclaredConstructor(String.class,int.class)此时报不能破坏单例
}
}
比较当前工作内存中的值 和 主内存中的值,如果这个值是期望的,那么则执行操作!如果不是就一直循环,使用的是自旋锁。
缺点:
- 循环会耗时;
- 一次性只能保证一个共享变量的原子性;
- 它会存在ABA问题
public class casDemo {
//CAS : compareAndSet 比较并交换
public static void main(String[] args) {
AtomicInteger atomicInteger = new AtomicInteger(2020);
//boolean compareAndSet(int expect, int update)
//期望值、更新值
//如果实际值 和 我的期望值相同,那么就更新
//如果实际值 和 我的期望值不同,那么就不更新
System.out.println(atomicInteger.compareAndSet(2020, 2021));
System.out.println(atomicInteger.get()); //输出:2021
//因为期望值是2020 实际值却变成了2021 所以会修改失败
//CAS 是CPU的并发原语
System.out.println(atomicInteger.compareAndSet(2020, 2021));
System.out.println(atomicInteger.get());
}
}
解决ABA问题:思想是乐观锁–原子引用(带版本号的原子操作)
public class CASDemo {
/**AtomicStampedReference 注意,如果泛型是一个包装类,注意对象的引用问题
* 正常在业务操作,这里面<Integer>比较的都是一个个对象
*/
//AtomicStampedReference<>(初始值,初始时间戳(版本号))
static AtomicStampedReference<Integer> atomicStampedReference =
new AtomicStampedReference<>(1, 1);
// CAS compareAndSet : 比较并交换!
public static void main(String[] args) {
new Thread(() -> {
int stamp = atomicStampedReference.getStamp(); // 获得版本号
System.out.println("a1=>" + stamp);
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 修改操作时,版本号更新 + 1
atomicStampedReference.compareAndSet(1, 2,
atomicStampedReference.getStamp(),
atomicStampedReference.getStamp() + 1);
System.out.println("a2=>" + atomicStampedReference.getStamp());
// 重新把值改回去, 版本号更新 + 1
System.out.println(atomicStampedReference.compareAndSet(2, 1,
atomicStampedReference.getStamp(),
atomicStampedReference.getStamp() + 1));
System.out.println("a3=>" + atomicStampedReference.getStamp());
}, "a").start();
// 乐观锁的原理相同!
new Thread(() -> {
int stamp = atomicStampedReference.getStamp(); // 获得版本号
System.out.println("b1=>" + stamp);
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(atomicStampedReference.compareAndSet(1, 3,
stamp, stamp + 1));
System.out.println("b2=>" + atomicStampedReference.getStamp());
}, "b").start();
}
}
默认都是非公平锁
1.公平锁:非常公平,不能插队,必须先来后到
2.非公平锁:非常不公平,允许插队,可以改变顺序
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
1)Synchonized 锁
public class Demo01 {
public static void main(String[] args) {
Phone phone = new Phone();
new Thread(()->{
phone.sms();
},"A").start();
new Thread(()->{
phone.sms();
},"B").start();
}
}
class Phone{
public synchronized void sms(){
System.out.println(Thread.currentThread().getName()+" sms");
call();//这里也有一把锁 (相当于包含关系)
}
public synchronized void call(){
System.out.println(Thread.currentThread().getName()+"call");
}
}
输出:
Asms
Acall
Bsms
Bcall
2)Lock锁
//lock
public class Demo02 {
public static void main(String[] args) {
Phone2 phone = new Phone2();
new Thread(()->{
phone.sms();
},"A").start();
new Thread(()->{
phone.sms();
},"B").start();
}
}
class Phone2{
Lock lock=new ReentrantLock();
public void sms(){
lock.lock(); //细节:这个是两把锁,两个钥匙
//lock.lock(); lock锁必须配对,否则就会死锁在里面
try {
System.out.println(Thread.currentThread().getName()+"=> sms");
call();//这里也有一把锁
} catch (Exception e) {
e.printStackTrace();
}finally {
lock.unlock();
}
}
public void call(){
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + "=> call");
}catch (Exception e){
e.printStackTrace();
}
finally {
lock.unlock();
}
}
T2线程在没有获取锁的时候会自旋等待
当T1释放锁之后,T2才能获取锁继续执行
死锁产生的四个必要条件
互斥条件: 资源是独占的且排他使用,进程互斥使用资源,即任意时刻一个资源只能给一个进程使用,其他进程若申请一个资源,而该资源被另一进程占有时,则申请者等待直到资源被占有者释放。
不可剥夺条件: 进程所获得的资源在未使用完毕之前,不被其他进程强行剥夺,而只能由获得该资源的进程资源释放。
请求和保持条件: 进程每次申请它所需要的一部分资源,在申请新的资源的同时,继续占用已分配到的资源。
循环等待条件: 在发生死锁时必然存在一个进程等待队列{P1,P2,…,Pn},其中P1等待P2占有的资源,P2等待P3占有的资源,…,Pn等待P1占有的资源,形成一个进程等待环路
public class DeadLock {
public static void main(String[] args) {
String lockA= "lockA";
String lockB= "lockB";
new Thread(new MyThread(lockA,lockB),"t1").start();
new Thread(new MyThread(lockB,lockA),"t2").start();
}
}
class MyThread implements Runnable{
private String lockA;
private String lockB;
public MyThread(String lockA, String lockB) {
this.lockA = lockA;
this.lockB = lockB;
}
@Override
public void run() {
synchronized (lockA){
System.out.println(Thread.currentThread().getName()+" lock"+lockA+"===>get"+lockB);
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (lockB){
System.out.println(Thread.currentThread().getName()+" lock"+lockB+"===>get"+lockA);
}
}
}
}
查看死锁:
1、使用jps定位进程号 命令:jps-l
2、使用jstack 进程号 找到死锁信息
JUC
常用代码:
Runtime.getRuntime().availableProcessors() 获取cpu核数
DOS命令
进入编译文件 进行javap -c xxx.class 获取字节码文件