知乎专栏 | 多维度架构 |
解决痛点,当我们使用 @Async 注解进行异步执行的时候,一旦请求之后就会进入后台,你不清楚线程什么时候执行完毕,也不清楚它的工作状态,此时你再次请求接口,就会出现两个不同执行的线程。
避免接口无序执行,被同时多次执行,使用 synchronized 可以实现同一时间只能有一个请求,请求完毕之后才能进行下一次请求。
@GetMapping("/lock/{id}") public String lock1(@PathVariable("id") String id) throws InterruptedException { synchronized (id.intern()) { log.info(Thread.currentThread().getName() + " 上锁"); Thread.sleep(10000); log.info(Thread.currentThread().getName() + " 解锁"); } return Thread.currentThread().getName(); }
例如我们有这样一个需求,每天进行一次数据汇总,第二天的数据依赖第一天的数据结果,我们需要生成从 1~31 号的数据,希望线程能排队执行,即每天一个线程,必须1号执行完成之后,才能进行2号数据的生成。
当生成1号数据的时候,再次请求接口会返回正在执行中。
实现方法如下,接口部分
@GetMapping("/async/mutex") public String asyncSynchronizedQueue() { if (!asyncTestService.isLock("queue")) { asyncTestService.asyncSynchronizedQueue("queue"); return ("Success"); } return ("Failure - 执行中,请稍后重试"); }
异步执行 Service
private final Map<String, Boolean> lock = new ConcurrentHashMap<String, Boolean>(); @Async("asyncExecutor") public void asyncSynchronizedQueue(String lockName) { log.info("锁状态 {}", this.lock.toString()); if (this.lock.isEmpty() || !this.lock.containsKey(lockName) || this.lock.get(lockName) == false) { log.info(Thread.currentThread().getName() + " 上锁"); this.lock.computeIfAbsent(lockName, k -> true); try { log.info(Thread.currentThread().getName() + " 服务执行"); Thread.sleep(5 * 1000); log.info(Thread.currentThread().getName() + " 执行完成 "); } catch (InterruptedException e) { e.printStackTrace(); } finally { this.lock.remove(lockName); log.info(Thread.currentThread().getName() + " 解锁"); } } else { log.info(Thread.currentThread().getName() + " 执行中,请等待"); } }
使用 ConcurrentHashMap 在线程间数据共享
private final Map<String, Object> share = new ConcurrentHashMap<>(); @GetMapping("/share/{id}") public Map<String, Object> shareTest(@PathVariable("id") String id) throws InterruptedException { share.computeIfAbsent(id, key -> { return new Date(); }); synchronized (share) { log.info(Thread.currentThread().getName() + " 上锁"); Thread.sleep(1000); log.info(Thread.currentThread().getName() + " 解锁"); } return share; }
通过 actuator 监控指标
neo@MacBook-Pro-M2 ~> curl -s http://www.netkiller.cn:8080/actuator/metrics | jq | grep executor "executor.active", "executor.completed", "executor.pool.core", "executor.pool.max", "executor.pool.size", "executor.queue.remaining", "executor.queued",
获取指标
neo@MacBook-Pro-M2 ~> curl -s http://www.netkiller.cn:8080/actuator/metrics/executor.active | jq { "name": "executor.active", "description": "The approximate number of threads that are actively executing tasks", "baseUnit": "threads", "measurements": [ { "statistic": "VALUE", "value": 0 } ], "availableTags": [ { "tag": "name", "values": [ "asyncExecutor" ] } ] }
actuator 更多是面向运维监控,而我们更多是需要在代码中获取线程池的状态。
@Autowired ThreadPoolTaskExecutor threadPoolTaskExecutor; @GetMapping("/pool") public String pool() { int activeCount = threadPoolTaskExecutor.getActiveCount(); long completedTaskCount = threadPoolTaskExecutor.getThreadPoolExecutor().getCompletedTaskCount(); long taskCount = threadPoolTaskExecutor.getThreadPoolExecutor().getTaskCount(); int queue = threadPoolTaskExecutor.getThreadPoolExecutor().getQueue().size(); String monitor = String.format("Task: %d, Queue: %d, Active: %d, Completed: %d\n", taskCount, queue, activeCount, completedTaskCount); log.info(monitor); return monitor; }
Springboot 并没有提供对异步 Service 的管理功能,异步执行一旦触发之后便进入后台,你对他再也无法控制,你不知道他的运行状态,不知道它什么时候运行完,只能通过日志监控它。
显示进程列表
@GetMapping("/list") public ResponseEntity<String> list() { String ps = threadManager.show(); return ResponseEntity.ok(ps); }
停止进程
@GetMapping("/stop/{threadId}") public ResponseEntity<String> stop(@PathVariable Long threadId) { System.out.println(threadId); try { log.info("Thread id: {}", threadId); threadManager.stop(threadId); return ResponseEntity.ok("true"); } catch (Exception e) { return ResponseEntity.ok(e.toString()); } }
最终呈现效果
neo@MacBook-Pro-M2 ~> curl http://localhost:8080/thread/list ============================================================================================ | ID | Name | Group | Daemon | State | Priority | -------------------------------------------------------------------------------------------- | 19 | Monitor Ctrl-Break | main | true | RUNNABLE | 5 | | 34 | mysql-cj-abandoned-connection-cleanup | main | true | TIMED_WAITING | 5 | | 35 | HikariPool-1 housekeeper | main | true | TIMED_WAITING | 5 | | 37 | lettuce-timer-3-1 | main | true | TIMED_WAITING | 5 | | 431 | XNIO-1 I/O-1 | main | false | RUNNABLE | 5 | | 432 | XNIO-1 I/O-2 | main | false | RUNNABLE | 5 | | 433 | XNIO-1 I/O-3 | main | false | RUNNABLE | 5 | | 434 | XNIO-1 I/O-4 | main | false | RUNNABLE | 5 | | 435 | XNIO-1 I/O-5 | main | false | RUNNABLE | 5 | | 436 | XNIO-1 I/O-6 | main | false | RUNNABLE | 5 | | 437 | XNIO-1 I/O-7 | main | false | RUNNABLE | 5 | | 438 | XNIO-1 I/O-8 | main | false | RUNNABLE | 5 | | 439 | XNIO-1 Accept | main | false | RUNNABLE | 5 | | 440 | DestroyJavaVM | main | false | RUNNABLE | 5 | | 441 | XNIO-1 task-2 | main | false | RUNNABLE | 5 | | 442 | test-8 | job | false | WAITING | 5 | | 443 | test-5 | job | false | WAITING | 5 | | 449 | test-12 | job | false | WAITING | 5 | | 450 | test-9 | job | false | WAITING | 5 | | 451 | test-11 | job | false | WAITING | 5 | ============================================================================================
Springboot 并没有提供对异步 Service 的管理功能,异步执行一旦触发之后便进入后台,你对他再也无法控制,你不知道他的运行状态,不知道它什么时候运行完,只能通过日志监控它。
有时我们希望提前终止线程
@GetMapping("/stop/{threadId}") public ResponseEntity<String> stop(@PathVariable Long threadId) { System.out.println(threadId); try { log.info("Thread id: {}", threadId); threadManager.stop(threadId); return ResponseEntity.ok("true"); } catch (Exception e) { return ResponseEntity.ok(e.toString()); } }
查看当前线程列表,找到线程 ID
neo@MacBook-Pro-M2 ~> curl http://localhost:8080/thread/list ============================================================================================ | ID | Name | Group | Daemon | State | Priority | -------------------------------------------------------------------------------------------- | 19 | Monitor Ctrl-Break | main | true | RUNNABLE | 5 | | 34 | mysql-cj-abandoned-connection-cleanup | main | true | TIMED_WAITING | 5 | | 35 | HikariPool-1 housekeeper | main | true | TIMED_WAITING | 5 | | 37 | lettuce-timer-3-1 | main | true | TIMED_WAITING | 5 | | 431 | XNIO-1 I/O-1 | main | false | RUNNABLE | 5 | | 432 | XNIO-1 I/O-2 | main | false | RUNNABLE | 5 | | 433 | XNIO-1 I/O-3 | main | false | RUNNABLE | 5 | | 434 | XNIO-1 I/O-4 | main | false | RUNNABLE | 5 | | 435 | XNIO-1 I/O-5 | main | false | RUNNABLE | 5 | | 436 | XNIO-1 I/O-6 | main | false | RUNNABLE | 5 | | 437 | XNIO-1 I/O-7 | main | false | RUNNABLE | 5 | | 438 | XNIO-1 I/O-8 | main | false | RUNNABLE | 5 | | 439 | XNIO-1 Accept | main | false | RUNNABLE | 5 | | 440 | DestroyJavaVM | main | false | RUNNABLE | 5 | | 441 | XNIO-1 task-2 | main | false | RUNNABLE | 5 | | 442 | test-8 | job | false | WAITING | 5 | | 443 | test-5 | job | false | WAITING | 5 | | 449 | test-12 | job | false | WAITING | 5 | | 450 | test-9 | job | false | WAITING | 5 | | 451 | test-11 | job | false | WAITING | 5 | ============================================================================================
停止线程
neo@MacBook-Pro-M2 ~> curl http://localhost:8080/thread/stop/451 true⏎
再次查看线程列表,确认线程已经被终止
neo@MacBook-Pro-M2 ~> curl http://localhost:8080/thread/list ============================================================================================ | ID | Name | Group | Daemon | State | Priority | -------------------------------------------------------------------------------------------- | 19 | Monitor Ctrl-Break | main | true | RUNNABLE | 5 | | 34 | mysql-cj-abandoned-connection-cleanup | main | true | TIMED_WAITING | 5 | | 35 | HikariPool-1 housekeeper | main | true | TIMED_WAITING | 5 | | 37 | lettuce-timer-3-1 | main | true | TIMED_WAITING | 5 | | 431 | XNIO-1 I/O-1 | main | false | RUNNABLE | 5 | | 432 | XNIO-1 I/O-2 | main | false | RUNNABLE | 5 | | 433 | XNIO-1 I/O-3 | main | false | RUNNABLE | 5 | | 434 | XNIO-1 I/O-4 | main | false | RUNNABLE | 5 | | 435 | XNIO-1 I/O-5 | main | false | RUNNABLE | 5 | | 436 | XNIO-1 I/O-6 | main | false | RUNNABLE | 5 | | 437 | XNIO-1 I/O-7 | main | false | RUNNABLE | 5 | | 438 | XNIO-1 I/O-8 | main | false | RUNNABLE | 5 | | 439 | XNIO-1 Accept | main | false | RUNNABLE | 5 | | 440 | DestroyJavaVM | main | false | RUNNABLE | 5 | | 441 | XNIO-1 task-2 | main | false | RUNNABLE | 5 | | 442 | test-8 | job | false | WAITING | 5 | | 443 | test-5 | job | false | WAITING | 5 | | 449 | test-12 | job | false | WAITING | 5 | | 450 | test-9 | job | false | WAITING | 5 | | 454 | async-job-11 | job | false | WAITING | 5 | ============================================================================================
这样我们就可以随时终止线程
线程管理 Serivce
package cn.netkiller.thread; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; /* * @author neo<netkiller@msn.com> */ @Slf4j @Service public class ThreadManager { public Thread[] listThreads() { int count = Thread.activeCount(); Thread[] threads = new Thread[count]; Thread.enumerate(threads); return threads; } public Thread fetchThread(long threadId) { int count = Thread.activeCount(); Thread[] threads = new Thread[count]; Thread.enumerate(threads); for (Thread thread : threads) { if (thread.threadId() == threadId) { return thread; } } return null; } public Thread fetchThread(String name) { return this.fetchThread(name, false); } public Thread fetchThread(String name, boolean ignoreCase) { Thread[] threadArray = listThreads(); for (Thread thread : threadArray) { if (ignoreCase) { if (thread.getName().equalsIgnoreCase(name)) { return thread; } } else { if (thread.getName().equals(name)) { return thread; } } } return null; } public int activeCount() { return Thread.activeCount(); } public void stop(Long threadId) { Thread thread = this.fetchThread(threadId); System.out.println(thread.toString()); thread.stop(); } public void stop(String name) { Thread thread = this.fetchThread(name); thread.stop(); } public void interrupt(Long threadId) { Thread thread = this.fetchThread(threadId); System.out.println(thread.toString()); thread.interrupt(); } public void interrupt(String name) { Thread thread = this.fetchThread(name); thread.interrupt(); } public void wait(Long threadId) { Thread thread = this.fetchThread(threadId); try { thread.wait(); } catch (InterruptedException e) { throw new RuntimeException(e); } } public void wait(String name) { Thread thread = this.fetchThread(name); try { thread.wait(); } catch (InterruptedException e) { throw new RuntimeException(e); } } public void start(Long threadId) { Thread thread = this.fetchThread(threadId); thread.start(); } public void start(String name) { Thread thread = this.fetchThread(name); thread.start(); } public String show() { StringBuilder out = new StringBuilder(); Thread[] threads = this.listThreads(); int idLength = 4; int nameLength = 4; int groupLength = 5; for (Thread thread : threads) { if (Long.toString(thread.threadId()).length() > idLength) { idLength = Long.toString(thread.threadId()).length(); } if (thread.getName().length() > nameLength) { nameLength = thread.getName().length(); } if (thread.getThreadGroup().getName().length() > groupLength) { groupLength = thread.getThreadGroup().getName().length(); } } String format = "| %" + idLength + "s | %" + nameLength + "s | %" + groupLength + "s | %6s | %13s | %8s |\n"; String line = String.format("%0" + (idLength + nameLength + groupLength + 30 + 16) + "d\n", 0).replace("0", "="); out.append(line); out.append(String.format(format, "ID", "Name", "Group", "Daemon", "State", "Priority")); out.append(line.replace("=", "-")); for (Thread thread : threads) { if (thread != null) { out.append(String.format(format, thread.threadId(), thread.getName(), thread.getThreadGroup().getName(), thread.isDaemon(), thread.getState().name(), thread.getPriority())); } } out.append(line); return out.toString(); } }