| 知乎专栏 |
Springboot 官方不建议在生产环境使用 SimpleAsyncTaskExecutor,建议使用 自定义线程池。
2024-02-02T16:24:38.585+08:00 WARN 86223 --- [watch-development] [nio-8080-exec-2] s.w.s.m.m.a.RequestMappingHandlerAdapter : !!! Performing asynchronous handling through the default Spring MVC SimpleAsyncTaskExecutor. This executor is not suitable for production use under load. Please, configure an AsyncTaskExecutor through the WebMvc config. ------------------------------- !!!
Bean 注入配置线程池
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
@SpringBootApplication
@EnableAsync
public class Application {
public static void main(String[] args) {
// close the application context to shut down the custom ExecutorService
SpringApplication.run(Application.class, args).close();
}
@Bean
public Executor asyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(2);
executor.setMaxPoolSize(2);
executor.setQueueCapacity(500);
executor.setThreadNamePrefix("Netkiller -");
executor.initialize();
return executor;
}
@Bean("thread")
public Executor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 设置核心线程数
executor.setCorePoolSize(5);
// 设置最大线程数
executor.setMaxPoolSize(10);
// 设置队列容量
executor.setQueueCapacity(20);
// 设置线程活跃时间(秒)
executor.setKeepAliveSeconds(60);
// 设置线程名称
executor.setThreadNamePrefix("hello-");
// 设置拒绝策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 等待所有任务结束后再关闭线程池
executor.setWaitForTasksToCompleteOnShutdown(true);
return executor;
}
}
设置线程池参数
@SpringBootApplication
@EnableAsync
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
@Component
public class Task {
@Async
public void doTaskOne() throws Exception {
// 业务逻辑
}
@Async("asyncExecutor")
public void doTaskTwo() throws Exception {
// 业务逻辑
}
@Async("thread")
public void doTaskThree() throws Exception {
// 业务逻辑
}
}
注意:@Async 不会用到刚刚定义的线程池,@Async("asyncExecutor"),@Async("thread") 会正确调用
线程池能接受多少队列?
下面配置是 executor.setQueueCapacity(10); 也就是 10个,但是实测结果跟你想的不同
package cn.netkiller.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.ThreadPoolExecutor;
@Configuration
@EnableAsync
public class ThreadPoolTaskExecutorConfiguration {
@Bean("asyncExecutor")
public ThreadPoolTaskExecutor executor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setThreadGroupName("job");
executor.setThreadNamePrefix("async-job-");
executor.setCorePoolSize(5);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(10);
executor.setKeepAliveSeconds(60);
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
executor.setAwaitTerminationSeconds(60);
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.initialize();
return executor;
}
}
实测结果是,首次执行可以容纳 20 个线程,20个线程执行完毕之后,再添加任务,就只接受 10 个,超过的部分会跑出异常
Executor [java.util.concurrent.ThreadPoolExecutor@7e729046[Running, pool size = 10, active threads = 10, queued tasks = 10, completed tasks = 0]] did not accept task: org.springframework.aop.interceptor.AsyncExecutionInterceptor$$Lambda$1775/0x0000000801b6afb0@20eaccc2
这是因为线程池可以容纳 10 个任务,队列还能排队 10 个任务。
package cn.netkiller.wallet.config;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
@Configuration
@EnableAsync
public class ExecutorConfiguration {
/** Set the ThreadPoolExecutor's core pool size. */
private int corePoolSize = 10;
/** Set the ThreadPoolExecutor's maximum pool size. */
private int maxPoolSize = 200;
/** Set the capacity for the ThreadPoolExecutor's BlockingQueue. */
private int queueCapacity = 10;
@Bean
public Executor OneAsync() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(corePoolSize);
executor.setMaxPoolSize(maxPoolSize);
executor.setQueueCapacity(queueCapacity);
executor.setThreadNamePrefix("MySimpleExecutor-");
executor.initialize();
return executor;
}
@Bean
public Executor TwoAsync() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(corePoolSize);
executor.setMaxPoolSize(maxPoolSize);
executor.setQueueCapacity(queueCapacity);
executor.setThreadNamePrefix("MyExecutor-");
// rejection-policy:当pool已经达到max size的时候,如何处理新任务
// CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
}
@Service
public class DemoAsyncServiceImpl implements DemoAsyncService {
public static Random random =new Random();
@Async("OneAsync")
public Future<String> doTaskOne() throws Exception {
System.out.println("开始做任务一");
long start = System.currentTimeMillis();
Thread.sleep(random.nextInt(10000));
long end = System.currentTimeMillis();
System.out.println("完成任务一,耗时:" + (end - start) + "毫秒");
return new AsyncResult<>("任务一完成");
}
@Async("TwoAsync")
public Future<String> doTaskTwo() throws Exception {
System.out.println("开始做任务二");
long start = System.currentTimeMillis();
Thread.sleep(random.nextInt(10000));
long end = System.currentTimeMillis();
System.out.println("完成任务二,耗时:" + (end - start) + "毫秒");
return new AsyncResult<>("任务二完成");
}
@Async
public Future<String> doTaskThree() throws Exception {
System.out.println("开始做任务三");
long start = System.currentTimeMillis();
Thread.sleep(random.nextInt(10000));
long end = System.currentTimeMillis();
System.out.println("完成任务三,耗时:" + (end - start) + "毫秒");
return new AsyncResult<>("任务三完成");
}
}
这种方式可以直接使用 @Async
package cn.netkiller.config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
@Configuration
@EnableAsync
public class ThreadPoolTaskExecutorConfiguration implements AsyncConfigurer {
@Value("${spring.task.execution.pool.core-size}")
private int corePoolSize;
@Value("${spring.task.execution.pool.max-size}")
private int maxPoolSize;
@Value("${spring.task.execution.pool.queue-capacity}")
private int queueCapacity;
// @Value("${spring.task.execution.thread-name-prefix}")
private final String threadNamePrefix = "async-";
@Value("${spring.task.execution.pool.keep-alive}")
private int keepAliveSeconds;
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//最大线程数
executor.setMaxPoolSize(maxPoolSize);
//核心线程数
executor.setCorePoolSize(corePoolSize);
//任务队列的大小
executor.setQueueCapacity(queueCapacity);
//线程前缀名
executor.setThreadNamePrefix(threadNamePrefix);
//线程存活时间
executor.setKeepAliveSeconds(keepAliveSeconds);
/**
* 拒绝处理策略
* CallerRunsPolicy():交由调用方线程运行,比如 main 线程。
* AbortPolicy():直接抛出异常。
* DiscardPolicy():直接丢弃。
* DiscardOldestPolicy():丢弃队列中最老的任务。
*/
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//线程初始化
executor.initialize();
return executor;
}
}
package cn.netkiller.config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurerSupport;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
@Configuration
@EnableAsync
public class ThreadPoolTaskExecutorConfiguration extends AsyncConfigurerSupport {
@Value("${spring.task.execution.pool.core-size}")
private int corePoolSize;
@Value("${spring.task.execution.pool.max-size}")
private int maxPoolSize;
@Value("${spring.task.execution.pool.queue-capacity}")
private int queueCapacity;
// @Value("${spring.task.execution.thread-name-prefix}")
private final String threadNamePrefix = "async-";
@Value("${spring.task.execution.pool.keep-alive}")
private int keepAliveSeconds;
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//最大线程数
executor.setMaxPoolSize(maxPoolSize);
//核心线程数
executor.setCorePoolSize(corePoolSize);
//任务队列的大小
executor.setQueueCapacity(queueCapacity);
//线程前缀名
executor.setThreadNamePrefix(threadNamePrefix);
//线程存活时间
executor.setKeepAliveSeconds(keepAliveSeconds);
/**
* 拒绝处理策略
* CallerRunsPolicy():交由调用方线程运行,比如 main 线程。
* AbortPolicy():直接抛出异常。
* DiscardPolicy():直接丢弃。
* DiscardOldestPolicy():丢弃队列中最老的任务。
*/
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//线程初始化
executor.initialize();
return executor;
}
}
这种方式多用在 Spring 2.x 中
package cn.netkiller.config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
@Configuration
@EnableAsync
public class ThreadPoolTaskExecutorConfiguration {
@Value("${spring.task.execution.pool.core-size}")
private int corePoolSize;
@Value("${spring.task.execution.pool.max-size}")
private int maxPoolSize;
@Value("${spring.task.execution.pool.queue-capacity}")
private int queueCapacity;
// @Value("${spring.task.execution.thread-name-prefix}")
private final String threadNamePrefix = "";
@Value("${spring.task.execution.pool.keep-alive}")
private int keepAliveSeconds;
@Bean
public Executor threadPoolTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//最大线程数
executor.setMaxPoolSize(maxPoolSize);
//核心线程数
executor.setCorePoolSize(corePoolSize);
//任务队列的大小
executor.setQueueCapacity(queueCapacity);
//线程前缀名
executor.setThreadNamePrefix(threadNamePrefix);
//线程存活时间
executor.setKeepAliveSeconds(keepAliveSeconds);
/**
* 拒绝处理策略
* CallerRunsPolicy():交由调用方线程运行,比如 main 线程。
* AbortPolicy():直接抛出异常。
* DiscardPolicy():直接丢弃。
* DiscardOldestPolicy():丢弃队列中最老的任务。
*/
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//线程初始化
executor.initialize();
return executor;
}
}
注意:使用@Bean方式必须配合 @Async("threadPoolTaskExecutor")
自定义连接池之后,系统内会存在两个连接吃 SimpleAsyncTaskExecutor 和 ThreadPoolTaskExecutor
@Bean
public Executor applicationTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(2);
executor.setMaxPoolSize(2);
executor.setQueueCapacity(500);
executor.setThreadNamePrefix("Netkiller-");
executor.initialize();
return executor;
}