
答: 生产者消费者模型是一种经典的多线程同步协作模式, 通过一个缓冲器来实现生产者与消费者的解耦合, 生产者产出结果后不用再等待消费者出现, 而是直接放到缓冲器里面, 消费者也不从生产者那边拿结果, 而是直接从缓冲器里面取, 这里面的缓冲器就是阻塞队列

阻塞队列是可以循环使用的且不能扩容的, 我们把它设计成 循环队列 最核心的就是任务入队列 put 与 任务出队列 take
package test;
import java.util.concurrent.BlockingQueue;
/**
* Created with IntelliJ IDEA.
* Description:
* User: ran
* Date: 2025-08-03
* Time: 21:38
*/
public class MyBlockingQueue<T> {
private Object[] array ;
private static final int CAPACITY = 1000;
private static Object locker = new Object();
private int head;
private int tail;
private int size;
public MyBlockingQueue(int init) {
if (init <= 0) {
array = new Object[CAPACITY];
}else {
array = new Object[init];
}
}
// 循环队列
public void put(T task) throws InterruptedException {
synchronized (locker) {
while (size >= array.length) {
locker.wait();
}
array[tail++] = task;
tail = tail % array.length;
size++;
locker.notify();
}
}
public T take() throws InterruptedException {
synchronized (locker) {
while (size == 0) {
locker.wait();
}
T task = (T)array[head];
array[head++] = null;
head = head % array.length;
size--;
locker.notify();
return task;
}
}
public static void main(String[] args) {
MyBlockingQueue<Integer> blockingQueue = new MyBlockingQueue<Integer>(2000);
Thread thread1 = new Thread(() -> {
int count = 0;
while (true) {
try {
blockingQueue.put(count++);
System.out.println("生产元素: " + count);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
Thread thread2 = new Thread(() -> {
while (true) {
try {
int count = blockingQueue.take();
System.out.println("消费元素: " + count);
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
thread1.start();
thread2.start();
}
}

①:AbortPolicy(): 超过负荷, 直接抛出异常.

②:CallerRunsPolicy(): 调⽤者负责处理多出来的任务, 即哪个线程调用的线程池那么再把任务返还给它, 让它自己执行该任务

③:DiscardOldestPolicy(): 丢弃队列中最⽼的任务

④:DiscardPolicy(): 丢弃新来的任务.

线程池核心方法是 submit, 提交任务, 首先要先定义一个阻塞队列BlockingQueue, 定义submit方法向队列添加任务, 线程池的构造方法创建多个线程, 并且每个线程循环从队列中取出任务, 并执行任务

package test;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
/**
* Created with IntelliJ IDEA.
* Description:
* User: ran
* Date: 2025-08-03
* Time: 20:26
*/
public class MyThreadPool {
BlockingQueue<Runnable> blockingQueue ;
public MyThreadPool(int n) {
blockingQueue = new ArrayBlockingQueue<>(100);
for (int i = 0; i < n; i++) {
Thread thread = new Thread(() -> {
try {
while (true) {
Runnable task = blockingQueue.take();
task.run();
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
thread.start();
}
}
public void submit(Runnable task) throws InterruptedException {
blockingQueue.put(task);
}
public static void main(String[] args) throws InterruptedException {
MyThreadPool threadPool = new MyThreadPool(10);
for (int i = 0; i < 100; i++) {
int id = i;
threadPool.submit(() -> {
System.out.println("线程名: " + Thread.currentThread().getName() + " 任务ID: " + id);
});
}
}
}