您的当前位置:首页正文

【Netty】netty启动流程源码解析

2024-11-24 来源:个人技术集锦

Netty整体架构

一个启动流程

这是一个简单的服务端HTTP的Demo,我们按照每行代码的方式 ,逐步分析整个启动过程具体是做了哪些工作。


/**
 * @author qxlx
 * @date 2024/7/28 20:52
 */
public class SImpleServer {

    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup bossGrpup = new NioEventLoopGroup(1);
        EventLoopGroup wrokerGrpup = new NioEventLoopGroup();

        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(bossGrpup,wrokerGrpup)
                .channel(NioServerSocketChannel.class)
                .handler(new SimpleServerHandler())
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {

                    }
                });

        ChannelFuture f = bootstrap.bind(8888).sync();

        f.channel().closeFuture().sync();
    }

    private static class SimpleServerHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("channelActive");
        }

        @Override
        public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
            System.out.println("channelRegistered");
        }

        @Override
        public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
            System.out.println("handlerAdded");
        }
    }

}

源码解析

new NioEventLoopGroup(1)

    public NioEventLoopGroup(int nThreads) {
        this(nThreads, (Executor) null);
    }

    protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
        // 不指定线程数,则使用默认自定义的线程数
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
    }

可以看到通过调用super父类的方式,判断传入的nThreads的线程数等于0,就使用默认的方式,比如当前是10核,就创建20个线程。否则就采用传入执行的线程数。我们知道对于boss线程来说,其实本身就只有一个线程进行接收客户端传入的线程,所以线程数就是1。

    private static final int DEFAULT_EVENT_LOOP_THREADS;

    // 获取属性对应的值,获取不到 就是CPU的两倍
    static {
        DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
                "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
        }
    }

构建线程池

    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
		// 判断线程数是否小于0                                            
        checkPositive(nThreads, "nThreads");

        // 如果没有构建,使用默认的
        if (executor == null) {
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }

        // 产生nThreads个Nio保存在数组中
        // EventExecutor 是excutor的子类
        // 这里因为创建的是1个线程。创数组大小是0
        children = new EventExecutor[nThreads];

        for (int i = 0; i < nThreads; i ++) {
            boolean success = false;
            try {
               // ⭐️
                children[i] = newChild(executor, args);
                success = true;
            } catch (Exception e) {
                // TODO: Think about if this is a good exception type
                throw new IllegalStateException("failed to create a child event loop", e);
            } finally {
                // 执行失败 将前面创建成功的直接shutDown  下面是关于异常的处理
            }
        }

        chooser = chooserFactory.newChooser(children);

        final FutureListener<Object> terminationListener = new FutureListener<Object>() {
            @Override
            public void operationComplete(Future<Object> future) throws Exception {
                if (terminatedChildren.incrementAndGet() == children.length) {
                    terminationFuture.setSuccess(null);
                }
            }
        };
		
		// 遍历线程 给每个线程创建都田间一个关闭的监听器
        for (EventExecutor e: children) {
            e.terminationFuture().addListener(terminationListener);
        }
    }

可以看到EventExecutor 其实顶层的父类就是Executor,所以就是一个线程池的实现。

基础信息构建

    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        // 参数传递进来的
        return new NioEventLoop(this, executor, selectorProvider,
                selectStrategyFactory.newSelectStrategy(),
                rejectedExecutionHandler, taskQueueFactory, tailTaskQueueFactory);
    }

    // 构造方法进行初始化
    NioEventLoop(参数 忽略) {
        super(parent, executor, false, newTaskQueue(taskQueueFactory), newTaskQueue(tailTaskQueueFactory),
                rejectedExecutionHandler);
        this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
        this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
        // NIO的封装 // 完成selector的创建
        final SelectorTuple selectorTuple = openSelector();
        // 创建selector
        this.selector = selectorTuple.selector;
        this.unwrappedSelector = selectorTuple.unwrappedSelector;
    }

    this.taskQueue = ObjectUtil.checkNotNull(taskQueue, "taskQueue");

线程选择策略

这里根据不同的选择器 实现不同策略,如果是2的倍数,选择使用& 否则就是 %的算法。

    public EventExecutorChooser newChooser(EventExecutor[] executors) {
        if (isPowerOfTwo(executors.length)) {
            return new PowerOfTwoEventExecutorChooser(executors);
        } else {
            return new GenericEventExecutorChooser(executors);
        }
    }
	
     public EventExecutor next() {
         return executors[idx.getAndIncrement() & executors.length - 1];
     }
     
     public EventExecutor next() {
         return executors[(int) Math.abs(idx.getAndIncrement() % executors.length)];
     }

总结下,上述过程就是这样的。

group

这里其实就是调用父类 AbstractBootstrap 设置主group , 然后在设置 子group

    public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
        // 初始化父类 将bossGroup赋值
        super.group(parentGroup);
        // 如果childGroup变量 如果已经有值,抛出异常
        if (this.childGroup != null) {
            throw new IllegalStateException("childGroup set already");
        }
        // childGroup = workerGroup赋值
        this.childGroup = ObjectUtil.checkNotNull(childGroup, "childGroup");
        return this;
    }

channel

将 NioServerSocketChannel 作为Class对象传入。然后获取构造方法,然后获取NioServerSocketChannel的构造方法,在之后的流程中,其实就是利用反射创建NioServerSocketChannel对象。

    public B channel(Class<? extends C> channelClass) {
        // 直接调用channelFactory工厂,创建一个ReflectiveChannelFactory 工厂方法
        return channelFactory(new ReflectiveChannelFactory<C>(
                ObjectUtil.checkNotNull(channelClass, "channelClass")
        ));
    }

    public ReflectiveChannelFactory(Class<? extends T> clazz) {
		// 通过Class对象 获取构造参数
        // 等待后续的利用反射进行创建对象
        this.constructor = clazz.getConstructor();
    }

	// channelFactory赋值
    this.channelFactory = channelFactory;

handler

设置handler , 然后返回self(); 其实就是返回this。这样就可以实现链式编程。

    public B handler(ChannelHandler handler) {
        // 设置handler
        this.handler = ObjectUtil.checkNotNull(handler, "handler");
        return self();
    }

    private B self() {
        return (B) this;
    }

childHandler

设置childHandler 属性,可以看到很多地方都做了判空的处理逻辑。

    public ServerBootstrap childHandler(ChannelHandler childHandler) {
        // 设置childHanlder
        this.childHandler = ObjectUtil.checkNotNull(childHandler, "childHandler");
        // this是当前对象的引用
        return this;
    }

其实到这里,都是一些前戏的工作。

bind

initAndRegister

反射创建 NioServerSocketChannel 对象

因为之前channelFactory设置的是 ReflectiveChannelFactory ,所以就是利用构造方法创建NioServerSocketChannel的对象,既然创建对象,那么必定会调用 NioServerSocketChannel的构造方法。

	channel = channelFactory.newChannel();
	// 对象实例化
	return constructor.newInstance();

构造方法里,其实就是 创建了一个NIO的通道对象,ServerSocketChannel的实现 ServerSocketChannelImpl

    public NioServerSocketChannel() {
        this(DEFAULT_SELECTOR_PROVIDER);
    }
	// 创建一个底层变量
	private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();

  // 核心是创建一个 ServerSocketChannel
   private static ServerSocketChannel newChannel(SelectorProvider provider, InternetProtocolFamily family) {
       try {
           // 创建一个channel
           ServerSocketChannel channel =
                   SelectorProviderUtil.newChannel(OPEN_SERVER_SOCKET_CHANNEL_WITH_FAMILY, provider, family);
           // 打开一个服务端套接字通道对象
           // 底层其实就是创建ServerSocketChannel ⭐️ ServerSocketChannelImpl
           return channel == null ? provider.openServerSocketChannel() : channel;
       } catch (IOException e) {
           throw new ChannelException("Failed to open a socket.", e);
       }
   }

    public NioServerSocketChannel(ServerSocketChannel channel) {
        // 设置为接受事件
        super(null, channel, SelectionKey.OP_ACCEPT);
        // 设置config属性
        config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    }

    protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent);
        this.ch = ch;
        this.readInterestOp = readInterestOp; // OP_ACCEPT
            // 设置当前的serverSocketChannel为非阻塞的
            ch.configureBlocking(false);
    }
init

1.设置channel的option
2.设置channel的attr
3.设置handler的pipeline
4.pipeline添加channelInitializer对象 ,并且使用 ch.eventLoop().execute 线程池 创建了一个专门接受客户端请求的Acceptor对象。添加到pipeline管道对象中

    void init(Channel channel) {
        // 1.设置新介入的channel的option
        setChannelOptions(channel, newOptionsArray(), logger);

        // 2、设置新接入channel的attr
        setAttributes(channel, newAttributesArray());

        // 3、设置handler到pipeline上
        // 获取到通道流水线对象
        ChannelPipeline p = channel.pipeline();

        final EventLoopGroup currentChildGroup = childGroup;
        final ChannelHandler currentChildHandler = childHandler;
        final Entry<ChannelOption<?>, Object>[] currentChildOptions = newOptionsArray(childOptions);
        final Entry<AttributeKey<?>, Object>[] currentChildAttrs = newAttributesArray(childAttrs);
        final Collection<ChannelInitializerExtension> extensions = getInitializerExtensions();

        // 4.pipeline 添加一个channelInitializer对象 ServerBootstrapAcceptor
        // 通道初始化对象
        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) {
                final ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }

                // 这是一个接入器,专门接受新请求,把新的请求扔给某个事件循环器
                // 看到这里,我们发现其实init只是初始化了一些基本的配置和属性,
                // 以及在pipeline上加入了一个接入器,用来专门接受新连接,并没有启动服务.
                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs,
                                extensions));
                    }
                });
            }
        });
        if (!extensions.isEmpty() && channel instanceof ServerChannel) {
            ServerChannel serverChannel = (ServerChannel) channel;
            for (ChannelInitializerExtension extension : extensions) {
                try {
                    extension.postInitializeServerListenerChannel(serverChannel);
                } catch (Exception e) {
                    logger.warn("Exception thrown from postInitializeServerListenerChannel", e);
                }
            }
        }
    }

注册channel

	ChannelFuture regFuture = config().group().register(channel);
	// 启动线程池 
	eventLoop.execute(new Runnable() { // ⭐️
         @Override
         public void run() {
             register0(promise); // 分析
         }
    });
    
	doRegister();

这里其实启动一个线程,一个死循环 接受将channel注册到selector中。 然后通过selector就可以获取有事件的线程进行操作。 ServerSocketChannel注册到Selector中。

    protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                // ServerSocketChannel注册到Selector中。
                selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                return;
            } catch (CancelledKeyException e) {
                if (!selected) {
                    // Force the Selector to select now as the "canceled" SelectionKey may still be
                    // cached and not removed because no Select.select(..) operation was called yet.
                    eventLoop().selectNow();
                    selected = true;
                } else {
                    // We forced a select operation on the selector before but the SelectionKey is still cached
                    // for whatever reason. JDK bug ?
                    throw e;
                }
            }
        }
    }
	// 在注册完毕之后,调用这个就会触发
    pipeline.invokeHandlerAddedIfNeeded();

doBind0

因为调用链路比较长,最终 其实就是根据版本的不同 将端口和channel进行绑定。

    protected void doBind(SocketAddress localAddress) throws Exception {
        if (PlatformDependent.javaVersion() >= 7) { // 全连接队列的大小
            javaChannel().bind(localAddress, config.getBacklog());
        } else {
            javaChannel().socket().bind(localAddress, config.getBacklog());
        }
    }

流程图

思考

对于一个NIO原生程序来说,本质做了几件事情

  • 创建EventLoop对象
  • 完成ServerSocketChannel的创建 初始化过程
  • 以及ServetSocketChannel的注册工作
  • 端口绑定
  • 设置事件

根据 我们从源码分析一个常见netty的启动过程的过程本质上,也是完成了对上述过程NIO的封装过程。基础属性的设置,创建 NioServerSocketChannel的实例 本质就是 ServetSocketChannel。以及完成注册到selector的过程。最后完成端口绑定的工作。虽然netty抽象几个概念,但是万变不离其中,本质还是NIO的封装。

那么剩下来,要研究的重点是什么呢?

  • eventloop 线程是如何接受任务的。
  • unsafe的写过程
  • channeloutbounBuffer
  • channelPipeline
  • netty常见优化方案
  • fastThreadLocal
  • 内存池
  • 事件轮
  • 编码器
显示全文