Dotnet Core基于Consul服务注册与发现

faith team

Consul架构

Test
Consul 集群支持多数据中心,在上图中有两个 DataCenter,他们通过 Internet 互联,为了提高通信效率,只有 Server 节点才加入跨数据中心的通信。在单个数据中心中,Consul 分为 Client 和 Server 两种节点(所有的节点也被称为 Agent),Server 节点保存数据,Client 负责健康检查及转发数据请求到 Server,本身不保存注册信息;Server 节点有一个 Leader 和多个 Follower,Leader 节点会将数据同步到 Follower,Server 节点的数量推荐是3个或者5个,在 Leader 挂掉的时候会启动选举机制产生一个新 Leader。

Consul集群的搭建

使用docker搭建3个Service节点和1个Client节点,Api服务通过Client节点进行服务注册和发现

1.下载镜像

1
docker pull consul

2.编写docker-compose.yml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
version: '3'
services:
cs1:
image: consul:latest
command: agent -server -client=0.0.0.0 -bootstrap-expect=3 -node=cs1 -data-dir=/data
volumes:
- /root/docker/consul/data/cs1:/data
cs2:
image: consul:latest
command: agent -server -client=0.0.0.0 -retry-join=cs1 -node=cs2 -data-dir=/data
volumes:
- /root/docker/consul/data/cs2:/data
depends_on:
- cs1
cs3:
image: consul:latest
command: agent -server -client=0.0.0.0 -retry-join=cs1 -node=cs3 -data-dir=/data
volumes:
- /root/docker/consul/data/cs3:/data
depends_on:
- cs1
cc1:
image: consul:latest
command: agent -client=0.0.0.0 -retry-join=cs1 -ui -node=cc1 -data-dir=/data
ports:
- 8500:8500
volumes:
- /root/docker/consul/data/cc1:/data
depends_on:
- cs2
- cs3

3.运行docker-compose

1
docker-compose up -d

4.遇到报错

发现运行容器后容器状态处于exited状态,查看容器日志

1
2
docker logs [容器id]
==> failed to setup node ID: failed to write NodeID to disk: open /data/node-id: permission denied

表示无法创建data文件夹?我最开始按照volumes对应创建了/root/docker/consul/data/cc1,bing了一下发现在cc1下面还有一个data文件夹,创建,看他报错还有permission denied,设置文件夹的权限

1
chmod -R 777 [url]

遇到报错了不要着急还是要先看日志

5.主要参数说明:

参数名 解释
-server 设置为 Server 类型节点,不加则为 Client 类型节点
-client 注册或者查询等一系列客户端对它操作的IP,默认是127.0.0.1
-bootstrap-expect 集群期望的 Server 节点数,只有达到这个值才会选举 Leader
-node 指定节点名称
-data-dir 数据存放位置
-retry-join 指定要加入的节点地址(组建集群)
-ui 启用 UI 界面

6.集群状态

查看节点状态

1
docker exec -t e002ca62ac24 consul members

Test
当前3个Service,1个Client

查看Server节点类型
Test
当前leader为cs1 把cs1关了会自动选举leader

Dotnet Core接入Consul 进行服务注册

1.配置文件内容

1
2
3
4
5
6
7
8
9
10
11
12
"ConsulRegisterOption": {
//你要去连接的consul服务地址
"Address": "http://localhost:8500",
//consul需要往服务发送心跳检测所以需要自己写一个healcheck接口并告知cousl
"ServiceHealthCheck": "http://localhost:7777/healthcheck",
//服务名称
"ServiceName": "ServiceA",
//服务ip
"ServiceIP": "localhost",
//服务端口
"ServicePort": "7777"
}

2.编写配置类

1
2
3
4
5
6
7
8
9
public class ConsulRegisterOption
{
public string Address { get; set; }
public string ServiceHealthCheck { get; set; }
//consul 的心跳检测和nacos有区别 nacos是主动给服务发心跳 而consul是服务主动给consul发心跳所以需要告诉consul自己的ip和端口
public string ServiceName { get; set; }
public string ServiceIP { get; set; }
public int ServicePort { get; set; }
}

3.编写IApplicationBuilder扩展类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public static class ConsulBuilderExtensions
{
public static IApplicationBuilder RegisterConsul(this IApplicationBuilder app, IHostApplicationLifetime lifetime, ConsulRegisterOption consulOption)
{
var consulClient = new ConsulClient(x =>
{
x.Address = new Uri(consulOption.Address);
});

var registration = new AgentServiceRegistration()
{
ID = Guid.NewGuid().ToString(),
Name = consulOption.ServiceName,// 服务名
Address = consulOption.ServiceIP, // 服务绑定IP
Port = consulOption.ServicePort, // 服务绑定端口
Check = new AgentServiceCheck()
{
DeregisterCriticalServiceAfter = TimeSpan.FromSeconds(5),//服务启动多久后注册
Interval = TimeSpan.FromSeconds(10),//健康检查时间间隔
HTTP = consulOption.ServiceHealthCheck,//健康检查地址
Timeout = TimeSpan.FromSeconds(5)
}
};

// 服务注册
consulClient.Agent.ServiceRegister(registration).Wait();

// 应用程序终止时,服务取消注册
lifetime.ApplicationStopping.Register(() =>
{
consulClient.Agent.ServiceDeregister(registration.ID).Wait();
});
return app;
}
}

4.program部分代码

1
2
3
4
builder.Services.Configure<ConsulRegisterOption>(builder.Configuration.GetSection("ConsulRegisterOption"));
//读取配置类注册
var consulRegisterOption = app.Services.GetRequiredService<IOptionsMonitor<ConsulRegisterOption>>().CurrentValue;
app.RegisterConsul(app.Lifetime, consulRegisterOption);

Dotnet Core接入Consul 进行服务发现

编写AbstractConsulDispatcher 服务发现抽象类

主要逻辑就是从cousl客户端去寻找服务,根据传入的服务名,负载均衡随机调用哪个服务。前提是这个服务是健康的服务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
using Consul;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;

namespace ConsulRegistHelper
{
public abstract class AbstractConsulDispatcher
{
protected ConsulRegisterOption _consulRegisterOptions;
protected KeyValuePair<string, AgentService>[] _CurrentAgentServiceDictionary;
public AbstractConsulDispatcher(IOptionsMonitor<ConsulRegisterOption> consulClientOption) {
_consulRegisterOptions = consulClientOption.CurrentValue;
}
/// <summary>
/// 负载均衡获取地址
/// </summary>
/// <param name="mappingUrl">映射后的地址例如http://SeriveB/test</param>
/// <returns></returns>
public string GetAddress(string mappingUrl) {
Uri uri = new Uri(mappingUrl);
var serviceName = uri.Host;
string addressPort = this.chooseAddress(serviceName);
string scheme = uri.Scheme;
string path = uri.PathAndQuery;
return $"{uri.Scheme}://{addressPort}{uri.PathAndQuery}";
}
protected virtual string chooseAddress(string serviceName) {
ConsulClient client = new ConsulClient(c =>
{
c.Address = new Uri($"http://localhost:8500");
});
AgentService agentService = null;

//获取consul 所有服务清单
var response = client.Agent.Services().Result.Response;
var dictionary = response.Where(s => s.Value.Service.Equals(serviceName, StringComparison.OrdinalIgnoreCase)).ToArray();
var entrys = client.Health.Service(serviceName).Result.Response;
List<KeyValuePair<string, AgentService>> serviceList = new List<KeyValuePair<string, AgentService>>();
for (int i = 0; i < entrys.Length; i++)
{
serviceList.Add(new KeyValuePair<string, AgentService>(i.ToString(), entrys[i].Service));
}
this._CurrentAgentServiceDictionary = serviceList.ToArray();
int index = this.GetIndex();
agentService = this._CurrentAgentServiceDictionary[index].Value;

return $"{agentService.Address}:{agentService.Port}";
}
protected abstract int GetIndex();
}
}

负载均很策略 扩展类 来设置服务发现的负载均很选用那种策略

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
using Microsoft.Extensions.DependencyInjection;
using System;
using System.Collections.Generic;
using System.Text;

namespace ConsulRegistHelper.ConsulExend
{
public static class ConsulExtend
{
/// <summary>
/// 注册Consul调度策略
/// </summary>
/// <param name="services"></param>
/// <param name="consulDispatcherType"></param>
public static void AddConsulDispatcher(this IServiceCollection services, ConsulDispatcherType consulDispatcherType)
{
switch (consulDispatcherType)
{
case ConsulDispatcherType.Average:
services.AddTransient<AbstractConsulDispatcher, AverageDispatcher>();
break;
case ConsulDispatcherType.Polling:
services.AddTransient<AbstractConsulDispatcher, PollingDispatcher>();
break;
case ConsulDispatcherType.Weight:
services.AddTransient<AbstractConsulDispatcher, WeightDispatcher>();
break;
default:
break;
}
}

public enum ConsulDispatcherType
{
Average = 0,
Polling = 1,
Weight = 2
}
}
}

1.权重调度

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
using Consul;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace ConsulRegistHelper.ConsulExend
{
/// <summary>
/// 权重调度
/// </summary>
public class WeightDispatcher : AbstractConsulDispatcher
{
#region Identity
private static int _iTotalCount = 0;
private static int iTotalCount
{
get
{
return _iTotalCount;
}
set
{
_iTotalCount = value >= Int32.MaxValue ? 0 : value;
}
}
public WeightDispatcher(IOptionsMonitor<ConsulRegisterOption> consulClientOption) : base(consulClientOption)
{

}
#endregion

protected override string chooseAddress(string serviceName)
{
ConsulClient client = new ConsulClient(c =>
{
c.Address = new Uri($"{_consulRegisterOptions.Address}");
});
AgentService agentService = null;
var response = client.Agent.Services().Result.Response;

this._CurrentAgentServiceDictionary = response.Where(s => s.Value.Service.Equals(serviceName, StringComparison.OrdinalIgnoreCase)).ToArray();


var serviceDictionaryNew = new List<AgentService>();
foreach (var service in base._CurrentAgentServiceDictionary)
{
serviceDictionaryNew.AddRange(Enumerable.Repeat(service.Value, int.TryParse(service.Value.Tags?[0], out int iWeight) ? 1 : iWeight));
}
int index = new Random(DateTime.Now.Millisecond).Next(0, int.MaxValue) % serviceDictionaryNew.Count;
agentService = serviceDictionaryNew[index];

return $"{agentService.Address}:{agentService.Port}";
}
/// <summary>
/// 不需要了
/// </summary>
/// <returns></returns>
protected override int GetIndex()
{
throw new NotImplementedException();
}
}
}

2.轮询

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Text;

namespace ConsulRegistHelper.ConsulExend
{
public class PollingDispatcher : AbstractConsulDispatcher
{
#region Identity
private static int _iTotalCount = 0;
private static int iTotalCount
{
get
{
return _iTotalCount;
}
set
{
_iTotalCount = value >= Int32.MaxValue ? 0 : value;
}
}

public PollingDispatcher(IOptionsMonitor<ConsulRegisterOption> consulClientOption) : base(consulClientOption)
{
}
#endregion

/// <summary>
/// 轮询
/// </summary>
/// <param name="serviceCount"></param>
/// <returns></returns>
protected override int GetIndex()
{
return iTotalCount++ % base._CurrentAgentServiceDictionary.Length;
}
}
}

3.平均调度

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Text;

namespace ConsulRegistHelper.ConsulExend
{
/// <summary>
/// 平均调度
/// </summary>
public class AverageDispatcher : AbstractConsulDispatcher
{
#region Identity
private static int _iTotalCount = 0;
private static int iTotalCount
{
get
{
return _iTotalCount;
}
set
{
_iTotalCount = value >= Int32.MaxValue ? 0 : value;
}
}

public AverageDispatcher(IOptionsMonitor<ConsulRegisterOption> consulClientOption) : base(consulClientOption)
{
}
#endregion
/// <summary>
/// 平均
/// </summary>
/// <returns></returns>
protected override int GetIndex()
{
return new Random(iTotalCount++).Next(0, base._CurrentAgentServiceDictionary.Length);
}
}
}

注入方式

1
builder.Services.AddConsulDispatcher(ConsulExtend.ConsulDispatcherType.Polling);

使用Refit来进行服务之间的调用

首先引入Refit nuget包
调用代码

1
2
3
4
5
6
7
8
9
10
11
12
[HttpGet("/getService")]
public async Task<List<Person>> getServcieB() {
//基于consul获取url地址
string url = "http://ServerB/getPerson";
//获取真实的地址
string realUrl = abstractConsulDispatcher.GetAddress(url);
//调用接口
var gitHubApi = RestService.For<IGitHubApi>(realUrl);
return await gitHubApi.getPerson();
// var res =await ExtensionsService.GetAsync(realUrl);
//return JsonConvert.DeserializeObject<List<Person>>(res);
}

编写IGitHubApi接口

1
2
3
4
5
6
7
8
9
using Refit;

namespace ServerB;

public interface IGitHubApi
{
[Get("/getperson")]
public Task<List<Person>> getPerson();
}
  • Title: Dotnet Core基于Consul服务注册与发现
  • Author: faith team
  • Created at: 2023-09-18 11:21:05
  • Updated at: 2025-11-29 09:01:08
  • Link: https://redefine.ohevan.com/2023/09/18/20230918Dotnet服务注册与发现/
  • License: This work is licensed under CC BY-NC-SA 4.0.
 Comments