Home | 简体中文 | 繁体中文 | 杂文 | Github | 知乎专栏 | 51CTO学院 | CSDN程序员研修院 | OSChina 博客 | 腾讯云社区 | 阿里云栖社区 | Facebook | Linkedin | Youtube | 打赏(Donations) | About
知乎专栏多维度架构

2.21. 多维度架构设计之线程池

解决痛点,当我们使用 @Async 注解进行异步执行的时候,一旦请求之后就会进入后台,你不清楚线程什么时候执行完毕,也不清楚它的工作状态,此时你再次请求接口,就会出现两个不同执行的线程。

2.21.1. 并行控制(同步阻塞)

避免接口无序执行,被同时多次执行,使用 synchronized 可以实现同一时间只能有一个请求,请求完毕之后才能进行下一次请求。

		
   @GetMapping("/lock/{id}")
    public String lock1(@PathVariable("id") String id) throws InterruptedException {
        synchronized (id.intern()) {
            log.info(Thread.currentThread().getName() + " 上锁");
            Thread.sleep(10000);
            log.info(Thread.currentThread().getName() + " 解锁");
        }
        return Thread.currentThread().getName();
    }		
		
		

2.21.2. 并行控制(异步非阻塞)

例如我们有这样一个需求,每天进行一次数据汇总,第二天的数据依赖第一天的数据结果,我们需要生成从 1~31 号的数据,希望线程能排队执行,即每天一个线程,必须1号执行完成之后,才能进行2号数据的生成。

当生成1号数据的时候,再次请求接口会返回正在执行中。

实现方法如下,接口部分

		
    @GetMapping("/async/mutex")
    public String asyncSynchronizedQueue() {
        if (!asyncTestService.isLock("queue")) {
            asyncTestService.asyncSynchronizedQueue("queue");
            return ("Success");
        }
        return ("Failure - 执行中,请稍后重试");
    }		
		
		

异步执行 Service

		
	private final Map<String, Boolean> lock = new ConcurrentHashMap<String, Boolean>();

    @Async("asyncExecutor")
    public void asyncSynchronizedQueue(String lockName) {

        log.info("锁状态 {}", this.lock.toString());

        if (this.lock.isEmpty() || !this.lock.containsKey(lockName) || this.lock.get(lockName) == false) {
            log.info(Thread.currentThread().getName() + " 上锁");
            this.lock.computeIfAbsent(lockName, k -> true);

            try {
                log.info(Thread.currentThread().getName() + " 服务执行");
                Thread.sleep(5 * 1000);
                log.info(Thread.currentThread().getName() + " 执行完成 ");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                this.lock.remove(lockName);
                log.info(Thread.currentThread().getName() + " 解锁");
            }

        } else {
            log.info(Thread.currentThread().getName() + " 执行中,请等待");
        }
    }
		
		
		

2.21.3. 数据共享

使用 ConcurrentHashMap 在线程间数据共享

		
	private final Map<String, Object> share = new ConcurrentHashMap<>();

    @GetMapping("/share/{id}")
    public Map<String, Object> shareTest(@PathVariable("id") String id) throws InterruptedException {

        share.computeIfAbsent(id, key -> {
            return new Date();
        });

        synchronized (share) {
            log.info(Thread.currentThread().getName() + " 上锁");
            Thread.sleep(1000);
            log.info(Thread.currentThread().getName() + " 解锁");
        }
        return share;
    }
		
		

2.21.4. 线程池监控

通过 actuator 监控指标

		
neo@MacBook-Pro-M2 ~> curl -s http://www.netkiller.cn:8080/actuator/metrics | jq | grep executor
    "executor.active",
    "executor.completed",
    "executor.pool.core",
    "executor.pool.max",
    "executor.pool.size",
    "executor.queue.remaining",
    "executor.queued",		
		
		

获取指标

		
neo@MacBook-Pro-M2 ~> curl -s http://www.netkiller.cn:8080/actuator/metrics/executor.active | jq
{
  "name": "executor.active",
  "description": "The approximate number of threads that are actively executing tasks",
  "baseUnit": "threads",
  "measurements": [
    {
      "statistic": "VALUE",
      "value": 0
    }
  ],
  "availableTags": [
    {
      "tag": "name",
      "values": [
        "asyncExecutor"
      ]
    }
  ]
}		
		
		

actuator 更多是面向运维监控,而我们更多是需要在代码中获取线程池的状态。

		
    @Autowired
    ThreadPoolTaskExecutor threadPoolTaskExecutor;
    		
    @GetMapping("/pool")
    public String pool() {
        int activeCount = threadPoolTaskExecutor.getActiveCount();
        long completedTaskCount = threadPoolTaskExecutor.getThreadPoolExecutor().getCompletedTaskCount();
        long taskCount = threadPoolTaskExecutor.getThreadPoolExecutor().getTaskCount();
        int queue = threadPoolTaskExecutor.getThreadPoolExecutor().getQueue().size();
        String monitor = String.format("Task: %d, Queue: %d, Active: %d, Completed: %d\n", taskCount, queue, activeCount, completedTaskCount);
        log.info(monitor);
        return monitor;
    }	
		
		

2.21.5. 线程监控

Springboot 并没有提供对异步 Service 的管理功能,异步执行一旦触发之后便进入后台,你对他再也无法控制,你不知道他的运行状态,不知道它什么时候运行完,只能通过日志监控它。

显示进程列表

		
    @GetMapping("/list")
    public ResponseEntity<String> list() {
        String ps = threadManager.show();
        return ResponseEntity.ok(ps);
    }    
		
		

停止进程

		
    @GetMapping("/stop/{threadId}")
    public ResponseEntity<String> stop(@PathVariable Long threadId) {
        System.out.println(threadId);
        try {
            log.info("Thread id: {}", threadId);
            threadManager.stop(threadId);
            return ResponseEntity.ok("true");
        } catch (Exception e) {
            return ResponseEntity.ok(e.toString());
        }
    }
		
		

最终呈现效果

		
neo@MacBook-Pro-M2 ~> curl http://localhost:8080/thread/list
============================================================================================
|   ID |                                  Name | Group | Daemon |         State | Priority |
--------------------------------------------------------------------------------------------
|   19 |                    Monitor Ctrl-Break |  main |   true |      RUNNABLE |        5 |
|   34 | mysql-cj-abandoned-connection-cleanup |  main |   true | TIMED_WAITING |        5 |
|   35 |              HikariPool-1 housekeeper |  main |   true | TIMED_WAITING |        5 |
|   37 |                     lettuce-timer-3-1 |  main |   true | TIMED_WAITING |        5 |
|  431 |                          XNIO-1 I/O-1 |  main |  false |      RUNNABLE |        5 |
|  432 |                          XNIO-1 I/O-2 |  main |  false |      RUNNABLE |        5 |
|  433 |                          XNIO-1 I/O-3 |  main |  false |      RUNNABLE |        5 |
|  434 |                          XNIO-1 I/O-4 |  main |  false |      RUNNABLE |        5 |
|  435 |                          XNIO-1 I/O-5 |  main |  false |      RUNNABLE |        5 |
|  436 |                          XNIO-1 I/O-6 |  main |  false |      RUNNABLE |        5 |
|  437 |                          XNIO-1 I/O-7 |  main |  false |      RUNNABLE |        5 |
|  438 |                          XNIO-1 I/O-8 |  main |  false |      RUNNABLE |        5 |
|  439 |                         XNIO-1 Accept |  main |  false |      RUNNABLE |        5 |
|  440 |                         DestroyJavaVM |  main |  false |      RUNNABLE |        5 |
|  441 |                         XNIO-1 task-2 |  main |  false |      RUNNABLE |        5 |
|  442 |                                test-8 |   job |  false |       WAITING |        5 |
|  443 |                                test-5 |   job |  false |       WAITING |        5 |
|  449 |                               test-12 |   job |  false |       WAITING |        5 |
|  450 |                                test-9 |   job |  false |       WAITING |        5 |
|  451 |                               test-11 |   job |  false |       WAITING |        5 |
============================================================================================
		
		

2.21.6. 线程管理

Springboot 并没有提供对异步 Service 的管理功能,异步执行一旦触发之后便进入后台,你对他再也无法控制,你不知道他的运行状态,不知道它什么时候运行完,只能通过日志监控它。

有时我们希望提前终止线程

		
    @GetMapping("/stop/{threadId}")
    public ResponseEntity<String> stop(@PathVariable Long threadId) {
        System.out.println(threadId);
        try {
            log.info("Thread id: {}", threadId);
            threadManager.stop(threadId);
            return ResponseEntity.ok("true");
        } catch (Exception e) {
            return ResponseEntity.ok(e.toString());
        }
    }				
		
		

查看当前线程列表,找到线程 ID

		
neo@MacBook-Pro-M2 ~> curl http://localhost:8080/thread/list
============================================================================================
|   ID |                                  Name | Group | Daemon |         State | Priority |
--------------------------------------------------------------------------------------------
|   19 |                    Monitor Ctrl-Break |  main |   true |      RUNNABLE |        5 |
|   34 | mysql-cj-abandoned-connection-cleanup |  main |   true | TIMED_WAITING |        5 |
|   35 |              HikariPool-1 housekeeper |  main |   true | TIMED_WAITING |        5 |
|   37 |                     lettuce-timer-3-1 |  main |   true | TIMED_WAITING |        5 |
|  431 |                          XNIO-1 I/O-1 |  main |  false |      RUNNABLE |        5 |
|  432 |                          XNIO-1 I/O-2 |  main |  false |      RUNNABLE |        5 |
|  433 |                          XNIO-1 I/O-3 |  main |  false |      RUNNABLE |        5 |
|  434 |                          XNIO-1 I/O-4 |  main |  false |      RUNNABLE |        5 |
|  435 |                          XNIO-1 I/O-5 |  main |  false |      RUNNABLE |        5 |
|  436 |                          XNIO-1 I/O-6 |  main |  false |      RUNNABLE |        5 |
|  437 |                          XNIO-1 I/O-7 |  main |  false |      RUNNABLE |        5 |
|  438 |                          XNIO-1 I/O-8 |  main |  false |      RUNNABLE |        5 |
|  439 |                         XNIO-1 Accept |  main |  false |      RUNNABLE |        5 |
|  440 |                         DestroyJavaVM |  main |  false |      RUNNABLE |        5 |
|  441 |                         XNIO-1 task-2 |  main |  false |      RUNNABLE |        5 |
|  442 |                                test-8 |   job |  false |       WAITING |        5 |
|  443 |                                test-5 |   job |  false |       WAITING |        5 |
|  449 |                               test-12 |   job |  false |       WAITING |        5 |
|  450 |                                test-9 |   job |  false |       WAITING |        5 |
|  451 |                               test-11 |   job |  false |       WAITING |        5 |
============================================================================================		
		
		

停止线程

		
neo@MacBook-Pro-M2 ~> curl http://localhost:8080/thread/stop/451
true⏎  
		
		

再次查看线程列表,确认线程已经被终止

		
neo@MacBook-Pro-M2 ~> curl http://localhost:8080/thread/list
============================================================================================
|   ID |                                  Name | Group | Daemon |         State | Priority |
--------------------------------------------------------------------------------------------
|   19 |                    Monitor Ctrl-Break |  main |   true |      RUNNABLE |        5 |
|   34 | mysql-cj-abandoned-connection-cleanup |  main |   true | TIMED_WAITING |        5 |
|   35 |              HikariPool-1 housekeeper |  main |   true | TIMED_WAITING |        5 |
|   37 |                     lettuce-timer-3-1 |  main |   true | TIMED_WAITING |        5 |
|  431 |                          XNIO-1 I/O-1 |  main |  false |      RUNNABLE |        5 |
|  432 |                          XNIO-1 I/O-2 |  main |  false |      RUNNABLE |        5 |
|  433 |                          XNIO-1 I/O-3 |  main |  false |      RUNNABLE |        5 |
|  434 |                          XNIO-1 I/O-4 |  main |  false |      RUNNABLE |        5 |
|  435 |                          XNIO-1 I/O-5 |  main |  false |      RUNNABLE |        5 |
|  436 |                          XNIO-1 I/O-6 |  main |  false |      RUNNABLE |        5 |
|  437 |                          XNIO-1 I/O-7 |  main |  false |      RUNNABLE |        5 |
|  438 |                          XNIO-1 I/O-8 |  main |  false |      RUNNABLE |        5 |
|  439 |                         XNIO-1 Accept |  main |  false |      RUNNABLE |        5 |
|  440 |                         DestroyJavaVM |  main |  false |      RUNNABLE |        5 |
|  441 |                         XNIO-1 task-2 |  main |  false |      RUNNABLE |        5 |
|  442 |                                test-8 |   job |  false |       WAITING |        5 |
|  443 |                                test-5 |   job |  false |       WAITING |        5 |
|  449 |                               test-12 |   job |  false |       WAITING |        5 |
|  450 |                                test-9 |   job |  false |       WAITING |        5 |
|  454 |                          async-job-11 |   job |  false |       WAITING |        5 |
============================================================================================		
		
		

这样我们就可以随时终止线程

2.21.7. 线程管理代码

线程管理 Serivce

		
package cn.netkiller.thread;

import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

/*
 * @author neo<netkiller@msn.com>
 */
@Slf4j
@Service
public class ThreadManager {

    public Thread[] listThreads() {
        int count = Thread.activeCount();
        Thread[] threads = new Thread[count];
        Thread.enumerate(threads);
        return threads;
    }

    public Thread fetchThread(long threadId) {
        int count = Thread.activeCount();
        Thread[] threads = new Thread[count];
        Thread.enumerate(threads);
        for (Thread thread : threads) {
            if (thread.threadId() == threadId) {
                return thread;
            }
        }
        return null;
    }

    public Thread fetchThread(String name) {
        return this.fetchThread(name, false);
    }

    public Thread fetchThread(String name, boolean ignoreCase) {
        Thread[] threadArray = listThreads();
        for (Thread thread : threadArray) {
            if (ignoreCase) {
                if (thread.getName().equalsIgnoreCase(name)) {
                    return thread;
                }
            } else {
                if (thread.getName().equals(name)) {
                    return thread;
                }
            }
        }
        return null;
    }


    public int activeCount() {
        return Thread.activeCount();
    }


    public void stop(Long threadId) {
        Thread thread = this.fetchThread(threadId);
        System.out.println(thread.toString());
        thread.stop();
    }

    public void stop(String name) {
        Thread thread = this.fetchThread(name);
        thread.stop();
    }

    public void interrupt(Long threadId) {
        Thread thread = this.fetchThread(threadId);
        System.out.println(thread.toString());
        thread.interrupt();
    }

    public void interrupt(String name) {
        Thread thread = this.fetchThread(name);
        thread.interrupt();
    }

    public void wait(Long threadId) {
        Thread thread = this.fetchThread(threadId);
        try {
            thread.wait();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    public void wait(String name) {
        Thread thread = this.fetchThread(name);
        try {
            thread.wait();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    public void start(Long threadId) {
        Thread thread = this.fetchThread(threadId);
        thread.start();
    }

    public void start(String name) {
        Thread thread = this.fetchThread(name);
        thread.start();
    }

    public String show() {
        StringBuilder out = new StringBuilder();
        Thread[] threads = this.listThreads();
        int idLength = 4;
        int nameLength = 4;
        int groupLength = 5;
        for (Thread thread : threads) {

            if (Long.toString(thread.threadId()).length() > idLength) {
                idLength = Long.toString(thread.threadId()).length();
            }
            if (thread.getName().length() > nameLength) {
                nameLength = thread.getName().length();
            }
            if (thread.getThreadGroup().getName().length() > groupLength) {
                groupLength = thread.getThreadGroup().getName().length();
            }
        }

        String format = "| %" + idLength + "s | %" + nameLength + "s | %" + groupLength + "s | %6s | %13s | %8s |\n";
        String line = String.format("%0" + (idLength + nameLength + groupLength + 30 + 16) + "d\n", 0).replace("0", "=");
        out.append(line);
        out.append(String.format(format, "ID", "Name", "Group", "Daemon", "State", "Priority"));
        out.append(line.replace("=", "-"));
        for (Thread thread : threads) {
            if (thread != null) {
                out.append(String.format(format, thread.threadId(), thread.getName(), thread.getThreadGroup().getName(), thread.isDaemon(), thread.getState().name(), thread.getPriority()));
            }
        }
        out.append(line);
        return out.toString();
    }
}