Dotnet Core 6.0 使用RabbitMQ

faith team

1.简介

RabbitMQ是一个开源的,基于AMQP(Advanced Message Queuing Protocol)协议的完整的可复用的企业级消息队,RabbitMQ可以实现点对点,发布订阅等消息处理模式。

RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持Linux,windows,macOS,FreeBSD等操作系统,同时也支持很多语言,如:Python,Java,Ruby,PHP,C#,JavaScript,Go,Elixir,Objective-C,Swift等。

当今市面上有很多主流的消息中间件,如老牌的ActiveMQ、RabbitMQ,炙手可热的Kafka,阿里巴巴自主开发RocketMQ等。

1.1 不同MQ特点

  • ActiveMQ
    是Apache出品,最流行的,能力强劲的开源消息总线。它是一个完全支持JMS规范的的消息中间件。丰富的API,多种集群架构模式让ActiveMQ在业界成为老牌的消息中间件,在中小型企业颇受欢迎!
  • Kafka
    是LinkedIn开源的分布式发布-订阅消息系统,目前归属于Apache顶级项目。Kafka主要特点是基于Pull的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输。0.8版本开始支持复制,不支持事务,对消息的重复、丢失、错误没有严格要求,适合产生大量数据的互联网服务的数据收集业务。
  • RocketMQ
    是阿里开源的消息中间件,它是纯Java开发,具有高吞吐量、高可用性、适合大规模分布式系统应用的特点。RocketMQ思路起源于Kafka,但并不是Kafka的一个Copy,它对消息的可靠传输及事务性做了优化,目前在阿里集团被广泛应用于交易、充值、流计算、消息推送、日志流式处理、binglog分发等场景。
  • RabbitMQ
    RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。AMQP协议更多用在企业系统内对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。

RabbitMQ比Kafka可靠,Kafka更适合IO高吞吐的处理,一般应用在大数据日志处理或对实时性(少量延迟),可靠性(少量丢数据)要求稍低的场景使用,比如ELK日志收集。

1.2 RabbitMQ的工作机制:

首先要知道RabbitMQ的三种角色:生产者消费者消息服务器

  • 生产者:消息的创建者,负责创建和推送消息到消息服务器
  • 消费者:消息的接收方,接受消息并处理消息
  • 消息服务器:其实RabbitMQ本身,不会产生和消费消息,相当于一个中转站,将生产者的消息路由给消费者

1.3 RabbitMQ的一些角色

ConnectionFactory:连接管理,应用程序或消费方与RabbitMQ建立连接的管理器
Channel:信道,推送消息的通道
Exchange:交换机,用于接收分配消息到队列中
Queue:保存消息
Routingkey:消息会携带routingKey,决定消息最终的队列
BindingKey:Queue通过bindingKey与交换机绑定

2. docker-compose安装rabbitMq

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
version: '3.1'
services:
rabbitmq:
restart: always
image: rabbitmq:management
container_name: rabbitmq
ports:
- 5672:5672
- 15672:15672
environment:
TZ: Asia/Shanghai
RABBITMQ_DEFAULT_USER: faith
RABBITMQ_DEFAULT_PASS: 123456
volumes:
- ./data:/var/lib/rabbitmq

3. 后台页面解析

3.1 15672管理界面主页

Test

3.2 admin页面

可以用于创建新用户
Test

  • 超级管理员(administrator)
    可登陆管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作。
  • 监控者(monitoring)
    可登陆管理控制台,同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)
  • 策略制定者(policymaker)
    可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息(上图红框标识的部分)。
  • 普通管理者(management)
    仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。
  • 其他
    无法登陆管理控制台,通常就是普通的生产者和消费者。

3.3 创建虚拟主机

Test
为了让各个用户可以互不干扰的工作,RabbitMQ添加了虚拟主机(Virtual Hosts)的概念。其实就是一个独立的访问路径,不同用户使用不同路径,各自有自己的队列、交换机,互相不会影响。

4 Dotnet Core中使用RabbitMq

4.1 安装rabbitMq包

1
install-package rabbitmq.client

4.2 生产者代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
// See https://aka.ms/new-console-template for more information

using System.Text;
using RabbitMQ.Client;

Console.WriteLine("生产者客户端");

//创建连接工厂对象
var factory = new ConnectionFactory()
{
HostName = "47.92.157.215", //远程rabbit客户端
Port = 5267, //AMQP协议端口
UserName = "faith",//账户
Password = "123456"//密码
};

//连接rabbitMq
// 创建连接对象
var connectionAsync =await factory.CreateConnectionAsync();

// 创建连接会话对象
var channelAsync =await connectionAsync.CreateChannelAsync();

//创建队列 (先进先出)
/*#nullable disable
Task<QueueDeclareOk> QueueDeclareAsync(
this IChannel channel,
string queue = "", //队列名称
bool durable = false,//是否持久化,true持久化,队列会保存磁盘,服务器重启时可以保证不丢失相关信息
bool exclusive = true,//是否排他,如果一个队列声明为排他队列,该队列仅对时候次声明它的连接可见,并在连接断开时自动删除
bool autoDelete = true,// 是否自动删除,自动删除的前提是:至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除
IDictionary<string, object> arguments = null, // 设置队列的其他参数
bool noWait = false)
{
return channel.QueueDeclareAsync(queue, durable, exclusive, autoDelete, arguments, noWait: noWait);
}*/
await channelAsync.QueueDeclareAsync(
queue:"faithQueue",
durable: true,
exclusive:false,
autoDelete:false,
arguments: null
);
//模拟发送队列
string str = String.Empty;
do
{
Console.WriteLine("发送内容");
str = Console.ReadLine()!;
//需要将消息转成byte类型
var body = Encoding.UTF8.GetBytes(str);
//发送消息到通道
await channelAsync.BasicPublishAsync("", "faithQueue", body);
}while(str.Trim().ToLower() != "exit");
await channelAsync.CloseAsync();
await connectionAsync.CloseAsync();

总结

  • 使用了 IConnectionFactory, IConnection和IModel 来创建链接和通信管道
  • IConnection 实例对象只负责与 Rabbit 的连接,而发送接收这些实际操作全部由会话通道进行
  • 使用 QueneDeclare 方法进行创建消息队列,创建完成后可以在 RabbitMQ 的管理工具中看到此队列
  • QueneDelare 方法需要一个消息队列名称的必须参数.后面那些参数则代表缓存,参数等信息
  • 使用 BasicPublish 来发送消息,在一对一中 routingKey 必须和 queueName 一致。

4.3 消费者代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
// See https://aka.ms/new-console-template for more information

using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

Console.WriteLine("消费者客户端");

//创建连接工厂对象
var factory = new ConnectionFactory()
{
HostName = "47.92.157.215", //远程rabbit客户端
Port = 5267, //AMQP协议端口
UserName = "revice-user",//账户
Password = "654321"//密码
};

//连接rabbitMq
// 创建连接对象
var connectionAsync =await factory.CreateConnectionAsync();

// 创建连接会话对象
var channelAsync =await connectionAsync.CreateChannelAsync();

string queueName = "queue1";
//声明一个队列
await channelAsync.QueueDeclareAsync(
queue: queueName,//消息队列名称
durable: false,//是否持久化,true持久化,队列会保存磁盘,服务器重启时可以保证不丢失相关信息。
exclusive: false,//是否排他,true排他的,如果一个队列声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除.
autoDelete: false,//是否自动删除。true是自动删除。自动删除的前提是:致少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除.
arguments: null ////设置队列的一些其它参数
);

//创建消费者对象
var consumer = new EventingBasicConsumer(channelAsync);

consumer.Received += (model,ea) =>
{
//接受队列
byte[] message = ea.Body.ToArray();
Console.WriteLine("接收到的消息为:" + Encoding.UTF8.GetString(message));
};
// 消费者开启监听
await channelAsync.BasicConsumeAsync(queueName, true, consumer);
Console.ReadKey();
channelAsync.Dispose();
connectionAsync.CloseAsync();

总结

  • 接收者中是定义一个EventingBasicConsumer对象的消费者(接收者),这个消费者与会话对象关联,
  • 定义接收事件,输出从消息队列中接收的数据,
  • 最后使用会话对象的BasicConsume方法来启动消费者监听。

4.4 运行代码

Test
Test

4.5 模拟宕机检查轮询模式

RabbitMQ会顺序的将message发给下一个消费者。每个消费者会得到平均数量的message。这种方式称之为round-robin(轮询),但是很多情况下并不希望消息平均分配,而是要消费快的多消费,消费少的少消费。还有很多情况下一旦其中一个宕机,那么另外接收者的无法接收原本这个接收者所要接收的数据。

Test

1
2
3
4
5
6
7
consumer.Received += (model,ea) =>
{
Thread.Sleep(3000);
//接受队列
byte[] message = ea.Body.ToArray();
Console.WriteLine("接收到的消息为:" + Encoding.UTF8.GetString(message));
};

增加3s间隔 模拟宕机
Test
如图上9数字丢失解决这个问题得方法就是改变其消息确认模式

4.5.1 Rabbit中存在两种消息确认模式

  • 自动模式 - 只要消息从队列获取,无论消费者获取到消息后是否成功消费,都认为是消息成功消费.
  • 手动模式 - 消费从队列中获取消息后,服务器会将该消息处于不可用状态,等待消费者反馈。如果消费者在消费过程中出现异常,断开连接切没有发送应答,那么RabbitMQ会将这个消息重新投递。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    //消费者代码中新增代码BasicAckAsync,修改BasicConsumeAsync
    //在消费者未确认消息会重新发送
    consumer.Received += (model,ea) =>
    {
    Thread.Sleep(3000);
    //接受队列
    byte[] message = ea.Body.ToArray();
    Console.WriteLine("接收到的消息为:" + Encoding.UTF8.GetString(message));
    channelAsync.BasicAckAsync(ea.DeliveryTag, true); // 开启返回消息确认
    };
    // 消费者开启监听
    await channelAsync.BasicConsumeAsync(queueName, false, consumer);// 将autoAck设置false 关闭自动确认.

4.5.2 能者多劳模式

能者多劳模式–告诉rabbit每次只能向消费者发送一条消息,再消费者未确认之前,不再向它发送信息

1
2
await channelAsync.BasicQosAsync(0, 1, false);

4.6 发布订阅模式(Fanout)

1
channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Fanout); // 把交换机设置为 fanout 发布订阅模式
  • 生产者代码
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    using RabbitMQ.Client;
    using System.Text;

    Console.WriteLine("Hello, World! 生产者");

    var factory = new ConnectionFactory() // 创建连接工厂对象
    {
    HostName = "localhost",
    Port = 5672,
    UserName = "guest",
    Password = "guest"
    };
    var connection = factory.CreateConnection(); // 创建连接对象
    var channel = connection.CreateModel(); // 创建连接会话对象

    #region 定义交换机
    string exchangeName = "exchange1";

    channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Fanout); // 把交换机设置为 fanout 发布订阅模式
    #endregion

    string str;
    do {
    Console.WriteLine("发送内容:");
    str = Console.ReadLine()!;

    byte[] body = Encoding.UTF8.GetBytes(str); // 消息内容

    channel.BasicPublish(exchangeName, "", null, body); // 发送消息
    } while (str.Trim().ToLower() != "exit");

    channel.Close();
    connection.Close();
  • 消费者代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
var factory = new ConnectionFactory()       // 创建连接工厂对象
{
HostName = "localhost",
Port = 5672,
UserName = "guest",
Password = "guest"
};

IConnection connection = factory.CreateConnection(); // 创建连接对象
IModel channel = connection.CreateModel(); // 创建连接会话对象

#region 声明交换机
string exchangeName = "exchange1";
channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout);
#endregion

#region 声明队列
string queueName = exchangeName + "_" + new Random().Next(1, 1000);
Console.WriteLine("队列名称:" + queueName);

channel.QueueDeclare(
queue: queueName,//消息队列名称
durable: false,//是否持久化,true持久化,队列会保存磁盘,服务器重启时可以保证不丢失相关信息。
exclusive: false,//是否排他,true排他的,如果一个队列声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除.
autoDelete: false,//是否自动删除。true是自动删除。自动删除的前提是:致少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除.
arguments: null ////设置队列的一些其它参数
);
#endregion


channel.QueueBind(queueName, exchangeName, ""); // 将队列与交换机绑定

channel.BasicQos(0, 1, false); // 告诉Rabbit每次只能向消费者发送一条信息,再消费者未确认之前,不再向他发送信息

// 创建消费者对象
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) => {

byte[] message = ea.Body.ToArray();
Console.WriteLine("接收到的消息为:" + Encoding.UTF8.GetString(message));

channel.BasicAck(ea.DeliveryTag, true); // 开启返回消息确认
};

channel.BasicConsume(queue: queueName, autoAck: false, consumer); // 将autoAck设置false 关闭自动确认.

Console.ReadKey();
channel.Dispose();
connection.Close();

通配符模式(Topic)路由模式(Direct)

路由模式(Direct):
路由模式下,在发布消息时指定不同的routeKey,交换机会根据不同的routeKey分发消息到不同的队列中
通配符模式(Topic):
通配符模式与路由模式一致,只不过通配符模式中的路由可以声明为模糊查询,RabbitMQ拥有两个通配符

  • #:匹配0-n个字符语句
  • *:匹配一个字符语句
  • 注意:RabbitMQ中通配符并不像正则中的单个字符,而是一个以“.”分割的字符串,如 ”topic1.*“匹配的规则以topic1开始并且”.”后只有一段语句的路由 例:“topic1.aaa”,“topic1.bb”
  • 而“#”可以匹配到 “topic1.aaa.bb”,“topic1.bb.cc”.

实战遇到问题

1.权限不够不能假如虚拟网络

1
The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=530, text='NOT_ALLOWED - access to vhost '/' refused for user 'revice-user'', classId=10, methodId=40

我想就创建一个普通用户用来进行消费者的时候遇到了一开始创建的时候tags设置的none
RabbitMQ 管理界面创建用户时选择了 none 作为 tags,只表示该用户没有被分配到任何默认的角色,但你仍然可以手动为该用户分配权限。
发现需要设置成policymaker角色,并且将角色假如到虚拟网络中

  1. 1
    The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=406, text='PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'faithQueue' in vhost '/': received 'false' but current is 'true'', classId=50, methodId=10
    我在生产者创建队列时durable属性设置成true,消费者时候设置的false,导致成为两个不同的队列需要一致
    1
    2
    3
    4
    5
    6
    7
    await channelAsync.QueueDeclareAsync(
    queue: queueName,//消息队列名称
    durable: true,//是否持久化,true持久化,队列会保存磁盘,服务器重启时可以保证不丢失相关信息。
    exclusive: false,//是否排他,true排他的,如果一个队列声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除.
    autoDelete: false,//是否自动删除。true是自动删除。自动删除的前提是:致少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除.
    arguments: null ////设置队列的一些其它参数
    );
  • Title: Dotnet Core 6.0 使用RabbitMQ
  • Author: faith team
  • Created at: 2024-02-26 11:21:05
  • Updated at: 2025-11-29 09:01:08
  • Link: https://redefine.ohevan.com/2024/02/26/20240226rabbitMq/
  • License: This work is licensed under CC BY-NC-SA 4.0.
 Comments