[奇思异想]使用Zookeeper管理数据库连接串

释放双眼,带上耳机,听听看~!

背景

  有一套特定规格的应用(程序+数据库),当有业务需求时,就需要多部署应用,并且所有的应用都使用一个共同的后台来管理。应用新增后,如何通知后台更新连接串成了一个关键的问题。于是就产生了使用ZooKeeper管理数据库连接串的奇思异想。具体方案如下:

  [奇思异想]使用Zookeeper管理数据库连接串

 

 

  1. 运维负责搭建数据库,并执行初始化脚本,然后把对应的数据库配置刷入ZooKeeper;

  2. 运维完成App(1...N)的部署,App(1...N)从ZooKeeper读取对应的数据库配置;

  3. 后台监听ZooKeeper,更新数据库配置到后台应用内存。

 

环境准备

  1. 安装Zookeeper

  docker pull zookeeper:3.4.13

  [奇思异想]使用Zookeeper管理数据库连接串

  

  docker run --name zookeeper -d -p 2181:2181 zookeeper:3.4.13

  [奇思异想]使用Zookeeper管理数据库连接串

 

  2. 安装Mysql

  docker pull mysql:5.7

  [奇思异想]使用Zookeeper管理数据库连接串

 

  docker run --name mysql -e MYSQL_ROOT_PASSWORD=root -p 3306:3306 -d mysql:5.7
  docker run --name mysql2 -e MYSQL_ROOT_PASSWORD=root -p 3307:3306 -d mysql:5.7
  docker run --name mysql3 -e MYSQL_ROOT_PASSWORD=root -p 3308:3306 -d mysql:5.7

  [奇思异想]使用Zookeeper管理数据库连接串

 

  3. 初始化数据库

CREATE DATABASE test;
USE test;
CREATE TABLE `table` (
  `id` int(11) NOT NULL,
  `name` varchar(50) NOT NULL,
  PRIMARY KEY (`id`)
);

  [奇思异想]使用Zookeeper管理数据库连接串 [奇思异想]使用Zookeeper管理数据库连接串

 

  分别在各个数据库插入测试数据

  mysql:

USE test;
INSERT INTO `table` (id, name) VALUES (1, \'A1\');
INSERT INTO `table` (id, name) VALUES (2, \'B1\');
INSERT INTO `table` (id, name) VALUES (3, \'C1\');

 

  mysql2:

USE test;
INSERT INTO `table` (id, name) VALUES (1, \'A2\');
INSERT INTO `table` (id, name) VALUES (2, \'B2\');
INSERT INTO `table` (id, name) VALUES (3, \'C2\');

 

  mysql3:

USE test;
INSERT INTO `table` (id, name) VALUES (1, \'A3\');
INSERT INTO `table` (id, name) VALUES (2, \'B3\');
INSERT INTO `table` (id, name) VALUES (3, \'C3\');

 

  4. 基于数据库生成POCO

  Install-Package MySql.Data.EntityFrameworkCore -Version 8.0.13

  Scaffold-DbContext \"server=127.0.0.1;port=3306;user=root;password=123456;database=test\" MySql.Data.EntityFrameworkCore -OutputDir DataAccess -f

  [奇思异想]使用Zookeeper管理数据库连接串

  [奇思异想]使用Zookeeper管理数据库连接串

 

  5. 引用ZooKeeper相关组件

  Install-Package ZooKeeperNetEx -Version 3.4.12.1

  

核心代码

  1. ZookeeperOption:从appsettings中读取ZooKeeper相关配置

    public class ZookeeperOption
    {
        public ZookeeperOption(IConfiguration config)
        {
            if (config == null)
            {
                throw new ArgumentNullException(nameof(config));
            }

            var section = config.GetSection(\"zookeeper\");
            section.Bind(this);
        }

        public string ConnectionString { get; set; }

        public int Timeout { get; set; }
    }

 

  2. ZookeeperServiceCollectionExtensions:注册ZooKeeper服务

    public static class ZookeeperServiceCollectionExtensions
    {
        public static IServiceCollection AddZookeeper(this IServiceCollection services, IConfiguration config)
        {
            if (services == null)
            {
                throw new ArgumentNullException(nameof(services));
            }

            if (config == null)
            {
                throw new ArgumentNullException(nameof(config));
            }

            services.AddOptions();

            var option = new ZookeeperOption(config);

            var zookeeper = new org.apache.zookeeper.ZooKeeper(option.ConnectionString, option.Timeout * 1000, new DefaultWatcher());

            services.Add(ServiceDescriptor.Singleton(zookeeper));
            return services;
        }
    }

    public class DefaultWatcher : Watcher
    {
        public override Task process(WatchedEvent @event)
        {
            return Task.CompletedTask;
        }
    }

 

  3. ZookeeperHandler:ZooKeeper初始化及目录变化处理类,并把数据库连接信息写入程序内存

    public interface IZookeeperHandler
    {
        Task InitAsync();

        Task RefreshAsync();
    }

    public class ZookeeperHandler: IZookeeperHandler
    {
        private readonly org.apache.zookeeper.ZooKeeper _zooKeeper;
        private readonly IMemoryCache _cache;

        public ZookeeperHandler(org.apache.zookeeper.ZooKeeper zooKeeper, IMemoryCache cache)
        {
            _zooKeeper = zooKeeper;
            _cache = cache;
        }

        public async Task InitAsync()
        {
            await RefreshAsync();
        }

        public async Task RefreshAsync()
        {
            var connDic = new Dictionary<string, string>();

            var isExisted = await _zooKeeper.existsAsync(\"/connections\");
            if (isExisted == null)
            {
                await _zooKeeper.createAsync(\"/connections\", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }

            var connResult = await _zooKeeper.getChildrenAsync(\"/connections\", new ConnectionWatcher(this));

            foreach (var conn in connResult.Children)
            {
                var connData = await _zooKeeper.getDataAsync($\"/connections/{conn}/value\");
                var connStr = Encoding.UTF8.GetString(connData.Data);
                connDic[conn] = connStr;
            }

            _cache.Set(\"connections\", connDic);
        }
    }

 

  4. ConnectionWatcher:监听者,内容变化时调用ZookeeperHandler的RefreshAsync()方法,其中,变化只通知一次,因此需要再次建立监听

    public class ConnectionWatcher : Watcher
    {
        private readonly IZookeeperHandler _zookeeperService;

        public ConnectionWatcher(IZookeeperHandler zookeeperService)
        {
            _zookeeperService = zookeeperService;
        }

        public override async Task process(WatchedEvent @event)
        {
            var type = @event.get_Type();

            if (type != Event.EventType.None)
            {
                await _zookeeperService.RefreshAsync();
            }
        }
    }

 

  5. ZookeeperApplicationBuilderExtensions:初始化

    public static class ZookeeperApplicationBuilderExtensions
    {
        public static IApplicationBuilder UseZookeeper(this IApplicationBuilder app)
        {
            var service = app.ApplicationServices.GetRequiredService<IZookeeperHandler>();
            service.InitAsync().Wait();
            return app;
        }
    }

 

  6. ContextProvider:根据Id从内存中读取对应的数据库连接串,并提供DbContext实例

    public interface IContextProvider
    {
        TestContext GetContext(string id);
    }

    public class ContextProvider : IContextProvider
    {
        private readonly IMemoryCache _cache;

        public ContextProvider(IMemoryCache cache)
        {
            _cache = cache;
        }

        public TestContext GetContext(string id)
        {
            var dic = _cache.Get<Dictionary<string, string>>(\"connections\");
            var connectionStr = dic[id];

            var optionsBuilder = new DbContextOptionsBuilder<TestContext>();
            optionsBuilder.UseMySQL(connectionStr);
            return new TestContext(optionsBuilder.Options);
        }
    }

 

效果演示

  1. 刚开始没有任何连接信息

  [奇思异想]使用Zookeeper管理数据库连接串

 

  2. 添加一个连接信息

  [奇思异想]使用Zookeeper管理数据库连接串

  [奇思异想]使用Zookeeper管理数据库连接串

 

  3. 查询连接对应的数据

  [奇思异想]使用Zookeeper管理数据库连接串

 

  4. 再添加两个连接信息

  [奇思异想]使用Zookeeper管理数据库连接串

  [奇思异想]使用Zookeeper管理数据库连接串

  [奇思异想]使用Zookeeper管理数据库连接串

  [奇思异想]使用Zookeeper管理数据库连接串

  [奇思异想]使用Zookeeper管理数据库连接串

 

  5. 查看ZooKeeper的信息

  docker run -it --rm --link zookeeper:zookeeper zookeeper:3.4.13 zkCli.sh -server zookeeper

  ls /connections

  get /connections/1/value

  get /connections/2/value

  get /connections/3/value

  [奇思异想]使用Zookeeper管理数据库连接串

 

补充说明

  因为把数据库连接信息写到了程序内存中,因此,如果当ZooKeeper出现了故障:

  1. 老的(正在运行)应用正在使用的数据库不会受到影响,但无法监听到数据库信息的变化;

  2. 新的应用无法启动。

  ZooKeeper恢复后:

  1. 老的(正在运行)应用会重连,重新监听到数据库信息的变化

  2. 新的应用可以成功启动。

 

源码地址

  https://github.com/ErikXu/zookeeper-connection-management

给TA打赏
共{{data.count}}人
人已打赏
随笔日记

正则表达式:后面不要包含指定的字符串内容

2020-11-9 4:14:56

随笔日记

上周热点回顾(4.1-4.7)

2020-11-9 4:14:58

0 条回复 A文章作者 M管理员
    暂无讨论,说说你的看法吧
个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索