生产者模式 常用方式
使用 BlockingQueue 使用 BlockingQueue 核心是多个生产者和消费者共用一个 queue 生产者去阻塞 offer (offer 可以添加超时机制,一直无人消费就丢弃), 消费者去 take
使用 ReentrantLock 配合 Condition 消费者与生产者共享一把 ReentrantLock 然后使用不同的 Condition 作为触发条件,消费者在去消费的时候发现数据为空 则使用 empty Condition 发信号,并且在 full Condition await,反之亦然

使用 Condition 的用法



package com.xpj.javagrowth.produce;

/**
 * author : xpj
 * date : 8/31/21 4:41 PM
 * description :
 */


import java.util.List;
import java.util.Random;

public class TestProduceAndConsumer {

    public static void PRTMsg(String msg) {
        System.out.println("CURRENT " + Thread.currentThread().getName() +
                " TIME " + System.currentTimeMillis() + " THREAD ID: " + Thread.currentThread().getId()
                + " MSG -> " + msg);
    }

    /**
     * 消费者
     */
    public static class Consumer implements Runnable {
        private List<PCData> queue;

        public Consumer(List<PCData> queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    PCData data = null;
                    // 核心是这里都需要先加锁
                    Main.INSTANCE.getLock().lock();
                    PRTMsg("++++消费者ID: size " + queue.size());
                    // 这里确认 queue 里面没有数据了,那么就会给 empty condition 发送信号,说我没有数据了,
                    // 来生产呀
                    if (queue.size() == 0) {
                        Main.INSTANCE.getEmpty().signal();
                        Main.INSTANCE.getFull().await();
                    }
                    PRTMsg("++++++++++++消费者ID AFTER:------");
                    Thread.sleep(300);
                    data = queue.remove(0);
                    // 处理完毕之后释放锁
                    Main.INSTANCE.getLock().unlock();
                    PRTMsg(">>>>>>>>>>>>>消费者ID:消费了:" + data.getData());
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

    }


    /**
     * 生产者
     */
    public static class Producter implements Runnable {
        private List<PCData> queue;
        private int len;

        public Producter(List<PCData> queue, int len) {
            this.queue = queue;
            this.len = len;
        }

        @Override
        public void run() {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    Random r = new Random();
                    PCData data = new PCData();
                    data.setData(r.nextInt(500));
                    Main.INSTANCE.getLock().lock();
                    PRTMsg("生产者ID: size : " + queue.size());
                    // 增加这个是为了 queue 里面有数据就通知 消费者,不积压数据
                    if (queue.size() != 0) {
                        Main.INSTANCE.getFull().signal();
                        PRTMsg("NOTIFY +++++ 生产者ID: size : " + queue.size());
                    }
                    // 这里数据达到上限了会通知 full condition 已经满了,来消费吧,同时 empty 等待
                    if (queue.size() >= len) {
                        Main.INSTANCE.getFull().signal();
                        Main.INSTANCE.getEmpty().await();
                    }
                    Thread.sleep(330);
                    queue.add(data);
                    Main.INSTANCE.getLock().unlock();
                    PRTMsg("<<<<<<生产者ID:  生产了:" + data.getData());
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

    }


    public static class PCData {
        private int data;

        public int getData() {
            return data;
        }

        public void setData(int data) {
            this.data = data;
        }
    }
}


使用 BlockingQueue 的生产者与消费者模式

package com.xpj.javagrowth.produce

import java.lang.Exception
import java.util.concurrent.BlockingQueue
import java.util.concurrent.TimeUnit
import kotlin.random.Random

/**
 * author : xpj
 * date : 8/31/21 6:44 PM
 * description :
 */
class TestProduceUseBlock {

}

class Produce(val blockQueue: BlockingQueue<TestProduceAndConsumer.PCData>) : Runnable {
    override fun run() {
        var data: TestProduceAndConsumer.PCData?
        val random = Random(System.currentTimeMillis())
        try {
            while (!Thread.currentThread().isInterrupted) {
                Thread.sleep(300)
                data = TestProduceAndConsumer.PCData()
                data.data = random.nextInt(10000)
                // 这里尝试插入数据到 array blockingqueue 中,如果此时满了就超时 1s ,之后如果还插入不了就失败
                if (!blockQueue.offer(data, 1, TimeUnit.SECONDS)) {
                    TestProduceAndConsumer.PRTMsg("insert data fail. ${blockQueue.size}")
                } else {
                    TestProduceAndConsumer.PRTMsg(" insert data success ${data.data}")
                }
            }
        } catch (e: Exception) {
            e.printStackTrace()
        }
    }

}

class Consumer1(val blockQueue: BlockingQueue<TestProduceAndConsumer.PCData>) : Runnable {
    override fun run() {
        try {
            while (!Thread.currentThread().isInterrupted) {
                // 每 900ms 消费一条数据,这里是为了营造生产者过量生产的场景,
                Thread.sleep(900)
                // take 之后 queue 会 size 减一
                val data = blockQueue.take()
                if (data != null) {
                    TestProduceAndConsumer.PRTMsg(" consumer1 take value is ${data.data}")
                }
            }
        } catch (e: Exception) {
            e.printStackTrace()
        }
    }
}

测试驱动代码

package com.xpj.javagrowth.produce

import com.xpj.javagrowth.produce.TestProduceAndConsumer.*
import java.util.*
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.Executors
import java.util.concurrent.locks.ReentrantLock
import kotlin.system.exitProcess

/**
 * author : xpj
 * date : 8/31/21 4:43 PM
 * description :
 */

object Main {
    var lock = ReentrantLock()
    var empty = lock.newCondition()
    var full = lock.newCondition()
    @JvmStatic
    fun main(args: Array<String>) {
        if (true) {
            return main1()
        }
        val queue: List<PCData> = ArrayList()
        val length = 10
        val p1 = Producter(queue, length)
        val p2 = Producter(queue, length)
        val p3 = Producter(queue, length)
        val c1 = Consumer(queue)
        val c2 = Consumer(queue)
//        val c3 = Consumer(queue)
        // 使用这种方式生成的 ThreadPool 会持续的创建新的线程
        val service = Executors.newCachedThreadPool()
        service.execute(p1)
        service.execute(p2)
        service.execute(p3)
        service.execute(c1)
        service.execute(c2)
        val scanner = Scanner(System.`in`)
        while (true) {
            val quit = scanner.nextInt()
            if (quit == 1) {
                exitProcess(1)
            }
        }

    }

    fun main1() {
        PRTMsg("Test use blocking queue")
        val queue = ArrayBlockingQueue<PCData>(10)
        val p1 = Produce(queue)
        val p2 = Produce(queue)
        val p3 = Produce(queue)
        val c1 = Consumer1(queue)
        val c2 = Consumer1(queue)
        val service = Executors.newCachedThreadPool()
        service.execute(p1)
        service.execute(p2)
        service.execute(p3)
        service.execute(c1)
        service.execute(c2)
        val scanner = Scanner(System.`in`)
        while (true) {
            val quit = scanner.nextInt()
            if (quit == 1) {
                exitProcess(1)
            }
        }
    }
}

1 2
hello : world h: d
hello baby