3 Replies Latest reply: Jul 23, 2012 2:41 AM by tmazight

Send a message and then disconnect the client with my NIO architecture

tmazight Newbie
Hi,

In one particular situation, I want to send a message to a client and then disconnect it. With my current approach, the client is disconnected before the message is sent.

My Server methods:
public void startServer() throws IOException {
          while (true) {
               try {
                    // Process any pending changes
                    synchronized (this.changeRequests) {
                         Iterator<ChangeRequest> changes = this.changeRequests
                                   .iterator();
                         while (changes.hasNext()) {
                              ChangeRequest change = (ChangeRequest) changes.next();
                              switch (change.type) {
                              case ChangeRequest.CHANGEOPS:
                                   SelectionKey key = change.socket.keyFor(selector);

                                   // keyFor() returns null if the channel is not
                                   // registered with this selector
                                   if (key != null && key.isValid())
                                        key.interestOps(change.ops);
                              }
                         }
                         this.changeRequests.clear();
                    }
                    numberOfKeys = 0;
                    numberOfKeys = selector.select();
                    if (numberOfKeys == 0) {
                         // Close the connections if a close was requested and
                         // there are no events

                         continue; // None of request available
                    }
                    // Get iterator through the selected keys list
                    Iterator<SelectionKey> iterKeys = selector.selectedKeys()
                              .iterator();
                    while (iterKeys.hasNext()) {

                         SelectionKey selectedKey = (SelectionKey) iterKeys.next();
                         iterKeys.remove();
                         // Verify the key validity
                         // if (!selectedKey.isValid()) {
                         // logger.error("Received key is invalid");
                         // continue;
                         // }

                         try {
                              if (selectedKey.isValid() && selectedKey.isAcceptable()) {
                                   // Accept the client request
                                   accept(selectedKey);
                              }
                              if (selectedKey.isValid() && selectedKey.isReadable()) {
                                   readMessage(selectedKey);
                              }
                              if (selectedKey.isValid() && selectedKey.isWritable()) {
                                   write(selectedKey);
                              }

                         } catch (IOException e) {
                              logger.error("Exception while iterating over the select() result: closing the channel");
                              selectedKey.cancel();
                              selectedKey.channel().close();
                              e.printStackTrace();
                         }
                    }

               } catch (Exception e) {
                    logger.error("Exception while changing the interest ops of the keys");
                    logger.error(e.getMessage());
                    e.printStackTrace();
               }

          }

     }
Write method
private void write(SelectionKey key) throws IOException {
          SocketChannel socketChannel = (SocketChannel) key.channel();

          synchronized (this.pendingData) {
               Queue<ByteBuffer> queue = (LinkedList<ByteBuffer>) this.pendingData
                         .get(socketChannel);
               nbrWrites++;
               // Write until there's not more data ...
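               while (!queue.isEmpty()) {
                    ByteBuffer buf = queue.peek();
                    socketChannel.write(buf);
                    if (buf.hasRemaining()) {
                         // Socket send buffer is full: keep OP_WRITE and
                         // retry on the next select()
                         return;
                    }
                    queue.remove();
               }
               // Everything was written, go back to waiting for reads
               key.interestOps(SelectionKey.OP_READ);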
          }
     }
Send method, which adds the message to the queue for this socket and requests an interest-ops change on its key:
     public void send(SocketChannel socket, byte[] data) {
          // Queue the data first, so the selector thread never sees
          // OP_WRITE with an empty queue
          synchronized (this.pendingData) {
               Queue<ByteBuffer> queue = (LinkedList<ByteBuffer>) this.pendingData
                         .get(socket);

               queue.add(ByteBuffer.wrap(data));
          }
          synchronized (this.changeRequests) {
               // Indicate we want the interest ops set changed
               this.changeRequests.add(new ChangeRequest(socket,
                         ChangeRequest.CHANGEOPS, SelectionKey.OP_WRITE));
          }

          selector.wakeup();
     }
closeConnection method, called when I want to close a connection:

     
public void closeConnection(Voie voie) {

          try {
               if (voie.getChannelPrio() != null && voie.getKeyPrio() != null) {
                    voie.getChannelPrio().close();
               }
               if (voie.getChannelNPrio() != null && voie.getKeyNPrio() != null) {
                    voie.getChannelNPrio().close();
               }
               ServeurNonBloquant.clients.remove(voie.getAdresse());
               selector.wakeup();
          } catch (IOException e) {
               e.printStackTrace();
          }
     }
Here is the method of my worker thread that calls send on the server class and then calls closeConnection:
public void verifyConnection(ServeurNonBloquant server, byte[] data,
               SelectionKey key) {
          Voie voie = ServeurNonBloquant.clients.get(((SocketChannel) key
                    .channel()).socket().getInetAddress());
          ByteBuffer buffer;
          // If the data arrived on the priority channel
          if (((SocketChannel) key.channel()).socket().getLocalPort() == ServeurNonBloquant.PORT_PRIO) {
               // Connection phase
               if (voie.getPhase() == Voie.PH_STARTUP) {

                    if (!ConnectionHandler.isStartUpMessage(data)) {
                         ServeurNonBloquant.getInstance().closeConnection(voie);
                         // If the server has reached the maximum number of clients
                    } else if (ServeurNonBloquant.clients.size() >= ServeurNonBloquant.MAX_CLIENT) {
                         
                         CompteRenduStartup compte = new CompteRenduStartup(
                                   BinaryUtils.getIntervalByteFromArray(46, 47, data),
                                   (byte) 0x03, BinaryUtils.getIntervalByteFromArray(
                                             4, 5, data));
                         buffer = ByteBuffer
                                   .allocate(CompteRenduStartup.CR_STARTUP_SIZE);
                         buffer = compte.getStartupMessage();
                         // Add the message to be sent to a queue and notify "run" to process it
                         synchronized (queue) {
                              queue.add(new ServerDataEvent(server,
                                        (SocketChannel) key.channel(), buffer.array()));
                              queue.notify();
                         }
                         // Call closeConnection on the server class
                         ServeurNonBloquant.getInstance().closeConnection(voie);
                                     ..............
                                    ...........
The "run" method of the worker thread, which calls the send method:
public void run() {
          ServerDataEvent dataEvent;
          while (true) {
               // Wait for data to become available
               synchronized (queue) {
                    while (queue.isEmpty()) {
                         try {
                              queue.wait();
                         } catch (InterruptedException e) {
                         }
                    }
                    dataEvent = (ServerDataEvent) queue.remove();
               }
               // Return to sender
               dataEvent.server.send(dataEvent.socket, dataEvent.data);
          }
     }
As I said, the worker thread adds the message to the queue and then calls closeConnection, but the connection is closed before the message can be sent.

Can you see what is causing this?
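
To make the timing problem concrete: closeConnection runs on the worker thread immediately after the message is queued, while the actual write only happens later on the selector thread, so the channel is already closed by the time write() would run. One possible direction, sketched here under assumptions, is to record a "close requested" flag and let the code that drains the queue close the channel once everything has been written. The class DeferredClose and its methods closeWhenDrained and flush are hypothetical names, not part of the code above. The sketch uses a plain WritableByteChannel so that it runs without a socket; in the server, the channel would be the SocketChannel and flush would be called from write(SelectionKey).

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical per-connection state: pending buffers plus a "close when drained" flag. */
public class DeferredClose {
    private final WritableByteChannel channel;
    private final Queue<ByteBuffer> pending = new ArrayDeque<>();
    private boolean closeRequested = false;

    public DeferredClose(WritableByteChannel channel) {
        this.channel = channel;
    }

    /** Called by worker threads: queue data unless a close was already requested. */
    public synchronized void send(byte[] data) {
        if (!closeRequested) {
            pending.add(ByteBuffer.wrap(data));
        }
    }

    /** Called by worker threads instead of closing the channel directly. */
    public synchronized void closeWhenDrained() {
        closeRequested = true;
    }

    /**
     * Called by the selector thread when the channel is writable.
     * Returns true if data is still pending (keep OP_WRITE set).
     */
    public synchronized boolean flush() throws IOException {
        while (!pending.isEmpty()) {
            ByteBuffer buf = pending.peek();
            channel.write(buf);
            if (buf.hasRemaining()) {
                return true; // partial write: retry on the next writable event
            }
            pending.remove();
        }
        if (closeRequested) {
            channel.close(); // safe now: everything queued has been written
        }
        return false;
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        WritableByteChannel ch = Channels.newChannel(sink);
        DeferredClose conn = new DeferredClose(ch);

        conn.send("bye".getBytes("US-ASCII"));
        conn.closeWhenDrained();               // channel stays open for now
        System.out.println("open before flush: " + ch.isOpen());

        conn.flush();                          // writes "bye", then closes
        System.out.println("sent: " + sink.toString("US-ASCII"));
        System.out.println("open after flush: " + ch.isOpen());
    }
}
```

With this shape, the worker thread would call closeWhenDrained right after queueing the message instead of closing the channel itself, and the close would happen on the selector thread once the queue is empty. Whether this fits the Voie and ServeurNonBloquant classes depends on details not shown in the post.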
