黑匣子
满天星
Fork me on GitHub

基于Netty的RPC架构学习笔记(九):自定义序列化协议

为什么需要自定义序列化协议

上节中proto buff明显比java本身的序列化生成的byte数组短很多,因为java自身的序列化传入了很多信息(比如类信息、类型、字段等),通过自定义序列化协议能够通过自己定义的方式实现序列化和反序列化。

🌰

test1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
package com.cn;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Arrays;

public class Test1 {

public static void main(String[] args) throws IOException {
int id = 101;
int age = 21;

ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream();
//希望将int写进去,可是看write的源码返现
//write(int b){
// buf[coung] = byte(b)
// count+=1;
//}
//发现是直接把int转化成了byte,int占有4个字节长度,所以这里出现了数据截断,所以自己写一个方法int2bytes。
arrayOutputStream.write(int2bytes(id));
arrayOutputStream.write(int2bytes(age));

byte[] byteArray = arrayOutputStream.toByteArray();

System.out.println(Arrays.toString(byteArray));

//==============================================================
ByteArrayInputStream arrayInputStream = new ByteArrayInputStream(byteArray);
byte[] idBytes = new byte[4];
arrayInputStream.read(idBytes);
System.out.println("id:" + bytes2int(idBytes));

byte[] ageBytes = new byte[4];
arrayInputStream.read(ageBytes);
System.out.println("age:" + bytes2int(ageBytes));

}


/**
* 大端字节序列(先写高位,再写低位)
* 百度下 大小端字节序列
* @param i
* @return
*/
public static byte[] int2bytes(int i){
byte[] bytes = new byte[4];
//一个字节八位,所以3*8
bytes[0] = (byte)(i >> 3*8);
bytes[1] = (byte)(i >> 2*8);
bytes[2] = (byte)(i >> 1*8);
bytes[3] = (byte)(i >> 0*8);
return bytes;
}


/**
* 大端
* @param bytes
* @return
*/
public static int bytes2int(byte[] bytes){
//原来向右移动了三个字节,希望还原回来,所以向左移动三个字节
//或运算就是为了把数据合并起来,变成int数据
return (bytes[0] << 3*8) |
(bytes[1] << 2*8) |
(bytes[2] << 1*8) |
(bytes[3] << 0*8);

}

}

输出

1
2
3
[0,0,0,101,0,0,0,21]
id:101
age:21

每次都需要进行位运算,而且先在是转换int,long、float、double转换呢?

test2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package com.cn;

import java.nio.ByteBuffer;
import java.util.Arrays;

public class Test2 {

public static void main(String[] args) {
int id = 101;
int age = 21;
//通过nio的bytebuffer转换,省去了位运算方法
//申请八个空间大小
ByteBuffer buffer = ByteBuffer.allocate(8);
buffer.putInt(id);
buffer.putInt(age);
byte[] array = buffer.array();
System.out.println(Arrays.toString(buffer.array()));

//====================================================

ByteBuffer buffer2 = ByteBuffer.wrap(array);
System.out.println("id:"+buffer2.getInt());
System.out.println("age:"+buffer2.getInt());

}

}

输出

1
2
3
[0,0,0,101,0,0,0,21]
id:101
age:21

简化了很多操作,但是

1
ByteBuffer buffer = ByteBuffer.allocate(8);

可是每次需要给定申请的给定空间大小,不能自动扩容

test3(使用netty中的ChannelBuffers)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package com.cn;

import java.util.Arrays;

import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;

public class Test3 {

public static void main(String[] args) {

ChannelBuffer buffer = ChannelBuffers.dynamicBuffer();
buffer.writeInt(101);
buffer.writeDouble(80.1);

byte[] bytes = new byte[buffer.writerIndex()];
buffer.readBytes(bytes);

System.out.println(Arrays.toString(bytes));

"abc".getBytes();

//================================================
ChannelBuffer wrappedBuffer = ChannelBuffers.wrappedBuffer(bytes);
System.out.println(wrappedBuffer.readInt());
System.out.println(wrappedBuffer.readDouble());

}

}

输出

1
2
3
[0,0,0,101,0,0,0,21]
id:101
age:21

注意

1
2
//channelBuffer根据写指针的位置,获取byte数组大小
byte[] bytes = new byte[buffer.writerIndex()];

除了能够自动扩容,还能够自动写入int double等类型,可是还是优缺点,这里没有一个writeString的方法

通过“abc”.getBytes()得到字节数据,可是无法确定字节的大小,不像int知道是四个字节,所以String LIst Map都需要在前面加一个长度字段。

|-|-|
|-|-|
|String List Map | short长度+字节数组|

自定义序列化协议(test4)

SeriaLizer.java (自定义了序列化的规则)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
package com.serial;


import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.jboss.netty.buffer.ChannelBuffer;
/**
* 自定义序列化接口
* @
*
*/
public abstract class Serializer {


public static final Charset CHARSET = Charset.forName("UTF-8");

protected ChannelBuffer writeBuffer;

protected ChannelBuffer readBuffer;

/**
* 反序列化具体实现
*/
protected abstract void read();

/**
* 序列化具体实现
*/
protected abstract void write();

/**
* 从byte数组获取数据
* @param bytes 读取的数组
*/
public Serializer readFromBytes(byte[] bytes) {
readBuffer = BufferFactory.getBuffer(bytes);
read();
readBuffer.clear();
return this;
}

/**
* 从buff获取数据
* @param readBuffer
*/
public void readFromBuffer(ChannelBuffer readBuffer) {
this.readBuffer = readBuffer;
read();
}

/**
* 写入本地buff
* @return
*/
public ChannelBuffer writeToLocalBuff(){
writeBuffer = BufferFactory.getBuffer();
write();
return writeBuffer;
}

/**
* 写入目标buff
* @param buffer
* @return
*/
public ChannelBuffer writeToTargetBuff(ChannelBuffer buffer){
writeBuffer = buffer;
write();
return writeBuffer;
}

/**
* 返回buffer数组
*
* @return
*/
public byte[] getBytes() {
writeToLocalBuff();
byte[] bytes = null;
if (writeBuffer.writerIndex() == 0) {
bytes = new byte[0];
} else {
bytes = new byte[writeBuffer.writerIndex()];
writeBuffer.readBytes(bytes);
}
writeBuffer.clear();
return bytes;
}


public byte readByte() {
return readBuffer.readByte();
}

public short readShort() {
return readBuffer.readShort();
}

public int readInt() {
return readBuffer.readInt();
}

public long readLong() {
return readBuffer.readLong();
}

public float readFloat() {
return readBuffer.readFloat();
}

public double readDouble() {
return readBuffer.readDouble();
}

public String readString() {
int size = readBuffer.readShort();
if (size <= 0) {
return "";
}

byte[] bytes = new byte[size];
readBuffer.readBytes(bytes);

return new String(bytes, CHARSET);
}

public <T> List<T> readList(Class<T> clz) {
List<T> list = new ArrayList<>();
int size = readBuffer.readShort();
for (int i = 0; i < size; i++) {
list.add(read(clz));
}
return list;
}

public <K,V> Map<K,V> readMap(Class<K> keyClz, Class<V> valueClz) {
Map<K,V> map = new HashMap<>();
int size = readBuffer.readShort();
for (int i = 0; i < size; i++) {
K key = read(keyClz);
V value = read(valueClz);
map.put(key, value);
}
return map;
}

@SuppressWarnings("unchecked")
public <I> I read(Class<I> clz) {
Object t = null;
if ( clz == int.class || clz == Integer.class) {
t = this.readInt();
} else if (clz == byte.class || clz == Byte.class){
t = this.readByte();
} else if (clz == short.class || clz == Short.class){
t = this.readShort();
} else if (clz == long.class || clz == Long.class){
t = this.readLong();
} else if (clz == float.class || clz == Float.class){
t = readFloat();
} else if (clz == double.class || clz == Double.class){
t = readDouble();
} else if (clz == String.class ){
t = readString();
} else if (Serializer.class.isAssignableFrom(clz)){
try {
byte hasObject = this.readBuffer.readByte();
if(hasObject == 1){
Serializer temp = (Serializer)clz.newInstance();
temp.readFromBuffer(this.readBuffer);
t = temp;
}else{
t = null;
}
} catch (Exception e) {
e.printStackTrace();
}

} else {
throw new RuntimeException(String.format("不支持类型:[%s]", clz));
}
return (I) t;
}


public Serializer writeByte(Byte value) {
writeBuffer.writeByte(value);
return this;
}

public Serializer writeShort(Short value) {
writeBuffer.writeShort(value);
return this;
}

public Serializer writeInt(Integer value) {
writeBuffer.writeInt(value);
return this;
}

public Serializer writeLong(Long value) {
writeBuffer.writeLong(value);
return this;
}

public Serializer writeFloat(Float value) {
writeBuffer.writeFloat(value);
return this;
}

public Serializer writeDouble(Double value) {
writeBuffer.writeDouble(value);
return this;
}

public <T> Serializer writeList(List<T> list) {
if (isEmpty(list)) {
writeBuffer.writeShort((short) 0);
return this;
}
writeBuffer.writeShort((short) list.size());
for (T item : list) {
writeObject(item);
}
return this;
}

public <K,V> Serializer writeMap(Map<K, V> map) {
if (isEmpty(map)) {
writeBuffer.writeShort((short) 0);
return this;
}
writeBuffer.writeShort((short) map.size());
for (Entry<K, V> entry : map.entrySet()) {
writeObject(entry.getKey());
writeObject(entry.getValue());
}
return this;
}

public Serializer writeString(String value) {
if (value == null || value.isEmpty()) {
writeShort((short) 0);
return this;
}

byte data[] = value.getBytes(CHARSET);
short len = (short) data.length;
writeBuffer.writeShort(len);
writeBuffer.writeBytes(data);
return this;
}

public Serializer writeObject(Object object) {

if(object == null){
writeByte((byte)0);
}else{



if (object instanceof Integer) {
writeInt((int) object);
return this;
}

if (object instanceof Long) {
writeLong((long) object);
return this;
}

if (object instanceof Short) {
writeShort((short) object);
return this;
}

if (object instanceof Byte) {
writeByte((byte) object);
return this;
}

if (object instanceof String) {
String value = (String) object;
writeString(value);
return this;
}
if (object instanceof Serializer) {
writeByte((byte)1);
Serializer value = (Serializer) object;
value.writeToTargetBuff(writeBuffer);
return this;
}

throw new RuntimeException("不可序列化的类型:" + object.getClass());
}

return this;
}

private <T> boolean isEmpty(Collection<T> c) {
return c == null || c.size() == 0;
}
public <K,V> boolean isEmpty(Map<K,V> c) {
return c == null || c.size() == 0;
}
}

注意其中的readString等方法

1
2
3
4
5
6
7
8
9
10
11
public String readString() {
int size = readBuffer.readShort();
if (size <= 0) {
return "";
}

byte[] bytes = new byte[size];
readBuffer.readBytes(bytes);

return new String(bytes, CHARSET);
}

将需要的String(list map一样)的byte大小保存到readbuffer中,反序列化的时候再从里面读出来。
BufferFactory.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package com.serial;


import java.nio.ByteOrder;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
/**
* buff工厂
*
*
*/
public class BufferFactory {

public static ByteOrder BYTE_ORDER = ByteOrder.BIG_ENDIAN;

/**
* 获取一个buffer
*
* @return
*/
public static ChannelBuffer getBuffer() {
ChannelBuffer dynamicBuffer = ChannelBuffers.dynamicBuffer();
return dynamicBuffer;
}

/**
* 将数据写入buffer
* @param bytes
* @return
*/
public static ChannelBuffer getBuffer(byte[] bytes) {
ChannelBuffer copiedBuffer = ChannelBuffers.copiedBuffer(bytes);
return copiedBuffer;
}

}

Player.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
package com.serial;

import java.util.ArrayList;
import java.util.List;

/**
* 玩家对象
*
*/
public class Player extends Serializer{

public Player() {
}

public Player(long playerId, int age, String name) {
this.playerId = playerId;
this.age = age;
this.name = name;
}

private long playerId;

private int age;

private String name;

private List<Integer> skills = new ArrayList<>();

public long getPlayerId() {
return playerId;
}

public void setPlayerId(long playerId) {
this.playerId = playerId;
}

public int getAge() {
return age;
}

public void setAge(int age) {
this.age = age;
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public List<Integer> getSkills() {
return skills;
}

public void setSkills(List<Integer> skills) {
this.skills = skills;
}

@Override
protected void read() {
this.playerId = readLong();
this.age = readInt();
this.name = readString();
this.skills = readList(Integer.class);
}

@Override
protected void write() {
writeLong(playerId);
writeInt(age);
writeString(name);
writeList(skills);
}
}

test4.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package com.cn;

import java.util.Arrays;

public class Test4 {

public static void main(String[] args) {

Player player = new Player();
player.setPlayerId(10001);
player.setAge(22);
player.getSkills().add(101);
player.getResource().setGold(99999);

byte[] bytes = player.getBytes();

System.out.println(Arrays.toString(bytes));

//==============================================

Player player2 = new Player();
player2.readFromBytes(bytes);
System.out.println(player2.getPlayerId() + " "+player2.getAge() + " "+ Arrays.toString(player2.getSkills().toArray())+" " +player2.getResource().getGold());

}

}

输出

1
2
[0,0,0,0,39,17,0,0。。。。]
1001 33 101 9999

通过Seriazer的封装,结合前面ChannelBuffers的方式,将String list map需要传入大小的问题进行了结局。

对比分析protobuff原理 重点学习proto位运算的原理

初窥

通过上面的例子和上节的protobuff,传入相同的值到Player类中

1
2
[0,0,0,0,39,17,0,0。。。。]
1001 33 101 9999
1
2
[0,0,0,0,39,17]
1001 33 101 9999

当然上面的数据是我编的,可是protobuff生产出来的byte数组大小,比我们尽最大努力自定义的数组大小还要小很多。

分析protobuff源码

上节中的例子

1
2
Player player = builder.build();
byte[] byteArray = player.toByteArray();

所以看tobyteArray方法

1
2
3
4
5
6
7
8
9
10
11
12
13
public byte[] toByteArray() {
try {
final byte[] result = new byte[getSerializedSize()];
final CodedOutputStream output = CodedOutputStream.newInstance(result);
writeTo(output);
output.checkNoSpaceLeft();
return result;
} catch (IOException e) {
throw new RuntimeException(
"Serializing to a byte array threw an IOException " +
"(should never happen).", e);
}
}

看writeTo方法,writeTo是一个接口方法,查看具体实现类PlayerModule.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
  public void writeTo(com.google.protobuf.CodedOutputStream output)
throws java.io.IOException {
getSerializedSize();
if (((bitField0_ & 0x00000001) == 0x00000001)) {
output.writeInt64(1, playerID_);
}
if (((bitField0_ & 0x00000002) == 0x00000002)) {
output.writeInt32(2, age_);
}
if (((bitField0_ & 0x00000001) == 0x00000001)) {
output.writeBytes(3, getNameBytes());
}
for (int i = 0; i < skill.size(); i++) {
output.writeInt32(4, skill.get(i));
}

getUnknownFields().writeTo(output);
}

这里的writeInt64等方法中的第一个参数1234是上节中proto文件中的key,表示是第几个字节

1
2
3
4
5
required int64 playerId = 1;
required int32 age = 2;
required string name = 3;
//repeated的意思是list,重复int的list
repeated int32 skills = 4;

查看WriteInt32方法

1
2
3
4
5
6
7
public void writeInt32(final int fieldNumber, final int value)
throws IOException {
//写第几个字节
writeTag(fieldNumber, WireFormat.WIRETYPE_VARINT);
//写具体的年龄
writeInt32NoTag(value);
}

查看writeInt32NoTag方法

1
2
3
4
5
6
7
8
public void writeInt32NoTag(final int value) throws IOException {
if (value >= 0) {
writeRawVarint32(value);
} else {
// Must sign-extend.
writeRawVarint64(value);
}
}

查看writeRawVarint32方法

1
2
3
4
5
6
7
8
9
10
11
public void writeRawVarint32(int value) throws IOException {
while (true) {
if ((value & ~0x7F) == 0) {
writeRawByte(value);
return;
} else {
writeRawByte((value & 0x7F) | 0x80);
value >>>= 7;
}
}
}

0x7F 转化成二进制 0111 1111
~0x7F 取反后 1000 0000
value& ~0x7F 的结果就是value的1-7位都被置为了0,value是32位,前面还有25位可能有数据,所以可能value & ~0x7F != 0
所以如果value & ~0x7F == 0,说明value值的大小是小于7位的

如果小于后面7位的大小,就写一个字节的数据

1
2
3
4
public void writeRawByte(final byte value) throws IOException {
if (position == limit) {
refreshBuffer();
}

如果大于后面7位的大小

1
2
writeRawByte((value & 0x7F) | 0x80);
value >>>= 7;

value & 0x7F获取1-7位数据
0x80表示成二进制1000 0000
或运算 1xxx xxxx 通过第八位判断后面还有没有数据,如果有数据就是1,如果没有数据就是0,
所以这里说明有数据,往右移7位,接着读,然后在此判断剩下的数据是不是小于七位,如此循环。

因为这里的第一位用来表示还有没有数据,所以只能表示28位的数据位,而真正需要表示的是32位,所以还需要一个字节存储丢失的4位,所以int 的字节长度不是传统的4个,在proto中为伸缩的1-5个字节,long不是传统的8位,在proto中是伸缩的1-9位

-------------The End-------------

本文标题:基于Netty的RPC架构学习笔记(九):自定义序列化协议

文章作者:Leesin.Dong

发布时间:2019年03月10日 - 10:03

最后更新:2019年03月12日 - 20:03

原始链接:http://mmmmmm.me/2019-03-10-9.html

许可协议: 署名-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。

客官客官,不可以,你要对我负责~~~