您的当前位置:首页正文

深入理解Java 线程并发编排工具: 概述和应用场景

2024-11-30 来源:个人技术集锦

前言

在 Java 的 java.util.concurrent (JUC) 包中,提供了四种核心并发工具类:CountDownLatchCyclicBarrierSemaphoreCondition。它们在多线程编程中用于协调线程的执行顺序和资源访问,确保在复杂的并发场景下各任务按照预期顺序和条件完成。通过合理使用这些工具,可以大幅提升程序的可靠性和执行效率。本文将逐一介绍这四种工具的特点和使用场景,帮助大家更好地掌握多线程编程中的关键协作机制。

概述

1. CountDownLatch

等待一个线程或者一组线程全部执行完成后再执行后续的逻辑, 常用于一组任务的并发控制,无法重用。

  • 内部维护一个计数器字段count,使用时需要设置初始数量,表示多少个线程同时执行完成CountDownLatch countDownLatch =new CountDownLatch(3)
  • 常用API
    • countDownLatch.countDown() :每次调用,计数器count减1,直到count为0.
    • countDownLatch.await() :等待阻塞,当count为0时放行。
    • countDownLatch.await(long timeout, TimeUnit unit) :和await()一样,只是可以设置一个超时时间,当等待时间大于超时时间,不管count是否为0,都会跳出阻塞执行后面逻辑
    • countDownLatch.getCount() :获取count计数器数值

2. CyclicBarrier

允许一组线程相互等待,直到它们到达共同的屏障点,再同时继续执行,适用于多线程周期性汇合的场景。不同于 CountDownLatchCyclicBarrier 可重用。

  • 同样内部也维护了一个count计数器,使用时需要设置初始数量,表示多少个线程同时执行操作CyclicBarrier barrier=new CyclicBarrier(3)
  • 携带回调方法:就是到达屏障点(count =0)后需要额外进行的任务
CyclicBarrier barrier = new CyclicBarrier(3, new Runnable() {
    @Override
    public void run() {
        System.out.println("合照打卡");
    }
});
  • 常用API

    • barrier.await() :CyclicBarrier 中的 await 与 CountDownLatch 的并不一致,CyclicBarrier 没有类似于 countDown 的方法,CyclicBarrier 的 await 方法你可以近似认为它是 CountDownLatch 中 await 和 countDown 的组合。当调用 CyclicBarrier 的 await 方法后,它会阻塞,且将计数器 -1,如果计数器变为 0 后,则跳出等待。不需要显式地减少计数器。
    • barrier. await(long timeout, TimeUnit unit) :该方法在阻塞一段指定的时间后,如果等待的线程未能在超时时间内到达同步点,将抛出 TimeoutException 异常。
    • barrier.getParties() :获取当前count的数值。
    • barrier.reset() :复原count为初始值。
    • barrier.isBroken() :当CyclicBarrier为不可用状态时,返回true。

注:若其中一个线程在等待过程中抛出了 TimeoutException 异常,这将引起其他所有线程在调用 await 时抛出BrokenBarrierException异常。此时,CyclicBarrier 进入不可用状态,必须调用 reset 方法对其进行重置,方可继续使用。这种机制确保在超时或异常情况下,程序能够及时恢复到正常的同步状态。

3. Semaphore(信号量)

是一个用于控制并发访问资源的同步工具,用于控制多个线程对共享资源的并发访问量,通过许可机制允许一定数量的线程同时访问资源,适用于限流、资源池等场景。

  • 内部同样维护了一个计数器字段permits,在semaphore中的定义为令牌,允许指定的线程数量获取到令牌并往下继续执行。成功调用acquire()方法后,permits 减1,直到减为0。Semaphore semaphore =new Semaphore(3)

  • 主要API

    • semaphore.acquire() :获取一个令牌,如果获取到令牌则继续向下执行,没获取到则线程阻塞直到获得令牌为止。
    • semaphore.tryAcquire(long timeout, TimeUnit unit) :获取一个令牌并设置超时时间,在超时时间内获取到令牌返回true,等待时间大于超时时间为获得令牌返回false。
    • semaphore.release() :释放一个令牌,供其他线程使用。
    • semaphore. availablePermits() :返回当前可用的令牌数量。

4. Condition

ReentrantLock 搭配使用,提供比 wait/notify 更灵活的线程等待和通知机制,可实现精准的条件等待和唤醒,常用于复杂线程协作的场景。

private final static ReentrantLock LOCK = new ReentrantLock();
private final static Condition condition = LOCK.newCondition();
  • 主要API

    • condition.await() :线程将锁(就是当前线程通过LOCK.lock()获取的全局锁)交出去(释放全局锁供其他线程获取)并进入阻塞,等待其他线程唤醒。并且当前阻塞可以被中断,并抛出异常 InterruptedException
    • condition.await( int timeout, TimeUnit unit ) :基本与await()方法一样,只是加了一个超时时间,如果等待时间大于超时时间还未被唤醒,跳出阻塞执行后面的逻辑。
    • condition.signal() :随机唤醒一个正在等待的线程。
    • condition.signalAll() :唤醒所有等待中的线程。

注:上面api的调用必须在LOCK.lock()加锁块中使用,否则会抛出IllegalMonitorStateException异常。

案例

源码获取:

子项目:juc-two

CountDownLatch-马拉松场景

对于CountDownLatch 模拟一个参加马拉松比赛的场景。

在这个马拉松比赛的场景中,假设小明、小刚和小红参加成都马拉松。由于他们的配速不同,所以每个人跑完全程的完成时间也不同,但他们约好必须等到所有人到达终点后一起坐车回家 。

他们跑完全程分别用时:

  • 小明:5s
  • 小刚:6s
  • 小红:7s

创建参数者跑步线程类Running.class

@Slf4j
public class Running implements Runnable {
    //名称
    private String name;
    //用时
    private int time;
    public Running(String name, int time) {
        this.name = name;
        this.time = time;
    }
    @Override
    public void run() {
        log.info(name + "开始跑步了----");
        try {
            Thread.sleep(time);
            log.info(name + "达到终点了----用时:{}秒", time/1000);
            //到达终点后计数器减1
            CountDownLatchMain.countDownLatch.countDown();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}

创建执行类CountDownLatchMain.class

@Slf4j
public class CountDownLatchMain {
    //自定义线程池
    private static ThreadPoolExecutor executor = new ThreadPoolExecutor(
        Runtime.getRuntime().availableProcessors(),
        Runtime.getRuntime().availableProcessors(),
        5000,
        TimeUnit.MICROSECONDS,
        new LinkedBlockingDeque());    
    
    //初始化线程编排对象
    public static CountDownLatch countDownLatch =new CountDownLatch(3);
    
    public static void main(String[] args) {
        log.info("马拉松正式开始---");
        try {
            executor.execute(new Running("小明", 5000));
            executor.execute(new Running("小刚", 6000));
            executor.execute(new Running("小红", 7000));
            log.info("等待所有人到达终点---");
            //阻塞等待全部线程完成
            countDownLatch.await();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        log.info("全部到达终点---");
        log.info("一起坐车回家了");
    }
}

执行main() 方法

CyclicBarrier-马拉松场景

CyclicBarrier同样使用马拉松场景,可以与上面 CountDownLatch进行对照学习他们的相似与不同之处。

为了体现CyclicBarrier的特性,对内容进行了调整

小明、小刚和小红一起参加成都马拉松(设定全程为30公里)。他们配速不同,因此每人跑完10公里的时间各异。他们约定每跑完10公里都要等彼此(可重用特性)到齐后再一起合照打卡(回调函数使用),随后继续比赛,直到抵达终点拍照打卡后各自开车回家。

他们每跑完10公里分别用时:

  • 小明:1s
  • 小刚:2s
  • 小红:3s

创建参数者跑步线程类Running.class

@Slf4j
public class Running implements Runnable {
    //名称
    private String name;
    //用时
    private int time;
    public Running(String name, int time) {
        this.name = name;
        this.time = time;
    }
    @Override
    public void run() {
        log.info(name + "开始跑步了----");
        try {
            Thread.sleep(time);
            log.info(name + "达到10公里了----用时:{}秒", time / 1000);
            //到达终点后计数器减1
            CyclicBarrierMain.barrier.await();
            Thread.sleep(time);
            log.info(name + "达到20公里了----用时:{}秒", time*2 / 1000);
            CyclicBarrierMain.barrier.await();
            Thread.sleep(time);
            log.info(name + "达到终点30公里----用时:{}秒", time*3 / 1000);
            CyclicBarrierMain.barrier.await();
            log.info(name + "独自开车回家了---");
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

创建执行类CyclicBarrierMain.class

@Slf4j
public class CyclicBarrierMain {
    //自定义线程池
    private static ThreadPoolExecutor executor = new ThreadPoolExecutor(
        Runtime.getRuntime().availableProcessors(),
        Runtime.getRuntime().availableProcessors(),
        5000,
        TimeUnit.MICROSECONDS,
        new LinkedBlockingDeque());
    //初始化线程编排对象
    public static CyclicBarrier barrier = new CyclicBarrier(3, new Runnable() {
        @Override
        public void run() {
            System.out.println("合照打卡~~~");
        }});

    public static void main(String[] args) {
        log.info("马拉松正式开始---");
        try {
            executor.execute(new Running("小明", 1000));
            executor.execute(new Running("小刚", 2000));
            executor.execute(new Running("小红", 3000));
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

执行main() 方法

Semaphore-公交车占座场景

模拟一个坐公交车的场景,当车上没有空位时,站立的人需要等待有人下车了才能获得座位。

小明、小刚、小红、小绿在同一个公交站等同一路公交车。假设公交车只有3个座位,并且前一个站到后面一个站行驶所需要的平均时间为1s,写出他们4人在上车后占座的情况。

4人上车后需要经历的站点数量分别为:

  • 小明:2
  • 小刚:4
  • 小红:4
  • 小绿:4

创建乘客上车行为线程类 Seat.class

@Slf4j
public class Seat implements Runnable {
    //用户名
    private String name;
    //需要经历的站点数量
    private Integer num;
    public Seat(String name, Integer num) {
        this.name = name;
        this.num = num;
    }
    @Override
    public void run() {
        long startTime = System.currentTimeMillis();
        log.info(name + "上车了----");
        try {
            //获取令牌数量
            int tokenCount = SemaphoreMain.semaphore.availablePermits();
            if (tokenCount == 0) {
                log.info(name + "--等待--空缺座位----");
            }
            //获取令牌并设置超时时间
            boolean b = SemaphoreMain.semaphore.tryAcquire(SemaphoreMain.TIME * num, TimeUnit.MILLISECONDS);
            long awaitTime = System.currentTimeMillis();
            if (b) {
                log.info(name + "抢占到了座位----");
                if (tokenCount > 0) {
                    Thread.sleep(SemaphoreMain.TIME * num);
                } else {
                    //等待时间
                    long await = awaitTime - startTime;
                    Thread.sleep(SemaphoreMain.TIME * num - await);
                }
            } else {
                log.info(name + "整个行程没有抢到座位----");
            }
            long endTime = System.currentTimeMillis();
            log.info(name + "到站下车----等待用时:{}秒---占座时长:{}秒", (awaitTime - startTime) / 1000, Math.round((endTime - awaitTime) / 1000.0));
            //释放令牌
            SemaphoreMain.semaphore.release();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}

创建公交车运行类 SemaphoreMain.class

@Slf4j
public class SemaphoreMain {
    //自定义线程池
    private static ThreadPoolExecutor executor = new ThreadPoolExecutor(
        4,
        4,
        5000,
        TimeUnit.MICROSECONDS,
        new LinkedBlockingDeque());
    
    //初始化信号量并设置令牌数量
    public static Semaphore semaphore =new Semaphore(3);
    
    //每一个站公交行驶时间 ms
    public static int TIME = 1000;
    
    public static void main(String[] args) {
        log.info("公交车到达站台----");
        executor.execute(new Seat("小明",2));
        executor.execute(new Seat("小刚", 4));
        executor.execute(new Seat("小红",4));
        executor.execute(new Seat("小绿", 4));
    }
}

运行 main() 方法

Condition-线程等待唤醒场景

简单使用:多个线程进入等待状态,最后的一个线程唤醒所有等待线程

创建等待线程类AwaitThread.class

@Slf4j
public class AwaitThread implements Runnable{
    private String name;
    public AwaitThread(String name) {
        this.name = name;
    }
    @Override
    public void run() {
        ConditionMain.LOCK.lock();
        try {
            log.info("线程{}开始等待-----", name);
            ConditionMain.condition.await();
            log.info("线程{}被唤醒-----", name);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            ConditionMain.LOCK.unlock();
        }
    }
}

创建唤醒线程类SignalThread.class

@Slf4j
public class SignalThread implements Runnable{
    private String name;
    public SignalThread(String name) {
        this.name = name;
    }
    @Override
    public void run() {
        ConditionMain.LOCK.lock();
        try {
            log.info("线程{}开始唤醒其他线程-----", name);
            ConditionMain.condition.signalAll();
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            ConditionMain.LOCK.unlock();
        }
    }
}

创建主线程执行类ConditionMain.class

@Slf4j
public class ConditionMain {
    //自定义线程池
    private static ThreadPoolExecutor executor = new ThreadPoolExecutor(
        4,
        4,
        5000,
        TimeUnit.MICROSECONDS,
        new LinkedBlockingDeque());

    //定义ReentrantLock对象
    public final static ReentrantLock LOCK = new ReentrantLock();
    
    //获取并定义Condition对象
    public final static Condition condition = LOCK.newCondition();
    
    public static void main(String[] args) {
        executor.execute(new AwaitThread("线程1"));
        executor.execute(new AwaitThread("线程2"));
        executor.execute(new AwaitThread("线程3"));
        executor.execute(new SignalThread("线程4"));
    }
}

执行main() 方法

显示全文