yeskery

在 Java 7 中体会 NIO.2 异步执行的快乐

简单介绍 Asynchronous I/O

JDK7 已经大致确定发布时间。JSR 203 提出很久了。2009.11.13,JDK7 M5(b76)已经发布。JSR 203 习惯上称为 NIO.2,主要包括新的:

  • 异步 I/O(简称 AIO);
  • Multicase 多播;
  • Stream Control Transport Protocol(SCTP);
  • 文件系统 API;
  • 以及一些 I/O API 的更新,例如:java.io.File.toPath,NetworkChannel 的完整抽象,等等。

本文将主要关注 AIO。AIO 包括 Sockets 和 Files 两部分的异步通道接口及其实现,并尽量使用操作系统提供的原生本地 I/O 功能进行实现。例如 Windows 版本的实现就使用了所谓的完成端口模型(IOCP)。其实 JDK 7 中 AIO 实现本质上说应该是 Proactor 模式的实现。Alexander Libman 提供 NIO 版本的 JProactor 的实现。NIO.2 版本 JProactor 正在进行。Grizzly 也已经提供新的基于 AIO 的实现。如果您只想检查这些最新的 API,NIO.2 项目的 Javadoc 站点只列出了 NIO.2 部分的 API。

  • AIO 的核心概念:发起非阻塞方式的 I/O 操作。当 I/O 操作完成时通知。
  • 应用程序的责任就是:什么时候发起操作? I/O 操作完成时通知谁?

AIO 的 I/O 操作,有两种方式的 API 可以进行:

  • Future 方式;
  • Callback 方式。

下面我们分别对这两种方式的 API 进行举例说明。

Future 方式

Future 方式:即提交一个 I/O 操作请求,返回一个 Future。然后您可以对 Future 进行检查,确定它是否完成,或者阻塞 IO 操作直到操作正常完成或者超时异常。使用 Future 方式很简单,比较典型的代码通常像清单 1 所示。

清单 1. 使用 Future 方式的代码示例

  1. AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
  2. // 连接远程服务器,等待连接完成或者失败
  3. Future<Void> result = ch.connect(remote);
  4. // 进行其他工作,例如,连接后的准备环境,f.e.
  5. //prepareForConnection();
  6. //Future 返回 null 表示连接成功
  7. if(result.get()!=null){
  8. // 连接失败,清理刚才准备好的环境,f.e.
  9. //clearPreparation();
  10. return;
  11. }
  12. // 网络连接正常建立
  13. ...
  14. ByteBuffer buffer = ByteBuffer.allocateDirect(8192);
  15. // 进行读操作
  16. Future<Integer> result = ch.read(buffer);
  17. // 此时可以进行其他工作,f.e.
  18. //prepareLocalFile();
  19. // 然后等待读操作完成
  20. try {
  21. int bytesRead = result.get();
  22. if(bytesRead==-1){
  23. // 返回 -1 表示没有数据了而且通道已经结束,即远程服务器正常关闭连接。
  24. //clear();
  25. return;
  26. }
  27. // 处理读到的内容,例如,写入本地文件,f.e.
  28. //writeToLocolFile(buffer);
  29. } catch (ExecutionExecption x) {
  30. //failed
  31. }

需要注意的是,因为 Future.get() 是同步的,所以如果不仔细考虑使用场合,使用 Future 方式可能很容易进入完全同步的编程模式,从而使得异步操作成为一个摆设。如果这样,那么原来旧版本的 Socket API 便可以完全胜任,大可不必使用异步 I/O。

Callback 方式

Callback 方式:即提交一个 I/O 操作请求,并且指定一个 CompletionHandler。当异步 I/O 操作完成时,便发送一个通知,此时这个 CompletionHandler 对象的 completed 或者 failed 方法将会被调用,样例代码如清单 2 所示。

清单 2. 完成处理接口

  1. public interface CompletionHandler<V,A> {
  2. // 当操作完成后被调用,result 参数表示操作结果,
  3. //attachment 参数表示提交操作请求时的参数。
  4. void completed(V result, A attachment);
  5. // 当操作失败是调用,exc 参数表示失败原因。attachment 参数同上。
  6. void failed(Throwable exc, A attachment);
  7. }
  • V表示结果值的类型。对于异步网络通道的读写操作而言,这个结果值 V 都是整数类型,表示已经操作的卦数,如果是 -1,NIO.2 内核实现保证传递的 ByteBuffer参数不会有变化。
  • A表示关联到 I/O 操作的对象的类型。用于传递操作环境。通常会封装一个连接环境。
  • 如果成功则 completed 方法被调用。如果失败则 failed 方法被调用。

关于 Attachment 参数
Attachment 参数是不是看着十分眼熟呢?是的,NIO 中也使用类似的方法。当然 I/O 操作是不会对这个参数进行任何操作的,可以用于在不同的 CompletionHandler 对象之间进行通信。

准备好 CompletionHandler 之后,如何使用 CompletionHandler 呢? AIO 提供四种类型的异步通道以及不同的 I/O 操作接受一个 CompletionHandler 对象,它们分别是:

  • AsynchronousSocketChannel:connect,read,write
  • AsynchronousFileChannel:lock,read,write
  • AsynchronousServerSocketChannel:accept
  • AsynchronousDatagramChannel:read,write,send,receive

本文重点关注 AsynchronousSocketChannel 的使用,首先简单浏览一下该类型的 API。

AsynchronousSocketChannel

  1. public abstract class AsynchronousSocketChannel
  2. implements AsynchronousByteChannel, NetworkChannel

创建一个异步网络通道,并且绑定到一个默认组。

  1. public static AsynchronousSocketChannel open() throws IOException

将异步网络通道连接到远程服务器,使用指定的 CompletionHandler 听候完成通知。

  1. public abstract <A> void connect(SocketAddress remote,
  2. A attachment,
  3. CompletionHandler<Void,? super A> handler)

从异步网络通道读取数据到指定的缓冲区,使用指定的 CompletionHandler 听候完成通知。

  1. public final <A> void read(ByteBuffer dst,
  2. A attachment,
  3. CompletionHandler<Integer,? super A> handler)

向异步网络通道写缓冲区中的数据,使用指定的 CompletionHandler 听候完成通知。

  1. public final <A> void write(ByteBuffer src,
  2. A attachment,
  3. CompletionHandler<Integer,? super A> handler)

开始简单的异步 I/O 网络客户端程序

本文重点关注 AIO 的 socket 部分。接下来,我们以 AIO 方式的 FTP 客户端程序为例,开始体会异步执行的快乐。需要提醒的是:快乐和痛苦如影随行。好,那“痛并快乐着”吧。

使用 AIO,可以想象一个在线视频播放的应用场景。使用异步 I/O 回调方式,可以这样描述一边下载视频一边播放的功能:

为什么选择客户端编程的主题呢?
为什么选择客户端编程的主题呢?相对来说,其他文章和资料通常使用网络服务器作为主题。客户端的相对较少。 使用 AIO 进行客户端编程有什么好处呢?想象一个 UI 的客户端程序,再看看时下流行的下载工具,线程一大堆,搞得你手上的工作做得不爽。好有一比,工厂希望有订单的时候多些工人,订单少的时候就少些工人。 使用 AIO,程序通常可以使用更少的线程。

  1. 准备好网络连接
  2. 准备一个缓冲区,提交读操作希望下载部分视频内容,(这个读请求马上完成)
  3. 等待读请求完成操作,此时可以进行其他工作,比如播放广告
  4. 读操作真正完成,得到通知,CompletionHandler#completed 方法被调用
  5. 启动另外的播放线程,从下载的缓冲区读取内容播放视频。
  6. 再准备一个另外的缓冲区,回到第二步

这样,第二步到第六步自动构成一个执行循环,但不是 while 之类的代码循环。

本文以 FTP 客户端程序为例,来阐述如何使用异步 I/O 进行网络程序的编写。

FTP 分为两个通道进行处理:控制通道和数据通道。

首先,开始 FTP 的控制通道的编程。FTP 的控制通道使用 telnet 行命令方式进行请求和响应处理。 第一个例子不会复杂,我们只是连接到一个远程服务器,并且检查某个文件的大小,然后退出。基本步骤如下:

  1. 连接到 FTP 服务器。为了便于测试,本文将“攻击”ftp.gnu.org 服务器。
  2. 读取服务器的欢迎信息,检查远程服务器是否已经准备就绪。
  3. 如果服务器没有准备好,关闭连接,退出
  4. 如果服务器没有问题,发送登录命令。
  5. 检查登录命令结果。如果登录失败,转到第 8 步。
  6. 如果服务器没有问题,发送检查文件大小的命令。
  7. 检查命令结果。并且显示结果。
  8. 发送退出命令
  9. 关闭连接。

使用进行一个简单的设计:

第一个简单有问题的例子

  1. import java.io.IOException;
  2. import java.net.InetSocketAddress;
  3. import java.nio.ByteBuffer;
  4. import java.nio.channels.AsynchronousSocketChannel;
  5. import java.nio.channels.CompletionHandler;
  6. public class FTPClient1 {
  7. public static void main(String[] args) throws IOException {
  8. // 第一个,创建异步网络通道
  9. AsynchronousSocketChannel channel = AsynchronousSocketChannel.open();
  10. // 连接到服务器,以 ftp.gnu.org 为目标
  11. channel.connect(new InetSocketAddress("ftp.gnu.org", 21), channel,
  12. // 使用连接完成的回调
  13. new CompletionHandler<Void, AsynchronousSocketChannel>() {
  14. @Override
  15. public void completed(Void result, AsynchronousSocketChannel attachment) {
  16. // 完成连接后,启动 FTP 的控制逻辑
  17. FTPClient1 client = new FTPClient1();
  18. client.start(attachment);
  19. }
  20. @Override
  21. public void failed(Throwable exc, AsynchronousSocketChannel attachment) {
  22. exc.printStackTrace();
  23. }
  24. });
  25. //connect 的调用异步执行,马上完成,阻止 JVM 退出
  26. System.in.read();
  27. }
  28. AsynchronousSocketChannel channel;
  29. public void start(AsynchronousSocketChannel channel) {
  30. this.channel = channel;
  31. // 准备读缓冲区
  32. ByteBuffer buffer = ByteBuffer.allocateDirect(128);
  33. // 发出读操作请求,
  34. channel.read(buffer, buffer,
  35. // 读操作完成后通知
  36. new CompletionHandler<Integer, ByteBuffer>() {
  37. @Override
  38. public void completed(Integer result, ByteBuffer attachment) {
  39. if (result > 0) {
  40. // 简单处理读到的响应结果
  41. int position = attachment.position() - 1;
  42. if (attachment.get(position - 1) == 13 &&
  43. attachment.get(position) == 10) {
  44. if (isValidReply(attachment)) {
  45. attachment.flip();
  46. showReply(attachment);
  47. if (getReplyCode(attachment) == 220)
  48. login();
  49. }
  50. } else {
  51. // 继续读
  52. FTPClient1.this.channel.read(attachment, attachment, this);
  53. }
  54. } else {
  55. System.out.println("remote server closed");
  56. }
  57. }
  58. @Override
  59. public void failed(Throwable exc, ByteBuffer attachment) {
  60. exc.printStackTrace();
  61. }
  62. });
  63. }
  64. protected void login() {
  65. // 准备写缓冲区
  66. String user = "user anonymous\r\n";
  67. ByteBuffer buffer = ByteBuffer.wrap(user.getBytes());
  68. // 发出写操作请求
  69. channel.write(buffer, buffer,
  70. // 写操作完成通知
  71. new CompletionHandler<Integer, ByteBuffer>() {
  72. @Override
  73. public void completed(Integer result, ByteBuffer attachment) {
  74. if (attachment.hasRemaining())
  75. channel.write(attachment, attachment, this);
  76. else {
  77. // channel.read(dst, attachment, handler);
  78. // readReply();
  79. // 此处有问题
  80. }
  81. }
  82. @Override
  83. public void failed(Throwable exc, ByteBuffer attachment) {
  84. exc.printStackTrace();
  85. }
  86. });
  87. }
  88. protected void showReply(ByteBuffer attachment) {
  89. while (attachment.hasRemaining())
  90. System.out.print((char) attachment.get());
  91. }
  92. public static int getReplyCode(ByteBuffer buffer) {
  93. return Character.digit(buffer.get(0), 10) * 100 +
  94. Character.digit(buffer.get(1), 10) * 10
  95. + Character.digit(buffer.get(2), 10);
  96. }
  97. public static boolean isValidReply(ByteBuffer buffer) {
  98. return buffer.get(3) == 32 && Character.isDigit(buffer.get(0))
  99. && Character.isDigit(buffer.get(1))
  100. && Character.isDigit(buffer.get(2));
  101. }
  102. }

问题:上面的代码中,login 方法中,完成 login 命令之后,如何继续?

答案是:不能继续。实际上,上面的例子代码回到了同步处理时代。典型的错误使用方式。痛。 同时,CompletionHandler 的创建也成了问题,需要不停地创建这种类型的对象吗?痛! 回顾前面提到的核心:应用程序的责任:什么时候发起操作? I/O 操作完成时通知谁?

就本例而言,FTPClient 本身应该承担应用程序的责任,正如 Client 名称所示,应该由 Client 来实现 CompletionHandler。 Client 负责发出 I/O 操作请求,I/O 操作完成通知 Client。正如世界上其他诸多问题一样,名称本身就是个问题。此处的 Client 的意思是真正的顾客。

可以想象另外一个场景:去一个有叫号机的银行大厅办理业务。“我”到银行,“我”决定办理个人业务,所以取个人业务的号码。然后看看前面等待的其他客人还不少,计算一下时间,“我”决定去隔壁馋嘴一个冰淇淋,回来后,在大厅到处晃晃,这时候,大厅广播通知,333 号顾客请到 3 号窗口办理业务,“我”听到了,检查一下号码,“我”持有 333 号,所以“我”去 3 号窗口。

上面这个场景中有几个非常重要的事实 “我”决定取个人业务号码,“我”听到了,“我”是顾客。 因此,上面例子应该让 FTPClient1 实现 CompletionHandler。这是对的。但是 FTPClient1 有两个重要的职责:发出读操作请求和发出写操作请求。需要两个 CompletionHandler 的角色,但是不能重复实现 CompletionHandler 接口,此时内部类是个不错的选择。修改上面的例子,如下:

第二个简单的例子

  1. import java.io.IOException;
  2. import java.net.InetSocketAddress;
  3. import java.nio.ByteBuffer;
  4. import java.nio.channels.AsynchronousSocketChannel;
  5. import java.nio.channels.CompletionHandler;
  6. public class FTPClient2 {
  7. public static void main(String[] args) throws IOException {
  8. AsynchronousSocketChannel channel = AsynchronousSocketChannel.open();
  9. channel.connect(new InetSocketAddress("ftp.gnu.org", 21), channel,
  10. new CompletionHandler<Void, AsynchronousSocketChannel>() {
  11. @Override
  12. public void completed(Void result,
  13. AsynchronousSocketChannel attachment) {
  14. FTPClient2 client = new FTPClient2();
  15. client.start(attachment);
  16. }
  17. @Override
  18. public void failed(Throwable exc,
  19. AsynchronousSocketChannel attachment) {
  20. exc.printStackTrace();
  21. }
  22. });
  23. System.in.read();
  24. }
  25. AsynchronousSocketChannel channel;
  26. void readResponse() {
  27. ByteBuffer buffer = ByteBuffer.allocateDirect(128);
  28. read(buffer);
  29. }
  30. void read(ByteBuffer buffer) {
  31. channel.read(buffer, buffer, reader);
  32. }
  33. // 使用内部类接收读操作完成通知
  34. CompletionHandler<Integer, ByteBuffer> reader =
  35. new CompletionHandler<Integer, ByteBuffer>() {
  36. @Override
  37. public void completed(Integer result, ByteBuffer attachment) {
  38. if (result > 0) {
  39. int position = attachment.position() - 1;
  40. if (attachment.get(position - 1) == 13 &&
  41. attachment.get(position) == 10) {
  42. if (isValidReply(attachment, 0)) {
  43. attachment.flip();
  44. showReply(attachment);
  45. // 状态逻辑,处理响应
  46. onReply(getReplyCode(attachment, 0));
  47. } else {
  48. removeLine(attachment, position - 2);
  49. if (isValidReply(attachment, 0)) {
  50. attachment.flip();
  51. showReply(attachment);
  52. onReply(getReplyCode(attachment, 0));
  53. } else
  54. read(attachment);
  55. }
  56. } else {
  57. if (!attachment.hasRemaining())
  58. removeLine(attachment, position - 1);
  59. read(attachment);
  60. }
  61. } else {
  62. System.out.println("remote server closed");
  63. }
  64. }
  65. @Override
  66. public void failed(Throwable exc, ByteBuffer attachment) {
  67. exc.printStackTrace();
  68. }
  69. };
  70. public void start(AsynchronousSocketChannel channel) {
  71. this.channel = channel;
  72. readResponse();
  73. }
  74. protected void onReply(int replyCode) {
  75. // 按照前面定义好的步骤,处理状态逻辑
  76. if (replyCode == 220)
  77. login();
  78. if (replyCode == 230)
  79. writeCommand("size README");
  80. else if (replyCode == 213)
  81. writeCommand("QUIT");
  82. else if (replyCode == 221)
  83. try {
  84. channel.close();
  85. } catch (IOException e) {
  86. e.printStackTrace();
  87. }
  88. }
  89. void writeCommand(String string) {
  90. System.out.print("==>");
  91. System.out.println(string);
  92. ByteBuffer buffer = ByteBuffer.wrap((string + "\r\n").getBytes());
  93. write(buffer);
  94. }
  95. void write(ByteBuffer buffer) {
  96. channel.write(buffer, buffer, writer);
  97. }
  98. // 使用内部类接收写操作完成通知
  99. CompletionHandler<Integer, ByteBuffer> writer =
  100. new CompletionHandler<Integer, ByteBuffer>() {
  101. @Override
  102. public void completed(Integer result, ByteBuffer attachment) {
  103. if (attachment.hasRemaining())
  104. channel.write(attachment, attachment, this);
  105. else
  106. readResponse();
  107. }
  108. @Override
  109. public void failed(Throwable exc, ByteBuffer attachment) {
  110. exc.printStackTrace();
  111. }
  112. };
  113. protected void login() {
  114. String user = "user anonymous";
  115. writeCommand(user);
  116. }
  117. // 处理多行响应
  118. protected void removeLine(ByteBuffer buffer, int position) {
  119. int limit = buffer.position();
  120. byte c;
  121. while (position >= 0) {
  122. c = buffer.get(position);
  123. if (c == 13 || c == 10) {
  124. showReply(buffer, position);
  125. buffer.position(position + 1);
  126. buffer.limit(limit);
  127. buffer.compact();
  128. break;
  129. }
  130. position--;
  131. }
  132. }
  133. // 针对多数 FTP 服务器的响应的偷懒的方法,不用费劲处理 String。
  134. protected void showReply(ByteBuffer buffer) {
  135. while (buffer.hasRemaining())
  136. System.out.print((char) buffer.get());
  137. }
  138. protected void showReply(ByteBuffer buffer, int position) {
  139. for (int i = 0; i < position; i++)
  140. System.out.print((char) buffer.get(i));
  141. }
  142. public static int getReplyCode(ByteBuffer buffer, int start) {
  143. return Character.digit(buffer.get(start), 10) * 100 +
  144. Character.digit(buffer.get(start + 1), 10) * 10
  145. + Character.digit(buffer.get(start + 2), 10);
  146. }
  147. public static boolean isValidReply(ByteBuffer buffer, int start) {
  148. return buffer.get(start + 3) == 32 &&
  149. Character.isDigit(buffer.get(start))
  150. && Character.isDigit(buffer.get(start + 1))
  151. && Character.isDigit(buffer.get(start + 2));
  152. }
  153. public static int findCRLF(ByteBuffer buffer, int start, int end) {
  154. while (start < end) {
  155. if (buffer.get(start++) == 13) {
  156. if (start < end) {
  157. if (buffer.get(start) == 10) {
  158. return start + 1;
  159. }
  160. }
  161. }
  162. }
  163. return -1;
  164. }
  165. }

对比两个代码,可以发现:修改后的代码的 onReply 方法,与上文中描述的需求步骤基本上一模一样。与使用阻塞模式编写的代码相比,应该更加简洁。阻塞模式下,你至少需要一个控制循环。似乎有点快乐了。

继续 FTP 的编程,升华完成通知类型

因为读写操作的使用远远多于其他类型的操作,所以重点考虑如何处理读写操作。 回顾前面的第二个例子中的 reader 和 writer 成员, 其实与对象编程理论和实践中的一个很重要的原理“单一职责原理”比较吻合。 但是,如果需要写很多的网络程序,或者提供一个网络编程的框架(虽然现在有不少,例如:grizzly,JProactor),那么内部类的方式显然显得局限。

重用?重用什么?如何重用?

完成读写操作的回调次数?
发送 1K 的数据,到底需要几次回调呢?鬼知道。实际上鬼也不知道。

需要注意的是:AIO 读写操作并不保证操作一次全部完成。单个读写操作请求可能收到多次完成通知

多数网络应用程序发送响应或者请求消息,都需要将准备好的缓冲区全部内容发送出去。可以预见,前面的 writer 内部类成员可以独立,改编为抽象的 Writer 类型。这时候,前文中内部类的隐式引用好处就会失去,而且诞生出新的回调接口。

BUG ?太重复的劳动
很多情况下都应该检查 CompletionHandler.completed的 Integer 类型结果是否为 -1,看看是否通常已经关闭。
if(result==-1) {
// 通道已经关闭,执行 onChannelClosed
} else {
// 正常处理
}
实际上 AIO 的内核实现已经对 result 是否等于 -1 做出了判断,不知道基于何种考虑,completed 方法的 result 参数包含 -1 值。read() 方法的传统吗?这种处理直接导致人类的资源浪费:重复考虑这种判断,重复考虑判断后的处理。作者在 NIO2 的 dev list 中已经提起“诉讼”,但是看样子不果。

FTP 使用 telnet 协议的消息格式。消息以 <CRLF> 结束。 Telnet 协议家族的响应消息基本上都使用“code<SPACE>message<CRLF>“。

从处理 FTP 或者 telnet 协议家族的响应消息来看,前文的 reader 成员应该可以独立,至少可以抽象一个专门用于读取 Telnet 响应的 TelnetReader 类型。同样,也诞生出新的回调接口。对于 Reader 类型,还可以想象几种应用模式:

  • 读取指定长度的数据,SizeReader;
  • 一直读直到对方关闭通道,EOFReader。
  • 多数情况下读操作都会去检查读到的数据长度是否为 -1,以检测对方是否已经关闭通道

这样,对于 Reader 类型,某种程度的策略模式的应用需求已经浮现出来。

但是 Client 类型本身至少也可以实现一种类型的 CompletionHandler。如果这样,将产生一个争论:继承还是委托? 很多情况下这实际上是口味的问题,并非优劣的选择。

同时,对于读写操作而言,CompletionHandler 的类型是确定的 Integer 类型,似乎增加一个新的派生接口 Callback<T> 更加满足需要。

新的读写操作回调接口

  1. public interface Callback<T> extends CompletionHandler<Integer, T> {
  2. @Override
  3. void completed(Integer result, T context);
  4. @Override
  5. void failed(Throwable cause, T context);
  6. }

除上述考虑之外,最重要的一点是,有状态还是无状态。CompletionHandler 或者 Callback 接口本身无状态可言,但其实现存在有无状态的选择。AIO 内核并不关心 CompletionHandler 的 attachment 参数,内核不会使用也不会施加任何限制。但是实现类则大不同。有状态和无状态的设计将直接影响到 attachment 参数的使用。如您所看见,Callback 接口已经将 attachment 参数更名为 context。同时,因为 AsynchronousChannel 都需要 ByteBuffer,attachment 的使用也必须考虑 ByteBuffer 的使用方式。对于每一个读写操作而言,有三个因素是必须考虑的:AsynchronousChannel,ByteBuffer,attachment。普通应用程序也好,还是框架,实际上只考虑一个问题,就是如何组合这三个因素。某种程度上说,AIO 编程其实是 attachment编程,实不为过。怎一个痛字了得!

与此同时,因为诞生新的回调接口,预示着 Client 的层次在不断增加,也意味着 Client 的职责在进行分化。某些网络应用框架中的 filter 类型与此类似。

在没有更好的方案的时候,作者选择有状态方式的设计。

简单的有状态写操作类型

  1. public class BufferWriter implements Callback<WriteCallback> {
  2. private AsynchronousSocketChannel channel;
  3. private ByteBuffer buffer;
  4. private Charset charset;
  5. public BufferWriter(AsynchronousSocketChannel channel, Charset charset) {
  6. this.channel = channel;
  7. this.charset = charset;
  8. }
  9. public void write(String string, WriteCallback write) {
  10. buffer = ByteBuffer.wrap(string.getBytes(charset));
  11. channel.write(buffer, write, this);
  12. }
  13. @Override
  14. public void completed(Integer result, WriteCallback context) {
  15. if (buffer.hasRemaining())
  16. channel.write(buffer, context, this);
  17. else {
  18. buffer = null;
  19. context.writeCompleted();
  20. }
  21. }
  22. @Override
  23. public void failed(Throwable cause, WriteCallback context) {
  24. buffer = null;
  25. context.writeFailed(cause);
  26. }
  27. }

抽象读操作模板类型

  1. public abstract class AbstractReadCallback<T> implements Callback<T> {
  2. protected abstract void readCompleted(Integer result, T context);
  3. protected abstract void onChannelClose(T context);
  4. @Override
  5. public void completed(Integer result, T context) {
  6. // 重新分发回调通知
  7. if (result > 0)
  8. readCompleted(result, context);
  9. else
  10. onChannelClose(context);
  11. }
  12. }

简单的有状态读操作类型

  1. public class TelnetReplyReader extends AbstractReadCallback
  2. <ResponseCallback<Reply>> {
  3. private AsynchronousSocketChannel channel;
  4. private CharsetDecoder decoder;
  5. // 简单的 ByteBuffer 工厂,来自 JDK 的 corba 中的实现
  6. private ByteBufferPool pool;
  7. private ByteBuffer buffer;
  8. //FTP 响应数据对象
  9. private Reply reply = new Reply();
  10. public TelnetReplyReader(AsynchronousSocketChannel channel,
  11. ByteBufferPool pool, Charset charset) {
  12. this.channel = channel;
  13. this.pool = pool;
  14. decoder = charset.newDecoder();
  15. }
  16. public void read(ResponseCallback<Reply> protocol) {
  17. reply.reset();
  18. if (buffer == null)
  19. buffer = pool.get(1024);
  20. buffer.clear();
  21. channel.read(buffer, protocol, this);
  22. }
  23. @Override
  24. protected void onChannelClose(ResponseCallback<Reply> context) {
  25. try {
  26. channel.close();
  27. } catch (IOException e) {
  28. // ignore;
  29. }
  30. // 转换为特定的异常类型
  31. failed(new ClosedChannelException(), context);
  32. }
  33. @Override
  34. protected void readCompleted(Integer result,ResponseCallback<Reply> context){
  35. ByteBuffer buffer = this.buffer;
  36. try {
  37. // 响应代码的处理逻辑,直到获得有效的响应代码,否则哭到长城
  38. int position = buffer.position();
  39. if (buffer.get(position - 2) == 13 &&
  40. buffer.get(position - 1) == 10) {
  41. // Yes check reply code;
  42. if (findReplyCode(buffer, position - 2)) {
  43. // buffer position at the code first char;
  44. int first = buffer.position();
  45. reply.code = getReplyCode(buffer, first);
  46. if (first > 0) {
  47. buffer.flip();
  48. reply.other.append(decoder.decode(buffer));
  49. }
  50. buffer.limit(position - 2);
  51. buffer.position(first + 4);
  52. reply.message = decoder.decode(buffer).toString();
  53. returnBuffer();
  54. context.onResponse(reply);
  55. return;
  56. }
  57. buffer.flip();
  58. reply.other.append(decoder.decode(buffer));
  59. buffer.clear();
  60. channel.read(buffer, context, this);
  61. return;
  62. }
  63. // No reply code, consider cache other message
  64. if (buffer.hasRemaining()) {
  65. channel.read(buffer, context, this);
  66. return;
  67. }
  68. // Have to cache some message, but may be have reply code, so just check CRLF;
  69. int index = findLF(buffer, position - 2);
  70. if (index == -1) {
  71. buffer.flip();
  72. reply.other.append(decoder.decode(buffer));
  73. } else {
  74. buffer.position(0).limit(index + 1);
  75. reply.other.append(decoder.decode(buffer));
  76. buffer.position(index);
  77. }
  78. buffer.limit(position);
  79. buffer.compact();
  80. channel.read(buffer, context, this);
  81. } catch (CharacterCodingException ex) {
  82. failed(ex, context);
  83. }
  84. }
  85. @Override
  86. public void failed(Throwable cause, ResponseCallback<Reply> context) {
  87. returnBuffer();
  88. context.failed(cause);
  89. }
  90. private void returnBuffer() {
  91. pool.releaseBuffer(buffer);
  92. buffer = null;
  93. }
  94. ...

使用有状态读写操作类型的控制类

  1. public class FTPClient implements ResponseCallback<Reply>,
  2. WriteCallback, CommandProvider {
  3. private TelnetReplyReader reader;
  4. private BufferWriter writer;
  5. private Semaphore semaphore = new Semaphore(0);
  6. // 传输通道的处理环境
  7. private TransferContext transferContext;
  8. protected void start(Context context, AsynchronousSocketChannel channel) {
  9. InetSocketAddress remote;
  10. try {
  11. remote = (InetSocketAddress) channel.getRemoteAddress();
  12. } catch (IOException e) {
  13. failed(e);
  14. return;
  15. }
  16. InetSocketAddress local;
  17. try {
  18. local = (InetSocketAddress) channel.getLocalAddress();
  19. } catch (IOException e) {
  20. failed(e);
  21. return;
  22. }
  23. Charset charset = Charset.forName("UTF-8");
  24. reader = new TelnetReplyReader(channel, context.pool(), charset);
  25. writer = new BufferWriter(channel, charset);
  26. // 发起读操作请求
  27. reader.read(this);
  28. // 同时,预备传输通道环境
  29. transferContext = new SimpleTransferContext(context,
  30. remote.getAddress(), local.getAddress());
  31. }
  32. @Override
  33. public void onResponse(Reply reply) {
  34. // 简单的响应处理逻辑
  35. try {
  36. transferContext.check(reply);
  37. } catch (Throwable ex) {
  38. ex.printStackTrace();
  39. }
  40. // If reply not process right, just pending any advance operation.
  41. if (reply.code / 100 == 1)
  42. reader.read(this);
  43. else
  44. semaphore.release();
  45. }
  46. @Override
  47. public void writeCompleted() {
  48. //FTP 规则,发出请求命令后,开始等待对方的响应
  49. reader.read(this);
  50. }
  51. ...

除协议相关的部分代码,其余的看上去还蛮简单,似乎抽象 Reader 和 Writer 的代价值得的。上面代码中的 Context,Reply 等小类型,可以在完整的源代码中检查。

继续 FTP,处理传输通道

本文之所以选择 FTP 作为 AIO 的实践例子,FTP 的控制通道必须协调单独的数据传输通道。不仅如此,使用 Port 方式的话,客户端程序还需要建立一个简单的网络服务器。

关于防火墙
本文的编程环境是 Windows7,由于防火墙的原因,忽略服务器方式。 如果网络程序出现故障,防火墙是否为问题的根源,可以优先考虑。 作者在实践过程中,曾经遭遇到这样的问题。Windows 7 自带的防火墙识别 FTP 的 PASV 命令,并且阻止该命令的执行。 而且,AIO 的内核实现使用转换后系统错误消息作为异常消息,会让你痛的哭。但是,请放心,长城不会倒。

上文中,我们尽量回避建立网络连接的 CompletionHandler 的再处理问题。FTP 的数据传输通道

  • 要么使用服务器方式,使用 AsynchronousServerSocketChannel.accept 方法
  • 要么使用客户端方式,使用 AsynchronousSocketChannel.connect 方法,与前文类似

延续上文的处理思路,继续抽象用于 connect 的 CompletionHandler 类型。与前不同的是,该连接回调类型使用无状态方式设计。该例演示下载文件的处理。

无状态类型的连接回调类型

  1. public class SocketConnector implements Connector<Object[]> {
  2. public void connect(InetSocketAddress remote, ConnectionCallback client)
  3. throws IOException {
  4. // 创建新的异步网络通道
  5. AsynchronousSocketChannel channel = AsynchronousSocketChannel.open();
  6. // 无状态方式处理,将所有需要的参数打包为单个 attachment 参数
  7. Object[] attachment = { client, remote, channel };
  8. // 启动连接操作
  9. channel.connect(remote, attachment, this);
  10. }
  11. public void connect(InetSocketAddress remote, InetSocketAddress local,
  12. ConnectionCallback client)throws IOException {
  13. AsynchronousSocketChannel channel = AsynchronousSocketChannel.open();
  14. // 绑定本地网络地址,对于客户端而言,通常是 IP,对于服务器而言,一定需要端口号
  15. channel.bind(local);
  16. Object[] attachment = { client, remote, channel };
  17. channel.connect(remote, attachment, this);
  18. }
  19. @Override
  20. public void completed(Void result, Object[] attachment) {
  21. // 连接完成,通知 Client 启动协议控制逻辑
  22. ((ConnectionCallback) attachment[0]).
  23. start((AsynchronousSocketChannel) attachment[2]);
  24. }
  25. @Override
  26. public void failed(Throwable cause, Object[] attachment) {
  27. ((ConnectionCallback) attachment[0]).
  28. connectFailed(new Exception(attachment[1].toString(), cause));
  29. }
  30. }

但是,有些 FTP 服务器要求数据传输通道必须使用与控制通道相同的 ip 地址,导致连接必须知道并保持控制通道的 ip 地址。唉!又痛到有状态方式了。

有状态类型的连接回调类型

  1. public class TransferConnector extends SocketConnector {
  2. private InetAddress localAddress;
  3. private InetAddress remoteAddress;
  4. public TransferConnector(InetAddress remoteAddress, InetAddress localAddress) {
  5. this.remoteAddress = remoteAddress;
  6. this.localAddress = localAddress;
  7. }
  8. protected InetSocketAddress createRemoteAddress(int port) {
  9. return new InetSocketAddress(remoteAddress, port);
  10. }
  11. protected InetSocketAddress createLocalAddress() {
  12. return new InetSocketAddress(localAddress, 0);
  13. }
  14. public void connect(int port, ConnectionCallback client) throws IOException {
  15. if (port < 1)
  16. throw new IOException("Error remote server port number: " + port);
  17. super.connect(createRemoteAddress(port), createLocalAddress(), client);
  18. }
  19. }

使用连接回调类型建立数据传输通道

  1. public class SimpleTransferContext
  2. implements TransferContext, ConnectionCallback, FileLockCallback {
  3. ...
  4. // 使用单独的传输连接回调对象再次进行连接完成通知
  5. connector = new TransferConnector(remoteAddress, localAddress);
  6. ...
  7. // 发起传输通道的连接操作请求
  8. case RETR:
  9. connector.connect(port, this);
  10. // clear for next time
  11. port = 0;
  12. ...
  13. // 传输通道连接完成
  14. @Override
  15. public void start(AsynchronousSocketChannel channel) {
  16. this.channel = channel;
  17. }

因为涉及到文件的处理,FTP 的数据传输通道起始控制看起来相当简单。快乐其实是很简单的东西。

继续 FTP,使用 AIO 的异步文件操作

AsynchronousFileChannel 没有 connect 方法,但是有一个类似的方法 lock。JDK7 中该方法的声明如下:

异步文件通道的 lock 方法 API

  1. public abstract <A> void lock(long position,
  2. long size,
  3. boolean shared,
  4. A attachment,
  5. CompletionHandler<FileLock,? super A> handler)

无状态的文件连接回调类型

  1. public class FileLocker implements CompletionHandler<FileLock, FileLockCallback> {
  2. public void lock(String filename, long position, long size,
  3. boolean shared, FileLockCallback client,
  4. OpenOption... options) throws IOException {
  5. // 使用新的 AIO 中的 Path API
  6. Path path = Paths.get(filename);
  7. // 创建异步文件通道对象
  8. AsynchronousFileChannel file = AsynchronousFileChannel.open(path, options);
  9. // 锁定要写的区域
  10. file.lock(position, size, shared, client, this);
  11. }
  12. @Override
  13. public void completed(FileLock result, FileLockCallback attachment) {
  14. // 文件锁(或者文件连接)完成通知传输通道环境可以工作
  15. attachment.start(result);
  16. }
  17. @Override
  18. public void failed(Throwable cause, FileLockCallback attachment) {
  19. attachment.lockFailed(cause);
  20. }
  21. }

使用文件连接回调类型建立文件通道

  1. public class SimpleTransferContext
  2. implements TransferContext, ConnectionCallback, FileLockCallback {
  3. ...
  4. @Override
  5. public void start(FileLock fileLock) {
  6. this.fileLock = fileLock;
  7. // at here socket channel already prepared
  8. // 启动下载过程
  9. startDownload();
  10. }
  11. private Downloader download;
  12. private void startDownload() {
  13. download = new Downloader(context, channel, fileLock, size);
  14. channel = null;
  15. fileLock = null;
  16. download.run();
  17. }
  18. public void check(Reply reply) {
  19. if (currentCommand == null) {
  20. System.out.println(reply);
  21. return;
  22. }
  23. int code = reply.code;
  24. String message = reply.message;
  25. switch (currentCommand) {
  26. case SIZE...
  27. case RETR:
  28. if (code == 150) {
  29. // 150 Opening BINARY mode data connection for README (1765 bytes).
  30. int end = message.lastIndexOf(')');
  31. if (end != -1) {
  32. int start = message.lastIndexOf('(', end - 1);
  33. if (start != -1) {
  34. //RETR 命令响应正确,检查本地文件,预备下载
  35. lockFile(checkSize(message.substring(start + 1, end - 6)));
  36. break;
  37. }
  38. }
  39. ...
  40. }
  41. protected void lockFile(long size) {
  42. try {
  43. locker.lock(filename, 0, size, false, this,
  44. StandardOpenOption.CREATE,
  45. StandardOpenOption.READ,
  46. StandardOpenOption.WRITE);
  47. } catch (IOException e) {
  48. e.printStackTrace();
  49. }
  50. }

当 FTP 的 RETR 命令正确响应后,准备下载文件。首先准备好要写入的本地文件通道,锁住文件。 文件锁完成后,创建新的 Downloader 对象,开始真正的下载操作。

使用文件连接回调类型建立文件通道

  1. public abstract class Transfer {
  2. // 用于 Socket 和 File 读写操作使用的 ByteBuffer 的交换队列
  3. protected BlockingDeque<ByteBuffer> bufferQueue =
  4. new LinkedBlockingDeque<ByteBuffer>();
  5. protected Context context;
  6. public Transfer(Context context) {
  7. this.context = context;
  8. }
  9. public ByteBuffer getBuffer(int size) {
  10. return context.pool().get(size);
  11. }
  12. protected void releaseBuffer(ByteBuffer buffer) {
  13. context.pool().releaseBuffer(buffer);
  14. }
  15. }

下载实现,读和写

  1. public class Downloader extends Transfer
  2. implements ReadCallback, FileWriteCallback2, Runnable {
  3. // 读入指定长度内容的回调对象,处理网络内容
  4. private SizeReader reader;
  5. // 写入指定长度内容的回调对象,处理文件内容
  6. private FileWriter2 writer;
  7. private AtomicBoolean writable = new AtomicBoolean(true);
  8. // 用于显示网络数据传输速率的工具
  9. private ConsoleProgress progress = new ConsoleProgress();
  10. public Downloader(Context context, AsynchronousSocketChannel socket,
  11. FileLock fileLock, long size) {
  12. super(context);
  13. reader = new SizeReader(socket, size, this);
  14. writer = new FileWriter2(fileLock, this);
  15. progress.reset(size);
  16. }
  17. @Override
  18. public void run() {
  19. reader.read();
  20. }
  21. @Override
  22. public void writeCompleted(ByteBuffer buffer) {
  23. // 一个缓冲区写入文件完毕
  24. releaseBuffer(buffer);
  25. buffer = bufferQueue.poll();
  26. if (buffer != null)
  27. // 如果网络已经读好一个缓冲区,继续写入文件
  28. writer.write(buffer);
  29. else
  30. // 否则清除写状态
  31. writable.set(true);
  32. }
  33. @Override
  34. public void readCompletedBytes(Integer bytes, long start, long end) {
  35. // 显示网络传输进度
  36. progress.update(bytes, start, end);
  37. progress.run();
  38. }
  39. @Override
  40. public void completedReadBuffer(ByteBuffer buffer) {
  41. if (writable.compareAndSet(true, false)) {
  42. // 从网络下载了一个缓冲区的内容,如果写文件空闲,通知写文件
  43. writer.write(buffer);
  44. } else {
  45. // 如果文件正在写,将当前缓冲区放入后备队列
  46. bufferQueue.offer(buffer);
  47. }
  48. }
  49. @Override
  50. public void writeCompleted() {
  51. System.out.println("file saved OK");
  52. }
  53. @Override
  54. public void readCompleted() {
  55. System.out.println("file transfer OK");
  56. }
  57. ...
  • Downloader 使用 SizeReader 读取网络数据。SizeReader 使用自主的缓冲区申请,不需要调用者传递 ByteBuffer 参数。
  • Downloader 使用 FileWriter2 写文件内容。FileWriter2 使用一次性写完外部传递的缓冲区的策略。 需要调用者传递 ByteBuffer 参数。

总结

线程池和 Group

前文提到到 group,但是没有解释。group 指 AsynchronousChannelGroup,用于管理异步通道资源的环境对象,封装一个处理 I/O 完成的机制。 这个组对象关联一个线程池。可以将处理 I/O 事件的任务提交到这个线程池,通过 channel 的 read,write,connect 等方法进行。线程池中的工作线程将会带着 channel 上 I/O 操作结果调用 CompletionHandler.complete方法。除了处理 I/O 事件,组关联的线程池可能会执行其他与 I/O 操作相关的任务。这个 group 对象相当于 Proactor 模式中 Dispatcher。

四种异步通道的 open 方法可以指定 group 参数,或者不指定。 每个异步通道都必须关联一个组,要么是系统默认组,要么是创建的一个特定的组。例如,不能直接从一个 socket 对象上创建一个 AsynchronousSocketChannel。 如果不使用 group 参数,java 使用一个默认的系统范围的组对象。系统默认的组对象的线程池参数可以使用两个属性进行配置:

  1. java.nio.channels.DefaultThreadPool.threadFactory 默认组对象不会将其关联的线程池中的线程进行额外的配置,因此,这些线程都是 daemon 线程。
  2. java.nio.channels.DefaultThreadPool.initialSize: 处理 I/O 事件的最大线程数量。

是否使用自定义的 group 对象,各有优劣,由你决定。

  • 使用 group,好处是你可以将文件通常与网络通道分开,避免线程干扰。缺点是:使用者通常必须负责关闭组,多数时候取决于使用的现成工厂类型。组与 ExecutorService 类似,这意味着关闭过程通常是两步关闭方法。 在多层次 Client 结构(例如 FTP 的控制通道需要衍生新的数据传输通道)中,如果要使用 group,很讨厌的一点就是 group 参数传递。没有环境编程之类的工具进行辅助的话,使用者必须考虑如何有效传递 group 参数。
  • 不使用 group,最大的好处是不用传递 group 参数。缺点是:必须注意处理非 daemon 线程的完成和退出,不小心的话,将会导致异步通道的工作丢失;同时还需要处理线程工厂和最大线程数的配置。

*PendingException 和 AsynchronousChannel

AsynchronousChannel 设计为线程安全的,即可以同时进行读写操作,全双工模式操作。不少协议使用半双工模式。读完写或者写完读。什么时候会进行并发访问 AsynchronousChannel,即使用全双工模式?主要看协议的实现。例如 FTP 的 abort 命令,要求可以控制连接可以同时进行读写。数据连接在进行文件传输的时候,控制连接等待服务器响应。实际上此时也可以进行写操作,发送一个 abort 命令,促使数据传输过程中断。这个 abort 可以从 UI 线程或者从 UI 事件产生的线程中发出。虽然如此,但是不少系统实现最多只允许一个写操作和一个读操作。如果一个读写操作没有完成,程序又发送一个读写操作命令,则导致 ReadPendingException 或者 WritePendingException。如果你的程序非要这样的话,只有一个解决办法,将读写操作的命令使用队列排队进行。通常应该不会出现这种需求,如果有的话,很有可能是设计上的缺陷。

读写超时。AsynchronousChannel 的读写操作可以指定超时参数,但是超时发生之后,传递给读写操作的 ByteBuffer 参数不应该向正常读写完成一样进行处理。通常设计如果超时发生,一般应该丢弃当前期望数据结果。

ByteBuffer 和解码

AIO 鼓励使用 DirectByteBuffer。就算应用程序代码中不使用 DirectByteBuffer,AIO 内核实现也会使用 DirectByteBuffer 来复制外部传入的 HeadByteBuffer 内容。在某些情况下完全可以利用这一特征,偷懒而不会有损失。例如:传输协议中发送普通命令,完全可以不使用 DirectByteBuffer,这些命令的提供通常以 String 类型出现,而 String 到 DirectByteBuffer 无论如何必须经过两个步骤: String—byte[]—DirectByteBuffer. 第二步完全可以由 AIO 内核进行。

如果需要从 DirectByteBuffer 解码到 String,有选择余地:

  • 使用 Decoder 和 CharBuffer:DirectByteBuffer—CharBuffer—(char[])String。
  • 使用 String 和 byte[]:DirectByteBuffer—byte[]—(char[])String

可以看出,这种情况数组复制的工作量不小。如果没有使用 Javolution 方式的栈内存分配和对象工厂,其实没有什么区别。

关于性能
Java 已经不少的 NIO 类型的框架, 这里有个很有意思:“Announcement: Java NIO Framework” “也许您想要确认某些: performance comparison: nio v nio2” 从第二个例子可以看出,使用 AIO 方式进行有时候出奇的简单,真让人快乐。 本文提供的 FTPClient 的例子 main 演示了单个目标下载,在测试过程中与 c 语言实现的 wget 比较毫不逊色。 简单修改一下就可以执行多个目标下载,应该更快乐。

本文转载自:在 Java 7 中体会 NIO.2 异步执行的快乐

评论

发表评论 点击刷新验证码

提示

该功能暂未开放