This discussion is archived
7 Replies Latest reply: Aug 1, 2012 4:27 PM by EJP RSS

Java NIO client

918970 Newbie
Currently Being Moderated
I need to make the server is able to hold about 500 connections and operates on a single thread. The server itself should make all the connections. Where can I find examples of finished implementations?
  • 1. Re: Java NIO client
    EJP Guru
    Currently Being Moderated
    Please clarify your question. Do you want to make a server or a client? What do you mean by 'the server itself should make all the connections'? This doesn't make sense. Servers accept connections, clients make them.
  • 2. Re: Java NIO client
    918970 Newbie
    Currently Being Moderated
    I need to make a client that can hold about 500 connections and working in the same thread

    Edited by: 915967 on 01.08.2012 5:57
  • 3. Re: Java NIO client
    918970 Newbie
    Currently Being Moderated
    I have an example, but it does not work
    import java.io.IOException;
    import java.net.InetAddress;
    import java.net.InetSocketAddress;
    import java.net.Socket;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.SocketChannel;
    import java.nio.channels.spi.SelectorProvider;
    import java.util.*;
    
    public class NioClient implements Runnable {
         // The host:port combination to connect to
         private InetAddress hostAddress;
         private String host;
         private int port;
    
         // The selector we'll be monitoring
         private Selector selector;
    
         // The buffer into which we'll read data when it's available
         private ByteBuffer readBuffer = ByteBuffer.allocate(8192);
    
         // A list of PendingChange instances
         private List pendingChanges = new LinkedList();
    
         // Maps a SocketChannel to a list of ByteBuffer instances
         private Map pendingData = new HashMap();
         
         // Maps a SocketChannel to a RspHandler
         private Map rspHandlers = Collections.synchronizedMap(new HashMap());
         
         public NioClient() {
              try {this.selector = this.initSelector();} catch(IOException e){}
         }
         public void connect(String host, int port, RspHandler handler) throws IOException {
              this.hostAddress = hostAddress;
              this.host = host;
              this.port = port;
              this.send("$Hello |".getBytes(), handler);
         }
    
         public void send(byte[] data, RspHandler handler) throws IOException {
              // Start a new connection
              SocketChannel socket = this.initiateConnection();
              
              // Register the response handler
              this.rspHandlers.put(socket, handler);
              
              // And queue the data we want written
              synchronized (this.pendingData) {
                   List queue = (List) this.pendingData.get(socket);
                   if (queue == null) {
                        queue = new ArrayList();
                        this.pendingData.put(socket, queue);
                   }
                   queue.add(ByteBuffer.wrap(data));
              }
    
              // Finally, wake up our selecting thread so it can make the required changes
              this.selector.wakeup();
              handler.waitForResponse();
         }
    
         public void run() {
              while (true) {
                   try {
                        // Process any pending changes
                        synchronized (this.pendingChanges) {
                             Iterator changes = this.pendingChanges.iterator();
                             while (changes.hasNext()) {
                                  ChangeRequest change = (ChangeRequest) changes.next();
                                  switch (change.type) {
                                  case ChangeRequest.CHANGEOPS:
                                       SelectionKey key = change.socket.keyFor(this.selector);
                                       key.interestOps(change.ops);
                                       break;
                                  case ChangeRequest.REGISTER:
                                       change.socket.register(this.selector, change.ops);
                                       break;
                                  }
                             }
                             this.pendingChanges.clear();
                        }
    
                        // Wait for an event one of the registered channels
                        this.selector.select();
    
                        // Iterate over the set of keys for which events are available
                        Iterator selectedKeys = this.selector.selectedKeys().iterator();
                        while (selectedKeys.hasNext()) {
                             SelectionKey key = (SelectionKey) selectedKeys.next();
                             selectedKeys.remove();
    
                             if (!key.isValid()) {
                                  continue;
                             }
    
                             // Check what event is available and deal with it
                             if (key.isConnectable()) {
                                  this.finishConnection(key);
                             } else if (key.isReadable()) {
                                  this.read(key);
                             } else if (key.isWritable()) {
                                  this.write(key);
                             }
                        }
                   } catch (Exception e) {
                        e.printStackTrace();
                   }
              }
         }
    
         private void read(SelectionKey key) throws IOException {
              SocketChannel socketChannel = (SocketChannel) key.channel();
    
              // Clear out our read buffer so it's ready for new data
              this.readBuffer.clear();
    
              // Attempt to read off the channel
              int numRead;
              try {
                   numRead = socketChannel.read(this.readBuffer);
              } catch (IOException e) {
                   // The remote forcibly closed the connection, cancel
                   // the selection key and close the channel.
                   key.cancel();
                   socketChannel.close();
                   return;
              }
              System.out.println("READ");
    
              if (numRead == -1) {
                   // Remote entity shut the socket down cleanly. Do the
                   // same from our end and cancel the channel.
                   key.channel().close();
                   key.cancel();
                   return;
              }
    
              // Handle the response
              this.handleResponse(socketChannel, this.readBuffer.array(), numRead);
         }
    
         private void handleResponse(SocketChannel socketChannel, byte[] data, int numRead) throws IOException {
              // Make a correctly sized copy of the data before handing it
              // to the client
              byte[] rspData = new byte[numRead];
              System.arraycopy(data, 0, rspData, 0, numRead);
              
              // Look up the handler for this channel
              RspHandler handler = (RspHandler) this.rspHandlers.get(socketChannel);
              
              // And pass the response to it
              if (handler.handleResponse(rspData)) {
                   // The handler has seen enough, close the connection
                   socketChannel.close();
                   socketChannel.keyFor(this.selector).cancel();
              }
         }
    
         private void write(SelectionKey key) throws IOException {
              SocketChannel socketChannel = (SocketChannel) key.channel();
    
              synchronized (this.pendingData) {
                   List queue = (List) this.pendingData.get(socketChannel);
    
                   // Write until there's not more data ...
                   while (!queue.isEmpty()) {
                        ByteBuffer buf = (ByteBuffer) queue.get(0);
                        socketChannel.write(buf);
                        if (buf.remaining() > 0) {
                             // ... or the socket's buffer fills up
                             break;
                        }
                        queue.remove(0);
                   }
    
                   if (queue.isEmpty()) {
                        // We wrote away all data, so we're no longer interested
                        // in writing on this socket. Switch back to waiting for
                        // data.
                        key.interestOps(SelectionKey.OP_READ);
                   }
              }
         }
    
         private void finishConnection(SelectionKey key) throws IOException {
              SocketChannel socketChannel = (SocketChannel) key.channel();
         
              // Finish the connection. If the connection operation failed
              // this will raise an IOException.
              try {
                   socketChannel.finishConnect();
              } catch (IOException e) {
                   // Cancel the channel's registration with our selector
                   System.out.println(e);
                   key.cancel();
                   return;
              }
         
              // Register an interest in writing on this channel
              key.interestOps(SelectionKey.OP_WRITE);
         }
    
         private SocketChannel initiateConnection() throws IOException {
              // Create a non-blocking socket channel
              SocketChannel socketChannel = SocketChannel.open();
              socketChannel.configureBlocking(false);
         
              // Kick off connection establishment
              socketChannel.connect(new InetSocketAddress(this.host, this.port));
         
              // Queue a channel registration since the caller is not the 
              // selecting thread. As part of the registration we'll register
              // an interest in connection events. These are raised when a channel
              // is ready to complete connection establishment.
              synchronized(this.pendingChanges) {
                   this.pendingChanges.add(new ChangeRequest(socketChannel, ChangeRequest.REGISTER, SelectionKey.OP_CONNECT));
              }          
              return socketChannel;
         }
    
         private Selector initSelector() throws IOException {
              // Create a new selector
              return SelectorProvider.provider().openSelector();
         }
    }
    public class RspHandler {
         private byte[] rsp = null;
         
         public synchronized boolean handleResponse(byte[] rsp) {
              this.rsp = rsp;
              this.notify();
              return true;
         }
         
         public synchronized void waitForResponse() {
              while(this.rsp == null) {
                   try {
                        this.wait();
                   } catch (InterruptedException e) {
                   }
              }
              
              System.out.println(new String(this.rsp));
         }
    }
              NioClient NioClient = new NioClient();
              Thread t = new Thread(NioClient);
              t.setDaemon(true);
              t.start();
              RspHandler handler = new RspHandler();          
              NioClient.connect("69.28.156.250", 27040, handler);
              NioClient.connect("72.165.61.188", 27040, handler);
              NioClient.connect("208.111.133.84", 27011, handler);
              NioClient.connect("72.165.61.136", 27012, handler);
              ...

    Edited by: 915967 on 01.08.2012 7:07
  • 4. Re: Java NIO client
    gimbal2 Guru
    Currently Being Moderated
    Well since you want to use NIO you must know everything about it, so find the problem in the example code and fix it.

    You did study NIO right? Perhaps bought a book?
  • 5. Re: Java NIO client
    918970 Newbie
    Currently Being Moderated
    I have read and tried to understand...
  • 6. Re: Java NIO client
    918970 Newbie
    Currently Being Moderated
    I have read and tried to understand... If there was a similar working example, then maybe I would be able to understand where the mistake
  • 7. Re: Java NIO client
    EJP Guru
    Currently Being Moderated
    I can see a lot of problems in that code, including at least one compile error, but you do need to tell us what you mean by "doesn't work".

    1. Closing the channel cancels the key. You don't need to do both.

    2. finishConnect() can return false, which means the channel isn't connected yet, and it can throw an IOException, which means it will never connect, which means you should log it and close the channel, not cancel the key. If it returns true, you must deregister OP_CONNECT.

    3. Never ignore exceptions.

    4. Never ignore the result of a write. If it returns zero you must register OP_WRITE, stop writing to the channel, return to the select loop, and when OP_WRITE fires retry the write, and if and only if it completes deregister OP_WRITE. This is the only circumstance under which you should register OP_WRITE: the rest of the time you should just write when you have something to write. The reason is that OP_WRITE is always ready unless the socket send buffer in the kernel is full.

    5. You need a read buffer per channel. You can keep it as the key attachment so you know where to find it.

    6. An IOException when reading can mean a lot more things than just the peer forcibly closed the connection. Don't write inaccurate comments.

    7. If you get an IOException when writing you must close the channel.

Legend

  • Correct Answers - 10 points
  • Helpful Answers - 5 points