网络 IO 模型是各种分布式框架的基础, 也是中间件研发的基本功之一; 深入掌握各种 IO 模型的工作逻辑, 内核实现原理, java 封装 API 等, 对于我们理解 netty、kafka、tomcat 等中间件的网络框架实现, 有着强有力的帮助;

基础概念

阻塞 (blocking) / 非阻塞 (nonblocking)

描述的是进程的一个操作是否会使得进程转变为 waiting 的状态:

阻塞 vs 非阻塞
阻塞 vs 非阻塞

  • 阻塞: 要让一个进程进入等待 (waiting) 的状态, 要么是它主动调用 wait() 或 sleep() 等挂起自己的操作, 另一种就是它进行系统调用, 而系统调用因为涉及到了 I/O 操作不能立即完成, 于是内核就会先将该进程置为 waiting 状态, 调度其他进程的运行, 等到它所请求的 I/O 操作完成了以后, 再将其状态更改回 ready;
    由于 DMA 技术的普及, 在现代计算机系统中这些 I/O 操作基本都是异步完成的, 但是大部分操作系统默认为用户级应用程序提供的都是阻塞式的系统调用接口 (因为可以使代码的执行顺序和编写顺序一致, 从而让应用级代码的编写更容易);
  • 非阻塞: 大部分现代操作系统也会提供非阻塞 I/O 系统调用接口, 一个非阻塞调用不会挂起调用程序, 而是会立即返回一个值, 表示有多少 bytes 的数据被成功读取或写入 (不一定是完整的结果);

同步 (synchronous) / 异步 (asychronous)

描述的是消息通信机制:

同步 vs 异步
同步 vs 异步

  • 同步: 发起一个请求时, 在没有得到结果之前, 该请求就不返回, 但是一旦调用返回, 就得到最终的返回值了;
  • 异步: 请求在发出之后就立即返回, 不会等待 I/O 操作的完成, 等到 I/O 操作完成 (必须是完整的) 了以后, 操作系统会将结果通知调用进程, 可能的通知的方式如下:
    1. 设置一个用户空间特殊的变量值;
    2. 触发一个信号;
    3. 产生一个软中断;
    4. 调用应用程序的回调函数;

两对概念的直观理解

如果我们站在用户的角度, 可以用不够严谨但通俗易懂的方式来理解以上两对概念:

  • 描述调用方的状态:
    • 阻塞: 调用方发起请求后被挂起, 必须等待该操作完全完成才能继续执行后续代码;
    • 非阻塞: 调用方发起操作后不被挂起立即返回, 无需等待操作完成, 后续代码可立即继续执行;
  • 描述被调用方返回结果的方式:
    • 同步: 调用结束即返回结果, 不会有后续回调通知;
    • 异步: 调用方发起操作后, 由被调用方回调通知调用方结果;

UNIX 网络编程的 IO 模型

在一次 UNIX 系统的网络 IO 中, 涉及到 NIC (网卡)、Kernel 和 User App 三种角色的通信 / 交互:

UNIX 网络 IO
UNIX 网络 IO

以 IO 读为例, 由上图可知一次 IO 读涉及两个阶段的数据拷贝:

  1. 数据从网卡复制到内核缓冲区;
  2. 数据从内核缓冲区复制到用户进程缓冲区;

我们将第一阶段称为数据准备, 将第二阶段称为数据复制; 由于计算机技术的不断发展 (比如 DMA 技术的普及), UNIX 网络 IO 中各阶段之间的交互形态衍生出了性能迥异的多种模式, 在《Unix 网络编程 · 卷1》一书中归纳出了五种网络 IO 模型:

  • 阻塞式 IO (Blocking)
  • 非阻塞式 IO (Non-Blocking)
  • IO 多路复用 (Multiplexing)
  • 信号驱动 IO (Signal-Driven)
  • 异步 IO (Asychronous)

随着更加前沿的计算机技术的实践, 传统的两阶段 UNIX 网络 IO 模型也在面临挑战, 比如无需内核介入的 RDMA 技术就省去了第一阶段的数据拷贝, 不过它需要特定的硬件支持, 现阶段这类新技术只在有限的环境下被应用, 为了安全和稳定性, 更多的通用场景依然遵循两阶段 IO 模型;

同步阻塞

应用进程被阻塞, 直到数据复制到应用进程缓冲区中才返回;
这是最原始的 IO 模型, 对于应用进程来说吞吐能力很低, 但因为其阻塞不再消耗 CPU 时间, 所以从操作系统整体来考虑反而效率较高;

同步阻塞 IO
同步阻塞 IO

内核支持

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#include <sys/types.h>  
#include <sys/socket.h>

/**
* 成功返回读入的字节数, 失败返回 -1 并设置 errno
* sockfd: 套接字文件描述符
* buf: 接收缓冲区
* len: 接收缓冲区长度
* flags: 调用操作方式
*/
ssize_t recv(int sockfd, void *buf, size_t len, int flags);

/**
* 从 (已连接的) 套接字上接收数据, 并获取数据发送源的地址
* 成功返回读入的字节数, 如果连接已中止返回 0, 其余情况返回 SOCKET_ERROR 错误
* s: 标识一个已连接套接字的描述字
* buf: 接收缓冲区
* len: 接收缓冲区长度
* flags: 调用操作方式
* from: (可选) 发送源套接字地址, 指向源地址的缓冲区
* fromlen: (可选) 发送源套接字缓冲区长度
*/
int recvfrom(SOCKET s, void *buf, int len, unsigned int flags, struct sockaddr *from, int *fromlen);

上层封装

java 对同步阻塞 IO 模型的支持, 举例如下:

1
2
3
4
5
6
7
final Socket socket = new Socket();
socket.connect(InetSocketAddress.createUnresolved("192.168.1.1", 7001));
InputStream inputStream = socket.getInputStream();
// 开辟用户态缓冲区
final byte[] content = new byte[128];
// 同步阻塞, 直到数据复制到缓冲区, 返回读取的字节数
final int bytesOfRead = inputStream.read(content);

同步非阻塞

应用进程执行系统调用之后, 如果数据没有准备好, 内核会立即返回一个错误码, 应用进程可以继续执行其他逻辑; 但要想感知 I/O 是否完成, 就需要以轮询的方式不断执行系统调用, 直到数据 ready 复制到用户空间返回成功, 才能停止轮询;
这种方式看起来不阻塞用户进程, 但是应用进程并没有更好的办法获取 IO 的结果, 最终也只能不断轮询, 不断消耗 CPU 时间, 并产生大量的系统调用, 相对于阻塞式 IO 来说其效率反而更低;

同步非阻塞 IO
同步非阻塞 IO

内核支持

针对上一节提及的 recv / recvfrom 函数, 其第四个入参 flags 有如下几种常见的值:

  • MSG_DONTWAIT: 在没有数据可接收的情况下立即返回 (非阻塞模式);
  • MSG_OOB: 接收带外数据 (紧急通知或控制信息);
  • MSG_PEEK: 预先查看数据, 但不会将其从套接字缓冲区中移除 (调试模式);
  • MSG_TRUNC: 如果数据长度大于缓冲区长度,数据将被截断 (不设置则会失败);
  • MSG_WAITALL: 会一直等待直到接收到指定长度的数据 (保证接收数据完整性);

使用非阻塞模式时, 如果没有接收到数据, recv() 函数会立即返回 -1, 并将 errno 设置为 EAGAIN 或 EWOULDBLOCK:

1
2
3
4
5
int sockfd, n;
char buffer[MAXLINE];
memset(buffer, '/0', sizeof(buffer));
# 设置 flags = MSG_DONTWAIT 可实现非阻塞
n = recv(sockfd, buffer, MAXLINE, MSG_DONTWAIT);

上层封装

java 对同步非阻塞 IO 模型的支持, 举例如下:

1
2
3
4
5
6
final SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
socketChannel.connect(InetSocketAddress.createUnresolved("192.168.1.1", 4591));
final ByteBuffer buffer = ByteBuffer.allocate(128);
// 轮询直到完成 IO
while (socketChannel.read(buffer) == 0) {}

IO 多路复用 (重点)

复用同一个线程, 在一次 multiplex 调用 (阻塞执行) 中可批量跟踪多个网络连接的状态变化, 特别适合连接数很多但连接时间短的高并发场景;
上面两节提到的 IO 模型在一个线程的一次调用中只能处理一个连接, 相对来说 IO 多路复用通过批处理的方式减少了系统调用的开销, 极大地提升了 IO 处理效率;

IO 多路复用模型
IO 多路复用模型

内核支持

linux 在内核层面提供了多种支持 IO 多路复用的 API:

  • select:
  • poll:
  • epoll (event poll):

kqueue

上层封装

java 针对 IO 多路复用模型封装了一套 Selector 接口, 其底层对应的实现是 epoll 系统调用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
final Selector selector = Selector.open();
final ServerSocketChannel socketChannel = ServerSocketChannel.open();
socketChannel.configureBlocking(false);
socketChannel.bind(new InetSocketAddress(8080));
// 注册接受事件, OP_ACCEPT 只适用于 ServerSocketChannel
socketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
// 阻塞等待注册的事件发生
selector.select();
final Set<SelectionKey> selectionKeys = selector.selectedKeys();
final Iterator<SelectionKey> iter = selectionKeys.iterator();
while(iter.hasNext()) {
final SelectionKey key = iter.next();
if (key.isAcceptable()) {
final SocketChannel channel = ((ServerSocketChannel) key.channel()).accept();
channel.configureBlocking(false);
// 注册读事件
channel.register(selector, SelectionKey.OP_READ);
}
if (key.isReadable()) {
final SocketChannel channel = (SocketChannel) key.channel();
final ByteBuffer readBuffer = ByteBuffer.allocate(512);
channel.read(readBuffer);
readBuffer.flip();
// 注册写事件
key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
// 绑定 Buffer
key.attach(buffer);
}
if (key.isWritable()) {
final ByteBuffer buffer = (ByteBuffer) key.attachment();
final SocketChannel channel = (SocketChannel) key.channel();
if (buffer.hasRemaining()) {
channel.write(buffer);
} else {
// 发送完了就取消写事件, 否则下次还会进入该分支
key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
}
}
iter.remove();
}
}

信号渠道 IO

应用进程进行 sigaction 系统调用, 内核立即返回, 应用进程可以继续执行; 内核在数据到达时向应用进程发送 SIGIO 信号 (通知应用可以进行数据复制), 应用进程收到信号之后再调用 recv / recvfrom 等系统调用将数据真正从内核空间复制到用户空间;

信号驱动 IO 模型
信号驱动 IO 模型

内核支持

上层封装

java 没有对信号渠道 IO 做对应封装;

异步 IO

应用进程进行 aio_read 系统调用, 内核立即返回, 应用进程可以继续执行; 内核会在所有操作完成 (包括将数据复制到用户空间) 之后向应用进程发送信号, 应用进程直接可以消费数据;

异步 IO 模型
异步 IO 模型

内核支持

Windows 的 IOCP

上层封装

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
final AsynchronousSocketChannel asynchronousSocketChannel = AsynchronousSocketChannel.open();
Future<Void> connect = asynchronousSocketChannel.connect(InetSocketAddress.createUnresolved("192.168.31.80", 3456));
connect.get();
ByteBuffer buffer = ByteBuffer.wrap(new byte[128]);
asynchronousSocketChannel.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer buffer) {
//当读取到数据,流中止,或者读取超时到达时均会触发回调
if (result > 0) {
//result代表着本次读取的数据,代码执行到这里意味着数据已经被放入buffer了
processWithBuffer(buffer);
} else if (result == -1) {
//流中止,没有其他操作
} else {
asynchronousSocketChannel.read(buffer, buffer, this);
}
}

private void processWithBuffer(ByteBuffer buffer) {
}

@Override
public void failed(Throwable exc, ByteBuffer attachment) {
}
});

IO 模型的比较

前四种 I/O 模型的主要区别在于第一个阶段,而第二个阶段是一样的: 将数据从内核复制到应用进程过程中,应用进程会被阻塞

参考资料