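Consul architecture

A Consul cluster supports multiple data centers. The figure above shows two DataCenters connected over the Internet; to keep communication efficient, only Server nodes take part in cross-data-center communication. Within a single data center, Consul nodes are either Clients or Servers (every node is also called an Agent). Server nodes store the data. Clients run health checks and forward data requests to the Servers, and do not store registration information themselves. The Servers consist of one Leader and several Followers, and the Leader replicates data to the Followers. The recommended number of Server nodes is 3 or 5. When the Leader goes down, an election is triggered to produce a new Leader.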
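The recommendation of 3 or 5 Servers follows from majority voting: Consul Servers use the Raft consensus protocol, which needs a strict majority (quorum) of Servers to elect a Leader and commit writes. The sketch below is only arithmetic to illustrate that point; the `RaftQuorum` class and its method names are made up for this example and are not part of any Consul library.

```csharp
using System;

public static class RaftQuorum
{
    // Smallest number of servers that forms a strict majority.
    public static int QuorumSize(int servers) => servers / 2 + 1;

    // Number of servers that can fail while a majority is still available.
    public static int FailureTolerance(int servers) => servers - QuorumSize(servers);

    // Prints quorum size and failure tolerance for small cluster sizes.
    public static void PrintTable()
    {
        foreach (var n in new[] { 1, 2, 3, 4, 5 })
        {
            Console.WriteLine($"servers={n} quorum={QuorumSize(n)} tolerated failures={FailureTolerance(n)}");
        }
    }
}
```

With 3 Servers one failure is tolerated and with 5 Servers two are. Going from 3 to 4 adds no extra tolerance, which is why odd numbers are the usual choice.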
Setting up a Consul cluster

Use Docker to set up 3 Server nodes and 1 Client node. The API services register and discover services through the Client node.

1. Pull the image (consul:latest, the image referenced in the compose file below)

2. Write docker-compose.yml

```yaml
version: '3'
services:
  cs1:
    image: consul:latest
    command: agent -server -client=0.0.0.0 -bootstrap-expect=3 -node=cs1 -data-dir=/data
    volumes:
      - /root/docker/consul/data/cs1:/data
  cs2:
    image: consul:latest
    command: agent -server -client=0.0.0.0 -retry-join=cs1 -node=cs2 -data-dir=/data
    volumes:
      - /root/docker/consul/data/cs2:/data
    depends_on:
      - cs1
  cs3:
    image: consul:latest
    command: agent -server -client=0.0.0.0 -retry-join=cs1 -node=cs3 -data-dir=/data
    volumes:
      - /root/docker/consul/data/cs3:/data
    depends_on:
      - cs1
  cc1:
    image: consul:latest
    command: agent -client=0.0.0.0 -retry-join=cs1 -ui -node=cc1 -data-dir=/data
    ports:
      - "8500:8500"
    volumes:
      - /root/docker/consul/data/cc1:/data
    depends_on:
      - cs2
      - cs3
```

3. Run docker-compose
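4. Errors encountered

After starting the containers, they ended up in the exited state. Check the container logs:

```shell
docker logs [container id]
==> failed to setup node ID: failed to write NodeID to disk: open /data/node-id: permission denied
```

Does this mean the data folder could not be created? At first I had created /root/docker/consul/data/cc1 on the host to match the volumes entry. After searching on Bing I found that there should be another data folder under cc1, so I created it. The error still said permission denied, so I also set the permissions on the folder.

When you hit an error, don't panic; look at the logs first.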
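5. Main parameters

| Parameter | Description |
| --- | --- |
| -server | Run the node as a Server; without it the node is a Client |
| -client | The IP address that clients use to register, query, and otherwise operate on this agent; defaults to 127.0.0.1 |
| -bootstrap-expect | Expected number of Server nodes in the cluster; a Leader is elected only once this number is reached |
| -node | Node name |
| -data-dir | Where the data is stored |
| -retry-join | Address of a node to join (to form the cluster) |
| -ui | Enable the web UI |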
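6. Cluster status

Check the node status:

```shell
docker exec -t e002ca62ac24 consul members
```

There are currently 3 Servers and 1 Client.

Check the Server node roles: the current leader is cs1. If cs1 is shut down, a new leader is elected automatically.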
Integrating Consul into Dotnet Core: service registration

1. Configuration file contents

```json
"ConsulRegisterOption": {
  "Address": "http://localhost:8500",
  "ServiceHealthCheck": "http://localhost:7777/healthcheck",
  "ServiceName": "ServiceA",
  "ServiceIP": "localhost",
  "ServicePort": "7777"
}
```
2. Write the configuration class

```csharp
public class ConsulRegisterOption
{
    // Address of the Consul agent, e.g. http://localhost:8500
    public string Address { get; set; }
    // URL that Consul calls to check the health of this service
    public string ServiceHealthCheck { get; set; }
    public string ServiceName { get; set; }
    public string ServiceIP { get; set; }
    public int ServicePort { get; set; }
}
```
3. Write the IApplicationBuilder extension class

```csharp
using System;
using Consul;
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.Hosting;

public static class ConsulBuilderExtensions
{
    public static IApplicationBuilder RegisterConsul(this IApplicationBuilder app, IHostApplicationLifetime lifetime, ConsulRegisterOption consulOption)
    {
        var consulClient = new ConsulClient(x =>
        {
            x.Address = new Uri(consulOption.Address);
        });

        var registration = new AgentServiceRegistration()
        {
            // A fresh ID per process, so several instances of the same service can coexist
            ID = Guid.NewGuid().ToString(),
            Name = consulOption.ServiceName,
            Address = consulOption.ServiceIP,
            Port = consulOption.ServicePort,
            Check = new AgentServiceCheck()
            {
                // How long after the check turns critical the service is deregistered
                DeregisterCriticalServiceAfter = TimeSpan.FromSeconds(5),
                // How often the health check runs
                Interval = TimeSpan.FromSeconds(10),
                // Health check URL
                HTTP = consulOption.ServiceHealthCheck,
                Timeout = TimeSpan.FromSeconds(5)
            }
        };

        // Register the service with Consul
        consulClient.Agent.ServiceRegister(registration).Wait();

        // Deregister the service when the application stops
        lifetime.ApplicationStopping.Register(() =>
        {
            consulClient.Agent.ServiceDeregister(registration.ID).Wait();
        });

        return app;
    }
}
```
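4. Relevant part of Program

```csharp
// Bind the "ConsulRegisterOption" configuration section to the options class
builder.Services.Configure<ConsulRegisterOption>(builder.Configuration.GetSection("ConsulRegisterOption"));

// ... the rest of the setup, including building `app`, is omitted here ...

var consulRegisterOption = app.Services.GetRequiredService<IOptionsMonitor<ConsulRegisterOption>>().CurrentValue;
app.RegisterConsul(app.Lifetime, consulRegisterOption);
```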
Integrating Consul into Dotnet Core: service discovery

Write AbstractConsulDispatcher, the abstract base class for service discovery. The main logic is to look up services through the Consul client by the service name passed in, and let a load-balancing strategy choose which instance to call. The precondition is that the instance is healthy.

```csharp
using Consul;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Linq;

namespace ConsulRegistHelper
{
    public abstract class AbstractConsulDispatcher
    {
        protected ConsulRegisterOption _consulRegisterOptions;
        protected KeyValuePair<string, AgentService>[] _CurrentAgentServiceDictionary;

        public AbstractConsulDispatcher(IOptionsMonitor<ConsulRegisterOption> consulClientOption)
        {
            _consulRegisterOptions = consulClientOption.CurrentValue;
        }

        // Turns e.g. http://ServerB/getPerson into http://{ip}:{port}/getPerson,
        // treating the host part of the URL as the service name.
        public string GetAddress(string mappingUrl)
        {
            Uri uri = new Uri(mappingUrl);
            var serviceName = uri.Host;
            string addressPort = this.chooseAddress(serviceName);
            return $"{uri.Scheme}://{addressPort}{uri.PathAndQuery}";
        }

        protected virtual string chooseAddress(string serviceName)
        {
            // Use the configured agent address instead of a hard-coded one
            ConsulClient client = new ConsulClient(c =>
            {
                c.Address = new Uri(_consulRegisterOptions.Address);
            });

            // passingOnly: true returns only instances whose health checks pass
            var entrys = client.Health.Service(serviceName, string.Empty, true).Result.Response;
            if (entrys.Length == 0)
            {
                throw new InvalidOperationException($"No healthy instance found for service '{serviceName}'.");
            }

            List<KeyValuePair<string, AgentService>> serviceList = new List<KeyValuePair<string, AgentService>>();
            for (int i = 0; i < entrys.Length; i++)
            {
                serviceList.Add(new KeyValuePair<string, AgentService>(i.ToString(), entrys[i].Service));
            }
            this._CurrentAgentServiceDictionary = serviceList.ToArray();

            // The concrete strategy decides which instance to use
            int index = this.GetIndex();
            AgentService agentService = this._CurrentAgentServiceDictionary[index].Value;
            return $"{agentService.Address}:{agentService.Port}";
        }

        protected abstract int GetIndex();
    }
}
```
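The URL rewriting that GetAddress performs can be tried on its own, without a running Consul agent. The following is a minimal sketch under that assumption: `MappingUrlDemo`, `Rewrite`, and the `resolve` callback are hypothetical names used only here, and the callback stands in for the Consul lookup.

```csharp
using System;

public static class MappingUrlDemo
{
    // Replaces the host part of the URL (treated as a service name)
    // with the "ip:port" string returned by the resolver.
    public static string Rewrite(string mappingUrl, Func<string, string> resolve)
    {
        var uri = new Uri(mappingUrl);
        // Uri.Host is normalized to lower case, so "ServerB" arrives as "serverb";
        // compare service names case-insensitively.
        string addressPort = resolve(uri.Host);
        return $"{uri.Scheme}://{addressPort}{uri.PathAndQuery}";
    }
}
```

For example, with a resolver that maps ServerB to 10.0.0.5:7777 (an illustrative address), `Rewrite("http://ServerB/getPerson?id=1", ...)` yields `http://10.0.0.5:7777/getPerson?id=1`. The path and query string are carried over unchanged.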
Load-balancing strategy

An extension class that selects which load-balancing strategy service discovery uses:

```csharp
using Microsoft.Extensions.DependencyInjection;
using System;
using System.Collections.Generic;
using System.Text;

namespace ConsulRegistHelper.ConsulExend
{
    public static class ConsulExtend
    {
        // Registers the dispatcher implementation that matches the chosen strategy
        public static void AddConsulDispatcher(this IServiceCollection services, ConsulDispatcherType consulDispatcherType)
        {
            switch (consulDispatcherType)
            {
                case ConsulDispatcherType.Average:
                    services.AddTransient<AbstractConsulDispatcher, AverageDispatcher>();
                    break;
                case ConsulDispatcherType.Polling:
                    services.AddTransient<AbstractConsulDispatcher, PollingDispatcher>();
                    break;
                case ConsulDispatcherType.Weight:
                    services.AddTransient<AbstractConsulDispatcher, WeightDispatcher>();
                    break;
                default:
                    break;
            }
        }

        public enum ConsulDispatcherType
        {
            Average = 0,
            Polling = 1,
            Weight = 2
        }
    }
}
```
1. Weighted scheduling

```csharp
using Consul;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace ConsulRegistHelper.ConsulExend
{
    public class WeightDispatcher : AbstractConsulDispatcher
    {
        #region Identity
        private static int _iTotalCount = 0;
        private static int iTotalCount
        {
            get { return _iTotalCount; }
            set { _iTotalCount = value >= Int32.MaxValue ? 0 : value; }
        }

        public WeightDispatcher(IOptionsMonitor<ConsulRegisterOption> consulClientOption) : base(consulClientOption)
        {
        }
        #endregion

        protected override string chooseAddress(string serviceName)
        {
            ConsulClient client = new ConsulClient(c =>
            {
                c.Address = new Uri($"{_consulRegisterOptions.Address}");
            });

            var response = client.Agent.Services().Result.Response;
            this._CurrentAgentServiceDictionary = response
                .Where(s => s.Value.Service.Equals(serviceName, StringComparison.OrdinalIgnoreCase))
                .ToArray();

            // Repeat each instance "weight" times so heavier instances are picked more often
            var serviceDictionaryNew = new List<AgentService>();
            foreach (var service in base._CurrentAgentServiceDictionary)
            {
                // The first tag is read as the weight; fall back to 1 when it is missing or not a positive number
                int weight = int.TryParse(service.Value.Tags?.FirstOrDefault(), out int iWeight) && iWeight > 0 ? iWeight : 1;
                serviceDictionaryNew.AddRange(Enumerable.Repeat(service.Value, weight));
            }
            if (serviceDictionaryNew.Count == 0)
            {
                throw new InvalidOperationException($"No instance found for service '{serviceName}'.");
            }

            int index = new Random().Next(serviceDictionaryNew.Count);
            AgentService agentService = serviceDictionaryNew[index];
            return $"{agentService.Address}:{agentService.Port}";
        }

        // Not used: this strategy overrides chooseAddress directly
        protected override int GetIndex()
        {
            throw new NotImplementedException();
        }
    }
}
```
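Repeating every instance "weight" times is easy to follow, but the list grows with the weights. An alternative, sketched below, walks the candidates and subtracts each weight from a random roll until the roll falls inside one candidate's share. This is not the approach used in the class above; `WeightedPick`, `Pick`, and `PickRandom` are names invented for this illustration, and weights are assumed to be non-negative.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class WeightedPick
{
    // Picks the candidate whose weight range contains `roll`,
    // where roll must be in [0, total weight).
    public static T Pick<T>(IReadOnlyList<(T Item, int Weight)> candidates, int roll)
    {
        int total = candidates.Sum(c => c.Weight);
        if (total <= 0) throw new ArgumentException("Total weight must be positive.", nameof(candidates));
        if (roll < 0 || roll >= total) throw new ArgumentOutOfRangeException(nameof(roll));

        foreach (var (item, weight) in candidates)
        {
            if (roll < weight) return item;
            roll -= weight;
        }
        throw new InvalidOperationException("Unreachable when weights are non-negative.");
    }

    // Convenience wrapper that draws the roll from the given random source.
    public static T PickRandom<T>(IReadOnlyList<(T Item, int Weight)> candidates, Random rng)
        => Pick(candidates, rng.Next(candidates.Sum(c => c.Weight)));
}
```

With candidates ("a", 1) and ("b", 3), roll 0 selects "a" and rolls 1 to 3 select "b", so "b" receives roughly three quarters of the calls. Keeping the roll as a parameter makes the selection testable without depending on randomness.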
2. Round robin (polling)

```csharp
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Text;

namespace ConsulRegistHelper.ConsulExend
{
    public class PollingDispatcher : AbstractConsulDispatcher
    {
        #region Identity
        // Shared across all instances so that the sequence continues between requests
        private static int _iTotalCount = 0;
        private static int iTotalCount
        {
            get { return _iTotalCount; }
            // Wrap around to 0 before the counter overflows
            set { _iTotalCount = value >= Int32.MaxValue ? 0 : value; }
        }

        public PollingDispatcher(IOptionsMonitor<ConsulRegisterOption> consulClientOption) : base(consulClientOption)
        {
        }
        #endregion

        // Take the instances in turn. Note that iTotalCount++ is not atomic,
        // so concurrent requests may occasionally get the same index.
        protected override int GetIndex()
        {
            return iTotalCount++ % base._CurrentAgentServiceDictionary.Length;
        }
    }
}
```
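Because the dispatcher shares a static counter, concurrent requests can interleave on the increment. If that matters, one option is an atomic counter. The following is a small sketch of that idea, not the code used in this post; `RoundRobinCounter` and `NextIndex` are hypothetical names.

```csharp
using System;
using System.Threading;

public sealed class RoundRobinCounter
{
    // Starts at -1 so that the first ticket is 0.
    private int _next = -1;

    // Returns the next index in [0, count); safe to call from multiple threads.
    public int NextIndex(int count)
    {
        if (count <= 0) throw new ArgumentOutOfRangeException(nameof(count));

        // Interlocked.Increment wraps to int.MinValue on overflow;
        // reinterpreting the value as uint keeps the result non-negative.
        uint ticket = unchecked((uint)Interlocked.Increment(ref _next));
        return (int)(ticket % (uint)count);
    }
}
```

With three instances, successive calls to `NextIndex(3)` return 0, 1, 2, 0, and so on.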
3. Average scheduling

```csharp
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Text;

namespace ConsulRegistHelper.ConsulExend
{
    public class AverageDispatcher : AbstractConsulDispatcher
    {
        #region Identity
        private static int _iTotalCount = 0;
        private static int iTotalCount
        {
            get { return _iTotalCount; }
            // Wrap around to 0 before the counter overflows
            set { _iTotalCount = value >= Int32.MaxValue ? 0 : value; }
        }

        public AverageDispatcher(IOptionsMonitor<ConsulRegisterOption> consulClientOption) : base(consulClientOption)
        {
        }
        #endregion

        // Pick a random instance; the counter is used as the seed so each call gets a different one
        protected override int GetIndex()
        {
            return new Random(iTotalCount++).Next(0, base._CurrentAgentServiceDictionary.Length);
        }
    }
}
```
Registering the dispatcher

```csharp
builder.Services.AddConsulDispatcher(ConsulExtend.ConsulDispatcherType.Polling);
```
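Using Refit for calls between services

First add the Refit NuGet package. The calling code:

```csharp
[HttpGet("/getService")]
public async Task<List<Person>> getServcieB()
{
    // Only the service root is mapped here: Refit appends the route declared
    // on the interface method ("/getperson") to the base URL it is given.
    string url = "http://ServerB";
    // Resolve the service name to a real ip:port through Consul
    string realUrl = abstractConsulDispatcher.GetAddress(url);
    var gitHubApi = RestService.For<IGitHubApi>(realUrl);
    return await gitHubApi.getPerson();
}
```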
Write the IGitHubApi interface

```csharp
using Refit;

namespace ServerB;

public interface IGitHubApi
{
    // GET {base address}/getperson
    [Get("/getperson")]
    public Task<List<Person>> getPerson();
}
```