序列化器

在编写处理请求的逻辑前,我们要先实现序列化器。因为无论是请求或响应,都会涉及参数的传输。

而 Java 对象是存活在 JVM 虚拟机中的,如果想在其他位置存储并访问、或者在网络中进行传输,就需要进行序列化和反序列化。


序列化和反序列化

  • 序列化:将 Java 对象转换为字节序列的过程。
  • 反序列化:将字节序列转换为 Java 对象的过程。

序列化方式

  • Java 序列化:Java 提供的序列化方式,将 Java 对象转换为字节序列,然后通过网络传输。
  • JSON 序列化:将 Java 对象转换为 JSON 字符串,然后通过网络传输。
  • XML 序列化:将 Java 对象转换为 XML 字符串,然后通过网络传输。
  • Protobuf 序列化:将 Java 对象转换为 Protobuf 字节序列,然后通过网络传输。
  • Hessian 序列化:将 Java 对象转换为 Hessian 字节序列,然后通过网络传输。
  • Kryo 序列化:将 Java 对象转换为 Kryo 字节序列,然后通过网络传输。

序列化的作用

无论是请求或响应,都会涉及参数的传输。

而 Java 对象是存活在 JVM 虚拟机中的,如果想在其他位置存储并访问、或者在网络中进行传输,就需要进行序列化和反序列化。


序列化器实现方式

我们所追求的 “更好的” 序列化器,可以是具有更高的性能、或者更小的序列化结果,这样就能够更快地完成 RPC 的请求和响应。

我们实现了 Java 原生序列化实现序列化器,但这未必是最好的。

市面上还有很多种主流的序列化方式,比如 JSON、Hessian、Kryo、protobuf 等。


主流序列化方式优缺点对比

JSON:

优点:

  • 易读性好,可读性强,便于人类理解和调试。
  • 跨语言支持广泛,几乎所有编程语言都有 JSON 的解析和生成库。

缺点:

  • 占用空间大,JSON 字符串通常比二进制序列化的数据量大很多,因为JSON 是文本格式存储数据,需要额外的字符表示键、值和数据结构。
  • 不可以很好的处理复杂的数据结构和循环引用,可能会导致性能降低和序列化失败。

Hessian:

官方网站:https://hessian.caucho.com/

优点:

  • 占用空间小,序列化后的数据量比较小,因为 Protobuf 是一种二进制序列化格式,可以节省空间,减少网络传输时间。
  • 兼容性高,支持跨语言,适用于分布式系统中的RPC 调用。

缺点:

  • Hessian 的序列化速度比 JSON 慢,因为需要将对象转换为二进制格式。
  • 对象必须要实现Serializable 接口,限制了可序列化的对象范围。

Kryo:

官方网站:https://github.com/EsotericSoftware/kryo

优点:

  • 高性能:Kryo 是一个高性能的序列化框架,序列化和反序列化速度非常快,适用于高吞吐量场景。
  • 支持循环引用,可以处理复杂的数据结构,如树形结构、循环引用等。
  • 可定制:允许自定义序列化器,适用于复杂的对象结构。
  • 无需实现 Serializable 接口,可以序列化任意对象。

缺点:

  • 兼容性问题:不支持跨语言,只适用于 Java。
  • 对象的序列化格式不够友好,不易于读懂和调试。

Protobuf:

优点:

  • 高效的二进制序列化,序列化后的数据量极小,占用空间小。
  • 支持跨语言,并且提供了多种语言的实现库。
  • 支持版本化,解决了向前/向后兼容的问题。

缺点:

  • 配置相对复杂,需要先定义数据结构的消息格式。
  • 对象的序列化格式不易读懂,不便于调试。

序列化器实现

序列化接口

首先我们提供一个序列化接口 Serializer,它包含两个方法,方便后续扩展更多的序列化器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
* 序列化器接口
*/
public interface Serializer {

/**
* 序列化
*
* @param object
* @param <T>
* @return
* @throws IOException
*/
<T> byte[] serialize(T object) throws IOException;

/**
* 反序列化
*
* @param bytes
* @param type
* @param <T>
* @return
* @throws IOException
*/
<T> T deserialize(byte[] bytes, Class<T> type) throws IOException;
}

Java 原生序列化实现

基于Java自带的序列化方式来实现序列化器 JdkSerializer:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
/**
* JDK 序列化器
*/
public class JdkSerializer implements Serializer {

/**
* 序列化
*
* @param object
* @param <T>
* @return
* @throws IOException
*/
@Override
public <T> byte[] serialize(T object) throws IOException {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
ObjectOutputStream objectOutputStream = new ObjectOutputStream(outputStream);
objectOutputStream.writeObject(object);
objectOutputStream.close();
return outputStream.toByteArray();
}

/**
* 反序列化
*
* @param bytes
* @param type
* @param <T>
* @return
* @throws IOException
*/
@Override
public <T> T deserialize(byte[] bytes, Class<T> type) throws IOException {
ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes);
ObjectInputStream objectInputStream = new ObjectInputStream(inputStream);
try {
return (T) objectInputStream.readObject();
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
} finally {
objectInputStream.close();
}
}
}

JSON 序列化实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
/**
* Json 序列化器
*/
public class JsonSerializer implements Serializer {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

@Override
public <T> byte[] serialize(T obj) throws IOException {
return OBJECT_MAPPER.writeValueAsBytes(obj);
}

@Override
public <T> T deserialize(byte[] bytes, Class<T> classType) throws IOException {
T obj = OBJECT_MAPPER.readValue(bytes, classType);
if (obj instanceof RpcRequest) {
return handleRequest((RpcRequest) obj, classType);
}
if (obj instanceof RpcResponse) {
return handleResponse((RpcResponse) obj, classType);
}
return obj;
}

/**
* 由于 Object 的原始对象会被擦除,导致反序列化时会被作为 LinkedHashMap 无法转换成原始对象,因此这里做了特殊处理
*
* @param rpcRequest rpc 请求
* @param type 类型
* @return {@link T}
* @throws IOException IO异常
*/
private <T> T handleRequest(RpcRequest rpcRequest, Class<T> type) throws IOException {
Class<?>[] parameterTypes = rpcRequest.getParameterTypes();
Object[] args = rpcRequest.getArgs();

// 循环处理每个参数的类型
for (int i = 0; i < parameterTypes.length; i++) {
Class<?> clazz = parameterTypes[i];
// 如果类型不同,则重新处理一下类型
if (!clazz.isAssignableFrom(args[i].getClass())) {
byte[] argBytes = OBJECT_MAPPER.writeValueAsBytes(args[i]);
args[i] = OBJECT_MAPPER.readValue(argBytes, clazz);
}
}
return type.cast(rpcRequest);
}

/**
* 由于 Object 的原始对象会被擦除,导致反序列化时会被作为 LinkedHashMap 无法转换成原始对象,因此这里做了特殊处理
*
* @param rpcResponse rpc 响应
* @param type 类型
* @return {@link T}
* @throws IOException IO异常
*/
private <T> T handleResponse(RpcResponse rpcResponse, Class<T> type) throws IOException {
// 处理响应数据
byte[] dataBytes = OBJECT_MAPPER.writeValueAsBytes(rpcResponse.getData());
rpcResponse.setData(OBJECT_MAPPER.readValue(dataBytes, rpcResponse.getDataType()));
return type.cast(rpcResponse);
}
}

Kryo 序列化实现

依赖库:

1
2
3
4
5
6
<!-- https://mvnrepository.com/artifact/com.esotericsoftware/kryo -->
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>kryo</artifactId>
<version>5.6.0</version>
</dependency>

由于 Kryo 本身是线程不安全的,所以需要使用 ThreadLocal 来保证每个线程都有一个单独的 Kryo 对象实例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
* Kryo 序列化器
*/
public class KryoSerializer implements Serializer {
/**
* kryo 线程不安全,使用 ThreadLocal 保证每个线程只有一个 Kryo
*/
private static final ThreadLocal<Kryo> KRYO_THREAD_LOCAL = ThreadLocal.withInitial(() -> {
Kryo kryo = new Kryo();
// 设置动态动态序列化和反序列化类,不提前注册所有类(可能有安全问题)
kryo.setRegistrationRequired(false);
return kryo;
});

@Override
public <T> byte[] serialize(T obj) {
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
Output output = new Output(byteArrayOutputStream);
KRYO_THREAD_LOCAL.get().writeObject(output, obj);
output.close();
return byteArrayOutputStream.toByteArray();
}

@Override
public <T> T deserialize(byte[] bytes, Class<T> classType) {
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);
Input input = new Input(byteArrayInputStream);
T result = KRYO_THREAD_LOCAL.get().readObject(input, classType);
input.close();
return result;
}
}

Hessian 序列化实现

依赖库:

1
2
3
4
5
6
<!-- https://mvnrepository.com/artifact/com.caucho/hessian -->
<dependency>
<groupId>com.caucho</groupId>
<artifactId>hessian</artifactId>
<version>4.0.66</version>
</dependency>

实现比较简单,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* Hessian 序列化器
*/
public class HessianSerializer implements Serializer {
@Override
public <T> byte[] serialize(T object) throws IOException {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
HessianOutput ho = new HessianOutput(bos);
ho.writeObject(object);
return bos.toByteArray();
}

@Override
public <T> T deserialize(byte[] bytes, Class<T> tClass) throws IOException {
ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
HessianInput hi = new HessianInput(bis);
return (T) hi.readObject(tClass);
}
}