0 Replies Latest reply: Mar 21, 2013 8:44 AM by 998334 RSS

    Unable to read AsynchronousSocketChannel after getting timeout exception

    998334
      i am trying to read some data on the server using a AsynchronousSocketChannel .

      I have this scenario :

      1- Call channel.read : this will print "Received"
      2- Call channel.read : this should fail due to timeout exception.
      3- Call channel.read : I am sending data in this case but I got the exception "Reading not allowed due to timeout or cancellation"

      Below is the source code

      My environment is :
      OS : Windows 7
      JDK :
      Java(TM) SE Runtime Environment (build 1.7.0-b147)
      Java HotSpot(TM) 64-Bit Server VM (build 21.0-b17, mixed mode)


      package com.qmic.asynchronous;

      import java.io.InputStream;
      import java.io.PrintWriter;
      import java.net.InetAddress;
      import java.net.InetSocketAddress;
      import java.net.Socket;
      import java.nio.ByteBuffer;
      import java.nio.channels.AsynchronousChannelGroup;
      import java.nio.channels.AsynchronousServerSocketChannel;
      import java.nio.channels.AsynchronousSocketChannel;
      import java.nio.channels.CompletionHandler;
      import java.util.concurrent.ConcurrentLinkedQueue;
      import java.util.concurrent.ExecutorService;
      import java.util.concurrent.Executors;
      import java.util.concurrent.Future;
      import java.util.concurrent.TimeUnit;

      public class NIOChannel {

           public static void main(String[] args) {
                try{
                     MyAsynchronousTcpServer server = new MyAsynchronousTcpServer();
                     Thread serverThread = new Thread(server);
                     serverThread.start();

                     Socket socket = new Socket(InetAddress.getByName("127.0.0.1"), 9001, InetAddress.getByName("127.0.0.1"), 9003);
                     socket.setSoTimeout(30000);
                     socket.setKeepAlive(true);

                     if (socket.isConnected()){
                          PrintWriter dos = new PrintWriter(socket.getOutputStream());
                          InputStream input = socket.getInputStream();
                          byte[] buffer = new byte[1024];

                          // Data of the first call
                          dos.print("ABCDEFGH");
                          dos.flush();

                          byte[] data = new byte[50];
                          socket.getInputStream().read(data);
                          // Print the result from server, written in handler of first read operation.
                          System.out.println("Result is:" + new String(data));
                          
                          if (data.length > 0){
                               Thread.sleep(10000); // Wait for 10 Seconds so the second read on the server will fail.
                          }

                          // Data of the third call
                          dos.print("ABCDEFGH");
                          dos.flush();
                     }
                }catch(Exception ex){
                     ex.printStackTrace();
                }
           }
      }

           class MyAsynchronousTcpServer implements Runnable{
                final int SERVER_PORT = 9001;
                final String SERVER_IP = "127.0.0.1";
                private int THREAD_POOL_SIZE = 10;
                private int DEFAULT_THREAD_POOL_SIZE = 2;

                ConcurrentLinkedQueue<ListenHandler> handlers = new ConcurrentLinkedQueue<ListenHandler>();
                boolean shutDown = false;

                public void run(){
                     //create asynchronous server-socket channel bound to the default group
                     try {
                          // Create the ChannelGroup(thread pool).
                          ExecutorService executorService = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
                          AsynchronousChannelGroup group = AsynchronousChannelGroup.withCachedThreadPool(executorService, DEFAULT_THREAD_POOL_SIZE);

                          AsynchronousServerSocketChannel asynchronousServerSocketChannel =
                               AsynchronousServerSocketChannel.open(group);

                          if (asynchronousServerSocketChannel.isOpen())
                          {
                               //bind to local address
                               asynchronousServerSocketChannel.bind(new InetSocketAddress(SERVER_IP, SERVER_PORT));

                               while(!shutDown){
                                    Future<AsynchronousSocketChannel> asynchronousSocketChannelFuture =asynchronousServerSocketChannel.accept();
                                    final AsynchronousSocketChannel channel = asynchronousSocketChannelFuture.get(); // Timeout can be specified in the get() function, thread is blocked here
                                    System.out.println("New channel created successfully");

                                    // First call, should print Result of call 1 is : 10 (size of ABCDEFGH)
                                    ByteBuffer buffer1 = ByteBuffer.allocateDirect(250);
                                    channel.read(buffer1, 5, TimeUnit.SECONDS, null, new CompletionHandler<Integer, Object>() {

                                         @Override
                                         public void completed(Integer result, Object attachment) {
                                              System.out.println("Result of call 1 is :" + result);
                                              ByteBuffer response = ByteBuffer.wrap("Received".getBytes());
                                              channel.write(response);
                                         }

                                         @Override
                                         public void failed(Throwable exc, Object attachment) {
                                              exc.printStackTrace();
                                         }

                                    });

                                    Thread.sleep(3000);

                                    // Second read, should print error InterruptedByTimeoutException
                                    ByteBuffer buffer2 = ByteBuffer.allocateDirect(250);
                                    channel.read(buffer2, 5, TimeUnit.SECONDS, null, new CompletionHandler<Integer, Object>() {

                                         @Override
                                         public void completed(Integer result, Object attachment) {
                                              System.out.println("Result of call 2 is :" + result);
                                         }

                                         @Override
                                         public void failed(Throwable exc, Object attachment) {
                                              exc.printStackTrace();
                                         }
                                    });

                                    
                                    Thread.sleep(9000);
                                    // Second read operation was failed, no try to read again . AN EXCEPTION IS THROWN HERE : Reading not allowed due to timeout or cancellation
                                    ByteBuffer buffer3 = ByteBuffer.allocateDirect(250);
                                    channel.read(buffer3, 5, TimeUnit.SECONDS, null, new CompletionHandler<Integer, Object>() {

                                         @Override
                                         public void completed(Integer result, Object attachment) {
                                              System.out.println("Result of call 3 is :" + result);
                                         }

                                         @Override
                                         public void failed(Throwable exc, Object attachment) {
                                              exc.printStackTrace();
                                         }
                                    });

                               }
                          }
                          else
                          {
                               System.out.println("The asynchronous server-socket channel cannot be opened!");
                          }
                     }
                     catch (Exception ex)
                     {
                          ex.printStackTrace();
                          System.err.println(ex);
                     }
                }
           }