黑匣子
满天星

基于Netty的RPC架构学习笔记(五):netty线程模型源码分析(二)

小技巧(如何看开源框架的源码)

一、断点
二、打印
三、看调用栈
四、搜索

源码解析

1
2
3
// Configure the NIO socket channel factory on the bootstrap.
// The NioServerSocketChannelFactory constructors are walked through in the next excerpt.
bootstrap.setFactory(new NioServerSocketChannelFactory(boss, worker));

NioServerSocketChannelFactory.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// Two-argument constructor: the entry point used by the bootstrap snippet in this article.
public NioServerSocketChannelFactory(
Executor bossExecutor, Executor workerExecutor) {
// First determine the worker count from the worker executor
// (getMaxThreads is shown in the excerpt under SelectorUtil.java),
// then delegate to the three-argument constructor below.
this(bossExecutor, workerExecutor, getMaxThreads(workerExecutor));
}
// Three-argument constructor: takes an explicit worker count.
public NioServerSocketChannelFactory(
Executor bossExecutor, Executor workerExecutor,
int workerCount) {
// Delegates to the four-argument constructor below;
// the boss count defaults to 1.
this(bossExecutor, 1, workerExecutor, workerCount);
}
// Four-argument constructor: takes explicit boss and worker counts.
public NioServerSocketChannelFactory(
Executor bossExecutor, int bossCount, Executor workerExecutor,
int workerCount) {
// A new worker pool is created here (see the NioWorkerPool.java excerpt)
// and handed to another constructor overload together with the boss settings.
this(bossExecutor, bossCount, new NioWorkerPool(workerExecutor, workerCount));
}

SelectorUtil.java

1
2
3
4
5
6
7
8
9
10
11
12
13
// Default number of I/O threads: the number of available processors multiplied by 2.
static final int DEFAULT_IO_THREADS = Runtime.getRuntime().availableProcessors() * 2;

// Computes the worker count to use for the given worker executor.
private static int getMaxThreads(Executor executor) {
// A ThreadPoolExecutor exposes its maximum pool size, so it can cap the worker count.
if (executor instanceof ThreadPoolExecutor) {
final int maxThreads = ((ThreadPoolExecutor) executor).getMaximumPoolSize();
// Use the smaller of maxThreads and DEFAULT_IO_THREADS.
return Math.min(maxThreads, SelectorUtil.DEFAULT_IO_THREADS);
}
// Any other kind of executor gets DEFAULT_IO_THREADS.
// (Per the article, the earlier example used an unbounded pool, so DEFAULT_IO_THREADS
// is the value that ends up being returned for it.)
return SelectorUtil.DEFAULT_IO_THREADS;
}

NioWorkerPool.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// Two-argument constructor: no ThreadNameDeterminer supplied.
public NioWorkerPool(Executor workerExecutor, int workerCount) {
// Delegates to the three-argument constructor below, passing null as the determiner.
this(workerExecutor, workerCount, null);
}
// Three-argument constructor: builds the pool and initializes its workers.
public NioWorkerPool(Executor workerExecutor, int workerCount, ThreadNameDeterminer determiner) {
// Call the superclass constructor with autoInit = false, so the superclass does not
// call init() itself.
super(workerExecutor, workerCount, false);
// Assigned before init() because init() calls newWorker(), which reads this field.
this.determiner = determiner;
// Initialize once, explicitly; see the init method in the AbstractNioWorkerPool.java excerpt.
init();
}
// Implements the abstract newWorker method declared by the superclass.
@Override
protected NioWorker newWorker(Executor executor) {
// Creates a new NioWorker bound to the given executor and this pool's determiner.
return new NioWorker(executor, determiner);
}

NioWorker.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
// Constructor: all setup happens in the superclass.
public NioWorker(Executor executor, ThreadNameDeterminer determiner) {
// Calls the superclass constructor; see the AbstractNioWorker.java excerpt below.
super(executor, determiner);
}
// Reads whatever is currently available from the socket behind the given key and, if any
// bytes were read, fires a messageReceived event carrying them.
// Returns false when the channel was closed by this call (negative read result or failure),
// true otherwise.
@Override
protected boolean read(SelectionKey k) {
final SocketChannel ch = (SocketChannel) k.channel();
final NioSocketChannel channel = (NioSocketChannel) k.attachment();

// The predictor supplies the buffer size to use for this read.
final ReceiveBufferSizePredictor predictor =
channel.getConfig().getReceiveBufferSizePredictor();
final int predictedRecvBufSize = predictor.nextReceiveBufferSize();
final ChannelBufferFactory bufferFactory = channel.getConfig().getBufferFactory();

int ret = 0;
int readBytes = 0;
// Stays true unless the read loop below finishes without throwing.
boolean failure = true;

ByteBuffer bb = recvBufferPool.get(predictedRecvBufSize).order(bufferFactory.getDefaultOrder());
try {
// Keep reading until a read returns no bytes or the buffer is full.
while ((ret = ch.read(bb)) > 0) {
readBytes += ret;
if (!bb.hasRemaining()) {
break;
}
}
failure = false;
} catch (ClosedChannelException e) {
// Can happen, and does not need a user attention.
} catch (Throwable t) {
fireExceptionCaught(channel, t);
}

if (readBytes > 0) {
bb.flip();
// The bytes that were read are wrapped into a ChannelBuffer here.
final ChannelBuffer buffer = bufferFactory.getBuffer(readBytes);
buffer.setBytes(0, bb);
buffer.writerIndex(readBytes);

// Update the predictor.
predictor.previousReceiveBufferSize(readBytes);

// Fire the event (this produces an upstream event).
// fireMessageReceived is a method from Channels, quoted here for reference:
// * @param message the received message
//public static void fireMessageReceived(Channel channel, Object message) {
//fireMessageReceived(channel, message, null);
//}
fireMessageReceived(channel, buffer);
}

// A negative read result or a failed read means the channel gets closed.
if (ret < 0 || failure) {
k.cancel(); // Some JDK implementations run into an infinite loop without this.
close(channel, succeededFuture(channel));
return false;
}

return true;
}

AbstractNioWorker.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// Constructor: all setup happens in the superclass.
AbstractNioWorker(Executor executor, ThreadNameDeterminer determiner) {
// Calls the superclass constructor once more; see the AbstractNioSelector.java excerpt below.
super(executor, determiner);
}
// Handles every key the selector currently reports as selected: reads when the key is
// readable (or reports no ready ops at all), then writes when the key is writable.
@Override
protected void process(Selector selector) throws IOException {
Set<SelectionKey> selectedKeys = selector.selectedKeys();
// check if the set is empty and if so just return to not create garbage by
// creating a new Iterator every time even if there is nothing to process.
// See https://github.com/netty/netty/issues/597
if (selectedKeys.isEmpty()) {
return;
}
for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
SelectionKey k = i.next();
// Remove the key from the selected set as soon as it has been taken.
i.remove();
try {
int readyOps = k.readyOps();
if ((readyOps & SelectionKey.OP_READ) != 0 || readyOps == 0) {
// Read the data; see the read method in the NioWorker excerpt above.
if (!read(k)) {
// Connection already closed - no need to handle write.
continue;
}
}
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
writeFromSelectorLoop(k);
}
} catch (CancelledKeyException e) {
close(k);
}

if (cleanUpCancelledKeys()) {
break; // break the loop to avoid ConcurrentModificationException
}
}
}

AbstractNioSelector.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
// Constructor: stores the executor and opens the selector.
AbstractNioSelector(Executor executor, ThreadNameDeterminer determiner) {
// Keep the executor (thread pool); openSelector uses it to start this selector's loop.
this.executor = executor;
// See the openSelector method below.
openSelector(determiner);
}
// Opens the selector and submits this object's loop to the executor.
// Throws ChannelException when the selector cannot be created.
private void openSelector(ThreadNameDeterminer determiner) {
try {
// Create and store the selector for this instance.
selector = SelectorUtil.open();
} catch (Throwable t) {
throw new ChannelException("Failed to create a selector.", t);
}

// Start the worker thread with the new Selector.
boolean success = false;
try {
// This is where the NioWorker is set running. NioWorker inherits from AbstractNioSelector,
// so what ends up running is this class's run method.
// Where is it started from? See the DeadLockProofWorker.java excerpt below.
// The runnable passed in is the result of newThreadRenamingRunnable(id, determiner);
// newThreadRenamingRunnable is shown in the second AbstractNioWorker.java excerpt below.
DeadLockProofWorker.start(executor, newThreadRenamingRunnable(id, determiner));
success = true;
} finally {
if (!success) {
// Release the Selector if the execution fails.
try {
selector.close();
} catch (Throwable t) {
logger.warn("Failed to close a selector.", t);
}
selector = null;
// The method will return to the caller at this point.
}
}
assert selector != null && selector.isOpen();
}
public void run() {
for (;;) {
//标记wakenup状态
wakenUp.set(false);
//状态监测的代码
...
//取任务
processTaskQueue();
//业务处理,这是一个抽象方法,被三个类实现AbstractNioWorker、NioClientBoss、
//AbstractNioWorker中的process在上方
//NioServerBoss中的process在下方
process(selector);
}

NioServerBoss.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
// Boss-side processing: for every selected key, accepts all pending connections on the
// attached server channel and registers each accepted socket with a worker.
@Override
protected void process(Selector selector) {
Set<SelectionKey> selectedKeys = selector.selectedKeys();
if (selectedKeys.isEmpty()) {
return;
}
for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
SelectionKey k = i.next();
i.remove();
NioServerSocketChannel channel = (NioServerSocketChannel) k.attachment();

try {
// accept connections in a for loop until no new connection is ready
for (;;) {
// The accept event is handled here.
SocketChannel acceptedSocket = channel.socket.accept();
if (acceptedSocket == null) {
break;
}
// The registration method is further down in this class; it registers a task
// with a worker thread.
registerAcceptedChannel(channel, acceptedSocket, thread);
}
} catch (CancelledKeyException e) {
// Raised by accept() when the server socket was closed.
k.cancel();
channel.close();
} catch (SocketTimeoutException e) {
// Thrown every second to get ClosedChannelException
// raised.
} catch (ClosedChannelException e) {
// Closed as requested.
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to accept a connection.", t);
}

// Pause for one second after an unexpected accept failure before continuing.
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
// Ignore
}
}
}
}

// Wraps a freshly accepted socket in a NioAcceptedSocketChannel and registers it with the
// next worker from the parent's worker pool. On failure the problem is logged and the
// accepted socket is closed.
private static void registerAcceptedChannel(NioServerSocketChannel parent, SocketChannel acceptedSocket,
Thread currentThread) {
try {
ChannelSink sink = parent.getPipeline().getSink();
ChannelPipeline pipeline =
parent.getConfig().getPipelineFactory().getPipeline();
// Pick a worker. The method quoted below spreads the work evenly across the workers:
// public E nextWorker() {
//return (E) workers[Math.abs(workerIndex.getAndIncrement() % workers.length)];
// }
NioWorker worker = parent.workerPool.nextWorker();
// A task is registered with the worker rather than operating on the worker directly,
// so that the worker starts watching this socket channel (acceptedSocket).
// register is inherited from AbstractNioSelector: it queues the task and then wakes the
// selector up, guarded by the wakenUp flag.
// The relevant code is pasted here for reference:
//public void register(Channel channel, ChannelFuture future) {
//Runnable task = createRegisterTask(channel, future);
//registerTask(task);
// }
// protected final void registerTask(Runnable task) {
// taskQueue.add(task);
//Selector selector = this.selector;
//if (selector != null) {
//if (wakenUp.compareAndSet(false, true)) {
//selector.wakeup();
//}
//} else {
//if (taskQueue.remove(task)) {
// the selector was null this means the Worker has already been shutdown.
//throw new RejectedExecutionException("Worker has already been shutdown");
//}
// }
// }
worker.register(new NioAcceptedSocketChannel(
parent.getFactory(), pipeline, parent, sink
, acceptedSocket,
worker, currentThread), null);
} catch (Exception e) {
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to initialize an accepted socket.", e);
}

// Registration failed, so close the accepted socket.
try {
acceptedSocket.close();
} catch (IOException e2) {
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to close a partially accepted socket.",
e2);
}
}
}
}

AbstractNioWorker.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// Wraps this worker in a ThreadRenamingRunnable with a proposed thread name built from id.
protected ThreadRenamingRunnable newThreadRenamingRunnable(int id, ThreadNameDeterminer determiner) {
// `this` is the current NioWorker; a proposed thread name is passed along with it.
return new ThreadRenamingRunnable(this, "New I/O worker #" + id, determiner);
}
// Constructor: stores the wrapped runnable, the determiner and the proposed thread name.
// Throws NullPointerException when runnable or proposedThreadName is null;
// determiner is not null-checked here.
public ThreadRenamingRunnable(Runnable runnable, String proposedThreadName, ThreadNameDeterminer determiner) {
if (runnable == null) {
throw new NullPointerException("runnable");
}
if (proposedThreadName == null) {
throw new NullPointerException("proposedThreadName");
}
this.runnable = runnable;
this.determiner = determiner;
this.proposedThreadName = proposedThreadName;
}
// Abridged fragment; the enclosing method and the rest of the try statement are not shown.
try {
// Executing the runnable here is really the NioWorker being run.
// Per the article: NioWorker.run runs the run of its parent AbstractNioWorker,
// and AbstractNioWorker's run in turn calls the run method of its parent AbstractNioSelector.
// The AbstractNioSelector run method is shown in an earlier excerpt above.
runnable.run();
}

DeadLockProofWorker.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// Utility class with a single static entry point, start(), which runs a Runnable on an
// Executor while recording that Executor in the PARENT thread-local.
public final class DeadLockProofWorker {

/**
* An <em>internal use only</em> thread-local variable that tells the
* {@link Executor} that this worker acquired a worker thread from.
*/
public static final ThreadLocal<Executor> PARENT = new ThreadLocal<Executor>();

// Submits the runnable to the parent executor.
// Throws NullPointerException when parent or runnable is null.
public static void start(final Executor parent, final Runnable runnable) {
if (parent == null) {
throw new NullPointerException("parent");
}
if (runnable == null) {
throw new NullPointerException("runnable");
}
// Start a Runnable through the thread pool.
parent.execute(new Runnable() {
public void run() {
// PARENT is set for the duration of the wrapped run() and removed afterwards,
// even when run() throws.
PARENT.set(parent);
try {
// Invoke the wrapped Runnable's run method; then go back to the excerpt that got here.
runnable.run();
} finally {
PARENT.remove();
}
}
});
}

// Private constructor: the class is not meant to be instantiated.
private DeadLockProofWorker() {
}
}

AbstractNioWorkerPool.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
//这里到了他的父类AbstractNioWorkerPool
blic void run() {
thread = Thread.currentThread();
startupLatch.countDown();

int selectReturnsImmediately = 0;
Selector selector = this.selector;
...
}
// The array of workers managed by this pool.
private final AbstractNioWorker[] workers;
// The superclass constructor.
// Throws NullPointerException when workerExecutor is null and IllegalArgumentException
// when workerCount is not positive. When autoInit is true, init() is called right away.
AbstractNioWorkerPool(Executor workerExecutor, int workerCount, boolean autoInit) {
if (workerExecutor == null) {
throw new NullPointerException("workerExecutor");
}
if (workerCount <= 0) {
throw new IllegalArgumentException(
"workerCount (" + workerCount + ") " + "must be a positive integer.");
}
// Allocate the workers array with workerCount slots; the slots are filled in by init().
workers = new AbstractNioWorker[workerCount];
this.workerExecutor = workerExecutor;
if (autoInit) {
init();
}
}
// Creates all workers of the pool. May only run once: a second call throws
// IllegalStateException.
protected void init() {
if (!initialized.compareAndSet(false, true)) {
throw new IllegalStateException("initialized already");
}

for (int i = 0; i < workers.length; i++) {
// Initialize each worker; the concrete implementation of newWorker is referenced below.
workers[i] = newWorker(workerExecutor);
}

waitForWorkerThreads();
}
// Abstract method; for the concrete implementation see the newWorker method in the
// NioWorkerPool.java excerpt above.
protected abstract E newWorker(Executor executor);

// Returns the next worker, cycling through the workers array by means of an incrementing
// counter. Math.abs keeps the array index non-negative if the remainder is negative.
@SuppressWarnings("unchecked")
public E nextWorker() {
return (E) workers[Math.abs(workerIndex.getAndIncrement() % workers.length)];
}

// Calls rebuildSelector() on every worker in the pool.
public void rebuildSelectors() {
for (AbstractNioWorker worker: workers) {
worker.rebuildSelector();
}
}

阅读源码技巧

打印查看

1
2
3
4
5
6
7
8
9
10
11
12
13
// Print-debugging sketch of the selector loop: before each phase, print the current thread
// name followed by the phase name, so boss and worker threads can be told apart in the output.
for (;;) {
    System.out.println(Thread.currentThread().getName() + " " + "wakenup");
    wakenUp.set(false);
    // ...
    System.out.println(Thread.currentThread().getName() + " " + "select");
    int selected = select(selector);
    // ...
    System.out.println(Thread.currentThread().getName() + " " + "processTaskQueue");
    processTaskQueue();
    // ...
    System.out.println(Thread.currentThread().getName() + " " + "process");
    process(selector);
}

输出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
...
New I/O server boss #2 process
New I/O server boss #2 wakenup
New I/O server boss #2 select
New I/O server boss #2 processTaskQueue
New I/O server boss #2 process
New I/O server boss #2 wakenup
New I/O server boss #2 select
...
New I/O worker #1 wakenup
New I/O worker #1 select
New I/O worker #1 processTaskQueue
New I/O worker #1 process
New I/O worker #1 wakenup
New I/O worker #1 select
New I/O worker #1 processTaskQueue
New I/O worker #1 process
...

可以看到worker是四个四个循环往复,可是boss线程为什么在select就没有继续执行了?

通过打断点调试

将断点打在 wakenUp.set(false) 这一行
为了只看boss线程,在Eclipse中进入断点的控制页面 → 右上角Breakpoints → 选中本类的断点右击 → Breakpoint Properties → 在新的页面中勾选Conditional → 在下面输入 Thread.currentThread().getName().contains("boss") → 点击OK

通过打断点发现:执行到select方法时跟进去,boss线程并没有执行默认的select(500)这种带超时的方式,而是执行了select()这种阻塞的方式,所以线程阻塞住了。

查看调用栈

通过查看调用栈的方式查看某个方法具体调用的堆栈信息
eclipse在debug界面的左上角
idea在debug界面的左下角

-------------The End-------------

本文标题:基于Netty的RPC架构学习笔记(五):netty线程模型源码分析(二)

文章作者:Leesin.Dong

发布时间:2019年03月10日 - 10:03

最后更新:2019年03月10日 - 23:03

原始链接:http://mmmmmm.me/2019-03-10-5.html

许可协议: 署名-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。

客官客官,不可以,你要对我负责~~~