用Java实现非阻塞通信

用ServerSocket和Socket来编写服务器程序和客户程序,是Java网络编程的最基本的方式。这些服务器程序或客户程序在运行过程中常常会阻塞。例如当一个线程执行ServerSocket的accept()方法时,假如没有客户连接,该线程就会一直等到有了客户连接才从accept()方法返回。再例如当线程执行Socket的read()方法时,如果输入流中没有数据,该线程就会一直等到读入了足够的数据才从read()方法返回。

假如服务器程序需要同时与多个客户通信,就必须分配多个工作线程,让它们分别负责与一个客户通信,当然每个工作线程都有可能经常处于长时间的阻塞状态。

从JDK1.4版本开始,引入了非阻塞的通信机制。服务器程序接收客户连接、客户程序建立与服务器的连接,以及服务器程序和客户程序收发数据的操作都可以按非阻塞的方式进行。服务器程序只需要创建一个线程,就能完成同时与多个客户通信的任务。

非阻塞的通信机制主要由java.nio包(新I/O包)中的类实现,主要的类包括ServerSocketChannel、SocketChannel、Selector、SelectionKey和ByteBuffer等。

一、线程阻塞

在生活中,最常见的阻塞现象是公路上汽车的堵塞。汽车在公路上快速运行,如果前方交通受阻,就只好停下来等待,等到公路顺畅,才能恢复运行。

线程在运行中也会因为某些原因而阻塞。所有处于阻塞状态的线程的共同特征是:放弃CPU,暂停运行,只有等到导致阻塞的原因消除,才能恢复运行;或者被其他线程中断,该线程会退出阻塞状态,并且抛出InterruptedException。

1.线程阻塞的原因

导致线程阻塞的原因主要有以下方面:

  • 线程执行了Thread.sleep(int n)方法,线程放弃CPU,睡眠n毫秒,然后恢复运行。
  • 线程要执行一段同步代码,由于无法获得相关的同步锁,只好进入阻塞状态,等到获得了同步锁,才能恢复运行。
  • 线程执行了一个对象的wait()方法,进入阻塞状态,只有等到其他线程执行了该对象的notify()或notifyAll()方法,才可能将其唤醒。
  • 线程执行I/O操作或进行远程通信时,会因为等待相关的资源而进入阻塞状态。例如当线程执行System.in.read()方法时,如果用户没有向控制台输入数据,则该线程会一直等读到了用户的输入数据才从read()方法返回。

进行远程通信时,在客户程序中,线程在以下情况可能进入阻塞状态:

  • 请求与服务器建立连接时,即当线程执行Socket的带参数的构造方法,或执行Socket的connect()方法时,会进入阻塞状态,直到连接成功,此线程才从Socket的构造方法或connect()方法返回。
  • 线程从Socket的输入流读入数据时,如果没有足够的数据,就会进入阻塞状态,直到读到了足够的数据,或者到达输入流的末尾,或者出现了异常,才从输入流的read()方法返回或异常中断。输入流中有多少数据才算足够呢?这要看线程执行的read()方法的类型:
  1.  
    1. int read():只要输入流中有一个字节,就算足够。
    2. int read(byte[] buff):只要输入流中的字节数目与参数buff数组的长度相同就算足够。
    3. String readLine():只要输入流中有一行字符串,就算足够。值得注意的是InputStream类并没有readLine()方法,在过滤流BufferedReader类中才有此方法。
  • 线程向Socket的输出流写一批数据时,可能会进入阻塞状态,等到输出了所有的数据,或者出现异常,才从输出流的write()方法返回或异常中断。
  • 当调用Socket的setSoLinger()方法设置了关闭Socket的延迟时间,那么当线程执行Socket的close()方法时,会进入阻塞状态,直到底层Socket发送完所有剩余数据,或者超过了setSoLinger()方法设置的延迟时间,才从close()方法返回。

在服务器程序中,线程在以下情况可能会进入阻塞状态:

  • 线程执行ServerSocket的accept()方法,等待客户的连接,直到接收到了客户连接,才从accept()方法返回。
  • 线程从Socket的输入流读入数据时, 如果输入流没有足够的数据,就会进入阻塞状态。
  • 线程向Socket的输出流写一批数据时,可能会进入阻塞状态,等到输出了所有的数据,或者出现异常,才从输出流的write()方法返回或异常中断。

由此可见,无论是在服务器程序还是客户程序中,当通过Socket的输入流和输出流来读写数据时,都可能进入阻塞状态。这种可能出现阻塞的输入和输出操作被称为阻塞I/O。与此对照,如果执行输入和输出操作时,不会发生阻塞,则称为非阻塞I/O。

2.服务器程序用多线程处理阻塞通信的局限

图1显示了服务器程序用多线程来同时处理多个客户连接的工作流程。主线程负责接收客户的连接。在线程池中有若干工作线程,它们负责处理具体的客户连接。每当主线程接收到一个客户连接,主线程就会把与这个客户交互的任务交一个空闲的工作线程去完成,主线程继续负责接收下一个客户连接。

a 

 图1  服务器程序用多线程处理阻塞通信

在图1中,用粗体框标识的步骤为可能引起阻塞的步骤。可以看出,当主线程接收客户连接,以及工作线程执行I/O操作时,都有可能进入阻塞状态。

服务器程序用多线程来处理阻塞I/O,尽管能满足同时响应多个客户请求的需求,但是有以下局限:

(1)Java虚拟机会为每个线程分配独立的堆栈空间,工作线程数目越多,系统开销就越大,而且增加了Java虚拟机调度线程的负担,增加了线程之间同步的复杂性,提高了线程死锁的可能性。

(2)工作线程的许多时间都浪费在阻塞I/O操作上,Java虚拟机需要频繁地转让CPU的使用权,使进入阻塞状态的线程放弃CPU,再把CPU分配给处于可运行状态的线程。

由此可见,工作线程并不是越多越好。如图2所示,保持适量的工作线程,会提高服务器的并发性能,但是当工作线程的数目到达某个极限,超出了系统的负荷时,反而会降低并发性能,使得多数客户无法快速得服务器的响应。

 

b 

图2线程数目与并发技能的关系 

3.非阻塞通信的基本思想

假如同事要做两件事:烧开水和烧粥。烧开水的步骤如下: 

锅里放水,打开煤气炉;

等待水烧开; //阻塞

关闭煤气炉,把开水灌到水壶里;

烧烧粥的步骤如下:

锅里放水和米,打开煤气炉;

等待粥烧开; //阻塞

调整煤气炉,改为小火;

等待粥烧熟; //阻塞

关闭煤气炉;

为了同时完成两件事,一种方案是同时请两个人分别做其中的一件事,这相当于采用多线程来同时完成多个任务。还有一种方案是让一个人同时完成两件事,这个人应该善于利用一件事的空闲时间去做另一件事,这个人一刻也不应该闲着:

锅里放水,打开煤气炉; //开始烧开水

锅里放水和米,打开煤气炉; //开始烧粥

while(一直等待,直到有水烧开、粥烧开或粥烧熟事件发生){  //阻塞

if(水烧开)

关闭煤气炉,把开水灌到水壶里;

if(粥烧开)

调整煤气炉,改为小火;

if(粥烧熟)

关闭煤气炉;

}

这个人不断监控烧水以及烧粥的状态,如果发生了“水烧开”、“粥烧开”或“粥烧熟”事件,就去处理这些事件,处理完一件事后继续监控烧水以及烧粥的状态,直到所有的任务都完成。

以上工作方式也可以运用到服务器程序中,服务器程序只需要一个线程就能同时负责接收客户的连接、接收各个客户发送的数据,以及向各个客户发送响应数据。服务器程序的处理流程如下:

while(一直等待,直到有接收连接就绪事件、读就绪事件或写就绪事件发生){ //阻塞

if(有客户连接)

接收客户的连接;  //非阻塞

if(某个Socket的输入流中有可读数据)

从输入流中读数据;  //非阻塞

if(某个Socket的输出流可以写数据)

向输出流写数据;  //非阻塞

}

以上处理流程采用了轮询的工作方式,当某一种操作就绪,就执行该操作,否则就察看是否还有其他就绪的操作可以执行。线程不会因为某一个操作还没有就绪,就进入阻塞状态,一直傻傻地在那里等待这个操作就绪。

为了使轮询的工作方式顺利进行,接收客户的连接、从输入流读数据、以及向输出流写数据的操作都应该以非阻塞的方式运行。所谓非阻塞,就是指当线程执行这些方法时,如果操作还没有就绪,就立即返回,而不会一直等到操作就绪。例如当线程接收客户连接时,如果没有客户连接,就立即返回;再例如当线程从输入流中读数据时,如果输入流中还没有数据,就立即返回,或者如果输入流还没有足够的数据,那么就读取现有的数据,然后返回。值得注意的是,以上while循环条件中的操作还是按照阻塞方式进行的,如果未发生任何事件,就会进入阻塞状态,直到接收连接就绪事件、读就绪事件或写就绪事件中至少有一个事件发生,此时就会执行while循环体中的操作。

二、java.nio包中的主要类

java.nio包提供了支持非阻塞通信的类,主要包括:

  • ServerSocketChannel:ServerSocket的替代类,支持阻塞通信与非阻塞通信。
  • SocketChannel:Socket的替代类,支持阻塞通信与非阻塞通信。
  • Selector:为ServerSocketChannel监控接收连接就绪事件,为SocketChannel监控连接就绪、读就绪和写就绪事件。
  • SelectionKey:代表ServerSocketChannel以及SocketChannel向Selector注册事件的句柄。当一个SelectionKey对象位于Selector对象的selected-keys集合中,就表示与这个SelectionKey对象相关的事件发生了。

ServerSocketChannel以及SocketChannel都是SelectableChannel的子类,如图3所示。SelectableChannel类以及其子类都能委托Selector来监控它们可能发生的一些事件,这种委托过程也称为注册事件过程。

 

 c

图3  SelectableChannel类及其子类的类框图

ServerSocketChannel向Selector注册接收连接就绪事件的代码如下:

SelectionKey key=serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT);

SelectionKey类的一些静态常量表示事件类型,ServerSocketChannel只可能发生一种事件:

  • SelectionKey.OP_ACCEPT:接收连接就绪事件,表示至少有了一个客户连接,服务器可以接收这个连接。

SocketChannel可能发生以下三种事件:

  • SelectionKey.OP_CONNECT:连接就绪事件,表示客户与服务器的连接已经建立成功。
  • SelectionKey.OP_READ:读就绪事件,表示输入流中已经有了可读数据,可以执行读操作了。
  • SelectionKey.OP_WRITE:写就绪事件,表示已经可以向输出流写数据了。

SocketChannel提供了接收和发送数据的方法:

  • read(ByteBuffer buffer):接收数据,把它们存放到参数指定的ByteBuffer中。
  • write(ByteBuffer buffer):把参数指定的ByteBuffer中的数据发送出去。

ByteBuffer表示字节缓冲区,SocketChannel的read()和write()方法都会操纵ByteBuffer。ByteBuffer类继承于Buffer类。ByteBuffer中存放的是字节,为了把它们转换为字符串,还需要用到Charset类,Charset类代表字符编码,它提供了把字节流转换为字符串(解码过程)和把字符串转换为字节流(编码过程)的实用方法。

三、非阻塞编程实例  

1.创建非阻塞的EchoServer

在非阻塞模式下,EchoServer只需要启动一个主线程,就能同时处理三件事:

● 接收客户的连接。

● 接收客户发送的数据。

● 向客户发回响应数据。

EchoServer委托Selector来负责监控接收连接就绪事件、读就绪事件和写就绪事件,如果有特定事件发生,就处理该事件。

EchoServer类的构造方法负责启动服务器,把它绑定到一个本地端口,代码如下:

 ###################

//创建一个Selector对象
selector = Selector.open();

//创建一个ServerSocketChannel对象
serverSocketChannel = ServerSocketChannel.open();

//使得在同一个主机上关闭了服务器程序,紧接着再启动该服务器程序时,可以顺利绑定到相同的端口
serverSocketChannel.socket().setReuseAddress(true);

//使ServerSocketChannel工作于非阻塞模式
serverSocketChannel.configureBlocking(false);

//把服务器进程与一个本地端口绑定
serverSocketChannel.socket().bind(new InetSocketAddress(port));

 #################

EchoServer类的service()方法负责处理本节开头所说的三件事,体现其主要流程的代码如下: 

 #############

public void service() throws IOException
{
 serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

 while( selector.select() > 0 )
 { //第一层while循环
  Set readyKeys = selector.selectedKeys(); //获得Selector的selected-keys集合
  Iterator it = readyKeys.iterator();

  while( it.hasNext() )
  { //第二层while循环
   SelectionKey key = null;
   try
   { //处理SelectionKey
    key = (SelectionKey)it.next(); //取出一个SelectionKey
    it.remove(); //把SelectionKey从Selector的selected-key集合中删除
    if( key.isAcceptable() )
    {
     //处理接收连接就绪事件;
    }
    if( key.isReadable() )
    {
     //处理读就绪事件;
    }
    if( key.isWritable() )
    {
     //处理写就绪事件;
    }
   }
   catch( IOException e )
   {
    e.printStackTrace();
    try
    {
     if( key != null )
     {
      //使这个SelectionKey失效, 使得Selector不再监控这个SelectionKey感兴趣的事件
      key.cancel();
      key.channel().close(); //关闭与这个SelectionKey关联的SocketChannel
     }
    }
    catch( Exception ex )
    {
     e.printStackTrace();
    }
   }
  }//#while
 }//#while
}

#################

在service()方法中,首先由ServerSocketChannel向Selector注册接收连接就绪事件。如果Selector监控到该事件发生,就会把相应的SelectionKey对象加入到selected-keys集合中。service()方法接下来在第一层while循环中不断询问Selector已经发生的事件,然后依次处理每个事件。

Selector的select()方法返回当前相关事件已经发生的SelectionKey的个数。如果当前没有任何事件发生,select()方法就会阻塞下去,直到至少有一个事件发生。Selector的selectedKeys()方法返回selected-keys集合,它存放了相关事件已经发生的SelectionKey对象。

service()方法在第二层while循环中,从selected-keys集合中依次取出每个SelectionKey对象,把它从selected-keys集合中删除,然后调用isAcceptable()、isReadable()和isWritable()方法判断到底是哪种事件发生了,从而作出相应的处理。处理每个SelectionKey的代码放在一个try语句中,如果出现异常,就会在catch语句中使这个SelectionKey失效,并且关闭与之关联的Channel。

(1)处理接收连接就绪事件

service()方法中处理接收连接就绪事件的代码如下:

###############

if( key.isAcceptable() )
{
 //获得与SelectionKey关联的ServerSocketChannel
 ServerSocketChannel ssc = (ServerSocketChannel)key.channel();
 
 //获得与客户连接的SocketChannel
 SocketChannel socketChannel = (SocketChannel)ssc.accept();
 System.out.println(“接收到客户连接,来自:” + socketChannel.socket().getInetAddress() + “:”
   + socketChannel.socket().getPort());
 
 //把SocketChannel设置为非阻塞模式
 socketChannel.configureBlocking(false);
 
 //创建一个用于存放用户发送来的数据的缓冲区
 ByteBuffer buffer = ByteBuffer.allocate(1024);
 
 //SocketChannel向Selector注册读就绪事件和写就绪事件, 关联了一个buffer附件
 socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, buffer);
}

################# 

如果SelectionKey的isAcceptable()方法返回true,就意味着这个 SelectionKey所感兴趣的接收连接就绪事件已经发生了。service()方法首先通过SelectionKey的channel()方法获得与之关联的ServerSocketChannel对象,然后调用ServerSocketChannel的accept()方法获得与客户连接的SocketChannel对象。这个SocketChannel对象默认情况下处于阻塞模式。如果希望它执行非阻塞的I/O操作,需要调用它的configureBlocking(false)方法。SocketChannel调用Selector的register()方法来注册读就绪事件和写就绪事件,还向register()方法传递了一个ByteBuffer类型的参数,这个ByteBuffer将作为附件与新建的SelectionKey对象关联。

(2)处理读就绪事件

如果SelectionKey的isReadable()方法返回true,就意味着这个SelectionKey所感兴趣的读就绪事件已经发生了。EchoServer类的receive()方法负责处理这一事件: 

############
public void receive(SelectionKey key) throws IOException
{
 //获得与SelectionKey关联的附件
 ByteBuffer buffer = (ByteBuffer)key.attachment();
 
 //获得与SelectionKey关联的SocketChannel
 SocketChannel socketChannel = (SocketChannel)key.channel();
 
 //创建一个ByteBuffer,用于存放读到的数据
 ByteBuffer readBuff = ByteBuffer.allocate(32);
 socketChannel.read(readBuff);
 readBuff.flip();
 
 //把buffer的极限设为容量
 buffer.limit(buffer.capacity());
 
 //把readBuff中的内容拷贝到buffer中,假定buffer的容量足够大,不会出现缓冲区溢出异常
 buffer.put(readBuff);
}
####################

在receive()方法中,先获得与这个SelectionKey关联的ByteBuffer和SocketChannel。SocketChannel每次读到的数据都被添加到这个ByteBuffer,在程序中,由buffer变量引用这个ByteBuffer对象。在非阻塞模式下,socketChannel.read(readBuff)方法读到多少数据是不确定的,假定读到的字节为n个,那么“0<=n<readBuff”的容量。EchoServer要求每接收到客户的一行字符串XXX(也就是字符串以“/r/n”结尾),就返回字符串echo:XXX。由于无法保证socketChannel.read(readBuff)方法一次读入一行字符串,因此只好把它每次读入的数据都放到buffer中,当这个buffer中凑足了一行字符串,再把它发送给客户。 

receive()方法的许多代码都涉及对ByteBuffer的三个属性(position、limit和capacity)的操作,图4演示了以上readBuff和buffer变量的三个属性的变化过程。假定SocketChannel的read()方法读入了6个字节,把它存放在readBuff中,并假定buffer中原来有10个字节,buffer.put(readBuff)方法把readBuff中的6个字节拷贝到buffer中,buffer中最后有16个字节。

 d

图4  receive()方法操纵readBuff和buffer的过程

(3)处理写就绪事件

如果SelectionKey的isWritable()方法返回true,就意味着这个SelectionKey所感兴趣的写就绪事件已经发生了。EchoServer类的send()方法负责处理这一事件:

###############

public void send(SelectionKey key) throws IOException
{
 //获得与SelectionKey关联的ByteBuffer
 ByteBuffer buffer = (ByteBuffer)key.attachment();

 //获得与SelectionKey关联的SocketChannel
 SocketChannel socketChannel = (SocketChannel)key.channel();

 buffer.flip(); //把极限设为位置,把位置设为0

 //按照GBK编码,把buffer中的字节转换为字符串
 String data = decode(buffer);

 //如果还没有读到一行数据,就返回
 if( data.indexOf(“/r/n”) == -1 )
  return;

 //截取一行数据
 String outputData = data.substring(0, data.indexOf(“/n”) + 1);

 System.out.print(outputData);

 //把输出的字符串按照GBK编码,转换为字节,把它放在outputBuffer中
 ByteBuffer outputBuffer = encode(“echo:” + outputData);

 //输出outputBuffer中的所有字节
 while( outputBuffer.hasRemaining() )
  socketChannel.write(outputBuffer);

 //把outputData字符串按照GBK编码,转换为字节,把它放在ByteBuffer中
 ByteBuffer temp = encode(outputData);

 //把buffer的位置设为temp的极限
 buffer.position(temp.limit());

 //删除buffer中已经处理的数据
 buffer.compact();

 //如果已经输出了字符串“bye/r/n”,就使SelectionKey失效,并关闭SocketChannel
 if( outputData.equals(“bye/r/n”) )
 {
  key.cancel();
  socketChannel.close();
  System.out.println(“关闭与客户的连接”);
 }
}

################### 

EchoServer的receive()方法把读入的数据都放到一个ByteBuffer中,send()方法就从这个ByteBuffer中取出数据。如果ByteBuffer中还没有一行字符串,就什么也不做,直接退出send()方法;否则,就从ByteBuffer中取出一行字符串XXX,然后向客户发送echo:XXX。接着,send()方法把ByteBuffer中的字符串XXX删除。如果send()方法处理的字符串为“bye/r/n”,就使SelectionKey失效,并关闭SocketChannel,从而断开与客户的连接。

(4)编码与解码

在ByteBuffer中存放的是字节,它表示字符串的编码。而程序需要把字节转换为字符串,才能进行字符串操作,比如判断里面是否包含“/r/n”,以及截取子字符串。EchoServer类的实用方法decode()负责解码,也就是把字节序列转换为字符串: 

 public String decode(ByteBuffer buffer) //解码
{
 CharBuffer charBuffer = charset.decode(buffer);
 return charBuffer.toString();
}

decode()方法中的charset变量是EchoServer类的成员变量,它表示GBK中文编码,它的定义如下:

private Charset charset=Charset.forName(“GBK”);

在send()方法中,当通过SocketChannel的write(ByteBuffer buffer)方法发送数据时,write(ByteBuffer buffer)方法不能直接发送字符串,而只能发送ByteBuffer中的字节。因此程序需要对字符串进行编码,把它们转换为字节序列,放在ByteBuffer中,然后再发送。

ByteBuffer outputBuffer=encode(“echo:”+outputData);

while(outputBuffer.hasRemaining())

  socketChannel.write(outputBuffer);

EchoServer类的实用方法encode()负责编码,也就是把字符串转换为字节序列:

public ByteBuffer encode(String str) //编码
{
  return charset.encode(str);

(5)在非阻塞模式下确保发送一行数据

在send()方法的outputBuffer中存放了字符串echo:XXX的编码。在非阻塞模式下,SocketChannel.write(outputBuffer)方法并不保证一次就把outputBuffer中的所有字节发送完,而是奉行能发送多少就发送多少的原则。如果希望把outputBuffer中的所有字节发送完,需要采用以下循环:

while(outputBuffer.hasRemaining())  //hasRemaining()方法判断是否还有未处理的字节

  socketChannel.write(outputBuffer); 

(6)删除ByteBuffer中的已处理数据

与SelectionKey关联的ByteBuffer附件中存放了读操作与写操作的共享数据。receive()方法把读到的数据放入ByteBuffer,而send()方法从ByteBuffer中一行行地取出数据。当send()方法从ByteBuffer中取出一行字符串XXX,就要把字符串从ByteBuffer中删除。在send()方法中,outputData变量就表示取出的一行字符串XXX,程序先把它编码为字节序列,放在一个名为temp的ByteBuffer中。接着把buffer的位置设为temp的极限,然后调用buffer的compact()方法删除代表字符串XXX的数据。

ByteBuffer temp=encode(outputData);

buffer.position(temp.limit());

buffer.compact();

图5演示了以上代码操纵buffer的过程。图5中假定temp中有10个字节,buffer中本来有16个字节,buffer.compact()方法删除缓冲区开头的10个字节,最后剩下6个字节。

  e

 图5  从buffer中删除已经处理过的一行字符串XXX

下例程1是EchoServer的源程序。

#################

//例程1  EchoServer.java(非阻塞模式)
import java.io.*;
import java.nio.*;
import java.nio.channels.*;
import java.nio.charset.*;
import java.net.*;
import java.util.*;

public class EchoServer
{
 private Selector selector = null;
 private ServerSocketChannel serverSocketChannel = null;
 private int port = 8000;
 private Charset charset = Charset.forName(“GBK”);

 public EchoServer() throws IOException
 {
  selector = Selector.open();
  serverSocketChannel = ServerSocketChannel.open();
  serverSocketChannel.socket().setReuseAddress(true);
  serverSocketChannel.configureBlocking(false);
  serverSocketChannel.socket().bind(new InetSocketAddress(port));
  System.out.println(“服务器启动”);
 }

 public void service() throws IOException
 {
  serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
  while( selector.select() > 0 )
  {
   Set readyKeys = selector.selectedKeys();
   Iterator it = readyKeys.iterator();
   while( it.hasNext() )
   {
    SelectionKey key = null;
    try
    {
     key = (SelectionKey)it.next();
     it.remove();
     if( key.isAcceptable() )
     {
      ServerSocketChannel ssc = (ServerSocketChannel)key.channel();
      SocketChannel socketChannel = (SocketChannel)ssc.accept();
      System.out.println(“接收到客户连接,来自:” + socketChannel.socket().getInetAddress()
        + “:” + socketChannel.socket().getPort());
      socketChannel.configureBlocking(false);
      ByteBuffer buffer = ByteBuffer.allocate(1024);
      socketChannel.register(selector, SelectionKey.OP_READ
        | SelectionKey.OP_WRITE, buffer);
     }
     if( key.isReadable() )
     {
      receive(key);
     }
     if( key.isWritable() )
     {
      send(key);
     }
    }
    catch( IOException e )
    {
     e.printStackTrace();
     try
     {
      if( key != null )
      {
       key.cancel();
       key.channel().close();
      }
     }
     catch( Exception ex )
     {
      e.printStackTrace();
     }
    }
   }//#while
  }//#while
 }

 public void send(SelectionKey key) throws IOException
 {
  ByteBuffer buffer = (ByteBuffer)key.attachment();
  SocketChannel socketChannel = (SocketChannel)key.channel();
  buffer.flip(); //把极限设为位置,把位置设为0
  String data = decode(buffer);
  if( data.indexOf(“/r/n”) == -1 )
   return;
  String outputData = data.substring(0, data.indexOf(“/n”) + 1);
  System.out.print(outputData);
  ByteBuffer outputBuffer = encode(“echo:” + outputData);
  //发送一行字符串
  while( outputBuffer.hasRemaining() )
   socketChannel.write(outputBuffer);
  ByteBuffer temp = encode(outputData);
  buffer.position(temp.limit());
  buffer.compact(); //删除已经处理的字符串
  if( outputData.equals(“bye/r/n”) )
  {
   key.cancel();
   socketChannel.close();
   System.out.println(“关闭与客户的连接”);
  }
 }

 public void receive(SelectionKey key) throws IOException
 {
  ByteBuffer buffer = (ByteBuffer)key.attachment();
  SocketChannel socketChannel = (SocketChannel)key.channel();
  ByteBuffer readBuff = ByteBuffer.allocate(32);
  socketChannel.read(readBuff);
  readBuff.flip();
  buffer.limit(buffer.capacity());
  buffer.put(readBuff); //把读到的数据放到buffer中
 }

 public String decode(ByteBuffer buffer)
 { //解码
  CharBuffer charBuffer = charset.decode(buffer);
  return charBuffer.toString();
 }

 public ByteBuffer encode(String str)
 { //编码
  return charset.encode(str);
 }

 public static void main(String args[]) throws Exception
 {
  EchoServer server = new EchoServer();
  server.service();
 }
}

################## 

2.在EchoServer中混合用阻塞模式与非阻塞模式

在例程1中,EchoServer的ServerSocketChannel以及SocketChannel都被设置为非阻塞模式,这使得接收连接、接收数据和发送数据的操作都采用非阻塞模式,EchoServer采用一个线程同时完成这些操作。假如有许多客户请求连接,可以把接收客户连接的操作单独由一个线程完成,把接收数据和发送数据的操作由另一个线程完成,这可以提高服务器的并发性能。

负责接收客户连接的线程按照阻塞模式工作,如果收到客户连接,就向Selector注册读就绪和写就绪事件,否则进入阻塞状态,直到接收到了客户的连接。负责接收数据和发送数据的线程按照非阻塞模式工作,只有在读就绪或写就绪事件发生时,才执行相应的接收数据和发送数据操作。

例程2是EchoServer类的源程序。其中receive()、send()、decode()和encode()方法的代码与例程1的EchoServer类相同,为了节省篇幅,不再重复显示。

###############

//例程2  EchoServer.java(混合使用阻塞模式与非阻塞模式)
import java.io.*;
import java.nio.*;
import java.nio.channels.*;
import java.nio.charset.*;
import java.net.*;
import java.util.*;

public class EchoServer
{
 private Selector selector = null;
 private ServerSocketChannel serverSocketChannel = null;
 private int port = 8000;
 private Charset charset = Charset.forName(“GBK”);

 public EchoServer() throws IOException
 {
  selector = Selector.open();
  serverSocketChannel = ServerSocketChannel.open();
  serverSocketChannel.socket().setReuseAddress(true);
  serverSocketChannel.socket().bind(new InetSocketAddress(port));
  System.out.println(“服务器启动”);
 }

 public void accept()
 {
  for( ;; )
  {
   try
   {
    SocketChannel socketChannel = serverSocketChannel.accept();
    System.out.println(“接收到客户连接,来自:” + socketChannel.socket().getInetAddress() + “:”
      + socketChannel.socket().getPort());
    socketChannel.configureBlocking(false);
    ByteBuffer buffer = ByteBuffer.allocate(1024);
    synchronized(gate)
    {
     selector.wakeup();
     socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE,
       buffer);
    }
   }
   catch( IOException e )
   {
    e.printStackTrace();
   }
  }
 }

 private Object gate = new Object();

 public void service() throws IOException
 {
  for( ;; )
  {
   synchronized(gate)
   {
   }
   int n = selector.select();
   if( n == 0 )
    continue;
   Set readyKeys = selector.selectedKeys();
   Iterator it = readyKeys.iterator();
   while( it.hasNext() )
   {
    SelectionKey key = null;
    try
    {
     key = (SelectionKey)it.next();
     it.remove();
     if( key.isReadable() )
     {
      receive(key);
     }
     if( key.isWritable() )
     {
      send(key);
     }
    }
    catch( IOException e )
    {
     e.printStackTrace();
     try
     {
      if( key != null )
      {
       key.cancel();
       key.channel().close();
      }
     }
     catch( Exception ex )
     {
      e.printStackTrace();
     }
    }
   }//#while
  }//#while
 }

 public void send(SelectionKey key)throws IOException{…}

 public void receive(SelectionKey key)throws IOException{…}

 public String decode(ByteBuffer buffer){…}

 public ByteBuffer encode(String str){…}

 public static void main(String args[]) throws Exception
 {
  final EchoServer server = new EchoServer();
  Thread accept = new Thread()
  {
   public void run()
   {
    server.accept();
   }
  };
  accept.start();
  server.service();
 }
}

################# 

以上EchoServer类的构造方法与例程1的EchoServer类的构造方法基本相同,唯一的区别是,在本例中, ServerSocketChannel采用默认的阻塞模式,即没有调用以下方法:

serverSocketChannel.configureBlocking(false);

EchoServer类的accept()方法负责接收客户连接,ServerSocketChannel的accept()方法工作于阻塞模式,如果没有客户连接,就会进入阻塞状态,直到接收到了客户连接。接下来调用socketChannel.configureBlocking(false)方法把SocketChannel设为非阻塞模式,然后向Selector注册读就绪和写就绪事件。

EchoServer类的service()方法负责接收和发送数据,它在一个无限for循环中,不断调用Selector的select()方法查寻已经发生的事件,然后作出相应的处理。

在EchoServer类的main()方法中,定义了一个匿名线程(暂且称它为Accept线程),它负责执行EchoServer的accept()方法。执行main()方法的主线程启动了Accept线程后,主线程就开始执行EchoServer的service()方法。因此当EchoServer启动后,共有两个线程在工作,Accept线程负责接收客户连接,主线程负责接收和发送数据:

 ################

public static void main(String args[]) throws Exception
{
 final EchoServer server = new EchoServer();

 Thread accept = new Thread()
 { //定义Accept线程
  public void run()
  {
   server.accept();
  }
 };
 
 accept.start(); //启动Accept线程
 server.service(); //主线程执行service()方法
}
############### 

当Accept线程开始执行以下方法时:

socketChannel.register(selector,SelectionKey.OP_READ|SelectionKey.OP_WRITE,buffer);

如果主线程正好在执行selector.select()方法,而且处于阻塞状态,那么Accept线程也会进入阻塞状态。两个线程都处于阻塞状态,很有可能导致死锁。导致死锁的具体情形为:Selector中尚没有任何注册的事件,即all-keys集合为空,主线程执行selector.select()方法时将进入阻塞状态,只有Accept线程向Selector注册了事件,并且该事件发生后,主线程才会从selector.select()方法中返回。假如Selector中尚没有任何注册的事件,此时Accept线程调用socketChannel.register()方法向Selector注册事件,由于主线程正在selector.select()方法中阻塞,这使得Accept线程也在socketChannel.register()方法中阻塞。Accept线程无法向Selector注册事件,而主线程没有任何事件可以监控,所以这两个线程都将永远阻塞下去。

为了避免死锁,程序必须保证当Accept线程正在通过socketChannel.register()方法向Selector注册事件时,不允许主线程正在selector.select()方法中阻塞。

为了协调Accept线程和主线程,EchoServer类在以下代码前加了同步标记。当Accept线程开始执行这段代码时,必须先获得gate对象的同步锁,然后进入同步代码块,先执行Selector对象的wakeup()方法,假如此时主线程正好在执行selector.select()方法,而且处于阻塞状态,那么主线程就会被唤醒,立即退出selector.select()方法。

 ######################

synchronized(gate)
{ //Accept线程执行这个同步代码块
 selector.wakeup();
 socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, buffer);
}

####################

 
主线程被唤醒后,在下一次循环中又会执行selector.select()方法,为了保证让Accept线程先执行完socketChannel.register()方法,再让主线程执行selector.select()方法,主线程必须先获得gate对象的同步锁:

for(;;){

  //一个空的同步代码块,其作用是为了让主线程等待Accept线程执行完同步代码块

  synchronized(gate){}  //主线程执行这个同步代码块

int n = selector.select();

}

假如Accept线程还没有执行完同步代码块,就不会释放gate对象的同步锁,这使得主线程必须等待片刻,等到Accept线程执行完同步代码块,释放了gate对象的同步锁,主线程才能恢复运行,再次执行selector.select()方法。

3.创建非阻塞的EchoClient

对于客户与服务器之间的通信,按照它们收发数据的协调程度来区分,可分为同步通信和异步通信。同步通信是指甲方向乙方发送了一批数据后,必须等接收到了乙方的响应数据后,再发送下一批数据。异步通信是指发送数据和接收数据的操作互不干扰,各自独立进行。值得注意的是,通信的两端并不要求都采用同样的通信方式,一方采用同步通信方式时,另一方可以采用异步通信方式。

同步通信要求一个I/O操作完成之后,才能完成下一个I/O操作,用阻塞模式更容易实现它。异步通信允许发送数据和接收数据的操作各自独立进行,用非阻塞模式更容易实现它。例程1和例程2介绍的EchoServer都采用异步通信,每次接收数据时,能读到多少数据,就读多少数据,并不要求必须读到一行数据后,才能执行发送数据的操作。

例程3的EchoClient类利用非阻塞模式来实现异步通信。在EchoClient类中,定义了两个ByteBuffer:sendBuffer和receiveBuffer。EchoClient把用户向控制台输入的数据存放到sendBuffer中,并且把sendBuffer中的数据发送给远程服务器;EchoClient把从远程服务器接收到的数据存放在receiveBuffer中,并且把receiveBuffer中的数据打印到控制台。图6显示了这两个Buffer的作用。

 aa 

图6  sendBuffer和receiveBuffer的作用

###############

//例程3  EchoClient.java(非阻塞模式)
import java.net.*;
import java.nio.channels.*;
import java.nio.*;
import java.io.*;
import java.nio.charset.*;
import java.util.*;

public class EchoClient
{
 private SocketChannel socketChannel = null;
 private ByteBuffer sendBuffer = ByteBuffer.allocate(1024);
 private ByteBuffer receiveBuffer = ByteBuffer.allocate(1024);
 private Charset charset = Charset.forName(“GBK”);
 private Selector selector;

 public EchoClient() throws IOException
 {
  socketChannel = SocketChannel.open();
  InetAddress ia = InetAddress.getLocalHost();
  InetSocketAddress isa = new InetSocketAddress(ia, 8000);
  socketChannel.connect(isa); //采用阻塞模式连接服务器
  socketChannel.configureBlocking(false); //设置为非阻塞模式
  System.out.println(“与服务器的连接建立成功”);
  selector = Selector.open();
 }

 public static void main(String args[]) throws IOException
 {
  final EchoClient client = new EchoClient();
  Thread receiver = new Thread()
  { //创建Receiver线程
   public void run()
   {
    client.receiveFromUser(); //接收用户向控制台输入的数据
   }
  };
  receiver.start(); //启动Receiver线程
  client.talk();
 }

 public void receiveFromUser()
 { //接收用户从控制台输入的数据,把它放到sendBuffer中
  try
  {
   BufferedReader localReader = new BufferedReader(new InputStreamReader(System.in));
   String msg = null;
   while( (msg = localReader.readLine()) != null )
   {
    synchronized(sendBuffer)
    {
     sendBuffer.put(encode(msg + “/r/n”));
    }
    if( msg.equals(“bye”) )
     break;
   }
  }
  catch( IOException e )
  {
   e.printStackTrace();
  }
 }

 public void talk() throws IOException
 { //接收和发送数据
  socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
  while( selector.select() > 0 )
  {
   Set readyKeys = selector.selectedKeys();
   Iterator it = readyKeys.iterator();
   while( it.hasNext() )
   {
    SelectionKey key = null;
    try
    {
     key = (SelectionKey)it.next();
     it.remove();
     if( key.isReadable() )
     {
      receive(key);
     }
     if( key.isWritable() )
     {
      send(key);
     }
    }
    catch( IOException e )
    {
     e.printStackTrace();
     try
     {
      if( key != null )
      {
       key.cancel();
       key.channel().close();
      }
     }
     catch( Exception ex )
     {
      e.printStackTrace();
     }
    }
   }//#while
  }//#while
 }

 public void send(SelectionKey key) throws IOException
 {
  //发送sendBuffer中的数据
  SocketChannel socketChannel = (SocketChannel)key.channel();
  synchronized(sendBuffer)
  {
   sendBuffer.flip(); //把极限设为位置,把位置设为零
   socketChannel.write(sendBuffer); //发送数据
   sendBuffer.compact(); //删除已经发送的数据
  }
 }

 public void receive(SelectionKey key) throws IOException
 {
  //接收EchoServer发送的数据,把它放到receiveBuffer中
  //如果receiveBuffer中有一行数据,就打印这行数据,然后把它从receiveBuffer中删除
  SocketChannel socketChannel = (SocketChannel)key.channel();
  socketChannel.read(receiveBuffer);
  receiveBuffer.flip();
  String receiveData = decode(receiveBuffer);
  if( receiveData.indexOf(“/n”) == -1 )
   return;
  String outputData = receiveData.substring(0, receiveData.indexOf(“/n”) + 1);
  System.out.print(outputData);
  if( outputData.equals(“echo:bye/r/n”) )
  {
   key.cancel();
   socketChannel.close();
   System.out.println(“关闭与服务器的连接”);
   selector.close();
   System.exit(0); //结束程序
  }
  ByteBuffer temp = encode(outputData);
  receiveBuffer.position(temp.limit());
  receiveBuffer.compact(); //删除已经打印的数据
 }

 public String decode(ByteBuffer buffer)
 { //解码
  CharBuffer charBuffer = charset.decode(buffer);
  return charBuffer.toString();
 }

 public ByteBuffer encode(String str)
 { //编码
  return charset.encode(str);
 }
}

############## 

在EchoClient类的构造方法中,创建了SocketChannel对象后,该SocketChannel对象采用默认的阻塞模式,随后调用socketChannel.connect(isa)方法,该方法将按照阻塞模式来与远程服务器EchoServer连接,只有当连接建立成功,该connect()方法才会返回。接下来程序再调用socketChannel.configureBlocking(false)方法把SocketChannel设为非阻塞模式,这使得接下来通过SocketChannel来接收和发送数据都会采用非阻塞模式。

socketChannel = SocketChannel.open();

socketChannel.connect(isa);

socketChannel.configureBlocking(false);

EchoClient类共使用了两个线程:主线程和Receiver线程。主线程主要负责接收和发送数据,这些操作由talk()方法实现。Receiver线程负责读取用户向控制台输入的数据,该操作由receiveFromUser()方法实现。

public static void main(String args[])throws IOException{

  final EchoClient client=new EchoClient();

  Thread receiver=new Thread(){  //创建receiver线程

    public void run(){

      client.receiveFromUser();  //读取用户向控制台输入的数据

    }

  };

 

  receiver.start();

  client.talk(); //接收和发送数据

}

receiveFromUser()方法读取用户输入的字符串,把它存放到sendBuffer中。如果用户输入字符串“bye”,就退出receiveFromUser()方法,这使得执行该方法的Receiver线程结束运行。由于主线程在执行send()方法时,也会操纵sendBuffer,为了避免两个线程对共享资源sendBuffer的竞争,receiveFromUser()方法对操纵sendBuffer的代码进行了同步。

BufferedReader localReader=new BufferedReader(new InputStreamReader(System.in));

String msg=null;

while((msg=localReader.readLine())!=null){

synchronized(sendBuffer){

     sendBuffer.put(encode(msg + “/r/n”));

  }

  if(msg.equals(“bye”))

    break;

}

talk()方法向Selector注册读就绪和写就绪事件,然后轮询已经发生的事件,并做出相应的处理。如果发生读就绪事件,就执行receive()方法,如果发生写就绪事件,就执行send()方法。

receive()方法接收EchoServer发回的响应数据,把它们存放在receiveBuffer中。如果receiveBuffer中已经满一行数据,就向控制台打印这一行数据,并且把这行数据从receiveBuffer中删除。如果打印的字符串为“echo:bye/r/n”,就关闭SocketChannel,并且结束程序。

send()方法把sendBuffer中的数据发送给EchoServer,然后删除已经发送的数据。由于Receiver线程以及执行send()方法的主线程都会操纵共享资源sendBuffer,为了避免对共享资源的竞争,对send()方法中操纵sendBuffer的代码进行了同步。

四、结语

本文介绍了用ServerSocketChannel与SocketChannel来创建服务器和客户程序的方法。ServerSocketChannel与SocketChannel既可以工作于阻塞模式,也可以工作于非阻塞模式,默认情况下,它们都工作于阻塞模式,可以调用configureBlocking()方法来重新设置模式。

总的说来,尽管阻塞模式与非阻塞模式都可以同时处理多个客户连接,但阻塞模式需要使用较多的线程,而非阻塞模式只需使用较少的线程,非阻塞模式能更有效地利用CPU,系统开销小,因此有更高的并发性能。

阻塞模式编程相对简单,但是当线程数目很多时,必须处理好线程之间的同步,如果自己编写线程池,要实现健壮的线程池难度较高。阻塞模式比较适用于同步通信,并且通信双方稳定地发送小批量的数据,双方都不需要花很长时间等待对方的回应。假如通信过程中,由于一方迟迟没有回应,导致另一方长时间的阻塞,为了避免线程无限期地阻塞下去,应该设置超时时间,及时中断长时间阻塞的线程。

非阻塞模式编程相对难一些,对ByteBuffer缓冲区的处理比较麻烦。非阻塞模式比较适用于异步通信,并且通信双方发送大批量的数据,尽管一方接收到另一方的数据可能要花一段时间,但在这段时间内,接收方不必傻傻地等待,可以处理其他事情。

转自:http://blog.csdn.net/lowzoom/article/details/5634697

此条目发表在JAVA SE分类目录,贴了, , 标签。将固定链接加入收藏夹。