RabbitMQ——使用Exchange中的fanout交换机实现消息发送和接收

文章目录:

1.写在前面

2.使用fanout交换机实现消息的发送和接收

2.1 编写消息接收类(有两个)

2.2 编写消息发送类


1.写在前面

所有 MQ 产品从模型抽象上来说都是一样的过程:
消费者(consumer)订阅某个队列。生产者(producer)创建消息,然后发布到队列(queue)中,最后将消息发送到监听的消费者。

上面是MQ的基本抽象模型,但是不同的MQ产品有有者不同的机制,RabbitMQ实际基于AMQP协议的一个开源实现,因此RabbitMQ内部也是AMQP的基本概念。

RabbitMQ的内部接收如下:

1、Message
消息,消息是不具体的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。

2、Publisher
消息的生产者,也是一个向交换器发布消息的客户端应用程序。

3、Exchange
交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。

4、Binding
绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。

5、Queue
消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。

6、Connection
网络连接,比如一个TCP连接。

7、Channel
信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内地虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。

8、Consumer
消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。

9、Virtual Host
虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。

10、Broker
表示消息队列服务器实体。


2.使用fanout交换机实现消息的发送和接收

每个发到 fanout 类型交换器的消息都会分到所有绑定的队列上去。fanout 交换器不处理路由键,只是简单的将队列绑定到交换器上,每个发送到交换器的消息都会被转发到与该交换器绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。fanout 类型转发消息是最快的。

注意:fanout模式的消息需要将一个消息同时绑定到多个队列中因此这里不能创建并指定某个队列。

注意:

1、使用fanout模式获取消息时不需要绑定特定的队列名称,只需使用channel.queueDeclare().getQueue();获取一个随机的队列名称,然后绑定到指定的Exchange即可获取消息。

2、这种模式中可以同时启动多个接收者只要都绑定到同一个Exchang即可让所有接收者同时接收同一个消息是一种广播的消息机制

Fanout交换机中是一种广播模式,消息是一对多的。这种模式种,没有RoutingKey以及BindingKey的概念,Bindings只是简单的将消息与交换机进行了绑定,如果消息进入了交换机中,那么这个消息会被转发到所有与当前交换机进行绑定的所有队列中。这种模式就像我们收看电视或者电台直播一样,必须要先打开消息接收者来监听队列(就像要先打开电视等待节目开始),这个时候只要有消息发送过来,那么所有的监听者都可以收到消息;如果没有提前监听队列,那么一旦消息发送了,消息接收者就可能错过这条消息。

也就是Fanout交换机模式下,它是会丢失数据的,但是它的速度是最快的。


2.1 编写消息接收类(有两个)

package com.szh.rabbitmq.exchange.fanout;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 *
 */
public class Receive01 {
    public static void main(String[] args) {
        ConnectionFactory factory=new ConnectionFactory();
        factory.setHost("192.168.40.130");
        factory.setPort(5672);
        factory.setUsername("root");
        factory.setPassword("root");

        Connection connection=null;
        Channel channel=null;

        try {
            connection=factory.newConnection();
            channel=connection.createChannel();

            /**
             * 由于Fanout类型的交换机的消息模式类似于广播模式,它不需要绑定RoutingKey
             * 又有可能会有很多个消费者来接收这个交换机中的数据,因此我们创建队列时,要创建一个随机的队列名称
             *
             * queueDeclare()方法会创建一个随机名称的一个队列,非持久化,排外的(最多允许一个消费者监听该队列)
             * 同时也是自动删除的,当没有消费者监听这个队列时,它会自动删除
             *
             * getQueue()方法用于获取这个随机队列的名称
             */
            String queueName=channel.queueDeclare().getQueue();
            channel.exchangeDeclare("fanoutExchange","fanout",true);
            channel.queueBind(queueName,"fanoutExchange","");
            //监听某个消息队列,同时获取消息队列中的数据
            channel.basicConsume(queueName,true,"",new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message=new String(body);
                    System.out.println("Receive01-消息接收成功:" + message);
                }
            });
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }
}
package com.szh.rabbitmq.exchange.fanout;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 *
 */
public class Receive02 {
    public static void main(String[] args) {
        ConnectionFactory factory=new ConnectionFactory();
        factory.setHost("192.168.40.130");
        factory.setPort(5672);
        factory.setUsername("root");
        factory.setPassword("root");

        Connection connection=null;
        Channel channel=null;

        try {
            connection=factory.newConnection();
            channel=connection.createChannel();

            /**
             * 由于Fanout类型的交换机的消息模式类似于广播模式,它不需要绑定RoutingKey
             * 又有可能会有很多个消费者来接收这个交换机中的数据,因此我们创建队列时,要创建一个随机的队列名称
             *
             * queueDeclare()方法会创建一个随机名称的一个队列,非持久化,排外的(最多允许一个消费者监听该队列)
             * 同时也是自动删除的,当没有消费者监听这个队列时,它会自动删除
             *
             * getQueue()方法用于获取这个随机队列的名称
             */
            String queueName=channel.queueDeclare().getQueue();
            channel.exchangeDeclare("fanoutExchange","fanout",true);
            channel.queueBind(queueName,"fanoutExchange","");
            //监听某个消息队列,同时获取消息队列中的数据
            channel.basicConsume(queueName,true,"",new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message=new String(body);
                    System.out.println("Receive02-消息接收成功:" + message);
                }
            });
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }
}

此时直接运行这两个消息接收者,使得它们俩一直处于对消息队列的监听状态,一旦有消息发送,则会立刻接收到消息。

运行之后,在RabbitMQ的管理界面可以看到Queues中,会生成两个随机名称的消息队列。

2.2 编写消息发送类

package com.szh.rabbitmq.exchange.fanout;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

/**
 *
 */
public class Send {
    public static void main(String[] args) {
        ConnectionFactory factory=new ConnectionFactory();
        factory.setHost("192.168.40.130");
        factory.setPort(5672);
        factory.setUsername("root");
        factory.setPassword("root");

        Connection connection=null;
        Channel channel=null;

        try {
            connection=factory.newConnection();
            channel=connection.createChannel();

            /**
             * 由于使用Fanout类型的交换机,因此消息的接收方可能会有多个,因此不建议在消息发送时,创建队列
             * 同时也不建议将该队列绑定到fanout交换机中,因为一旦绑定了一个队列,那么其他队列将无法获得消息
             * 但是发送消息时,至少应该确保交换机存在
             */
//            channel.queueDeclare("myDirectQueue",true,false,false,null);
            channel.exchangeDeclare("fanoutExchange","fanout",true);
//            channel.queueBind("myDirectQueue","directExchange","directRoutingKey");

            String message="Exchange的fanout消息绑定";
            channel.basicPublish("fanoutExchange","",null,message.getBytes(StandardCharsets.UTF_8));
            System.out.println("消息发送成功:" + message);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } finally {
            if (channel != null) {
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }
            if (connection != null ) {
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

运行消息发送者即可,可以看到两个消息接收者都正确无误的接收到了数据。

 

热门文章

暂无图片
编程学习 ·

那些年让我们目瞪口呆的bug

程序员一生与bug奋战,可谓是杀敌无数,见怪不怪了!在某知识社交平台中,一个“有哪些让程序员目瞪口呆的bug”的话题引来了6700多万的阅读,可见程序员们对一个话题的敏感度有多高。 1、麻省理工“只能发500英里的邮件” …
暂无图片
编程学习 ·

redis的下载与安装

下载redis wget http://download.redis.io/releases/redis-5.0.0.tar.gz解压redis tar -zxvf redis-5.0.0.tar.gz编译 make安装 make install快链方便进入redis ln -s redis-5.0.0 redis
暂无图片
编程学习 ·

《大话数据结构》第三章学习笔记--线性表(一)

线性表的定义 线性表:零个或多个数据元素的有限序列。 线性表元素的个数n定义为线性表的长度。n为0时,为空表。 在比较复杂的线性表中,一个数据元素可以由若干个数据项组成。 线性表的存储结构 顺序存储结构 可以用C语言中的一维数组来…
暂无图片
编程学习 ·

对象的扩展

文章目录对象的扩展属性的简洁表示法属性名表达式方法的name属性属性的可枚举性和遍历可枚举性属性的遍历super关键字对象的扩展运算符解构赋值扩展运算符AggregateError错误对象对象的扩展 属性的简洁表示法 const foo bar; const baz {foo}; baz // {foo: "bar"…
暂无图片
编程学习 ·

让程序员最头疼的5种编程语言

世界上的编程语言,按照其应用领域,可以粗略地分成三类。 有的语言是多面手,在很多不同的领域都能派上用场。大家学过的编程语言很多都属于这一类,比如说 C,Java, Python。 有的语言专注于某一特定的领域&…
暂无图片
编程学习 ·

写论文注意事项

参考链接 给研究生修改了一篇论文后,该985博导几近崩溃…… 重点分析 摘要与结论几乎重合 这一条是我见过研究生论文中最常出现的事情,很多情况下,他们论文中摘要部分与结论部分重复率超过70%。对于摘要而言,首先要用一小句话引…
暂无图片
编程学习 ·

安卓 串口开发

上图: 上码: 在APP grable添加 // 串口 需要配合在项目build.gradle中的repositories添加 maven {url "https://jitpack.io" }implementation com.github.licheedev.Android-SerialPort-API:serialport:1.0.1implementation com.jakewhart…
暂无图片
编程学习 ·

2021-2027年中国铪市场调研与发展趋势分析报告

2021-2027年中国铪市场调研与发展趋势分析报告 本报告研究中国市场铪的生产、消费及进出口情况,重点关注在中国市场扮演重要角色的全球及本土铪生产商,呈现这些厂商在中国市场的铪销量、收入、价格、毛利率、市场份额等关键指标。此外,针对…
暂无图片
编程学习 ·

Aggressive cows题目翻译

描述&#xff1a; Farmer John has built a new long barn, with N (2 < N < 100,000) stalls.&#xff08;John农民已经新建了一个长畜棚带有N&#xff08;2<N<100000&#xff09;个牛棚&#xff09; The stalls are located along a straight line at positions…
暂无图片
编程学习 ·

剖析组建PMO的6个大坑︱PMO深度实践

随着事业环境因素的不断纷繁演进&#xff0c;项目时代正在悄悄来临。设立项目经理转岗、要求PMP等项目管理证书已是基操&#xff0c;越来越多的组织开始组建PMO团队&#xff0c;大有曾经公司纷纷建造中台的气质&#xff08;当然两者的本质并不相同&#xff0c;只是说明这个趋势…
暂无图片
编程学习 ·

Flowable入门系列文章118 - 进程实例 07

1、获取流程实例的变量 GET运行时/进程实例/ {processInstanceId} /变量/ {变量名} 表1.获取流程实例的变量 - URL参数 参数需要值描述processInstanceId是串将流程实例的id添加到变量中。变量名是串要获取的变量的名称。 表2.获取流程实例的变量 - 响应代码 响应码描述200指…
暂无图片
编程学习 ·

微信每天自动给女[男]朋友发早安和土味情话

微信通知&#xff0c;每天给女朋友发早安、情话、诗句、天气信息等~ 前言 之前逛GitHub的时候发现了一个自动签到的小工具&#xff0c;b站、掘金等都可以&#xff0c;我看了下源码发现也是很简洁&#xff0c;也尝试用了一下&#xff0c;配置也都很简单&#xff0c;主要是他有一…
暂无图片
编程学习 ·

C语言二分查找详解

二分查找是一种知名度很高的查找算法&#xff0c;在对有序数列进行查找时效率远高于传统的顺序查找。 下面这张动图对比了二者的效率差距。 二分查找的基本思想就是通过把目标数和当前数列的中间数进行比较&#xff0c;从而确定目标数是在中间数的左边还是右边&#xff0c;将查…
暂无图片
编程学习 ·

项目经理,你有什么优势吗?

大侠被一个问题问住了&#xff1a;你和别人比&#xff0c;你的优势是什么呢? 大侠听到这个问题后&#xff0c;脱口而出道&#xff1a;“项目管理能力和经验啊。” 听者抬头看了一下大侠&#xff0c;显然听者对大侠的这个回答不是很满意&#xff0c;但也没有继续追问。 大侠回家…
暂无图片
编程学习 ·

nginx的负载均衡和故障转移

#注&#xff1a;proxy_temp_path和proxy_cache_path指定的路径必须在同一分区 proxy_temp_path /data0/proxy_temp_dir; #设置Web缓存区名称为cache_one&#xff0c;内存缓存空间大小为200MB&#xff0c;1天没有被访问的内容自动清除&#xff0c;硬盘缓存空间大小为30GB。 pro…
暂无图片
编程学习 ·

业务逻辑漏洞

身份认证安全 绕过身份认证的几种方法 暴力破解 测试方法∶在没有验证码限制或者一次验证码可以多次使用的地方&#xff0c;可以分为以下几种情况︰ (1)爆破用户名。当输入的用户名不存在时&#xff0c;会显示请输入正确用户名&#xff0c;或者用户名不存在 (2)已知用户名。…