【多线程】深入剖析生产者-消费者模型

💐个人主页:初晴~

📚相关专栏:多线程 / javaEE初阶


一、阻塞队列

阻塞队列是⼀种特殊的队列,也遵守 "先进先出" 的原则。是在普通的队列基础上做出了补充。
java标准库中的原有的队列Queue及其子类,默认都是线程不安全的,而阻塞队列能是⼀种线程安全的数据结构,具有阻塞特性
  • 当队列满的时候, 继续⼊队列就会阻塞, 直到有其他线程从队列中取⾛元素
  • 当队列空的时候, 继续出队列也会阻塞, 直到有其他线程往队列中插⼊元素
基于阻塞队列最大的应用场景就是“生产消费者模型”了。

二、生产者消费者模型

1、概念

比如一个生产线需要制作零件,组装两个步骤,这时就由两种方式进行生产部署:

1、每个人都分别进行零件制作和组装两个操作

2、将员工分成两拨人,一拨人专门负责制作零件,另一拨人专门负责组装

显然第一种方法是比较低效的,制作零件的机器数量是有限的,所有人就会去竞争这几台机器,可能会导致阻塞等待导致效率低下。而第二种方法则实现了各司其职,自然效率会更高,这时零件制作就相当于是生产者,组装就相当于是消费者,这种方式就被称之为“生产者消费者模型”,在后端开发中经常会涉及到。

2、作用

⽣产者消费者模式就是通过⼀个容器来解决⽣产者和消费者的强耦合问题
⽣产者和消费者彼此之间不直接通讯,⽽通过阻塞队列来进⾏通讯,所以⽣产者⽣产完数据之后不⽤等待消费者处理,直接扔给阻塞队列,消费者不找⽣产者要数据,⽽是直接从阻塞队列⾥取.
1. 阻塞队列能使⽣产者和消费者之间解耦合.
直接耦合的代码结构:
这样A与BC的耦合度非常高,出一点问题就可能导致程序崩溃,并且对于后期维护来说也十分不方便。
引入生产者消费者模型的代码结构:
这样显然A与B直接就解耦合了,不会直接关联。
注意:
阻塞队列是一种 数据结构,由于比较好用,会将其单独封装为一个服务器程序,并且在单独的服务器机器上进行部署。这时的阻塞队列就被称之为 “消息队列”(Messae Queue,MQ)了。而消息队列相对是比较成熟的,代码不会频繁修改,因此我们认为A与队列,B与队列之间的交互是 低耦合的。

2. 阻塞队列就相当于⼀个缓冲区,平衡了⽣产者和消费者的处理能⼒. (削峰填⾕)

⽐如在 "秒杀" 场景下, 服务器同⼀时刻可能会收到⼤量的⽀付请求. 如果直接处理这些⽀付请求, 服务器可能扛不住(每个⽀付请求的处理都需要⽐较复杂的流程). 这个时候就可以把这些请求都放到⼀个阻塞队列中, 然后再由消费者线程慢慢的来处理每个⽀付请求.
这样做可以有效进⾏ "削峰", 防⽌服务器被突然到来的⼀波请求直接冲垮.
(1)为什么一个服务器请求收到过多就有可能会崩溃?
一台服务器就是一台“电脑”,提供了一些 硬件资源(CPU、内存、硬盘、网络带宽……),服务器每次收到一个请求,在处理的过程中就需要执行一系列代码,执行代码的过程中就需要 消耗一定的硬件资源,而服务器的硬件资源是 有限的,当请求需要消耗的硬件资源的量 超过机器上限时,机器就会出现问题(卡死、程序崩溃……)
(2)当请求激增时,为什么A与消息队列不会崩溃?
A相当于是一个 “网关服务器”,负责接收客户端的请求,再把请求发给其它服务器。这样的服务器做的工作是比较简单的( 单纯的数据转发),处理同样一个请求消耗的硬件资源更少,在同等情况下能处理更多请求。消息队列中的程序也比较简单,消耗的资源也相对较少,因此他们都不太容易会崩溃。
而B是真正 处理业务逻辑的服务器,代码量更加庞大, 消耗的时间与硬件资源更多,当请求激增时就更容易崩溃了。
优点:
1、解耦合
2、削峰填谷
缺点:
1、需要更多的机器来部署消息队列
2、生产者与消费者之间的通信会延时,响应时间会变长

三、实现方式

1、手写一个阻塞队列

在之前的文章中,我们曾在 栈和队列 一文深入探讨过队列的写法,这里就不做过多赘述,直接看一下最基础的队列的代码实现:

java">class MyBlockingQueue{
    private String[] data=null;
    private int head=0;
    private int tail=0;
    private int size=0;
    public MyBlockingQueue(int capacity){
        data=new String[capacity];
    }

    public void put(String s){
        if(size==data.length){
            //队列满了
            return;
        }
        data[tail]=s;
        tail++;
        if(tail>=data.length){
            tail=0;
        }
        size++;
    }

    public String take(){
        if(size==0){
            //队列为空
            return null;
        }
        String ret=data[head];
        head++;
        if(head>=data.length){
            head=0;
        }
        size--;
        return ret;
    }
}

由于put与take方法中涉及了很多的修改操作,这样的代码在多线程环境下肯定是会有线程安全问题的,博主在 深入剖析线程安全问题 一文中做过详细分析。

那么该如何解决这一问题呢,显然就是通过加锁操作了:

但这样还是不够的。当线程发现队列为空时,不应该继续再去参与锁的竞争应该直接进入阻塞等待的状态,等到其它线程调用put方法,队列不为空时,再恢复执行,从而避免浪费不必要的资源,提高执行效率,这时就可以利用wait-notify来解决了。这在博主的 等待通知机制 一文过详细的介绍。于是最终代码实现就如下:

java">class MyBlockingQueue{
    private String[] data=null;
    private int head=0;
    private int tail=0;
    private int size=0;
    public MyBlockingQueue(int capacity){
        data=new String[capacity];
    }

    public void put(String s){
        synchronized (this){
            if(size==data.length){
                //队列满了
                return;
            }
            data[tail]=s;
            tail++;
            if(tail>=data.length){
                tail=0;
            }
            size++;
            this.notify();
        }
    }

    public String take() throws InterruptedException {
        synchronized (this){
            if(size==0){
                //队列为空
                this.wait();
            }
            String ret=data[head];
            head++;
            if(head>=data.length){
                head=0;
            }
            size--;
            return ret;
        }
    }
}

2、实现生产者消费者模型

在 Java 标准库中内置了阻塞队列。如果我们需要在⼀些程序中使⽤阻塞队列, 直接使⽤标准库中的BlockingQueue即可。
  • BlockingQueue 是⼀个接⼝. 真正实现的类LinkedBlockingQueue/ArrayBlockingQueue等
  • put ⽅法⽤于阻塞式的⼊队列, take ⽤于阻塞式的出队列
  • BlockingQueue 也有 offer, poll, peek 等⽅法, 但是这些⽅法不带有阻塞特性

使用示例:

java">public class Main {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue=new ArrayBlockingQueue<>(3);
        queue.put("111");
        System.out.println("put 成功");
        queue.put("111");
        System.out.println("put 成功");
        
        queue.take();
        System.out.println("take 成功");
        queue.take();
        System.out.println("take 成功");
        queue.take();
        System.out.println("take 成功");
    }
}

我们可以看到当执行第三个take时,由于此时队列为空,因此线程就会进入阻塞状态了。

接着我们就可以用它来试着简单实现一个生产者消费者模型了:

java">public class Main{
    public static void main(String[] args) {
        BlockingQueue<Integer> queue=new ArrayBlockingQueue<>(1000);
        //生产者线程
        Thread t1=new Thread(()->{
            int i=1;
            while(true){
                try {
                    queue.put(i);
                    System.out.println("生产元素 "+ i);
                    i++;
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });
        
        //消费者线程
        Thread t2=new Thread(()->{
            while (true){
                try {
                    Integer i=queue.take();
                    System.out.println("消费元素 "+ i);
                    
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });
        
        t1.start();
        t2.start();
    }
}

其中t1就作为生产者线程,t2作为消费者线程,让我们试试看让生产慢于消费会发生什么:

这时我们得益于生产者消费者模型,由于生产会慢一些,消费者就会进入阻塞等待,每当生产者线程生产一个元素,便紧接着消费一个元素,从而形成了这种井然有序的执行结果。

让我们再试试消费慢于生产会发生什么:

我们可以看到虽然上传线程一下产生了非常多生产元素,但是由于阻塞队列的存在,消费线程依旧会不紧不慢的依次处理消费掉元素,这能有效防止服务器在高并发环境下面对激增的需求量无法执行而崩溃。

总结

生产者消费者模型可以有效地让程序解耦合,便于后续代码的维护和优化,并且可以有效地“削峰填谷”,在高并发场景下能有效缓解服务器压力,避免服务器崩溃。在以后的代码开发中是非常重要一种结构。


那么本篇文章就到此为止了,如果觉得这篇文章对你有帮助的话,可以点一下关注和点赞来支持作者哦。作者还是一个萌新,如果有什么讲的不对的地方欢迎在评论区指出,希望能够和你们一起进步✊


http://www.niftyadmin.cn/n/5641054.html

相关文章

Qt-常用控件

1.控件概述 Widget 是 Qt 中的核⼼概念.英⽂原义是"⼩部件", 我们此处也把它翻译为 “控件” .控件是构成⼀个图形化界⾯的基本要素. 像上述⽰例中的, 按钮, 列表视图, 树形视图, 单⾏输⼊框, 多⾏输⼊框, 滚动条, 下拉框等,都可以称为 “控件”. Qt作为⼀个成熟的G…

mysql用时间戳还是时间存储比较好

各自都有优缺点 在 MySQL 数据库中&#xff0c;选择使用时间戳&#xff08;TIMESTAMP&#xff09;还是时间&#xff08;DATETIME 或 DATETIME(6)&#xff09;来存储日期和时间信息&#xff0c;取决于你的具体需求和使用场景。以下是两者的一些特点和考虑因素&#xff1a; 1. …

京东API接口:商品详情页呈现商品的全网价格数据信息

如今&#xff0c;不少品牌企业为更好销售商品、塑造品牌&#xff0c;都开设了自己的自有商城。那么&#xff0c;对于这类自有商城平台&#xff0c;该如何做才能更好地提升商品的呈现效果呢&#xff1f;一个比较好的建议是&#xff0c;在电商商品详情界面上呈现出商品的全网价格…

开放式耳机和骨传导耳机哪个好?2024年开放式耳机排行榜10强

随着耳机市场的不断发展&#xff0c;开放式耳机和骨传导耳机逐渐成为两大热门选择。无论是追求高音质还是重视佩戴舒适度&#xff0c;消费者在选购耳机时都面临着一个重要问题&#xff1a;开放式耳机和骨传导耳机到底哪个更好&#xff1f;今天我们就来深入对比这两种耳机的优缺…

Leetcode Day21组合总和

39 元素可重复选 输入&#xff1a;candidates [2,3,6,7], target 7 输出&#xff1a;[[2,2,3],[7]] 可以重复选, 代表for j in range(start, n)中, 下一个dfs起点可以是j, 这样代表了重复选择, 但是如何保证不会死循环呢, 就需要利用都是正数的条件了 class Solution:def c…

BUUCTF—[网鼎杯 2020 朱雀组]phpweb

题解 打开题目是这样子的。 啥也不管抓个包看看&#xff0c;从它返回的信息判断出func后面的是要调用的函数&#xff0c;p后面的是要执行的内容。 那我们直接执行个系统命令看看&#xff0c;可以看到返回了hack&#xff0c;估计是做了过滤。 funcsystem&pls 直接读取源码…

【消息中间件】Kafka从入门到精通

1 Kafka入门 概念 架构 1.1 概述 1.1.1 初始Kafka Kafka是一个由Scala和Java语言开发的&#xff0c;经典高吞吐量的分布式消息发布和订阅系统&#xff0c;也是大数据技术领域中用作数据交换的核心组件之一。以高吞吐&#xff0c;低延迟&#xff0c;高伸缩&#xff0c;高可靠…

python进阶篇-day05-网络编程(TCP)与进程

day05网络编程 一. 网编三要素 ip 概述 设备(电脑, 手机, IPad, 耳机...)在网络中的唯一标识. 分类 按照 代数 划分: IPv4: 4字节, 十进制来表示, 例如: 192.168.13.157 IPv6: 8字节, 十六进制来表示, 理论上来讲, 可以让地球上的每一粒沙子都有自己的IP. Ipv4 常用类别划…