黑匣子
满天星
Fork me on GitHub

基于Netty的RPC架构学习笔记(七):netty学习之心跳

idleStateHandler

Netty提供的检测会话状态的工具。

netty3🌰

Server.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package com.heart;

import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.handler.codec.string.StringDecoder;
import org.jboss.netty.handler.codec.string.StringEncoder;
import org.jboss.netty.handler.timeout.IdleStateHandler;
import org.jboss.netty.util.HashedWheelTimer;
/**
*
*/
public class Server {

public static void main(String[] args) {

//服务类
ServerBootstrap bootstrap = new ServerBootstrap();

//boss线程监听端口,worker线程负责数据读写
ExecutorService boss = Executors.newCachedThreadPool();
ExecutorService worker = Executors.newCachedThreadPool();

//设置niosocket工厂
bootstrap.setFactory(new NioServerSocketChannelFactory(boss, worker));
//new一个定时器
final HashedWheelTimer hashedWheelTimer = new HashedWheelTimer();
//设置管道的工厂
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {

@Override
public ChannelPipeline getPipeline() throws Exception {

ChannelPipeline pipeline = Channels.pipeline();
//需要在这里添加一个处理,handler才能处理
//IdleStateHandler参数,定时器(在上面定义了,需要new出来),读超时时间,写超时时间,读写超时时间。
pipeline.addLast("idle", new IdleStateHandler(hashedWheelTimer, 5, 5, 10));
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
pipeline.addLast("helloHandler", new HelloHandler());
return pipeline;
}
});

bootstrap.bind(new InetSocketAddress(10101));

System.out.println("start!!!");

}

}

HelloHanler.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package com.heart;

import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.handler.timeout.IdleState;
import org.jboss.netty.handler.timeout.IdleStateEvent;

public class HelloHandler extends SimpleChannelHandler {

@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {

System.out.println(e.getMessage());
}

@Override
public void handleUpstream(final ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
if (e instanceof IdleStateEvent) {
//e.getState返回状态,如果太久没有读,就会返回read,如果太久没有写就会返回write,没有度也没有写返回ALL_IDE
if(((IdleStateEvent)e).getState() == IdleState.ALL_IDLE){
System.out.println("提示玩家下线");
//关闭会话,踢玩家下线
//回写给用户,提示会话将会关闭。
ChannelFuture write = ctx.getChannel().write("time out, you will close");
write.addListener(new ChannelFutureListener() {

@Override
public void operationComplete(ChannelFuture future) throws Exception {
ctx.getChannel().close();
}
});
}
} else {
super.handleUpstream(ctx, e);
}
}

}

等10秒不读也不写就会自动关闭

netty5🌰

Server.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
package com.heart;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;

/**
* netty5服务端
* @au
*
*/
public class Server {

public static void main(String[] args) {
//服务类
ServerBootstrap bootstrap = new ServerBootstrap();

//boss和worker
EventLoopGroup boss = new NioEventLoopGroup();
EventLoopGroup worker = new NioEventLoopGroup();

try {
//设置线程池
bootstrap.group(boss, worker);

//设置socket工厂、
bootstrap.channel(NioServerSocketChannel.class);

//设置管道工厂
bootstrap.childHandler(new ChannelInitializer<Channel>() {

@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new IdleStateHandler(5, 5, 10));
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new StringEncoder());
ch.pipeline().addLast(new ServerHandler());
}
});

//netty3中对应设置如下
//bootstrap.setOption("backlog", 1024);
//bootstrap.setOption("tcpNoDelay", true);
//bootstrap.setOption("keepAlive", true);
//设置参数,TCP参数
bootstrap.option(ChannelOption.SO_BACKLOG, 2048);//serverSocketchannel的设置,链接缓冲池的大小
bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);//socketchannel的设置,维持链接的活跃,清除死链接
bootstrap.childOption(ChannelOption.TCP_NODELAY, true);//socketchannel的设置,关闭延迟发送

//绑定端口
ChannelFuture future = bootstrap.bind(10101);

System.out.println("start");

//等待服务端关闭
future.channel().closeFuture().sync();

} catch (Exception e) {
e.printStackTrace();
} finally{
//释放资源
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}

ServerHandler.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
package com.heart;

import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
/**
* 服务端消息处理
* @
*
*/
public class ServerHandler extends SimpleChannelInboundHandler<String> {

@Override
protected void messageReceived(ChannelHandlerContext ctx, String msg) throws Exception {

System.out.println(msg);

ctx.channel().writeAndFlush("hi");
ctx.writeAndFlush("hi");
}



@Override
public void userEventTriggered(final ChannelHandlerContext ctx, Object evt) throws Exception {
if(evt instanceof IdleStateEvent){
IdleStateEvent event = (IdleStateEvent)evt;
if(event.state() == IdleState.ALL_IDLE){

//清除超时会话
ChannelFuture writeAndFlush = ctx.writeAndFlush("you will close");
writeAndFlush.addListener(new ChannelFutureListener() {

@Override
public void operationComplete(ChannelFuture future) throws Exception {
ctx.channel().close();
}
});
}
}else{
super.userEventTriggered(ctx, evt);
}
}


/**
* 新客户端接入
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelActive");
}

/**
* 客户端断开
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelInactive");
}

/**
* 异常
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
}


}

总结

  1. 心跳其实就是一个普通的请求,特点数据简单,业务也简单
  1. 心跳对于服务端来说,定时清除闲置会话inactive(netty5) channelclose(netty3)

假如客户端突然停电了,这个时候会话保存着,然后有电了之后,再次请求会重新建立会话,原来的会话就成了僵尸会话,需要定时去清理这些僵尸会话。后台还可以返回系统时间,达到一个防作弊的效果,前端修改系统时间可以达到一个游戏加速的作用。

  1. 心跳对客户端来说,用来检测会话是否断开,是否重连! 用来检测网络延时!

手机app中经常有断开连接、重新连接的操作。玩王者的时候右上角有一个网络延迟,就是心跳做的。

-------------The End-------------

本文标题:基于Netty的RPC架构学习笔记(七):netty学习之心跳

文章作者:Leesin.Dong

发布时间:2019年03月10日 - 10:03

最后更新:2019年03月10日 - 23:03

原始链接:http://mmmmmm.me/2019-03-10-7.html

许可协议: 署名-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。

客官客官,不可以,你要对我负责~~~