博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Netty服务器端启动流程分析
阅读量:4150 次
发布时间:2019-05-25

本文共 10690 字,大约阅读时间需要 35 分钟。

简介:上面一篇文章从宏观上介绍了,会有一个大概的认识,这篇文章主要是从源码上来分析netty启动的过程。

这个是netty服务器启动的一段代码:

首先:创建了一个NioEventLoopGroup对象,我们先去看下这个对象主要的内容是什么

接下来在MultithreadEventLoopGroup中

从参数传递上来看,传入的是nThreads ==0

这段代码的意思就是,如果环境变量里面配置了io.netty.eventLoopThread这个属性,那么就拿这个配置的线程数来和1比较,两者之间比较大的,就是默认DEFAULT_EVENT_LOOP_THREADS 的值,否则的话,就去当前操作系统核心线程数*2 来作为这个数据的值。

继续往下进入,MultiThreadEventExectorGroup对象当中。

 
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,                                        EventExecutorChooserFactory chooserFactory, Object... args) {    if (nThreads <= 0) {        throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));    }    if (executor == null) {        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());    }    children = new EventExecutor[nThreads]; //创建一个数组为n的NioEventLoop    for (int i = 0; i < nThreads; i ++) {        boolean success = false;        try {            children[i] = newChild(executor, args);            success = true;        } catch (Exception e) {            // TODO: Think about if this is a good exception type            throw new IllegalStateException("failed to create a child event loop", e);        } finally {            if (!success) {                for (int j = 0; j < i; j ++) {                    children[j].shutdownGracefully();                }                for (int j = 0; j < i; j ++) {                    EventExecutor e = children[j];                    try {                        while (!e.isTerminated()) {                            e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);                        }                    } catch (InterruptedException interrupted) {                        // Let the caller handle the interruption.                        Thread.currentThread().interrupt();                        break;                    }                }            }        }    }    chooser = chooserFactory.newChooser(children);    final FutureListener terminationListener = new FutureListener() {        @Override        public void operationComplete(Future future) throws Exception {            if (terminatedChildren.incrementAndGet() == children.length) {                terminationFuture.setSuccess(null);            }        }    };    for (EventExecutor e: children) {        e.terminationFuture().addListener(terminationListener);    }    Set
childrenSet = new LinkedHashSet
(children.length); Collections.addAll(childrenSet, children); readonlyChildren = Collections.unmodifiableSet(childrenSet);}

创建一个数组长度为n的EventExector数组,

这个对象其实就是NioEventLoop,数组的创建就是基本属性的一些赋值,

provider对象,当前在windows系统上,WindowsSelectorProvider对象

一下是NioEventLoop的继承关系图,是EventLoopGroup的子类,EventLoopGroup当中包含的是一个NioEventLoop的数组

接下来看一下ChooserFactory创建一个chooser对象,这个chooser主要是用来决定每次在调用eventLoop时,选择数组当中哪个eventLoop对象。

对于上面的代码,如果EventLoop的数组长度是2的幂次方,会调用PowerOfTwo

如果不是的话,会调用下面这个生成选择策略,这两个类的实现就是当前类的一个内部类,netty当中对性能已经达到了极致的追求。

如果是2的幂次方,采用&的方式来直接决定返回那个EventLoop对象,如果不是的话,就使用取模的方式,来获取返回的EventLoop对象。

以上就是NioEventLoop对象的创建时,netty底层所执行的操作。

接下来我们按照上面代码的编写顺序,进行下一步的分析。

ServerBootstrap b = new ServerBootstrap();
 

创建服务端的启动类

这个类比较简单,就是创建一个对象

ServerBootstrap这个类的编写符合链式调用,所以我们从group方法,其实后面的一系列方法都是对对象属性的赋值,我们还是要大概过一遍。

ServerBootstrap中childGroup赋值

父类当中group赋值,服务器启动时,一个是bossGroup,事件循环组,负责接收连接,接收到之后传给workerGroup,workerGroup 负责具体的I/O事件。

ServerBootstrap 中的 channel 

接下来到ReflectiveChannelFactory 这个类中,这个类,见名知意,反射的Channel工厂,主要是用类来通过反射的方式,创建其对象,newChannel() 方法这里可以留意下,后面还会分析到 q

AbstractBootstrap中,生成了channelFactory 对象,就是我们上面说的ReflectiveChannelFactory对象

以上就是ServerBootstrap对象创建之后,相关属性的赋值.

 
ChannelFuture channelFuture =  serverBootstrap.bind(8899).sync();

接下来去看bind方法的实现

① 这个方法开始逐步分析,这个方法在AbstractBootstrap 类当中

 
final ChannelFuture initAndRegister() {    Channel channel = null;    try {        channel = channelFactory.newChannel(); ①// 上面我们分析到 channelFactory 是reflectiveChannelFactory
    //它的newChannel 方法创建一个NioServerSocketChannel 方法的实例,通过不带参数的构造方法        init(channel);    } catch (Throwable t) {        if (channel != null) {            // channel can be null if newChannel crashed (eg SocketException("too many open files"))            channel.unsafe().closeForcibly();        }        // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor        return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);    }    ChannelFuture regFuture = config().group().register(channel);    if (regFuture.cause() != null) {        if (channel.isRegistered()) {            channel.close();        } else {            channel.unsafe().closeForcibly();        }    }    // If we are here and the promise is not failed, it's one of the following cases:    // 1) If we attempted registration from the event loop, the registration has been completed at this point.    //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.    // 2) If we attempted registration from the other thread, the registration request has been successfully    //    added to the event loop's task queue for later execution.    //    i.e. It's safe to attempt bind() or connect() now:    //         because bind() or connect() will be executed *after* the scheduled registration task is executed    //         because register(), bind(), and connect() are all bound to the same thread.    return regFuture;}

① NioServerSocketChannel 通过反射的方式创建一个对象,接下来我们去分析NioServerSocketChannel的创建过程

注:这个地方使用的都是Java NIO 包当中的内容,下面这个链接中是曾经提出来的一个问题,之前是在每一个Channel创建时去获取provider,但是provider的创建中包含锁,这会造成在创建大量channel时不必要的线程阻塞,所以建议就是保存一个provider的对象,直接调用openServerSocketChannel 创建一个Channel对象,而不是通过ServerSocketChannel.open()的方式,这样每次创建的时候都会阻塞

接下来会看到调用构造方法:

监听到SelectionKey.OP_ACCPET连接事件。

1、方法:

最后一行可以看到 创建一个ChannelPipepline 对象

this就是当前channel对象,因为在创建channel的时候,会创建其关联的 pipeline,一个channel在其生命周期内只和一个ChannelPipeline关联。

pipeline在创建的时候,会指定其head,tail这里暂时先不详细分析Pipeline对象

AbstractNioChannel 中把ServerSocketChannel 设置成非阻塞

2、方法 创建ChannelConfig对象

这里面会创建一个 AdaptiveRecvByteBufAllocator对象,是netty中用来根据每次读到的数据大小来自动分配下次读取时内存大小,这个我们后面再分析。

到此这个是NioServerSocketChannel的创建过程。

到这里是channelFactory.newChannel()的创建过程

接下来就是init(channel)这个方法的代码

void init(Channel channel) throws Exception {    final Map
, Object> options = options0(); synchronized (options) { setChannelOptions(channel, options, logger); } final Map
, Object> attrs = attrs0(); synchronized (attrs) { for (Entry
, Object> e: attrs.entrySet()) { @SuppressWarnings("unchecked") AttributeKey
key = (AttributeKey) e.getKey(); channel.attr(key).set(e.getValue()); } } ChannelPipeline p = channel.pipeline(); final EventLoopGroup currentChildGroup = childGroup; final ChannelHandler currentChildHandler = childHandler; final Entry
, Object>[] currentChildOptions; final Entry
, Object>[] currentChildAttrs; synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size())); } synchronized (childAttrs) { currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size())); } p.addLast(new ChannelInitializer
() { @Override public void initChannel(final Channel ch) throws Exception { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null) { pipeline.addLast(handler); } ch.eventLoop().execute(new Runnable() { @Override public void run() { pipeline.addLast(new ServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } });}
 

这个例子中目前没有options, attrs 所以可以直接跳过,看下面的,pipeline就是我们上面创建好的DefaultChannelPipeline()

在初始化的时候,会在pipeline中添加一个 handler,ServerBootstrapAcceptor 是 ChannelInboundHandlerAdaptor 的一个实现类。

接下来是回到 AbstractBootstrap 中

关于注册这段代码

group() 这个我们前面在创建ServerBootstrap 时分析过,就是workerGroup 对象是NioEventLoopGroup()

regitster() 方法的实现,我们debug可以看到是在MultithreadEventLoopGroup中

接下来我们去看这个super.next()方法(MultithreadEventExecutorGroup)

 这里面我们看到chooser.next() 的方法,这个chooser 就是我们之前分析过的

就是上面分析的,会根据children 数组长度,是不是2的幂次方,来决定使用什么chooser,这个chooser就是我们之前说的为了保证 children 数组当中每个元素都可以被均匀的选中。

round-robin 算法来进行

chooser 当中next() 方法返回的就是children 数组中的一个元素。NioEventLoop

接下来再去看这个register方法

SingleThreadEventLoop当中

promise().channel() 就是我们前面创建的channel对象,unsafe() 对象

register方法的实现是在AbatractChannel中

@Overridepublic final void register(EventLoop eventLoop, final ChannelPromise promise) {    if (eventLoop == null) {        throw new NullPointerException("eventLoop");    }    if (isRegistered()) { ①        promise.setFailure(new IllegalStateException("registered to an event loop already"));        return;    }    if (!isCompatible(eventLoop)) {        promise.setFailure(                new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));        return;    }    AbstractChannel.this.eventLoop = eventLoop;    if (eventLoop.inEventLoop()) { ②        register0(promise);    } else {        try {            eventLoop.execute(new Runnable() { ③                @Override                public void run() {                    register0(promise);                }            });        } catch (Throwable t) {            logger.warn(                    "Force-closing a channel whose registration task was not accepted by an event loop: {}",                    AbstractChannel.this, t);            closeForcibly();            closeFuture.setClosed();            safeSetFailure(promise, t);        }    }}
 
① 判断channel对象是否已经注册到NioEventLoop上面,如果已经注册了,直接返回

    ② 判断当前线程是否在NioEventLoop所在的线程

在SingleThreadEventLoop当中维护了一个thread,如果当前线程和SingleThreadEventLoop中的线程属于一个线程,直接执行③,如果不是,将当前注册方法,作为任务提交到eventLoop中

③ 

主要是分析doRegister() 方法,实现体在AbstractNioChannel 中

@Overrideprotected void doRegister() throws Exception {    boolean selected = false;    for (;;) {        try {            selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);            return;        } catch (CancelledKeyException e) {            if (!selected) {                // Force the Selector to select now as the "canceled" SelectionKey may still be                // cached and not removed because no Select.select(..) operation was called yet.                eventLoop().selectNow();                selected = true;            } else {                // We forced a select operation on the selector before but the SelectionKey is still cached                // for whatever reason. JDK bug ?                throw e;            }        }    }}
 

这部分就是我们熟悉的Nio 相关部分代码,eventLoop当中维护了一个selector对象,javaChannel() 就是ServerSocketChannelImpl()对象,到目前,已经将Channel对象注册到了Selector上面。

你可能感兴趣的文章
终于有人把安卓程序员必学知识点全整理出来了,面试必会
查看>>
经典Android开发教程!面试字节跳动两轮后被完虐,附面试题答案
查看>>
经典实战教程!闭关60天学懂NDK+Flutter,值得收藏!
查看>>
给Android程序员的一些面试建议,实战解析
查看>>
给Android程序员的一些面试建议,已开源
查看>>
网易资深安卓架构师:Android开发经验的有效总结,醍醐灌顶!
查看>>
网络优化软件apk,金九银十怎么从中小企业挤进一线大厂?我先收藏为敬
查看>>
美团Android开发工程师岗位职能要求,知乎上已获万赞
查看>>
android结束进程,卧薪尝胆70天内推入职阿里,附答案
查看>>
android自学教程!BAT等大厂必问技术面试题,BAT大厂面试总结
查看>>
Android自定义View详解,知乎上已获万赞
查看>>
android路由器app,Android权限处理,2年以上经验必看
查看>>
android路由表,我了解到的面试的一些小内幕!附超全教程文档
查看>>
android路由跳转,Android面试资料集合,完整PDF
查看>>
android进程共享,记一次字节跳动Android社招面试,全网最新
查看>>
Android进程管理,有了这些中高端面试专题-大厂还会远吗?使用指南
查看>>
android适配屏幕大小,Android-MVP模式详解,全网疯传
查看>>
Android面试中常问的MMAP到底是啥东东?终局之战
查看>>
Android开发者跳槽面试,积累总结
查看>>
Android开发者面试如何系统复习?帮你突破瓶颈
查看>>