| 知乎专栏 |
newCachedThreadPool 线程池尺寸没有固定上线
private void startTask(List<String> usersList){
ExecutorService executor = Executors.newCachedThreadPool();
executor.submit(()->{
//do someting
});
}
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < 100; i++) {
executor.execute(() -> {
try {
TimeUnit.SECONDS.sleep(5);
System.out.println(Thread.currentThread().getName());
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
}
executor.shutdown();
package cn.netkiller.test.grey;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class GreyTest {
public GreyTest() {
// TODO Auto-generated constructor stub
}
static class MyThread implements Runnable {
public void run() {
System.out.println("Thread Name:" + Thread.currentThread().getName());
}
}
public static void main(String[] args) {
// 创建五个线程池
int nThreads = 5;
ExecutorService pool = Executors.newFixedThreadPool(nThreads);
// 创建实现了Runnable接口对象
MyThread t1 = new MyThread();
MyThread t2 = new MyThread();
MyThread t3 = new MyThread();
MyThread t4 = new MyThread();
MyThread t5 = new MyThread();
// 将线程放入池中进行执行
pool.execute(t1);
pool.execute(t2);
pool.execute(t3);
pool.execute(t4);
pool.execute(t5);
// 关闭线程池
pool.shutdown();
}
}
提交线程数大于线程池尺寸时会同步等待,然后复用已经处理完的空间线程。
System.out.println(Thread.currentThread());
AtomicInteger count = new AtomicInteger(1);
int nThread = Runtime.getRuntime().availableProcessors();
ExecutorService executor = Executors.newFixedThreadPool(nThread);
for (int i = 0; i < 100; i++) {
executor.execute(() -> {
try {
TimeUnit.SECONDS.sleep(2);
System.out.println(count.getAndIncrement() + " [" + Thread.currentThread().getName() + "] " + System.currentTimeMillis());
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
}
executor.shutdown();
CompletableFuture 线程池
package cn.netkiller.test;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.function.Supplier;
public class Test {
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println(Thread.currentThread());
ExecutorService executor = Executors.newFixedThreadPool(10);
Parallel parallel = new Parallel();
parallel.addAsyncTask(() -> {
System.out.println(Thread.currentThread().getName());
return "task1";
}, executor)
.addAsyncTask(() -> {
System.out.println(Thread.currentThread().getName());
return "task2";
}, executor)
.addAsyncTask(() -> {
System.out.println(Thread.currentThread().getName());
return "task3";
}, executor)
.addAsyncTask(() -> {
System.out.println(Thread.currentThread().getName());
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "finally";
}, executor);
parallel.join();
List<CompletableFuture<String>> futures = parallel.get();
futures.stream().forEach(item -> {
System.out.println(item.getNow("no result"));
});
executor.shutdown();
}
public static class Parallel<T> {
private final List<CompletableFuture<T>> futures;
Parallel() {
this(10);
}
Parallel(int size) {
futures = new ArrayList<>(size);
}
public Parallel addAsyncTask(Supplier<T> supplier) {
futures.add(CompletableFuture.supplyAsync(supplier));
return this;
}
public Parallel addAsyncTask(Supplier<T> supplier, Executor executor) {
futures.add(CompletableFuture.supplyAsync(supplier, executor));
return this;
}
public List<CompletableFuture<T>> get() {
return futures;
}
public void join() {
CompletableFuture.allOf(futures.toArray(new CompletableFuture[]{})).join();
}
public void clear() {
futures.clear();
}
}
}
@Configuration
public class ScheduleConfig implements SchedulingConfigurer {
@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
//当然了,这里设置的线程池是corePoolSize也是很关键了,自己根据业务需求设定
taskRegistrar.setScheduler(Executors.newScheduledThreadPool(10));
}
}
ScheduledExecutorService service = Executors.newScheduledThreadPool(10); service.schedule(new Task(), 10, TimeUnit.SECONDS); service.scheduleAtFixedRate(new Task(), 10, 10, TimeUnit.SECONDS); service.scheduleWithFixedDelay(new Task(), 10, 10, TimeUnit.SECONDS);
可以理解为 SingleThreadExecutor = Executors.newFixedThreadPool(1);
ExecutorService executor = Executors.newSingleThreadExecutor(); ExecutorService executor = Executors.newSingleThreadScheduledExecutor();
System.out.println(Thread.currentThread());
AtomicInteger count = new AtomicInteger(1);
int nThread = Runtime.getRuntime().availableProcessors();
ExecutorService executor = Executors.newSingleThreadExecutor();
for (int i = 0; i < 100; i++) {
executor.execute(() -> {
try {
TimeUnit.SECONDS.sleep(2);
System.out.println(count.getAndIncrement() + " [" + Thread.currentThread().getName() + "] " + System.currentTimeMillis());
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
}
executor.shutdown();
ExecutorService 功能包括,提交任务、执行任务、关闭线程池。
首先通过 executor.shutdown(); 发送关闭信号,然后再通过 executor.awaitTermination(10, TimeUnit.SECONDS) 设置超时时间,超时抛出异常,最后通过 executor.shutdownNow(); 强制关闭。
package cn.netkiller.test;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.function.Supplier;
public class Test {
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println(Thread.currentThread());
ExecutorService executor = Executors.newFixedThreadPool(10);
Parallel parallel = new Parallel();
parallel.addAsyncTask(() -> {
System.out.println(Thread.currentThread().getName());
return "task1";
}, executor)
.addAsyncTask(() -> {
System.out.println(Thread.currentThread().getName());
return "task2";
}, executor)
.addAsyncTask(() -> {
System.out.println(Thread.currentThread().getName());
return "task3";
}, executor)
.addAsyncTask(() -> {
System.out.println(Thread.currentThread().getName());
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "finally";
}, executor);
parallel.join();
List<CompletableFuture<String>> futures = parallel.get();
futures.stream().forEach(item -> {
System.out.println(item.getNow("no result"));
});
executor.shutdown();
try {
executor.shutdown();
if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
// 超时的时候向线程池中所有的线程发出中断(interrupted)。
executor.shutdownNow();
}
} catch (InterruptedException e) {
// awaitTermination方法被中断的时候也中止线程池中全部的线程的执行。
System.out.println("awaitTermination interrupted: " + e);
executor.shutdownNow();
}
}
public static class Parallel<T> {
private final List<CompletableFuture<T>> futures;
Parallel() {
this(10);
}
Parallel(int size) {
futures = new ArrayList<>(size);
}
public Parallel addAsyncTask(Supplier<T> supplier) {
futures.add(CompletableFuture.supplyAsync(supplier));
return this;
}
public Parallel addAsyncTask(Supplier<T> supplier, Executor executor) {
futures.add(CompletableFuture.supplyAsync(supplier, executor));
return this;
}
public List<CompletableFuture<T>> get() {
return futures;
}
public void join() {
CompletableFuture.allOf(futures.toArray(new CompletableFuture[]{})).join();
}
public void clear() {
futures.clear();
}
}
}