您的当前位置:首页正文

CompletableFuture 超时功能有大坑!使用不当直接生产事故!

2025-02-02 来源:个人技术集锦

CompletableFuture 超时功能有大坑!使用不当直接生产事故!

本文未经允许禁止转载!

上一篇文章《》中我们讨论了 CompletableFuture 超时功能的具体实现,从整体实现来说,JDK21前的版本有着内存泄露的bug,不过很少对实际生产有影响,因为任务的编排涉及的对象并不多,少量内存泄露最终会被回收掉。从单一功能内聚的角度来说,超时功能的实现是没有问题;然而由于并发编程的复杂性,可能会出现 Delayer 线程延迟执行的情况。本文将详细复现与讨论 CompletableFuture 超时功能的大坑,同时提供一些最佳实践指导。

2024年9月8日更新:负责人 更新了代码示例,。

1. 问题复现

感谢 负责人 提供:

public class CfDelayDysfunctionDemo {
  
  public static void main(String[] args) {
    dysfunctionDemo();
    System.out.println();
    cffuOrTimeoutFixDysfunctionDemo();
  }
  
  private static void dysfunctionDemo() {
    logWithTimeAndThread("dysfunctionDemo begin");
    final long tick = System.currentTimeMillis();
    final List<CompletableFuture<?>> sequentCfs = new ArrayList<>();

    CompletableFuture<Integer> incomplete = new CompletableFuture<>();

    CompletableFuture<?> cf = incomplete.orTimeout(100, TimeUnit.MILLISECONDS)
      .handle((v, ex) -> {
        logWithTimeAndThread("[1] timout");
        sleep(1000);
        return null;
      });
    sequentCfs.add(cf);

    cf = incomplete.orTimeout(100, TimeUnit.MILLISECONDS)
      .handle((v, ex) -> {
        logWithTimeAndThread("[2] timout");
        sleep(1000);
        return null;
      });
    sequentCfs.add(cf);

    cf = incomplete.orTimeout(100, TimeUnit.MILLISECONDS)
      .handle((v, ex) -> {
        logWithTimeAndThread("[3] timout");
        sleep(1000);
        return null;
      });
    sequentCfs.add(cf);

    CompletableFuture.allOf(sequentCfs.toArray(CompletableFuture[]::new)).join();
    logWithTimeAndThread("dysfunctionDemo end in " + (System.currentTimeMillis() - tick) + "ms");
  }

  private static void cffuOrTimeoutFixDysfunctionDemo() {
    logWithTimeAndThread("cffuOrTimeoutFixDysfunctionDemo begin");
    final long tick = System.currentTimeMillis();
    final List<CompletableFuture<?>> sequentCfs = new ArrayList<>();

    CompletableFuture<Integer> incomplete = new CompletableFuture<>();

    CompletableFuture<?> cf = CompletableFutureUtils.cffuOrTimeout(incomplete, 100, TimeUnit.MILLISECONDS)
      .handle((v, ex) -> {
        logWithTimeAndThread("[1] timout");
        sleep(1000);
        return null;
      });
    sequentCfs.add(cf);

    cf = CompletableFutureUtils.cffuOrTimeout(incomplete, 100, TimeUnit.MILLISECONDS)
      .handle((v, ex) -> {
        logWithTimeAndThread("[2] timout");
        sleep(1000);
        return null;
      });
    sequentCfs.add(cf);

    cf = CompletableFutureUtils.cffuOrTimeout(incomplete, 100, TimeUnit.MILLISECONDS)
      .handle((v, ex) -> {
        logWithTimeAndThread("[3] timout");
        sleep(1000);
        return null;
      });
    sequentCfs.add(cf);

    CompletableFuture.allOf(sequentCfs.toArray(CompletableFuture[]::new)).join();
    logWithTimeAndThread("cffuOrTimeoutFixDysfunctionDemo end in " + (System.currentTimeMillis() - tick) + "ms");
  }

  private static void logWithTimeAndThread(String msg) {
    System.out.printf("%tF %<tT.%<tL [%s] %s%n",
                      System.currentTimeMillis(), Thread.currentThread().getName(), msg);
  }
}

执行结果如下:

代码思路是这样的:有3个运行1秒的任务,在超时之后运行,不切线程池(都在 Delayer 线程运行),运行了3秒,不能在设置100ms的超时后运行,因为单线程排队了。handle 方法传入的回调函数在 Delayer 线程中执行了。

示例代码中解决超时线程延迟执行的方法是使用CFFU提供的安全 timeout 方法,本文后面会分析相关源码。

2. 问题分析

为什么handle方法里的回调会在 CompletableFutureDelayScheduler 中执行?

// 这里的代码逐步深入到调用栈内部
public <U> CompletableFuture<U> handle(
    BiFunction<? super T, Throwable, ? extends U> fn) {
    return uniHandleStage(null, fn);
}

private <V> CompletableFuture<V> uniHandleStage(
    Executor e, BiFunction<? super T, Throwable, ? extends V> f) {
    if (f == null) throw new NullPointerException();
    CompletableFuture<V> d = newIncompleteFuture();
    Object r;
    if ((r = result) == null)
       	// 加入回调栈中后续再执行
        unipush(new UniHandle<T,V>(e, d, this, f));
    else if (e == null)
      	// 有结果,直接执行
        d.uniHandle(r, f, null);
    else {
        try {
            e.execute(new UniHandle<T,V>(null, d, this, f));
        } catch (Throwable ex) {
            d.result = encodeThrowable(ex);
        }
    }
    return d;
}

final <S> boolean uniHandle(Object r,
                                BiFunction<? super S, Throwable, ? extends T> f,
                                UniHandle<S,T> c) {
        S s; Throwable x;
        if (result == null) {
            try {
              	// 此次调用中 c 为空,无需关注UniHandle,甚至不需要知道UniHandle的具体职责
                if (c != null && !c.claim())
                    return false;
                if (r instanceof AltResult) {
                    x = ((AltResult)r).ex;
                    s = null;
                } else {
                    x = null;
                    @SuppressWarnings("unchecked") S ss = (S) r;
                    s = ss;
                }
              	// 执行回调
                completeValue(f.apply(s, x));
            } catch (Throwable ex) {
                completeThrowable(ex);
            }
        }
        return true;
    }

我们把出现问题的原因简单总结一下:

CompletionStage 中不带 async 的方法可能会在不同的线程中执行。一般情况下,如果CF的结果已经计算出来,后续的回调在调用线程中执行,如果结果没有计算出来,后续的回调在上一步计算的线程中执行。

以下是一个简化的代码示例:

@Slf4j
public class TimeoutBugDemo {
    public static void main(String[] args) {
        new CompletableFuture<Integer>()
            .orTimeout(1, TimeUnit.SECONDS)
            .handle((v, ex) -> {
                log.info("v: {}", v, ex);
                return -1;
            }).join();
    }
}

handle 方法传入的回调方法会在delayer线程中执行,从执行日志看也确实如此:

Task :TimeoutBugDemo.main()
11:58:53.465 [CompletableFutureDelayScheduler] INFO com.example.demo.cftimeout.TimeoutBugDemo -- v: null
java.util.concurrent.TimeoutException: null
at java.base/java.util.concurrent.CompletableFuture$Timeout.run(CompletableFuture.java:2920)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
at java.base/java.lang.Thread.run(Thread.java:1583)

3. CFFU 是如何解决线程传导的?

// CFFU 代码实现
public static <C extends CompletableFuture<?>> C cffuOrTimeout(
        C cfThis, Executor executorWhenTimeout, long timeout, TimeUnit unit) {
    requireNonNull(cfThis, "cfThis is null");
    requireNonNull(executorWhenTimeout, "executorWhenTimeout is null");
    requireNonNull(unit, "unit is null");

    return hopExecutorIfAtCfDelayerThread(orTimeout(cfThis, timeout, unit), executorWhenTimeout);
}

// 核心实现代码
private static <C extends CompletableFuture<?>> C hopExecutorIfAtCfDelayerThread(C cf, Executor executor) {
    CompletableFuture<Object> ret = newIncompleteFuture(cf);

    // use `cf.handle` method(instead of `cf.whenComplete`) and return null in order to
    // prevent reporting the handled exception argument of this `action` at subsequent `exceptionally`
    cf.handle((v, ex) -> {
        if (!atCfDelayerThread()) completeCf(ret, v, ex);
      	// 使用 executor 后,CF的后续回调操作就不会在Dalayer 线程中执行了
        else executor.execute(() -> completeCf(ret, v, ex));
        return null;
    }).exceptionally(ex -> reportUncaughtException("handle of executor hop", ex));

    return (C) ret;
}

private static void completeCf(CompletableFuture<Object> cf, Object value, @Nullable Throwable ex) {
    try {
      	// 写入到新CF中
        if (ex == null) cf.complete(value);
        else cf.completeExceptionally(ex);
    } catch (Throwable t) {
        if (ex != null) t.addSuppressed(ex);
        reportUncaughtException("completeCf", t);
        throw t; // rethrow exception, report to caller
    }
}

基本思路将结果写入到新的 CompletableFuture 中,为了避免后续回调使用 Delayer 线程,改用新增的线程,保证线程传导的安全性。

4. 最佳实践的启示

  1. 使用优秀的 CompletableFuture 类库: CFFU,避免编程出错,减轻开发负担。
  2. 可参考我在一文中所讲的,如果使用CompletableFuture,应该尽量显示使用async*方法,同时显式传入执行器executor参数。
  3. 改为使用 Guava 中的 ListenableFuture。
显示全文