如何实现一个有并发多任务访问的阻塞队列? 使用synchronized ,object.wait(),object.notifyall()
1、当调用put()方法时,如果此时容器的长度等于限定的最大长度,那么该方法需要阻塞直到队列可以有空间容纳下添加的元素
2、当调用take()方法时,如果此时容器的长度等于最小长度0,那么该方法需要阻塞直到队列中有了元素能够取出
3、put() 和 take()方法是需要协作的,能够及时通知状态进行插入和移除操作
public class MyQueue {
//1、初始化容器
private final LinkedList<Object> list = new LinkedList<>();
//2、定义原子计数器
private AtomicInteger count = new AtomicInteger(0);
//3、设定容器的上限和下限
private final int minSize = 0;
private final int maxSize;
//4、构造器
public MyQueue(int size) {
this.maxSize = size;
}
//5、定义锁对象
private final Object lock = new Object();
//6、阻塞增加方法
public void put(Object obj) {
synchronized (lock) {
while (count.get() == this.maxSize) {
try {
lock.wait();//当前线程判断count.get()==this.maxSize时不让继续Put,在同步方法中执行 X.wait() 方法使当前线程进入阻塞状态(释放 X 对象的锁,进入 X 对象的等待池中)
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//加入元素 计数器累加 唤醒取数线程可以取数
list.add(obj);
count.incrementAndGet();
lock.notify();//当前线程同步方法逻辑执行完后,执行 X.notify() 方法,JVM 把 其他线程B唤醒,允许其他线程执行该Put同步方法
System.out.println(Thread.currentThread().getName()+" add:" + obj);
}
}
public Object take() {
Object result = null;
synchronized (lock) {
while (count.get() == this.minSize) {
try {
lock.wait();//当前线程判断count.get()==this.minSize时不让继续做take操作,在同步方法中执行 X.wait() 方法使当前线程进入阻塞状态
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//移除元素 计数器递减 唤醒添加的线程可以添加元素
result = list.removeFirst();
count.decrementAndGet();
lock.notify();//当前线程同步方法逻辑执行完后,执行 X.notify() 方法,JVM 把 其他线程C唤醒,允许其他线程执行Take同步操作
}
return result;
}
public int getSize() {
return this.count.get();
}
}
并发多任务编程 在使用了阻塞队列后, 队列的生产者和消费者本身就不需要再单独考虑同步和线程间通信(同步,加锁)的问题, 编码会简单很多
(java.util.concurrent包下提供了若干个阻塞队列)
public static void main(String[] args) {
final MyQueue myQueue = new MyQueue(5);
myQueue.put("a");
myQueue.put("b");
myQueue.put("c");
myQueue.put("d");
myQueue.put("e");
System.out.println(Thread.currentThread().getName()+" 当前队列长度:" + myQueue.getSize());
Thread t1 = new Thread(new Runnable() {
@Override public void run() {
myQueue.put("f");
myQueue.put("g");
}
}, "t1");
t1.start();
Thread t2 = new Thread(new Runnable() {
@Override public void run() {
Object obj = myQueue.take();
System.out.println(Thread.currentThread().getName()+" del:"+obj);
Object obj2 = myQueue.take();
System.out.println(Thread.currentThread().getName()+" del:"+obj2);
}
},"t2");
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
t2.start();
}
如果不使用非阻塞队列, 多线程的生产者和消费者的写法是怎样的?
缺点;
1.由于自己实现线程间通信,编码复杂很多;
2. 而且锁的粒度更大,是对象级别的锁,效率相对代码段的锁粒度要低
(PriorityQueue 是Java自带的非阻塞队列)
public class Test {
private int queueSize = 10;
private PriorityQueue<Integer> queue = new PriorityQueue<Integer>(queueSize);
public static void main(String[] args) {
Test test = new Test();
Producer producer = test.new Producer();
Consumer consumer = test.new Consumer();
producer.start();
consumer.start();
}
class Consumer extends Thread{
@Override
public void run() {
consume();
}
private void consume() {
while(true){
synchronized (queue) {//队列同步使用
while(queue.size() == 0){
try {
System.out.println("队列空,等待数据");
queue.wait();//查一下队列,现在为空,我不能消费, 自我阻塞
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.poll(); //每次移走队首元素
queue.notify();//如果同步流程完成,释放锁,JVM通知其他线程来使用consume
System.out.println("从队列取走一个元素,队列剩余"+queue.size()+"个元素");
}
}
}
}
class Producer extends Thread{
@Override
public void run() {
produce();
}
private void produce() {
while(true){
synchronized (queue) {
while(queue.size() == queueSize){
try {
System.out.println("队列满,等待有空余空间");
queue.wait();//查一下队列,现在满了,我不能生产, 自我阻塞
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.offer(1); //每次插入一个元素
queue.notify();//如果同步流程完成,释放锁,JVM通知其他线程来使用produce
System.out.println("向队列取中插入一个元素,队列剩余空间:"+(queueSize-queue.size()));
}
}
}
}
}
没有评论