Skip to content

Latest commit

 

History

History
336 lines (241 loc) · 8.03 KB

File metadata and controls

336 lines (241 loc) · 8.03 KB

生产者消费者模式实现

java 线程协作 wait notify 方式

package com.gaolv.demo;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;

/**
 * Producer/consumer demo built on an intrinsic lock with {@code wait}/{@code notifyAll}.
 *
 * <p>A single {@link List} serves both as the bounded buffer and as the monitor that
 * producers and consumers synchronize on. Producers block while the list holds
 * {@link #CAPACITY} items; consumers block while it is empty.
 */
public class Demo {

    /** Maximum number of items the shared list may hold before producers block. */
    public static Integer CAPACITY = 10;

    /** Fixed-seed random source used to generate the produced values. */
    public static Random random = new Random(100);

    /**
     * Appends random numbers (as strings) to the shared list until the thread is interrupted.
     */
    public static class Producer implements Runnable {

        List<String> data;

        /**
         * @param data the shared buffer; it is also used as the monitor object
         */
        public Producer(List<String> data) {
            this.data = data;
        }

        @Override
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                synchronized (data) {
                    // Always re-check the condition in a loop: wait() may return spuriously.
                    while (data.size() >= CAPACITY) {
                        try {
                            data.wait();
                        } catch (InterruptedException e) {
                            // Restore the flag and stop instead of swallowing the interrupt.
                            Thread.currentThread().interrupt();
                            return;
                        }
                    }
                    int d = random.nextInt(100);
                    data.add(d + "");
                    System.out.println("生产了: " + d);
                    // Producers and consumers wait on the same monitor, so notify() could
                    // wake another producer; notifyAll() guarantees a consumer gets a turn.
                    data.notifyAll();
                }
            }
        }

    }

    /**
     * Removes items from the head of the shared list until the thread is interrupted.
     */
    public static class Consumer implements Runnable {

        List<String> data;

        /**
         * @param data the shared buffer; it is also used as the monitor object
         */
        public Consumer(List<String> data) {
            this.data = data;
        }

        @Override
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                synchronized (data) {
                    // Must be a loop, not an if: after a spurious wakeup (or when another
                    // consumer got there first) the list can still be empty.
                    while (data.isEmpty()) {
                        try {
                            data.wait();
                        } catch (InterruptedException e) {
                            // Restore the flag and stop instead of swallowing the interrupt.
                            Thread.currentThread().interrupt();
                            return;
                        }
                    }
                    String removed = data.remove(0);
                    System.out.println("消费了: " + removed);
                    data.notifyAll();
                }
            }
        }

    }

    /**
     * Starts one producer and one consumer sharing a single list.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        List<String> data = new ArrayList<>();
        new Thread(new Producer(data)).start();
        new Thread(new Consumer(data)).start();
    }

}

Condition 方式实现

/**
 * Producer/consumer demo built on a {@link ReentrantLock} with two {@link Condition}s.
 *
 * <p>Producers wait on {@link #fullCondition} while the buffer is full and are signalled
 * by consumers; consumers wait on {@link #emptyCondition} while the buffer is empty and
 * are signalled by producers.
 */
public class ConditionDemo {

    public final Lock lock = new ReentrantLock();

    /** Awaited by producers while the buffer is full; signalled after an item is consumed. */
    public final Condition fullCondition = lock.newCondition();
    /** Awaited by consumers while the buffer is empty; signalled after an item is produced. */
    public final Condition emptyCondition = lock.newCondition();

    /** Maximum number of items the buffer may hold before producers block. */
    public int CAPACITY = 100;

    /** The shared buffer; only accessed while holding {@link #lock}. */
    public List<String> data;

    /**
     * @param data the list used as the shared buffer
     */
    public ConditionDemo(List<String> data) {
        this.data = data;
    }

    /**
     * Produces one item (the calling thread's id), blocking while the buffer is full.
     *
     * <p>If the calling thread is interrupted while waiting, nothing is produced and the
     * thread's interrupt status is restored.
     */
    public void produce() {

        lock.lock();

        try {

            // Re-check in a loop: await() may wake spuriously, and signalAll() wakes every
            // waiting producer even though only some of them will find free space.
            while (data.size() >= CAPACITY) {
                fullCondition.await();
            }
            long d = Thread.currentThread().getId();
            data.add(d + "");
            System.out.println("生产了: " + d);

            emptyCondition.signalAll();

        } catch (InterruptedException e) {
            // Keep the interrupt visible to the caller instead of swallowing it.
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }

    }

    /**
     * Consumes the oldest item, blocking while the buffer is empty.
     *
     * <p>If the calling thread is interrupted while waiting, nothing is consumed and the
     * thread's interrupt status is restored.
     */
    public void consume() {

        lock.lock();
        try {

            // Re-check in a loop: with several consumers woken by one signalAll(), only the
            // first finds an item; the rest must go back to waiting rather than call
            // remove(0) on an empty list.
            while (data.isEmpty()) {
                emptyCondition.await();
            }
            String d = data.remove(0);
            System.out.println("消费了: " + d);
            fullCondition.signalAll();

        } catch (InterruptedException e) {
            // Keep the interrupt visible to the caller instead of swallowing it.
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }

    }


    /**
     * Starts 30 producer threads and 30 consumer threads, each performing one operation.
     *
     * @param args unused
     */
    public static void main(String[] args) {

        ConditionDemo demo = new ConditionDemo(new ArrayList<>());

        IntStream.range(0, 30).forEach(x -> {
            new Thread(demo::produce).start();
            new Thread(demo::consume).start();
        });

    }
}

与 condition 相关知识,请参考 condition

BlockingQueue 方式实现

注意要用 BlockingQueue 的阻塞方法 put() 和 take()

 /**
  * Runs one producer and one consumer thread that exchange the integers 1..100 through a
  * bounded {@link BlockingQueue}, relying on the blocking {@code put}/{@code take} calls
  * for all coordination.
  *
  * @param args unused
  * @throws InterruptedException declared for compatibility; not thrown by this method itself
  */
 public static void main(String[] args) throws InterruptedException {

        // Typed queue instead of a raw type so put/take are checked at compile time.
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(100);

        // Producer: put() blocks while the queue is full.
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= 100; i++) {
                    queue.put(i);
                    System.out.println("生产了:" + i);
                }
            } catch (InterruptedException e) {
                // Stop producing and keep the interrupt status set.
                Thread.currentThread().interrupt();
            }
        });

        // Consumer: take() blocks while the queue is empty.
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 1; i <= 100; i++) {
                    System.out.println("消费了:" + queue.take());
                }
            } catch (InterruptedException e) {
                // Stop consuming and keep the interrupt status set.
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();

    }

Semaphore 方式

参考: Semaphore

import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Semaphore;
 
/**
 * Bounded buffer coordinated by three semaphores: a binary mutex guarding the list, a
 * count of free slots, and a count of available items.
 */
class Buffer {

    List<Integer> buffer = new LinkedList<Integer>();
    // Binary semaphore giving mutually exclusive access to the buffer.
    private Semaphore mutex = new Semaphore(1);

    // Number of free slots: a producer acquires one before adding, a consumer releases
    // one after removing.
    private Semaphore canProduceCount = new Semaphore(10);

    // Number of items available: a producer releases one after adding, a consumer
    // acquires one before removing.
    private Semaphore canConsumerCount = new Semaphore(0);
    Random rn = new Random(10);

    /**
     * Removes the oldest item from the buffer, blocking while the buffer is empty.
     *
     * @throws InterruptedException if interrupted while waiting for an item or for the
     *         mutex; in that case the semaphores are left as they were before the call
     */
    public void get() throws InterruptedException {
        canConsumerCount.acquire();
        boolean consumed = false;
        try {
            // Acquire the mutex outside the inner try so that it is only released when it
            // was actually obtained.
            mutex.acquire();
            try {
                int val = buffer.remove(0);
                consumed = true;
                System.out
                        .println(Thread.currentThread().getName() + " 正在消费数据为:" + val + "    buffer目前大小为:" + buffer.size());
            } finally {
                mutex.release();
            }
        } finally {
            if (consumed) {
                // An item was removed, so one more slot is free for producers.
                canProduceCount.release();
            } else {
                // Nothing was removed: hand the item permit back.
                canConsumerCount.release();
            }
        }

    }

    /**
     * Adds one random value to the buffer, blocking while the buffer is full.
     *
     * @throws InterruptedException if interrupted while waiting for a free slot or for the
     *         mutex; in that case the semaphores are left as they were before the call
     */
    public void put() throws InterruptedException {
        canProduceCount.acquire();
        boolean produced = false;
        try {
            // Acquire the mutex outside the inner try so that it is only released when it
            // was actually obtained.
            mutex.acquire();
            try {
                int val = rn.nextInt(10);
                buffer.add(val);
                produced = true;
                System.out
                        .println(Thread.currentThread().getName() + " 正在生产数据为:" + val + "    buffer目前大小为:" + buffer.size());
            } finally {
                mutex.release();
            }
        } finally {
            if (produced) {
                // An item was added, so one more item is available to consumers.
                canConsumerCount.release();
            } else {
                // Nothing was added: hand the slot permit back.
                canProduceCount.release();
            }
        }

    }
}
 
/**
 * Entry point for the semaphore-based demo: two producer threads and two consumer
 * threads operate on a single shared {@code Buffer}.
 */
public class SemaphoreProducerComsumer1 {

    /**
     * Creates the shared buffer and starts two producers followed by two consumers.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        final Buffer shared = new Buffer();
        for (int i = 0; i < 2; i++) {
            startProducer(shared);
        }
        for (int i = 0; i < 2; i++) {
            startConsumer(shared);
        }
    }

    /**
     * Starts a thread that calls {@code buffer.put()} in an endless loop; any exception
     * ends the loop and its stack trace is printed.
     *
     * @param buffer the buffer to produce into
     */
    public static void startProducer(final Buffer buffer) {
        Runnable produceForever = () -> {
            try {
                for (;;) {
                    buffer.put();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        };
        new Thread(produceForever).start();
    }

    /**
     * Starts a thread that calls {@code buffer.get()} in an endless loop; any exception
     * ends the loop and its stack trace is printed.
     *
     * @param buffer the buffer to consume from
     */
    public static void startConsumer(final Buffer buffer) {
        Runnable consumeForever = () -> {
            try {
                for (;;) {
                    buffer.get();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        };
        new Thread(consumeForever).start();
    }

}

参考