| 知乎专栏 |
private final ConcurrentLinkedQueue<Story> queue = new ConcurrentLinkedQueue();
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
public class NonBlockingQueueExample {
public static void main(String[] args) {
Queue<String> queue = new ConcurrentLinkedQueue<>();
// 生产者线程
Thread producer = new Thread(() -> {
for (int i = 0; i < 10; i++) {
String message = "Msg " + i;
queue.offer(message); // 非阻塞添加
System.out.println("Produced: " + message);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
});
// 消费者线程
Thread consumer = new Thread(() -> {
int count = 0;
while (count < 10) {
String message = queue.poll(); // 非阻塞获取
if (message != null) {
System.out.println("Consumed: " + message);
count++;
}
try {
Thread.sleep(50);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
});
producer.start();
consumer.start();
}
}
package cn.netkiller.test;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class QueueTest {
/**
* 定义装苹果的篮子
*/
public static class Basket {
// 篮子,能够容纳10个苹果
BlockingQueue<String> basket = new ArrayBlockingQueue<String>(10);
// 生产苹果,放入篮子
public void produce() throws InterruptedException {
// put方法放入一个苹果,若basket满了,等到basket有位置
basket.put("An apple");
}
// 消费苹果,从篮子中取走
public String consume() throws InterruptedException {
// get方法取出一个苹果,若basket为空,等到basket有苹果为止
return basket.take();
}
public int size() {
return basket.size();
}
}
// 测试方法
public static void testBasket() throws InterruptedException {
// 建立一个装苹果的篮子
final Basket basket = new Basket();
// 定义苹果生产者
class Producer implements Runnable {
public void run() {
try {
while (true) {
int n = random(1, 5);
for (int i = 0; i < n; i++) {
basket.produce();
}
System.out.println(System.currentTimeMillis() + " 放入" + n + "个,当前总数:" + basket.size() + "个");
Thread.sleep(random(450, 1000));
}
} catch (InterruptedException ex) {
}
}
}
// 定义苹果消费者
class Consumer implements Runnable {
public void run() {
try {
while (true) {
// 消费苹果
int n = random(1, 5);
for (int i = 0; i < n; i++) {
basket.consume();
}
System.out.println(System.currentTimeMillis() + " 取出" + n + "个,剩余数量:" + basket.size() + "个");
Thread.sleep(random(400, 1000));
}
} catch (InterruptedException ex) {
}
}
}
ExecutorService service = Executors.newCachedThreadPool();
Producer producer = new Producer();
Consumer consumer = new Consumer();
service.submit(producer);
// 延迟消费
Thread.sleep(5000);
service.submit(consumer);
// 程序运行10s后,所有任务停止
try {
Thread.sleep(20000);
} catch (InterruptedException e) {
}
service.shutdownNow();
}
public static int random(int min, int max) {
var value = new Random().ints(min, (max + 1)).limit(1).findFirst().getAsInt();
return value;
}
public static void main(String[] args) throws InterruptedException {
QueueTest.testBasket();
}
}
LinkedBlockingQueue是一种线程安全的阻塞队列,可以用于生产者-消费者模式,实现线程同步。
import java.util.concurrent.LinkedBlockingQueue;
public class BlockingSynchronizedThread {
private final LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
public void produce(int item) throws InterruptedException {
queue.put(item);
}
public int consume() throws InterruptedException {
return queue.take();
}
}
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class QueueThreadCommunication {
public static void main(String[] args) {
// 创建一个阻塞队列,容量为10
BlockingQueue<String> queue = new LinkedBlockingQueue<>(10);
// 生产者线程
Thread producer = new Thread(() -> {
try {
for (int i = 0; i < 10; i++) {
String message = "Message " + i;
queue.put(message); // 如果队列满,会阻塞
System.out.println("Produced: " + message);
Thread.sleep(100);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
// 消费者线程
Thread consumer = new Thread(() -> {
try {
for (int i = 0; i < 10; i++) {
String message = queue.take(); // 如果队列空,会阻塞
System.out.println("Consumed: " + message);
Thread.sleep(200);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
producer.start();
consumer.start();
}
}
import java.util.concurrent.PriorityBlockingQueue;
public class PriorityQueueExample {
public static void main(String[] args) {
PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>();
// 生产者
Thread producer = new Thread(() -> {
queue.put(5);
queue.put(1);
queue.put(3);
queue.put(2);
queue.put(4);
});
// 消费者
Thread consumer = new Thread(() -> {
try {
while (!queue.isEmpty()) {
System.out.println("Consumed: " + queue.take());
Thread.sleep(100);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
producer.start();
consumer.start();
}
}
import java.util.concurrent.ConcurrentLinkedQueue;
public class ConcurrentLinkedQueueExample {
public static void main(String[] args) {
ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
// 生产者线程
new Thread(() -> {
for (int i = 0; i < 1000; i++) {
queue.offer(i); // 立即返回,不会阻塞
System.out.println("Produced: " + i);
}
}).start();
// 消费者线程
new Thread(() -> {
while (true) {
Integer item = queue.poll(); // 立即返回,队列为空时返回 null
if (item != null) {
System.out.println("Consumed: " + item);
}
}
}).start();
}
}
import java.util.concurrent.LinkedTransferQueue;
public class LinkedTransferQueueExample {
public static void main(String[] args) throws InterruptedException {
LinkedTransferQueue<Integer> queue = new LinkedTransferQueue<>();
// 消费者线程(先启动,等待数据)
new Thread(() -> {
try {
for (int i = 0; i < 5; i++) {
Integer item = queue.take(); // 等待生产者传递数据
System.out.println("Consumed: " + item);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
// 生产者线程
new Thread(() -> {
try {
for (int i = 0; i < 5; i++) {
queue.transfer(i); // 直接传递给消费者,若没有消费者则阻塞
System.out.println("Produced: " + i);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
}
}
特性 LinkedBlockingQueue LinkedTransferQueue ConcurrentLinkedQueue 队列边界 有界(可指定) 无界 无界 锁机制 分离读写锁 无锁(CAS) 无锁(CAS) 阻塞操作 支持 put/take 支持 transfer 不支持(立即返回) 直接传递 不支持 支持 transfer() 不支持 适用场景 生产者 - 消费者模型 即时传递、高性能 非阻塞、读多写少 性能建议 读多写少:优先使用 ConcurrentLinkedQueue(无锁,吞吐量高)。 生产者 - 消费者模型:优先使用 LinkedBlockingQueue(有界控制,平衡读写)。 即时传递:优先使用 LinkedTransferQueue(避免入队出队开销)。