| Zhihu Column (知乎专栏) |
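The pain point this solves: when a method annotated with @Async runs, the request returns immediately and the work continues in the background. You do not know when the thread will finish or what state it is in, and if you call the endpoint again, a second thread starts running the same work alongside the first.
To keep an endpoint from being executed out of order or several times at once, you can use synchronized so that only one request for a given id runs at a time; the next request proceeds only after the current one has finished. Because the lock object is id.intern(), the exclusion is per id and only holds within a single JVM instance.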
@GetMapping("/lock/{id}")
public String lock1(@PathVariable("id") String id) throws InterruptedException {
    // intern() returns the canonical String instance, so equal ids share one monitor.
    synchronized (id.intern()) {
        log.info(Thread.currentThread().getName() + " locked");
        Thread.sleep(10000); // simulate 10 seconds of work
        log.info(Thread.currentThread().getName() + " unlocked");
    }
    return Thread.currentThread().getName();
}
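With synchronized, a second request for the same id blocks until the first one leaves the block. If you would rather turn the second caller away immediately, one option is a per-key ReentrantLock combined with tryLock(). The sketch below is plain Java without Spring; KeyedLock, runExclusive and demo are names made up for this illustration and are not part of the original project.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

final class KeyedLock {
    // One lock per key. Entries are never removed, so this suits a bounded set of keys only.
    private final ConcurrentHashMap<String, ReentrantLock> locks = new ConcurrentHashMap<>();

    /** Runs the task if no other thread holds the lock for this key; otherwise returns busyValue. */
    <T> T runExclusive(String key, Supplier<T> task, T busyValue) {
        ReentrantLock lock = locks.computeIfAbsent(key, k -> new ReentrantLock());
        if (!lock.tryLock()) {
            return busyValue; // another thread is already working on this key
        }
        try {
            return task.get();
        } finally {
            lock.unlock();
        }
    }

    /** Demo: a second caller arrives while the first still holds the lock for key "42". */
    static String demo() {
        KeyedLock keyed = new KeyedLock();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        String[] first = new String[1];
        Thread holder = new Thread(() -> first[0] = keyed.runExclusive("42", () -> {
            started.countDown();
            awaitQuietly(release); // keep the lock until the main thread has made its attempts
            return "done";
        }, "busy"));
        holder.start();
        awaitQuietly(started);
        String second = keyed.runExclusive("42", () -> "done", "busy"); // same key, lock is held
        String other = keyed.runExclusive("43", () -> "done", "busy");  // different key, lock is free
        release.countDown();
        try {
            holder.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return first[0] + "," + second + "," + other;
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Note that ReentrantLock is reentrant, so the same thread can acquire it twice; the rejection only applies to other threads, which matches the case of separate request threads.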
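For example, suppose we run a data summary once a day, and each day's data depends on the previous day's result. We need to generate the data for the 1st through the 31st and want the work to run in order, one thread per day: the 1st must finish before generation for the 2nd can start.
While the data for the 1st is being generated, another request to the endpoint returns a message saying the job is already running.
The implementation follows. First, the controller endpoint: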
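@GetMapping("/async/mutex")
public String asyncSynchronizedQueue() {
    // Advisory check only: the service checks the lock again when the task actually starts.
    if (!asyncTestService.isLock("queue")) {
        asyncTestService.asyncSynchronizedQueue("queue");
        return "Success";
    }
    return "Failure - already running, please retry later";
}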
The asynchronous Service:
// Lock table shared by all callers; a key is present only while its task is running.
private final Map<String, Boolean> lock = new ConcurrentHashMap<>();

@Async("asyncExecutor")
public void asyncSynchronizedQueue(String lockName) {
    log.info("Lock state {}", this.lock);
    // putIfAbsent is atomic: exactly one caller sees null and acquires the lock.
    if (this.lock.putIfAbsent(lockName, Boolean.TRUE) == null) {
        log.info(Thread.currentThread().getName() + " locked");
        try {
            log.info(Thread.currentThread().getName() + " service running");
            Thread.sleep(5 * 1000); // simulate 5 seconds of work
            log.info(Thread.currentThread().getName() + " finished");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
        } finally {
            this.lock.remove(lockName);
            log.info(Thread.currentThread().getName() + " unlocked");
        }
    } else {
        log.info(Thread.currentThread().getName() + " already running, please wait");
    }
}
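The lock above rejects a second request while the first is running. If the goal is instead for the 31 daily jobs to queue up and run strictly one after another, a single-threaded executor provides that ordering by itself. The following is a minimal sketch under that assumption; DailySummaryQueue and runMonth are hypothetical names, and the job body only records the day number in place of the real summary logic.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class DailySummaryQueue {
    /** Submits one job per day and returns the days in the order they finished. */
    static List<Integer> runMonth(int days) {
        List<Integer> finished = Collections.synchronizedList(new ArrayList<>());
        // A single worker thread takes jobs from the queue in submission order.
        ExecutorService queue = Executors.newSingleThreadExecutor();
        for (int day = 1; day <= days; day++) {
            final int current = day;
            // Placeholder for the real summary job of that day (assumption).
            queue.execute(() -> finished.add(current));
        }
        queue.shutdown(); // accept no new jobs, let the queued ones finish
        try {
            if (!queue.awaitTermination(1, TimeUnit.MINUTES)) {
                queue.shutdownNow();
            }
        } catch (InterruptedException e) {
            queue.shutdownNow();
            Thread.currentThread().interrupt();
        }
        return finished;
    }

    public static void main(String[] args) {
        System.out.println(runMonth(31));
    }
}
```

In a Spring application, an executor bean whose core and maximum pool size are both 1 should give a similar effect for @Async methods, although that configuration is not shown in the original article.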
Sharing data between threads with ConcurrentHashMap
private final Map<String, Object> share = new ConcurrentHashMap<>();

@GetMapping("/share/{id}")
public Map<String, Object> shareTest(@PathVariable("id") String id) throws InterruptedException {
    // Record the time of the first request for this id; later requests keep the existing value.
    share.computeIfAbsent(id, key -> new Date());
    synchronized (share) {
        log.info(Thread.currentThread().getName() + " locked");
        Thread.sleep(1000); // simulate 1 second of work
        log.info(Thread.currentThread().getName() + " unlocked");
    }
    return share;
}
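What makes computeIfAbsent useful for sharing is that ConcurrentHashMap applies the mapping function at most once per key, even when many threads ask for the same key at the same moment. The sketch below illustrates this with a counter; SharedMapDemo and its demo method are hypothetical names, and the stored value is an arbitrary timestamp.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class SharedMapDemo {
    /** Returns {number of keys in the map, number of times the initializer ran}. */
    static int[] demo() {
        ConcurrentHashMap<String, Long> share = new ConcurrentHashMap<>();
        AtomicInteger initCalls = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(8);
        for (int i = 0; i < 100; i++) {
            // All 100 tasks ask for the same key from 8 competing threads.
            pool.execute(() -> share.computeIfAbsent("id-1", key -> {
                initCalls.incrementAndGet();
                return System.nanoTime();
            }));
        }
        pool.shutdown();
        try {
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] { share.size(), initCalls.get() };
    }

    public static void main(String[] args) {
        int[] result = demo();
        System.out.println("keys=" + result[0] + ", initializer calls=" + result[1]);
    }
}
```

One design point to keep in mind: the endpoint above returns the live map, so what gets serialized may change while other requests are writing to it. Returning a snapshot such as Map.copyOf(share) is one way to avoid that, if a consistent view matters.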
Monitoring metrics through actuator
neo@MacBook-Pro-M2 ~> curl -s http://www.netkiller.cn:8080/actuator/metrics | jq | grep executor
"executor.active",
"executor.completed",
"executor.pool.core",
"executor.pool.max",
"executor.pool.size",
"executor.queue.remaining",
"executor.queued",
Fetching a single metric
neo@MacBook-Pro-M2 ~> curl -s http://www.netkiller.cn:8080/actuator/metrics/executor.active | jq
{
  "name": "executor.active",
  "description": "The approximate number of threads that are actively executing tasks",
  "baseUnit": "threads",
  "measurements": [
    {
      "statistic": "VALUE",
      "value": 0
    }
  ],
  "availableTags": [
    {
      "tag": "name",
      "values": [
        "asyncExecutor"
      ]
    }
  ]
}
actuator is aimed mainly at operations monitoring, whereas we more often need to read the thread pool's state from within the code.
@Autowired
ThreadPoolTaskExecutor threadPoolTaskExecutor;

@GetMapping("/pool")
public String pool() {
    int activeCount = threadPoolTaskExecutor.getActiveCount();
    long completedTaskCount = threadPoolTaskExecutor.getThreadPoolExecutor().getCompletedTaskCount();
    long taskCount = threadPoolTaskExecutor.getThreadPoolExecutor().getTaskCount();
    int queue = threadPoolTaskExecutor.getThreadPoolExecutor().getQueue().size();
    String monitor = String.format("Task: %d, Queue: %d, Active: %d, Completed: %d\n", taskCount, queue, activeCount, completedTaskCount);
    log.info(monitor);
    return monitor;
}
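The counters read in the endpoint above come from the java.util.concurrent.ThreadPoolExecutor that getThreadPoolExecutor() returns, so they can be tried out without Spring. The sketch below builds a one-thread pool, keeps one task running and one queued, and formats the same four values; PoolStats, describe and demo are hypothetical names used only for this illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class PoolStats {
    /** Formats the same four counters as the /pool endpoint. */
    static String describe(ThreadPoolExecutor pool) {
        return String.format("Task: %d, Queue: %d, Active: %d, Completed: %d",
                pool.getTaskCount(), pool.getQueue().size(),
                pool.getActiveCount(), pool.getCompletedTaskCount());
    }

    /** Takes a snapshot while one task is running and a second one waits in the queue. */
    static String demo() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        CountDownLatch running = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        pool.execute(() -> {
            running.countDown();
            awaitQuietly(release); // occupy the only worker until the snapshot is taken
        });
        pool.execute(() -> { });   // has to wait in the queue behind the first task
        awaitQuietly(running);
        String snapshot = describe(pool);
        release.countDown();
        pool.shutdown();
        return snapshot;
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The JDK documents getTaskCount, getActiveCount and getCompletedTaskCount as approximations, because tasks and threads can change state while the values are being computed. They are fine for monitoring but should not drive control flow.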
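Spring Boot does not provide any facility for managing asynchronous Services. Once an asynchronous call has been triggered it runs in the background and is out of your control: you do not know its state or when it will finish, and you can only follow it through the logs.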
Listing threads
@GetMapping("/list")
public ResponseEntity<String> list() {
    String ps = threadManager.show();
    return ResponseEntity.ok(ps);
}
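The ThreadManager service used by this endpoint (listed at the end of the article) relies on Thread.enumerate, which only covers the caller's thread group and its subgroups. If you need every live thread in the JVM, Thread.getAllStackTraces() is one alternative. The sketch below is a minimal illustration with hypothetical names (ThreadLister, liveThreads, describe); it uses getId() so that it also compiles on JDKs older than 19, where threadId() does not exist.

```java
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

final class ThreadLister {
    /** Snapshot of all live threads in the JVM, sorted by thread ID. */
    static List<Thread> liveThreads() {
        return Thread.getAllStackTraces().keySet().stream()
                .sorted(Comparator.comparingLong(Thread::getId))
                .collect(Collectors.toList());
    }

    /** One line per thread: ID, name, group, daemon flag and state. */
    static String describe(Thread thread) {
        ThreadGroup group = thread.getThreadGroup(); // null once the thread has terminated
        return String.format("%d | %s | %s | %s | %s",
                thread.getId(), thread.getName(),
                group == null ? "-" : group.getName(),
                thread.isDaemon(), thread.getState());
    }

    public static void main(String[] args) {
        liveThreads().forEach(thread -> System.out.println(describe(thread)));
    }
}
```

Building stack traces for every thread is more expensive than enumerate, so this is better suited to an occasional diagnostic call than to frequent polling.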
Stopping a thread
@GetMapping("/stop/{threadId}")
public ResponseEntity<String> stop(@PathVariable Long threadId) {
    try {
        log.info("Thread id: {}", threadId);
        threadManager.stop(threadId);
        return ResponseEntity.ok("true");
    } catch (Exception e) {
        // A failure is reported in the response body, still with HTTP 200.
        return ResponseEntity.ok(e.toString());
    }
}
The final result:
neo@MacBook-Pro-M2 ~> curl http://localhost:8080/thread/list
============================================================================================
|   ID |                                  Name | Group | Daemon |         State | Priority |
--------------------------------------------------------------------------------------------
|   19 |                    Monitor Ctrl-Break |  main |   true |      RUNNABLE |        5 |
|   34 | mysql-cj-abandoned-connection-cleanup |  main |   true | TIMED_WAITING |        5 |
|   35 |              HikariPool-1 housekeeper |  main |   true | TIMED_WAITING |        5 |
|   37 |                     lettuce-timer-3-1 |  main |   true | TIMED_WAITING |        5 |
|  431 |                          XNIO-1 I/O-1 |  main |  false |      RUNNABLE |        5 |
|  432 |                          XNIO-1 I/O-2 |  main |  false |      RUNNABLE |        5 |
|  433 |                          XNIO-1 I/O-3 |  main |  false |      RUNNABLE |        5 |
|  434 |                          XNIO-1 I/O-4 |  main |  false |      RUNNABLE |        5 |
|  435 |                          XNIO-1 I/O-5 |  main |  false |      RUNNABLE |        5 |
|  436 |                          XNIO-1 I/O-6 |  main |  false |      RUNNABLE |        5 |
|  437 |                          XNIO-1 I/O-7 |  main |  false |      RUNNABLE |        5 |
|  438 |                          XNIO-1 I/O-8 |  main |  false |      RUNNABLE |        5 |
|  439 |                         XNIO-1 Accept |  main |  false |      RUNNABLE |        5 |
|  440 |                         DestroyJavaVM |  main |  false |      RUNNABLE |        5 |
|  441 |                         XNIO-1 task-2 |  main |  false |      RUNNABLE |        5 |
|  442 |                                test-8 |   job |  false |       WAITING |        5 |
|  443 |                                test-5 |   job |  false |       WAITING |        5 |
|  449 |                               test-12 |   job |  false |       WAITING |        5 |
|  450 |                                test-9 |   job |  false |       WAITING |        5 |
|  451 |                               test-11 |   job |  false |       WAITING |        5 |
============================================================================================
Sometimes we want to terminate a thread before it has finished.
List the current threads and find the ID of the thread to stop:
neo@MacBook-Pro-M2 ~> curl http://localhost:8080/thread/list
============================================================================================
|   ID |                                  Name | Group | Daemon |         State | Priority |
--------------------------------------------------------------------------------------------
|   19 |                    Monitor Ctrl-Break |  main |   true |      RUNNABLE |        5 |
|   34 | mysql-cj-abandoned-connection-cleanup |  main |   true | TIMED_WAITING |        5 |
|   35 |              HikariPool-1 housekeeper |  main |   true | TIMED_WAITING |        5 |
|   37 |                     lettuce-timer-3-1 |  main |   true | TIMED_WAITING |        5 |
|  431 |                          XNIO-1 I/O-1 |  main |  false |      RUNNABLE |        5 |
|  432 |                          XNIO-1 I/O-2 |  main |  false |      RUNNABLE |        5 |
|  433 |                          XNIO-1 I/O-3 |  main |  false |      RUNNABLE |        5 |
|  434 |                          XNIO-1 I/O-4 |  main |  false |      RUNNABLE |        5 |
|  435 |                          XNIO-1 I/O-5 |  main |  false |      RUNNABLE |        5 |
|  436 |                          XNIO-1 I/O-6 |  main |  false |      RUNNABLE |        5 |
|  437 |                          XNIO-1 I/O-7 |  main |  false |      RUNNABLE |        5 |
|  438 |                          XNIO-1 I/O-8 |  main |  false |      RUNNABLE |        5 |
|  439 |                         XNIO-1 Accept |  main |  false |      RUNNABLE |        5 |
|  440 |                         DestroyJavaVM |  main |  false |      RUNNABLE |        5 |
|  441 |                         XNIO-1 task-2 |  main |  false |      RUNNABLE |        5 |
|  442 |                                test-8 |   job |  false |       WAITING |        5 |
|  443 |                                test-5 |   job |  false |       WAITING |        5 |
|  449 |                               test-12 |   job |  false |       WAITING |        5 |
|  450 |                                test-9 |   job |  false |       WAITING |        5 |
|  451 |                               test-11 |   job |  false |       WAITING |        5 |
============================================================================================
Stop the thread:
neo@MacBook-Pro-M2 ~> curl http://localhost:8080/thread/stop/451
true
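List the threads again to confirm that the thread has been terminated. Thread 451 is gone, and a new thread with ID 454 appears in the list: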
neo@MacBook-Pro-M2 ~> curl http://localhost:8080/thread/list
============================================================================================
|   ID |                                  Name | Group | Daemon |         State | Priority |
--------------------------------------------------------------------------------------------
|   19 |                    Monitor Ctrl-Break |  main |   true |      RUNNABLE |        5 |
|   34 | mysql-cj-abandoned-connection-cleanup |  main |   true | TIMED_WAITING |        5 |
|   35 |              HikariPool-1 housekeeper |  main |   true | TIMED_WAITING |        5 |
|   37 |                     lettuce-timer-3-1 |  main |   true | TIMED_WAITING |        5 |
|  431 |                          XNIO-1 I/O-1 |  main |  false |      RUNNABLE |        5 |
|  432 |                          XNIO-1 I/O-2 |  main |  false |      RUNNABLE |        5 |
|  433 |                          XNIO-1 I/O-3 |  main |  false |      RUNNABLE |        5 |
|  434 |                          XNIO-1 I/O-4 |  main |  false |      RUNNABLE |        5 |
|  435 |                          XNIO-1 I/O-5 |  main |  false |      RUNNABLE |        5 |
|  436 |                          XNIO-1 I/O-6 |  main |  false |      RUNNABLE |        5 |
|  437 |                          XNIO-1 I/O-7 |  main |  false |      RUNNABLE |        5 |
|  438 |                          XNIO-1 I/O-8 |  main |  false |      RUNNABLE |        5 |
|  439 |                         XNIO-1 Accept |  main |  false |      RUNNABLE |        5 |
|  440 |                         DestroyJavaVM |  main |  false |      RUNNABLE |        5 |
|  441 |                         XNIO-1 task-2 |  main |  false |      RUNNABLE |        5 |
|  442 |                                test-8 |   job |  false |       WAITING |        5 |
|  443 |                                test-5 |   job |  false |       WAITING |        5 |
|  449 |                               test-12 |   job |  false |       WAITING |        5 |
|  450 |                                test-9 |   job |  false |       WAITING |        5 |
|  454 |                          async-job-11 |   job |  false |       WAITING |        5 |
============================================================================================
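This way we can terminate a thread at any time. Be aware that this relies on Thread.stop(), which is deprecated and, starting with JDK 20, throws UnsupportedOperationException, so the approach only takes effect on older runtimes.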
The thread management Service
package cn.netkiller.thread;

import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

/*
 * @author neo<netkiller@msn.com>
 */
@Slf4j
@Service
public class ThreadManager {

    // Snapshot of the live threads in the current thread group and its subgroups.
    // The array may end with null entries if threads exit between the two calls.
    public Thread[] listThreads() {
        int count = Thread.activeCount();
        Thread[] threads = new Thread[count];
        Thread.enumerate(threads);
        return threads;
    }
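
    // Returns the live thread with the given ID, or null if there is none.
    public Thread fetchThread(long threadId) {
        int count = Thread.activeCount();
        Thread[] threads = new Thread[count];
        Thread.enumerate(threads);
        for (Thread thread : threads) {
            // Skip unused (null) slots left by Thread.enumerate.
            if (thread != null && thread.threadId() == threadId) {
                return thread;
            }
        }
        return null;
    }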

    public Thread fetchThread(String name) {
        return this.fetchThread(name, false);
    }

    // Returns the first live thread with the given name, or null if there is none.
    public Thread fetchThread(String name, boolean ignoreCase) {
        Thread[] threadArray = listThreads();
        for (Thread thread : threadArray) {
            if (thread == null) {
                continue; // unused slot left by Thread.enumerate
            }
            boolean matches = ignoreCase
                    ? thread.getName().equalsIgnoreCase(name)
                    : thread.getName().equals(name);
            if (matches) {
                return thread;
            }
        }
        return null;
    }

    public int activeCount() {
        return Thread.activeCount();
    }
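
    // Thread.stop() is deprecated and unsafe; since JDK 20 it throws
    // UnsupportedOperationException, so it only takes effect on older runtimes.
    public void stop(Long threadId) {
        Thread thread = java.util.Objects.requireNonNull(this.fetchThread(threadId), "No such thread: " + threadId);
        log.info("Stopping {}", thread);
        thread.stop();
    }

    public void stop(String name) {
        Thread thread = java.util.Objects.requireNonNull(this.fetchThread(name), "No such thread: " + name);
        thread.stop();
    }

    // interrupt() only requests cancellation: the target thread has to check its
    // interrupt status or be blocked in an interruptible call such as sleep().
    public void interrupt(Long threadId) {
        Thread thread = java.util.Objects.requireNonNull(this.fetchThread(threadId), "No such thread: " + threadId);
        log.info("Interrupting {}", thread);
        thread.interrupt();
    }

    public void interrupt(String name) {
        Thread thread = java.util.Objects.requireNonNull(this.fetchThread(name), "No such thread: " + name);
        thread.interrupt();
    }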
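
    // Blocks the calling thread until the target thread terminates. join() is used because
    // calling thread.wait() without owning its monitor throws IllegalMonitorStateException.
    public void wait(Long threadId) {
        Thread thread = java.util.Objects.requireNonNull(this.fetchThread(threadId), "No such thread: " + threadId);
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            throw new RuntimeException(e);
        }
    }

    public void wait(String name) {
        Thread thread = java.util.Objects.requireNonNull(this.fetchThread(name), "No such thread: " + name);
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            throw new RuntimeException(e);
        }
    }

    // A Thread can be started only once: calling start() on a thread that has
    // already been started throws IllegalThreadStateException.
    public void start(Long threadId) {
        Thread thread = java.util.Objects.requireNonNull(this.fetchThread(threadId), "No such thread: " + threadId);
        thread.start();
    }

    public void start(String name) {
        Thread thread = java.util.Objects.requireNonNull(this.fetchThread(name), "No such thread: " + name);
        thread.start();
    }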

    // Renders the thread list as a fixed-width text table.
    public String show() {
        StringBuilder out = new StringBuilder();
        Thread[] threads = this.listThreads();
        int idLength = 4;
        int nameLength = 4;
        int groupLength = 5;
        // First pass: find the widest value of each variable-width column.
        for (Thread thread : threads) {
            if (thread == null) {
                continue; // unused slot left by Thread.enumerate
            }
            idLength = Math.max(idLength, Long.toString(thread.threadId()).length());
            nameLength = Math.max(nameLength, thread.getName().length());
            groupLength = Math.max(groupLength, groupName(thread).length());
        }
        String format = "| %" + idLength + "s | %" + nameLength + "s | %" + groupLength + "s | %6s | %13s | %8s |\n";
        // 46 = fixed column widths (6 + 13 + 8) plus the separators and padding of a row.
        String line = "=".repeat(idLength + nameLength + groupLength + 46) + "\n";
        out.append(line);
        out.append(String.format(format, "ID", "Name", "Group", "Daemon", "State", "Priority"));
        out.append(line.replace("=", "-"));
        // Second pass: one row per live thread.
        for (Thread thread : threads) {
            if (thread != null) {
                out.append(String.format(format, thread.threadId(), thread.getName(), groupName(thread), thread.isDaemon(), thread.getState().name(), thread.getPriority()));
            }
        }
        out.append(line);
        return out.toString();
    }

    // getThreadGroup() returns null once a thread has terminated.
    private static String groupName(Thread thread) {
        ThreadGroup group = thread.getThreadGroup();
        return group == null ? "-" : group.getName();
    }
}
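As a cooperative alternative to stopping a thread by force, a task can be written to react to interruption and then be cancelled through its Future. The sketch below is plain Java and only illustrates the idea; CancellableJob and demo are hypothetical names, and the sleep stands in for one unit of real work.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

final class CancellableJob {
    /** Starts a long-running job, cancels it, and reports whether it stopped and cleaned up. */
    static boolean demo() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        AtomicBoolean cleanedUp = new AtomicBoolean(false);

        Future<?> job = pool.submit(() -> {
            started.countDown();
            try {
                // Keep working until someone asks this thread to stop.
                while (!Thread.currentThread().isInterrupted()) {
                    Thread.sleep(50); // placeholder for one unit of real work
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // interrupted while sleeping: fall through
            } finally {
                cleanedUp.set(true); // locks, files and connections would be released here
            }
        });

        try {
            started.await();
            job.cancel(true); // true = interrupt the thread that is running the job
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return job.isCancelled() && cleanedUp.get();
    }

    public static void main(String[] args) {
        System.out.println("cancelled cleanly: " + demo());
    }
}
```

The trade-off is that the job itself has to cooperate: code that never checks the interrupt status and never blocks in an interruptible call will keep running after cancel(true). In exchange, the finally block always runs, so shared state is not left half-updated.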