黑匣子
满天星
Fork me on GitHub

基于Netty的RPC架构学习笔记(二):netty服务器

简介

netty版本大致版本分为 netty3.x 和 netty4.x、netty5.x

netty可以运用在那些领域?

  1. 分布式进程通信
    例如: hadoop、dubbo、akka等具有分布式功能的框架,底层RPC通信都是基于netty实现的,这些框架使用的版本通常都还在用netty3.x

  2. 游戏服务器开发
    最新的游戏服务器有部分公司可能已经开始采用netty4.x 或 netty5.x

Netty服务端Hello World案例

举个🌰

server.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
package com.server;

import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.handler.codec.string.StringDecoder;
import org.jboss.netty.handler.codec.string.StringEncoder;
/**

- netty服务端入门

-
*
*/
public class Server {

public static void main(String[] args) {


//服务类
ServerBootstrap bootstrap = new ServerBootstrap();

//boss线程监听端口,worker线程负责数据读写
ExecutorService boss = Executors.newCachedThreadPool();
ExecutorService worker = Executors.newCachedThreadPool();

//设置niosocket工厂
bootstrap.setFactory(new NioServerSocketChannelFactory(boss, worker));

//设置管道的工厂,管道是服务,相当于装了一大堆的过滤器
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {

@Override
public ChannelPipeline getPipeline() throws Exception {

ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
//接收消息
pipeline.addLast("helloHandler", new HelloHandler());
return pipeline;
}
});
//绑定端口
bootstrap.bind(new InetSocketAddress(10101));

System.out.println("start!!!");


}

}

HelloHandler.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
package com.server;

import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
/**

- 消息接受处理类

-
*
*/
public class HelloHandler extends SimpleChannelHandler {

/**

- 接收消息
*/
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {

String s = (String) e.getMessage();
System.out.println(s);

//回写数据
ctx.getChannel().write("hi");
super.messageReceived(ctx, e);
}

/**

- 捕获异常
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
System.out.println("exceptionCaught");
super.exceptionCaught(ctx, e);
}

/**

- 新连接
*/
@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
System.out.println("channelConnected");
super.channelConnected(ctx, e);
}

/**

- 必须是链接已经建立,关闭通道的时候才会触发
*/
@Override
public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
System.out.println("channelDisconnected");
super.channelDisconnected(ctx, e);
}

/**

- channel关闭的时候触发
*/
@Override
public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
System.out.println("channelClosed");
super.channelClosed(ctx, e);
}
}

启动Server.java
输出

1
start!!!

telnet
输入

1
telnet 127.0.0.1 10101

回车
输出

1
channelConnected

telnet
输入

1
send hello

回车
输出

1
messageReceived

在messageReceived方法处加int i = i/10
输出

1
2
messageReceived
exception

关闭telnet
输出

1
2
channelDisconnected
channelClosed

channelDisconnected和channelClosed的区别
channelDisconnected:必须是链接已经建立,关闭通道的时候才会触发
channelClosed:channel关闭的时候触发
客户端连接不成功的话不会触发channelDisconnected,只会触发channelClosed方法

小改进

  • 接收数据改进
    注意messageReceived方法中本应该是
1
2
ChannelBuffer message = (ChannelBuffer)e.getMessage();
String s = new String(message.array());

实际上应该是Netty进行了封装
Server.java中加入

1
pipeline.addLast("decoder", new StringDecoder());

messageReceived中就可以改成

1
String s = e.getMessage();

这样和上面是一样的效果

  • 回写数据
    messageReceived方法中
    1
    ctx.getChannel().write("hi");

这样是会报错的,应该传一个ChannelBuffer

1
2
ChannelBuffer copoedBuffer = ChannelBuffers.copiedBuffer("hi".getBytes());
ctx.getChannel.write(copoedBuffer);

可是这样还是挺麻烦的Netty还是帮我们封装好了
在Server.java中

1
pipeline.addLast("encoder", new StringEncoder());

会写数据就可知直接

1
ctx.getChannel().write("hi");

重点讲解

1
2
3
4
5
6
7
8
9
10
@Override
public ChannelPipeline getPipeline() throws Exception {

ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
//接收消息
pipeline.addLast("helloHandler", new HelloHandler());
return pipeline;
}

管道分为消息分为了上行和下行
StringDecoder继承ChannelIpstreamHandler
StringEncoder继承了ChannelDownStreamHandler
HelloHandler继承了SimpleChannelHandler
上行会经过HelloHandler,然后下行回写数据就会经过StringEncoder,然后再回写数据给客户端

总结

netty服务端hello world案例

1
2
3
4
5
6
7
8
9
10
11

SimpleChannelHandler 处理消息接收和写
{
messageReceived接收消息

channelConnected新连接,通常用来检测IP是否是黑名单,每次做一个统计,
当程序员恶意通过客户端不断地发送请求,
经过识别,就会channel.close关闭掉,通过处理加入黑名单。

channelDisconnected链接关闭,可以再用户断线的时候清楚用户的缓存数据等
}

boss和worker线程池里面其实是一个线程,里面是selector,boss selector是用来监听端口的,worker selector是负责channel的读写任务的

channelDisconnected与channelClosed的区别?

channelDisconnected只有在连接建立后断开才会调用
channelClosed无论连接是否成功都会调用关闭资源

-------------The End-------------

本文标题:基于Netty的RPC架构学习笔记(二):netty服务器

文章作者:Leesin.Dong

发布时间:2019年03月10日 - 10:03

最后更新:2019年03月10日 - 22:03

原始链接:http://mmmmmm.me/2019-03-10-2.html

许可协议: 署名-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。

客官客官,不可以,你要对我负责~~~