本文共 9296 字,大约阅读时间需要 30 分钟。
条件用于在线程之间传递信号。通过条件和重入锁,可以实现更灵活的线程通信。例如,可以在特定条件下唤醒等待的线程。
public class ConditionTest { static class NumberWrapper { public int value = 1; } public static void main(String[] args) { final Lock lock = new ReentrantLock(); final Condition reachThreeCondition = lock.newCondition(); final Condition reachSixCondition = lock.newCondition(); final NumberWrapper num = new NumberWrapper(); Thread threadA = new Thread(new Runnable() { @Override public void run() { lock.lock(); System.out.println("ThreadA获得lock"); try { System.out.println("threadA start write"); while (num.value <= 3) { System.out.println(num.value); num.value++; } reachThreeCondition.signal(); } finally { lock.unlock(); System.out.println("ThreadA释放lock"); } lock.lock(); try { System.out.println("ThreadA获得lock"); reachSixCondition.await(); System.out.println("threadA start write"); while (num.value <= 9) { System.out.println(num.value); num.value++; } } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); System.out.println("ThreadA释放lock"); } } }); Thread threadB = new Thread(new Runnable() { @Override public void run() { try { lock.lock(); System.out.println("ThreadB获得lock"); Thread.sleep(5000); while (num.value <= 3) { reachThreeCondition.await(); } } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); System.out.println("ThreadB释放lock"); } try { lock.lock(); System.out.println("ThreadB获得lock"); System.out.println("threadB start write"); while (num.value <= 6) { System.out.println(num.value); num.value++; } reachSixCondition.signal(); } finally { lock.unlock(); System.out.println("ThreadB释放lock"); } } }); threadB.start(); threadA.start(); }} 条件与重入锁配合使用,提供了更灵活的线程通信方式。通过条件,可以在特定条件下唤醒等待的线程,实现线程之间的协调。
CountDownLatch用于一个线程等待其他多个线程完成某项任务。
public class CountDownLatchTest { public static void main(String[] args) { final CountDownLatch c = new CountDownLatch(3); Thread t1 = new Thread(new Runnable() { @Override public void run() { try { System.out.println("开始等"); c.await(); System.out.println("完事"); } catch (InterruptedException e) { e.printStackTrace(); } } }); Thread t2 = new Thread(new Runnable() { @Override public void run() { for (int i = 3; i > 0; i--) { c.countDown(); System.out.println(i); } } }); t1.start(); t2.start(); }} 直接创建,new CountDownLatch(int num);
await(): 阻塞,直到 countDown 方法被执行了 num 次。countDown(): 减 1。CountDownLatch 适用于一个线程等待其他线程完成某件事情。通过 countDown 方法,可以控制线程的等待状态。
CyclicBarrier 是 CountDownLatch 的扩展版本,支持多个线程互相等待。
public class MainMission { private CyclicBarrier barrier; private final static int threadCounts = 5; public void runMission() { ExecutorService exec = Executors.newFixedThreadPool(threadCounts); barrier = new CyclicBarrier(threadCounts + 1); for (int i = 0; i < 5; i++) { exec.execute(new Mission(barrier)); } try { barrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } try { Thread.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("所有任务都执行完了"); exec.shutdown(); } public static void main(String[] args) { MainMission m = new MainMission(); m.runMission(); }}class Mission implements Runnable { private CyclicBarrier barrier; public Mission(CyclicBarrier barrier) { this.barrier = barrier; } @Override public void run() { System.out.println(Thread.currentThread().getName() + "开始执行任务"); try { int sleepSecond = new Random().nextInt(10) * 1000; System.out.println(Thread.currentThread().getName() + "要执行" + sleepSecond + "秒任务"); Thread.sleep(sleepSecond); } catch (InterruptedException e) { e.printStackTrace(); } try { barrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } System.out.println(System.currentTimeMillis()); System.out.println(Thread.currentThread().getName() + "执行完毕"); }} 直接创建,new CyclicBarrier(int num);
await(): 阻塞,直到阻塞的线程数量达到 num 个。CyclicBarrier 强调多个线程互相等待。只有所有线程完成后,才能继续执行后续任务。
Semaphore 是一种信号量,用于控制进入次数。
public class MySemaphore implements Runnable { Semaphore position; private int id; public MySemaphore(int i, Semaphore s) { this.id = i; this.position = s; } @Override public void run() { try { if (position.availablePermits() > 0) { System.out.println("顾客[" + this.id + "]进入厕所,有空位"); } else { System.out.println("顾客[" + this.id + "]进入厕所,没空位,排队"); } position.acquire(); System.out.println("######### 顾客[" + this.id + "]获得坑位"); Thread.sleep((int) (Math.random() * 100000)); System.out.println("@@@@@@@@@ 顾客[" + this.id + "]使用完毕"); position.release(); } catch (Exception e) { e.printStackTrace(); } } public static void main(String args[]) { ExecutorService list = Executors.newCachedThreadPool(); Semaphore position = new Semaphore(2); for (int i = 0; i < 10; i++) { list.submit(new MySemaphore(i + 1, position)); } list.shutdown(); try { position.acquireUninterruptibly(2); System.out.println("使用完毕,需要清扫了"); position.release(2); } catch (InterruptedException e) { e.printStackTrace(); } }} 直接创建,new Semaphore(int num);
availablePermits(): 查看可用许可。acquire(): 尝试获取一个许可。release(): 释放一个许可。acquireUninterruptibly(int num): 尝试获取 num 个许可,不会被中断。Semaphore 用于控制进入次数,避免资源冲突。它在资源有限的场景中非常有用。
ReentrantLock 是一种重入锁,支持多个线程同时持有锁。
new ReentrantLock();: 非公平锁new ReentrantLock(true);: 公平锁ReentrantLock 可以在同一线程中多次获得锁,适用于需要多次锁的场景。
ReentrantReadWriteLock 是一种可重入的读写锁。
readLock().lock();: 读锁writeLock().lock();: 写锁readLock().unlock();: 解读锁writeLock().unlock();: 解写锁ReentrantReadWriteLock 提供了更细粒度的锁控制,适用于多读多写场景。
Callable 接口可以返回执行结果,与 Runnable 对比:
public class CallableTest implements Callable{ @Override public Integer call() { System.out.println("开始计算"); try { Thread.sleep(1000); return 123; } catch (InterruptedException e) { e.printStackTrace(); } }}public class Main { public static void main(String[] args) { FutureTask futureTask = new FutureTask<>(new CallableTest()); Thread thread = new Thread(futureTask); thread.start(); try { System.out.println(futureTask.get()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } }}
Callable 接口提供了更高级的功能,适用于需要返回结果的场景。
// 有数量限制的线程池ExecutorService service = Executors.newFixedThreadPool(4);// 没有数量限制的线程池ExecutorService service = Executors.newCachedThreadPool();// 单线程池ExecutorService service = Executors.newSingleThreadExecutor();
线程池提供了高效的线程管理方式,可以根据需求配置线程数量。
通过以上内容,可以更好地理解 Java 中的并发控制机制。
转载地址:http://qecuz.baihongyu.com/