注册中心应该有什么核心能力?

  1. 服务注册:服务提供者上报服务信息到注册中心。
  2. 服务发现:服务消费者从注册中心获取服务提供者信息。
  3. 心跳检测:定期检查服务提供者存活状态,如果服务提供者挂了,则从注册中心删除服务提供者信息。
  4. 服务注销:手动剔除节点、或者自动剔除失效节点。
  5. 数据分布式存储:集中的服务注册信息数据存储、读取和共享。
  6. 更多的优点:比如注册中心本身的容错、服务消费者缓存等。

Etcd

官方网站:https://github.com/etcd-io/etcd


### Etcd 介绍 Etcd 是一个 Go 语言实现的、开源的、分布式的键值存储系统,他主要用于分布式系统中的服务发现、配置管理和分布式锁等场景。

当提到 Go 语言实现,我们就可以想得到,Etcd 的性能是非常高的,而且它和云原生有密切的关系,通常被作为云原生应用的配置中心、基础设施,存储一些元信息。

比如经典的容器管理平台 K8s 就是使用了Etcd 来存储集群配置信息、集群状态信息、节点信息等。

除了性能高外,Etcd 采用 Raft 一致性算法,确保了在分布式环境中的数据强一致性和可靠性,具有高可用性、强一致性、分布式特性等特点。

Etcd 非常简单易用,提供了简单的 Api、数据的过期机制、数据的监听和通知机制等,完美满足注册中心场景的需求。

Etcd 的入门极低,只需要学过 Redis、Zookeeper 或者对象存储中的一个,就能够很快的理解 Etcd 并且投入实战运用。


Etcd 数据结构与特征

Etcd 在其数据模型和组织结构上更接近于 ZooKeeper 和对象存储,而不是 Redis。

它使用层次化的键值对来存储数据,支持类似于文件系统路径的层次结构,能够很灵活地单 key 查询、按前缀查询、按范围查询。

如下图:
etcd.png


Etcd 的核心数据结构

  1. Key:Etcd 中的基本数据单元,类似于文件系统中的文件名。每一个键都唯一标识一个值,并且可以包含子键,形成类似于路径的层次结构。
  2. Value:与键关联的数据,可以是任何类型的数据,通常是字符串形式。

只有 Key 和 Value,所以我们可以将数据序列化后写入到 Value 中。


Etcd 的核心特征

  1. Lease(租约):用于对键值对进行 TTL 超时设置,也就是设置键值对的过期时间。当租约过期的时候,相关的键值对将被自动删除。
  2. Watch(监听):用于监听特定键的变化,每当键的值发生变化时,会触发相应的回调函数。

有了这些特性,我们就可以能够实现注册中心的服务提供者节点的过期和监听。


Etcd 如何保证数据一致性?

Etcd 的一大优势就是能够保证数据的强一致性。

  1. 从表层来看,Etcd 支持事务操作,能够保证数据一致性。
  2. 从底层来看,Etcd 采用的是 Raft 算法来实现数据一致性。

Raft 是一种分布式一致性算法,它确保了分布式系统中的所有节点在任何时间点都能达成一致的数据视图。

  • 具体来说, Raft算法是通过选举机制选举出一个领导者(Leader)节点,领导者负责接受客户端的写请求,并且将写请求操作复制到其他节点上。
  • 当客户端发送写请求时,领导者首先将写操作写入自己的日志中,并且将写操作的日志条目分发到其他节点,其他节点通过复制日志条目来保持一致性。
  • 一旦大多数节点(至少是超过半数的节点)都将该日志条目成功写入到自己的日志中,该日志条目则视为已提交,领导者则向客户端发送成功响应。
  • 在领导者发送成功响应后,该写操作就被视为成功提交,从而保证了数据的一致性。
  • 如果领导者节点挂了,其他节点会自动选举出新的领导者,保证数据的一致性。
  • 新的领导者会继续接收客户端的写请求,并将写请求操作复制到其他节点上,从而保证数据的一致性。

Playground

我们可以使用官方提供的 Etcd Playground 来可视化操作 Etcd,便于学习。

官方网站:http://play.etcd.io/play


Etcd 安装

Etcd Github下载页:https://github.com/etcd-io/etcd/releases

Etcd 官方下载:https://etcd.io/docs/v3.2/install/

找到自己操作系统的版本执行即可,安装完成后,会得到 3 个脚本:

  • etcd: etcd 服务端
  • etcdctl: etcd 客户端,用于操作 etcd 服务端,比如读写数据等。
  • etcdutl: etcd 备份恢复工具

执行完 etcd 脚本后,就可以启动 etcd 服务,服务会默认监听 2379 和 2380 两个端口,作用分别如下:

  • 2379:提供 HTTP API 服务,和 etcdctl 通信。
  • 2380:急群中节点之间的通信。

Etcd 可视化工具

一般情况下,我们使用数据存储中间件时,一定要有一个可视化工具,能够更直观清晰地管理已经存储的数据。

比如 Redis 的 Redis Desktop Manager。

同样的,Etcd 也有一些可视化工具,比如:

推荐 etcdkeeper,安装成本更低,学习使用更方便。

进入项目的 GitHub,就能看到安装方式,直接按照指引下载、解压、运行脚本即可:
etcd-1.png

安装后,执行命令,可以在指定端口启动可视化界面(默认是 8080 端口),比如在 8081 端口启动。

1
./etcdkeeper -port 8081

安装后,访问本地 http://127.0.0.1:8081/etcdkeeper/ ,就能看到可视化页面了


Etcd Java 客户端

客户端,就是操作 Etcd 的工具。

etcd 主流的 Java 客户端是 jetcd:https://github.com/etcd-io/jetcd

注意,Java 版本必须要大于 11

用法非常简单,就像 curator 能够操作 ZooKeeper、jedis 能够操作 Redis 一样。


引入依赖

1
2
3
4
5
6
<!-- https://mvnrepository.com/artifact/io.etcd/jetcd-core -->
<dependency>
<groupId>io.etcd</groupId>
<artifactId>jetcd-core</artifactId>
<version>0.7.7</version>
</dependency>

示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class EtcdRegistry {

public static void main(String[] args) throws ExecutionException, InterruptedException {
// create client using endpoints
Client client = Client.builder().endpoints("http://localhost:2379")
.build();

KV kvClient = client.getKVClient();
ByteSequence key = ByteSequence.from("test_key".getBytes());
ByteSequence value = ByteSequence.from("test_value".getBytes());

// put the key-value
kvClient.put(key, value).get();

// get the CompletableFuture
CompletableFuture<GetResponse> getFuture = kvClient.get(key);

// get the value from CompletableFuture
GetResponse response = getFuture.get();

// delete the key
kvClient.delete(key).get();
}
}

在上述代码中,我们使用 KVClient 来操作 etcd 写入和读取数据。

除了 KVClient 客户端外,Etcd 还提供了很多其他客户端。

etcd-3.png


常用的客户端和作用如下:

  1. kvClient:用于对 etcd 中的键值对进行操作。通过 kvClient 可以进行设置值、获取值、删除值、列出目录等操作。
  2. leaseClient:用于管理 etcd 的租约机制。租约是 etcd 中的一种时间片,用于为键值对分配生存时间,并在租约到期时自动删除相关的键值对。通过 leaseClient 可以创建、获取、续约和撤销租约。
  3. watchClient:用于监视 etcd 中键的变化,并在键的值发生变化时接收通知。
  4. clusterClient:用于与 etcd 集群进行交互,包括添加、移除、列出成员、设置选举、获取集群的健康状态、获取成员列表信息等操作。
  5. authClient:用于管理 etcd 的身份验证和授权。通过 authClient 可以添加、删除、列出用户、角色等身份信息,以及授予或撤销用户或角色的权限。
  6. maintenanceClient:用于执行 etcd 的维护操作,如健康检查、数据库备份、成员维护、数据库快照、数据库压缩等。
  7. lockClient:用于实现分布式锁功能,通过 lockClient 可以在 etcd 上创建、获取、释放锁,能够轻松实现并发控制。
  8. electionClient:用于实现分布式选举功能,可以在 etcd 上创建选举、提交选票、监视选举结果等。

绝大多数情况下,用前 3 个客户端就足够了,其余的仅做了解。


Debug 代码

使用 Debug 执行上述代码,观察 Etcd 的数据结构,如图:
etcd-4.png

发现除了 key 和 value 外,还有 revision、create_revision 和 mod_revision,它们分别代表键的版本号、创建时间戳和修改时间戳。

这是因为 etcd 中的每一个键都有一个与之关联的版本号,用于跟踪键的修改历史,每当一个键的值发生变化时,它的版本号都会增加。

通过使用 etcd 的 Watch API,我们可以实时监听键的变化,并在键的值发生变化时接收通知。

这种版本号的机制,使得 etcd 可以实现在分布式系统中高效的实现乐观并发控制、一致性和可靠性的数据访问。


存储结构设计

了解了 Etcd 的基本使用后,还需要设计服务注册信息如何存储在注册中心内。

存储结构设计的几个要点:

  1. key 如何设计?
  2. value 如何设计?
  3. key 什么时候过期?

由于一个服务可能有多个服务提供者(负载均衡),我们可以有两种结构设计:

层级结构

将服务理解为文件夹,将服务对应的多个节点理解为文件夹下的子文件。

那么就可以通过服务名称,使用前缀查询的方法,获取到该服务的所有节点。

键名的规则可以是:/业务前缀/服务名/服务节点地址

服务A: key = /service
服务A节点1: key = /service/node1
服务A节点2: key = /service/node2
服务A节点3: key = /service/node3

列表结构

将服务理解为列表,将服务对应的多个节点理解为列表中的元素。

服务A: key = service
服务A节点1: key = node1
服务A节点2: key = node2
服务A节点3: key = node3

选择哪种存储结构呢?这个也会跟我们的技术选型有关。

对于 ZooKeeper 和 Etcd 这种支持层级查询的中间件,用第一种结构会更清晰。

对于 Redis,由于本身就支持列表数据结构,可以选择第二种结构。

最后,必须要给 key 设置过期时间,比如默认 30秒过期时间,这样如果服务提供者宕机了,也可以超时后自动移除。


Etcd 开发实现

注册中心开发

注册信息定义

在 model 包下定义服务注册信息类 ServiceMetaInfo,用于封装服务的注册信息,包括服务名称、服务版本号、服务地址(域名和端口号)、服务分组等。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
* 服务元信息(注册信息)
*/
public class ServiceMetaInfo {


/**
* 服务名称
*/
private String serviceName;

/**
* 服务版本号
*/
private String serviceVersion = "1.0";

/**
* 服务域名
*/
private String serviceHost;

/**
* 服务端口号
*/
private Integer servicePort;

/**
* 服务分组(暂未实现)
*/
private String serviceGroup = "default";

}

另外还需要给ServiceMetaInfo类增加一些工具方法,用于获取服务注册信息的键名、获取服务注册节点的键名等。

可以把版本号和分组都放到服务键名中,就可以在查询时根据这些参数获取对应的版本和分组的服务。

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* 获取服务键名
*
* @return
*/
public String getServiceKey() {
// 后续可扩展服务分组
// return String.format("%s:%s:%s", serviceName, serviceVersion, serviceGroup);
return String.format("%s:%s", serviceName, serviceVersion);
}

/**
* 获取服务注册节点键名
*
* @return
*/
public String getServiceNodeKey() {
return String.format("%s/%s:%s", getServiceKey(), serviceHost, servicePort);
}

由于注册信息中包含了服务版本号字段,所以我们也可以给 RpcRequest 对象补充服务版本号字段,可以先作为预留字段,后续再实现。

RpcConstant 常量类中补充默认服务版本号常量:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* RPC 相关常量
*/
public interface RpcConstant {

/**
* 默认配置文件加载前缀
*/
String DEFAULT_CONFIG_PREFIX = "rpc";

/**
* 默认服务版本
*/
String DEFAULT_SERVICE_VERSION = "1.0";
}

RpcRequest 请求类中使用该常量,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
* RPC 请求
*/
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class RpcRequest implements Serializable {

/**
* 服务名称
*/
private String serviceName;

/**
* 方法名称
*/
private String methodName;

/**
* 服务版本
*/
private String serviceVersion = RpcConstant.DEFAULT_SERVICE_VERSION;

/**
* 参数类型列表
*/
private Class<?>[] parameterTypes;

/**
* 参数列表
*/
private Object[] args;

}

注册中心配置

在 config 包下编写注册中心配置类 RegistryConfig,让用户配置连接注册中心所需的信息,比如注册中心类别、注册中心地址、用户名、密码、连接超时时间等。

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* RPC 框架注册中心配置
*/
@Data
public class RegistryConfig {

/**
* 注册中心类别
*/
private String registry = "etcd";

/**
* 注册中心地址
*/
private String address = "http://localhost:2380";

/**
* 用户名
*/
private String username;

/**
* 密码
*/
private String password;

/**
* 超时时间(单位毫秒)
*/
private Long timeout = 10000L;
}

还要为 RpcConfig 全局配置补充注册中心配置,代码如下:

1
2
3
4
5
6
7
8
9
@Data
public class RpcConfig {
...

/**
* 注册中心配置
*/
private RegistryConfig registryConfig = new RegistryConfig();
}

注册中心接口

遵循可拓展的设计原则,所以先写一个注册中心接口,后续可以实现多种不同的注册中心,并且和序列化器一样,可以使用 SPI 机制来动态加载。

注册中心接口主要提供了初始化、注册服务、注销服务、服务发现(获取服务节点列表)、服务销毁等方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
/**
* 注册中心接口
*/
public interface Registry {

/**
* 初始化
*
* @param registryConfig
*/
void init(RegistryConfig registryConfig);

/**
* 注册服务(服务端)
*
* @param serviceMetaInfo
*/
void register(ServiceMetaInfo serviceMetaInfo) throws Exception;

/**
* 注销服务(服务端)
*
* @param serviceMetaInfo
*/
void unRegister(ServiceMetaInfo serviceMetaInfo);

/**
* 服务发现(获取某服务的所有节点,消费端)
*
* @param serviceKey 服务键名
* @return
*/
List<ServiceMetaInfo> serviceDiscovery(String serviceKey);

/**
* 服务销毁
*/
void destroy();
}

Etcd 注册中心实现

注册中心初始化

在 registry 目录下新建 Etcd 注册中心类 EtcdRegistry,实现注册中心接口,先完成初始化,读取注册中心配置并初始化客户端对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
* Etcd 注册中心类
*/
public class EtcdRegistry implements Registry {

private Client client;

private KV kvClient;

/**
* 根节点,定义 Etcd 键存储的根路径为 /rpc/,为了区分不同的项目。
*/
private static final String ETCD_ROOT_PATH = "/rpc/";

@Override
public void init(RegistryConfig registryConfig) {
client = Client.builder().endpoints(registryConfig.getAddress()).connectTimeout(Duration.ofMillis(registryConfig.getTimeout())).build();
kvClient = client.getKVClient();
}
}

服务注册

创建 key 并设置过期时间,value 为服务注册信息的 JSON 序列化。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
* 服务注册
*/
@Override
public void register(ServiceMetaInfo serviceMetaInfo) throws Exception {
// 创建 Lease 和 KV 客户端
Lease leaseClient = client.getLeaseClient();

// 创建一个 30 秒的 Lease
long leaseId = leaseClient.grant(30).get().getID();

// 设置要存储的键值对
String registerKey = ETCD_ROOT_PATH + serviceMetaInfo.getServiceNodeKey();
ByteSequence key = ByteSequence.from(registerKey, StandardCharsets.UTF_8);
ByteSequence value = ByteSequence.from(JSONUtil.toJsonStr(serviceMetaInfo), StandardCharsets.UTF_8);

// 将键值对与租约关联起来,并设置过期时间
PutOption putOption = PutOption.builder().withLeaseId(leaseId).build();
kvClient.put(key, value, putOption).get();
}

服务注销

删除 key,代码如下:

1
2
3
4
5
6
7
/**
* 服务注销
*/
@Override
public void unRegister(ServiceMetaInfo serviceMetaInfo) {
kvClient.delete(ByteSequence.from(ETCD_ROOT_PATH + serviceMetaInfo.getServiceNodeKey(), StandardCharsets.UTF_8));
}

服务发现

服务发现根据服务名称作为前缀,从 Etcd 获取服务下的节点列表:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/**
* 服务发现
*/
@Override
public List<ServiceMetaInfo> serviceDiscovery(String serviceKey) {
// 前缀搜索,结尾一定要加 '/'
String searchPrefix = ETCD_ROOT_PATH + serviceKey + "/";

try {
// 前缀查询
GetOption getOption = GetOption.builder().isPrefix(true).build();
List<KeyValue> keyValues = kvClient.get(
ByteSequence.from(searchPrefix, StandardCharsets.UTF_8),
getOption)
.get()
.getKvs();
// 解析服务信息
return keyValues.stream()
.map(keyValue -> {
String value = keyValue.getValue().toString(StandardCharsets.UTF_8);
return JSONUtil.toBean(value, ServiceMetaInfo.class);
})
.collect(Collectors.toList());
} catch (Exception e) {
throw new RuntimeException("获取服务列表失败", e);
}
}

服务销毁

服务销毁时,需要关闭 Etcd 客户端连接,用于关闭后释放资源,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* 服务销毁
*/
@Override
public void destroy() {
System.out.println("当前节点下线");
// 释放资源
if (kvClient != null) {
kvClient.close();
}
if (client != null) {
client.close();
}
}

支持配置和拓展注册中心

让开发者能够填写配置来指定使用的注册中心,并且支持自定义注册中心,让框架更易用、更利于扩展。

我们可以使用工厂来创建注册中心对象、并且使用 SPI 机制来动态加载自定义的注册中心。


注册中心常量

在 registry 目录下新建一个类 RegistryKeys,用于列举所有支持的注册中心键名。

1
2
3
4
5
6
7
8
9
10
/**
* 注册中心键名常量
*/
public interface RegistryKeys {

String ETCD = "etcd";

String ZOOKEEPER = "zookeeper";

}

注册中心工厂

使用工厂模式,支持根据 key 从 SPI 获取注册中心对象实例。

在 registry 目录下新建一个类 RegistryFactory,实现工厂模式,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
* 注册中心工厂(用于获取注册中心对象)
*/
public class RegistryFactory {

static {
SpiLoader.load(Registry.class);
}

/**
* 默认注册中心
*/
private static final Registry DEFAULT_REGISTRY = new EtcdRegistry();

/**
* 获取实例
*
* @param key
* @return
*/
public static Registry getInstance(String key) {
return SpiLoader.getInstance(Registry.class, key);
}

}

注册中心接口 SPI 配置文件

在 META-INF/rpc/system 目录下新建一个文件 com.sihai.rpc.registry.Registry,内容如下:

1
etcd=com.sihai.rpc.registry.EtcdRegistry

注册中心初始化

由于服务提供者和服务消费者都需要和注册中心建立连接,是一个 RPC 框架启动必不可少的环节,所以可以将初始化流程放在 RpcApplication 类中。

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* 框架初始化,支持传入自定义配置
*
* @param newRpcConfig
*/
public static void init(RpcConfig newRpcConfig) {
rpcConfig = newRpcConfig;
log.info("rpc init, config = {}", newRpcConfig.toString());
// 注册中心初始化
RegistryConfig registryConfig = rpcConfig.getRegistryConfig();
Registry registry = RegistryFactory.getInstance(registryConfig.getRegistry());
registry.init(registryConfig);
log.info("registry init, config = {}", registryConfig);
}