相信学过 Netty
的小伙伴都应该熟悉 Java
的 NIO
,在 Java 中创建服务端和客户端的代码如下所示:
服务端
// 1. 创建一个 selector 对象
Selector selector = Selector.open();
ServerSocketChannel ssc = ServerSocketChannel.open(); // 创建FD-1
ssc.configureBlocking(false); // 非阻塞模式
// 2. 建立 selector 与 channel 的联系(注册)
// SelectionKey:事件发生时,通过这个可以知道事件和那个channel的事件
// 这个key,只关注 accept 事件
SelectionKey sscKey = ssc.register(selector, SelectionKey.OP_ACCEPT, null);
// 3. 注册端口号
ssc.bind(new InetSocketAddress(8080));
客户端
// 建立客户端的channel
SocketChannel channel = SocketChannel.open();
// 连接服务端的IP和端口
channel.connect(new InetSocketAddress("localhost", 8080));
// 发送消息
channel.write(Charset.defaultCharset().encode("hello"));
我们的 Netty
正是在 Java NIO
做的一层封装
既然是封装,Netty
的源码中必然存在以上 服务端
和 客户端
的代码
public class TestNettyServer {
public static void main(String[] args) {
// 1. 服务器端的启动器,负责组装 netty 主键,启动服务器
new ServerBootstrap()
// 2. BossEventLoop、WorkerEventLoop(selector、thread)
.group(new NioEventLoopGroup())
// 3. 选择服务器的IO模式
.channel(NioServerSocketChannel.class)
// 4. boss 负责处理连接 worker 负责处理读写
.childHandler(
// 5. channel 代表和客户端进行数据读写的通道 Initializer 初始化,负责添加别的 handler
new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
// 6. 添加具体的 handler
ch.pipeline().addLast(new LoggingHandler());
}
})
// 7. 绑定监听端口
.bind(8080);
}
}
Netty
中获取选择器 Selector
,Netty
中使用 NioEventloopGroup
中的 NioEventloop
封装了线程和选择器
创建NioServerSocketChannel
,该 Channel
作为附件添加到 ServerSocketChannel
中
// 创建 NioServerSocketChannel,同时会初始化它关联的 handler,以及为原生 ssc 存储 config
NioServerSocketChannel attachment = new NioServerSocketChannel();
//注册(仅关联 selector 和 NioServerSocketChannel),未关注事件
SelectionKey selectionKey = serverSocketChannel.register(selector, 0, attachment);
绑定端口
要剖析启动代码,我们直接从 bind
入手
选择器 Selector
的创建是在 NioEventloopGroup
中完成的。
而 NioServerSocketChannel
与 ServerSocketChannel
的创建,ServerSocketChannel
注册到 Selector
上以及绑定的操作都由 bind
完成。
所以,我们的启动入口:io.netty.bootstrap.AbstractBootstrap.bind
public ChannelFuture bind(SocketAddress localAddress) {
this.validate();
return this.doBind((SocketAddress)ObjectUtil.checkNotNull(localAddress, "localAddress"));
}
private ChannelFuture doBind(final SocketAddress localAddress) {
// 负责NioServerSocketChannel和ServerSocketChannel的创建
// ServerSocketChannel的注册工作
// init由main线程完成,regisetr由NIO线程完成
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
// 因为register操作是异步的
// 所以要判断主线程执行到这里时,register操作是否已经执行完毕
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
// 执行doBind0绑定操作
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
// 如果register操作还没执行完,就会到这个分支中来
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
// 添加监听器,NIO线程异步进行doBind0操作
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();
// 执行doBind0绑定操作
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
Channel channel = null;
try {
// 通过 channelFactory 创建 NIOServerSocketChannel
channel = this.channelFactory.newChannel();
this.init(channel);
} catch (Throwable var3) {
if (channel != null) {
channel.unsafe().closeForcibly();
return (new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE)).setFailure(var3);
}
return (new DefaultChannelPromise(new FailedChannel(),GlobalEventExecutor.INSTANCE)).setFailure(var3);
}
我们看一下通过这个 channelFactory
怎么创建的 NIOServerSocketChannel
,并如何实现 ServerSocketChannel.open()
public T newChannel() {
try {
return (Channel)this.constructor.newInstance();
}
}
很明显,利用的反射的原理创建的 Channel
我们追进去看看,构造方法:
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
// 构造方法
public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
private static java.nio.channels.ServerSocketChannel newSocket(SelectorProvider provider) {
return provider.openServerSocketChannel();
}
所以,最终调用的是:SelectorProvider.provider().openServerSocketChannel()
而这个调用,正好和我们 ServerSocketChannel ssc = ServerSocketChannel.open();
的调用链路一模一样,我用图展示一下:
通过上图,我们可以明显的看到,Netty
在初始化的时候,实际上调用 NioServerSocketChannel
的构造方法,通过其实现了 ServerSocketChannel.open()
,新建 FD
到这里,我们的初始化 NioServerSocketChannel
告一段落,下面看一下 Register
操作
我们继续往下追,可以看到有一个 this.init(channel);
方法,这个方法添加了一个 Handler
p.addLast(new ChannelHandler[]{new ChannelInitializer<Channel>() {
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = ServerBootstrap.this.config.handler();
if (handler != null) {
pipeline.addLast(new ChannelHandler[]{handler});
}
ch.eventLoop().execute(new Runnable() {
public void run() {
pipeline.addLast(new ChannelHandler[]{new ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)});
}
});
}
}});
我们暂时不解释这里的作用,我们只需要记住这里添加了一个 Handler
,后面会进行调用(后面调用的时候会讲)。
我们先总览一下 Register
的源码部分:
ChannelFuture regFuture = this.config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
// 获取当前的eventLoop
AbstractChannel.this.eventLoop = eventLoop;
// 此处完成了由 主线程 到 NIO线程 的切换
// eventLoop.inEventLoop()用于判断当前线程是否为NIO线程
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
// 向NIO线程中添加任务
eventLoop.execute(new Runnable() {
@Override
public void run() {
// 该方法中会执行doRegister
// 执行真正的注册操作
register0(promise);
}
});
}
}
private void register0(ChannelPromise promise) {
try {
// 执行真正的注册逻辑
doRegister();
neverRegistered = false;
registered = true;
// 调用init中的initChannel方法
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
}
}
我们的注册整体上主要分为 2 部分
ssc.register(selector, SelectionKey.OP_ACCEPT, attach);
Handler
执行doRegister
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
// 注册
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
}
}
}
pipeline.invokeHandlerAddedIfNeeded:此方法会调用我们一开始定义的 Handler
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
// 添加新任务,任务负责添加 handler
// 该handler负责发生Accepet事件后建立连接
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
Register
主要实现了 ssc.register(selector, SelectionKey.OP_ACCEPT, attach);
的注册功能,并在创建时进行了线程切换,从 main线程
到 NIO线程
到这里,我们的 Register
也告一段落
我们上面讲到了 safeSetSuccess(promise)
,向我们的 promise
设置成功的结果,并通过下面的 doBind0(regFuture, channel, localAddress, promise);
进行调用
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
//
if (regFuture.isDone()) {
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {.
promise.setFailure(cause);
} else {
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
底层实现如下:
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
assertEventLoop();
boolean wasActive = isActive();
try {
// 注册端口
doBind(localAddress);
}
// 前面一系列操作后,我们的 NIOServerSocketChannel 是否可用
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}
safeSetSuccess(promise);
}
注册端口的代码
@SuppressJava6Requirement(reason = "Usage guarded by java version check")
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
// 根据当前 JDK 的版本是否大于 7
if (PlatformDependent.javaVersion() >= 7) {
// 调用ServerSocketChannel的bind方法,绑定端口
// javaChannel() = ServerSocketChannel
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}
到这里,我们已经完成了端口的注册,距离我们服务器的启动只差绑定 SelectionKey.OP_ACCEPT
我们后续看一下:pipeline.fireChannelActive();
pipeline.fireChannelActive:触发所有该 pipeline 上的事件
我们 Netty
的 Handler
信息如下:
除了我们自定义的 Handler
,我们需要查看一下 Head
的代码
最终实现代码如下:
@Override
protected void doBeginRead() throws Exception {
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
final int interestOps = selectionKey.interestOps();
// readInterestOp = 1 << 4 = 16
if ((interestOps & readInterestOp) == 0) {
selectionKey.interestOps(interestOps | readInterestOp);
}
}
我们对比着 SelectionKey
的描述代码看一下:
public static final int OP_READ = 1 << 0;
public static final int OP_WRITE = 1 << 2;
public static final int OP_CONNECT = 1 << 3;
public static final int OP_ACCEPT = 1 << 4;
可以看到,上述代码添加的事件正是 OP_ACCEPT
事件。
至此,我们的 Netty
服务端就正式启动了
Netty 服务端的启动框架基本封装了 Java NIO 启动的部分源码。
剩余的源码将在以后的系列中持续更新,喜欢的小伙伴不妨点个关注~