Home | 简体中文 | 繁体中文 | 杂文 | Github | 知乎专栏 | Facebook | Linkedin | Youtube | 打赏(Donations) | About
知乎专栏

10.7. 队列 / 阻塞队列

10.7.1. ConcurrentLinkedQueue 非阻塞队列

		
// Thread-safe, non-blocking queue of Story elements (diamond instead of a raw type).
private final ConcurrentLinkedQueue<Story> queue = new ConcurrentLinkedQueue<>();
		
		
		
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
 * Producer/consumer demo on top of a non-blocking {@link ConcurrentLinkedQueue}.
 *
 * <p>{@code offer} and {@code poll} return immediately, so the consumer has to
 * poll repeatedly until it has received every message.
 */
public class NonBlockingQueueExample {
    /** Number of messages the producer sends and the consumer waits for. */
    private static final int MESSAGE_COUNT = 10;

    /**
     * Starts one producer thread and one consumer thread sharing a queue.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Queue<String> queue = new ConcurrentLinkedQueue<>();

        // Producer thread
        Thread producer = new Thread(() -> {
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String message = "Msg " + i;
                queue.offer(message); // non-blocking add
                System.out.println("Produced: " + message);
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    // Restore the flag and stop; continuing would make every
                    // following sleep() throw immediately.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        });

        // Consumer thread
        Thread consumer = new Thread(() -> {
            int count = 0;
            while (count < MESSAGE_COUNT) {
                String message = queue.poll(); // non-blocking take, null when empty
                if (message != null) {
                    System.out.println("Consumed: " + message);
                    count++;
                }
                try {
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    // Restore the flag and stop instead of spinning without pauses.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        });

        producer.start();
        consumer.start();
    }
}
		
		

10.7.2. ArrayBlockingQueue

		
package cn.netkiller.test;

import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Producer/consumer demo built on a bounded {@link ArrayBlockingQueue}.
 *
 * <p>A producer task keeps putting apples into a basket while a consumer task,
 * started later, keeps taking them out. Both block when the basket is full or
 * empty respectively.
 */
public class QueueTest {
	/** Shared generator so that {@link #random(int, int)} does not allocate one per call. */
	private static final Random RANDOM = new Random();

	/**
	 * A basket of apples backed by a blocking queue.
	 */
	public static class Basket {
		// The basket holds at most 10 apples
		BlockingQueue<String> basket = new ArrayBlockingQueue<String>(10);

		/**
		 * Puts one apple into the basket, waiting for free space if it is full.
		 *
		 * @throws InterruptedException if interrupted while waiting
		 */
		public void produce() throws InterruptedException {
			basket.put("An apple");
		}

		/**
		 * Takes one apple out of the basket, waiting until one is available.
		 *
		 * @return the apple that was removed
		 * @throws InterruptedException if interrupted while waiting
		 */
		public String consume() throws InterruptedException {
			return basket.take();
		}

		/**
		 * @return the number of apples currently in the basket
		 */
		public int size() {
			return basket.size();
		}

	}

	/**
	 * Runs the demo: starts the producer, starts the consumer 5 seconds later,
	 * lets both run for 20 more seconds and then stops them.
	 *
	 * @throws InterruptedException if interrupted before the consumer is started
	 */
	public static void testBasket() throws InterruptedException {
		// The basket shared by both tasks
		final Basket basket = new Basket();
		// Apple producer
		class Producer implements Runnable {
			@Override
			public void run() {
				try {
					while (true) {
						int n = random(1, 5);
						for (int i = 0; i < n; i++) {
							basket.produce();
						}
						System.out.println(System.currentTimeMillis() + " 放入" + n + "个,当前总数:" + basket.size() + "个");
						Thread.sleep(random(450, 1000));
					}
				} catch (InterruptedException ex) {
					// shutdownNow() interrupts the task: restore the flag and finish
					Thread.currentThread().interrupt();
				}
			}
		}
		// Apple consumer
		class Consumer implements Runnable {
			@Override
			public void run() {
				try {
					while (true) {
						int n = random(1, 5);
						for (int i = 0; i < n; i++) {
							basket.consume();
						}
						System.out.println(System.currentTimeMillis() + " 取出" + n + "个,剩余数量:" + basket.size() + "个");
						Thread.sleep(random(400, 1000));
					}
				} catch (InterruptedException ex) {
					// shutdownNow() interrupts the task: restore the flag and finish
					Thread.currentThread().interrupt();
				}
			}
		}

		ExecutorService service = Executors.newCachedThreadPool();
		try {
			service.submit(new Producer());
			// Delay the consumer so the basket fills up first
			Thread.sleep(5000);
			service.submit(new Consumer());
			// Let both tasks run for 20 seconds
			try {
				Thread.sleep(20000);
			} catch (InterruptedException e) {
				// Stop early, but keep the interrupt visible to the caller
				Thread.currentThread().interrupt();
			}
		} finally {
			// Always stop the workers, even if the first sleep was interrupted;
			// otherwise the pool threads would keep the JVM alive
			service.shutdownNow();
		}
	}

	/**
	 * Returns a pseudo-random integer in the closed range {@code [min, max]}.
	 *
	 * @param min smallest value that may be returned
	 * @param max largest value that may be returned
	 * @return a value between {@code min} and {@code max}, both inclusive
	 * @throws IllegalArgumentException if {@code min} is not less than {@code max + 1}
	 */
	public static int random(int min, int max) {
		return RANDOM.ints(min, max + 1).findFirst().getAsInt();
	}

	public static void main(String[] args) throws InterruptedException {
		QueueTest.testBasket();
	}
}

		
			

10.7.3. LinkedBlockingQueue

LinkedBlockingQueue是一种线程安全的阻塞队列,可以用于生产者-消费者模式,实现线程同步。

		
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Minimal producer/consumer hand-off built on a {@link LinkedBlockingQueue}.
 */
public class BlockingSynchronizedThread {
	/** Buffer between producers and consumers, created without an explicit capacity. */
	private final LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>();

	/**
	 * Appends an item to the queue.
	 *
	 * @param item value to enqueue
	 * @throws InterruptedException if interrupted while waiting
	 */
	public void produce(int item) throws InterruptedException {
		final Integer boxed = Integer.valueOf(item);
		queue.put(boxed);
	}

	/**
	 * Removes the head of the queue, waiting until an element is available.
	 *
	 * @return the value that was at the head of the queue
	 * @throws InterruptedException if interrupted while waiting
	 */
	public int consume() throws InterruptedException {
		final Integer head = queue.take();
		return head.intValue();
	}
}
		
			
			
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Two threads communicating through a bounded {@link LinkedBlockingQueue}.
 */
public class QueueThreadCommunication {
    /**
     * Starts a producer and a consumer thread that exchange ten messages.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Blocking queue bounded at 10 elements
        final BlockingQueue<String> channel = new LinkedBlockingQueue<>(10);

        Thread producer = new Thread(() -> produceAll(channel));
        Thread consumer = new Thread(() -> consumeAll(channel));

        producer.start();
        consumer.start();
    }

    /** Puts ten messages on the channel, pausing 100 ms after each one. */
    private static void produceAll(BlockingQueue<String> channel) {
        int sent = 0;
        try {
            while (sent < 10) {
                final String text = "Message " + sent;
                channel.put(text); // blocks while the queue is full
                System.out.println("Produced: " + text);
                Thread.sleep(100);
                sent++;
            }
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /** Takes ten messages from the channel, pausing 200 ms after each one. */
    private static void consumeAll(BlockingQueue<String> channel) {
        int received = 0;
        try {
            while (received < 10) {
                final String text = channel.take(); // blocks while the queue is empty
                System.out.println("Consumed: " + text);
                Thread.sleep(200);
                received++;
            }
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}
			
			

10.7.4. PriorityBlockingQueue

			
import java.util.concurrent.PriorityBlockingQueue;

/**
 * Shows that a {@link PriorityBlockingQueue} hands elements out in priority
 * (natural) order rather than insertion order.
 */
public class PriorityQueueExample {
    /**
     * Inserts five integers out of order, then consumes them in ascending order.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>();
        final int[] items = {5, 1, 3, 2, 4};

        // Producer
        Thread producer = new Thread(() -> {
            for (int item : items) {
                queue.put(item);
            }
        });

        // Consumer: take exactly as many elements as were produced. Looping on
        // !queue.isEmpty() would race with the producer and could exit before
        // anything has been inserted.
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < items.length; i++) {
                    System.out.println("Consumed: " + queue.take());
                    Thread.sleep(100);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        try {
            // Wait until every element is queued so the consumer observes the
            // full priority order instead of whatever has arrived so far.
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }
        consumer.start();
    }
}
			
			

10.7.5. ConcurrentLinkedQueue

			
import java.util.concurrent.ConcurrentLinkedQueue;

/**
 * Producer/consumer demo on a lock-free {@link ConcurrentLinkedQueue}.
 */
public class ConcurrentLinkedQueueExample {
    /** Number of items the producer offers and the consumer waits for. */
    private static final int ITEM_COUNT = 1000;

    /**
     * Starts a producer and a consumer thread; both finish once all items have
     * been handled, so the program terminates.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();

        // Producer thread
        new Thread(() -> {
            for (int i = 0; i < ITEM_COUNT; i++) {
                queue.offer(i); // returns immediately, never blocks
                System.out.println("Produced: " + i);
            }
        }).start();

        // Consumer thread: stops after ITEM_COUNT items instead of spinning forever
        new Thread(() -> {
            int consumed = 0;
            while (consumed < ITEM_COUNT) {
                Integer item = queue.poll(); // returns immediately, null when empty
                if (item == null) {
                    // Queue is momentarily empty: hint the busy-wait to the runtime
                    Thread.onSpinWait();
                    continue;
                }
                System.out.println("Consumed: " + item);
                consumed++;
            }
        }).start();
    }
}
			
			

10.7.6. LinkedTransferQueue

			
import java.util.concurrent.LinkedTransferQueue;

/**
 * Direct hand-off between two threads using a {@link LinkedTransferQueue}.
 */
public class LinkedTransferQueueExample {
    /**
     * Starts the consumer first, then the producer; five integers are handed over.
     *
     * @param args unused
     */
    public static void main(String[] args) throws InterruptedException {
        final LinkedTransferQueue<Integer> handoff = new LinkedTransferQueue<>();

        // Consumer side: waits for each element the producer transfers
        Runnable receiver = () -> {
            int received = 0;
            try {
                while (received < 5) {
                    Integer value = handoff.take();
                    System.out.println("Consumed: " + value);
                    received++;
                }
            } catch (InterruptedException interrupted) {
                Thread.currentThread().interrupt();
            }
        };

        // Producer side: transfer() waits until a consumer has received the element
        Runnable sender = () -> {
            try {
                for (int next = 0; next != 5; next++) {
                    handoff.transfer(next);
                    System.out.println("Produced: " + next);
                }
            } catch (InterruptedException interrupted) {
                Thread.currentThread().interrupt();
            }
        };

        // Consumer is started first so it is already waiting for data
        new Thread(receiver).start();
        new Thread(sender).start();
    }
}
			
			

10.7.7. LinkedBlockingQueue、LinkedTransferQueue、ConcurrentLinkedQueue 对比

			
特性		LinkedBlockingQueue	LinkedTransferQueue	ConcurrentLinkedQueue

队列边界	可选有界(默认容量 Integer.MAX_VALUE)	无界	无界
锁机制	双锁分离(putLock / takeLock)	无锁(CAS)	无锁(CAS)
阻塞操作	支持 put/take	支持 take/transfer(put 不阻塞)	不支持(立即返回)
直接传递	不支持	支持 transfer()	不支持
适用场景	生产者 - 消费者模型	即时传递、高性能	非阻塞、读多写少
性能建议

读多写少:优先使用 ConcurrentLinkedQueue(无锁,吞吐量高)。
生产者 - 消费者模型:优先使用 LinkedBlockingQueue(有界控制,平衡读写)。
即时传递:优先使用 LinkedTransferQueue(避免入队出队开销)。