黑匣子
满天星
Fork me on GitHub

Java高并发程序设计学习笔记(十一):Jetty分析

本次笔记是对jetty,一个servlet容器,一个httpServer,内部的实现,因为代码量量比较多,不会从架构的方向介绍,作为一个servlet容器,不会介绍如何使用jsp,而是通过多线程的方向,从代码触发,看究竟是如何提高并发量的。
综上本次笔记并没有对jetty具体是如何用的做介绍,主要是学习jetty高并发的处理手段。

new Server()

1
2
3
4
5
public Server(@Name("port")int port) {
this((ThreadPool)null);
ServerConnector connector=new ServerConnector(this); connector.setPort(port);
setConnectors(new Connector[]{connector});
}

初始化线程池

1
2
3
4
public Server(@Name("threadpool") ThreadPool pool) {
_threadPool=pool!=null?pool:new QueuedThreadPool(); addBean(_threadPool);
setServer(this);
}

QueuedThreadPool

并不是初始化jdk的线程池,而是new自己的QueuedThreadPool,QueuedThreadPool实现了SizedThreadPool

execute()方法

1
2
3
4
5
6
7
8
9
10
@Override
public void execute(Runnable job) {
if (!isRunning() || !_jobs.offer(job)) {
LOG.warn("{} rejected {}", this, job);
throw new RejectedExecutionException(job.toString()); }
else {
// Make sure there is at least one thread executing the job. if (getThreads() == 0)
startThreads(1);
}
}

这里的核心是将任务压入队列 : _jobs.offer(job)

BlockingQueue

这里的_jobs是BlockingQueue,保存线程池执行的所有的任务。
将任务推入
BlockingQueueorg.eclipse.jetty.util.thread.QueuedThreadPool._jobs

BlockingQueue不是一个高性能的,所以execute不太可能被非常频繁的调用。

初始化ServerConnector

处理一些http的连接以及NIO、Selector一些的东西。
HTTP connector using NIO ByteChannels and Selectors
继承自 AbstractConnector

初始化ScheduledExecutorScheduler

调度器
based on JDK’s {@link ScheduledThreadPoolExecutor}.
有些任务是需要每隔一段时间执行一次的,比如每一分钟需要检查一些东西,诸如此类的任务就是由schedule来调度的。

初始化ByteBufferPool

ByteBuffer是一个数组,分为两种_indirect ByteBuffer(分配在堆当中)和_directByteBuffer(分配在内存当中),相当于一个对象池,这其中的byteBuffer实际上是可以复用的。对象池的好处就是可以减少gc,减少new。在java中new是经过了绝对的优化的,性能还是比较高,关键是回收,新生代的回收会更加的频繁。如果自己写一个线程池,还不如通过new、gc的方式,因为线程池必须是线程安全的,所以好多线程在用,拿到对象和归还对象的时候必须是线程安全的,如果简单的加synchronize,除非是特别大的超级对象,性能可能比new,好一点,否则,如果用synchronize构造的线程池性能还不如new出来的。

ArrayByteBufferPool

普通对象池中的对象都是对立的,我去拿任何一个对象都是一样的,都是等价的,但是ByteBufferPool不同,因为你可能需要2k的bytebuffer,也可能需要一个2m的bytebuffer

1
2
3
4
5
6
public ArrayByteBufferPool(int minSize, int increment, int maxSize)
public ArrayByteBufferPool() {
this(0,1024,64*1024);
}
_direct=new Bucket[maxSize/increment];
_indirect=new Bucket[maxSize/increment];

minSize其实大小(最小的容量),increment增量,maxSize最大的大小

结构

Bucket理解为篮子,一个篮子里面只放一中大小的buffer
因为1-63k不可能每一个都有,也许系统中只需要32k,这个时候1k、2k等等都不需要,所以有一个延迟加载的功能
_direct Bucket数组 _indirect Bucket数组
为每一个大小,新建一个Bucket 但不初始化ByteBuffer

1
2
3
4
5
6
int size=0;
for (int i=0;i<_direct.length;i++) {
size+=_inc;
_direct[i]=new Bucket(size);
_indirect[i]=new Bucket(size);
}

一个Bucekt存放大小相同的所有的ByteBuffer
_size
bytebuffer大小
_queue
public final Queue_queue= new ConcurrentLinkedQueue<>();
初始化ByteBuPool的时候所有的Bucket都创建一遍,但是Bucket里面的Queue里面的内容都是延迟加载的

acquire

请求线程池。

1
public ByteBuffer acquire(int size, boolean direct)

size多大的内存,direct直接内存还是堆。
取得合适的Bucket 每个Bucket的大小不同,这里找到最合适的,取最接近的Buket大小

1
Bucket bucket = bucketFor(size,direct);

从Bucket中取得ByteBuffer

1
ByteBuffer buffer = bucket==null?null:bucket._queue.poll();

不存在则新建

1
2
3
4
if (buffer == null) {
int capacity = bucket==null?size:bucket._size;
buffer = direct ? BufferUtil.allocateDirect(capacity) : BufferUtil.allocate(capacity);
}

release

用完了释放掉,还给线程池。

1
2
3
4
5
6
7
8
9
10
public void release(ByteBuffer buffer) {
if (buffer!=null) {
Bucket bucket = bucketFor(buffer.capacity(),buffer.isDirect());
if (bucket!=null)
{
BufferUtil.clear(buffer);
bucket._queue.offer(buffer);
}
}
}

取得合适的Bucket

1
Bucket bucket = bucketFor(buffer.capacity(),buffer.isDirect());

清空Buffer

1
BufferUtil.clear(buffer);

归还Pool

1
bucket._queue.offer(buffer);

例外处理

如果申请的ByteBuffer过大或者过小,无法在POOL中满足,则可以申请成功,但无法归还给POOL。
加入目前只有1-64k的bytebuffer,但是我需要120k大小的bytebuffer,就需要申请。
可以看到上面的require没有的话会创建,但是因为无法归还,所以最后会被gc

总结

通过学习ByteBufferPool,首先是对于所有的对象池都应该无锁的,如果有锁,还不如new出来,第二个就是bucket,因为bytebuffer大小是不确定的,所做的相应的处理。

维护ConnectionFactory

HttpConnectionFactory
用于创建连接, 比如Accept后,需要创建一个表示连接的对象

取得可用CPU数量

1
int cores = Runtime.getRuntime().availableProcessors();

我的系统中应该使用多少accept线程,应该使用多少个selector线程,都要由cores算出来的,这也就是对于高并发的程序来讲,你必须自适应cpu的数量,否则在2核的cpu上和64核的cpu上,完全没有办法很好的协调,

更新acceptor数量

根据cpu的数量更新acceptor数量

1
2
if (acceptors < 0)
acceptors=Math.max(1, Math.min(4,cores/8));

它认为accept的数量应该是比较小的,上面可以发现不会超过四个。

创建acceptor线程组

1
_acceptors = new Thread[acceptors];

初始化ServerConnectorManager

继承自 SelectorManager

1
_manager = new ServerConnectorManager(getExecutor(), getScheduler(), selectors>0?selectors:Math.max(1,Math.min(4,Runtime.getRuntime().availableProcessors()/2)));

保存selector线程数量

1
Math.min(4,Runtime.getRuntime().availableProcessors()/2))

最多也不超过四个

设置port

1
connector.setPort(port);

关联Sever和Connector

1
setConnectors(new Connector[]{connector});

Server.start()

org.eclipse.jetty.server.Server
启动web服务器

1
2
3
4
WebAppContext context = new WebAppContext();
context.setContextPath("/");
context.setResourceBase("./web/"); context.setClassLoader(Thread.currentThread().getContextClassLoader()); server.setHandler(context);
server.start();

设置启动状态

1
2
3
4
5
6
7
AbstractLifeCycle
private void setStarting() {
if (LOG.isDebugEnabled()) LOG.debug("starting {}",this);
_state = __STARTING;
for (Listener listener : _listeners)
listener.lifeCycleStarting(this);
}

启动过程doStart()

Server
启动整个server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
protected void doStart() throws Exception
{
//If the Server should be stopped when the jvm exits, register //with the shutdown handler thread.
if (getStopAtShutdown())
ShutdownThread.register(this);
//Register the Server with the handler thread for receiving //remote stop commands
ShutdownMonitor.register(this);
//Start a thread waiting to receive "stop" commands. ShutdownMonitor.getInstance().start(); // initialize
LOG.info("jetty-" + getVersion()); HttpGenerator.setJettyVersion(HttpConfiguration.SERVER_VERSION); MultiException mex=new MultiException();
// check size of thread pool
SizedThreadPool pool = getBean(SizedThreadPool.class); int max=pool==null?-1:pool.getMaxThreads();
int selectors=0;
int acceptors=0;
if (mex.size()==0)
{
for (Connector connector : _connectors) {
if (connector instanceof AbstractConnector) acceptors+=((AbstractConnector)connector).getAcceptors();
if (connector instanceof ServerConnector) selectors+=((ServerConnector)connector).getSelectorManager().getSelectorCount();
}
}
int needed=1+selectors+acceptors; if (max>0 && needed>max)
throw new IllegalStateException(String.format("Insufficient threads: max=%d < needed(acceptors=%d + selectors=%d + request=1)",max,acceptors,selectors));
try {
super.doStart();
}
catch(Throwable e) {
mex.add(e); }
// start connectors last
for (Connector connector : _connectors) {
try {
connector.start();
}catch(Throwable e) {
mex.add(e);
}
}
if (isDumpAfterStart()) dumpStdErr();
mex.ifExceptionThrow();
LOG.info(String.format("Started @%dms",Uptime.getUptime()));
}

注册ShutdownMonitor

远程控制接口

1
2
3
//Register the Server with the handler thread for receiving //remote stop commands
ShutdownMonitor.register(this);
//Start a thread waiting to receive "stop" commands. ShutdownMonitor.getInstance().start(); // initialize

允许远程的将jetty的server关掉。

获取化线程池

1
2
// check size of thread pool
SizedThreadPool pool = getBean(SizedThreadPool.class);

QueuedThreadPool

设置selector数量

根据Connector数量进行累计 大部分情况下,只有一个ServerConnector,因为每个connector都有selector

1
2
3
4
for (Connector connector : _connectors) {
if (connector instanceof AbstractConnector) acceptors+=((AbstractConnector)connector).getAcceptors();
if (connector instanceof ServerConnector) selectors+=((ServerConnector)connector).getSelectorManager().getSelectorCount();
}

累计所有Connector的需求

计算所需的所有线程数量

int needed=1+selectors+acceptors;

如果大于默认的200则中断程序

1
2
3
if (max>0 && needed>max)
throw new IllegalStateException(String.format("Insufficient threads: max=%d < needed(acceptors=%d
+ selectors=%d + request=1)",max,acceptors,selectors));

因为如果光accept和selector线程都超过200说明性能很差了,没有必要再继续往下跑了。

维护Bean

启动QueuedThreadPool

doStart()
startThreads()建立需要的线程

创建线程

Thread thread = newThread(_runnable);
_runnable _jobs中取任务并执行

设置线程的属性

thread.setDaemon(isDaemon()); thread.setPriority(getThreadsPriority()); thread.setName(_name + “-“ + thread.getId()); _threads.add(thread);

启动线程 thread.start();

启动WebAppContext

如果需要使用,在此处启动,去做servlet规范所做的内容,这里就不展开讲了。

启动Connector

取得ConnectionFactory

1
_defaultConnectionFactory = getConnectionFactory(_defaultProtocol);

创建selector线程并启动

1
2
3
4
5
for (int i = 0; i < _selectors.length; i++) {
ManagedSelector selector = newSelector(i); _selectors[i] = selector;
selector.start();
execute(new NonBlockingThread(selector));
}

newSelector()

1
2
3
protected ManagedSelector newSelector(int id) {
return new ManagedSelector(id);
}

创建Acceptor线程

1
2
3
4
5
6
7
8
9
//创建几个accept线程
_stopping=new CountDownLatch(_acceptors.length);
for (int i = 0; i < _acceptors.length; i++)
{
Acceptor a = new Acceptor(i);
addBean(a);
//执行
getExecutor().execute(a);
}

Acceptor

设置线程名字
1
2
3
4
5

final Thread thread = Thread.currentThread();
String name=thread.getName();
_name=String.format("%s-acceptor-%d@%x-
%s",name,_acceptor,hashCode(),AbstractConnector.this.toString()); thread.setName(_name);
设置优先级
将自己放入_acceptors数组
1
2
3
synchronized (AbstractConnector.this) {
_acceptors[_acceptor] = thread;
}
监听端口
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
try {
while (isAccepting()) {
try {
accept(_acceptor); }
catch (Throwable e) {
if (isAccepting())
LOG.warn(e);
else LOG.ignore(e);
}
}
} finally {
thread.setName(name);
if (_acceptorPriorityDelta!=0) thread.setPriority(priority);
synchronized (AbstractConnector.this) {
_acceptors[_acceptor] = null;
}
CountDownLatch stopping=_stopping;
if (stopping!=null)
stopping.countDown();
}

ServerConnector.accept()

1
2
3
4
5
6
public void accept(int acceptorID) throws IOException {
ServerSocketChannel serverChannel = _acceptChannel; if (serverChannel != null && serverChannel.isOpen()) {
SocketChannel channel =serverChannel.accept();
accepted(channel);
}
}

在accept的地方等待

没有Acceptor的情况

channle默认是blocking的
如果acceptor数量为0,没有安排线程专门进行accept,则设置为非阻塞模式 若是非0,有专门线程进行accept,因此,为阻塞模式

1
2
3
4
5
6
7
protected void doStart() throws Exception {
super.doStart();
if (getAcceptors()==0) {
_acceptChannel.configureBlocking(false);
_manager.acceptor(_acceptChannel);
}
}

启动完毕

Http请求

Accept成功

1
2
3
4
5
6
private void accepted(SocketChannel channel) throws IOException {
channel.configureBlocking(false);
Socket socket = channel.socket();
configure(socket);
_manager.accept(channel);
}

设置为非阻塞模式

1
channel.configureBlocking(false);

配置Socket

1
2
Socket socket = channel.socket();
configure(socket);

正式处理

1
2
SelectorManager _manager;
_manager.accept(channel);

选择可用的ManagedSelector线程

1
2
3
4
5
6
7
8
private ManagedSelector chooseSelector() {
//The ++ increment here not atomic, but does not matter,
// so long as the value chages sometimes,then connections will
//be distributed over the available selectors
long s = _selectorIndex++;
int index = (int)(s % getSelectorCount());
return _selectors[index];
}

这里有一个很有意思的注释,++并不是原子操作,但是不影响,因为这里只需要++就可以了,并不需要每次的数字搜不一样。

ManagedSelector处理

ManagedSelector 是一个线程 封装了Selector 的使用

提交任务
1
selector.submit(selector.new Accept(channel, attachment));

提交这个处理任务到ManagedSelector:

1
private final Queue<Runnable> _changes = new ConcurrentArrayQueue<>(); _changes.offer(change);

任务提交到了_chages这个队列中,ConcurrentArrayQueue这个队列是jetty中自己实现的。
ConcurrentArrayQueue
与ConcurrentLinkedQueue相似的性能,但直接保存元素 而不是node,因此需要更少的对象,更少的GC
因为ConcurrentLinkedQueue是一个链表,所以要有node,并且指向下一个node,而ConcurrentArrayQueue不是一个链表,所以需要更少的对象,更少的GC

请求处理

ManagedSelector.run()

1
2
while (isRunning()) 
select();

select()

发现有任务就执行

runChanges();

参见: 提交任务

1
2
3
4
5
    private void runChanges() {
Runnable change;
while ((change = _changes.poll()) != null)
runChange(change);
}

将_changes中的任务拿出来做执行。
runChange()
change.run();

Accept.run

1
2
3
SelectionKey key = channel.register(_selector, 0, attachment); 
EndPoint endpoint = createEndPoint(channel, key);
key.attach(endpoint);

注册selector并将endpoint传给后面。

select()

runChanges实质上是将selector和channel连接到一起,接下来就是等待read或者write准备好

1
int selected = _selector.select();
处理SelectionKey

当发现任何一个selector可用的时候就会处理SelectionKey

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Set<SelectionKey> selectedKeys = _selector.selectedKeys(); for (SelectionKey key : selectedKeys)
{
if (key.isValid()) {
processKey(key);
}
else {
if (debug)
LOG.debug("Selector loop ignoring invalid key for channel {}", key.channel());
Object attachment = key.attachment();
if (attachment instanceof EndPoint)
((EndPoint)attachment).close();
}
}
selectedKeys.clear();

processKey()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private void processKey(SelectionKey key) {
Object attachment = key.attachment(); try
{
if (attachment instanceof SelectableEndPoint) {
((SelectableEndPoint)attachment).onSelected();
}
else if (key.isConnectable()) {
processConnect(key, (Connect)attachment);
}
else if (key.isAcceptable())
{
processAccept(key);
} else {
throw new IllegalStateException(); }
}
catch (CancelledKeyException x) {
LOG.debug("Ignoring cancelled key for channel {}", key.channel());
if (attachment instanceof EndPoint)
closeNoExceptions((EndPoint)attachment);
}
catch (Throwable x) {
LOG.warn("Could not process key for channel " + key.channel(), x);
if (attachment instanceof EndPoint)
closeNoExceptions((EndPoint)attachment);
}
}

onSelected()

1
2
3
4
5
6
7
8
9
10
11
@Override
public void onSelected() {
assert _selector.isSelectorThread();
int oldInterestOps = _key.interestOps();
int readyOps = _key.readyOps();
int newInterestOps = oldInterestOps & ~readyOps; setKeyInterests(oldInterestOps, newInterestOps);
updateLocalInterests(readyOps, false);
if (_key.isReadable())
getFillInterest().fillable(); if (_key.isWritable())
getWriteFlusher().completeWrite();
}

会使用新的线程进行HTTP业务处理 (提交到线程池)

-------------The End-------------

本文标题:Java高并发程序设计学习笔记(十一):Jetty分析

文章作者:Leesin.Dong

发布时间:2019年02月22日 - 19:02

最后更新:2019年02月22日 - 23:02

原始链接:http://mmmmmm.me/2019-02-22.html

许可协议: 署名-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。

客官客官,不可以,你要对我负责~~~