【Java篇】 1.CompletableFuture使用线程池进行异步多线程处理

yanqy24 2024-10-26 17:35:01 阅读 84

CompletableFuture使用ThreadPool线程池进行异步多线程处理

多线程编程是提升程序性能和响应速度的关键手段之一, Java 提供了多种方式来实现并发处理,从早期的 <code>Thread 类到后来的 Executor 框架,Java 8 引入了CompletableFuture,结合线程池(ThreadPool)使用,不仅简化了多线程的编写,还大大提高了代码的可读性和可维护性。相比传统的多线程方式,CompletableFuture + ThreadPool的组合为开发者提供了更高层次的抽象,保留了灵活性,又降低了复杂性。

CompletableFuture 在 Java 中实现多线程编程的优势

非阻塞操作

CompletableFuture 提供了非阻塞的方法来处理异步任务。与 Future.get() 不同,CompletableFuture 可以通过回调函数(如 thenApplythenAccept 等)在任务完成时自动执行后续操作,而无需阻塞主线程。

更好的异常处理

CompletableFuture 提供了更灵活的异常处理机制。你可以使用 exceptionallyhandle 等方法来处理异步任务中的异常,使代码更加健壮。

任务组合

CompletableFuture 允许你组合多个异步任务。你可以使用 thenCombinethenCompose 等方法将多个异步任务的结果进行组合,从而实现更复杂的异步逻辑。

链式调用

CompletableFuture 支持链式调用,使代码更加简洁和易读。你可以通过链式调用来定义一系列的异步操作,清晰地描述任务之间的依赖关系。

手动完成

CompletableFuture 可以被显式地完成,这意味着你可以在任何时候手动设置它的值。这在某些需要提前返回结果的场景中非常有用。

并行处理

通过结合使用 CompletableFuture 和线程池,可以高效地利用多线程进行并发处理,提高应用程序的性能和响应能力。

为什么要结合线程池ThreadPool来使用 CompletableFuture

降低资源消耗:通过重复利用已创建的线程,线程池可以显著降低线程创建和销毁所带来的开销¹。这对于需要频繁创建和销毁线程的应用程序尤为重要。

提高响应速度:当任务到达时,线程池可以立即提供可用的线程来执行任务,而不需要等待新线程的创建。这可以显著提高系统的响应速度。

控制并发度:线程池可以限制并发执行的线程数量,防止系统过载。通过调整线程池的大小,可以有效控制并发度,避免资源消耗过大。

提高线程管理性:线程池提供了统一的线程管理和监控机制,例如线程的创建、销毁和状态监控等。这使得开发人员可以更方便地管理和调试线程。

任务队列:线程池通常会使用任务队列来存储待执行的任务,这样可以实现任务的缓冲和调度。当线程池中的线程都在执行任务时,新的任务会被放入任务队列中等待执行。


那接下来就通过异步保存日志的示例来了解一下CompletableFuture

1. 配置线程池

import org.springframework.scheduling.annotation.EnableAsync;

import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import java.util.concurrent.ThreadPoolExecutor;

/**

* @description 线程池配置

* @author yanqy

* @date 2024/7/1 16:47

**/

@Configuration

@EnableAsync

public class ThreadPoolConfig {

@Bean(name = "logThreadPool")

public ThreadPoolTaskExecutor threadPoolTaskExecutor() {

ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

// 设置核心线程数

executor.setCorePoolSize(2);

// 设置最大线程数

executor.setMaxPoolSize(2);

// 设置队列容量

executor.setQueueCapacity(1000);

// 设置线程活跃时间(秒)

executor.setKeepAliveSeconds(60);

// 设置默认线程名称

executor.setThreadNamePrefix("log-thread-");

// 设置拒绝策略

executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

// 等待所有任务结束后再关闭线程池

executor.setWaitForTasksToCompleteOnShutdown(true);

// 初始化线程池

executor.initialize();

return executor;

}

}

2. 创建异步日志服务类

import lombok.extern.log4j.Log4j;

import org.springframework.scheduling.annotation.Async;

import org.springframework.stereotype.Service;

import java.time.LocalDateTime;

import java.util.concurrent.CompletableFuture;

/***

* @description 异步日志Service

* @author yanqy

* @date 2024/7/1 16:51

**/

@Service

@Log4j

public class AsyncLogService {

@Async("logThreadPool")

public CompletableFuture<LocalDateTime> saveLog(String logMessage) {

LocalDateTime now = LocalDateTime.now();

// 模拟异步保存日志的过程

log.info("Async log saving: " + " - " + now);

// 这里可以添加实际保存日志到数据库或文件的代码

return CompletableFuture.completedFuture(now);

}

}

3. Controller中调用异步方法

import com.dtflys.forest.example.service.AsyncLogService;

import org.springframework.web.bind.annotation.PostMapping;

import org.springframework.web.bind.annotation.RequestBody;

import org.springframework.web.bind.annotation.RestController;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.http.ResponseEntity;

/**

* @description 异步日志 Controller

* @author yanqy

* @date 2024/7/1 16:57

**/

@RestController

public class LogController {

private final AsyncLogService asyncLogService;

@Autowired

public LogController(AsyncLogService asyncLogService) {

this.asyncLogService = asyncLogService;

}

@PostMapping("/saveLog")

public ResponseEntity saveLog(@RequestBody String logMessage) {

asyncLogService.saveLog(logMessage);

return null;

}

4. 调用日志

# 日志中log-thread-1、log-thread-2 多线程运行

2024-07-01 17:24:30.853 INFO 13112 --- [ log-thread-1] c.d.f.example.service.AsyncLogService : Async log saving: - 2024-07-01T17:24:30.853

2024-07-01 17:24:31.298 INFO 13112 --- [ log-thread-2] c.d.f.example.service.AsyncLogService : Async log saving: - 2024-07-01T17:24:31.298

2024-07-01 17:24:31.667 INFO 13112 --- [ log-thread-1] c.d.f.example.service.AsyncLogService : Async log saving: - 2024-07-01T17:24:31.667

2024-07-01 17:24:32.026 INFO 13112 --- [ log-thread-2] c.d.f.example.service.AsyncLogService : Async log saving: - 2024-07-01T17:24:32.026

2024-07-01 17:24:32.387 INFO 13112 --- [ log-thread-1] c.d.f.example.service.AsyncLogService : Async log saving: - 2024-07-01T17:24:32.387

2024-07-01 17:24:32.741 INFO 13112 --- [ log-thread-2] c.d.f.example.service.AsyncLogService : Async log saving: - 2024-07-01T17:24:32.741

2024-07-01 17:24:33.117 INFO 13112 --- [ log-thread-1] c.d.f.example.service.AsyncLogService : Async log saving: - 2024-07-01T17:24:33.117

2024-07-01 17:24:33.496 INFO 13112 --- [ log-thread-2] c.d.f.example.service.AsyncLogService : Async log saving: - 2024-07-01T17:24:33.496

2024-07-01 17:24:33.862 INFO 13112 --- [ log-thread-1] c.d.f.example.service.AsyncLogService : Async log saving: - 2024-07-01T17:24:33.862

2024-07-01 17:24:34.213 INFO 13112 --- [ log-thread-2] c.d.f.example.service.AsyncLogService : Async log saving: - 2024-07-01T17:24:34.213

2024-07-01 17:24:34.578 INFO 13112 --- [ log-thread-1] c.d.f.example.service.AsyncLogService : Async log saving: - 2024-07-01T17:24:34.577

5. ThreadPoolConfig 中各项参数的作用 {#ThreadPool配置解释}

核心线程数 (corePoolSize): 这是线程池中始终运行的线程的最小数量。即使这些线程是空闲的,它们也不会被终止。在这个例子中,核心线程数被设置为2。

最大线程数 (maxPoolSize): 这是线程池允许创建的最大线程数。如果队列满了,并且已创建的线程数少于最大线程数,线程池会尝试创建新的线程来处理任务。在这个例子中,最大线程数也被设置为2。

队列容量 (queueCapacity): 这是用于在执行任务之前存放任务的队列的大小。如果所有核心线程都忙,新任务将在队列中等待。在这个例子中,队列容量被设置为1000。

线程活跃时间 (keepAliveSeconds): 如果线程池中的线程数量超过了核心线程数,那么这是超过核心线程数的线程在终止前可以保持空闲状态的最长时间。在这个例子中,线程活跃时间被设置为60秒。

默认线程名称 (threadNamePrefix): 这是为线程池中的线程设置的名称前缀,有助于在日志和调试时识别线程来源。在这个例子中,线程名称前缀被设置为"log-thread-"。

拒绝策略 (RejectedExecutionHandler): 当任务太多,无法被线程池处理时,这个策略决定了如何处理新提交的任务。CallerRunsPolicy是一种拒绝策略,它不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者,从而降低新任务的流量。

关闭线程池前等待任务完成 (waitForTasksToCompleteOnShutdown): 这个参数设置为true意味着在关闭线程池时(调用shutdown方法),将会等待所有正在执行的任务完成后,才会关闭线程池。在这个例子中,这个参数被设置为true

6. 线程池其他高级配置选项

线程工厂 (ThreadFactory): 这个选项允许您提供一个自定义的线程工厂来创建新线程。您可以使用这个工厂来设置线程的名称、优先级、守护状态等属性。

拒绝策略 (RejectedExecutionHandler): 当任务提交到线程池时,如果线程池已满,您可以设置一个拒绝策略来处理这些额外的任务。除了CallerRunsPolicy之外,还有AbortPolicy(抛出异常)、DiscardPolicy(丢弃任务)和DiscardOldestPolicy(丢弃队列中最旧的任务)等选项。

线程存活时间 (keepAliveTime): 对于非核心线程,这个参数定义了线程在变为空闲状态时可以存活多久。设置适当的存活时间可以优化资源的使用,避免线程长时间空闲占用资源。

任务队列 (BlockingQueue): 用于存放等待执行的任务。您可以选择不同类型的队列,如LinkedBlockingQueueSynchronousQueueArrayBlockingQueue,每种队列都有其特定的行为和性能特点。

线程池增长策略: 您可以实现自定义逻辑来动态调整线程池的大小,例如,根据当前的负载情况增加或减少线程数。

这些高级配置选项可以根据应用程序的具体需求和行为来调整线程池的性能。正确的配置可以提高应用程序的响应速度和资源利用率。

7. CompletableFuture 的核心知识点

异步方法与非异步方法: CompletableFuture提供了许多方法的两种形式:非异步(non-async)和异步(async)。非异步方法通常在当前线程中执行,而异步方法则在不同的线程中执行[1][1]。

方法链: CompletableFuture允许你通过方法链来组合多个异步操作。例如,你可以使用thenApply来转换结果,使用thenAccept来处理结果,或者使用thenCombine来合并两个独立的CompletableFuture的结果[1][1]。

异常处理: CompletableFuture提供了exceptionallyhandle方法来处理异步操作中的异常。这允许你在链中的任何位置优雅地处理错误情况[1][1]。

完成状态: 使用complete方法可以手动设置CompletableFuture的结果,这在某些测试场景或特殊用例中非常有用[1][1]。

组合异步操作: 你可以使用thenCompose方法来组合多个CompletableFuture,其中一个操作的结果是另一个操作的输入[1][1]。

8. ThreadPool 的核心知识点

线程池类型: Java提供了几种类型的线程池,包括FixedThreadPoolCachedThreadPoolScheduledThreadPoolSingleThreadExecutor。每种类型的线程池都有其特定的用途和行为[2][2]。

任务拒绝策略: 当线程池满载时,新提交的任务会被拒绝。RejectedExecutionHandler接口定义了四种拒绝策略,包括直接抛出异常、丢弃任务、丢弃最老的任务,以及由提交任务的线程来执行任务[2][2]。

线程工厂: 通过自定义ThreadFactory,你可以控制线程的创建过程,比如设置线程的名称、优先级和守护状态[2][2]。

线程池的关闭: 使用shutdown方法可以平滑地关闭线程池,它会等待正在执行的任务完成。而shutdownNow方法则会尝试立即停止所有正在执行的任务[2][2]。

任务队列: 线程池使用BlockingQueue来存储等待执行的任务。队列的类型决定了任务处理的顺序和方式,比如FIFO、LIFO或优先级队列[2][2]。

多线程任务的异常处理与结果管理

在多线程处理任务时,除了关注任务的并发执行性能外,异常处理和执行结果的管理同样至关重要。如果忽视了这些问题,即使程序在短期内能够正常运行,长远来看仍有可能导致隐藏的异常和数据不一致问题。CompletableFuture 提供了丰富的异常处理机制,通过 handleexceptionallywhenComplete 等方法,我们可以优雅地捕获和处理异步任务中的异常,确保线程中抛出的错误不会默默地被忽略,而是能够及时被感知和处理。

同时,借助 CompletableFuturethenApplythenCompose 等链式方法,开发者可以方便地管理并处理任务的执行结果,无需显式阻塞线程等待。这样的设计让代码更加简洁,同时避免了复杂的状态管理。

在并发编程中,做好多线程任务的善后工作,不仅可以提升程序的健壮性,还能为应用的长期运行保驾护航。因此,在实现多线程时,一定不要忽视对异常情况的处理和结果的合理管理。



声明

本文内容仅代表作者观点,或转载于其他网站,本站不以此文作为商业用途
如有涉及侵权,请联系本站进行删除
转载本站原创文章,请注明来源及作者。