1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
|
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class BlockingQueue<T> {
private final Queue<T> queue = new LinkedList();
private final int capacity;
private final ReentrantLock lock = new ReentrantLock();
//条件变量,用于通知队列非空。
//Condition 是 Java java.util.concurrent.locks 包中的一个接口,通常与 ReentrantLock 一起使用,提供了一种线程间通信的机制,允许线程在某些条件不满足时等待,直到被其他线程唤醒。
private final Condition notEmpty = lock.newCondition();
//条件变量,用于通知队列不满。
private final Condition notFull = lock.newCondition();
//构造函数
public BlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException("Capacity must be greater than 0");
this.capacity = capacity;
}
//put方法
public void put(T element) throws InterruptedException {
lock.lock();
try {
// 容量已满,通知等待队列等待
while (queue.size() == capacity) {
notFull.await();
}
queue.add(element);
// 通知等待队列非空的线程
notEmpty.signal();
} finally {
lock.unlock();
}
}
//get方法
public T take() throws InterruptedException {
lock.lock();
try {
// 等待队列非空
while (queue.isEmpty()) {
notEmpty.await();
}
T element = queue.poll();
// 通知等待队列不满的线程
notFull.signal();
return element;
} finally {
lock.unlock();
}
}
public int size() {
lock.lock();
try {
return queue.size();
} finally {
lock.unlock();
}
}
public boolean isEmpty() {
lock.lock();
try {
return queue.isEmpty();
} finally {
lock.unlock();
}
}
public boolean isFull() {
lock.lock();
try {
return queue.size() == capacity;
} finally {
lock.unlock();
}
}
}
|