您的当前位置:首页正文

【Netty 从成神到升仙系列 一】Netty 服务端的启动源码剖析(一)

2024-11-24 来源:个人技术集锦

Netty源码

相信学过 Netty 的小伙伴都应该熟悉 JavaNIO,在 Java 中创建服务端和客户端的代码如下所示:

服务端

// 1. 创建一个 selector 对象
Selector selector = Selector.open();

ServerSocketChannel ssc = ServerSocketChannel.open(); // 创建FD-1
ssc.configureBlocking(false); // 非阻塞模式

// 2. 建立 selector 与 channel 的联系(注册)
// SelectionKey:事件发生时,通过这个可以知道事件和那个channel的事件
// 这个key,只关注 accept 事件
SelectionKey sscKey = ssc.register(selector, SelectionKey.OP_ACCEPT, null);

// 3. 注册端口号
ssc.bind(new InetSocketAddress(8080));

客户端

// 建立客户端的channel
SocketChannel channel = SocketChannel.open();
// 连接服务端的IP和端口
channel.connect(new InetSocketAddress("localhost", 8080));
// 发送消息
channel.write(Charset.defaultCharset().encode("hello"));

我们的 Netty 正是在 Java NIO 做的一层封装

既然是封装,Netty 的源码中必然存在以上 服务端客户端 的代码

一、Netty 服务端

1. Netty 服务端启动代码

public class TestNettyServer {
    public static void main(String[] args) {
        // 1. 服务器端的启动器,负责组装 netty 主键,启动服务器
        new ServerBootstrap()
                // 2. BossEventLoop、WorkerEventLoop(selector、thread)
                .group(new NioEventLoopGroup())
                // 3. 选择服务器的IO模式
                .channel(NioServerSocketChannel.class)
                // 4. boss 负责处理连接 worker 负责处理读写
                .childHandler(
                        // 5. channel 代表和客户端进行数据读写的通道 Initializer 初始化,负责添加别的 handler
                        new ChannelInitializer<NioSocketChannel>() {
                            @Override
                            protected void initChannel(NioSocketChannel ch) throws Exception {
                                // 6. 添加具体的 handler
                                ch.pipeline().addLast(new LoggingHandler());
                            }
                        })
                // 7. 绑定监听端口
                .bind(8080);
    }
}
  • Netty 中获取选择器 SelectorNetty 中使用 NioEventloopGroup 中的 NioEventloop 封装了线程和选择器

  • 创建NioServerSocketChannel,该 Channel作为附件添加到 ServerSocketChannel

    // 创建 NioServerSocketChannel,同时会初始化它关联的 handler,以及为原生 ssc 存储 config
    NioServerSocketChannel attachment = new NioServerSocketChannel();
    //注册(仅关联 selector 和 NioServerSocketChannel),未关注事件
    SelectionKey selectionKey = serverSocketChannel.register(selector, 0, attachment);
    
  • 绑定端口

要剖析启动代码,我们直接从 bind 入手

2. Bind

选择器 Selector 的创建是在 NioEventloopGroup 中完成的。

NioServerSocketChannelServerSocketChannel 的创建,ServerSocketChannel 注册到 Selector 上以及绑定的操作都由 bind 完成。

所以,我们的启动入口:io.netty.bootstrap.AbstractBootstrap.bind

public ChannelFuture bind(SocketAddress localAddress) {
        this.validate();
        return this.doBind((SocketAddress)ObjectUtil.checkNotNull(localAddress, "localAddress"));
}

3. doBind

private ChannelFuture doBind(final SocketAddress localAddress) {
    // 负责NioServerSocketChannel和ServerSocketChannel的创建
    // ServerSocketChannel的注册工作
    // init由main线程完成,regisetr由NIO线程完成
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }

    // 因为register操作是异步的
    // 所以要判断主线程执行到这里时,register操作是否已经执行完毕
    if (regFuture.isDone()) {
        // At this point we know that the registration was complete and successful.
        ChannelPromise promise = channel.newPromise();
        
        // 执行doBind0绑定操作
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        // Registration future is almost always fulfilled already, but just in case it's not.
        // 如果register操作还没执行完,就会到这个分支中来
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        
        // 添加监听器,NIO线程异步进行doBind0操作
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                if (cause != null) {
                    // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                    // IllegalStateException once we try to access the EventLoop of the Channel.
                    promise.setFailure(cause);
                } else {
                    // Registration was successful, so set the correct executor to use.
                    // See https://github.com/netty/netty/issues/2586
                    promise.registered();
						 // 执行doBind0绑定操作
                    doBind0(regFuture, channel, localAddress, promise);
                }
            }
        });
        return promise;
    }
}
3.1 init
Channel channel = null;
try {
    // 通过 channelFactory 创建 NIOServerSocketChannel
    channel = this.channelFactory.newChannel();
    this.init(channel);
} catch (Throwable var3) {
    if (channel != null) {
        channel.unsafe().closeForcibly();
        return (new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE)).setFailure(var3);
    }
    return (new DefaultChannelPromise(new FailedChannel(),GlobalEventExecutor.INSTANCE)).setFailure(var3);
}
3.1.1 创建 NIOServerSocketChannel

我们看一下通过这个 channelFactory 怎么创建的 NIOServerSocketChannel,并如何实现 ServerSocketChannel.open()

public T newChannel() {
    try {
        return (Channel)this.constructor.newInstance();
    }
}

很明显,利用的反射的原理创建的 Channel

我们追进去看看,构造方法:

private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
// 构造方法
public NioServerSocketChannel() {
    this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
private static java.nio.channels.ServerSocketChannel newSocket(SelectorProvider provider) {
    return provider.openServerSocketChannel();
}

所以,最终调用的是:SelectorProvider.provider().openServerSocketChannel()

而这个调用,正好和我们 ServerSocketChannel ssc = ServerSocketChannel.open(); 的调用链路一模一样,我用图展示一下:

通过上图,我们可以明显的看到,Netty 在初始化的时候,实际上调用 NioServerSocketChannel 的构造方法,通过其实现了 ServerSocketChannel.open(),新建 FD

到这里,我们的初始化 NioServerSocketChannel 告一段落,下面看一下 Register 操作

3.1.2 添加 NIOServerSocketChannel 初始化 Handler

我们继续往下追,可以看到有一个 this.init(channel); 方法,这个方法添加了一个 Handler

p.addLast(new ChannelHandler[]{new ChannelInitializer<Channel>() {
    public void initChannel(final Channel ch) {
        final ChannelPipeline pipeline = ch.pipeline();
        ChannelHandler handler = ServerBootstrap.this.config.handler();
        if (handler != null) {
            pipeline.addLast(new ChannelHandler[]{handler});
        }

        ch.eventLoop().execute(new Runnable() {
            public void run() {
                pipeline.addLast(new ChannelHandler[]{new ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)});
            }
        });
    }
}});

我们暂时不解释这里的作用,我们只需要记住这里添加了一个 Handler,后面会进行调用(后面调用的时候会讲)。

3.2 Register

我们先总览一下 Register 的源码部分:

ChannelFuture regFuture = this.config().group().register(channel);
if (regFuture.cause() != null) {
    if (channel.isRegistered()) {
        channel.close();
    } else {
        channel.unsafe().closeForcibly();
    }
}
return regFuture;
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
	 // 获取当前的eventLoop
    AbstractChannel.this.eventLoop = eventLoop;
	 // 此处完成了由 主线程 到 NIO线程 的切换
    // eventLoop.inEventLoop()用于判断当前线程是否为NIO线程
    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        // 向NIO线程中添加任务
        eventLoop.execute(new Runnable() {
            @Override
            public void run() {
                // 该方法中会执行doRegister
                // 执行真正的注册操作
                register0(promise);
            }
        });
    }
}
3.2.1 register0
private void register0(ChannelPromise promise) {
    try {
        // 执行真正的注册逻辑
        doRegister();
        neverRegistered = false;
        registered = true;

        // 调用init中的initChannel方法
        pipeline.invokeHandlerAddedIfNeeded();

        safeSetSuccess(promise);
    }
}

我们的注册整体上主要分为 2 部分

  • 执行真正的注册逻辑,也就是:ssc.register(selector, SelectionKey.OP_ACCEPT, attach);
  • 让我们之前写的 Handler 执行

doRegister

  • eventLoop().unwrappedSelector():我们的 Selector 存储的位置
  • this:将当前的 NIOServerSocketChannel 作为附件,便于之后的获取
protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            // 注册
            selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
            return;
        }
    }
}

pipeline.invokeHandlerAddedIfNeeded:此方法会调用我们一开始定义的 Handler

p.addLast(new ChannelInitializer<Channel>() {
    @Override
    public void initChannel(final Channel ch) {
        final ChannelPipeline pipeline = ch.pipeline();
        ChannelHandler handler = config.handler();
        if (handler != null) {
            pipeline.addLast(handler);
        }
		  // 添加新任务,任务负责添加 handler
        // 该handler负责发生Accepet事件后建立连接
        ch.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                pipeline.addLast(new ServerBootstrapAcceptor(
                        ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
            }
        });
    }
});

Register 主要实现了 ssc.register(selector, SelectionKey.OP_ACCEPT, attach); 的注册功能,并在创建时进行了线程切换,从 main线程NIO线程

到这里,我们的 Register 也告一段落

3.3 doBind0

我们上面讲到了 safeSetSuccess(promise) ,向我们的 promise 设置成功的结果,并通过下面的 doBind0(regFuture, channel, localAddress, promise); 进行调用

final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
    return regFuture;
}
// 
if (regFuture.isDone()) {
    ChannelPromise promise = channel.newPromise();
    doBind0(regFuture, channel, localAddress, promise);
    return promise;
} else {
    final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
    regFuture.addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            Throwable cause = future.cause();
            if (cause != null) {.
                promise.setFailure(cause);
            } else {
                promise.registered();

                doBind0(regFuture, channel, localAddress, promise);
            }
        }
    });
    return promise;
}

底层实现如下:

public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
    assertEventLoop();
    boolean wasActive = isActive();
    try {
        // 注册端口
        doBind(localAddress);
    } 
    // 前面一系列操作后,我们的 NIOServerSocketChannel 是否可用
    if (!wasActive && isActive()) {
        invokeLater(new Runnable() {
            @Override
            public void run() {
                pipeline.fireChannelActive();
            }
        });
    }
    
    safeSetSuccess(promise);
}

注册端口的代码

@SuppressJava6Requirement(reason = "Usage guarded by java version check")
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
    // 根据当前 JDK 的版本是否大于 7
    if (PlatformDependent.javaVersion() >= 7) {
        // 调用ServerSocketChannel的bind方法,绑定端口
        // javaChannel() = ServerSocketChannel
        javaChannel().bind(localAddress, config.getBacklog());
    } else {
        javaChannel().socket().bind(localAddress, config.getBacklog());
    }
}

到这里,我们已经完成了端口的注册,距离我们服务器的启动只差绑定 SelectionKey.OP_ACCEPT

我们后续看一下:pipeline.fireChannelActive();

pipeline.fireChannelActive:触发所有该 pipeline 上的事件

我们 NettyHandler 信息如下:

除了我们自定义的 Handler,我们需要查看一下 Head 的代码

最终实现代码如下:

@Override
protected void doBeginRead() throws Exception {
    final SelectionKey selectionKey = this.selectionKey;
    if (!selectionKey.isValid()) {
        return;
    }

    readPending = true;

    final int interestOps = selectionKey.interestOps();
    // readInterestOp = 1 << 4 = 16
    if ((interestOps & readInterestOp) == 0) {
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}

我们对比着 SelectionKey 的描述代码看一下:

public static final int OP_READ = 1 << 0;
public static final int OP_WRITE = 1 << 2;
public static final int OP_CONNECT = 1 << 3;
public static final int OP_ACCEPT = 1 << 4;

可以看到,上述代码添加的事件正是 OP_ACCEPT 事件。

至此,我们的 Netty 服务端就正式启动了

4. 总结

Netty 服务端的启动框架基本封装了 Java NIO 启动的部分源码。

剩余的源码将在以后的系列中持续更新,喜欢的小伙伴不妨点个关注~

显示全文