Java多线程–阻塞队列

2018年11月20日

如何实现一个有并发多任务访问的阻塞队列? 使用synchronized ,object.wait(),object.notifyall()

1、当调用put()方法时,如果此时容器的长度等于限定的最大长度,那么该方法需要阻塞直到队列可以有空间容纳下添加的元素

2、当调用take()方法时,如果此时容器的长度等于最小长度0,那么该方法需要阻塞直到队列中有了元素能够取出

3、put() 和 take()方法是需要协作的,能够及时通知状态进行插入和移除操作

public class MyQueue {
    //1、初始化容器
    private final LinkedList<Object> list = new LinkedList<>();
    //2、定义原子计数器 
    private AtomicInteger count = new AtomicInteger(0);

    //3、设定容器的上限和下限
    private final int minSize = 0;
    private final int maxSize;

    //4、构造器
    public MyQueue(int size) {
        this.maxSize = size;
    }

    //5、定义锁对象
    private final Object lock = new Object();

    //6、阻塞增加方法
    public void put(Object obj) {
        synchronized (lock) {
            while (count.get() == this.maxSize) {
                try {
                    lock.wait();//当前线程判断count.get()==this.maxSize时不让继续Put,在同步方法中执行 X.wait() 方法使当前线程进入阻塞状态(释放 X 对象的锁,进入 X 对象的等待池中)
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            //加入元素 计数器累加 唤醒取数线程可以取数
            list.add(obj);
            count.incrementAndGet();
            lock.notify();//当前线程同步方法逻辑执行完后,执行 X.notify() 方法,JVM 把 其他线程B唤醒,允许其他线程执行该Put同步方法
            System.out.println(Thread.currentThread().getName()+" add:" + obj);
        }
    }

    public Object take() {
        Object result = null;
        synchronized (lock) {
            while (count.get() == this.minSize) {
                try {
                    lock.wait();//当前线程判断count.get()==this.minSize时不让继续做take操作,在同步方法中执行 X.wait() 方法使当前线程进入阻塞状态
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            //移除元素 计数器递减 唤醒添加的线程可以添加元素
            result = list.removeFirst();
            count.decrementAndGet();
            lock.notify();//当前线程同步方法逻辑执行完后,执行 X.notify() 方法,JVM 把 其他线程C唤醒,允许其他线程执行Take同步操作
        }
        return result;
    }

    public int getSize() {
        return this.count.get();
    }   
}

并发多任务编程 在使用了阻塞队列后, 队列的生产者和消费者本身就不需要再单独考虑同步和线程间通信(同步,加锁)的问题,  编码会简单很多

(java.util.concurrent包下提供了若干个阻塞队列)

 public static void main(String[] args) {
        final MyQueue myQueue = new MyQueue(5);
        myQueue.put("a");
        myQueue.put("b");
        myQueue.put("c");
        myQueue.put("d");
        myQueue.put("e");

        System.out.println(Thread.currentThread().getName()+" 当前队列长度:" + myQueue.getSize());
        Thread t1 = new Thread(new Runnable() {
            @Override public void run() {
                myQueue.put("f");
                myQueue.put("g");
            }
        }, "t1");

        t1.start();

        Thread t2 = new Thread(new Runnable() {
            @Override public void run() {
                Object obj = myQueue.take();
                System.out.println(Thread.currentThread().getName()+" del:"+obj);
                Object obj2 = myQueue.take();
                System.out.println(Thread.currentThread().getName()+" del:"+obj2);
            }
        },"t2");

        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        t2.start();
    }

 

如果不使用非阻塞队列, 多线程的生产者和消费者的写法是怎样的?

缺点;

1.由于自己实现线程间通信,编码复杂很多;

2. 而且锁的粒度更大,是对象级别的锁,效率相对代码段的锁粒度要低

(PriorityQueue 是Java自带的非阻塞队列)

public class Test {
    private int queueSize = 10;
    private PriorityQueue<Integer> queue = new PriorityQueue<Integer>(queueSize);
     
    public static void main(String[] args)  {
        Test test = new Test();
        Producer producer = test.new Producer();
        Consumer consumer = test.new Consumer();
         
        producer.start();
        consumer.start();
    }
     
    class Consumer extends Thread{
         
        @Override
        public void run() {
            consume();
        }
         
        private void consume() {
            while(true){
                synchronized (queue) {//队列同步使用
                    while(queue.size() == 0){
                        try {
                            System.out.println("队列空,等待数据");
                            queue.wait();//查一下队列,现在为空,我不能消费, 自我阻塞
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    queue.poll();          //每次移走队首元素
                    queue.notify();//如果同步流程完成,释放锁,JVM通知其他线程来使用consume
                    System.out.println("从队列取走一个元素,队列剩余"+queue.size()+"个元素");
                }
            }
        }
    }
     
    class Producer extends Thread{
         
        @Override
        public void run() {
            produce();
        }
         
        private void produce() {
            while(true){
                synchronized (queue) {
                    while(queue.size() == queueSize){
                        try {
                            System.out.println("队列满,等待有空余空间");
                            queue.wait();//查一下队列,现在满了,我不能生产, 自我阻塞
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    queue.offer(1);        //每次插入一个元素
                    queue.notify();//如果同步流程完成,释放锁,JVM通知其他线程来使用produce
                    System.out.println("向队列取中插入一个元素,队列剩余空间:"+(queueSize-queue.size()));
                }
            }
        }
    }
}

 

 

没有评论

发表评论

邮箱地址不会被公开。 必填项已用*标注