admin管理员组

文章数量:1032076

Guava的Futures与ListenableFuture

1. 简介

Guava为我们提供了ListenableFuture,在默认的Java Future上具有丰富的API让我们看看如何利用这一点来发挥我们的优势。

2.Future, ListenableFuture and Futures

让我们简要看看这些不同的类是什么以及它们之间的关系。

2.1.Future

Java 5开始,我们可以使用java.util.concurrent.future来表示异步任务。

Future允许我们访问已经完成或将来可能完成的任务的结果,以及取消它们的支持。

2.2.ListenableFuture

使用java.util.concurrent.Future时缺少的一个功能是能够添加侦听器以在完成时运行,这是大多数流行的异步框架提供的常见功能。

Guava通过允许我们将listeners附加到其com.googlemon.util.concurrent.ListenableFuture来解决这个问题。

2.3.Futures

Guava为我们提供了便利类com.googlemon.util.concurrent.Futures,以便更轻松地使用他们的ListenableFuture。

该类提供了与ListenableFuture交互的各种方式,其中包括支持添加成功/失败回调,并允许我们使用聚合或转换协调多个Future。

3. 简单用法

现在让我们看看如何以最简单的方式使用ListenableFuture;创建和添加回调。

3.1. 创建ListenableFuture

我们获得ListenableFuture的最简单方法是将任务提交给ListeningExecutorService(很像我们使用普通的ExecutorService来获取正常的Future):

代码语言:javascript代码运行次数:0运行复制
ExecutorService execService = Executors.newSingleThreadExecutor();
ListeningExecutorService lExecService = MoreExecutors.listeningDecorator(execService);

ListenableFuture<Integer> asyncTask = lExecService.submit(() -> {
    TimeUnit.MILLISECONDS.sleep(500); // long running task
    return 5;
});Copy

请注意我们如何使用MoreExecutors类将我们的 ExecutorService 装饰为ListeningExecutorService。我们可以参考Java 中的线程池简介-Java快速进阶教程来了解有关MoreExecutors 的更多信息。

如果我们已经有一个返回 Future 的 API,并且我们需要将其转换为 ListenableFuture,这可以通过初始化其具体实现ListenableFutureTask 轻松完成

代码语言:javascript代码运行次数:0运行复制
// old api
public FutureTask<String> fetchConfigTask(String configKey) {
    return new FutureTask<>(() -> {
        TimeUnit.MILLISECONDS.sleep(500);
        return String.format("%s.%d", configKey, new Random().nextInt(Integer.MAX_VALUE));
    });
}

// new api
public ListenableFutureTask<String> fetchConfigListenableTask(String configKey) {
    return ListenableFutureTask.create(() -> {
        TimeUnit.MILLISECONDS.sleep(500);
        return String.format("%s.%d", configKey, new Random().nextInt(Integer.MAX_VALUE));
    });
}Copy

我们需要注意,除非我们将这些任务提交给执行者,否则这些任务不会运行。直接与ListenableFutureTask交互并不常见,仅在极少数情况下进行(例如:实现我们自己的ExecutorService)。参考Guava的AbstractListeningExecutorService了解实际用法。

如果我们的异步任务无法使用ListeningExecutorService或提供的Futures实用程序方法,并且我们需要手动设置 future 值,我们也可以使用com.googlemon.util.concurrent.SettableFuture。对于更复杂的用法,我们还可以考虑com.googlemon.util.concurrent.AbstractFuture。

3.2. 添加监听器/回调

我们可以将侦听器添加到ListenableFuture的一种方法是向Futures.addCallback()注册一个回调,为我们提供在成功或失败时访问结果或异常的权限:

代码语言:javascript代码运行次数:0运行复制
Executor listeningExecutor = Executors.newSingleThreadExecutor();
ListenableFuture<Integer> asyncTask = new ListenableFutureService().succeedingTask()
Futures.addCallback(asyncTask, new FutureCallback<Integer>() {
    @Override
    public void onSuccess(Integer result) {
        // do on success
    }

    @Override
    public void onFailure(Throwable t) {
        // do on failure
    }
}, listeningExecutor);Copy

我们还可以通过将侦听器直接添加到ListenableFuture来添加侦听器。请注意,此侦听器将在将来成功完成或异常完成时运行。另外,请注意,我们无法访问异步任务的结果:

代码语言:javascript代码运行次数:0运行复制
Executor listeningExecutor = Executors.newSingleThreadExecutor();
int nextTask = 1;
Set<Integer> runningTasks = ConcurrentHashMap.newKeySet();
runningTasks.add(nextTask);
ListenableFuture<Integer> asyncTask = new ListenableFutureService().succeedingTask()
asyncTask.addListener(() -> runningTasks.remove(nextTask), listeningExecutor);Copy

4. 复杂用法

现在让我们看看如何在更复杂的场景中使用这些Future。

4.1. 扇入

我们有时可能需要调用多个异步任务并收集它们的结果,通常称为扇入操作。

Guava为我们提供了两种方法。但是,我们应该根据我们的要求谨慎选择正确的方法。假设我们需要协调以下异步任务:

代码语言:javascript代码运行次数:0运行复制
ListenableFuture<String> task1 = service.fetchConfig("config.0");
ListenableFuture<String> task2 = service.fetchConfig("config.1");
ListenableFuture<String> task3 = service.fetchConfig("config.2");Copy

扇入多个Future的一种方法是使用Futures.allAsList() 方法。这允许我们收集所有Future的结果,如果它们都成功,按照提供的Future的顺序。如果这些Future中的任何一个失败,那么整个结果就是一个失败的future:

代码语言:javascript代码运行次数:0运行复制
ListenableFuture<List<String>> configsTask = Futures.allAsList(task1, task2, task3);
Futures.addCallback(configsTask, new FutureCallback<List<String>>() {
    @Override
    public void onSuccess(@Nullable List<String> configResults) {
        // do on all futures success
    }

    @Override
    public void onFailure(Throwable t) {
        // handle on at least one failure
    }
}, someExecutor);Copy

如果我们需要收集所有异步任务的结果,无论它们是否失败,我们都可以使用Futures.successAsList()。这将返回一个列表,其结果将与传递到参数中的任务具有相同的顺序,并且失败的任务将为其在列表中各自的位置分配null

代码语言:javascript代码运行次数:0运行复制
ListenableFuture<List<String>> configsTask = Futures.successfulAsList(task1, task2, task3);
Futures.addCallback(configsTask, new FutureCallback<List<String>>() {
    @Override
    public void onSuccess(@Nullable List<String> configResults) {
        // handle results. If task2 failed, then configResults.get(1) == null
    }

    @Override
    public void onFailure(Throwable t) {
        // handle failure
    }
}, listeningExecutor);Copy

在上面的用法中,我们应该小心,如果Future任务通常在成功时返回null,它将与失败的任务(也将结果设置为null)无法区分。

4.2. 带合路器的扇入

如果我们需要协调返回不同结果的多个Future,上述解决方案可能还不够。在这种情况下,我们可以使用扇入操作的组合器变体来协调这种Future组合。

与简单的扇入操作类似,Guava为我们提供了两种变体;一种是当所有任务成功完成时成功的变体,另一种是即使某些任务失败,也可以成功使用Futures.whenAllSuccess()和Futures.whenAllComplete()方法。

让我们看看如何使用Futures.whenAllSuccess() 来组合来自多个Future的不同结果类型:

代码语言:javascript代码运行次数:0运行复制
ListenableFuture<Integer> cartIdTask = service.getCartId();
ListenableFuture<String> customerNameTask = service.getCustomerName();
ListenableFuture<List<String>> cartItemsTask = service.getCartItems();

ListenableFuture<CartInfo> cartInfoTask = Futures.whenAllSucceed(cartIdTask, customerNameTask, cartItemsTask)
    .call(() -> {
        int cartId = Futures.getDone(cartIdTask);
        String customerName = Futures.getDone(customerNameTask);
        List<String> cartItems = Futures.getDone(cartItemsTask);
        return new CartInfo(cartId, customerName, cartItems);
    }, someExecutor);

Futures.addCallback(cartInfoTask, new FutureCallback<CartInfo>() {
    @Override
    public void onSuccess(@Nullable CartInfo result) {
        //handle on all success and combination success
    }

    @Override
    public void onFailure(Throwable t) {
        //handle on either task fail or combination failed
    }
}, listeningExecService);Copy

如果我们需要允许某些任务失败,我们可以使用Futures.whenAllComplete()。虽然语义与上述基本相似,但我们应该意识到,当调用Futures.getDone() 时,失败的Future将抛出ExecutionException

4.3. transform

有时我们需要转换一个成功后的Future结果。Guava为我们提供了两种方法,可以使用Futures.transform()和Futures.lazyTransform()来实现。

让我们看看如何使用Futures.transform() 来转换Future的结果。只要转换计算不重,就可以使用此方法:

代码语言:javascript代码运行次数:0运行复制
ListenableFuture<List<String>> cartItemsTask = service.getCartItems();

Function<List<String>, Integer> itemCountFunc = cartItems -> {
    assertNotNull(cartItems);
    return cartItems.size();
};

ListenableFuture<Integer> itemCountTask = Futures.transform(cartItemsTask, itemCountFunc, listenExecService);Copy

我们还可以使用Futures.lazyTransform() 将转换函数应用于java.util.concurrent.Future。我们需要记住,这个选项不会返回一个ListenableFuture,而是一个普通的java.util.concurrent.Future,并且每次在结果的Future调用get()时,转换函数都适用。

4.4. 链接Future

我们可能会遇到我们的Future需要调用其他Future的情况。在这种情况下,Guava为我们提供了async()变体,以安全地链接这些Future以一个接一个地执行。

让我们看看如何使用Futures.submitAsync() 从提交的Callable内部调用Future:

代码语言:javascript代码运行次数:0运行复制
AsyncCallable<String> asyncConfigTask = () -> {
    ListenableFuture<String> configTask = service.fetchConfig("config.a");
    TimeUnit.MILLISECONDS.sleep(500); //some long running task
    return configTask;
};

ListenableFuture<String> configTask = Futures.submitAsync(asyncConfigTask, executor);Copy

如果我们想要真正的链式,其中一个Future的结果被输入到另一个Future的计算中,我们可以使用Futures.transformAsync()

代码语言:javascript代码运行次数:0运行复制
ListenableFuture<String> usernameTask = service.generateUsername("john");
AsyncFunction<String, String> passwordFunc = username -> {
    ListenableFuture<String> generatePasswordTask = service.generatePassword(username);
    TimeUnit.MILLISECONDS.sleep(500); // some long running task
    return generatePasswordTask;
};

ListenableFuture<String> passwordTask = Futures.transformAsync(usernameTask, passwordFunc, executor);Copy

Guava还为我们提供了Futures.scheduleAsync()和Futures.catchingAsync()分别提交计划任务和提供错误恢复的回退任务。虽然它们迎合了不同的场景,但我们不会讨论它们,因为它们与其他async() 调用相似。

5. 用法注意事项

现在让我们调查一下我们在处理Future时可能遇到的一些常见陷阱以及如何避免它们。

5.1. 工作执行者与监听执行者

在使用Guava Future时,了解工作执行者和监听执行者之间的区别非常重要。例如,假设我们有一个异步任务来获取配置:

代码语言:javascript代码运行次数:0运行复制
public ListenableFuture<String> fetchConfig(String configKey) {
    return lExecService.submit(() -> {
        TimeUnit.MILLISECONDS.sleep(500);
        return String.format("%s.%d", configKey, new Random().nextInt(Integer.MAX_VALUE));
    });
}Copy

假设我们想将Listener附加到上述Future:

代码语言:javascript代码运行次数:0运行复制
ListenableFuture<String> configsTask = service.fetchConfig("config.0");
Futures.addCallback(configsTask, someListener, listeningExecutor);Copy

请注意,这里的lExecService是运行异步任务的执行器,而listeningExecutor是调用侦听器的执行器。

如上所示,我们应该始终考虑将这两个执行器分开,以避免我们的侦听器和工作线程竞争相同的线程池资源的情况。共享同一个执行器可能会导致我们的繁重任务使侦听器执行量不足。或者一个写得不好的重量级Listener最终会阻止我们重要的繁重任务。

5.2. 小心使用 directExecutor()

虽然我们可以在单元测试中使用 MoreExecutors.directExecutor() 和MoreExecutors.newDirectExecutorService() 来更轻松地处理异步执行,但我们应该小心在生产代码中使用它们。

当我们从上述方法获取执行器时,我们提交给它的任何任务,无论是重量级的还是侦听器,都将在当前线程上执行。如果当前执行上下文是需要高吞吐量的上下文,则这可能很危险。

例如,使用directExecutor并在 UI 线程中向其提交重量级任务将自动阻止我们的 UI 线程。

我们还可能面临这样一种情况:我们的侦听器最终会减慢所有其他侦听器的速度(即使是那些不参与directExecutor 的侦听器)。这是因为 Guava 在各自的执行器中执行 while 循环中的所有侦听器,但directExecutor将导致侦听在与while循环相同的线程中运行。

5.3. 嵌套Future不好

在使用链式Future时,我们应该注意不要从另一个Future内部调用一个Future,以致它创建嵌套Future:

代码语言:javascript代码运行次数:0运行复制
public ListenableFuture<String> generatePassword(String username) {
    return lExecService.submit(() -> {
        TimeUnit.MILLISECONDS.sleep(500);
        return username + "123";
    });
}

String firstName = "john";
ListenableFuture<ListenableFuture<String>> badTask = lExecService.submit(() -> {
    final String username = firstName.replaceAll("[^a-zA-Z]+", "")
        .concat("@service");
    return generatePassword(username);
});Copy

如果我们看到代码有ListenableFuture<ListenableFuture<V>>,那么我们应该知道这是一个写得不好的Future,因为有可能取消和完成外部Future可能会竞争,而取消可能不会传播到内部Future。

如果我们看到上述情况,我们应该始终使用Futures.async() 变体以连接的方式安全地解开这些链式Future。

5.4. 小心jdkFutureAdapters.listenInPoolThread()

Guava建议,我们可以利用其ListenableFuture的最佳方式是将所有使用Future的代码转换为ListenableFuture。

如果这种转换在某些情况下不可行,Guava为我们提供了适配器,以使用JdkFutureAdapters.listenInPoolThread()overrides执行此操作。虽然这似乎很有帮助,但 Guava 警告我们,这些是重量级适配器,应尽可能避免使用。

6. 结论

在本文中,我们已经了解了如何使用Guava的ListenableFuture来丰富我们对Future的使用,以及如何使用_Futures_API来更轻松地使用这些Future。

我们还看到了在使用这些Future和提供的执行者时可能犯的一些常见错误。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。 原始发表:2023-02-16,如有侵权请联系 cloudcommunity@tencent 删除javaguava教程线程异步

Guava的Futures与ListenableFuture

1. 简介

Guava为我们提供了ListenableFuture,在默认的Java Future上具有丰富的API让我们看看如何利用这一点来发挥我们的优势。

2.Future, ListenableFuture and Futures

让我们简要看看这些不同的类是什么以及它们之间的关系。

2.1.Future

Java 5开始,我们可以使用java.util.concurrent.future来表示异步任务。

Future允许我们访问已经完成或将来可能完成的任务的结果,以及取消它们的支持。

2.2.ListenableFuture

使用java.util.concurrent.Future时缺少的一个功能是能够添加侦听器以在完成时运行,这是大多数流行的异步框架提供的常见功能。

Guava通过允许我们将listeners附加到其com.googlemon.util.concurrent.ListenableFuture来解决这个问题。

2.3.Futures

Guava为我们提供了便利类com.googlemon.util.concurrent.Futures,以便更轻松地使用他们的ListenableFuture。

该类提供了与ListenableFuture交互的各种方式,其中包括支持添加成功/失败回调,并允许我们使用聚合或转换协调多个Future。

3. 简单用法

现在让我们看看如何以最简单的方式使用ListenableFuture;创建和添加回调。

3.1. 创建ListenableFuture

我们获得ListenableFuture的最简单方法是将任务提交给ListeningExecutorService(很像我们使用普通的ExecutorService来获取正常的Future):

代码语言:javascript代码运行次数:0运行复制
ExecutorService execService = Executors.newSingleThreadExecutor();
ListeningExecutorService lExecService = MoreExecutors.listeningDecorator(execService);

ListenableFuture<Integer> asyncTask = lExecService.submit(() -> {
    TimeUnit.MILLISECONDS.sleep(500); // long running task
    return 5;
});Copy

请注意我们如何使用MoreExecutors类将我们的 ExecutorService 装饰为ListeningExecutorService。我们可以参考Java 中的线程池简介-Java快速进阶教程来了解有关MoreExecutors 的更多信息。

如果我们已经有一个返回 Future 的 API,并且我们需要将其转换为 ListenableFuture,这可以通过初始化其具体实现ListenableFutureTask 轻松完成

代码语言:javascript代码运行次数:0运行复制
// old api
public FutureTask<String> fetchConfigTask(String configKey) {
    return new FutureTask<>(() -> {
        TimeUnit.MILLISECONDS.sleep(500);
        return String.format("%s.%d", configKey, new Random().nextInt(Integer.MAX_VALUE));
    });
}

// new api
public ListenableFutureTask<String> fetchConfigListenableTask(String configKey) {
    return ListenableFutureTask.create(() -> {
        TimeUnit.MILLISECONDS.sleep(500);
        return String.format("%s.%d", configKey, new Random().nextInt(Integer.MAX_VALUE));
    });
}Copy

我们需要注意,除非我们将这些任务提交给执行者,否则这些任务不会运行。直接与ListenableFutureTask交互并不常见,仅在极少数情况下进行(例如:实现我们自己的ExecutorService)。参考Guava的AbstractListeningExecutorService了解实际用法。

如果我们的异步任务无法使用ListeningExecutorService或提供的Futures实用程序方法,并且我们需要手动设置 future 值,我们也可以使用com.googlemon.util.concurrent.SettableFuture。对于更复杂的用法,我们还可以考虑com.googlemon.util.concurrent.AbstractFuture。

3.2. 添加监听器/回调

我们可以将侦听器添加到ListenableFuture的一种方法是向Futures.addCallback()注册一个回调,为我们提供在成功或失败时访问结果或异常的权限:

代码语言:javascript代码运行次数:0运行复制
Executor listeningExecutor = Executors.newSingleThreadExecutor();
ListenableFuture<Integer> asyncTask = new ListenableFutureService().succeedingTask()
Futures.addCallback(asyncTask, new FutureCallback<Integer>() {
    @Override
    public void onSuccess(Integer result) {
        // do on success
    }

    @Override
    public void onFailure(Throwable t) {
        // do on failure
    }
}, listeningExecutor);Copy

我们还可以通过将侦听器直接添加到ListenableFuture来添加侦听器。请注意,此侦听器将在将来成功完成或异常完成时运行。另外,请注意,我们无法访问异步任务的结果:

代码语言:javascript代码运行次数:0运行复制
Executor listeningExecutor = Executors.newSingleThreadExecutor();
int nextTask = 1;
Set<Integer> runningTasks = ConcurrentHashMap.newKeySet();
runningTasks.add(nextTask);
ListenableFuture<Integer> asyncTask = new ListenableFutureService().succeedingTask()
asyncTask.addListener(() -> runningTasks.remove(nextTask), listeningExecutor);Copy

4. 复杂用法

现在让我们看看如何在更复杂的场景中使用这些Future。

4.1. 扇入

我们有时可能需要调用多个异步任务并收集它们的结果,通常称为扇入操作。

Guava为我们提供了两种方法。但是,我们应该根据我们的要求谨慎选择正确的方法。假设我们需要协调以下异步任务:

代码语言:javascript代码运行次数:0运行复制
ListenableFuture<String> task1 = service.fetchConfig("config.0");
ListenableFuture<String> task2 = service.fetchConfig("config.1");
ListenableFuture<String> task3 = service.fetchConfig("config.2");Copy

扇入多个Future的一种方法是使用Futures.allAsList() 方法。这允许我们收集所有Future的结果,如果它们都成功,按照提供的Future的顺序。如果这些Future中的任何一个失败,那么整个结果就是一个失败的future:

代码语言:javascript代码运行次数:0运行复制
ListenableFuture<List<String>> configsTask = Futures.allAsList(task1, task2, task3);
Futures.addCallback(configsTask, new FutureCallback<List<String>>() {
    @Override
    public void onSuccess(@Nullable List<String> configResults) {
        // do on all futures success
    }

    @Override
    public void onFailure(Throwable t) {
        // handle on at least one failure
    }
}, someExecutor);Copy

如果我们需要收集所有异步任务的结果,无论它们是否失败,我们都可以使用Futures.successAsList()。这将返回一个列表,其结果将与传递到参数中的任务具有相同的顺序,并且失败的任务将为其在列表中各自的位置分配null

代码语言:javascript代码运行次数:0运行复制
ListenableFuture<List<String>> configsTask = Futures.successfulAsList(task1, task2, task3);
Futures.addCallback(configsTask, new FutureCallback<List<String>>() {
    @Override
    public void onSuccess(@Nullable List<String> configResults) {
        // handle results. If task2 failed, then configResults.get(1) == null
    }

    @Override
    public void onFailure(Throwable t) {
        // handle failure
    }
}, listeningExecutor);Copy

在上面的用法中,我们应该小心,如果Future任务通常在成功时返回null,它将与失败的任务(也将结果设置为null)无法区分。

4.2. 带合路器的扇入

如果我们需要协调返回不同结果的多个Future,上述解决方案可能还不够。在这种情况下,我们可以使用扇入操作的组合器变体来协调这种Future组合。

与简单的扇入操作类似,Guava为我们提供了两种变体;一种是当所有任务成功完成时成功的变体,另一种是即使某些任务失败,也可以成功使用Futures.whenAllSuccess()和Futures.whenAllComplete()方法。

让我们看看如何使用Futures.whenAllSuccess() 来组合来自多个Future的不同结果类型:

代码语言:javascript代码运行次数:0运行复制
ListenableFuture<Integer> cartIdTask = service.getCartId();
ListenableFuture<String> customerNameTask = service.getCustomerName();
ListenableFuture<List<String>> cartItemsTask = service.getCartItems();

ListenableFuture<CartInfo> cartInfoTask = Futures.whenAllSucceed(cartIdTask, customerNameTask, cartItemsTask)
    .call(() -> {
        int cartId = Futures.getDone(cartIdTask);
        String customerName = Futures.getDone(customerNameTask);
        List<String> cartItems = Futures.getDone(cartItemsTask);
        return new CartInfo(cartId, customerName, cartItems);
    }, someExecutor);

Futures.addCallback(cartInfoTask, new FutureCallback<CartInfo>() {
    @Override
    public void onSuccess(@Nullable CartInfo result) {
        //handle on all success and combination success
    }

    @Override
    public void onFailure(Throwable t) {
        //handle on either task fail or combination failed
    }
}, listeningExecService);Copy

如果我们需要允许某些任务失败,我们可以使用Futures.whenAllComplete()。虽然语义与上述基本相似,但我们应该意识到,当调用Futures.getDone() 时,失败的Future将抛出ExecutionException

4.3. transform

有时我们需要转换一个成功后的Future结果。Guava为我们提供了两种方法,可以使用Futures.transform()和Futures.lazyTransform()来实现。

让我们看看如何使用Futures.transform() 来转换Future的结果。只要转换计算不重,就可以使用此方法:

代码语言:javascript代码运行次数:0运行复制
ListenableFuture<List<String>> cartItemsTask = service.getCartItems();

Function<List<String>, Integer> itemCountFunc = cartItems -> {
    assertNotNull(cartItems);
    return cartItems.size();
};

ListenableFuture<Integer> itemCountTask = Futures.transform(cartItemsTask, itemCountFunc, listenExecService);Copy

我们还可以使用Futures.lazyTransform() 将转换函数应用于java.util.concurrent.Future。我们需要记住,这个选项不会返回一个ListenableFuture,而是一个普通的java.util.concurrent.Future,并且每次在结果的Future调用get()时,转换函数都适用。

4.4. 链接Future

我们可能会遇到我们的Future需要调用其他Future的情况。在这种情况下,Guava为我们提供了async()变体,以安全地链接这些Future以一个接一个地执行。

让我们看看如何使用Futures.submitAsync() 从提交的Callable内部调用Future:

代码语言:javascript代码运行次数:0运行复制
AsyncCallable<String> asyncConfigTask = () -> {
    ListenableFuture<String> configTask = service.fetchConfig("config.a");
    TimeUnit.MILLISECONDS.sleep(500); //some long running task
    return configTask;
};

ListenableFuture<String> configTask = Futures.submitAsync(asyncConfigTask, executor);Copy

如果我们想要真正的链式,其中一个Future的结果被输入到另一个Future的计算中,我们可以使用Futures.transformAsync()

代码语言:javascript代码运行次数:0运行复制
ListenableFuture<String> usernameTask = service.generateUsername("john");
AsyncFunction<String, String> passwordFunc = username -> {
    ListenableFuture<String> generatePasswordTask = service.generatePassword(username);
    TimeUnit.MILLISECONDS.sleep(500); // some long running task
    return generatePasswordTask;
};

ListenableFuture<String> passwordTask = Futures.transformAsync(usernameTask, passwordFunc, executor);Copy

Guava还为我们提供了Futures.scheduleAsync()和Futures.catchingAsync()分别提交计划任务和提供错误恢复的回退任务。虽然它们迎合了不同的场景,但我们不会讨论它们,因为它们与其他async() 调用相似。

5. 用法注意事项

现在让我们调查一下我们在处理Future时可能遇到的一些常见陷阱以及如何避免它们。

5.1. 工作执行者与监听执行者

在使用Guava Future时,了解工作执行者和监听执行者之间的区别非常重要。例如,假设我们有一个异步任务来获取配置:

代码语言:javascript代码运行次数:0运行复制
public ListenableFuture<String> fetchConfig(String configKey) {
    return lExecService.submit(() -> {
        TimeUnit.MILLISECONDS.sleep(500);
        return String.format("%s.%d", configKey, new Random().nextInt(Integer.MAX_VALUE));
    });
}Copy

假设我们想将Listener附加到上述Future:

代码语言:javascript代码运行次数:0运行复制
ListenableFuture<String> configsTask = service.fetchConfig("config.0");
Futures.addCallback(configsTask, someListener, listeningExecutor);Copy

请注意,这里的lExecService是运行异步任务的执行器,而listeningExecutor是调用侦听器的执行器。

如上所示,我们应该始终考虑将这两个执行器分开,以避免我们的侦听器和工作线程竞争相同的线程池资源的情况。共享同一个执行器可能会导致我们的繁重任务使侦听器执行量不足。或者一个写得不好的重量级Listener最终会阻止我们重要的繁重任务。

5.2. 小心使用 directExecutor()

虽然我们可以在单元测试中使用 MoreExecutors.directExecutor() 和MoreExecutors.newDirectExecutorService() 来更轻松地处理异步执行,但我们应该小心在生产代码中使用它们。

当我们从上述方法获取执行器时,我们提交给它的任何任务,无论是重量级的还是侦听器,都将在当前线程上执行。如果当前执行上下文是需要高吞吐量的上下文,则这可能很危险。

例如,使用directExecutor并在 UI 线程中向其提交重量级任务将自动阻止我们的 UI 线程。

我们还可能面临这样一种情况:我们的侦听器最终会减慢所有其他侦听器的速度(即使是那些不参与directExecutor 的侦听器)。这是因为 Guava 在各自的执行器中执行 while 循环中的所有侦听器,但directExecutor将导致侦听在与while循环相同的线程中运行。

5.3. 嵌套Future不好

在使用链式Future时,我们应该注意不要从另一个Future内部调用一个Future,以致它创建嵌套Future:

代码语言:javascript代码运行次数:0运行复制
public ListenableFuture<String> generatePassword(String username) {
    return lExecService.submit(() -> {
        TimeUnit.MILLISECONDS.sleep(500);
        return username + "123";
    });
}

String firstName = "john";
ListenableFuture<ListenableFuture<String>> badTask = lExecService.submit(() -> {
    final String username = firstName.replaceAll("[^a-zA-Z]+", "")
        .concat("@service");
    return generatePassword(username);
});Copy

如果我们看到代码有ListenableFuture<ListenableFuture<V>>,那么我们应该知道这是一个写得不好的Future,因为有可能取消和完成外部Future可能会竞争,而取消可能不会传播到内部Future。

如果我们看到上述情况,我们应该始终使用Futures.async() 变体以连接的方式安全地解开这些链式Future。

5.4. 小心jdkFutureAdapters.listenInPoolThread()

Guava建议,我们可以利用其ListenableFuture的最佳方式是将所有使用Future的代码转换为ListenableFuture。

如果这种转换在某些情况下不可行,Guava为我们提供了适配器,以使用JdkFutureAdapters.listenInPoolThread()overrides执行此操作。虽然这似乎很有帮助,但 Guava 警告我们,这些是重量级适配器,应尽可能避免使用。

6. 结论

在本文中,我们已经了解了如何使用Guava的ListenableFuture来丰富我们对Future的使用,以及如何使用_Futures_API来更轻松地使用这些Future。

我们还看到了在使用这些Future和提供的执行者时可能犯的一些常见错误。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。 原始发表:2023-02-16,如有侵权请联系 cloudcommunity@tencent 删除javaguava教程线程异步

本文标签: Guava的Futures与ListenableFuture