首先来看一段使用 NIO 创建编写 Server 的代码:
public void start() throws IOException {
// 1.打开服务器套接字通道
ServerSocketChannel ssc = ServerSocketChannel.open();
// 必须配置为非阻塞,该Channel才能往selector上注册,否则会报错,selector模式本身就是非阻塞模式
ssc.configureBlocking(false);
// 进行服务的绑定
ssc.bind(new InetSocketAddress("localhost", 8001));
// 2.创建Selector对象
selector = Selector.open();
// 3.向Selector中注册感兴趣的事件(这里的ACCEPT就是新连接发生时所产生的事件)
// 注:对于ServerSocketChannel 通道来说,我们唯一可以指定的参数就是OP_ACCEPT
ssc.register(selector, SelectionKey.OP_ACCEPT);
// 当前只有 server 这一个线程,所以只要该线程不中断就能一直提供服务
while (!Thread.currentThread().isInterrupted()) {
// 4.该调用会阻塞,直至至少有一个事件发生
// 当有客户端来连接时,就会触发 ServerSocketChannel 的 ACCEPT 事件
// 当客户端发送来消息时,就会触发 ServerSocketChannel 的 READ 事件
// 当客户端读取了发送的消息时,就会触发 ServerSocketChannel 的 WRITE 事件
selector.select();
// 获取发生事件的 SelectionKey
Set<SelectionKey> keys = selector.selectedKeys();
// 迭代所有事件
Iterator<SelectionKey> keyIterator = keys.iterator();
while (keyIterator.hasNext()) {
// 拿到当前事件
SelectionKey key = keyIterator.next();
// 根据事件类型进行相应处理
dispatch(key);
// 丢弃已经处理过的事件
keyIterator.remove();
}
}
}
下面我们就按照上面标记的 1,2,3,4 来渐进的分析 NIO 底层源码…
1.创建服务端
ServerSocketChannel ssc = ServerSocketChannel.open();
ServerSocketChannel#open() 源码如下:
public static ServerSocketChannel open() throws IOException {
return SelectorProvider.provider().openServerSocketChannel();
}
再来看 SelectorProvider,它是一个抽象类,从名字就能看出它是用来提供选择器 Selector 的;并且根据 provider() 方法是一个静态方法,我们也可以推理出 Selector 应该是单例的
// 一个普通对象,在 provider() 方法中就是锁的该对象
private static final Object lock = new Object();
// 验证了我们说 Selector 是单例的猜想
private static SelectorProvider provider = null;
// 静态方法,提供Selector对象
public static SelectorProvider provider() {
// 锁,保证多个线程来获取时的线程安全
synchronized (lock) {
// 该判断保证了server程序中只有一个 SelectorProvider 对象
if (provider != null)
return provider;
return AccessController.doPrivileged(
new PrivilegedAction<SelectorProvider>() {
public SelectorProvider run() {
if (loadProviderFromProperty())
return provider;
if (loadProviderAsService())
return provider;
// 会根据操作系统来返回不同的实现类
// windows 平台返回 WindowsSelectorProvider
// Linux 平台返回 EPollSelectorProvider
provider = sun.nio.ch.DefaultSelectorProvider.create();
return provider;
}
});
}
}
来看一眼 SelectorProvider 的继承关系:
等返回 WindowsSelectorProvider 对象后,会继续走到其父类 SelectorProviderImpl 中定义实现 openServerSocketChannel() 方法
public ServerSocketChannel openServerSocketChannel() throws IOException {
// 创建 SocketChannel 实现类,ServerSocketChannelImpl
return new ServerSocketChannelImpl(this);
}
这里再多说一句,SelectorProviderImpl 中还提供了客户端的 SocketChannel 和读文件 DatagramChannel
下面来看 ServerSocketChannelImpl 的构造方法
private final FileDescriptor fd;
ServerSocketChannelImpl(SelectorProvider var1) throws IOException {
super(var1);
// 创建文件描述符 FileDescriptor
this.fd = Net.serverSocket(true);
// 将文件描述符保存下来
// 注:IOUtil.fdVal 是一个native方法
this.fdVal = IOUtil.fdVal(this.fd);
this.state = 0;
}
2.创建Selector
selector = Selector.open();
下面来看 Selector 的 open 方法,java.nio.channels.Selector:
public static Selector open() throws IOException {
return SelectorProvider.provider().openSelector();
}
SelectorProvider#provider() 我们在上一步已经分析过了,它会返回一个全局唯一的 WindowsSelectorProvider 实例。所以下面我们来看一下 WindowsSelectorProvider#openSelector() 方法:
public AbstractSelector openSelector() throws IOException {
// 返回 WindowsSelector 实现对象
return new WindowsSelectorImpl(this);
}
来看一眼 Selector 的继承关系:
下面来看 WindowsSelectorImpl 类的构造方法
// Pipe.open() 打开一个管道(wakeupPipe)
private final Pipe wakeupPipe = Pipe.open();
// 创建一个 PollArrayWrapper 对象(pollWrapper)
private PollArrayWrapper pollWrapper = new PollArrayWrapper(8);
WindowsSelectorImpl(SelectorProvider var1) throws IOException {
super(var1);
// 拿到 wakeupSourceFd 文件描述符
this.wakeupSourceFd = ((SelChImpl)this.wakeupPipe.source()).getFDVal();
// 禁用Nagle方法,以便更迅速的环形
SinkChannelImpl var2 = (SinkChannelImpl)this.wakeupPipe.sink();
var2.sc.socket().setTcpNoDelay(true);
// 拿到 wakeupSinkFd 文件描述符
this.wakeupSinkFd = var2.getFDVal();
// 把 pipe 内 Source 端的文件描述符(wakeupSourceFd)放到 pollWrapper 里
this.pollWrapper.addWakeupSocket(this.wakeupSourceFd, 0);
}
这里我们会有疑惑,为什么要创建一个管道,它是用来做什么的?PollArrayWrapper 又有什么作用?
2.1 Pipe
首先,我们来看 Pipe.open() 源码实现:
public static Pipe open() throws IOException {
// 调用 openPipe 方法
return SelectorProvider.provider().openPipe();
}
Provider 返回 WindowsSelectorProvider,但 openPipe() 方法在其父类 SelectorProviderImpl 中
public Pipe openPipe() throws IOException {
return new PipeImpl(this);
}
再看看怎么 new PipeImpl()的:
PipeImpl(SelectorProvider var1) throws IOException {
try {
// Initializer 是 PipeImpl 的内部类
AccessController.doPrivileged(new PipeImpl.Initializer(var1));
} catch (PrivilegedActionException var3) {
throw (IOException)var3.getCause();
}
}
我们可以看到, PipeImpl 中有一个私有内部类 Initializer,而 Initializer 还有一个私有内部类 LoopbackConnector。

private class LoopbackConnector implements Runnable {
private LoopbackConnector() {
}
public void run() {
ServerSocketChannel var1 = null;
SocketChannel var2 = null;
SocketChannel var3 = null;
try {
// Create secret with a backing array.
ByteBuffer var4 = ByteBuffer.allocate(16);
ByteBuffer var5 = ByteBuffer.allocate(16);
// Loopback address
InetAddress var6 = InetAddress.getByName("127.0.0.1");
assert var6.isLoopbackAddress();
InetSocketAddress var7 = null;
while(true) {
// Bind ServerSocketChannel to a port on the loopback address
// 将 ServerSocketChannel 绑定到端口
if (var1 == null || !var1.isOpen()) {
var1 = ServerSocketChannel.open();
var1.socket().bind(new InetSocketAddress(var6, 0));
var7 = new InetSocketAddress(var6, var1.socket().getLocalPort());
}
// Establish connection (assume connections are eagerly accepted()
// 建立连接
var2 = SocketChannel.open(var7);
PipeImpl.RANDOM_NUMBER_GENERATOR.nextBytes(var4.array());
do {
var2.write(var4);
} while(var4.hasRemaining());
var4.rewind();
// Get a connection and verify it is legitimate
// 获取连接并确认它是合法的
var3 = var1.accept();
do {
var3.read(var5);
} while(var5.hasRemaining());
var5.rewind();
if (var5.equals(var4)) {
// Create source and sink channels
// 创建资源和接收通道
PipeImpl.this.source = new SourceChannelImpl(Initializer.this.sp, var2);
PipeImpl.this.sink = new SinkChannelImpl(Initializer.this.sp, var3);
break;
}
// 关闭连接
var3.close();
var2.close();
}
} catch (IOException var18) {
//...异常捕获处理
}
}
}
这里即为创建 pipe 的过程,windows 下的实现是创建两个本地的 socketChannel,然后连接(连接的过程通过写一个随机数据做两个socket的连接校验),两个 socketChannel 分别实现了管道 pipe 的 source 与 sink 端。
而我们依然不清楚这个 pipe 到底干什么用的,假如大家熟悉系统调用的 C/C++ 的话,就可以知道,一个阻塞在 select 上的线程有以下三种方式可以被唤醒:
- 有数据可读/写,或出现异常
- 阻塞时间到,即 time out
- 收到一个 non-block 的信号。可由 kill或 pthread_kill 发出。
所以,Selector.wakeup() 要唤醒阻塞的 select,那么也只能通过这三种方法。其中,第二种方法可以排除,因为 select 一旦阻塞,无法修改其 time out 时间;而第三种看来只能在 Linux 上实现,Windows 上没有这种信号通知的机制。
所以,看来只有第一种方法了。假如我们多次调用 Selector.open(),那么在 Windows 上会每调用一次,就会建立一对自己和自己的 loopback 的 TCP 连接;在 Linux 上的话,每调用一次,会开一对 pipe(pipe在Linux下一般都成对打开),到这里,估计我们能够猜得出来——那就是如果想要唤醒 select,只需要朝着自己的这个 loopback 连接发点数据过去,于是,就可以唤醒阻塞在 select 上的线程了。
我们对上面所述做下总结:在 Windows 下,Java 虚拟机在 Selector.open() 时会自己和自己建立 loopback 的 TCP 连接;在 Linux 下,Selector 会创建 pipe。这主要是为了 Selector.wakeup() 可以方便唤醒阻塞在 select() 系统调用上的线程(通过向自己所建立的 TCP 链接和管道上随便写点什么就可以唤醒阻塞线程)。
注意,Windows 下是通过两个连接的 socketChannel 实现了 Pipe,而 Linux 下则直接使用系统的 pipe 即可。
2.2 PollArrayWrapper
在 WindowsSelectorImpl 构造器最后,我们看到这一句代码:pollWrapper.addWakeupSocket(wakeupSourceFd, 0);,即把pipe 内 Source端的文件描述符(wakeupSourceFd)放到 pollWrapper 里。pollWrapper 作为 PollArrayWrapper 的实例,它到底是什么?
class PollArrayWrapper {
private AllocatedNativeObject pollArray; // The fd array
long pollArrayAddress; // pollArrayAddress
@Native private static final short FD_OFFSET = 0; // fd offset in pollfd
@Native private static final short EVENT_OFFSET = 4; // events offset in pollfd
static short SIZE_POLLFD = 8; // sizeof pollfd struct
private int size; // Size of the pollArray
PollArrayWrapper(int newSize) {
int allocationSize = newSize * SIZE_POLLFD;
// AllocatedNativeObject 的父类有大量 Unsafe 类的操作,这些都是基于内存级别的操作
// => pollArray 是通过 unsafe.allocateMemory(size + ps)分配的一块系统内存
pollArray = new AllocatedNativeObject(allocationSize, true);
pollArrayAddress = pollArray.address();
this.size = newSize;
}
...
// Access methods for fd structures
void putDescriptor(int i, int fd) {
pollArray.putInt(SIZE_POLLFD * i + FD_OFFSET, fd);
}
void putEventOps(int i, int event) {
pollArray.putShort(SIZE_POLLFD * i + EVENT_OFFSET, (short)event);
}
...
// Adds Windows wakeup socket at a given index.
void addWakeupSocket(int fdVal, int index) {
putDescriptor(index, fdVal);
putEventOps(index, Net.POLLIN);
}
}
至此,我们算是完成了对Selector.open()的解读,其主要任务就是完成建立 Pipe,并把 pipe source 端的wakeupSourceFd 放入 pollArray 中,这个 pollArray 是 Selector 完成其角色任务的枢纽。
特别强调:关于 Pipe 和 PollArrayWrapper 的分析参考了这篇博客,很详细!
3.注册事件
ssc.register(selector, SelectionKey.OP_ACCEPT);
所谓的注册,其实就是将一个对象放到注册地对象内的一个容器字段上,这个字段可以是数组,队列,也可以是一个set集合,也可以是一个list。这里,同样是这样,只不过,其需要有个返回值,那么把这个要放入集合的对象返回即可。
从 ServerSocketChannel 一步步跟下去,最终走到了其父类 AbstractSelectableChannel:
// SelectionKey 保存了 selector和 channel 对应关系
// keys 记录了所有的
private SelectionKey[] keys = null;
// 入参:选择器 selector,要注册的事件 ops,att一般默认为 null
public final SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException {
synchronized (regLock) {
// 一些注册前的校验
if (!isOpen()) // 服务器是否打开
throw new ClosedChannelException();
// 判断服务器是否注册了接收连接事件
// 该方法仅一行代码: return SelectionKey.OP_ACCEPT;
if ((ops & ~validOps()) != 0)
throw new IllegalArgumentException();
if (blocking) // 是否阻塞
throw new IllegalBlockingModeException();
// 遍历 SelectionKey[],判断当前 Selector 是否已经有 SelectionKey 了
SelectionKey k = findKey(sel);
if (k != null) {
k.interestOps(ops);
k.attach(att);
}
if (k == null) {
// New registration
synchronized (keyLock) {
if (!isOpen())
throw new ClosedChannelException();
// 核心!!
// 把 selector和 channel 绑定在一起
// 也就是把 new ServerSocketChannel 时创建的FD与 selector 绑定在了一起
// 返回包含这俩的 SelectionKey
k = ((AbstractSelector)sel).register(this, ops, att);
// 将SelectionKey加到保存的SelectionKey[]全局变量
addKey(k);
}
}
return k;
}
}
我们下来看一下 register() 方法,最后一路点到了 SelectorImpl 类中
protected final SelectionKey register(AbstractSelectableChannel var1, int var2, Object var3) {
if (!(var1 instanceof SelChImpl)) {
throw new IllegalSelectorException();
} else {
// 创建 SelectionKey
// var1就是最初的 channel,this 就是当前 selector
SelectionKeyImpl var4 = new SelectionKeyImpl((SelChImpl)var1, this);
var4.attach(var3);
Set var5 = this.publicKeys;
synchronized(this.publicKeys) {
this.implRegister(var4);
}
var4.interestOps(var2);
return var4;
}
}
到这里为止其实初始化的工作就都做好了。
4.等待事件触发
selector.select();
接下来我们看 select() 方法,一直点下去我们走到了 WindowsSelectorImpl#doSelect()
// 直接调用空参select()时,这里var1=0
protected int doSelect(long var1) throws IOException {
if (this.channelArray == null) {
throw new ClosedSelectorException();
} else {
this.timeout = var1;
this.processDeregisterQueue();
if (this.interruptTriggered) {
this.resetWakeupSocket();
return 0;
} else {
// Calculate number of helper threads needed for poll. If necessary
// threads are created here and start waiting on startLock
// 计算轮询所需要的辅助线程数。如果需要,在这里创建线程并开始等待startLock。
this.adjustThreadsCount();
// reset finishLock
// 重置 finisLock
this.finishLock.reset();
// Wakeup helper threads, waiting on startLock, so they start polling.
// Redundant threads will exit here after wakeup.
// 唤醒辅助线程,它们开始轮询。唤醒后,冗余线程在此退出
this.startLock.startThreads();
// do polling in the main thread. Main thread is responsible for
// first MAX_SELECTABLE_FDS entries in pollArray.
// 在主线程中进行轮询。主线程负责 pollArray 中的前 MAX_SELECTABLE_FDS 个条目
try {
this.begin();
try {
// 核心!!!
this.subSelector.poll();
} catch (IOException var7) {
this.finishLock.setException(var7);
}
// Main thread is out of poll(). Wakeup others and wait for them
// 主线程不在 poll() 中,唤醒并等待它们
if (this.threads.size() > 0) {
this.finishLock.waitForHelperThreads();
}
} finally {
this.end();
}
// Done with poll(). Set wakeupSocket to nonsignaled for the next run.
// poll() 完成后将 wakeUpSocket 置为无信号,以便下次运行
this.finishLock.checkForException();
this.processDeregisterQueue();
// Done with poll(). Set wakeupSocket to nonsignaled for the next run.
// poll() 完成后将 wakeUpSocket 置为无信号,以便下次运行
int var3 = this.updateSelectedKeys();
this.resetWakeupSocket();
return var3;
}
}
}
其中 subSelector.poll()是核心,也就是轮训 pollWrapper 中保存的 FD;
private int poll() throws IOException {
return this.poll0(WindowsSelectorImpl.this.pollWrapper.pollArrayAddress, Math.min(WindowsSelectorImpl.this.totalChannels, 1024), this.readFds, this.writeFds, this.exceptFds, WindowsSelectorImpl.this.timeout);
}
poll() 的具体实现是调用 native 方法 poll0:
private native int poll0(long pollAddress, int numfds,
int[] readFds, int[] writeFds, int[] exceptFds, long timeout);
// These arrays will hold result of native select().
// The first element of each array is the number of selected sockets.
// Other elements are file descriptors of selected sockets.
private final int[] readFds = new int [MAX_SELECTABLE_FDS + 1];//保存发生read的FD
private final int[] writeFds = new int [MAX_SELECTABLE_FDS + 1]; //保存发生write的FD
private final int[] exceptFds = new int [MAX_SELECTABLE_FDS + 1]; //保存发生except的FD
//...
}
这个poll0() 会监听 pollWrapper 中的 FD 有没有数据进出,这会造成IO阻塞,直到有数据读写事件发生。
- 由于pollWrapper 中保存的也有 ServerSocketChannel 的FD,所以只要 ClientSocket 发一份数据到 ServerSocket,那么poll0() 就会返回;
- 由于 pollWrapper 中保存的也有 pipe 的 write 端的 FD,所以只要 pipe 的 write 端向FD发一份数据,也会造成 poll0() 返回;
如果这两种情况都没有发生,那么 poll0() 就一直阻塞,也就是 selector.select() 会一直阻塞;如果有任何一种情况发生,那么 selector.select() 就会返回,所以在 OperationServer 的 run() 里要用 while (true),这样就可以保证在selector接收到数据并处理完后继续监听poll()。
这里再说一下 wakeup 方法,看 WindowsSelectorImpl#Wakeup():
public Selector wakeup() {
Object var1 = this.interruptLock;
synchronized(this.interruptLock) {
if (!this.interruptTriggered) {
this.setWakeupSocket();
this.interruptTriggered = true;
}
return this;
}
}
private void setWakeupSocket() {
// setWakeupSocket0 是一个 native 方法
this.setWakeupSocket0(this.wakeupSinkFd);
}
private native void setWakeupSocket0(int wakeupSinkFd);
JNIEXPORT void JNICALL Java_sun_nio_ch_WindowsSelectorImpl_setWakeupSocket0(JNIEnv *env, jclass this,jint scoutFd)
{
/* Write one byte into the pipe */
const char byte = 1;
send(scoutFd, &byte, 1, 0);
}
可见 wakeup() 是通过 pipe 的 write 端 send(scoutFd, &byte, 1, 0),发生一个字节1,来唤醒 poll()。所以在需要的时候就可以调用 selector.wakeup() 来唤醒selector。











还没有评论,来说两句吧...